Usage > Streaming Updates: updating cache from pushed messages">Usage > Streaming Updates: updating cache from pushed messages">
跳至主要内容

流式更新

概述

RTK Query 使您能够接收持久查询的流式更新。这使查询能够建立与服务器的持续连接(通常使用 WebSockets),并在从服务器接收更多信息时将更新应用于缓存数据。

流式更新可用于使 API 接收后端数据的实时更新,例如创建新条目或更新重要属性。

要为查询启用流式更新,请将异步 onCacheEntryAdded 函数传递给查询,其中包含当接收流式数据时如何更新查询的逻辑。有关更多详细信息,请参阅 onCacheEntryAdded API 参考

何时使用流式更新

查询数据的更新主要应通过 轮询 以一定间隔间歇地进行,使用 缓存失效 根据与查询和变异相关的标签使数据失效,或者使用 refetchOnMountOrArgChange 在使用数据的组件挂载时获取最新数据。

但是,流式更新对于涉及以下情况的场景特别有用

  • 对大型对象的少量频繁更改。与其反复轮询大型对象,不如使用初始查询获取该对象,并使用流式更新在收到更新时更新单个属性。
  • 外部事件驱动的更新。如果数据可能由服务器或其他外部用户更改,并且预计实时更新将显示给活动用户,则仅轮询会导致查询之间出现一段时间的数据陈旧,从而导致状态很容易不同步。流式更新可以在更新发生时更新所有活动客户端,而不是等待下一个间隔过去。

从流式更新中受益的示例用例是

  • GraphQL 订阅
  • 实时聊天应用程序
  • 实时多人游戏
  • 具有多个并发用户的协作文档编辑

使用 onCacheEntryAdded 生命周期

onCacheEntryAdded 生命周期回调允许您编写任意异步逻辑,该逻辑将在将新缓存条目添加到 RTK Query 缓存后执行(即,在组件创建对给定端点+参数组合的新订阅后)。

onCacheEntryAdded 将使用两个参数调用:传递给订阅的 arg,以及包含“生命周期承诺”和实用程序函数的选项对象。您可以使用这些来编写顺序逻辑,该逻辑等待数据添加、启动服务器连接、应用部分更新以及在查询订阅被删除时清理连接。

通常,您将 await cacheDataLoaded 来确定何时获取了第一个数据,然后使用 updateCacheData 实用程序在收到消息时应用流式更新。updateCacheData 是一个 Immer 驱动的回调,它接收当前缓存值的 draft。您可以根据收到的值“修改”草稿值以根据需要更新它。RTK Query 然后将调度一个操作,该操作根据这些更改应用一个差异化补丁。

最后,您可以使用 await cacheEntryRemoved 来了解何时清理任何服务器连接。

流式更新示例

WebSocket 聊天 API

import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
id: number
channel: Channel
userName: string
text: string
}

export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query<Message[], Channel>({
query: (channel) => `messages/${channel}`,
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
// create a websocket connection when the cache subscription starts
const ws = new WebSocket('ws://localhost:8080')
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded

// when data is received from the socket connection to the server,
// if it is a message and for the appropriate channel,
// update our query result with the received message
const listener = (event: MessageEvent) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return

updateCachedData((draft) => {
draft.push(data)
})
}

ws.addEventListener('message', listener)
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
ws.close()
},
}),
}),
})

export const { useGetMessagesQuery } = api

预期结果

getMessages 查询被触发时(例如,通过使用 useGetMessagesQuery() 钩子挂载组件),将根据端点的序列化参数添加一个 缓存条目。关联的查询将根据 query 属性被触发,以获取缓存的初始数据。同时,异步 onCacheEntryAdded 回调将开始,并创建一个新的 WebSocket 连接。一旦收到初始查询的响应,缓存将被填充响应数据,并且 cacheDataLoaded promise 将被解析。在等待 cacheDataLoaded promise 后,message 事件监听器将被添加到 WebSocket 连接中,当收到关联的消息时,它会更新缓存数据。

当没有更多对数据的活动订阅时(例如,当订阅的组件在足够长的时间内保持未挂载状态时),cacheEntryRemoved promise 将被解析,允许剩余的代码运行并关闭 websocket 连接。RTK Query 也会从缓存中删除关联的数据。

如果稍后对相应缓存条目的查询运行,它将覆盖整个缓存条目,并且流式更新监听器将继续对更新后的数据进行操作。

具有转换后的响应形状的 WebSocket 聊天 API

import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { createEntityAdapter } from '@reduxjs/toolkit'
import type { EntityState } from '@reduxjs/toolkit'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
id: number
channel: Channel
userName: string
text: string
}

const messagesAdapter = createEntityAdapter<Message>()
export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query<EntityState<Message, number>, Channel>({
query: (channel) => `messages/${channel}`,
transformResponse(response: Message[]) {
return messagesAdapter.addMany(
messagesAdapter.getInitialState(),
response
)
},
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
const ws = new WebSocket('ws://localhost:8080')
try {
await cacheDataLoaded

const listener = (event: MessageEvent) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return

updateCachedData((draft) => {
messagesAdapter.upsertOne(draft, data)
})
}

ws.addEventListener('message', listener)
} catch {}
await cacheEntryRemoved
ws.close()
},
}),
}),
})

export const { useGetMessagesQuery } = api

此示例演示了如何更改 上一个示例 以允许在将数据添加到缓存时转换响应形状。

例如,数据从以下形状转换而来

[
{
id: 0
channel: 'redux'
userName: 'Mark'
text: 'Welcome to #redux!'
},
{
id: 1
channel: 'redux'
userName: 'Lenz'
text: 'Glad to be here!'
},
]

到以下形状

{
// The unique IDs of each item. Must be strings or numbers
ids: [0, 1],
// A lookup table mapping entity IDs to the corresponding entity objects
entities: {
0: {
id: 0,
channel: "redux",
userName: "Mark",
text: "Welcome to #redux!",
},
1: {
id: 1,
channel: "redux",
userName: "Lenz",
text: "Glad to be here!",
},
},
};

需要注意的是,在 onCacheEntryAdded 回调中对缓存数据的更新必须尊重将用于缓存数据的转换后的数据形状。该示例展示了如何使用 createEntityAdapter 进行初始 transformResponse,以及在收到流式更新时将接收到的项目插入到缓存数据中,同时保持规范化的状态结构。