Commit 2e6d0910 by baiquan

使用多线程上传视频切片

parent 1505180d
......@@ -3,18 +3,19 @@ import hmac
import json
import os
import random
import threading
import urllib.parse
import zlib
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from queue import Queue
import cv2
import requests
from loguru import logger
from tqdm import tqdm
from service.doudian_request import doudian_request
from utils.common import check_proxy
HEADERS = {
"Content-Type": "application/json",
......@@ -38,6 +39,10 @@ DEFAULT_HEADER = {
'sec-fetch-site': 'same-origin',
}
# Thread-pool configuration for the slice upload
MAX_WORKERS = 5 # maximum number of concurrent upload threads
RETRY_COUNT = 2 # how many times a single failed slice is re-queued
def download_video(url: str, file_: str,headers: dict):
"""
下载视频
......@@ -84,36 +89,73 @@ def random_s():
return random_str
def video_split(file_path, max_workers=MAX_WORKERS):
    """
    Split a video file into upload slices whose size is tuned to the thread count.

    The whole file is read into memory, a slice size between 2 MB and 10 MB
    is derived from the file size and ``max_workers``, and the file is cut
    into consecutive slices of that size (the last one may be shorter).

    Every returned slice carries real data and has a matching entry in
    ``chunks``. The slice count is deliberately NOT padded up to a multiple
    of ``max_workers``: padding produced zero-length slices with no entry in
    ``chunks``, which made the later ``chunks[part_offset]`` lookup fail.

    :param file_path: path of the video file to split
    :param max_workers: number of upload threads the slice size is tuned for
                        (values below 1 are treated as 1)
    :return: ``(slices, chunks)`` where ``slices`` is a list of dicts with the
             keys ``part_number`` (1-based), ``part_offset`` (byte offset in
             the file), ``crc32`` (8 lowercase hex digits) and ``data_size``,
             and ``chunks`` maps each ``part_offset`` to that slice's bytes
    """
    # Guard against a zero/negative worker count, which would divide by zero below.
    max_workers = max(1, max_workers)

    with open(file_path, 'rb') as file:
        file_data = file.read()
    file_size = len(file_data)

    # Slice-size limits
    MIN_CHUNK_SIZE = 2 * 1024 * 1024  # 2 MB lower bound
    MAX_CHUNK_SIZE = 10 * 1024 * 1024  # 10 MB upper bound
    BASE_MULTIPLIER = 1.2  # buffer factor applied to the base slice size

    # Base slice size: an even share per worker, clamped to the limits.
    base_chunk_size = max(MIN_CHUNK_SIZE, min(MAX_CHUNK_SIZE, file_size // max_workers))

    # Target slice count: a multiple of the worker count.
    num_chunks = max_workers * max(1, round(file_size / (max_workers * base_chunk_size * BASE_MULTIPLIER)))

    # Actual slice size (rounded up), clamped to the limits again.
    chunk_size_bytes = (file_size + num_chunks - 1) // num_chunks
    chunk_size_bytes = max(MIN_CHUNK_SIZE, min(MAX_CHUNK_SIZE, chunk_size_bytes))

    # Actual slice count (rounded up); an empty file yields zero slices.
    total_slices = (file_size + chunk_size_bytes - 1) // chunk_size_bytes

    logger.debug(f"文件大小: {file_size} bytes | 线程数: {max_workers} | "
                 f"分片大小: {chunk_size_bytes} bytes | 分片数量: {total_slices}")

    slices = []
    chunks = {}
    for i in tqdm(range(total_slices), desc='优化切片'):
        start = i * chunk_size_bytes
        end = min(start + chunk_size_bytes, file_size)
        data = file_data[start:end]
        # CRC32 of the slice as 8 zero-padded lowercase hex digits.
        crc32 = format(zlib.crc32(data) & 0xFFFFFFFF, '08x')
        # Request parameters for this slice.
        slices.append({
            'part_number': i + 1,  # slice numbers start at 1
            'part_offset': start,  # byte offset of the slice in the file
            "crc32": crc32,  # checksum sent with the slice
            "data_size": len(data),  # number of bytes in the slice
        })
        chunks[start] = data
    return slices, chunks
......@@ -223,11 +265,14 @@ class AWSV4Signer:
class Upload:
def __init__(self, task):
self.video_id = None
self.store_uri = None
self.auth = None
self.play_auth_token = None
self.slices = None
self.session_key = None
self.upload_id = None
self.upload_nodes = None
self.token = None
self.secret_access_key = None
self.session_token = None
self.access_key_id = None
......@@ -236,12 +281,10 @@ class Upload:
self.proxies = task.get("proxies")
self.cookies = task.get('cookie')
self.headers = task.get('headers')
self.file_path_ = task.get('file_path_')
def get_upload_token(self, upload_id = None):
"""
获取上传token
:return:
"""
headers = {**DEFAULT_HEADER, **self.headers}
......@@ -256,12 +299,17 @@ class Upload:
}
url = f'https://fxg.jinritemai.com/product/video/uploadVideoToken'
response = doudian_request("GET", url, proxies=self.proxies, cookies=self.cookies, headers=headers, params=params)
return response['data']
logger.info("获取上传token成功")
upload_token = response['data']['auth_token']
self.access_key_id = upload_token['AccessKeyID']
self.session_token = upload_token['SessionToken']
self.secret_access_key = upload_token['SecretAccessKey']
if upload_id:
self.play_auth_token = response['data']['play_auth_token']
def get_upload_nodes(self):
"""
获取上传节点
:return:
"""
iso_8601 = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
......@@ -278,7 +326,11 @@ class Upload:
'X-Amz-Date': iso_8601,
'x-amz-security-token': self.session_token,
}
signer = AWSV4Signer(self.token)
signer = AWSV4Signer({
"accessKeyId": self.access_key_id,
"secretAccessKey": self.secret_access_key,
"sessionToken": self.session_token
})
signature = signer.calculate_signature("GET", params, signature_header)
authorization = f'AWS4-HMAC-SHA256 Credential={self.access_key_id}/{iso_8601[:8]}/cn-north-1/vod/aws4_request, SignedHeaders=x-amz-date;x-amz-security-token, Signature={signature}'
headers = {
......@@ -300,12 +352,11 @@ class Upload:
response = doudian_request("GET",'https://vod.bytedanceapi.com/', params=params,cookies=self.cookies, headers=headers, proxies=self.proxies, match_str="UploadNodes")
upload_data = response['response_data']
upload_nodes = upload_data['Result']['InnerUploadAddress']['UploadNodes']
return upload_nodes
self.upload_nodes = upload_nodes
def upload_video_init(self):
"""
上传视频初始化
:return:
"""
for upload_node in self.upload_nodes:
......@@ -335,23 +386,21 @@ class Upload:
url = f'https://tos-d-ct-lf.snssdk.com/{store_uri}?uploads'
response = doudian_request("POST", url, cookies=self.cookies, headers=headers, proxies=self.proxies, params=params, match_str="uploadID")
upload_id = response['response_data']['payload']['uploadID']
return upload_id, session_key, auth, store_uri
self.upload_id, self.session_key, self.auth, self.store_uri = upload_id, session_key, auth, store_uri
def upload_video_split(self, slice_, chunks, auth, store_uri):
def upload_single_slice(self, slice_, chunk):
"""
上传视频
:param slice_:
:param chunks:
:param auth:
:param store_uri:
:param chunk:
:return:
"""
crc32 = slice_['crc32']
part_number = slice_['part_number']
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Authorization': auth,
'Authorization': self.auth,
'Connection': 'keep-alive',
'Content-CRC32': crc32,
'Content-Disposition': 'attachment; filename="undefined"',
......@@ -368,25 +417,27 @@ class Upload:
headers = {**headers, **self.headers}
params = {
'uploadID': self.upload_id,
'partNumber': slice_['part_number'],
'partNumber': part_number,
}
data = chunks[slice_['part_offset']]
url = f'https://tos-d-ct-lf.snssdk.com/{store_uri}'
response = doudian_request("POST", url, params=params, data=data, headers=headers,cookies=self.cookies, proxies=self.proxies)
return response
def upload_video_finish(self, auth, store_uri):
url = f'https://tos-d-ct-lf.snssdk.com/{self.store_uri}'
try:
doudian_request("POST", url, params=params, data=chunk, headers=headers, cookies=self.cookies,proxies=self.proxies)
return True
except requests.Timeout:
logger.warning(f"分片 {part_number} 上传超时")
return False
except Exception as e:
logger.error(f"分片 {part_number} 上传错误: {str(e)}")
return False
def upload_video_finish(self):
"""
上传视频完成
:param auth:
:param store_uri:
:return:
"""
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Authorization': auth,
'Authorization': self.auth,
'Connection': 'keep-alive',
'Content-Type': 'text/plain;charset=UTF-8',
'Origin': 'https://fxg.jinritemai.com',
......@@ -404,14 +455,13 @@ class Upload:
}
data = ','.join([f"{s['part_number']}:{s['crc32']}" for s in self.slices])
url = f'https://tos-d-ct-lf.snssdk.com/{store_uri}'
url = f'https://tos-d-ct-lf.snssdk.com/{self.store_uri}'
response = doudian_request("POST", url, params=params, cookies=self.cookies, headers=headers, data=data, proxies=self.proxies)
return response
def get_upload_result(self):
"""
获取上传结果
:return:
"""
p = {
......@@ -430,7 +480,11 @@ class Upload:
'x-amz-security-token': self.session_token,
'X-Amz-Content-Sha256': hashlib.sha256(data.encode('utf-8')).hexdigest(),
}
signer = AWSV4Signer(self.token)
signer = AWSV4Signer({
"accessKeyId": self.access_key_id,
"secretAccessKey": self.secret_access_key,
"sessionToken": self.session_token
})
signature = signer.calculate_signature("POST", params, signature_header, data)
authorization = f'AWS4-HMAC-SHA256 Credential={self.access_key_id}/{iso_8601[:8]}/cn-north-1/vod/aws4_request, SignedHeaders=x-amz-content-sha256;x-amz-date;x-amz-security-token, Signature={signature}'
headers = {
......@@ -454,9 +508,9 @@ class Upload:
url = 'https://vod.bytedanceapi.com/'
response = doudian_request("POST", url,cookies=self.cookies, params=params, headers=headers, data=data, proxies=self.proxies, match_str="RequestId")
response_data = response.get("response_data")
return response_data['Result']['Results'][0]
self.video_id = response_data['Result']['Results'][0]['Vid']
def change_video_status(self, video_id, video_name):
def change_video_status(self):
"""
更改上传视频状态
:return:
......@@ -468,10 +522,10 @@ class Upload:
json_data = {
'vid_list': [
video_id,
self.video_id,
],
'name_list': [
video_name,
self.file_name,
],
'appid': 1,
}
......@@ -479,13 +533,13 @@ class Upload:
response = doudian_request("POST", url, params=params, cookies=self.cookies, headers=self.headers, json=json_data, proxies=self.proxies)
logger.info(response)
def get_upload_video_info(self, video_id):
def get_upload_video_info(self):
"""
获取上传视频信息
:return:
"""
play_auth_token = self.get_upload_token(video_id)['play_auth_token']
url = f'https://vod.bytedanceapi.com/?{play_auth_token}'
self.get_upload_token(self.video_id)
url = f'https://vod.bytedanceapi.com/?{self.play_auth_token}'
response = doudian_request("GET", url, headers=self.headers, cookies=self.cookies, proxies=self.proxies, match_str="PlayInfoList")
logger.info(response)
response_data = response.get("response_data")
......@@ -494,49 +548,6 @@ class Upload:
return response_data['Result']['Data']['PlayInfoList'][0]
def run(self):
    """
    Run the whole upload sequentially and return the uploaded video's details.

    Steps, in order: build the proxy setting, fetch the upload token, fetch
    the upload nodes, initialise the upload, split the file, upload each
    slice one after another, finish the upload, read the result, change the
    video status and fetch the video info.

    :return: tuple ``(video_id, video_info)``
    """
    # Build a socks5h proxy URL from the task's proxy fields; whatever
    # check_proxy returns replaces self.proxies for the following requests.
    proxy_cfg = self.proxies
    host = proxy_cfg['addr']
    port = proxy_cfg['port']
    user = proxy_cfg['username']
    secret = proxy_cfg['password']
    self.proxies = check_proxy(f"socks5h://{user}:{secret}@{host}:{port}")

    self.file_size = os.path.getsize(self.file_path_)

    # Temporary credentials used to sign the later requests.
    logger.info("开始获取上传token")
    credentials = self.get_upload_token()['auth_token']
    logger.info("获取上传token成功")
    self.access_key_id = credentials['AccessKeyID']
    self.session_token = credentials['SessionToken']
    self.secret_access_key = credentials['SecretAccessKey']
    self.token = {
        "accessKeyId": self.access_key_id,
        "secretAccessKey": self.secret_access_key,
        "sessionToken": self.session_token,
    }

    logger.info("开始获取上传节点")
    self.upload_nodes = self.get_upload_nodes()
    logger.info("获取上传节点成功")
    self.upload_id, self.session_key, auth, store_uri = self.upload_video_init()

    # Slices are uploaded one at a time, in list order.
    logger.info("开始上传视频")
    self.slices, chunks = video_split(self.file_path_)
    for part in self.slices:
        self.upload_video_split(part, chunks, auth, store_uri)
    logger.info("视频已上传")

    self.upload_video_finish(auth, store_uri)
    video_id = self.get_upload_result()['Vid']
    self.change_video_status(video_id, self.file_name)
    logger.info("视频状态已更改")
    return video_id, self.get_upload_video_info(video_id)
def check_video_aspect_ratio(video_path):
"""
检查视频文件的长宽比是否符合1:1、3:4或9:16的比例
......@@ -586,25 +597,68 @@ def check_video_aspect_ratio(video_path):
closest_ratio = min(target_ratios.items(), key=lambda x: abs(aspect_ratio - x[1]))
return False, aspect_ratio, closest_ratio[0]
def upload_video(task):
def upload_video_with_multithreading(task):
    """
    Upload the task's video, sending the file slices through a thread pool.

    :param task: task dict passed to ``prepare_video_file`` and ``Upload``
    :return: dict with the keys ``video_id``, ``video_info`` and ``file_path``
    :raises Exception: when ``threaded_upload`` reports that not every slice
                       was uploaded

    NOTE(review): the sequential ``Upload.run`` built a socks5h URL from
    ``task['proxies']`` and passed it through ``check_proxy`` before any
    request; nothing here does that — confirm the requests accept the raw
    proxies dict.
    """
    # Obtain the local video file.
    local_path = prepare_video_file(task)

    # Set up the uploader for that file.
    uploader = Upload(task)
    uploader.file_size = os.path.getsize(local_path)
    uploader.file_path = local_path

    # Credentials, upload nodes and upload session.
    logger.info("开始获取上传token")
    uploader.get_upload_token()
    logger.info("开始获取上传节点")
    uploader.get_upload_nodes()
    logger.info("获取上传节点成功")
    uploader.upload_video_init()

    # Slice the file and queue every slice for the workers.
    logger.info("开始上传视频")
    uploader.slices, chunk_map = video_split(local_path)
    pending = create_upload_queue(uploader.slices, chunk_map)

    # Upload with MAX_WORKERS threads; stop here unless every slice succeeded.
    if not threaded_upload(pending, uploader, MAX_WORKERS):
        raise Exception("部分分片上传失败")
    logger.info("视频已上传")

    # Finish the upload and read back the video id.
    uploader.upload_video_finish()
    uploader.get_upload_result()

    # Post-processing.
    uploader.change_video_status()
    logger.info("视频状态已更改")
    playback_info = uploader.get_upload_video_info()

    return {
        'video_id': uploader.video_id,
        'video_info': playback_info,
        'file_path': local_path,
    }
def prepare_video_file(task):
"""准备视频文件"""
if not os.path.exists(VIDEO_PATH):
os.makedirs(VIDEO_PATH)
# 下载视频
file_name = f"{task['file_name']}"
# 判断文件是否存在
file_path_ = os.path.join(VIDEO_PATH, file_name)
if not os.path.exists(file_path_):
file_path = os.path.join(VIDEO_PATH, file_name)
if not os.path.exists(file_path):
logger.info(f"文件 {file_name} 不存在,开始下载")
download_video(task['video_url'], file_path_, headers=task['headers'])
logger.info("下载完成")
download_video(task['video_url'], file_path, headers=task['headers'])
else:
logger.info(f"文件 {file_name} 已存在,跳过下载")
video_duration = get_video_duration(file_path_)
video_duration = get_video_duration(file_path)
if video_duration > 60:
logger.error("视频时长大于60秒,上传失败")
raise Exception("视频时长大于60秒,上传失败")
is_valid, actual_ratio, best_match = check_video_aspect_ratio(file_path_)
is_valid, actual_ratio, best_match = check_video_aspect_ratio(file_path)
if is_valid:
logger.info(f"视频比例验证通过! ({best_match}, 实际比例: {actual_ratio:.4f})")
else:
......@@ -613,17 +667,71 @@ def upload_video(task):
logger.error(f"最接近的标准比例: {best_match}")
logger.error(f"请上传1:1、3:4或9:16比例的视频")
raise Exception("视频长宽比错误,请上传1:1比例或者3:4比例或者9:16比例的主图视频")
task['file_path_'] = file_path_
upload = Upload(task)
video_id, video_info = upload.run()
result = {
'video_id': video_id,
'video_info': video_info,
'file_path': file_path_,
}
return result
return file_path
def create_upload_queue(slices, chunks):
    """
    Build the queue of slice-upload work items.

    :param slices: iterable of slice-info dicts; each must have ``part_offset``
    :param chunks: dict mapping ``part_offset`` to the slice's bytes
    :return: ``Queue`` holding one dict per slice with the keys ``slice``
             (the slice info), ``chunk`` (its bytes) and ``attempt`` (0)
    """
    pending = Queue()
    for slice_info in slices:
        pending.put({
            'slice': slice_info,
            # A slice may have no stored data (video_split only stores
            # non-empty chunks); use empty bytes instead of raising KeyError.
            'chunk': chunks.get(slice_info['part_offset'], b''),
            'attempt': 0,  # number of retries performed so far
        })
    return pending
def threaded_upload(upload_queue, upload, max_workers):
    """
    Upload every queued slice using a pool of worker threads.

    Each worker repeatedly takes an item from ``upload_queue`` and calls
    ``upload.upload_single_slice(item['slice'], item['chunk'])``. A falsy
    result re-queues the item until it has been retried ``RETRY_COUNT``
    times; after that the slice is logged as failed and dropped.

    :param upload_queue: ``Queue`` of dicts with the keys ``slice``,
                         ``chunk`` and ``attempt``
    :param upload: object providing ``upload_single_slice(slice_, chunk)``
    :param max_workers: number of worker threads to start
    :return: ``True`` when the number of successful uploads equals the number
             of items initially in the queue, otherwise ``False``
    :raises Exception: the first unexpected exception captured in a worker
    """
    # Function-scope import: the module itself only imports ``Queue``.
    from queue import Empty

    success_count = 0
    total_slices = upload_queue.qsize()
    lock = threading.Lock()
    exceptions = []

    def worker():
        nonlocal success_count
        while True:
            try:
                task_item = upload_queue.get_nowait()
            except Empty:
                # Only an empty queue ends the worker; anything else
                # (e.g. KeyboardInterrupt) must not be swallowed here.
                break
            try:
                # Upload one slice.
                success = upload.upload_single_slice(task_item['slice'], task_item['chunk'])
                with lock:
                    if success:
                        success_count += 1
                        logger.debug(f"分片 {task_item['slice']['part_number']}/{total_slices} 上传成功")
                    else:
                        # Retry handling: the re-queued item is picked up by
                        # this worker's next loop iteration or by another one.
                        if task_item['attempt'] < RETRY_COUNT:
                            task_item['attempt'] += 1
                            upload_queue.put(task_item)
                            logger.warning(
                                f"分片 {task_item['slice']['part_number']} 上传失败,重试 {task_item['attempt']}/{RETRY_COUNT}")
                        else:
                            logger.error(f"分片 {task_item['slice']['part_number']} 失败次数超过上限")
            except Exception as e:
                exceptions.append(e)
                logger.exception(f"分片 {task_item['slice']['part_number']} 上传异常")
            finally:
                # Balance every successful get() so join() can return.
                upload_queue.task_done()

    # Start the workers; leaving the ``with`` block waits for all of them.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for _ in range(max_workers):
            executor.submit(worker)

    # Wait until every queued item has been marked done.
    upload_queue.join()

    if exceptions:
        logger.error(f"上传过程中发生 {len(exceptions)} 个异常")
        raise exceptions[0]
    return success_count == total_slices
if __name__ == '__main__':
......@@ -648,8 +756,8 @@ if __name__ == '__main__':
"sid_tt": "70f5161b7b961c32c0d0c92b76e1e954",
"uid_tt": "31ac2392728b8f0638528e63720d63c6",
"uid_tt_ss": "31ac2392728b8f0638528e63720d63c6",
"PHPSESSID": "ea6b555fb9074efe8cda5f15a4e1095e",
"PHPSESSID_SS": "ea6b555fb9074efe8cda5f15a4e1095e",
"PHPSESSID": "dfb09ab6c168ddbb2fe1d6cb9f8c8a36",
"PHPSESSID_SS": "dfb09ab6c168ddbb2fe1d6cb9f8c8a36",
"ucas_c0": "CkEKBTEuMC4wEIqIgdL41uSGaBjmJiCb66DF883DAiiwITC-7dD8pMyQA0C6pbbABki62fLCBlCjvLzOwPzd-WdYbhIUQePNSvIzA2AqfMku-VVaYxocXyI",
"ucas_c0_ss": "CkEKBTEuMC4wEIqIgdL41uSGaBjmJiCb66DF883DAiiwITC-7dD8pMyQA0C6pbbABki62fLCBlCjvLzOwPzd-WdYbhIUQePNSvIzA2AqfMku-VVaYxocXyI",
"sid_guard": "70f5161b7b961c32c0d0c92b76e1e954%7C1745720009%7C5183999%7CThu%2C+26-Jun-2025+02%3A13%3A28+GMT",
......@@ -661,7 +769,7 @@ if __name__ == '__main__':
"ttwid": "1%7CMFdTuHX3M0W14Bao11G9c4hqdaJywwj6SGMPYOTz93o%7C1745735758%7Cfb86819121428a8f2806469e0343c58aea3fe6a904990a2646c5184180c5ae09"
},
"video_url": "https://cloud.video.taobao.com/play/u/2200778999140/p/2/e/6/t/1/504992565058.mp4?appKey=38829",
"file_name": "efad540ae5554568509afb543a861b3a_raw.mp4",
"file_name": "efdd2da320a8799a691358fe67facf35.mp4",
"proxies": {
"addr": "58.251.251.226",
"port": "9001",
......@@ -669,4 +777,4 @@ if __name__ == '__main__':
"password": "3X4Q79VMDPK0"
}
}
print(upload_video(task))
print(upload_video_with_multithreading(task))
......@@ -8,7 +8,7 @@ from service.doudian_service import generate_a_bogus, get_schema, create_global_
from service.hub_ import closeBrowser
from service.login import login
from service.sync_shop import syncShop, syncShopInfo
from service.upload_video import upload_video
from service.upload_video import upload_video_with_multithreading
from utils.errors import AppError
......@@ -136,7 +136,7 @@ def execute_doudian_login(account: str, password: str, headers: dict, proxies: d
def execute_doudian_upload_video(task: dict):
"""上传视频"""
try:
result = upload_video(task)
result = upload_video_with_multithreading(task)
logger.success(f'视频上传成功-->{result}')
return {
'code': 200,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment