tRPC 订阅模式(Subscriptions):利用 tRPC 与 React 协同处理长连接数据的类型分发

tRPC 订阅模式:当类型系统遇见长连接 —— 一次关于实时数据的“深度调情”

各位前端同仁,大家好!

欢迎来到今天的讲座。我是你们的向导,今天我们要聊的是 tRPC 的一个“隐秘角落”——订阅模式。在开始之前,我想问大家一个问题:你们爱过 HTTP 吗?

当然,HTTP 是个可靠的老伙计,是个勤勤恳恳的邮递员,每天早晚各来一次(GET)和一次(POST)。但你们有没有想过,如果这位邮递员不再来敲门,而是直接从你的窗户爬进来,实时地往你手里塞信,那该多好?或者更糟糕,如果他总是三天两头才来一次,让你在焦急的等待中,对着屏幕上“加载中”的转圈圈怀疑人生,那你就得考虑换个“约会对象”了。

这就是为什么我们需要 tRPC 的订阅功能。今天,我们要把 HTTP 这根“单线联系”升级为 Socket 连接这种“长期饭票”。我们要让 TypeScript 的类型系统,穿过传输层,在长连接里依然保持精准和严苛。

准备好了吗?让我们把代码写起来,把逻辑跑起来。


第一部分:告别轮询的“垃圾邮件”时代

首先,我们要明白为什么要引入订阅。在传统的 REST API 或者旧版 tRPC 中,我们最常用的模式是“轮询”。

想象一下,你在一个实时聊天室里,或者在看股票K线图。你会怎么做?你写一个 setInterval,每隔 5 秒钟就向服务器扔一个请求:“嘿,老板,有新消息吗?”“老板,跌了吗?”“老板,涨了吗?”

这简直是网络层面的垃圾邮件轰炸。服务器很累,你的电费也在流泪,而用户端还在那个“老掉牙”的 loading 状态里瑟瑟发抖。延迟?那是肯定有的。

现在,tRPC 提供了 subscriptions。简单来说,就是服务器主动推送

在 tRPC 的世界里,订阅不仅仅是一个功能,它是对类型安全的一次挑战和胜利。我们要做的,就是在客户端建立一条通往服务器的“专线”,利用 React 的 useSubscription 钩子,让数据像流水一样流进我们的组件里。


第二部分:搭建舞台 —— tRPC + Socket.io

虽然 tRPC 本身是协议无关的,但它通常需要配合一个传输层来承载长连接。在这个教程里,我们选择 Socket.io,因为它在 Node.js 生态里最顺手,而且自带重连、心跳检测这些贴心功能(像极了那个不离不弃的备胎)。

1. 安装依赖

首先,我们需要一些工具包。不要在 package.json 里瞎填,听我的,按这个顺序来:

npm install @trpc/server @trpc/client @trpc/react @trpc/serveradapter socket.io
npm install -D @types/node socket.io-client

2. 服务端:定义 Router 和 Context

服务端是我们的“哨兵”。它不仅要处理普通的 CRUD(增删改查),还要通过 observable 模式来发射数据。

注意:在 tRPC v10/v11 中,router 的结构发生了微调,我们需要确保 contextmiddlewareprocedure 分离开来。

// src/server/trpc/index.ts
import { applyWSSHandler } from "@trpc/server/adapters/ws";
import { createTRPCServer } from "./server"; // 我们稍后会创建这个
import { WebSocketServer } from "ws";
import http from "http";

// 1. 创建 HTTP 服务器(Socket.io 需要)
const server = http.createServer();
const wss = new WebSocketServer({ server });

// 2. 创建 tRPC 服务器实例
const appRouter = createTRPCServer();

// 3. 应用 WebSocket 处理器
applyWSSHandler({
  wss,
  router: appRouter,
});

server.listen(3000, () => {
  console.log("服务器正在运行在 http://localhost:3000");
});

现在,我们来看核心部分:createTRPCServer

// src/server/trpc/server.ts
import { initTRPC, TRPCError } from "@trpc/server";
import * as trpcWebSocket from "@trpc/server/adapters/ws"; // 引入 WebSocket 适配器
import { z } from "zod";

// 上下文:tRPC 每次调用都会携带的信息
export interface Context {
  // 在这里你可以放入 session, db, headers 等
  userId: string;
}

// 初始化 tRPC
const t = initTRPC.context<Context>().create();

// 中间件:确保用户登录
const isAuthed = t.middleware(({ ctx, next }) => {
  if (!ctx.userId) {
    throw new TRPCError({ code: "UNAUTHORIZED" });
  }
  return next({ ctx: { ...ctx, userId: ctx.userId } });
});

// 路由定义
const appRouter = t.router({
  // 这是一个普通的 procedure
  hello: t.procedure
    .input(z.object({ name: z.string() }))
    .query(({ input }) => {
      return { greeting: `Hello, ${input.name}` };
    }),

  // --- 核心部分来了:Subscription ---
  onNewMessage: t.procedure
    .use(isAuthed) // 订阅也需要鉴权
    .subscription(() => {
      return {
        // 这是一个工厂函数,返回 Observable
        // 核心思想:什么时候发射数据?怎么发射数据?

        // 假设我们有一个全局的消息存储(真实场景下可能是数据库)
        // 为了演示,我们创建一个“假”的 observable
        async* onData() {
          // 这里可以写业务逻辑
          // 比如:监听数据库的 Change Streams,或者 Redis 的 Pub/Sub

          // 模拟每秒推送一条消息
          let count = 0;
          const interval = setInterval(() => {
            count++;
            yield {
              id: count,
              text: `这是第 ${count} 条实时消息`,
              timestamp: new Date(),
            };
          }, 1000);

          // 5秒后停止,模拟一个会话
          setTimeout(() => {
            clearInterval(interval);
            // 这是一个特殊的结束信号
            yield { done: true };
          }, 5000);
        },
      };
    }),
});

export type AppRouter = typeof appRouter;
export default appRouter;

关键点解析
看第 81 行,subscription 返回了一个对象,里面有一个 onData 方法,它是一个 Async Generator(异步生成器)。这非常关键!yield 就是发射数据。tRPC 会自动将这个 Async Generator 包装成一个 Observable,然后通过 WebSocket 发送给客户端。


第三部分:客户端—— 连上专线

客户端的代码同样重要。我们需要告诉 tRPC:“嘿,别用 fetch 了,用 WebSocket。”

1. 设置 Socket.io 客户端连接

我们需要先创建一个 socket.io-client 的实例,然后把这个实例传给 tRPC 的客户端创建函数。

// src/client/trpc.ts
import { createTRPCClient, httpBatchLink, wsLink } from "@trpc/client";
import { createTRPCReact } from "@trpc/react";
import type { AppRouter } from "./server";
import { io } from "socket.io-client";

// 1. 创建 Socket.io 实例
const socket = io("ws://localhost:3000", {
  transports: ["websocket"], // 强制使用 WebSocket
});

// 2. 创建 tRPC 客户端
// 注意:这里我们需要配置链接,分别处理长连接和普通请求
const client = createTRPCClient<AppRouter>({
  links: [
    wsLink({
      socket, // 传入 socket 实例
    }),
    httpBatchLink({
      url: "http://localhost:3000",
    }),
  ],
});

// 3. 创建 React Hooks
export const trpc = createTRPCReact<AppRouter>();

2. 类型体操:trpc.client.subscription

这里有一个神奇的地方。注意看第 33 行,我们没有写 trpc.onNewMessage,而是写了 trpc.client.subscription.onNewMessage

这是因为 tRPC v10 的设计理念是将“订阅”作为客户端代理的一个特殊属性。这允许 tRPC 在类型层面区分“只读”和“实时”接口。

// 类型定义暗示:
// t.client.procedure -> {
//   query: ...
//   mutation: ...
// }
// t.client.subscription -> {
//   mySubscription: ...
// }

第四部分:React 中的魔法 —— useSubscription

现在,我们终于可以把 React 组件拉进来了。tRPC 的 React 集成提供了 useSubscription 这个钩子。它就像是把 WebSocket 的数据流挂载到了 React 的 State 生命周期上。

// src/components/RealTimePanel.tsx
import { trpc } from "../client/trpc";
import { useSubscription } from "@trpc/react";

export const RealTimePanel = () => {
  // 1. 配置订阅
  // input 是可选的,就像普通查询一样,你可以传参进去
  const subscription = trpc.client.subscription.onNewMessage;

  useSubscription(subscription, {
    onData: (data) => {
      // 数据来了!
      console.log("收到了推送的数据:", data);

      // 这里我们可以把数据添加到本地状态数组中
      addMessageToState(data);
    },
    onError: (err) => {
      console.error("订阅出错了:", err);
      // 处理错误,比如断线重连逻辑
    },
    onComplete: () => {
      console.log("连接关闭或订阅结束");
    },
  });

  return (
    <div>
      <h2>实时消息流</h2>
      {/* 这里渲染消息列表 */}
      {messages.map(msg => <div key={msg.id}>{msg.text}</div>)}
    </div>
  );
};

为什么要这样写?

这种方式非常符合 React 的直觉。onData 就像是 useEffect 的回调,但是它更高级——它是一个流式回调。你不需要自己管理 setInterval,也不需要处理重连逻辑,useSubscription 会帮你搞定。


第五部分:深入类型系统—— 为什么它这么爽?

这是我最喜欢的部分。tRPC 的订阅之所以强大,是因为它保持了全栈的类型一致性。

让我们看看服务端定义的类型:

// 服务端:返回结构是 { id, text, timestamp }
yield {
  id: number;
  text: string;
  timestamp: Date;
};

当我们写 trpc.client.subscription.onNewMessage 时,tRPC 会自动推断出返回类型。在 useSubscriptiononData 回调中,TypeScript 会智能提示你:

onData: (data) => {
  // data 自动推导为 { id: number; text: string; timestamp: Date; }
  // 如果你手抖写成了 data.foo,TypeScript 会立刻报错!
  console.log(data.timestamp); // ✅ 自动补全生效
  console.log(data.randomField); // ❌ 编译错误:Property 'randomField' does not exist
}

这就好比你在服务端写文档,客户端自动生成文档并强制执行。这就是“类型分发”。即使数据是乱序到达的、异步加载的,类型系统依然忠实地守护着你的代码。


第六部分:实战场景—— 构建一个“在线协作白板”

光说不练假把式。我们来做一个稍微复杂点的例子:多人协作白板

假设用户 A 在白板上画了一个点,用户 B 必须立刻看到。

1. 服务端逻辑

我们需要一个全局的 whiteboardData(可以用 Redis 或内存 Map)。当服务端检测到数据变化时,通过 subscription 推送给所有订阅了该画板 ID 的用户。

// 伪代码:在 router 中
whiteboard: t.router({
  // 订阅整个画板
  subscribe: t.procedure
    .input(z.object({ boardId: z.string() }))
    .subscription(async function* ({ input }) {
      const board = getBoard(input.boardId);

      // 监听数据库变更
      board.onChange((newData) => {
        // yield 是这里的核心,tRPC 会自动把这个 yield 包装成 WebSocket 消息
        yield newData;
      });
    }),

  // 用户画点
  drawPoint: t.procedure
    .input(z.object({ x: z.number(), y: z.number(), color: z.string() }))
    .mutation(async ({ input }) => {
      const board = getBoard(input.boardId);
      // 保存数据...
      return { success: true };
    }),
}),

2. 客户端逻辑

用户 A 调用 drawPoint,用户 B 的 useSubscription 瞬间收到数据并重绘 Canvas。

const { drawPoint } = trpc.whiteboard.drawPoint.useMutation();
const { subscribe } = trpc.client.subscription.whiteboard.subscribe;

const { data: points, isLoading } = trpc.useSubscription(
  subscribe({ boardId: "board-123" }),
  {
    onData: (point) => {
      // 直接更新 Canvas 状态
      addPointToCanvas(point);
    },
  }
);

const handleCanvasClick = (x, y) => {
  drawPoint({ x, y, color: "red" });
};

这个流程完美吗?并不完美。这里有一个经典的冲突问题

如果用户 B 画了一个点,还没来得及推送到 A(假设网络延迟),A 又画了一个点,A 的数据覆盖了 B 的数据。这就导致了“最终一致性”的问题。

在 tRPC 订阅中,我们通常配合乐观 UI版本号来解决。


第七部分:进阶技巧—— 乐观更新与去抖动

1. 乐观更新

在实时应用中,我们需要一种“感觉”比真实更快的技术。当用户点击“发送”时,我们不要等服务器确认,先把它放到屏幕上,然后通过订阅把服务器的“真实数据”推回来覆盖它。

const sendMessage = trpc.chat.sendMessage.useMutation({
  onMutate: async (newMsg) => {
    // 1. 取消掉所有的后续订阅(防止旧消息插入)
    cancelQueries(['messages']);

    // 2. 保存当前状态(回滚点)
    const previousMessages = queryClient.getQueryData(['messages']);

    // 3. 立即更新本地状态(乐观 UI)
    queryClient.setQueryData(['messages'], (old: any) => 
      [newMsg, ...(old || [])]
    );

    // 4. 返回上下文,以便发生错误时回滚
    return { previousMessages };
  },
  onError: (err, newMsg, context) => {
    // 出错了?回滚!
    queryClient.setQueryData(['messages'], context.previousMessages);
  },
  onSettled: () => {
    // 无论成功失败,都刷新数据,或者如果使用了订阅,订阅会自动补全数据
    queryClient.invalidateQueries(['messages']);
  },
});

2. 防抖与缓冲

客户端可能因为网络抖动收到乱序的数据,或者服务端推送得太快,UI 渲染不出来。

我们可以用 useSubscription 的配置项:

useSubscription(subscription, {
  onData: debounce((data) => {
    // 防抖动处理,防止高频推送导致的重渲染
    updateUI(data);
  }, 100),
});

或者,在服务端使用 RxJS 的 bufferdebounceTime 操作符来处理流。


第八部分:常见陷阱与调试

在使用 tRPC 订阅时,有几个坑很容易踩。

陷阱一:Context 丢失
在订阅中,context(比如用户 ID)是非常重要的。确保你在服务端的 createContext 里正确传递了信息,并且你的订阅中间件正确使用了 use(isAuthed)。如果鉴权失败,Socket 连接可能会断开。

陷阱二:内存泄漏
如果你在一个组件里启动了订阅,却在组件卸载时没有正确清理,浏览器可能会因为 WebSocket 保留的连接而崩溃。
React Query (tRPC 的底层) 有时候会自动处理这个问题,但如果你手动使用了 socket.disconnect(),请务必小心。

陷阱三:TypeScript 类型推断错误
如果你发现 trpc.client.subscriptionundefined,那通常是 tRPC 版本的问题,或者是 createTRPCReact 没有正确导出。


第九部分:未来展望 —— 不仅仅是 WebSocket

tRPC 订阅最酷的地方在于它的抽象性。我们目前用的是 Socket.io,但如果你以后想换成 Pusher,或者原生 WebSockets,你的服务端 tRPC Router 定义代码几乎不需要改动,只需要替换底层的 wsLink 配置即可。

这就意味着,你可以把这套逻辑从 Web 应用移植到移动端(React Native),甚至移植到 Electron 桌面应用中。只要这些环境能跑 Socket.io,你就能享受完全一致的类型安全体验。


总结:为什么你应该现在就拥抱订阅

好,讲了这么多,我们总结一下。

  1. 类型安全是王道:在订阅中,你依然能享受 TS 带来的快乐。你不需要写接口文档,编译器会帮你盯着。
  2. 性能提升:告别轮询。你的服务器压力小了,用户的体验好了,电费也省了。
  3. 架构解耦:你不需要为了实时数据专门写一套 API(REST)和一套实时协议。tRPC 把它们统一了。

tRPC 的订阅模式就像是给 React 的大脑装上了一根神经末梢。它不再是被动地等待指令,而是能主动感知世界的变化。这对于构建现代 Web 应用——从实时协作工具到金融交易系统——都是必不可少的武器。

所以,下次当你写那个 setInterval 的时候,请停下来,看看你的 tRPC 文档,想一想 Socket.io,想一想那个 Async Generator 的 yield。也许你会发现,原来交互还可以这么丝滑。

好了,代码都写好了,Socket 都连上了。现在,打开你的浏览器,去接收来自服务器的第一个推送吧。别客气,这数据可是自带类型的!

谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注