tRPC 订阅模式:当类型系统遇见长连接 —— 一次关于实时数据的“深度调情”
各位前端同仁,大家好!
欢迎来到今天的讲座。我是你们的向导,今天我们要聊的是 tRPC 的一个“隐秘角落”——订阅模式。在开始之前,我想问大家一个问题:你们爱过 HTTP 吗?
当然,HTTP 是个可靠的老伙计,是个勤勤恳恳的邮递员,每天早晚各来一次(GET)和一次(POST)。但你们有没有想过,如果这位邮递员不再来敲门,而是直接从你的窗户爬进来,实时地往你手里塞信,那该多好?或者更糟糕,如果他总是三天两头才来一次,让你在焦急的等待中,对着屏幕上“加载中”的转圈圈怀疑人生,那你就得考虑换个“约会对象”了。
这就是为什么我们需要 tRPC 的订阅功能。今天,我们要把 HTTP 这根“单线联系”升级为 Socket 连接这种“长期饭票”。我们要让 TypeScript 的类型系统,穿过传输层,在长连接里依然保持精准和严苛。
准备好了吗?让我们把代码写起来,把逻辑跑起来。
第一部分:告别轮询的“垃圾邮件”时代
首先,我们要明白为什么要引入订阅。在传统的 REST API 或者旧版 tRPC 中,我们最常用的模式是“轮询”。
想象一下,你在一个实时聊天室里,或者在看股票K线图。你会怎么做?你写一个 setInterval,每隔 5 秒钟就向服务器扔一个请求:“嘿,老板,有新消息吗?”“老板,跌了吗?”“老板,涨了吗?”
这简直是网络层面的垃圾邮件轰炸。服务器很累,你的电费也在流泪,而用户端还在那个“老掉牙”的 loading 状态里瑟瑟发抖。延迟?那是肯定有的。
现在,tRPC 提供了 subscriptions。简单来说,就是服务器主动推送。
在 tRPC 的世界里,订阅不仅仅是一个功能,它是对类型安全的一次挑战和胜利。我们要做的,就是在客户端建立一条通往服务器的“专线”,利用 React 的 useSubscription 钩子,让数据像流水一样流进我们的组件里。
第二部分:搭建舞台 —— tRPC + Socket.io
虽然 tRPC 本身是协议无关的,但它通常需要配合一个传输层来承载长连接。在这个教程里,我们选择 Socket.io,因为它在 Node.js 生态里最顺手,而且自带重连、心跳检测这些贴心功能(像极了那个不离不弃的备胎)。
1. 安装依赖
首先,我们需要一些工具包。不要在 package.json 里瞎填,听我的,按这个顺序来:
npm install @trpc/server @trpc/client @trpc/react @trpc/serveradapter socket.io
npm install -D @types/node socket.io-client
2. 服务端:定义 Router 和 Context
服务端是我们的“哨兵”。它不仅要处理普通的 CRUD(增删改查),还要通过 observable 模式来发射数据。
注意:在 tRPC v10/v11 中,router 的结构发生了微调,我们需要确保 context 和 middleware 与 procedure 分离开来。
// src/server/trpc/index.ts
import { applyWSSHandler } from "@trpc/server/adapters/ws";
import { createTRPCServer } from "./server"; // 我们稍后会创建这个
import { WebSocketServer } from "ws";
import http from "http";
// 1. 创建 HTTP 服务器(Socket.io 需要)
const server = http.createServer();
const wss = new WebSocketServer({ server });
// 2. 创建 tRPC 服务器实例
const appRouter = createTRPCServer();
// 3. 应用 WebSocket 处理器
applyWSSHandler({
wss,
router: appRouter,
});
server.listen(3000, () => {
console.log("服务器正在运行在 http://localhost:3000");
});
现在,我们来看核心部分:createTRPCServer。
// src/server/trpc/server.ts
import { initTRPC, TRPCError } from "@trpc/server";
import * as trpcWebSocket from "@trpc/server/adapters/ws"; // 引入 WebSocket 适配器
import { z } from "zod";
// 上下文:tRPC 每次调用都会携带的信息
export interface Context {
// 在这里你可以放入 session, db, headers 等
userId: string;
}
// 初始化 tRPC
const t = initTRPC.context<Context>().create();
// 中间件:确保用户登录
const isAuthed = t.middleware(({ ctx, next }) => {
if (!ctx.userId) {
throw new TRPCError({ code: "UNAUTHORIZED" });
}
return next({ ctx: { ...ctx, userId: ctx.userId } });
});
// 路由定义
const appRouter = t.router({
// 这是一个普通的 procedure
hello: t.procedure
.input(z.object({ name: z.string() }))
.query(({ input }) => {
return { greeting: `Hello, ${input.name}` };
}),
// --- 核心部分来了:Subscription ---
onNewMessage: t.procedure
.use(isAuthed) // 订阅也需要鉴权
.subscription(() => {
return {
// 这是一个工厂函数,返回 Observable
// 核心思想:什么时候发射数据?怎么发射数据?
// 假设我们有一个全局的消息存储(真实场景下可能是数据库)
// 为了演示,我们创建一个“假”的 observable
async* onData() {
// 这里可以写业务逻辑
// 比如:监听数据库的 Change Streams,或者 Redis 的 Pub/Sub
// 模拟每秒推送一条消息
let count = 0;
const interval = setInterval(() => {
count++;
yield {
id: count,
text: `这是第 ${count} 条实时消息`,
timestamp: new Date(),
};
}, 1000);
// 5秒后停止,模拟一个会话
setTimeout(() => {
clearInterval(interval);
// 这是一个特殊的结束信号
yield { done: true };
}, 5000);
},
};
}),
});
export type AppRouter = typeof appRouter;
export default appRouter;
关键点解析:
看第 81 行,subscription 返回了一个对象,里面有一个 onData 方法,它是一个 Async Generator(异步生成器)。这非常关键!yield 就是发射数据。tRPC 会自动将这个 Async Generator 包装成一个 Observable,然后通过 WebSocket 发送给客户端。
第三部分:客户端—— 连上专线
客户端的代码同样重要。我们需要告诉 tRPC:“嘿,别用 fetch 了,用 WebSocket。”
1. 设置 Socket.io 客户端连接
我们需要先创建一个 socket.io-client 的实例,然后把这个实例传给 tRPC 的客户端创建函数。
// src/client/trpc.ts
import { createTRPCClient, httpBatchLink, wsLink } from "@trpc/client";
import { createTRPCReact } from "@trpc/react";
import type { AppRouter } from "./server";
import { io } from "socket.io-client";
// 1. 创建 Socket.io 实例
const socket = io("ws://localhost:3000", {
transports: ["websocket"], // 强制使用 WebSocket
});
// 2. 创建 tRPC 客户端
// 注意:这里我们需要配置链接,分别处理长连接和普通请求
const client = createTRPCClient<AppRouter>({
links: [
wsLink({
socket, // 传入 socket 实例
}),
httpBatchLink({
url: "http://localhost:3000",
}),
],
});
// 3. 创建 React Hooks
export const trpc = createTRPCReact<AppRouter>();
2. 类型体操:trpc.client.subscription
这里有一个神奇的地方。注意看第 33 行,我们没有写 trpc.onNewMessage,而是写了 trpc.client.subscription.onNewMessage。
这是因为 tRPC v10 的设计理念是将“订阅”作为客户端代理的一个特殊属性。这允许 tRPC 在类型层面区分“只读”和“实时”接口。
// 类型定义暗示:
// t.client.procedure -> {
// query: ...
// mutation: ...
// }
// t.client.subscription -> {
// mySubscription: ...
// }
第四部分:React 中的魔法 —— useSubscription
现在,我们终于可以把 React 组件拉进来了。tRPC 的 React 集成提供了 useSubscription 这个钩子。它就像是把 WebSocket 的数据流挂载到了 React 的 State 生命周期上。
// src/components/RealTimePanel.tsx
import { trpc } from "../client/trpc";
import { useSubscription } from "@trpc/react";
export const RealTimePanel = () => {
// 1. 配置订阅
// input 是可选的,就像普通查询一样,你可以传参进去
const subscription = trpc.client.subscription.onNewMessage;
useSubscription(subscription, {
onData: (data) => {
// 数据来了!
console.log("收到了推送的数据:", data);
// 这里我们可以把数据添加到本地状态数组中
addMessageToState(data);
},
onError: (err) => {
console.error("订阅出错了:", err);
// 处理错误,比如断线重连逻辑
},
onComplete: () => {
console.log("连接关闭或订阅结束");
},
});
return (
<div>
<h2>实时消息流</h2>
{/* 这里渲染消息列表 */}
{messages.map(msg => <div key={msg.id}>{msg.text}</div>)}
</div>
);
};
为什么要这样写?
这种方式非常符合 React 的直觉。onData 就像是 useEffect 的回调,但是它更高级——它是一个流式回调。你不需要自己管理 setInterval,也不需要处理重连逻辑,useSubscription 会帮你搞定。
第五部分:深入类型系统—— 为什么它这么爽?
这是我最喜欢的部分。tRPC 的订阅之所以强大,是因为它保持了全栈的类型一致性。
让我们看看服务端定义的类型:
// 服务端:返回结构是 { id, text, timestamp }
yield {
id: number;
text: string;
timestamp: Date;
};
当我们写 trpc.client.subscription.onNewMessage 时,tRPC 会自动推断出返回类型。在 useSubscription 的 onData 回调中,TypeScript 会智能提示你:
onData: (data) => {
// data 自动推导为 { id: number; text: string; timestamp: Date; }
// 如果你手抖写成了 data.foo,TypeScript 会立刻报错!
console.log(data.timestamp); // ✅ 自动补全生效
console.log(data.randomField); // ❌ 编译错误:Property 'randomField' does not exist
}
这就好比你在服务端写文档,客户端自动生成文档并强制执行。这就是“类型分发”。即使数据是乱序到达的、异步加载的,类型系统依然忠实地守护着你的代码。
第六部分:实战场景—— 构建一个“在线协作白板”
光说不练假把式。我们来做一个稍微复杂点的例子:多人协作白板。
假设用户 A 在白板上画了一个点,用户 B 必须立刻看到。
1. 服务端逻辑
我们需要一个全局的 whiteboardData(可以用 Redis 或内存 Map)。当服务端检测到数据变化时,通过 subscription 推送给所有订阅了该画板 ID 的用户。
// 伪代码:在 router 中
whiteboard: t.router({
// 订阅整个画板
subscribe: t.procedure
.input(z.object({ boardId: z.string() }))
.subscription(async function* ({ input }) {
const board = getBoard(input.boardId);
// 监听数据库变更
board.onChange((newData) => {
// yield 是这里的核心,tRPC 会自动把这个 yield 包装成 WebSocket 消息
yield newData;
});
}),
// 用户画点
drawPoint: t.procedure
.input(z.object({ x: z.number(), y: z.number(), color: z.string() }))
.mutation(async ({ input }) => {
const board = getBoard(input.boardId);
// 保存数据...
return { success: true };
}),
}),
2. 客户端逻辑
用户 A 调用 drawPoint,用户 B 的 useSubscription 瞬间收到数据并重绘 Canvas。
const { drawPoint } = trpc.whiteboard.drawPoint.useMutation();
const { subscribe } = trpc.client.subscription.whiteboard.subscribe;
const { data: points, isLoading } = trpc.useSubscription(
subscribe({ boardId: "board-123" }),
{
onData: (point) => {
// 直接更新 Canvas 状态
addPointToCanvas(point);
},
}
);
const handleCanvasClick = (x, y) => {
drawPoint({ x, y, color: "red" });
};
这个流程完美吗?并不完美。这里有一个经典的冲突问题。
如果用户 B 画了一个点,还没来得及推送到 A(假设网络延迟),A 又画了一个点,A 的数据覆盖了 B 的数据。这就导致了“最终一致性”的问题。
在 tRPC 订阅中,我们通常配合乐观 UI 和版本号来解决。
第七部分:进阶技巧—— 乐观更新与去抖动
1. 乐观更新
在实时应用中,我们需要一种“感觉”比真实更快的技术。当用户点击“发送”时,我们不要等服务器确认,先把它放到屏幕上,然后通过订阅把服务器的“真实数据”推回来覆盖它。
const sendMessage = trpc.chat.sendMessage.useMutation({
onMutate: async (newMsg) => {
// 1. 取消掉所有的后续订阅(防止旧消息插入)
cancelQueries(['messages']);
// 2. 保存当前状态(回滚点)
const previousMessages = queryClient.getQueryData(['messages']);
// 3. 立即更新本地状态(乐观 UI)
queryClient.setQueryData(['messages'], (old: any) =>
[newMsg, ...(old || [])]
);
// 4. 返回上下文,以便发生错误时回滚
return { previousMessages };
},
onError: (err, newMsg, context) => {
// 出错了?回滚!
queryClient.setQueryData(['messages'], context.previousMessages);
},
onSettled: () => {
// 无论成功失败,都刷新数据,或者如果使用了订阅,订阅会自动补全数据
queryClient.invalidateQueries(['messages']);
},
});
2. 防抖与缓冲
客户端可能因为网络抖动收到乱序的数据,或者服务端推送得太快,UI 渲染不出来。
我们可以用 useSubscription 的配置项:
useSubscription(subscription, {
onData: debounce((data) => {
// 防抖动处理,防止高频推送导致的重渲染
updateUI(data);
}, 100),
});
或者,在服务端使用 RxJS 的 buffer 或 debounceTime 操作符来处理流。
第八部分:常见陷阱与调试
在使用 tRPC 订阅时,有几个坑很容易踩。
陷阱一:Context 丢失
在订阅中,context(比如用户 ID)是非常重要的。确保你在服务端的 createContext 里正确传递了信息,并且你的订阅中间件正确使用了 use(isAuthed)。如果鉴权失败,Socket 连接可能会断开。
陷阱二:内存泄漏
如果你在一个组件里启动了订阅,却在组件卸载时没有正确清理,浏览器可能会因为 WebSocket 保留的连接而崩溃。
React Query (tRPC 的底层) 有时候会自动处理这个问题,但如果你手动使用了 socket.disconnect(),请务必小心。
陷阱三:TypeScript 类型推断错误
如果你发现 trpc.client.subscription 是 undefined,那通常是 tRPC 版本的问题,或者是 createTRPCReact 没有正确导出。
第九部分:未来展望 —— 不仅仅是 WebSocket
tRPC 订阅最酷的地方在于它的抽象性。我们目前用的是 Socket.io,但如果你以后想换成 Pusher,或者原生 WebSockets,你的服务端 tRPC Router 定义代码几乎不需要改动,只需要替换底层的 wsLink 配置即可。
这就意味着,你可以把这套逻辑从 Web 应用移植到移动端(React Native),甚至移植到 Electron 桌面应用中。只要这些环境能跑 Socket.io,你就能享受完全一致的类型安全体验。
总结:为什么你应该现在就拥抱订阅
好,讲了这么多,我们总结一下。
- 类型安全是王道:在订阅中,你依然能享受 TS 带来的快乐。你不需要写接口文档,编译器会帮你盯着。
- 性能提升:告别轮询。你的服务器压力小了,用户的体验好了,电费也省了。
- 架构解耦:你不需要为了实时数据专门写一套 API(REST)和一套实时协议。tRPC 把它们统一了。
tRPC 的订阅模式就像是给 React 的大脑装上了一根神经末梢。它不再是被动地等待指令,而是能主动感知世界的变化。这对于构建现代 Web 应用——从实时协作工具到金融交易系统——都是必不可少的武器。
所以,下次当你写那个 setInterval 的时候,请停下来,看看你的 tRPC 文档,想一想 Socket.io,想一想那个 Async Generator 的 yield。也许你会发现,原来交互还可以这么丝滑。
好了,代码都写好了,Socket 都连上了。现在,打开你的浏览器,去接收来自服务器的第一个推送吧。别客气,这数据可是自带类型的!
谢谢大家!