Commit 1fb7b8ca by yexing
parents 767321a7 d260882e
__pycache__
.vscode
celerybeat-*
.pytest_cache
log
pid
image
.idea
config
tmp
import json
import os
import threading
import time

import redis
from curl_cffi import requests
from loguru import logger

from conf import config

COLL_DOMAIN = config['app']['coll_domain']
_redis_db = redis.Redis.from_url(config['redis']['url'], decode_responses=True)
task_monitoring_config = config['task-monitoring']
task_search_config = config['task-search']
task_product_detail_config = config['task-product-detail']

DEFAULT_HEADER = {
    "Content-Type": "application/json",
    "Accept": "application/json",
}
def task_callback(data):
    """
    Callback endpoint: POST a collected result back to the collection service.
    """
    try:
        url = f"{COLL_DOMAIN}/api/collection/task"
        response = requests.post(url, headers=DEFAULT_HEADER, data=json.dumps(data), verify=False)
        data = response.json()
        if data["code"] == 0:
            return True
        else:
            return False
    except Exception as e:
        logger.error(f"Callback error: {e}")
        return False
def batch_callback(callback_key: str):
    threads = []
    number = _redis_db.llen(callback_key)
    logger.info(f"Callback {callback_key}: {number} tasks pending")
    for _ in range(10):
        data = _redis_db.lpop(callback_key)
        if data:
            result = json.loads(data)
            threads.append(threading.Thread(target=task_callback, args=(result,)))
        else:
            break
    for t in threads:
        t.start()
    for t in threads:
        t.join()
def callback_task(callback_key: str):
    """
    Callback task: drain queued results from Redis and post them back.
    :param callback_key: Redis list key holding pending results
    :return:
    """
    task_number = 500
    result = []
    try:
        if callback_key == task_monitoring_config.get('item_key'):
            for _ in range(task_number):
                data = _redis_db.lpop(callback_key)
                if data:
                    result.append(json.loads(data))
                else:
                    break
            if result:
                logger.info(f"Callback {callback_key}: {len(result)} tasks")
                logger.info(f"Callback result: {json.dumps(result)}")
                callback = {
                    "data": {
                        "error_items": [],
                        "collection": result,
                    },
                    "type": 5,
                }
                task_callback(callback)
        elif callback_key == task_product_detail_config.get('item_key') or callback_key == task_search_config.get('item_key'):
            batch_callback(callback_key)
    except Exception as e:
        logger.error(f"Callback error: {e}")
def run(task_config: dict = task_monitoring_config):
    CALLBACK_PID_FILE = "./pid/callback.pid"
    while True:
        # Deleting the PID file acts as the shutdown signal for this loop.
        if not os.path.exists(CALLBACK_PID_FILE):
            logger.error('Task exiting')
            break
        try:
            callback_key = task_config.get('item_key')
            callback_task(callback_key)
            logger.info(f"Callback {callback_key} done")
        except Exception as e:
            logger.error(f"Task error: {e}")
        callback_sleep_time = int(task_config.get('callback_sleep_time', 5))
        time.sleep(callback_sleep_time)
if __name__ == '__main__':
    tasks = []
    PID_FILES = [
        "monitoring.pid",
        "product_detail.pid",
        "search.pid",
        "callback.pid",
    ]
    for PID_FILE in PID_FILES:
        with open(f"./pid/{PID_FILE}", 'w') as f:
            f.write(str(os.getpid()))
    if task_monitoring_config.get('enabled', None) == 'True':
        logger.info("Collection task callback started")
        t = threading.Thread(target=run, args=(task_monitoring_config,))
        tasks.append(t)
    if task_product_detail_config.get('enabled', None) == 'True':
        logger.info("Product publish callback started")
        t = threading.Thread(target=run, args=(task_product_detail_config,))
        tasks.append(t)
    if task_search_config.get('enabled', None) == 'True':
        logger.info("Search callback started")
        t = threading.Thread(target=run, args=(task_search_config,))
        tasks.append(t)
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
@@ -33,7 +33,7 @@ app.conf.task_queues = (
 )
 app.conf.task_routes = {
     "celery_tasks.detail_spider_task": {"queue": "detail"},
-    "celery_tasks.monitor_spider_task": {"queue": "detail"},
+    "celery_tasks.monitor_spider_task": {"queue": "monitor"},
     "celery_tasks.search_spider_task": {"queue": "search"},
     "celery_tasks.*_dial_task": {"queue": "dial"},
     "celery_tasks.*": {"queue": "detail"},
...
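Editor's note on this one-line routing fix: with Celery's task_routes, every monitor_spider_task message is now published to a dedicated monitor queue, and a queue that no worker subscribes to just accumulates messages. A hedged sketch of starting a worker that consumes it (the module name celery_tasks is taken from the routes above; the rest is standard Celery usage, not from this commit):

from celery_tasks import app  # assumes the module exposes the Celery app instance

if __name__ == "__main__":
    # Consume the new monitor queue alongside detail; equivalent to
    # `celery -A celery_tasks worker -Q monitor,detail --loglevel=INFO`.
    app.worker_main(argv=["worker", "-Q", "monitor,detail", "--loglevel=INFO"])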
 # import aioredis
-from redis.asyncio import from_url
+from redis import from_url
 class RedisSingleton:
...
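The RedisSingleton body is collapsed in the diff, so the following is a hypothetical reconstruction of what the synchronous version might look like after this import swap; the class internals here are assumptions, not from the commit:

import redis

class RedisSingleton:  # hypothetical sketch; the real body is elided above
    _client = None

    def __init__(self, redis_url: str):
        if RedisSingleton._client is None:
            # redis.from_url returns a blocking client, so call sites that
            # previously awaited coroutines now make plain method calls.
            RedisSingleton._client = redis.from_url(redis_url, decode_responses=True)
        self.client = RedisSingleton._client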
 from time import sleep
-import asyncio
 import html
 import json
 import random
@@ -868,9 +867,7 @@ class Goods:
         # process in batches of 8
         if len(collection_skus) > 0:
             for i in range(0, len(collection_skus), 8):
-                for response in asyncio.gather(
-                    *collection_skus[i : i + 8]
-                ):
+                for response in collection_skus[i : i + 8]:
                     try:
                         if response.get("brand"):
                             brand.append(response["brand"])
@@ -1020,8 +1017,6 @@ class Goods:
         tasks = [json.loads(task) for task in tasks]
         for task in tasks:
             queue.append(self.run(task))
-        if queue:
-            asyncio.gather(*queue)
         logger.info(f"Task duration: {time.time() - start_time}")
         if self.is_debug:
...
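Editor's note on the removed branches: asyncio.gather was being called without await, which does not yield results (it returns a future, and on newer Python versions raises when no event loop is running), so the synchronous rewrite also removes a latent bug. The same change is applied to Monitoring and Search below. A hedged sketch of the two working shapes, assuming self.run(task) was a coroutine before and is a plain call now:

import asyncio

# Pre-change intent: gather must be awaited inside a coroutine to yield results.
async def drain_async(coroutines):
    return await asyncio.gather(*coroutines)

# Post-change shape: self.run(task) executes eagerly, so the queue list already
# holds results and can be iterated directly, as the new code does.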
 from time import sleep
-import asyncio
 import json
 import re
 import time
@@ -420,7 +419,7 @@ class Monitoring:
         success_number = 0
         logger.info(f"Task count: {len(queue)}")
         if queue:
-            for items in asyncio.gather(*queue):
+            for items in queue:
                 success_number += 1
         logger.info(f"Task duration: {time.time() - start_time}, successes: {success_number}")
 from time import sleep
-import asyncio
 import functools
 import json
 import os
@@ -650,8 +649,6 @@ class Search:
         tasks = [json.loads(task) for task in tasks]
         for task in tasks:
             queue.append(self.run(task))
-        if queue:
-            asyncio.gather(*queue)
         logger.info(f"Task duration: {time.time() - start_time}")
         if self.is_debug:
...
-from time import sleep
 import asyncio
 import json
 import os
 import threading
 import time

 import redis
 from curl_cffi import requests
 from loguru import logger

 from conf import config
 from const import Site
 from db import RedisSingleton
 from tool import Task

 COLL_DOMAIN = config['app']['coll_domain']
 _redis_db = redis.Redis.from_url(config['redis']['url'], decode_responses=True)
 task_monitoring_config = config['task-monitoring']
 task_search_config = config['task-search']
 task_product_detail_config = config['task-product-detail']

 redis_config = config['redis']
 cookie_config = config['cookie']

 DEFAULT_HEADER = {
     "Content-Type": "application/json",
     "Accept": "application/json",
 }

 PID_FILE = './pid/task.pid'


 def get_task(task_key: str = task_monitoring_config['queue_key'], number: int = 1):
     """
     Fetch tasks from the collection service.
     :param task_key:
     :param number:
     :return:
     """
     try:
         url = f"{COLL_DOMAIN}/api/collection/task?number={number}&queue={task_key}"
         response = requests.get(url, headers=DEFAULT_HEADER, verify=False)
         response = response.json()
         if response["code"] == 0:
             return response["data"]
         else:
             return {}
     except Exception as e:
         logger.error(f"Error fetching tasks: {e}")
         return {}


 def add_task(task_key: str, redis_key: str, task_number: int):
     """
     Push fetched tasks onto the local Redis queue.
     :param task_key:
     :param redis_key:
     :param task_number:
     :return:
     """
     items = get_task(task_key, task_number)
     task_lists = items.get('list', [])
     if task_lists:
         logger.info(f"{task_key} - queued {len(task_lists)} tasks")
         for item in task_lists:
             _redis_db.lpush(redis_key, json.dumps(item))


 def run(task_config: dict = task_monitoring_config):
     while True:
         if not os.path.exists(PID_FILE):
             logger.error('Task exiting')
             break
         add_task_enabled = task_config.get('add_task_enabled')
         task_number = int(task_config.get('number'))
         task_min_number = int(task_config.get('min_number'))
         redis_key = task_config.get('task_key')
         todo_task_count = _redis_db.llen(redis_key)
         task_key = task_config.get('queue_key')
         try:
             task_keys = task_key.split(',')
             logger.info(f"{redis_key} queue length: {todo_task_count}")
             if todo_task_count <= task_min_number and add_task_enabled == 'True':
                 for key in task_keys:
                     add_task(key, redis_key, task_number)
         except KeyboardInterrupt:
             logger.error("Task interrupted")
         except Exception as e:
             logger.error(f"Task error: {redis_key}: {e}")
         time.sleep(5)


-def cookie():
+async def cookie():
     for site in Site.values():
         time_key = cookie_config['cookie_time_key']
         time_key = f"{time_key}:{site}"
         _redis_db.delete(time_key)

     while True:
         if not os.path.exists(PID_FILE):
             logger.error('Task exiting')
             break

         logger.info("Fetching cookies")
         for site in Site.values():
             try:
-                task_manager.get_cookie(site)
+                await task_manager.get_cookie(site)
             except Exception:
                 logger.error("Error fetching cookie")
-        sleep(5)
+        await asyncio.sleep(5)


 if __name__ == '__main__':
     tasks = []
     redis_singleton = RedisSingleton(redis_url=redis_config['url'])
     task_manager = Task(redis_singleton)

     with open(PID_FILE, 'w') as f:
         f.write(str(os.getpid()))

     if task_monitoring_config.get('enabled', None) == 'True':
         logger.info("Monitoring task producer started")
         t = threading.Thread(target=run, args=(task_monitoring_config,))
         tasks.append(t)
     else:
         logger.info("Monitoring task not enabled")

     if task_product_detail_config.get('enabled', None) == 'True':
         logger.info("Product detail task producer started")
         t = threading.Thread(target=run, args=(task_product_detail_config,))
         tasks.append(t)
     else:
         logger.info("Product detail task not enabled")

     if task_search_config.get('enabled', None) == 'True':
         logger.info("Search task producer started")
         t = threading.Thread(target=run, args=(task_search_config,))
         tasks.append(t)
     else:
         logger.info("Search task not enabled")

     t = threading.Thread(target=asyncio.run, args=(cookie(),))
     tasks.append(t)

     for t in tasks:
         t.start()

     for t in tasks:
         t.join()
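One subtlety in the wiring at the bottom of this file: cookie() is invoked in the main thread, which only creates the coroutine object; asyncio.run then executes it on a private event loop inside the spawned thread. A minimal standalone sketch of the same pattern (demo stands in for the async cookie-refresh loop):

import asyncio
import threading

async def demo():
    await asyncio.sleep(1)  # stands in for the cookie-refresh loop

# The coroutine object is created here, in the calling thread; asyncio.run()
# then drives it to completion on its own event loop in the worker thread.
t = threading.Thread(target=asyncio.run, args=(demo(),))
t.start()
t.join()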
@@ -7,7 +7,7 @@ from datetime import datetime, timedelta
 from babel.dates import get_month_names, get_day_names
 from curl_cffi import requests
-from curl_cffi.requests import AsyncSession
+from curl_cffi.requests import Session
 from dateutil import parser
 from dateutil.relativedelta import relativedelta
 from loguru import logger
@@ -145,7 +145,7 @@ class Request:
     :param url:
     :return:
     """
-    with AsyncSession(max_clients=50) as s:
+    with Session() as s:
         # clear any previously set request state
         s.headers.clear()
         s.cookies.clear()
...
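curl_cffi's Session is the blocking counterpart of AsyncSession, so the enclosing method can drop async/await entirely. A minimal usage sketch with the same cleanup the diff shows (URL and timeout are illustrative, not from the commit):

from curl_cffi.requests import Session

with Session() as s:
    s.headers.clear()
    s.cookies.clear()
    resp = s.get("https://example.com", timeout=30)  # hypothetical request
    print(resp.status_code)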