Skip to content

asyncPipe / asyncCompose:异步函数的流水线编排

在现代前端开发中,异步操作无处不在:API 调用、数据库查询、文件读写。当多个异步任务需要按顺序处理数据时,传统的 then 链或嵌套 await 会导致代码冗长、难以维护。

asyncPipeasyncCompose 应运而生——它们是 pipe 的异步版本,专为处理返回 Promise 的函数链而设计。

更进一步,我们还需要控制异步任务的执行模式:并行、串行、并发限制。这些模式共同构成了异步编程的“控制流工具箱”。

asyncPipe:异步函数的左到右流水线

核心语义

asyncPipe(f, g, h)(x) 等价于:

ts
async () => {
  const r1 = await f(x)
  const r2 = await g(r1)
  const r3 = await h(r2)
  return r3
}

数据从左向右流动,每一步都等待前一步的 Promise 解析。

实现:async/await + reduce

ts
const asyncPipe = <T>(
  ...fns: Array<(arg: T) => Promise<T> | T>
) => {
  return async (value: T): Promise<T> => {
    return fns.reduce(async (accPromise, fn) => {
      const acc = await accPromise // 等待上一步结果
      return fn(acc)              // 执行当前函数
    }, Promise.resolve(value))
  }
}

关键点

  • accPromise 是一个 Promise,必须 await 获取其值。
  • 初始值为 Promise.resolve(value),确保统一处理。
  • 返回值为 Promise<T>,调用者需 await

使用示例

ts
const fetchUser = (id: number) => fetch(`/api/users/${id}`).then(res => res.json())
const validateUser = (user: User) => { /* 验证逻辑 */ return user }
const sendWelcomeEmail = (user: User) => fetch('/api/email', { method: 'POST', body: user })

const onUserSignup = asyncPipe(
  fetchUser,
  validateUser,
  sendWelcomeEmail
)

await onUserSignup(123) // 顺序执行三个异步操作

asyncCompose:从右到左的异步组合

核心语义

asyncCompose(f, g, h)(x) 等价于 f(g(h(x))),但所有函数异步。

实现:reduceRight

ts
const asyncCompose = <T>(
  ...fns: Array<(arg: T) => Promise<T> | T>
) => {
  return async (value: T): Promise<T> => {
    return fns.reduceRight(async (accPromise, fn) => {
      const acc = await accPromise
      return fn(acc)
    }, Promise.resolve(value))
  }
}

语义与 asyncPipe 相同,仅执行方向相反。推荐优先使用 asyncPipe,因其更符合阅读习惯。

并行执行:parallel 函数

当多个异步任务互不依赖时,应并行执行以提升性能。

核心语义

同时发起所有任务,等待全部完成(类似 Promise.all)。

ts
const parallel = <T>(
  ...fns: Array<() => Promise<T>>
): (() => Promise<T[]>) => {
  return async () => {
    return await Promise.all(fns.map(fn => fn()))
  }
}

使用示例

ts
const fetchUser = () => fetch('/api/user').then(res => res.json())
const fetchPosts = () => fetch('/api/posts').then(res => res.json())
const fetchSettings = () => fetch('/api/settings').then(res => res.json())

const loadDashboard = parallel(fetchUser, fetchPosts, fetchSettings)

const [user, posts, settings] = await loadDashboard()
// 三个请求同时发起,总耗时 ≈ 最慢任务的耗时

串行执行:series 函数

当任务有顺序依赖时,必须串行执行。

核心语义

一个接一个执行,前一个完成后再开始下一个。

实现:for...of + await

ts
const series = <T>(
  ...fns: Array<() => Promise<T>>
): (() => Promise<T[]>) => {
  return async () => {
    const results: T[] = []
    for (const fn of fns) {
      results.push(await fn())
    }
    return results
  }
}

对比 parallel

  • parallel:总时间 ≈ max(task1, task2, task3)
  • series:总时间 ≈ sum(task1, task2, task3)

选择依据:任务间是否有数据依赖。

并发控制:concurrency 限制同时执行的任务数

parallel 的问题是:若任务过多(如 1000 个 API 请求),会瞬间耗尽资源(TCP 连接、内存)。

解法:并发控制(Concurrency Control),限制同时运行的任务数。

核心思想:“任务池”模式

维护一个“池”,最多允许 n 个任务同时运行。当有任务完成,立即从队列中取出新任务补上。

实现

ts
const withConcurrency = <T>(
  concurrency: number,
  tasks: Array<() => Promise<T>>
): (() => Promise<T[]>) => {
  return async () => {
    const results: T[] = []
    const executing: Array<Promise<unknown>> = []

    for (const [i, task] of tasks.entries()) {
      const promise = task().then(result => {
        results[i] = result
      })
      executing.push(promise)

      // 限制并发数
      if (executing.length >= concurrency) {
        await Promise.race(executing) // 等待任意一个完成
        executing.splice(executing.indexOf(promise), 1)
      }
    }

    // 等待剩余任务完成
    await Promise.all(executing)
    return results
  }
}

关键点

  • executing 数组跟踪正在运行的任务。
  • 当数量达到 concurrency,使用 Promise.race 等待任意一个任务完成,然后移除它。
  • 最后用 Promise.all 确保所有任务结束。

使用示例

ts
const tasks = Array(100).fill(0).map((_, i) =>
  () => fetch(`/api/data/${i}`).then(res => res.json())
)

const limitedFetch = withConcurrency(5, tasks) // 最多 5 个并发
const results = await limitedFetch()

这能有效防止请求洪水,保护服务端和客户端。

错误处理:异步流水线的容错

异步流水线中的错误会中断执行。需根据场景决定处理策略。

默认行为:异常中断

text
const asyncPipe =  /* ... */
try {
  await asyncPipe(fn1, fn2, fn3)(input)
} catch (error) {
  // 捕获第一个失败的任务的错误
}

解法:包裹函数返回 Result

ts
const safe = <T, E = Error>(
  fn: () => Promise<T>
): (() => Promise<{ success: true; data: T } | { success: false; error: E }>) => {
  return async () => {
    try {
      const data = await fn()
      return { success: true, data }
    } catch (error) {
      return { success: false, error: error as E }
    }
  }
}

将可能失败的函数用 safe 包裹,错误变为值,流水线可继续执行。

结语:异步控制流是现代应用的基石

asyncPipeparallelserieswithConcurrency 共同构成了异步编程的“四象限”:

模式适用场景
asyncPipe数据转换流水线(A → B → C)
parallel无依赖的批量任务(A, B, C 同时)
series有顺序依赖的任务链(A → B → C)
concurrency大量任务的资源受限执行

掌握这些模式,意味着你能:

  • 优化性能:并行减少等待时间。
  • 保障稳定性:并发控制防止雪崩。
  • 提升可读性:声明式代码替代嵌套回调。

当你用 asyncPipe 编排用户注册流程,或用 withConcurrency 安全地批量处理文件时,你已不仅是写代码,更是在设计数据的流动与系统的节奏——而这,正是高级前端工程师的核心能力。