Commit f5374576 by baiquan

refactor(service): 重构登录功能以提高稳定性和可维护性

- 新增 is_logged_in 函数用于检查登录状态
- 重新组织代码结构,提高可读性和可维护性
- 增加异常处理和日志记录,提高稳定性
parent 29f17d25
import re import re
import time import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from DrissionPage import Chromium from DrissionPage import Chromium
from DrissionPage._configs.chromium_options import ChromiumOptions from DrissionPage._configs.chromium_options import ChromiumOptions
from DrissionPage._functions.by import By from DrissionPage._functions.by import By
...@@ -9,121 +8,37 @@ from DrissionPage.errors import * ...@@ -9,121 +8,37 @@ from DrissionPage.errors import *
from loguru import logger from loguru import logger
from dao.db import find_account_by_environment_id from dao.db import find_account_by_environment_id
from service.hub_ import openBrowser, closeBrowser from service.hub_ import closeBrowser, openBrowser
from utils.dy_verify import get_distance_by_ddddocr from utils.dy_verify import get_distance_by_ddddocr
from utils.errors import AppError from utils.errors import AppError
def click_button(tab, loc): def click_button(tab, loc, retry_limit=5, delay=2):
tab.wait(1) """带重试机制的按钮点击函数"""
retry = 0 for attempt in range(retry_limit):
while True:
try:
if tab.ele(loc).click():
break
except AppError as e:
logger.error(f"{loc}点击按钮失败-->{e}")
pass
tab.wait(2)
if retry > 3:
raise Exception(f"点击按钮失败-->{loc}")
async def page_login(browser_id, restart=False):
open_count = 3
while True:
open_count -= 1
try: try:
open_result = await openBrowser(browser_id, timeout=30) tab.wait(1)
data = open_result.get('data', {}) if element := tab.ele(loc):
if not data: element.click()
raise AppError("启动浏览器失败") return True
port = data.get('debuggingPort', '')
co = ChromiumOptions()
co.set_local_port(port)
co.no_imgs(True)
co.no_js(True)
chromium = Chromium(co)
tab = chromium.latest_tab
tab.set.load_mode.eager()
tab.get('https://fxg.jinritemai.com/login/common')
if chromium.states.is_existed and "https://fxg.jinritemai.com" in tab.url:
break
else:
tab.close()
raise Exception(f'非Hub浏览器-->重试启动浏览器:{open_count}')
except Exception as e: except Exception as e:
logger.error(f"{browser_id}启动浏览器错误-->{e}") logger.warning(f"点击 {loc} 失败 ({attempt + 1}/{retry_limit}): {e}")
await closeBrowser(browser_id) time.sleep(delay)
time.sleep(3) logger.error(f"永久点击失败: {loc}")
if open_count == 0: raise ElementNotFoundError(f"无法点击元素 {loc}")
raise AppError(f"{browser_id}-->启动浏览器失败")
tab.listen.start(["account_login/v2/", "captcha/verify", "/aff/check_login", "/loginv1/callback"])
login_res = listen_check_login(tab)
logger.info(f"{browser_id}当前页面是否登录-->{login_res}")
PHPSESSID = ""
if not login_res:
retry = 0
while True:
if retry == 4:
tab.close()
raise AppError(f"未找到账号信息")
if retry >= 3:
tab.close()
raise AppError(f"重试多次,登录失败")
retry += 1
try:
em = "邮箱登录"
doc_loaded = tab.wait.doc_loaded(timeout=10)
eles_loaded = tab.wait.eles_loaded(em)
if tab.title == "首页":
logger.info(f"{browser_id}-->当前页面为首页,已登录")
break
if not doc_loaded or not eles_loaded:
raise AppError(f"页面或元素未加载完成({retry})")
click_button(tab, em) # 点击邮箱登录按钮
email_input = tab.ele("@title=请输入邮箱")
password_input = tab.ele("@title=密码")
if email_input.value == "" or password_input.value == "":
info = find_account_by_environment_id(browser_id)
if info:
account = info.get('account')
password = info.get('password')
else:
retry = 4
continue
email_input.input(vals= account, clear=True)
password_input.input(vals= password, clear=True)
click_button(tab, ".auxo-checkbox") # 点击勾选框
click_button(tab, ".account-center-submit") # 点击登录按钮
listen_login(tab)
PHPSESSID = wait_login_callback(tab)
break
except ElementNotFoundError as e:
if tab.title == "首页":
logger.info(f"{browser_id}-->当前页面为首页,已登录")
break
else:
logger.error(f"{browser_id}-->{e}")
except Exception as e:
logger.error(f"{browser_id}-->{e}")
if not restart:
await closeBrowser(browser_id)
time.sleep(3)
await page_login(browser_id, restart=True)
for _ in range(3):
if tab.title == "首页":
if not PHPSESSID:
cookies = tab.cookies().as_dict()
PHPSESSID = cookies.get('PHPSESSID', '')
set_token(tab)
return PHPSESSID
tab.wait(3)
if not restart:
await page_login(browser_id, restart=True)
raise AppError("登录失败-->未获取到token")
def set_token(tab): def is_logged_in(tab):
"""检查是否已登录的多种方式"""
if tab.title == "首页":
logger.info("已处于登录状态")
return True
cookies = tab.cookies().as_dict()
return bool(cookies.get('PHPSESSID'))
def set_auth_token(tab):
"""获取并设置认证token"""
token_pattern = r'"token":\s*"([a-f0-9]{32})"' token_pattern = r'"token":\s*"([a-f0-9]{32})"'
token_match = re.search(token_pattern, tab.html) token_match = re.search(token_pattern, tab.html)
if token_match: if token_match:
...@@ -139,53 +54,12 @@ def set_token(tab): ...@@ -139,53 +54,12 @@ def set_token(tab):
'HttpOnly' # 禁止 JS 访问 'HttpOnly' # 禁止 JS 访问
) )
tab.set.cookies(cookies) tab.set.cookies(cookies)
return True
else: else:
raise AppError("未找到 token") return False
def wait_login_callback(tab): def handle_slider_captcha(tab):
for packet in tab.listen.steps(timeout=10):
if '/loginv1/callback' in packet.url:
headers = packet.response.headers
set_cookie = headers['set-cookie']
if "PHPSESSID" in set_cookie:
logger.info(f"响应的cookie --> {set_cookie}")
PHPSESSID = re.search(r'PHPSESSID=(.*?);', set_cookie).group(1)
return PHPSESSID
raise AppError("登录失败 --> 未获取到回调的cookie")
def listen_login(tab):
for packet in tab.listen.steps(timeout=15):
if 'account_login/v2/' in packet.url:
login_res = packet.response.body
logger.info(f'获取登录数据:{login_res}')
if not login_res:
continue
if login_res['description'] == '滑动滑块进行验证':
verify_captcha(tab)
return
elif login_res['description'] == '':
logger.info("登录成功")
return
else:
raise AppError(f"登录失败 {login_res['description']}")
raise AppError("登录失败-->未获取到登录数据")
def listen_check_login(tab):
check_result = False
for packet in tab.listen.steps(timeout=5):
if '/aff/check_login' in packet.url and packet.response.body:
login_res = packet.response.body
logger.info(f'获取确认登录数据:{login_res}')
if (login_res['deputy_has_login'] and login_res['subject_has_login'] and login_res['error_code'] == 0
and login_res.get('sec_subject_uid') and login_res.get('sec_user_id')):
check_result = True
if check_result:
if tab.title != "首页":
return False
return check_result
def verify_captcha(tab):
captcha_verify_image = (By.XPATH, '//*[@id="captcha_verify_image"]') captcha_verify_image = (By.XPATH, '//*[@id="captcha_verify_image"]')
verify_img_slide = (By.XPATH, '//*[@id="captcha-verify_img_slide"]') verify_img_slide = (By.XPATH, '//*[@id="captcha-verify_img_slide"]')
captcha_slider_btn = (By.XPATH, '//*[@id="vc_captcha_box"]/div/div/div[4]/div/div[2]/div[2]/div') captcha_slider_btn = (By.XPATH, '//*[@id="vc_captcha_box"]/div/div/div[4]/div/div[2]/div[2]/div')
...@@ -198,7 +72,7 @@ def verify_captcha(tab): ...@@ -198,7 +72,7 @@ def verify_captcha(tab):
if verify_count > 5: if verify_count > 5:
raise AppError("滑块验证失败") raise AppError("滑块验证失败")
iframe = None iframe = None
for _ in range(10): for _ in range(3):
iframe = tab.get_frame('t:iframe') iframe = tab.get_frame('t:iframe')
if iframe: if iframe:
break break
...@@ -221,5 +95,102 @@ def verify_captcha(tab): ...@@ -221,5 +95,102 @@ def verify_captcha(tab):
iframe.actions.hold(_captcha_slider_btn) iframe.actions.hold(_captcha_slider_btn)
iframe.actions.right(x) iframe.actions.right(x)
iframe.actions.release() iframe.actions.release()
listen_login(tab) return True
return
\ No newline at end of file
def perform_login(tab, account, password):
"""执行登录流程的核心函数"""
# 切换到邮箱登录
if not click_button(tab, "邮箱登录"):
return False
# 填充凭证
email_input = tab.ele('@title=请输入邮箱', timeout=5)
password_input = tab.ele('@title=密码', timeout=5)
email_input.input(account, clear=True)
password_input.input(password, clear=True)
# 同意协议
click_button(tab, ".auxo-checkbox")
# 提交登录
click_button(tab, ".account-center-submit")
# 监听登录结果
for packet in tab.listen.steps(timeout=15):
if 'account_login/v2/' in packet.url:
login_res = packet.response.body
if login_res['description'] == '滑动滑块进行验证':
if not handle_slider_captcha(tab):
raise AppError("滑块验证失败")
logger.info(f"-------- login_verify: 滑块处理成功!--------")
return True
elif login_res['description'] == '':
return True # 登录成功
return False
async def page_login(browser_id, max_retries=3):
"""优化的登录主流程"""
# 浏览器初始化
for attempt in range(max_retries):
try:
open_result = await openBrowser(browser_id, timeout=30)
if not open_result.get('data'):
continue
port = open_result['data']['debuggingPort']
co = ChromiumOptions().set_local_port(port)
browser = Chromium(co)
tab = browser.latest_tab
tab.set.cookies.clear()
tab.set.load_mode.eager()
tab.listen.start(["account_login/v2/", "captcha/verify", "/aff/check_login", "/loginv1/callback"])
# 访问登录页
tab.get('https://fxg.jinritemai.com/login/common')
if "login" in tab.url:
break
except Exception as e:
logger.warning(f"浏览器初始化失败 [{attempt + 1}/{max_retries}]: {e}")
await closeBrowser(browser_id)
time.sleep(3)
else:
raise AppError("无法启动浏览器")
# 登录状态检查
if is_logged_in(tab):
# 验证登录结果
for _ in range(20): # 最多等待30秒
if set_auth_token(tab):
cookies = tab.cookies().as_dict()
PHPSESSID = cookies.get('PHPSESSID', '')
return PHPSESSID
time.sleep(1)
raise AppError("未找到 token")
# 获取登录凭证
account_info = find_account_by_environment_id(browser_id)
if not account_info:
raise AppError("未找到账号凭证")
# 执行登录
for attempt in range(max_retries):
try:
logger.info(f"尝试登录 [{attempt + 1}/{max_retries}]")
if perform_login(tab, account_info['account'], account_info['password']):
break
except Exception as e:
logger.warning(f"登录尝试失败: {e}")
else:
raise AppError("多次登录失败")
# 验证登录结果
for _ in range(20): # 最多等待30秒
logger.debug(f"等待登录结果...")
if is_logged_in(tab) and set_auth_token(tab):
cookies = tab.cookies().as_dict()
PHPSESSID = cookies.get('PHPSESSID', '')
return PHPSESSID
time.sleep(1)
raise AppError("登录状态验证失败")
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment