创源素材
67.15M · 2026-02-04
你会看到一个核心结论:
interrupt()的工作机制:如何暂停、返回什么、如何恢复thread_id(否则无法跨次恢复)interrupt_before/interrupt_after的使用场景(“强制拦截点”)pip install -U langgraph langchain-core在真实业务里,智能体经常会遇到三种“必须让人上场”的情况:
官方文档给了一个很工程化的分类(你可以当作项目里的错误处理策略表):
当节点里调用interrupt(payload)时:
__interrupt__)Command(resume=...)形式提供输入后,图会继续执行官方文档强调:interrupt()必须最先执行。
因为恢复时会从“被中断的节点”重新开始执行;如果你在 interrupt 之前做了写库/扣费/发邮件等动作,恢复会导致重复执行。
当你恢复(resume)时,执行会从“发生中断的那个节点”的开头重新开始。 因此节点设计要满足两点:
最稳妥的做法是:
interrupt()之后再执行,并保证可重入(幂等/可回滚)这是最常用的 HITL 模式:遇到缺参就暂停,等用户给出信息,再回到同一节点继续往下走。
from typing import Literal, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
class State(TypedDict):
customer_id: str
customer_history: dict
def lookup_customer_history(state: State) -> Command[Literal["lookup_customer_history", END]]:
# interrupt() 必须放在最前面
ifnot state.get("customer_id"):
user_input = interrupt(
{
"message": "Customer ID needed",
"request": "请提供 customer_id,用于查询订阅历史",
}
)
return Command(update={"customer_id": user_input["customer_id"]}, goto="lookup_customer_history")
# 这里用假数据模拟:实际项目中换成数据库/CRM API
history = {"tier": "pro", "active": True}
return Command(update={"customer_history": history}, goto=END)
builder = StateGraph(State)
builder.add_node("lookup_customer_history", lookup_customer_history)
builder.add_edge(START, "lookup_customer_history")
builder.add_edge("lookup_customer_history", END)
# 注意:想要真正“暂停后还能恢复”,必须加 checkpointer(下一节会讲)
graph = builder.compile()
典型场景:
核心做法:
plan(要做什么)interrupt()把 plan 给人看approved / edited_plan / rejected很多业务里,“不同意执行”并不等于结束,而是要转到人工流程(工单/客服/运营/风控复核)。把它做成一个标准闭环:
thread_id以Command(resume=...)让流程继续(例如再次审批或直接执行)落地时,建议在 state 里固定这些字段(便于审计与可观测):
review_status:"approved" | "rejected" | "handoff"review_reason: 人工原因/备注ticket_id: 工单号(转人工时)做法很简单:在“转人工”时,不要直接goto=END,而是跳到一个“等待人工完成”的节点,再interrupt()一次,等人工完成后用同一个thread_id恢复继续跑。
示意思路(基于上面的例子改动):
def human_review(state: State) -> Command[Literal["execute", "handoff_wait", END]]:
decision = interrupt({...})
if decision.get("approved"):
...
return Command(update={"approved_plan": edited}, goto="execute")
if decision.get("action") == "handoff":
# 记录工单并进入“等待人工”节点
ticket_id = create_ticket_api(...) # 伪代码
return Command(
update={"review_status": "handoff", "ticket_id": ticket_id},
goto="handoff_wait",
)
...
def handoff_wait(state: State) -> Command[Literal["execute", END]]:
# 第二次暂停:等人工处理完,再 resume 进来
resume_payload = interrupt(
{
"message": f"工单 {state['ticket_id']} 已创建,请人工处理完再恢复",
"action": "handoff_done 或 reject",
}
)
if resume_payload.get("action") == "handoff_done":
return Command(goto="execute") # 或 goto 回人审节点再看一眼
return Command(goto=END)
恢复调用时,只要带上同一个thread_id即可让图找到之前的暂停点:
config = {"configurable": {"thread_id": "demo_thread"}}
resume_cmd = Command(resume={"action": "handoff_done"})
app.invoke(resume_cmd, config)
这样就形成了闭环:转人工 → 创建工单 → 暂停 → 人工处理 → resume → 自动流程继续。
interrupt()能暂停,但要能“过一会儿再恢复”,必须把 state 保存起来。
LangGraph 用checkpointer做这件事(第 2 讲你已经见过它)。
最简单的示例是内存版:
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
app = graph.compile(checkpointer=memory)
生产建议:内存版只适合演示。生产环境通常选择:
thread_id取回状态)你恢复时必须带上同一个thread_id,它相当于这条对话/任务的唯一 key。
config = {"configurable": {"thread_id": "order_001"}}
建议thread_id直接绑定业务主键:
ticket_123order_987user_42_session_202601这个例子把 HITL 的主流程跑通:
human_review处interrupt()暂停,等待人工审批/编辑Command(resume=...)恢复执行from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
user_request: str
plan: str
approved_plan: str
result: str
review_status: str
review_reason: str
ticket_id: str
def make_plan(state: State) -> Command[Literal["human_review"]]:
# 实际项目里这里可以是 LLM 生成 plan(此处用固定模板示意)
plan = f"将执行动作:调用工具完成『{state['user_request']}』"
return Command(update={"plan": plan}, goto="human_review")
def human_review(state: State) -> Command[Literal["execute", END]]:
# interrupt() 必须放在最前面
decision = interrupt(
{
"plan": state.get("plan", ""),
"action": "请审核该计划:approved=True 表示同意;也可提供 edited_plan 进行修改",
}
)
if decision.get("approved") isTrue:
edited = decision.get("edited_plan", state.get("plan", ""))
return Command(update={"approved_plan": edited}, goto="execute")
# 拒绝:记录原因并结束(或你也可以 goto 到一个“通知用户/写审计日志”的节点)
if decision.get("action") == "reject":
return Command(
update={
"review_status": "rejected",
"review_reason": decision.get("reason", ""),
},
goto=END,
)
# 转人工:创建工单(示意),写入 ticket_id 后结束自动流程
if decision.get("action") == "handoff":
ticket_id = f"TICKET_{state.get('user_request','')[:6]}_001"# 示例:真实项目替换为工单系统 API
return Command(
update={
"review_status": "handoff",
"review_reason": decision.get("reason", ""),
"ticket_id": ticket_id,
},
goto=END,
)
# 兜底:当作拒绝处理
return Command(update={"review_status": "rejected"}, goto=END)
def execute(state: State) -> Command[Literal[END]]:
# 这里模拟执行:实际项目里换成工具调用/写库等操作
result = f"已执行:{state.get('approved_plan')}"
return Command(update={"result": result}, goto=END)
builder = StateGraph(State)
builder.add_node("make_plan", make_plan)
builder.add_node("human_review", human_review)
builder.add_node("execute", execute)
builder.add_edge(START, "make_plan")
memory = MemorySaver()
app = builder.compile(checkpointer=memory)
# 1) 第一次执行:会在 human_review 暂停
config = {"configurable": {"thread_id": "demo_thread"}}
first = app.invoke(
{
"user_request": "给客户发送退款确认邮件",
"plan": "",
"approved_plan": "",
"result": "",
"review_status": "",
"review_reason": "",
"ticket_id": "",
},
config,
)
print(first.get("__interrupt__"))
# 2) 恢复执行:人工审批 + 可选编辑
resume_cmd = Command(resume={"approved": True, "edited_plan": "将执行动作:发送退款确认邮件(已人工确认)"})
final = app.invoke(resume_cmd, config)
print(final.get("result"))
# 也可以这样恢复:拒绝 / 转人工(示意)
# reject_cmd = Command(resume={"action": "reject", "reason": "风险较高,需补充证据材料"})
# handoff_cmd = Command(resume={"action": "handoff", "reason": "转客服复核并人工发送邮件"})
我拿到了__interrupt__,接下来怎么让人输入并回传 resume?
你可以把它理解成一个“可序列化的待办任务”:
__interrupt__(里面是你给人的 payload)Command(resume=...)下面给一个“最小控制台交互”的示例骨架(便于你将来换成 Web 表单):
from langgraph.types import Command
config = {"configurable": {"thread_id": "demo_thread"}}
result = app.invoke(initial_state, config)
interrupt_payload = result.get("__interrupt__")
if interrupt_payload:
# 1) 展示给人(真实项目里:返回给前端/写入工单系统)
print("需要人工处理:", interrupt_payload)
# 2) 收集输入(示例用控制台;真实项目用表单)
approved = input("是否同意执行?(y/n): ").strip().lower() == "y"
edited_plan = input("可选:编辑计划(留空表示不改): ").strip()
resume = {"approved": approved}
if edited_plan:
resume["edited_plan"] = edited_plan
# 3) 恢复执行(同 thread_id)
final = app.invoke(Command(resume=resume), config)
print("执行结果:", final)
关键要点:
thread_id:否则系统找不到上一次的暂停点__interrupt__当作“待办任务”的载体:可以写入 DB、发到审批系统、挂到工单在一些场景下,你不希望每个节点都手写interrupt(),而是想:
这时可以用interrupt_before / interrupt_after(不同版本参数名称可能略有差异;以你当前 LangGraph 版本文档为准)。
示意(思路型代码):
# app = builder.compile(
# checkpointer=memory,
# interrupt_before=["execute_tool"], # 执行工具之前强制暂停
# interrupt_after=["draft_response"], # 草稿生成后强制暂停
# )
工程建议:
不要卡住:你完全可以用“在节点里手写interrupt()”实现同样效果。
interrupt()写在目标节点函数的第一行(执行动作前)interrupt()(相当于“生成后暂停”)本讲你已经掌握了让智能体“会协作”的关键能力:
interrupt()实现 Pause → Persist → Resumethread_id支撑跨次恢复