Commit a5474c0e by yexing

u

parent 1fb7b8ca
import json
import pytest
from celery import Celery
from celery.contrib.testing.worker import start_worker
from celery_tasks import assign_spider_task, start_spider_task
from const import Site
@pytest.fixture(scope="session")
def celery_worker():
app = Celery(
broker_url="memory://",
result_backend="cache+memory://",
task_always_eager=False,
)
with start_worker(
app,
shutdown_timeout=600,
perform_ping_check=False,
# loglevel="DEBUG",
) as worker:
yield worker
def test_assign_task(celery_worker):
asin = "B0F4K4NQM2"
sites = [Site.com]
# sites = Site.values()
for site in sites:
style_snap_token = {
Site.de: "hOCRqU2moPvF7OHbdVIA%2FcA5ovfB6r0ylJuUi%2FlOBZz3AAAAAGgkOxkAAAAB",
Site.it: "hDnqRGV%2FHNIvmt066sCeQ%2BCV3nSmUqKeaz8xxb%2FvRxVcAAAAAGgkSZgAAAAB",
Site.fr: "hOszNRVKeyUBhfw3wv5b0aA8oeLE2aeIwsz2PUdA298eAAAAAGgkYs0AAAAB",
Site.es: "hBrvHdT5rdf55ogsaVpOntP1EBtw%2FzMQ9EjExhl%2BfaEpAAAAAGgq2CgAAAAB",
Site.jp: "hLIxmKRriLNZAn1JzZapY3HvPn42ymFTSMsDTQg5HAfBAAAAAGgkcnQAAAAB",
}
style_snap_cookie = {
Site.de: "session-id=262-3255866-8161667; i18n-prefs=EUR; lc-acbde=de_DE; ubid-acbde=260-9608208-7094621; session-id-time=2082787201l; session-token=o/NZjNyInsXiiUgIboxXoXSRrYwl3rx9fAymAp4CCwf6WdcuRNAUQCEFg47k8VXTC3KSYw6fBMBgM6tXUIwbHjsorhAdhwCurKbypoC3lgoZMkf00U5I1kSmNqc3HLpjse4Ymn69X/0QdC85JsfK71ik4AxLCvPas/xOzW4eqxBAyX+pV3wApnfeZnIphELnlFDSr9mrQ4OK0sJaveHFbCQyPsCsOqEbgqokVxqZuo1PHbbNtk8kmY5+O4FGZZ1Rd29/WpI325eY46Khf3mKdQsUvmHJ9Qdjti0OHBQXqG/2Uas2LyCi0cKyybMJPn7LqzFS0ro6MZ3ufRXA2ZbdRlzCu/imptOn; rxc=AM5BPAyBERNPU5AAzsA; csm-hit=tb:s-60X7Y7EP7NMPAY5A4EB8|1747204893470&t:1747204894017&adb:adblk_no",
Site.it: "session-id=259-0276938-7980111; i18n-prefs=EUR; ubid-acbit=261-6163436-4078364; session-id-time=2082787201l; session-token=E/w0330BjSmKZBECYTfHbZ7yEy7I4AKBdWL/m8ZX+tL2cAR7kPoLrFNx8S5dx16pTkHdMKW09HLjWeQeNz7llVEHqn172fHwxDcS7pazkXJgNRSRLDsLKfhf37omIg1wiBFLLlF/Ze/7Y9UHiodrU+0Ii0BZB6fXmpuia1MfYZVIhc0tFhAo8YFtCB9tyGZo0HTV11uIxxlm7fWf8o750ibLu1P2XupDWM1XEWpTkioZfgwU/HDDA7VO3nteiy42ajAiVqg+Mp6qA4qnaem9nNx+7TQwOEfhY1QvoMVtrfiZydc/m5wAij9/Cy8vfkSSxrtRXiwMwMI2lkzTSyfIzd0KLLJ+VdrT; csm-hit=tb:s-WXX22RF20QVB7J6QY6GK|1747208604822&t:1747208605107&adb:adblk_no",
Site.fr: "session-id=259-3895263-6553629; session-id-time=2082787201l; i18n-prefs=EUR; ubid-acbfr=259-4408157-0120414; session-token=eUYnvkQ5sAKaXqveut1y8oduGNiOD2EjeL69ox8B8WMis+WxwXvxOxl6YG3/Vbmxua74nu6Po4SqX7jb2zWEN0MafCL2vNWuqH8EtzWGxf7b956vhl+Zke6znbzQIGc0TJvEHQxYH4qhU2nhjMewvfEKaNpLin+BlSteJ/29YPTAmJoDmn1SBQFmLS1F+lwvJslCVl2tE5wPQtlRJRoy1J3Y3PyilY/QeSeOoQm0kspCKVwiIZ0Rker6IoiJ994xQp9mBMPgf7Kaxq+3/4WcJH0gUgDMTEJ422JCvVyUsPlIqoB87OnL9svvThxe2rw3+G3Pl7RGcfsrgTNxMLajeryo0c4gDsCA; csm-hit=tb:s-MVXSWSB18MKR7M8ZGZPG|1747214963697&t:1747214964546&adb:adblk_no",
Site.es: "session-id=257-1877650-6628461; session-id-time=2082787201l; i18n-prefs=EUR; ubid-acbes=257-0235449-8488852; session-token=4VefymsDENsTHhmV3VThQL9xtbG25qqF0ue7hTQAuej9Hwcb8gteCVV4dDQ8BzXeSdcE1qxvGNxI5sjDddBC4GwnfXxNjPW4KKREHTXl4AcOnIBB0NsAP8ysdlrGmnF2i6tZaPQQmujRiQkE6W6541cqvo2awOmApuIAzLb/yTYBIwJdC7DAbRSAUjBpCb/Par21qrO4i6Vz/yzK9jvAZdmWNIHE6aFN6P0gNMnz8ubsQwqHzEhCqDZDzd2/82dB7ZkztnblwWdLTf11HIWSVcwY0Yrt/3Dva64m73AGG9ehiXhZVZAo9nBai5dRi1Bw21HZm9LicAtLlbc51P5nskJuXBVBUqHT; csm-hit=tb:s-6TH1ZWPZ3MSWA7C2FNME|1747638315404&t:1747638315630&adb:adblk_no",
Site.jp: "session-id=356-1153481-7631550; session-id-time=2082787201l; i18n-prefs=JPY; lc-acbjp=zh_CN; ubid-acbjp=356-3715403-6686010; session-token=\"UPpZlDoDBS1AIP3L68R2R+yjb0KoWvefyp2stmNqnQdqPJSW03v3MGhH9DhlM1a14QVzBoJY6/sys0OcBcFD+edCQDl9W1FPUeMbExBr4xxrsaPHcUWQ/pciyQ6+tXq1A4mn42SoZgYU1qf9Sz3P2+gz5NPfj8R82NFt5oL3s61DpWWDeLLXLdWIzHHyAq6Szi19aVK9GnhDuMu67f/vvi/ecgaeRgFumFtFmOCjAD3zTkumKFvJnKyEEHUzS3lye1TOx/bB6mEtPVHxlo3R4XIgeeDQeaAkjLot9ksW9GzRhe9xtdUHTPHRI9P8oKy2PMWyrqu7yH5AhIj0uYC/XUtYoom6Z85Qr5J8RxoeQnY=\"; csm-hit=tb:s-VE4YKH0HM7B9R7F0G6GC|1747219065639&t:1747219065883&adb:adblk_no",
}
url1 = f"https://www.amazon.{site}/dp/{asin}?th=1"
key1 = "task-product-detail"
url2 = f"https://www.amazon.{site}/dp/{asin}"
key2 = "task-monitoring"
key3 = "task-search"
task1 = json.dumps(
{
"url": f"https://www.amazon.{site}/s?k=Adhesive+Shower+Caddy&page=2&xpid=oznV7KayXAf7P&qid=1747215325&ref=sr_pg_2",
# "url": f"https://www.amazon.{site}/s?k=Adhesive+Shower+Caddy&i=garden&page=2&crid=1MR2IOB4BMAXF&qid=1739174661&sprefix=adhesive+shower+caddy%2Cgarden%2C391&xpid=--1JOuYBktM8-&ref=sr_pg_2",
"admin_users_id": 1,
"task_id": 37,
"collection_number": 0,
"callback_type": 8,
"app_name": "admin",
"platform_type": 6,
}
)
task2 = json.dumps(
{
"url": f"https://www.amazon.{site}/+style_snap_upload", # 虚构路径
"admin_users_id": 1,
"task_id": 37,
"collection_number": 0,
"callback_type": 8,
"app_name": "admin",
"platform_type": 6,
"style_snap_token": style_snap_token.get(site),
"style_snap_cookie": style_snap_cookie.get(site),
}
)
all_tasks = [
['{"id": 252, "admin_users_id": 1, "log_id": 11, "platform_type": 1, "callback_type": 9, "app_name": "admin", "max_sku_number": 35, "project": "temu", "url": "%s"}'%url1],
['{"url":"%s","item_id":"27","admin_users_id":1,"callback_type":5,"collection_type":1,"app_name":"admin","cache_time":60}'%url2],
[task1], [task2]
]
keys = [key1, key2, key3, key3]
items = list(zip(keys, all_tasks))
items = [items[1]]
for key, tasks in items:
# tasks = None
assign_spider_task.apply_async(args=(key, tasks,)).get()
def test_start_task(celery_worker):
# start_dial_task.delay().get()
start_spider_task.delay().get()
if __name__ == "__main__":
# pytest.main(["./"])
# pytest.main(["test/test_celery.py::test_start_task"])
pytest.main(["test/test_celery.py::test_assign_task"])
import json
import pytest
import redis
from conf import config
from const import Site, Spider
DB_REDIS = redis.Redis.from_url(config["redis"]["url"], decode_responses=True)
def test_spider_task():
asin = "B0CMTQFXB8"
for _ in range(1):
DB_REDIS.lpush(
config[Spider.detail]["task_key"],
# json.dumps({"url": f"https://www.amazon.{Site.de}/dp/{asin}"}),
json.dumps({'id': 2061071844, 'admin_users_id': 0, 'log_id': 8252798, 'platform_type': 6, 'callback_type': 9, 'app_name': 'admin', 'max_sku_number': 35, 'url': 'https://www.amazon.de/dp/B01MYBVW76', 'project': 'tiktok'})
)
if __name__ == "__main__":
pytest.main(["test/test_task.py::test_spider_task"])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment