跳至主要内容
版本:11.x

订阅 / WebSockets

使用订阅

提示

添加订阅过程

server/router.ts
tsx
import { EventEmitter } from 'events';
import { initTRPC } from '@trpc/server';
import { observable } from '@trpc/server/observable';
import { z } from 'zod';
// Global event emitter shared by every procedure in this module
// (could be replaced by redis, etc).
const ee = new EventEmitter();
const t = initTRPC.create();

/** Validation schema for the input accepted by the `add` mutation. */
const postSchema = z.object({
  id: z.string().uuid().optional(),
  text: z.string().min(1),
});

/**
 * Payload carried by the `add` event and pushed to `onAdd` subscribers.
 * Derived from `postSchema` so the type and the validator cannot drift apart.
 */
type Post = z.infer<typeof postSchema>;

export const appRouter = t.router({
  /** Subscription that forwards every post emitted on the `add` event. */
  onAdd: t.procedure.subscription(() => {
    // return an `observable` with a callback which is triggered immediately
    return observable<Post>((emit) => {
      const onAdd = (data: Post) => {
        // emit data to client
        emit.next(data);
      };
      // trigger `onAdd()` when `add` is triggered in our event emitter
      ee.on('add', onAdd);
      // unsubscribe function when client disconnects or stops subscribing;
      // removing the listener here keeps `ee` from accumulating dead handlers
      return () => {
        ee.off('add', onAdd);
      };
    });
  }),
  /** Mutation that copies the validated input, emits it on `add`, and returns it. */
  add: t.procedure.input(postSchema).mutation(async (opts) => {
    const post = { ...opts.input }; /* [..] add to db */
    ee.emit('add', post);
    return post;
  }),
});
server/router.ts
tsx
import { EventEmitter } from 'events';
import { initTRPC } from '@trpc/server';
import { observable } from '@trpc/server/observable';
import { z } from 'zod';
// Global event emitter shared by every procedure in this module
// (could be replaced by redis, etc).
const ee = new EventEmitter();
const t = initTRPC.create();

/** Validation schema for the input accepted by the `add` mutation. */
const postSchema = z.object({
  id: z.string().uuid().optional(),
  text: z.string().min(1),
});

/**
 * Payload carried by the `add` event and pushed to `onAdd` subscribers.
 * Derived from `postSchema` so the type and the validator cannot drift apart.
 */
type Post = z.infer<typeof postSchema>;

export const appRouter = t.router({
  /** Subscription that forwards every post emitted on the `add` event. */
  onAdd: t.procedure.subscription(() => {
    // return an `observable` with a callback which is triggered immediately
    return observable<Post>((emit) => {
      const onAdd = (data: Post) => {
        // emit data to client
        emit.next(data);
      };
      // trigger `onAdd()` when `add` is triggered in our event emitter
      ee.on('add', onAdd);
      // unsubscribe function when client disconnects or stops subscribing;
      // removing the listener here keeps `ee` from accumulating dead handlers
      return () => {
        ee.off('add', onAdd);
      };
    });
  }),
  /** Mutation that copies the validated input, emits it on `add`, and returns it. */
  add: t.procedure.input(postSchema).mutation(async (opts) => {
    const post = { ...opts.input }; /* [..] add to db */
    ee.emit('add', post);
    return post;
  }),
});

创建 WebSocket 服务器

bash
yarn add ws
bash
yarn add ws
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import ws from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';
// Stand-alone WebSocket server listening on port 3001.
const wss = new ws.Server({ port: 3001 });

// Attach the tRPC handler to `wss`, using `appRouter` and `createContext`.
const handler = applyWSSHandler({ wss, router: appRouter, createContext });

// Log the current client count whenever a socket connects or closes.
// The parameter is named `socket` so it does not shadow the imported `ws` module.
wss.on('connection', (socket) => {
  console.log(`➕➕ Connection (${wss.clients.size})`);
  socket.once('close', () => {
    console.log(`➖➖ Connection (${wss.clients.size})`);
  });
});

console.log('✅ WebSocket Server listening on ws://localhost:3001');

/** SIGTERM handler: broadcast the reconnect notification, then close the server. */
function shutdown(): void {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
}

process.on('SIGTERM', shutdown);
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import ws from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';
// Stand-alone WebSocket server listening on port 3001.
const wss = new ws.Server({ port: 3001 });

// Attach the tRPC handler to `wss`, using `appRouter` and `createContext`.
const handler = applyWSSHandler({ wss, router: appRouter, createContext });

// Log the current client count whenever a socket connects or closes.
// The parameter is named `socket` so it does not shadow the imported `ws` module.
wss.on('connection', (socket) => {
  console.log(`➕➕ Connection (${wss.clients.size})`);
  socket.once('close', () => {
    console.log(`➖➖ Connection (${wss.clients.size})`);
  });
});

console.log('✅ WebSocket Server listening on ws://localhost:3001');

/** SIGTERM handler: broadcast the reconnect notification, then close the server. */
function shutdown(): void {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
}

process.on('SIGTERM', shutdown);

设置 TRPCClient 以使用 WebSockets

提示

您可以使用链接(Links)将查询和/或变更(mutation)路由到 HTTP 传输,并将订阅路由到 WebSockets。

client.ts
tsx
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';
// Open a single persistent WebSocket connection to the server.
const wsClient = createWSClient({ url: `ws://localhost:3001` });

// Build the tRPC client with `wsLink` as its only link, backed by `wsClient`.
const client = createTRPCClient<AppRouter>({
  links: [wsLink({ client: wsClient })],
});
client.ts
tsx
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';
// Open a single persistent WebSocket connection to the server.
const wsClient = createWSClient({ url: `ws://localhost:3001` });

// Build the tRPC client with `wsLink` as its only link, backed by `wsClient`.
const client = createTRPCClient<AppRouter>({
  links: [wsLink({ client: wsClient })],
});

使用 React

/examples/next-prisma-starter-websockets

WebSockets RPC 规范

您可以通过深入研究 TypeScript 定义来了解更多详细信息

query / mutation

请求

ts
{
id: number | string;
jsonrpc?: '2.0'; // optional
method: 'query' | 'mutation';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}
ts
{
id: number | string;
jsonrpc?: '2.0'; // optional
method: 'query' | 'mutation';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}

响应

……返回以下结构,或返回错误。

ts
{
id: number | string;
jsonrpc?: '2.0'; // only defined if included in request
result: {
type: 'data'; // always 'data' for mutation / queries
data: TOutput; // output from procedure
}
}
ts
{
id: number | string;
jsonrpc?: '2.0'; // only defined if included in request
result: {
type: 'data'; // always 'data' for mutation / queries
data: TOutput; // output from procedure
}
}

subscription / subscription.stop

启动订阅

ts
{
id: number | string;
jsonrpc?: '2.0';
method: 'subscription';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}
ts
{
id: number | string;
jsonrpc?: '2.0';
method: 'subscription';
params: {
path: string;
input?: unknown; // <-- pass input of procedure, serialized by transformer
};
}

要取消订阅,请调用 subscription.stop

ts
{
id: number | string; // <-- id of your created subscription
jsonrpc?: '2.0';
method: 'subscription.stop';
}
ts
{
id: number | string; // <-- id of your created subscription
jsonrpc?: '2.0';
method: 'subscription.stop';
}

订阅响应形状

……返回以下结构,或返回错误。

ts
{
id: number | string;
jsonrpc?: '2.0';
result: (
| {
type: 'data';
data: TData; // subscription emitted data
}
| {
type: 'started'; // subscription started
}
| {
type: 'stopped'; // subscription stopped
}
)
}
ts
{
id: number | string;
jsonrpc?: '2.0';
result: (
| {
type: 'data';
data: TData; // subscription emitted data
}
| {
type: 'started'; // subscription started
}
| {
type: 'stopped'; // subscription stopped
}
)
}

错误

请参阅 https://www.jsonrpc.org/specification#error_object 或“错误格式”。

从服务器到客户端的通知

{ id: null, type: 'reconnect' }

通知客户端在服务器关闭之前重新连接。通过调用 wssHandler.broadcastReconnectNotification() 触发。