Commit 128bb4f2 by baiquan

Optimize image and video upload

parent 4c6547c8
......@@ -73,13 +73,33 @@ def create_global_promotion(params:str | dict,data: dict,headers: dict, proxy_ur
response = doudian_request("POST", url, proxies=proxies, json=data, headers=headers)
return response
def search_category(product_name, cookies, headers, proxies):
"""you"""
if 15 > len(product_name) > 60:
raise Exception('商品标题长度必须在15-60字符之间')
params = {
'key': product_name,
'search_type': '1',
'appid': '1',
'_bid': 'ffa_goods',
}
url = 'https://fxg.jinritemai.com/product/tproduct/searchCategoryN'
response = doudian_request("GET", url, proxies, params, headers=headers, cookies=cookies)
return response['data']
# AI video generation
def generate_video(task: dict):
if isinstance(task.get("proxies"), dict):
addr = task.get("proxies")["addr"]
port = task.get("proxies")["port"]
username = task.get("proxies")["username"]
password = task.get("proxies")["password"]
proxy_url = f"socks5h://{username}:{password}@{addr}:{port}"
elif isinstance(task.get("proxies"), str):
proxy_url = task.get("proxies")
else:
raise ValueError("Invalid proxy format")
proxies = check_proxy(proxy_url)
cookies = task.get('cookie')
headers = task.get('headers')
generate_video_data = task.get('generate_video_data')
......@@ -93,7 +113,6 @@ def generate_video(task: dict):
# "https://p3-aio.ecombdimg.com/obj/ecom-shop-material/jpeg_m_995815fcd3378e7f48765b3ef3fc9bec_sx_119123_www810-1080",
# "https://p3-aio.ecombdimg.com/obj/ecom-shop-material/jpeg_m_b1fe80bb8228d713eabc880561120823_sx_53964_www810-1080",
# "https://p3-aio.ecombdimg.com/obj/ecom-shop-material/jpeg_m_cc78dc386c3359945173fc9cb57297c4_sx_137165_www810-1080"],
# 'product_industrys': ["住宅家具", "几类", "角几/边几", ""],
# }
params = {
'appid': '1',
......@@ -101,7 +120,18 @@ def generate_video(task: dict):
}
product_name = generate_video_data['product_name']
customized_images = generate_video_data['customized_images']
logger.info(f"Number of customized images: {len(customized_images)}")
if len(customized_images) < 3:
raise Exception('At least 3 images are required to generate a video')
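# Resolve the four-level category path (first/second/third/fourth name) from the product title; the first search result is used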
category_list = search_category(product_name, cookies, headers, proxies)
if not category_list:
raise Exception('Product category not found')
first_name = category_list[0].get('first_name', '')
second_name = category_list[0].get('second_name', '')
third_name = category_list[0].get('third_name', '')
fourth_name = category_list[0].get('fourth_name', '')
product_industrys = [first_name, second_name, third_name, fourth_name]
logger.info(f"商品分类:{product_industrys}")
json_data = {
'product_id': '',
'optimize_strategy': '视频一键生成',
......@@ -117,7 +147,6 @@ def generate_video(task: dict):
'appid': 1,
'_bid': 'ffa_goods',
}
if headers.get("User-Agent"):
ua = headers['User-Agent']
elif headers.get("user-agent"):
......
......@@ -9,10 +9,11 @@ from loguru import logger
from requests_toolbelt.multipart.encoder import MultipartEncoder
from config import settings
from service.doudian_request import doudian_request
from service.upload_video import upload_video_with_multithreading, download_video
from utils.common import check_proxy, callback_task
# Convert an image into a square
def convert_rect_to_square(image_path):
# Open the original image and convert it to RGBA (supports an alpha channel)
......@@ -59,22 +60,25 @@ def check_image_width_height(image_path, image_type: int = 1):
raise Exception(f"{image_path} --> 图片类型错误")
def get_local_path(item_id, original_url):
folder_path = os.path.join(settings.BASE_PATH, str(item_id))
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# raise FileNotFoundError(f"Folder does not exist: {folder_path}")
if "?" in original_url:
url = original_url.split("?")[0]
else:
url = original_url
logger.info(url)
file_path = os.path.join(folder_path, os.path.basename(url))
if not os.path.exists(file_path):
logger.info(f"{file_path} 文件不存在,开始下载")
if not file_path.endswith(".mp4"):
img_bytes = requests.get(url, stream=True)
with open(file_path, 'wb') as f:
for chunk in img_bytes.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return file_path
def upload_image_by_bytes(cookies, headers, proxies, image_path_list):
......@@ -126,15 +130,11 @@ async def uploadImageAndVideo(task: dict = None):
:param task: task payload (proxies, cookie, headers, item_id, skus, ...)
:return: None; results are reported via callback_task
"""
proxy_url = task.get("proxies")
proxies = check_proxy(proxy_url)
cookies = task.get('cookie')
headers = task.get('headers')
item_id = task.get('item_id')
skus = task.get('skus')
# Prepare SKU images for upload
sku_image_list = []
......@@ -175,7 +175,6 @@ async def uploadImageAndVideo(task: dict = None):
run_in_executor(upload_image_by_bytes, cookies, headers, proxies, description_list),
upload_videos(task, item_id)
)
except Exception as e:
logger.error(f"上传过程中发生错误: {str(e)}")
return None
......@@ -186,10 +185,10 @@ async def uploadImageAndVideo(task: dict = None):
"images": image_dict,
"description": description_dict,
"video_list": video_dict,
"type": 2
# "type": 2
}
logger.info(json.dumps(callback_data))
await callback_task(callback_data, 2)
async def run_in_executor(func, *args):
"""在异步环境中运行同步函数"""
......@@ -221,6 +220,7 @@ async def upload_videos(task: dict, item_id: str):
async def upload_single_video(task: dict, local_path: any, original_url: str):
"""上传单个视频"""
task['file_path'] = local_path
task['video_url'] = original_url
md5_key = hashlib.md5(original_url.encode()).hexdigest()
try:
......
......@@ -5,6 +5,7 @@ import math
import os
import random
import threading
import time
import urllib.parse
import zlib
from concurrent.futures import ThreadPoolExecutor
......@@ -168,11 +169,16 @@ class Upload:
self.access_key_id = None
self.file_size = None
self.file_name = None
if isinstance(task.get("proxies"), dict):
addr = task.get("proxies")["addr"]
port = task.get("proxies")["port"]
username = task.get("proxies")["username"]
password = task.get("proxies")["password"]
proxy_url = f"socks5h://{username}:{password}@{addr}:{port}"
elif isinstance(task.get("proxies"), str):
proxy_url = task.get("proxies")
else:
raise ValueError("Invalid proxy format")
self.proxies = check_proxy(proxy_url)
self.cookies = task.get('cookie')
self.headers = task.get('headers')
......@@ -451,19 +457,19 @@ def download_video(url: str, file_: str,headers: dict):
:param file_: local file path to write to
:return:
"""
response = requests.get(url, stream=True, headers=headers)
with open(file_, "wb") as f:
for chunk in response.iter_content(chunk_size=1024):
if chunk: f.write(chunk)
# Check whether the download completed
file_size = os.path.getsize(file_)
logger.info(f"File size: {file_size}")
if file_size > 10000 and not is_video_corrupted(file_):
logger.info(f"Download complete: {file_}")
else:
raise Exception(f"Download failed: {file_}")
def get_video_duration(filename):
"""
Get the video duration
......@@ -678,24 +684,20 @@ def prepare_video_file(task):
if not task.get("file_path"):
if not os.path.exists(VIDEO_PATH):
os.makedirs(VIDEO_PATH)
file_name = f"{task['file_name']}"
file_path = os.path.join(VIDEO_PATH, file_name)
if not os.path.exists(file_path):
logger.info(f"文件 {file_name} 不存在,开始下载")
download_video(task['video_url'], file_path, headers=task['headers'])
else:
logger.info(f"文件 {file_name} 已存在,跳过下载")
if is_video_corrupted(file_path):
logger.error("视频文件已损坏,正在重新下载")
download_video(task['video_url'], file_path, headers=task['headers'])
else:
file_path = task.get("file_path")
if not os.path.exists(file_path):
logger.info(f"File {file_path} does not exist, starting download")
download_video(task['video_url'], file_path, headers=task['headers'])
else:
logger.info(f"File {file_path} already exists, skipping download")
if is_video_corrupted(file_path):
logger.error("Video file is corrupted, re-downloading")
download_video(task['video_url'], file_path, headers=task['headers'])
video_duration = get_video_duration(file_path)
if video_duration > 60:
logger.error("视频时长大于60秒,上传失败")
......
......@@ -215,16 +215,27 @@ async def get_task(params: dict = None):
"""
Fetch tasks
"""
publish_task = requests.get(f"{settings.DOMAIN}/api/collection/task/getPublishTask", headers=DEFAULT_HEADER).json()
# collection_task = requests.get(f'{settings.DOMAIN}/api/collection/task', headers=DEFAULT_HEADER, params=params).json()
# if collection_task.get('data', []):
# collection_task['data'].extend(publish_task_data)
# return collection_task
# else:
return publish_task
async def callback_task(data: dict, task_type:int):
"""
Task callback
:param data: callback payload
:param task_type: 1 = collection task callback, 2 = publish task callback
:return: the requests response
"""
if task_type == 1:
return requests.post(f"{settings.DOMAIN}/api/collection/task", json=data, headers=DEFAULT_HEADER)
elif task_type == 2:
return requests.post(f"{settings.DOMAIN}/api/collection/task/callBackPublishTask", json=data)
else:
raise Exception("task_type error")
async def formatCallback(task: dict, result: dict) -> dict:
......