# 1、参考文档
# docs:https://www.okx.com/docs-v5/zh/#websocket-api
# example_code:https://github.com/bybit-exchange/api-connectors/blob/master/wss_demo/python/ws_public_demo.py
# 实盘API交易地址如下:
# REST:https://www.okx.com/
# WebSocket公共频道:wss://ws.okx.com:8443/ws/v5/public
# WebSocket私有频道:wss://ws.okx.com:8443/ws/v5/private
# REST:https://aws.okx.com
# WebSocket公共频道:wss://wsaws.okx.com:8443/ws/v5/public
# WebSocket私有频道:wss://wsaws.okx.com:8443/ws/v5/private
# 2、所需外部包
import json
import time
from datetime import datetime
import websocket
import pymysql
import requests
# 3、整体代码
# MySQL connection settings (the values shown here are masked placeholders).
host = '***.***.***.***'
user = '*****'
password = '*****'
database_exchange_pairs = 'data'
# Database connection object, opened at import time and shared by every
# function in this file.
db_pairs_info_okx = pymysql.connect(host=host, user=user, password=password, database=database_exchange_pairs)
# Create the database cursor used for all queries below.
cursor_exchange_pairs_okx = db_pairs_info_okx.cursor()
# Wall-clock time in milliseconds at start-up.
# NOTE(review): not referenced anywhere else in the visible code.
prev_send_time = int(time.time() * 1000)
def get_coin_pair():
    """Return every ``coin_pairs`` value stored in the ``data.okx`` table.

    Returns:
        A list holding the first column of each fetched row.
    """
    query = "select coin_pairs from data.okx"
    cursor_exchange_pairs_okx.execute(query)
    rows = cursor_exchange_pairs_okx.fetchall()
    # Commit after the read; presumably this ends the transaction so that a
    # later call sees fresh data -- TODO confirm.
    db_pairs_info_okx.commit()
    return [row[0] for row in rows]
def get_coin_list(el: list, channel: str = "tickers"):
    """Build one subscription ``args`` list per instrument.

    Args:
        el: Instrument ids, e.g. ``["BTC-USDT", "ETH-USDT"]``.
        channel: Channel name placed in every argument dict. Defaults to
            ``"tickers"`` (market ticker channel); ``"mark-price"`` is the
            alternative the original code kept as a commented-out line.

    Returns:
        A list with one entry per instrument; each entry is a single-element
        list ``[{"channel": ..., "instType": "ANY", "instId": <instrument>}]``
        suitable as the ``args`` value of a subscribe request.
    """
    return [
        [{"channel": channel, "instType": "ANY", "instId": one_coin}]
        for one_coin in el
    ]
# Trading-pair reconciliation
def check_pairs(database: list, exchange: list):
    """Synchronise the ``okx`` table with the pairs currently listed online.

    Pairs present in ``database`` but not in ``exchange`` are deleted; pairs
    present in ``exchange`` but not in ``database`` are inserted, with
    ``coin_pairs_temp`` set to the pair name with ``-`` replaced by ``/``.

    Args:
        database: Pairs currently stored in the database.
        exchange: Pairs currently listed by the exchange.
    """
    # Sets give O(1) membership tests; the lists keep the original order.
    exchange_set = set(exchange)
    database_set = set(database)

    stale_pairs = [pair for pair in database if pair not in exchange_set]
    for one_pair in stale_pairs:
        # Parameterized query: the driver escapes the value, so a pair name
        # containing a quote can neither break nor alter the statement.
        cursor_exchange_pairs_okx.execute(
            "DELETE FROM okx WHERE coin_pairs = %s", (one_pair,)
        )
    db_pairs_info_okx.commit()

    new_pairs = [pair for pair in exchange if pair not in database_set]
    for one_pair in new_pairs:
        cursor_exchange_pairs_okx.execute(
            "INSERT INTO okx(coin_pairs,coin_pairs_temp) values (%s,%s)",
            (one_pair, str(one_pair).replace('-', '/')),
        )
    db_pairs_info_okx.commit()
def pop_any(el: list):
    """Drop USDC pairs whose USDT counterpart is also listed (in place).

    For every entry ending in ``USDC``, the base symbol is the text before
    the first ``-``; if ``<base>-USDT`` is also in ``el`` the USDC entry is
    removed. All other entries are kept in their original order.

    The original implementation called ``el.remove`` while iterating over
    ``el``, which skips the element following each removal and so left some
    duplicates behind. The list is now filtered in one pass and written back
    with slice assignment, so callers holding a reference to ``el`` still see
    the change.

    Args:
        el: List of instrument ids such as ``"BTC-USDT"``; modified in place.
    """
    listed = set(el)
    kept = []
    for one_el in el:
        base = str(one_el).partition('-')[0]
        if one_el[-4:] == 'USDC' and (base + '-USDT') in listed:
            continue
        kept.append(one_el)
    el[:] = kept
def get_okx_symbols(instype):
    """Fetch the USDT/USDC-quoted instrument ids for one instrument type.

    Args:
        instype: Instrument type, one of ``SPOT``, ``SWAP`` (perpetual
            contract), ``OPTION`` or ``FUTURES`` (delivery contract).

    Returns:
        A list of ``instId`` strings whose last four characters are ``USDC``
        or ``USDT``, after ``pop_any`` has filtered the list.

    Raises:
        requests.RequestException: On network failure, timeout, or a
            non-success HTTP status.
    """
    url = 'https://aws.okx.com/api/v5/market/tickers'
    # Without a timeout, requests can block forever on a stalled connection.
    response = requests.get(url, params={'instType': instype}, timeout=10)
    # Fail with a clear HTTP error rather than a confusing JSON/KeyError.
    response.raise_for_status()
    result = response.json()['data']

    coin_list = []
    for one_pair in result:
        pairs = str(one_pair['instId'])
        if pairs[-4:] in ("USDC", "USDT"):
            coin_list.append(pairs)
    pop_any(coin_list)
    return coin_list
def on_message(ws, message):
    """Store the last price from a pushed message in the ``okx`` table.

    The message is parsed as JSON and printed. If it carries a ``data`` list,
    the ``instId`` and ``last`` fields of its final element are written to
    ``coin_price`` for the matching ``coin_pairs`` row. Messages without that
    shape (for example subscription acknowledgements) are ignored.

    Args:
        ws: The WebSocket application instance (unused).
        message: Raw JSON text received from the socket.
    """
    data = json.loads(message)
    print(data)
    try:
        latest = data['data'][-1]
        coin_name = latest['instId']
        temp_price = latest['last']
    except (KeyError, IndexError, TypeError):
        # Not a price push; nothing to store.
        return
    if temp_price is None or temp_price == '':
        # An empty price could not be written by the original either.
        return
    try:
        # Parameterized so the driver escapes values taken from the network.
        cursor_exchange_pairs_okx.execute(
            "UPDATE okx SET coin_price = %s WHERE coin_pairs = %s",
            (temp_price, coin_name),
        )
        db_pairs_info_okx.commit()
    except pymysql.MySQLError as exc:
        # Stay best-effort (one failed write must not stop the feed), but
        # report the failure instead of hiding it.
        print('failed to update price for', coin_name, exc)
def on_error(ws, error):
    """Print the reported error between two marker lines.

    Args:
        ws: The WebSocket application instance (unused).
        error: The error object to print.
    """
    for line in ('we got error', error, 'print error complete'):
        print(line)
def on_close(ws, *args):
    """Print a notice when the connection closes.

    Args:
        ws: The WebSocket application instance (unused).
        *args: Optional extra positional arguments. Recent websocket-client
            releases pass a close status code and close message; accepting
            them avoids a ``TypeError`` while still allowing the old
            single-argument call.
    """
    print("### about to close please don't close ###")
def on_open(ws):
    """Subscribe to every topic in the module-level ``coin_list``.

    One ``subscribe`` request is sent per entry of ``coin_list``, with the
    entry used as the request's ``args`` value.

    Args:
        ws: The WebSocket application instance used to send the requests.
    """
    print('opened')
    requests_to_send = (
        json.dumps({"op": "subscribe", "args": topic}) for topic in coin_list
    )
    for payload in requests_to_send:
        ws.send(payload)
def on_pong(ws, *data):
    """Print a notice when a pong is received.

    Args:
        ws: The WebSocket application instance (unused).
        *data: Any payload passed by the caller (unused).
    """
    notice = 'pong received'
    print(notice)
def on_ping(ws, *data):
    """Print the current local time and a notice when a ping is received.

    Args:
        ws: The WebSocket application instance (unused).
        *data: Any payload passed by the caller (unused).
    """
    stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    print("date and time =", stamp)
    print('ping received')
def connWS():
    """Open the public WebSocket endpoint and run it until it stops.

    The module-level callbacks are registered on a ``WebSocketApp`` and
    ``run_forever`` is called with a 25-second ping interval and a 10-second
    ping timeout.
    """
    callbacks = {
        "on_message": on_message,
        "on_error": on_error,
        "on_close": on_close,
        "on_ping": on_ping,
        "on_pong": on_pong,
        "on_open": on_open,
    }
    ws = websocket.WebSocketApp("wss://ws.okx.com:8443/ws/v5/public", **callbacks)
    # To go through a local proxy, additionally pass
    # http_proxy_host='127.0.0.1' and http_proxy_port=1087.
    keepalive = {"ping_interval": 25, "ping_timeout": 10}
    ws.run_forever(**keepalive)
if __name__ == "__main__":
    # Initial sync: fetch the pairs listed online, compare them with the
    # database, then build the subscription list read by on_open.
    online_pairs = get_okx_symbols("SPOT")
    data_pairs = get_coin_pair()
    check_pairs(data_pairs, online_pairs)
    coin_list = get_coin_list(online_pairs)

    # Trace output is a global setting; set it once instead of per loop.
    websocket.enableTrace(False)
    start_time = time.time()
    while True:
        try:
            # Blocks until the connection stops.
            connWS()
        except Exception as e:
            print(e)
        # Pause before reconnecting.
        time.sleep(10)

        # Once at least 24 hours have passed since the last sync, refresh the
        # pair list before the next reconnect.
        if time.time() - start_time >= 86400:
            try:
                online_pairs = get_okx_symbols("SPOT")
                data_pairs = get_coin_pair()
                check_pairs(data_pairs, online_pairs)
                # Rebuild the subscription list as well; otherwise newly
                # inserted pairs would never be subscribed.
                coin_list = get_coin_list(online_pairs)
                # Reset the timer only after a successful refresh so that a
                # failed attempt is retried after the next reconnect.
                start_time = time.time()
            except Exception as e:
                # A failed refresh must not stop the long-running feed.
                print(e)
# Great Done!
# markdown test!