Commit 96e0bc22 by lei

1

parent d0fac8c2
......@@ -89,10 +89,6 @@ class ToWork:
async def FindElement(self, custom_el, path, el_type=''):
try:
el = (await custom_el.xpath(path))
# logger.info(len(el))
# for i in el:
# title = await page.evaluate('(element) => element.textContent', i)
# logger.info(title)
if el_type == 'last':
return el[len(el) - 1]
else:
......@@ -130,83 +126,35 @@ class ToWork:
await asyncio.sleep(1)
return status
# 获取上传策略
async def get_policy(self, url):
for i in range(5):
try:
res = req.get(url)
data = json.loads(res.text)['data']
return data
except Exception as e:
logger.error('上传策略请求错误')
logger.error(e)
async def download_img(self, url, cur_uuid):
"""
下载图片
:param url:
:return:
"""
get_file = req.get(url=url, headers=self.header, stream=True, allow_redirects=False, timeout=10)
file_name = cur_uuid
if not os.path.exists('./download'):
os.makedirs('./download')
path = f'./download/{file_name}'
with open(path, "wb") as Pypdf:
for chunk in get_file.iter_content(chunk_size=1024): # 1024 bytes
if chunk:
Pypdf.write(chunk)
return path
async def image_to_byte(self, imagepath):
'''
图片转二进制
'''
with open(imagepath, "rb") as f:
byte_data = f.read()
return byte_data
async def al_upload(self, policy, path, cur_uuid):
# 阿里云上传
url = 'https://' + policy['host']
file = await self.image_to_byte(path)
logger.info('图片已经转为二进制')
params = {
'key': policy['dir'] + cur_uuid,
'policy': policy['policy'],
'OSSAccessKeyId': policy['accessid'],
'success_action_status': '200',
'callback': policy['callback'],
'signature': policy['signature'],
'file': file,
}
header = {
'Content-Type': 'multipart/form-data;charset=utf-8'
}
logger.info('阿里云上传')
res = req.post(url, data=params, headers=header)
logger.info(res)
# 上传后的链接为
upload_img = policy['domain'] + policy['dir'] + cur_uuid
logger.info(upload_img)
async def upload_img(self, task, href):
try:
cur_uuid = uuid.uuid4()
cur_uuid = str(cur_uuid) + '.png'
# 先获取上传策略
policy = await self.get_policy(task['policy'])
# 下载discord的图片
# path = await self.download_img(href, cur_uuid)
path = './download/0f24e1f6-4ba2-42d7-bdf4-852653f30fe4.png'
# 开始上传
await self.al_upload(policy, path, cur_uuid)
except Exception as e:
logger.error(e)
# 登录
async def login(self, page):
user_name = '2897821407@qq.com'
password = 'lhj157839477'
# 用户名
user_name_path = '//input[@name="email"]'
is_offline = await self.FindElement(page, user_name_path)
if is_offline:
#
logger.info('账号掉线,重新登录')
name_el = await page.waitForXPath(user_name_path)
await name_el.type(user_name, {"delay": 1})
# 密码
password_path = '//*[@name="password"]'
pwd_el = await self.FindElement(page, password_path)
if pwd_el:
await pwd_el.type(password, {"delay": 1})
# 判断输入的内容
user_name_value = await page.evaluate('(name_el) => name_el.value', name_el)
password_value = await page.evaluate('(pwd_el) => pwd_el.value', pwd_el)
if user_name_value == user_name and password_value == password:
# 输入的值正确--点击登录
login_path = '//*[@id="app-mount"]/div[2]/div[1]/div[1]/div/div/div/div/form/div[2]/div/div[1]/div[2]/button[2]'
status = await self.ElementClick(page, login_path)
if status:
return True
return False
async def img_callback(self, task, href, message=''):
# 先要上传图片
# cur_href = await self.upload_img(task, href)
url = task['callback']
try:
params = {
......@@ -351,12 +299,6 @@ class ToWork:
logger.info(change_prompt)
# 一个一个输入
await self.driver_paste(page, change_prompt)
# 往元素里插入文本
# element
await page.evaluate('''(element) => {
element.innerHTML = '111'
}''', '')
# 回车
await page.keyboard.press('Enter')
await asyncio.sleep(2)
......@@ -445,32 +387,39 @@ class ToWork:
logger.error(e)
logger.info('浏览器打开失败,正在重试')
# 等待登录成功
async def wait_login_success(self, page):
logger.info('开始判断登录状态')
while True:
# 登录成功元素
page_load = '//div[contains(@aria-label, "测试服务器")]'
login_success_el = await self.FindElement(page, page_load)
if login_success_el:
return True
#
return False
async def specified_server(self, page):
try:
logger.info('等待登录成功')
# 判断页面是否加载完成
page_load = '//*[@data-list-item-id="guildsnav___home"]'
element = await self.CheckElement(page, page_load, 30)
if not element:
logger.error('登录失败')
# 去登陆
# 等待首屏加载完成,直到登录成功
status = await self.wait_login_success(page)
if status:
# 登录成功
logger.info('登录成功')
# 等待服务器元素出现
server_el = '//div[contains(@aria-label, "测试服务器")]'
element = await self.CheckElement(page, server_el, 20)
if element:
# 点击指定服务器
status = await self.ElementClick(page, server_el, '测试服务器')
if status:
# 等待聊天窗口出现
input_el = '//*[@role="textbox"]'
element = await self.CheckElement(page, input_el)
if element:
logger.info('聊天窗口存在')
return True
return False
# 登录成功
logger.info('登录成功')
# 等待服务器元素出现
server_el = '//div[contains(@aria-label, "测试服务器")]'
element = await self.CheckElement(page, server_el, 20)
if element:
# 点击指定服务器
status = await self.ElementClick(page, server_el, '测试服务器')
if status:
# 等待聊天窗口出现
input_el = '//*[@role="textbox"]'
element = await self.CheckElement(page, input_el)
if element:
logger.info('聊天窗口存在')
return True
return False
except Exception as e:
logger.error(e)
return False
......@@ -515,12 +464,6 @@ def process_start(browser_item, lock):
keyword = ToWork(browser_item, lock)
# 生产
loop.run_until_complete(keyword.on_start())
# 测试
# task = {
# 'policy': 'http://test.phpgpt.com/api/users/config/policy2'
# }
# href = 'https://cdn.discordapp.com/attachments/1090583268467945475/1093734564188409946/suifeng_a_very_cute_boys_16_years_old_fly_a_kitein_beautifulstr_cd2d65d6-ea40-432b-9941-c796504b6082.png'
# loop.run_until_complete(keyword.upload_img(task, href))
except Exception as e:
logger.error(e)
......
......@@ -16,13 +16,45 @@ async def FindElement(custom_el, path, el_type=''):
return False
# 元素点击-xpath点击
async def ElementClick(page, path, name=''):
status = False
for i in range(1, 10):
try:
ele = (await page.xpath(path))[0]
await ele.click()
logger.info(f'{name}-元素存在,开始点击')
status = True
break
except Exception as e:
logger.info(e)
logger.info(f'{name}-元素不存在')
status = False
await asyncio.sleep(1)
return status
# 登录
async def login(page):
time.sleep(30)
user_name = '2897821407@qq.com'
password = 'lhj157839477'
print('开始登录')
# 用户名
user_name_path = '//input[@name="email"]'
element = await FindElement(page, user_name_path)
logger.info(element)
name_el = await page.waitForXPath(user_name_path)
await name_el.type(user_name, {"delay": 1})
# 密码
password_path = '//*[@name="password"]'
pwd_el = await FindElement(page, password_path)
if pwd_el:
await pwd_el.type(password, {"delay": 1})
# 判断输入的内容
user_name_value = await page.evaluate('(name_el) => name_el.value', name_el)
password_value = await page.evaluate('(pwd_el) => pwd_el.value', pwd_el)
if user_name_value == user_name and password_value == password:
# 输入的值正确--点击登录
login_path = '//*[@id="app-mount"]/div[2]/div[1]/div[1]/div/div/div/div/form/div[2]/div/div[1]/div[2]/button[2]'
status = await ElementClick(page, login_path)
async def main():
......@@ -43,9 +75,19 @@ async def main():
# 插入文本
input_path = '//*[@id="app-mount"]/div[2]/div[1]/div[1]/div/div[2]/div/div/div/div/div[2]/div/main/form/div/div[2]/div/div[2]/div/div/div/span[2]/span[2]/span/span/span'
element = await FindElement(page, input_path)
# var evt = new InputEvent('input', {
# inputType: 'insertText',
# data: st,
# dataTransfer: null,
# isComposing: false
# });
# dom.dispatchEvent(evt);
if element:
await page.evaluate('''(element,test_title) => {
element.innerHTML = test_title
var dom = document.querySelector('body')
console.log(dom)
dom.value = test_title
}''', element, test_title)
logger.info('成功')
# 回车
......
def image_to_byte(self, imagepath):
'''
图片转二进制
'''
with open(imagepath, "rb") as f:
byte_data = f.read()
return byte_data
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment