Commit 08930045 by baiquan

使用多线程上传视频切片

parent bcdda3a2
...@@ -96,66 +96,32 @@ def video_split(file_path, max_workers=MAX_WORKERS): ...@@ -96,66 +96,32 @@ def video_split(file_path, max_workers=MAX_WORKERS):
:param max_workers: :param max_workers:
:return: :return:
""" """
file_size = os.path.getsize(file_path)
min_chunk_size = 2 * 1024 * 1024 # 2MB
# 根据线程数和文件大小计算分片大小
chunk_size_bytes = max(min_chunk_size, int(file_size / max_workers))
with open(file_path, 'rb') as file: with open(file_path, 'rb') as file:
file_data = file.read() file_data = file.read()
file_size = len(file_data)
# 配置分片大小参数
MIN_CHUNK_SIZE = 2 * 1024 * 1024 # 2MB最小分片
MAX_CHUNK_SIZE = 10 * 1024 * 1024 # 10MB最大分片
BASE_MULTIPLIER = 1.2 # 基本倍数缓冲系数
# 计算基础分片大小
base_chunk_size = max(MIN_CHUNK_SIZE, min(MAX_CHUNK_SIZE, file_size // max_workers))
# 计算优化的分片数量和大小
# 确保分片数量是线程数的整数倍
num_chunks = max_workers * max(1, round(file_size / (max_workers * base_chunk_size * BASE_MULTIPLIER)))
# 重新计算实际分片大小(向上取整)
chunk_size_bytes = (file_size + num_chunks - 1) // num_chunks
# 确保分片大小在限制范围内
chunk_size_bytes = max(MIN_CHUNK_SIZE, min(MAX_CHUNK_SIZE, chunk_size_bytes))
# 重新计算实际分片数量(向上取整)
total_slices = (file_size + chunk_size_bytes - 1) // chunk_size_bytes
# 确保分片数量是线程数的整数倍
if total_slices % max_workers != 0:
total_slices = ((total_slices // max_workers) + 1) * max_workers
logger.debug(f"文件大小: {file_size} bytes | 线程数: {max_workers} | "
f"分片大小: {chunk_size_bytes} bytes | 分片数量: {total_slices}")
total_slices = (len(file_data) + chunk_size_bytes - 1) // chunk_size_bytes
slices = [] slices = []
chunks = {} chunks = {}
for i in tqdm(range(total_slices), desc='优化切片'): for i in tqdm(range(total_slices), desc='切片'):
start = i * chunk_size_bytes start = i * chunk_size_bytes
end = min((i + 1) * chunk_size_bytes, file_size) end = min((i + 1) * chunk_size_bytes, len(file_data))
data = file_data[start:end]
# 对于超出文件长度的分片,数据为空
if start < file_size:
data = file_data[start:end]
else:
data = b'' # 超出文件长度的空分片
# 生成CRC32校验码(仅当有实际数据时)
crc32 = hex(zlib.crc32(data) & 0xFFFFFFFF)[2:].zfill(8) if data else '00000000'
crc32 = hex(zlib.crc32(data) & 0xFFFFFFFF)[2:].zfill(8)
# 每一段视频的请求参数信息
slice_info = { slice_info = {
'part_number': i + 1, # 分片序号从1开始 'part_number': i + 1, # 当前视频切片的顺序
'part_offset': start, # 在文件中的起始位置 'part_offset': start, # 上传文件的写入地址
"crc32": crc32, # 数据校验码 "crc32": crc32
"data_size": len(data) # 实际数据长度
} }
chunks[start] = data
# 存储分片数据(仅当有实际数据时)
if data:
chunks[start] = data
slices.append(slice_info) slices.append(slice_info)
return slices, chunks return slices, chunks
...@@ -621,6 +587,7 @@ def upload_video_with_multithreading(task): ...@@ -621,6 +587,7 @@ def upload_video_with_multithreading(task):
upload.slices, chunks = video_split(file_path) upload.slices, chunks = video_split(file_path)
upload_queue = create_upload_queue(upload.slices, chunks) upload_queue = create_upload_queue(upload.slices, chunks)
# 多线程上传 # 多线程上传
logger.info("开始多线程上传")
completed = threaded_upload(upload_queue, upload, MAX_WORKERS) completed = threaded_upload(upload_queue, upload, MAX_WORKERS)
if not completed: if not completed:
raise Exception("部分分片上传失败") raise Exception("部分分片上传失败")
...@@ -686,6 +653,7 @@ def threaded_upload(upload_queue, upload, max_workers): ...@@ -686,6 +653,7 @@ def threaded_upload(upload_queue, upload, max_workers):
"""多线程执行分片上传""" """多线程执行分片上传"""
success_count = 0 success_count = 0
total_slices = upload_queue.qsize() total_slices = upload_queue.qsize()
logger.info(f"开始上传 {total_slices} 个分片")
lock = threading.Lock() lock = threading.Lock()
exceptions = [] exceptions = []
...@@ -694,12 +662,14 @@ def threaded_upload(upload_queue, upload, max_workers): ...@@ -694,12 +662,14 @@ def threaded_upload(upload_queue, upload, max_workers):
while True: while True:
try: try:
task_item = upload_queue.get_nowait() task_item = upload_queue.get_nowait()
logger.debug(f"开始处理分片 {task_item['slice']['part_number']}/{total_slices}")
except: except:
break break
try: try:
# 单分片上传逻辑 # 单分片上传逻辑
success = upload.upload_single_slice(task_item['slice'],task_item['chunk']) success = upload.upload_single_slice(task_item['slice'],task_item['chunk'])
logger.debug(f"分片 {task_item['slice']['part_number']}/{total_slices} 上传完成")
with lock: with lock:
if success: if success:
success_count += 1 success_count += 1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment