扫码
40.67M · 2026-04-17
前面几章我们分析了 STDIO 和 Streamable HTTP 两种传输机制。本章聚焦于 MCP 协议中另外两种重要的传输方式:SSE(Server-Sent Events) 和 WebSocket。SSE 曾是 MCP 在网络环境下的主力传输方案,而 WebSocket 则作为 Python SDK 的独有实现提供了全双工通信能力。理解它们的设计思路和当前定位,对于在实际项目中做出正确的技术选型至关重要。
SSE 是一种基于 HTTP 的单向流式传输技术。浏览器(或客户端)通过一个长连接的 GET 请求接收服务端推送的事件流,而客户端向服务端发送消息则需要通过独立的 POST 请求完成。这种非对称的通信模型决定了 SSE 传输在 MCP 中的架构形态。
SSE 传输的核心设计是将一条逻辑上的双向通信链路拆分成两条物理通道:
sequenceDiagram
participant Client as MCP 客户端
participant Server as MCP 服务端
Note over Client,Server: 阶段一:建立 SSE 连接
Client->>Server: GET /sse (建立 SSE 长连接)
Server-->>Client: event: endpointndata: /messages?sessionId=xxx
Note over Client,Server: 阶段二:双向通信
Client->>Server: POST /messages?sessionId=xxxn{"jsonrpc":"2.0","method":"initialize",...}
Server-->>Client: event: messagendata: {"jsonrpc":"2.0","result":{...}}
Client->>Server: POST /messages?sessionId=xxxn{"jsonrpc":"2.0","method":"tools/list",...}
Server-->>Client: event: messagendata: {"jsonrpc":"2.0","result":{...}}
Note over Client,Server: 阶段三:关闭连接
Client->>Server: 关闭 EventSource
这个设计有几个值得注意的要点:
endpoint 事件告知客户端后续 POST 请求应发送到哪个 URL。sessionId 参数,将 SSE 连接与后续的 POST 请求关联到同一个会话。TypeScript SDK 中的 SSEClientTransport 类实现了 Transport 接口,完整展示了 SSE 传输的工作机制。
连接建立的核心逻辑在 _startOrAuth 方法中。该方法创建 EventSource 实例并两类事件:
// endpoint 事件,获取 POST 请求的目标 URL
this._eventSource.addEventListener('endpoint', (event: Event) => {
const messageEvent = event as MessageEvent;
this._endpoint = new URL(messageEvent.data, this._url);
if (this._endpoint.origin !== this._url.origin) {
throw new Error(`Endpoint origin does not match connection origin`);
}
resolve();
});
// message 事件,接收服务端推送的 JSON-RPC 消息
this._eventSource.onmessage = (event: Event) => {
const messageEvent = event as MessageEvent;
const message = JSONRPCMessageSchema.parse(JSON.parse(messageEvent.data));
this.onmessage?.(message);
};
发送消息时,send 方法通过 POST 请求将 JSON-RPC 消息发送到之前接收到的 endpoint URL:
async _send(message: JSONRPCMessage, isAuthRetry: boolean): Promise<void> {
if (!this._endpoint) {
throw new SdkError(SdkErrorCode.NotConnected, 'Not connected');
}
const headers = await this._commonHeaders();
headers.set('content-type', 'application/json');
const response = await (this._fetch ?? fetch)(this._endpoint, {
method: 'POST',
headers,
body: JSON.stringify(message),
signal: this._abortController?.signal
});
// ... 错误处理与认证重试
}
这里有一个精妙的设计:认证失败(401)时,_send 方法会调用 authProvider.onUnauthorized 尝试刷新凭证,然后以 isAuthRetry = true 递归调用自身进行重试,但只重试一次。同样的认证重试逻辑也存在于 SSE 连接建立阶段的 onerror 处理中。
Python SDK 的 sse_client 采用了异步上下文管理器模式,使用 httpx 和 httpx_sse 库实现 SSE 连接,并通过 anyio 的内存流实现消息的异步传递。
其核心架构由两个并发协程构成:
sse_reader:从 SSE 事件流中读取消息,解析后写入 read_streampost_writer:从 write_stream 中读取待发送的消息,通过 POST 请求发送到服务端async def sse_reader(task_status):
async for sse in event_source.aiter_sse():
match sse.event:
case "endpoint":
endpoint_url = urljoin(url, sse.data)
# 安全校验:验证 origin 一致性
task_status.started(endpoint_url)
case "message":
message = types.jsonrpc_message_adapter.validate_json(sse.data)
await read_stream_writer.send(SessionMessage(message))
async def post_writer(endpoint_url: str):
async for session_message in write_stream_reader:
response = await client.post(endpoint_url, json=session_message.message.model_dump(...))
response.raise_for_status()
值得注意的是,sse_reader 使用了 anyio 的 task_status 机制:只有当 endpoint 事件到达后,post_writer 才会启动。这确保了在发送任何消息之前,endpoint URL 已经就绪。Python SDK 还提供了 on_session_created 回调,允许调用者在会话建立时获取 sessionId,便于实现会话恢复等高级功能。
WebSocket 提供了真正的全双工通信能力,客户端和服务端可以在同一条连接上同时发送和接收消息。在 MCP 生态中,WebSocket 传输仅在 Python SDK 中实现,TypeScript SDK 并未提供对应支持。
与 SSE 的双通道设计不同,WebSocket 的架构更加简洁:
sequenceDiagram
participant Client as MCP 客户端
participant Server as MCP 服务端
Note over Client,Server: WebSocket 握手
Client->>Server: GET /ws (Upgrade: websocket)nSec-WebSocket-Protocol: mcp
Server-->>Client: 101 Switching ProtocolsnSec-WebSocket-Protocol: mcp
Note over Client,Server: 全双工通信(同一连接)
Client->>Server: {"jsonrpc":"2.0","method":"initialize",...}
Server-->>Client: {"jsonrpc":"2.0","result":{...}}
par 并行消息
Server-->>Client: {"jsonrpc":"2.0","method":"notifications/progress",...}
and
Client->>Server: {"jsonrpc":"2.0","method":"tools/call",...}
end
Server-->>Client: {"jsonrpc":"2.0","result":{...}}
Note over Client,Server: 关闭连接
Client->>Server: WebSocket Close Frame
Server-->>Client: WebSocket Close Ack
WebSocket 方案有两个显著优势:无需 endpoint 协商(连接建立后即可直接通信)和 真正的全双工(双方可以在任意时刻发送消息,无需等待对方响应)。
Python SDK 的 websocket_client 同样采用异步上下文管理器模式,代码简洁而完整:
@asynccontextmanager
async def websocket_client(url: str):
async with ws_connect(url, subprotocols=[Subprotocol("mcp")]) as ws:
# 创建内存流用于消息传递
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
async def ws_reader():
async for raw_text in ws:
message = types.jsonrpc_message_adapter.validate_json(raw_text)
await read_stream_writer.send(SessionMessage(message))
async def ws_writer():
async for session_message in write_stream_reader:
msg_dict = session_message.message.model_dump(
by_alias=True, mode="json", exclude_unset=True
)
await ws.send(json.dumps(msg_dict))
async with anyio.create_task_group() as tg:
tg.start_soon(ws_reader)
tg.start_soon(ws_writer)
yield (read_stream, write_stream)
tg.cancel_scope.cancel()
这段代码展现了几个关键设计决策:
mcp 子协议:通过 subprotocols=[Subprotocol("mcp")] 声明使用 MCP 子协议,服务端据此识别这是 MCP 通信而非普通 WebSocket 连接。anyio.create_memory_object_stream(0) 创建的是无缓冲流,这意味着发送方会阻塞直到接收方准备好。这种背压机制防止了消息在内存中无限堆积。(read_stream, write_stream) 元组,上层的 Session 和 Client 类无需关心底层使用的是哪种传输协议。async with 块时,tg.cancel_scope.cancel() 会取消所有子任务,WebSocket 连接随之关闭。与 SSE 实现对比,WebSocket 版本的代码量明显更少,不需要 endpoint 协商、不需要维护两个独立的 HTTP 通道、也不需要处理 SSE 事件类型的分发逻辑。
MCP 协议目前支持四种传输方式,各有其适用场景。选型时需要综合考虑部署环境、性能需求和生态兼容性。
| 维度 | STDIO | Streamable HTTP | SSE (已废弃) | WebSocket |
|---|---|---|---|---|
| 通信模型 | 进程管道 | HTTP 请求/响应 + 可选 SSE 流 | GET 流 + POST 请求 | 全双工 |
| 部署模型 | 本地进程 | 远程 HTTP 服务 | 远程 HTTP 服务 | 远程 WebSocket 服务 |
| SDK 支持 | TS + Python | TS + Python | TS + Python | 仅 Python |
| 是否需要服务端 | 否(子进程) | 是 | 是 | 是 |
| 认证支持 | 无(依赖 OS) | OAuth 2.0 | OAuth 2.0 | 无内建支持 |
| 协议状态 | 稳定 | 当前推荐 | 已废弃 | 实验性 |
| 防火墙友好 | N/A | 非常友好 | 友好 | 一般 |
在实际项目中,可以按照以下决策路径选择传输方式:
TypeScript SDK 中的 SSEClientTransport 类已被标记为 @deprecated,官方建议迁移到 StreamableHTTPClientTransport。废弃的核心原因在于 SSE 传输的架构缺陷:
sampling/createMessage)。Streamable HTTP 通过将所有通信统一到标准 HTTP 请求/响应模型(并可选地在响应中使用 SSE 流式传输),解决了上述所有问题。
尽管 SSE 已被废弃,但由于大量现存服务端仍在使用该传输方式,客户端需要在过渡期间支持两种协议。TypeScript SDK 官方示例提供了一个清晰的降级策略:
async function connectWithBackwardsCompatibility(url: string) {
const client = new Client({ name: 'my-client', version: '1.0.0' });
try {
// 优先尝试 Streamable HTTP
const transport = new StreamableHTTPClientTransport(new URL(url));
await client.connect(transport);
return { client, transport, type: 'streamable-http' };
} catch (error) {
// 降级到 SSE
const sseTransport = new SSEClientTransport(new URL(url));
const sseClient = new Client({ name: 'my-client', version: '1.0.0' });
await sseClient.connect(sseTransport);
return { client: sseClient, transport: sseTransport, type: 'sse' };
}
}
这个模式的关键在于:先尝试现代协议,如果服务端不支持(返回 4xx 错误),则回退到旧协议。这使得客户端可以无缝适配不同版本的服务端。
对于正在维护 MCP 服务端的开发者,建议按以下步骤完成迁移:
本章深入分析了 MCP 协议中 SSE 和 WebSocket 两种传输方式的设计与实现。SSE 通过 GET 长连接接收服务端事件、POST 请求发送客户端消息的双通道模型,在 MCP 早期提供了可靠的网络传输能力。WebSocket 作为 Python SDK 的独有实现,以其全双工特性和简洁的代码架构展现了另一种设计思路。
然而,随着 Streamable HTTP 的推出,SSE 传输已被标记为废弃。Streamable HTTP 统一了通信模型,消除了双通道的复杂性,并提供了更好的会话管理和连接恢复能力。理解这一演进脉络,不仅有助于正确维护现有的 MCP 集成,也为未来的协议扩展和技术选型提供了坚实的认知基础。
在下一章中,我们将探讨 MCP 的安全模型与认证机制,了解 OAuth 2.0 在 MCP 协议中的集成方式。