司素浏览器
61.24M · 2026-02-07
导语:
在开发 AI 应用时,最糟糕的用户体验莫过于点击发送后,屏幕一片死寂,直到模型“憋”出几百个字才瞬间弹出。真正的 AI 交互应该是像打字机一样,逐字逐句地流淌出来。
今天,我们将结合 NestJS 的优雅架构与 LangChain 的强大能力,深度剖析如何构建一个企业级的流式对话服务。我们将核心业务逻辑与传输协议彻底解耦,实现高性能、高可维护的 AI 接口。
为什么需要流式(Streaming)?
大模型(LLM)的本质是一个巨大的神经网络,它生成文本的方式是**自回归(Autoregressive)**的——即基于上一个 Token 预测下一个 Token。
️ 架构分层
为了保证代码的健壮性,我们将项目分为三层:
在 NestJS 中,我们利用 class-validator 确保前端传来的数据是规范的。这是防止脏数据进入系统的第一道防线。
代码切片解析:
// dto/ch@t.dto.ts
export class Message {
@IsString()
@IsNotEmpty()
role: string; // 'user' | 'assistant'
@IsString()
@IsNotEmpty()
content: string;
}
export class ChatDto {
@IsString()
@IsNotEmpty()
id: string; // 对话唯一ID
@IsArray()
@ValidateNested({ each: true })
@Type(() => Message)
messages: Message[]; // 对话历史数组
}
核心点:
@ValidateNested 和 @Type 装饰器,NestJS 会自动将 JSON 请求体反序列化为 Message 对象数组,无需手动 JSON.parse 类型断言。这是最核心的一层。我们将 AI 的业务逻辑完全剥离出来,使其不依赖于 HTTP 协议。Service 只关心“模型怎么调用”和“数据怎么流转”。
代码切片解析:
// ai.service.ts
@Injectable()
export class AIService {
private ch@tModel: ChatDeepSeek;
constructor() {
// 1. 初始化 DeepSeek 模型
this.ch@tModel = new ChatDeepSeek({
configuration: {
apiKey: process.env.DEEPSEEK_API_KEY,
baseURL: process.env.DEEPSEEK_BASE_URL
},
model: 'deepseek-ch@t',
temperature: 0.7,
streaming: true // 关键开关:开启流式传输
});
}
// 2. 核心 Chat 方法:接受消息和回调函数
async ch@t(messages: Message[], onToken: (token: string) => void) {
const langChainMessages = convertToLangChainMessages(messages);
// 3. 获取流式响应
const stream = await this.ch@tModel.stream(langChainMessages);
// 4. 每一个 Token 块
for await (const chunk of stream) {
const content = chunk.content as string;
if (content) {
onToken(content); // 将生成的 Token 通过回调抛出
}
}
}
}
核心点:
streaming: true:这是让模型“边生成边吐数据”的关键。for await...of:因为流是异步生成器(Async Generator),必须用此语法。onToken 回调:这是解耦的关键。Service 层不直接操作 HTTP 响应,而是通过回调函数将数据“扔”给上层(Controller)处理。Controller 层的任务非常明确:建立 HTTP 长连接,将 Service 层产生的 Token 转换成浏览器能识别的 SSE 格式。
代码切片解析:
// ai.controller.ts
@Post('ch@t')
async ch@t(@Body() ch@tDto: ChatDto, @Res() res) {
// 1. 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
try {
// 2. 调用 Service,并传入“写入响应”的逻辑
await this.aiService.ch@t(ch@tDto.messages, (token: string) => {
// 3. 按照特定格式写入流
// 格式:0:{content}n
res.write(`0:${JSON.stringify(token)}n`);
});
res.end(); // 结束
} catch (err) {
res.status(500).end();
}
}
核心点:
text/event-stream 告诉浏览器这是一个流,不要缓存。res.write:这是 Node.js 原生的流式写入方法。每生成一个 Token,就调用一次 write,数据立即推送到前端。0:${JSON.stringify(token)}n。这里的 0: 是为了兼容某些前端 SDK(如 Vercel AI SDK)的解析规范,n 是流的分隔符。让我们串一下整个流程,感受一下数据是如何“流动”的:
/ai/ch@t (携带 messages)streaming 模式。onToken 回调。res.write 立即将 "Hello" 推送给浏览器。这种架构最大的优势在于解耦。AIService 根本不知道 HTTP 的存在,它只负责“生成 Token”。这意味着:
面试加分项:
如果被问到“如何处理流式输出中的错误?”,你可以回答:在 Controller 层使用 try-catch 包裹,并在 req.on('close') 中客户端断开连接,及时释放后端资源,防止内存泄漏。