Commit 000830f0 by zhangheng

Initial commit

parents
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
from environs import Env
env = Env()
# Redis数据库本地
REDIS_HOST_LOCAL = env.str('REDIS_HOST_LOCAL', '127.0.0.1')
# Redis数据库密码, 如无则填None
REDIS_PASSWORD_LOCAL = env.str('REDIS_PASSWORD_LOCAL', 'abcd654321')
# Redis数据库端口
REDIS_PORT_LOCAL = env.int('REDIS_PORT_LOCAL', 6379)
REDIS_DB_LOCAL = env.int('REDIS_DB_LOCAL', 14)
# 本地数据库配置
DB_CONNECTION_LOCAL = 'mysql'
DB_HOST_LOCAL = '127.0.0.1'
DB_PORT_LOCAL = 3306
DB_DATABASE_LOCAL = 'dexnav'
DB_USERNAME_LOCAL = 'dexnav'
DB_PASSWORD_LOCAL = 'ayDR66mczRJG38Ck'
import pymysql
from config.db_config import *
class MysqlClientLocal:
def __init__(self):
self.conn = pymysql.connect(
host=DB_HOST_LOCAL,
port=DB_PORT_LOCAL,
user=DB_USERNAME_LOCAL,
password=DB_PASSWORD_LOCAL,
database=DB_DATABASE_LOCAL,
charset='utf8')
def getSymbol(self):
cursor = self.conn.cursor()
sql = 'select symbol_short, target from symbols where exchange = "binance"'
cursor.execute(sql)
result = cursor.fetchall()
return result
import json
import redis
from config.db_config import *
class RedisClientLocal(object):
def __init__(self, host=REDIS_HOST_LOCAL, port=REDIS_PORT_LOCAL, password=REDIS_PASSWORD_LOCAL, db=REDIS_DB_LOCAL):
"""
初始化Redis连接
:param host:
:param port:
:param password:
:param db:
"""
self.db = redis.StrictRedis(host=host, port=port, password=password, db=db, decode_responses=True)
def push_lst(self, key, data):
self.db.lpush(key, json.dumps(data))
def pop_lst(self, key):
return self.db.rpop(key)
def get_lst_len(self, key):
return self.db.llen(key)
def get_ex(self, key):
return self.db.get(key)
def set_ex(self, key, ttl, value):
return self.db.setex(key, ttl, value)
def con(self):
return self.db
import json
import websocket
from db.redis_db import RedisClientLocal
def on_message(ws, message):
ticker = json.loads(message)
ticker_lst = list()
for item in ticker:
ticker_arr = {
'symbol': item['s'],
'price': item['c'],
'exchange': 'binance',
'date': item['E'],
}
ticker_lst.append(ticker_arr)
# 任务入队
redis_con.push_lst('binance:ticker', ticker_lst)
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
if __name__ == '__main__':
redis_con = RedisClientLocal()
ws = websocket.WebSocketApp('wss://stream.binance.com:9443/ws/!ticker@arr', on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.run_forever()
import time
import taos
import json
from db.redis_db import RedisClientLocal
from db.mysql_db import MysqlClientLocal
class TDConnection(object):
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = taos.connect(host="127.0.0.1",
user="root",
password="taosdata",
database="symbol_monitor",
port=6030)
return cls._instance
class TDConn:
@staticmethod
def TD_client():
"""连接数据库单例"""
return taos.connect(TDConnection())
@staticmethod
def TD_close(cl):
"""关闭数据库连接"""
cl.close()
@staticmethod
def TD_insert(cl, table, data):
"""插入数据"""
try:
sql = f"INSERT INTO {table} VALUES {data};"
print(sql)
cl.execute(sql)
except Exception as e:
print(e)
@staticmethod
def TD_query_all(cl, sql):
"""查询所有数据"""
try:
cl.execute(sql)
return cl.fetchall()
except Exception as e:
print(e)
@staticmethod
def TD_query_one(cl, sql):
"""查询一条数据"""
try:
cl.execute(sql)
return cl.fetchone()
except Exception as e:
print(e)
def get_mysql_data():
redis_client = RedisClientLocal()
mysql_client = MysqlClientLocal()
data = mysql_client.getSymbol()
symbol_dict = dict()
for item in data:
name = item[0] + item[1]
symbol = item[0]
target = item[1]
symbol_dict[name] = {
"name": name,
"symbol": symbol,
"target": target,
}
redis_client.con().set("binance:symbols:dict", json.dumps(symbol_dict))
return symbol_dict
def get_redis_symbol():
redis_client = RedisClientLocal()
data = redis_client.con().get("binance:symbols:dict")
if data:
symbol_dict = json.loads(data)
else:
# print("redis中没有数据,从mysql中获取")
symbol_dict = get_mysql_data()
return symbol_dict
def main():
nums = 40
cl = TDConn.TD_client()
cl.select_db("symbol_monitor")
redis_client = RedisClientLocal()
while True:
task_num = redis_client.get_lst_len("binance:ticker")
if task_num > 0:
symbol_dict = get_redis_symbol()
if task_num < nums:
process = task_num
else:
process = nums
sql = ''
for i in range(process):
data = redis_client.pop_lst("binance:ticker")
if data:
data = json.loads(data)
for item in data:
name = item.get("symbol")
if name not in symbol_dict:
continue
symbol = symbol_dict.get(name).get("symbol")
target = symbol_dict.get(name).get("target")
price = item.get("price")
exchange = item.get("exchange")
date = item.get("date")
redis_client.con().set(f"laravel_cache:symbol-last-price:{exchange}:{symbol}", price)
if date and symbol and target and price and exchange:
sql += f'({date}, "{symbol}", "{target}", {price}, "{exchange}") '
# 插入数据
if sql != '':
TDConn.TD_insert(cl, "price_binance", sql)
print("插入数据成功")
else:
time.sleep(1)
if __name__ == '__main__':
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment