为什么 90% 的 LangChain 项目无法进入生产环境?

LangChain 与 LangGraph 的组件化能力,降低了 AI Agent 的原型构建门槛。但在项目迈向生产环境时,AI Agent 工程化落地还存在一些问题。今天想在这里分享一下,作者本人在实际生产环境中 AI Agent 开发过程中所遇到的问题和几点经验。

一切都以实际需求为准,那么就以实际需求为切入点,徐徐展开。

1. 多租户 API 配置的动态切换

需求场景

在 SaaS 场景下,把 Agent 封装成 API 服务对外提供时,会出现不同用户需要使用不同 LLM 配置的情况:

  • 费用隔离 - 企业客户自带 API Key,按自己的额度计费
  • 数据安全 - 部分用户要求接入私有化部署的模型
  • A/B 测试 - 同一服务对不同请求使用不同模型对比效果

除这三个常见的以外,可能还有其他的使用场景。总之,每个请求需要能够动态指定 modelbase_urlapi_key

LangChain 原生实现的问题

先看看 LangChain 原生是怎么做的:

from langchain_openai import ChatOpenAI

# 初始化时就要绑定配置
llm = ChatOpenAI(
    model="gpt-4",
    api_key="sk-xxx",
    base_url="https://api.openai.com/v1"
)

这样写在单用户场景没问题,但放到多租户 Web 服务里就有几个问题:

  1. 配置在初始化时绑定,实例创建后无法切换ChatOpenAImodelopenai_api_baseopenai_api_key 都是构造参数,一旦实例创建完成,配置就固定了。如果要切换到另一个服务商或 API Key,就必须创建新实例
  2. 频繁创建实例:每来一个用户请求,就得 new 一个新的 ChatOpenAI 实例
  3. HTTP 连接无法复用:每个实例会独立创建底层 HTTP 连接,高并发时连接数不可控,可能导致资源耗尽

解决方案

核心思路是Agent 复用,Client 按需创建,HTTP 连接池共享

import httpx
from openai import AsyncOpenAI

# ========== 1. 全局 HTTP 连接池(单例)==========
class GlobalHTTPFactory:
    _client = None
    
    @classmethod
    async def get_client(cls):
        if cls._client is None:
            cls._client = httpx.AsyncClient(
                limits=httpx.Limits(max_connections=50),
                http2=True
            )
        return cls._client


# ========== 2. Agent 基类 ==========
class BaseAgent:
    def __init__(self, default_config=None):
        self.default_config = default_config or {}
    
    async def get_openai_client(self, runtime_config=None):
        # 合并配置:运行时配置覆盖默认配置
        config = {**self.default_config, **runtime_config} if runtime_config else self.default_config
        
        # 复用全局 HTTP 连接池
        shared_http = await GlobalHTTPFactory.get_client()
        
        return AsyncOpenAI(
            api_key=config["api_key"],
            base_url=config["base_url"],
            http_client=shared_http  # 关键:注入共享的 HTTP 客户端
        )
    
    async def run(self, text, **runtime_config):
        client = await self.get_openai_client(runtime_config)
        # 调用 LLM...


# ========== 3. Web 服务层 ==========
# 服务启动时,创建一次 Agent
agent = BaseAgent()

@app.post("/ch@t")
async def ch@t(request):
    # 每个请求传入自己的配置
    return await agent.run(
        text=request.text,
        model=request.model,
        base_url=request.base_url,
        api_key=request.api_key
    )

整个设计的要点:

  1. Agent 只创建一次,服务启动时初始化,全局复用
  2. OpenAI 客户端按需创建,每个请求根据传入的配置生成
  3. HTTP 连接池全局共享,通过 http_client 参数注入,避免连接数爆炸,极大降低运行所需内存

2. 模型思考过程的流式透传

需求场景

DeepSeek-R1、GLM-4 等推理模型在生成最终答案之前,会先进行一段"思考"。这个思考过程对用户来说是有价值的——它能让用户看到模型是如何分析问题的,增强可信度。

把 Agent 封装成 API 服务时,需要把这个思考过程实时流式地透传给前端:

  • 实时展示:用户能看到模型"正在思考",而不是干等
  • 区分内容:前端需要区分"思考内容"和"最终回答",分别展示
  • 多模型兼容:不同模型厂商的思考参数格式不同(DeepSeek、GLM、Qwen 各有各的写法)

问题在哪

OpenAI SDK 返回的流式 chunk 结构是这样的:

# 普通内容在 delta.content
# 思考内容在 delta.reasoning_content(这是 DeepSeek 扩展的字段)
chunk.choices[0].delta.content           # 最终回答
chunk.choices[0].delta.reasoning_content  # 思考过程

问题是:

  1. reasoning_content 不是标准字段:LangChain 原生不认识这个字段,会直接丢弃
  2. 不同模型参数不同:DeepSeek 用 reasoning_content,GLM 用 thinking.type,Qwen 用 enable_thinking
  3. 需要手动解析 chunk:拿到 chunk 后,要自己判断是思考内容还是正式内容

解决方案

核心问题是:LangChain 的 AIMessageChunk 只保留 content,会把 reasoning_content 丢掉。

解决思路是自定义消息类型 + 自定义 ChatModel,把 OpenAI 原始响应完整保留下来:

from langchain_core.messages import BaseMessageChunk
from langchain_core.language_models.ch@t_models import BaseChatModel

# ========== 1. 自定义消息类型,保留完整的 OpenAI 原始响应 ==========
class ChatMessageChunk(BaseMessageChunk):
    """关键:把 OpenAI 的原始 chunk 完整保留下来"""
    ch@t_completion_chunk: Optional[ChatCompletionChunk] = None


# ========== 2. 自定义 ChatModel,流式时用自定义消息类型包装 ==========
class LLMClientChatModel(BaseChatModel):
    
    async def _astream(self, messages, **kwargs):
        async for chunk in self.client.astream(messages):
            # 用自定义消息包装,不让 LangChain 丢掉 reasoning_content
            message = ChatMessageChunk(content="", ch@t_completion_chunk=chunk)
            yield ChatGenerationChunk(message=message)


# ========== 3. Agent 中解析 chunk,区分思考/内容 ==========
class MyAgent:
    async def run_stream(self, text, **kwargs):
        async for event in self.graph.astream_events(inputs, config):
            if event["event"] == "on_ch@t_model_stream":
                chunk = event["data"]["chunk"]
                # 因为保留了完整 chunk,这里能拿到 reasoning_content
                delta = chunk.message.ch@t_completion_chunk.choices[0].delta
                
                if delta.reasoning_content:
                    yield StreamChunk(type="thinking", content=delta.reasoning_content)
                elif delta.content:
                    yield StreamChunk(type="content", content=delta.content)


# ========== 4. 不同模型的思考参数统一管理 ==========
class ThinkingConfig:
    def __init__(self):
        self.model_params = {
            "glm": {"thinking": {"type": "enabled"}},
            "deepseek": {},  # DeepSeek 默认支持
            "qwen": {"enable_thinking": True}
        }
    
    def get_params(self, model_name, enable):
        model_type = self._detect_model_type(model_name)
        return self.model_params.get(model_type, {}) if enable else {}

整个设计的要点:

  1. 自定义消息类型ChatMessageChunk 保留完整的 ChatCompletionChunk,不让 LangChain 丢弃扩展字段
  2. 自定义 ChatModelLLMClientChatModel 在流式输出时用自定义消息包装
  3. Agent 层解析:从保留的完整 chunk 中取出 reasoning_content,区分类型输出
  4. 统一配置管理ThinkingConfig 封装不同模型的参数差异

3. 中间执行路径的可观测性

需求场景

以一个智能研报生成 Agent 为例,它的执行流程比较复杂:

输入解析 → 多源数据采集(并行)→ 数据交叉验证 → 深度分析 → 观点提炼 → 报告生成 → 合规审查 → 输出
              ↓                      ↓
         [财报API]              [验证失败则重试]
         [新闻API]
         [行情API]

用户调用 API 生成一份研报时,可能需要等待 30 秒以上。如果这期间前端只显示一个转圈动画,用户体验会很差。我们需要把执行进度实时透传出来:

  • 实时反馈:用户能看到「正在采集财报数据 (2/3)」「正在进行深度分析」,而不是干等
  • 并行任务状态:多个数据源并行采集时,分别展示各自的进度
  • 条件分支可见:数据验证失败触发重试时,用户能知道发生了什么
  • 调试定位:出问题时能快速定位是哪个节点、哪个数据源出错

问题在哪

LangGraph 的 ainvoke() 方法只返回最终结果,中间过程完全是黑盒:

# 只能拿到最终结果,中间 30 秒发生了什么完全不知道
result = await graph.ainvoke(inputs)

虽然 LangGraph 提供了 astream_events() 方法,但它的事件类型很多,需要自己过滤和解析:

# astream_events 会抛出各种事件:on_chain_start, on_chain_end, on_ch@t_model_stream...
# 需要自己判断哪些是节点事件,哪些是 LLM 事件,哪些是子图事件
async for event in graph.astream_events(inputs):
    # 怎么过滤?怎么区分主图和子图?怎么处理并行节点?
    pass

解决方案

核心是 on_chain_start / on_chain_end 事件,结合节点元数据,输出结构化的进度信息:

from pydantic import BaseModel, Field
from typing import Optional, Literal
from enum import Enum

# ========== 1. 统一的流式输出格式 ==========
class ChunkType(str, Enum):
    PROCESSING = "processing"   # 执行进度
    THINKING = "thinking"       # 思考过程
    CONTENT = "content"         # 正式内容
    FINAL = "final"             # 最终结果
    ERROR = "error"             # 错误信息

class StreamChunk(BaseModel):
    """流式输出块,前端按 type 分别处理"""
    type: ChunkType
    content: str
    metadata: Optional[dict] = None


# ========== 2. 节点进度配置(可配置化)==========
NODE_PROGRESS_MAP = {
    # 节点名 -> (进度描述, 预估耗时秒, 所属阶段)
    "parse_input": ("解析用户输入", 1, "准备阶段"),
    "fetch_financial_data": ("采集财报数据", 5, "数据采集"),
    "fetch_news_data": ("采集新闻资讯", 3, "数据采集"),
    "fetch_market_data": ("采集行情数据", 2, "数据采集"),
    "validate_data": ("交叉验证数据", 3, "数据处理"),
    "deep_analysis": ("深度分析", 10, "智能分析"),
    "extract_insights": ("提炼核心观点", 5, "智能分析"),
    "generate_report": ("生成研报内容", 8, "报告生成"),
    "compliance_check": ("合规性审查", 3, "质量保障"),
}


# ========== 3. Agent 中节点事件 ==========
class ResearchReportAgent:
    def __init__(self):
        self.parallel_tasks = {}  # 跟踪并行任务状态
        self.retry_count = {}     # 跟踪重试次数
    
    async def run_stream(self, query: str, **kwargs):
        async for event in self.graph.astream_events(inputs, config):
            event_type = event.get("event", "")
            node_name = event.get("name", "")
            
            # ===== 节点开始事件 =====
            if event_type == "on_chain_start":
                if node_name in NODE_PROGRESS_MAP:
                    desc, est_time, stage = NODE_PROGRESS_MAP[node_name]
                    
                    # 处理重试场景
                    retry = self.retry_count.get(node_name, 0)
                    retry_hint = f"(第 {retry + 1} 次尝试)" if retry > 0 else ""
                    
                    yield StreamChunk(
                        type=ChunkType.PROCESSING,
                        content=f"[{stage}] {desc}{retry_hint}...",
                        metadata={
                            "node": node_name,
                            "stage": stage,
                            "estimated_seconds": est_time,
                            "retry_count": retry
                        }
                    )
                    
                # 处理并行数据采集节点
                elif node_name == "parallel_data_fetch":
                    self.parallel_tasks = {"financial": "pending", "news": "pending", "market": "pending"}
                    yield StreamChunk(
                        type=ChunkType.PROCESSING,
                        content="[数据采集] 正在并行采集多源数据...",
                        metadata={"parallel_status": self.parallel_tasks}
                    )
            
            # ===== 节点结束事件 =====
            elif event_type == "on_chain_end":
                # 更新并行任务状态
                if node_name == "fetch_financial_data":
                    self.parallel_tasks["financial"] = "completed"
                    yield StreamChunk(
                        type=ChunkType.PROCESSING,
                        content="[数据采集] 财报数据采集完成 ",
                        metadata={"parallel_status": self.parallel_tasks.copy()}
                    )
                
                # 处理验证失败触发重试的场景
                elif node_name == "validate_data":
                    output = event.get("data", {}).get("output", {})
                    if not output.get("is_valid", True):
                        failed_sources = output.get("failed_sources", [])
                        for src in failed_sources:
                            self.retry_count[f"fetch_{src}_data"] = self.retry_count.get(f"fetch_{src}_data", 0) + 1
                        yield StreamChunk(
                            type=ChunkType.PROCESSING,
                            content=f"[数据处理] 验证未通过,{failed_sources} 将重新采集...",
                            metadata={"retry_sources": failed_sources}
                        )
                
                # 图执行完毕,输出最终结果
                elif node_name == "LangGraph":
                    final_output = event["data"]["output"]
                    yield StreamChunk(
                        type=ChunkType.FINAL,
                        content="",
                        metadata={"report": final_output}
                    )
            
            # ===== LLM 流式输出事件 =====
            elif event_type == "on_ch@t_model_stream":
                chunk = event["data"]["chunk"]
                delta = chunk.message.ch@t_completion_chunk.choices[0].delta
                
                if delta.reasoning_content:
                    yield StreamChunk(type=ChunkType.THINKING, content=delta.reasoning_content)
                elif delta.content:
                    yield StreamChunk(type=ChunkType.CONTENT, content=delta.content)

前端拿到的流式输出会是这样:

{"type": "processing", "content": "[准备阶段] 解析用户输入...", "metadata": {"node": "parse_input", "stage": "准备阶段", "estimated_seconds": 1}}
{"type": "processing", "content": "[数据采集] 正在并行采集多源数据...", "metadata": {"parallel_status": {"financial": "pending", "news": "pending", "market": "pending"}}}
{"type": "processing", "content": "[数据采集] 采集新闻资讯...", "metadata": {"node": "fetch_news_data", "stage": "数据采集"}}
{"type": "processing", "content": "[数据采集] 采集财报数据...", "metadata": {"node": "fetch_financial_data", "stage": "数据采集"}}
{"type": "processing", "content": "[数据采集] 新闻数据采集完成 ", "metadata": {"parallel_status": {"financial": "pending", "news": "completed", "market": "pending"}}}
{"type": "processing", "content": "[数据采集] 财报数据采集完成 ", "metadata": {"parallel_status": {"financial": "completed", "news": "completed", "market": "pending"}}}
{"type": "processing", "content": "[数据采集] 行情数据采集完成 ", "metadata": {"parallel_status": {"financial": "completed", "news": "completed", "market": "completed"}}}
{"type": "processing", "content": "[数据处理] 交叉验证数据...", "metadata": {"node": "validate_data"}}
{"type": "processing", "content": "[数据处理] 验证未通过,['market'] 将重新采集...", "metadata": {"retry_sources": ["market"]}}
{"type": "processing", "content": "[数据采集] 采集行情数据(第 2 次尝试)...", "metadata": {"node": "fetch_market_data", "retry_count": 1}}
{"type": "processing", "content": "[智能分析] 深度分析...", "metadata": {"node": "deep_analysis", "estimated_seconds": 10}}
{"type": "thinking", "content": "我需要从多个维度分析这家公司..."}
{"type": "thinking", "content": "首先看财务数据,营收同比增长..."}
{"type": "content", "content": "## 一、公司概况nn"}
{"type": "content", "content": "该公司是国内领先的..."}
{"type": "processing", "content": "[智能分析] 提炼核心观点...", "metadata": {"node": "extract_insights"}}
{"type": "processing", "content": "[报告生成] 生成研报内容...", "metadata": {"node": "generate_report"}}
{"type": "processing", "content": "[质量保障] 合规性审查...", "metadata": {"node": "compliance_check"}}
{"type": "final", "content": "", "metadata": {"report": {"title": "XX公司深度研究报告", "sections": [...]}}}

整个设计的要点:

  1. 统一输出格式StreamChunk 定义 type 字段,前端按类型分别处理
  2. 节点配置可扩展NODE_PROGRESS_MAP 集中管理节点描述,新增节点只需加一行配置
  3. 并行任务可追踪:通过 parallel_status 字段,前端可以渲染多任务进度条
  4. 重试过程透明:验证失败、重新采集等异常流程对用户可见,增强可信度
  5. 预估时间可用estimated_seconds 可用于前端渲染预估进度条

总结

本文分享了在开发 open-pilot-agent 过程中遇到的三个生产级挑战,以及对应的解决方案:

  1. 多租户 API 配置的动态切换:通过 Agent 复用 + Client 按需创建 + HTTP 连接池共享,解决配置动态切换和资源管理问题
  2. 模型思考过程的流式透传:通过自定义消息类型和 ChatModel,保留 OpenAI 原始响应中的 reasoning_content 字段
  3. 中间执行路径的可观测性:通过 LangGraph 的 astream_events,将节点执行进度实时透传给前端

后续作者将会继续分享生产级 Agent 的实际工程经验。


本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:alixiixcom@163.com