Commit 025ccc1a by baiquan

refactor(service): 优化抖店服务和上传任务的超时处理

- 增加上传任务的超时控制,设置总超时时间为 65 秒
- 优化上传任务的并行处理逻辑,使用 asyncio.wait_for进行超时控制
-增加错误消息记录,提高错误处理的可读性
- 调整视频上传的超时时间为剩余时间,确保总超时时间不超过设定值
parent 00d8acb1
...@@ -148,7 +148,7 @@ def generate_video(task: dict): ...@@ -148,7 +148,7 @@ def generate_video(task: dict):
raise Exception('User-Agent not found') raise Exception('User-Agent not found')
retry_count = 0 retry_count = 0
while True: while True:
if retry_count > 3: if retry_count > 2:
raise Exception('生成视频失败') raise Exception('生成视频失败')
a_bogus = generate_a_bogus(params, json_data, ua) a_bogus = generate_a_bogus(params, json_data, ua)
params['a_bogus'] = a_bogus params['a_bogus'] = a_bogus
...@@ -177,7 +177,11 @@ def get_task_result(task_id: str, headers: dict, tool_source: str, ua, proxies, ...@@ -177,7 +177,11 @@ def get_task_result(task_id: str, headers: dict, tool_source: str, ua, proxies,
a_bogus = generate_a_bogus(params, {}, ua) a_bogus = generate_a_bogus(params, {}, ua)
params['a_bogus'] = a_bogus params['a_bogus'] = a_bogus
url = 'https://fxg.jinritemai.com/product/tproduct/material/imageTextVideo/queryImgOptimizeTask4PC' url = 'https://fxg.jinritemai.com/product/tproduct/material/imageTextVideo/queryImgOptimizeTask4PC'
start_time = time.time()
while True: while True:
if time.time() - start_time > 30:
logger.error('任务执行超时')
return None
response = doudian_request("GET", url, proxies, params, headers=headers, cookies=cookies) response = doudian_request("GET", url, proxies, params, headers=headers, cookies=cookies)
msg = response['data']['msg'] msg = response['data']['msg']
logger.info(f"{task_id} --> {msg}") logger.info(f"{task_id} --> {msg}")
......
...@@ -2,6 +2,7 @@ import asyncio ...@@ -2,6 +2,7 @@ import asyncio
import hashlib import hashlib
import json import json
import os import os
import time
import requests import requests
from PIL import Image from PIL import Image
...@@ -136,6 +137,8 @@ async def uploadImageAndVideo(task: dict = None): ...@@ -136,6 +137,8 @@ async def uploadImageAndVideo(task: dict = None):
headers = task.get('headers') headers = task.get('headers')
item_id = task.get('item_id') item_id = task.get('item_id')
skus = task.get('skus') skus = task.get('skus')
sku_image_dict, image_dict, description_dict, video_dict = None, None, None, None
error_msg = ""
# 准备SKU图片上传 # 准备SKU图片上传
sku_image_list = [] sku_image_list = []
for sku in skus: for sku in skus:
...@@ -166,17 +169,32 @@ async def uploadImageAndVideo(task: dict = None): ...@@ -166,17 +169,32 @@ async def uploadImageAndVideo(task: dict = None):
check_image_width_height(local_path, 2) check_image_width_height(local_path, 2)
description_list.append({md5_key: local_path}) description_list.append({md5_key: local_path})
try: try:
# 并行处理所有上传任务 # 设置超时时间为60秒
sku_image_dict, image_dict, description_dict = await asyncio.gather( timeout_seconds = 65
run_in_executor(upload_image_by_bytes, cookies, headers, proxies, sku_image_list), start_time = time.time()
run_in_executor(upload_image_by_bytes, cookies, headers, proxies, image_list), # 使用asyncio.wait_for设置超时并行处理所有上传任务
run_in_executor(upload_image_by_bytes, cookies, headers, proxies, description_list), sku_image_dict, image_dict, description_dict = await asyncio.wait_for(
asyncio.gather(
run_in_executor(upload_image_by_bytes, cookies, headers, proxies, sku_image_list),
run_in_executor(upload_image_by_bytes, cookies, headers, proxies, image_list),
run_in_executor(upload_image_by_bytes, cookies, headers, proxies, description_list)
),
timeout=timeout_seconds
) )
remaining_time = timeout_seconds - (time.time() - start_time)
image_list = list(image_dict.values()) image_list = list(image_dict.values())
video_dict = await upload_videos(task, item_id, image_list)
# 对视频上传也添加超时控制
video_dict = await asyncio.wait_for(
upload_videos(task, item_id, image_list),
timeout=remaining_time
)
except asyncio.TimeoutError:
error_msg = "上传任务超过指定时间未完成,发生超时"
logger.error(error_msg)
except Exception as e: except Exception as e:
logger.error(f"上传过程中发生错误: {str(e)}") error_msg = f"上传过程中发生错误: {str(e)}"
return None logger.error(error_msg)
# 构建回调数据结构 # 构建回调数据结构
callback_data = { callback_data = {
"id": item_id, "id": item_id,
...@@ -184,6 +202,7 @@ async def uploadImageAndVideo(task: dict = None): ...@@ -184,6 +202,7 @@ async def uploadImageAndVideo(task: dict = None):
"images": image_dict, "images": image_dict,
"description": description_dict, "description": description_dict,
"video_list": video_dict, "video_list": video_dict,
"error_msg": error_msg
# "type": 2 # "type": 2
} }
logger.info(json.dumps(callback_data)) logger.info(json.dumps(callback_data))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment