Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
P
python_open_ai
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
haojie
python_open_ai
Commits
770cd2e3
Commit
770cd2e3
authored
Apr 14, 2023
by
wangfa
Browse files
Options
Browse Files
Download
Plain Diff
Merge remote-tracking branch 'origin/master'
parents
25c17d53
96e0bc22
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
109 additions
and
117 deletions
+109
-117
ai_pyppeteer.py
+57
-114
test.py
+45
-3
tool.py
+7
-0
No files found.
ai_pyppeteer.py
View file @
770cd2e3
...
@@ -89,10 +89,6 @@ class ToWork:
...
@@ -89,10 +89,6 @@ class ToWork:
async
def
FindElement
(
self
,
custom_el
,
path
,
el_type
=
''
):
async
def
FindElement
(
self
,
custom_el
,
path
,
el_type
=
''
):
try
:
try
:
el
=
(
await
custom_el
.
xpath
(
path
))
el
=
(
await
custom_el
.
xpath
(
path
))
# logger.info(len(el))
# for i in el:
# title = await page.evaluate('(element) => element.textContent', i)
# logger.info(title)
if
el_type
==
'last'
:
if
el_type
==
'last'
:
return
el
[
len
(
el
)
-
1
]
return
el
[
len
(
el
)
-
1
]
else
:
else
:
...
@@ -130,83 +126,35 @@ class ToWork:
...
@@ -130,83 +126,35 @@ class ToWork:
await
asyncio
.
sleep
(
1
)
await
asyncio
.
sleep
(
1
)
return
status
return
status
# 获取上传策略
# 登录
async
def
get_policy
(
self
,
url
):
async
def
login
(
self
,
page
):
for
i
in
range
(
5
):
user_name
=
'2897821407@qq.com'
try
:
password
=
'lhj157839477'
res
=
req
.
get
(
url
)
# 用户名
data
=
json
.
loads
(
res
.
text
)[
'data'
]
user_name_path
=
'//input[@name="email"]'
return
data
is_offline
=
await
self
.
FindElement
(
page
,
user_name_path
)
except
Exception
as
e
:
if
is_offline
:
logger
.
error
(
'上传策略请求错误'
)
#
logger
.
error
(
e
)
logger
.
info
(
'账号掉线,重新登录'
)
name_el
=
await
page
.
waitForXPath
(
user_name_path
)
async
def
download_img
(
self
,
url
,
cur_uuid
):
await
name_el
.
type
(
user_name
,
{
"delay"
:
1
})
"""
# 密码
下载图片
password_path
=
'//*[@name="password"]'
:param url:
pwd_el
=
await
self
.
FindElement
(
page
,
password_path
)
:return:
if
pwd_el
:
"""
await
pwd_el
.
type
(
password
,
{
"delay"
:
1
})
get_file
=
req
.
get
(
url
=
url
,
headers
=
self
.
header
,
stream
=
True
,
allow_redirects
=
False
,
timeout
=
10
)
# 判断输入的内容
file_name
=
cur_uuid
user_name_value
=
await
page
.
evaluate
(
'(name_el) => name_el.value'
,
name_el
)
if
not
os
.
path
.
exists
(
'./download'
):
password_value
=
await
page
.
evaluate
(
'(pwd_el) => pwd_el.value'
,
pwd_el
)
os
.
makedirs
(
'./download'
)
if
user_name_value
==
user_name
and
password_value
==
password
:
path
=
f
'./download/{file_name}'
# 输入的值正确--点击登录
with
open
(
path
,
"wb"
)
as
Pypdf
:
login_path
=
'//*[@id="app-mount"]/div[2]/div[1]/div[1]/div/div/div/div/form/div[2]/div/div[1]/div[2]/button[2]'
for
chunk
in
get_file
.
iter_content
(
chunk_size
=
1024
):
# 1024 bytes
status
=
await
self
.
ElementClick
(
page
,
login_path
)
if
chunk
:
if
status
:
Pypdf
.
write
(
chunk
)
return
True
return
path
return
False
async
def
image_to_byte
(
self
,
imagepath
):
'''
图片转二进制
'''
with
open
(
imagepath
,
"rb"
)
as
f
:
byte_data
=
f
.
read
()
return
byte_data
async
def
al_upload
(
self
,
policy
,
path
,
cur_uuid
):
# 阿里云上传
url
=
'https://'
+
policy
[
'host'
]
file
=
await
self
.
image_to_byte
(
path
)
logger
.
info
(
'图片已经转为二进制'
)
params
=
{
'key'
:
policy
[
'dir'
]
+
cur_uuid
,
'policy'
:
policy
[
'policy'
],
'OSSAccessKeyId'
:
policy
[
'accessid'
],
'success_action_status'
:
'200'
,
'callback'
:
policy
[
'callback'
],
'signature'
:
policy
[
'signature'
],
'file'
:
file
,
}
header
=
{
'Content-Type'
:
'multipart/form-data;charset=utf-8'
}
logger
.
info
(
'阿里云上传'
)
res
=
req
.
post
(
url
,
data
=
params
,
headers
=
header
)
logger
.
info
(
res
)
# 上传后的链接为
upload_img
=
policy
[
'domain'
]
+
policy
[
'dir'
]
+
cur_uuid
logger
.
info
(
upload_img
)
async
def
upload_img
(
self
,
task
,
href
):
try
:
cur_uuid
=
uuid
.
uuid4
()
cur_uuid
=
str
(
cur_uuid
)
+
'.png'
# 先获取上传策略
policy
=
await
self
.
get_policy
(
task
[
'policy'
])
# 下载discord的图片
# path = await self.download_img(href, cur_uuid)
path
=
'./download/0f24e1f6-4ba2-42d7-bdf4-852653f30fe4.png'
# 开始上传
await
self
.
al_upload
(
policy
,
path
,
cur_uuid
)
except
Exception
as
e
:
logger
.
error
(
e
)
async
def
img_callback
(
self
,
task
,
href
,
message
=
''
):
async
def
img_callback
(
self
,
task
,
href
,
message
=
''
):
# 先要上传图片
# cur_href = await self.upload_img(task, href)
url
=
task
[
'callback'
]
url
=
task
[
'callback'
]
try
:
try
:
params
=
{
params
=
{
...
@@ -351,12 +299,6 @@ class ToWork:
...
@@ -351,12 +299,6 @@ class ToWork:
logger
.
info
(
change_prompt
)
logger
.
info
(
change_prompt
)
# 一个一个输入
# 一个一个输入
await
self
.
driver_paste
(
page
,
change_prompt
)
await
self
.
driver_paste
(
page
,
change_prompt
)
# 往元素里插入文本
# element
await
page
.
evaluate
(
'''(element) => {
element.innerHTML = '111'
}'''
,
''
)
# 回车
# 回车
await
page
.
keyboard
.
press
(
'Enter'
)
await
page
.
keyboard
.
press
(
'Enter'
)
await
asyncio
.
sleep
(
2
)
await
asyncio
.
sleep
(
2
)
...
@@ -445,32 +387,39 @@ class ToWork:
...
@@ -445,32 +387,39 @@ class ToWork:
logger
.
error
(
e
)
logger
.
error
(
e
)
logger
.
info
(
'浏览器打开失败,正在重试'
)
logger
.
info
(
'浏览器打开失败,正在重试'
)
# 等待登录成功
async
def
wait_login_success
(
self
,
page
):
logger
.
info
(
'开始判断登录状态'
)
while
True
:
# 登录成功元素
page_load
=
'//div[contains(@aria-label, "测试服务器")]'
login_success_el
=
await
self
.
FindElement
(
page
,
page_load
)
if
login_success_el
:
return
True
#
return
False
async
def
specified_server
(
self
,
page
):
async
def
specified_server
(
self
,
page
):
try
:
try
:
logger
.
info
(
'等待登录成功'
)
# 等待首屏加载完成,直到登录成功
# 判断页面是否加载完成
status
=
await
self
.
wait_login_success
(
page
)
page_load
=
'//*[@data-list-item-id="guildsnav___home"]'
if
status
:
element
=
await
self
.
CheckElement
(
page
,
page_load
,
30
)
# 登录成功
if
not
element
:
logger
.
info
(
'登录成功'
)
logger
.
error
(
'登录失败'
)
# 等待服务器元素出现
# 去登陆
server_el
=
'//div[contains(@aria-label, "测试服务器")]'
element
=
await
self
.
CheckElement
(
page
,
server_el
,
20
)
if
element
:
# 点击指定服务器
status
=
await
self
.
ElementClick
(
page
,
server_el
,
'测试服务器'
)
if
status
:
# 等待聊天窗口出现
input_el
=
'//*[@role="textbox"]'
element
=
await
self
.
CheckElement
(
page
,
input_el
)
if
element
:
logger
.
info
(
'聊天窗口存在'
)
return
True
return
False
return
False
# 登录成功
logger
.
info
(
'登录成功'
)
# 等待服务器元素出现
server_el
=
'//div[contains(@aria-label, "测试服务器")]'
element
=
await
self
.
CheckElement
(
page
,
server_el
,
20
)
if
element
:
# 点击指定服务器
status
=
await
self
.
ElementClick
(
page
,
server_el
,
'测试服务器'
)
if
status
:
# 等待聊天窗口出现
input_el
=
'//*[@role="textbox"]'
element
=
await
self
.
CheckElement
(
page
,
input_el
)
if
element
:
logger
.
info
(
'聊天窗口存在'
)
return
True
return
False
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
e
)
logger
.
error
(
e
)
return
False
return
False
...
@@ -515,12 +464,6 @@ def process_start(browser_item, lock):
...
@@ -515,12 +464,6 @@ def process_start(browser_item, lock):
keyword
=
ToWork
(
browser_item
,
lock
)
keyword
=
ToWork
(
browser_item
,
lock
)
# 生产
# 生产
loop
.
run_until_complete
(
keyword
.
on_start
())
loop
.
run_until_complete
(
keyword
.
on_start
())
# 测试
# task = {
# 'policy': 'http://test.phpgpt.com/api/users/config/policy2'
# }
# href = 'https://cdn.discordapp.com/attachments/1090583268467945475/1093734564188409946/suifeng_a_very_cute_boys_16_years_old_fly_a_kitein_beautifulstr_cd2d65d6-ea40-432b-9941-c796504b6082.png'
# loop.run_until_complete(keyword.upload_img(task, href))
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
e
)
logger
.
error
(
e
)
...
...
test.py
View file @
770cd2e3
...
@@ -16,13 +16,45 @@ async def FindElement(custom_el, path, el_type=''):
...
@@ -16,13 +16,45 @@ async def FindElement(custom_el, path, el_type=''):
return
False
return
False
# 元素点击-xpath点击
async
def
ElementClick
(
page
,
path
,
name
=
''
):
status
=
False
for
i
in
range
(
1
,
10
):
try
:
ele
=
(
await
page
.
xpath
(
path
))[
0
]
await
ele
.
click
()
logger
.
info
(
f
'{name}-元素存在,开始点击'
)
status
=
True
break
except
Exception
as
e
:
logger
.
info
(
e
)
logger
.
info
(
f
'{name}-元素不存在'
)
status
=
False
await
asyncio
.
sleep
(
1
)
return
status
# 登录
# 登录
async
def
login
(
page
):
async
def
login
(
page
):
time
.
sleep
(
30
)
user_name
=
'2897821407@qq.com'
password
=
'lhj157839477'
print
(
'开始登录'
)
print
(
'开始登录'
)
# 用户名
user_name_path
=
'//input[@name="email"]'
user_name_path
=
'//input[@name="email"]'
element
=
await
FindElement
(
page
,
user_name_path
)
name_el
=
await
page
.
waitForXPath
(
user_name_path
)
logger
.
info
(
element
)
await
name_el
.
type
(
user_name
,
{
"delay"
:
1
})
# 密码
password_path
=
'//*[@name="password"]'
pwd_el
=
await
FindElement
(
page
,
password_path
)
if
pwd_el
:
await
pwd_el
.
type
(
password
,
{
"delay"
:
1
})
# 判断输入的内容
user_name_value
=
await
page
.
evaluate
(
'(name_el) => name_el.value'
,
name_el
)
password_value
=
await
page
.
evaluate
(
'(pwd_el) => pwd_el.value'
,
pwd_el
)
if
user_name_value
==
user_name
and
password_value
==
password
:
# 输入的值正确--点击登录
login_path
=
'//*[@id="app-mount"]/div[2]/div[1]/div[1]/div/div/div/div/form/div[2]/div/div[1]/div[2]/button[2]'
status
=
await
ElementClick
(
page
,
login_path
)
async
def
main
():
async
def
main
():
...
@@ -43,9 +75,19 @@ async def main():
...
@@ -43,9 +75,19 @@ async def main():
# 插入文本
# 插入文本
input_path
=
'//*[@id="app-mount"]/div[2]/div[1]/div[1]/div/div[2]/div/div/div/div/div[2]/div/main/form/div/div[2]/div/div[2]/div/div/div/span[2]/span[2]/span/span/span'
input_path
=
'//*[@id="app-mount"]/div[2]/div[1]/div[1]/div/div[2]/div/div/div/div/div[2]/div/main/form/div/div[2]/div/div[2]/div/div/div/span[2]/span[2]/span/span/span'
element
=
await
FindElement
(
page
,
input_path
)
element
=
await
FindElement
(
page
,
input_path
)
# var evt = new InputEvent('input', {
# inputType: 'insertText',
# data: st,
# dataTransfer: null,
# isComposing: false
# });
# dom.dispatchEvent(evt);
if
element
:
if
element
:
await
page
.
evaluate
(
'''(element,test_title) => {
await
page
.
evaluate
(
'''(element,test_title) => {
element.innerHTML = test_title
element.innerHTML = test_title
var dom = document.querySelector('body')
console.log(dom)
dom.value = test_title
}'''
,
element
,
test_title
)
}'''
,
element
,
test_title
)
logger
.
info
(
'成功'
)
logger
.
info
(
'成功'
)
# 回车
# 回车
...
...
tool.py
0 → 100644
View file @
770cd2e3
def
image_to_byte
(
self
,
imagepath
):
'''
图片转二进制
'''
with
open
(
imagepath
,
"rb"
)
as
f
:
byte_data
=
f
.
read
()
return
byte_data
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment