枪战英雄
99.99M · 2026-03-28
传统的实现方式通常是这样的:
def process_query(query):
# 步骤1:检索相关表
tables = retrieve_tables(query)
# 步骤2:生成 SQL
sql = generate_sql(query, tables)
# 步骤3:执行查询
results = execute_sql(sql)
# 步骤4:生成回复
response = generate_response(query, results)
return response
LlamaIndex 工作流框架正是为了解决这些问题而设计的:
工作流框架不仅让代码更加清晰,还让复杂 AI 应用的构建变得优雅而高效。
在本文中,我们将从基础概念开始,逐步深入,最终通过一个完整的 Text-to-SQL 示例,展示如何使用工作流框架构建生产级的 AI 应用。
学习路线图:
LlamaIndex 的工作流是事件(event)驱动的:
step组成step处理特定的事件step也会产生新的事件(交由后继的step进行处理)StopEvent整个工作流结束LlamaIndex 工作流框架包含以下核心组件:
1. Event(事件)
StartEvent是工作流的起始事件,通常包含用户输入StopEvent是工作流的结束事件,包含最终结果2. Step(步骤) ️
@step装饰器标记的方法3. Context(上下文)
4. Workflow(工作流类) ️
Workflow基类@step方法,定义完整的工作流程相比传统的顺序执行代码,工作流框架具有以下优势:
1. 解耦与模块化
2. 灵活的执行流程
3. 可观测性
4. 易于扩展
下面是一个最简单的工作流示例,帮助理解基本结构:
from llama_index.core.workflow import (
Workflow,
StartEvent,
StopEvent,
step,
Context,
)
class SimpleWorkflow(Workflow):
"""一个简单的工作流示例"""
@step # 关键:使用 @step 装饰器
def process(self, ctx: Context, ev: StartEvent) -> StopEvent:
"""处理输入并返回结果"""
# 从 StartEvent 中获取输入
query = ev.query
# 执行处理逻辑
result = f"已处理查询: {query}"
# 返回 StopEvent,工作流结束
return StopEvent(result=result)
# 使用工作流
import asyncio
asyncdef main():
workflow = SimpleWorkflow()
response = await workflow.run(query="Hello World")
print(response.result) # 输出: 已处理查询: Hello World
# 运行
asyncio.run(main())
这个示例展示了工作流的基本结构:
1. 定义工作流类
Workflow基类@step方法2. 使用装饰器
@step装饰器标记处理方法3. 事件流转
StartEvent(起始事件)StopEvent(结束事件)4. 运行工作流
workflow.run()启动工作流query)response.result)需求:构建一个带输入验证的工作流,根据验证结果选择不同的处理路径。
from llama_index.core.workflow import (
Workflow,
StartEvent,
StopEvent,
step,
Context,
Event,
)
# 定义自定义事件
class ValidationEvent(Event):
"""验证事件"""
query: str
is_valid: bool
class ProcessedEvent(Event):
"""处理完成事件"""
query: str
result: str
class ConditionalWorkflow(Workflow):
"""展示条件分支的工作流"""
@step
def validate(self, ctx: Context, ev: StartEvent) -> ValidationEvent:
"""验证输入"""
query = ev.query
# 简单的验证逻辑:检查查询是否为空
is_valid = len(query.strip()) > 0
return ValidationEvent(query=query, is_valid=is_valid)
@step
def process_valid(self, ctx: Context, ev: ValidationEvent) -> ProcessedEvent:
"""处理有效查询"""
ifnot ev.is_valid:
# 如果验证失败,返回一个默认结果
return ProcessedEvent(query=ev.query, result="查询无效")
# 处理有效查询
result = f"已处理查询: {ev.query}"
return ProcessedEvent(query=ev.query, result=result)
@step
def finalize(self, ctx: Context, ev: ProcessedEvent) -> StopEvent:
"""最终处理"""
return StopEvent(result=ev.result)
# 使用工作流
asyncdef main():
workflow = ConditionalWorkflow()
# 有效查询
response1 = await workflow.run(query="Hello World")
print(response1.result) # 输出: 已处理查询: Hello World
# 无效查询
response2 = await workflow.run(query="")
print(response2.result) # 输出: 查询无效
asyncio.run(main())
这个示例展示了工作流的进阶用法:
1. 定义自定义事件
ValidationEvent:验证事件,包含验证结果ProcessedEvent:处理完成事件,包含处理结果2. 多步骤工作流
validate:验证输入process_valid:根据验证结果处理finalize:最终处理并返回结果3. 条件分支逻辑
process_valid中根据is_valid选择不同的处理路径接下来,我们将从场景分析开始,逐步构建一个完整的自然语言查询数据库系统。
业务场景:构建一个自然语言查询数据库的系统,用户可以用自然语言提问,系统自动生成 SQL 并返回结果。
工作流设计:
流程说明:
前置依赖:
# 安装必要的包
pip install llama-index
pip install llama-index-llms-dashscope
pip install llama-index-embeddings-dashscope
pip install pandas sqlalchemy python-dotenv
环境配置:
创建.env文件,配置 API Key:
DASHSCOPE_API_KEY=your_api_key_here
数据准备步骤:
import pandas as pd
from pathlib import Path
# 加载 CSV 文件
data_dir = Path("./WikiTableQuestions/csv/200-csv")
csv_files = sorted([f for f in data_dir.glob("*.csv")])
dfs = [pd.read_csv(f) for f in csv_files]
from llama_index.core.prompts import ChatPromptTemplate
from llama_index.core.bridge.pydantic import BaseModel, Field
from llama_index.core.llms import ChatMessage
import json
class TableInfo(BaseModel):
"""表信息"""
table_name: str = Field(..., description="表名(使用下划线,无空格)")
table_summary: str = Field(..., description="表的简短描述")
# 定义提示模板
prompt_str = """
Give me a summary of the table with the following JSON format.
Table: {table_str}
Summary: """
prompt_tmpl = ChatPromptTemplate(
message_templates=[ChatMessage.from_str(prompt_str, role="user")]
)
# 为每个表生成描述(这里简化处理,实际需要调用 LLM)
table_infos = []
# ... 生成 table_infos 的代码(详见完整示例)
from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer
import re
def sanitize_column_name(col_name):
"""清理列名"""
return re.sub(r"W+", "_", col_name)
def create_table_from_dataframe(df, table_name, engine, metadata_obj):
"""从 DataFrame 创建数据库表"""
# 清理列名
sanitized_columns = {col: sanitize_column_name(col) for col in df.columns}
df = df.rename(columns=sanitized_columns)
# 创建列
columns = [
Column(col, String if dtype == "object"else Integer)
for col, dtype in zip(df.columns, df.dtypes)
]
# 创建表
table = Table(table_name, metadata_obj, *columns)
metadata_obj.create_all(engine)
# 插入数据(生产环境建议使用批量插入)
with engine.connect() as conn:
for _, row in df.iterrows():
conn.execute(table.insert().values(**row.to_dict()))
conn.commit()
# 创建数据库
engine = create_engine("sqlite:///wiki_table_questions.db")
metadata_obj = MetaData()
# ... 为每个表创建数据库表
工具清单:
import os
from llama_index.core import Settings
from llama_index.llms.dashscope import DashScope, DashScopeGenerationModels
from llama_index.embeddings.dashscope import DashScopeEmbedding, DashScopeTextEmbeddingModels
from llama_index.core.objects import (
SQLTableNodeMapping,
ObjectIndex,
SQLTableSchema,
)
from llama_index.core import SQLDatabase, VectorStoreIndex
from dotenv import load_dotenv
load_dotenv()
# 设置全局模型
Settings.llm = DashScope(
model_name=DashScopeGenerationModels.QWEN_MAX,
api_key=os.getenv("DASHSCOPE_API_KEY")
)
Settings.embed_model = DashScopeEmbedding(
model_name=DashScopeTextEmbeddingModels.TEXT_EMBEDDING_V1
)
# 连接数据库
sql_database = SQLDatabase(engine)
# 创建表节点映射
table_node_mapping = SQLTableNodeMapping(sql_database)
# 构建表结构对象列表
table_schema_objs = [
SQLTableSchema(table_name=t.table_name, context_str=t.table_summary)
for t in table_infos
]
# 创建 ObjectIndex 并构建检索器
obj_index = ObjectIndex.from_objects(
table_schema_objs,
table_node_mapping,
VectorStoreIndex,
)
obj_retriever = obj_index.as_retriever(similarity_top_k=3)
from llama_index.core.retrievers import SQLRetriever
from typing import List
sql_retriever = SQLRetriever(sql_database)
def get_table_context_str(table_schema_objs: List[SQLTableSchema]):
"""获取表上下文字符串"""
context_strs = []
for table_schema_obj in table_schema_objs:
table_info = sql_database.get_single_table_info(
table_schema_obj.table_name
)
if table_schema_obj.context_str:
table_info += f" The table description is: {table_schema_obj.context_str}"
context_strs.append(table_info)
return"nn".join(context_strs)
from llama_index.core.prompts.default_prompts import DEFAULT_TEXT_TO_SQL_PROMPT
from llama_index.core import PromptTemplate
from llama_index.core.llms import ChatResponse
def parse_response_to_sql(chat_response: ChatResponse) -> str:
"""从响应中解析 SQL"""
response = chat_response.message.content
sql_query_start = response.find("SQLQuery:")
if sql_query_start != -1:
response = response[sql_query_start:]
if response.startswith("SQLQuery:"):
response = response[len("SQLQuery:") :]
sql_result_start = response.find("SQLResult:")
if sql_result_start != -1:
response = response[:sql_result_start]
return response.strip().strip("```").strip()
text2sql_prompt = DEFAULT_TEXT_TO_SQL_PROMPT.partial_format(
dialect=engine.dialect.name
)
response_synthesis_prompt_str = (
"Given an input question, synthesize a response from the query results.n"
"Query: {query_str}n"
"SQL: {sql_query}n"
"SQL Response: {context_str}n"
"Response: "
)
response_synthesis_prompt = PromptTemplate(response_synthesis_prompt_str)
llm = DashScope(
model_name=DashScopeGenerationModels.QWEN_MAX,
api_key=os.getenv("DASHSCOPE_API_KEY")
)
from llama_index.core.workflow import (
Workflow,
StartEvent,
StopEvent,
step,
Context,
Event,
)
# 定义自定义事件
class TableRetrieveEvent(Event):
"""表检索事件"""
table_context_str: str
query: str
class TextToSQLEvent(Event):
"""Text-to-SQL 事件"""
sql: str
query: str
class TextToSQLWorkflow(Workflow):
"""Text-to-SQL 工作流"""
def __init__(
self,
obj_retriever,
text2sql_prompt,
sql_retriever,
response_synthesis_prompt,
llm,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.obj_retriever = obj_retriever
self.text2sql_prompt = text2sql_prompt
self.sql_retriever = sql_retriever
self.response_synthesis_prompt = response_synthesis_prompt
self.llm = llm
@step
def retrieve_tables(
self, ctx: Context, ev: StartEvent
) -> TableRetrieveEvent:
"""检索相关表"""
table_schema_objs = self.obj_retriever.retrieve(ev.query)
table_context_str = get_table_context_str(table_schema_objs)
return TableRetrieveEvent(
table_context_str=table_context_str, query=ev.query
)
@step
def generate_sql(
self, ctx: Context, ev: TableRetrieveEvent
) -> TextToSQLEvent:
"""生成 SQL 语句"""
fmt_messages = self.text2sql_prompt.format_messages(
query_str=ev.query, schema=ev.table_context_str
)
chat_response = self.llm.chat(fmt_messages)
sql = parse_response_to_sql(chat_response)
return TextToSQLEvent(sql=sql, query=ev.query)
@step
def generate_response(
self, ctx: Context, ev: TextToSQLEvent
) -> StopEvent:
"""执行 SQL 并生成回复"""
retrieved_rows = self.sql_retriever.retrieve(ev.sql)
fmt_messages = self.response_synthesis_prompt.format_messages(
sql_query=ev.sql,
context_str=str(retrieved_rows),
query_str=ev.query,
)
chat_response = self.llm.chat(fmt_messages)
return StopEvent(result=chat_response)
实例化工作流:
workflow = TextToSQLWorkflow(
obj_retriever,
text2sql_prompt,
sql_retriever,
response_synthesis_prompt,
llm,
verbose=True, # 开启详细日志,方便调试
)
▶️ 运行工作流:
import asyncio
async def main():
response = await workflow.run(
query="What was the year that The Notorious B.I.G was signed to Bad Boy?"
)
print(f"查询结果: {response.result.message.content}")
# 运行
asyncio.run(main())
调试技巧:
# 可视化工作流
from llama_index.utils.workflow import draw_all_possible_flows
draw_all_possible_flows(
TextToSQLWorkflow, filename="text_to_sql_workflow.html"
)
预期输出示例:
查询结果: The Notorious B.I.G was signed to Bad Boy in 1993.
根据事件类型或数据内容,选择不同的处理路径:
@step
def route(self, ctx: Context, ev: SomeEvent) -> Event:
"""根据条件路由到不同的处理步骤"""
if ev.condition:
return EventA(data=ev.data)
else:
return EventB(data=ev.data)
通过事件类型匹配,实现循环执行:
class IterationEvent(Event):
"""迭代事件"""
data: list
index: int
@step
def process_item(self, ctx: Context, ev: IterationEvent) -> Event:
"""处理单个项目"""
if ev.index < len(ev.data):
# 处理当前项
process(ev.data[ev.index])
# 继续下一个
return IterationEvent(data=ev.data, index=ev.index + 1)
else:
# 完成
return StopEvent(result="完成")
多个 step 可以并行处理不同的事件:
@step
def step_a(self, ctx: Context, ev: StartEvent) -> EventA:
"""步骤 A"""
return EventA(data="A")
@step
def step_b(self, ctx: Context, ev: StartEvent) -> EventB:
"""步骤 B"""
return EventB(data="B")
# step_a 和 step_b 可以并行执行
一个 step 可以依赖多个前置 step 的结果:
@step
def merge(self, ctx: Context, ev: Union[EventA, EventB]) -> StopEvent:
"""合并多个事件的结果"""
# 处理逻辑
return StopEvent(result=merged_result)
多步骤 AI 应用- 需要多个 LLM 调用或处理步骤的应用 复杂业务逻辑- 需要条件分支、循环、并行处理的应用 需要可观测性- 需要追踪和调试执行过程的应用 需要扩展性- 需要频繁添加新功能或修改流程的应用
1. 官方文档
2. 进阶主题
3. 相关资源