Commit fa005a26 by baiquan

add upload_video

parent 2f91b362
......@@ -23,13 +23,17 @@ class SyncShopInfoRequest(BaseModel):
class CreateTemplateRequest(BaseModel):
cookies: dict
template_params: dict
proxies: dict = None
proxies: dict
class DoudianLoginRequest(BaseModel):
account: str
password: str
headers: dict
proxies: dict = None
proxies: dict
class DoudianUploadVideoRequest(BaseModel):
task: dict
@app.post(
......@@ -135,6 +139,26 @@ def doudian_login(req: DoudianLoginRequest):
'error_type': 'TimeoutError'
}
@sync_router.post(
"/upload_video",
status_code=status.HTTP_200_OK,
summary="抖店上传视频",
)
def doudian_upload_video(req: DoudianUploadVideoRequest):
try:
task = celery_app.send_task('doudian_upload_video', kwargs=req.dict())
result = task.get(timeout=60)
logger.info(f"任务结果: {result}")
return result
except Exception as e:
logger.error(f"任务执行失败: {str(e)}")
return {
'code': 504,
'msg': '请求超时',
'data': None,
'error_type': 'TimeoutError'
}
# 注册路由
app.include_router(sync_router)
......
from celery import shared_task
from upload_video import upload_video
from login import login
from errors import *
from hub_ import *
......@@ -123,4 +124,32 @@ def execute_doudian_login(account: str, password: str, headers: dict, proxies: d
'msg': f'登录异常:{e}',
'data': None,
'error_type': 'InternalError'
}
@shared_task(name='doudian_upload_video')
def execute_doudian_upload_video(task: dict):
"""上传视频"""
try:
result = upload_video(task)
return {
'code': 200,
'msg': 'success',
'data': result,
'error_type': ''
}
except AppError as e:
logger.error(f'上传视频异常:{e}')
return {
'code': e.code,
'msg': e.msg,
'data': e.data,
'error_type': type(e).__name__
}
except Exception as e:
logger.error(f'上传视频异常:{e}')
return {
'code': 500,
'msg': f'上传视频异常:{e}',
'data': None,
'error_type': 'InternalError'
}
\ No newline at end of file
import hashlib
import hmac
import json
import os
import random
import time
import urllib.parse
import uuid
import zlib
from datetime import datetime
import cv2
import requests
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed
from tqdm import tqdm
DOMAIN = "http://20tools.net"
HEADERS = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization-Code": "",
}
VIDEO_PATH = "./video"
DEFAULT_HEADER = {
'accept': '*/*',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'content-type': 'application/json',
'origin': 'https://fxg.jinritemai.com',
'priority': 'u=1, i',
'referer': 'https://fxg.jinritemai.com',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
}
def download_video(url: str, file_: str,headers: dict, proxies: dict):
"""
下载视频
:param proxies:
:param headers:
:param url: 视频地址
:param file_: 文件地址
:return:
"""
response = requests.get(url, verify=False, stream=True, headers=headers, proxies=proxies)
with open(file_, "wb") as f:
for chunk in response.iter_content(chunk_size=1024):
if chunk: f.write(chunk)
# 判断文件是否下载完成
file_size = os.path.getsize(file_)
if file_size > 10000:
logger.success(f"下载完成: {file_}")
else:
raise Exception(f"下载失败: {file_}")
def get_video_duration(filename):
cap = cv2.VideoCapture(filename)
if cap.isOpened():
rate = cap.get(5)
frame_num = cap.get(7)
duration = frame_num / rate
return duration
return -1
def random_s():
"""
随机字符串
:return:
"""
digits = '0123456789'
ascii_letters = 'abcdefghigklmnopqrstuvwxyz'
l_rand = digits + ascii_letters
str_list = [random.choice(l_rand) for _ in range(11)]
random_str = ''.join(str_list)
return random_str
def video_split(filePath, chunk_size=3):
"""
视频切片
:param filePath:
:param chunk_size:
:return:
"""
chunk_size_bytes = chunk_size * 1024 * 1024
with open(filePath, 'rb') as file:
file_data = file.read()
total_slices = (len(file_data) + chunk_size_bytes - 1) // chunk_size_bytes
slices = []
chunks = {}
for i in tqdm(range(total_slices), desc='切片'):
start = i * chunk_size_bytes
end = min((i + 1) * chunk_size_bytes, len(file_data))
data = file_data[start:end]
crc32 = hex(zlib.crc32(data) & 0xFFFFFFFF)[2:].zfill(8)
# 每一段视频的请求参数信息
slice_info = {
'part_number': i + 1, # 当前视频切片的顺序
'part_offset': start, # 上传文件的写入地址
"crc32": crc32
}
chunks[start] = data
slices.append(slice_info)
return slices, chunks
class AWSV4Signer:
def __init__(self, credentials):
self.algorithm = "AWS4-HMAC-SHA256"
self.v4_identifier = "aws4_request"
self.constants = {
"date_header": "X-Amz-Date",
"token_header": "x-amz-security-token",
"content_sha256_header": "X-Amz-Content-Sha256",
"k_date_prefix": "AWS4",
"unsigned_headers": [
"authorization", "content-type", "content-length",
"user-agent", "presigned-expires", "expect", "x-amzn-trace-id"
]
}
self.credentials = credentials
def _hmac_sha256(self, key, message, is_hex=False):
if isinstance(key, str):
key = key.encode('utf-8')
if isinstance(message, str):
message = message.encode('utf-8')
digest = hmac.new(key, message, hashlib.sha256).digest()
return digest.hex() if is_hex else digest
def _sha256(self, data):
return hashlib.sha256(data.encode('utf-8')).hexdigest()
def _get_signing_key(self, date_stamp):
k_date = self._hmac_sha256(
f"{self.constants['k_date_prefix']}{self.credentials['secretAccessKey']}",
date_stamp
)
k_region = self._hmac_sha256(k_date, "cn-north-1")
k_service = self._hmac_sha256(k_region, "vod")
return self._hmac_sha256(k_service, self.v4_identifier)
def _uri_encode(self, s):
return urllib.parse.quote(s, safe="-_.~%") # Keep Python's default safe chars
def _canonical_query_string(self, params):
encoded_params = []
for key in sorted(params):
value = params[key]
if value is None:
continue
encoded_key = self._uri_encode(key)
if isinstance(value, list):
encoded_values = sorted([self._uri_encode(v) for v in value])
encoded_params.append(f"{encoded_key}={','.join(encoded_values)}")
else:
encoded_params.append(f"{encoded_key}={self._uri_encode(str(value))}")
return '&'.join(encoded_params)
def _is_signable_header(self, header):
header_lower = header.lower()
return header_lower.startswith("x-amz-") or header_lower not in self.constants["unsigned_headers"]
def _canonical_headers(self, headers):
headers_list = []
for header in sorted(headers.keys(), key=lambda x: x.lower()):
if self._is_signable_header(header):
value = ' '.join(str(headers[header]).strip().split())
headers_list.append(f"{header.lower()}:{value}")
return '\n'.join(headers_list)
def _signed_headers(self, headers):
signable = [h.lower() for h in headers if self._is_signable_header(h.lower())]
return ';'.join(sorted(signable))
def _create_scope(self, date_stamp):
return f"{date_stamp[:8]}/cn-north-1/vod/{self.v4_identifier}"
def _hex_encoded_body_hash(self, headers, data):
if self.constants["content_sha256_header"] in headers:
return headers[self.constants["content_sha256_header"]]
return self._sha256(self._canonical_query_string(data or {}))
def _canonical_request(self, method, params, headers, data):
return '\n'.join([
method,
'/',
self._canonical_query_string(params),
self._canonical_headers(headers) + '\n',
self._signed_headers(headers),
self._hex_encoded_body_hash(headers, data)
])
def _string_to_sign(self, timestamp, canonical_request):
scope = self._create_scope(timestamp)
return '\n'.join([
self.algorithm,
timestamp,
scope,
self._sha256(canonical_request)
])
def calculate_signature(self, method, params, headers, data=''):
timestamp = headers[self.constants["date_header"]]
canonical_req = self._canonical_request(method, params, headers, data)
string_to_sign = self._string_to_sign(timestamp, canonical_req)
signing_key = self._get_signing_key(timestamp[:8])
return self._hmac_sha256(signing_key, string_to_sign, is_hex=True)
class Upload:
def __init__(self, task):
self.slices = None
self.session_key = None
self.upload_id = None
self.upload_nodes = None
self.token = None
self.secret_access_key = None
self.session_token = None
self.access_key_id = None
self.file_size = None
self.proxies = task.get("proxies")
self.cookies = task.get('cookie')
self.headers = task.get('headers')
self.file_path_ = task.get('file_path_')
# @retry(stop=stop_after_attempt(5), wait=wait_fixed(1))
def get_upload_token(self):
"""
获取上传token
:return:
"""
headers = {**DEFAULT_HEADER, **self.headers}
print(self.cookies)
print(headers)
url = 'https://fxg.jinritemai.com/product/video/uploadVideoToken?type=video&useBoe=false&appid=1&_bid=ffa_goods'
response = requests.get(url,cookies=self.cookies,headers=headers)
print(response.text)
upload_token = response.json()['data']['auth_token']
if upload_token:
return upload_token
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1))
def get_upload_nodes(self):
"""
获取上传节点
:return:
"""
iso_8601 = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
params = {
"Action": "ApplyUploadInner",
"Version": "2020-11-19",
"SpaceName": "shop_center",
"FileType": "video",
"IsInner": 1,
"FileSize": self.file_size,
"s": random_s()
}
signature_header = {
'X-Amz-Date': iso_8601,
'x-amz-security-token': self.session_token,
}
signer = AWSV4Signer(self.token)
signature = signer.calculate_signature("GET", params, signature_header)
authorization = f'AWS4-HMAC-SHA256 Credential={self.access_key_id}/{iso_8601[:8]}/cn-north-1/vod/aws4_request, SignedHeaders=x-amz-date;x-amz-security-token, Signature={signature}'
headers = {
'accept': '*/*',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'authorization': authorization,
'origin': 'https://fxg.jinritemai.com',
'priority': 'u=1, i',
'referer': 'https://fxg.jinritemai.com',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'x-amz-date': iso_8601,
'x-amz-security-token': self.session_token,
}
headers = {**headers, **self.headers}
response = requests.get('https://vod.bytedanceapi.com/', params=params, headers=headers, proxies=self.proxies, timeout=5)
upload_data = response.json()
upload_nodes = upload_data['Result']['InnerUploadAddress']['UploadNodes']
if upload_nodes:
return upload_nodes
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1))
def upload_video_init(self):
"""
上传视频初始化
:return:
"""
for upload_node in self.upload_nodes:
try:
auth = upload_node['StoreInfos'][0]['Auth']
store_uri = upload_node['StoreInfos'][0]['StoreUri']
session_key = upload_node['SessionKey']
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Authorization': auth,
'Connection': 'keep-alive',
'Content-Type': 'multipart/form-data; boundary=----WebKitFormBoundaryQa2RZl128VYwAqKv',
'Origin': 'https://fxg.jinritemai.com',
'Referer': 'https://fxg.jinritemai.com',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'cross-site',
'X-Storage-U': '',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
}
headers = {**headers, **self.headers}
params = {
'uploadmode': 'part',
'phase': 'init',
}
url = f'https://tos-d-ct-lf.snssdk.com/{store_uri}?uploads'
response = requests.post(url, params=params, headers=headers, proxies=self.proxies, timeout=5)
upload_id = response.json()['payload']['uploadID']
if upload_id:
return upload_id, session_key, auth, store_uri
continue
except Exception as e:
logger.error(f'upload_video_init error {e}')
continue
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1))
def upload_video_split(self, slice_, chunks, auth, store_uri):
"""
上传视频
:param slice_:
:param chunks:
:param auth:
:param store_uri:
:return:
"""
crc32 = slice_['crc32']
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Authorization': auth,
'Connection': 'keep-alive',
'Content-CRC32': crc32,
'Content-Disposition': 'attachment; filename="undefined"',
'Content-Type': 'application/octet-stream',
'Origin': 'https://fxg.jinritemai.com',
'Referer': 'https://fxg.jinritemai.com',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'cross-site',
'X-Storage-U': '',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
}
headers = {**headers, **self.headers}
params = {
'uploadID': self.upload_id,
'partNumber': slice_['part_number'],
}
data = chunks[slice_['part_offset']]
url = f'https://tos-d-ct-lf.snssdk.com/{store_uri}'
response = requests.post(url, headers=headers, data=data, params=params, proxies=self.proxies, timeout=60)
if response.json()['success'] == 0 and response.text:
return response
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1))
def upload_video_finish(self, auth, store_uri):
"""
上传视频完成
:param auth:
:param store_uri:
:return:
"""
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Authorization': auth,
'Connection': 'keep-alive',
'Content-Type': 'text/plain;charset=UTF-8',
'Origin': 'https://fxg.jinritemai.com',
'Referer': 'https://fxg.jinritemai.com',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'cross-site',
'X-Storage-U': '',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
}
headers = {**headers, **self.headers}
params = {
'uploadID': self.upload_id,
}
data = ','.join([f"{s['part_number']}:{s['crc32']}" for s in self.slices])
url = f'https://tos-d-ct-lf.snssdk.com/{store_uri}'
response = requests.post(url, params=params, headers=headers, data=data, proxies=self.proxies, timeout=5)
if response.json()['success'] == 0:
return response
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1))
def get_upload_result(self):
"""
获取上传结果
:return:
"""
p = {
"SessionKey": self.session_key,
"Functions": []
}
data = json.dumps(p)
params = {
'Action': 'CommitUploadInner',
'Version': '2020-11-19',
'SpaceName': 'shop_center',
}
iso_8601 = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
signature_header = {
'X-Amz-Date': iso_8601,
'x-amz-security-token': self.session_token,
'X-Amz-Content-Sha256': hashlib.sha256(data.encode('utf-8')).hexdigest(),
}
signer = AWSV4Signer(self.token)
signature = signer.calculate_signature("POST", params, signature_header, data)
authorization = f'AWS4-HMAC-SHA256 Credential={self.access_key_id}/{iso_8601[:8]}/cn-north-1/vod/aws4_request, SignedHeaders=x-amz-content-sha256;x-amz-date;x-amz-security-token, Signature={signature}'
headers = {
'accept': '*/*',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'authorization': authorization,
'content-type': 'text/plain;charset=UTF-8',
'origin': 'https://fxg.jinritemai.com',
'priority': 'u=1, i',
'referer': 'https://fxg.jinritemai.com',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'x-amz-content-sha256': hashlib.sha256(data.encode('utf-8')).hexdigest(),
'x-amz-date': iso_8601,
'x-amz-security-token': self.session_token,
}
headers = {**headers, **self.headers}
response = requests.post('https://vod.bytedanceapi.com/', params=params, headers=headers,
data=data, proxies=self.proxies, timeout=5)
if response.json()['Result']['Results'][0]:
return response.json()['Result']['Results'][0]
def run(self):
"""
运行
:return:
"""
print(self.file_path_)
self.file_size = os.path.getsize(self.file_path_)
logger.info("开始获取上传token")
upload_token = self.get_upload_token()
logger.info("获取上传token成功")
self.access_key_id = upload_token['AccessKeyID']
self.session_token = upload_token['SessionToken']
self.secret_access_key = upload_token['SecretAccessKey']
self.token = {
"accessKeyId": self.access_key_id,
"secretAccessKey": self.secret_access_key,
"sessionToken": self.session_token
}
logger.info("开始获取上传节点")
self.upload_nodes = self.get_upload_nodes()
logger.info("获取上传节点成功")
self.upload_id, self.session_key, auth, store_uri = self.upload_video_init()
logger.info("开始上传视频")
self.slices, chunks = video_split(self.file_path_)
for slice_ in self.slices:
self.upload_video_split(slice_, chunks, auth, store_uri)
logger.info("上传视频成功")
self.upload_video_finish(auth, store_uri)
video_id = self.get_upload_result()['Vid']
return video_id
def upload_video(task):
if not os.path.exists(VIDEO_PATH):
os.makedirs(VIDEO_PATH)
# 下载视频
file_name = f"{task['file_name']}"
# 判断文件是否存在
file_path_ = os.path.join(VIDEO_PATH, file_name)
if not os.path.exists(file_path_):
logger.info(f"文件 {file_name} 不存在,开始下载")
download_video(task['video_url'], file_path_, headers=task['headers'], proxies=task['proxies'])
logger.success("下载完成")
video_duration = get_video_duration(file_path_)
if video_duration > 60:
logger.error("视频时长大于60秒,上传失败")
raise Exception("视频时长大于60秒,上传失败")
else:
logger.info(f"文件 {file_name} 已存在,跳过下载")
task['file_path_'] = file_path_
upload = Upload(task)
video_id = upload.run()
logger.success(f"上传成功,视频ID为:{video_id}")
result = {
'video_id': video_id,
'file_path': file_path_,
}
return result
if __name__ == '__main__':
task = {
'video_url': 'https://cloud.video.taobao.com/play/u/2200778999140/p/2/e/6/t/1/504992565058.mp4?appKey=38829',
'file_name': '8d481cb3103b082871bc20dc50fba5f4.mp4',
'proxies': {
"http": "socks5h://UKZ5TYSJUQNB:3X4Q79VMDPK0@58.251.251.226:9001",
"https": "socks5h://UKZ5TYSJUQNB:3X4Q79VMDPK0@58.251.251.226:9001"
},
'cookie': {
'PHPSESSID': 'ea6b555fb9074efe8cda5f15a4e1095e',
'PHPSESSID_SS': 'ea6b555fb9074efe8cda5f15a4e1095e',
},
'headers': {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"'
},
}
print(upload_video(task))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment