Commit 86147129 by yexing

feat: sync latest changes

parent 7e696e48
from typing import Self
+ERR = object()
class Data:
@classmethod
@@ -19,7 +20,7 @@ class Data:
return ((v, getattr(other, k)) for k, v in cls.items())
@classmethod
-def inverse_dict(cls):
+def inv_dict(cls):
return {v: k for k, v in cls.items()}
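A minimal sketch of the renamed helper in use, assuming Data.items() yields the subclass's (name, value) attribute pairs (its definition is collapsed above); the Colors subclass is hypothetical:

class Colors(Data):  # hypothetical subclass, for illustration only
    red = "#f00"
    blue = "#00f"

Colors.inv_dict()  # -> {"#f00": "red", "#00f": "blue"}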
class PropProxy:
...
@@ -2,18 +2,20 @@ import json
import os
import random
import re
+from pydantic import BaseModel
-from tenacity import retry, stop_after_attempt, wait_random
+from tenacity import retry, stop_after_attempt, wait_random, RetryError
import uvicorn
from bs4 import BeautifulSoup
from curl_cffi.requests import Session
-from fastapi import FastAPI, HTTPException, Query
+from fastapi import FastAPI, Query
from loguru import logger
-from const import Postcode, Site
+from const import ERR, Postcode, Site
from db import RedisSingleton
from spider.base_info import InfoSpider
from tool import Task
+from proxy import ProxyManager
from conf import config
# from utils.admin_api import callback_cookie
@@ -22,8 +24,16 @@ REDIS = RedisSingleton(redis_url=config["redis"]["url"])
app = FastAPI()
+class ApiResponse(BaseModel):
+code: int
+message: str
+data: list = []
+errors: list = []
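Every endpoint response is wrapped in this envelope; an illustrative dump (field values are made up):

ApiResponse(code=0, message="Succeed", data=[{"item_weight": "1.2 pounds"}]).model_dump()
# -> {"code": 0, "message": "Succeed", "data": [{"item_weight": "1.2 pounds"}], "errors": []}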
def get_headers():
-user_agent = get_rand_ua()
+# user_agent = get_rand_ua()
+user_agent = generate_edge_user_agent()
return {
"authority": "www.amazon.com",
"accept": "text/html,*/*",
@@ -139,29 +149,49 @@ def get_rand_ua():
return f"Mozilla/5.0 ({window}) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36"
+def generate_edge_user_agent():
+"""
+Generate a random Edge user-agent string.
+:return: UA string
+"""
+version = random.randint(133, 135)
+rand_number = random.randint(4000, 5000)
+rand_number_ = random.randint(1, 99)
+chrome_version = f"{version}.0.{rand_number}.{rand_number_}"
+rand_number = random.randint(1, 100)
+rand_number_ = random.randint(1, 99)
+edge_version = f"{version}.0.{rand_number}.{rand_number_}"
+# user_agent = f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{chrome_version} Safari/537.36 Edg/{edge_version}"
+windows_version = random.choice(["11.0", "10.0"])
+user_agent = f"Mozilla/5.0 (Windows NT {windows_version}) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{chrome_version} Edg/{edge_version}"
+return user_agent
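Sample output (illustrative only; version and build numbers are drawn from the random ranges above):

generate_edge_user_agent()
# -> "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.4521.38 Edg/134.0.57.12"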
def get_proxies():
-proxy = "http://127.0.0.1:7890"
+if IS_DEBUG:
+proxy = "http://127.0.0.1:7890"
+else:
+proxy_manager = ProxyManager(REDIS)
+proxy = proxy_manager.get_proxy()
+proxy_manager.join_proxy(proxy)
return {
"https": proxy,
"http": proxy,
}
-@retry(
-stop=stop_after_attempt(20),
-wait=wait_random(3, 6),
-retry_error_callback=lambda *_: ...,
-)
-def run(zip_code):
+@retry(stop=stop_after_attempt(20), wait=wait_random(3, 6), reraise=True)
+def make_loca_cookie(zip_code, refresh: bool = False):
inst = Task(REDIS)
-if inst.get_loca_cookie(site=Site.com, postcode=zip_code, only_local=True):
+cookie = inst.get_loca_cookie(site=Site.com, postcode=zip_code, only_local=True)
+if not refresh and cookie:
return
headers = get_headers()
proxies = get_proxies()
-with Session() as s:
-url = "https://www.amazon.com"
-response = s.get(url, headers=headers, proxies=proxies)
+open_url = "https://www.amazon.com/dp/B096NMKTQZ?th=1&psc=1"
+with Session(impersonate="edge") as s:
+response = s.get(open_url, headers=headers, proxies=proxies)
html = BeautifulSoup(response.text, "html.parser", from_encoding="utf-8")
captcha = html.find("input", id="captchacharacters")
if captcha:
@@ -193,9 +223,9 @@ def run(zip_code):
"locationType": "LOCATION_INPUT",
"zipCode": zip_code,
"deviceType": "web",
"storeContext": "generic",
"pageType": "Gateway",
"actionSource": "glow",
"storeContext": "musical-instruments",
"pageType": "Detail",
"actionSource": "glow"
}
)
new_headers = {
@@ -217,8 +247,11 @@ def run(zip_code):
if address_data.get("address", {}).get("zipCode", "") != zip_code:
raise Exception("邮编验证失败")
url = "https://www.amazon.com"
response = s.request("GET", url, headers=headers, proxies=proxies)
response = s.request("GET", "https://www.amazon.com/portal-migration/hz/glow/condo-refresh-html?triggerFeature=AddressList&deviceType=desktop&pageType=Detail&storeContext=musical-instruments&locker=%7B%7D", headers=headers, proxies=proxies)
response = s.request("GET", "https://www.amazon.com/portal-migration/hz/glow/get-location-label?storeContext=musical-instruments&pageType=Detail&actionSource=desktop-modal", headers=headers, proxies=proxies)
response = s.request("GET", open_url, headers=headers, proxies=proxies)
html = BeautifulSoup(response.text, "html.parser", from_encoding="utf-8")
data = html.find("span", id="glow-ingress-line2").text
if zip_code not in data:
@@ -233,7 +266,7 @@ def run(zip_code):
data = {
"cookie": cookie,
"user_agent": headers["user-agent"],
"user-agent": headers["user-agent"],
}
inst.set_loca_cookie(data, site=Site.com, postcode=zip_code)
result = {
@@ -245,18 +278,34 @@ def run(zip_code):
# logger.success(f"callback cookie: {json.dumps(callback_response)}")
@app.get("/query/info")
@app.get("/query/info", response_model=ApiResponse)
def query_info(
zip_code: str = Query(..., description="zip code"),
url: str = Query(..., description="URL"),
):
+def _make():
+try:
+return InfoSpider().run({"url": url})
+except RetryError:
+return False
try:
setattr(Postcode, "com", zip_code)
run(zip_code)
return InfoSpider().run({"url": url})
setattr(Postcode, Site.com, zip_code)
make_loca_cookie(zip_code)
# client = REDIS.get_connection()
count = 0
while (
(not (data := _make()) or data == ERR) and count < 5
# and int(client.get("amazon:cookie-error")) > 10
):
make_loca_cookie(zip_code, refresh=True)
count += 1
if not data or data == ERR:
raise Exception("query error")
return ApiResponse(code=0, data=[data], message="Succeed")
except Exception as e:
logger.error(e)
-raise HTTPException(status_code=500, detail="service error")
+return ApiResponse(code=400, message="Failure")
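A minimal client sketch for the endpoint (host, port, and ASIN are assumptions; httpx is already declared in the dev dependency group below):

import httpx

resp = httpx.get(
    "http://127.0.0.1:8000/query/info",  # assumed local uvicorn address
    params={"zip_code": "10001", "url": "https://www.amazon.com/dp/B096NMKTQZ"},
    timeout=120.0,  # cookie creation may retry for several minutes
)
print(resp.json())  # {"code": 0, "message": "Succeed", "data": [...], "errors": []}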
if __name__ == "__main__":
...
[project]
name = "amazon-mult-site-sync"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"babel>=2.17.0",
"bs4>=0.0.2",
"curl-cffi==0.10.0",
"fastapi>=0.115.14",
"jmespath>=1.0.1",
"loguru>=0.7.3",
"lxml>=6.0.0",
"pydantic>=2.11.7",
"python-dateutil>=2.9.0.post0",
"redis>=6.2.0",
"requests>=2.32.4",
"tenacity>=9.1.2",
"uvicorn>=0.35.0",
]
# PyPI mirror settings
[tool.uv]
index-url = "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"
[dependency-groups]
dev = [
"httpx>=0.28.1",
"pytest>=9.0.2",
]
@@ -4,14 +4,15 @@ import re
import os
import curl_cffi
import jmespath
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_random
from lxml import etree
-from const import Postcode, PropProxy, Site
+from const import ERR, Postcode, PropProxy, Site
from db import RedisSingleton
from proxy import ProxyManager
-from tool import Fmt, Request, Task
+from tool import Request, Task, ToolA
from conf import config
IS_DEBUG = os.environ.get("IS_DEBUG", False)
@@ -88,14 +89,16 @@ class Tool:
data_json = re.findall("jQuery.parseJSON\('(.*)'\)", text)
data_json = data_json[0].replace("\\'", "'")
return json.loads(data_json)
@staticmethod
-def clean_text(text):
+def handle_text(text):
"""
-General text cleaning
+Text processing
"""
-cleaned = re.sub(r'[\x00-\x1f\x7f-\x9f\u2000-\u200f]', '', text)
-return re.sub(r'[:\s]+', ' ', cleaned).strip()
+cleaned = re.sub(r"[\x00-\x1f\x7f-\x9f\u2000-\u200f:]", "", text)
+cleaned = re.sub(r"L\s?x\s?W\s?x\s?H", "", cleaned)
+cleaned = re.sub(r"\s+", " ", cleaned)
+return cleaned.strip().lower()
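Illustrative inputs and outputs for the reworked cleaner (sample strings mimic Amazon detail bullets):

Tool.handle_text("Package Dimensions \u200f:\u200e ")  # -> "package dimensions"
Tool.handle_text("9.8 x 3.9 x 2 inches; L x W x H")    # -> "9.8 x 3.9 x 2 inches;"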
class ProxyMixin:
@@ -160,79 +163,103 @@ class InfoSpider(ProxyMixin):
def format_content(self, text):
html = etree.HTML(text)
-free_delivery = html.xpath(
-'//div[@id="mir-layout-DELIVERY_BLOCK-slot-PRIMARY_DELIVERY_MESSAGE_LARGE"]/span/span/text()'
-)
+free_delivery = ToolA.get_free_delivery(html, True)
ths: list[etree._Element] = html.xpath(
'//*[@id="productDetails_detailBullets_sections1"]/tr/th'
'//*[@class="a-keyvalue prodDetTable"]/tr/th'
)
span: list[etree._Element] = html.xpath(
'//*[@id="detailBullets_feature_div"]/ul/li/span/span'
)
product_dimensions, item_weight = "", ""
if ths:
tds: list[etree._Element] = html.xpath(
'//*[@id="productDetails_detailBullets_sections1"]/tr/td'
'//*[@class="a-keyvalue prodDetTable"]/tr/td'
)
-detail = {th.text.strip(): td.text.strip() for th, td in zip(ths, tds)}
+detail = dict(
+map(Tool.handle_text, (th.text, td.text)) for th, td in zip(ths, tds)
+)
-product_dimensions = detail.get("Product Dimensions", "")
-item_weight = detail.get("Item Weight", "")
elif span:
detail = dict(
-map(Tool.clean_text, (span[i].text.strip(), span[i + 1].text.strip()))
-for i in range(0, len(span), 2)
+map(Tool.handle_text, (span[i].text, span[i + 1].text))
+for i in range(0, len(span) - 1, 2)
)
-package_dimensions = detail.get("Package Dimensions", "").split("; ")
-if len(package_dimensions) == 2:
-product_dimensions, item_weight = package_dimensions
-free_delivery = Fmt.parse_date(free_delivery[0]) if len(free_delivery) else ""
+else:
+detail = {}
+item_weight = detail.get("item weight", "")
+package_weight = detail.get("package weight", "")
+item_length = detail.get("item length", "")
+blade_length = detail.get("blade length", "")
+item_width = detail.get("item width", "")
+package_dimensions = jmespath.search(
+'"package dimensions" || "item package dimensions" || ``', detail
+)
+product_dimensions = jmespath.search(
+'"product dimensions" || "item dimensions" || dimensions || ``', detail
+)
return {
"free_delivery": free_delivery,
"product_dimensions": product_dimensions,
"item_weight": item_weight,
"product_dimensions": product_dimensions,
"package_weight": package_weight,
"package_dimensions": package_dimensions,
"item_length": item_length,
"blade_length": blade_length,
"item_width": item_width,
}
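The jmespath expressions above use || to return the first key that resolves to a non-empty value, with the empty backtick literal as the final fallback; a minimal sketch:

import jmespath

detail = {"item package dimensions": "9.8 x 3.9 x 2 inches;"}
jmespath.search('"package dimensions" || "item package dimensions" || ``', detail)
# -> "9.8 x 3.9 x 2 inches;"  (double-quoted identifiers allow the spaces in the keys)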
@retry(
-stop=stop_after_attempt(20),
+stop=stop_after_attempt(5),
wait=wait_random(3, 6),
retry_error_callback=lambda *_: ...,
)
def run(self, task: dict):
-url = task.get("url", "")
-asin = Tool.get_url_asin(url)
-url = f"https://www.amazon.{self.site}/dp/" + asin + "?th=1&psc=1"
-_proxy = self.get_proxy()
-if IS_DEBUG:
-logger.debug(url)
-if _proxy is None:
-raise ValueError("no proxy available")
-try:
-headers = self.task_manager.get_loca_cookie(
-site=self.site, postcode=self.postcode.value
-)
-text = Request.request_html(
-url,
-_proxy["proxy"],
-**{
-"headers": headers,
-"timeout": self.request_timeout,
-"postcode": self.postcode.value,
-},
-)
-response = self.format_content(text)
-logger.debug(response)
-return response
-except curl_cffi.curl.CurlError:
-logger.error(f"request timeout: {url}")
-raise
-except Exception as e:
-if str(e) == "captcha detected":
-self.delete_proxy(_proxy["temp_proxy"])
-if str(e) == "zip code collection error":
-self.cookie_error()
-logger.error(f"request error: {e} - {url}")
-raise
-finally:
-self.join_proxy(_proxy["temp_proxy"])
+asin = Tool.get_url_asin(task.get("url", ""))
+pf = f"https://www.amazon.{self.site}/dp/{asin}"
+urls = [pf + "?th=1", pf + "?th=1&psc=1"]
+for i, url in enumerate(urls):
+_proxy = self.get_proxy()
+if _proxy is None:
+raise ValueError("no proxy available")
+try:
+headers = self.task_manager.get_loca_cookie(
+site=self.site, postcode=self.postcode.value
+)
+default_headers = {
+"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
+"accept-language": "en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
+"cache-control": "no-cache",
+}
+headers = {**default_headers, **headers}
+print(json.dumps(headers))
+text = Request.request_html(
+url,
+_proxy["proxy"],
+**{
+"headers": headers,
+"timeout": self.request_timeout,
+"postcode": self.postcode.value,
+},
+)
+if IS_DEBUG:
+from pathlib import Path
+p1 = Path("test/1.html")
+p1.write_text(text, encoding="utf8")
+response = self.format_content(text)
+logger.debug(f"{response=}, {url=}")
+if response["free_delivery"] or i == len(urls) - 1:
+return response
+except curl_cffi.curl.CurlError:
+logger.error(f"request timeout: {url}")
+raise
+except Exception as e:
+logger.error(f"request error: {repr(e)} - {url}")
+if str(e) == "captcha detected":
+self.delete_proxy(_proxy["temp_proxy"])
+if "zip code collection error" in str(e):
+self.cookie_error()
+self.delete_proxy(_proxy["temp_proxy"])  # rotate proxy
+return ERR
+raise
+finally:
+self.join_proxy(_proxy["temp_proxy"])
@@ -137,6 +137,7 @@ class Task:
if not cookie:
cookie = self.get_cookie(site)
assert cookie
+if isinstance(cookie, dict):
+return cookie
return json.loads(cookie)
@@ -195,20 +196,21 @@ class Request:
if response.status_code == 200 and is_check_postal:
postal = html.xpath('//span[@id="glow-ingress-line2"]/text()')
+print(postal)
postal = postal[0].strip() if len(postal) else ""
if not postal or postcode not in postal:
raise Exception("采集邮编错误")
is_product_detail = kwargs.get("is_product_detail", None)
is_link_error = html.xpath('//div[@id="g"]/a/@href')
-title = Tool.get_title(html)
+title = ToolA.get_title(html)
if len(is_link_error) == 0 and len(title) == 0 and is_product_detail:
raise Exception("采集内容有误")
return text
-class Tool:
+class ToolA:
@staticmethod
def get_impersonate():
"""
@@ -236,6 +238,17 @@ class Tool:
if len(title) == 0:
title = html.xpath('//span[@id="bond-title-desktop"]/text()')
return title
+@staticmethod
+def get_free_delivery(html, is_prime: bool = False):
+p1 = '//div[@id="mir-layout-DELIVERY_BLOCK"]/div[1]/span/span/text()'
+p2 = '//div[@id="mir-layout-DELIVERY_BLOCK"]/div[2]/span/span/text()'
+lst = html.xpath(p2) if is_prime else None
+if lst and lst[0].lower() == "prime members":
+del lst[0]
+if not lst:
+lst = html.xpath(p1)
+return Fmt.parse_date(lst[0]) if lst else ""
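A sketch of the fallback logic on illustrative markup (the date string that Fmt.parse_date produces depends on the run date):

from lxml import etree

html = etree.HTML(
    '<div id="mir-layout-DELIVERY_BLOCK">'
    "<div><span><span>Tuesday, July 15</span></span></div>"
    "<div><span><span>Prime members</span><span>Saturday, July 12</span></span></div>"
    "</div>"
)
ToolA.get_free_delivery(html, is_prime=True)  # skips the "Prime members" label, parses "Saturday, July 12"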
class Proxy:
@@ -286,25 +299,28 @@ class Fmt:
:param lang:
:return:
"""
+raw = string
if not string:
return ""
elif "Today" in string:
dt = datetime.now()
elif "Tomorrow" in string:
dt = datetime.now() + timedelta(days=1)
elif "Overnight" in string:
dt = datetime.now()
else:
patt1 = re.compile(r"([\w\s]+)-([\w\s]+)")
patt2 = re.compile(r"(.*?)(\d+\D*-)(\D*\d+.*)")
patt1 = re.compile(r"(.*?)(\d+\D*-\s*)(\d+.*)")
patt2 = re.compile(r"[\w\s.]+-([\w\s.]+)")
if patt1.match(string):
string = patt1.match(string).group(2)
string = patt1.sub(r"\1\3", string)
elif patt2.match(string):
string = patt2.sub(r"\1\3", string)
string = patt2.match(string).group(1)
dt = parser.parse(string, parserinfo=Fmt._get_parserinfo(lang), fuzzy=True)
if dt.month < datetime.now().month:
dt = dt + relativedelta(years=1)
date = dt.strftime("%Y-%m-%d")
logger.debug(f"{string} -> {date}")
logger.debug(f"{raw} -> {string} -> {date}")
return date
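How the two new range patterns rewrite typical delivery strings before parser.parse sees them (illustrative inputs):

import re

patt1 = re.compile(r"(.*?)(\d+\D*-\s*)(\d+.*)")
patt2 = re.compile(r"[\w\s.]+-([\w\s.]+)")
patt1.sub(r"\1\3", "Tuesday, July 7 - 12")  # same-month range -> "Tuesday, July 12"
patt2.match("June 28 - July 3").group(1)    # cross-month range -> " July 3"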
@staticmethod
@@ -315,10 +331,13 @@
:param stock_status: _description_, defaults to StockStatus.com
:return: _description_
"""
+raw = string
if not string:
return ""
string = string.strip().lower()
return "In Stock" if stock_status.lower() in string else "Only"
status = "In Stock" if stock_status.lower() == string else "Only"
logger.debug(f"{raw} -> {status}")
return status
@staticmethod
def parse_price(string: str):
...