WebSocket数据爬取实现


1、参考文档

# docs:https://www.okx.com/docs-v5/zh/#websocket-api
# example_code:https://github.com/bybit-exchange/api-connectors/blob/master/wss_demo/python/ws_public_demo.py

# 实盘API交易地址如下:
# REST:https://www.okx.com/
# WebSocket公共频道:wss://ws.okx.com:8443/ws/v5/public
# WebSocket私有频道:wss://ws.okx.com:8443/ws/v5/private
# REST:https://aws.okx.com
# WebSocket公共频道:wss://wsaws.okx.com:8443/ws/v5/public
# WebSocket私有频道:wss://wsaws.okx.com:8443/ws/v5/private

2、所需外部包

import json
import time
from datetime import datetime
import websocket
import pymysql
import requests

3、整体代码

# MySQL connection settings (values are redacted in this listing).
# NOTE(review): consider loading credentials from environment/config instead
# of hard-coding them in the source file.
host = '***.***.***.***'
user = '*****'
password = '*****'
database_exchange_pairs = 'data'
# Database connection object, opened once at import time and shared by the
# functions below.
db_pairs_info_okx = pymysql.connect(host=host, user=user, password=password, database=database_exchange_pairs)
# Shared cursor on that connection; get_coin_pair, check_pairs and on_message
# all execute their SQL through it.
cursor_exchange_pairs_okx = db_pairs_info_okx.cursor()

# Current time in epoch milliseconds, captured at import time.
# NOTE(review): not referenced anywhere in the visible code — possibly leftover.
prev_send_time = int(time.time() * 1000)


def get_coin_pair():
    """Return every ``coin_pairs`` value stored in the ``data.okx`` table.

    The query runs on the shared module-level cursor. The connection is
    committed after reading — presumably to end the read transaction so a
    later call sees rows written by other sessions (confirm).
    """
    cursor_exchange_pairs_okx.execute("select coin_pairs from data.okx")
    rows = cursor_exchange_pairs_okx.fetchall()
    db_pairs_info_okx.commit()
    return [row[0] for row in rows]


def get_coin_list(el: list):
    """Build one OKX subscribe ``args`` list per trading pair.

    :param el: instrument IDs such as ``"BTC-USDT"``.
    :return: one entry per pair, in input order. Each entry is a single-element
        list holding the ``tickers`` channel descriptor for that pair, ready to
        be used as the ``args`` of one subscribe request.
    """
    # To follow mark prices instead, use the "mark-price" channel name.
    return [
        [{"channel": "tickers", "instType": "ANY", "instId": pair}]
        for pair in el
    ]


# Trading-pair reconciliation
def check_pairs(database: list, exchange: list):
    """Reconcile the ``okx`` table with the pairs currently listed on the exchange.

    :param database: pairs currently stored in the ``okx`` table.
    :param exchange: pairs currently listed online (from the exchange API).

    Rows whose pair is no longer listed are deleted. Newly listed pairs are
    inserted with ``coin_pairs_temp`` set to the slash form
    (``BTC-USDT`` -> ``BTC/USDT``). Each statement is committed individually.
    """
    # Sets give O(1) membership tests instead of rescanning the lists per item.
    exchange_set = set(exchange)
    database_set = set(database)

    delete_list = [pair for pair in database if pair not in exchange_set]
    for one_pair in delete_list:
        # Parameterized query: pair names come from an external API response,
        # so they must not be spliced into the SQL text.
        cursor_exchange_pairs_okx.execute(
            "DELETE FROM okx WHERE coin_pairs = %s", (one_pair,)
        )
        db_pairs_info_okx.commit()

    insert_list = [pair for pair in exchange if pair not in database_set]
    for one_pair in insert_list:
        cursor_exchange_pairs_okx.execute(
            "INSERT INTO okx(coin_pairs, coin_pairs_temp) VALUES (%s, %s)",
            (one_pair, str(one_pair).replace('-', '/')),
        )
        db_pairs_info_okx.commit()


def pop_any(el: list):
    """Remove duplicate stablecoin pairs in place (drop USDC when USDT exists).

    A ``<BASE>-USDC`` pair is removed when ``<BASE>-USDT`` is also present in
    ``el``. For example, with both ``BTC-USDC`` and ``BTC-USDT`` present, only
    ``BTC-USDT`` is kept. ``el`` is mutated in place and ``None`` is returned.
    """
    # Take a membership snapshot up front and rebuild the list. The previous
    # version called el.remove() while iterating el, which skips the element
    # after every removal, so some redundant USDC pairs were never dropped.
    present = set(el)

    def _is_redundant(pair):
        # True for a USDC pair whose base currency also trades against USDT.
        base = pair.partition('-')[0]
        return pair.endswith('USDC') and f'{base}-USDT' in present

    # Slice assignment keeps the caller's list object (in-place contract).
    el[:] = [pair for pair in el if not _is_redundant(pair)]


def get_okx_symbols(instype, timeout=10):
    """Fetch USDT/USDC-quoted instrument IDs from OKX's public tickers endpoint.

    :param instype: instrument type: ``SPOT`` (spot), ``SWAP`` (perpetual
        swaps), ``OPTION`` (options) or ``FUTURES`` (delivery futures).
    :param timeout: seconds to wait for the HTTP response. Without a timeout,
        a stalled connection would block the caller indefinitely.
    :return: instrument IDs ending in ``USDC`` or ``USDT``. A USDC pair is
        dropped when the same base also has a USDT pair (via ``pop_any``).
    :raises requests.HTTPError: if the endpoint answers with a non-2xx status.
    :raises requests.Timeout: if no response arrives within ``timeout``.
    """
    url = f'https://aws.okx.com/api/v5/market/tickers?instType={instype}'
    response = requests.get(url, timeout=timeout)
    # Fail with a clear HTTP error rather than a confusing KeyError on 'data'.
    response.raise_for_status()
    coin_list = [
        str(ticker['instId'])
        for ticker in response.json()['data']
        if str(ticker['instId']).endswith(("USDC", "USDT"))
    ]
    pop_any(coin_list)
    return coin_list


def on_message(ws, message):
    """Store the latest ``last`` price from an OKX ticker push in the ``okx`` table.

    Every incoming message is printed. A message without a usable
    ``data[-1]`` entry (for instance an event/acknowledgement message) is
    skipped. Database failures are reported but do not stop the stream.
    """
    data = json.loads(message)
    print(data)
    try:
        latest = data['data'][-1]
        coin_name = latest['instId']
        temp_price = latest['last']
    except (KeyError, IndexError, TypeError):
        # Not a ticker push: nothing to store.
        return
    try:
        # Parameterized query: both values come straight from the network.
        cursor_exchange_pairs_okx.execute(
            "UPDATE okx SET coin_price = %s WHERE coin_pairs = %s",
            (temp_price, coin_name),
        )
        db_pairs_info_okx.commit()
    except pymysql.MySQLError as e:
        # Best effort: one failed write must not kill the stream, but it
        # should be visible instead of silently swallowed.
        print('db update failed for {}: {}'.format(coin_name, e))


def on_error(ws, error):
    """Print *error* between a start marker line and an end marker line."""
    print('we got error', error, 'print error complete', sep='\n')


def on_close(ws, *args):
    """Report that the WebSocket connection has closed.

    websocket-client 1.x invokes this as ``on_close(ws, close_status_code,
    close_msg)``, while older releases pass only ``ws``. ``*args`` accepts
    both forms. The previous one-argument signature raised ``TypeError``
    on newer versions.
    """
    print("### about to close please don't close ###")


def on_open(ws):
    """Subscribe to every channel group in the module-level ``coin_list``.

    Prints ``opened``, then sends one ``{"op": "subscribe", "args": ...}``
    JSON request per entry of ``coin_list``. ``coin_list`` is assigned in the
    ``__main__`` section before the connection is opened.
    """
    print('opened')
    subscriptions = ({"op": "subscribe", "args": group} for group in coin_list)
    for request in subscriptions:
        ws.send(json.dumps(request))


def on_pong(ws, *data):
    """Log that a pong frame arrived; any payload is ignored."""
    notice = 'pong received'
    print(notice)


def on_ping(ws, *data):
    """Log the local time (``dd/mm/YYYY HH:MM:SS``), then 'ping received'."""
    stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    print("date and time =", stamp)
    print('ping received')


def connWS():
    """Open the OKX public WebSocket and run it until the connection ends.

    The module-level ``on_*`` functions are registered as callbacks. A
    protocol-level ping is sent every 25 s, and ``ping_timeout=10`` bounds the
    wait for the matching pong.
    """
    handlers = dict(
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        on_ping=on_ping,
        on_pong=on_pong,
        on_open=on_open,
    )
    app = websocket.WebSocketApp("wss://ws.okx.com:8443/ws/v5/public", **handlers)
    # To go through a local proxy, also pass http_proxy_host / http_proxy_port.
    app.run_forever(ping_interval=25, ping_timeout=10)


if __name__ == "__main__":
    # Bring the okx table in line with the pairs currently listed on OKX.
    online_pairs = get_okx_symbols("SPOT")
    data_pairs = get_coin_pair()
    check_pairs(data_pairs, online_pairs)
    # Module-level global: on_open reads it every time it (re)subscribes.
    coin_list = get_coin_list(online_pairs)
    websocket.enableTrace(False)
    start_time = time.time()
    while True:
        # connWS() blocks until the socket closes; then reconnect.
        try:
            connWS()
        except Exception as e:
            print(e)
            time.sleep(10)
        else:
            # run_forever() can return (rather than raise) when the connection
            # drops or cannot be opened; pause briefly so an unreachable server
            # does not cause a tight reconnect loop.
            time.sleep(1)
        # Once a day (checked after each disconnect), re-sync the pair list.
        if time.time() - start_time >= 86400:
            try:
                online_pairs = get_okx_symbols("SPOT")
                data_pairs = get_coin_pair()
                check_pairs(data_pairs, online_pairs)
            except Exception as e:
                # Keep streaming with the current subscriptions; the refresh is
                # retried after the next disconnect because start_time is kept.
                print(e)
            else:
                start_time = time.time()
                # Rebuild subscriptions so the next connection also covers new
                # listings. Previously coin_list was never refreshed, so newly
                # inserted pairs never received price updates.
                coin_list = get_coin_list(online_pairs)

Done!