拦截器中的 handle():Observable 流的魔法
在 NestJS 拦截器中,handle() 方法是一个关键组件,它返回一个 Observable 流,允许我们以声明式的方式操作请求和响应。通过 RxJS 的强大功能,我们可以实现复杂的异步转换、错误处理和流控制。本文将深入探讨 handle() 方法的工作原理,以及如何利用 RxJS 操作符来实现各种高级功能。
1. Observable 基础概念
1.1 什么是 Observable?
Observable 是 RxJS 中的核心概念,代表一个可观察的异步数据流:
typescript
import { Observable, of, from, interval } from 'rxjs';
// 创建简单的 Observable
const simpleObservable: Observable<string> = of('Hello', 'World');
// 从数组创建 Observable
const arrayObservable: Observable<number> = from([1, 2, 3, 4, 5]);
// 创建定时 Observable
const timerObservable: Observable<number> = interval(1000); // 每秒发射一个值1.2 Observable 的基本操作
typescript
import { of } from 'rxjs';
import { map, filter, tap } from 'rxjs/operators';
// Observable 管道操作
of(1, 2, 3, 4, 5)
.pipe(
filter(x => x % 2 === 0), // 过滤偶数
map(x => x * 2), // 每个数乘以2
tap(x => console.log(x)) // 副作用操作
)
.subscribe(x => console.log(`Result: ${x}`));
// 输出:
// 4
// Result: 4
// 8
// Result: 82. 拦截器中的 handle() 方法
2.1 handle() 方法的作用
在 NestJS 拦截器中,handle() 方法返回一个 Observable,代表路由处理函数的执行结果:
typescript
import { Injectable, NestInterceptor, ExecutionContext, CallHandler } from '@nestjs/common';
import { Observable } from 'rxjs';
@Injectable()
export class LoggingInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
console.log('Before...');
// handle() 返回路由处理函数的结果流
const now = Date.now();
return next
.handle()
.pipe(
tap(() => console.log(`After... ${Date.now() - now}ms`)),
);
}
}2.2 handle() 的执行机制
typescript
// 简化的 handle() 执行过程
class CallHandlerImpl implements CallHandler {
constructor(
private readonly target: Function,
private readonly args: any[],
) {}
handle(): Observable<any> {
try {
// 执行目标函数
const result = this.target(...this.args);
// 根据返回值类型创建相应的 Observable
if (result instanceof Observable) {
return result;
}
if (result instanceof Promise) {
return from(result);
}
// 同步值转换为 Observable
return of(result);
} catch (error) {
// 错误转换为错误 Observable
return throwError(error);
}
}
}3. RxJS 操作符在拦截器中的应用
3.1 map 操作符:数据转换
typescript
import { map } from 'rxjs/operators';
@Injectable()
export class TransformInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next
.handle()
.pipe(
map(data => ({
data,
statusCode: context.switchToHttp().getResponse().statusCode,
timestamp: new Date().toISOString(),
})),
);
}
}
// 使用示例
// 控制器返回: { id: 1, name: 'John' }
// 拦截器转换为:
// {
// data: { id: 1, name: 'John' },
// statusCode: 200,
// timestamp: '2023-01-01T00:00:00.000Z'
// }3.2 catchError 操作符:错误处理
typescript
import { catchError } from 'rxjs/operators';
import { throwError } from 'rxjs';
@Injectable()
export class ErrorInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next
.handle()
.pipe(
catchError(error => {
// 记录错误日志
console.error('Error occurred:', error);
// 转换错误格式
return throwError({
statusCode: error.status || 500,
message: error.message || 'Internal server error',
timestamp: new Date().toISOString(),
});
}),
);
}
}3.3 switchMap 操作符:异步转换
typescript
import { switchMap } from 'rxjs/operators';
@Injectable()
export class DataEnrichmentInterceptor implements NestInterceptor {
constructor(private readonly userService: UserService) {}
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next
.handle()
.pipe(
switchMap(data => {
// 基于原始数据执行异步操作
if (data && data.userId) {
// 获取用户详细信息来丰富数据
return this.userService.findById(data.userId).pipe(
map(user => ({
...data,
user: {
name: user.name,
email: user.email,
},
})),
);
}
// 如果没有 userId,直接返回原始数据
return of(data);
}),
);
}
}4. 高级拦截器模式
4.1 缓存拦截器
typescript
import { of, Observable } from 'rxjs';
@Injectable()
export class CacheInterceptor implements NestInterceptor {
private readonly cache = new Map<string, { data: any; timestamp: number }>();
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
const request = context.switchToHttp().getRequest();
// 只缓存 GET 请求
if (request.method !== 'GET') {
return next.handle();
}
const key = this.generateCacheKey(request);
const cachedItem = this.cache.get(key);
// 检查缓存是否有效(5分钟有效期)
if (cachedItem && Date.now() - cachedItem.timestamp < 5 * 60 * 1000) {
return of(cachedItem.data);
}
// 执行实际处理并缓存结果
return next
.handle()
.pipe(
tap(data => {
this.cache.set(key, {
data,
timestamp: Date.now(),
});
}),
);
}
private generateCacheKey(request: any): string {
return `${request.method}:${request.url}:${JSON.stringify(request.query)}`;
}
}4.2 限流拦截器
typescript
import { throwError, Observable } from 'rxjs';
import { catchError, delayWhen, retryWhen, mergeMap } from 'rxjs/operators';
@Injectable()
export class RateLimitInterceptor implements NestInterceptor {
private readonly requestCounts = new Map<string, { count: number; resetTime: number }>();
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
const request = context.switchToHttp().getRequest();
const clientId = this.getClientId(request);
const now = Date.now();
// 获取客户端请求计数
let clientInfo = this.requestCounts.get(clientId);
if (!clientInfo || clientInfo.resetTime < now) {
clientInfo = { count: 0, resetTime: now + 60000 }; // 1分钟重置
this.requestCounts.set(clientId, clientInfo);
}
// 检查是否超过限制(每分钟100次请求)
if (clientInfo.count >= 100) {
return throwError({
status: 429,
message: 'Too Many Requests',
});
}
// 增加请求计数
clientInfo.count++;
return next.handle();
}
private getClientId(request: any): string {
return request.ip || request.headers['x-forwarded-for'] || 'unknown';
}
}4.3 重试拦截器
typescript
import { retryWhen, delayWhen, mergeMap, timer, throwError } from 'rxjs';
@Injectable()
export class RetryInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next
.handle()
.pipe(
retryWhen(errors =>
errors.pipe(
mergeMap((error, index) => {
const retryAttempt = index + 1;
// 最多重试3次
if (retryAttempt > 3) {
return throwError(error);
}
// 检查是否是可重试的错误
if (this.isRetriableError(error)) {
// 指数退避延迟
const delay = Math.pow(2, retryAttempt) * 1000;
return timer(delay);
}
// 不可重试的错误直接抛出
return throwError(error);
}),
),
),
);
}
private isRetriableError(error: any): boolean {
// 可重试的错误类型:网络错误、超时等
return error.status >= 500 || error.status === 408 || error.status === 429;
}
}5. 流控制和背压处理
5.1 背压处理
typescript
import { Observable } from 'rxjs';
import { buffer, debounceTime, concatMap } from 'rxjs/operators';
@Injectable()
export class BackpressureInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next
.handle()
.pipe(
// 缓冲快速连续的请求
buffer(next.handle().pipe(debounceTime(100))),
concatMap(bufferedData => {
// 顺序处理缓冲的数据
return from(bufferedData);
}),
);
}
}5.2 流合并
typescript
import { merge, forkJoin } from 'rxjs';
import { mergeMap, map } from 'rxjs/operators';
@Injectable()
export class ParallelProcessingInterceptor implements NestInterceptor {
constructor(
private readonly userService: UserService,
private readonly orderService: OrderService,
) {}
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next
.handle()
.pipe(
mergeMap(data => {
// 并行获取用户和订单信息
const user$ = this.userService.findById(data.userId);
const orders$ = this.orderService.findByUserId(data.userId);
// 等待所有并行操作完成
return forkJoin({
userData: user$,
orders: orders$,
}).pipe(
map(({ userData, orders }) => ({
...data,
user: userData,
orders: orders,
})),
);
}),
);
}
}6. 实际应用示例
6.1 综合数据处理拦截器
typescript
@Injectable()
export class ComprehensiveDataInterceptor implements NestInterceptor {
constructor(
private readonly logger: Logger,
private readonly cacheService: CacheService,
) {}
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
const request = context.switchToHttp().getRequest();
const startTime = Date.now();
// 请求前处理
this.logger.log(`Processing ${request.method} ${request.url}`);
return next
.handle()
.pipe(
// 数据转换
map(data => this.transformData(data)),
// 缓存处理
tap(data => this.cacheService.set(this.getCacheKey(request), data)),
// 性能监控
tap({
next: () => {
const duration = Date.now() - startTime;
this.logger.log(`Request completed in ${duration}ms`);
},
error: (error) => {
const duration = Date.now() - startTime;
this.logger.error(`Request failed after ${duration}ms: ${error.message}`);
},
}),
// 错误处理
catchError(error => {
this.logger.error('Error in interceptor:', error);
return throwError(error);
}),
);
}
private transformData(data: any): any {
// 数据转换逻辑
return {
...data,
processedAt: new Date().toISOString(),
};
}
private getCacheKey(request: any): string {
return `${request.method}:${request.url}`;
}
}7. 总结
NestJS 拦截器中的 handle() 方法结合 RxJS 提供了强大的流处理能力:
- 异步处理:通过 Observable 处理同步和异步数据
- 操作符链:使用 RxJS 操作符链式处理数据
- 错误处理:通过 catchError 等操作符统一处理错误
- 流控制:通过各种操作符控制数据流的行为
RxJS 操作符的强大功能包括:
- map:数据转换
- filter:数据过滤
- switchMap:异步转换
- catchError:错误处理
- retryWhen:重试机制
- buffer:缓冲处理
通过深入理解 handle() 方法和 RxJS 操作符,我们可以:
- 构建更加灵活和强大的拦截器
- 实现复杂的异步数据处理逻辑
- 提高应用的性能和可靠性
- 创建可复用的横切关注点处理机制
在下一篇文章中,我们将探讨自定义装饰器:如何创建 @CurrentUser(),了解如何结合 @SetMetadata() 与守卫,实现上下文注入。