Commit 48f21530 by yexing

优化代码

parent e04e6148
......@@ -9,36 +9,36 @@ from datetime import datetime, time
from typing import Optional, List
import redis.asyncio as redis
from requests.exceptions import ProxyError
from bs4 import BeautifulSoup
from curl_cffi.requests import AsyncSession, Response
from curl_cffi.requests.exceptions import Timeout, ConnectionError, ProxyError
from fake_useragent import UserAgent
from loguru import logger
from tenacity import retry, stop_after_attempt
from tenacity import retry, stop_after_attempt, wait_random
DOMAIN = "https://20tools.net"
UA = UserAgent(platforms=['desktop'])
UA = UserAgent(platforms=["desktop"])
MAX_RETRIES = 20
TASK_DURATION = 600
IS_DEBUG = int(os.environ.get("IS_DEBUG", False))
LOG_PATH = f"./log/{Path(__file__).stem}.log"
if not IS_DEBUG:
if not IS_DEBUG:
logger.remove()
logger.add(LOG_PATH, level="INFO", rotation="1 day", retention="1 months")
class StrCounter:
    """Frequency counter with a max-heap of ``(-count, s)`` entries.

    Uses the lazy-deletion heap pattern: every ``add()`` pushes a fresh
    entry, and ``max()`` discards stale entries whose stored count no
    longer matches the live count in ``self.counts``.
    """

    def __init__(self):
        self.counts = Counter()  # live occurrence count per string
        # Heap entries are (-count, s) tuples (not plain strings, so the
        # previous List[str] annotation was wrong); stale pairs may linger.
        self.heap: List[tuple] = []

    def add(self, s: str):
        """Increment the count of *s* and push its new count onto the heap."""
        self.counts[s] += 1
        heapq.heappush(self.heap, (-self.counts[s], s))

    def pop(self):
        """Drop the heap's top entry and forget that string's count entirely."""
        _, s = heapq.heappop(self.heap)
        self.counts.pop(s)

    def max(self):
        """Return ``(count, s)`` for the most frequent string, or ``(0, "")``.

        Stale entries (count changed since they were pushed) are discarded;
        the winning entry is pushed back so repeated calls keep working.
        """
        while self.heap:
            count, s = heapq.heappop(self.heap)
            # NOTE(review): this line was lost in the diff residue; the
            # lazy-deletion pattern requires comparing against the live
            # count — TODO confirm against the original revision.
            cur = self.counts.get(s, 0)
            if cur == -count:
                heapq.heappush(self.heap, (count, s))
                return cur, s
        return 0, ""
class AsyncRedisClient:
    """Process-wide singleton holder for an async Redis client."""

    _instance: Optional[redis.Redis] = None

    @staticmethod
    def get_redis() -> redis.Redis:
        """Get (lazily creating) the shared async Redis client.

        :return: the singleton ``redis.Redis`` instance
        """
        if AsyncRedisClient._instance is None:
            AsyncRedisClient._instance = redis.Redis(
                host="localhost",
                # BUG FIX: the port was read from the REDIS_HOST env var;
                # it should come from REDIS_PORT — TODO confirm deployments
                # are not relying on the old (mis-named) variable.
                port=int(os.getenv("REDIS_PORT", 6379)),
                password=os.getenv("REDIS_PASSWORD", "12345678"),
                db=0,
                max_connections=10,  # connection-pool size
            )
        return AsyncRedisClient._instance

    @staticmethod
    async def close_redis():
        """Close the shared async Redis client, if one was created."""
        if AsyncRedisClient._instance is not None:
            await AsyncRedisClient._instance.aclose()
            AsyncRedisClient._instance = None
class Tool:
@staticmethod
async def get_proxy_ips() -> set:
    """Fetch the set of Walmart proxy IPs from Redis.

    :return: members of the ``walmart_ips`` Redis set (raw bytes by
        default; callers decode — see ``Goods.subtask``)
    """
    # get_redis() is a @staticmethod, so call it on the class directly
    # instead of constructing a throwaway instance.
    r = AsyncRedisClient.get_redis()
    return await r.smembers("walmart_ips")
@staticmethod
def replace_none(obj: dict) -> dict:
"""替换字典里的 None 值
......@@ -88,7 +93,7 @@ class Tool:
:param obj: 字典对象
:return: 替换后的字典对象
"""
return json.loads(json.dumps(obj).replace('null', '""'))
return json.loads(json.dumps(obj).replace("null", '""'))
@staticmethod
def get_walmart_headers():
    """Build request headers for walmart.com with a random desktop Chrome UA.

    :return: headers dict; only ``user-agent`` is sent — the accept/referer
        headers were deliberately disabled in this revision. (A dead
        commented-out cookie blob with stale session tokens was removed.)
    """
    return {
        "user-agent": UA.chrome,
        # "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        # "referer": "https://www.walmart.com/",
    }
@staticmethod
def get_impersonate():
"""
......@@ -112,10 +117,23 @@ class Tool:
:return:
"""
impersonates = [
"edge99",
"edge101",
# "safari15_3",
# "safari15_5",
# Chrome
"chrome99",
"chrome100",
"chrome101",
"chrome104",
"chrome107",
"chrome110",
"chrome116",
"chrome119",
"chrome120",
"chrome123",
"chrome124",
"chrome131",
"chrome133a",
"chrome136",
"chrome99_android",
"chrome131_android",
]
return random.choice(impersonates)
......@@ -129,9 +147,11 @@ class Tool:
"""
# with open(f'./log/data.html', 'w') as f:
# json.dump(text, f)
soup = BeautifulSoup(text, 'html.parser')
soup = BeautifulSoup(text, "html.parser")
# 找到包含JSON数据的script标签
script_tag = soup.find('script', {'id': '__NEXT_DATA__', 'type': 'application/json'})
script_tag = soup.find(
"script", {"id": "__NEXT_DATA__", "type": "application/json"}
)
# 找到评分人数
# a_tag = soup.find('a', {"link-identifier": "reviewsLink"})
# ratings = int(a_tag.string.rstrip(' ratings').replace(',', '')) if a_tag else 0
......@@ -140,7 +160,8 @@ class Tool:
json_data = json.loads(script_tag.string)
# json_data["$ratings"] = ratings
return json_data
@staticmethod
async def upload(s: AsyncSession, data: str):
    """POST the collected item data (a JSON string) to the result endpoint.

    :param s: shared curl_cffi async session
    :param data: JSON payload to upload
    """
    url = "https://walmart.meinuosha.com/index.php/index/index/SetIteminformationitemId?accessvalue=Walmart2025PY0307"
    headers = {"content-type": "application/json"}
    # NOTE(review): the request line was lost in the diff residue; a plain
    # POST of the payload is assumed — TODO confirm against the original.
    resp = await s.post(url, headers=headers, data=data)
    logger.info(resp.text)
@staticmethod
@retry(stop=stop_after_attempt(3))
async def get_html(s: AsyncSession, url, proxy_ip):
    """Fetch a page through the given proxy and return its HTML text.

    Retried up to 3 times by tenacity; timeout/connection/proxy failures
    are re-raised as plain ``Exception`` so the retry decorator catches
    them and the caller sees which proxy failed.

    :param s: shared curl_cffi async session
    :param url: page URL to fetch
    :param proxy_ip: proxy address in "host:port" form
    :return: response body text
    """
    # Reset per-request state left on the shared session by previous calls.
    s.headers.clear()
    s.cookies.clear()
    s.proxies.clear()
    proxies = {"http": f"http://{proxy_ip}", "https": f"http://{proxy_ip}"}
    walmart_headers = Tool.get_walmart_headers()
    try:
        impersonate = Tool.get_impersonate()
        response = await s.get(
            url,
            proxies=proxies,
            headers=walmart_headers,
            timeout=10,
            impersonate=impersonate,
        )
        return response.text
    except (Timeout, ConnectionError):
        raise Exception(f"连接失败: {proxy_ip}")
    except ProxyError:
        raise Exception(f"代理失败: {proxy_ip}")
@staticmethod
@retry(stop=stop_after_attempt(3))
@retry(stop=stop_after_attempt(3), wait=wait_random(1, 2), reraise=True)
async def get_tasks(s: AsyncSession) -> list:
"""获取任务
......@@ -189,10 +214,11 @@ class Tool:
raise Exception("获取任务失败")
logger.info(resp.text)
data: dict = resp.json()
if data["status"].lower() == 'ok':
if data["status"].lower() == "ok":
return [it["itemId"] for it in data.get("shop_items", [{}])]
return []
class Goods:
task_name = "沃尔玛商品"
......@@ -205,90 +231,102 @@ class Goods:
"""
data_dict = await Tool.get_html_to_json(content)
try:
data: dict = data_dict['props']['pageProps']['initialData']['data']
data: dict = data_dict["props"]["pageProps"]["initialData"]["data"]
# ratings = data_dict["$ratings"]
except: # noqa: E722
raise ProxyError("获取数据异常")
raise Exception("获取数据异常")
product: dict = data.get('product', {})
product: dict = data.get("product", {})
if product is None:
return
return
idml: dict = data.get("idml") or {}
reviews: dict = data.get("reviews") or {}
item_id = product.get("usItemId") or ''
item_id = product.get("usItemId") or ""
if IS_DEBUG:
# with open(f'./log/item_{item_id}.json', 'w') as f:
# json.dump(data, f)
pass
category = [
(it.get("name") or '').replace("&", "&")
(it.get("name") or "").replace("&", "&")
for it in ((product.get("category") or {}).get("path") or [{}])
]
crossed_price = ((product.get("priceInfo") or {}).get("wasPrice") or {}).get("price") or ''
main_image = ((product.get("imageInfo") or {}).get("allImages") or [{}])[0].get("url") or ''
discount = (product.get("promoDiscount") or {}).get("discount") or ''
pro_seller = "Pro seller" if any(
it.get("type") == "PRO_SELLER"
for it in product.get("trustBadges") or [{}]
) else ''
manufacturer = next(
filter(
lambda x: x.get("name") == "Manufacturer",
idml.get("specifications", [{}])
),
{}).get("value") or ''
crossed_price = ((product.get("priceInfo") or {}).get("wasPrice") or {}).get(
"price"
) or ""
main_image = ((product.get("imageInfo") or {}).get("allImages") or [{}])[0].get(
"url"
) or ""
discount = (product.get("promoDiscount") or {}).get("discount") or ""
pro_seller = (
"Pro seller"
if any(
it.get("type") == "PRO_SELLER"
for it in product.get("trustBadges") or [{}]
)
else ""
)
manufacturer = (
next(
filter(
lambda x: x.get("name") == "Manufacturer",
idml.get("specifications", [{}]),
),
{},
).get("value")
or ""
)
return {
"itemId": item_id,
"product_name": product.get('name') or '',
"product_name": product.get("name") or "",
"image": main_image,
"customerRating": reviews.get("roundedAverageOverallRating") or '',
"ratingCount": reviews.get("totalReviewCount") or '',
"price": ((product.get("priceInfo") or {}).get("currentPrice") or {}).get("price") or 0,
"customerRating": reviews.get("roundedAverageOverallRating") or "",
"ratingCount": reviews.get("totalReviewCount") or "",
"price": ((product.get("priceInfo") or {}).get("currentPrice") or {}).get(
"price"
)
or 0,
"Crossedprice": crossed_price,
"Commodityclassification": category,
'brand_name': product.get('brand') or '',
"Shoppingcartstore": product.get("sellerDisplayName") or '',
"brand_name": product.get("brand") or "",
"Shoppingcartstore": product.get("sellerDisplayName") or "",
"manufacturer": manufacturer,
"Pro_seller": pro_seller,
"Pro_seller": pro_seller,
"discount": discount,
}
async def subtask(self, s: AsyncSession, task_ip: str, proxy_ip):
    """Fetch and parse one Walmart item page.

    :param s: shared curl_cffi async session
    :param task_ip: Walmart item id (used to build the /ip/ URL)
    :param proxy_ip: proxy address, ``str`` or ``bytes`` (Redis members
        arrive as bytes)
    :return: parsed item dict from ``format_``
    :raises Exception: wraps any failure together with the item id so the
        caller can identify and retry it
    """
    try:
        if isinstance(proxy_ip, bytes):
            proxy_ip = proxy_ip.decode()
        url = f"https://www.walmart.com/ip/{task_ip}"
        content = await Tool.get_html(s, url, proxy_ip)
        if "Robot or human?" in content:
            raise Exception(f"人机验证: {proxy_ip}")
        data = await self.format_(content)
        logger.debug(f"{url} - {data}")
        return data
    except Exception as e:
        # Chain the original exception so the traceback isn't lost.
        raise Exception(task_ip, e) from e
async def run_during_night(self):
"""在晚上运行任务
"""
"""在晚上运行任务"""
tasks = set()
while True:
now = datetime.now().time()
if (time(22, 0) > now and now > time(7, 0)) and not IS_DEBUG:
if (time(19, 0) > now and now > time(2, 0)) and not IS_DEBUG:
await AsyncRedisClient().close_redis()
logger.info("不在指定时间范围内, 停止运行")
break
if len(tasks) >= 5:
logger.warning("达到最大并发任务数, 跳过")
await asyncio.sleep(60)
......@@ -324,7 +362,7 @@ class Goods:
for task_ip, proxy_ip in zip(task_ips, proxy_ips)
]
fail = 0
some = min(len(task_ips), len(proxy_ips))
some = len(fs)
for f in asyncio.as_completed(fs):
try:
shop_item = await f
......@@ -341,14 +379,14 @@ class Goods:
if count > MAX_RETRIES:
logger.info(f"{task_ip} 超过最大重试次数, 丢弃")
for i, x in enumerate(task_ips):
if x == task_ip:
if x == task_ip:
task_ips.pop(i)
sc.pop()
task_ips = task_ips[some:]
if IS_DEBUG:
# json_data["shop_items"] = shop_items * 100
pass
data = json.dumps(json_data, separators=(',', ':'))
data = json.dumps(json_data, separators=(",", ":"))
logger.debug(data)
logger.info(f"采集完成({len(shop_items)})")
await Tool.upload(s, data)
......@@ -359,7 +397,8 @@ class Goods:
logger.exception(e)
logger.error(f"{self.task_name} - 任务异常 - {e}")
await AsyncRedisClient().close_redis()
if __name__ == "__main__":
    # asyncio.run() replaces the deprecated
    # get_event_loop()/run_until_complete pattern (deprecated since 3.10).
    asyncio.run(Goods().run_during_night())
......@@ -65,7 +65,7 @@ class Subsidiary:
"""在晚上运行任务"""
while True:
now = datetime.now().time()
if time(22, 0) > now and now > time(7, 0):
if time(19, 0) > now and now > time(2, 0):
for task in self._tasks:
if task and not task.done():
task.cancel()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment