Commit 76cca24d by baiquan

重构项目

parent 951cc933
# 项目 README
## 简介
这是一个基于 Python 的抖店API项目,旨在提供一个高效、易用的工具来完成特定任务。该项目包含了完整的代码实现以及相关依赖,方便用户快速上手和使用。
## 安装依赖
在运行项目之前,请确保已经安装了 Python、Redis 和必要的依赖库。可以使用以下命令安装所需的依赖:
```bash
pip install -r requirements.txt
```
## 启动命令
### 启动Celery Worker
```bash
celery -A celery_app worker -l info -P gevent
```
### 启动API服务
```bash
uvicorn api:app --host 0.0.0.0 --port 9001 --reload
```
......@@ -4,7 +4,10 @@ from fastapi import FastAPI, APIRouter
from fastapi import status
from pydantic import BaseModel
from celery_app import celery_app
from errors import *
from task_worker import execute_generate_a_bogus, execute_sync_shop, execute_sync_shop_info, \
execute_sync_create_template, execute_doudian_login, execute_doudian_upload_video, execute_get_schema, \
execute_create_global_promotion
from utils.errors import ParamsError, NotFoundError, AppError
import os
# 创建 logs 目录(如果不存在)
......@@ -43,12 +46,18 @@ class GenerateABogusRequest(BaseModel):
params: str | dict
data: dict = {}
ua: str
params_type: int=1
class GetSchemaRequest(BaseModel):
headers: dict
proxy_url: str
category_id: str
class CreateGlobalPromotionRequest(BaseModel):
params: str | dict
data: dict
headers: dict
proxy_url: str
@app.post(
"/sync_shop",
......@@ -56,142 +65,103 @@ class GetSchemaRequest(BaseModel):
summary="异步触发店铺同步",
description="提交店铺同步任务到后台异步执行,返回任务ID用于查询状态",
)
async def async_sync_shop(req: SyncShopRequest):
async def async_sync_shop_request(req: SyncShopRequest):
"""异步执行店铺同步"""
task = celery_app.send_task('sync_shop', kwargs=req.dict())
return {"task_id": task.id, "status_url": f"/tasks/{task.id}"}
# 新增同步接口
@sync_router.post(
"/sync_shop",
status_code=status.HTTP_200_OK,
summary="同步执行店铺同步"
)
def sync_shop_request(req: SyncShopRequest):
"""同步执行店铺同步(阻塞式)"""
return execute_sync_shop(req.container_name)
@app.post(
"/sync_shop_info",
status_code=status.HTTP_202_ACCEPTED,
summary="异步触发店铺信息同步",
description="提交店铺信息同步任务到后台异步执行,返回任务ID用于查询状态",
)
async def async_sync_shop_info(req: SyncShopInfoRequest):
async def async_sync_shop_info_request(req: SyncShopInfoRequest):
"""异步执行店铺信息同步"""
task = celery_app.send_task('sync_shop_info', kwargs=req.dict())
return {"task_id": task.id, "status_url": f"/tasks/{task.id}"}
# 新增同步接口
@sync_router.post(
"/sync_shop",
status_code=status.HTTP_200_OK,
summary="同步执行店铺同步"
)
def sync_shop(req: SyncShopRequest):
"""同步执行店铺同步(阻塞式)"""
try:
task = celery_app.send_task('sync_shop', kwargs=req.dict())
result = task.get(timeout=60) # 60秒超时
return result
except TimeoutError:
return {
'code': 504,
'msg': '请求超时',
'data': None,
'error_type': 'TimeoutError'
}
@sync_router.post(
"/sync_shop_info",
status_code=status.HTTP_200_OK,
summary="同步执行店铺信息同步",
)
def sync_shop_info(req: SyncShopInfoRequest):
def sync_shop_info_request(req: SyncShopInfoRequest):
"""同步执行店铺信息同步(阻塞式)"""
try:
task = celery_app.send_task('sync_shop_info', kwargs=req.dict())
result = task.get(timeout=180) # 浏览器操作较慢,设置3分钟超时
logger.info(f"任务结果: {result}")
return result
except Exception as e:
logger.error(f"任务执行失败: {str(e)}")
return {
'code': 504,
'msg': '请求超时',
'data': None,
'error_type': 'TimeoutError'
}
return execute_sync_shop_info(req.browser_id, req.listen_url, req.open_url)
@sync_router.post(
"/create_template",
status_code=status.HTTP_200_OK,
summary="创建运费模板",
)
def sync_create_template(req: CreateTemplateRequest):
try:
task = celery_app.send_task('sync_create_template', kwargs=req.dict())
result = task.get(timeout=60)
logger.info(f"任务结果: {result}")
return result
except Exception as e:
logger.error(f"任务执行失败: {str(e)}")
return {
'code': 504,
'msg': '请求超时',
'data': None,
'error_type': 'TimeoutError'
}
def sync_create_template_request(req: CreateTemplateRequest):
return execute_sync_create_template(req.cookies, req.template_params, req.proxies)
@app.post(
"/login",
status_code=status.HTTP_202_ACCEPTED,
summary="异步触发抖店登录",
description="提交抖店登录任务到后台异步执行,返回任务ID用于查询状态",
)
async def async_doudian_upload_video_request(req: DoudianLoginRequest):
"""异步执行抖店登录"""
task = celery_app.send_task('doudian_login', kwargs=req.dict())
return {"task_id": task.id, "status_url": f"/tasks/{task.id}"}
@sync_router.post(
"/login",
status_code=status.HTTP_200_OK,
summary="抖店登录",
)
def doudian_login(req: DoudianLoginRequest):
try:
task = celery_app.send_task('doudian_login', kwargs=req.dict())
result = task.get(timeout=60)
logger.info(f"任务结果: {result}")
return result
except Exception as e:
logger.error(f"任务执行失败: {str(e)}")
return {
'code': 504,
'msg': '请求超时',
'data': None,
'error_type': 'TimeoutError'
}
def doudian_login_request(req: DoudianLoginRequest):
return execute_doudian_login(req.account, req.password, req.headers, req.proxies)
@app.post(
"/upload_video",
status_code=status.HTTP_202_ACCEPTED,
summary="异步触发抖店上传视频",
description="提交抖店上传视频任务到后台异步执行,返回任务ID用于查询状态",
)
async def async_doudian_upload_video_request(req: DoudianUploadVideoRequest):
"""异步执行抖店上传视频"""
task = celery_app.send_task('doudian_upload_video', kwargs=req.dict())
return {"task_id": task.id, "status_url": f"/tasks/{task.id}"}
@sync_router.post(
"/upload_video",
status_code=status.HTTP_200_OK,
summary="抖店上传视频",
)
def doudian_upload_video(req: DoudianUploadVideoRequest):
try:
task = celery_app.send_task('doudian_upload_video', kwargs=req.dict())
result = task.get(timeout=60)
logger.info(f"任务结果: {result}")
return result
except Exception as e:
logger.error(f"任务执行失败: {str(e)}")
return {
'code': 504,
'msg': '请求超时',
'data': None,
'error_type': 'TimeoutError'
}
def doudian_upload_video_request(req: DoudianUploadVideoRequest):
return execute_doudian_upload_video(req.task)
@sync_router.post(
"/generate_a_bogus",
status_code=status.HTTP_200_OK,
summary="生成加密参数",
)
def generate_a_bogus(req: GenerateABogusRequest):
try:
task = celery_app.send_task('generate_a_bogus', kwargs=req.dict())
result = task.get(timeout=60)
logger.info(f"任务结果: {result}")
return result
except Exception as e:
logger.error(f"任务执行失败: {str(e)}")
return {
'code': 504,
'msg': '请求超时',
'data': None,
'error_type': 'TimeoutError'
}
def generate_a_bogus_request(req: GenerateABogusRequest):
return execute_generate_a_bogus(req.params, req.data, req.ua, req.params_type)
@sync_router.post(
......@@ -199,20 +169,18 @@ def generate_a_bogus(req: GenerateABogusRequest):
status_code=status.HTTP_200_OK,
summary="获取类目参数",
)
def get_schema(req: GetSchemaRequest):
try:
task = celery_app.send_task('get_schema', kwargs=req.dict())
result = task.get(timeout=60)
logger.info(f"任务结果: {result}")
return result
except Exception as e:
logger.error(f"任务执行失败: {str(e)}")
return {
'code': 504,
'msg': '请求超时',
'data': None,
'error_type': 'TimeoutError'
}
def get_schema_request(req: GetSchemaRequest):
return execute_get_schema(req.headers, req.proxy_url, req.category_id)
@sync_router.post(
"/create_global_promotion",
status_code=status.HTTP_200_OK,
summary="创建全域推广",
)
def create_global_promotion_request(req: CreateGlobalPromotionRequest):
return execute_create_global_promotion(req.params, req.data, req.headers, req.proxy_url)
# 注册路由
app.include_router(sync_router)
......@@ -223,7 +191,7 @@ app.include_router(sync_router)
summary="查询任务状态",
description="根据任务ID查询异步任务执行状态",
)
def get_task_result(task_id: str):
def get_task_result_request(task_id: str):
try:
if not task_id:
raise ParamsError(msg="任务ID不能为空")
......
from celery import Celery
from config import settings
from utils.config import settings
celery_app = Celery(
'doudian_tasks',
......
import pandas as pd
from loguru import logger
from utils.config import settings
def get_mysql_connection():
    """Create and return a new MySQL connection.

    Connection parameters (host / port / user / password / database /
    charset) are read from ``settings``.  Rows are returned as dicts
    because of ``DictCursor``.

    Returns:
        pymysql.connections.Connection: an open connection the caller
        must close.

    Raises:
        ConnectionError: if the underlying connect attempt fails.
    """
    try:
        connection = pymysql.connect(
            host=settings.DB_HOST,
            port=settings.DB_PORT,  # MySQL default is 3306
            user=settings.DB_USER,
            password=settings.DB_PASSWORD,
            database=settings.DB_NAME,
            charset=settings.DB_CHARSET,
            cursorclass=pymysql.cursors.DictCursor,
        )
        return connection
    except pymysql.Error as e:
        # Chain the original driver error so the root cause stays visible.
        raise ConnectionError(f"MySQL 连接失败: {e}") from e
def import_doudian_accounts_to_mysql(file_path):
    """Bulk import/refresh doudian account rows from a CSV file into MySQL.

    Reads *file_path* with pandas (a CSV file, despite the legacy "Excel"
    naming), creates the ``doudian_accounts`` table if missing, and upserts
    one row per record keyed on the UNIQUE ``environment_id`` column.
    Rows whose 环境ID cell is NaN are skipped.  On any error the transaction
    is rolled back and the error is logged; no exception is re-raised
    (best-effort import, matching the original contract).

    :param file_path: path to a CSV file with 环境/环境ID/账号/密码/公司 columns
    """
    # NOTE: this reads CSV, not a real Excel workbook.
    df = pd.read_csv(file_path)
    connection = get_mysql_connection()
    try:
        with connection.cursor() as cursor:
            # Create the target table on first use.
            create_table_sql = """
            CREATE TABLE IF NOT EXISTS doudian_accounts (
                id INT AUTO_INCREMENT PRIMARY KEY,
                environment VARCHAR(50) NOT NULL,
                environment_id BIGINT NOT NULL UNIQUE,
                account VARCHAR(50) NOT NULL,
                password VARCHAR(50) NOT NULL,
                company VARCHAR(100) NOT NULL
            )
            """
            cursor.execute(create_table_sql)
            # Upsert keyed on the UNIQUE environment_id column.
            insert_sql = """
            INSERT INTO doudian_accounts (environment, environment_id, account, password, company)
            VALUES (%s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                environment = VALUES(environment),
                account = VALUES(account),
                password = VALUES(password),
                company = VALUES(company)
            """
            imported = 0
            for _, row in df.iterrows():
                # environment_id is the upsert key and NOT NULL: skip NaN rows.
                if pd.notna(row['环境ID']):
                    data = (
                        str(row['环境']) if pd.notna(row['环境']) else '',
                        int(row['环境ID']),  # guarded by the notna() check above
                        str(row['账号']) if pd.notna(row['账号']) else '',
                        str(row['密码']) if pd.notna(row['密码']) else '',
                        str(row['公司']) if pd.notna(row['公司']) else ''
                    )
                    cursor.execute(insert_sql, data)
                    imported += 1
        connection.commit()
        # Fix: previously logged len(df), over-counting when NaN rows were skipped.
        logger.info(f"成功导入/更新 {imported} 条记录")
    except Exception as e:
        # Roll back the whole batch on any failure.
        connection.rollback()
        logger.error(f"导入数据时发生错误: {e}")
    finally:
        connection.close()
import pymysql
def find_account_by_environment_id(environment_id):
    """Look up the account record bound to a given environment ID.

    :param environment_id: environment ID to search for
    :return: dict with environment / environment_id / account / password /
             company, or None when no row matches or the query fails
    """
    connection = get_mysql_connection()
    query = """
            SELECT environment, environment_id, account, password, company
            FROM doudian_accounts
            WHERE environment_id = %s
            """
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (environment_id,))
            record = cursor.fetchone()
        if record is None:
            logger.error(f"未找到环境ID为 {environment_id} 的记录")
            return None
        logger.info("找到匹配记录:")
        logger.info(f"环境: {record['environment']}")
        logger.info(f"环境ID: {record['environment_id']}")
        logger.info(f"账号: {record['account']}")
        # Password and company are deliberately not logged.
        return record
    except Exception as e:
        logger.error(f"查询时发生错误: {e}")
        return None
    finally:
        connection.close()
# 使用示例
if __name__ == "__main__":
# 查找环境ID为1075702795的账号
account_info = find_account_by_environment_id(1075702795)
# 如果需要将结果用于其他处理
if account_info:
print("\n获取到的账号信息:")
print(account_info)
# # 使用示例
# if __name__ == "__main__":
# excel_file = "doudian_accounts.csv" # 你的Excel文件路径
# import_doudian_accounts_to_mysql(excel_file)
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -21,3 +21,8 @@ tqdm~=4.67.1
gevent~=25.5.1
redis
uvicorn
pandas~=2.2.3
PyYAML~=6.0.2
PyMySQL~=1.1.1
mysql~=0.0.3
mysql-connector-python~=9.3.0
\ No newline at end of file
import random
import time
from urllib.parse import urlencode
import execjs
from curl_cffi import requests, CurlMime
from common import get_ms_token
from login import login
from utils.common import get_ms_token
headers = {
'Accept': 'application/json, text/plain, */*',
......
import json
from urllib.parse import parse_qsl, urlencode

import execjs
import requests
from loguru import logger

from utils.common import check_proxy
def generate_a_bogus(params: str | dict, data: dict, ua: str, params_type: int = 1):
    """Compute the ``a_bogus`` signing parameter via the bundled JS algorithm.

    :param params: query parameters, either an already-encoded query string
        or a dict (a dict is url-encoded before signing)
    :param data: JSON body of the request to be signed
    :param ua: User-Agent string the signature is bound to
    :param params_type: 1 → full algorithm (``a_bogus.js``),
        2 → short algorithm (``short_a_bogus.js``)
    :return: the computed a_bogus string
    :raises ValueError: for an unknown params_type
    """
    # Map params_type to its JS implementation file.
    js_files = {1: './js/a_bogus.js', 2: './js/short_a_bogus.js'}
    js_path = js_files.get(params_type)
    if js_path is None:
        # ValueError subclasses Exception, so existing broad handlers still work.
        raise ValueError('params_type error')
    # NOTE(review): the JS sources are stored as GBK — confirm before re-saving them.
    with open(js_path, 'r', encoding='gbk') as f:
        js_code = f.read()
    # Compile and invoke the signing routine.
    ctx = execjs.compile(js_code)
    logger.info(f'params: {params}')
    logger.info(f'params_type: {type(params)}')
    if isinstance(params, dict):  # idiomatic isinstance instead of type() ==
        params = urlencode(params)
    logger.info(f'data: {data}')
    logger.info(f'data_type: {type(data)}')
    data = json.dumps(data)
    result = ctx.call('get_ab', params, data, ua)
    return result
def get_schema(headers: dict, proxy_url: str, category_id: str):
    """Fetch the product schema for *category_id* from the doudian backend.

    Signs the request with an ``a_bogus`` parameter bound to the caller's
    User-Agent and routes it through the proxy described by *proxy_url*.

    :param headers: request headers; must contain a User-Agent (either case)
    :param proxy_url: proxy specification understood by ``check_proxy``
    :param category_id: doudian category whose schema is requested
    :return: raw ``requests`` response object
    :raises Exception: when no User-Agent header is present
    """
    params = {
        'appid': '1',
        '_bid': 'ffa_goods',
        'msToken': 'R-b2DYMiOLeDM9hJGu-ChiFknO18wFgHEk0gAZ6yW_0n3uCmbNRLJjqd_1JOtdmuUqWk_yO6jvOZizNvhhiJV5fyTMhX72vFvcamLeGAmoNoAEnLPWU4QH1pTkDzaeFMsslo5qx_cjx6RuQMWwkMWGhU_mr3xdR9',
    }
    json_data = {
        'context': {
            'feature': {},
            'operation_type': 'select_normal,normal',
            'category_id': category_id,
            'ability': [],
        },
        'appid': 1,
    }
    url = 'https://fxg.jinritemai.com/product/tproduct/getSchema'
    # Canonical header name first, lowercase fallback.
    ua = headers.get("User-Agent") or headers.get("user-agent")
    if not ua:
        raise Exception('User-Agent not found')
    # Sign first, then attach the signature to the query string.
    a_bogus = generate_a_bogus(params, json_data, ua)
    params['a_bogus'] = a_bogus
    proxies = check_proxy(proxy_url)
    return requests.post(url, params=params, headers=headers, json=json_data, verify=False, proxies=proxies)
def create_global_promotion(params: str | dict, data: dict, headers: dict, proxy_url: str):
    """Create a global promotion (全域推广) ad via the qianchuan API.

    Signs the request with the short ``a_bogus`` algorithm and posts it
    through the proxy described by *proxy_url*.

    :param params: query parameters as a dict or an encoded query string
    :param data: JSON body for the ad-creation request
    :param headers: request headers; must contain a User-Agent (either case)
    :param proxy_url: proxy specification understood by ``check_proxy``
    :return: raw ``requests`` response object
    :raises Exception: when no User-Agent header is present
    """
    proxies = check_proxy(proxy_url)
    # Canonical header name first, lowercase fallback.
    ua = headers.get("User-Agent") or headers.get("user-agent")
    if not ua:
        raise Exception('User-Agent not found')
    # Fix: the signature accepts str | dict, but the a_bogus assignment below
    # crashed with TypeError when params was a query string.  Normalize to a
    # dict so both declared input types work.
    if isinstance(params, str):
        params = dict(parse_qsl(params))
    a_bogus = generate_a_bogus(params, data, ua, params_type=2)
    params['a_bogus'] = a_bogus
    url_params = urlencode(params)
    url = 'https://qianchuan.jinritemai.com/ad/api/creation/v1/ad/create?' + url_params
    response = requests.post(url, json=data, headers=headers, verify=False, proxies=proxies)
    return response
import asyncio
import json
import subprocess
from errors import HubAPIError
from utils.errors import HubAPIError
from curl_cffi import requests
from curl_cffi.requests import HttpMethod
from loguru import logger
from config import settings
from utils.config import settings
DOMAIN = settings.HUB_DOMAIN
......
import logging
from urllib.parse import urlencode
from loguru import logger
import requests
from common import get_account_sdk_source_info, get_ms_token, encryptParams, encryptParamsId, update_cookies, \
update_callback_cookies, setup_proxy
from dy_verify import mouse_verify
from utils.common import get_account_sdk_source_info, get_ms_token, encryptParams, encryptParamsId, update_cookies, \
update_callback_cookies, check_proxy
from utils.dy_verify import mouse_verify
def get_callback_cookies(login_cookies, headers, encrypt_params, proxies):
......@@ -63,7 +62,7 @@ def get_callback_cookies(login_cookies, headers, encrypt_params, proxies):
cookies = update_callback_cookies(cookies, headers, redirect_url, proxies)
callback_res = requests.get('https://fxg.jinritemai.com/ecomauth/loginv1/callback', params=params, cookies=cookies,
headers=headers, verify=False, proxies=proxies)
logging.info(f"-------- get_callback_cookies: cookies获取成功!--------")
logger.info(f"-------- get_callback_cookies: cookies获取成功!--------")
return update_cookies(callback_res.cookies, cookies)
......@@ -73,7 +72,7 @@ def login(account, password, headers, proxies):
username = proxies['username']
proxies_password = proxies['password']
proxy_url = f"socks5h://{username}:{proxies_password}@{addr}:{port}"
proxies = setup_proxy(proxy_url)
proxies = check_proxy(proxy_url)
encrypt_params = encryptParams(account, password)
login_params = {
'fp': encrypt_params['fp'],
......@@ -99,15 +98,15 @@ def login(account, password, headers, proxies):
login_res = requests.post(login_url, headers=headers, data=data, proxies=proxies, verify=False)
if login_res.json()['error_code'] == 2046:
logging.info(f"-------- login: 登录失败!--------")
logging.info(f"-------- login: {login_res.json()['description']}--------")
logger.info(f"-------- login: 登录失败!--------")
logger.info(f"-------- login: {login_res.json()['description']}--------")
return None
if login_res.json()['error_code'] == 3:
logging.error(f"-------- login: 登录失败!--------")
logging.error(f"-------- login: {login_res.json()['description']}--------")
logger.error(f"-------- login: 登录失败!--------")
logger.error(f"-------- login: {login_res.json()['description']}--------")
return None
if login_res.json()['description'] == '滑动滑块进行验证':
logging.info(f"-------- login_verify: 正在处理滑块!--------")
logger.info(f"-------- login_verify: 正在处理滑块!--------")
verify_center_decision_conf = login_res.json()['verify_center_decision_conf']
count = 0
while True:
......@@ -118,11 +117,11 @@ def login(account, password, headers, proxies):
continue
login_res = requests.post(login_url, headers=headers, data=data, proxies=proxies, verify=False)
if login_res.json()['description'] != '滑动滑块进行验证':
logging.info(f"-------- login_verify: 滑块处理成功!--------")
logger.info(f"-------- login_verify: 滑块处理成功!--------")
break
if count == 5:
logging.error(f"-------- login_verify: 处理滑块失败!--------")
logging.error(f"-------- login: 登录失败!--------")
logger.error(f"-------- login_verify: 处理滑块失败!--------")
logger.error(f"-------- login: 登录失败!--------")
return None
return dict(get_callback_cookies(login_res.cookies, headers, encrypt_params, proxies))
import re
import time
from datetime import datetime, timedelta
from DrissionPage import Chromium
from DrissionPage._functions.by import By
from loguru import logger
from utils.dy_verify import get_distance_by_ddddocr
from utils.errors import AppError
from service.hub_ import openBrowser, closeBrowser
from dao.db import find_account_by_environment_id
def click_button(tab, loc):
    """Click the element located by *loc*, retrying a few times on failure.

    :param tab: DrissionPage tab object
    :param loc: element locator understood by ``tab.ele``
    :raises Exception: after 4 failed attempts
    """
    tab.wait(1)
    retry = 0
    while True:
        try:
            if tab.ele(loc).click():
                break
        except AppError as e:
            logger.error(f"{loc}点击按钮失败-->{e}")
        tab.wait(2)
        retry += 1  # Fix: the counter was never incremented, so a persistently
        # unclickable element looped forever instead of raising.
        if retry > 3:
            raise Exception(f"点击按钮失败-->{loc}")
async def page_login(browser_id):
    """Open a Hubstudio browser environment and log doudian in through the page UI.

    Starts the browser (up to 3 attempts), navigates to the shop homepage and,
    if a login page appears, fills in credentials looked up from MySQL by
    *browser_id*, handling the slider captcha when needed.  Finally extracts
    the shop token from the page HTML and stores it as a long-lived
    ``_shop_token_`` cookie.

    :param browser_id: Hubstudio environment/container code; also used as the
        environment_id key for the credential lookup
    :raises AppError: when the browser cannot be started, the page fails to
        load, credentials are missing, login keeps failing, or no token is
        found after login
    """
    # --- Phase 1: start the Hub browser and attach DrissionPage to it. ---
    open_count = 3
    while True:
        open_count -= 1
        try:
            open_result = await openBrowser(browser_id, timeout=30)
            data = open_result.get('data', {})
            if not data:
                raise AppError("启动浏览器失败")
            port = data.get('debuggingPort', '')
            chromium = Chromium(f"127.0.0.1:{port}")
            tab = chromium.latest_tab
            if chromium.states.is_existed:
                break
            else:
                # Not a Hub-managed browser: close it and retry.
                logger.error(f'非Hub浏览器-->重试启动浏览器:{open_count}')
                tab.close()
                await closeBrowser(browser_id)
                time.sleep(3)
        except Exception as e:
            logger.error(f"启动浏览器错误-->{e}")
            time.sleep(3)
        if open_count == 0:
            raise AppError("启动浏览器失败")
    # --- Phase 2: open the shop homepage and wait for the document. ---
    doc_loaded = None
    tab.get('https://fxg.jinritemai.com/ffa/mshop/homepage/index')
    # Listen for the login API call and the captcha verification call.
    tab.listen.start(["account_login/v2/", "captcha/verify"])
    for _ in range(3):
        doc_loaded = tab.wait.doc_loaded(timeout=10)
        if doc_loaded:
            break
    if not doc_loaded:
        raise AppError("打开浏览器失败")
    tab.wait(3)
    title = tab.title
    # --- Phase 3: perform the email/password login if we landed on a login page. ---
    if '登录' in title:
        retry = 0
        while True:
            if retry > 3:
                raise AppError("重试多次,登录失败")
            # NOTE(review): this branch looks unreachable — the `retry > 3`
            # check above fires first once retry is forced to 4 below; confirm
            # the intended "account not found" message is ever emitted.
            if retry == 4:
                raise AppError(f"{browser_id}-->未找到账号信息")
            retry += 1
            try:
                em = "邮箱登录"
                tab.get('https://fxg.jinritemai.com/login/common')
                doc_loaded = tab.wait.doc_loaded(timeout=10)
                eles_loaded = tab.wait.eles_loaded(em)
                if not doc_loaded or not eles_loaded:
                    continue
                click_button(tab, em)  # switch to the email-login tab
                email_input = tab.ele("@title=请输入邮箱")
                password_input = tab.ele("@title=密码")
                if email_input.value == "" or password_input.value == "":
                    # Pull credentials from MySQL keyed by the environment id.
                    info = find_account_by_environment_id(browser_id)
                    if info:
                        account = info.get('account')
                        password = info.get('password')
                    else:
                        # No credentials on file: push the counter past the
                        # limit so the loop aborts on the next iteration.
                        retry = 4
                        continue
                email_input.input(vals= account, clear=True)
                password_input.input(vals= password, clear=True)
                click_button(tab, ".auxo-checkbox")  # tick the agreement checkbox
                click_button(tab, ".account-center-submit")  # submit the login form
                listen_login(tab)
                break
            except Exception as e:
                logger.error(f"{browser_id}-->{e}")
        tab.wait(10)
    # --- Phase 4: extract the shop token with a regex and persist it. ---
    token_pattern = r'"token":\s*"([a-f0-9]{32})"'
    token_match = re.search(token_pattern, tab.html)
    if token_match:
        token = token_match.group(1)
        # Expire the cookie 30 days from now.
        expires = (datetime.now() + timedelta(days=30)).strftime("%a, %d %b %Y %H:%M:%S GMT")
        cookies = (
            f'_shop_token_={token}; '
            'path=/; '
            'domain=.fxg.jinritemai.com; '
            f'expires={expires}; '  # explicit expiry time
            'Secure; '  # HTTPS-only transport
            'HttpOnly'  # not readable from page JS
        )
        tab.set.cookies(cookies)
    else:
        raise AppError("未找到 token")
    tab.close()
def listen_login(tab):
    """Watch network traffic for the account-login response and react to it.

    Triggers the slider-captcha handler when requested, returns on a clean
    login, and raises AppError with the server's description otherwise.

    :param tab: DrissionPage tab with an active listener
    :raises AppError: when the server reports a login failure
    """
    for pkt in tab.listen.steps(timeout=10):
        # Ignore everything except the login API response.
        if 'account_login/v2/' not in pkt.url:
            continue
        body = pkt.response.body
        logger.info(f'获取登录数据:{body}')
        desc = body['description']
        if desc == '滑动滑块进行验证':
            verify_captcha(tab)
        elif desc == '':
            logger.info("登录成功")
            break
        else:
            raise AppError(f"登录失败 {desc}")
def verify_captcha(tab):
    """Solve the slider captcha shown during login, retrying up to 5 times.

    Locates the captcha iframe, measures the gap offset with ddddocr, drags
    the slider accordingly, then watches the network for the verify response
    to confirm success.

    :param tab: DrissionPage tab currently showing the captcha
    :raises AppError: after 5 failed verification attempts
    """
    # Locators for the captcha background image, slider image and drag button.
    captcha_verify_image = (By.XPATH, '//*[@id="captcha_verify_image"]')
    verify_img_slide = (By.XPATH, '//*[@id="captcha-verify_img_slide"]')
    captcha_slider_btn = (By.XPATH, '//*[@id="vc_captcha_box"]/div/div/div[4]/div/div[2]/div[2]/div')
    logger.info(f"-------- login_verify: 正在处理滑块!--------")
    verify_count = 0
    while True:
        # Reset the packet listener so only this attempt's traffic is seen.
        tab.listen.pause()
        tab.listen.resume()
        verify_count += 1
        if verify_count > 5:
            raise AppError("滑块验证失败")
        # The captcha widget lives in an iframe; poll briefly for it.
        iframe = None
        for _ in range(3):
            iframe = tab.get_frame('t:iframe')
            if iframe:
                break
            tab.wait(1)
        if not iframe:
            logger.error("未找到iframe滑块")
            break
        if not iframe.wait.eles_loaded(captcha_verify_image, timeout=10):
            logger.error("未找到验证码图片")
            break
        target_img_bytes = iframe.ele(captcha_verify_image).src()
        slide_img_bytes = iframe.ele(verify_img_slide).src()
        # OCR the gap position on the full-size image.
        x = get_distance_by_ddddocr(slide_img_bytes, target_img_bytes)
        logger.info(f'缺口位置: {x}')
        _captcha_slider_btn = iframe.ele(captcha_slider_btn)
        width = _captcha_slider_btn.rect.size[0]
        height = _captcha_slider_btn.rect.size[1]
        # Scale the OCR offset to on-screen pixels: x divided by the
        # button's width/height ratio.
        x = int(x / (width / height))
        logger.info(f"实际缺口位置: {x}")
        # Drag the slider to the computed position.
        iframe.actions.hold(_captcha_slider_btn)
        iframe.actions.right(x)
        iframe.actions.release()
        # Wait for the captcha verify API response to learn the outcome.
        for verify_packet in tab.listen.steps(timeout=30):
            if "https://verify.zijieapi.com/captcha/verify" in verify_packet.url:
                verify_res = verify_packet.response.body
                if verify_res['code'] == 200 or verify_res['code'] == 0:
                    logger.info(f"-------- login_verify: 滑块处理成功!--------")
                    return
                else:
                    logger.error("滑块验证失败")
                    tab.wait(3)
                    break
\ No newline at end of file
import json
import time
from asyncio.log import logger
from DrissionPage import Chromium
from DrissionPage.common import Settings
from service.hub_ import exportCookie, envList, openBrowser
from utils.config import settings
from utils.errors import NotFoundError, ParamsError, AppError
Settings.set_raise_when_wait_failed(False)
TASK_TYPE_SYNC_SHOP = 1 # 同步店铺
TASK_TYPE_SYNC_SHOP_INFO = 2 # 同步店铺信息
async def syncShop(container_name: str):
    """Collect shop sync data for every environment in a Hubstudio container.

    For each environment: export its stored cookies, keep only those scoped
    to the configured doudian domains, and bundle proxy / browser / account /
    shop metadata into one record.

    :param container_name: Hubstudio container name to filter environments by
    :return: list of per-environment record dicts
    :raises NotFoundError: if the environment list is empty, an environment
        has no exported cookie, or it has no bound account
    """
    env_list = await envList({'containerName': container_name})
    items = env_list.get('data', {}).get('list', {})
    result = []
    if not items:
        raise NotFoundError("环境列表为空")
    for item in items:
        # Export the stored cookies for this environment.
        browser_id = item.get('containerCode', '')
        export_cookie = await exportCookie({'containerCode': browser_id})
        raw_cookie = export_cookie.get('data', {})
        if not raw_cookie:
            raise NotFoundError("Cookie为空")
        domains = settings.DOUDIAN_COOKIE_DOMAINS
        # Keep only cookies scoped to doudian domains.  (The loop variable no
        # longer shadows the exported cookie payload as it did before.)
        new_cookies = [c for c in json.loads(raw_cookie) if c.get('Domain') in domains]
        accounts = item.get('accounts', {})
        if not accounts:
            # Fix: accounts[0] below used to raise a bare KeyError/IndexError
            # for environments without a bound account.
            raise NotFoundError("账号为空")
        result.append({
            'proxy': {
                'host': item.get('proxyHost', ''),
                'port': item.get('proxyPort', ''),
                'type': item.get('proxyTypeName', ''),
            },
            'browser': {
                'id': item.get('containerCode', ''),
                'name': item.get('containerName', ''),
                'create_time': item.get('createTime', ''),
            },
            'cookie': new_cookies,
            'accounts': {
                'name': accounts[0].get('accountName', ''),
                'supplier_id': accounts[0].get('supplierId', ''),
            },
            'shop': {
                'id': item.get('containerCode', ''),
                'name': item.get('containerName', ''),
                'browser_type': '',
            }
        })
    return result
async def syncShopInfo(parameter: dict = None):
    """Open an environment's browser, replay a page visit, and capture one
    listened API response, enriched with the tab's cookies and user agent.

    :param parameter: dict with ``browser_id``, ``open_url`` and ``listen_url``
    :return: captured response body plus cookie / user_agent / browser_id keys
    :raises ParamsError: when a required parameter is missing
    :raises AppError: when the browser cannot be started
    :raises NotFoundError: when no usable packet or data is captured
    """
    if not parameter:
        # Fix: a None/empty parameter previously crashed with AttributeError.
        raise ParamsError("参数为空")
    browser_id = parameter.get('browser_id', '')
    if not browser_id:
        raise ParamsError("浏览器ID为空")
    logger.info(f'同步店铺信息:{browser_id}')
    open_url = parameter.get('open_url', '')
    if not open_url:
        raise ParamsError("打开地址为空")
    open_result = await openBrowser(browser_id)
    data = open_result.get('data', {})
    if not data:
        raise AppError("启动浏览器失败")
    port = data.get('debuggingPort', '')
    chromium = Chromium(f"127.0.0.1:{port}")
    tab = chromium.latest_tab
    # .get() keeps a missing key from raising KeyError, consistent with the
    # other parameter reads above.
    listen_url = parameter.get('listen_url', '')
    if not listen_url:
        raise ParamsError("监听地址为空")
    tab.listen.start(listen_url)
    # Re-trigger the page so the listened request actually fires.
    if tab.url != open_url:
        tab.get(open_url)
    else:
        tab.refresh()
    time.sleep(3)
    tab.refresh()
    packet = tab.listen.wait(timeout=20)
    if not packet:
        raise NotFoundError("未捕获到有效数据包")
    body = packet.response.body
    logger.info(f'获取数据:{body}')
    if not body:
        raise NotFoundError('获取数据失败')
    callback_data = body
    try:
        # Attach session context for the caller.
        callback_data['cookie'] = tab.cookies().as_str()
        callback_data['user_agent'] = tab.user_agent
        callback_data['browser_id'] = parameter.get('browser_id', '')
        return callback_data
    except Exception as e:
        # Fix: was a bare `except:` that swallowed the root cause (and would
        # even catch SystemExit); now narrowed and chained.
        raise NotFoundError('获取数据失败') from e
\ No newline at end of file
......@@ -13,7 +13,7 @@ from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed
from tqdm import tqdm
from common import setup_proxy
from utils.common import check_proxy
HEADERS = {
"Content-Type": "application/json",
......@@ -522,7 +522,7 @@ class Upload:
username = proxies['username']
proxies_password = proxies['password']
proxy_url = f"socks5h://{username}:{proxies_password}@{addr}:{port}"
proxies = setup_proxy(proxy_url)
proxies = check_proxy(proxy_url)
self.proxies = proxies
self.file_size = os.path.getsize(self.file_path_)
logger.info("开始获取上传token")
......
......@@ -4,14 +4,17 @@ HUB_APP_ID = "password"
[production]
DEBUG = true
SHEIN_COOKIE_DOMAINS = "fxg.jinritemai.com,.fxg.jinritemai.com,.compass.jinritemai.com,.buyin.jinritemai.com,.jinritemai.com"
# HUB
DOUDIAN_COOKIE_DOMAINS = "fxg.jinritemai.com,.fxg.jinritemai.com,.compass.jinritemai.com,.buyin.jinritemai.com,.jinritemai.com"
HUB_DOMAIN = "http://127.0.0.1:6873"
HUB_EXE_PATH = "D://hubstudio//Hubstudio.exe"
HUB_GROUP_ID = "doudian"
HUB_APP_SECRET = "password"
# celery
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
# 系统域名
SYSTEM_DOMAINS = "https://sheinss.top"
SYSTEM_APP_NAME = "admin"
......@@ -20,5 +23,10 @@ SYSTEM_TASK_QUEUE_NUMBER = 10
SYSTEM_MIN_TASK_QUEUE_NUMBER = 5
SYSTEM_PROCESS_NUMBER = 5
# sqlite3
DB_NAME = "shein_order.db"
\ No newline at end of file
# mysql
DB_HOST = "127.0.0.1"
DB_PORT = 3306
DB_USER = "root"
DB_PASSWORD = "123456"
DB_NAME = "doudian"
DB_CHARSET = "utf8mb4"
\ No newline at end of file
import asyncio
import json
import sys
import time
import concurrent.futures
from curl_cffi import requests
from loguru import logger
from service.hub_ import closeBrowser, envList, exportCookie
from service.page_login import page_login
HUB_DOMAIN = "http://127.0.0.1:6873"
DOMAIN = "http://159.75.92.198:8809"
# DOMAIN = "http://dou-order.test"
DEBUG = False
API_PORT = 9001
import os
# 创建 logs 目录(如果不存在)
os.makedirs("logs", exist_ok=True)
# 添加日志文件输出,按天滚动
logger.add("logs/shop.log", rotation="1 day", level="INFO", encoding="utf-8", backtrace=True, diagnose=True)
async def get_task(params: dict = None):
    """Fetch pending collection tasks from the central task server.

    :param params: optional query parameters (queue name, batch size)
    :return: decoded JSON response as a dict
    """
    endpoint = f'{DOMAIN}/api/collection/task'
    response = requests.request('GET', endpoint, headers=DEFAULT_HEADER, params=params)
    return response.json()
async def callback_task(data: dict):
    """Report a finished task's result back to the central task server.

    :param data: callback payload built by ``formatCallback``
    :return: raw HTTP response object
    """
    endpoint = f"{DOMAIN}/api/collection/task"
    return requests.request('POST', endpoint, json=data, headers=DEFAULT_HEADER)
async def formatCallback(task: dict, result: dict) -> dict:
    """Build the standard task-callback payload.

    Copies the identifying fields from the source task (defaulting each to
    an empty string) and attaches the collected result.

    :param task: source task dict
    :param result: collected result payload
    :return: callback dict ready to POST back to the task server
    """
    payload = {key: task.get(key, '') for key in ('app_name', 'admin_users_id', 'type')}
    payload['result'] = result
    return payload
async def syncShop(task: dict = None):
    """Sync shops: walk every Hubstudio "抖店" environment page, export each
    environment's cookies, and post the collected shop / proxy / account data
    back to the task server as callbacks.

    When the task carries a ``browser_id``, a page login is performed first
    and only that single environment is synced, after which its browser is
    closed.

    :param task: task dict from the task server (may contain ``browser_id``)
    :return: None
    """
    DOMAINS = '.jinritemai.com,.fxg.jinritemai.com,.doudian-sso.jinritemai.com'
    browser_default_id = task.get('browser_id', None)
    # If a specific browser id is present, its page is logged in before export.
    for i in range(1, 100):
        query_data = {
            'containerName': '抖店',
            'current': i,
            'size': 200,
        }
        if browser_default_id:
            try:
                await page_login(browser_default_id)
            except Exception as e:
                logger.error(f'{browser_default_id}-->页面登录失败:{e}')
                await closeBrowser(browser_default_id)
                return
            # Restrict the environment listing to this one browser.
            query_data['containerCodes'] = [browser_default_id]
        env_list = await envList(query_data)
        items = env_list.get('data', {}).get('list', {})
        total = env_list.get('data', {}).get('total', 0)
        logger.info(f'环境总数:{total}, 页数:{i}')
        if not items:
            logger.error('环境列表为空')
            await closeBrowser(browser_default_id)
            return
        for item in items:
            # Export the stored cookies for this environment.
            browser_id = item.get('containerCode', '')
            logger.info(f'读取配置Cookie:{browser_id}')
            proxy_host = item.get('proxyHost', '')
            if not proxy_host:
                logger.error('代理为空')
                continue
            accounts = item.get('accounts', {})
            # Skip environments with no bound account.
            if not accounts:
                logger.error('账号为空')
                continue
            account = accounts[0]
            export_cookie = await exportCookie({'containerCode': browser_id})
            time.sleep(1)
            cookie = export_cookie.get('data', {})
            if not cookie:
                logger.error('Cookie为空')
                continue
            cookies = json.loads(cookie)
            # Keep only cookies scoped to doudian domains, as name->value pairs.
            new_cookies = dict()
            for cookie in cookies:
                if cookie.get('Domain') in DOMAINS:
                    new_cookies[cookie.get('Name')] = cookie.get('Value')
            callback_data = await formatCallback(task, {
                'proxy': {
                    'host': item.get('proxyHost', ''),
                    'port': item.get('proxyPort', ''),
                    'type': item.get('proxyTypeName', ''),
                },
                'browser': {
                    'id': item.get('containerCode', ''),
                    'name': item.get('containerName', ''),
                    'create_time': item.get('createTime', ''),
                },
                'cookie': new_cookies,
                'user_agent': item.get('ua', ''),
                'accounts': {
                    'name': account.get('accountName', ''),
                    'supplier_id': account.get('supplierId', ''),
                },
                'shop': {
                    'id': item.get('containerCode', ''),
                    'name': item.get('containerName', ''),
                    'browser_type': '',
                }
            })
            logger.info(json.dumps(callback_data))
            await callback_task(callback_data)
        # Single-browser mode: done after one page; close the browser.
        if browser_default_id:
            await closeBrowser(browser_default_id)
            return
def run_sync_shop(task):
    """Run ``syncShop(task)`` to completion on a fresh event loop.

    Intended as a thread-pool entry point: each worker thread owns and
    closes its own loop, and failures are logged rather than raised.
    """
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        event_loop.run_until_complete(syncShop(task))
    except Exception as exc:
        logger.error(f"同步店铺任务执行失败: {exc}")
    finally:
        event_loop.close()
async def handle_task(task):
    """Dispatch a single task based on its ``type`` field.

    Type 1 (shop sync) runs in an inner thread pool with a 5-minute cap;
    any other type is logged and skipped.
    """
    task_type = task.get('type', '')
    if task_type != 1:
        logger.warning(f"未知任务类型: {task_type}, 跳过处理")
        return
    # Shop sync: hand off to a worker thread and bound its runtime.
    with concurrent.futures.ThreadPoolExecutor(max_workers=INNER_MAX_WORKERS) as pool:
        fut = pool.submit(run_sync_shop, task)
        try:
            fut.result(timeout=300)  # 5-minute cap
        except concurrent.futures.TimeoutError:
            logger.error(f"同步店铺任务超时: {task}")
        except Exception as e:
            logger.error(f"同步店铺任务异常: {e}")
async def run():
    """Poll one batch of tasks from the server and process them via a thread pool.

    Each fetched task is handed to the outer thread pool, where a worker runs
    ``handle_task`` on its own event loop.  Fetch failures and per-task
    failures are logged; nothing is raised to the caller.
    """
    upload_data = {
        'queue': 'task-queue',
        'number': 1
    }
    try:
        tasks = await get_task(upload_data)
    except Exception:
        # Fix: was a bare `except:` — it also swallowed KeyboardInterrupt and
        # SystemExit, making the polling loop impossible to stop cleanly.
        logger.error('获取任务失败')
        return
    logger.info(json.dumps(tasks))
    tasks = tasks.get('data', {})
    if not tasks:
        logger.error('没有任务')
        return
    # Fan the tasks out to the outer pool; each worker drives its own loop.
    with concurrent.futures.ThreadPoolExecutor(max_workers=OUTER_MAX_WORKERS) as outer_executor:
        futures = [
            outer_executor.submit(lambda t: asyncio.run(handle_task(t)), task)
            for task in tasks
        ]
        # Surface (and log) every worker's outcome.
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"任务处理异常: {e}")
if __name__ == '__main__':
    # Outer pool size: how many tasks are processed concurrently.
    OUTER_MAX_WORKERS = 5
    # Inner pool size: worker threads per shop-sync task.
    INNER_MAX_WORKERS = 3
    argv = sys.argv
    # The single required CLI argument is the Authorization-Code header value
    # used to authenticate against the task server.
    if len(argv) != 2:
        logger.error("请传入参数")
        sys.exit(0)
    else:
        CODE = argv[1]
        DEFAULT_HEADER = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization-Code": CODE,
        }
    # Poll for new tasks forever, sleeping 10s between rounds.
    while True:
        asyncio.run(run())
        time.sleep(10)
import asyncio
import json
import os
from urllib.parse import urlencode
import execjs
import requests
from celery import shared_task
from loguru import logger
from common import setup_proxy
from errors import AppError
from hub_ import closeBrowser
from login import login
from main import syncShop, syncShopInfo, createTemplate
from upload_video import upload_video
from service.create_template import create_template
from service.doudian_service import generate_a_bogus, get_schema, create_global_promotion
from service.hub_ import closeBrowser
from service.login import login
from service.sync_shop import syncShop, syncShopInfo
from service.upload_video import upload_video
from utils.errors import AppError, ABogusParamsError
# Create the logs directory if it does not exist yet.
os.makedirs("logs", exist_ok=True)
# File sink for this worker's log output.
# NOTE(review): loguru's documented rotation duration strings are plural
# (e.g. "7 days", "1 week") — confirm "7 day" is parsed as a 7-day rotation.
logger.add("logs/api.log", rotation="7 day", level="INFO", encoding="utf-8", backtrace=True, diagnose=True)
@shared_task(name='sync_shop')
def execute_sync_shop(container_name: str):
......@@ -89,7 +80,7 @@ def execute_sync_shop_info(browser_id: str, listen_url: str, open_url: str):
def execute_sync_create_template(cookies: dict, template_params: dict, proxies: dict):
"""创建运费模板"""
try:
result = asyncio.run(createTemplate(cookies, template_params, proxies))
result = asyncio.run(create_template(cookies, template_params, proxies))
return {
'code': 200,
'msg': 'success',
......@@ -170,18 +161,10 @@ def execute_doudian_upload_video(task: dict):
}
@shared_task(name='generate_a_bogus')
def execute_generate_a_bogus(params:str | dict,data: dict,ua: str):
def execute_generate_a_bogus(params:str | dict,data: dict,ua: str, params_type: int=1):
"""生成加密参数"""
try:
# 直接读取JS文件
with open('./js/a_bogus.js', 'r', encoding='gbk') as f:
js_code = f.read()
# 执行加密算法
ctx = execjs.compile(js_code)
if type(params) == dict:
params = urlencode(params)
data = json.dumps(data)
result = ctx.call('get_ab', params, data, ua)
result = generate_a_bogus(params,data,ua, params_type)
return {
'code': 200,
'msg': 'success',
......@@ -201,32 +184,7 @@ def execute_generate_a_bogus(params:str | dict,data: dict,ua: str):
def execute_get_schema(headers: dict, proxy_url: str, category_id: str):
    """获取类目参数

    Calls the service-layer ``get_schema`` and wraps the HTTP response in the
    code/msg/data/error_type envelope used by every task in this module.

    Args:
        headers: request headers forwarded to the service layer.
        proxy_url: proxy URL forwarded to the service layer.
        category_id: target category id whose schema is requested.

    Returns:
        dict: ``code`` 200 with the schema payload on success; the mapped
        ``AppError`` fields or a generic 500 envelope on failure.
    """
    try:
        response = get_schema(headers, proxy_url, category_id)
        # Parse the body once — the original called response.json() twice,
        # re-parsing the payload on every call.
        payload = response.json() if response.text else {}
        if payload.get('code') == 0:
            return {
                'code': 200,
                'msg': 'success',
                'data': payload.get('data', {}),
                'error_type': ''
            }
        # A non-zero business code is treated as a rejected a_bogus signature.
        raise ABogusParamsError(data=response.text)
    except AppError as e:
        logger.error(f'获取类目参数失败-->{e}')
        return {
            'code': e.code,
            'msg': e.msg,
            'data': e.data,
            'error_type': type(e).__name__
        }
    except Exception as e:
        # Also covers a non-JSON body raising during .json().
        logger.error(f'获取类目参数失败-->{e}')
        return {
            'code': 500,
            'msg': f'获取类目参数失败:{e}',
            'data': None,
            'error_type': 'InternalError'
        }
@shared_task(name='create_global_promotion')
def execute_create_global_promotion(params: str | dict, data: dict, headers: dict, proxy_url: str):
    """创建全域推广

    Calls the service-layer ``create_global_promotion`` and wraps the result
    in the code/msg/data/error_type envelope used by every task in this module.

    Args:
        params: query-string parameters (raw string or dict).
        data: JSON request body.
        headers: request headers forwarded to the service layer.
        proxy_url: proxy URL forwarded to the service layer.

    Returns:
        dict: ``code`` 200 with the promotion payload on success; the mapped
        ``AppError`` fields or a generic 500 envelope on failure.
    """
    try:
        response = create_global_promotion(params, data, headers, proxy_url)
        response_data = response.json()
        # status_code == 0 is the upstream success marker; default to -1 so a
        # missing field counts as failure.
        if response_data.get('status_code', -1) != 0:
            raise Exception(f"创建全局推广失败 --> {response.text}")
        return {
            'code': 200,
            'msg': 'success',
            'data': response_data.get('data', {}),
            'error_type': ''
        }
    except AppError as e:
        # Fixed copy-paste bug: this branch logged "获取类目参数失败" (the
        # schema task's message) instead of this task's own failure message.
        logger.error(f'创建全局推广失败-->{e}')
        return {
            'code': e.code,
            'msg': e.msg,
            'data': e.data,
            'error_type': type(e).__name__
        }
    except Exception as e:
        logger.error(f'创建全局推广异常:{e}')
        return {
            'code': 500,
            'msg': f'创建全局推广异常:{e}',
            'data': None,
            'error_type': 'InternalError'
        }
\ No newline at end of file
......@@ -3,6 +3,7 @@ from urllib.parse import urlencode
import execjs
import requests
from urllib3 import Retry
cookie = {
"PHPSESSID": "29c772c0cf8efc76ebf9169cb55dfc95",
......@@ -34,5 +35,39 @@ data = json.dumps(json_data)
ab = ctx.call('get_ab', url_params, data, headers['user-agent'])
params['a_bogus'] = ab
# params['a_bogus'] = "DXWhQRu6dk2ivfmk5fCLfY3qV-zfYsuY0SVkMDheJaV-/y39HMOP9exYg/Xv8LbexG/ZIbDjy4hbO3xprQAjM36UHmJx/2aBmDSkKl5Q59YC53ineyfQE0hO-ib3SFad5XNdECifqiKGKuRplnl60fAAPeb="
response = requests.post(url, params=params, cookies=cookie, headers=headers, json=json_data)
# response = requests.post(url, params=params, cookies=cookie, headers=headers, json=json_data)
# print(response.text)
# Ad-hoc proxy smoke test (scratch script, not production code).
# NOTE(review): credentials are hardcoded in this URL — rotate them and load
# from the environment before committing anywhere public.
proxy_url = "socks5h://UKZ5TYSJUQNB:3X4Q79VMDPK0@112.91.140.119:9001"
proxies = {
    "http": proxy_url,
    "https": proxy_url
}
session = requests.Session()
# Configure SOCKS5 proxy with remote DNS resolution (socks5h scheme).
print(proxies)
session.proxies.update(proxies)
# NOTE(review): `retries` is built but never mounted on the session here, so
# it has no effect in this script.
retries = Retry(
    total=5,
    backoff_factor=0.3,
    status_forcelist=[500, 502, 503, 504],
    allowed_methods=frozenset(['GET', 'POST'])
)
# Adjust SSL configuration
# Check connectivity by asking httpbin for the apparent client IP.
test_res = session.get('http://httpbin.org/ip', timeout=10,proxies=proxies)
print(test_res.text)
# Extract the proxy host from the URL (text between '@' and the port ':').
addr = proxy_url.split('@')[1].split(':')[0]
# print(addr)
import requests
url = "http://www.cip.cc"
headers = {
    'user-agent': 'curl Dalvik/2.1.0 (Linux; U; Android 8.1.0; Pixel Build/NMF26F)'
}
# proxies = {'http': 'http://localhost:1087', 'https': 'http://localhost:1087'}
# Second check: cip.cc echoes the caller's IP and location.
response = session.post(url, headers=headers)
print(response.text)
import json
import logging
import random
import time
from hashlib import md5
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from urllib.parse import urlparse
import requests
from utils.errors import AppError
from loguru import logger
from errors import *
def setup_proxy(proxy_url):
    """Build and validate a proxies mapping for the given proxy URL.

    Configures a throwaway ``requests`` session with the proxy (SOCKS5 with
    remote DNS when a ``socks5h://`` URL is passed), retry policy, and
    disabled TLS verification, then verifies (1) the business domain is
    reachable through the proxy and (2) the proxy's exit IP matches the host
    in ``proxy_url``.

    Args:
        proxy_url: proxy URL, e.g. ``socks5h://user:pass@host:port``.

    Returns:
        dict: ``{"http": proxy_url, "https": proxy_url}`` on success.

    Raises:
        AppError: when connectivity or exit-IP validation fails.
    """
    session = requests.Session()
    proxies = {
        "http": proxy_url,
        "https": proxy_url
    }
    session.proxies = proxies
    # Retry transient server errors for both GET and POST.
    retries = Retry(
        total=5,
        backoff_factor=0.3,
        status_forcelist=[500, 502, 503, 504],
        allowed_methods=frozenset(['GET', 'POST'])
    )
    # TLS verification is intentionally disabled for this validation session.
    session.verify = False
    session.mount('https://', HTTPAdapter(
        max_retries=retries,
        pool_connections=100,
        pool_maxsize=100
    ))
    addr = urlparse(proxy_url).hostname
    try:
        # Step 1: business-domain connectivity through the proxy.
        test_res = session.get(
            'https://fxg.jinritemai.com',
            timeout=15,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'
            },
            proxies=proxies
        )
        if test_res.status_code == 200:
            logger.info("业务域名连通性验证成功")
            # Step 2: exit-IP check — httpbin reports the IP it saw.
            # NOTE(review): assumes the proxy's egress IP equals the host in
            # proxy_url; a gateway with a different egress IP would fail here.
            ip_info = requests.get('http://httpbin.org/ip', timeout=10, proxies=proxies).json()
            if ip_info['origin'] == addr:
                logger.info("代理验证成功")
                return proxies
            else:
                raise AppError(msg="代理验证失败")
        else:
            raise AppError(msg="业务域名连通性验证失败")
    except AppError:
        # Preserve the specific validation message; the original blanket
        # handler re-wrapped these AppErrors as a generic "代理验证失败: ...",
        # erasing which check actually failed.
        raise
    except Exception as e:
        raise AppError(msg=f"代理验证失败: {str(e)}")
......
......@@ -20,3 +20,7 @@ class ParamsError(AppError):
    def __init__(self, msg='参数错误', data=None):
        # Fixed code 400: generic request-parameter validation failure.
        super().__init__(code=400, msg=msg, data=data)
class ABogusParamsError(AppError):
    """Raised when generating or validating the a_bogus signature parameter fails."""

    def __init__(self, msg='a_bogus参数错误', data=None):
        # Code 401 distinguishes signature failures from the generic 400 ParamsError.
        super().__init__(data=data, msg=msg, code=401)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment