Commit 4a10b1b0 by baiquan

优化多线程上传视频切片

parent 08930045
import hashlib import hashlib
import hmac import hmac
import json import json
import math
import os import os
import random import random
import threading import threading
def video_split(file_path, max_workers=MAX_WORKERS):
    """Split a video file into chunks for multi-threaded upload.

    The whole file is read into memory, then a chunk size is chosen so that
    (a) every chunk is between 2 MB and 10 MB and (b) the chunk count lines
    up with the worker count where possible.

    :param file_path: path of the video file to split
    :param max_workers: number of upload threads the slice count is aligned to
    :return: ``(slices, chunks)`` — ``slices`` is a list of per-part metadata
             dicts (``part_number``, ``part_offset``, ``crc32``,
             ``data_size``); ``chunks`` maps each part's byte offset to its
             raw bytes
    """
    # NOTE(review): the entire file is held in memory; fine for typical
    # videos, but very large files may need a streaming/mmap approach.
    with open(file_path, 'rb') as file:
        file_data = file.read()
    file_size = len(file_data)

    # Chunk-size bounds.
    MIN_CHUNK_SIZE = 2 * 1024 * 1024   # 2 MB minimum per chunk
    MAX_CHUNK_SIZE = 10 * 1024 * 1024  # 10 MB maximum per chunk

    # FIX: an empty file used to yield total_slices == 0 and then a
    # ZeroDivisionError in the chunk-size derivation below.
    if file_size == 0:
        return [], {}

    # Base chunk size: roughly one chunk per worker, clamped to 2–10 MB.
    base_chunk_size = max(MIN_CHUNK_SIZE,
                          min(MAX_CHUNK_SIZE, file_size // max(1, max_workers)))

    # Slices actually needed at that size (ceiling division).
    total_slices_needed = (file_size + base_chunk_size - 1) // base_chunk_size

    # Round the slice count up to a multiple of the worker count, but never
    # beyond what the minimum chunk size allows for this file.
    total_slices = min(
        max_workers * max(1, math.ceil(total_slices_needed / max_workers)),
        math.ceil(file_size / MIN_CHUNK_SIZE),
    )
    total_slices = max(1, total_slices)  # FIX: never let the count reach 0

    # Re-derive the effective chunk size from the final slice count, then
    # clamp it back into the allowed window.
    chunk_size_bytes = (file_size + total_slices - 1) // total_slices
    chunk_size_bytes = max(MIN_CHUNK_SIZE, min(MAX_CHUNK_SIZE, chunk_size_bytes))

    # The clamp may have changed the size, so recompute the real slice count.
    total_slices = (file_size + chunk_size_bytes - 1) // chunk_size_bytes

    logger.debug(f"文件大小: {file_size} bytes | 线程数: {max_workers} | "
                 f"分片大小: {chunk_size_bytes} bytes | 分片数量: {total_slices}")

    slices = []
    chunks = {}
    for i in tqdm(range(total_slices), desc='优化切片'):
        start = i * chunk_size_bytes
        end = min((i + 1) * chunk_size_bytes, file_size)
        if start >= file_size:
            # Defensive: skip any slice that would fall past end-of-file.
            continue
        data = file_data[start:end]
        # CRC32 masked to unsigned, rendered as zero-padded 8-char hex.
        crc32 = hex(zlib.crc32(data) & 0xFFFFFFFF)[2:].zfill(8)
        slice_info = {
            'part_number': i + 1,   # 1-based order of this slice
            'part_offset': start,   # byte offset to write at on the server
            "crc32": crc32,
            "data_size": len(data)
        }
        chunks[start] = data
        slices.append(slice_info)
    return slices, chunks
...@@ -653,7 +685,6 @@ def threaded_upload(upload_queue, upload, max_workers): ...@@ -653,7 +685,6 @@ def threaded_upload(upload_queue, upload, max_workers):
"""多线程执行分片上传""" """多线程执行分片上传"""
success_count = 0 success_count = 0
total_slices = upload_queue.qsize() total_slices = upload_queue.qsize()
logger.info(f"开始上传 {total_slices} 个分片")
lock = threading.Lock() lock = threading.Lock()
exceptions = [] exceptions = []
...@@ -669,7 +700,6 @@ def threaded_upload(upload_queue, upload, max_workers): ...@@ -669,7 +700,6 @@ def threaded_upload(upload_queue, upload, max_workers):
try: try:
# 单分片上传逻辑 # 单分片上传逻辑
success = upload.upload_single_slice(task_item['slice'],task_item['chunk']) success = upload.upload_single_slice(task_item['slice'],task_item['chunk'])
logger.debug(f"分片 {task_item['slice']['part_number']}/{total_slices} 上传完成")
with lock: with lock:
if success: if success:
success_count += 1 success_count += 1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment