
Part 9: Observability. When the System Breaks, Can You Make Sense of It?

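Earlier articles in this series covered caching strategies and other important topics. This one turns to another key cross-cutting concern: observability.

Observability is what lets us understand the internal state of a distributed system and locate problems quickly. Without it the system is a black box, and when something fails all we can do is guess.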

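Metrics: Monitoring QPS and Latency with Prometheus + Grafana

Metrics are the foundation of observability. They describe the overall health of the system through a handful of numbers such as request volume, latency, and cache hit rate: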

typescript
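import { CallHandler, ExecutionContext, Injectable, NestInterceptor, NestMiddleware } from '@nestjs/common';
import { NextFunction, Request, Response } from 'express';
import { Counter, Gauge, Histogram, Registry } from 'prom-client';
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

// Metrics collection service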
@Injectable()
export class MetricsService {
  private readonly registry: Registry;
  private readonly httpRequestsTotal: Counter;
  private readonly httpRequestDuration: Histogram;
  private readonly databaseQueryDuration: Histogram;
  private readonly cacheHitRate: Gauge;
  private readonly activeUsers: Gauge;

  constructor() {
    this.registry = new Registry();
    
    // Total number of HTTP requests
    this.httpRequestsTotal = new Counter({
      name: 'http_requests_total',
      help: 'Total number of HTTP requests',
      labelNames: ['method', 'route', 'status_code', 'tenant_id'],
      registers: [this.registry],
    });

    // HTTP request latency
    this.httpRequestDuration = new Histogram({
      name: 'http_request_duration_seconds',
      help: 'Duration of HTTP requests in seconds',
      labelNames: ['method', 'route', 'status_code', 'tenant_id'],
      buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10],
      registers: [this.registry],
    });

    // Database query latency
    this.databaseQueryDuration = new Histogram({
      name: 'database_query_duration_seconds',
      help: 'Duration of database queries in seconds',
      labelNames: ['entity', 'operation', 'tenant_id'],
      buckets: [0.001, 0.01, 0.1, 0.5, 1, 5],
      registers: [this.registry],
    });

    // Cache hit rate
    this.cacheHitRate = new Gauge({
      name: 'cache_hit_rate',
      help: 'Cache hit rate',
      labelNames: ['cache_type', 'tenant_id'],
      registers: [this.registry],
    });

    // Number of active users
    this.activeUsers = new Gauge({
      name: 'active_users',
      help: 'Number of active users',
      labelNames: ['tenant_id'],
      registers: [this.registry],
    });
  }

  // Record metrics for one HTTP request
  recordHttpRequest(
    method: string,
    route: string,
    statusCode: number,
    duration: number,
    tenantId?: string,
  ): void {
    const labels = { method, route, status_code: statusCode, tenant_id: tenantId || 'unknown' };
    
    this.httpRequestsTotal.inc(labels);
    this.httpRequestDuration.observe(labels, duration);
  }

  // Record metrics for one database query
  recordDatabaseQuery(
    entity: string,
    operation: string,
    duration: number,
    tenantId?: string,
  ): void {
    this.databaseQueryDuration.observe(
      { entity, operation, tenant_id: tenantId || 'unknown' },
      duration,
    );
  }

  // Update the cache hit rate
  updateCacheHitRate(
    cacheType: string,
    hitRate: number,
    tenantId?: string,
  ): void {
    this.cacheHitRate.set({ cache_type: cacheType, tenant_id: tenantId || 'unknown' }, hitRate);
  }

  // Update the number of active users
  updateActiveUsers(count: number, tenantId?: string): void {
    this.activeUsers.set({ tenant_id: tenantId || 'unknown' }, count);
  }

  // Render all registered metrics for a Prometheus scrape
  async getMetrics(): Promise<string> {
    return await this.registry.metrics();
  }
}

// HTTP metrics middleware
@Injectable()
export class MetricsMiddleware implements NestMiddleware {
  constructor(private readonly metricsService: MetricsService) {}

  use(req: Request, res: Response, next: NextFunction) {
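    const startTime = Date.now();
    const method = req.method;

    res.on('finish', () => {
      const duration = (Date.now() - startTime) / 1000; // convert milliseconds to seconds
      // Resolve the route here: req.route is only populated once a handler has been matched,
      // which has not happened yet when the middleware first runs
      const route = this.getRoutePattern(req);
      const statusCode = res.statusCode;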
      const tenantId = (req as any).user?.tenantId;

      this.metricsService.recordHttpRequest(
        method,
        route,
        statusCode,
        duration,
        tenantId,
      );
    });

    next();
  }

  private getRoutePattern(req: Request): string {
    // Collapse concrete paths into a route pattern so path parameters do not cause a metric-label explosion
    return req.route?.path || req.path.replace(/\/\d+/g, '/:id');
  }
}

// Database query metrics interceptor
@Injectable()
export class DatabaseMetricsInterceptor implements NestInterceptor {
  constructor(private readonly metricsService: MetricsService) {}

  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    const startTime = Date.now();
    const tenantId = this.getTenantId(context);
    
    return next.handle().pipe(
      tap(() => {
        const duration = (Date.now() - startTime) / 1000;
        const { entity, operation } = this.getQueryInfo(context);
        
        this.metricsService.recordDatabaseQuery(
          entity,
          operation,
          duration,
          tenantId,
        );
      }),
    );
  }

  private getTenantId(context: ExecutionContext): string | undefined {
    const request = context.switchToHttp().getRequest();
    return request.user?.tenantId;
  }

  private getQueryInfo(context: ExecutionContext): { entity: string; operation: string } {
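    // Derive query info from the execution context
    const handler = context.getHandler();
    const className = context.getClass().name;
    
    // Simplified: getClass() returns the class that owns the current handler,
    // so a real application needs more precise logic to name the entity and operation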
    return {
      entity: className.replace('Service', ''),
      operation: handler.name,
    };
  }
}
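
The `buckets` arrays above decide how precisely a dashboard can report latency percentiles. A Prometheus-style histogram keeps one cumulative counter per upper bound, and a percentile is then estimated by linear interpolation inside the bucket that contains the target rank. The following is a minimal sketch of that idea in plain TypeScript; `LatencyHistogram` and `estimateQuantile` are hypothetical names used for illustration and are not part of prom-client.

```typescript
// Illustrative sketch: `LatencyHistogram` is a hypothetical class, not a prom-client API.
class LatencyHistogram {
  private readonly upperBounds: number[]; // finite bucket bounds, ascending
  private readonly cumulative: number[];  // cumulative[i] = samples <= upperBounds[i]
  private total = 0;                      // plays the role of the implicit +Inf bucket

  constructor(upperBounds: number[]) {
    this.upperBounds = [...upperBounds].sort((a, b) => a - b);
    this.cumulative = new Array<number>(this.upperBounds.length).fill(0);
  }

  observe(seconds: number): void {
    this.total += 1;
    // Cumulative buckets: a sample increments every bucket whose bound is >= the value
    this.upperBounds.forEach((bound, i) => {
      if (seconds <= bound) this.cumulative[i] += 1;
    });
  }

  estimateQuantile(q: number): number {
    if (this.total === 0 || this.upperBounds.length === 0) return NaN;
    const rank = q * this.total;
    for (let i = 0; i < this.upperBounds.length; i++) {
      if (this.cumulative[i] >= rank) {
        // The first bucket is assumed to start at 0
        const lowerBound = i === 0 ? 0 : this.upperBounds[i - 1];
        const lowerCount = i === 0 ? 0 : this.cumulative[i - 1];
        const inBucket = this.cumulative[i] - lowerCount;
        if (inBucket === 0) return lowerBound;
        // Linear interpolation between the bucket's lower and upper bound
        return lowerBound + (this.upperBounds[i] - lowerBound) * ((rank - lowerCount) / inBucket);
      }
    }
    // The rank falls beyond the largest finite bound: report that bound
    return this.upperBounds[this.upperBounds.length - 1];
  }
}

const histogram = new LatencyHistogram([0.1, 0.5, 1]);
for (let i = 0; i < 8; i++) histogram.observe(0.05);
for (let i = 0; i < 2; i++) histogram.observe(0.3);
console.log(histogram.estimateQuantile(0.95));
```

With eight samples at 50 ms and two at 300 ms, the estimated p95 lands at roughly 0.4 s although no request took longer than 0.3 s, because the estimate can only interpolate between the 0.1 and 0.5 bounds. Placing bucket bounds close to the latency thresholds you care about keeps that error small.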

Tracing: Following Request Paths with Jaeger / Zipkin

Distributed tracing shows how a single request moves through the system:

typescript
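import { CallHandler, ExecutionContext, Injectable, NestInterceptor, NestMiddleware } from '@nestjs/common';
import { NextFunction, Request, Response } from 'express';
import { initTracer, JaegerTracer } from 'jaeger-client';
import { FORMAT_HTTP_HEADERS, Span, SpanContext, SpanOptions } from 'opentracing';
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';
import { FindManyOptions, Repository } from 'typeorm';

// Tracing service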
@Injectable()
export class TracingService {
  private tracer: JaegerTracer;

  constructor() {
    const serviceName = process.env.SERVICE_NAME || 'saas-app';

    // Initialize the Jaeger tracer. initTracer() takes plain configuration objects
    // rather than sender or sampler instances.
    this.tracer = initTracer(
      {
        serviceName,
        // A 'const' sampler with param 1 records every request
        sampler: { type: 'const', param: 1 },
        reporter: {
          agentHost: process.env.JAEGER_AGENT_HOST || 'localhost',
          agentPort: parseInt(process.env.JAEGER_AGENT_PORT || '6832', 10),
        },
      },
      {},
    );
  }

  startSpan(operationName: string, parentContext?: SpanContext | null): Span {
    const spanOptions: SpanOptions = {
      // extract() returns null when the request carries no trace headers
      childOf: parentContext ?? undefined,
    };

    return this.tracer.startSpan(operationName, spanOptions);
  }

  injectTraceContext(span: Span, headers: any): void {
    this.tracer.inject(span, FORMAT_HTTP_HEADERS, headers);
  }

  extractTraceContext(headers: any): SpanContext | null {
    return this.tracer.extract(FORMAT_HTTP_HEADERS, headers);
  }

  close(): void {
    this.tracer.close();
  }
}

// Tracing middleware
@Injectable()
export class TracingMiddleware implements NestMiddleware {
  constructor(private readonly tracingService: TracingService) {}

  use(req: Request, res: Response, next: NextFunction) {
    // Extract the trace context from the request headers
    const parentContext = this.tracingService.extractTraceContext(req.headers);
    
    // Start a new span for this request
    const span = this.tracingService.startSpan(`HTTP ${req.method} ${req.path}`, parentContext);
    
    // Tag the span with request details
    span.setTag('http.method', req.method);
    span.setTag('http.url', req.url);
    span.setTag('span.kind', 'server');
    
    // Store the span on the request object
    (req as any).span = span;

    // Finish the span when the response ends
    res.on('finish', () => {
      span.setTag('http.status_code', res.statusCode);
      if (res.statusCode >= 400) {
        span.setTag('error', true);
      }
      
      // Tag the span with user information
      if ((req as any).user) {
        span.setTag('user.id', (req as any).user.id);
        span.setTag('tenant.id', (req as any).user.tenantId);
      }
      
      span.finish();
    });

    next();
  }
}

// Tracing interceptor
@Injectable()
export class TracingInterceptor implements NestInterceptor {
  constructor(private readonly tracingService: TracingService) {}

  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    const request = context.switchToHttp().getRequest();
    const parentSpan = (request as any).span;
    
    if (!parentSpan) {
      return next.handle();
    }

    const handler = context.getHandler();
    const className = context.getClass().name;
    const operationName = `${className}.${handler.name}`;
    
    // Create a child span
    const span = this.tracingService.startSpan(operationName, parentSpan.context());
    
    // Add business tags
    span.setTag('component', 'nestjs');
    span.setTag('tenant.id', request.user?.tenantId);
    
    return next.handle().pipe(
      tap({
        next: (data) => {
          // Record the size of the returned data
          if (data) {
            span.setTag('response.size', JSON.stringify(data).length);
          }
        },
        error: (error) => {
          // Record error details
          span.setTag('error', true);
          span.setTag('error.message', error.message);
          span.setTag('error.code', error.code || 'UNKNOWN');
          // complete() is not called when the stream errors, so finish the span here as well
          span.finish();
        },
        complete: () => {
          span.finish();
        },
      }),
    );
  }
}

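// Database query tracing
// Sketch only: depending on the TypeORM version, Repository may require constructor
// arguments (a target and an EntityManager), in which case super() needs them too.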
@Injectable()
export class TracingRepository extends Repository<any> {
  constructor(
    private readonly tracingService: TracingService,
    private readonly metricsService: MetricsService,
  ) {
    super();
  }

  async findWithTrace(options?: FindManyOptions<any>): Promise<any[]> {
    const parentSpan = this.getCurrentSpan();
    const span = this.tracingService.startSpan('database.find', parentSpan?.context());
    
    try {
      span.setTag('entity', this.metadata.name);
      span.setTag('query', JSON.stringify(options));
      
      const startTime = Date.now();
      const result = await super.find(options);
      const duration = (Date.now() - startTime) / 1000;
      
      span.setTag('result.count', result.length);
      
      // Record metrics
      this.metricsService.recordDatabaseQuery(
        this.metadata.name,
        'find',
        duration,
        this.getTenantId(),
      );
      
      return result;
    } catch (error) {
      span.setTag('error', true);
      span.setTag('error.message', error instanceof Error ? error.message : String(error));
      throw error;
    } finally {
      span.finish();
    }
  }

  private getCurrentSpan(): Span | undefined {
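    // Read the current span from the request context (stub: not wired up here).
    // This has to cooperate with the middleware and interceptor above.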
    return undefined;
  }

  private getTenantId(): string | undefined {
    // Read the current tenant ID (stub: not wired up here)
    return undefined;
  }
}
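
The `getCurrentSpan()` and `getTenantId()` stubs above return `undefined`, so the repository cannot yet reach the span that the middleware created. One possible way to close that gap is Node's built-in `AsyncLocalStorage`, which carries a value across `await` boundaries without passing it as an argument. The sketch below assumes a hypothetical `RequestContext` shape and helper names (`runWithContext`, `currentContext`); the `span` field is a stand-in for an opentracing `Span`.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// Hypothetical per-request data; `span` stands in for an opentracing Span.
interface RequestContext {
  traceId: string;
  tenantId?: string;
  userId?: string;
  span?: { finish(): void };
}

const storage = new AsyncLocalStorage<RequestContext>();

// Run `fn`, and every async operation it starts, with `context` attached.
function runWithContext<T>(context: RequestContext, fn: () => T): T {
  return storage.run(context, fn);
}

// Return the context of the request currently being handled, if any.
function currentContext(): RequestContext | undefined {
  return storage.getStore();
}

// A nested async function reads the context without receiving it as a parameter.
async function findOrders(): Promise<string> {
  await new Promise((resolve) => setTimeout(resolve, 5)); // simulate I/O
  const ctx = currentContext();
  return `tenant=${ctx?.tenantId ?? 'unknown'} trace=${ctx?.traceId ?? 'none'}`;
}

// Two concurrent "requests" keep their contexts separate.
Promise.all([
  runWithContext({ traceId: 't1', tenantId: 'acme' }, () => findOrders()),
  runWithContext({ traceId: 't2', tenantId: 'globex' }, () => findOrders()),
]).then((lines) => lines.forEach((line) => console.log(line)));
```

In a middleware the call to `next()` would be wrapped, for example `runWithContext({ traceId, tenantId, span }, () => next())`, and the stubs would then read from `currentContext()`. Whether the context survives every framework layer depends on the libraries involved, so it is worth verifying in your own stack.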

Logging: Structured Logs Linked by trace_id

Structured logs are much easier to query and analyze than free-form text:

typescript
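import { Injectable, NestMiddleware } from '@nestjs/common';
import { NextFunction, Request, Response } from 'express';
import { v4 as uuidv4 } from 'uuid';
import * as winston from 'winston';

// Structured logging service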
@Injectable()
export class StructuredLogger {
  private logger: winston.Logger;

  constructor() {
    this.logger = winston.createLogger({
      level: process.env.LOG_LEVEL || 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.errors({ stack: true }),
        winston.format.metadata(),
        winston.format((info) => {
          // Add the trace ID
          if (!info.traceId) {
            info.traceId = this.getCurrentTraceId();
          }
          
          // Add the tenant ID
          if (!info.tenantId) {
            info.tenantId = this.getCurrentTenantId();
          }
          
          // Add the user ID
          if (!info.userId) {
            info.userId = this.getCurrentUserId();
          }
          
          return info;
        })(),
        winston.format.json(),
      ),
      defaultMeta: { 
        service: process.env.SERVICE_NAME || 'saas-app',
        version: process.env.VERSION || 'unknown',
      },
      transports: [
        new winston.transports.File({
          filename: 'logs/structured.log',
          maxsize: 50000000, // 50MB
          maxFiles: 10,
        }),
      ],
    });

    // Also log to the console outside production
    if (process.env.NODE_ENV !== 'production') {
      this.logger.add(new winston.transports.Console({
        format: winston.format.combine(
          winston.format.colorize(),
          winston.format.simple(),
        ),
      }));
    }
  }

  private getCurrentTraceId(): string | undefined {
    // Read the trace ID from the request context (stub: not wired up here)
    return undefined;
  }

  private getCurrentTenantId(): string | undefined {
    // Read the tenant ID from the request context (stub: not wired up here)
    return undefined;
  }

  private getCurrentUserId(): string | undefined {
    // Read the user ID from the request context (stub: not wired up here)
    return undefined;
  }

  error(message: string, meta?: any): void {
    this.logger.error(message, meta);
  }

  warn(message: string, meta?: any): void {
    this.logger.warn(message, meta);
  }

  info(message: string, meta?: any): void {
    this.logger.info(message, meta);
  }

  debug(message: string, meta?: any): void {
    this.logger.debug(message, meta);
  }

  log(level: string, message: string, meta?: any): void {
    this.logger.log(level, message, meta);
  }
}

// Log context middleware
@Injectable()
export class LogContextMiddleware implements NestMiddleware {
  use(req: Request, res: Response, next: NextFunction) {
    // Reuse the incoming trace ID or generate a new one
    const traceId = req.headers['x-trace-id'] as string || uuidv4();
    (req as any).traceId = traceId;
    
    // Return the trace ID in a response header to make debugging easier
    res.setHeader('X-Trace-ID', traceId);
    
    next();
  }
}

// Business event logging decorator
export function LogBusinessEvent(eventType: string): MethodDecorator {
  return function (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) {
    const originalMethod = descriptor.value;

    // `this: any` lets the wrapper read the host instance's logger under strict compiler settings
    descriptor.value = async function (this: any, ...args: any[]) {
      const logger = this['structuredLogger'];
      const tenantId = this['getCurrentTenantId']?.();
      const userId = this['getCurrentUserId']?.();
      
      const startTime = Date.now();
      
      try {
        logger?.info(`Starting business event: ${eventType}`, {
          eventType,
          tenantId,
          userId,
          args: args.slice(0, 3), // log only the first 3 arguments to keep entries small
        });

        const result = await originalMethod.apply(this, args);
        
        const duration = Date.now() - startTime;
        logger?.info(`Business event completed: ${eventType}`, {
          eventType,
          tenantId,
          userId,
          duration,
          success: true,
        });
        
        return result;
      } catch (error: any) {
        const duration = Date.now() - startTime;
        logger?.error(`Business event failed: ${eventType}`, {
          eventType,
          tenantId,
          userId,
          duration,
          success: false,
          error: {
            name: error.name,
            message: error.message,
            code: error.code,
          },
        });
        
        throw error;
      }
    };
  };
}

// Usage in a service
@Injectable()
export class OrderService {
  constructor(
    private readonly structuredLogger: StructuredLogger,
  ) {}

  @LogBusinessEvent('order.created')
  async createOrder(createOrderDto: CreateOrderDto): Promise<Order> {
    // Order creation logic goes here
    throw new Error('Not implemented');
  }

  @LogBusinessEvent('order.updated')
  async updateOrder(id: string, updateOrderDto: UpdateOrderDto): Promise<Order> {
    // Order update logic goes here
    throw new Error('Not implemented');
  }
}
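
The decorator above writes the first three arguments of every call into the log. DTOs often carry passwords or tokens, so it may be worth masking sensitive fields before they reach the logger. The following is a minimal sketch under stated assumptions: `redact`, the key list in `SENSITIVE_KEYS`, and the `[REDACTED]` marker are all hypothetical choices, not part of winston or NestJS.

```typescript
// Hypothetical helper: the key list and the markers are assumptions for illustration.
const SENSITIVE_KEYS = new Set(['password', 'token', 'authorization', 'secret', 'cardnumber']);
const MAX_DEPTH = 5;

function redact(value: unknown, depth = 0): unknown {
  // Stop descending at a fixed depth; this also protects against cyclic references
  if (depth > MAX_DEPTH) return '[TRUNCATED]';
  if (value instanceof Date) return value.toISOString();
  if (Array.isArray(value)) return value.map((item) => redact(item, depth + 1));
  if (value !== null && typeof value === 'object') {
    const result: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value as Record<string, unknown>)) {
      // Key comparison is case-insensitive; matching values are replaced wholesale
      result[key] = SENSITIVE_KEYS.has(key.toLowerCase()) ? '[REDACTED]' : redact(inner, depth + 1);
    }
    return result;
  }
  return value; // primitives pass through unchanged
}

const safeArgs = redact([{ email: 'a@example.com', password: 'hunter2' }]);
console.log(JSON.stringify(safeArgs));
```

Inside the decorator this would replace `args.slice(0, 3)` with something like `redact(args.slice(0, 3))`. A deny-list of key names is easy to start with but misses fields with unexpected names, so an allow-list per event type is a stricter alternative.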

Minimal Startup Setup: One Log Line with tenantId + userId + traceId

Startup teams can begin with a minimal setup:

typescript
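import { ArgumentsHost, Catch, ExceptionFilter, HttpException, Injectable } from '@nestjs/common';
import { NextFunction, Request, Response } from 'express';
import { v4 as uuidv4 } from 'uuid';
import * as winston from 'winston';

// Minimal observability setup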
@Injectable()
export class MinimalObservabilityService {
  private logger: winston.Logger;

  constructor() {
    this.logger = winston.createLogger({
      level: 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
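        winston.format.printf(({
          timestamp,
          level,
          message,
          tenantId,
          userId,
          traceId,
          ...rest
        }) => {
          // Append the remaining metadata (statusCode, duration, stack) so it is not silently dropped
          const extra = Object.keys(rest).length > 0 ? ` ${JSON.stringify(rest)}` : '';
          return `${timestamp} [${level}] [${tenantId || 'no-tenant'}] [${userId || 'no-user'}] [${traceId || 'no-trace'}] ${message}${extra}`;
        }),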
      ),
      transports: [
        new winston.transports.Console(),
        new winston.transports.File({ filename: 'logs/app.log' }),
      ],
    });
  }

  logRequest(req: Request, res: Response, next: NextFunction) {
    const tenantId = (req as any).user?.tenantId;
    const userId = (req as any).user?.id;
    const traceId = (req as any).traceId || uuidv4();
    
    // Log the start of the request
    this.logger.info(`HTTP ${req.method} ${req.path} started`, {
      tenantId,
      userId,
      traceId,
    });

    const startTime = Date.now();
    
    res.on('finish', () => {
      const duration = Date.now() - startTime;
      
      this.logger.info(`HTTP ${req.method} ${req.path} completed`, {
        tenantId,
        userId,
        traceId,
        statusCode: res.statusCode,
        duration: `${duration}ms`,
      });
    });

    next();
  }

  logError(error: Error, req?: Request) {
    const tenantId = req && (req as any).user?.tenantId;
    const userId = req && (req as any).user?.id;
    const traceId = req && (req as any).traceId;
    
    this.logger.error(error.message, {
      tenantId,
      userId,
      traceId,
      stack: error.stack,
    });
  }
}

// Global exception filter wired into observability
@Catch()
export class ObservabilityExceptionFilter implements ExceptionFilter {
  constructor(
    private readonly minimalObservabilityService: MinimalObservabilityService,
  ) {}

  catch(exception: unknown, host: ArgumentsHost) {
    const ctx = host.switchToHttp();
    const response = ctx.getResponse();
    const request = ctx.getRequest();

    // Log the exception
    this.minimalObservabilityService.logError(
      exception instanceof Error ? exception : new Error(String(exception)),
      request,
    );

    // Regular exception response handling
    const status = exception instanceof HttpException ? exception.getStatus() : 500;
    
    response.status(status).json({
      statusCode: status,
      timestamp: new Date().toISOString(),
      path: request.url,
      message: exception instanceof Error ? exception.message : String(exception),
    });
  }
}
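
The point of putting tenantId, userId, and traceId on every line is that all entries for one request can be pulled out with a single filter. The sketch below parses lines in the bracketed layout produced by the `printf` format above and selects those that share a trace ID. `parseLogLine` and `linesForTrace` are hypothetical helpers, and the pattern assumes the timestamp contains no spaces, as with an ISO 8601 string.

```typescript
// Hypothetical helpers for the "<timestamp> [level] [tenant] [user] [trace] message" layout.
interface ParsedLogLine {
  timestamp: string;
  level: string;
  tenantId: string;
  userId: string;
  traceId: string;
  message: string;
}

// Assumes a timestamp without spaces and no "]" inside the bracketed fields.
const LINE_PATTERN = /^(\S+) \[([^\]]*)\] \[([^\]]*)\] \[([^\]]*)\] \[([^\]]*)\] (.*)$/;

function parseLogLine(line: string): ParsedLogLine | null {
  const match = LINE_PATTERN.exec(line);
  if (!match) return null; // not in the expected layout
  const [, timestamp, level, tenantId, userId, traceId, message] = match;
  return { timestamp, level, tenantId, userId, traceId, message };
}

// Return every parsed entry that belongs to the given trace, in original order.
function linesForTrace(lines: string[], traceId: string): ParsedLogLine[] {
  return lines
    .map(parseLogLine)
    .filter((entry): entry is ParsedLogLine => entry !== null && entry.traceId === traceId);
}

const sample = [
  '2024-01-01T00:00:00.000Z [info] [acme] [u1] [t-1] HTTP GET /orders started',
  '2024-01-01T00:00:00.020Z [info] [globex] [u9] [t-2] HTTP GET /users started',
  '2024-01-01T00:00:00.050Z [error] [acme] [u1] [t-1] Order lookup failed',
];
console.log(linesForTrace(sample, 't-1').map((entry) => entry.message));
```

On a single server the same result can be had with a plain text search for the trace ID; a parser like this becomes useful once the lines are fed into an aggregation or search system.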

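Action Checklist for Startup Teams

  1. Do now

    • Set up minimal structured logging
    • Add HTTP request metrics
    • Integrate a distributed tracing system
  2. Within one week

    • Add database query metrics
    • Log business events
    • Set up log aggregation and search
  3. Within one month

    • Build complete monitoring dashboards
    • Define alert rules and notification channels
    • Refine the sampling strategy for distributed tracing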
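
On the last item: the tracing service shown earlier uses a constant sampler that records every request, which becomes expensive as traffic grows. One common refinement is ratio-based sampling where the decision is derived from the trace ID itself, so every service that sees the same trace makes the same choice. The sketch below is a hand-rolled illustration, not the sampler that ships with Jaeger; `fnv1a` and `shouldSample` are hypothetical names.

```typescript
// FNV-1a 32-bit hash: small and dependency-free, used here only to bucket trace IDs.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // multiply by the FNV prime, keep 32 bits unsigned
  }
  return hash >>> 0;
}

// Hypothetical sampler: keep a trace when its hash falls in the lowest `ratio` share of the 32-bit range.
function shouldSample(traceId: string, ratio: number): boolean {
  if (ratio <= 0) return false;
  if (ratio >= 1) return true;
  return fnv1a(traceId) / 0x100000000 < ratio;
}

// Same trace ID, same decision, regardless of which service asks.
const decisionA = shouldSample('trace-42', 0.1);
const decisionB = shouldSample('trace-42', 0.1);
console.log(decisionA === decisionB);
```

Because the decision depends only on the trace ID and the ratio, a trace kept at a low ratio is also kept at any higher ratio, which makes it safe to raise the rate during an incident. Errors and slow requests are often worth keeping regardless of the ratio, which would need an additional rule on top of this.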

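Summary

Observability is key to keeping a system running reliably. Getting it right takes:

  1. Metrics: track key system indicators with Prometheus + Grafana
  2. Distributed tracing: follow request paths with Jaeger/Zipkin
  3. Structured logging: emit structured logs that carry context
  4. Incremental rollout: start with the minimal setup and build on it step by step

The next article covers configuration management, an important tool for keeping a system secure and flexible.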