🔥函数式与 RxJS:Observable 是 Monad 吗?
——map、flatMap、filter 的函子/单子特性
“RxJS 不是函数式库,
但它把函数式编程的 DNA,
注入了异步事件流。”
一、Observable:时间维度上的函子
Observable 是什么?
一个能随时间推移发射多个值的序列。
ts
const stream$ = interval(1000); // 0, 1, 2, 3, ...这不像 Promise(单值),而像“未来值的数组”。
二、map:Observable 是 Functor!
Functor 定律回顾:
F.map(f)把函数f: A → B提升到上下文中,得到F<B>
Observable 的 map
ts
import { map } from 'rxjs/operators';
const numbers$ = of(1, 2, 3);
const doubled$ = numbers$.pipe(map(x => x * 2));
// 1 → 2, 2 → 4, 3 → 6- 输入:
Observable<number> - 函数:
number => number - 输出:
Observable<number>
完全符合 Functor 行为!
Observable 是一个“时间函子”(Temporal Functor)
三、flatMap(mergeMap):Observable 是 Monad!
Monad 核心:chain / flatMap
用于处理“嵌套上下文”,如 Promise<Promise<T>> → Promise<T>
Observable 中的扁平化需求
ts
const userIds$ = of(1, 2, 3);
// 每个 ID 要发起 HTTP 请求
const userRequests$ = userIds$.pipe(
map(id => fetchUser(id)) // 得到 Observable<Observable<User>>
);我们不想要嵌套流,而是合并成一个流。
mergeMap 来拯救
ts
import { mergeMap } from 'rxjs/operators';
const users$ = userIds$.pipe(
mergeMap(id => fetchUser(id))
);
// 自动展开并合并所有内部 ObservablemergeMap就是flatMap的别名- 它把
A → Observable<B>应用到Observable<A>,返回Observable<B>
完全符合 Monad 的 chain 操作!
所以:Observable 是 Monad
四、filter:Applicative 的影子?
虽然 filter 不是核心类型类操作,但 Observable 支持:
ts
const even$ = numbers$.pipe(filter(x => x % 2 === 0));这类似于:
- List 的
filter - Array 的
filter
在范畴论中,这属于 Strong Functor 或 Filterable 类型类。
但更重要的是:它保持了链式组合的流畅性。
五、完整的函数式接口支持
| 操作 | 类比 | 作用 |
|---|---|---|
map | Functor | 变换值 |
mergeMap / switchMap / concatMap | Monad (chain) | 扁平化异步操作 |
filter | Filterable | 条件筛选 |
scan | Fold / Reduce | 累积状态(类似 useReducer) |
startWith | Applicative (of) | 初始值 |
combineLatest, zip | Applicative (liftA2) | 合并多个流 |
示例:combineLatest = liftA2
ts
const a$ = of(1, 2);
const b$ = of('x', 'y');
combineLatest([a$, b$]).subscribe(console.log);
// [1,'x'], [2,'x'], [2,'y']就像 liftA2(f, Maybe(1), Maybe(2)),但发生在时间上。
六、为什么说 Observable 是“最强 Monad”?
对比其他 Monad:
| Monad | 值的数量 | 特性 |
|---|---|---|
Maybe | 0 或 1 | 存在性 |
Promise | 1(未来) | 异步单值 |
Array | 0 到 N | 多值集合 |
Observable | 0 到 N(随时间) | 异步多值流 |
Observable 是最通用的计算上下文之一
它能模拟:
Promise(.toPromise())EventEmitterGeneratorsetTimeout/setInterval
七、真实场景:函数式响应式编程(FRP)
登录表单验证(纯函数式风格)
text
const username$ = fromEvent(usernameInput, 'input').pipe(
map(e => e.target.value),
filter(val => val.length >= 3),
debounceTime(300)
);
const password$ = /* 类似 */;
const loginEnabled$ = combineLatest([
username$.pipe(map(validUsername)),
password$.pipe(map(validPassword))
]).pipe(
map(([u, p]) => u && p) // liftA2(and)
);
loginEnabled$.subscribe(enabled => {
submitButton.disabled = !enabled;
});- 每一步都是纯函数变换
- 数据流清晰可测
- 错误处理可用
catchError(Monad 左折叠)
八、注意事项:Observable 的“非纯”边界
虽然 Observable 接口是函数式的,但:
订阅才有副作用
tsconst risky$ = apiCall().pipe(tap(console.log)); // 订阅前不执行Hot vs Cold Observables
- Cold:每次订阅重新开始(更纯)
- Hot:共享状态(需小心)
必须手动取消订阅
tsconst sub = interval(1000).subscribe(console.log); sub.unsubscribe(); // 避免内存泄漏