跳至主要内容

createListenerMiddleware

概述

一个 Redux 中间件,允许您定义包含“效果”回调的“监听器”条目,该回调包含其他逻辑,以及一种根据分派的 Action 或状态更改指定该回调何时运行的方法。

它旨在成为更广泛使用的 Redux 异步中间件(如 sagas 和 observables)的轻量级替代方案。虽然与 thunk 的复杂程度和概念相似,但它可以用来复制一些常见的 saga 使用模式。

从概念上讲,您可以将其视为类似于 React 的 useEffect 钩子,只是它在响应 Redux 商店更新而不是组件 props/state 更新时运行逻辑。

监听器效果回调可以访问 dispatchgetState,类似于 thunk。监听器还接收一组异步工作流函数,如 takeconditionpauseforkunsubscribe,这些函数允许编写更复杂的异步逻辑。

监听器可以在设置期间通过调用 `listenerMiddleware.startListening()` 静态定义,或者在运行时使用特殊的 `dispatch(addListener())` 和 `dispatch(removeListener())` 操作动态添加和删除。

基本用法

import { configureStore, createListenerMiddleware } from '@reduxjs/toolkit'

import todosReducer, {
todoAdded,
todoToggled,
todoDeleted,
} from '../features/todos/todosSlice'

// Create the middleware instance and methods
const listenerMiddleware = createListenerMiddleware()

// Add one or more listener entries that look for specific actions.
// They may contain any sync or async logic, similar to thunks.
listenerMiddleware.startListening({
actionCreator: todoAdded,
effect: async (action, listenerApi) => {
// Run whatever additional side-effect-y logic you want here
console.log('Todo added: ', action.payload.text)

// Can cancel other running instances
listenerApi.cancelActiveListeners()

// Run async logic
const data = await fetchData()

// Pause until action dispatched or state changed
if (await listenerApi.condition(matchSomeAction)) {
// Use the listener API methods to dispatch, get state,
// unsubscribe the listener, start child tasks, and more
listenerApi.dispatch(todoAdded('Buy pet food'))

// Spawn "child tasks" that can do more work and return results
const task = listenerApi.fork(async (forkApi) => {
// Can pause execution
await forkApi.delay(5)
// Complete the child by returning a value
return 42
})

const result = await task.result
// Unwrap the child result in the listener
if (result.status === 'ok') {
// Logs the `42` result value that was returned
console.log('Child succeeded: ', result.value)
}
}
},
})

const store = configureStore({
reducer: {
todos: todosReducer,
},
// Add the listener middleware to the store.
// NOTE: Since this can receive actions with functions inside,
// it should go before the serializability check middleware
middleware: (getDefaultMiddleware) =>
getDefaultMiddleware().prepend(listenerMiddleware.middleware),
})

createListenerMiddleware

创建一个中间件实例,然后应该通过 `configureStore` 的 `middleware` 参数添加到存储中。

const createListenerMiddleware = (options?: CreateMiddlewareOptions) =>
ListenerMiddlewareInstance

interface CreateListenerMiddlewareOptions<ExtraArgument = unknown> {
extra?: ExtraArgument
onError?: ListenerErrorHandler
}

type ListenerErrorHandler = (
error: unknown,
errorInfo: ListenerErrorInfo,
) => void

interface ListenerErrorInfo {
raisedBy: 'effect' | 'predicate'
}

中间件选项

  • extra:一个可选的“额外参数”,它将被注入到每个监听器的 `listenerApi` 参数中。等同于 Redux Thunk 中间件中的“额外参数”
  • onError:一个可选的错误处理程序,它将被调用,并带有 `listener` 引发的同步和异步错误,以及 `predicate` 抛出的同步错误。

监听器中间件实例

从 `createListenerMiddleware` 返回的“监听器中间件实例”是一个类似于 `createSlice` 生成的“切片”对象的物体。该实例对象 *不是* 实际的 Redux 中间件本身。相反,它包含中间件和一些用于在中间件中添加和删除监听器条目的实例方法。

interface ListenerMiddlewareInstance<
State = unknown,
Dispatch extends ThunkDispatch<State, unknown, UnknownAction> = ThunkDispatch<
State,
unknown,
UnknownAction
>,
ExtraArgument = unknown,
> {
middleware: ListenerMiddleware<State, Dispatch, ExtraArgument>
startListening: (options: AddListenerOptions) => Unsubscribe
stopListening: (
options: AddListenerOptions & UnsubscribeListenerOptions,
) => boolean
clearListeners: () => void
}

middleware

实际的 Redux 中间件。通过 `configureStore.middleware` 选项 将其添加到 Redux 存储中。

由于监听器中间件可以接收包含函数的“添加”和“删除”操作,因此通常应将其作为链中的第一个中间件添加,以便它位于可序列化检查中间件之前。

const store = configureStore({
reducer: {
todos: todosReducer,
},
// Add the listener middleware to the store.
// NOTE: Since this can receive actions with functions inside,
// it should go before the serializability check middleware
middleware: (getDefaultMiddleware) =>
getDefaultMiddleware().prepend(listenerMiddleware.middleware),
})

startListening

向中间件添加一个新的监听器条目。通常用于在应用程序设置期间“静态”添加新的监听器。

const startListening = (options: AddListenerOptions) => UnsubscribeListener

interface AddListenerOptions {
// Four options for deciding when the listener will run:

// 1) Exact action type string match
type?: string

// 2) Exact action type match based on the RTK action creator
actionCreator?: ActionCreator

// 3) Match one of many actions using an RTK matcher
matcher?: Matcher

// 4) Return true based on a combination of action + state
predicate?: ListenerPredicate

// The actual callback to run when the action is matched
effect: (action: Action, listenerApi: ListenerApi) => void | Promise<void>
}

type ListenerPredicate<Action extends ReduxAction, State> = (
action: Action,
currentState?: State,
originalState?: State,
) => boolean

type UnsubscribeListener = (
unsubscribeOptions?: UnsubscribeListenerOptions,
) => void

interface UnsubscribeListenerOptions {
cancelActive?: true
}

您必须提供四个选项中的 *一个* 来决定监听器何时运行:typeactionCreatormatcherpredicate。每次调度操作时,将检查每个监听器,以查看它是否应根据当前操作与提供的比较选项进行比较而运行。

这些都是可以接受的

// 1) Action type string
listenerMiddleware.startListening({ type: 'todos/todoAdded', effect })
// 2) RTK action creator
listenerMiddleware.startListening({ actionCreator: todoAdded, effect })
// 3) RTK matcher function
listenerMiddleware.startListening({
matcher: isAnyOf(todoAdded, todoToggled),
effect,
})
// 4) Listener predicate
listenerMiddleware.startListening({
predicate: (action, currentState, previousState) => {
// return true when the listener should run
},
effect,
})

请注意,predicate 选项实际上允许仅针对与状态相关的检查进行匹配,例如“state.x 是否已更改”或“state.x 的当前值是否与某些条件匹配”,而与实际操作无关。

RTK 中包含的 “匹配器”实用程序函数 可以作为 matcherpredicate 选项。

返回值是一个 unsubscribe() 回调函数,它将删除此监听器。默认情况下,取消订阅 *不会* 取消监听器的任何活动实例。但是,您也可以传入 {cancelActive: true} 来取消正在运行的实例。

如果您尝试添加一个监听器条目,但另一个具有相同函数引用的条目已经存在,则不会添加新的条目,并且将返回现有的 unsubscribe 方法。

effect 回调函数将接收当前操作作为其第一个参数,以及一个类似于 createAsyncThunk 中的“thunk API”对象的“监听器 API”对象。

所有监听器谓词和回调函数在根 reducer 已经处理完操作并更新了状态之后才会被检查。listenerApi.getOriginalState() 方法可用于获取在处理触发此监听器的操作之前存在的 state 值。

stopListening

移除给定的监听器条目。

它接受与 startListening() 相同的参数。它通过比较 listener 的函数引用和提供的 actionCreator/matcher/predicate 函数或 type 字符串来检查是否存在监听器条目。

默认情况下,这不会取消任何正在运行的实例。但是,您也可以传入 {cancelActive: true} 来取消正在运行的实例。

const stopListening = (
options: AddListenerOptions & UnsubscribeListenerOptions,
) => boolean

interface UnsubscribeListenerOptions {
cancelActive?: true
}

如果监听器条目已被移除,则返回 true;如果未找到与提供的输入匹配的订阅,则返回 false

// Examples:
// 1) Action type string
listenerMiddleware.stopListening({
type: 'todos/todoAdded',
listener,
cancelActive: true,
})
// 2) RTK action creator
listenerMiddleware.stopListening({ actionCreator: todoAdded, effect })
// 3) RTK matcher function
listenerMiddleware.stopListening({ matcher, effect, cancelActive: true })
// 4) Listener predicate
listenerMiddleware.stopListening({ predicate, effect })

clearListeners

移除所有当前的监听器条目。它还会取消所有这些监听器的正在运行的实例。

这在测试场景中可能非常有用,在测试场景中,单个中间件或存储实例可能在多个测试中使用,以及一些应用程序清理情况。

const clearListeners = () => void;

操作创建者

除了通过直接调用监听器实例上的方法来添加和移除监听器之外,您还可以通过分发特殊的“添加”和“移除”操作来在运行时动态添加和移除监听器。这些操作从主 RTK 包中导出,作为标准的 RTK 生成的操作创建者。

addListener

一个标准的 RTK 操作创建者,从包中导入。分发此操作会告诉中间件在运行时动态添加新的监听器。它接受与 startListening() 完全相同的选项。

分发此操作会从 dispatch 返回一个 unsubscribe() 回调函数。

// Per above, provide `predicate` or any of the other comparison options
const unsubscribe = store.dispatch(addListener({ predicate, effect }))

removeListener

一个标准的 RTK action creator,从包中导入。分发此 action 会告诉中间件在运行时动态删除监听器。接受与 stopListening() 相同的参数。

默认情况下,这不会取消任何正在运行的实例。但是,您也可以传入 {cancelActive: true} 来取消正在运行的实例。

如果监听器条目已删除,则返回 true,如果未找到与提供的输入匹配的订阅,则返回 false

const wasRemoved = store.dispatch(
removeListener({ predicate, effect, cancelActive: true }),
)

clearAllListeners

一个标准的 RTK action creator,从包中导入。分发此 action 会告诉中间件删除所有当前监听器条目。它还会取消所有这些监听器的活动运行实例。

store.dispatch(clearAllListeners())

监听器 API

listenerApi 对象是每个监听器回调的第二个参数。它包含几个可以在监听器逻辑中的任何地方调用的实用程序函数。

export interface ListenerEffectAPI<
State,
Dispatch extends ReduxDispatch<UnknownAction>,
ExtraArgument = unknown,
> extends MiddlewareAPI<Dispatch, State> {
// NOTE: MiddlewareAPI contains `dispatch` and `getState` already

/**
* Returns the store state as it existed when the action was originally dispatched, _before_ the reducers ran.
* This function can **only** be invoked **synchronously**, it throws error otherwise.
*/
getOriginalState: () => State
/**
* Removes the listener entry from the middleware and prevent future instances of the listener from running.
* It does **not** cancel any active instances.
*/
unsubscribe(): void
/**
* It will subscribe a listener if it was previously removed, noop otherwise.
*/
subscribe(): void
/**
* Returns a promise that resolves when the input predicate returns `true` or
* rejects if the listener has been cancelled or is completed.
*
* The return value is `true` if the predicate succeeds or `false` if a timeout is provided and expires first.
*/
condition: ConditionFunction<State>
/**
* Returns a promise that resolves when the input predicate returns `true` or
* rejects if the listener has been cancelled or is completed.
*
* The return value is the `[action, currentState, previousState]` combination that the predicate saw as arguments.
*
* The promise resolves to null if a timeout is provided and expires first.
*/
take: TakePattern<State>
/**
* Cancels all other running instances of this same listener except for the one that made this call.
*/
cancelActiveListeners: () => void
/**
* Cancels the listener instance that made this call.
*/
cancel: () => void
/**
* Throws a `TaskAbortError` if this listener has been cancelled
*/
throwIfCancelled: () => void
/**
* An abort signal whose `aborted` property is set to `true`
* if the listener execution is either aborted or completed.
* @see https://mdn.org.cn/en-US/docs/Web/API/AbortSignal
*/
signal: AbortSignal
/**
* Returns a promise that resolves after `timeoutMs` or
* rejects if the listener has been cancelled or is completed.
*/
delay(timeoutMs: number): Promise<void>
/**
* Queues in the next microtask the execution of a task.
*/
fork<T>(executor: ForkedTaskExecutor<T>): ForkedTask<T>
/**
* Returns a promise that resolves when `waitFor` resolves or
* rejects if the listener has been cancelled or is completed.
* @param promise
*/
pause<M>(promise: Promise<M>): Promise<M>
extra: ExtraArgument
}

这些可以分为几个类别。

存储交互方法

  • dispatch: Dispatch:标准的 store.dispatch 方法
  • getState: () => State:标准的 store.getState 方法
  • getOriginalState: () => State:返回 action 最初分发时存在的存储状态, reducer 运行之前。(注意:此方法只能在初始分发调用堆栈期间同步调用,以避免内存泄漏。异步调用它将抛出错误。)
  • extra: unknown:如果存在,作为中间件设置的一部分提供的“额外参数”

dispatchgetState 与 thunk 中的完全相同。getOriginalState 可用于比较监听器启动之前的原始状态。

extra 可用于在创建时将 API 服务层等值注入中间件,并且可以在此处访问。

监听器订阅管理

  • unsubscribe: () => void:从中间件中删除监听器条目,并防止监听器的未来实例运行。(这不会取消任何活动实例。)
  • subscribe: () => void:如果监听器条目以前被删除,则会重新订阅它,或者如果当前已订阅,则执行无操作
  • cancelActiveListeners: () => void: 取消所有其他正在运行的相同监听器实例,除了发出此调用的实例。(如果其他实例使用取消感知 API(如 take/cancel/pause/delay)暂停,则取消将产生有意义的效果 - 有关更多详细信息,请参阅“使用”部分中的“取消和任务管理”)。
  • cancel: () => void: 取消发出此调用的监听器实例。
  • throwIfCancelled: () => void: 如果当前监听器实例被取消,则抛出 TaskAbortError
  • signal: AbortSignal: 一个 AbortSignal,如果监听器执行被中止或完成,其 aborted 属性将被设置为 true

动态取消订阅和重新订阅此监听器允许更复杂的异步工作流程,例如通过在监听器开始时调用 listenerApi.unsubscribe() 来避免重复运行实例,或者调用 listenerApi.cancelActiveListeners() 来确保只有最新的实例可以完成。

条件工作流程执行

  • take: (predicate: ListenerPredicate, timeout?: number) => Promise<[Action, State, State] | null>: 返回一个承诺,当 predicate 返回 true 时,该承诺将被解决。返回值是 predicate 作为参数看到的 [action, currentState, previousState] 组合。如果提供了 timeout 并且首先过期,则承诺将解析为 null
  • condition: (predicate: ListenerPredicate, timeout?: number) => Promise<boolean>: 与 take 类似,但如果谓词成功则解析为 true,如果提供了 timeout 并且首先过期则解析为 false。这允许异步逻辑暂停并等待某个条件发生,然后再继续。有关使用详细信息,请参阅下面的“编写异步工作流程”。
  • delay: (timeoutMs: number) => Promise<void>: 返回一个取消感知承诺,该承诺在超时后解析,或者如果在过期之前被取消则拒绝。
  • pause: (promise: Promise<T>) => Promise<T>: 接受任何承诺,并返回一个取消感知承诺,该承诺要么使用参数承诺解析,要么如果在解析之前被取消则拒绝。

这些方法提供了根据未来分派的 action 和状态更改编写条件逻辑的能力。两者都接受可选的以毫秒为单位的 timeout

take 解析为 [action, currentState, previousState] 元组或 null(如果超时),而 condition 解析为 true(如果成功)或 false(如果超时)。

take 用于“等待 action 并获取其内容”,而 condition 用于检查,例如 if (await condition(predicate))

这两种方法都支持取消,如果监听器实例在暂停时被取消,它们会抛出 TaskAbortError 错误。

请注意,takecondition 只有在 **下一个动作** 被分发后才会解析。即使它们的谓词对当前状态返回 true,它们也不会立即解析。

子任务

  • fork: (executor: (forkApi: ForkApi) => T | Promise<T>) => ForkedTask<T>: 启动一个“子任务”,可用于完成额外的工作。接受任何同步或异步函数作为参数,并返回一个 {result, cancel} 对象,可用于检查子任务的最终状态和返回值,或在进行中取消它。

子任务可以被启动,并等待收集它们的返回值。提供的 executor 函数将异步调用,并带有包含 {pause, delay, signal}forkApi 对象,允许它暂停或检查取消状态。它还可以使用监听器范围内的 listenerApi

一个例子可能是监听器派生一个包含无限循环的子任务,该循环监听来自服务器的事件。然后,父级使用 listenerApi.condition() 等待“停止”动作,并取消子任务。

任务和结果类型为

interface ForkedTaskAPI {
pause<W>(waitFor: Promise<W>): Promise<W>
delay(timeoutMs: number): Promise<void>
signal: AbortSignal
}

export type TaskResolved<T> = {
readonly status: 'ok'
readonly value: T
}

export type TaskRejected = {
readonly status: 'rejected'
readonly error: unknown
}

export type TaskCancelled = {
readonly status: 'cancelled'
readonly error: TaskAbortError
}

export type TaskResult<Value> =
| TaskResolved<Value>
| TaskRejected
| TaskCancelled

export interface ForkedTask<T> {
result: Promise<TaskResult<T>>
cancel(): void
}

TypeScript 使用

中间件代码完全是 TS 类型化的。但是,startListeningaddListener 函数默认情况下不知道存储的 RootState 类型是什么,因此 getState() 将返回 unknown

为了解决这个问题,中间件提供了类型来定义这些方法的“预类型化”版本,类似于用于定义预类型化 React-Redux 钩子的模式。我们特别建议在与实际 configureStore() 调用不同的文件中创建中间件实例

// listenerMiddleware.ts
import { createListenerMiddleware, addListener } from '@reduxjs/toolkit'
import type { RootState, AppDispatch } from './store'

export const listenerMiddleware = createListenerMiddleware()

export const startAppListening = listenerMiddleware.startListening.withTypes<
RootState,
AppDispatch
>()

export const addAppListener = addListener.withTypes<RootState, AppDispatch>()

然后在您的组件中导入并使用这些预类型化方法。

使用指南

总体目的

此中间件允许您在分发某些操作时运行其他逻辑,作为诸如 sagas 和 observables 之类的中间件的轻量级替代方案,这些中间件既有沉重的运行时捆绑成本,又有很大的概念开销。

此中间件并非旨在处理所有可能的用例。与 thunk 一样,它为您提供了一组基本原语(包括对 dispatchgetState 的访问),并赋予您编写任何同步或异步逻辑的自由。这既是优势(您可以做任何事!),也是劣势(您可以做任何事,没有任何限制!)。

该中间件包含几个异步工作流原语,足以编写等效于许多 Redux-Saga 效果操作符的代码,例如 takeLatesttakeLeadingdebounce,尽管这些方法都没有直接包含在内。(请参阅 监听器中间件测试文件,了解如何编写等效于这些效果的代码示例。)

标准使用模式

最常见的预期用法是“在分派给定操作后运行一些逻辑”。例如,您可以通过查找某些操作并将提取的数据发送到服务器来设置一个简单的分析跟踪器,包括从存储中提取用户详细信息。

listenerMiddleware.startListening({
matcher: isAnyOf(action1, action2, action3),
effect: (action, listenerApi) => {
const user = selectUserDetails(listenerApi.getState())

const { specialData } = action.meta

analyticsApi.trackUsage(action.type, user, specialData)
},
})

但是,predicate 选项还允许在某些状态值发生变化或状态匹配特定条件时触发逻辑。

listenerMiddleware.startListening({
predicate: (action, currentState, previousState) => {
// Trigger logic whenever this field changes
return currentState.counter.value !== previousState.counter.value
},
effect,
})

listenerMiddleware.startListening({
predicate: (action, currentState, previousState) => {
// Trigger logic after every action if this condition is true
return currentState.counter.value > 3
},
effect,
})

您还可以实现一个通用的 API 获取功能,其中 UI 分派一个描述要请求的资源类型的普通操作,中间件会自动获取它并分派一个结果操作。

listenerMiddleware.startListening({
actionCreator: resourceRequested,
effect: async (action, listenerApi) => {
const { name, args } = action.payload
listenerApi.dispatch(resourceLoading())

const res = await serverApi.fetch(`/api/${name}`, ...args)
listenerApi.dispatch(resourceLoaded(res.data))
},
})

(也就是说,我们建议将 RTK Query 用于任何有意义的数据获取行为 - 这主要是一个示例,说明您可以在监听器中做什么。)

listenerApi.unsubscribe 方法可以在任何时候使用,并且会从处理任何未来的操作中移除监听器。例如,您可以通过在主体中无条件地调用 unsubscribe() 来创建一个一次性监听器 - 效果回调将在第一次看到相关操作时运行,然后立即取消订阅,并且永远不会再次运行。(中间件实际上在内部使用这种技术来实现 take/condition 方法)

使用条件编写异步工作流

Saga 和 Observable 的一大优势是它们支持复杂异步工作流,包括根据特定分派的 action 停止和启动行为。然而,它们的缺点是都需要掌握复杂的 API,其中包含许多独特的操作符(saga 的 effect 方法,如 call()fork(),observable 的 RxJS 操作符),并且都会显著增加应用程序包大小。

虽然监听器中间件旨在完全取代 saga 或 observable,但它确实提供了一组精心挑选的 API 来实现长时间运行的异步工作流。

监听器可以使用 listenerApi 中的 conditiontake 方法来等待直到某些 action 被分派或状态检查满足。condition 方法直接受到 Temporal.io 工作流 API 中的 condition 函数 的启发(感谢 @swyx 的建议!),而 take 则受到 Redux-Saga 中的 take effect 的启发。

签名如下

type ConditionFunction<Action extends ReduxAction, State> = (
predicate: ListenerPredicate<Action, State> | (() => boolean),
timeout?: number,
) => Promise<boolean>

type TakeFunction<Action extends ReduxAction, State> = (
predicate: ListenerPredicate<Action, State> | (() => boolean),
timeout?: number,
) => Promise<[Action, State, State] | null>

您可以使用 await condition(somePredicate) 作为一种方法,暂停监听器回调的执行,直到满足某些条件。

predicate 将在每个 action 被 reducer 处理后被调用,并且当条件应该被解析时应该返回 true。(它实际上是一个一次性监听器。)如果提供了 timeout 数字(以毫秒为单位),则如果 predicate 首先返回,则 promise 将解析为 true,或者如果超时过期,则解析为 false。这允许您编写诸如 if (await condition(predicate, timeout)) 之类的比较。

这应该能够编写更长时间运行的工作流,这些工作流具有更复杂的异步逻辑,例如 Redux-Saga 中的“可取消计数器”示例

condition 用法的示例,来自测试套件

test('condition method resolves promise when there is a timeout', async () => {
let finalCount = 0
let listenerStarted = false

listenerMiddleware.startListening({
predicate: (action, currentState: CounterState) => {
return increment.match(action) && currentState.value === 0
},
effect: async (action, listenerApi) => {
listenerStarted = true
// Wait for either the counter to hit 3, or 50ms to elapse
const result = await listenerApi.condition(
(action, currentState: CounterState) => {
return currentState.value === 3
},
50,
)

// In this test, we expect the timeout to happen first
expect(result).toBe(false)
// Save the state for comparison outside the listener
const latestState = listenerApi.getState()
finalCount = latestState.value
},
})

store.dispatch(increment())
// The listener should have started right away
expect(listenerStarted).toBe(true)

store.dispatch(increment())

// If we wait 150ms, the condition timeout will expire first
await delay(150)
// Update the state one more time to confirm the listener isn't checking it
store.dispatch(increment())

// Handled the state update before the delay, but not after
expect(finalCount).toBe(2)
})

取消和任务管理

监听器中间件支持取消正在运行的监听器实例、take/condition/pause/delay 函数和“子任务”,其实现基于 AbortController

listenerApi.pause/delay() 函数提供了一种取消感知的方式,可以让当前监听器休眠。pause() 接受一个 promise,而 delay 接受一个超时值。如果监听器在等待时被取消,则会抛出 TaskAbortError。此外,takecondition 也支持取消中断。

listenerApi.cancelActiveListeners() 将取消正在运行的其他现有实例,而 listenerApi.cancel() 可用于取消当前实例(这可能对从 fork 中取消有用,fork 可能深度嵌套,无法直接抛出 promise 来退出 effect 执行)。listenerAPi.throwIfCancelled() 也可以用来在 effect 执行其他工作时发生取消的情况下退出工作流。

listenerApi.fork() 可用于启动可以执行额外工作的“子任务”。可以等待这些任务以收集其结果。这可能看起来像

listenerMiddleware.startListening({
actionCreator: increment,
effect: async (action, listenerApi) => {
// Spawn a child task and start it immediately
const task = listenerApi.fork(async (forkApi) => {
// Artificially wait a bit inside the child
await forkApi.delay(5)
// Complete the child by returning a value
return 42
})

const result = await task.result
// Unwrap the child result in the listener
if (result.status === 'ok') {
// Logs the `42` result value that was returned
console.log('Child succeeded: ', result.value)
}
},
})

复杂异步工作流

提供的异步工作流原语(cancelActiveListenerscancelunsubscribesubscribetakeconditionpausedelay)可用于实现等效于 Redux-Saga 库中许多更复杂异步工作流功能的行为。这包括诸如 throttledebouncetakeLatesttakeLeadingfork/join 之类的 effect。测试套件中的一些示例

test('debounce / takeLatest', async () => {
// Repeated calls cancel previous ones, no work performed
// until the specified delay elapses without another call
// NOTE: This is also basically identical to `takeLatest`.
// Ref: https://saga.redux.js.cn/docs/api#debouncems-pattern-saga-args
// Ref: https://saga.redux.js.cn/docs/api#takelatestpattern-saga-args

listenerMiddleware.startListening({
actionCreator: increment,
effect: async (action, listenerApi) => {
// Cancel any in-progress instances of this listener
listenerApi.cancelActiveListeners()

// Delay before starting actual work
await listenerApi.delay(15)

// do work here
},
})
}

test('takeLeading', async () => {
// Starts listener on first action, ignores others until task completes
// Ref: https://saga.redux.js.cn/docs/api#takeleadingpattern-saga-args

listenerMiddleware.startListening({
actionCreator: increment,
effect: async (action, listenerApi) => {
listenerCalls++

// Stop listening for this action
listenerApi.unsubscribe()

// Pretend we're doing expensive work

// Re-enable the listener
listenerApi.subscribe()
},
})
})

test('cancelled', async () => {
// cancelled allows checking if the current task was cancelled
// Ref: https://saga.redux.js.cn/docs/api#cancelled

let canceledAndCaught = false
let canceledCheck = false

// Example of canceling prior instances conditionally and checking cancellation
listenerMiddleware.startListening({
matcher: isAnyOf(increment, decrement, incrementByAmount),
effect: async (action, listenerApi) => {
if (increment.match(action)) {
// Have this branch wait around to be cancelled by the other
try {
await listenerApi.delay(10)
} catch (err) {
// Can check cancellation based on the exception and its reason
if (err instanceof TaskAbortError) {
canceledAndCaught = true
}
}
} else if (incrementByAmount.match(action)) {
// do a non-cancellation-aware wait
await delay(15)
if (listenerApi.signal.aborted) {
canceledCheck = true
}
} else if (decrement.match(action)) {
listenerApi.cancelActiveListeners()
}
},
})
})

作为一个更实际的例子:这个基于 saga 的“长轮询”循环 会反复向服务器请求消息,然后处理每个响应。当调度“开始轮询”操作时,子循环按需启动,当调度“停止轮询”操作时,循环被取消。

这种方法可以通过监听器中间件实现

// Track how many times each message was processed by the loop
const receivedMessages = {
a: 0,
b: 0,
c: 0,
}

const eventPollingStarted = createAction('serverPolling/started')
const eventPollingStopped = createAction('serverPolling/stopped')

listenerMiddleware.startListening({
actionCreator: eventPollingStarted,
effect: async (action, listenerApi) => {
// Only allow one instance of this listener to run at a time
listenerApi.unsubscribe()

// Start a child job that will infinitely loop receiving messages
const pollingTask = listenerApi.fork(async (forkApi) => {
try {
while (true) {
// Cancellation-aware pause for a new server message
const serverEvent = await forkApi.pause(pollForEvent())
// Process the message. In this case, just count the times we've seen this message.
if (serverEvent.type in receivedMessages) {
receivedMessages[
serverEvent.type as keyof typeof receivedMessages
]++
}
}
} catch (err) {
if (err instanceof TaskAbortError) {
// could do something here to track that the task was cancelled
}
}
})

// Wait for the "stop polling" action
await listenerApi.condition(eventPollingStopped.match)
pollingTask.cancel()
},
})

在组件内部添加监听器

可以通过 dispatch(addListener()) 在运行时添加监听器。这意味着您可以在任何可以访问 dispatch 的地方添加监听器,包括 React 组件。

由于调度 addListener 会返回一个 unsubscribe 回调函数,因此它自然映射到 React useEffect 钩子的行为,后者允许您返回一个清理函数。您可以在 effect 中添加一个监听器,并在钩子被清理时移除该监听器。

基本模式可能看起来像

useEffect(() => {
// Could also just `return dispatch(addListener())` directly, but showing this
// as a separate variable to be clear on what's happening
const unsubscribe = dispatch(
addListener({
actionCreator: todoAdded,
effect: (action, listenerApi) => {
// do some useful logic here
},
}),
)
return unsubscribe
}, [])

虽然这种模式可行,但我们并不建议这样做! React 和 Redux 社区一直试图强调尽可能地将行为建立在状态之上。让 React 组件直接绑定到 Redux 操作调度管道可能会导致代码库更难维护。

同时,这是一种有效的技术,无论是在 API 行为方面还是在潜在用例方面。将 saga 作为代码拆分应用程序的一部分进行延迟加载一直很常见,这通常需要一些复杂的额外设置工作来“注入”saga。相比之下,dispatch(addListener()) 自然地适合 React 组件的生命周期。

因此,虽然我们没有特别鼓励使用这种模式,但值得在这里记录下来,以便用户了解它作为一种可能性。

在文件中组织监听器

作为起点,最好在单独的文件(例如 app/listenerMiddleware.ts)中创建监听器中间件,而不是与存储在同一个文件中。这避免了其他文件尝试导入 middleware.addListener 时可能出现的任何循环导入问题。

从那里开始,我们已经想出了三种不同的方法来组织监听器函数和设置。

首先,您可以从切片文件导入效果回调到中间件文件,并添加监听器

app/listenerMiddleware.ts
import { action1, listener1 } from '../features/feature1/feature1Slice'
import { action2, listener2 } from '../features/feature2/feature2Slice'

listenerMiddleware.startListening({ actionCreator: action1, effect: listener1 })
listenerMiddleware.startListening({ actionCreator: action2, effect: listener2 })

这可能是最简单的选项,并且反映了商店设置如何将所有切片 reducer 汇集在一起以创建应用程序。

第二个选项是相反的:让切片文件导入中间件并直接添加它们的监听器

features/feature1/feature1Slice.ts
import { listenerMiddleware } from '../../app/listenerMiddleware'

const feature1Slice = createSlice(/* */)
const { action1 } = feature1Slice.actions

export default feature1Slice.reducer

listenerMiddleware.startListening({
actionCreator: action1,
effect: () => {},
})

这将所有逻辑保留在切片中,尽管它确实将设置锁定到单个中间件实例。

第三个选项是在切片中创建一个设置函数,但让监听器文件在启动时调用该函数

features/feature1/feature1Slice.ts
import type { AppStartListening } from '../../app/listenerMiddleware'

const feature1Slice = createSlice(/* */)
const { action1 } = feature1Slice.actions

export default feature1Slice.reducer

export const addFeature1Listeners = (startListening: AppStartListening) => {
startListening({
actionCreator: action1,
effect: () => {},
})
}
app/listenerMiddleware.ts
import { addFeature1Listeners } from '../features/feature1/feature1Slice'

addFeature1Listeners(listenerMiddleware.startListening)

您可以随意使用这些方法中任何最适合您的应用程序的方法。