为什么需要工作流框架?

传统方式的痛点

传统的实现方式通常是这样的:

def process_query(query):
    # 步骤1:检索相关表
    tables = retrieve_tables(query)
    # 步骤2:生成 SQL
    sql = generate_sql(query, tables)
    # 步骤3:执行查询
    results = execute_sql(sql)
    # 步骤4:生成回复
    response = generate_response(query, results)
    return response

工作流框架的解决方案

LlamaIndex 工作流框架正是为了解决这些问题而设计的:

工作流框架不仅让代码更加清晰,还让复杂 AI 应用的构建变得优雅而高效

本文学习路径

在本文中,我们将从基础概念开始,逐步深入,最终通过一个完整的 Text-to-SQL 示例,展示如何使用工作流框架构建生产级的 AI 应用。

学习路线图

  1. 基础概念- 理解工作流的核心组件
  2. 简单示例- 快速上手第一个工作流
  3. 进阶示例- 掌握条件分支等高级特性
  4. 实战项目- 构建完整的 Text-to-SQL 应用
  5. 高级特性- 探索分支、循环、并行等能力
  6. 总结延伸- 核心要点与进阶学习

【基础】工作流核心概念

事件驱动机制

LlamaIndex 的工作流是事件(event)驱动的:

  • 工作流由step组成
  • 每个step处理特定的事件
  • step也会产生新的事件(交由后继的step进行处理)
  • 直到产生StopEvent整个工作流结束

核心组件

LlamaIndex 工作流框架包含以下核心组件:

1. Event(事件)

  • 事件是工作流中传递数据的载体
  • 每个事件都是一个数据类,包含需要传递的信息
  • StartEvent是工作流的起始事件,通常包含用户输入
  • StopEvent是工作流的结束事件,包含最终结果

2. Step(步骤)

  • 使用@step装饰器标记的方法
  • 每个 step 接收一个事件,处理后返回新的事件
  • step 之间通过事件类型自动连接

3. Context(上下文)

  • 在工作流执行过程中共享的状态
  • 可以在不同的 step 之间传递和访问数据

4. Workflow(工作流类)

  • 继承自Workflow基类
  • 包含多个@step方法,定义完整的工作流程

工作流的优势

相比传统的顺序执行代码,工作流框架具有以下优势:

1. 解耦与模块化

  • 每个 step 是独立的处理单元,职责清晰
  • 易于单独测试和维护

2. 灵活的执行流程

  • 支持条件分支:根据事件类型选择不同的处理路径
  • 支持循环:可以重复执行某些步骤
  • 支持并行:多个 step 可以并行执行

3. 可观测性

  • 每个 step 的输入输出都是明确的事件
  • 便于追踪和调试工作流执行过程
  • 可以可视化整个工作流的结构

4. 易于扩展

  • 添加新的处理步骤只需定义新的 step 方法
  • 工作流框架会自动处理事件路由

【入门】简单示例

完整代码示例

下面是一个最简单的工作流示例,帮助理解基本结构:

from llama_index.core.workflow import (
    Workflow,
    StartEvent,
    StopEvent,
    step,
    Context,
)

class SimpleWorkflow(Workflow):
    """一个简单的工作流示例"""
    
    @step  #  关键:使用 @step 装饰器
    def process(self, ctx: Context, ev: StartEvent) -> StopEvent:
        """处理输入并返回结果"""
        # 从 StartEvent 中获取输入
        query = ev.query
        
        # 执行处理逻辑
        result = f"已处理查询: {query}"
        
        # 返回 StopEvent,工作流结束
        return StopEvent(result=result)

# 使用工作流
import asyncio

asyncdef main():
    workflow = SimpleWorkflow()
    response = await workflow.run(query="Hello World")
    print(response.result)  # 输出: 已处理查询: Hello World

# 运行
asyncio.run(main())

代码解析

这个示例展示了工作流的基本结构:

1. 定义工作流类

  • 继承自Workflow基类
  • 包含一个或多个@step方法

2. 使用装饰器

  • 使用@step装饰器标记处理方法
  • 框架会自动识别并连接这些步骤

3. 事件流转

  • 方法接收StartEvent(起始事件)
  • 返回StopEvent(结束事件)
  • 事件中包含了需要传递的数据

4. 运行工作流

  • 通过workflow.run()启动工作流
  • 传入初始参数(如query
  • 获取最终结果(response.result

【进阶】中等复杂度示例:条件分支

场景说明

需求:构建一个带输入验证的工作流,根据验证结果选择不同的处理路径。

完整代码示例

from llama_index.core.workflow import (
    Workflow,
    StartEvent,
    StopEvent,
    step,
    Context,
    Event,
)

# 定义自定义事件
class ValidationEvent(Event):
    """验证事件"""
    query: str
    is_valid: bool

class ProcessedEvent(Event):
    """处理完成事件"""
    query: str
    result: str

class ConditionalWorkflow(Workflow):
    """展示条件分支的工作流"""
    
    @step
    def validate(self, ctx: Context, ev: StartEvent) -> ValidationEvent:
        """验证输入"""
        query = ev.query
        
        # 简单的验证逻辑:检查查询是否为空
        is_valid = len(query.strip()) > 0
        
        return ValidationEvent(query=query, is_valid=is_valid)
    
    @step
    def process_valid(self, ctx: Context, ev: ValidationEvent) -> ProcessedEvent:
        """处理有效查询"""
        ifnot ev.is_valid:
            # 如果验证失败,返回一个默认结果
            return ProcessedEvent(query=ev.query, result="查询无效")
        
        # 处理有效查询
        result = f"已处理查询: {ev.query}"
        return ProcessedEvent(query=ev.query, result=result)
    
    @step
    def finalize(self, ctx: Context, ev: ProcessedEvent) -> StopEvent:
        """最终处理"""
        return StopEvent(result=ev.result)

# 使用工作流
asyncdef main():
    workflow = ConditionalWorkflow()
    
    # 有效查询
    response1 = await workflow.run(query="Hello World")
    print(response1.result)  # 输出: 已处理查询: Hello World
    
    # 无效查询
    response2 = await workflow.run(query="")
    print(response2.result)  # 输出: 查询无效

asyncio.run(main())

代码解析

这个示例展示了工作流的进阶用法:

1. 定义自定义事件

  • ValidationEvent:验证事件,包含验证结果
  • ProcessedEvent:处理完成事件,包含处理结果
  • 通过自定义事件实现步骤之间的数据传递

2. 多步骤工作流

  • validate:验证输入
  • process_valid:根据验证结果处理
  • finalize:最终处理并返回结果

3. 条件分支逻辑

  • process_valid中根据is_valid选择不同的处理路径
  • 验证失败时返回默认结果
  • 验证成功时执行正常处理

【实战】Text-to-SQL 完整示例

接下来,我们将从场景分析开始,逐步构建一个完整的自然语言查询数据库系统。

5.1 场景说明

业务场景:构建一个自然语言查询数据库的系统,用户可以用自然语言提问,系统自动生成 SQL 并返回结果。

工作流设计

流程说明

  1. 用户输入- 用户输入自然语言查询
  2. 表检索- 系统检索与查询相关的数据库表
  3. SQL 生成- 根据表的 Schema 让大模型生成 SQL
  4. 执行查询- 执行生成的 SQL 查询数据库
  5. 生成回复- 根据查询结果,调用大模型生成自然语言回复

5.2 数据准备

前置依赖

# 安装必要的包
pip install llama-index
pip install llama-index-llms-dashscope
pip install llama-index-embeddings-dashscope
pip install pandas sqlalchemy python-dotenv

环境配置

创建.env文件,配置 API Key:

DASHSCOPE_API_KEY=your_api_key_here

数据准备步骤

  1. 下载并加载数据(简化版,详细代码见附录)
import pandas as pd
from pathlib import Path

# 加载 CSV 文件
data_dir = Path("./WikiTableQuestions/csv/200-csv")
csv_files = sorted([f for f in data_dir.glob("*.csv")])
dfs = [pd.read_csv(f) for f in csv_files]
  1. 为每个表生成描述信息(用于后续检索)
from llama_index.core.prompts import ChatPromptTemplate
from llama_index.core.bridge.pydantic import BaseModel, Field
from llama_index.core.llms import ChatMessage
import json

class TableInfo(BaseModel):
    """表信息"""
    table_name: str = Field(..., description="表名(使用下划线,无空格)")
    table_summary: str = Field(..., description="表的简短描述")

# 定义提示模板
prompt_str = """
Give me a summary of the table with the following JSON format.
Table: {table_str}
Summary: """

prompt_tmpl = ChatPromptTemplate(
    message_templates=[ChatMessage.from_str(prompt_str, role="user")]
)

# 为每个表生成描述(这里简化处理,实际需要调用 LLM)
table_infos = []
# ... 生成 table_infos 的代码(详见完整示例)
  1. 将数据存入 SQLite 数据库
from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer
import re

def sanitize_column_name(col_name):
    """清理列名"""
    return re.sub(r"W+", "_", col_name)

def create_table_from_dataframe(df, table_name, engine, metadata_obj):
    """从 DataFrame 创建数据库表"""
    # 清理列名
    sanitized_columns = {col: sanitize_column_name(col) for col in df.columns}
    df = df.rename(columns=sanitized_columns)
    
    # 创建列
    columns = [
        Column(col, String if dtype == "object"else Integer)
        for col, dtype in zip(df.columns, df.dtypes)
    ]
    
    # 创建表
    table = Table(table_name, metadata_obj, *columns)
    metadata_obj.create_all(engine)
    
    # 插入数据(生产环境建议使用批量插入)
    with engine.connect() as conn:
        for _, row in df.iterrows():
            conn.execute(table.insert().values(**row.to_dict()))
        conn.commit()

# 创建数据库
engine = create_engine("sqlite:///wiki_table_questions.db")
metadata_obj = MetaData()
# ... 为每个表创建数据库表

5.3 工具构建

工具清单

  1. 创建基于表描述的向量索引
import os
from llama_index.core import Settings
from llama_index.llms.dashscope import DashScope, DashScopeGenerationModels
from llama_index.embeddings.dashscope import DashScopeEmbedding, DashScopeTextEmbeddingModels
from llama_index.core.objects import (
    SQLTableNodeMapping,
    ObjectIndex,
    SQLTableSchema,
)
from llama_index.core import SQLDatabase, VectorStoreIndex
from dotenv import load_dotenv

load_dotenv()

# 设置全局模型
Settings.llm = DashScope(
    model_name=DashScopeGenerationModels.QWEN_MAX,
    api_key=os.getenv("DASHSCOPE_API_KEY")
)
Settings.embed_model = DashScopeEmbedding(
    model_name=DashScopeTextEmbeddingModels.TEXT_EMBEDDING_V1
)

# 连接数据库
sql_database = SQLDatabase(engine)

# 创建表节点映射
table_node_mapping = SQLTableNodeMapping(sql_database)

# 构建表结构对象列表
table_schema_objs = [
    SQLTableSchema(table_name=t.table_name, context_str=t.table_summary)
    for t in table_infos
]

# 创建 ObjectIndex 并构建检索器
obj_index = ObjectIndex.from_objects(
    table_schema_objs,
    table_node_mapping,
    VectorStoreIndex,
)
obj_retriever = obj_index.as_retriever(similarity_top_k=3)
  1. 创建 SQL 查询器
from llama_index.core.retrievers import SQLRetriever
from typing import List

sql_retriever = SQLRetriever(sql_database)

def get_table_context_str(table_schema_objs: List[SQLTableSchema]):
    """获取表上下文字符串"""
    context_strs = []
    for table_schema_obj in table_schema_objs:
        table_info = sql_database.get_single_table_info(
            table_schema_obj.table_name
        )
        if table_schema_obj.context_str:
            table_info += f" The table description is: {table_schema_obj.context_str}"
        context_strs.append(table_info)
    return"nn".join(context_strs)
  1. 创建 Text2SQL 提示词和解析器
from llama_index.core.prompts.default_prompts import DEFAULT_TEXT_TO_SQL_PROMPT
from llama_index.core import PromptTemplate
from llama_index.core.llms import ChatResponse

def parse_response_to_sql(chat_response: ChatResponse) -> str:
    """从响应中解析 SQL"""
    response = chat_response.message.content
    sql_query_start = response.find("SQLQuery:")
    if sql_query_start != -1:
        response = response[sql_query_start:]
        if response.startswith("SQLQuery:"):
            response = response[len("SQLQuery:") :]
    sql_result_start = response.find("SQLResult:")
    if sql_result_start != -1:
        response = response[:sql_result_start]
    return response.strip().strip("```").strip()

text2sql_prompt = DEFAULT_TEXT_TO_SQL_PROMPT.partial_format(
    dialect=engine.dialect.name
)
  1. 创建自然语言回复生成模板
response_synthesis_prompt_str = (
    "Given an input question, synthesize a response from the query results.n"
    "Query: {query_str}n"
    "SQL: {sql_query}n"
    "SQL Response: {context_str}n"
    "Response: "
)
response_synthesis_prompt = PromptTemplate(response_synthesis_prompt_str)

llm = DashScope(
    model_name=DashScopeGenerationModels.QWEN_MAX,
    api_key=os.getenv("DASHSCOPE_API_KEY")
)

5.4 工作流定义

from llama_index.core.workflow import (
    Workflow,
    StartEvent,
    StopEvent,
    step,
    Context,
    Event,
)

# 定义自定义事件
class TableRetrieveEvent(Event):
    """表检索事件"""
    table_context_str: str
    query: str

class TextToSQLEvent(Event):
    """Text-to-SQL 事件"""
    sql: str
    query: str

class TextToSQLWorkflow(Workflow):
    """Text-to-SQL 工作流"""
    
    def __init__(
        self,
        obj_retriever,
        text2sql_prompt,
        sql_retriever,
        response_synthesis_prompt,
        llm,
        *args,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.obj_retriever = obj_retriever
        self.text2sql_prompt = text2sql_prompt
        self.sql_retriever = sql_retriever
        self.response_synthesis_prompt = response_synthesis_prompt
        self.llm = llm
    
    @step
    def retrieve_tables(
        self, ctx: Context, ev: StartEvent
    ) -> TableRetrieveEvent:
        """检索相关表"""
        table_schema_objs = self.obj_retriever.retrieve(ev.query)
        table_context_str = get_table_context_str(table_schema_objs)
        return TableRetrieveEvent(
            table_context_str=table_context_str, query=ev.query
        )
    
    @step
    def generate_sql(
        self, ctx: Context, ev: TableRetrieveEvent
    ) -> TextToSQLEvent:
        """生成 SQL 语句"""
        fmt_messages = self.text2sql_prompt.format_messages(
            query_str=ev.query, schema=ev.table_context_str
        )
        chat_response = self.llm.chat(fmt_messages)
        sql = parse_response_to_sql(chat_response)
        return TextToSQLEvent(sql=sql, query=ev.query)
    
    @step
    def generate_response(
        self, ctx: Context, ev: TextToSQLEvent
    ) -> StopEvent:
        """执行 SQL 并生成回复"""
        retrieved_rows = self.sql_retriever.retrieve(ev.sql)
        fmt_messages = self.response_synthesis_prompt.format_messages(
            sql_query=ev.sql,
            context_str=str(retrieved_rows),
            query_str=ev.query,
        )
        chat_response = self.llm.chat(fmt_messages)
        return StopEvent(result=chat_response)

5.5 运行与调试

实例化工作流

workflow = TextToSQLWorkflow(
    obj_retriever,
    text2sql_prompt,
    sql_retriever,
    response_synthesis_prompt,
    llm,
    verbose=True,  #  开启详细日志,方便调试
)

▶️ 运行工作流

import asyncio

async def main():
    response = await workflow.run(
        query="What was the year that The Notorious B.I.G was signed to Bad Boy?"
    )
    print(f"查询结果: {response.result.message.content}")

# 运行
asyncio.run(main())

调试技巧

  1. 开启详细日志
  2. 查看中间结果
  3. 可视化工作流
# 可视化工作流
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(
    TextToSQLWorkflow, filename="text_to_sql_workflow.html"
)

预期输出示例

查询结果: The Notorious B.I.G was signed to Bad Boy in 1993.

【高级】高级特性

条件分支

根据事件类型或数据内容,选择不同的处理路径:

@step
def route(self, ctx: Context, ev: SomeEvent) -> Event:
    """根据条件路由到不同的处理步骤"""
    if ev.condition:
        return EventA(data=ev.data)
    else:
        return EventB(data=ev.data)

循环执行

通过事件类型匹配,实现循环执行:

class IterationEvent(Event):
    """迭代事件"""
    data: list
    index: int

@step
def process_item(self, ctx: Context, ev: IterationEvent) -> Event:
    """处理单个项目"""
    if ev.index < len(ev.data):
        # 处理当前项
        process(ev.data[ev.index])
        # 继续下一个
        return IterationEvent(data=ev.data, index=ev.index + 1)
    else:
        # 完成
        return StopEvent(result="完成")

并行执行

多个 step 可以并行处理不同的事件:

@step
def step_a(self, ctx: Context, ev: StartEvent) -> EventA:
    """步骤 A"""
    return EventA(data="A")

@step
def step_b(self, ctx: Context, ev: StartEvent) -> EventB:
    """步骤 B"""
    return EventB(data="B")

# step_a 和 step_b 可以并行执行

复杂依赖关系

一个 step 可以依赖多个前置 step 的结果:

@step
def merge(self, ctx: Context, ev: Union[EventA, EventB]) -> StopEvent:
    """合并多个事件的结果"""
    # 处理逻辑
    return StopEvent(result=merged_result)

【总结】核心要点与延伸学习

核心要点回顾

  1. 事件驱动架构
  2. 模块化设计
  3. 灵活的执行流程
  4. ️ 可观测性

适用场景

多步骤 AI 应用- 需要多个 LLM 调用或处理步骤的应用 复杂业务逻辑- 需要条件分支、循环、并行处理的应用 需要可观测性- 需要追踪和调试执行过程的应用 需要扩展性- 需要频繁添加新功能或修改流程的应用

延伸学习

1. 官方文档

  • LlamaIndex Workflows 官方文档
  • 工作流示例集合

2. 进阶主题

  • 工作流的错误处理和重试机制
  • 工作流的性能优化
  • 工作流的测试策略
  • 工作流的部署和监控

3. 相关资源

  • LlamaIndex GitHub 仓库
  • LlamaIndex 社区论坛
  • 工作流最佳实践案例

实践建议

  1. 从简单开始- 先构建简单的工作流,逐步增加复杂度
  2. 充分利用事件- 通过事件类型清晰地定义数据流
  3. 保持单一职责- 每个 step 只做一件事
  4. 使用可视化工具- 定期可视化工作流,确保结构清晰
  5. 编写测试- 为每个 step 编写单元测试
本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:alixiixcom@163.com