Skip to content

🔥函数式与 RxJS:Observable 是 Monad 吗?

——mapflatMapfilter 的函子/单子特性

“RxJS 不是函数式库,
但它把函数式编程的 DNA,
注入了异步事件流。”

一、Observable:时间维度上的函子

Observable 是什么?

一个能随时间推移发射多个值的序列。

ts
const stream$ = interval(1000); // 0, 1, 2, 3, ...

这不像 Promise(单值),而像“未来值的数组”。

二、map:Observable 是 Functor!

Functor 定律回顾:

F.map(f) 把函数 f: A → B 提升到上下文中,得到 F<B>

Observable 的 map

ts
import { map } from 'rxjs/operators';

const numbers$ = of(1, 2, 3);
const doubled$ = numbers$.pipe(map(x => x * 2));
// 1 → 2, 2 → 4, 3 → 6
  • 输入:Observable<number>
  • 函数:number => number
  • 输出:Observable<number>

完全符合 Functor 行为!

Observable 是一个“时间函子”(Temporal Functor)

三、flatMap(mergeMap):Observable 是 Monad!

Monad 核心:chain / flatMap

用于处理“嵌套上下文”,如 Promise<Promise<T>>Promise<T>

Observable 中的扁平化需求

ts
const userIds$ = of(1, 2, 3);

// 每个 ID 要发起 HTTP 请求
const userRequests$ = userIds$.pipe(
  map(id => fetchUser(id)) // 得到 Observable<Observable<User>>
);

我们不想要嵌套流,而是合并成一个流。

mergeMap 来拯救

ts
import { mergeMap } from 'rxjs/operators';

const users$ = userIds$.pipe(
  mergeMap(id => fetchUser(id))
);
// 自动展开并合并所有内部 Observable
  • mergeMap 就是 flatMap 的别名
  • 它把 A → Observable<B> 应用到 Observable<A>,返回 Observable<B>

完全符合 Monad 的 chain 操作!

所以:Observable 是 Monad

四、filter:Applicative 的影子?

虽然 filter 不是核心类型类操作,但 Observable 支持:

ts
const even$ = numbers$.pipe(filter(x => x % 2 === 0));

这类似于:

  • List 的 filter
  • Array 的 filter

在范畴论中,这属于 Strong FunctorFilterable 类型类。

但更重要的是:它保持了链式组合的流畅性。

五、完整的函数式接口支持

操作类比作用
mapFunctor变换值
mergeMap / switchMap / concatMapMonad (chain)扁平化异步操作
filterFilterable条件筛选
scanFold / Reduce累积状态(类似 useReducer
startWithApplicative (of)初始值
combineLatest, zipApplicative (liftA2)合并多个流

示例:combineLatest = liftA2

ts
const a$ = of(1, 2);
const b$ = of('x', 'y');

combineLatest([a$, b$]).subscribe(console.log);
// [1,'x'], [2,'x'], [2,'y']

就像 liftA2(f, Maybe(1), Maybe(2)),但发生在时间上。

六、为什么说 Observable 是“最强 Monad”?

对比其他 Monad:

Monad值的数量特性
Maybe0 或 1存在性
Promise1(未来)异步单值
Array0 到 N多值集合
Observable0 到 N(随时间)异步多值流

Observable 是最通用的计算上下文之一

它能模拟:

  • Promise.toPromise()
  • EventEmitter
  • Generator
  • setTimeout / setInterval

七、真实场景:函数式响应式编程(FRP)

登录表单验证(纯函数式风格)

text
const username$ = fromEvent(usernameInput, 'input').pipe(
  map(e => e.target.value),
  filter(val => val.length >= 3),
  debounceTime(300)
);

const password$ = /* 类似 */;

const loginEnabled$ = combineLatest([
  username$.pipe(map(validUsername)),
  password$.pipe(map(validPassword))
]).pipe(
  map(([u, p]) => u && p) // liftA2(and)
);

loginEnabled$.subscribe(enabled => {
  submitButton.disabled = !enabled;
});
  • 每一步都是纯函数变换
  • 数据流清晰可测
  • 错误处理可用 catchError(Monad 左折叠)

八、注意事项:Observable 的“非纯”边界

虽然 Observable 接口是函数式的,但:

  1. 订阅才有副作用

    ts
    const risky$ = apiCall().pipe(tap(console.log)); // 订阅前不执行
  2. Hot vs Cold Observables

    • Cold:每次订阅重新开始(更纯)
    • Hot:共享状态(需小心)
  3. 必须手动取消订阅

    ts
    const sub = interval(1000).subscribe(console.log);
    sub.unsubscribe(); // 避免内存泄漏