Commit f901d7c0 by baiquan

增加对上传的视频进行检查是否损坏

parent 4a10b1b0
......@@ -44,117 +44,6 @@ DEFAULT_HEADER = {
MAX_WORKERS = 5 # maximum number of concurrent threads
RETRY_COUNT = 2 # number of retries for a single slice
def download_video(url: str, file_: str, headers: dict):
    """
    Download a video over HTTP and write it to a local file.

    :param url: video URL
    :param file_: destination file path
    :param headers: HTTP request headers sent with the download
    :return: None
    :raises requests.HTTPError: if the server answers with a 4xx/5xx status
    :raises Exception: if the downloaded file is 10000 bytes or smaller
    """
    # NOTE(review): verify=False disables TLS certificate verification; kept
    # as in the original call - confirm this is intentional.
    with requests.get(url, verify=False, stream=True, headers=headers) as response:
        # Fail fast on error statuses so an error page is never saved as a video.
        response.raise_for_status()
        with open(file_, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
    # Very small files are treated as failed downloads.
    file_size = os.path.getsize(file_)
    if file_size > 10000:
        logger.info(f"下载完成: {file_}")
    else:
        raise Exception(f"下载失败: {file_}")
def get_video_duration(filename):
    """
    Return the duration of a video, computed as frame count / frame rate.

    :param filename: path of the video file
    :return: the duration, or -1 if the file cannot be opened or reports a
        non-positive frame rate
    """
    cap = cv2.VideoCapture(filename)
    try:
        if not cap.isOpened():
            return -1
        rate = cap.get(cv2.CAP_PROP_FPS)  # property id 5
        frame_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)  # property id 7
        # A missing frame rate is reported as 0; dividing by it would raise
        # ZeroDivisionError, so use the same -1 sentinel as the open failure.
        if rate <= 0:
            return -1
        return frame_num / rate
    finally:
        # Always free the underlying decoder/file handle.
        cap.release()
def random_s():
    """
    Generate a random string of 11 characters drawn from 0-9 and a-z.

    :return: random string of length 11
    """
    # Full a-z range: every lowercase letter (including 'j') appears exactly
    # once so all 36 symbols are equally likely.
    alphabet = '0123456789' + 'abcdefghijklmnopqrstuvwxyz'
    return ''.join(random.choices(alphabet, k=11))
def video_split(file_path, max_workers=MAX_WORKERS):
    """
    Split a file into slices whose size is tuned to the worker count.

    The slice size is derived from the file size and the number of workers
    and clamped to the 2 MB - 10 MB range; the last slice holds whatever
    bytes remain.

    :param file_path: path of the file to split
    :param max_workers: number of threads the slice count is tuned for;
        values below 1 are treated as 1
    :return: tuple (slices, chunks). slices is a list of dicts with the keys
        part_number (1-based), part_offset, crc32 (8 lowercase hex digits)
        and data_size; chunks maps each part_offset to that slice's bytes.
        Both are empty when the file is empty.
    """
    with open(file_path, 'rb') as file:
        file_data = file.read()
    file_size = len(file_data)
    if file_size == 0:
        # Zero bytes means zero slices; the size computation below would
        # otherwise divide by zero.
        return [], {}
    # Guard against a zero/negative worker count (division by zero below).
    workers = max(1, max_workers)
    # Slice size limits
    MIN_CHUNK_SIZE = 2 * 1024 * 1024  # 2 MB minimum slice
    MAX_CHUNK_SIZE = 10 * 1024 * 1024  # 10 MB maximum slice
    # Base slice size: an even share per worker, clamped to the limits
    base_chunk_size = max(MIN_CHUNK_SIZE, min(MAX_CHUNK_SIZE, file_size // workers))
    # Number of slices needed at the base size (rounded up)
    total_slices_needed = (file_size + base_chunk_size - 1) // base_chunk_size
    # Round up to a multiple of the worker count, but never exceed the count
    # obtained with the minimum slice size
    total_slices = min(
        workers * max(1, math.ceil(total_slices_needed / workers)),
        math.ceil(file_size / MIN_CHUNK_SIZE),
    )
    # Recompute the slice size for that count (rounded up) and clamp it again
    chunk_size_bytes = (file_size + total_slices - 1) // total_slices
    chunk_size_bytes = max(MIN_CHUNK_SIZE, min(MAX_CHUNK_SIZE, chunk_size_bytes))
    # Final slice count for the clamped size (rounded up)
    total_slices = (file_size + chunk_size_bytes - 1) // chunk_size_bytes
    logger.debug(f"文件大小: {file_size} bytes | 线程数: {max_workers} | "
                 f"分片大小: {chunk_size_bytes} bytes | 分片数量: {total_slices}")
    slices = []
    chunks = {}
    for i in tqdm(range(total_slices), desc='优化切片'):
        # total_slices == ceil(file_size / chunk_size_bytes), so every start
        # offset lies inside the file and no range check is needed.
        start = i * chunk_size_bytes
        end = min(start + chunk_size_bytes, file_size)
        data = file_data[start:end]
        slices.append({
            'part_number': i + 1,
            'part_offset': start,
            "crc32": format(zlib.crc32(data) & 0xFFFFFFFF, '08x'),
            "data_size": len(data),
        })
        chunks[start] = data
    return slices, chunks
class AWSV4Signer:
......@@ -546,13 +435,126 @@ class Upload:
return response_data['Result']['Data']['PlayInfoList'][0]
def download_video(url: str, file_: str, headers: dict):
    """
    Download a video over HTTP, write it to a local file and validate it.

    :param url: video URL
    :param file_: destination file path
    :param headers: HTTP request headers sent with the download
    :return: None
    :raises requests.HTTPError: if the server answers with a 4xx/5xx status
    :raises Exception: if the downloaded file is 10000 bytes or smaller, or
        is_video_corrupted() reports it as corrupted
    """
    # NOTE(review): verify=False disables TLS certificate verification; kept
    # as in the original call - confirm this is intentional.
    with requests.get(url, verify=False, stream=True, headers=headers) as response:
        # Fail fast on error statuses so an error page is never saved as a video.
        response.raise_for_status()
        with open(file_, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
    # Very small or undecodable files are treated as failed downloads.
    file_size = os.path.getsize(file_)
    if file_size > 10000 and not is_video_corrupted(file_):
        logger.info(f"下载完成: {file_}")
    else:
        raise Exception(f"下载失败: {file_}")
def get_video_duration(filename):
    """
    Return the duration of a video, computed as frame count / frame rate.

    :param filename: path of the video file
    :return: the duration, or -1 if the file cannot be opened or reports a
        non-positive frame rate
    """
    cap = cv2.VideoCapture(filename)
    try:
        if not cap.isOpened():
            return -1
        rate = cap.get(cv2.CAP_PROP_FPS)  # property id 5
        frame_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)  # property id 7
        # A missing frame rate is reported as 0; dividing by it would raise
        # ZeroDivisionError, so use the same -1 sentinel as the open failure.
        if rate <= 0:
            return -1
        return frame_num / rate
    finally:
        # Always free the underlying decoder/file handle.
        cap.release()
def random_s():
    """
    Generate a random string of 11 characters drawn from 0-9 and a-z.

    :return: random string of length 11
    """
    # Full a-z range: every lowercase letter (including 'j') appears exactly
    # once so all 36 symbols are equally likely.
    alphabet = '0123456789' + 'abcdefghijklmnopqrstuvwxyz'
    return ''.join(random.choices(alphabet, k=11))
def video_split(file_path, max_workers=MAX_WORKERS):
    """
    Split a file into slices whose size is tuned to the worker count.

    The slice size is derived from the file size and the number of workers
    and clamped to the 2 MB - 10 MB range; the last slice holds whatever
    bytes remain.

    :param file_path: path of the file to split
    :param max_workers: number of threads the slice count is tuned for;
        values below 1 are treated as 1
    :return: tuple (slices, chunks). slices is a list of dicts with the keys
        part_number (1-based), part_offset, crc32 (8 lowercase hex digits)
        and data_size; chunks maps each part_offset to that slice's bytes.
        Both are empty when the file is empty.
    """
    with open(file_path, 'rb') as file:
        file_data = file.read()
    file_size = len(file_data)
    if file_size == 0:
        # Zero bytes means zero slices; the size computation below would
        # otherwise divide by zero.
        return [], {}
    # Guard against a zero/negative worker count (division by zero below).
    workers = max(1, max_workers)
    # Slice size limits
    MIN_CHUNK_SIZE = 2 * 1024 * 1024  # 2 MB minimum slice
    MAX_CHUNK_SIZE = 10 * 1024 * 1024  # 10 MB maximum slice
    # Base slice size: an even share per worker, clamped to the limits
    base_chunk_size = max(MIN_CHUNK_SIZE, min(MAX_CHUNK_SIZE, file_size // workers))
    # Number of slices needed at the base size (rounded up)
    total_slices_needed = (file_size + base_chunk_size - 1) // base_chunk_size
    # Round up to a multiple of the worker count, but never exceed the count
    # obtained with the minimum slice size
    total_slices = min(
        workers * max(1, math.ceil(total_slices_needed / workers)),
        math.ceil(file_size / MIN_CHUNK_SIZE),
    )
    # Recompute the slice size for that count (rounded up) and clamp it again
    chunk_size_bytes = (file_size + total_slices - 1) // total_slices
    chunk_size_bytes = max(MIN_CHUNK_SIZE, min(MAX_CHUNK_SIZE, chunk_size_bytes))
    # Final slice count for the clamped size (rounded up)
    total_slices = (file_size + chunk_size_bytes - 1) // chunk_size_bytes
    logger.debug(f"文件大小: {file_size} bytes | 线程数: {max_workers} | "
                 f"分片大小: {chunk_size_bytes} bytes | 分片数量: {total_slices}")
    slices = []
    chunks = {}
    for i in tqdm(range(total_slices), desc='优化切片'):
        # total_slices == ceil(file_size / chunk_size_bytes), so every start
        # offset lies inside the file and no range check is needed.
        start = i * chunk_size_bytes
        end = min(start + chunk_size_bytes, file_size)
        data = file_data[start:end]
        slices.append({
            'part_number': i + 1,
            'part_offset': start,
            "crc32": format(zlib.crc32(data) & 0xFFFFFFFF, '08x'),
            "data_size": len(data),
        })
        chunks[start] = data
    return slices, chunks
def check_video_aspect_ratio(video_path):
"""
检查视频文件的长宽比是否符合1:1、3:4或9:16的比例
参数:
video_path: 视频文件路径
返回:
tuple: (是否符合要求, 实际宽高比, 最接近的目标比例)
"""
......@@ -595,6 +597,24 @@ def check_video_aspect_ratio(video_path):
closest_ratio = min(target_ratios.items(), key=lambda x: abs(aspect_ratio - x[1]))
return False, aspect_ratio, closest_ratio[0]
def is_video_corrupted(file_path):
    """
    Check whether a video is corrupted by trying to decode its first frames.

    :param file_path: path of the video file
    :return: True if the file cannot be opened, if any of the first 10 frame
        reads fails, or if the check itself raises; False otherwise
    """
    cap = None
    try:
        cap = cv2.VideoCapture(file_path)
        if not cap.isOpened():
            return True
        # Try to read the first 10 frames; the first failed read decides.
        # NOTE(review): a valid clip with fewer than 10 frames also hits a
        # failed read and is reported as corrupted - confirm this is intended.
        for _ in range(10):
            ret, _frame = cap.read()
            if not ret:
                return True
        return False
    except Exception as e:
        # Best-effort check: any decoder error counts as a corrupted file.
        logger.error(f"视频检查异常: {e}")
        return True
    finally:
        # Release the capture on every path, including early returns.
        if cap is not None:
            cap.release()
def upload_video_with_multithreading(task):
"""多线程视频上传主函数"""
......@@ -652,6 +672,9 @@ def prepare_video_file(task):
download_video(task['video_url'], file_path, headers=task['headers'])
else:
logger.info(f"文件 {file_name} 已存在,跳过下载")
if is_video_corrupted(file_path):
logger.error("视频文件已损坏,正在重新下载")
download_video(task['video_url'], file_path, headers=task['headers'])
video_duration = get_video_duration(file_path)
if video_duration > 60:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment