流水线上的哲学家:React 与 NestJS 的 AI 流式传输深度指南
各位,下午好!我是你们今天的讲师。今天我们不聊那些花里胡哨的 UI 动画,也不聊怎么在堆栈溢出上买彩票。今天,我们要聊的是 AI 应用开发中那个最性感、最折磨人,但一旦搞定就爽翻天的话题——流式传输。
想象一下,你去餐厅点了一碗面。传统的做法是,厨师把面煮好,盛到碗里,端到你面前。这个过程你很焦虑,你不知道面熟没熟,你只能盯着门口,等待那漫长的“叮”的一声。这叫阻塞调用。
现在,想象一下另一种服务:你点了面,厨师说“好嘞,马上来”。然后,热气腾腾的面条就像一条小溪一样,顺着透明的管道,一根一根地(或者一小段一小段地)流到了你的碗里。你在吃第一口的时候,第二口、第三口已经在路上了。这种体验,就是我们要讲的流式传输。
在 LLM(大语言模型)的世界里,如果模型要吐出几千个字,传统的“等它吐完再给你”的方式,用户早就关掉浏览器去刷 TikTok 了。所以,我们要做的,就是搭建一条从 NestJS 后端到 React 前端的“面条传输管道”。
好了,话不多说,让我们戴上安全帽,钻进这个管道。
第一章:NestJS 端——打造“喷泉”式输出
首先,我们要在后端搞事情。NestJS 这家伙,本来是用来写企业级后端的,架构清晰得像瑞士军刀。但在流式传输里,它就是那个负责“浇水”的园丁。
1.1 服务层的抽象:别把锅甩给 Controller
很多新手喜欢在 Controller 里直接写异步逻辑,比如 await generateStream()。千万别这样!Controller 只是个摆设,它应该负责路由、参数校验和响应头。真正的体力活,得交给 Service。
我们需要定义一个 Service,它的职责是“召唤”模型。
这里为了演示,我们假设我们在用 Ollama 或者 OpenAI 的 SDK。我们不需要真的连上模型,我们用一种“障眼法”来模拟生成的过程。但在真实世界里,你会看到类似这样的代码:
// llm.service.ts
import { Injectable } from '@nestjs/common';
import { OpenAiService } from './open-ai.service'; // 假设你有一个封装好的服务
@Injectable()
export class LlmService {
constructor(private readonly openAiService: OpenAiService) {}
async *streamCompletion(messages: any) {
// 这里调用实际的 LLM SDK,通常是异步生成器
// 例如: for await (const chunk of openAiService.chat.completions.create(...)) { yield chunk }
// 下面是模拟代码,假装我们在思考,假装我们在吐字
const response = "我是流式传输的示例文本。看,我正在一行一行地出现。";
for (const char of response) {
// 模拟网络延迟
await new Promise(resolve => setTimeout(resolve, 100));
yield char;
}
}
}
看懂了吗?async * 和 yield。这叫生成器函数。它不一次性把所有数据内存都占满,而是像枪管一样,把子弹一颗颗打出去。
1.2 Controller:响应头的艺术
接下来是 Controller。当你开启流式传输,你就是在告诉浏览器:“嘿,哥们,这不仅仅是一个文本文件,这是一个流,请把我的响应头设成 SSE (Server-Sent Events) 或者仅仅是让管道保持打开。”
// app.controller.ts
import { Controller, Post, Body, Res } from '@nestjs/common';
import { Response } from 'express';
import { LlmService } from './llm.service';
@Controller('chat')
export class AppController {
constructor(private readonly llmService: LlmService) {}
@Post('stream')
async streamChat(@Body() body: { message: string }, @Res() res: Response) {
// 设置响应头,关键点来了!
// 1. 让浏览器知道这是一个流
res.setHeader('Content-Type', 'text/event-stream');
// 2. 关闭缓存,流不能被浏览器缓存,要实时更新
res.setHeader('Cache-Control', 'no-cache');
// 3. 告诉浏览器这个连接支持重连
res.setHeader('Connection', 'keep-alive');
// 获取生成器
const generator = this.llmService.streamCompletion([body.message]);
// 我们需要一个迭代器来消费这个生成器
const iterator = generator[Symbol.asyncIterator]();
// 进入死循环,开始吐数据
try {
while (true) {
const { value, done } = await iterator.next();
if (done) {
res.end(); // 没话说了,管道关闭
break;
}
// 发送数据。注意,这里发送的是原始的字符串(或者特定的 SSE 格式)
// 如果是用 SSE 协议,通常是 "data: xxxnn"
// 为了简单演示,我们直接流式传输文本,前端用 TextDecoder 解析
res.write(value);
}
} catch (error) {
console.error('Stream error:', error);
res.write(`[Error: ${error.message}]`);
res.end();
}
}
}
1.3 深度解析:为什么 Readable.from 有时更好?
上面的 Controller 写法是经典的“迭代器模式”。但 NestJS 的生态里还有一个神器,那就是 Readable.from。它可以把一个异步生成器包装成一个 Node.js 的可读流。
为什么这很重要?因为 Node.js 的流处理非常强大。我们可以轻易地在流中间插入 TransformStream 来做数据清洗、脱敏或者格式化。
import { Readable } from 'stream';
@Post('stream-advanced')
async streamChatAdvanced(@Body() body: any, @Res() res: Response) {
res.setHeader('Content-Type', 'text/plain');
res.setHeader('Transfer-Encoding', 'chunked'); // 分块传输
// 创建一个 Transform 流来处理数据
// 比如我们想给每个句子加个 Emoji
const emojiTransformer = new Transform({
transform(chunk, encoding, callback) {
// chunk 是 Buffer,我们把它转成字符串
const str = chunk.toString();
// 简单的替换逻辑(实际生产要更聪明点,防止破坏 HTML 标签)
const emojiStr = str.replace(/。/g, "。n[Thinking...]");
callback(null, emojiStr);
}
});
// 从生成器创建可读流
const readableStream = Readable.from(this.llmService.streamCompletion([body.message]));
// 管道传输:生成器 -> Emoji 处理器 -> HTTP 响应
readableStream.pipe(emojiTransformer).pipe(res);
}
你看,通过 pipe,代码变得像搭积木一样干净。这就是 NestJS 流式传输的精髓:不要手动去 await next(),要用流!
第二章:React 端——接住飞来的文字
前端怎么接?这其实比后端简单,但也容易踩坑。React 的核心在于状态更新,而流式传输最怕的就是“状态更新频率过高导致的性能杀手”。
2.1 传统的 fetch 与流式处理
如果不用特殊的库,原生 JS 的 fetch 就能搞定流式接收。
// ChatStream.tsx
import React, { useEffect, useState, useRef } from 'react';
const ChatStream: React.FC = () => {
const [text, setText] = useState('');
const abortControllerRef = useRef<AbortController | null>(null);
const startStreaming = async () => {
// 1. 每次开始新的请求,必须取消之前的请求(防止重复点击)
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
const controller = new AbortController();
abortControllerRef.current = controller;
// 2. 发起请求
const response = await fetch('http://localhost:3000/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: "请用代码解释什么是流式传输" }),
signal: controller.signal
});
// 3. 获取 Reader
const reader = response.body?.getReader();
if (!reader) return;
// 4. 获取解码器
const decoder = new TextDecoder('utf-8');
try {
while (true) {
// 5. 读取数据块
const { done, value } = await reader.read();
if (done) break;
// 6. 解码数据块
const chunk = decoder.decode(value, { stream: true });
// 7. 更新状态
// 这里有个坑:React 的 setState 是异步的。
// 如果数据流太快,频繁的 setState 会拖垮 UI 线程。
// 解决方案:使用 requestAnimationFrame 或者简单的累加
setText(prev => prev + chunk);
}
} catch (error) {
console.error("Stream failed", error);
}
};
return (
<div>
<button onClick={startStreaming}>发送请求</button>
<div style={{ whiteSpace: 'pre-wrap', border: '1px solid #ccc', padding: '10px' }}>
{text}
</div>
</div>
);
};
export default ChatStream;
2.2 性能优化:拒绝卡顿
上面的代码能跑,但在高性能场景下,你会看到 CPU 飙升,页面卡顿。为什么?因为 setText(prev => prev + chunk) 每次都会触发 React 的 Diff 算法。
想象一下,如果流速是 100ms 一块,React 就要在 100ms 内重新渲染整个文本块,这太累了。
优化方案 1:使用 useEffect + AbortController 的终极版
不要在 useEffect 里写 await,那会阻塞渲染。我们用 useEffect 来监听状态变化?不,那更慢。
我们要用即时执行的模式,结合 requestAnimationFrame 来节流。
const ChatStreamOptimized: React.FC = () => {
const [text, setText] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const readerRef = useRef<ReadableStreamDefaultReader | null>(null);
const decoderRef = useRef(new TextDecoder());
const handleSend = async () => {
if (isStreaming) return;
setIsStreaming(true);
// ... fetch 逻辑 ...
};
useEffect(() => {
// 这个 effect 只在组件挂载时运行一次,用来处理流读取逻辑
const runStream = async () => {
// 这里假装我们已经拿到了 reader (从 props 或者 ref 传进来)
const reader = readerRef.current;
if (!reader) return;
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoderRef.current.decode(value, { stream: true });
// 核心优化:直接操作 DOM,或者使用批量更新
// 这里为了演示简单,我们还是用 setState,但加上 requestAnimationFrame 节流
window.requestAnimationFrame(() => {
setText(prev => prev + chunk);
});
}
setIsStreaming(false);
};
runStream();
return () => {
// 清理函数:如果用户点关闭,一定要取消流
if (readerRef.current) {
readerRef.current.cancel();
}
};
}, []); // 空依赖数组,确保只运行一次
return (
// ... JSX
);
};
优化方案 2:虚拟滚动
如果你的文本有几万字,React 重新渲染几万个 DOM 节点绝对是灾难。这时候,你需要 react-window 或 react-virtualized。但通常流式传输的文本不会那么长,所以这里就点到为止。
2.3 Markdown 渲染:给文字加个“外衣”
原生的 white-space: pre-wrap 只能让文字换行,但它不懂代码高亮。为了让你的 AI 应用看起来像个正经的 ChatGPT,你需要集成一个 Markdown 解析库。
推荐 react-markdown。它天然支持流式传输!
import ReactMarkdown from 'react-markdown';
// 在组件里
<ReactMarkdown>
{text}
</ReactMarkdown>
等等,ReactMarkdown 会等待所有文本生成完毕才开始渲染吗?不,它支持 remark-gfm 和流式更新。只要 text 状态更新了,它就会重新计算 Markdown。虽然这比原生文本重绘稍微重一点,但对于 Markdown 来说是值得的。
第三章:深度集成——NestJS 与 React 的“私奔”
好了,现在让我们把这些碎片拼起来。我们需要一个完整的、能跑的、甚至稍微带点“黑客”气质的示例。
3.1 后端:NestJS 完整实现
我们要做一个支持 Markdown 预处理(简单替换)和 SSE 格式的端点。
// src/stream.controller.ts
import { Controller, Post, Body, Res } from '@nestjs/common';
import { Response } from 'express';
import { LlmService } from './llm.service';
@Controller('stream')
export class StreamController {
constructor(private readonly llmService: LlmService) {}
@Post()
async streamResponse(@Body() body: { prompt: string }, @Res() res: Response) {
// 设置 SSE 头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 获取生成器
const generator = this.llmService.streamGenerator(body.prompt);
// 转换为 Node.js Stream
const { Readable } = require('stream');
const readable = Readable.from(generator);
// 直接管道传输。NestJS 会自动处理响应体。
readable.pipe(res);
}
}
3.2 前端:React 组件实现
这个组件封装了所有逻辑,包括错误处理和加载状态。
// src/components/StreamChat.tsx
import React, { useState, useEffect, useRef } from 'react';
const StreamChat: React.FC = () => {
const [input, setInput] = useState('');
const [output, setOutput] = useState('');
const [loading, setLoading] = useState(false);
// 使用 ref 存储 AbortController,防止闭包陷阱
const abortControllerRef = useRef<AbortController | null>(null);
const readerRef = useRef<ReadableStreamDefaultReader<Uint8Array> | null>(null);
const handleSend = async () => {
if (!input.trim()) return;
setLoading(true);
setOutput(''); // 清空旧消息
setInput(''); // 清空输入框
// 1. 创建 AbortController
const controller = new AbortController();
abortControllerRef.current = controller;
try {
const response = await fetch('http://localhost:3000/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt: input }),
signal: controller.signal
});
if (!response.body) throw new Error('Response body is null');
// 2. 获取 Reader
readerRef.current = response.body.getReader();
const decoder = new TextDecoder('utf-8');
// 3. 循环读取
while (true) {
const { done, value } = await readerRef.current!.read();
if (done) break;
// 4. 解码并更新状态
const chunk = decoder.decode(value, { stream: true });
setOutput((prev) => prev + chunk);
}
} catch (error: any) {
if (error.name !== 'AbortError') {
console.error('Stream error:', error);
setOutput((prev) => `${prev}n[系统错误: ${error.message}]`);
}
} finally {
setLoading(false);
readerRef.current = null;
}
};
// 组件卸载时取消请求(重要!)
useEffect(() => {
return () => {
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
if (readerRef.current) {
readerRef.current.cancel();
}
};
}, []);
return (
<div style={{ maxWidth: '800px', margin: '0 auto', padding: '20px' }}>
<div style={{ marginBottom: '20px' }}>
<textarea
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="输入你的问题..."
disabled={loading}
rows={3}
style={{ width: '100%' }}
/>
<button
onClick={handleSend}
disabled={loading || !input.trim()}
style={{ marginTop: '10px', padding: '8px 16px', cursor: loading ? 'not-allowed' : 'pointer' }}
>
{loading ? '思考中...' : '发送'}
</button>
</div>
<div style={{
border: '1px solid #ccc',
borderRadius: '8px',
padding: '16px',
minHeight: '200px',
background: '#f9f9f9'
}}>
{loading && !output && <div>🤖 正在接入神经网络...</div>}
<div style={{ whiteSpace: 'pre-wrap' }}>{output}</div>
</div>
</div>
);
};
export default StreamChat;
第四章:那些年我们踩过的“坑”
讲到这里,你以为这就结束了?天真。在实际工程中,流式传输就是一座布满地雷的花园。
4.1 “乱码”现象:字符被切断了
有时候,你会看到这样的现象:
“你好,我是流式传输的示例文本。你看,我正在一行一行地出现。[缺失字符]…[缺失字符]…”
为什么会这样?
假设模型正在发送“你好,世界”。
- 客户端收到了“你好,”(存入缓冲区)。
- 网络断了,或者浏览器还没来得及处理,连接关闭。
- “世界”这俩字丢在了服务器里。
解决之道: 不要直接把原始字节写回给前端。在 NestJS 端,或者在前端,都要注意“换行符”的处理。很多 SDK 会自动处理这个问题,确保每个事件(event)都是完整的。但在自定义流管道时,你要小心,不要在 UTF-8 字符的中间切断。
4.2 CORS 跨域地狱
如果你开发 React 应用和 NestJS 应用不在同一个端口(比如前端跑在 3000,后端跑在 3001),流式传输很容易被 CORS 拦截。
NestJS 的 CORS 默认配置通常是 credentials: true,这对流式传输来说是致命的,因为它会要求每次请求都带 cookie,导致流传输被浏览器策略阻断。
解决方法: 在 NestJS 中配置 CORS:
app.enableCors({
origin: 'http://localhost:3000',
methods: 'GET,HEAD,PUT,PATCH,POST,DELETE,OPTIONS',
credentials: false, // 流式传输通常不需要凭证,设为 false
});
4.3 超时设置
流式传输是一个长连接。如果你设置了默认的 Nginx 或反向代理超时(比如 60 秒),而模型推理要 3 分钟,那么连接会在第 61 秒断开,前端会收到一个错误。
解决方法:
- Nginx 配置:
proxy_read_timeout 300s; - NestJS 配置: 如果使用 Socket.IO(WebSocket 模式),要设置
pingTimeout。如果是 HTTP 长轮询或 SSE,要确保客户端不因为超时报错。
4.4 React 状态更新的“频率诅咒”
如果模型吐字速度是 50ms 一个字,那么 React 每 50ms 就要触发一次 render。这会导致浏览器主线程疯狂忙碌,导致页面滚动卡顿,甚至导致输入框无法输入(因为输入事件也被阻塞了)。
终极解决方案:React.memo + useCallback 优化组件渲染
虽然流式更新最核心的是父组件更新状态,但如果你把 Markdown 渲染器放在子组件里,并且用了 React.memo,那么每次 text 变化时,整个 Markdown 组件会重新渲染。如果 Markdown 解析很重,这会卡死页面。
更好的做法:
- 粗粒度更新: 比如每收到 50 个字符才更新一次 DOM。
- 虚拟化流: 使用
react-virtuoso之类的库,只渲染视口内的文本。
第五章:实战案例——构建一个“智能家居控制”流
为了巩固我们的知识,我们来做一个稍微高级点的例子。
场景: 用户问 AI “我的灯光怎么开?”,AI 不会只回答文字,它会像终端一样,逐行输出命令,并自动执行。
后端逻辑:
我们需要在流里注入“系统命令”。
// LlmService 的模拟扩展
async *streamSmartHome(prompt) {
const instructions = "你是一个智能家居控制助手。";
yield instructions + "n";
for (const char of "Executing: curl -X POST http://192.168.1.100/light/on") {
await new Promise(r => setTimeout(r, 100));
yield char;
}
// 模拟返回结果
yield "nResult: Success. Brightness set to 80%.n";
yield "Executing: curl -X POST http://192.168.1.100/sensor/temp";
for (const char of "Temp: 24.5C") {
await new Promise(r => setTimeout(r, 100));
yield char;
}
}
前端逻辑:
我们需要区分“用户指令”和“系统日志”。我们可以用一个正则或者状态机来检测。
// 简单的流处理器
let buffer = '';
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += new TextDecoder().decode(value);
// 检测是否包含指令关键词
if (buffer.includes('Executing:')) {
// 拆分输出
const parts = buffer.split('Executing:');
// 这里逻辑比较复杂,实际开发中通常让 AI 输出 JSON 格式
// 例如: {"type": "action", "command": "turn_on_light"}
// 简单演示:找到最后一个 "Executing:"
const lastMatch = buffer.match(/Executing:(.*)$/);
if (lastMatch) {
const command = lastMatch[1].trim();
// 触发前端逻辑
executeCommand(command);
// 清除 buffer 中的已处理部分,防止重复处理
buffer = buffer.substring(0, buffer.lastIndexOf('Executing:'));
}
}
}
结语:通往未来的桥梁
各位,今天我们走过了从后端生成器到前端解码器的整个技术栈。流式传输不仅仅是技术实现,它是一种用户体验哲学。
当用户看到文字像瀑布一样流下来,他们感受到的是一种“掌控感”,一种“实时交互感”。这种反馈机制,是 AI 应用区别于传统搜索引擎和静态文档的 DNA。
当然,构建一个完美的流式 AI 应用还面临着许多挑战:如何处理 Markdown 的格式化而不卡顿、如何优雅地处理断线重连、如何防止注入攻击。
但不要怕,一旦你搞懂了 Readable.from 和 ReadableStream,你就掌握了通往 Web 2.0 时代的钥匙。这把钥匙,就是数据在传输过程中的流动。
现在,去构建你的管道吧。让你的代码像尼罗河一样流淌起来!