Commit cf0fa24e by lei

1

parents
File added
# 默认忽略的文件
/shelf/
/workspace.xml
Img_Ai
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.8" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/python-demo.iml" filepath="$PROJECT_DIR$/.idea/python-demo.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module version="4">
<component name="NewModuleRootManager">
<orderEntry type="jdk" jdkName="Python 3.8" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
import re
import time
import json
import redis
import random
import aiohttp
import requests
from web3 import Web3
from web3.eth import AsyncEth
import datetime
import itertools
import eth_utils
import numpy as np
from lxml import etree
from fake_useragent import UserAgent
from aredis import StrictRedis, ConnectionPool
from web3._utils.abi import get_abi_output_types, map_abi_data
from web3._utils.normalizers import BASE_RETURN_NORMALIZERS
uni256Max = 2**256
class RedisConnSingleton(object):
_instance = None
def __new__(cls, *args, **kwargs):
"""
redis connection
:param args:
:param kwargs:
"""
if cls._instance is None:
cls._instance = redis.ConnectionPool(max_connections=1000, host='114.132.50.141', port=6379, db=0, password='lhj123456')
return cls._instance
class RedisConnectionPoolSingleton(object):
_instance = None
def __new__(cls, *args, **kwargs):
"""
aredis connection
:param args:
:param kwargs:
"""
if cls._instance is None:
cls._instance = ConnectionPool(max_connections=1000, host='114.132.50.141', port=6379, db=0, password='lhj123456')
return cls._instance
class RedisCon:
@staticmethod
def RedisConn():
"""
Redis connect
:return:
"""
redis_con = redis.Redis(connection_pool=RedisConnSingleton())
return redis_con
@staticmethod
def delete_task_data(redis_key, key):
"""
redis hdel
:param redis_key:
:param key:
:return:
"""
redis_con = redis.Redis(connection_pool=RedisConnSingleton())
try:
redis_con.hdel(redis_key, key)
except Exception as e:
print("Redis hdel error", e)
@staticmethod
def save_data(redis_key, key, value):
"""
redis hset
:param redis_key:
:param key:
:param value:
:return:
"""
redis_con = redis.Redis(connection_pool=RedisConnSingleton())
try:
redis_con.hset(redis_key, key, value)
except Exception as e:
print("Redis hset error", e)
@staticmethod
def query_contrast(redis_key, key):
"""
redis hget
:param redis_key:
:param key:
:return:
"""
redis_con = redis.Redis(connection_pool=RedisConnSingleton())
try:
res = redis_con.hget(redis_key, key)
return res
except Exception as e:
print("Redis hget error:", e)
@staticmethod
def task_list(key, value):
"""
redis rpush
:param key:
:param value:
:return:
"""
redis_con = redis.Redis(connection_pool=RedisConnSingleton())
try:
redis_con.rpush(key, value)
except Exception as e:
print("Redis rpush error:", e)
@staticmethod
def get_task(key):
"""
redis lpop
:param key:
:return:
"""
redis_con = redis.Redis(connection_pool=RedisConnSingleton())
try:
task = redis_con.lpop(key)
return task
except Exception as e:
print("Redis lpop error:", e)
@staticmethod
def del_task(key, value):
"""
redis lrem
:param key:
:param value:
:return:
"""
redis_con = redis.Redis(connection_pool=RedisConnSingleton())
try:
redis_con.lrem(key, 0, value)
except Exception as e:
print("Redis lrem error:", e)
@staticmethod
def ex_data(key, v_time, value):
"""
redis setex
:param key:
:param v_time:
:param value:
:return:
"""
redis_con = redis.Redis(connection_pool=RedisConnSingleton())
try:
redis_con.setex(key, v_time, value)
except Exception as e:
print("Redis setex error:", e)
@staticmethod
def get_task_num(key):
"""
redis llen
:param key:
:return:
"""
redis_con = redis.Redis(connection_pool=RedisConnSingleton())
try:
num = redis_con.llen(key)
if num is not None:
return num
else:
return 0
except Exception as e:
print("Redis llen error:", e)
class Helper:
"""公共基础类"""
@staticmethod
def getProxyLst():
"""获取代理列表"""
proxy_lst = list()
for i in range(5):
try:
response = requests.get("https://proxy.webshare.io/api/v2/proxy/list/download/pgewrawzvvqtzwdzjfdfswvkwcjasmofoucnfyyq/-/any/username/direct/-/").text
a = response.split('\r\n')
proxy_lst = a[:-1]
proxy_lst = list(map(lambda x: x.split(":"), proxy_lst))
proxy_lst = list(map(lambda x: x[2] + ':' + x[3] + '@' + x[0] + ':' + x[1], proxy_lst))
if proxy_lst != []:
# print("代理获取成功!")
break
except:
# 等待 1s 重试
time.sleep(2)
RedisCon.save_data("chain:pLst", "proxy", json.dumps(proxy_lst))
RedisCon.save_data("chain:pLst", "date", json.dumps(int(time.time())))
return proxy_lst
@staticmethod
def updateProxyLst():
status = 0
proxy_date = RedisCon.query_contrast("chain:pLst", "date")
if proxy_date:
proxy_date = json.loads(proxy_date)
if int(time.time()) - proxy_date > 1800:
status = 1
else:
status = 1
return status
@staticmethod
def getProxyList():
proxy = RedisCon.query_contrast("chain:pLst", "proxy")
if proxy:
proxy = json.loads(proxy)
# 代理获取失败,重新获取代理!
if not proxy:
proxy = Helper.getProxyLst()
else:
proxy = Helper.getProxyLst()
return proxy
@staticmethod
def swap_str():
"""
swap str
:return:
"""
return '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'
@staticmethod
def tran_str():
"""
transfer str
:return:
"""
return '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
@staticmethod
async def query_hash(redis_key, key):
"""
aredis hget
:param redis_key:
:param key:
:return:
"""
redis_con = StrictRedis(connection_pool=RedisConnectionPoolSingleton())
try:
res = await redis_con.hget(redis_key, key)
return res
except Exception as e:
print("Redis hget error:", e)
@staticmethod
async def task_list(key, value):
"""
aredis rpush
:param key:
:param value:
:return:
"""
redis_con = StrictRedis(connection_pool=RedisConnectionPoolSingleton())
try:
await redis_con.rpush(key, value)
except Exception as e:
print("Redis rpush error:", e)
@staticmethod
async def get_task(key):
"""
aredis lpop
:param key:
:return:
"""
redis_con = StrictRedis(connection_pool=RedisConnectionPoolSingleton())
try:
task = await redis_con.lpop(key)
return task
except Exception as e:
print("Redis lpop error:", e)
@staticmethod
async def save_hash_data(redis_key, key, value):
"""
aredis hset
:param redis_key:
:param key:
:param value:
:return:
"""
redis_con = StrictRedis(connection_pool=RedisConnectionPoolSingleton())
try:
await redis_con.hset(redis_key, key, value)
except Exception as e:
print("Redis hset error", e)
@staticmethod
async def del_hash_data(redis_key, key):
"""
aredis hdel
:param redis_key:
:param key:
:return:
"""
redis_con = StrictRedis(connection_pool=RedisConnectionPoolSingleton())
try:
await redis_con.hdel(redis_key, key)
except Exception as e:
print("Redis hdel error", e)
@staticmethod
async def get_string_value(name):
"""
aredis get
:param name:
:return:
"""
redis_con = StrictRedis(connection_pool=RedisConnectionPoolSingleton())
try:
data = await redis_con.get(name)
return data
except Exception as e:
print("Redis get error", e)
@staticmethod
async def ex_data(key, time, value):
"""
aredis setex
:param key:
:param time:
:param value:
:return:
"""
redis_con = StrictRedis(connection_pool=RedisConnectionPoolSingleton())
try:
await redis_con.setex(key, time, value)
except Exception as e:
print("Redis setex error", e)
@staticmethod
async def get_functions_(fn_abi, transaction, web_async):
"""
web3 get_functions_
:param fn_abi:
:param transaction:
:param web_async:
:return:
"""
eth = AsyncEth(web_async)
contract = await eth.call(transaction)
output_types = get_abi_output_types(fn_abi)
output_data = web_async.codec.decode_abi(output_types, contract)
_normalizers = itertools.chain(
BASE_RETURN_NORMALIZERS,
(),
)
normalized_data = map_abi_data(_normalizers, output_types, output_data)
# print(normalized_data)
if len(normalized_data) == 1:
return normalized_data[0]
else:
return normalized_data
@staticmethod
async def get_token_decimals(token, web_async):
"""
web3 decimals
:param token:
:param web_async:
:return:
"""
fn_abi = {'inputs': [], 'name': 'decimals', 'outputs': [{'internalType': 'uint8', 'name': '', 'type': 'uint8'}],
'stateMutability': 'view', 'type': 'function'}
transaction = {'to': f'{token}', 'data': '0x313ce567'}
function = await Helper.get_functions_(fn_abi, transaction, web_async)
# print("decimals", function)
return function
@staticmethod
async def get_token_name(token, web_async):
"""
web3 name
:param token:
:param web_async:
:return:
"""
fn_abi = {'inputs': [], 'name': 'name', 'outputs': [{'internalType': 'string', 'name': '', 'type': 'string'}],
'stateMutability': 'view', 'type': 'function'}
transaction = {'to': f'{token}', 'data': '0x06fdde03'}
function = await Helper.get_functions_(fn_abi, transaction, web_async)
# print("name", function)
return function
@staticmethod
async def get_token_supply(token, web_async):
"""
web3 supply
:param token:
:param web_async:
:return:
"""
fn_abi = {'inputs': [], 'name': 'totalSupply',
'outputs': [{'internalType': 'uint256', 'name': '', 'type': 'uint256'}], 'stateMutability': 'view',
'type': 'function'}
transaction = {'to': f'{token}', 'data': '0x18160ddd'}
function = await Helper.get_functions_(fn_abi, transaction, web_async)
# print("total supply", function)
return function
@staticmethod
async def get_token_symbol(token, web_async):
"""
web3 symbol
:param token:
:param web_async:
:return:
"""
fn_abi = {'inputs': [], 'name': 'symbol', 'outputs': [{'internalType': 'string', 'name': '', 'type': 'string'}],
'stateMutability': 'view', 'type': 'function'}
transaction = {'to': f'{token}', 'data': '0x95d89b41'}
function = await Helper.get_functions_(fn_abi, transaction, web_async)
# print("symbol", function)
return function
@staticmethod
async def get_token_balance(token, address, web_async):
"""
web3 balanceOf
:param token:
:param address:
:param web_async:
:return:
"""
fn_abi = {'inputs': [{'internalType': 'address', 'name': '', 'type': 'address'}], 'name': 'balanceOf',
'outputs': [{'internalType': 'uint256', 'name': '', 'type': 'uint256'}], 'stateMutability': 'view',
'type': 'function'}
address = '0x70a08231000000000000000000000000' + (address.lower()[2:])
transaction = {'to': f'{token}', 'data': address}
function = await Helper.get_functions_(fn_abi, transaction, web_async)
# print("token balance", function)
return function
@staticmethod
def trans_address_n0x(address):
"""
address type change
:param address:
:return:
"""
address = address[2:]
while len(address) < 64:
address = '0' + address
return address.lower()
@staticmethod
async def get_amounts_out_op(token0, token1, web_async, Router, token0_d):
"""
get price rate - op
:param token0:
:param token1:
:param web_async:
:param Router:
:param token0_d:
:return:
"""
fn_abi = {'inputs': [{'internalType': 'uint256', 'name': 'amountIn', 'type': 'uint256'},
{'internalType': 'address[]', 'name': 'path', 'type': 'address[]'}],
'name': 'getAmountsOut',
'outputs': [{'internalType': 'uint256[]', 'name': 'amounts', 'type': 'uint256[]'}],
'stateMutability': 'view', 'type': 'function'}
str_0 = '0x9881fcb4'
str_1 = Helper.trans_address_n0x(web_async.toHex(1 * (10 ** token0_d)))
str_2 = '00000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000001'
str_3 = Helper.trans_address_n0x(token0)
str_4 = Helper.trans_address_n0x(token1)
str_5 = "0000000000000000000000000000000000000000000000000000000000000001"
data_str = str_0 + str_1 + str_2 + str_3 + str_4 + str_5
transaction = {'to': f'{Router}', 'data': data_str}
function = await Helper.get_functions_(fn_abi, transaction, web_async)
# print("token amount", function)
return function
@staticmethod
async def get_amounts_out(token0, token1, web_async, Router, token0_d):
"""
get price rate
:param token0:
:param token1:
:param web_async:
:param Router:
:param token0_d:
:return:
"""
fn_abi = {'inputs': [{'internalType': 'uint256', 'name': 'amountIn', 'type': 'uint256'},
{'internalType': 'address[]', 'name': 'path', 'type': 'address[]'}],
'name': 'getAmountsOut',
'outputs': [{'internalType': 'uint256[]', 'name': 'amounts', 'type': 'uint256[]'}],
'stateMutability': 'view', 'type': 'function'}
str_0 = '0xd06ca61f'
str_1 = Helper.trans_address_n0x(web_async.toHex(1 * (10 ** token0_d)))
str_2 = '00000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000002'
str_3 = Helper.trans_address_n0x(token0)
str_4 = Helper.trans_address_n0x(token1)
data_str = str_0 + str_1 + str_2 + str_3 + str_4
transaction = {'to': f'{Router}', 'data': data_str}
function = await Helper.get_functions_(fn_abi, transaction, web_async)
# print("token amount", function)
return function
@staticmethod
async def get_token_detail_web(token, chain):
"""
get goPlus token detail
:param token:
:param chain:
:return:
"""
url = f'https://api.gopluslabs.io/api/v1/token_security/{chain}?contract_addresses={token}'
async with aiohttp.ClientSession() as session:
proxy = Helper.aiohttp_proxy_pool()
headers = Helper.getHeaders()
res = await session.get(url, headers=headers, proxy=proxy, timeout=10)
res = await res.text()
da = json.loads(res)
return da['result'][token.lower()] if token.lower() in da['result'] else {}
@staticmethod
async def get_sell_radio(token, sell_num, token_d, address, web_async):
"""
get sell radio
:param token:
:param sell_num:
:param token_d:
:param address:
:param web_async:
:return:
"""
if address is not None:
balance = await Helper.get_token_balance(token, address, web_async)
balance = balance / (10 ** token_d)
sell_radio = sell_num / (sell_num + balance)
return {
'radio': sell_radio,
'balance': balance,
}
else:
return {
'radio': None,
'balance': None,
}
@staticmethod
async def balancer_sell_radio(token, token_d, tx_hash, token_a, sell_num, web_async):
"""
balancer sell info
:param token:
:param token_d:
:param tx_hash:
:param token_a:
:param sell_num:
:param web_async:
:return:
"""
tran_str = Helper.tran_str()
pool = "0x000000000000000000000000ba12222222228d8ba445958a75a0704d566bf2c8"
receipt = await web_async.eth.get_transaction_receipt(tx_hash)
logs = receipt['logs']
address = None
for log in logs:
if log['address'] == token and log['topics'][0].hex() == tran_str and log['topics'][
2].hex() == pool and eval(log['data']) == token_a:
address = log['topics'][1].hex()
address = web_async.toChecksumAddress(Helper.hex2bsc(address))
break
else:
pass
sell_radio, balance = None, None
if address is not None:
balance = await Helper.get_token_balance(token, address, web_async)
balance = balance / (10 ** token_d)
sell_radio = sell_num / (sell_num + balance)
return {
'address': address,
'radio': sell_radio,
'balance': balance
}
@staticmethod
async def Beethoven_sell_radio(token, token_d, tx_hash, token_a, sell_num, web_async):
"""
beethoven sell info
:param token:
:param token_d:
:param tx_hash:
:param token_a:
:param sell_num:
:param web_async:
:return:
"""
tran_str = Helper.tran_str()
pool = "0x00000000000000000000000020dd72ed959b6147912c2e529f0a0c651c33c9ce"
receipt = await web_async.eth.get_transaction_receipt(tx_hash)
logs = receipt['logs']
address = None
for log in logs:
if log['address'] == token and log['topics'][0].hex() == tran_str and log['topics'][
2].hex() == pool and eval(log['data']) == token_a:
address = log['topics'][1].hex()
address = web_async.toChecksumAddress(Helper.hex2bsc(address))
break
else:
pass
sell_radio, balance = None, None
if address is not None:
balance = await Helper.get_token_balance(token, address, web_async)
balance = balance / (10 ** token_d)
sell_radio = sell_num / (sell_num + balance)
return {
'address': address,
'radio': sell_radio,
'balance': balance
}
@staticmethod
async def get_new_holder(token, token_d, address, buy_num, web_async):
"""
get new holder info
:param token:
:param token_d:
:param address:
:param buy_num:
:param web_async:
:return:
"""
address = web_async.toChecksumAddress(address)
balance = await Helper.get_token_balance(token, address, web_async)
balance = balance / (10 ** token_d)
if balance <= buy_num:
new = 1
else:
new = 0
return {
'balance': balance,
'new': new
}
@staticmethod
async def balancer_new_holder(token, tx_hash, token_a, web_async):
"""
balancer new holder info
:param token:
:param tx_hash:
:param token_a:
:param web_async:
:return:
"""
tran_str = Helper.tran_str()
pool = "0x000000000000000000000000ba12222222228d8ba445958a75a0704d566bf2c8"
receipt = await web_async.eth.get_transaction_receipt(tx_hash)
logs = receipt['logs']
address = None
for log in logs:
if log['address'] == token and log['topics'][0].hex() == tran_str and log['topics'][
1].hex() == pool and eval(log['data']) == token_a:
address = log['topics'][2].hex()
address = web_async.toChecksumAddress(Helper.hex2bsc(address))
break
else:
pass
# print('address', address)
new, balance = 0, 0
if address is not None:
balance = await Helper.get_token_balance(token, address, web_async)
if balance == token_a:
new = 1
return {
'balance': balance,
'address': address,
'new': new
}
@staticmethod
async def Beethoven_new_holder(token, tx_hash, token_a, web_async):
"""
beethoven new holder info
:param token:
:param tx_hash:
:param token_a:
:param web_async:
:return:
"""
tran_str = Helper.tran_str()
pool = "0x00000000000000000000000020dd72ed959b6147912c2e529f0a0c651c33c9ce"
receipt = await web_async.eth.get_transaction_receipt(tx_hash)
logs = receipt['logs']
address = None
for log in logs:
if log['address'] == token and log['topics'][0].hex() == tran_str and log['topics'][
1].hex() == pool and eval(log['data']) == token_a:
address = log['topics'][2].hex()
address = web_async.toChecksumAddress(Helper.hex2bsc(address))
break
else:
pass
# print('address', address)
new, balance = 0, 0
if address is not None:
balance = await Helper.get_token_balance(token, address, web_async)
if balance == token_a:
new = 1
return {
'balance': balance,
'address': address,
'new': new
}
@staticmethod
async def sell_address(logs, token, pool, amount, web_async):
"""
get sell address
:param logs:
:param token:
:param pool:
:param amount:
:param web_async:
:return:
"""
tran_str = Helper.tran_str()
address = None
for log in logs:
if log['address'] == token and log['topics'][0].hex() == tran_str and log['topics'][
2].hex() == pool and eval(log['data']) == amount:
address = log['topics'][1].hex()
address = web_async.toChecksumAddress(Helper.hex2bsc(address))
break
else:
pass
if address is None:
for log in logs:
if log['address'] == token and log['topics'][0].hex() == tran_str and log['topics'][2].hex() == pool:
address = log['topics'][1].hex()
address = web_async.toChecksumAddress(Helper.hex2bsc(address))
break
else:
pass
return address
@staticmethod
def hex2bsc(address):
"""
change hex-address
:param address:
:return:
"""
address = '0x' + address[26:]
return address
@staticmethod
def normalize_32_byte_hex_address(address):
"""
change type toCheckAddress
:param address:
:return:
"""
as_bytes = eth_utils.to_bytes(hexstr=address)
return eth_utils.to_normalized_address(as_bytes[-20:])
@staticmethod
def checksumAddress(address, web_async):
"""
web3 checkAddress
:param address:
:param web_async:
:return:
"""
address = Helper.normalize_32_byte_hex_address(address)
return web_async.toChecksumAddress(address)
@staticmethod
async def wBalances(amount, p_token, p_token_symbol, pDecimals, wToken, Router, redisKey, wDecimals, web_async):
"""
get wrap pool balance
:param amount
:param p_token
:param p_token_symbol
:param pDecimals
:param wToken
:param Router
:param redisKey
:param wDecimals
:param web_async
"""
redis_con = StrictRedis(connection_pool=RedisConnectionPoolSingleton())
rate = await redis_con.get(f'{redisKey}:rate:' + p_token_symbol + '-W')
opType = ['opt', 'can']
if rate is None:
if redisKey in opType:
rate = await Helper.get_amounts_out_op(p_token, wToken, web_async, Router, pDecimals)
else:
rate = await Helper.get_amounts_out(p_token, wToken, web_async, Router, pDecimals)
rate = rate[1] / (10 ** wDecimals)
await redis_con.setex(f'{redisKey}:rate:' + p_token_symbol + '-W', 60, rate)
else:
rate = json.loads(rate)
w_balances = amount * rate
return w_balances
@staticmethod
def trans_address(address):
"""
change address-hex
:param address:
:return:
"""
address = address[2:]
while len(address) < 64:
address = '0' + address
address = '0x' + address
return address.lower()
@staticmethod
async def get_pool_swap(pair, web_async):
"""
get pool swap type
:param pair:
:param web_async:
:return:
"""
status = 0
pool_swap = ''
info = await Helper.query_hash("taos:search:saved", pair)
if info is not None:
info = json.loads(info)
if "swap" in info:
pool_swap = info['swap']
status = 1
if status == 0:
try:
pool_swap = await Helper.get_token_name(pair, web_async)
except:
pool_swap = 'Error Contract'
return {
"swap": pool_swap,
"status": status
}
@staticmethod
async def pair_balance(p_token, p_token_d, pair_address, redisKey, swapType, p_amount, web_async):
"""
get pair balance
:param p_token
:param p_token_d
:param pair_address
:param redisKey
:param swapType
:param p_amount
:param web_async
"""
status = 0
balance = await Helper.query_hash(f"{redisKey}:balance", pair_address)
if balance is None:
status = 1
else:
balance = json.loads(balance)
ts_before = Helper.get_time_stamp(balance['time'])
ts = Helper.get_time_stamp(Helper.get_time())
if ts - ts_before > 300:
status = 1
else:
if swapType == "sell":
balance['p_balance'] -= p_amount
else:
balance['p_balance'] += p_amount
if balance['p_balance'] < 0:
status = 1
if status == 1:
p_token_balance = await Helper.get_token_balance(p_token, pair_address, web_async)
balance = {
'time': Helper.get_time(),
'p_balance': (p_token_balance / (10 ** p_token_d))
}
await Helper.save_hash_data(f"{redisKey}:balance", pair_address, json.dumps(balance))
return balance
@staticmethod
async def public2usd(pToken, pSymbol, usd, redisKey, Router, uDecimals, pDecimals, web_async):
"""
get public token - usd rate
:param pToken
:param pSymbol
:param usd
:param redisKey
:param Router
:param uDecimals
:param pDecimals
:param web_async
"""
busd_bsc = "0xe9e7CEA3DedcA5984780Bafc599bD69ADd087D56" # BUSD (bsc)
weth_bsc = "0x2170Ed0880ac9A755fd29B2688956BD959F933F8" # ETH (bsc)
wbtc_bsc = "0x7130d2A12B9BCbFAe4f2634d864A1Ee1Ce3Ead9c" # BTCB (bsc)
router_bsc = "0x10ED43C718714eb63d5aA57B78B54704E256024E" # pancakeSwapV2Router
redis_con = StrictRedis(connection_pool=RedisConnectionPoolSingleton())
rate = await redis_con.get(f'{redisKey}:rate:' + pSymbol + '-U')
opType = ['opt', 'can']
chainErrorRouter = ['cro', 'aur', 'gno', 'opt', 'cel', 'met', 'ast']
chainErrorToken = ['WETH', 'WBTC']
if rate is None:
if pSymbol == 'WETH' and redisKey in chainErrorRouter:
bsc_rate = await redis_con.get('bsc:rate:WETH-U')
if bsc_rate is None:
rate = await Helper.get_amounts_out(weth_bsc, busd_bsc, Helper.getWeb3Bsc(), router_bsc, 18)
uDecimals = 18
rate = rate[1] / (10 ** uDecimals)
await redis_con.setex('bsc:rate:WETH-U', 60, rate)
else:
rate = json.loads(bsc_rate)
return rate
elif pSymbol == 'WBTC' and redisKey in chainErrorRouter:
bsc_rate = await redis_con.get('bsc:rate:WBTC-U')
if bsc_rate is None:
rate = await Helper.get_amounts_out(wbtc_bsc, busd_bsc, Helper.getWeb3Bsc(), router_bsc, 18)
uDecimals = 18
rate = rate[1] / (10 ** uDecimals)
await redis_con.setex('bsc:rate:WBTC-U', 60, rate)
else:
rate = json.loads(bsc_rate)
return rate
elif redisKey in opType and pSymbol not in chainErrorToken:
rate = await Helper.get_amounts_out_op(pToken, usd, web_async, Router, pDecimals)
else:
rate = await Helper.get_amounts_out(pToken, usd, web_async, Router, pDecimals)
rate = rate[1] / (10 ** uDecimals)
await redis_con.setex(f'{redisKey}:rate:' + pSymbol + '-U', 60, rate)
else:
rate = json.loads(rate)
return rate
@staticmethod
async def redis_data(data, info, ts_, redisKey, public_token, chainId, stable, search_table, web_async):
"""
taos save insert task list
:param data:
:param info:
:param ts_:
:param redisKey:
:param public_token:
:param chainId:
:param stable:
:param search_table:
:param web_async:
:return:
"""
token = data['token']
if info is None:
try:
supply = await Helper.get_token_supply(token, web_async) / (10 ** data['token_decimals'])
symbol = await Helper.get_token_symbol(token, web_async)
name = await Helper.get_token_name(token, web_async)
except:
da = await Helper.get_token_detail_web(token, chainId)
name = da['token_name'] if "token_name" in da else ""
symbol = da['token_symbol'] if "token_symbol" in da else ""
supply = da['total_supply'] if "total_supply" in da else ""
redis_token = {
'name': name,
'symbol': symbol,
'supply': supply,
'decimals': data['token_decimals'],
'token': token
}
await Helper.save_hash_data(f'{redisKey}:tokens', token, json.dumps(redis_token))
else:
name = info['name']
symbol = info['symbol']
supply = info['supply']
basic_info = {
'name': name,
'symbol': symbol,
'supply': supply,
}
chainInfo = {
'stable': stable,
'search_table': search_table,
'chain_id': chainId,
'redis_key': redisKey
}
if chainId == 1 or chainId == 56:
token_info = dict(**data, **basic_info)
await Helper.task_list(f"{redisKey}:taos:tokens", json.dumps(token_info))
else:
token_info = dict(**data, **basic_info, **chainInfo)
await Helper.task_list("taos:insert:tokens", json.dumps(token_info))
@staticmethod
def get_swap_amount(amount):
"""
uniSwap v2 tx amount
:param amount:
:return:
"""
if amount[0:2] == "0x":
amount0In = eval('0x' + amount[2:66])
amount1In = eval('0x' + amount[66:130])
else:
amount0In = eval('0x' + amount[0:64])
amount1In = eval('0x' + amount[64:128])
amount0Out = eval('0x' + amount[-128:-64])
amount1Out = eval('0x' + amount[-64:])
swap_info = {
'0In': amount0In,
'1In': amount1In,
'0Out': amount0Out,
'1Out': amount1Out
}
return swap_info
@staticmethod
def getUniswapV3Amount(amount):
"""
uniSwap v3 tx amount
:param amount:
:return:
"""
amount0 = eval('0x' + amount[2:66])
amount1 = eval('0x' + amount[66:130])
if amount0 > amount1:
amount0 = -(uni256Max - amount0)
else:
amount1 = -(uni256Max - amount1)
swapinfo = {
"amount0": amount0,
"amount1": amount1
}
return swapinfo
@staticmethod
def getBalancerAmount(amount):
"""
balancer tx amount
:param amount:
:return:
"""
amountIn = eval('0x' + amount[2:66])
amountOut = eval('0x' + amount[66:130])
return amountIn, amountOut
@staticmethod
async def get_usd_price_V2(p_token, p_token_d, p_token_symbol, price, usd_token, uToken, redisKey, Router, uDecimals, web_async):
"""
get usd price
:param p_token
:param p_token_d
:param p_token_symbol
:param price
:param usd_token
:param web_async
:param uToken
:param redisKey
:param Router
:param uDecimals
"""
if p_token in usd_token:
usd_price = price
else:
rate = await Helper.public2usd(p_token, p_token_symbol, uToken, redisKey, Router, uDecimals, p_token_d,
web_async)
usd_price = rate * price
return usd_price
@staticmethod
async def get_token_detail(p_token, p_token_d, p_token_symbol, pool_address, usd_token, wToken, wDecimals, uToken, uDecimals, redisKey, Router, swapType, p_amount, web_async):
"""
get token info
:param p_token
:param p_token_d
:param p_token_symbol
:param pool_address
:param usd_token
:param wToken
:param wDecimals
:param uToken
:param uDecimals
:param redisKey
:param Router
:param swapType
:param p_amount
:param web_async
"""
balance = await Helper.pair_balance(p_token, p_token_d, pool_address, redisKey, swapType, p_amount, web_async)
p_balance = balance['p_balance']
u_balance = await Helper.get_usd_price_V2(p_token, p_token_d, p_token_symbol, p_balance, usd_token, uToken,
redisKey, Router, uDecimals, web_async)
if p_token == wToken:
w_balance = p_balance
else:
w_balance = await Helper.wBalances(p_balance, p_token, p_token_symbol, p_token_d, wToken, Router, redisKey,
wDecimals, web_async)
detail = {
'pair_balance': p_balance,
'usd_balance': u_balance,
'wbnb_balance': w_balance
}
return detail
@staticmethod
async def redisSaveList(pool, dataProcess, redisKey, public_token, chainId, stable, search_table, web_async):
"""
redis task list
:param pool
:param dataProcess
:param redisKey
:param public_token
:param chainId
:param stable
:param search_table
:param web_async
"""
data = dataProcess['data']
token_detail = dataProcess['info']
ts_ = Helper.get_time_stamp(data['time'])
if 'p' in pool['r']:
pass
else:
new = await Helper.query_hash(f'{redisKey}:open:recent', data['pool_address'])
if new:
new = json.loads(new)
if 'time' in new:
ts = Helper.get_time_stamp(new['time'])
if ts_ - ts <= 86400:
data['new_token'] = 1
else:
data['new_token'] = 0
else:
data['new_token'] = 1
new['time'] = data['time']
await Helper.save_hash_data(f'{redisKey}:open:recent', data['pool_address'], json.dumps(new))
else:
data['new_token'] = 0
data['is100'] = 0
holder_info = await Helper.query_hash(f'{redisKey}:holder:100', data['token'])
data_ = {
'chain': redisKey,
'token': data['token']
}
if holder_info is not None:
holder_info = json.loads(holder_info)
holder_list = holder_info['holder_100']
time_ = holder_info['date']
stamp = Helper.get_time_stamp(time_)
if ts_ - stamp <= 3600:
if (data['swap_address']).lower() in holder_list:
data['is100'] = 1
else:
await Helper.task_list('chain:holderQueue', json.dumps(data_))
else:
await Helper.task_list('chain:holderQueue', json.dumps(data_))
await Helper.redis_data(data, token_detail, ts_, redisKey, public_token, chainId, stable, search_table, web_async)
@staticmethod
def get_time():
"""
get millisecond time
:return:
"""
ct = time.time()
local_time = time.localtime(ct)
data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
data_secs = (ct - np.compat.long(ct)) * 1000
time_stamp = "%s.%03d" % (data_head, data_secs)
return time_stamp
@staticmethod
def get_time_add(s):
"""
recent time add millisecond time
:param s:
:return:
"""
ct = (Helper.get_stamp(Helper.get_time()) + s) / 1000
local_time = time.localtime(ct)
data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
data_secs = (ct - np.compat.long(ct)) * 1000
time_stamp = "%s.%03d" % (data_head, data_secs)
return time_stamp
@staticmethod
def get_time_add_old(ts, s):
"""
ts add millisecond time
:param ts:
:param s:
:return:
"""
ct = (int(ts) + s) / 1000
local_time = time.localtime(ct)
data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
data_secs = (ct - np.compat.long(ct)) * 1000
time_stamp = "%s.%03d" % (data_head, data_secs)
return time_stamp
@staticmethod
def get_stamp(timeStr):
"""
get millisecond timestamp
:param timeStr:
:return:
"""
try:
datetime_obj = datetime.datetime.strptime(timeStr, "%Y-%m-%d %H:%M:%S.%f")
except ValueError:
datetime_obj = datetime.datetime.strptime(timeStr, "%Y-%m-%d %H:%M:%S")
obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0)
return obj_stamp
@staticmethod
def get_time_stamp(date):
"""
get seconds timestamp
:param date:
:return:
"""
try:
d = datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S.%f")
except:
d = datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
t = d.timetuple()
time_stamp = int(time.mktime(t))
return time_stamp
@staticmethod
def get_all_proxy():
"""
get rola all type proxy
:return:
"""
proxies_address = requests.get(
'http://list.rola.info:8088/user_get_ip_list?token=JyJu6trbF4LguQxA1656402892061&type=datacenter&qty=1&time=5&country=&format=txt&protocol=http&filter=1').text
async_proxy = f"http://{proxies_address}"
proxy = {
'https': f"http://{proxies_address}",
'http': f"http://{proxies_address}",
}
return {
"proxy": proxy,
"async_proxy": async_proxy
}
@staticmethod
def all_proxy_pool():
"""
get random all type proxy
:return:
"""
proxy_lst = Helper.getProxyList()
index = random.randint(0, 999)
proxies_address = proxy_lst[index]
async_proxy = f"http://{proxies_address}"
proxy = {
'https': f"http://{proxies_address}",
'http': f"http://{proxies_address}",
}
return {
"proxy": proxy,
"async_proxy": async_proxy
}
@staticmethod
def aiohttp_proxy_pool():
"""
get aiohttp proxy
:return:
"""
proxy_lst = Helper.getProxyList()
index = random.randint(0, 999)
proxy = proxy_lst[index]
proxies = f"http://{proxy}"
return proxies
@staticmethod
def requests_proxy_pool():
"""
get http proxy
:return:
"""
proxy_lst = Helper.getProxyList()
index = random.randint(0, 999)
proxy = proxy_lst[index]
proxies = {
'https': f"http://{proxy}",
'http': f"http://{proxy}",
}
return proxies
@staticmethod
def get_radio(radio):
"""
get radio
:param radio:
:return:
"""
fen = "%.2f%%" % (radio * 100)
return fen
@staticmethod
def getHeaders():
"""
get http headers
:return:
"""
ua = UserAgent()
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'accept-language': 'zh-CN,zh;q=0.9',
'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="99", "Google Chrome";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'iframe',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'same-origin',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': ua.chrome
}
return headers
@staticmethod
async def request_page(url, proxy):
"""
aiohttp get page
:param url:
:param proxy:
:return:
"""
# headers = Helper.getHeaders()
async with aiohttp.ClientSession() as session:
# response = await session.get(url, headers=headers, proxy=proxy, timeout=20)
response = await session.get(url, proxy=proxy, timeout=20)
return await response.text()
@staticmethod
async def getTroops(chain, token):
"""
get bsc/eth troops info
:param chain:
:param token:
:return:
"""
if chain == 56:
url = f'https://aywt3wreda.execute-api.eu-west-1.amazonaws.com/default/IsHoneypot?chain=bsc2&token={token}'
else:
url = f'https://aywt3wreda.execute-api.eu-west-1.amazonaws.com/default/IsHoneypot?chain=eth&token={token}'
troops = dict()
for i in range(5):
try:
proxy = Helper.aiohttp_proxy_pool()
res = await Helper.request_page(url, proxy)
troops = json.loads(res)
break
except Exception as e:
print("get troops error!", e)
return troops
@staticmethod
async def getHolderNum(chain, token):
"""
get holder nums
:param chain:
:param token:
:return:
"""
if chain == 56:
url = f"https://bscscan.com/token/generic-tokenholders2?m=normal&a={token}&s=8000000000000&sid=&p=1"
else:
url = f"https://cn.etherscan.com/token/generic-tokenholders2?m=normal&a={token}&s=888888888000000000&sid=&p=1"
holder_num = 0
for i in range(5):
try:
proxy = Helper.aiohttp_proxy_pool()
res = await Helper.request_page(url, proxy)
html = etree.HTML(res)
holder_list = html.xpath('//*[@class="mb-2 mb-md-0"]/text()')[0].strip()
holder_num = re.findall(r"\d+,?\d*", holder_list)[0]
# 代币持仓人数
holder_num = int(holder_num.replace(',', ''))
break
except Exception as e:
print("get holder nums error!", e)
return holder_num
@staticmethod
def getWeb3Bsc():
"""
async bsc web3
:return:
"""
bsc = "https://rpc.ankr.com/bsc"
# proxy = Helper.aiohttp_proxy_pool()
# request_kwargs={"proxy": proxy}
web_async = Web3(Web3.AsyncHTTPProvider(bsc), modules={'eth': (AsyncEth,)},
middlewares=[])
return web_async
@staticmethod
def getWeb3Eth():
"""
async eth web3
:return:
"""
eth = "https://rpc.ankr.com/eth"
proxy = Helper.aiohttp_proxy_pool()
web_async = Web3(Web3.AsyncHTTPProvider(eth, request_kwargs={"proxy": proxy}), modules={'eth': (AsyncEth,)},
middlewares=[])
return web_async
@staticmethod
def bscPoolType(token):
"""
bsc public token
:param token:
:return:
"""
if token == 'WBNB':
return '0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c'
elif token == 'BUSD':
return '0xe9e7CEA3DedcA5984780Bafc599bD69ADd087D56'
elif token == 'USDT':
return '0x55d398326f99059fF775485246999027B3197955'
elif token == 'USDC':
return '0x8AC76a51cc950d9822D68b83fE1Ad97B32Cd580d'
elif token == 'BTCB':
return '0x7130d2A12B9BCbFAe4f2634d864A1Ee1Ce3Ead9c'
elif token == 'DAI':
return '0x1AF3F329e8BE154074D8769D1FFa4eE058B1DBc3'
elif token == 'ETH':
return '0x2170Ed0880ac9A755fd29B2688956BD959F933F8'
elif token == 'FIST':
return '0xC9882dEF23bc42D53895b8361D0b1EDC7570Bc6A'
elif token == 'DOGE':
return '0xbA2aE424d960c26247Dd6c32edC70B295c744C43'
elif token == 'Cake':
return '0x0E09FaBB73Bd3Ade0a17ECC321fD13a19e81cE82'
@staticmethod
def bscPoolDecimal(token):
"""
bsc pool decimals
:param token:
:return:
"""
if token == 'WBNB':
return 18
elif token == 'BUSD':
return 18
elif token == 'USDT':
return 18
elif token == 'USDC':
return 18
elif token == 'BTCB':
return 18
elif token == 'DAI':
return 18
elif token == 'ETH':
return 18
elif token == 'FIST':
return 6
elif token == 'DOGE':
return 8
elif token == 'Cake':
return 18
@staticmethod
def ethPoolType(token):
"""
eth public token
:param token:
:return:
"""
if token == 'WETH':
return '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2'
elif token == 'USDT':
return '0xdAC17F958D2ee523a2206206994597C13D831ec7'
elif token == 'USDC':
return '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'
elif token == 'BUSD':
return '0x4Fabb145d64652a948d72533023f6E7A623C7C53'
elif token == 'WBTC':
return '0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599'
elif token == 'DAI':
return '0x6B175474E89094C44Da98b954EedeAC495271d0F'
elif token == 'BNB':
return '0xB8c77482e45F1F44dE1745F52C74426C631bDD52'
@staticmethod
def ethPoolDecimal(token):
"""
eth pool decimals
:param token:
:return:
"""
if token == 'WETH':
return 18
elif token == 'USDT':
return 6
elif token == 'USDC':
return 6
elif token == 'BUSD':
return 18
elif token == 'WBTC':
return 8
elif token == 'DAI':
return 18
elif token == 'BNB':
return 18
if __name__ == '__main__':
# a = Helper.updateProxyLst()
# if a[0] == 0:
# print("代理未过期!")
# else:
# proxyLst = a[1]
# print(proxyLst)
print(Helper.getProxyLst())
import requests
# 官方文档地址,需要科学上网使用
# http://doc.bitbrowser.cn/api-jie-kou-wen-dang/ben-di-fu-wu-zhi-nan
# 此demo仅作为参考使用,以下使用的指纹参数仅是部分参数,完整参数请参考文档
request = requests.session()
url = "http://127.0.0.1:54345"
def createOrUpdateBrowser(browser_item): # 创建或者更新窗口
headers = {
# 7 1115ce586624431a8b7bcba8b1e82035
# 9 db474b6936a24a7a949377a6d90b4dcb
'id': browser_item['id'], # 有值时为修改,无值是添加
# 'groupId': '2c996b378054663b01805a69f0344410', # 群组ID,绑定群组时传入,如果登录的是子账号,则必须赋值,否则会自动分配到主账户下面去
'platform': 'https://discord.com', # 账号平台
'platformIcon': 'discord', # 取账号平台的 hostname 或者设置为other
'url': '', # 打开的url,多个用,分开
'name': 'Discord', # 窗口名称
'remark': '登录discord', # 备注
'userName': '', # 用户账号
'password': '', # 用户密码
'cookie': '', # Cookie,符合标准的序列化字符串,具体可参考文档
# IP库,默认ip-api,选项 ip-api | ip123in | luminati,luminati为Luminati专用
'ipCheckService': 'ip-api',
'proxyMethod': 2, # 代理方式 2自定义 3 提取IP
# 代理类型 ['noproxy', 'http', 'https', 'socks5', '911s5']
'proxyType': 'http',
'host': '192.168.100.139', # 代理主机
'port': 2009, # 代理端口
'proxyUserName': '', # 代理账号
'proxyPassword': '', # 代理密码
'ip': '', # ip
'city': '', # 城市
'province': '', # 州/省
'country': '', # 国家地区
'dynamicIpUrl': '', # 提取IP url,参考文档
'dynamicIpChannel': '', # 提取IP服务商,参考文档
'isDynamicIpChangeIp': True, # 提取IP方式,参考文档
'isGlobalProxyInfo': False, # 提取IP设置,参考文档
'isIpv6': False, # 是否是IP6
'syncTabs': True, # 同步标签页
'syncCookies': True, # 同步Cookie
'syncIndexedDb': False, # 同步IndexedDB
'syncLocalStorage': False, # 同步 Local Storage
'syncBookmarks': True, # 同步书签
'credentialsEnableService': True, # 禁止保存密码弹窗
'syncHistory': False, # 保存历史记录
'clearCacheFilesBeforeLaunch': False, # 启动前清理缓存文件
'clearCookiesBeforeLaunch': False, # 启动前清理cookie
'clearHistoriesBeforeLaunch': False, # 启动前清理历史记录
'randomFingerprint': False, # 每次启动均随机指纹
# 浏览器窗口工作台页面,默认 chuhai2345,不展示填 disable 可选 chuhai2345 | localserver | disable
'workbench': 'chuhai2345',
'disableGpu': False, # 关闭GPU硬件加速 False取反 默认 开启
'enableBackgroundMode': False, # 关闭浏览器后继续运行应用
'disableTranslatePopup': False, # 翻译弹窗
'syncExtensions': False, # 同步扩展应用数据
'syncUserExtensions': False, # 跨窗口同步扩展应用
'allowedSignin': False, # 允许google账号登录浏览器
'abortImage': False, # 禁止加载图片
'abortMedia': False, # 禁止视频自动播放
'muteAudio': False, # 禁止播放声音
'stopWhileNetError': False, # 网络不通停止打开
"browserFingerPrint": {
'coreVersion': '104', # 内核版本,默认104,可选92
'ostype': 'PC', # 操作系统平台 PC | Android | IOS
'os': 'Win32',
# 为navigator.platform值 Win32 | Linux i686 | Linux armv7l | MacIntel,当ostype设置为IOS时,设置os为iPhone,ostype为Android时,设置为 Linux i686 | | Linux armv7l
'version': '', # 浏览器版本,不填随机
# 'userAgent': '', # ua,不填则自动生成
'isIpCreateTimeZone': True, # 基于IP生成对应的时区
'timeZone': '', # 时区,isIpCreateTimeZone 为False时,参考附录中的时区列表
'timeZoneOffset': 0, # isIpCreateTimeZone 为False时设置,时区偏移量
'webRTC': '0', # webrtc 0替换 | 1允许 | 2禁止
'ignoreHttpsErrors': False, # 忽略https证书错误,True | False
'position': '1', # 地理位置 0询问 | 1允许 | 2禁止
'isIpCreatePosition': True, # 是否基于IP生成对应的地理位置
'lat': '', # 经度 isIpCreatePosition 为False时设置
'lng': '', # 纬度 isIpCreatePosition 为False时设置
'precisionData': '', # 精度米 isIpCreatePosition 为False时设置
'isIpCreateLanguage': True, # 是否基于IP生成对应国家的浏览器语言
'languages': '', # isIpCreateLanguage 为False时设置,值参考附录
'isIpCreateDisplayLanguage': False, # 是否基于IP生成对应国家的浏览器界面语言
'displayLanguages': '', # isIpCreateDisplayLanguage 为False时设置,默认为空,即跟随系统,值参考附录
'openWidth': 1280, # 窗口宽度
'openHeight': 720, # 窗口高度
'resolutionType': '0', # 分辨率类型 0跟随电脑 | 1自定义
'resolution': '1920 x 1080', # 自定义分辨率时,具体值
'windowSizeLimit': True, # 分辨率类型为自定义,且ostype为PC时,此项有效,约束窗口最大尺寸不超过分辨率
'devicePixelRatio': 1, # 显示缩放比例,默认1,填写时,建议 1|1.5 | 2 | 2.5 | 3
'fontType': '2', # 字体生成类型 0系统默认 | 1自定义 | 2随机匹配
'font': '', # 自定义或随机匹配时,设置的字体值,值参考附录字体
'canvas': '0', # canvas 0随机|1关闭
'canvasValue': None, # canvas为0随机时设置, 噪音值 10000 - 1000000
'webGL': '0', # webGL图像,0随机|1关闭
'webGLValue': None, # webGL为0时,随机噪音值 10000 - 1000000
'webGLMeta': '0', # webgl元数据 0自定义|1关闭
'webGLManufacturer': '', # webGLMeta 自定义时,webGL厂商值,参考附录
'webGLRender': '', # webGLMeta自定义时,webGL渲染值,参考附录
'audioContext': '0', # audioContext值,0随机|1关闭
'audioContextValue': None, # audioContext为随机时,噪音值, 1 - 100 ,关闭时默认10
'mediaDevice': '0', # 媒体设备信息,0自定义|1关闭
'mediaDeviceValue': None, # mediaDevice 噪音值,不填则由系统生成,填值时,参考附录
'speechVoices': '0', # Speech Voices,0随机|1关闭
'speechVoicesValue': None, # speechVoices为0时,随机时由系统自动生成,自定义时,参考附录
'hardwareConcurrency': '4', # 硬件并发数
'deviceMemory': '8', # 设备内存
'doNotTrack': '0', # doNotTrack 1开启|0关闭
# ClientRects True使用相匹配的值代替您真实的ClientRects | False每个浏览器使用当前电脑默认的ClientRects
'clientRectNoiseEnabled': True,
'clientRectNoiseValue': 0, # clientRectNoiseEnabled开启时随机,值 1 - 999999
'portScanProtect': '0', # 端口扫描保护 0开启|1关闭
'portWhiteList': '', # 端口扫描保护开启时的白名单,逗号分隔
'deviceInfoEnabled': True, # 自定义设备信息,默认开启
'computerName': '', # deviceInfoEnabled 为True时,设置
'macAddr': '', # deviceInfoEnabled 为True时,设置
# ssl是否禁用特性,默认不禁用,注意开启后自定义设置时,有可能会导致某些网站无法访问
'disableSslCipherSuitesFlag': False,
'disableSslCipherSuites': None, # ssl 禁用特性,序列化的ssl特性值,参考附录
'enablePlugins': False, # 是否启用插件指纹
'plugins': '' # enablePlugins为True时,序列化的插件值,插件指纹值参考附录
}
}
res = request.post(f"{url}/browser/update", json=headers).json()
browserId = res['data']['id']
print(browserId)
return browserId
def openBrowser(browser_item): # 打开窗口
headers = {'id': f'{createOrUpdateBrowser(browser_item)}'}
res = request.post(f"{url}/browser/open", json=headers).json()
print(res)
return res
def closeBrowser(id): # 关闭窗口
headers = {'id': f'{id}'}
request.post(f"{url}/browser/close", json=headers).json()
def deleteBrowser(id): # 删除窗口
headers = {'id': f'{id}'}
print(request.post(f"{url}/browser/delete", json=headers).json())
if __name__ == '__main__':
createOrUpdateBrowser()
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support import expected_conditions as EC
import pyperclip
import time
from loguru import logger
from bit_api import *
import asyncio
from Helper import RedisCon
import json
from multiprocessing import Process
import random
class ToWork:
def __init__(self):
self.init_prompt = '/imagine'
# 随机等待
def random_sleep(self, min, max):
time.sleep(random.uniform(min, max))
# 粘贴板插入数据
def driver_copy(self, driver, value):
pyperclip.copy(value)
# 粘贴
def driver_paste(self, dricer, value):
ActionChains(dricer).send_keys(Keys.CONTROL, value).perform()
# 元素出现的循环次数
def CheckElement(self, dricer, type, value, num=10):
for i in range(num):
element = self.sleepElementShow(dricer, type, value)
if element:
# 随机等待
self.random_sleep(1, 2)
return element
time.sleep(1)
# 等待元素出现--自定义
def sleepElementShow(self, dricer, type, value):
try:
element = dricer.find_element(type, value)
if element:
return element
return False
except Exception as e:
return False
# 元素点击
def ElementClick(self, dricer, type, value, name=''):
try:
task = dricer.find_element(type, value)
if task:
# 随机等待
self.random_sleep(1, 2)
# js 点击
dricer.execute_script("arguments[0].click()", task)
# task.click()
logger.info(f'{name}-元素存在,开始点击')
return True
logger.info(f'{name}-元素不存在')
return False
except Exception as e:
logger.info(f'{name}-元素不存在')
logger.error(e)
return False
# 循环输入指令
def enter_prompt(self, dricer, tasks):
# task是一个列表
for task in tasks:
# 输入初始指令
logger.info('输入初始指令')
self.driver_paste(dricer, self.init_prompt)
# 选择指令
prompt_el = '//*[@id="autocomplete-0"]'
# 等待选择指令元素出现
element = self.CheckElement(dricer, 'xpath', prompt_el, 5)
if element:
status = self.ElementClick(dricer, 'xpath', prompt_el, '选择指令')
if status:
self.driver_paste(dricer, task['prompt'])
# 回车
ActionChains(dricer).send_keys(Keys.ENTER).perform()
time.sleep(2)
# 输入完成,循环获取生成成功后的链接
# 获取当前事件
while True:
# 获取任务列表
def get_current_tasks(self):
# 测试
# 当前浏览器需要执行的任务
cur_tasks = []
for j in range(2):
# task = RedisCon.get_task(key)
task = {
'prompt': "a very cute boys, 16 years old, fly a kite,in beautifulstreet,full body,3d art, c4d, octane render, ray tracting,clay material,popmart blind box,Pixar trend, animationlighting, depth of field, ultra detailed --q 5",
'user_id': 1,
'task_id': 1,
'prompt_img': [],
'result_img': "",
'type': 2,
'callback': 'http://test.phpgpt.com/api/users/prompt/callback',
}
if task:
# 将任务添加到列表
cur_tasks.append(task)
if len(cur_tasks):
return cur_tasks
return []
def open_browser(self, browser_item):
try:
logger.info(f'浏览器id-{browser_item["num"]},开始执行')
# /browser/open 接口会返回 selenium使用的http地址,以及webdriver的path,直接使用即可
res = openBrowser(browser_item)
driverPath = res['data']['driver']
debuggerAddress = res['data']['http']
chrome_options = Options()
chrome_options.add_experimental_option("debuggerAddress", debuggerAddress)
dricer = webdriver.Chrome(service=ChromeService(driverPath), options=chrome_options)
logger.info('打开窗口')
dricer.get('https://discord.com/channels/@me')
# dricer.get('https://discord.com/login')
return True
except Exception as e:
logger.error(e)
return False
# 初始化任务
def on_start(self, browser_item):
while True:
# 先获取任务
tasks = self.get_current_tasks()
if len(tasks):
# 打开窗口
browser_status = self.open_browser(browser_item)
if not browser_status:
# 浏览器打开失败
logger.error(f'{browser_item["num"]},窗口打开失败,检查代理ip')
else:
# 开始执行任务
statsu = self.run_task(browser_item, tasks)
if statsu == False:
logger.info('请检查自动化')
else:
time.sleep(3)
logger.info('不存在任务,等待3秒')
def run_task(self, browser_item, tasks):
try:
logger.info('等待登录成功')
# 判断页面是否加载完成
page_load = '//*[@data-list-item-id="guildsnav___home"]'
element = self.CheckElement(dricer, 'xpath', page_load)
if not element:
logger.error('登录失败')
# 去登陆
return False
# 登录成功
logger.info('登录成功')
# 等待服务器元素出现
server_el = '//div[contains(@aria-label, "测试服务器")]'
element = self.CheckElement(dricer, 'xpath', server_el)
if element:
# 点击指定服务器
status = self.ElementClick(dricer, 'xpath', server_el, '测试服务器')
if status:
# 等待聊天窗口出现
input_el = '//*[@role="textbox"]'
element = self.CheckElement(dricer, 'xpath', input_el)
if element:
logger.info('聊天窗口存在')
# 输入指令
self.enter_prompt(dricer, tasks)
logger.info('元素未找到,任务暂停')
return False
except Exception as e:
logger.error(e)
return False
if __name__ == '__main__':
key = 'laravel_database_midjourney_prompt'
bro = ToWork()
# while False:
# tasks = list()
# # 获取任务数量
# task_num = RedisCon.get_task_num(key)
# print(task_num, '个任务')
# nums = 3
# if task_num and task_num > 0:
# if task_num < nums:
# process = task_num
# else:
# # 最多同时3个任务
# process = nums
# loop = asyncio.get_event_loop()
# for i in range(process):
# task = RedisCon.get_task(key)
# if task:
# # 存在任务--转对象
# task = json.loads(task)
# logger.info(task)
# tasks.append(bro.OpenBros(task))
# loop.run_until_complete((asyncio.wait(tasks, timeout=30)))
# else:
# time.sleep(3)
# continue
# 测试-两个任务
browser_list = [
{
# 7
'num': 7,
'id': '1115ce586624431a8b7bcba8b1e82035'
},
{
# 9
'num': 9,
'id': 'db474b6936a24a7a949377a6d90b4dcb'
}
]
#
l = []
for item in browser_list:
p = Process(target=bro.OpenBros, args=(item,))
p.start()
l.append(p) # 将进程加入列表中,这样 5 个子进程就会全部执行
for p in l:
p.join() # 当5 个子进程全部执行完,才会执行主进程
import asyncio
import json
import time
import openai
from loguru import logger
from Helper import RedisCon, Helper
# key
api_key = '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'
# 组织标识
organization = 'org-ZMM4WGwi3vmM7o5y51IljTcW'
# 回调接口
def gpt_callback(res_prompt, task):
url = task['callback']
print(1)
def split_prompt(response):
logger.info(response)
def get_gpt_message(task):
openai.organization = organization
openai.api_key = api_key
prompt_num = task['prompt_num']
# 指令
prompt = task['prompt']
logger.info('开始访问open_ai')
try:
response = openai.Completion.create(
model="text-davinci-003",
prompt="请根据我给你的指令生成"+str(prompt_num)+"条相似的指令,指令如下:"+prompt,
temperature=1,
max_tokens=500,
top_p=1,
frequency_penalty=0.0,
presence_penalty=0.0,
stop=None
)
logger.info(response)
except Exception as e:
logger.info(f"错误 {e}")
# 拆分指令
res_prompt = split_prompt(response, task)
# 回调
gpt_callback(res_prompt, task)
logger.info('open_ai_结束')
if __name__ == '__main__':
key = 'laravel_database_gpt_text_prompt'
logger.info('开始执行')
while True:
tasks = list()
# 获取任务数量
task_num = RedisCon.get_task_num(key)
print(task_num, '个任务')
nums = 10
if task_num and task_num > 0:
if task_num < nums:
process = task_num
else:
# 最多同时10个任务
process = nums
loop = asyncio.get_event_loop()
for i in range(process):
task = RedisCon.get_task(key)
if task:
# 存在任务--转对象
task = json.loads(task)
logger.info(task)
tasks.append(get_gpt_message(task))
loop.run_until_complete((asyncio.wait(tasks, timeout=30)))
else:
time.sleep(3)
continue
# test_task = {
# 'type': 2,
# 'prompt': 'a very cute boys, 16 years old, fly a kite,in beautifulstreet,full body,3d art, c4d, octane render, ray tracting,clay material,popmart blind box,Pixar trend, animationlighting, depth of field, ultra detailed --q 5',
# 'prompt_img': '',
# 'user_id': 1,
# 'prompt_num': 5,
# 'task_id': 10,
# 'callback': 'http://test.phpgpt.com/api/users/gpt/callback'
# }
# get_gpt_message(test_task)
# 图像质量增强
# client_id 为官网获取的AK, client_secret 为官网获取的SK
import requests
import base64
from loguru import logger
class ImgEnhance:
def __init__(self):
self.host = 'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=SEVtEVlQPyqf55mgGt5u5Ad2&client_secret=0CNGhjMrgXRPmDE014S2IXyHLKCuTcfK'
self.res_token = ''
# 获取百度api的token
def get_api_token(self):
response = requests.get(self.host)
if response:
print(response.json())
access_token = response.json()['access_token']
self.res_token = access_token
print(access_token)
self.Work()
# self.Img_enlarge()
# 开始图片增强处理
def Work(self):
'''
图像清晰度增强
'''
request_url = "https://aip.baidubce.com/rest/2.0/image-process/v1/image_definition_enhance"
# 二进制方式打开图片文件
f = open('./res_assets/test_10.jpg', 'rb')
img = base64.b64encode(f.read())
params = {"image": img}
request_url = request_url + "?access_token=" + self.res_token
headers = {'content-type': 'application/x-www-form-urlencoded'}
response = requests.post(request_url, data=params, headers=headers)
if response:
imgdata = base64.b64decode(response.json()["image"])
file = open(r'./res_assets/test_10.jpg', 'wb')
file.write(imgdata)
file.close()
# 图片无损放大
def Img_enlarge(self):
'''
图片无损放大
'''
request_url = "https://aip.baidubce.com/rest/2.0/image-process/v1/image_quality_enhance"
# 二进制方式打开图片文件
f = open('./res_assets/test_10.jpg', 'rb')
img = base64.b64encode(f.read())
params = {"image": img}
request_url = request_url + "?access_token=" + self.res_token
headers = {'content-type': 'application/x-www-form-urlencoded'}
response = requests.post(request_url, data=params, headers=headers)
if response:
imgdata = base64.b64decode(response.json()["image"])
file = open(r'./res_assets/test_10.jpg', 'wb')
file.write(imgdata)
file.close()
if __name__ == '__main__':
Img = ImgEnhance()
Img.get_api_token()
import cv2
import os
import pathlib
target_path = "./res_assets"
def mkdir(path):
path = path.strip()
path = path.rstrip("\\")
isExists = os.path.exists(path)
if not isExists:
os.makedirs(path)
print(path + ' 创建成功')
return True
else:
print(path + ' 目录已存在')
return False
def split_img(img_file):
img = cv2.imread(img_file)
# 这一段是改为灰色
# img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img_h = img.shape[0] # 高度
img_w = img.shape[1] # 宽度
# h1_half = int(img_h / 2)
# w1_half = int(img_w / 2)
h1_half = img_h // 2
w1_half = img_w // 2
img_name = os.path.basename(img_file)
for i in range(4):
img1 = img[int(i / 2) * h1_half: h1_half * (int(i / 2) + 1), int(i % 2) * w1_half: (int(i % 2) + 1) * w1_half]
img1_path = os.path.join(target_path, f"{img_name[:-4]}_{i}.jpg")
print("spilt img:", img1_path)
cv2.imwrite(img1_path, img1)
if __name__ == '__main__':
mkdir(target_path)
split_img(os.path.join('./assets', 'test.png'))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment