第 9 篇:可观测性 —— 当系统出问题时,你能看懂它吗?
在前面的文章中,我们探讨了缓存策略等重要话题。现在,让我们关注另一个关键的横切关注点:可观测性。
可观测性是现代分布式系统的重要特性,它帮助我们理解系统的内部状态,快速定位和解决问题。没有良好的可观测性,系统就像一个黑盒子,当问题发生时我们只能盲目猜测。
Metrics:Prometheus + Grafana 监控 QPS/延迟
指标监控是可观测性的基础,它帮助我们了解系统的整体健康状况:
typescript
/**
 * Metrics collection service.
 *
 * Owns a private registry and five Prometheus-style metrics (the
 * Registry/Counter/Histogram/Gauge classes are presumably prom-client's —
 * confirm against the file's imports). Every metric carries a `tenant_id`
 * label; a missing or empty tenant is reported as `'unknown'`.
 */
@Injectable()
export class MetricsService {
  private readonly registry: Registry;
  private readonly httpRequestsTotal: Counter;
  private readonly httpRequestDuration: Histogram;
  private readonly databaseQueryDuration: Histogram;
  private readonly cacheHitRate: Gauge;
  private readonly activeUsers: Gauge;

  constructor() {
    const registry = new Registry();
    this.registry = registry;

    // Total number of HTTP requests.
    this.httpRequestsTotal = new Counter({
      name: 'http_requests_total',
      help: 'Total number of HTTP requests',
      labelNames: ['method', 'route', 'status_code', 'tenant_id'],
      registers: [registry],
    });

    // HTTP request latency, in seconds.
    this.httpRequestDuration = new Histogram({
      name: 'http_request_duration_seconds',
      help: 'Duration of HTTP requests in seconds',
      labelNames: ['method', 'route', 'status_code', 'tenant_id'],
      buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10],
      registers: [registry],
    });

    // Database query latency, in seconds.
    this.databaseQueryDuration = new Histogram({
      name: 'database_query_duration_seconds',
      help: 'Duration of database queries in seconds',
      labelNames: ['entity', 'operation', 'tenant_id'],
      buckets: [0.001, 0.01, 0.1, 0.5, 1, 5],
      registers: [registry],
    });

    // Cache hit rate.
    this.cacheHitRate = new Gauge({
      name: 'cache_hit_rate',
      help: 'Cache hit rate',
      labelNames: ['cache_type', 'tenant_id'],
      registers: [registry],
    });

    // Number of active users.
    this.activeUsers = new Gauge({
      name: 'active_users',
      help: 'Number of active users',
      labelNames: ['tenant_id'],
      registers: [registry],
    });
  }

  /**
   * Counts one HTTP request and observes its latency under the same label set.
   *
   * @param duration Request duration; the histogram is named in seconds.
   * @param tenantId Optional tenant; falls back to `'unknown'`.
   */
  recordHttpRequest(
    method: string,
    route: string,
    statusCode: number,
    duration: number,
    tenantId?: string,
  ): void {
    const tenant_id = this.tenantLabel(tenantId);
    const labels = { method, route, status_code: statusCode, tenant_id };

    this.httpRequestsTotal.inc(labels);
    this.httpRequestDuration.observe(labels, duration);
  }

  /** Observes the latency of one database query for the given entity/operation. */
  recordDatabaseQuery(
    entity: string,
    operation: string,
    duration: number,
    tenantId?: string,
  ): void {
    const labels = { entity, operation, tenant_id: this.tenantLabel(tenantId) };
    this.databaseQueryDuration.observe(labels, duration);
  }

  /** Sets the cache hit-rate gauge for one cache type. */
  updateCacheHitRate(
    cacheType: string,
    hitRate: number,
    tenantId?: string,
  ): void {
    const labels = { cache_type: cacheType, tenant_id: this.tenantLabel(tenantId) };
    this.cacheHitRate.set(labels, hitRate);
  }

  /** Sets the active-user gauge for one tenant. */
  updateActiveUsers(count: number, tenantId?: string): void {
    const labels = { tenant_id: this.tenantLabel(tenantId) };
    this.activeUsers.set(labels, count);
  }

  /** Returns whatever the registry renders for all registered metrics. */
  async getMetrics(): Promise<string> {
    const rendered = await this.registry.metrics();
    return rendered;
  }

  /** Maps a missing or empty tenant ID to the shared `'unknown'` label value. */
  private tenantLabel(tenantId?: string): string {
    return tenantId || 'unknown';
  }
}
/**
 * HTTP metrics middleware.
 *
 * Measures each request from the moment the middleware runs until the
 * response emits `'finish'`, then reports method, route pattern, status code,
 * duration (seconds) and tenant to {@link MetricsService}.
 */
@Injectable()
export class MetricsMiddleware implements NestMiddleware {
  constructor(private readonly metricsService: MetricsService) {}

  use(req: Request, res: Response, next: NextFunction) {
    const startTime = Date.now();
    const method = req.method;

    res.on('finish', () => {
      const duration = (Date.now() - startTime) / 1000; // milliseconds -> seconds
      const statusCode = res.statusCode;
      const tenantId = (req as any).user?.tenantId;
      // Resolve the route only now: middleware runs before routing, so
      // `req.route` is still unset when `use()` is entered (Express assigns it
      // once a route matches). Reading it here lets the matched pattern be
      // used instead of always falling back to the regex.
      const route = this.getRoutePattern(req);

      this.metricsService.recordHttpRequest(
        method,
        route,
        statusCode,
        duration,
        tenantId,
      );
    });

    next();
  }

  /**
   * Returns a low-cardinality route label: the matched route pattern when
   * available, otherwise the request path with purely numeric segments
   * replaced by `:id`.
   */
  private getRoutePattern(req: Request): string {
    // The lookahead anchors the match to a whole path segment, so `/2fa` is
    // left alone instead of becoming `/:idfa`.
    return req.route?.path || req.path.replace(/\/\d+(?=\/|$)/g, '/:id');
  }
}
/**
 * Database query metrics interceptor.
 *
 * Times the intercepted handler and reports the elapsed seconds through
 * `MetricsService.recordDatabaseQuery`. Entity and operation labels are
 * derived from the handler's class and method names.
 * NOTE(review): this measures the whole handler, not an individual query —
 * confirm that is the intended meaning of the metric.
 */
@Injectable()
export class DatabaseMetricsInterceptor implements NestInterceptor {
  constructor(private readonly metricsService: MetricsService) {}

  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    const startTime = Date.now();
    const tenantId = this.getTenantId(context);

    const record = (): void => {
      const duration = (Date.now() - startTime) / 1000;
      const { entity, operation } = this.getQueryInfo(context);
      this.metricsService.recordDatabaseQuery(
        entity,
        operation,
        duration,
        tenantId,
      );
    };

    // Record on error as well as on emission; otherwise failed handlers
    // (often the slowest ones) never show up in the histogram.
    return next.handle().pipe(tap({ next: record, error: record }));
  }

  /** Reads the tenant from `request.user`, if present. */
  private getTenantId(context: ExecutionContext): string | undefined {
    const request = context.switchToHttp().getRequest();
    return request.user?.tenantId;
  }

  /**
   * Derives metric labels from the execution context.
   * Simplified: strips the first `'Service'` occurrence from the class name
   * and uses the handler's function name as the operation.
   */
  private getQueryInfo(context: ExecutionContext): { entity: string; operation: string } {
    const handler = context.getHandler();
    const className = context.getClass().name;
    return {
      entity: className.replace('Service', ''),
      operation: handler.name,
    };
  }
}
Tracing:Jaeger / Zipkin 跟踪请求链路
分布式追踪帮助我们理解请求在系统中的流转过程:
typescript
/**
 * Tracing service.
 *
 * Builds one tracer at construction time from the SERVICE_NAME,
 * JAEGER_AGENT_HOST and JAEGER_AGENT_PORT environment variables (defaults:
 * 'saas-app', 'localhost', 6832) and exposes thin wrappers for starting
 * spans and propagating context through HTTP headers.
 *
 * NOTE(review): the sampler is constructed with `true`, which presumably
 * samples every request; also confirm that `initTracer` accepts pre-built
 * reporter/sampler instances in this shape for the client version in use.
 */
@Injectable()
export class TracingService {
  private tracer: Tracer;

  constructor() {
    const { SERVICE_NAME, JAEGER_AGENT_HOST, JAEGER_AGENT_PORT } = process.env;

    const serviceName = SERVICE_NAME || 'saas-app';
    const agentPort = parseInt(JAEGER_AGENT_PORT || '6832', 10);

    // Sender pointed at the tracing agent.
    const reporter = new UDPSender({
      host: JAEGER_AGENT_HOST || 'localhost',
      port: agentPort,
    });
    const sampler = new ConstSampler(true);

    this.tracer = initTracer({ serviceName, reporter, sampler });
  }

  /** Starts a span, optionally as a child of `parentContext`. */
  startSpan(operationName: string, parentContext?: SpanContext): Span {
    return this.tracer.startSpan(operationName, { childOf: parentContext });
  }

  /** Writes the span's context into `headers` using the HTTP-headers format. */
  injectTraceContext(span: Span, headers: any): void {
    this.tracer.inject(span, FORMAT_HTTP_HEADERS, headers);
  }

  /** Reads a span context from `headers`; returns whatever the tracer extracts. */
  extractTraceContext(headers: any): SpanContext | null {
    const extracted = this.tracer.extract(FORMAT_HTTP_HEADERS, headers);
    return extracted;
  }

  /** Closes the underlying tracer. */
  close(): void {
    this.tracer.close();
  }
}
/**
 * Tracing middleware.
 *
 * Opens a server span per HTTP request (continuing any trace context found
 * in the request headers), stores it on `req.span` for downstream code, and
 * finishes it when the response ends.
 */
@Injectable()
export class TracingMiddleware implements NestMiddleware {
  constructor(private readonly tracingService: TracingService) {}

  use(req: Request, res: Response, next: NextFunction) {
    // `extractTraceContext` may return null, while `startSpan` takes an
    // optional (undefined) parent, so normalise here.
    const parentContext = this.tracingService.extractTraceContext(req.headers) ?? undefined;

    const span = this.tracingService.startSpan(`HTTP ${req.method} ${req.path}`, parentContext);
    span.setTag('http.method', req.method);
    span.setTag('http.url', req.url);
    span.setTag('span.kind', 'server');

    // Expose the span to later middleware/interceptors.
    (req as any).span = span;

    let finished = false;
    const finishSpan = (): void => {
      // 'finish' and 'close' can both fire for one response; finish only once.
      if (finished) {
        return;
      }
      finished = true;

      span.setTag('http.status_code', res.statusCode);
      if (res.statusCode >= 400) {
        span.setTag('error', true);
      }

      // User info is read at the end, when it may have been attached to the request.
      const user = (req as any).user;
      if (user) {
        span.setTag('user.id', user.id);
        span.setTag('tenant.id', user.tenantId);
      }

      span.finish();
    };

    res.on('finish', finishSpan);
    // A connection that closes before the response completes never emits
    // 'finish'; without this listener the span would never be finished.
    res.on('close', finishSpan);

    next();
  }
}
/**
 * Tracing interceptor.
 *
 * When the request carries a span on `request.span`, wraps the handler in a
 * child span named `<ClassName>.<handlerName>`, tags it with the outcome and
 * finishes it on both completion and error. Requests without a parent span
 * pass through untouched.
 */
@Injectable()
export class TracingInterceptor implements NestInterceptor {
  constructor(private readonly tracingService: TracingService) {}

  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    const request = context.switchToHttp().getRequest();
    const parentSpan = (request as any).span;
    if (!parentSpan) {
      return next.handle();
    }

    const operationName = `${context.getClass().name}.${context.getHandler().name}`;
    const span = this.tracingService.startSpan(operationName, parentSpan.context());

    span.setTag('component', 'nestjs');
    span.setTag('tenant.id', request.user?.tenantId);

    return next.handle().pipe(
      tap({
        next: (data) => {
          const size = this.measureResponseSize(data);
          if (size !== undefined) {
            span.setTag('response.size', size);
          }
        },
        error: (error) => {
          span.setTag('error', true);
          // Optional chaining guards against non-Error throwables (null, strings).
          span.setTag('error.message', error?.message ?? String(error));
          span.setTag('error.code', error?.code || 'UNKNOWN');
          // `complete` is not invoked after an error, so the span has to be
          // finished here or it is never finished on failures.
          span.finish();
        },
        complete: () => {
          span.finish();
        },
      }),
    );
  }

  /**
   * Length of the JSON serialisation of `data`, or `undefined` when `data` is
   * falsy or cannot be serialised. Serialisation failures (for example
   * circular references) must not propagate: an exception thrown inside
   * `tap` would turn a successful response into an error.
   */
  private measureResponseSize(data: unknown): number | undefined {
    if (!data) {
      return undefined;
    }
    try {
      return JSON.stringify(data)?.length;
    } catch {
      return undefined;
    }
  }
}
/**
 * Repository with traced queries.
 *
 * `findWithTrace` wraps the inherited `find` in a `database.find` span and
 * reports the query duration to {@link MetricsService}.
 * `getCurrentSpan` and `getTenantId` are placeholders that currently return
 * `undefined`; they are meant to read from a request context.
 */
@Injectable()
export class TracingRepository extends Repository<any> {
  constructor(
    private readonly tracingService: TracingService,
    private readonly metricsService: MetricsService,
  ) {
    super();
  }

  /**
   * Runs `find(options)` inside a span.
   *
   * @returns The rows returned by the inherited `find`.
   * @throws Re-throws whatever `find` throws, unchanged, after tagging the span.
   */
  async findWithTrace(options?: FindManyOptions<any>): Promise<any[]> {
    const parentSpan = this.getCurrentSpan();
    const span = this.tracingService.startSpan('database.find', parentSpan?.context());

    let entityName: string | undefined;
    let startTime: number | undefined;

    try {
      entityName = this.metadata.name;
      span.setTag('entity', entityName);
      span.setTag('query', this.describeOptions(options));

      startTime = Date.now();
      const result = await super.find(options);

      span.setTag('result.count', result.length);
      return result;
    } catch (error) {
      span.setTag('error', true);
      // The thrown value may not be an Error (or may be null); reading
      // `.message` blindly could itself throw and mask the original failure.
      span.setTag('error.message', error instanceof Error ? error.message : String(error));
      throw error;
    } finally {
      // Record the duration for failed queries too, once the query has started.
      if (startTime !== undefined && entityName !== undefined) {
        this.metricsService.recordDatabaseQuery(
          entityName,
          'find',
          (Date.now() - startTime) / 1000,
          this.getTenantId(),
        );
      }
      span.finish();
    }
  }

  /**
   * Serialises the find options for the span tag. A serialisation failure
   * must not prevent the query from running, so it yields a marker string.
   */
  private describeOptions(options?: FindManyOptions<any>): string | undefined {
    try {
      return JSON.stringify(options);
    } catch {
      return '[unserializable options]';
    }
  }

  /** Placeholder: should return the current request's span; returns `undefined` for now. */
  private getCurrentSpan(): Span | undefined {
    // Needs to cooperate with the tracing middleware/interceptor.
    return undefined;
  }

  /** Placeholder: should return the current tenant ID; returns `undefined` for now. */
  private getTenantId(): string | undefined {
    return undefined;
  }
}
Logging:结构化日志 + trace_id 串联
结构化日志帮助我们更好地分析和查询日志信息:
typescript
/**
 * Structured logging service.
 *
 * Wraps a winston logger that writes JSON to `logs/structured.log` (and to
 * the console outside production). Each record is enriched with `traceId`,
 * `tenantId` and `userId`: a value supplied by the caller wins, otherwise
 * the value comes from the `getCurrent*` hooks, which are placeholders that
 * currently return `undefined`.
 */
@Injectable()
export class StructuredLogger {
  private logger: winston.Logger;

  constructor() {
    this.logger = winston.createLogger({
      level: process.env.LOG_LEVEL || 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.errors({ stack: true }),
        winston.format.metadata(),
        winston.format((info) => {
          // `format.metadata()` above has already moved every field other
          // than level/message under `info.metadata`, so caller-supplied IDs
          // have to be looked up there; checking only the top level would
          // never find them.
          const callerMeta = info.metadata ?? {};

          // Trace ID
          if (!info.traceId) {
            info.traceId = callerMeta.traceId || this.getCurrentTraceId();
          }
          // Tenant ID
          if (!info.tenantId) {
            info.tenantId = callerMeta.tenantId || this.getCurrentTenantId();
          }
          // User ID
          if (!info.userId) {
            info.userId = callerMeta.userId || this.getCurrentUserId();
          }
          return info;
        })(),
        winston.format.json(),
      ),
      defaultMeta: {
        service: process.env.SERVICE_NAME || 'saas-app',
        version: process.env.VERSION || 'unknown',
      },
      transports: [
        new winston.transports.File({
          filename: 'logs/structured.log',
          maxsize: 50000000, // 50MB
          maxFiles: 10,
        }),
      ],
    });

    // Outside production, also log to the console.
    if (process.env.NODE_ENV !== 'production') {
      this.logger.add(new winston.transports.Console({
        format: winston.format.combine(
          winston.format.colorize(),
          winston.format.simple(),
        ),
      }));
    }
  }

  /** Placeholder: should read the trace ID from the request context. */
  private getCurrentTraceId(): string | undefined {
    return undefined;
  }

  /** Placeholder: should read the tenant ID from the request context. */
  private getCurrentTenantId(): string | undefined {
    return undefined;
  }

  /** Placeholder: should read the user ID from the request context. */
  private getCurrentUserId(): string | undefined {
    return undefined;
  }

  error(message: string, meta?: any): void {
    this.logger.error(message, meta);
  }

  warn(message: string, meta?: any): void {
    this.logger.warn(message, meta);
  }

  info(message: string, meta?: any): void {
    this.logger.info(message, meta);
  }

  debug(message: string, meta?: any): void {
    this.logger.debug(message, meta);
  }

  /** Logs at an arbitrary level name. */
  log(level: string, message: string, meta?: any): void {
    this.logger.log(level, message, meta);
  }
}
/**
 * Log-context middleware.
 *
 * Assigns every request a trace ID — the incoming `x-trace-id` header when
 * it looks sane, otherwise a fresh UUID — stores it on `req.traceId`, and
 * echoes it back in the `X-Trace-ID` response header to ease debugging.
 */
@Injectable()
export class LogContextMiddleware implements NestMiddleware {
  use(req: Request, res: Response, next: NextFunction) {
    const traceId = this.resolveTraceId(req.headers['x-trace-id']);
    (req as any).traceId = traceId;

    res.setHeader('X-Trace-ID', traceId);
    next();
  }

  /**
   * Picks the trace ID for a request.
   *
   * The header is client-controlled and ends up in logs and in a response
   * header, so only a short token of word characters, dots, colons and dashes
   * is accepted (this covers UUIDs and hex/colon-separated trace IDs).
   * Anything else — missing, empty, over-long, or containing other
   * characters — is replaced by a generated UUID.
   */
  private resolveTraceId(headerValue: unknown): string {
    // A header value may be an array; use its first entry.
    const candidate = Array.isArray(headerValue) ? headerValue[0] : headerValue;
    if (typeof candidate === 'string' && /^[\w.:-]{1,128}$/.test(candidate)) {
      return candidate;
    }
    return uuidv4();
  }
}
/**
 * Business-event logging decorator.
 *
 * Replaces the decorated method with an async wrapper that logs a "starting"
 * record, awaits the original method, and then logs either a "completed"
 * record (with duration in milliseconds) or a "failed" record before
 * re-throwing the original error unchanged.
 *
 * The wrapper looks up, on the instance it is called on:
 * - `structuredLogger` — used for logging when present, otherwise logging is skipped;
 * - `getCurrentTenantId()` / `getCurrentUserId()` — optional context hooks.
 *
 * Note that the wrapper is async, so the decorated method always returns a Promise.
 *
 * @param eventType Event name included in every log record.
 */
export function LogBusinessEvent(eventType: string): MethodDecorator {
  type EventLogger = {
    info(message: string, meta?: unknown): void;
    error(message: string, meta?: unknown): void;
  };
  type DecoratedHost = {
    structuredLogger?: EventLogger;
    getCurrentTenantId?: () => string | undefined;
    getCurrentUserId?: () => string | undefined;
  };

  // Extracts name/message/code without assuming the thrown value is an
  // Error. Throwing `null`/`undefined` is legal, and dereferencing it here
  // would raise a TypeError that replaces the original failure.
  const describeError = (error: unknown): { name: unknown; message: unknown; code: unknown } => {
    if (typeof error === 'object' && error !== null) {
      const { name, message, code } = error as { name?: unknown; message?: unknown; code?: unknown };
      return { name, message, code };
    }
    return { name: typeof error, message: String(error), code: undefined };
  };

  return function (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) {
    const originalMethod = descriptor.value;

    descriptor.value = async function (this: DecoratedHost, ...args: unknown[]) {
      const logger = this.structuredLogger;
      const tenantId = this.getCurrentTenantId?.();
      const userId = this.getCurrentUserId?.();
      const startTime = Date.now();

      try {
        logger?.info(`Starting business event: ${eventType}`, {
          eventType,
          tenantId,
          userId,
          args: args.slice(0, 3), // only the first 3 arguments, to keep records small
        });

        const result = await originalMethod.apply(this, args);

        logger?.info(`Business event completed: ${eventType}`, {
          eventType,
          tenantId,
          userId,
          duration: Date.now() - startTime,
          success: true,
        });
        return result;
      } catch (error) {
        logger?.error(`Business event failed: ${eventType}`, {
          eventType,
          tenantId,
          userId,
          duration: Date.now() - startTime,
          success: false,
          error: describeError(error),
        });
        throw error;
      }
    };
  };
}
/**
 * Example of using the decorator in a service.
 *
 * The injected logger is stored as `structuredLogger`, the property name the
 * `LogBusinessEvent` wrapper reads from the instance.
 */
@Injectable()
export class OrderService {
  constructor(private readonly structuredLogger: StructuredLogger) {}

  /** Creates an order; logged as the `order.created` business event. */
  @LogBusinessEvent('order.created')
  async createOrder(createOrderDto: CreateOrderDto): Promise<Order> {
    // Order creation logic goes here.
  }

  /** Updates an order; logged as the `order.updated` business event. */
  @LogBusinessEvent('order.updated')
  async updateOrder(id: string, updateOrderDto: UpdateOrderDto): Promise<Order> {
    // Order update logic goes here.
  }
}
创业最小配置:一行日志包含 tenantId + userId + traceId
对于创业团队,可以从最小配置开始:
typescript
/**
 * Minimal observability setup.
 *
 * One plain-text log line per event, always carrying tenant ID, user ID and
 * trace ID, written to the console and to `logs/app.log`. Any additional
 * metadata (status code, duration, stack, ...) is appended as JSON so it is
 * not lost.
 */
@Injectable()
export class MinimalObservabilityService {
  private logger: winston.Logger;

  constructor() {
    this.logger = winston.createLogger({
      level: 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.printf(({
          timestamp,
          level,
          message,
          tenantId,
          userId,
          traceId,
          ...extras
        }) => {
          const line = `${timestamp} [${level}] [${tenantId || 'no-tenant'}] [${userId || 'no-user'}] [${traceId || 'no-trace'}] ${message}`;
          // Without this suffix, fields such as statusCode, duration and
          // stack are passed to the logger but never appear in the output.
          const suffix = this.formatExtras(extras);
          return suffix ? `${line} ${suffix}` : line;
        }),
      ),
      transports: [
        new winston.transports.Console(),
        new winston.transports.File({ filename: 'logs/app.log' }),
      ],
    });
  }

  /**
   * Middleware-style request logger: logs "started" immediately and
   * "completed" (with status code and duration) when the response finishes.
   */
  logRequest(req: Request, res: Response, next: NextFunction) {
    const traceId = (req as any).traceId || uuidv4();
    // Persist a generated ID so that logError() for the same request reports
    // the same trace ID.
    (req as any).traceId = traceId;

    // Log the start of the request.
    this.logger.info(`HTTP ${req.method} ${req.path} started`, {
      ...this.readIdentity(req),
      traceId,
    });

    const startTime = Date.now();
    res.on('finish', () => {
      const duration = Date.now() - startTime;
      // Re-read the identity: `req.user` may only have been attached after
      // this function ran, so values captured up front can be stale.
      this.logger.info(`HTTP ${req.method} ${req.path} completed`, {
        ...this.readIdentity(req),
        traceId,
        statusCode: res.statusCode,
        duration: `${duration}ms`,
      });
    });

    next();
  }

  /** Logs an error with its stack and, when a request is given, its identity and trace ID. */
  logError(error: Error, req?: Request) {
    const identity = req ? this.readIdentity(req) : { tenantId: undefined, userId: undefined };
    const traceId = req && (req as any).traceId;

    this.logger.error(error.message, {
      ...identity,
      traceId,
      stack: error.stack,
    });
  }

  /** Reads tenant and user IDs from `req.user`, if present. */
  private readIdentity(req: Request): { tenantId: unknown; userId: unknown } {
    const user = (req as any).user;
    return { tenantId: user?.tenantId, userId: user?.id };
  }

  /** Serialises leftover metadata for the log line; empty string when there is none. */
  private formatExtras(extras: Record<string, unknown>): string {
    if (Object.keys(extras).length === 0) {
      return '';
    }
    try {
      return JSON.stringify(extras);
    } catch {
      return '[unserializable meta]';
    }
  }
}
/**
 * Global exception filter wired into observability.
 *
 * Logs every exception through {@link MinimalObservabilityService} and then
 * sends a JSON error response. `HttpException`s keep their status and
 * message; anything else becomes a 500 with a generic message, so internal
 * error details (which are still logged in full) are not sent to the client.
 */
@Catch()
export class ObservabilityExceptionFilter implements ExceptionFilter {
  constructor(
    private readonly minimalObservabilityService: MinimalObservabilityService,
  ) {}

  catch(exception: unknown, host: ArgumentsHost) {
    const ctx = host.switchToHttp();
    const response = ctx.getResponse();
    const request = ctx.getRequest();

    // Log the exception; non-Error values are wrapped so a stack is available.
    this.minimalObservabilityService.logError(
      exception instanceof Error ? exception : new Error(String(exception)),
      request,
    );

    // Build the client-facing response.
    let status = 500;
    let message = 'Internal server error';
    if (exception instanceof HttpException) {
      status = exception.getStatus();
      message = exception.message;
    }

    response.status(status).json({
      statusCode: status,
      timestamp: new Date().toISOString(),
      path: request.url,
      message,
    });
  }
}
创业团队行动清单
立即行动:
- 实现最小配置的结构化日志
- 添加 HTTP 请求监控指标
- 集成分布式追踪系统
一周内完成:
- 实现数据库查询指标监控
- 添加业务事件日志记录
- 建立日志聚合和查询系统
一个月内完善:
- 实现完整的指标监控面板
- 建立告警规则和通知机制
- 完善分布式追踪的采样策略
总结
可观测性是保障系统稳定运行的关键,正确的实现需要:
- 指标监控:通过 Prometheus + Grafana 监控系统关键指标
- 分布式追踪:使用 Jaeger/Zipkin 跟踪请求链路
- 结构化日志:实现包含上下文信息的结构化日志
- 渐进式实施:从最小配置开始,逐步完善
在下一篇文章中,我们将探讨配置管理,这是保障系统安全和灵活性的重要手段。