甜蜜的工厂
108.96M · 2026-04-23
当一个量化系统运行多个策略时,最常见的做法是每个策略独立订阅 WebSocket:
# 策略 A
ws_a = await connect("wss://api.example.com/realtime")
await ws_a.send(subscribe(["AAPL.US", "TSLA.US"]))
# 策略 B
ws_b = await connect("wss://api.example.com/realtime")
await ws_b.send(subscribe(["AAPL.US", "700.HK"]))
这种写法在策略数量增加时会暴露三个问题:
| 问题 | 具体表现 | 量化后果 |
|---|---|---|
| 连接数膨胀 | 10 个策略 × 20 标的 = 10 个 WebSocket 连接 | 服务端订阅上限通常为单连接 50-100 个标的,多连接增加限频风险 |
| 断线恢复各自为战 | 网络抖动时,10 个连接各自重连,重试策略不协调 | 部分连接因限频被拒绝,策略数据不同步 |
| 跨进程无法共享 | 策略部署在不同进程/容器时,每个进程都需要自己的连接 | 资源浪费,且无法保证数据一致性 |
数据实测:3 个策略各自订阅 30 个美股标的,网络闪断一次后,平均恢复时间 23 秒,期间三个策略收到的数据最大时间差达到 8 秒。
| 方案 | 架构 | 优点 | 致命缺陷 |
|---|---|---|---|
| 各自订阅 | 每个策略独立连接 | 简单,无耦合 | 连接数爆炸,断线后各自为战 |
| 单连接+内b队列 | 一个连接收数据,通过 asyncio.Queue 分发 | 节省连接数 | 单点故障,跨进程无法共享 |
| 连接池+Redis | 连接池写入 Redis,策略独立读取 | 解耦、高可用、跨进程 | 引入 Redis 依赖,需处理数据一致性 |
当决定引入中间层时,候选方案对比:
| 候选方案 | 优势 | 为什么不适用 |
|---|---|---|
| 内存队列(asyncio.Queue) | 零延迟,无外部依赖 | 无法跨进程,策略必须和连接池在同一进程 |
| Kafka/RabbitMQ | 持久化,高吞吐 | 运维重,量化策略通常不需要消息回溯到三天前 |
| Redis | 轻量、支持多种数据结构、量化团队已有 | Hash 存快照,Pub/Sub 做通知,Streams 存历史——三种模式覆盖全场景 |
技术类比:Redis 之于行情系统,如同交易所的行情网关之于券商——网关不关心数据被用于何种策略,只负责将数据“放在那里”,策略自行决定何时读取。
┌─────────────────────────────────────────────────────────────────────┐
│ 行情数据源 │
│ 单一 WebSocket 连接,跨美股/港股/A股/加密 │
└──────────────────────────────┬──────────────────────────────────────┘
│ 毫秒级推送
▼
┌─────────────────────────────────────────────────────────────────────┐
│ WebSocket 连接池(Python asyncio) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 连接 #1 │ │ 连接 #2 │ │ 连接 #3 │ │ 热备连接 │ │
│ │ 20个标的 │ │ 20个标的 │ │ 20个标的 │ │ 空闲 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ └─────────────┴─────────────┴─────────────┘ │
│ │ │
│ ┌──────────┴──────────┐ │
│ │ 订阅状态持久化 │ │
│ │ (Redis Set 存储) │ │
│ └─────────────────────┘ │
└──────────────────────────────┬──────────────────────────────────────┘
│ 写入
▼
┌─────────────────────────────────────────────────────────────────────┐
│ Redis Server(共享总线) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────┐ │
│ │ Hash: ticker:* │ │ Pub/Sub 通知 │ │ Streams: tick:* │ │
│ │ 最新行情快照 │ │ 数据更新广播 │ │ 历史 tick(可选) │ │
│ └────────┬────────┘ └────────┬────────┘ └──────────┬──────────┘ │
└───────────┼────────────────────┼──────────────────────┼──────────────┘
│ 读取 │ 订阅 │ 回放
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 策略 A │ │ 策略 B │ │ 策略 C │
│ (趋势跟踪) │ │ (套利) │ │ (风控) │
│ GET ticker:*│ │ SUBSCRIBE │ │ XREAD stream│
└─────────────┘ └─────────────┘ └─────────────┘
| 组件 | 职责 | 关键设计 |
|---|---|---|
| 连接池管理器 | 维护多个 WebSocket 连接,负载均衡,故障恢复 | 最少订阅数分配,热备连接,指数退避重连 |
| 订阅状态存储 | 将当前订阅列表持久化到 Redis Set | 重连时从 Redis 读取,自动恢复订阅 |
| Redis 写入器 | 将行情数据写入 Redis Hash/Pub/Sub | 异步写入,写入失败时本地缓冲 |
| 策略消费者 | 按需从 Redis 读取或订阅更新 | 与连接池完全解耦,独立部署 |
连接池的核心代码已在《WebSocket 连接池生产级实现》中详述。本节聚焦于将消息写入 Redis 的部分。
心跳管理的关键参数:
{"cmd":"ping"}(与服务端协议对齐)这是断线恢复的关键。如果没有持久化,重连后连接池不知道之前订阅了哪些标的。
实现思路:
# 订阅时,同步写入 Redis Set
async def subscribe(self, symbols: List[str]):
target = self._select_connection(symbols)
await target.ws.send(json.dumps({"cmd": "subscribe", "data": {"channel": "ticker", "symbols": symbols}}))
await self.redis.sadd(f"pool:{self.pool_id}:subscriptions", *symbols)
target.symbols.extend(symbols)
# 重连后,从 Redis 恢复
async def _restore_subscriptions(self, conn: WSConnection):
symbols = await self.redis.smembers(f"pool:{self.pool_id}:subscriptions")
if symbols:
await conn.ws.send(json.dumps({"cmd": "subscribe", "data": {"channel": "ticker", "symbols": list(symbols)}}))
conn.symbols = list(symbols)
设计考量:将订阅状态存在 Redis 而非内存,意味着连接池本身是无状态的——任意一个连接池实例挂掉,新启动的实例可以从 Redis 恢复订阅状态,继续工作。
| 数据结构 | 使用方式 | 适用场景 |
|---|---|---|
| Hash | HSET ticker:AAPL.US last_price 175.23 timestamp 1773302400000 | 策略只关心最新价格,定时轮询读取 |
| Pub/Sub | PUBLISH ticker_update:AAPL.US '{"last_price":175.23}' | 策略需要实时通知,但可接受偶尔丢失 |
| Streams | XADD tick_stream:AAPL.US * price 175.23 volume 1200 | 策略需要历史回放或精确不漏的消息 |
本文实现采用 Hash + Pub/Sub 组合:Hash 保证策略随时能读到最新值(断线重连后也能拿到最新价),Pub/Sub 提供实时通知避免轮询延迟。
import json
import asyncio
import os
from typing import Dict, Optional
import redis.asyncio as redis
API_KEY = os.environ.get("TICKDB_API_KEY")
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
class MarketDataWriter:
def __init__(self, pool_id: str = "pool-1"):
self.pool_id = pool_id
self.redis: Optional[redis.Redis] = None
self._write_buffer: Dict[str, list] = {}
async def start(self):
self.redis = await redis.from_url(REDIS_URL, decode_responses=True)
async def handle_ticker_message(self, data: dict):
symbol = data.get("symbol")
if not symbol:
return
ticker_data = {
"last_price": str(data.get("last_price", "")),
"timestamp": str(data.get("timestamp", "")),
"volume_24h": str(data.get("volume_24h", "")),
}
try:
await self.redis.hset(f"ticker:{symbol}", mapping=ticker_data)
await self.redis.expire(f"ticker:{symbol}", 5)
await self.redis.publish(f"ticker_update:{symbol}", symbol)
except redis.RedisError as e:
self._buffer_message(symbol, data)
def _buffer_message(self, symbol: str, data: dict):
if symbol not in self._write_buffer:
self._write_buffer[symbol] = []
self._write_buffer[symbol].append(data)
if len(self._write_buffer[symbol]) > 100:
self._write_buffer[symbol].pop(0)
以下是可直接运行的 MarketDataWriter 完整实现。
import asyncio
import websockets
import json
import random
import os
import logging
from typing import List, Set, Dict, Optional
from dataclasses import dataclass, field
from enum import Enum
import redis.asyncio as redis
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_KEY = os.environ.get("TICKDB_API_KEY")
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
WS_URL = f"wss://api.tickdb.ai/v1/realtime?api_key={API_KEY}"
class ConnState(Enum):
IDLE = "idle"
ACTIVE = "active"
DEAD = "dead"
@dataclass
class WSConnection:
conn_id: str
state: ConnState = ConnState.IDLE
ws: Optional[websockets.WebSocketClientProtocol] = None
symbols: List[str] = field(default_factory=list)
last_pong: float = 0.0
class MarketDataWriter:
def __init__(self, pool_size: int = 4, max_per_conn: int = 20):
self.pool_size = pool_size
self.max_per_conn = max_per_conn
self.connections: List[WSConnection] = []
self._hot_spare: List[WSConnection] = []
self.redis: Optional[redis.Redis] = None
self._lock = asyncio.Lock()
self._write_buffer: Dict[str, list] = {}
self._running = False
async def start(self):
"""启动连接池和 Redis 连接"""
self.redis = await redis.from_url(REDIS_URL, decode_responses=True)
self._running = True
for i in range(self.pool_size):
conn = WSConnection(conn_id=f"conn-{i}")
await self._connect_with_backoff(conn)
asyncio.create_task(self._heartbeat_loop(conn))
asyncio.create_task(self._message_loop(conn))
self.connections.append(conn)
logger.info(f"连接 {conn.conn_id} 已建立")
if self.connections:
spare = self.connections.pop()
spare.state = ConnState.IDLE
self._hot_spare.append(spare)
logger.info(f"热备连接 {spare.conn_id} 已预留")
async def _connect_with_backoff(self, conn: WSConnection):
retry, cap = 0, 60
while self._running:
try:
conn.ws = await websockets.connect(WS_URL)
conn.state = ConnState.ACTIVE
conn.last_pong = asyncio.get_event_loop().time()
logger.info(f"{conn.conn_id} 连接成功")
return
except Exception as e:
delay = min(2 ** retry, cap)
jitter = random.uniform(0, delay * 0.1)
logger.warning(f"{conn.conn_id} 连接失败,{delay:.1f}秒后重试: {e}")
await asyncio.sleep(delay + jitter)
retry += 1
async def _heartbeat_loop(self, conn: WSConnection):
while self._running and conn.state != ConnState.DEAD:
try:
if conn.ws and conn.state == ConnState.ACTIVE:
await conn.ws.send(json.dumps({"cmd": "ping"}))
except Exception:
pass
await asyncio.sleep(1)
async def _message_loop(self, conn: WSConnection):
while self._running and conn.state != ConnState.DEAD:
try:
msg = await asyncio.wait_for(conn.ws.recv(), timeout=5)
data = json.loads(msg)
# 拦截业务错误码:鉴权失败、限频、配额耗尽
if data.get("code") in [1001, 3001, 3002]:
logger.error(f"{conn.conn_id} 业务错误: code={data.get('code')}, msg={data.get('message')}")
conn.state = ConnState.DEAD
asyncio.create_task(self._recover(conn))
continue
if data.get("cmd") == "pong":
conn.last_pong = asyncio.get_event_loop().time()
elif data.get("cmd") == "ticker":
await self._handle_ticker(conn, data.get("data", {}))
except asyncio.TimeoutError:
now = asyncio.get_event_loop().time()
if now - conn.last_pong > 5:
logger.warning(f"{conn.conn_id} 心跳超时")
conn.state = ConnState.DEAD
asyncio.create_task(self._recover(conn))
except Exception as e:
logger.error(f"{conn.conn_id} 消息循环异常: {e}")
conn.state = ConnState.DEAD
asyncio.create_task(self._recover(conn))
break
async def _handle_ticker(self, conn: WSConnection, data: dict):
symbol = data.get("symbol")
if not symbol:
return
ticker_data = {
"last_price": str(data.get("last_price", "")),
"timestamp": str(data.get("timestamp", "")),
}
if "volume_24h" in data:
ticker_data["volume_24h"] = str(data["volume_24h"])
try:
async with self.redis.pipeline() as pipe:
await pipe.hset(f"ticker:{symbol}", mapping=ticker_data)
await pipe.expire(f"ticker:{symbol}", 5)
await pipe.publish(f"ticker_update:{symbol}", symbol)
await pipe.execute()
except redis.RedisError as e:
logger.error(f"Redis 写入失败 {symbol}: {e}")
self._buffer_message(symbol, data)
def _buffer_message(self, symbol: str, data: dict):
if symbol not in self._write_buffer:
self._write_buffer[symbol] = []
self._write_buffer[symbol].append(data)
if len(self._write_buffer[symbol]) > 100:
self._write_buffer[symbol].pop(0)
async def _recover(self, dead_conn: WSConnection):
logger.info(f"开始恢复 {dead_conn.conn_id}")
if self._hot_spare:
hot = self._hot_spare.pop()
symbols = await self._get_persisted_subscriptions()
if symbols:
await self._do_subscribe(hot, list(symbols))
hot.state = ConnState.ACTIVE
async with self._lock:
self.connections.append(hot)
logger.info(f"热备 {hot.conn_id} 已接管")
await self._connect_with_backoff(dead_conn)
if dead_conn.state == ConnState.ACTIVE:
symbols = await self._get_persisted_subscriptions()
if symbols:
await self._do_subscribe(dead_conn, list(symbols))
async with self._lock:
self.connections.append(dead_conn)
logger.info(f"{dead_conn.conn_id} 恢复成功")
async def _get_persisted_subscriptions(self) -> Set[str]:
if not self.redis:
return set()
return await self.redis.smembers("market_data_writer:subscriptions")
async def _do_subscribe(self, conn: WSConnection, symbols: List[str]):
await conn.ws.send(json.dumps({
"cmd": "subscribe",
"data": {"channel": "ticker", "symbols": symbols}
}))
conn.symbols = symbols
if self.redis:
await self.redis.sadd("market_data_writer:subscriptions", *symbols)
async def subscribe(self, symbols: List[str]):
async with self._lock:
target = min(self.connections, key=lambda c: len(c.symbols))
new_symbols = [s for s in symbols if s not in target.symbols]
if not new_symbols:
return
if len(target.symbols) + len(new_symbols) > self.max_per_conn:
if self._hot_spare:
target = self._hot_spare.pop()
target.state = ConnState.ACTIVE
self.connections.append(target)
await self._do_subscribe(target, new_symbols)
async def stop(self):
self._running = False
for conn in self.connections + self._hot_spare:
if conn.ws:
await conn.ws.close()
if self.redis:
await self.redis.close()
logger.info("MarketDataWriter 已关闭")
使用方法:
async def main():
writer = MarketDataWriter(pool_size=3, max_per_conn=20)
await writer.start()
await writer.subscribe(["AAPL.US", "TSLA.US", "700.HK", "BTCUSDT"])
await asyncio.Event().wait()
if __name__ == "__main__":
asyncio.run(main())
消费者端读取示例:
# 策略 A:轮询读取最新价
async def get_latest_price(symbol: str):
r = await redis.from_url(REDIS_URL, decode_responses=True)
data = await r.hgetall(f"ticker:{symbol}")
return data.get("last_price")
# 策略 B:订阅实时更新
async def subscribe_updates(symbol: str):
r = await redis.from_url(REDIS_URL, decode_responses=True)
pubsub = r.pubsub()
await pubsub.subscribe(f"ticker_update:{symbol}")
async for msg in pubsub.listen():
if msg["type"] == "message":
price = await get_latest_price(symbol)
print(f"{symbol} 更新: {price}")
| 问题 | 现象 | 根因 | 解决方案 |
|---|---|---|---|
| Redis 断连数据丢失 | 写入失败静默,策略读到旧价 | 未处理 redis.RedisError | 必须捕获异常,写入本地缓冲文件;恢复后补推 |
| 行情乱序覆盖 | 新价格被旧价格覆盖 | Hash 无条件写入,不检查 timestamp | 写入前比较 timestamp,只保留更新的数据 |
| 订阅状态不一致 | 重连后漏订阅 | 订阅状态仅存内存 | 持久化到 Redis Set,重连时全量恢复 |
| Pub/Sub 消息丢失 | 策略没收到更新通知 | Pub/Sub 无持久化,订阅前消息丢失 | 策略启动时先读 Hash 获取当前价,再订阅 |
| Redis 内存膨胀 | 行情 Hash 永不过期,内存持续增长 | 未设置 TTL | 每条行情写入后 EXPIRE key 5 |
| 业务错误码静默 | 连接正常但无数据 | 服务端返回 3001/3002,客户端未处理 | 在消息循环中拦截 code 字段并触发重连 |
| 参数 | 推荐值 | 调优依据 |
|---|---|---|
| 单连接订阅上限 | 20-30 | 超过 30 后 P99 延迟陡升 |
| 连接池大小 | 订阅数/20 + 1(热备) | 留一个热备应对突发故障 |
| 行情 Hash TTL | 5 秒 | 平衡内存占用与脏读风险 |
| Redis 连接池大小 | 10-20 | 匹配 asyncio 并发写入量 |
| 重连最大延迟 | 60 秒 | 超过则告警人工介入 |
| 本地缓冲上限 | 100 条/标的 | 避免内存泄漏 |
| 指标 | 各自订阅(3策略×50标的) | Redis 共享总线 |
|---|---|---|
| WebSocket 连接数 | 3-9 个 | 3-4 个(连接池统一管理) |
| 断线恢复时间 | 各自重连,最长 60 秒 | 热备接管,<500ms |
| 策略间数据一致性 | 可能不一致 | 完全一致(同源写入 Redis) |
| 新增策略成本 | 需新建连接,重新订阅 | 零成本,直接读 Redis |
| 运维复杂度 | 高(多连接坚控) | 低(只坚控连接池和 Redis) |
真正的生产级行情系统,不是“能收到数据”,而是“任何一个组件挂掉,策略都无感知”。本文的 MarketDataWriter 实现了:
在构建上述架构时,一个绕不开的工程问题是多市场异构数据源的统一接入。维护美股、港股、A 股、加密货币四个市场的独立 WebSocket 连接,意味着四套心跳逻辑、四套重连策略、四套消息解析器。工程上的实践是寻找一个能跨市场的统一网关。 本文测试环境选用了 TickDB,它通过单一 WebSocket 连接即可订阅多市场标的,心跳协议标准化为每 1 秒 ping。你也可以选择自行实现多连接聚合层,或使用其他支持跨市场的供应商——核心思路是降低连接管理的复杂度。
本文代码已覆盖核心机制。你可以在此基础上扩展:
▍零成本跑通代码:Claw Keys 免注册试用
如果你不想立即注册 API Key,TickDB 提供了面向开发者的 Claw Keys 试用机制——默认开放 72 个热门标的(覆盖 AAPL、TSLA、700.HK、BTCUSDT、000001.SZ 等),无需配置即可测试。
在 ChatGPT、Claude 或 Cursor 中输入以下提示词,让 AI 直接帮你跑通 WebSocket → Redis 链路:
验证通过后,可搜索“TickDB”免费获取解锁全部品种的正式 API Key。
本文不构成任何投资建议。市场有风险,投资需谨慎。