关键词:Binance WebSocket、Python 异步编程、K 线抓取、Pandas、Parquet 存储、实时行情、REST API、Python-binance 依赖重连机制、数据持久化
为什么选择 WebSocket 而非 REST API?
Binance 提供两条获取 K 线数据的路径:REST API 与 WebSocket。其中:
- REST API 依赖“拉取”,容易产生 1~3 秒的时延,且限速受限。
- WebSocket 采用“推送”机制,延迟低至毫秒级,官方文档也将其列为获取实时行情的首选方案。
借助 Python 的 asyncio 与 ReconnectingWebsocket,我们既能保持连接稳定性,又能把接收到的 Binance K 线数据 秒级写盘,供后续回测或策略研究。
快速组建 WebSocket 连接
我们先为三大交易区(现货、USDT 永续、币本位永续)封装统一函数,免去每次重复写拼接地址的麻烦。
from ws_basics import ReconnectingWebsocket
SPOT_STREAM_URL = 'wss://stream.binance.com:9443/'
USDT_FUTURES_URL = 'wss://fstream.binance.com/'
COIN_FUTURES_URL = 'wss://dstream.binance.com/'
def get_socket(symbols, interval, market='spot'):
"""
生成任意外部市场对应的 WebSocket 对象
"""
base = dict(
spot = (SPOT_STREAM_URL, 'stream.binance.com'),
usdt_futures=(USDT_FUTURES_URL, 'fstream.binance.com'),
coin_futures=(COIN_FUTURES_URL, 'dstream.binance.com'),
)[market]
channels = [f'{s.lower()}@kline_{interval}' for s in symbols]
return ReconnectingWebsocket(
path='/'.join(channels),
url=base[0],
prefix='stream?streams='
)运行此函数,等价于只用一个简洁接口即可订阅多标的、多周期的行情。
解码和校验 K 线原始数据
Binance kline 字段包含十余项内容,但我们只关心最终 十列字段:
| WebSocket 字段 (k.*) | 语义 |
|---|---|
| t | 开盘时间 (ms) |
| o, h, l, c | 开高低收 |
| v | 成交量 (基础币) |
| q | 成交额 (计价币) |
| n | 成交笔数 |
| V | 主动买入成交量 |
| Q | 主动买入成交额 |
| x | K 线是否收盘 (布尔) |
我们使用 防御式编程,只要满足以下三项就入库:
- 字段存在;
- 事件类型是
kline; - 收到已收盘的那一条记录(
x=True)。
def handle_candle_data(res):
if 'data' not in res:
return None
data = res['data']
if data.get('e') != 'kline' or 'k' not in data:
return None
candle = data['k']
if not candle.get('x', False):
return None
return parse_to_df(candle)👉 点击此处查看完整解析方法对比表,避免踩坑整整 3 行定时器错误
多币种、多周期并行监听类 —— CandleListener 设计
传统的“每条行情开一个进程”方案,在中高并发场景会面临高开销。为此我们封装单个 CandleListener 类,把 每类市场 + 周期 + 一组标的 视为一条消费通道:
class CandleListener:
TRADE_TYPE_MAP = {
'usdt_futures': get_usdt_futures_multi_socket,
'coin_futures': get_coin_futures_multi_socket,
'spot': get_spot_multi_socket,
}
def __init__(self, *, type_, symbols, interval, queue):
self.market = type_
self.symbols = set(symbols)
self.interval = interval
self.queue = queue
self.interval_td = pd.to_timedelta(interval)
...核心 start_listen 方法是无限循环,利用 ReconnectingWebsocket 在断网或 60 秒无数据时自动重连,保证 长期稳定运行。
生产者-消费者架构高效写盘
为了把网络 I/O 与磁盘 I/O 分离,我们建立:
- 3 个生产者:现货 1 分钟、USDT 永续 1 分钟、币本位 3 分钟;
- 1 个消费者:读取
asyncio.Queue,增量写入 Parquet。
优势:
- Parquet 压缩比高,
zstd级别写入后单文件 9 KB 可存 数万条。 - 小文件多线程写入,最终合并为持续追加模式,不阻塞数据采集。
示例写入函数:
def update_candle_data(df_new, symbol, interval, market):
path = f'{market}_{symbol}_{interval}.pqt'
df_existing = pd.read_parquet(path) if os.path.exists(path) else pd.DataFrame()
df_all = pd.concat([df_existing, df_new]).drop_duplicates()
df_all.sort_index().to_parquet(path, compression='zstd')运行时序列:
2024-06-30 23:00:00 INFO Record usdt_futures BTCUSDT-1m
2024-06-30 23:00:01 INFO Record coin_futures ETHUSD_PERP-3m
...最终目录如下:
usdt_futures_BTCUSDT_1m.pqt
spot_BNBUSDT_1m.pqt
coin_futures_ETHUSD_PERP_3m.pqt
...常见问题 FAQ
Q1:断电或网络抖动,会不会导致数据缺失?
A:WebSocket 断线会在 5 秒内自动重连;如超过一分钟行业普遍容忍范围,建议额外开 REST API 补空洞 任务,此处逻辑日后会展开。
Q2:为何要用 Parquet 而不是 CSV?
A:Parquet 按列存储、自带压缩 (zstd) 和索引,磁盘与内存占用显著降低;对 Pandas 版本透明兼容。
Q3:想跑上百个币对,会不会造成 CPU 100 % ?
A:在单台 4 vCPU 机上测试,同时监听 100 条 1 分钟通道,CPU 约在 15 % 以内;可继续切多进程横向扩展。
Q4:如何确认 Binance WebSocket 时长限速?
A:连接建立后每 3 分钟收到 ping 心跳,Binance 要求必须在 10 秒返回 pong;ReconnectingWebsocket 已自动处理。
Q5:有没有办法本地一次性回溯所有历史?
A:可先用 Binance REST API 批量下载按需区段的 K 线历史数据 并写入 Parquet,然后接入 WebSocket 增量 更新即可。
Q6:如果只需 5 秒级别心跳监控,是否支持?
A:目前 WebSocket 仅放出公开 1m, 3m, 5m 等周期,5s 级需自己二次聚合。
下一步:扩展到实时策略引擎
完成数据采集后,可立即:
- 用 PyArrow Dataset API 做实时特征抽取;
- 引入机器学习模型,按分钟级别特征滚动预测;
- 用 Redis Stream / Kafka 把数据分发到策略服务集群。
祝高效抓取,数据永不丢!