Commit 1505180d by baiquan

增加doudian_request

parent d7c649a4
......@@ -27,7 +27,7 @@ celery -A celery_app worker -l info -P gevent
### 启动API服务
```bash
uvicorn api:app --host 0.0.0.0 --port 9001 --reload
uvicorn api:app --host 0.0.0.0 --port 9001 --reload --timeout-keep-alive 120
```
import requests
from loguru import logger
from retrying import retry
from utils.errors import CookiesExpiredError
HEADERS = {
'accept': 'application/json, text/plain, */*',
'accept-language': 'zh-CN,zh;q=0.9',
'content-type': 'application/json',
'origin': 'https://fxg.jinritemai.com',
'priority': 'u=1, i',
'referer': 'https://fxg.jinritemai.com',
'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
}
@retry(stop_max_attempt_number=3, wait_fixed=3000)
def doudian_request(method: str, url: str, proxies:dict, params:dict=None, data: str = None, json: dict = None,headers=None, cookies=None, match_str="") -> any:
if headers is None:
headers = HEADERS
logger.info(f'doudian_request-->{url}')
method = method.upper()
if method == 'GET':
response = requests.get(url, params=params, headers=headers, cookies=cookies, proxies=proxies)
elif method == 'POST':
response = requests.post(url, json=json, params=params, data=data, headers=headers, cookies=cookies, proxies=proxies)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
# 处理响应
if (response.status_code == 403 and "Forbidden" in response.text) or "登录信息已失效" in response.text or "无权访问此接口" in response.text:
raise CookiesExpiredError("请求失败-->cookies已过期!")
if response.text:
response_data = response.json()
if response_data.get('code') == 0 or response_data.get('status_code') == 0 or response_data.get('success') == 0:
# logger.info(f'send_request响应结果-->{response_data}')
return {
'data': response_data.get('data', {}),
'response_data': response_data
}
elif match_str and match_str in response.text:
return {
'data': response_data.get('data', {}),
'response_data': response_data
}
raise Exception(f"请求失败 --> {response.text}")
\ No newline at end of file
......@@ -2,10 +2,11 @@ import json
from urllib.parse import urlencode
import execjs
import requests
from loguru import logger
from service.doudian_request import doudian_request
from utils.common import check_proxy
def generate_a_bogus(params:str | dict,data: dict,ua: str, params_type: int=1):
"""生成加密参数"""
# 直接读取JS文件
......@@ -19,12 +20,8 @@ def generate_a_bogus(params:str | dict,data: dict,ua: str, params_type: int=1):
raise Exception('params_type error')
# 执行加密算法
ctx = execjs.compile(js_code)
logger.info(f'params: {params}')
logger.info(f'params_type: {type(params)}')
if type(params) == dict:
params = urlencode(params)
logger.info(f'data: {data}')
logger.info(f'data_type: {type(data)}')
data = json.dumps(data)
result = ctx.call('get_ab', params, data, ua)
return result
......@@ -55,7 +52,7 @@ def get_schema(headers: dict, proxy_url: str, category_id: str):
a_bogus = generate_a_bogus(params, json_data, ua)
params['a_bogus'] = a_bogus
proxies = check_proxy(proxy_url)
response = requests.post(url, params=params, headers=headers, json=json_data, verify=False, proxies=proxies)
response = doudian_request("POST", url, params=params, proxies=proxies, json=json_data, headers=headers)
return response
def create_global_promotion(params:str | dict,data: dict,headers: dict, proxy_url: str):
......@@ -70,5 +67,5 @@ def create_global_promotion(params:str | dict,data: dict,headers: dict, proxy_ur
params['a_bogus'] = a_bogus
url_params = urlencode(params)
url = 'https://qianchuan.jinritemai.com/ad/api/creation/v1/ad/create?' + url_params
response = requests.post(url, json=data, headers=headers, verify=False, proxies=proxies)
response = doudian_request("POST", url, proxies=proxies, json=data, headers=headers)
return response
......@@ -10,10 +10,10 @@ from datetime import datetime
import cv2
import requests
from loguru import logger
from retrying import retry
from tqdm import tqdm
from service.doudian_request import doudian_request
from utils.common import check_proxy
HEADERS = {
......@@ -55,7 +55,7 @@ def download_video(url: str, file_: str,headers: dict):
# 判断文件是否下载完成
file_size = os.path.getsize(file_)
if file_size > 10000:
logger.success(f"下载完成: {file_}")
logger.info(f"下载完成: {file_}")
else:
raise Exception(f"下载失败: {file_}")
......@@ -238,7 +238,6 @@ class Upload:
self.headers = task.get('headers')
self.file_path_ = task.get('file_path_')
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_upload_token(self, upload_id = None):
"""
获取上传token
......@@ -256,16 +255,9 @@ class Upload:
'appid': '1',
}
url = f'https://fxg.jinritemai.com/product/video/uploadVideoToken'
response = requests.get(url, cookies=self.cookies, headers=headers, params=params, proxies=self.proxies)
logger.info(response.json())
if "登录信息已失效" in response.text or response.json().get("code", -1) == "10008":
raise Exception(f'获取上传token失败: 登录信息已失效,请重新登录~')
upload_token = response.json()['data']
if upload_token:
return upload_token
response = doudian_request("GET", url, proxies=self.proxies, cookies=self.cookies, headers=headers, params=params)
return response['data']
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_upload_nodes(self):
"""
获取上传节点
......@@ -305,14 +297,11 @@ class Upload:
'x-amz-security-token': self.session_token,
}
headers = {**headers, **self.headers}
response = requests.get('https://vod.bytedanceapi.com/', params=params,cookies=self.cookies, headers=headers, timeout=5, proxies=self.proxies)
upload_data = response.json()
response = doudian_request("GET",'https://vod.bytedanceapi.com/', params=params,cookies=self.cookies, headers=headers, proxies=self.proxies, match_str="UploadNodes")
upload_data = response['response_data']
upload_nodes = upload_data['Result']['InnerUploadAddress']['UploadNodes']
if upload_nodes:
return upload_nodes
return upload_nodes
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def upload_video_init(self):
"""
上传视频初始化
......@@ -320,41 +309,34 @@ class Upload:
:return:
"""
for upload_node in self.upload_nodes:
try:
auth = upload_node['StoreInfos'][0]['Auth']
store_uri = upload_node['StoreInfos'][0]['StoreUri']
session_key = upload_node['SessionKey']
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Authorization': auth,
'Connection': 'keep-alive',
'Content-Type': 'multipart/form-data; boundary=----WebKitFormBoundaryQa2RZl128VYwAqKv',
'Origin': 'https://fxg.jinritemai.com',
'Referer': 'https://fxg.jinritemai.com',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'cross-site',
'X-Storage-U': '',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
}
headers = {**headers, **self.headers}
params = {
'uploadmode': 'part',
'phase': 'init',
}
url = f'https://tos-d-ct-lf.snssdk.com/{store_uri}?uploads'
response = requests.post(url, params=params ,cookies=self.cookies, headers=headers, timeout=5, proxies=self.proxies)
upload_id = response.json()['payload']['uploadID']
if upload_id:
return upload_id, session_key, auth, store_uri
continue
except Exception as e:
logger.error(f'upload_video_init error {e}')
continue
auth = upload_node['StoreInfos'][0]['Auth']
store_uri = upload_node['StoreInfos'][0]['StoreUri']
session_key = upload_node['SessionKey']
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Authorization': auth,
'Connection': 'keep-alive',
'Content-Type': 'multipart/form-data; boundary=----WebKitFormBoundaryQa2RZl128VYwAqKv',
'Origin': 'https://fxg.jinritemai.com',
'Referer': 'https://fxg.jinritemai.com',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'cross-site',
'X-Storage-U': '',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
}
headers = {**headers, **self.headers}
params = {
'uploadmode': 'part',
'phase': 'init',
}
url = f'https://tos-d-ct-lf.snssdk.com/{store_uri}?uploads'
response = doudian_request("POST", url, cookies=self.cookies, headers=headers, proxies=self.proxies, params=params, match_str="uploadID")
upload_id = response['response_data']['payload']['uploadID']
return upload_id, session_key, auth, store_uri
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def upload_video_split(self, slice_, chunks, auth, store_uri):
"""
上传视频
......@@ -390,11 +372,9 @@ class Upload:
}
data = chunks[slice_['part_offset']]
url = f'https://tos-d-ct-lf.snssdk.com/{store_uri}'
response = requests.post(url, headers=headers,cookies=self.cookies, data=data, params=params, timeout=60, proxies=self.proxies)
if response.json()['success'] == 0 and response.text:
return response
response = doudian_request("POST", url, params=params, data=data, headers=headers,cookies=self.cookies, proxies=self.proxies)
return response
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def upload_video_finish(self, auth, store_uri):
"""
上传视频完成
......@@ -425,11 +405,9 @@ class Upload:
data = ','.join([f"{s['part_number']}:{s['crc32']}" for s in self.slices])
url = f'https://tos-d-ct-lf.snssdk.com/{store_uri}'
response = requests.post(url, params=params,cookies=self.cookies, headers=headers, data=data, timeout=5, proxies=self.proxies)
if response.json()['success'] == 0:
return response
response = doudian_request("POST", url, params=params, cookies=self.cookies, headers=headers, data=data, proxies=self.proxies)
return response
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_upload_result(self):
"""
获取上传结果
......@@ -473,11 +451,10 @@ class Upload:
'x-amz-security-token': self.session_token,
}
headers = {**headers, **self.headers}
response = requests.post('https://vod.bytedanceapi.com/',cookies=self.cookies, params=params, headers=headers,
data=data, timeout=5, proxies=self.proxies)
logger.info(response.text)
if response.json()['Result']['Results'][0]:
return response.json()['Result']['Results'][0]
url = 'https://vod.bytedanceapi.com/'
response = doudian_request("POST", url,cookies=self.cookies, params=params, headers=headers, data=data, proxies=self.proxies, match_str="RequestId")
response_data = response.get("response_data")
return response_data['Result']['Results'][0]
def change_video_status(self, video_id, video_name):
"""
......@@ -499,8 +476,8 @@ class Upload:
'appid': 1,
}
url = 'https://fxg.jinritemai.com/product/video/changeVideoStatus'
response = requests.post(url, params=params, cookies=self.cookies, headers=self.headers, json=json_data, proxies=self.proxies)
logger.info(response.text)
response = doudian_request("POST", url, params=params, cookies=self.cookies, headers=self.headers, json=json_data, proxies=self.proxies)
logger.info(response)
def get_upload_video_info(self, video_id):
"""
......@@ -508,11 +485,13 @@ class Upload:
:return:
"""
play_auth_token = self.get_upload_token(video_id)['play_auth_token']
response = requests.get(f'https://vod.bytedanceapi.com/?{play_auth_token}', headers=self.headers, cookies=self.cookies, timeout=5, proxies=self.proxies)
logger.info(response.text)
if response.json()['Result']['Data']['Status'] != 10 and response.json()['Result']['Data']['Status'] != 1000:
url = f'https://vod.bytedanceapi.com/?{play_auth_token}'
response = doudian_request("GET", url, headers=self.headers, cookies=self.cookies, proxies=self.proxies, match_str="PlayInfoList")
logger.info(response)
response_data = response.get("response_data")
if response_data['Result']['Data']['Status'] != 10 and response_data['Result']['Data']['Status'] != 1000:
raise Exception('上传视频失败')
return response.json()['Result']['Data']['PlayInfoList'][0]
return response_data['Result']['Data']['PlayInfoList'][0]
def run(self):
......@@ -618,7 +597,7 @@ def upload_video(task):
if not os.path.exists(file_path_):
logger.info(f"文件 {file_name} 不存在,开始下载")
download_video(task['video_url'], file_path_, headers=task['headers'])
logger.success("下载完成")
logger.info("下载完成")
else:
logger.info(f"文件 {file_name} 已存在,跳过下载")
video_duration = get_video_duration(file_path_)
......@@ -638,7 +617,6 @@ def upload_video(task):
task['file_path_'] = file_path_
upload = Upload(task)
video_id, video_info = upload.run()
logger.success(f"上传成功,视频ID为:{video_id}")
result = {
'video_id': video_id,
'video_info': video_info,
......
......@@ -2,7 +2,6 @@ import asyncio
from celery import shared_task
from loguru import logger
from retrying import retry
from service.create_template import create_template
from service.doudian_service import generate_a_bogus, get_schema, create_global_promotion
......@@ -10,7 +9,7 @@ from service.hub_ import closeBrowser
from service.login import login
from service.sync_shop import syncShop, syncShopInfo
from service.upload_video import upload_video
from utils.errors import AppError, ABogusParamsError, CookiesExpiredError
from utils.errors import AppError
@shared_task(name='sync_shop')
......@@ -26,7 +25,7 @@ def execute_sync_shop(container_name: str):
'error_type': ''
}
except AppError as e:
logger.error(f'店铺同步异常:{e.msg}')
logger.error(f'店铺同步失败:{e.msg}')
return {
'code': e.code,
'msg': e.msg,
......@@ -59,7 +58,7 @@ def execute_sync_shop_info(browser_id: str, listen_url: str, open_url: str):
'error_type': ''
}
except AppError as e:
logger.error(f'同步店铺信息异常:{e.msg}')
logger.error(f'同步店铺信息失败:{e.msg}')
return {
'code': e.code,
'msg': e.msg,
......@@ -89,7 +88,7 @@ def execute_sync_create_template(cookies: dict, template_params: dict, proxies:
'error_type': ''
}
except AppError as e:
logger.error(f'创建运费模板异常:{e.msg}')
logger.error(f'创建运费模板失败:{e.msg}')
return {
'code': e.code,
'msg': e.msg,
......@@ -97,10 +96,10 @@ def execute_sync_create_template(cookies: dict, template_params: dict, proxies:
'error_type': type(e).__name__
}
except Exception as e:
logger.error(f'创建运费模板:{e}')
logger.error(f'创建运费模板异常:{e}')
return {
'code': 500,
'msg': f'创建运费模板:{e}',
'msg': f'创建运费模板异常:{e}',
'data': None,
'error_type': 'InternalError'
}
......@@ -117,7 +116,7 @@ def execute_doudian_login(account: str, password: str, headers: dict, proxies: d
'error_type': ''
}
except AppError as e:
logger.error(f'登录异常:{e.msg}')
logger.error(f'登录失败:{e.msg}')
return {
'code': e.code,
'msg': e.msg,
......@@ -138,6 +137,7 @@ def execute_doudian_upload_video(task: dict):
"""上传视频"""
try:
result = upload_video(task)
logger.success(f'视频上传成功-->{result}')
return {
'code': 200,
'msg': 'success',
......@@ -145,7 +145,7 @@ def execute_doudian_upload_video(task: dict):
'error_type': ''
}
except AppError as e:
logger.error(f'上传视频异常:{e.msg}')
logger.error(f'上传视频失败:{e.msg}')
return {
'code': e.code,
'msg': e.msg,
......@@ -166,6 +166,7 @@ def execute_generate_a_bogus(params:str | dict,data: dict,ua: str, params_type:
"""生成加密参数"""
try:
result = generate_a_bogus(params,data,ua, params_type)
logger.success(f'生成加密成功-->{result}')
return {
'code': 200,
'msg': 'success',
......@@ -186,21 +187,14 @@ def execute_get_schema(headers: dict, proxy_url: str, category_id: str):
"""获取类目参数"""
try:
response = get_schema(headers, proxy_url, category_id)
if not response.text:
raise ABogusParamsError(data=response.text)
if response.json().get('code') == 0:
result = response.json().get('data', {})
return {
'code': 200,
'msg': 'success',
'data': result,
'error_type': ''
}
elif response.json().get('code') == 10004 or response.json().get('code') == 10008:
raise CookiesExpiredError(data=response.text)
logger.error(f'获取类目参数异常-->{response.text}')
raise AppError(msg=response.json().get('msg'), data=response.text)
result = response.get('data', {})
logger.success(f'获取类目参数成功-->{result}')
return {
'code': 200,
'msg': 'success',
'data': result,
'error_type': ''
}
except AppError as e:
logger.error(f'获取类目参数失败-->{e.msg}')
return {
......@@ -218,19 +212,32 @@ def execute_get_schema(headers: dict, proxy_url: str, category_id: str):
'error_type': 'InternalError'
}
@retry(stop_max_attempt_number=3, wait_fixed=3000)
@shared_task(name='create_global_promotion')
def execute_create_global_promotion(params:str|dict, data: dict, headers: dict, proxy_url: str):
"""创建全域推广"""
response = create_global_promotion(params, data, headers, proxy_url)
response_data = response.json()
if response_data.get('status_code', -1) != 0:
logger.error(f"创建全局推广失败 --> {response.text}")
raise Exception(f"创建全局推广失败 --> {response.text}")
result = response_data.get('data', {})
return {
'code': 200,
'msg': 'success',
'data': result,
'error_type': ''
}
try:
response = create_global_promotion(params, data, headers, proxy_url)
result = response.get('data', {})
logger.success(f'创建全域推广成功-->{result}')
return {
'code': 200,
'msg': 'success',
'data': result,
'error_type': ''
}
except AppError as e:
logger.error(f'创建全局推广失败-->{e.msg}')
return {
'code': e.code,
'msg': e.msg,
'data': e.data,
'error_type': type(e).__name__
}
except Exception as e:
logger.error(f'创建全局推广错误-->{e}')
return {
'code': 500,
'msg': f'创建全局推广错误:{e}',
'data': None,
'error_type': 'InternalError'
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment