Commit d260882e by yexing

u

parent 58d033ad
__pycache__
.vscode
celerybeat-*
.pytest_cache
log
pid
image
.idea
config
tmp
from time import sleep
import asyncio
import json
import random
import re
import time
import traceback
from typing import Self
import requests
from DrissionPage import Chromium
from DrissionPage.common import Settings
from DrissionPage.items import WebPageTab
from loguru import logger
from redis import from_url  # sync client in this version; the async variant below uses redis.asyncio
class Data:
@classmethod
def items(cls):
return (
(k, v)
for k, v in cls.__dict__.items()
if isinstance(v, (str, int)) and not k.startswith("__")
)
@classmethod
def values(cls):
return (v for _, v in cls.items())
@classmethod
def zip(cls, other: Self):
return ((v, getattr(other, k)) for k, v in cls.items())
@classmethod
def inverse_dict(cls):
return {v: k for k, v in cls.items()}
class Postcode(Data):
com = "20001"
de = "55545"
it = "66040"
fr = "75000"
es = "04810"
jp = "496-0805"
class Site(Data):
com = "com"
# Germany, Italy, France, Spain, Japan
de = "de"
it = "it"
fr = "fr"
es = "es"
jp = "co.jp"
IS_DEBUG = False
DOMAIN = "https://20tools.net"
redis_config = {
"url": "redis://:a123456,a@localhost:6379/10",
"max_connections": 300
}
cookie_config = {
"cookie_time_key": "cookie_expired_time"
}
UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
Settings.set_raise_when_wait_failed(False)
class RedisSingleton:
_redis_pool = None
def __init__(self, redis_url=None):
self.redis_url = redis_url
def get_connection(self):
if self._redis_pool is None:
if self.redis_url:
self._redis_pool = from_url(self.redis_url, decode_responses=True)
else:
# default connection URL
self._redis_pool = from_url('redis://localhost', decode_responses=True)
return self._redis_pool
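# Note: _redis_pool is a class attribute, so every RedisSingleton instance shares
# one client. A usage sketch (assumed):
#   singleton = RedisSingleton(redis_url=redis_config["url"])
#   r = singleton.get_connection()  # returns the same client on every call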
class SiteType(Data):
com = 1
de = 2
def callback(param):
"""
Post collected data to the callback endpoint.
"""
requests.post(
f"{DOMAIN}/api/collection/cookie",
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
data=json.dumps(param),
)
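# The payload built by get_cookie() below has this shape (a sketch; epoch value illustrative):
#   {"type": 1, "data": {"cookie": "...", "token": "...", "user-agent": UA, "time": 1700000000}}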
def refresh_local_cookie(data: dict, site: str = "com"):
"""
Refresh the locally cached cookie.
"""
redis = redis_singleton.get_connection()
redis.set(f"cookie:{site}", json.dumps(data))
def get_cookie_error():
"""
Get the cookie error counter.
:return:
"""
redis = redis_singleton.get_connection()
return redis.get("amazon:cookie-error")
def delete_cookie_error():
"""
Delete the cookie error counter.
:return:
"""
redis = redis_singleton.get_connection()
return redis.delete("amazon:cookie-error")
def input_postcode(
tab: WebPageTab, postcode: str, locator: str = "#GLUXZipUpdateInput"
):
location_input = tab.ele(locator, timeout=3)
if location_input is None:
raise Exception("未找到输入框")
location_input.input(postcode)
sleep(1)
def get_cookie(tab: WebPageTab, site_type: int = 1):
"""
获取cookie
"""
cookie = tab.cookies().as_str()
content = tab.html
token = re.findall(r"name=\"stylesnap\" value=\"(.*?)\">", content)
response = {
"cookie": cookie,
"token": token[0] if token else "",
"user-agent": UA,
"time": int(time.time()),
}
logger.info(f"获取到cookie: {json.dumps(response)}")
callback({"type": site_type, "data": response})
return cookie
def run(site: str = "com", postcode: str = "20001", site_type: int = 1):
def _close():
cookie = get_cookie(tab, site_type)
if IS_DEBUG:
refresh_local_cookie({"cookie": cookie, "user-agent": UA}, site=site)
chromium.clear_cache()
chromium.quit()
delete_cookie_error()
if not IS_DEBUG:
number = get_cookie_error()
number = int(number) if number else 0
if number < 50:
logger.success("Cookie OK")
return
logger.error("Cookie error")
chromium = Chromium()
tab = chromium.latest_tab
try:
# &currency=JPY
tab.get(f"https://www.amazon.{site}/stylesnap?language=en_GB")
# check whether the postcode is already correct
line = tab.ele("#glow-ingress-line2", timeout=3)
nav = tab.ele("#icp-nav-flyout", timeout=3)
if line and nav:
postal, lang = line.text, nav.text
if (not postal or postcode not in postal) or (not lang or "EN" not in lang):
logger.info("Postcode or language wrong, setting postcode and language")
else:
logger.info("Postcode and language correct")
_close()
return
location = tab.ele("#nav-global-location-popover-link", timeout=3)
if not location:
raise Exception("没有进入正确页面")
else:
location.click()
postcode_parts = postcode.split("-")
if len(postcode_parts) == 2:
input_postcode(tab, postcode_parts[0], "#GLUXZipUpdateInput_0")
input_postcode(tab, postcode_parts[1], "#GLUXZipUpdateInput_1")
else:
input_postcode(tab, postcode)
locs = [
"#GLUXZipUpdate",
'xpath://*[@id="a-popover-1"]/div/header/button',
'xpath://*[@id="icp-nav-flyout"]/button',
"@text()=English",
]
for loc in locs:
ele = tab.ele(loc, timeout=3)
if not ele:
raise ValueError("Element locator failed")
ele.wait.clickable(timeout=3, raise_err=False).click()
tab.wait(2)
_close()
except Exception as e:
logger.error(e)
def main():
if IS_DEBUG:
items = random.choices(list(Site.zip(Postcode)))  # pick one random (site, postcode) pair
else:
items = Site.zip(Postcode)
for site, postcode in items:
site_type = SiteType.__dict__.get(site)
if site_type is None:
continue
logger.info(f"开始获取cookie: {site} {postcode}")
run(site, postcode, site_type)  # pass the looked-up site_type instead of the default
sleep(10)
if IS_DEBUG:
exit()
if __name__ == "__main__":
while True:
try:
redis_singleton = RedisSingleton(redis_url=redis_config["url"])
main()  # main() is synchronous in this version, so call it directly
except Exception:
traceback.print_exc()
import asyncio
import json
import random
import re
import time
import traceback
from typing import Self
import requests
from DrissionPage import Chromium
from DrissionPage.common import Settings
from DrissionPage.items import WebPageTab
from loguru import logger
from redis.asyncio import from_url
class Data:
@classmethod
def items(cls):
return (
(k, v)
for k, v in cls.__dict__.items()
if isinstance(v, (str, int)) and not k.startswith("__")
)
@classmethod
def values(cls):
return (v for _, v in cls.items())
@classmethod
def zip(cls, other: Self):
return ((v, getattr(other, k)) for k, v in cls.items())
@classmethod
def inverse_dict(cls):
return {v: k for k, v in cls.items()}
class Postcode(Data):
com = "20001"
de = "55545"
it = "66040"
fr = "75000"
es = "04810"
jp = "496-0805"
class Site(Data):
com = "com"
# Germany, Italy, France, Spain, Japan
de = "de"
it = "it"
fr = "fr"
es = "es"
jp = "co.jp"
IS_DEBUG = False
DOMAIN = "https://20tools.net"
redis_config = {
"url": "redis://:a123456,a@localhost:6379/10",
"max_connections": 300
}
cookie_config = {
"cookie_time_key": "cookie_expired_time"
}
UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
Settings.set_raise_when_wait_failed(False)
class RedisSingleton:
_redis_pool = None
def __init__(self, redis_url=None):
self.redis_url = redis_url
async def get_connection(self):
if self._redis_pool is None:
if self.redis_url:
self._redis_pool = await from_url(self.redis_url, decode_responses=True)
else:
# default connection URL
self._redis_pool = await from_url('redis://localhost', decode_responses=True)
return self._redis_pool
class SiteType(Data):
com = 1
de = 2
async def callback(param):
"""
Post collected data to the callback endpoint.
"""
requests.post(
f"{DOMAIN}/api/collection/cookie",
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
data=json.dumps(param),
)
async def refresh_local_cookie(data: dict, site: str = "com"):
"""
Refresh the locally cached cookie.
"""
redis = await redis_singleton.get_connection()
await redis.set(f"cookie:{site}", json.dumps(data))
async def get_cookie_error():
"""
Get the cookie error counter.
:return:
"""
redis = await redis_singleton.get_connection()
return await redis.get("amazon:cookie-error")
async def delete_cookie_error():
"""
Delete the cookie error counter.
:return:
"""
redis = await redis_singleton.get_connection()
return await redis.delete("amazon:cookie-error")
async def input_postcode(
tab: WebPageTab, postcode: str, locator: str = "#GLUXZipUpdateInput"
):
location_input = tab.ele(locator, timeout=3)
if location_input is None:
raise Exception("未找到输入框")
location_input.input(postcode)
await asyncio.sleep(1)
async def get_cookie(tab: WebPageTab, site_type: int = 1):
"""
Fetch cookies from the current tab and report them to the callback endpoint.
"""
cookie = tab.cookies().as_str()
content = tab.html
token = re.findall(r"name=\"stylesnap\" value=\"(.*?)\">", content)
response = {
"cookie": cookie,
"token": token[0] if token else "",
"user-agent": UA,
"time": int(time.time()),
}
logger.info(f"获取到cookie: {json.dumps(response)}")
await callback({"type": site_type, "data": response})
return cookie
async def run(site: str = "com", postcode: str = "20001", site_type: int = 1):
async def _close():
cookie = await get_cookie(tab, site_type)
if IS_DEBUG:
await refresh_local_cookie({"cookie": cookie, "user-agent": UA}, site=site)
chromium.clear_cache()
chromium.quit()
await delete_cookie_error()
if not IS_DEBUG:
number = await get_cookie_error()
number = int(number) if number else 0
if number < 50:
logger.success("Cookie OK")
return
logger.error("Cookie error")
chromium = Chromium()
tab = chromium.latest_tab
try:
# &currency=JPY
tab.get(f"https://www.amazon.{site}/stylesnap?language=en_GB")
# check whether the postcode is already correct
line = tab.ele("#glow-ingress-line2", timeout=3)
nav = tab.ele("#icp-nav-flyout", timeout=3)
if line and nav:
postal, lang = line.text, nav.text
if (not postal or postcode not in postal) or (not lang or "EN" not in lang):
logger.info("Postcode or language wrong, setting postcode and language")
else:
logger.info("Postcode and language correct")
await _close()
return
location = tab.ele("#nav-global-location-popover-link", timeout=3)
if not location:
raise Exception("没有进入正确页面")
else:
location.click()
postcode_parts = postcode.split("-")
if len(postcode_parts) == 2:
await input_postcode(tab, postcode_parts[0], "#GLUXZipUpdateInput_0")
await input_postcode(tab, postcode_parts[1], "#GLUXZipUpdateInput_1")
else:
await input_postcode(tab, postcode)
locs = [
"#GLUXZipUpdate",
'xpath://*[@id="a-popover-1"]/div/header/button',
'xpath://*[@id="icp-nav-flyout"]/button',
"@text()=English",
]
for loc in locs:
ele = tab.ele(loc, timeout=3)
if not ele:
raise ValueError("Element locator failed")
ele.wait.clickable(timeout=3, raise_err=False).click()
tab.wait(2)
await _close()
except Exception as e:
logger.error(e)
async def main():
if IS_DEBUG:
items = random.choices(list(Site.zip(Postcode)))  # pick one random (site, postcode) pair
else:
items = Site.zip(Postcode)
for site, postcode in items:
site_type = SiteType.__dict__.get(site)
if site_type is None:
continue
logger.info(f"开始获取cookie: {site} {postcode}")
await run(site, postcode, site_type)  # pass the looked-up site_type instead of the default
await asyncio.sleep(10)
if IS_DEBUG:
exit()
if __name__ == "__main__":
while True:
try:
redis_singleton = RedisSingleton(redis_url=redis_config["url"])
asyncio.run(main())
except Exception:
traceback.print_exc()
import json
import os
import threading
import time
import redis
from curl_cffi import requests
from loguru import logger
from conf import config
COLL_DOMAIN = config['app']['coll_domain']
_redis_db = redis.Redis.from_url(config['redis']['url'], decode_responses=True)
task_monitoring_config = config['task-monitoring']
task_search_config = config['task-search']
task_product_detail_config = config['task-product-detail']
DEFAULT_HEADER = {
"Content-Type": "application/json",
"Accept": "application/json",
}
def task_callback(data):
"""
Post task results to the callback endpoint.
"""
try:
url = f"{COLL_DOMAIN}/api/collection/task"
response = requests.post(url, headers=DEFAULT_HEADER, data=json.dumps(data), verify=False)
data = response.json()
if data["code"] == 0:
return True
else:
return False
except Exception as e:
logger.error(f"回调异常 : {e}")
return False
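# task_callback() expects the endpoint to answer with JSON of the form (assumed):
#   {"code": 0, ...}   # code == 0 means the result was accepted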
def batch_callback(callback_key: str):
thread = []
number = _redis_db.llen(callback_key)
logger.info(f"回调 {callback_key},共有{number}个任务")
for _ in range(10):
data = _redis_db.lpop(callback_key)
if data:
result = json.loads(data)
thread.append(threading.Thread(target=task_callback, args=(result,)))
else:
break
for t in thread:
t.start()
for t in thread:
t.join()
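# Design note: batch_callback drains up to 10 queued payloads per pass and posts
# each on its own thread; task_callback is I/O-bound, so the threads overlap the
# HTTP round-trips rather than adding CPU parallelism.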
def callback_task(callback_key: str):
"""
Drain queued results for callback_key and post them back.
:param callback_key:
:return:
"""
task_number = 500
result = []
try:
if callback_key == task_monitoring_config.get('item_key'):
for _ in range(task_number):
data = _redis_db.lpop(callback_key)
if data:
result.append(json.loads(data))
else:
break
if result:
logger.info(f"回调 {callback_key},共有{len(result)}个任务")
logger.info(f"回调: result: {json.dumps(result)}")
callback = {
"data": {
"error_items": [],
"collection": result,
},
"type": 5,
}
task_callback(callback)
elif callback_key == task_product_detail_config.get('item_key') or callback_key == task_search_config.get('item_key'):
batch_callback(callback_key)
except Exception:
logger.error("Callback error")
def run(task_config: dict = task_monitoring_config):
CALLBACK_PID_FILE = "./pid/callback.pid"
while True:
if not os.path.exists(CALLBACK_PID_FILE):
logger.error('Task exiting')
break
try:
callback_key = task_config.get('item_key')
callback_task(callback_key)
logger.info(f"回调 {callback_key} 完成")
except Exception as e:
logger.error(f"任务异常 : {e}")
callback_sleep_time = int(task_config.get('callback_sleep_time', 5))
time.sleep(callback_sleep_time)
if __name__ == '__main__':
tasks = []
PID_FILES = [
"monitoring.pid",
"product_detail.pid",
"search.pid",
"callback.pid",
]
for PID_FILE in PID_FILES:
with open(f"./pid/{PID_FILE}", 'w') as f:
f.write(str(os.getpid()))
if task_monitoring_config.get('enabled', None) == 'True':
logger.info(f"采集任务回调启动")
t = threading.Thread(target=run, args=(task_monitoring_config,))
tasks.append(t)
if task_product_detail_config.get('enabled', None) == 'True':
logger.info(f"商品发布回调启动")
t = threading.Thread(target=run, args=(task_product_detail_config,))
tasks.append(t)
if task_search_config.get('enabled', None) == 'True':
logger.info(f"搜索回调启动")
t = threading.Thread(target=run, args=(task_search_config,))
tasks.append(t)
for t in tasks:
t.start()
for t in tasks:
t.join()
......
@@ -33,7 +33,7 @@ app.conf.task_queues = (
)
app.conf.task_routes = {
"celery_tasks.detail_spider_task": {"queue": "detail"},
-"celery_tasks.monitor_spider_task": {"queue": "detail"},
+"celery_tasks.monitor_spider_task": {"queue": "monitor"},
"celery_tasks.search_spider_task": {"queue": "search"},
"celery_tasks.*_dial_task": {"queue": "dial"},
"celery_tasks.*": {"queue": "detail"},
......
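For context: Celery checks task_routes for an exact task-name match first, then glob patterns in declaration order, so after this change the dispatch works out as follows (example task names assumed from the routes above):

    celery_tasks.monitor_spider_task -> "monitor" queue (exact match)
    celery_tasks.foo_dial_task       -> "dial" queue    (matches *_dial_task)
    celery_tasks.other_task          -> "detail" queue  (catch-all celery_tasks.*)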
# import aioredis
-from redis.asyncio import from_url
+from redis import from_url
class RedisSingleton:
......
from time import sleep
import asyncio
import html
import json
import random
......
@@ -869,9 +868,7 @@ class Goods:
# process in batches
if len(collection_skus) > 0:
for i in range(0, len(collection_skus), 8):
-for response in asyncio.gather(
-    *collection_skus[i : i + 8]
-):
+for response in collection_skus[i : i + 8]:
try:
if response.get("brand"):
brand.append(response["brand"])
......
@@ -1021,8 +1018,6 @@ class Goods:
tasks = [json.loads(task) for task in tasks]
for task in tasks:
queue.append(self.run(task))
-if queue:
-    asyncio.gather(*queue)
logger.info(f"Task elapsed: {time.time() - start_time}")
if self.is_debug:
......
from time import sleep
import asyncio
import json
import re
import time
......
@@ -420,7 +419,7 @@ class Monitoring:
success_number = 0
logger.info(f"任务数: {len(queue)}")
if queue:
-for items in asyncio.gather(*queue):
+for items in queue:
success_number += 1
logger.info(f"任务耗时: {time.time() - start_time}, 成功数: {success_number}")
from time import sleep
import asyncio
import functools
import json
import os
......
@@ -650,8 +649,6 @@ class Search:
tasks = [json.loads(task) for task in tasks]
for task in tasks:
queue.append(self.run(task))
-if queue:
-    asyncio.gather(*queue)
logger.info(f"Task elapsed: {time.time() - start_time}")
if self.is_debug:
......
from time import sleep
import asyncio
import json
import os
import threading
import time
import redis
from curl_cffi import requests
from loguru import logger
from conf import config
from const import Site
from db import RedisSingleton
from tool import Task
COLL_DOMAIN = config['app']['coll_domain']
_redis_db = redis.Redis.from_url(config['redis']['url'], decode_responses=True)
task_monitoring_config = config['task-monitoring']
task_search_config = config['task-search']
task_product_detail_config = config['task-product-detail']
redis_config = config['redis']
cookie_config = config['cookie']
DEFAULT_HEADER = {
"Content-Type": "application/json",
"Accept": "application/json",
}
PID_FILE = './pid/task.pid'
def get_task(task_key: str = task_monitoring_config['queue_key'], number: int = 1):
"""
Fetch tasks from the collection API.
:param task_key:
:param number:
:return:
"""
try:
url = f"{COLL_DOMAIN}/api/collection/task?number={number}&queue={task_key}"
response = requests.get(url, headers=DEFAULT_HEADER, verify=False)
response = response.json()
if response["code"] == 0:
return response["data"]
else:
return {}
except Exception as e:
logger.error(f"获取任务异常 : {e}")
return {}
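# The task endpoint is assumed to respond with:
#   {"code": 0, "data": {"list": [ {...task...}, ... ]}}
# get_task() returns the "data" dict; add_task() below reads its "list" key.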
def add_task(task_key: str, redis_key: str, task_number: int):
"""
Fetch tasks and push them onto the local Redis queue.
:param task_key:
:param redis_key:
:param task_number:
:return:
"""
items = get_task(task_key, task_number)
task_lists = items.get('list', [])
if task_lists:
logger.info(f"{task_key} - {len(task_lists)}个任务加入队列")
for item in task_lists:
_redis_db.lpush(redis_key, json.dumps(item))
def run(task_config: dict = task_monitoring_config):
while True:
if not os.path.exists(PID_FILE):
logger.error('Task exiting')
break
add_task_enabled = task_config.get('add_task_enabled')
task_number = int(task_config.get('number'))
task_min_number = int(task_config.get('min_number'))
redis_key = task_config.get('task_key')
todo_task_count = _redis_db.llen(redis_key)
task_key = task_config.get('queue_key')
try:
task_keys = task_key.split(',')
logger.info(f"{redis_key}任务队列长度 : {todo_task_count}")
if todo_task_count <= task_min_number and add_task_enabled == 'True':
for key in task_keys:
add_task(key, redis_key, task_number)
except KeyboardInterrupt:
logger.error(f"任务终止")
except Exception as e:
logger.error(f"任务异常 : {redis_key} : {e}")
time.sleep(5)
def cookie():
for site in Site.values():
time_key = cookie_config['cookie_time_key']
time_key = f"{time_key}:{site}"
_redis_db.delete(time_key)
while True:
if not os.path.exists(PID_FILE):
logger.error('Task exiting')
break
logger.info(f"获取cookie")
for site in Site.values():
try:
task_manager.get_cookie(site)
except Exception:
logger.error("Cookie fetch error")
sleep(5)
if __name__ == '__main__':
tasks = []
redis_singleton = RedisSingleton(redis_url=redis_config['url'])
task_manager = Task(redis_singleton)
with open(PID_FILE, 'w') as f:
f.write(str(os.getpid()))
if task_monitoring_config.get('enabled', None) == 'True':
logger.info(f"监控任务添加")
t = threading.Thread(target=run, args=(task_monitoring_config,))
tasks.append(t)
else:
logger.info(f"监控任务未启动")
if task_product_detail_config.get('enabled', None) == 'True':
logger.info(f"发布任务添加")
t = threading.Thread(target=run, args=(task_product_detail_config,))
tasks.append(t)
else:
logger.info(f"发布任务未启动")
if task_search_config.get('enabled', None) == 'True':
logger.info(f"搜索任务添加")
t = threading.Thread(target=run, args=(task_search_config,))
tasks.append(t)
else:
logger.info(f"搜索任务未启动")
t = threading.Thread(target=cookie)  # cookie() is a plain function here; evaluating it in args would block before the thread starts
tasks.append(t)
for t in tasks:
t.start()
for t in tasks:
t.join()
import asyncio
import json
import os
import threading
import time
import redis
from curl_cffi import requests
from loguru import logger
from conf import config
from const import Site
from db import RedisSingleton
from tool import Task
COLL_DOMAIN = config['app']['coll_domain']
_redis_db = redis.Redis.from_url(config['redis']['url'], decode_responses=True)
task_monitoring_config = config['task-monitoring']
task_search_config = config['task-search']
task_product_detail_config = config['task-product-detail']
redis_config = config['redis']
cookie_config = config['cookie']
DEFAULT_HEADER = {
"Content-Type": "application/json",
"Accept": "application/json",
}
PID_FILE = './pid/task.pid'
def get_task(task_key: str = task_monitoring_config['queue_key'], number: int = 1):
"""
Fetch tasks from the collection API.
:param task_key:
:param number:
:return:
"""
try:
url = f"{COLL_DOMAIN}/api/collection/task?number={number}&queue={task_key}"
response = requests.get(url, headers=DEFAULT_HEADER, verify=False)
response = response.json()
if response["code"] == 0:
return response["data"]
else:
return {}
except Exception as e:
logger.error(f"获取任务异常 : {e}")
return {}
def add_task(task_key: str, redis_key: str, task_number: int):
"""
Fetch tasks and push them onto the local Redis queue.
:param task_key:
:param redis_key:
:param task_number:
:return:
"""
items = get_task(task_key, task_number)
task_lists = items.get('list', [])
if task_lists:
logger.info(f"{task_key} - {len(task_lists)}个任务加入队列")
for item in task_lists:
_redis_db.lpush(redis_key, json.dumps(item))
def run(task_config: dict = task_monitoring_config):
while True:
if not os.path.exists(PID_FILE):
logger.error('Task exiting')
break
add_task_enabled = task_config.get('add_task_enabled')
task_number = int(task_config.get('number'))
task_min_number = int(task_config.get('min_number'))
redis_key = task_config.get('task_key')
todo_task_count = _redis_db.llen(redis_key)
task_key = task_config.get('queue_key')
try:
task_keys = task_key.split(',')
logger.info(f"{redis_key}任务队列长度 : {todo_task_count}")
if todo_task_count <= task_min_number and add_task_enabled == 'True':
for key in task_keys:
add_task(key, redis_key, task_number)
except KeyboardInterrupt:
logger.error(f"任务终止")
except Exception as e:
logger.error(f"任务异常 : {redis_key} : {e}")
time.sleep(5)
async def cookie():
for site in Site.values():
time_key = cookie_config['cookie_time_key']
time_key = f"{time_key}:{site}"
_redis_db.delete(time_key)
while True:
if not os.path.exists(PID_FILE):
logger.error('Task exiting')
break
logger.info(f"获取cookie")
for site in Site.values():
try:
await task_manager.get_cookie(site)
except Exception:
logger.error("Cookie fetch error")
await asyncio.sleep(5)
if __name__ == '__main__':
tasks = []
redis_singleton = RedisSingleton(redis_url=redis_config['url'])
task_manager = Task(redis_singleton)
with open(PID_FILE, 'w') as f:
f.write(str(os.getpid()))
if task_monitoring_config.get('enabled', None) == 'True':
logger.info(f"监控任务添加")
t = threading.Thread(target=run, args=(task_monitoring_config,))
tasks.append(t)
else:
logger.info(f"监控任务未启动")
if task_product_detail_config.get('enabled', None) == 'True':
logger.info(f"发布任务添加")
t = threading.Thread(target=run, args=(task_product_detail_config,))
tasks.append(t)
else:
logger.info(f"发布任务未启动")
if task_search_config.get('enabled', None) == 'True':
logger.info(f"搜索任务添加")
t = threading.Thread(target=run, args=(task_search_config,))
tasks.append(t)
else:
logger.info(f"搜索任务未启动")
t = threading.Thread(target=asyncio.run, args=(cookie(),))
tasks.append(t)
for t in tasks:
t.start()
for t in tasks:
t.join()
......
@@ -7,7 +7,7 @@ from datetime import datetime, timedelta
from babel.dates import get_month_names, get_day_names
from curl_cffi import requests
-from curl_cffi.requests import AsyncSession
+from curl_cffi.requests import Session
from dateutil import parser
from dateutil.relativedelta import relativedelta
from loguru import logger
......
@@ -145,7 +145,7 @@ class Request:
:param url:
:return:
"""
-with AsyncSession(max_clients=50) as s:
+with Session() as s:
# clear request state
s.headers.clear()
s.cookies.clear()
......
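On the AsyncSession -> Session swap: curl_cffi's synchronous Session exposes the same request API without an event loop. A minimal sketch of the new pattern (target URL and impersonation profile assumed):

    from curl_cffi.requests import Session

    with Session() as s:
        s.headers.clear()   # clear request state, as in the hunk above
        s.cookies.clear()
        resp = s.get("https://example.com", impersonate="chrome")
        print(resp.status_code)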