Commit 58d033ad by yexing

up

### Celery commands
- celery -A celery_app beat -l info
- celery -A celery_app worker -Q spider -l info -P gevent -E
- celery -A celery_app flower --port=5555 --basic_auth=username:password
- celery -A celery_app inspect active
- celery -A celery_app purge -Q spider -f
- celery -A celery_app control shutdown
### Miscellaneous
conda activate amazon-mult-site
* */12 * * * supervisorctl -c /etc/supervisord.conf restart celery-beat:00
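Note: the `celery-beat:00` target in the crontab line above refers to the numbered supervisor process name (`process_name = %(process_num)02d` with `numprocs = 1`) declared under `[program:celery-beat]` in the supervisor config included later in this commit.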
from time import sleep
import asyncio
import json
import random
import re
import time
import traceback
from typing import Self
import requests
from DrissionPage import Chromium
from DrissionPage.common import Settings
from DrissionPage.items import WebPageTab
from loguru import logger
from redis import from_url
class Data:
@classmethod
def items(cls):
return (
(k, v)
for k, v in cls.__dict__.items()
if isinstance(v, (str, int)) and not k.startswith("__")
)
@classmethod
def values(cls):
return (v for _, v in cls.items())
@classmethod
def zip(cls, other: Self):
return ((v, getattr(other, k)) for k, v in cls.items())
@classmethod
def inverse_dict(cls):
return {v: k for k, v in cls.items()}
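# Usage sketch (illustrative, based on the Site/Postcode subclasses defined below):
#   list(Site.zip(Postcode))  -> [("com", "20001"), ("de", "55545"), ..., ("co.jp", "496-0805")]
#   Site.inverse_dict()       -> {"com": "com", "de": "de", ..., "co.jp": "jp"}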
class Postcode(Data):
com = "20001"
de = "55545"
it = "66040"
fr = "75000"
es = "04810"
jp = "496-0805"
class Site(Data):
com = "com"
# Germany, Italy, France, Spain, Japan
de = "de"
it = "it"
fr = "fr"
es = "es"
jp = "co.jp"
IS_DEBUG = False
DOMAIN = "https://20tools.net"
redis_config = {
"url": "redis://:a123456,a@localhost:6379/10",
"max_connections": 300
}
cookie_config = {
"cookie_time_key": "cookie_expired_time"
}
UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
Settings.set_raise_when_wait_failed(False)
class RedisSingleton:
_redis_pool = None
def __init__(self, redis_url=None):
self.redis_url = redis_url
def get_connection(self):
if self._redis_pool is None:
if self.redis_url:
self._redis_pool = from_url(self.redis_url, decode_responses=True)
else:
# Default connection URL
self._redis_pool = from_url('redis://localhost', decode_responses=True)
return self._redis_pool
class SiteType(Data):
com = 1
de = 2
def callback(param):
"""
Post data to the callback endpoint.
"""
requests.post(
f"{DOMAIN}/api/collection/cookie",
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
data=json.dumps(param),
)
def refresh_local_cookie(data: dict, site: str = "com"):
"""
Refresh the locally cached cookie.
"""
redis = redis_singleton.get_connection()
redis.set(f"cookie:{site}", json.dumps(data))
def get_cookie_error():
"""
Get the cookie-error counter.
:return:
"""
redis = redis_singleton.get_connection()
return redis.get("amazon:cookie-error")
def delete_cookie_error():
"""
Delete the cookie-error counter.
:return:
"""
redis = redis_singleton.get_connection()
return redis.delete("amazon:cookie-error")
def input_postcode(
tab: WebPageTab, postcode: str, locator: str = "#GLUXZipUpdateInput"
):
location_input = tab.ele(locator, timeout=3)
if location_input is None:
raise Exception("未找到输入框")
location_input.input(postcode)
sleep(1)
def get_cookie(tab: WebPageTab, site_type: int = 1):
"""
Collect the cookie from the tab and report it via the callback.
"""
cookie = tab.cookies().as_str()
content = tab.html
token = re.findall(r"name=\"stylesnap\" value=\"(.*?)\">", content)
response = {
"cookie": cookie,
"token": token[0] if token else "",
"user-agent": UA,
"time": int(time.time()),
}
logger.info(f"获取到cookie: {json.dumps(response)}")
callback({"type": site_type, "data": response})
return cookie
def run(site: str = "com", postcode: str = "20001", site_type: int = 1):
def _close():
cookie = get_cookie(tab, site_type)
if IS_DEBUG:
refresh_local_cookie({"cookie": cookie, "user-agent": UA}, site=site)
chromium.clear_cache()
chromium.quit()
delete_cookie_error()
if not IS_DEBUG:
number = get_cookie_error()
number = int(number) if number else 0
if number < 50:
logger.success("Cookie正常")
return
logger.error("Cookie异常")
chromium = Chromium()
tab = chromium.latest_tab
try:
# &currency=JPY
tab.get(f"https://www.amazon.{site}/stylesnap?language=en_GB")
# Check whether the postcode and language are already correct
line = tab.ele("#glow-ingress-line2", timeout=3)
nav = tab.ele("#icp-nav-flyout", timeout=3)
if line and nav:
postal, lang = line.text, nav.text
if (not postal or postcode not in postal) or (not lang or "EN" not in lang):
logger.info("邮编或语言错误, 开始设置邮编和语言")
else:
logger.info("邮编和语言正确")
_close()
return
location = tab.ele("#nav-global-location-popover-link", timeout=3)
if not location:
raise Exception("没有进入正确页面")
else:
location.click()
postcode_parts = postcode.split("-")
if len(postcode_parts) == 2:
input_postcode(tab, postcode_parts[0], "#GLUXZipUpdateInput_0")
input_postcode(tab, postcode_parts[1], "#GLUXZipUpdateInput_1")
else:
input_postcode(tab, postcode)
locs = [
"#GLUXZipUpdate",
'xpath://*[@id="a-popover-1"]/div/header/button',
'xpath://*[@id="icp-nav-flyout"]/button',
"@text()=English",
]
for loc in locs:
ele = tab.ele(loc, timeout=3)
if not ele:
raise ValueError("元素定位错误")
ele.wait.clickable(timeout=3, raise_err=False).click()
tab.wait(2)
_close()
except Exception as e:
logger.error(e)
def main():
if IS_DEBUG:
items = random.choices(list(Site.zip(Postcode)))
else:
items = Site.zip(Postcode)
for site, postcode in items:
site_type = SiteType.__dict__.get(site)
if site_type is None:
continue
logger.info(f"开始获取cookie: {site} {postcode}")
run(site, postcode, site_type)
sleep(10)
if IS_DEBUG:
exit()
if __name__ == "__main__":
while True:
try:
redis_singleton = RedisSingleton(redis_url=redis_config["url"])
main()
except Exception:
traceback.print_exc()
# 依赖项:
# pip install fastapi uvicorn
import json
import uvicorn
from fastapi import FastAPI, Query, HTTPException
from conf import config
from db import RedisSingleton
app = FastAPI(title="Redis Query API", description="API to query data from Redis")
# Read the Redis URL from the config file
redis_url = config.get('redis', 'url')
redis_client = RedisSingleton(redis_url)
@app.get("/redis/query/")
def query_redis(
query_key: str = Query(..., description="Redis key to query"),
count: int = Query(1, description="Number of items to retrieve", ge=1)
):
"""
Fetch values stored under the given key in Redis.
- **query_key**: the Redis key to query
- **count**: number of items to retrieve, defaults to 1
"""
try:
# Get a Redis connection
redis_conn = redis_client.get_connection()
# Pop the requested number of elements from the list
values = redis_conn.lpop(query_key, count) or []
# Parse each JSON string into a dict
new_values = [json.loads(value) for value in values]
return {"key": query_key, "type": "list", "values": new_values, "count": len(values)}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error querying Redis: {str(e)}")
if __name__ == "__main__":
# Start the FastAPI application
uvicorn.run(app, host="0.0.0.0", port=9012)
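# Example call (a sketch; assumes the service is running on localhost:9012 and
# the key holds JSON strings, e.g. a task queue such as "task:amazon:monitoring"):
#   curl "http://localhost:9012/redis/query/?query_key=task:amazon:monitoring&count=5"
# The response echoes the key, the popped items parsed as JSON, and their count.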
import json
import os
import threading
import time
import redis
from curl_cffi import requests
from loguru import logger
from conf import config
COLL_DOMAIN = config['app']['coll_domain']
_redis_db = redis.Redis.from_url(config['redis']['url'], decode_responses=True)
task_monitoring_config = config['task-monitoring']
task_search_config = config['task-search']
task_product_detail_config = config['task-product-detail']
DEFAULT_HEADER = {
"Content-Type": "application/json",
"Accept": "application/json",
}
def task_callback(data):
"""
Post a result to the callback API.
"""
try:
url = f"{COLL_DOMAIN}/api/collection/task"
response = requests.post(url, headers=DEFAULT_HEADER, data=json.dumps(data), verify=False)
data = response.json()
if data["code"] == 0:
return True
else:
return False
except Exception as e:
logger.error(f"回调异常 : {e}")
return False
def batch_callback(callback_key: str):
thread = []
number = _redis_db.llen(callback_key)
logger.info(f"回调 {callback_key},共有{number}个任务")
for _ in range(10):
data = _redis_db.lpop(callback_key)
if data:
result = json.loads(data)
thread.append(threading.Thread(target=task_callback, args=(result,)))
else:
break
for t in thread:
t.start()
for t in thread:
t.join()
def callback_task(callback_key: str):
"""
Drain queued results and post them back via the callback API.
:param callback_key:
:return:
"""
task_number = 500
result = []
try:
if callback_key == task_monitoring_config.get('item_key'):
for _ in range(task_number):
data = _redis_db.lpop(callback_key)
if data:
result.append(json.loads(data))
else:
break
if result:
logger.info(f"回调 {callback_key},共有{len(result)}个任务")
logger.info(f"回调: result: {json.dumps(result)}")
callback = {
"data": {
"error_items": [],
"collection": result,
},
"type": 5,
}
task_callback(callback)
elif callback_key == task_product_detail_config.get('item_key') or callback_key == task_search_config.get('item_key'):
batch_callback(callback_key)
except Exception:
logger.error("回调异常")
def run(task_config: dict = task_monitoring_config):
CALLBACK_PID_FILE = "./pid/callback.pid"
while True:
if not os.path.exists(CALLBACK_PID_FILE):
logger.error('任务退出')
break
try:
callback_key = task_config.get('item_key')
callback_task(callback_key)
logger.info(f"回调 {callback_key} 完成")
except Exception as e:
logger.error(f"任务异常 : {e}")
callback_sleep_time = int(task_config.get('callback_sleep_time', 5))
time.sleep(callback_sleep_time)
if __name__ == '__main__':
tasks = []
PID_FILES = [
"monitoring.pid",
"product_detail.pid",
"search.pid",
"callback.pid",
]
for PID_FILE in PID_FILES:
with open(f"./pid/{PID_FILE}", 'w') as f:
f.write(str(os.getpid()))
if task_monitoring_config.get('enabled', None) == 'True':
logger.info(f"采集任务回调启动")
t = threading.Thread(target=run, args=(task_monitoring_config,))
tasks.append(t)
if task_product_detail_config.get('enabled', None) == 'True':
logger.info(f"商品发布回调启动")
t = threading.Thread(target=run, args=(task_product_detail_config,))
tasks.append(t)
if task_search_config.get('enabled', None) == 'True':
logger.info(f"搜索回调启动")
t = threading.Thread(target=run, args=(task_search_config,))
tasks.append(t)
for t in tasks:
t.start()
for t in tasks:
t.join()
from datetime import timedelta
from celery import Celery
from kombu import Queue, Exchange
from conf import config
redis_url = config["redis"]["url"]
app = Celery(
"amazon",
broker=redis_url,
backend=redis_url,
include=["celery_tasks"],
)
app.conf.update(
task_serializer="json",
task_track_started=True,
task_create_missing_queues=True,
worker_concurrency=1,
worker_max_tasks_per_child=100,
worker_max_memory_per_child=200_000,
worker_prefetch_multiplier=2,
result_expires=300,
)
app.conf.broker_transport_options = {"global_keyprefix": "celery_broker"}
app.conf.result_backend_transport_options = {"prefix": "celery_backend"}
app.conf.timezone = "Asia/Shanghai"
app.conf.enable_utc = False
app.conf.singleton_lock_expiry = 600
app.conf.task_queues = (
Queue("spider", Exchange("spider"), routing_key="spider"),
Queue("dial", Exchange("dial"), routing_key="dial"),
)
app.conf.task_routes = {
"celery_tasks.detail_spider_task": {"queue": "detail"},
"celery_tasks.monitor_spider_task": {"queue": "detail"},
"celery_tasks.search_spider_task": {"queue": "search"},
"celery_tasks.*_dial_task": {"queue": "dial"},
"celery_tasks.*": {"queue": "detail"},
}
app.conf.beat_schedule = {
"start_spider_task": {
"task": "celery_tasks.start_spider_task",
"schedule": timedelta(seconds=5),
},
"start_dial_task": {
"task": "celery_tasks.start_dial_task",
"schedule": timedelta(seconds=5),
},
}
app.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': redis_url,
'default_timeout': 30
}
}
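# Routing sketch for the map above: "celery_tasks.detail_spider_task" and
# "celery_tasks.monitor_spider_task" go to the "detail" queue,
# "celery_tasks.search_spider_task" to "search", anything matching
# "celery_tasks.*_dial_task" (e.g. check_dial_task) to "dial", and any other
# celery_tasks.* task falls through to "detail".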
from __future__ import annotations
from time import sleep
import asyncio
import functools
import json
import os
from types import ModuleType
import tldextract
from celery import current_app, shared_task
from celery_singleton import Singleton
from celery_once import QueueOnce
from loguru import logger
from conf import config
from const import Site, Spider
from db import RedisSingleton
from dial import Dial
from proxy import ProxyManager
from spider import detail, monitor, search
from tool import Task
IS_DEBUG = int(os.environ.get("IS_DEBUG", False))
@shared_task(base=Singleton)
def start_spider_task(*keys: str):
for key in keys or Spider.values():
if config[key]["enabled"] != "True":
logger.debug(f"skip: {key}")
continue
assign_spider_task.delay(key)
@shared_task(base=Singleton)
def start_dial_task():
server = config["dial"]["server"]
server_list = json.loads(server)
for server in server_list:
check_dial_task.delay(server)
@shared_task(base=QueueOnce, once={'keys': ['server']})
def check_dial_task(server):
redis = RedisSingleton(redis_url=config["redis"]["url"])
proxyManage = ProxyManager(redis)
is_dial = config["dial"]["is_debug"] != "True"
dial = Dial(is_dial=is_dial, nameserver="223.5.5.5\nnameserver 119.29.29.29")
logger.debug(f"dial: {server}")
dial.check_dial(server, proxyManage)
@shared_task(bind=True)
def clean_proxy_task(self):
redis = RedisSingleton(redis_url=config["redis"]["url"])
proxy_manager = ProxyManager(redis)
proxy_manager.delete_all_proxy()
def base_spider(task: dict, site: str, module: ModuleType, class_name: str):
redis = RedisSingleton(redis_url=config["redis"]["url"])
cls = getattr(module, site.capitalize() + class_name)
if cls:
result = cls(redis).run(task)
logger.info(f"finish: {task.get('url')}")
return result
@shared_task(bind=True, autoretry_for=(Exception,), max_retries=10, retry_backoff=True)
def detail_spider_task(self, task: dict, site: str):
base_spider(task, site, detail, "Goods")
@shared_task(bind=True, autoretry_for=(Exception,), max_retries=10, retry_backoff=True)
def monitor_spider_task(self, task: dict, site: str):
base_spider(task, site, monitor, "Monitoring")
@shared_task(bind=True, autoretry_for=(Exception,), max_retries=10, retry_backoff=True)
def search_spider_task(self, task: dict, site: str):
base_spider(task, site, search, "Search")
@shared_task(bind=True, autoretry_for=(Exception,), max_retries=10, retry_backoff=True)
def assign_spider_task(self, key: str, tasks: list = None):
logger.debug(f"assign: {key}")
redis = RedisSingleton(redis_url=config["redis"]["url"])
proxy_manager = ProxyManager(redis)
proxy_number = proxy_manager.get_proxy_number()
logger.debug(f"代理数量: {proxy_number}")
if proxy_number < 8:
logger.warning("代理不足,任务终止")
sleep(1)
return
if tasks is None:
task_manager = Task(redis)
task_key = config[key]["task_key"]
task_number = int(config[key]["task_number"])
tasks = task_manager.get_task(task_key, task_number)
tasks = [json.loads(task) for task in tasks]
# tasks = [{"url": "https://www.amazon.de/"}] # Cause errors
inv = Site.inverse_dict()
for task in tasks:
url: str = task.get("url")
site = inv.get(tldextract.extract(url).suffix) if url else None
if not site:
logger.error(f"域名解析失败: {site}")
continue
if key == Spider.detail:
detail_spider_task.apply_async(
args=(task, site),
link_error=handle_failed_task.s(task_name=detail_spider_task.name),
)
elif key == Spider.monitor:
monitor_spider_task.apply_async(
args=(task, site),
link_error=handle_failed_task.s(task_name=monitor_spider_task.name),
)
elif key == Spider.search:
search_spider_task.apply_async(
args=(task, site),
link_error=handle_failed_task.s(task_name=search_spider_task.name),
)
@shared_task
def handle_failed_task(*args, **kwargs):
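# Celery errbacks attached via link_error are invoked with
# (request, exc, traceback) when they accept three positional arguments,
# which is what gets unpacked below.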
ctx, reason, _ = args
logger.info(f"task: {ctx.id}, reason: {reason}")
if not isinstance(reason, RuntimeError):
return
task_name = kwargs.get("task_name")
_trace = (ctx.headers or {}).get("_trace")
_trace = _trace + 1 if _trace else 1
logger.debug(f"trace: {_trace}")
current_app.tasks[task_name].apply_async(
args=ctx.args,
kwargs=ctx.kwargs,
countdown=30,
headers={"_trace": _trace},
link_error=handle_failed_task.s(task_name=task_name),
)
import configparser
import os
config = configparser.ConfigParser()
# Check that the config file exists
config_path = os.path.join(os.path.dirname(__file__), 'config/config.ini')
if not config.read(config_path, encoding='utf-8'):
raise FileNotFoundError("配置文件不存在")
# Validate the config file
if 'redis' not in config:
raise ValueError("Redis 配置不存在")
else:
redis_config = config['redis']
if 'url' not in redis_config:
raise ValueError("Redis URL 不存在")
if 'max_connections' not in redis_config:
raise ValueError("Redis max_connections 不存在")
if 'app' not in config:
raise ValueError("App 配置不存在")
if 'task-monitoring' not in config:
raise ValueError("商品监控配置不存在")
if 'task-search' not in config:
raise ValueError("搜索配置不存在")
if 'cookie' not in config:
config['cookie'] = {
"cookie_time_key": "cookie_expired_time"
}
if 'impersonate' not in config['app']:
config['app']['impersonate'] = "chrome"
from typing import Self
class Data:
@classmethod
def items(cls):
return (
(k, v)
for k, v in cls.__dict__.items()
if isinstance(v, (str, int)) and not k.startswith("__")
)
@classmethod
def values(cls):
return (v for _, v in cls.items())
@classmethod
def zip(cls, other: Self):
return ((v, getattr(other, k)) for k, v in cls.items())
@classmethod
def inverse_dict(cls):
return {v: k for k, v in cls.items()}
class Spider(Data):
detail = "task-product-detail"
monitor = "task-monitoring"
search = "task-search"
class Site(Data):
com = "com"
# Germany, Italy, France, Spain, Japan
de = "de"
it = "it"
fr = "fr"
es = "es"
jp = "co.jp"
class Postcode(Data):
com = "20001"
de = "55545"
it = "66040"
fr = "75000"
es = "04810"
jp = "496-0805"
class Lang(Data):
com = "en_US"
de = "de_DE"
it = "it_IT"
fr = "fr_FR"
es = "es_ES"
jp = "ja_JP"
class StockStatus(Data):
com = "In Stock"
de = "Auf Lager"
it = "Disponibilità immediata"
fr = "En stock"
es = "En stock"
jp = "现在有货"
class SiteType(Data):
com = 1
de = 2
from redis import from_url
class RedisSingleton:
_redis_pool = None
def __init__(self, redis_url=None):
self.redis_url = redis_url
def get_connection(self):
if self._redis_pool is None:
if self.redis_url:
self._redis_pool = from_url(self.redis_url, decode_responses=True)
else:
# Default connection URL
self._redis_pool = from_url('redis://localhost', decode_responses=True)
return self._redis_pool
from time import sleep
import asyncio
import json
import os
import socket
from multiprocessing import Pool
from fabric2 import Connection
from loguru import logger
from conf import config
from db import RedisSingleton
from proxy import ProxyManager
IS_DEBUG = int(os.environ.get("IS_DEBUG", False))
def get_server_info(item: str):
"""
:param item:
:return:
"""
info = item.split(":")
if len(info) < 4:
raise ValueError("拨号服务器配置错误")
return {
"name": info[0],
"host": info[1],
"port": int(info[2]),
"password": info[3],
"proxy_number": int(info[4]) if len(info) == 5 else 8,
}
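# Example (illustrative values): "node1:203.0.113.10:22:secret:8" ->
#   {"name": "node1", "host": "203.0.113.10", "port": 22,
#    "password": "secret", "proxy_number": 8}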
def command_run(conn, command):
"""
run command
:param conn:
:param command:
:return:
"""
try:
item = conn.run(command, timeout=10, hide=True, warn=True)
if item.ok:
return item.stdout
return None
except socket.timeout:
return None
except Exception:
return None
class Dial:
_first_check = True
def __init__(self, **kwargs):
"""
server: server entry in the form "{name}:host:port:password:{proxy_number}"
:param kwargs:
"""
# self.server = kwargs.get('server', None)
# if self.server is None:
# raise ValueError("server is required")
self.nameserver = kwargs.get("nameserver", None)
self.dial_time = kwargs.get("dial_time", 5)
self.restart = kwargs.get("restart", True)
self.redis_url = kwargs.get("redis_url", "redis://localhost:6379/0")
self.loop_number = kwargs.get("loop_number", 600)
self.is_dial = kwargs.get("is_dial", False)
def dial(self, item, proxy_client):
try:
server_info = get_server_info(item)
dial_name = server_info["name"]
conn = Connection(
host=f"{server_info['host']}",
user="root",
port=server_info["port"],
connect_kwargs={"password": server_info["password"]},
)
except Exception as e:
logger.error(f"服务器连接异常: {item} : {e}")
return
for _ in range(10):
try:
command_list = []
if self.is_dial:
logger.info(f"重新拨号: {dial_name}")
command_list.append("pppoe-stop")
command_list.append("pppoe-start")
if self.nameserver:
command_list.append(
f'echo -e "nameserver {self.nameserver}" | tee /etc/resolv.conf > /dev/null && chmod 644 /etc/resolv.conf && cat /etc/resolv.conf'
)
if self.restart:
command_list.append("tp start;tp restart")
for command in command_list:
command_run(conn, command)
logger.success(f"{dial_name}: 拨号成功")
proxy = None
for _ in range(10):
sleep(self.dial_time)
proxy = command_run(conn, "curl ifconfig.me")
if proxy:
break
if proxy:
logger.info(f"{dial_name} - 代理: {proxy}")
proxy_client.add_proxy(
name=server_info["name"],
proxy=f"{proxy}:3328",
proxy_number=server_info["proxy_number"],
)
break
except Exception as e:
logger.error(f"{dial_name}: 拨号失败 - {e}")
continue
conn.close()
def check_dial(self, item, proxy_client: ProxyManager):
"""
Check the dial/proxy state and re-dial when needed.
:param item:
:param proxy_client:
:return:
"""
server_info = get_server_info(item)
dial_name = server_info["name"]
proxy_number = server_info["proxy_number"]
ttl_time = proxy_client.check_proxy_expire(dial_name)
if ttl_time and proxy_client.get_proxy_number() >= 8:
return
try:
proxy_client.delete_proxy(dial_name)
self.dial(item, proxy_client)
except Exception as e:
logger.error(f"拨号失败: {item}, {e}")
def run(self, item, proxy_client):
"""
:param item:
:param proxy_client:
:return:
"""
try:
self.check_dial(item, proxy_client)
except KeyboardInterrupt:
logger.error("任务终止")
except Exception as e:
logger.error(f"任务终止: {e}")
if __name__ == "__main__":
server = config["dial"]["server"]
server_list = json.loads(server)
redis_config = config["redis"]
pool = Pool(processes=len(server_list))
redis_singleton = RedisSingleton(redis_url=redis_config["url"])
proxyManage = ProxyManager(redis_singleton)
results = []
# dial = Dial(server=server_list, is_dial=True, nameserver='119.29.29.29\nnameserver 223.5.5.5')
dial = Dial(
server=server_list,
is_dial=False,
nameserver="223.5.5.5\nnameserver 119.29.29.29",
)
for server in server_list:
result = pool.apply_async(dial.run, args=(server, proxyManage))
results.append(result)
for i in results:
i.wait()
import asyncio
import socket
import paramiko
from fabric2 import Connection
from loguru import logger
def get_server_info(item: str):
"""
:param item:
:return:
"""
info = item.split(":")
if len(info) < 4:
raise ValueError("拨号服务器配置错误")
return {
"name": info[0],
"host": info[1],
"port": int(info[2]),
"password": info[3],
"proxy_number": int(info[4]) if len(info) == 5 else 8,
}
def command_run(conn, command):
"""
run command
:param conn:
:param command:
:return:
"""
try:
item = conn.run(command, timeout=10, hide=True, warn=True)
if item.ok:
return item.stdout
return None
except socket.timeout:
return None
except Exception:
return None
def run(item):
server_info = get_server_info(item)
dial_name = server_info["name"]
logger.info(f"开始拨号: {dial_name}")
try:
conn = Connection(
host=f"{server_info['host']}",
user="root",
port=server_info["port"],
connect_kwargs={"password": server_info["password"]},
connect_timeout=10,
)
command_run(conn, "pppoe-start")
proxy = command_run(conn, "curl ifconfig.me")
if proxy:
logger.info(f"{dial_name} {proxy}")
else:
logger.error(f"{dial_name} None")
conn.close()
except paramiko.SSHException:
logger.error(f"服务器连接异常: {dial_name}")
if __name__ == "__main__":
dials = ["qcs64rk21:58.241.175.244:20125:WyyOF214982d:8","qcs64rk22:58.241.175.245:20371:WyyOF214982d:8","qcs64rk23:58.241.175.245:20315:WyyOF214982d:8","qcs64rk24:58.241.175.245:20063:WyyOF214982d:8","qcs64rk25:58.241.175.243:20015:WyyOF214982d:8","qcs64rk26:58.241.175.243:20335:WyyOF214982d:8","qcs64rk27:58.241.175.245:20221:WyyOF214982d:8","qcs64rk28:58.241.175.244:20299:WyyOF214982d:8","qcs64rk29:58.241.175.243:20259:WyyOF214982d:8","qcs64rk210:58.241.175.245:20151:WyyOF214982d:8","qcs64rk211:58.241.175.243:20173:WyyOF214982d:8","qcs64rk212:58.241.175.243:20149:WyyOF214982d:8","qcs64rk213:58.241.175.243:20157:WyyOF214982d:8","qcs64rk214:58.241.175.245:20207:WyyOF214982d:8","qcs64rk215:58.241.175.243:20061:WyyOF214982d:8","ahvxr8n4:jmdx2.leyuyun.com:20025:nBMf28x17M32:8","x9cg9u8p5:61.175.187.148:20283:1IZce6p3A524:8","x9cg9u8p4:61.175.187.148:20331:1IZce6p3A524:8","x9cg9u8p3:61.175.187.148:20177:1IZce6p3A524:8","x9cg9u8p2:61.175.187.148:20429:1IZce6p3A524:8","x9cg9u8p1:61.175.187.148:20189:1IZce6p3A524:8"]
for item in dials:
run(item)
import logging
class ProxyManager:
def __init__(self, redis_singleton, **kwargs):
self.proxy_set_key = kwargs.get('proxy_set_key', 'dial-proxies')
self.proxy_expire_key = kwargs.get('proxy_expire_key', 'dial-proxies-expire')
self.proxy_key = kwargs.get('proxy_key', 'dial-proxies')
self.redis_singleton = redis_singleton
def add_proxy(self, name: str = None, proxy: str = None, during: int = 360000, proxy_number: int = 8):
"""
Add a proxy.
:param name:
:param proxy:
:param during:
:param proxy_number:
:return:
"""
# Record the proxy and add its slots to the proxy set
redis_client = self.redis_singleton.get_connection()
proxy_key = f"{self.proxy_expire_key}:{name}"
redis_client.set(proxy_key, 1, ex=during)
proxy_key = f"{self.proxy_key}:{name}"
redis_client.set(proxy_key, proxy)
for i in range(proxy_number):
self.join_proxy("#".join([proxy, f"{name}#{i}"]), is_first=True)
def delete_proxy(self, name):
"""
Delete a proxy.
:param name:
:return:
"""
proxy = self.get_use_proxy(name)
redis_client = self.redis_singleton.get_connection()
if proxy is not None:
for i in range(50):
redis_client.srem(self.proxy_set_key, "#".join([proxy, f"{name}#{i}"]))
redis_client.delete(f"{self.proxy_expire_key}:{name}")
redis_client.delete(f"{self.proxy_key}:{name}")
def delete_all_proxy(self):
"""
Delete all proxies.
:return:
"""
redis_client = self.redis_singleton.get_connection()
keys = redis_client.scan_iter(f"{self.proxy_expire_key}*")
for key in keys:
redis_client.delete(key)
keys = redis_client.scan_iter(f"{self.proxy_key}*")
for key in keys:
redis_client.delete(key)
redis_client.delete(self.proxy_set_key)
def get_use_proxy(self, name):
"""
Get the proxy currently registered for the given name.
:param name:
:return:
"""
redis_client = self.redis_singleton.get_connection()
proxy_key = f"{self.proxy_key}:{name}"
proxy = redis_client.get(proxy_key)
return proxy
def check_proxy_expire(self, name):
"""
Check whether the proxy's TTL is still valid.
:param name:
:return:
"""
# A positive TTL means the proxy has not expired yet
redis_client = self.redis_singleton.get_connection()
proxy_key = f"{self.proxy_expire_key}:{name}"
ttl = redis_client.ttl(proxy_key)
return ttl > 0
def get_proxy(self):
"""
Pop a proxy slot from the set.
:return:
"""
redis_client = self.redis_singleton.get_connection()
proxy = redis_client.spop(self.proxy_set_key)
return proxy if proxy else None
def get_proxy_number(self):
"""
Get the number of available proxy slots.
:return:
"""
redis_client = self.redis_singleton.get_connection()
number = redis_client.scard(self.proxy_set_key)
return number
def join_proxy(self, proxy: str = None, is_first: bool = False):
"""
Return a proxy slot to the set.
:param proxy:
:param is_first:
:return:
"""
name = proxy.split('#')[1]
host = proxy.split('#')[0]
redis_client = self.redis_singleton.get_connection()
old_proxy = self.get_use_proxy(name)
if (old_proxy and old_proxy == host) or is_first:
redis_client.sadd(self.proxy_set_key, proxy)
else:
logging.error(f"代理已经被切换, 不加入: {proxy}")
[pytest]
addopts = -vs -p no:warnings
testpaths = ./test
python_files = test_*.py
python_classes = Test*
python_functions = test*
asgiref==3.8.1
Babel==2.17.0
celery==5.5.2
celery_singleton==0.3.1
curl_cffi==0.10.0
DrissionPage==4.1.0.18
fabric2==3.2.2
fastapi==0.115.12
loguru==0.7.3
lxml==5.4.0
pytest==8.3.5
python_dateutil==2.9.0.post0
redis==6.1.0
Requests==2.32.3
tldextract==5.3.0
uvicorn==0.34.2
flower
from const import Postcode, Site, Lang, StockStatus
class Adapter(type):
def __new__(cls, name, bases, attrs):
code = name[:2].lower()  # class name pattern: Xx<Type>, e.g. DeGoods -> "de"
attrs.update({
"site": getattr(Site, code),
"postcode": getattr(Postcode, code),
"lang": getattr(Lang, code),
"stock_status": getattr(StockStatus, code)
})
return super().__new__(cls, name, bases, attrs)
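# Effect of the metaclass (sketch): declaring
#   class DeGoods(Goods, metaclass=Adapter): pass
# gives the class site="de", postcode="55545", lang="de_DE",
# stock_status="Auf Lager", all derived from the first two letters of its name.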
import asyncio
import os
import platform
from loguru import logger
from spider.ada import Adapter
from spider.base_detail import Goods, redis_singleton
class ComGoods(Goods):
pass
class DeGoods(Goods, metaclass=Adapter):
pass
class ItGoods(Goods, metaclass=Adapter):
pass
class FrGoods(Goods, metaclass=Adapter):
pass
class EsGoods(Goods, metaclass=Adapter):
pass
class JpGoods(Goods, metaclass=Adapter):
pass
if __name__ == "__main__":
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
PID_FILE = "./pid/product_detail.pid"
for _ in range(200):
# Exit when the PID file has been removed
if not os.path.exists(PID_FILE):
logger.error("任务退出")
break
goods = Goods(redis_singleton)
asyncio.run(goods.main())
import asyncio
import os
import platform
from loguru import logger
from spider.ada import Adapter
from spider.base_monitor import Monitoring, redis_singleton
class ComMonitoring(Monitoring):
pass
class DeMonitoring(Monitoring, metaclass=Adapter):
pass
class ItMonitoring(Monitoring, metaclass=Adapter):
pass
class FrMonitoring(Monitoring, metaclass=Adapter):
pass
class EsMonitoring(Monitoring, metaclass=Adapter):
pass
class JpMonitoring(Monitoring, metaclass=Adapter):
pass
if __name__ == "__main__":
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
PID_FILE = "./pid/monitoring.pid"
for _ in range(100):
# Exit when the PID file has been removed
if not os.path.exists(PID_FILE):
logger.error("任务退出")
break
monitoring = Monitoring(redis_singleton)
asyncio.run(monitoring.main())
import asyncio
import os
import platform
from loguru import logger
from spider.ada import Adapter
from spider.base_search import Search, redis_singleton
class ComSearch(Search):
pass
class DeSearch(Search, metaclass=Adapter):
pass
class ItSearch(Search, metaclass=Adapter):
pass
class FrSearch(Search, metaclass=Adapter):
pass
class EsSearch(Search, metaclass=Adapter):
pass
class JpSearch(Search, metaclass=Adapter):
pass
if __name__ == "__main__":
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
PID_FILE = "./pid/search.pid"
for _ in range(100):
if not os.path.exists(PID_FILE):
logger.info("任务退出")
break
search = Search(redis_singleton)
asyncio.run(search.main())
#!/bin/bash
# PID directory
PID_DIR="./pid"
mkdir -p $PID_DIR
# Log directory
LOG_DIR="./log"
mkdir -p $LOG_DIR
# Start the worker
nohup celery -A celery_app worker -Q spider -l info -E > $LOG_DIR/worker.log 2>&1 &
echo $! > $PID_DIR/worker.pid
# Start beat
nohup celery -A celery_app beat -l info > $LOG_DIR/beat.log 2>&1 &
echo $! > $PID_DIR/beat.pid
# Start Flower (disabled by default)
#nohup celery -A celery_app flower --port=5555 --address=127.0.0.1 > $LOG_DIR/flower.log 2>&1 &
#echo $! > $PID_DIR/flower.pid
echo "服务已启动 | Worker PID:$(cat $PID_DIR/worker.pid) | Beat PID:$(cat $PID_DIR/beat.pid)"
#!/bin/bash
PID_DIR="./pid"
kill -9 $(cat $PID_DIR/*.pid) 2>/dev/null
rm -f $PID_DIR/*.pid
echo "服务已停止"
[program:celery-app-detail]
command = celery -A celery_app worker -Q detail -n spider_detail_worker@%%h -l info --concurrency=40 -E
directory = /root/amazon-mult-site
autorestart = true
startsecs = 10
startretries = 10
stdout_logfile = /etc/supervisord.d/log/celery_app_spider.log
stderr_logfile = /etc/supervisord.d/log/celery_app_spider.log
stdout_logfile_maxbytes = 2MB
stderr_logfile_maxbytes = 2MB
user = root
priority = 999
numprocs = 1
process_name = %(process_num)02d
[program:celery-app-search]
command = celery -A celery_app worker -Q search -n spider_search_worker@%%h -l info --concurrency=20 -E
directory = /root/amazon-mult-site
autorestart = true
startsecs = 10
startretries = 10
stdout_logfile = /etc/supervisord.d/log/celery_app_spider.log
stderr_logfile = /etc/supervisord.d/log/celery_app_spider.log
stdout_logfile_maxbytes = 2MB
stderr_logfile_maxbytes = 2MB
user = root
priority = 999
numprocs = 1
process_name = %(process_num)02d
[program:celery-app-dial]
command = celery -A celery_app worker -Q dial -n dial_worker@%%h -l info --concurrency=3 -E
directory = /root/amazon-mult-site
autorestart = true
startsecs = 10
startretries = 10
stdout_logfile = /etc/supervisord.d/log/celery_app_dial.log
stderr_logfile = /etc/supervisord.d/log/celery_app_dial.log
stdout_logfile_maxbytes = 2MB
stderr_logfile_maxbytes = 2MB
user = root
priority = 999
numprocs = 1
process_name = %(process_num)02d
[program:celery-beat]
command = celery -A celery_app beat -l info
directory = /root/amazon-mult-site
autorestart = true
startsecs = 10
startretries = 10
stdout_logfile = /etc/supervisord.d/log/celery_beat.log
stderr_logfile = /etc/supervisord.d/log/celery_beat.log
stdout_logfile_maxbytes = 2MB
stderr_logfile_maxbytes = 2MB
user = root
priority = 999
numprocs = 1
process_name = %(process_num)02d
[program:celery-flower]
command = celery -A celery_app flower --port=5555 --address=0.0.0.0 --basic_auth=chensav:chensav
directory = /root/amazon-mult-site
autorestart = true
startsecs = 10
startretries = 10
stdout_logfile = /etc/supervisord.d/log/celery_flower.log
stderr_logfile = /etc/supervisord.d/log/celery_flower.log
stdout_logfile_maxbytes = 2MB
stderr_logfile_maxbytes = 2MB
user = root
priority = 999
numprocs = 1
process_name = %(process_num)02d
[program:task]
command = /usr/bin/python3 task.py
directory = /root/amazon-mult-site
autorestart = true
startsecs = 10
startretries = 10
stdout_logfile = /etc/supervisord.d/log/task.log
stderr_logfile = /etc/supervisord.d/log/task.log
stdout_logfile_maxbytes = 2MB
stderr_logfile_maxbytes = 2MB
user = root
priority = 999
numprocs = 1
process_name = %(process_num)02d
[program:callback]
command = /usr/bin/python3 callback.py
directory = /root/amazon-mult-site
autorestart = true
startsecs = 10
startretries = 10
stdout_logfile = /etc/supervisord.d/log/callback.log
stderr_logfile = /etc/supervisord.d/log/callback.log
stdout_logfile_maxbytes = 2MB
stderr_logfile_maxbytes = 2MB
user = root
priority = 999
numprocs = 1
process_name = %(process_num)02d
from time import sleep
import asyncio
import json
import os
import threading
import time
import redis
from curl_cffi import requests
from loguru import logger
from conf import config
from const import Site
from db import RedisSingleton
from tool import Task
COLL_DOMAIN = config['app']['coll_domain']
_redis_db = redis.Redis.from_url(config['redis']['url'], decode_responses=True)
task_monitoring_config = config['task-monitoring']
task_search_config = config['task-search']
task_product_detail_config = config['task-product-detail']
redis_config = config['redis']
cookie_config = config['cookie']
DEFAULT_HEADER = {
"Content-Type": "application/json",
"Accept": "application/json",
}
PID_FILE = './pid/task.pid'
def get_task(task_key: str = task_monitoring_config['queue_key'], number: int = 1):
"""
Fetch tasks from the remote collection API.
:param task_key:
:param number:
:return:
"""
try:
url = f"{COLL_DOMAIN}/api/collection/task?number={number}&queue={task_key}"
response = requests.get(url, headers=DEFAULT_HEADER, verify=False)
response = response.json()
if response["code"] == 0:
return response["data"]
else:
return {}
except Exception as e:
logger.error(f"获取任务异常 : {e}")
return {}
def add_task(task_key: str, redis_key: str, task_number: int):
"""
Pull tasks from the API and push them onto the local Redis queue.
:param task_key:
:param redis_key:
:param task_number:
:return:
"""
items = get_task(task_key, task_number)
task_lists = items.get('list', [])
if task_lists:
logger.info(f"{task_key} - {len(task_lists)}个任务加入队列")
for item in task_lists:
_redis_db.lpush(redis_key, json.dumps(item))
def run(task_config: dict = task_monitoring_config):
while True:
if not os.path.exists(PID_FILE):
logger.error('任务退出')
break
add_task_enabled = task_config.get('add_task_enabled')
task_number = int(task_config.get('number'))
task_min_number = int(task_config.get('min_number'))
redis_key = task_config.get('task_key')
todo_task_count = _redis_db.llen(redis_key)
task_key = task_config.get('queue_key')
try:
task_keys = task_key.split(',')
logger.info(f"{redis_key}任务队列长度 : {todo_task_count}")
if todo_task_count <= task_min_number and add_task_enabled == 'True':
for key in task_keys:
add_task(key, redis_key, task_number)
except KeyboardInterrupt:
logger.error(f"任务终止")
except Exception as e:
logger.error(f"任务异常 : {redis_key} : {e}")
time.sleep(5)
def cookie():
for site in Site.values():
time_key = cookie_config['cookie_time_key']
time_key = f"{time_key}:{site}"
_redis_db.delete(time_key)
while True:
if not os.path.exists(PID_FILE):
logger.error('任务退出')
break
logger.info(f"获取cookie")
for site in Site.values():
try:
task_manager.get_cookie(site)
except:
logger.error(f"获取cookie异常")
sleep(5)
if __name__ == '__main__':
tasks = []
redis_singleton = RedisSingleton(redis_url=redis_config['url'])
task_manager = Task(redis_singleton)
with open(PID_FILE, 'w') as f:
f.write(str(os.getpid()))
if task_monitoring_config.get('enabled', None) == 'True':
logger.info(f"监控任务添加")
t = threading.Thread(target=run, args=(task_monitoring_config,))
tasks.append(t)
else:
logger.info(f"监控任务未启动")
if task_product_detail_config.get('enabled', None) == 'True':
logger.info(f"发布任务添加")
t = threading.Thread(target=run, args=(task_product_detail_config,))
tasks.append(t)
else:
logger.info(f"发布任务未启动")
if task_search_config.get('enabled', None) == 'True':
logger.info(f"搜索任务添加")
t = threading.Thread(target=run, args=(task_search_config,))
tasks.append(t)
else:
logger.info(f"搜索任务未启动")
t = threading.Thread(target=cookie)
tasks.append(t)
for t in tasks:
t.start()
for t in tasks:
t.join()
import json
import pytest
from celery import Celery
from celery.contrib.testing.worker import start_worker
from celery_tasks import assign_spider_task, start_spider_task
from const import Site
@pytest.fixture(scope="session")
def celery_worker():
app = Celery(
broker_url="memory://",
result_backend="cache+memory://",
task_always_eager=False,
)
with start_worker(
app,
shutdown_timeout=600,
perform_ping_check=False,
# loglevel="DEBUG",
) as worker:
yield worker
def test_assign_task(celery_worker):
asin = "B0F4K4NQM2"
sites = [Site.com]
# sites = Site.values()
for site in sites:
style_snap_token = {
Site.de: "hOCRqU2moPvF7OHbdVIA%2FcA5ovfB6r0ylJuUi%2FlOBZz3AAAAAGgkOxkAAAAB",
Site.it: "hDnqRGV%2FHNIvmt066sCeQ%2BCV3nSmUqKeaz8xxb%2FvRxVcAAAAAGgkSZgAAAAB",
Site.fr: "hOszNRVKeyUBhfw3wv5b0aA8oeLE2aeIwsz2PUdA298eAAAAAGgkYs0AAAAB",
Site.es: "hBrvHdT5rdf55ogsaVpOntP1EBtw%2FzMQ9EjExhl%2BfaEpAAAAAGgq2CgAAAAB",
Site.jp: "hLIxmKRriLNZAn1JzZapY3HvPn42ymFTSMsDTQg5HAfBAAAAAGgkcnQAAAAB",
}
style_snap_cookie = {
Site.de: "session-id=262-3255866-8161667; i18n-prefs=EUR; lc-acbde=de_DE; ubid-acbde=260-9608208-7094621; session-id-time=2082787201l; session-token=o/NZjNyInsXiiUgIboxXoXSRrYwl3rx9fAymAp4CCwf6WdcuRNAUQCEFg47k8VXTC3KSYw6fBMBgM6tXUIwbHjsorhAdhwCurKbypoC3lgoZMkf00U5I1kSmNqc3HLpjse4Ymn69X/0QdC85JsfK71ik4AxLCvPas/xOzW4eqxBAyX+pV3wApnfeZnIphELnlFDSr9mrQ4OK0sJaveHFbCQyPsCsOqEbgqokVxqZuo1PHbbNtk8kmY5+O4FGZZ1Rd29/WpI325eY46Khf3mKdQsUvmHJ9Qdjti0OHBQXqG/2Uas2LyCi0cKyybMJPn7LqzFS0ro6MZ3ufRXA2ZbdRlzCu/imptOn; rxc=AM5BPAyBERNPU5AAzsA; csm-hit=tb:s-60X7Y7EP7NMPAY5A4EB8|1747204893470&t:1747204894017&adb:adblk_no",
Site.it: "session-id=259-0276938-7980111; i18n-prefs=EUR; ubid-acbit=261-6163436-4078364; session-id-time=2082787201l; session-token=E/w0330BjSmKZBECYTfHbZ7yEy7I4AKBdWL/m8ZX+tL2cAR7kPoLrFNx8S5dx16pTkHdMKW09HLjWeQeNz7llVEHqn172fHwxDcS7pazkXJgNRSRLDsLKfhf37omIg1wiBFLLlF/Ze/7Y9UHiodrU+0Ii0BZB6fXmpuia1MfYZVIhc0tFhAo8YFtCB9tyGZo0HTV11uIxxlm7fWf8o750ibLu1P2XupDWM1XEWpTkioZfgwU/HDDA7VO3nteiy42ajAiVqg+Mp6qA4qnaem9nNx+7TQwOEfhY1QvoMVtrfiZydc/m5wAij9/Cy8vfkSSxrtRXiwMwMI2lkzTSyfIzd0KLLJ+VdrT; csm-hit=tb:s-WXX22RF20QVB7J6QY6GK|1747208604822&t:1747208605107&adb:adblk_no",
Site.fr: "session-id=259-3895263-6553629; session-id-time=2082787201l; i18n-prefs=EUR; ubid-acbfr=259-4408157-0120414; session-token=eUYnvkQ5sAKaXqveut1y8oduGNiOD2EjeL69ox8B8WMis+WxwXvxOxl6YG3/Vbmxua74nu6Po4SqX7jb2zWEN0MafCL2vNWuqH8EtzWGxf7b956vhl+Zke6znbzQIGc0TJvEHQxYH4qhU2nhjMewvfEKaNpLin+BlSteJ/29YPTAmJoDmn1SBQFmLS1F+lwvJslCVl2tE5wPQtlRJRoy1J3Y3PyilY/QeSeOoQm0kspCKVwiIZ0Rker6IoiJ994xQp9mBMPgf7Kaxq+3/4WcJH0gUgDMTEJ422JCvVyUsPlIqoB87OnL9svvThxe2rw3+G3Pl7RGcfsrgTNxMLajeryo0c4gDsCA; csm-hit=tb:s-MVXSWSB18MKR7M8ZGZPG|1747214963697&t:1747214964546&adb:adblk_no",
Site.es: "session-id=257-1877650-6628461; session-id-time=2082787201l; i18n-prefs=EUR; ubid-acbes=257-0235449-8488852; session-token=4VefymsDENsTHhmV3VThQL9xtbG25qqF0ue7hTQAuej9Hwcb8gteCVV4dDQ8BzXeSdcE1qxvGNxI5sjDddBC4GwnfXxNjPW4KKREHTXl4AcOnIBB0NsAP8ysdlrGmnF2i6tZaPQQmujRiQkE6W6541cqvo2awOmApuIAzLb/yTYBIwJdC7DAbRSAUjBpCb/Par21qrO4i6Vz/yzK9jvAZdmWNIHE6aFN6P0gNMnz8ubsQwqHzEhCqDZDzd2/82dB7ZkztnblwWdLTf11HIWSVcwY0Yrt/3Dva64m73AGG9ehiXhZVZAo9nBai5dRi1Bw21HZm9LicAtLlbc51P5nskJuXBVBUqHT; csm-hit=tb:s-6TH1ZWPZ3MSWA7C2FNME|1747638315404&t:1747638315630&adb:adblk_no",
Site.jp: "session-id=356-1153481-7631550; session-id-time=2082787201l; i18n-prefs=JPY; lc-acbjp=zh_CN; ubid-acbjp=356-3715403-6686010; session-token=\"UPpZlDoDBS1AIP3L68R2R+yjb0KoWvefyp2stmNqnQdqPJSW03v3MGhH9DhlM1a14QVzBoJY6/sys0OcBcFD+edCQDl9W1FPUeMbExBr4xxrsaPHcUWQ/pciyQ6+tXq1A4mn42SoZgYU1qf9Sz3P2+gz5NPfj8R82NFt5oL3s61DpWWDeLLXLdWIzHHyAq6Szi19aVK9GnhDuMu67f/vvi/ecgaeRgFumFtFmOCjAD3zTkumKFvJnKyEEHUzS3lye1TOx/bB6mEtPVHxlo3R4XIgeeDQeaAkjLot9ksW9GzRhe9xtdUHTPHRI9P8oKy2PMWyrqu7yH5AhIj0uYC/XUtYoom6Z85Qr5J8RxoeQnY=\"; csm-hit=tb:s-VE4YKH0HM7B9R7F0G6GC|1747219065639&t:1747219065883&adb:adblk_no",
}
url1 = f"https://www.amazon.{site}/dp/{asin}?th=1"
key1 = "task-product-detail"
url2 = f"https://www.amazon.{site}/dp/{asin}"
key2 = "task-monitoring"
key3 = "task-search"
task1 = json.dumps(
{
"url": f"https://www.amazon.{site}/s?k=Adhesive+Shower+Caddy&page=2&xpid=oznV7KayXAf7P&qid=1747215325&ref=sr_pg_2",
# "url": f"https://www.amazon.{site}/s?k=Adhesive+Shower+Caddy&i=garden&page=2&crid=1MR2IOB4BMAXF&qid=1739174661&sprefix=adhesive+shower+caddy%2Cgarden%2C391&xpid=--1JOuYBktM8-&ref=sr_pg_2",
"admin_users_id": 1,
"task_id": 37,
"collection_number": 0,
"callback_type": 8,
"app_name": "admin",
"platform_type": 6,
}
)
task2 = json.dumps(
{
"url": f"https://www.amazon.{site}/+style_snap_upload", # 虚构路径
"admin_users_id": 1,
"task_id": 37,
"collection_number": 0,
"callback_type": 8,
"app_name": "admin",
"platform_type": 6,
"style_snap_token": style_snap_token.get(site),
"style_snap_cookie": style_snap_cookie.get(site),
}
)
all_tasks = [
['{"id": 252, "admin_users_id": 1, "log_id": 11, "platform_type": 1, "callback_type": 9, "app_name": "admin", "max_sku_number": 35, "project": "temu", "url": "%s"}'%url1],
['{"url":"%s","item_id":"27","admin_users_id":1,"callback_type":5,"collection_type":1,"app_name":"admin","cache_time":60}'%url2],
[task1], [task2]
]
keys = [key1, key2, key3, key3]
items = list(zip(keys, all_tasks))
items = [items[1]]
for key, tasks in items:
# tasks = None
assign_spider_task.apply_async(args=(key, tasks,)).get()
def test_start_task(celery_worker):
# start_dial_task.delay().get()
start_spider_task.delay().get()
if __name__ == "__main__":
# pytest.main(["./"])
# pytest.main(["test/test_celery.py::test_start_task"])
pytest.main(["test/test_celery.py::test_assign_task"])
import json
import pytest
import redis
from conf import config
from const import Site, Spider
DB_REDIS = redis.Redis.from_url(config["redis"]["url"], decode_responses=True)
def test_spider_task():
asin = "B0CMTQFXB8"
for _ in range(1):
DB_REDIS.lpush(
config[Spider.detail]["task_key"],
# json.dumps({"url": f"https://www.amazon.{Site.de}/dp/{asin}"}),
json.dumps({'id': 2061071844, 'admin_users_id': 0, 'log_id': 8252798, 'platform_type': 6, 'callback_type': 9, 'app_name': 'admin', 'max_sku_number': 35, 'url': 'https://www.amazon.de/dp/B01MYBVW76', 'project': 'tiktok'})
)
if __name__ == "__main__":
pytest.main(["test/test_task.py::test_spider_task"])
from __future__ import annotations
import json
import random
import re
from datetime import datetime, timedelta
from babel.dates import get_month_names, get_day_names
from curl_cffi import requests
from curl_cffi.requests import Session
from dateutil import parser
from dateutil.relativedelta import relativedelta
from loguru import logger
from lxml import etree
from conf import config
from const import Lang, StockStatus
from const import SiteType
DOMAIN = config["app"]["domain"]
COOKIE = config["cookie"]
IMPERSONATE = config["app"]["impersonate"]
class Task:
def __init__(self, redis_singleton):
self.redis_singleton = redis_singleton
def get_task(self, task_key: str = "", batch_size: int = 10):
"""
获取任务
:param task_key:
:param batch_size:
:return:
"""
redis_client = self.redis_singleton.get_connection()
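# LRANGE + LTRIM are sent in one transactional pipeline, so a batch of up to
# batch_size tasks is popped from the head of the list in a single round trip.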
with redis_client.pipeline() as pipe:
pipe.lrange(task_key, 0, batch_size - 1)
pipe.ltrim(task_key, batch_size, -1)
datas, _ = pipe.execute()
return datas
def get_task_number(self, task_key: str = ""):
"""
Get the number of queued tasks.
:param task_key:
:return:
"""
redis_client = self.redis_singleton.get_connection()
number = redis_client.llen(task_key)
return number
def add_task(self, task: dict, task_key: str):
"""
Push a task back onto the queue (increments its retry counter).
:param task:
:param task_key:
:return:
"""
redis_client = self.redis_singleton.get_connection()
task["retry"] = task.get("retry", 0) + 1
if task_key == "task:amazon:monitoring":
only = [
"url",
"item_id",
"admin_users_id",
"callback_type",
"collection_type",
"app_name",
"step_value",
"is_first",
"retry",
"cache_time",
]
task = {key: task[key] for key in only if key in task}
redis_client.lpush(task_key, json.dumps(task))
def callback_task(self, callback_key: str, result: dict):
"""
Queue a result for callback.
:param callback_key:
:param result:
:return:
"""
redis = self.redis_singleton.get_connection()
redis.lpush(callback_key, json.dumps(result))
def get_cookie(self, site: str = "com"):
"""
Fetch a cookie from the remote API and cache it locally.
:return:
"""
type = SiteType.__dict__.get(site)
if type is None:
return None
cookie_url = f"{DOMAIN}/api/collection/get-cookie?type={type}"
redis_client = self.redis_singleton.get_connection()
time_key = COOKIE["cookie_time_key"]
time_key = f"{time_key}:{site}"
is_exists = redis_client.get(time_key)
if is_exists:
return None
response = requests.get(cookie_url).json()
cookie = response.get("data", [])
if cookie:
logger.info(f"获取cookie成功")
cookie = cookie[0]
new_cookie = {
"cookie": cookie.get("cookie", ""),
"user-agent": cookie.get("user-agent", ""),
}
redis_client.set(f"cookie:{site}", json.dumps(new_cookie))
redis_client.set(time_key, 1, ex=60)
else:
redis_client.delete(time_key)
return cookie
def get_loca_cookie(self, site: str = "com"):
"""
Get the locally cached cookie, fetching a new one if missing.
:return:
"""
redis_client = self.redis_singleton.get_connection()
cookie = redis_client.get(f"cookie:{site}")
if not cookie:
cookie = self.get_cookie(site)
if isinstance(cookie, dict):
return cookie
return json.loads(cookie)
class Request:
@staticmethod
def request_html(url: str = "", proxy: str = None, **kwargs: dict):
"""
请求html
:param proxy:
:param url:
:return:
"""
with Session() as s:
# Clear any session-level headers, cookies, and proxies
s.headers.clear()
s.cookies.clear()
s.proxies.clear()
proxies = {
"https": proxy,
"http": proxy,
}
headers = kwargs.get("headers", {})
timeout = kwargs.get("timeout", 10)
postcode = kwargs.get("postcode", "20001")
headers["accept-language"] = (
"zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6"
)
is_check_postal = kwargs.get("is_check_postal", True)
response = s.get(
url,
headers=headers,
proxies=proxies,
timeout=timeout,
impersonate=IMPERSONATE,
)
if response.status_code != 200 and response.status_code != 404:
raise Exception(f"状态码错误: {response.status_code}, url:{url}")
text = response.text
html = etree.HTML(text)
captcha_image = html.xpath('//div[@class="a-row a-text-center"]/img/@src')
if len(captcha_image):
raise Exception("出现验证码")
if response.status_code == 200 and is_check_postal:
postal = html.xpath('//span[@id="glow-ingress-line2"]/text()')
postal = postal[0].strip() if len(postal) else ""
if not postal or postcode not in postal:
raise Exception("采集邮编错误")
is_product_detail = kwargs.get("is_product_detail", None)
is_link_error = html.xpath('//div[@id="g"]/a/@href')
title = Tool.get_title(html)
if len(is_link_error) == 0 and len(title) == 0 and is_product_detail:
raise Exception(f"采集内容有误")
return text
class Tool:
@staticmethod
def get_impersonate():
"""
获取伪装头
:return:
"""
impersonates = [
"chrome131",
"chrome124",
"chrome123",
"chrome120",
]
return random.choice(impersonates)
@staticmethod
def get_title(html):
"""
Extract the product title.
:param html:
:return:
"""
title = html.xpath('//span[@id="productTitle"]/text()')
if len(title) == 0:
title = html.xpath('//span[@id="bond-title-desktop"]/text()')
return title
class Proxy:
@staticmethod
def get_zhan_proxies():
"""
Fetch a proxy.
:return:
"""
proxies_url = f"{DOMAIN}/api/proxies/all?sign=ftd*kcm.ygh4mjp7ERJ"
proxies = requests.get(proxies_url).json()
ip = random.choice(proxies)
if not ip:
raise Exception(f"获取代理失败")
if "status" in ip:
return None
return ip
class Fmt:
@staticmethod
def _get_parserinfo(lang=Lang.com):
if lang == Lang.com:
return parser.parserinfo()
months = get_month_names("wide", locale=lang)
abbrev_months = get_month_names("abbreviated", locale=lang)
days = get_day_names("wide", locale=lang)
abbrev_days = get_day_names("abbreviated", locale=lang)
class TmpParserinfo(parser.parserinfo):
WEEKDAYS = [
(abbrev.lower(), full.lower())
for abbrev, full in zip(abbrev_days.values(), days.values())
]
MONTHS = [
(abbrev.lower(), full.lower())
for abbrev, full in zip(abbrev_months.values(), months.values())
]
return TmpParserinfo()
@staticmethod
def parse_date(string: str, lang=Lang.com):
"""解析日期
:param string:
:param lang:
:return:
"""
if not string:
return ""
elif "Today" in string:
dt = datetime.now()
elif "Tomorrow" in string:
dt = datetime.now() + timedelta(days=1)
else:
patt1 = re.compile(r"([\w\s]+)-([\w\s]+)")
patt2 = re.compile(r"(.*?)(\d+\D*-)(\D*\d+.*)")
if patt1.match(string):
string = patt1.match(string).group(2)
elif patt2.match(string):
string = patt2.sub(r"\1\3", string)
dt = parser.parse(string, parserinfo=Fmt._get_parserinfo(lang), fuzzy=True)
if dt.month < datetime.now().month:
dt = dt + relativedelta(years=1)
date = dt.strftime("%Y-%m-%d")
logger.debug(f"{string} -> {date}")
return date
@staticmethod
def parse_status(string: str, stock_status=StockStatus.com):
"""解析库存状态
:param string: _description_
:param stock_status: _description_, defaults to StockStatus.com
:return: _description_
"""
if not string:
return ""
string = string.strip().lower()
return "In Stock" if stock_status.lower() in string else "Only"
@staticmethod
def parse_price(string: str):
"""解析价格
:param string: _description_
:return: _description_
"""
if not string:
return float("nan")
return float(re.sub(r"[^\d.]", "", string))
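# Illustrative expectations for the helpers above (a sketch, not executed):
#   Fmt.parse_price("$12.99")                      -> 12.99
#   Fmt.parse_status("In stock", StockStatus.com)  -> "In Stock"
#   Fmt.parse_date("Tomorrow")                     -> tomorrow's date as "YYYY-MM-DD"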