Commit 429bf138 by baiquan

添加AI生成视频方法

parent 80d958a4
import json
import time
from urllib.parse import urlencode
import execjs
from loguru import logger
from service.doudian_request import doudian_request
from utils.common import check_proxy
from utils.errors import ParamsError
def generate_a_bogus(params:str | dict,data: dict,ua: str, params_type: int=1):
......@@ -69,3 +72,95 @@ def create_global_promotion(params:str | dict,data: dict,headers: dict, proxy_ur
url = 'https://qianchuan.jinritemai.com/ad/api/creation/v1/ad/create?' + url_params
response = doudian_request("POST", url, proxies=proxies, json=data, headers=headers)
return response
# AI视频生成
def generate_video(task: dict):
addr = task.get("proxies")["addr"]
port = task.get("proxies")["port"]
username = task.get("proxies")["username"]
password = task.get("proxies")["password"]
proxy_url = f"socks5h://{username}:{password}@{addr}:{port}"
cookies = task.get('cookie')
headers = task.get('headers')
generate_video_data = task.get('generate_video_data')
if not generate_video_data:
raise ParamsError('缺少generate_video_data参数')
# generate_video_data = {
# 'product_name': "自粘地板贴家用瓷砖加厚耐磨防水地胶地垫",
# 'customized_images': [
# "https://p3-aio.ecombdimg.com/obj/ecom-shop-material/jpeg_m_9ce719a34608aff3b11827271a67da85_sx_99050_www810-1080",
# "https://p3-aio.ecombdimg.com/obj/ecom-shop-material/jpeg_m_4e88344c8a6746ab6bd79cebd34778a1_sx_187733_www1080-980",
# "https://p3-aio.ecombdimg.com/obj/ecom-shop-material/jpeg_m_995815fcd3378e7f48765b3ef3fc9bec_sx_119123_www810-1080",
# "https://p3-aio.ecombdimg.com/obj/ecom-shop-material/jpeg_m_b1fe80bb8228d713eabc880561120823_sx_53964_www810-1080",
# "https://p3-aio.ecombdimg.com/obj/ecom-shop-material/jpeg_m_cc78dc386c3359945173fc9cb57297c4_sx_137165_www810-1080"],
# 'product_industrys': ["住宅家具", "几类", "角几/边几", ""],
# }
params = {
'appid': '1',
'_bid': 'ffa_goods',
}
product_name = generate_video_data['product_name']
customized_images = generate_video_data['customized_images']
product_industrys = generate_video_data['product_industrys']
json_data = {
'product_id': '',
'optimize_strategy': '视频一键生成',
'img_format': 1,
'optimize_strategy_extra': {
'video_ratio': '1',
'product_name': product_name,
'customized_images': json.dumps(customized_images),
'product_industrys': json.dumps(product_industrys),
'customized_ai_powered_script': '[]',
},
'service_method': 'realtime',
'appid': 1,
'_bid': 'ffa_goods',
}
proxies = check_proxy(proxy_url)
if headers.get("User-Agent"):
ua = headers['User-Agent']
elif headers.get("user-agent"):
ua = headers['user-agent']
else:
raise Exception('User-Agent not found')
a_bogus = generate_a_bogus(params, json_data, ua)
params['a_bogus'] = a_bogus
logger.info("开始生成视频")
url = 'https://fxg.jinritemai.com/product/tproduct/material/imageTextVideo/submitImgOptimizeTask4PC'
response = doudian_request("POST", url, proxies, params, json=json_data, headers=headers, cookies=cookies)
task_id = response['data']['task_id']
logger.info(f"任务id --> {task_id}")
tool_source = response['data']['tool_source']
result = get_task_result(task_id, headers, tool_source, ua, proxies, cookies)
return result
def get_task_result(task_id: str, headers: dict, tool_source: str, ua, proxies, cookies):
params = {
'task_id': task_id,
'app_id': '',
'tool_source': tool_source,
'optimize_strategy': '视频一键生成',
'appid': '1',
}
a_bogus = generate_a_bogus(params, {}, ua)
params['a_bogus'] = a_bogus
url = 'https://fxg.jinritemai.com/product/tproduct/material/imageTextVideo/queryImgOptimizeTask4PC'
while True:
response = doudian_request("GET", url, proxies, params, headers=headers, cookies=cookies)
msg = response['data']['msg']
logger.info(f"{task_id} --> {msg}")
if response['data']['status'] == "执行中":
time.sleep(3)
elif response['data']['status'] == "执行成功":
vid = response['data']['optimized_video_infos'][0]['vid']
video_url = response['data']['optimized_video_infos'][0]['video_url']
result = {
"video_id": vid,
"video_url": video_url,
}
logger.success(f'视频生成成功-->{result}')
return result
else:
raise Exception(f'任务执行失败-->{msg}')
\ No newline at end of file
......@@ -9,8 +9,8 @@ from requests_toolbelt.multipart.encoder import MultipartEncoder
from config import settings
from service.doudian_request import doudian_request
from service.upload_video import upload_video_with_multithreading
from shop import callback_task
from utils.common import check_proxy
from utils.common import check_proxy, callback_task
def get_local_path(item_id, url):
folder_path = os.path.join(settings.BASE_PATH, str(item_id))
......@@ -158,8 +158,8 @@ async def upload_videos(task: dict, item_id: str):
# 合并结果
for md5_key, result in results:
if result and 'video_info' in result and 'MainPlayUrl' in result['video_info']:
video_dict[md5_key] = result['video_info']['MainPlayUrl']
if result:
video_dict[md5_key] = result['video_url']
return video_dict
......
......@@ -17,7 +17,9 @@ from loguru import logger
from tqdm import tqdm
from service.doudian_request import doudian_request
from service.doudian_service import generate_video
from utils.common import check_proxy
from utils.errors import VideoDurationError
HEADERS = {
"Content-Type": "application/json",
......@@ -625,8 +627,14 @@ def is_video_corrupted(file_path):
def upload_video_with_multithreading(task):
"""多线程视频上传主函数"""
# 准备视频文件(同原逻辑)
file_path = prepare_video_file(task)
try:
file_path = prepare_video_file(task)
except VideoDurationError:
# 视频时长超过60秒, 使用生成视频
video_info = generate_video(task)
return video_info
except Exception as e:
raise e
# 初始化上传
upload = Upload(task)
upload.file_size = os.path.getsize(file_path)
......@@ -657,10 +665,10 @@ def upload_video_with_multithreading(task):
upload.change_video_status()
logger.info("视频状态已更改")
video_info = upload.get_upload_video_info()
video_url = video_info['MainPlayUrl']
result = {
'video_id': upload.video_id,
'video_info': video_info,
'file_path': file_path,
'video_url': video_url,
}
return result
......@@ -691,7 +699,7 @@ def prepare_video_file(task):
video_duration = get_video_duration(file_path)
if video_duration > 60:
logger.error("视频时长大于60秒,上传失败")
raise Exception("视频时长大于60秒,上传失败")
raise VideoDurationError("视频时长大于60秒,上传失败")
is_valid, actual_ratio, best_match = check_video_aspect_ratio(file_path)
if is_valid:
logger.info(f"视频比例验证通过! ({best_match}, 实际比例: {actual_ratio:.4f})")
......
......@@ -24,6 +24,7 @@ SYSTEM_TASK_QUEUE_NUMBER = 10
SYSTEM_MIN_TASK_QUEUE_NUMBER = 5
SYSTEM_PROCESS_NUMBER = 5
# mysql
DB_HOST = "127.0.0.1"
DB_PORT = 3306
......@@ -32,4 +33,6 @@ DB_PASSWORD = "123456"
DB_NAME = "doudian"
DB_CHARSET = "utf8mb4"
BASE_PATH = "D://"
\ No newline at end of file
BASE_PATH = "D://"
DOMAIN = "http://159.75.92.198:8809"
Authorization_Code = "xxx"
\ No newline at end of file
import asyncio
import json
import sys
import time
import concurrent.futures
from curl_cffi import requests
from loguru import logger
from service.hub_ import closeBrowser, envList, exportCookie
from service.page_login import page_login
from service.upload_image_and_video import uploadImageAndVideo
HUB_DOMAIN = "http://127.0.0.1:6873"
DOMAIN = "http://159.75.92.198:8809"
# DOMAIN = "http://dou-order.test"
DEBUG = False
API_PORT = 9001
from utils.common import callback_task, formatCallback, get_task
import os
# 创建 logs 目录(如果不存在)
......@@ -23,36 +15,6 @@ os.makedirs("logs", exist_ok=True)
# 添加日志文件输出,按天滚动
logger.add("logs/shop.log", rotation="1 day", level="INFO", encoding="utf-8", backtrace=True, diagnose=True)
async def get_task(params: dict = None):
"""
获取任务
"""
return requests.request('GET', f'{DOMAIN}/api/collection/task', headers=DEFAULT_HEADER, params=params).json()
async def callback_task(data: dict):
"""
回调任务
:param data:
:return:
"""
return requests.request('POST', f"{DOMAIN}/api/collection/task", json=data, headers=DEFAULT_HEADER)
async def formatCallback(task: dict, result: dict) -> dict:
"""
格式化回调数据
:param task:
:param result:
:return dict:
"""
return {
'app_name': task.get('app_name', ''),
'admin_users_id': task.get('admin_users_id', ''),
'type': task.get('type', ''),
'result': result,
}
async def syncShop(task: dict = None):
"""i
......@@ -408,17 +370,7 @@ if __name__ == '__main__':
OUTER_MAX_WORKERS = 5
# 内层线程池大小(每个任务)
INNER_MAX_WORKERS = 3
argv = sys.argv
if len(argv) != 2:
logger.error("请传入参数")
sys.exit(0)
else:
CODE = argv[1]
DEFAULT_HEADER = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization-Code": CODE,
}
while True:
asyncio.run(run())
time.sleep(10)
......@@ -6,10 +6,14 @@ from urllib.parse import urlparse
import requests
from loguru import logger
from config import settings
from utils.errors import ProxyConnectionError
DEFAULT_HEADER = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization-Code": settings.Authorization_Code,
}
# 检查代理
def check_proxy(proxy_url):
proxies = {
......@@ -205,3 +209,35 @@ def encryptParams(account, password):
e[key] = encrypt(e[key])
e['mix_mode'] = s
return e
async def get_task(params: dict = None):
"""
获取任务
"""
return requests.request('GET', f'{settings.DOMAIN}/api/collection/task', headers=DEFAULT_HEADER, params=params).json()
async def callback_task(data: dict):
"""
回调任务
:param data:
:return:
"""
return requests.request('POST', f"{settings.DOMAIN}/api/collection/task", json=data, headers=DEFAULT_HEADER)
async def formatCallback(task: dict, result: dict) -> dict:
"""
格式化回调数据
:param task:
:param result:
:return dict:
"""
return {
'app_name': task.get('app_name', ''),
'admin_users_id': task.get('admin_users_id', ''),
'type': task.get('type', ''),
'result': result,
}
......@@ -33,4 +33,9 @@ class CookiesExpiredError(AppError):
class ProxyConnectionError(AppError):
"""代理连接失败异常"""
def __init__(self, msg='代理连接失败', data=None):
super().__init__(code=403, msg=msg, data=data)
\ No newline at end of file
super().__init__(code=403, msg=msg, data=data)
class VideoDurationError(AppError):
"""视频时长异常"""
def __init__(self, msg='视频时长大于60秒', data=None):
super().__init__(code=406, msg=msg, data=data)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment