以观书法
108.85M · 2026-02-05
本文是 LangGraph 系列的第 4 讲,在前三讲的基础上,我们将深入探讨如何让智能体调用外部工具和系统,实现真正的"智能代理"能力。
一个真正有用的智能体,不仅要能"记住",更要能"行动"——调用 API、查询数据库、执行代码、搜索信息等。这一讲,我们将学习如何让智能体具备这些能力。
️ 已准备好:
️ 已准备好:
工具调用(Function Calling)是让 LLM 能够调用外部函数或 API 的机制。通过工具调用,智能体可以:
用户输入
↓
模型分析需求 → 决定调用哪个工具 → 生成工具调用请求
↓
执行工具 → 获取结果
↓
将结果返回给模型 → 模型基于结果生成最终回复
在 LangGraph 中,工具调用涉及以下组件:
最简单的方式是使用 LangChain 的@tool装饰器:
from langchain_core.tools import tool
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.checkpoint.memory import InMemorySaver
from dotenv import load_dotenv
from pathlib import Path
# 环境配置
env_path = Path("../../") / "config" / ".env"
load_dotenv(dotenv_path=env_path)
# 初始化模型
model = init_chat_model(model="deepseek-chat", model_provider="deepseek")
# 定义工具:计算器
@tool
def calculator(expression: str) -> str:
"""
执行数学表达式计算
Args:
expression: 要计算的数学表达式,例如 "2 + 3 * 4"
Returns:
计算结果
"""
try:
result = eval(expression)
returnf"计算结果:{result}"
except Exception as e:
returnf"计算错误:{str(e)}"
# 定义工具:获取当前时间
@tool
def get_current_time() -> str:
"""
获取当前系统时间
Returns:
格式化的时间字符串
"""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 将工具绑定到模型
tools = [calculator, get_current_time]
model_with_tools = model.bind_tools(tools)
def agent_node(state: MessagesState):
"""
智能体节点:处理用户输入,调用工具,生成回复
"""
messages = state["messages"]
# 调用模型(模型可能会生成工具调用请求)
response = model_with_tools.invoke(messages)
# 检查是否有工具调用
if hasattr(response, "tool_calls") and response.tool_calls:
# 执行工具调用
tool_results = []
for tool_call in response.tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
# 根据工具名称执行对应的工具
if tool_name == "calculator":
result = calculator.invoke(tool_args)
elif tool_name == "get_current_time":
result = get_current_time.invoke(tool_args)
else:
result = f"未知工具:{tool_name}"
# 创建 ToolMessage
from langchain_core.messages import ToolMessage
tool_results.append(
ToolMessage(
content=str(result),
tool_call_id=tool_call["id"]
)
)
# 返回 AI 消息和工具结果
return {"messages": [response] + tool_results}
# 没有工具调用,直接返回模型回复
return {"messages": [response]}
# 构建图
checkpointer = InMemorySaver()
builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_edge(START, "agent")
# 添加条件边:如果有工具调用,继续执行;否则结束
def should_continue(state: MessagesState) -> str:
messages = state["messages"]
last_message = messages[-1]
# 如果最后一条消息是 ToolMessage,需要继续让模型处理
if hasattr(last_message, "type") and last_message.type == "tool":
return"agent"
return"end"
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("agent", END)
graph = builder.compile(checkpointer=checkpointer)
# 测试
config = {"configurable": {"thread_id": "tool-demo-1"}}
response = graph.invoke(
{"messages": "帮我计算 123 * 456 + 789,然后告诉我现在几点了"},
config
)
print(response["messages"][-1].content)
运行结果:
计算结果:123 × 456 + 789 = 56,877
现在的时间是:2026年1月18日 10:16:48
| 特性 | @tool装饰器 | StructuredTool |
|---|---|---|
| 使用方式 | 装饰器,最简单 | 需要显式创建 Pydantic Schema |
| 参数推断 | 自动从函数签名和 docstring 推断 | 需要手动定义args_schema |
| 参数验证 | 基础类型验证(str, int 等) | 完整的 Pydantic 验证(类型、范围、默认值等) |
| 适用场景 | 简单工具(1-2 个参数,类型简单) | 复杂工具(多个参数、需要验证、有默认值、嵌套结构) |
| 灵活性 | 较低,依赖函数签名 | 高,可精确控制每个参数 |
适合场景:
示例:
@tool
def calculator(expression: str) -> str:
"""执行数学计算"""
return str(eval(expression))
@tool
def get_time() -> str:
"""获取当前时间"""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
适合场景:
示例:需要多个参数且有默认值
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from typing import Optional
class WeatherQuery(BaseModel):
"""天气查询参数"""
city: str = Field(description="要查询的城市名称,例如:北京、上海")
unit: str = Field(
default="celsius",
description="温度单位:'celsius'(摄氏度)或 'fahrenheit'(华氏度)"
)
date: Optional[str] = Field(
default=None,
description="查询日期,格式:YYYY-MM-DD,默认为今天"
)
def get_weather(city: str, unit: str = "celsius", date: str = None) -> str:
"""
获取指定城市的天气信息
Args:
city: 城市名称
unit: 温度单位
date: 查询日期
Returns:
天气信息字符串
"""
# 这里只是示例,实际应该调用天气 API
date_str = date or"今天"
returnf"{city} {date_str}的天气:晴天,温度 25°{unit[0].upper()}"
weather_tool = StructuredTool.from_function(
func=get_weather,
name="get_weather",
description="获取指定城市的天气信息,支持查询今天或指定日期的天气",
args_schema=WeatherQuery
)
示例:需要参数验证(数值范围)
from pydantic import BaseModel, Field, validator
class SearchQuery(BaseModel):
"""搜索查询参数"""
query: str = Field(description="搜索关键词")
max_results: int = Field(
default=10,
description="最大返回结果数",
ge=1, # 最小值:至少 1
le=100# 最大值:最多 100
)
sort_by: str = Field(
default="relevance",
description="排序方式",
pattern="^(relevance|date|rating)$"# 正则表达式验证
)
def search(query: str, max_results: int = 10, sort_by: str = "relevance") -> str:
"""执行搜索"""
returnf"搜索 '{query}',返回 {max_results} 条结果,按 {sort_by} 排序"
search_tool = StructuredTool.from_function(
func=search,
name="search",
description="搜索信息",
args_schema=SearchQuery
)
@tool:对于大多数简单工具,@tool更简洁高效StructuredTool:当你的工具需要:实际项目中,90% 的工具用@tool就够了,只有遇到复杂参数场景时才用StructuredTool。
实际应用中,智能体可能需要多次调用工具才能完成任务。需要实现一个循环机制:
from langchain_core.messages import AIMessage, ToolMessage
from langgraph.graph import StateGraph, START, MessagesState, END
from langgraph.checkpoint.memory import InMemorySaver
def agent_node(state: MessagesState):
"""
智能体节点:处理消息,可能生成工具调用
"""
messages = state["messages"]
response = model_with_tools.invoke(messages)
return {"messages": [response]}
def tool_node(state: MessagesState):
"""
工具执行节点:执行所有待处理的工具调用
"""
messages = state["messages"]
last_message = messages[-1]
# 检查最后一条消息是否是包含工具调用的 AI 消息
ifnot isinstance(last_message, AIMessage) ornot last_message.tool_calls:
return {"messages": []}
# 执行所有工具调用
tool_results = []
for tool_call in last_message.tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
# 根据工具名称执行工具(这里简化处理)
if tool_name == "calculator":
result = calculator.invoke(tool_args)
elif tool_name == "get_current_time":
result = get_current_time.invoke(tool_args)
else:
result = f"未知工具:{tool_name}"
tool_results.append(
ToolMessage(
content=str(result),
tool_call_id=tool_call["id"]
)
)
return {"messages": tool_results}
def should_continue(state: MessagesState) -> str:
"""
判断下一步:继续调用工具,还是结束
"""
messages = state["messages"]
last_message = messages[-1]
# 如果最后一条是工具结果,继续让模型处理
if isinstance(last_message, ToolMessage):
return"agent"
# 如果最后一条是 AI 消息且包含工具调用,执行工具
if isinstance(last_message, AIMessage) and last_message.tool_calls:
return"tools"
# 否则结束
return"end"
# 构建图
builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")
builder.add_edge("agent", END)
graph = builder.compile(checkpointer=checkpointer)
import requests
from langchain_core.tools import tool
@tool
def search_web(query: str) -> str:
"""
搜索网络信息(示例:使用 DuckDuckGo API)
Args:
query: 搜索关键词
Returns:
搜索结果摘要
"""
try:
# 这里使用示例 API,实际项目中替换为真实的搜索 API
url = f"https://api.duckduckgo.com/?q={query}&format=json&no_html=1"
response = requests.get(url, timeout=5)
data = response.json()
if data.get("AbstractText"):
return data["AbstractText"]
return"未找到相关信息"
except Exception as e:
returnf"搜索出错:{str(e)}"
from langchain_core.tools import tool
import sqlite3
@tool
def query_database(sql: str) -> str:
"""
执行 SQL 查询(示例:查询用户信息)
Args:
sql: SQL 查询语句
Returns:
查询结果
"""
try:
# 连接到数据库(示例)
conn = sqlite3.connect("example.db")
cursor = conn.cursor()
cursor.execute(sql)
results = cursor.fetchall()
conn.close()
returnf"查询结果:{results}"
except Exception as e:
returnf"查询出错:{str(e)}"
实际应用中,工具调用可能会失败。我们需要实现错误处理和重试:
from langchain_core.messages import AIMessage, ToolMessage
import time
def tool_node_with_retry(state: MessagesState, max_retries: int = 3):
"""
带重试机制的工具执行节点
"""
messages = state["messages"]
last_message = messages[-1]
ifnot isinstance(last_message, AIMessage) ornot last_message.tool_calls:
return {"messages": []}
tool_results = []
for tool_call in last_message.tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
# 重试逻辑
result = None
for attempt in range(max_retries):
try:
if tool_name == "calculator":
result = calculator.invoke(tool_args)
elif tool_name == "get_current_time":
result = get_current_time.invoke(tool_args)
else:
result = f"未知工具:{tool_name}"
# 成功则跳出重试循环
break
except Exception as e:
if attempt < max_retries - 1:
time.sleep(1) # 等待 1 秒后重试
continue
else:
result = f"工具调用失败(已重试 {max_retries} 次):{str(e)}"
tool_results.append(
ToolMessage(
content=str(result),
tool_call_id=tool_call["id"]
)
)
return {"messages": tool_results}
在生产环境中,我们需要对工具的使用进行控制:
from typing import Dict, List
from langchain_core.tools import BaseTool
class ToolManager:
"""
工具管理器:控制工具的可用性和使用限制
"""
def __init__(self):
self.tool_usage_count: Dict[str, int] = {}
self.tool_limits: Dict[str, int] = {}
self.enabled_tools: List[str] = []
def register_tool(self, tool: BaseTool, max_uses: int = None):
"""注册工具并设置使用限制"""
self.tool_usage_count[tool.name] = 0
if max_uses:
self.tool_limits[tool.name] = max_uses
self.enabled_tools.append(tool.name)
return tool
def can_use_tool(self, tool_name: str) -> bool:
"""检查工具是否可用"""
if tool_name notin self.enabled_tools:
returnFalse
if tool_name in self.tool_limits:
return self.tool_usage_count[tool_name] < self.tool_limits[tool_name]
returnTrue
def record_tool_use(self, tool_name: str):
"""记录工具使用"""
if tool_name in self.tool_usage_count:
self.tool_usage_count[tool_name] += 1
# 使用示例
tool_manager = ToolManager()
calculator = tool_manager.register_tool(calculator, max_uses=10) # 限制使用 10 次
我们构建一个综合示例,整合多个工具:
from langchain_core.tools import tool
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, MessagesState
from langgraph.checkpoint.memory import InMemorySaver
# 定义多个工具
@tool
def calculator(expression: str) -> str:
"""执行数学计算"""
try:
return str(eval(expression))
except:
return"计算错误"
@tool
def get_time() -> str:
"""获取当前时间"""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@tool
def search_knowledge_base(query: str) -> str:
"""搜索知识库(模拟)"""
# 实际项目中这里会连接向量数据库
returnf"关于 '{query}' 的信息:这是模拟的搜索结果"
# 绑定所有工具
tools = [calculator, get_time, search_knowledge_base]
model_with_tools = model.bind_tools(tools)
# 构建完整的智能体图(包含循环逻辑)
# ...(参考前面的完整实现)
**持久执行(Durable Execution)**是一种技术,允许工作流在关键点保存进度,从而可以在暂停后从上次保存的位置恢复执行。这对于工具调用特别重要,因为:
在 LangGraph 中,如果你使用了checkpointer,持久执行就已经自动启用了:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, MessagesState
# 使用 checkpointer 启用持久执行
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# 执行时需要指定 thread_id
config = {"configurable": {"thread_id": "tool-demo-1"}}
response = graph.invoke({"messages": "计算 123 * 456"}, config)
LangGraph 支持三种持久性模式(Durability Modes),你可以根据应用需求选择:
# 模式 1:exit(性能最优,但中间状态不保存)
graph.stream(
{"messages": "查询天气"},
config,
durability="exit"# 只在退出时保存
)
# 模式 2:async(平衡性能和持久性)
graph.stream(
{"messages": "查询天气"},
config,
durability="async"# 异步保存,性能好但有小风险
)
# 模式 3:sync(最安全,但性能开销最大)
graph.stream(
{"messages": "查询天气"},
config,
durability="sync"# 同步保存,确保每个检查点都写入
)
三种模式对比:
| 模式 | 性能 | 持久性 | 适用场景 |
|---|---|---|---|
| exit | ⭐⭐⭐⭐⭐ | ⭐ | 短时间运行、不需要恢复中间状态 |
| async | ⭐⭐⭐⭐ | ⭐⭐⭐ | 大多数生产环境场景 |
| sync | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 关键业务、需要高可靠性 |
当工具调用涉及非确定性操作(如随机数生成)或副作用操作(如 API 调用、文件写入)时,应该使用@task装饰器包装,确保恢复时不会重复执行:
from langgraph.func import task
from langchain_core.tools import tool
import requests
# 错误示例:直接在节点中调用 API
def tool_node_bad(state: MessagesState):
# 如果工作流恢复,这个 API 调用会被重复执行
result = requests.get("https://api.example.com/data").text
return {"messages": [ToolMessage(content=result)]}
# 正确示例:使用 @task 包装
@task
def fetch_api_data(url: str) -> str:
"""获取 API 数据"""
return requests.get(url).text[:100]
def tool_node_good(state: MessagesState):
# 使用 task,恢复时不会重复执行
task_result = fetch_api_data("https://api.example.com/data")
result = task_result.result() # 获取任务结果
return {"messages": [ToolMessage(content=result)]}
为什么需要使用 Tasks?
当工具调用失败时,持久执行允许我们从最后一个成功的检查点恢复:
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.messages import ToolMessage, AIMessage
def tool_node_with_retry(state: MessagesState):
"""带错误恢复的工具节点"""
messages = state["messages"]
last_message = messages[-1]
ifnot isinstance(last_message, AIMessage) ornot last_message.tool_calls:
return {"messages": []}
tool_results = []
for tool_call in last_message.tool_calls:
try:
# 执行工具调用
if tool_call["name"] == "calculator":
result = calculator.invoke(tool_call["args"])
else:
result = f"未知工具:{tool_call['name']}"
tool_results.append(
ToolMessage(
content=str(result),
tool_call_id=tool_call["id"]
)
)
except Exception as e:
# 工具调用失败,返回错误信息
tool_results.append(
ToolMessage(
content=f"工具调用失败:{str(e)}",
tool_call_id=tool_call["id"]
)
)
return {"messages": tool_results}
# 构建图
checkpointer = InMemorySaver()
builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node_with_retry)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")
builder.add_edge("agent", END)
graph = builder.compile(checkpointer=checkpointer)
# 执行工作流
config = {"configurable": {"thread_id": "tool-retry-demo"}}
try:
response = graph.invoke(
{"messages": "帮我计算 123 * 456"},
config
)
except Exception as e:
print(f"执行失败:{e}")
# 从失败点恢复:使用相同的 thread_id,输入为 None
response = graph.invoke(None, config)
print("已从最后一个检查点恢复执行")
下面是一个完整的示例,展示如何在工具调用中使用持久执行:
from langchain_core.tools import tool
from langchain.chat_models import init_chat_model
from langchain_core.messages import AIMessage, ToolMessage
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import task
from dotenv import load_dotenv
from pathlib import Path
import requests
import time
# 环境配置
env_path = Path("../../") / "config" / ".env"
load_dotenv(dotenv_path=env_path)
# 初始化模型
model = init_chat_model(model="deepseek-chat", model_provider="deepseek")
# 定义工具:使用 @task 包装 API 调用
@task
def fetch_weather_api(city: str) -> str:
"""获取天气信息(模拟 API 调用)"""
# 模拟网络延迟
time.sleep(0.5)
# 实际项目中这里会调用真实的天气 API
returnf"{city}的天气:晴天,温度 25°C"
@tool
def get_weather(city: str) -> str:
"""获取指定城市的天气信息"""
# 使用 task 包装,确保恢复时不会重复调用 API
task_result = fetch_weather_api(city)
return task_result.result()
# 将工具绑定到模型
tools = [get_weather]
model_with_tools = model.bind_tools(tools)
def agent_node(state: MessagesState):
"""智能体节点"""
messages = state["messages"]
response = model_with_tools.invoke(messages)
return {"messages": [response]}
def tool_node(state: MessagesState):
"""工具执行节点"""
messages = state["messages"]
last_message = messages[-1]
ifnot isinstance(last_message, AIMessage) ornot last_message.tool_calls:
return {"messages": []}
tool_results = []
for tool_call in last_message.tool_calls:
if tool_call["name"] == "get_weather":
result = get_weather.invoke(tool_call["args"])
else:
result = f"未知工具:{tool_call['name']}"
tool_results.append(
ToolMessage(
content=str(result),
tool_call_id=tool_call["id"]
)
)
return {"messages": tool_results}
def should_continue(state: MessagesState) -> str:
"""判断下一步"""
messages = state["messages"]
last_message = messages[-1]
if isinstance(last_message, ToolMessage):
return"agent"
if isinstance(last_message, AIMessage) and last_message.tool_calls:
return"tools"
return"end"
# 构建图(启用持久执行)
checkpointer = InMemorySaver()
builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")
builder.add_edge("agent", END)
graph = builder.compile(checkpointer=checkpointer)
# 测试:使用 sync 模式确保高可靠性
config = {"configurable": {"thread_id": "durable-tool-demo"}}
if __name__ == "__main__":
print("=== 测试持久执行的工具调用 ===")
# 使用 sync 模式执行
response = graph.invoke(
{"messages": "帮我查一下北京的天气"},
config
)
print("AI 回复:")
print(response["messages"][-1].content)
print()
print("=== 测试恢复机制 ===")
print("如果工作流中断,可以从最后一个检查点恢复")
print("使用相同的 thread_id,输入为 None 即可恢复")
exit、async或syncthread_id和None输入来恢复工作流本讲我们深入学习了工具调用机制,让智能体具备了"行动"能力:
@tool、StructuredTool)