抖音火山极速版官方正版
308.71MB · 2026-02-05
LangChain 与 LangGraph 的组件化能力,降低了 AI Agent 的原型构建门槛。但在项目迈向生产环境时,AI Agent 工程化落地还存在一些问题。今天想在这里分享一下,作者本人在实际生产环境中 AI Agent 开发过程中所遇到的问题和几点经验。
一切都以实际需求为准,那么就以实际需求为切入点,徐徐展开。
在 SaaS 场景下,把 Agent 封装成 API 服务对外提供时,会出现不同用户需要使用不同 LLM 配置的情况:
除这三个常见的以外,可能还有其他的使用场景。总之,每个请求需要能够动态指定 model、base_url 和 api_key。
先看看 LangChain 原生是怎么做的:
from langchain_openai import ChatOpenAI
# 初始化时就要绑定配置
llm = ChatOpenAI(
model="gpt-4",
api_key="sk-xxx",
base_url="https://api.openai.com/v1"
)
这样写在单用户场景没问题,但放到多租户 Web 服务里就有几个问题:
ChatOpenAI 的 model、openai_api_base、openai_api_key 都是构造参数,一旦实例创建完成,配置就固定了。如果要切换到另一个服务商或 API Key,就必须创建新实例new 一个新的 ChatOpenAI 实例核心思路是Agent 复用,Client 按需创建,HTTP 连接池共享:
import httpx
from openai import AsyncOpenAI
# ========== 1. 全局 HTTP 连接池(单例)==========
class GlobalHTTPFactory:
_client = None
@classmethod
async def get_client(cls):
if cls._client is None:
cls._client = httpx.AsyncClient(
limits=httpx.Limits(max_connections=50),
http2=True
)
return cls._client
# ========== 2. Agent 基类 ==========
class BaseAgent:
def __init__(self, default_config=None):
self.default_config = default_config or {}
async def get_openai_client(self, runtime_config=None):
# 合并配置:运行时配置覆盖默认配置
config = {**self.default_config, **runtime_config} if runtime_config else self.default_config
# 复用全局 HTTP 连接池
shared_http = await GlobalHTTPFactory.get_client()
return AsyncOpenAI(
api_key=config["api_key"],
base_url=config["base_url"],
http_client=shared_http # 关键:注入共享的 HTTP 客户端
)
async def run(self, text, **runtime_config):
client = await self.get_openai_client(runtime_config)
# 调用 LLM...
# ========== 3. Web 服务层 ==========
# 服务启动时,创建一次 Agent
agent = BaseAgent()
@app.post("/ch@t")
async def ch@t(request):
# 每个请求传入自己的配置
return await agent.run(
text=request.text,
model=request.model,
base_url=request.base_url,
api_key=request.api_key
)
整个设计的要点:
http_client 参数注入,避免连接数爆炸,极大降低运行所需内存DeepSeek-R1、GLM-4 等推理模型在生成最终答案之前,会先进行一段"思考"。这个思考过程对用户来说是有价值的——它能让用户看到模型是如何分析问题的,增强可信度。
把 Agent 封装成 API 服务时,需要把这个思考过程实时流式地透传给前端:
OpenAI SDK 返回的流式 chunk 结构是这样的:
# 普通内容在 delta.content
# 思考内容在 delta.reasoning_content(这是 DeepSeek 扩展的字段)
chunk.choices[0].delta.content # 最终回答
chunk.choices[0].delta.reasoning_content # 思考过程
问题是:
reasoning_content 不是标准字段:LangChain 原生不认识这个字段,会直接丢弃reasoning_content,GLM 用 thinking.type,Qwen 用 enable_thinking核心问题是:LangChain 的 AIMessageChunk 只保留 content,会把 reasoning_content 丢掉。
解决思路是自定义消息类型 + 自定义 ChatModel,把 OpenAI 原始响应完整保留下来:
from langchain_core.messages import BaseMessageChunk
from langchain_core.language_models.ch@t_models import BaseChatModel
# ========== 1. 自定义消息类型,保留完整的 OpenAI 原始响应 ==========
class ChatMessageChunk(BaseMessageChunk):
"""关键:把 OpenAI 的原始 chunk 完整保留下来"""
ch@t_completion_chunk: Optional[ChatCompletionChunk] = None
# ========== 2. 自定义 ChatModel,流式时用自定义消息类型包装 ==========
class LLMClientChatModel(BaseChatModel):
async def _astream(self, messages, **kwargs):
async for chunk in self.client.astream(messages):
# 用自定义消息包装,不让 LangChain 丢掉 reasoning_content
message = ChatMessageChunk(content="", ch@t_completion_chunk=chunk)
yield ChatGenerationChunk(message=message)
# ========== 3. Agent 中解析 chunk,区分思考/内容 ==========
class MyAgent:
async def run_stream(self, text, **kwargs):
async for event in self.graph.astream_events(inputs, config):
if event["event"] == "on_ch@t_model_stream":
chunk = event["data"]["chunk"]
# 因为保留了完整 chunk,这里能拿到 reasoning_content
delta = chunk.message.ch@t_completion_chunk.choices[0].delta
if delta.reasoning_content:
yield StreamChunk(type="thinking", content=delta.reasoning_content)
elif delta.content:
yield StreamChunk(type="content", content=delta.content)
# ========== 4. 不同模型的思考参数统一管理 ==========
class ThinkingConfig:
def __init__(self):
self.model_params = {
"glm": {"thinking": {"type": "enabled"}},
"deepseek": {}, # DeepSeek 默认支持
"qwen": {"enable_thinking": True}
}
def get_params(self, model_name, enable):
model_type = self._detect_model_type(model_name)
return self.model_params.get(model_type, {}) if enable else {}
整个设计的要点:
ChatMessageChunk 保留完整的 ChatCompletionChunk,不让 LangChain 丢弃扩展字段LLMClientChatModel 在流式输出时用自定义消息包装reasoning_content,区分类型输出ThinkingConfig 封装不同模型的参数差异以一个智能研报生成 Agent 为例,它的执行流程比较复杂:
输入解析 → 多源数据采集(并行)→ 数据交叉验证 → 深度分析 → 观点提炼 → 报告生成 → 合规审查 → 输出
↓ ↓
[财报API] [验证失败则重试]
[新闻API]
[行情API]
用户调用 API 生成一份研报时,可能需要等待 30 秒以上。如果这期间前端只显示一个转圈动画,用户体验会很差。我们需要把执行进度实时透传出来:
LangGraph 的 ainvoke() 方法只返回最终结果,中间过程完全是黑盒:
# 只能拿到最终结果,中间 30 秒发生了什么完全不知道
result = await graph.ainvoke(inputs)
虽然 LangGraph 提供了 astream_events() 方法,但它的事件类型很多,需要自己过滤和解析:
# astream_events 会抛出各种事件:on_chain_start, on_chain_end, on_ch@t_model_stream...
# 需要自己判断哪些是节点事件,哪些是 LLM 事件,哪些是子图事件
async for event in graph.astream_events(inputs):
# 怎么过滤?怎么区分主图和子图?怎么处理并行节点?
pass
核心是 on_chain_start / on_chain_end 事件,结合节点元数据,输出结构化的进度信息:
from pydantic import BaseModel, Field
from typing import Optional, Literal
from enum import Enum
# ========== 1. 统一的流式输出格式 ==========
class ChunkType(str, Enum):
PROCESSING = "processing" # 执行进度
THINKING = "thinking" # 思考过程
CONTENT = "content" # 正式内容
FINAL = "final" # 最终结果
ERROR = "error" # 错误信息
class StreamChunk(BaseModel):
"""流式输出块,前端按 type 分别处理"""
type: ChunkType
content: str
metadata: Optional[dict] = None
# ========== 2. 节点进度配置(可配置化)==========
NODE_PROGRESS_MAP = {
# 节点名 -> (进度描述, 预估耗时秒, 所属阶段)
"parse_input": ("解析用户输入", 1, "准备阶段"),
"fetch_financial_data": ("采集财报数据", 5, "数据采集"),
"fetch_news_data": ("采集新闻资讯", 3, "数据采集"),
"fetch_market_data": ("采集行情数据", 2, "数据采集"),
"validate_data": ("交叉验证数据", 3, "数据处理"),
"deep_analysis": ("深度分析", 10, "智能分析"),
"extract_insights": ("提炼核心观点", 5, "智能分析"),
"generate_report": ("生成研报内容", 8, "报告生成"),
"compliance_check": ("合规性审查", 3, "质量保障"),
}
# ========== 3. Agent 中节点事件 ==========
class ResearchReportAgent:
def __init__(self):
self.parallel_tasks = {} # 跟踪并行任务状态
self.retry_count = {} # 跟踪重试次数
async def run_stream(self, query: str, **kwargs):
async for event in self.graph.astream_events(inputs, config):
event_type = event.get("event", "")
node_name = event.get("name", "")
# ===== 节点开始事件 =====
if event_type == "on_chain_start":
if node_name in NODE_PROGRESS_MAP:
desc, est_time, stage = NODE_PROGRESS_MAP[node_name]
# 处理重试场景
retry = self.retry_count.get(node_name, 0)
retry_hint = f"(第 {retry + 1} 次尝试)" if retry > 0 else ""
yield StreamChunk(
type=ChunkType.PROCESSING,
content=f"[{stage}] {desc}{retry_hint}...",
metadata={
"node": node_name,
"stage": stage,
"estimated_seconds": est_time,
"retry_count": retry
}
)
# 处理并行数据采集节点
elif node_name == "parallel_data_fetch":
self.parallel_tasks = {"financial": "pending", "news": "pending", "market": "pending"}
yield StreamChunk(
type=ChunkType.PROCESSING,
content="[数据采集] 正在并行采集多源数据...",
metadata={"parallel_status": self.parallel_tasks}
)
# ===== 节点结束事件 =====
elif event_type == "on_chain_end":
# 更新并行任务状态
if node_name == "fetch_financial_data":
self.parallel_tasks["financial"] = "completed"
yield StreamChunk(
type=ChunkType.PROCESSING,
content="[数据采集] 财报数据采集完成 ",
metadata={"parallel_status": self.parallel_tasks.copy()}
)
# 处理验证失败触发重试的场景
elif node_name == "validate_data":
output = event.get("data", {}).get("output", {})
if not output.get("is_valid", True):
failed_sources = output.get("failed_sources", [])
for src in failed_sources:
self.retry_count[f"fetch_{src}_data"] = self.retry_count.get(f"fetch_{src}_data", 0) + 1
yield StreamChunk(
type=ChunkType.PROCESSING,
content=f"[数据处理] 验证未通过,{failed_sources} 将重新采集...",
metadata={"retry_sources": failed_sources}
)
# 图执行完毕,输出最终结果
elif node_name == "LangGraph":
final_output = event["data"]["output"]
yield StreamChunk(
type=ChunkType.FINAL,
content="",
metadata={"report": final_output}
)
# ===== LLM 流式输出事件 =====
elif event_type == "on_ch@t_model_stream":
chunk = event["data"]["chunk"]
delta = chunk.message.ch@t_completion_chunk.choices[0].delta
if delta.reasoning_content:
yield StreamChunk(type=ChunkType.THINKING, content=delta.reasoning_content)
elif delta.content:
yield StreamChunk(type=ChunkType.CONTENT, content=delta.content)
前端拿到的流式输出会是这样:
{"type": "processing", "content": "[准备阶段] 解析用户输入...", "metadata": {"node": "parse_input", "stage": "准备阶段", "estimated_seconds": 1}}
{"type": "processing", "content": "[数据采集] 正在并行采集多源数据...", "metadata": {"parallel_status": {"financial": "pending", "news": "pending", "market": "pending"}}}
{"type": "processing", "content": "[数据采集] 采集新闻资讯...", "metadata": {"node": "fetch_news_data", "stage": "数据采集"}}
{"type": "processing", "content": "[数据采集] 采集财报数据...", "metadata": {"node": "fetch_financial_data", "stage": "数据采集"}}
{"type": "processing", "content": "[数据采集] 新闻数据采集完成 ", "metadata": {"parallel_status": {"financial": "pending", "news": "completed", "market": "pending"}}}
{"type": "processing", "content": "[数据采集] 财报数据采集完成 ", "metadata": {"parallel_status": {"financial": "completed", "news": "completed", "market": "pending"}}}
{"type": "processing", "content": "[数据采集] 行情数据采集完成 ", "metadata": {"parallel_status": {"financial": "completed", "news": "completed", "market": "completed"}}}
{"type": "processing", "content": "[数据处理] 交叉验证数据...", "metadata": {"node": "validate_data"}}
{"type": "processing", "content": "[数据处理] 验证未通过,['market'] 将重新采集...", "metadata": {"retry_sources": ["market"]}}
{"type": "processing", "content": "[数据采集] 采集行情数据(第 2 次尝试)...", "metadata": {"node": "fetch_market_data", "retry_count": 1}}
{"type": "processing", "content": "[智能分析] 深度分析...", "metadata": {"node": "deep_analysis", "estimated_seconds": 10}}
{"type": "thinking", "content": "我需要从多个维度分析这家公司..."}
{"type": "thinking", "content": "首先看财务数据,营收同比增长..."}
{"type": "content", "content": "## 一、公司概况nn"}
{"type": "content", "content": "该公司是国内领先的..."}
{"type": "processing", "content": "[智能分析] 提炼核心观点...", "metadata": {"node": "extract_insights"}}
{"type": "processing", "content": "[报告生成] 生成研报内容...", "metadata": {"node": "generate_report"}}
{"type": "processing", "content": "[质量保障] 合规性审查...", "metadata": {"node": "compliance_check"}}
{"type": "final", "content": "", "metadata": {"report": {"title": "XX公司深度研究报告", "sections": [...]}}}
整个设计的要点:
StreamChunk 定义 type 字段,前端按类型分别处理NODE_PROGRESS_MAP 集中管理节点描述,新增节点只需加一行配置parallel_status 字段,前端可以渲染多任务进度条estimated_seconds 可用于前端渲染预估进度条本文分享了在开发 open-pilot-agent 过程中遇到的三个生产级挑战,以及对应的解决方案:
reasoning_content 字段astream_events,将节点执行进度实时透传给前端后续作者将会继续分享生产级 Agent 的实际工程经验。
308.71MB · 2026-02-05
308.71MB · 2026-02-05
339.21MB · 2026-02-05