asyncPipe / asyncCompose:异步函数的流水线编排
在现代前端开发中,异步操作无处不在:API 调用、数据库查询、文件读写。当多个异步任务需要按顺序处理数据时,传统的 then 链或嵌套 await 会导致代码冗长、难以维护。
asyncPipe 与 asyncCompose 应运而生——它们是 pipe 的异步版本,专为处理返回 Promise 的函数链而设计。
更进一步,我们还需要控制异步任务的执行模式:并行、串行、并发限制。这些模式共同构成了异步编程的“控制流工具箱”。
asyncPipe:异步函数的左到右流水线
核心语义
asyncPipe(f, g, h)(x) 等价于:
async () => {
const r1 = await f(x)
const r2 = await g(r1)
const r3 = await h(r2)
return r3
}数据从左向右流动,每一步都等待前一步的 Promise 解析。
实现:async/await + reduce
const asyncPipe = <T>(
...fns: Array<(arg: T) => Promise<T> | T>
) => {
return async (value: T): Promise<T> => {
return fns.reduce(async (accPromise, fn) => {
const acc = await accPromise // 等待上一步结果
return fn(acc) // 执行当前函数
}, Promise.resolve(value))
}
}关键点
accPromise是一个Promise,必须await获取其值。- 初始值为
Promise.resolve(value),确保统一处理。 - 返回值为
Promise<T>,调用者需await。
使用示例
const fetchUser = (id: number) => fetch(`/api/users/${id}`).then(res => res.json())
const validateUser = (user: User) => { /* 验证逻辑 */ return user }
const sendWelcomeEmail = (user: User) => fetch('/api/email', { method: 'POST', body: user })
const onUserSignup = asyncPipe(
fetchUser,
validateUser,
sendWelcomeEmail
)
await onUserSignup(123) // 顺序执行三个异步操作asyncCompose:从右到左的异步组合
核心语义
asyncCompose(f, g, h)(x) 等价于 f(g(h(x))),但所有函数异步。
实现:reduceRight
const asyncCompose = <T>(
...fns: Array<(arg: T) => Promise<T> | T>
) => {
return async (value: T): Promise<T> => {
return fns.reduceRight(async (accPromise, fn) => {
const acc = await accPromise
return fn(acc)
}, Promise.resolve(value))
}
}语义与 asyncPipe 相同,仅执行方向相反。推荐优先使用 asyncPipe,因其更符合阅读习惯。
并行执行:parallel 函数
当多个异步任务互不依赖时,应并行执行以提升性能。
核心语义
同时发起所有任务,等待全部完成(类似 Promise.all)。
const parallel = <T>(
...fns: Array<() => Promise<T>>
): (() => Promise<T[]>) => {
return async () => {
return await Promise.all(fns.map(fn => fn()))
}
}使用示例
const fetchUser = () => fetch('/api/user').then(res => res.json())
const fetchPosts = () => fetch('/api/posts').then(res => res.json())
const fetchSettings = () => fetch('/api/settings').then(res => res.json())
const loadDashboard = parallel(fetchUser, fetchPosts, fetchSettings)
const [user, posts, settings] = await loadDashboard()
// 三个请求同时发起,总耗时 ≈ 最慢任务的耗时串行执行:series 函数
当任务有顺序依赖时,必须串行执行。
核心语义
一个接一个执行,前一个完成后再开始下一个。
实现:for...of + await
const series = <T>(
...fns: Array<() => Promise<T>>
): (() => Promise<T[]>) => {
return async () => {
const results: T[] = []
for (const fn of fns) {
results.push(await fn())
}
return results
}
}对比 parallel
parallel:总时间 ≈max(task1, task2, task3)series:总时间 ≈sum(task1, task2, task3)
选择依据:任务间是否有数据依赖。
并发控制:concurrency 限制同时执行的任务数
parallel 的问题是:若任务过多(如 1000 个 API 请求),会瞬间耗尽资源(TCP 连接、内存)。
解法:并发控制(Concurrency Control),限制同时运行的任务数。
核心思想:“任务池”模式
维护一个“池”,最多允许 n 个任务同时运行。当有任务完成,立即从队列中取出新任务补上。
实现
const withConcurrency = <T>(
concurrency: number,
tasks: Array<() => Promise<T>>
): (() => Promise<T[]>) => {
return async () => {
const results: T[] = []
const executing: Array<Promise<unknown>> = []
for (const [i, task] of tasks.entries()) {
const promise = task().then(result => {
results[i] = result
})
executing.push(promise)
// 限制并发数
if (executing.length >= concurrency) {
await Promise.race(executing) // 等待任意一个完成
executing.splice(executing.indexOf(promise), 1)
}
}
// 等待剩余任务完成
await Promise.all(executing)
return results
}
}关键点
executing数组跟踪正在运行的任务。- 当数量达到
concurrency,使用Promise.race等待任意一个任务完成,然后移除它。 - 最后用
Promise.all确保所有任务结束。
使用示例
const tasks = Array(100).fill(0).map((_, i) =>
() => fetch(`/api/data/${i}`).then(res => res.json())
)
const limitedFetch = withConcurrency(5, tasks) // 最多 5 个并发
const results = await limitedFetch()这能有效防止请求洪水,保护服务端和客户端。
错误处理:异步流水线的容错
异步流水线中的错误会中断执行。需根据场景决定处理策略。
默认行为:异常中断
const asyncPipe = /* ... */
try {
await asyncPipe(fn1, fn2, fn3)(input)
} catch (error) {
// 捕获第一个失败的任务的错误
}解法:包裹函数返回 Result
const safe = <T, E = Error>(
fn: () => Promise<T>
): (() => Promise<{ success: true; data: T } | { success: false; error: E }>) => {
return async () => {
try {
const data = await fn()
return { success: true, data }
} catch (error) {
return { success: false, error: error as E }
}
}
}将可能失败的函数用 safe 包裹,错误变为值,流水线可继续执行。
结语:异步控制流是现代应用的基石
asyncPipe、parallel、series、withConcurrency 共同构成了异步编程的“四象限”:
| 模式 | 适用场景 |
|---|---|
asyncPipe | 数据转换流水线(A → B → C) |
parallel | 无依赖的批量任务(A, B, C 同时) |
series | 有顺序依赖的任务链(A → B → C) |
concurrency | 大量任务的资源受限执行 |
掌握这些模式,意味着你能:
- 优化性能:并行减少等待时间。
- 保障稳定性:并发控制防止雪崩。
- 提升可读性:声明式代码替代嵌套回调。
当你用 asyncPipe 编排用户注册流程,或用 withConcurrency 安全地批量处理文件时,你已不仅是写代码,更是在设计数据的流动与系统的节奏——而这,正是高级前端工程师的核心能力。