简打卡
51.76M · 2026-03-28
前两篇我们已经讲清楚:
这一篇只做一件事:把 MCP 真正跑起来,并尽量接近生产实践。
你将看到:
建议按下面顺序实践:
STDIO(本地最简单)SSE(理解远程双通道)Streamable HTTP(生产部署首选)Server 端:
import asyncio
from mcp.server.lowlevel import Server
from mcp.server.stdio import stdio_server
mcp = Server("mysql_mcp_server")
@mcp.list_tools()
async def list_tools():
return []
@mcp.call_tool()
async def call_tool(name: str, arguments: dict):
return []
async def main():
async with stdio_server() as (read_stream, write_stream):
await mcp.run(read_stream, write_stream, mcp.create_initialization_options())
if __name__ == "__main__":
asyncio.run(main())
Client 端:
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def main():
server_params = StdioServerParameters(command="python", args=["mysqlMCPServer.py"])
async with stdio_client(server_params) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
tools = await session.list_tools()
print(tools)
if __name__ == "__main__":
asyncio.run(main())
适用场景:本地调试、单机测试、快速验证业务逻辑。
Server 端:
from fastapi import FastAPI, Request
from mcp.server.sse import SseServerTransport
from starlette.routing import Mount
from mysqlMCPServer import mcp
app = FastAPI()
sse = SseServerTransport("/messages/")
app.router.routes.append(Mount("/messages", app=sse.handle_post_message))
@app.get("/sse")
async def handle_sse(request: Request):
async with sse.connect_sse(request.scope, request.receive, request._send) as (read_stream, write_stream):
await mcp.run(read_stream, write_stream, mcp.create_initialization_options())
Client 端:
import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client
async def main():
async with sse_client(url="http://localhost:8000/sse") as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
result = await session.call_tool("execute_sql", {"query": "SELECT 1"})
print(result)
if __name__ == "__main__":
asyncio.run(main())
适用场景:远程部署、需要服务端主动推送结果或进度。
Server 端:
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from starlette.applications import Starlette
from starlette.routing import Mount
from mysqlMCPServer import mcp
session_manager = StreamableHTTPSessionManager(
app=mcp,
event_store=None,
json_response=None,
stateless=True,
)
async def handle_streamable_http(scope, receive, send):
await session_manager.handle_request(scope, receive, send)
starlette_app = Starlette(routes=[Mount("/mcp", app=handle_streamable_http)])
Client 端:
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
async def main():
async with streamablehttp_client(url="http://localhost:8000/mcp") as (read_stream, write_stream, get_session_id):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
print("session_id:", get_session_id())
result = await session.call_tool("execute_sql", {"query": "SELECT 1"})
print(result)
if __name__ == "__main__":
asyncio.run(main())
适用场景:云部署、无状态扩展、网关和中间件友好。
下面直接给你一套可复制即跑的代码:一个 Server、一个 Agent Client。
import os
import contextlib
import logging
from collections.abc import AsyncIterator
import uvicorn
from dotenv import load_dotenv
from mysql.connector import connect
from mcp.server.lowlevel import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.types import Tool, TextContent
from starlette.applications import Starlette
from starlette.routing import Mount
from starlette.types import Scope, Receive, Send
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_streamable_server")
load_dotenv()
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", "8000"))
DB_CONFIG = {
"host": os.getenv("MYSQL_HOST", "127.0.0.1"),
"user": os.getenv("MYSQL_USER", "root"),
"password": os.getenv("MYSQL_PASSWORD", ""),
"database": os.getenv("MYSQL_DATABASE", "test"),
}
mcp = Server("mysql_mcp_server")
@mcp.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="execute_sql",
description="执行只读 SQL 查询(建议 SELECT)。",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "要执行的 SQL 语句"}
},
"required": ["query"],
},
)
]
@mcp.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name != "execute_sql":
raise ValueError(f"Unknown tool: {name}")
query = arguments.get("query", "").strip()
if not query:
raise ValueError("query is required")
if not query.upper().startswith("SELECT"):
raise ValueError("only SELECT is allowed in this demo")
with connect(**DB_CONFIG) as conn:
with conn.cursor() as cursor:
cursor.execute(query)
cols = [c[0] for c in cursor.description]
rows = cursor.fetchall()
lines = [",".join(cols)] + [",".join(map(str, r)) for r in rows]
return [TextContent(type="text", text="n".join(lines))]
session_manager = StreamableHTTPSessionManager(
app=mcp,
event_store=None,
json_response=None,
stateless=True,
)
async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:
await session_manager.handle_request(scope, receive, send)
@contextlib.asynccontextmanager
async def lifespan(app: Starlette) -> AsyncIterator[None]:
async with session_manager.run():
logger.info("Streamable HTTP MCP Server started")
yield
app = Starlette(
routes=[Mount("/mcp", app=handle_streamable_http)],
lifespan=lifespan,
)
if __name__ == "__main__":
uvicorn.run(app, host=HOST, port=PORT, log_level="info")
import asyncio
from dotenv import load_dotenv
from langchain.ch@t_models import init_ch@t_model
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
load_dotenv()
llm = init_ch@t_model(
model="deepseek-ch@t",
temperature=0,
model_provider="deepseek",
)
async def main():
client = MultiServerMCPClient(
{
"mysql_streamable": {
"url": "http://127.0.0.1:8000/mcp",
"transport": "streamable_http",
}
}
)
tools = await client.get_tools()
agent = create_react_agent(
model=llm,
tools=tools,
prompt=SystemMessage(content="你是数据分析助手,优先调用工具并输出结论。"),
checkpointer=InMemorySaver(),
)
config = {"configurable": {"thread_id": "thread-1"}}
result = await agent.ainvoke(
{"messages": [HumanMessage(content="查询 goods 表前 5 条数据,并给出一句摘要")]},
config=config,
)
print(result["messages"][-1].content)
if __name__ == "__main__":
asyncio.run(main())
DEEPSEEK_API_KEY、MYSQL_HOST、MYSQL_USER、MYSQL_PASSWORD、MYSQL_DATABASE。async with 管理连接和会话。initialize() 失败怎么办?排查顺序建议:
STDIOSSEStreamable HTTP到这里,你已经完成了从“理解 MCP”到“落地 MCP”的闭环:
MCP 的价值,不止是“能调用工具”,更是让 AI 应用具备可维护、可扩展、可上线的工程能力。