使用 Python 实时抓取 Binance K 线:WebSocket、协程与 Parquet 一站式指南

·

关键词:Binance WebSocket、Python 异步编程、K 线抓取、Pandas、Parquet 存储、实时行情、REST API、Python-binance 依赖重连机制、数据持久化

为什么选择 WebSocket 而非 REST API?

Binance 提供两条获取 K 线数据的路径:REST APIWebSocket。其中:

借助 Python 的 asyncioReconnectingWebsocket,我们既能保持连接稳定性,又能把接收到的 Binance K 线数据 秒级写盘,供后续回测或策略研究。


快速组建 WebSocket 连接

我们先为三大交易区(现货、USDT 永续、币本位永续)封装统一函数,免去每次重复写拼接地址的麻烦。

from ws_basics import ReconnectingWebsocket

SPOT_STREAM_URL   = 'wss://stream.binance.com:9443/'
USDT_FUTURES_URL  = 'wss://fstream.binance.com/'
COIN_FUTURES_URL  = 'wss://dstream.binance.com/'

def get_socket(symbols, interval, market='spot'):
    """
    生成任意外部市场对应的 WebSocket 对象
    """
    base = dict(
        spot       = (SPOT_STREAM_URL,  'stream.binance.com'),
        usdt_futures=(USDT_FUTURES_URL, 'fstream.binance.com'),
        coin_futures=(COIN_FUTURES_URL, 'dstream.binance.com'),
    )[market]

    channels = [f'{s.lower()}@kline_{interval}' for s in symbols]
    return ReconnectingWebsocket(
        path='/'.join(channels),
        url=base[0],
        prefix='stream?streams='
    )

运行此函数,等价于只用一个简洁接口即可订阅多标的、多周期的行情。


解码和校验 K 线原始数据

Binance kline 字段包含十余项内容,但我们只关心最终 十列字段

WebSocket 字段 (k.*)语义
t开盘时间 (ms)
o, h, l, c开高低收
v成交量 (基础币)
q成交额 (计价币)
n成交笔数
V主动买入成交量
Q主动买入成交额
xK 线是否收盘 (布尔)

我们使用 防御式编程,只要满足以下三项就入库:

  1. 字段存在;
  2. 事件类型是 kline
  3. 收到已收盘的那一条记录(x=True)。
def handle_candle_data(res):
    if 'data' not in res:
        return None
    data = res['data']
    if data.get('e') != 'kline' or 'k' not in data:
        return None
    candle = data['k']
    if not candle.get('x', False):
        return None
    return parse_to_df(candle)

👉 点击此处查看完整解析方法对比表,避免踩坑整整 3 行定时器错误


多币种、多周期并行监听类 —— CandleListener 设计

传统的“每条行情开一个进程”方案,在中高并发场景会面临高开销。为此我们封装单个 CandleListener 类,把 每类市场 + 周期 + 一组标的 视为一条消费通道:

class CandleListener:
    TRADE_TYPE_MAP = {
        'usdt_futures': get_usdt_futures_multi_socket,
        'coin_futures': get_coin_futures_multi_socket,
        'spot':         get_spot_multi_socket,
    }
    def __init__(self, *, type_, symbols, interval, queue):
        self.market       = type_
        self.symbols      = set(symbols)
        self.interval     = interval
        self.queue        = queue
        self.interval_td  = pd.to_timedelta(interval)
    ...

核心 start_listen 方法是无限循环,利用 ReconnectingWebsocket 在断网或 60 秒无数据时自动重连,保证 长期稳定运行


生产者-消费者架构高效写盘

为了把网络 I/O 与磁盘 I/O 分离,我们建立:

优势:

  1. Parquet 压缩比高,zstd 级别写入后单文件 9 KB 可存 数万条
  2. 小文件多线程写入,最终合并为持续追加模式,不阻塞数据采集。

示例写入函数:

def update_candle_data(df_new, symbol, interval, market):
    path = f'{market}_{symbol}_{interval}.pqt'
    df_existing = pd.read_parquet(path) if os.path.exists(path) else pd.DataFrame()
    df_all = pd.concat([df_existing, df_new]).drop_duplicates()
    df_all.sort_index().to_parquet(path, compression='zstd')

运行时序列:

2024-06-30 23:00:00  INFO  Record usdt_futures BTCUSDT-1m
2024-06-30 23:00:01  INFO  Record coin_futures ETHUSD_PERP-3m
...

最终目录如下:

usdt_futures_BTCUSDT_1m.pqt
spot_BNBUSDT_1m.pqt
coin_futures_ETHUSD_PERP_3m.pqt
...

常见问题 FAQ

Q1:断电或网络抖动,会不会导致数据缺失?
A:WebSocket 断线会在 5 秒内自动重连;如超过一分钟行业普遍容忍范围,建议额外开 REST API 补空洞 任务,此处逻辑日后会展开。

Q2:为何要用 Parquet 而不是 CSV?
A:Parquet 按列存储、自带压缩 (zstd) 和索引,磁盘与内存占用显著降低;对 Pandas 版本透明兼容。

Q3:想跑上百个币对,会不会造成 CPU 100 % ?
A:在单台 4 vCPU 机上测试,同时监听 100 条 1 分钟通道,CPU 约在 15 % 以内;可继续切多进程横向扩展。

Q4:如何确认 Binance WebSocket 时长限速?
A:连接建立后每 3 分钟收到 ping 心跳,Binance 要求必须在 10 秒返回 pongReconnectingWebsocket 已自动处理。

Q5:有没有办法本地一次性回溯所有历史?
A:可先用 Binance REST API 批量下载按需区段的 K 线历史数据 并写入 Parquet,然后接入 WebSocket 增量 更新即可。

Q6:如果只需 5 秒级别心跳监控,是否支持?
A:目前 WebSocket 仅放出公开 1m, 3m, 5m 等周期,5s 级需自己二次聚合。

👉 想亲手体验高阶撮合实例?这份完整代码仓库现已公开


下一步:扩展到实时策略引擎

完成数据采集后,可立即:

  1. PyArrow Dataset API 做实时特征抽取;
  2. 引入机器学习模型,按分钟级别特征滚动预测;
  3. Redis Stream / Kafka 把数据分发到策略服务集群。

祝高效抓取,数据永不丢!