Commit e92f1527 by baiquan

添加获取类目参数接口

parent 15ba1121
......@@ -44,6 +44,11 @@ class GenerateABogusRequest(BaseModel):
data: dict = {}
ua: str
class GetSchemaRequest(BaseModel):
headers: dict
proxies: dict
category_id: str
@app.post(
"/sync_shop",
......@@ -188,6 +193,27 @@ def generate_a_bogus(req: GenerateABogusRequest):
'error_type': 'TimeoutError'
}
@sync_router.post(
"/get_schema",
status_code=status.HTTP_200_OK,
summary="获取类目参数",
)
def get_schema(req: GetSchemaRequest):
try:
task = celery_app.send_task('get_schema', kwargs=req.dict())
result = task.get(timeout=60)
logger.info(f"任务结果: {result}")
return result
except Exception as e:
logger.error(f"任务执行失败: {str(e)}")
return {
'code': 504,
'msg': '请求超时',
'data': None,
'error_type': 'TimeoutError'
}
# 注册路由
app.include_router(sync_router)
......
......@@ -5,13 +5,14 @@ from urllib.parse import urlencode
import execjs
from celery import shared_task
from loguru import logger
from common import setup_proxy
from errors import AppError
from hub_ import closeBrowser
from login import login
from main import syncShop, syncShopInfo, createTemplate
from upload_video import upload_video
from loguru import logger
# 创建 logs 目录(如果不存在)
os.makedirs("logs", exist_ok=True)
......@@ -176,12 +177,8 @@ def execute_generate_a_bogus(params:str | dict,data: dict,ua: str):
js_code = f.read()
# 执行加密算法
ctx = execjs.compile(js_code)
logger.info(f'params: {params}')
logger.info(f'params_type: {type(params)}')
if type(params) == dict:
params = urlencode(params)
logger.info(f'data: {data}')
logger.info(f'data_type: {type(data)}')
data = json.dumps(data)
result = ctx.call('get_ab', params, data, ua)
return {
......@@ -197,4 +194,65 @@ def execute_generate_a_bogus(params:str | dict,data: dict,ua: str):
'msg': f'生成加密异常:{e}',
'data': None,
'error_type': 'InternalError'
}
@shared_task(name='get_schema')
def execute_get_schema(headers: dict, proxies: dict, category_id: str):
"""获取类目参数"""
try:
params = {
'appid': '1',
'_bid': 'ffa_goods',
'msToken': 'R-b2DYMiOLeDM9hJGu-ChiFknO18wFgHEk0gAZ6yW_0n3uCmbNRLJjqd_1JOtdmuUqWk_yO6jvOZizNvhhiJV5fyTMhX72vFvcamLeGAmoNoAEnLPWU4QH1pTkDzaeFMsslo5qx_cjx6RuQMWwkMWGhU_mr3xdR9',
}
json_data = {
'context': {
'feature': {},
'operation_type': 'select_normal,normal',
'category_id': category_id,
'ability': [],
},
'appid': 1,
}
url = 'https://fxg.jinritemai.com/product/tproduct/getSchema'
res = execute_generate_a_bogus(params, json_data, headers['user-agent'])
if res.get('code') != 200:
return {
'code': 500,
'msg': '生成加密参数失败',
'data': None,
'error_type': ''
}
params['a_bogus'] = res['data']
addr = proxies['addr']
port = proxies['port']
username = proxies['username']
proxies_password = proxies['password']
session = setup_proxy(addr, int(port), username, proxies_password)
response = session.post(url, params=params, headers=headers, json=json_data)
if response.text and response.json().get('code') == 0:
result = response.json().get('data', {})
return {
'code': 200,
'msg': 'success',
'data': result,
'error_type': ''
}
return {
'code': 500,
'msg': '获取类目参数失败',
'data': None,
'error_type': ''
}
except Exception as e:
logger.error(f'获取类目参数异常:{e}')
logger.error(f'params-->{params}')
logger.error(f'json_data-->{json_data}')
logger.error(f'headers-->{headers}')
logger.error(f'proxies-->{proxies}')
return {
'code': 500,
'msg': f'获取类目参数失败:{e}',
'data': None,
'error_type': 'InternalError'
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment