我的奶牛
10.48M · 2026-03-26
pipecat作为专注实时交互的Python框架,其核心优势在于将复杂的多模态处理拆解为模块化组。
数据从输入到输出依次经过多个处理器(Processor),你可以理解为一条“处理流水线”。
pipeline = Pipeline(
[
transport.input(), # 输入(音频/文本)
rtvi, # RTVI 协议处理器
stt, # 语音识别
user_input_store_processor, # 存储用户输入
context_aggregator.user(), # 用户上下文聚合
llm, # 大语言模型
tts, # 语音合成
transport.output(), # 输出
context_aggregator.assistant(),
]
)
负责启动 Pipeline 并发出 StartFrame / EndFrame。
task = PipelineTask(
pipeline,
params=PipelineParams(allow_interruptions=True),
)
负责节点流转、工具注册、上下文更新与跳转逻辑。
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
transport=transport,
)
启动任务并维持会话运行。
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
Transport 是客户端与服务端通信的核心层,负责:
Transport 会经历以下状态:
在 SmallWebRTC 测试页面中,只有当状态进入 Ready,才能正常进行语音/文本交互。
官方说明参考:docs.pipecat.ai/client/js/t…
SmallWebRTC 测试页面的 默认参数最终进入 runner_args.body 的路径是 /api/offer 的 request_data。
/start 用于 session 初始化(创建 sessionId、ICE 配置)/api/offer 才会把 request_data 写入 runner_args.body所以如果你需要把默认参数传给后端,请确保注入到 /api/offer。
SmallWebRTC UI 发送 send-text,会生成 InputTransportMessageFrame。
if isinstance(frame, InputTransportMessageFrame):
message = frame.message
if message.get("type") == "send-text":
text = message["data"]["content"]
语音经过 STT 后输出 TranscriptionFrame。
if isinstance(frame, TranscriptionFrame):
text = frame.text
使用 flow_manager.state 存储用户每次输入:
flow_manager.state.setdefault("user_inputs", [])
flow_manager.state["user_inputs"].append({"text": text})
flow_manager.state["last_user_input"] = text
在函数 handler 中读取:
last_input = flow_manager.state.get("last_user_input", "")
history = flow_manager.state.get("user_inputs", [])
{
"body": {
"init_string": "患者ID=12345",
"speech_meta": {"name": "lyq", "age": 31}
}
}
init_body = runner_args.body or {}
init_string = init_body.get("init_string")
speech_meta = init_body.get("speech_meta", {})
flow_manager.state["init_string"] = init_string
context.add_message({"role": "system", "content": f"初始化参数:{init_string}"})
Flow 的分支逻辑靠 handler 返回 (result, next_node) 。
async def handle_choose_pizza(args, flow_manager):
result = pizza_info()
if result == 1:
return result, create_confirm_node()
else:
return result, create_pizza_task_node()
_set_node() 完成上下文更新 + 工具注册Pipecat 的核心抽象是 FrameProcessor。
所有组件都遵循:
组件分为三类:
| 组件类型 | 继承基类 | 作用 |
|---|---|---|
| 通用处理器 | FrameProcessor | 自定义帧处理 |
| AI 服务 | STTService / TTSService / LLMService | 与模型交互 |
| Transport | BaseTransport | 推拉流 |
如果你要接入第三方 RTC(Agora / 腾讯 / 网易),只需实现 input() / output():
class CustomRTCTransport(BaseTransport):
def __init__(self, params: TransportParams, rtc_client):
super().__init__()
self._input = CustomRTCInput(params, rtc_client)
self._output = CustomRTCOutput(params, rtc_client)
def input(self):
return self._input
def output(self):
return self._output
Transport 只负责:
官方 Transport 文档参考:
docs.pipecat.ai/client/js/t…