扫码
40.67M · 2026-04-17
前面几章我们分析了 TypeScript SDK 的 Client 实现,对 MCP 客户端的职责有了清晰认知。本章我们将目光转向 Python SDK 的客户端实现。Python SDK 在设计哲学上与 TypeScript SDK 保持一致——都围绕"会话"这一核心抽象展开,但在具体实现上却因语言特性而呈现出显著差异。Python SDK 选择了 anyio 作为异步运行时抽象层,用 Pydantic 取代 Zod 进行类型校验,用 async with 上下文管理器替代手动的生命周期管理。这些选择不是偶然的,它们深刻影响了客户端的 API 形态和内部架构。
Python MCP Client 的源码位于 src/mcp/client/ 目录下,核心文件包括:
session.py — ClientSession 类,单服务器会话的核心抽象session_group.py — ClientSessionGroup 类,多服务器聚合管理stdio.py — stdio 传输层实现sse.py — SSE 传输层实现streamable_http.py — Streamable HTTP 传输层实现auth/oauth2.py — OAuth2 客户端认证下面这张架构图展示了各模块之间的关系:
graph TB
subgraph 应用层
APP[应用代码]
end
subgraph 会话层
CSG[ClientSessionGroup<br/>多服务器聚合]
CS[ClientSession<br/>单服务器会话]
end
subgraph 基础设施层
BS[BaseSession<br/>请求/响应路由]
end
subgraph 传输层
STDIO[stdio_client<br/>子进程通信]
SSE[sse_client<br/>SSE 长连接]
SH[streamable_http_client<br/>HTTP 流式]
end
subgraph 认证层
OAUTH[OAuth2Auth<br/>PKCE + 自动刷新]
end
APP --> CSG
APP --> CS
CSG -->|管理多个| CS
CS -->|继承| BS
CS --> STDIO
CS --> SSE
CS --> SH
SH -.->|可选| OAUTH
SSE -.->|可选| OAUTH
ClientSession 是 Python 客户端的核心类,它继承自 BaseSession,负责与单个 MCP Server 的全部通信。我们先看其类型签名:
class ClientSession(
BaseSession[
types.ClientRequest, # 发送的请求类型
types.ClientNotification, # 发送的通知类型
types.ClientResult, # 发送的结果类型
types.ServerRequest, # 接收的请求类型
types.ServerNotification, # 接收的通知类型
]
):
这个五元泛型参数看起来复杂,但逻辑清晰:前三个约束"我发出去的消息",后两个约束"我收到的消息"。BaseSession 利用这些泛型参数在编译期保证消息类型的正确性。
ClientSession 的构造函数接受读写流和一系列回调函数:
def __init__(
self,
read_stream: ReadStream[SessionMessage | Exception],
write_stream: WriteStream[SessionMessage],
read_timeout_seconds: float | None = None,
sampling_callback: SamplingFnT | None = None,
elicitation_callback: ElicitationFnT | None = None,
list_roots_callback: ListRootsFnT | None = None,
logging_callback: LoggingFnT | None = None,
message_handler: MessageHandlerFnT | None = None,
client_info: types.Implementation | None = None,
):
这些回调函数的设计值得关注。Python SDK 使用 Protocol 类来定义回调类型,这是 Python 式的结构化子类型(structural subtyping),等价于 TypeScript 的接口但更灵活:
class SamplingFnT(Protocol):
async def __call__(
self,
context: RequestContext[ClientSession],
params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult | types.ErrorData: ...
每个回调都有默认实现。例如,当服务器发起 sampling 请求但客户端未注册回调时,默认返回错误:
async def _default_sampling_callback(
context: RequestContext[ClientSession],
params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult | types.ErrorData:
return types.ErrorData(
code=types.INVALID_REQUEST,
message="Sampling not supported",
)
初始化握手通过 initialize() 方法完成。该方法会根据注册的回调函数自动推断客户端能力——如果注册了 sampling_callback,就声明支持 sampling 能力;如果注册了 list_roots_callback,就声明支持 roots 能力。这种"注册即声明"的设计避免了能力声明与实际实现不一致的问题。
ClientSession 为 MCP 协议的每种操作提供了对应的异步方法:
| 方法 | 协议操作 | 返回类型 |
|---|---|---|
list_tools() | tools/list | ListToolsResult |
call_tool() | tools/call | CallToolResult |
list_resources() | resources/list | ListResourcesResult |
read_resource() | resources/read | ReadResourceResult |
list_prompts() | prompts/list | ListPromptsResult |
get_prompt() | prompts/get | GetPromptResult |
complete() | completion/complete | CompleteResult |
send_ping() | ping | EmptyResult |
以 call_tool 为例,它展示了 Python 客户端的一个独特特性——结构化内容校验:
async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
read_timeout_seconds: float | None = None,
progress_callback: ProgressFnT | None = None,
) -> types.CallToolResult:
result = await self.send_request(
types.CallToolRequest(
params=types.CallToolRequestParams(name=name, arguments=arguments),
),
types.CallToolResult,
request_read_timeout_seconds=read_timeout_seconds,
progress_callback=progress_callback,
)
if not result.is_error:
await self._validate_tool_result(name, result)
return result
_validate_tool_result 会根据工具的 output_schema 对返回的 structured_content 进行 JSON Schema 校验。工具的 output schema 在 list_tools() 调用时被缓存到 _tool_output_schemas 字典中,后续的 call_tool 会自动利用这个缓存。如果缓存中没有该工具的 schema(比如工具是后来动态添加的),会自动触发一次 list_tools() 刷新缓存。
MCP 是双向协议,服务器也会向客户端发起请求。_received_request 方法通过 Python 的 match/case 模式匹配来分发这些请求:
async def _received_request(self, responder):
ctx = RequestContext[ClientSession](
request_id=responder.request_id,
meta=responder.request_meta,
session=self,
)
match responder.request:
case types.CreateMessageRequest(params=params):
with responder:
response = await self._sampling_callback(ctx, params)
await responder.respond(response)
case types.ElicitRequest(params=params):
with responder:
response = await self._elicitation_callback(ctx, params)
await responder.respond(response)
case types.ListRootsRequest():
with responder:
response = await self._list_roots_callback(ctx)
await responder.respond(response)
注意 with responder 上下文管理器的使用。RequestResponder 实现了 __enter__/__exit__,在进入时设置取消作用域(CancelScope),在退出时通知会话该请求已处理完毕。这确保了即使回调函数抛出异常,请求的生命周期也能被正确管理。
真实的 AI Agent 应用往往需要同时连接多个 MCP Server。ClientSessionGroup 正是为此设计的——它管理多个 ClientSession,并将所有服务器的 tools、resources、prompts 聚合到统一的命名空间中。
graph LR
subgraph ClientSessionGroup
TOOLS[聚合 Tools 字典]
RES[聚合 Resources 字典]
PROMPTS[聚合 Prompts 字典]
end
subgraph Server A - 文件系统
SA[ClientSession A]
TA["read_file, write_file"]
end
subgraph Server B - 数据库
SB[ClientSession B]
TB["query, execute"]
end
subgraph Server C - Web 搜索
SC[ClientSession C]
TC["search, fetch"]
end
SA --> TA
SB --> TB
SC --> TC
TOOLS -->|路由| SA
TOOLS -->|路由| SB
TOOLS -->|路由| SC
ClientSessionGroup 实现了 async with 协议,用 AsyncExitStack 管理所有子会话的生命周期:
class ClientSessionGroup:
def __init__(
self,
exit_stack: contextlib.AsyncExitStack | None = None,
component_name_hook: _ComponentNameHook | None = None,
):
self._tools = {}
self._resources = {}
self._prompts = {}
self._sessions = {}
self._tool_to_session = {}
if exit_stack is None:
self._exit_stack = contextlib.AsyncExitStack()
self._owns_exit_stack = True
else:
self._exit_stack = exit_stack
self._owns_exit_stack = False
这里有一个精妙的设计:exit_stack 可以外部传入,也可以内部创建。如果外部传入,ClientSessionGroup 不负责关闭它(_owns_exit_stack = False)。这使得 ClientSessionGroup 可以嵌入到更大的资源管理体系中。
连接新服务器通过 connect_to_server 完成:
async with ClientSessionGroup() as group:
session_a = await group.connect_to_server(
StdioServerParameters(command="python", args=["-m", "my_server"])
)
session_b = await group.connect_to_server(
StreamableHttpParameters(url="http://localhost:8080/mcp")
)
# 此时 group.tools 包含两个服务器的所有工具
result = await group.call_tool("read_file", {"path": "/tmp/test.txt"})
_establish_session 方法展示了传输层的选择逻辑——根据 ServerParameters 的具体类型(StdioServerParameters、SseServerParameters、StreamableHttpParameters)自动选择对应的传输层实现。每个会话都有独立的 AsyncExitStack,确保单个服务器断连不会影响其他会话。
当多个服务器提供同名工具时,ClientSessionGroup 默认会抛出错误。但它提供了 component_name_hook 机制来自定义命名策略:
name_fn = lambda name, server_info: f"{server_info.name}_{name}"
async with ClientSessionGroup(component_name_hook=name_fn) as group:
await group.connect_to_server(server_a_params) # tools: "serverA_read"
await group.connect_to_server(server_b_params) # tools: "serverB_read"
聚合过程使用临时字典来保证原子性——如果聚合过程中任何一步失败,已有的聚合状态不会被污染:
async def _aggregate_components(self, server_info, session):
prompts_temp: dict[str, types.Prompt] = {}
resources_temp: dict[str, types.Resource] = {}
tools_temp: dict[str, types.Tool] = {}
# ... 收集所有组件到临时字典 ...
# 检查重复
matching_tools = tools_temp.keys() & self._tools.keys()
if matching_tools:
raise MCPError(...)
# 原子性地合并
self._tools.update(tools_temp)
disconnect_from_server 支持运行时动态移除某个服务器。它通过 _ComponentNames 反向索引快速定位该会话注册的所有组件,逐一清理:
async def disconnect_from_server(self, session):
component_names = self._sessions.pop(session)
for name in component_names.tools:
del self._tools[name]
del self._tool_to_session[name]
# 关闭该会话的 exit_stack
session_stack = self._session_exit_stacks.pop(session)
await session_stack.aclose()
Python SDK 提供了三种传输层实现,它们都遵循相同的模式——作为异步上下文管理器,yield 出 (read_stream, write_stream) 元组。
stdio_client 通过 anyio.open_process 启动子进程,将子进程的 stdin/stdout 包装为 MCP 消息流。它在内部启动两个并发任务:stdout_reader 负责从子进程读取 JSON-RPC 消息并写入 read_stream,stdin_writer 负责从 write_stream 读取消息并写入子进程的 stdin。
关闭时遵循 MCP 规范的 graceful shutdown 序列:先关闭 stdin,等待进程自行退出;超时后发送 SIGTERM;仍未退出则发送 SIGKILL。
SSE 传输使用 httpx + httpx-sse 库建立长连接。Streamable HTTP 传输是 SSE 的演进版本,支持双向流式通信,并引入了 mcp-session-id 头来维护会话状态。两者都可以通过 OAuth2Auth 中间件进行认证。
session_group.py 中定义了三种传输参数的 Pydantic 模型,统一为 ServerParameters 类型别名:
ServerParameters: TypeAlias = (
StdioServerParameters | SseServerParameters | StreamableHttpParameters
)
这使得 connect_to_server 可以接受任意一种参数,内部通过 isinstance 分发到对应的传输层。
Python SDK 的 OAuth2 实现位于 client/auth/oauth2.py,支持 Authorization Code + PKCE 流程。PKCEParameters 类封装了 code_verifier/code_challenge 的生成:
class PKCEParameters(BaseModel):
code_verifier: str = Field(..., min_length=43, max_length=128)
code_challenge: str = Field(..., min_length=43, max_length=128)
@classmethod
def generate(cls) -> "PKCEParameters":
code_verifier = "".join(
secrets.choice(string.ascii_letters + string.digits + "-._~")
for _ in range(128)
)
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")
return cls(code_verifier=code_verifier, code_challenge=code_challenge)
认证流程通过 TokenStorage 协议实现 token 的持久化存储,应用可以自定义存储后端(文件、数据库、密钥链等)。OAuth2Auth 作为 httpx 的 Auth 中间件,自动在请求中注入 Bearer token 并处理 token 刷新。
通过对比两个 SDK 的客户端实现,可以提炼出以下关键差异:
| 维度 | Python SDK | TypeScript SDK |
|---|---|---|
| 异步运行时 | anyio(兼容 asyncio/trio) | 原生 async/await |
| 生命周期管理 | async with + AsyncExitStack | 手动 connect()/close() |
| 类型校验 | Pydantic TypeAdapter | Zod schema |
| 回调类型 | Protocol(结构化子类型) | TypeScript 函数类型 |
| 消息分发 | match/case 模式匹配 | switch/case |
| 多服务器 | 内置 ClientSessionGroup | 需自行实现 |
| 工具结果校验 | 内置 _validate_tool_result | 无内置校验 |
| 传输层切换 | isinstance + 上下文管理器 | Transport 接口 |
其中最值得关注的差异有三点:
第一,生命周期管理。 Python 的 async with 将资源获取与释放绑定在语法结构上,不可能忘记关闭连接。TypeScript 需要开发者自觉调用 close(),或使用 try/finally。AsyncExitStack 的嵌套使用更是 Python 独有的模式——ClientSessionGroup 用主 exit_stack 管理所有子会话的 exit_stack,形成资源管理的树状结构。
第二,多服务器聚合。 ClientSessionGroup 是 Python SDK 独有的抽象。它不仅管理连接,还提供了组件聚合、命名冲突检测、动态断连等高级功能。TypeScript SDK 没有对应的内置实现,开发者需要自行管理多个 Client 实例。
第三,工具结果校验。 Python 的 call_tool 在返回结果前会自动校验 structured_content 是否符合工具声明的 output_schema。这利用了 jsonschema 库进行运行时校验,为 Agent 应用提供了额外的安全保障。TypeScript 端没有这个内置机制。
本章深入分析了 MCP Python SDK 的客户端实现。ClientSession 作为单服务器会话的核心抽象,继承自 BaseSession 并提供了完整的 MCP 协议操作方法;ClientSessionGroup 在此基础上实现了多服务器管理与组件聚合。Python SDK 充分利用了 anyio 的结构化并发、async with 上下文管理器、Pydantic 类型校验等语言特性,在保持与 TypeScript SDK 协议兼容性的同时,提供了更符合 Python 生态习惯的 API 设计。传输层的三种实现(stdio、SSE、Streamable HTTP)通过统一的读写流抽象与上下文管理器模式,实现了对应用层的完全透明。OAuth2 认证则作为可插拔的 httpx 中间件,为 HTTP 类传输提供了标准化的安全方案。