React 与 后端 AI 推理引擎的集成:实现在 NestJS 后端调用 LLM 并将结果流式同步至 React 组件空间

流水线上的哲学家:React 与 NestJS 的 AI 流式传输深度指南

各位,下午好!我是你们今天的讲师。今天我们不聊那些花里胡哨的 UI 动画,也不聊怎么在堆栈溢出上买彩票。今天,我们要聊的是 AI 应用开发中那个最性感、最折磨人,但一旦搞定就爽翻天的话题——流式传输

想象一下,你去餐厅点了一碗面。传统的做法是,厨师把面煮好,盛到碗里,端到你面前。这个过程你很焦虑,你不知道面熟没熟,你只能盯着门口,等待那漫长的“叮”的一声。这叫阻塞调用

现在,想象一下另一种服务:你点了面,厨师说“好嘞,马上来”。然后,热气腾腾的面条就像一条小溪一样,顺着透明的管道,一根一根地(或者一小段一小段地)流到了你的碗里。你在吃第一口的时候,第二口、第三口已经在路上了。这种体验,就是我们要讲的流式传输

在 LLM(大语言模型)的世界里,如果模型要吐出几千个字,传统的“等它吐完再给你”的方式,用户早就关掉浏览器去刷 TikTok 了。所以,我们要做的,就是搭建一条从 NestJS 后端到 React 前端的“面条传输管道”。

好了,话不多说,让我们戴上安全帽,钻进这个管道。


第一章:NestJS 端——打造“喷泉”式输出

首先,我们要在后端搞事情。NestJS 这家伙,本来是用来写企业级后端的,架构清晰得像瑞士军刀。但在流式传输里,它就是那个负责“浇水”的园丁。

1.1 服务层的抽象:别把锅甩给 Controller

很多新手喜欢在 Controller 里直接写异步逻辑,比如 await generateStream()。千万别这样!Controller 只是个摆设,它应该负责路由、参数校验和响应头。真正的体力活,得交给 Service。

我们需要定义一个 Service,它的职责是“召唤”模型。

这里为了演示,我们假设我们在用 Ollama 或者 OpenAI 的 SDK。我们不需要真的连上模型,我们用一种“障眼法”来模拟生成的过程。但在真实世界里,你会看到类似这样的代码:

// llm.service.ts
import { Injectable } from '@nestjs/common';
import { OpenAiService } from './open-ai.service'; // 假设你有一个封装好的服务

@Injectable()
export class LlmService {
  constructor(private readonly openAiService: OpenAiService) {}

  async *streamCompletion(messages: any) {
    // 这里调用实际的 LLM SDK,通常是异步生成器
    // 例如: for await (const chunk of openAiService.chat.completions.create(...)) { yield chunk }

    // 下面是模拟代码,假装我们在思考,假装我们在吐字
    const response = "我是流式传输的示例文本。看,我正在一行一行地出现。";
    for (const char of response) {
      // 模拟网络延迟
      await new Promise(resolve => setTimeout(resolve, 100));
      yield char;
    }
  }
}

看懂了吗?async *yield。这叫生成器函数。它不一次性把所有数据内存都占满,而是像枪管一样,把子弹一颗颗打出去。

1.2 Controller:响应头的艺术

接下来是 Controller。当你开启流式传输,你就是在告诉浏览器:“嘿,哥们,这不仅仅是一个文本文件,这是一个流,请把我的响应头设成 SSE (Server-Sent Events) 或者仅仅是让管道保持打开。”

// app.controller.ts
import { Controller, Post, Body, Res } from '@nestjs/common';
import { Response } from 'express';
import { LlmService } from './llm.service';

@Controller('chat')
export class AppController {
  constructor(private readonly llmService: LlmService) {}

  @Post('stream')
  async streamChat(@Body() body: { message: string }, @Res() res: Response) {
    // 设置响应头,关键点来了!
    // 1. 让浏览器知道这是一个流
    res.setHeader('Content-Type', 'text/event-stream');
    // 2. 关闭缓存,流不能被浏览器缓存,要实时更新
    res.setHeader('Cache-Control', 'no-cache');
    // 3. 告诉浏览器这个连接支持重连
    res.setHeader('Connection', 'keep-alive');

    // 获取生成器
    const generator = this.llmService.streamCompletion([body.message]);

    // 我们需要一个迭代器来消费这个生成器
    const iterator = generator[Symbol.asyncIterator]();

    // 进入死循环,开始吐数据
    try {
      while (true) {
        const { value, done } = await iterator.next();

        if (done) {
          res.end(); // 没话说了,管道关闭
          break;
        }

        // 发送数据。注意,这里发送的是原始的字符串(或者特定的 SSE 格式)
        // 如果是用 SSE 协议,通常是 "data: xxxnn"
        // 为了简单演示,我们直接流式传输文本,前端用 TextDecoder 解析
        res.write(value);
      }
    } catch (error) {
      console.error('Stream error:', error);
      res.write(`[Error: ${error.message}]`);
      res.end();
    }
  }
}

1.3 深度解析:为什么 Readable.from 有时更好?

上面的 Controller 写法是经典的“迭代器模式”。但 NestJS 的生态里还有一个神器,那就是 Readable.from。它可以把一个异步生成器包装成一个 Node.js 的可读流。

为什么这很重要?因为 Node.js 的流处理非常强大。我们可以轻易地在流中间插入 TransformStream 来做数据清洗、脱敏或者格式化。

import { Readable } from 'stream';

@Post('stream-advanced')
async streamChatAdvanced(@Body() body: any, @Res() res: Response) {
  res.setHeader('Content-Type', 'text/plain');
  res.setHeader('Transfer-Encoding', 'chunked'); // 分块传输

  // 创建一个 Transform 流来处理数据
  // 比如我们想给每个句子加个 Emoji
  const emojiTransformer = new Transform({
    transform(chunk, encoding, callback) {
      // chunk 是 Buffer,我们把它转成字符串
      const str = chunk.toString();
      // 简单的替换逻辑(实际生产要更聪明点,防止破坏 HTML 标签)
      const emojiStr = str.replace(/。/g, "。n[Thinking...]"); 
      callback(null, emojiStr);
    }
  });

  // 从生成器创建可读流
  const readableStream = Readable.from(this.llmService.streamCompletion([body.message]));

  // 管道传输:生成器 -> Emoji 处理器 -> HTTP 响应
  readableStream.pipe(emojiTransformer).pipe(res);
}

你看,通过 pipe,代码变得像搭积木一样干净。这就是 NestJS 流式传输的精髓:不要手动去 await next(),要用流!


第二章:React 端——接住飞来的文字

前端怎么接?这其实比后端简单,但也容易踩坑。React 的核心在于状态更新,而流式传输最怕的就是“状态更新频率过高导致的性能杀手”。

2.1 传统的 fetch 与流式处理

如果不用特殊的库,原生 JS 的 fetch 就能搞定流式接收。

// ChatStream.tsx
import React, { useEffect, useState, useRef } from 'react';

const ChatStream: React.FC = () => {
  const [text, setText] = useState('');
  const abortControllerRef = useRef<AbortController | null>(null);

  const startStreaming = async () => {
    // 1. 每次开始新的请求,必须取消之前的请求(防止重复点击)
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }
    const controller = new AbortController();
    abortControllerRef.current = controller;

    // 2. 发起请求
    const response = await fetch('http://localhost:3000/chat/stream', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ message: "请用代码解释什么是流式传输" }),
      signal: controller.signal
    });

    // 3. 获取 Reader
    const reader = response.body?.getReader();
    if (!reader) return;

    // 4. 获取解码器
    const decoder = new TextDecoder('utf-8');

    try {
      while (true) {
        // 5. 读取数据块
        const { done, value } = await reader.read();

        if (done) break;

        // 6. 解码数据块
        const chunk = decoder.decode(value, { stream: true });

        // 7. 更新状态
        // 这里有个坑:React 的 setState 是异步的。
        // 如果数据流太快,频繁的 setState 会拖垮 UI 线程。
        // 解决方案:使用 requestAnimationFrame 或者简单的累加
        setText(prev => prev + chunk);
      }
    } catch (error) {
      console.error("Stream failed", error);
    }
  };

  return (
    <div>
      <button onClick={startStreaming}>发送请求</button>
      <div style={{ whiteSpace: 'pre-wrap', border: '1px solid #ccc', padding: '10px' }}>
        {text}
      </div>
    </div>
  );
};

export default ChatStream;

2.2 性能优化:拒绝卡顿

上面的代码能跑,但在高性能场景下,你会看到 CPU 飙升,页面卡顿。为什么?因为 setText(prev => prev + chunk) 每次都会触发 React 的 Diff 算法。

想象一下,如果流速是 100ms 一块,React 就要在 100ms 内重新渲染整个文本块,这太累了。

优化方案 1:使用 useEffect + AbortController 的终极版

不要在 useEffect 里写 await,那会阻塞渲染。我们用 useEffect 来监听状态变化?不,那更慢。

我们要用即时执行的模式,结合 requestAnimationFrame 来节流。

const ChatStreamOptimized: React.FC = () => {
  const [text, setText] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  const readerRef = useRef<ReadableStreamDefaultReader | null>(null);
  const decoderRef = useRef(new TextDecoder());

  const handleSend = async () => {
    if (isStreaming) return;
    setIsStreaming(true);
    // ... fetch 逻辑 ...
  };

  useEffect(() => {
    // 这个 effect 只在组件挂载时运行一次,用来处理流读取逻辑
    const runStream = async () => {
      // 这里假装我们已经拿到了 reader (从 props 或者 ref 传进来)
      const reader = readerRef.current;
      if (!reader) return;

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoderRef.current.decode(value, { stream: true });

        // 核心优化:直接操作 DOM,或者使用批量更新
        // 这里为了演示简单,我们还是用 setState,但加上 requestAnimationFrame 节流
        window.requestAnimationFrame(() => {
          setText(prev => prev + chunk);
        });
      }
      setIsStreaming(false);
    };

    runStream();
    return () => {
      // 清理函数:如果用户点关闭,一定要取消流
      if (readerRef.current) {
        readerRef.current.cancel();
      }
    };
  }, []); // 空依赖数组,确保只运行一次

  return (
    // ... JSX
  );
};

优化方案 2:虚拟滚动

如果你的文本有几万字,React 重新渲染几万个 DOM 节点绝对是灾难。这时候,你需要 react-windowreact-virtualized。但通常流式传输的文本不会那么长,所以这里就点到为止。

2.3 Markdown 渲染:给文字加个“外衣”

原生的 white-space: pre-wrap 只能让文字换行,但它不懂代码高亮。为了让你的 AI 应用看起来像个正经的 ChatGPT,你需要集成一个 Markdown 解析库。

推荐 react-markdown。它天然支持流式传输!

import ReactMarkdown from 'react-markdown';

// 在组件里
<ReactMarkdown>
  {text}
</ReactMarkdown>

等等,ReactMarkdown 会等待所有文本生成完毕才开始渲染吗?不,它支持 remark-gfm 和流式更新。只要 text 状态更新了,它就会重新计算 Markdown。虽然这比原生文本重绘稍微重一点,但对于 Markdown 来说是值得的。


第三章:深度集成——NestJS 与 React 的“私奔”

好了,现在让我们把这些碎片拼起来。我们需要一个完整的、能跑的、甚至稍微带点“黑客”气质的示例。

3.1 后端:NestJS 完整实现

我们要做一个支持 Markdown 预处理(简单替换)和 SSE 格式的端点。

// src/stream.controller.ts
import { Controller, Post, Body, Res } from '@nestjs/common';
import { Response } from 'express';
import { LlmService } from './llm.service';

@Controller('stream')
export class StreamController {
  constructor(private readonly llmService: LlmService) {}

  @Post()
  async streamResponse(@Body() body: { prompt: string }, @Res() res: Response) {
    // 设置 SSE 头
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    // 获取生成器
    const generator = this.llmService.streamGenerator(body.prompt);

    // 转换为 Node.js Stream
    const { Readable } = require('stream');
    const readable = Readable.from(generator);

    // 直接管道传输。NestJS 会自动处理响应体。
    readable.pipe(res);
  }
}

3.2 前端:React 组件实现

这个组件封装了所有逻辑,包括错误处理和加载状态。

// src/components/StreamChat.tsx
import React, { useState, useEffect, useRef } from 'react';

const StreamChat: React.FC = () => {
  const [input, setInput] = useState('');
  const [output, setOutput] = useState('');
  const [loading, setLoading] = useState(false);

  // 使用 ref 存储 AbortController,防止闭包陷阱
  const abortControllerRef = useRef<AbortController | null>(null);
  const readerRef = useRef<ReadableStreamDefaultReader<Uint8Array> | null>(null);

  const handleSend = async () => {
    if (!input.trim()) return;

    setLoading(true);
    setOutput(''); // 清空旧消息
    setInput('');  // 清空输入框

    // 1. 创建 AbortController
    const controller = new AbortController();
    abortControllerRef.current = controller;

    try {
      const response = await fetch('http://localhost:3000/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ prompt: input }),
        signal: controller.signal
      });

      if (!response.body) throw new Error('Response body is null');

      // 2. 获取 Reader
      readerRef.current = response.body.getReader();
      const decoder = new TextDecoder('utf-8');

      // 3. 循环读取
      while (true) {
        const { done, value } = await readerRef.current!.read();

        if (done) break;

        // 4. 解码并更新状态
        const chunk = decoder.decode(value, { stream: true });
        setOutput((prev) => prev + chunk);
      }
    } catch (error: any) {
      if (error.name !== 'AbortError') {
        console.error('Stream error:', error);
        setOutput((prev) => `${prev}n[系统错误: ${error.message}]`);
      }
    } finally {
      setLoading(false);
      readerRef.current = null;
    }
  };

  // 组件卸载时取消请求(重要!)
  useEffect(() => {
    return () => {
      if (abortControllerRef.current) {
        abortControllerRef.current.abort();
      }
      if (readerRef.current) {
        readerRef.current.cancel();
      }
    };
  }, []);

  return (
    <div style={{ maxWidth: '800px', margin: '0 auto', padding: '20px' }}>
      <div style={{ marginBottom: '20px' }}>
        <textarea
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="输入你的问题..."
          disabled={loading}
          rows={3}
          style={{ width: '100%' }}
        />
        <button 
          onClick={handleSend} 
          disabled={loading || !input.trim()}
          style={{ marginTop: '10px', padding: '8px 16px', cursor: loading ? 'not-allowed' : 'pointer' }}
        >
          {loading ? '思考中...' : '发送'}
        </button>
      </div>

      <div style={{ 
        border: '1px solid #ccc', 
        borderRadius: '8px', 
        padding: '16px', 
        minHeight: '200px',
        background: '#f9f9f9' 
      }}>
        {loading && !output && <div>🤖 正在接入神经网络...</div>}
        <div style={{ whiteSpace: 'pre-wrap' }}>{output}</div>
      </div>
    </div>
  );
};

export default StreamChat;

第四章:那些年我们踩过的“坑”

讲到这里,你以为这就结束了?天真。在实际工程中,流式传输就是一座布满地雷的花园。

4.1 “乱码”现象:字符被切断了

有时候,你会看到这样的现象:

“你好,我是流式传输的示例文本。你看,我正在一行一行地出现。[缺失字符]…[缺失字符]…”

为什么会这样?
假设模型正在发送“你好,世界”。

  1. 客户端收到了“你好,”(存入缓冲区)。
  2. 网络断了,或者浏览器还没来得及处理,连接关闭。
  3. “世界”这俩字丢在了服务器里。

解决之道: 不要直接把原始字节写回给前端。在 NestJS 端,或者在前端,都要注意“换行符”的处理。很多 SDK 会自动处理这个问题,确保每个事件(event)都是完整的。但在自定义流管道时,你要小心,不要在 UTF-8 字符的中间切断。

4.2 CORS 跨域地狱

如果你开发 React 应用和 NestJS 应用不在同一个端口(比如前端跑在 3000,后端跑在 3001),流式传输很容易被 CORS 拦截。

NestJS 的 CORS 默认配置通常是 credentials: true,这对流式传输来说是致命的,因为它会要求每次请求都带 cookie,导致流传输被浏览器策略阻断。

解决方法: 在 NestJS 中配置 CORS:

app.enableCors({
  origin: 'http://localhost:3000',
  methods: 'GET,HEAD,PUT,PATCH,POST,DELETE,OPTIONS',
  credentials: false, // 流式传输通常不需要凭证,设为 false
});

4.3 超时设置

流式传输是一个长连接。如果你设置了默认的 Nginx 或反向代理超时(比如 60 秒),而模型推理要 3 分钟,那么连接会在第 61 秒断开,前端会收到一个错误。

解决方法:

  1. Nginx 配置: proxy_read_timeout 300s;
  2. NestJS 配置: 如果使用 Socket.IO(WebSocket 模式),要设置 pingTimeout。如果是 HTTP 长轮询或 SSE,要确保客户端不因为超时报错。

4.4 React 状态更新的“频率诅咒”

如果模型吐字速度是 50ms 一个字,那么 React 每 50ms 就要触发一次 render。这会导致浏览器主线程疯狂忙碌,导致页面滚动卡顿,甚至导致输入框无法输入(因为输入事件也被阻塞了)。

终极解决方案:React.memo + useCallback 优化组件渲染

虽然流式更新最核心的是父组件更新状态,但如果你把 Markdown 渲染器放在子组件里,并且用了 React.memo,那么每次 text 变化时,整个 Markdown 组件会重新渲染。如果 Markdown 解析很重,这会卡死页面。

更好的做法:

  1. 粗粒度更新: 比如每收到 50 个字符才更新一次 DOM。
  2. 虚拟化流: 使用 react-virtuoso 之类的库,只渲染视口内的文本。

第五章:实战案例——构建一个“智能家居控制”流

为了巩固我们的知识,我们来做一个稍微高级点的例子。

场景: 用户问 AI “我的灯光怎么开?”,AI 不会只回答文字,它会像终端一样,逐行输出命令,并自动执行。

后端逻辑:
我们需要在流里注入“系统命令”。

// LlmService 的模拟扩展
async *streamSmartHome(prompt) {
  const instructions = "你是一个智能家居控制助手。";
  yield instructions + "n";

  for (const char of "Executing: curl -X POST http://192.168.1.100/light/on") {
    await new Promise(r => setTimeout(r, 100));
    yield char;
  }

  // 模拟返回结果
  yield "nResult: Success. Brightness set to 80%.n";
  yield "Executing: curl -X POST http://192.168.1.100/sensor/temp";

  for (const char of "Temp: 24.5C") {
    await new Promise(r => setTimeout(r, 100));
    yield char;
  }
}

前端逻辑:
我们需要区分“用户指令”和“系统日志”。我们可以用一个正则或者状态机来检测。

// 简单的流处理器
let buffer = '';
const reader = stream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  buffer += new TextDecoder().decode(value);

  // 检测是否包含指令关键词
  if (buffer.includes('Executing:')) {
    // 拆分输出
    const parts = buffer.split('Executing:');
    // 这里逻辑比较复杂,实际开发中通常让 AI 输出 JSON 格式
    // 例如: {"type": "action", "command": "turn_on_light"}

    // 简单演示:找到最后一个 "Executing:"
    const lastMatch = buffer.match(/Executing:(.*)$/);
    if (lastMatch) {
        const command = lastMatch[1].trim();
        // 触发前端逻辑
        executeCommand(command); 
        // 清除 buffer 中的已处理部分,防止重复处理
        buffer = buffer.substring(0, buffer.lastIndexOf('Executing:'));
    }
  }
}

结语:通往未来的桥梁

各位,今天我们走过了从后端生成器到前端解码器的整个技术栈。流式传输不仅仅是技术实现,它是一种用户体验哲学。

当用户看到文字像瀑布一样流下来,他们感受到的是一种“掌控感”,一种“实时交互感”。这种反馈机制,是 AI 应用区别于传统搜索引擎和静态文档的 DNA。

当然,构建一个完美的流式 AI 应用还面临着许多挑战:如何处理 Markdown 的格式化而不卡顿、如何优雅地处理断线重连、如何防止注入攻击。

但不要怕,一旦你搞懂了 Readable.fromReadableStream,你就掌握了通往 Web 2.0 时代的钥匙。这把钥匙,就是数据在传输过程中的流动

现在,去构建你的管道吧。让你的代码像尼罗河一样流淌起来!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注