Commit d688faec by baiquan

feat(api): 添加创建商品接口

- 新增 create_goods 接口和相关请求模型
- 实现 create_goods 函数以调用抖店接口创建商品
- 添加 execute_create_goods 任务处理函数
parent 87f7ae98
...@@ -6,7 +6,7 @@ from pydantic import BaseModel ...@@ -6,7 +6,7 @@ from pydantic import BaseModel
from celery_app import celery_app from celery_app import celery_app
from task_worker import execute_generate_a_bogus, execute_sync_shop, execute_sync_shop_info, \ from task_worker import execute_generate_a_bogus, execute_sync_shop, execute_sync_shop_info, \
execute_sync_create_template, execute_doudian_login, execute_doudian_upload_video, execute_get_schema, \ execute_sync_create_template, execute_doudian_login, execute_doudian_upload_video, execute_get_schema, \
execute_create_global_promotion execute_create_global_promotion, execute_create_goods
from utils.errors import ParamsError, NotFoundError, AppError from utils.errors import ParamsError, NotFoundError, AppError
import os import os
...@@ -53,6 +53,11 @@ class GetSchemaRequest(BaseModel): ...@@ -53,6 +53,11 @@ class GetSchemaRequest(BaseModel):
proxy_url: str proxy_url: str
category_id: str category_id: str
class CreateGoodsRequest(BaseModel):
headers: dict
proxy_url: str
data: dict
class CreateGlobalPromotionRequest(BaseModel): class CreateGlobalPromotionRequest(BaseModel):
params: str | dict params: str | dict
data: dict data: dict
...@@ -174,6 +179,15 @@ def get_schema_request(req: GetSchemaRequest): ...@@ -174,6 +179,15 @@ def get_schema_request(req: GetSchemaRequest):
@sync_router.post( @sync_router.post(
"/create_goods",
status_code=status.HTTP_200_OK,
summary="获取类目参数",
)
def create_goods_request(req: CreateGoodsRequest):
return execute_create_goods(req.headers, req.proxy_url, req.data)
@sync_router.post(
"/create_global_promotion", "/create_global_promotion",
status_code=status.HTTP_200_OK, status_code=status.HTTP_200_OK,
summary="创建全域推广", summary="创建全域推广",
......
...@@ -59,6 +59,28 @@ def get_schema(headers: dict, proxy_url: str, category_id: str): ...@@ -59,6 +59,28 @@ def get_schema(headers: dict, proxy_url: str, category_id: str):
response = doudian_request("POST", url, params=params, proxies=proxies, json=json_data, headers=headers) response = doudian_request("POST", url, params=params, proxies=proxies, json=json_data, headers=headers)
return response return response
def create_goods(headers: dict, proxy_url: str, data):
"""获取商品类目"""
params = {
'check_status': '2',
'appid': '1',
}
url = 'https://fxg.jinritemai.com/product/tproduct/addWithSchema'
if headers.get("User-Agent"):
ua = headers['User-Agent']
elif headers.get("user-agent"):
ua = headers['user-agent']
else:
raise Exception('User-Agent not found')
a_bogus = generate_a_bogus(params, data, ua)
params['a_bogus'] = a_bogus
proxies = check_proxy(proxy_url)
response = doudian_request("POST", url, params=params, proxies=proxies, json=data, headers=headers)
return response
def create_global_promotion(params:str | dict,data: dict,headers: dict, proxy_url: str): def create_global_promotion(params:str | dict,data: dict,headers: dict, proxy_url: str):
"""创建全域推广""" """创建全域推广"""
proxies = check_proxy(proxy_url) proxies = check_proxy(proxy_url)
......
...@@ -4,7 +4,7 @@ from celery import shared_task ...@@ -4,7 +4,7 @@ from celery import shared_task
from loguru import logger from loguru import logger
from service.create_template import create_template from service.create_template import create_template
from service.doudian_service import generate_a_bogus, get_schema, create_global_promotion from service.doudian_service import generate_a_bogus, get_schema, create_global_promotion, create_goods
from service.hub_ import closeBrowser from service.hub_ import closeBrowser
from service.login import login from service.login import login
from service.sync_shop import syncShop, syncShopInfo from service.sync_shop import syncShop, syncShopInfo
...@@ -212,6 +212,38 @@ def execute_get_schema(headers: dict, proxy_url: str, category_id: str): ...@@ -212,6 +212,38 @@ def execute_get_schema(headers: dict, proxy_url: str, category_id: str):
'error_type': 'InternalError' 'error_type': 'InternalError'
} }
@shared_task(name='create_goods')
def execute_create_goods(headers: dict, proxy_url: str, data):
"""获取类目参数"""
try:
response = create_goods(headers, proxy_url, data)
result = response.get('data', {})
logger.success(f'创建商品成功-->{str(result)[:200]}·······{str(result)[-200:]}')
return {
'code': 200,
'msg': 'success',
'data': result,
'error_type': ''
}
except AppError as e:
logger.error(f'创建商品失败-->{e.msg}')
return {
'code': e.code,
'msg': e.msg,
'data': e.data,
'error_type': type(e).__name__
}
except Exception as e:
logger.error(f'创建商品错误-->{e}')
return {
'code': 500,
'msg': f'创建商品错误:{str(e)[:1000]}',
'data': None,
'error_type': 'InternalError'
}
@shared_task(name='create_global_promotion') @shared_task(name='create_global_promotion')
def execute_create_global_promotion(params:str|dict, data: dict, headers: dict, proxy_url: str): def execute_create_global_promotion(params:str|dict, data: dict, headers: dict, proxy_url: str):
"""创建全域推广""" """创建全域推广"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment