魔神公寓
94.55M · 2026-04-07
user_id 去重)user-feedback Score)的平均值security-risk Score)的总和search_city_info),缓存其结果。TokenTruncationMemory、SummarizationMemory 等记忆策略。2.4 节中我们已经讨论过,使用 asyncio 和 httpx 可以并行化工具调用,显著降低 I/O 密集型任务的延迟。langfuse.cache() 来为工具函数添加缓存。Supervisor Agent,使其在决策时,可以根据任务类型,选择调用 cheap_llm 或 powerful_llm。metadata={"version": "A"} 或 {"version": "B"})。许多团队在成功上线第一个 AI 应用后,会有一种“大功告成”的错觉。然而,对于一个有生命力的产品来说,上线仅仅是“Day 1”,真正的挑战来自“Day 2”以及之后的漫长岁月。
“运维”的目标 AI 应用的运维,其核心目标就是对抗这种“熵增”和“衰退”,通过持续的监控、分析和优化,确保服务始终满足预设的服务等级协议(SLA),如:
在你开始任何优化之前,你必须先能量化“好”与“坏”。一个好的仪表盘,能让你在 30 秒内掌握服务的核心健康状况。
在 Langfuse 中创建 Dashboard
登录你的 Langfuse 项目,进入 Dashboards 页面,点击 New dashboard。然后,添加以下关键图表(Widgets):
COUNT(traces) - 显示总调用量趋势。COUNT(DISTINCT users) - 显示日活/周活用户趋势。Time (Grouped by Day/Week)。AVG(latency), P95(latency) - 平均和 P95 延迟。这是衡量用户体感速度的核心指标。Time (Grouped by Hour/Day)。SUM(totalCost) - 预估的总费用。model - 可以看到不同模型(如 GPT-4 vs GPT-3.5)的成本构成。Time (Grouped by Day)。AVG(scores)score.name = "user-feedback" - 实时追踪线上用户的平均满意度。score.name = "security-risk-assessment",用于监控安全风险。拥有了这个仪表盘,你就从一个“盲人”,变成了一个能实时俯瞰整个战场的“指挥官”。
当仪表盘显示某个指标(如延迟、成本)恶化时,你需要从你的“军火库”中拿出相应的武器。
原理:对于相同的输入,不再进行昂贵的计算(LLM 调用或工具调用),而是直接返回之前已经计算过的结果。
from langfuse import Langfuse
langfuse = Langfuse(enable_caching=True)
handler = langfuse.get_langchain_handler()
# 第一次调用,会真正访问 LLM
llm.invoke("1+1=?", config={"callbacks": [handler]})
# 第二次调用完全相同的输入,会直接返回缓存的结果,耗时为毫秒级
# Langfuse 会在 Trace 中自动标记此 Generation 为 CACHE_HIT
llm.invoke("1+1=?", config={"callbacks": [handler]})
import functools
@functools.lru_cache(maxsize=128)
def search_city_info(city: str):
# ...
原理:并非所有任务都需要“法拉利”。通过在工作流的不同阶段使用不同能力的模型,可以在保证质量的前提下,大幅降低成本和延迟。
# 伪代码
def router_node(state):
task_type = classify_task(state['messages'][-1].content) # 使用一个简单的分类模型或 LLM
if task_type == "simple_qa":
# 对于简单问答,使用便宜快速的模型
return {"next_node": "cheap_agent"}
elif task_type == "complex_reasoning":
# 对于复杂推理,使用强大的模型
return {"next_node": "powerful_agent"}
我们已经在 1.8 节讨论过,这是降低成本和延迟最直接的方法之一。通过 SummarizationMemory 或 VectorMemory,确保你每次传递给 LLM 的上下文都是最相关、最精简的。
对于需要调用多个外部 API 的 Agent(如 DeepResearch),将 I/O 操作并行化,能极大地缩短端到端延迟。这要求将你的 LangGraph 节点和 Runnable 都实现为 async 版本。
让我们为“旅小智”的 Supervisor Agent 增加模型路由能力。
# agents/supervisor.py (改造版)
class Route(BaseModel):
destination: Literal[...]
# 新增字段:为下一步骤建议使用的模型
next_model: Literal["cheap", "powerful"] = Field(description="Choose 'cheap' for simple tasks and 'powerful' for complex reasoning.")
# ...
def create_supervisor_runnable():
# ...
# 在 Prompt 中,明确告知 Supervisor 它可以选择模型
prompt = ChatPromptTemplate.from_messages([
("system", """... You can choose a 'cheap' model for simple lookups (like city info)
and a 'powerful' model for complex tasks (like generating the final itinerary)."""),
# ...
])
return prompt | structured_llm
# 在主图逻辑中
def router(state: TripState):
supervisor_decision = state['supervisor_decision']
# 根据 supervisor 的建议,动态选择要调用的专家 Agent
if supervisor_decision.next_model == "cheap":
expert_runnable = cheap_experts[supervisor_decision.destination]
else:
expert_runnable = powerful_experts[supervisor_decision.destination]
# ... 调用 expert_runnable ...
当你做了一项优化,比如替换了 Prompt 或上线了模型路由功能,你怎么能确定这个改动在线上真的带来了好处,而不是负面影响?答案是A/B 测试。
流程:
thread_id 或 user_id 的哈希值。
import hashlib
def get_user_version(thread_id: str):
# 将 5% 的流量分到 'B' 版本
if int(hashlib.md5(thread_id.encode()).hexdigest(), 16) % 100 < 5:
return "B"
return "A"
version = get_user_version(thread_id)
handler = LangfuseCallbackHandler(..., metadata={"version": version})
if version == "A":
app_vA.invoke(..., config={"callbacks": [handler]})
else:
app_vB.invoke(..., config={"callbacks": [handler]})
AVG(user-feedback),Filter: version = "A"AVG(user-feedback),Filter: version = "B"AI 应用的开发,从来不是一个“开发-测试-部署”的线性瀑布流,而是一个“部署-监控-评估-优化”的持续循环。
通过本章的学习,你已经掌握了这个循环的每一个环节:
完成了这个思维模式的转变,你就真正拥有了驾驭和运营一个生产级 AI 应用所需的全部核心能力,使你的 AI 产品能像一个有生命的有机体一样,在真实世界的考验中,不断地学习、适应和进化。