Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
I
inscription-trade-py
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
haojie
inscription-trade-py
Commits
8d458592
Commit
8d458592
authored
Jul 04, 2023
by
haojie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1
parent
7dbbb8b9
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
197 additions
and
97 deletions
+197
-97
Helper.py
+19
-0
src/duneInscription.py
+178
-0
transfer.py
+0
-97
No files found.
Helper.py
View file @
8d458592
...
...
@@ -12,6 +12,25 @@ from config.env import redis_password, redis_host
uni256Max
=
2
**
256
def
get_default_header
(
other_header
:
dict
=
None
):
"""
获取默认请求头
:param other_header:
:return:
"""
if
other_header
is
None
:
other_header
=
{}
headers
=
{
"Accept"
:
"*/*"
,
"authority"
:
"app-api.dune.com"
,
"Accept-Encoding"
:
"gzip, deflate, br"
,
"Host"
:
"www.tiktok.com"
,
"User-Agent"
:
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
,
}
return
dict
(
headers
,
**
other_header
)
class
RedisConnSingleton
(
object
):
_instance
=
None
...
...
src/duneInscription.py
0 → 100644
View file @
8d458592
import
requests
import
urllib.request
import
urllib.parse
import
json
import
aiohttp
import
asyncio
import
time
from
selenium
import
webdriver
from
Helper
import
RedisCon
import
random
class
Dune
:
def
__init__
(
self
):
pass
# 等待元素出现
def
waitElement
(
self
,
driver
,
value
,
el_type
=
'xpath'
,
num
=
10
):
for
i
in
range
(
num
):
try
:
status
=
driver
.
find_element
(
el_type
,
value
)
if
status
:
return
status
else
:
print
(
'元素不存在'
)
time
.
sleep
(
1
)
except
Exception
as
e
:
print
(
e
)
return
False
# 获取元素text
def
getElementText
(
self
,
driver
,
value
,
el_type
=
'xpath'
):
status
=
driver
.
find_element
(
el_type
,
value
)
if
status
:
return
status
.
text
return
''
# 获取元素的子元素
def
getChildNum
(
self
,
driver
,
value
,
el_type
=
'xpath'
):
try
:
items
=
driver
.
find_elements
(
el_type
,
value
)
lens
=
len
(
items
)
return
{
'size'
:
lens
,
'items'
:
items
,
}
except
Exception
as
e
:
print
(
'获取子元素列表失败'
)
print
(
e
)
return
{
'size'
:
0
,
'items'
:
[],
}
# 元素点击
def
elementClick
(
self
,
driver
,
value
,
el_type
=
'xpath'
):
item
=
driver
.
find_element
(
el_type
,
value
)
if
item
:
driver
.
execute_script
(
"arguments[0].click();"
,
item
)
# item.click()
time
.
sleep
(
random
.
uniform
(
1
,
3
))
return
True
else
:
print
(
'分页按钮未找到'
)
return
False
def
startTask
(
self
,
driver
,
num
):
list
=
[]
# 指定页数
while
True
:
# 获取当前页的长度
current_page_size
=
self
.
getChildNum
(
driver
,
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/*'
)
for
j
in
range
(
current_page_size
[
'size'
]):
obj
=
{}
# 每行
item
=
current_page_size
[
'items'
][
j
]
# Protocol(terc-20)
obj
[
'Protocol'
]
=
self
.
getElementText
(
driver
,
f
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[{j + 1}]/td[1]/div/div'
)
# Name(eths)
obj
[
'Name'
]
=
self
.
getElementText
(
driver
,
f
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[{j + 1}]/td[2]/div/div/a'
)
# Deploy Time(2023-06-24 11:52)
obj
[
'DeployTime'
]
=
self
.
getElementText
(
driver
,
f
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[{j + 1}]/td[3]/div'
)
# Limit Per Mint(1000)
obj
[
'LimitPerMint'
]
=
self
.
getElementText
(
driver
,
f
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[{j + 1}]/td[4]/div'
)
# Total Supply(21,000,000)
obj
[
'TotalSupply'
]
=
self
.
getElementText
(
driver
,
f
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[{j + 1}]/td[5]/div'
)
# Valid Minted(43,646,644)
obj
[
'ValidMinted'
]
=
self
.
getElementText
(
driver
,
f
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[{j + 1}]/td[6]/div'
)
# Progress(%)
obj
[
'Progress'
]
=
self
.
getElementText
(
driver
,
f
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[{j + 1}]/td[7]/div'
)
# Total Minted(49,822,302)
obj
[
'TotalMinted'
]
=
self
.
getElementText
(
driver
,
f
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[{j + 1}]/td[8]/div'
)
# Holders(5,340)
obj
[
'Holders'
]
=
self
.
getElementText
(
driver
,
f
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[{j + 1}]/td[9]/div'
)
# Minters(5,964)
obj
[
'Minters'
]
=
self
.
getElementText
(
driver
,
f
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[{j + 1}]/td[10]/div'
)
# Transactions(49,636)
obj
[
'Transactions'
]
=
self
.
getElementText
(
driver
,
f
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[{j + 1}]/td[11]/div'
)
list
.
append
(
obj
)
if
len
(
list
)
>=
int
(
num
):
# 已爬完
print
(
f
'本次任务已完成,共{len(list)}'
)
# 提交redis
RedisCon
.
task_list
(
REDIS_KEY_MARKET_INSCRIPTION_TASK_CALLBACK
,
json
.
dumps
(
list
))
return
# 下一页,点击按钮
nextPageStatus
=
False
for
it
in
range
(
5
):
nextPage
=
self
.
elementClick
(
driver
,
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[2]/ul/li[6]/button'
)
if
nextPage
:
print
(
'下一页点击成功'
)
break
def
getInscriptionList
(
self
):
# 页面
options
=
webdriver
.
ChromeOptions
()
options
.
add_argument
(
'headless'
)
options
.
add_argument
(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36"
)
options
.
add_argument
(
'window-size=1920x1080'
)
driver
=
webdriver
.
Chrome
(
options
=
options
)
# 控制浏览器访问url地址
driver
.
get
(
"https://dune.com/tyler3/ethscription-indexer?undefined="
)
is_show
=
self
.
waitElement
(
driver
,
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[1]/table/tbody/tr[1]'
)
if
is_show
:
print
(
'元素已出现'
)
# 总行数
page_num
=
self
.
waitElement
(
driver
,
'//*[@id="__next"]/div/main/div[1]/div/section/div/div/article[16]/div/div[2]/ul/li[1]/span'
,
'xpath'
,
5
)
if
page_num
and
page_num
.
text
:
page_num
=
page_num
.
text
.
split
()[
0
]
self
.
startTask
(
driver
,
page_num
)
else
:
print
(
'无法获取页数,检查元素位置是否改变'
)
print
(
'退出浏览器'
)
# 退出浏览器
driver
.
quit
()
def
main
():
dune
=
Dune
()
while
True
:
# redis中获取任务
task
=
RedisCon
.
get_task
(
REDIS_KEY_MARKET_INSCRIPTION_TASK
)
if
task
:
print
(
'获取到任务'
)
# 任务存在
dune
.
getInscriptionList
()
else
:
print
(
'没有任务'
)
time
.
sleep
(
10
)
if
__name__
==
'__main__'
:
REDIS_KEY_MARKET_INSCRIPTION_TASK
=
'laravel_database_market_inscription_task'
REDIS_KEY_MARKET_INSCRIPTION_TASK_CALLBACK
=
'laravel_database_market_inscription_task_callback'
main
()
transfer.py
deleted
100644 → 0
View file @
7dbbb8b9
import
asyncio
import
json
import
time
import
traceback
import
requests
from
Helper
import
RedisCon
,
Helper
import
logging
from
config.env
import
login_params
,
login_api
,
task_api
logging
.
basicConfig
(
level
=
logging
.
ERROR
,
filename
=
'recharge_error.log'
,
datefmt
=
'
%
Y/
%
m/
%
d
%
H:
%
M:
%
S'
,
format
=
'
%(asctime)
s -
%(name)
s -
%(levelname)
s -
%(lineno)
d -
%(module)
s -
%(message)
s'
)
logger
=
logging
.
getLogger
(
__name__
)
logger
.
setLevel
(
level
=
logging
.
DEBUG
)
# 登录的token
login_token
=
''
def
login
():
res
=
requests
.
post
(
login_api
,
data
=
login_params
)
.
text
res
=
json
.
loads
(
res
)
if
res
[
'code'
]
==
0
:
login_token
=
res
[
'data'
][
'access_token'
]
return
True
# 获取任务
def
get_admin_task
():
res
=
requests
.
get
(
task_api
,
headers
=
{
'authorization'
:
'Bearer '
+
login_token
,
})
.
text
res
=
json
.
loads
(
res
)
if
res
[
'code'
]
==
0
:
return
res
[
'data'
]
def
pay_order_status
(
status
,
url
,
data
):
header
=
{
'User-Agent'
:
'Apipost client Runtime/+https://www.apipost.cn/'
,
'authorization'
:
'Bearer '
+
login_token
,
}
data
[
'status'
]
=
status
print
(
'准备回调'
,
data
)
result
=
requests
.
post
(
url
,
headers
=
header
,
data
=
data
)
print
(
result
)
# if result.text == 'success':
# print("请求成功!")
# else:
# print("请求失败!")
logger
.
debug
(
f
">>>>tx_hash:{data['hash']} status:{status} url:{url}"
f
" 请求状态码:{result.status_code}"
)
async
def
rechargeList
(
web3
,
data
):
url
=
data
[
'notify_url'
]
to_address
=
data
[
'to'
]
from_address
=
data
[
'from'
]
# 交易类型
tran_type
=
data
[
'type'
]
status
=
3
# 默认转账失败
print
(
data
)
try
:
if
tran_type
==
1
:
# 铭文转移
pass
elif
tran_type
==
2
:
# 转账
pass
pay_order_status
(
status
,
url
,
data
)
except
Exception
as
e
:
print
(
e
)
def
main
():
while
True
:
tasks
=
list
()
loop
=
asyncio
.
get_event_loop
()
for
i
in
range
(
5
):
# 获取任务
task
=
get_admin_task
()
if
task
:
web3
=
Helper
.
getWeb3Eth
()
task
=
json
.
loads
(
task
)
# task
tasks
.
append
(
rechargeList
(
web3
,
task
))
loop
.
run_until_complete
((
asyncio
.
wait
(
tasks
,
timeout
=
30
)))
if
__name__
==
'__main__'
:
if
not
login_token
:
login_status
=
login
()
print
(
login_status
)
if
login_status
:
# 可以执行
main
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment