Commit a5f07d1d by baiquan

refactor(upload): 重构上传服务并优化图片处理逻辑

- 新增 process_image_width_height 函数,用于处理图片宽高
-重构 get_local_path 函数,使用数据库获取文件路径
- 优化图片上传流程,增加宽高处理
parent 08c75f15
......@@ -4,6 +4,7 @@ import json
import os
import time
import pymysql
import requests
from PIL import Image
from loguru import logger
......@@ -41,6 +42,40 @@ def convert_rect_to_square(image_path):
return output_path
return image_path
def process_image_width_height(image_path):
"""处理图片:当宽高小于600px时,在透明背景上居中显示,背景尺寸取max(600, width)和max(600, height)"""
with Image.open(image_path) as img:
width, height = img.size
# 如果图片尺寸满足最小600px要求,直接返回
if width >= 600 and height >= 600:
return img
# 计算新背景尺寸(取max(600, width)和max(600, height))
new_width = max(width, 600)
new_height = max(height, 600)
# 创建透明背景
background = Image.new("RGBA", (new_width, new_height), (0, 0, 0, 0))
# 确保原始图片支持透明度
if img.mode != "RGBA":
img = img.convert("RGBA")
# 居中位置计算
x = (new_width - width) // 2
y = (new_height - height) // 2
# 将图片粘贴到透明背景中心
background.paste(img, (x, y), img)
output_path = os.path.join(os.path.dirname(image_path), f"width_height_{os.path.basename(image_path)}")
# 保存结果
background.save(output_path, "PNG")
logger.info(f"{output_path} --> 图片宽高转换完成")
return output_path
def check_image_size(image_path):
file_size = os.path.getsize(image_path)
max_bytes = 5 * 1024 * 1024
......@@ -53,34 +88,55 @@ def check_image_width_height(image_path, image_type: int = 1):
width, height = original.size
if image_type == 1:
if width < 600 or height < 600:
raise Exception(f"{image_path} --> 图片尺寸小于600*600")
logger.info(f"{image_path} --> 图片尺寸小于600*600")
return process_image_width_height(image_path)
elif image_type == 2:
if width > 5000 or height > 20000:
raise Exception(f"{image_path} --> 图片尺寸大于5000*20000")
else:
raise Exception(f"{image_path} --> 图片类型错误")
return image_path
def get_local_path(item_id, original_url):
folder_path = os.path.join(settings.BASE_PATH, str(item_id))
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# raise FileNotFoundError(f"文件夹不存在: {folder_path}")
if "?" in original_url:
url = original_url.split("?")[0]
def get_local_path(id, url):
conn = pymysql.connect(host='127.0.0.1', port=3306, password='123456', charset='utf8mb4', database='taobao-order',
user='root')
cs = conn.cursor()
zui = url.split('/')[-1].split('?')[0]
sql = """SELECT * FROM `taobao_images` WHERE `id` = %s"""
cs.execute(sql, (id,))
exists = cs.fetchone() # 获取单条结果
if exists:
file_path=fr'{exists[1]}:\images\{id}\{zui}'
if not os.path.exists(file_path):
raise FileNotFoundError(f"该文件不存在 --> {file_path}")
logger.info(f"文件存在 --> {file_path}")
return file_path
else:
url = original_url
logger.info(url)
file_path = os.path.join(folder_path, os.path.basename(url))
if not os.path.exists(file_path):
logger.info(f"{file_path} 文件不存在,开始下载")
if not file_path.endswith(".mp4"):
img_bytes = requests.get(url, stream=True)
with open(file_path, 'wb') as f:
for chunk in img_bytes.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return file_path
error_msg = f"数据库中不存在该资源 --> {id} {url}"
logger.info(error_msg)
raise FileNotFoundError(error_msg)
# def get_local_path(item_id, original_url):
# folder_path = os.path.join(settings.BASE_PATH, str(item_id))
# if not os.path.exists(folder_path):
# os.makedirs(folder_path)
# # raise FileNotFoundError(f"文件夹不存在: {folder_path}")
# if "?" in original_url:
# url = original_url.split("?")[0]
# else:
# url = original_url
# logger.info(url)
# file_path = os.path.join(folder_path, os.path.basename(url))
# if not os.path.exists(file_path):
# logger.info(f"{file_path} 文件不存在,开始下载")
# if not file_path.endswith(".mp4"):
# img_bytes = requests.get(url, stream=True)
# with open(file_path, 'wb') as f:
# for chunk in img_bytes.iter_content(chunk_size=8192):
# if chunk:
# f.write(chunk)
# return file_path
def upload_image_by_bytes(cookies, headers, proxies, image_path_list):
result_dict = {}
......@@ -141,34 +197,34 @@ async def uploadImageAndVideo(task: dict = None):
error_msg = ""
# 准备SKU图片上传
sku_image_list = []
for sku in skus:
sku_id = sku.get('sku_id')
img_url = sku.get('image')
if img_url:
# md5_key = hashlib.md5(img_url.encode()).hexdigest()
local_path = get_local_path(item_id, img_url)
check_image_size(local_path)
local_path = convert_rect_to_square(local_path)
sku_image_list.append({sku_id: local_path})
# 准备主图上传
image_list = []
for url in task.get('images', []):
md5_key = hashlib.md5(url.encode()).hexdigest()
local_path = get_local_path(item_id, url)
check_image_size(local_path)
check_image_width_height(local_path, 1)
image_list.append({md5_key: local_path})
# 准备详情图上传
description_list = []
for url in task.get('description', []):
md5_key = hashlib.md5(url.encode()).hexdigest()
local_path = get_local_path(item_id, url)
check_image_size(local_path)
check_image_width_height(local_path, 2)
description_list.append({md5_key: local_path})
try:
for sku in skus:
sku_id = sku.get('sku_id')
img_url = sku.get('image')
if img_url:
# md5_key = hashlib.md5(img_url.encode()).hexdigest()
local_path = get_local_path(item_id, img_url)
check_image_size(local_path)
local_path = convert_rect_to_square(local_path)
sku_image_list.append({sku_id: local_path})
# 准备主图上传
image_list = []
for url in task.get('images', []):
md5_key = hashlib.md5(url.encode()).hexdigest()
local_path = get_local_path(item_id, url)
check_image_size(local_path)
local_path = check_image_width_height(local_path, 1)
image_list.append({md5_key: local_path})
# 准备详情图上传
description_list = []
for url in task.get('description', []):
md5_key = hashlib.md5(url.encode()).hexdigest()
local_path = get_local_path(item_id, url)
check_image_size(local_path)
local_path = check_image_width_height(local_path, 2)
description_list.append({md5_key: local_path})
# 设置超时时间为60秒
timeout_seconds = 65
start_time = time.time()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment