The Model Context Protocol (MCP) is an open protocol that standardizes how applications provide tools and context to large language models (LLMs). LangChain agents can use tools defined on MCP servers through the langchain-mcp-adapters library.
To use MCP tools in LangGraph, install the langchain-mcp-adapters library:
pip install langchain-mcp-adapters
MCP supports several transport mechanisms for client-server communication, including stdio (the client spawns the server as a local subprocess) and streamable HTTP (the client talks to an already-running server over HTTP). The langchain-mcp-adapters library lets an agent use tools defined on one or more MCP servers, regardless of transport.
import asyncio

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model

from config import api_key, api_base


def init_model():
    model = init_chat_model(
        api_key=api_key,
        base_url=api_base,
        model="Qwen/Qwen3-8B",
        model_provider="openai",
        temperature=0.7,
    )
    return model


model = init_model()

client = MultiServerMCPClient(
    {
        "math": {
            "transport": "stdio",  # Local subprocess communication
            "command": "python",
            # Absolute path to your math_server.py file
            "args": ["/your/path/to/mcp/math_server.py"],
        },
        "weather": {
            "transport": "streamable_http",  # HTTP-based remote server
            # Ensure you start your weather server on port 8000
            "url": "http://localhost:8000/mcp",
        },
    }
)


async def main():
    # Fetch the tools from all configured servers asynchronously
    tools = await client.get_tools()
    agent = create_agent(
        model,
        tools,
    )
    math_response = await agent.ainvoke(
        {"messages": [{"role": "user", "content": "what's (3 + 5) x 12?"}]}
    )
    print("Math response:", math_response)
    weather_response = await agent.ainvoke(
        {"messages": [{"role": "user", "content": "what is the weather in nyc?"}]}
    )
    print("Weather response:", weather_response)


# Run the async entry point
if __name__ == "__main__":
    asyncio.run(main())
To build your own MCP server, use the mcp library. It provides a simple way to define tools and run them as a server.
pip install mcp
Use the following reference implementations to test your agent against MCP tool servers.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Math")


@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b


@mcp.tool()
def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    return a * b


if __name__ == "__main__":
    mcp.run(transport="stdio")
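As a sanity check, the arithmetic the agent is asked about earlier maps directly onto these two tools: add(3, 5) = 8, then multiply(8, 12) = 96. The tool bodies are reproduced here without the FastMCP decorator so they can be run standalone:

```python
# Plain versions of the server's tools (no MCP decorator), for a quick local check
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b


def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    return a * b


# "(3 + 5) x 12", the question sent to the agent earlier
result = multiply(add(3, 5), 12)
print(result)  # 96
```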
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Weather")


@mcp.tool()
async def get_weather(location: str) -> str:
    """Get weather for location."""
    return "It's always sunny in New York"


if __name__ == "__main__":
    mcp.run(transport="streamable-http")
For stateful servers that keep context between tool calls, use client.session() to create a persistent client session.
from langchain_mcp_adapters.tools import load_mcp_tools

client = MultiServerMCPClient({...})

async with client.session("math") as session:
    tools = await load_mcp_tools(session)