1. 导读

你会看到一个核心结论:

  • Human-in-the-loop 不是“加一个 input()”
  • 它是“可恢复的长流程”能力:Pause → Persist → Resume(暂停 → 持久化 → 恢复)

1.1 本讲你将学会

  • interrupt()的工作机制:如何暂停、返回什么、如何恢复
  • 为什么必须配合checkpointer+thread_id(否则无法跨次恢复)
  • 两类常见模式:
  • interrupt_before/interrupt_after的使用场景(“强制拦截点”)
  • 一个可运行的综合示例:工具调用审批 + 人工编辑 + 恢复执行

1.2 前置要求

  • ️ Python 3.10+
  • 推荐安装:pip install -U langgraph langchain-core
  • LLM(示例沿用 DeepSeek)

2. 什么是 Human-in-the-loop?

在真实业务里,智能体经常会遇到三种“必须让人上场”的情况:

  • 缺信息(User-fixable):缺账号、缺订单、缺配置、指令不清 → 让用户/运营补齐信息再继续
  • 高风险动作(Approval required):退款、下单、发通知、写库、删数据 → 让人审批、编辑、确认后再执行
  • 质量兜底(Quality gate):对外回复、对外邮件、对外发布文案 → 让人审核,避免“模型自信胡说”

官方文档给了一个很工程化的分类(你可以当作项目里的错误处理策略表):

  • 临时错误(网络抖动/限流)→ 自动重试(RetryPolicy)
  • LLM 可恢复错误(工具失败/解析失败)→ 写入 state,回到 agent 再试
  • 用户可修复错误(缺信息/指令不清)→interrupt() 暂停,等人补齐
  • 未知错误→ 冒泡给开发者排查

3. interrupt():暂停与恢复

3.1 interrupt() 会发生什么?

当节点里调用interrupt(payload)时:

  • 图会立刻暂停
  • 当前 state 会通过checkpointer被保存
  • 这次执行会返回一个特殊字段(通常你会在结果中看到__interrupt__
  • 等你下次以Command(resume=...)形式提供输入后,图会继续执行

3.2 interrupt() 必须放在节点最前面

官方文档强调:interrupt()必须最先执行。 因为恢复时会从“被中断的节点”重新开始执行;如果你在 interrupt 之前做了写库/扣费/发邮件等动作,恢复会导致重复执行。

3.3 恢复会“从节点开头重跑”

当你恢复(resume)时,执行会从“发生中断的那个节点”的开头重新开始。 因此节点设计要满足两点:

  • 可重入(re-entrant):同一个节点被重复进入不会把系统搞乱
  • 幂等(idempotent):即使重复执行,也要保证幂等(重复扣费、重复发消息、重复写库)

最稳妥的做法是:

  • 把写库/扣费/发消息等“外部操作/事务性动作”留到后续独立节点,在interrupt()之后再执行,并保证可重入(幂等/可回滚)
  • 对外部系统写操作加幂等键(例如订单号 + step_id)

4. 模式一:缺信息就问人

这是最常用的 HITL 模式:遇到缺参就暂停,等用户给出信息,再回到同一节点继续往下走。

from typing import Literal, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command


class State(TypedDict):
    customer_id: str
    customer_history: dict


def lookup_customer_history(state: State) -> Command[Literal["lookup_customer_history", END]]:
    # interrupt() 必须放在最前面
    ifnot state.get("customer_id"):
        user_input = interrupt(
            {
                "message": "Customer ID needed",
                "request": "请提供 customer_id,用于查询订阅历史",
            }
        )
        return Command(update={"customer_id": user_input["customer_id"]}, goto="lookup_customer_history")

    # 这里用假数据模拟:实际项目中换成数据库/CRM API
    history = {"tier": "pro", "active": True}
    return Command(update={"customer_history": history}, goto=END)


builder = StateGraph(State)
builder.add_node("lookup_customer_history", lookup_customer_history)
builder.add_edge(START, "lookup_customer_history")
builder.add_edge("lookup_customer_history", END)

# 注意:想要真正“暂停后还能恢复”,必须加 checkpointer(下一节会讲)
graph = builder.compile()

️ 5. 模式二:高风险动作先审批

典型场景:

  • 工具调用会重复执行(写库/下单/发送消息/调用支付)
  • 你要做“先展示计划,再让人确认”

核心做法:

  1. 模型先生成一个plan(要做什么)
  2. 在审批节点调用interrupt()把 plan 给人看
  3. 人给出approved / edited_plan / rejected
  4. 根据人类决定:继续 / 拒绝 / 转人工

5.1 “拒绝/转人工处理”的落地路径(推荐)

很多业务里,“不同意执行”并不等于结束,而是要转到人工流程(工单/客服/运营/风控复核)。把它做成一个标准闭环:

  • 拒绝(Reject):记录拒绝原因 → 通知发起人 → 结束流程(或回到补充信息节点)
  • 转人工(Handoff):创建工单/任务 → 返回工单号 → 结束自动流程(等待人工在系统里处理)
  • 再进入(Optional):人工处理完后,用同thread_idCommand(resume=...)让流程继续(例如再次审批或直接执行)

落地时,建议在 state 里固定这些字段(便于审计与可观测):

  • review_status:"approved" | "rejected" | "handoff"
  • review_reason: 人工原因/备注
  • ticket_id: 工单号(转人工时)

5.2 转人工后如何再恢复?

做法很简单:在“转人工”时,不要直接goto=END,而是跳到一个“等待人工完成”的节点,再interrupt()一次,等人工完成后用同一个thread_id恢复继续跑。

示意思路(基于上面的例子改动):

def human_review(state: State) -> Command[Literal["execute", "handoff_wait", END]]:
    decision = interrupt({...})
    if decision.get("approved"):
        ...
        return Command(update={"approved_plan": edited}, goto="execute")
    if decision.get("action") == "handoff":
        # 记录工单并进入“等待人工”节点
        ticket_id = create_ticket_api(...)  # 伪代码
        return Command(
            update={"review_status": "handoff", "ticket_id": ticket_id},
            goto="handoff_wait",
        )
    ...

def handoff_wait(state: State) -> Command[Literal["execute", END]]:
    # 第二次暂停:等人工处理完,再 resume 进来
    resume_payload = interrupt(
        {
            "message": f"工单 {state['ticket_id']} 已创建,请人工处理完再恢复",
            "action": "handoff_done 或 reject",
        }
    )
    if resume_payload.get("action") == "handoff_done":
        return Command(goto="execute")  # 或 goto 回人审节点再看一眼
    return Command(goto=END)

恢复调用时,只要带上同一个thread_id即可让图找到之前的暂停点:

config = {"configurable": {"thread_id": "demo_thread"}}
resume_cmd = Command(resume={"action": "handoff_done"})
app.invoke(resume_cmd, config)

这样就形成了闭环:转人工 → 创建工单 → 暂停 → 人工处理 → resume → 自动流程继续。

6. 为什么需要 checkpointer + thread_id?

6.1 checkpointer:保存暂停点

interrupt()能暂停,但要能“过一会儿再恢复”,必须把 state 保存起来。 LangGraph 用checkpointer做这件事(第 2 讲你已经见过它)。

最简单的示例是内存版:

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
app = graph.compile(checkpointer=memory)

生产建议:内存版只适合演示。生产环境通常选择:

  • Redis(低延迟,适合高并发短会话)
  • Postgres / MySQL(更强一致性与审计能力)
  • 以及你团队已有的持久化体系(关键是:能按thread_id取回状态)

6.2 thread_id:恢复哪条线程

你恢复时必须带上同一个thread_id,它相当于这条对话/任务的唯一 key。

config = {"configurable": {"thread_id": "order_001"}}

建议thread_id直接绑定业务主键:

  • 工单:ticket_123
  • 订单:order_987
  • 用户会话:user_42_session_202601

7. 综合实战:人工审批 + 恢复执行

这个例子把 HITL 的主流程跑通:

  • 第一步:生成一个“执行计划”(模拟)
  • 第二步:human_reviewinterrupt()暂停,等待人工审批/编辑
  • 第三步:用Command(resume=...)恢复执行
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver


class State(TypedDict):
    user_request: str
    plan: str
    approved_plan: str
    result: str
    review_status: str
    review_reason: str
    ticket_id: str


def make_plan(state: State) -> Command[Literal["human_review"]]:
    # 实际项目里这里可以是 LLM 生成 plan(此处用固定模板示意)
    plan = f"将执行动作:调用工具完成『{state['user_request']}』"
    return Command(update={"plan": plan}, goto="human_review")


def human_review(state: State) -> Command[Literal["execute", END]]:
    # interrupt() 必须放在最前面
    decision = interrupt(
        {
            "plan": state.get("plan", ""),
            "action": "请审核该计划:approved=True 表示同意;也可提供 edited_plan 进行修改",
        }
    )

    if decision.get("approved") isTrue:
        edited = decision.get("edited_plan", state.get("plan", ""))
        return Command(update={"approved_plan": edited}, goto="execute")

    #  拒绝:记录原因并结束(或你也可以 goto 到一个“通知用户/写审计日志”的节点)
    if decision.get("action") == "reject":
        return Command(
            update={
                "review_status": "rejected",
                "review_reason": decision.get("reason", ""),
            },
            goto=END,
        )

    #  转人工:创建工单(示意),写入 ticket_id 后结束自动流程
    if decision.get("action") == "handoff":
        ticket_id = f"TICKET_{state.get('user_request','')[:6]}_001"# 示例:真实项目替换为工单系统 API
        return Command(
            update={
                "review_status": "handoff",
                "review_reason": decision.get("reason", ""),
                "ticket_id": ticket_id,
            },
            goto=END,
        )

    # 兜底:当作拒绝处理
    return Command(update={"review_status": "rejected"}, goto=END)


def execute(state: State) -> Command[Literal[END]]:
    # 这里模拟执行:实际项目里换成工具调用/写库等操作
    result = f"已执行:{state.get('approved_plan')}"
    return Command(update={"result": result}, goto=END)


builder = StateGraph(State)
builder.add_node("make_plan", make_plan)
builder.add_node("human_review", human_review)
builder.add_node("execute", execute)

builder.add_edge(START, "make_plan")

memory = MemorySaver()
app = builder.compile(checkpointer=memory)

# 1) 第一次执行:会在 human_review 暂停
config = {"configurable": {"thread_id": "demo_thread"}}
first = app.invoke(
    {
        "user_request": "给客户发送退款确认邮件",
        "plan": "",
        "approved_plan": "",
        "result": "",
        "review_status": "",
        "review_reason": "",
        "ticket_id": "",
    },
    config,
)
print(first.get("__interrupt__"))

# 2) 恢复执行:人工审批 + 可选编辑
resume_cmd = Command(resume={"approved": True, "edited_plan": "将执行动作:发送退款确认邮件(已人工确认)"})
final = app.invoke(resume_cmd, config)
print(final.get("result"))

# 也可以这样恢复:拒绝 / 转人工(示意)
# reject_cmd = Command(resume={"action": "reject", "reason": "风险较高,需补充证据材料"})
# handoff_cmd = Command(resume={"action": "handoff", "reason": "转客服复核并人工发送邮件"})

8. 如何把中断“接到真实交互”里?

我拿到了__interrupt__,接下来怎么让人输入并回传 resume?

你可以把它理解成一个“可序列化的待办任务”:

  1. 运行图→ 返回__interrupt__(里面是你给人的 payload)
  2. 把 payload 展示给人(控制台 / Web 表单 / 管理后台)
  3. 人提交输入→ 后端构造Command(resume=...)
  4. **用同一个thread_id调用app.invoke(Command(...), config)**→ 图从暂停点继续

下面给一个“最小控制台交互”的示例骨架(便于你将来换成 Web 表单):

from langgraph.types import Command

config = {"configurable": {"thread_id": "demo_thread"}}

result = app.invoke(initial_state, config)
interrupt_payload = result.get("__interrupt__")
if interrupt_payload:
    # 1) 展示给人(真实项目里:返回给前端/写入工单系统)
    print("需要人工处理:", interrupt_payload)

    # 2) 收集输入(示例用控制台;真实项目用表单)
    approved = input("是否同意执行?(y/n): ").strip().lower() == "y"
    edited_plan = input("可选:编辑计划(留空表示不改): ").strip()

    resume = {"approved": approved}
    if edited_plan:
        resume["edited_plan"] = edited_plan

    # 3) 恢复执行(同 thread_id)
    final = app.invoke(Command(resume=resume), config)
    print("执行结果:", final)

关键要点:

  • 一定要同一个thread_id:否则系统找不到上一次的暂停点
  • __interrupt__当作“待办任务”的载体:可以写入 DB、发到审批系统、挂到工单

9. interrupt_before / interrupt_after:把“拦截点”做成系统能力

在一些场景下,你不希望每个节点都手写interrupt(),而是想:

  • 在某些节点执行前强制暂停(例如工具执行前必须审批
  • 或者在某些节点执行后暂停(例如生成草稿后人工审核

这时可以用interrupt_before / interrupt_after(不同版本参数名称可能略有差异;以你当前 LangGraph 版本文档为准)。

示意(思路型代码):

# app = builder.compile(
#     checkpointer=memory,
#     interrupt_before=["execute_tool"],  # 执行工具之前强制暂停
#     interrupt_after=["draft_response"], # 草稿生成后强制暂停
# )

工程建议:

  • 工具类节点优先用 interrupt_before 做“审批门”
  • 输出类节点优先用 interrupt_after 做“质量门”

9.1 没有 interrupt_before/after 怎么办?

不要卡住:你完全可以用“在节点里手写interrupt()”实现同样效果

  • 想要 interrupt_before:把审批interrupt()写在目标节点函数的第一行(执行动作前)
  • 想要 interrupt_after:把生成逻辑放在一个节点,审核逻辑放在下一个节点;在审核节点里interrupt()(相当于“生成后暂停”)

10. 生产实践建议(踩坑总结)

  • 节点要小:越小越容易恢复、越不容易重复做无谓工作
  • interrupt() 前不要做写操作:否则恢复会重复执行
  • state 存“原始数据”:提示词/格式化字符串按需拼(你在第 5 讲已经实践过)
  • thread_id 设计要稳定:最好能关联业务主键(订单号/工单号/用户ID)
  • 审批数据要可审计:建议把 human decision 写入 state(或写日志)

11. 小结

本讲你已经掌握了让智能体“会协作”的关键能力:

  • interrupt()实现 Pause → Persist → Resume
  • 用 checkpointer +thread_id支撑跨次恢复
  • 两类最常用模式:缺信息询问、审批门控
本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:alixiixcom@163.com