Logging and Monitoring Intermediate Steps: Injection via RunnableConfig.callbacks

Integrating LangSmith and Custom Loggers

When building production-grade LLM applications, monitoring and logging are key to system stability and observability. LangChain V3 provides a powerful monitoring and log-injection mechanism through RunnableConfig.callbacks, allowing developers to collect detailed execution information at every step of a workflow. This chapter takes a deep look at implementing comprehensive logging and monitoring through the callback mechanism.

Basic Concepts of the Callbacks Mechanism

RunnableConfig.callbacks is the core mechanism for monitoring and logging in LangChain V3:

typescript
interface RunnableConfig {
  callbacks?: Callbacks;
  metadata?: Record<string, any>;
  tags?: string[];
  signal?: AbortSignal;
  timeout?: number;
}

interface Callbacks {
  onChainStart?: (chain: Runnable, inputs: any, runId: string) => Promise<void> | void;
  onChainEnd?: (outputs: any, runId: string) => Promise<void> | void;
  onChainError?: (error: Error, runId: string) => Promise<void> | void;
  onLLMStart?: (llm: BaseLanguageModel, prompts: string[], runId: string) => Promise<void> | void;
  onLLMEnd?: (output: LLMResult, runId: string) => Promise<void> | void;
  onLLMError?: (error: Error, runId: string) => Promise<void> | void;
  onToolStart?: (tool: Tool, input: string, runId: string) => Promise<void> | void;
  onToolEnd?: (output: string, runId: string) => Promise<void> | void;
  onToolError?: (error: Error, runId: string) => Promise<void> | void;
}
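To make the injection mechanism concrete, here is a minimal, self-contained sketch of how a callbacks object passed through a config reaches a step's lifecycle hooks. Note that `SimpleCallbacks` and `invokeWithCallbacks` are illustrative stand-ins written for this sketch, not LangChain APIs:

```typescript
// Illustrative sketch only: SimpleCallbacks and invokeWithCallbacks are
// stand-ins for LangChain's Callbacks / Runnable machinery.
type SimpleCallbacks = {
  onChainStart?: (name: string, inputs: unknown, runId: string) => void;
  onChainEnd?: (outputs: unknown, runId: string) => void;
};

// Wrap a plain function so that callbacks injected via config fire
// around its execution, tagged with a per-invocation runId.
function invokeWithCallbacks<I, O>(
  name: string,
  fn: (input: I) => O,
  input: I,
  config?: { callbacks?: SimpleCallbacks }
): O {
  const runId = Math.random().toString(36).slice(2);
  config?.callbacks?.onChainStart?.(name, input, runId);
  const output = fn(input);
  config?.callbacks?.onChainEnd?.(output, runId);
  return output;
}

const events: string[] = [];
const result = invokeWithCallbacks(
  "upperCase",
  (s: string) => s.toUpperCase(),
  "hello",
  {
    callbacks: {
      onChainStart: (name) => events.push(`start:${name}`),
      onChainEnd: () => events.push("end")
    }
  }
);
```

The key point the sketch illustrates: the workflow step itself knows nothing about logging; the observer is injected from outside via the config object, which is exactly the role RunnableConfig.callbacks plays in LangChain.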

A Basic Logging Callback Implementation

Let's implement a basic logging callback system:

typescript
class LoggingCallbackHandler {
  private logger: Logger;
  
  constructor(logger: Logger) {
    this.logger = logger;
  }
  
  async onChainStart(chain: Runnable, inputs: any, runId: string): Promise<void> {
    this.logger.info('Chain started', {
      chainName: chain.constructor.name,
      inputs,
      runId
    });
  }
  
  async onChainEnd(outputs: any, runId: string): Promise<void> {
    this.logger.info('Chain completed', {
      outputs,
      runId
    });
  }
  
  async onChainError(error: Error, runId: string): Promise<void> {
    this.logger.error('Chain error', {
      error: error.message,
      stack: error.stack,
      runId
    });
  }
  
  async onLLMStart(llm: BaseLanguageModel, prompts: string[], runId: string): Promise<void> {
    this.logger.info('LLM call started', {
      modelName: llm.constructor.name,
      promptCount: prompts.length,
      runId
    });
  }
  
  async onLLMEnd(output: LLMResult, runId: string): Promise<void> {
    this.logger.info('LLM call completed', {
      tokenUsage: output.llmOutput?.tokenUsage,
      generationCount: output.generations.length,
      runId
    });
  }
  
  async onLLMError(error: Error, runId: string): Promise<void> {
    this.logger.error('LLM call error', {
      error: error.message,
      runId
    });
  }
  
  async onToolStart(tool: Tool, input: string, runId: string): Promise<void> {
    this.logger.info('Tool started', {
      toolName: tool.name,
      input,
      runId
    });
  }
  
  async onToolEnd(output: string, runId: string): Promise<void> {
    this.logger.info('Tool completed', {
      output,
      runId
    });
  }
  
  async onToolError(error: Error, runId: string): Promise<void> {
    this.logger.error('Tool error', {
      error: error.message,
      toolName: (error as any).toolName,
      runId
    });
  }
}
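The `Logger` dependency above is left abstract. A minimal console-backed implementation only needs structured `info` and `error` methods taking a message plus a context object; the following shape is hypothetical, sketched here for illustration:

```typescript
// A hypothetical minimal Logger implementation matching the shape
// LoggingCallbackHandler expects: structured info/error with a context object.
interface LogEntry {
  level: "info" | "error";
  message: string;
  context: Record<string, unknown>;
}

class ConsoleLogger {
  // Keep entries in memory as well, which is handy in tests
  entries: LogEntry[] = [];

  info(message: string, context: Record<string, unknown> = {}): void {
    this.entries.push({ level: "info", message, context });
    console.log(`[INFO] ${message} ${JSON.stringify(context)}`);
  }

  error(message: string, context: Record<string, unknown> = {}): void {
    this.entries.push({ level: "error", message, context });
    console.error(`[ERROR] ${message} ${JSON.stringify(context)}`);
  }
}

const logger = new ConsoleLogger();
logger.info("Chain started", { chainName: "RetrievalQA", runId: "run-1" });
logger.error("Chain error", { error: "timeout", runId: "run-1" });
```

With this shape, `new LoggingCallbackHandler(new ConsoleLogger())` behaves much like passing `console` directly, but additionally keeps a queryable log history.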

An Advanced Monitoring Callback Implementation

Now let's implement a more advanced monitoring callback that adds performance metrics and distributed tracing:

typescript
interface PerformanceMetrics {
  executionTime: number;
  tokenCount?: number;
  retryCount?: number;
  errorCount?: number;
}

class MonitoringCallbackHandler {
  private metricsCollector: MetricsCollector;
  private tracer: Tracer;
  private activeRuns: Map<string, { startTime: number; type: string }>;
  
  constructor(metricsCollector: MetricsCollector, tracer: Tracer) {
    this.metricsCollector = metricsCollector;
    this.tracer = tracer;
    this.activeRuns = new Map();
  }
  
  async onChainStart(chain: Runnable, inputs: any, runId: string): Promise<void> {
    const startTime = Date.now();
    // Store the concrete chain class name so later metric tags reflect it
    this.activeRuns.set(runId, { startTime, type: chain.constructor.name });
    
    // Start a trace span
    this.tracer.startSpan(`chain.${chain.constructor.name}`, {
      runId,
      startTime
    });
    
    // Record metrics
    this.metricsCollector.increment('chain.started', {
      chainType: chain.constructor.name
    });
  }
  
  async onChainEnd(outputs: any, runId: string): Promise<void> {
    const runInfo = this.activeRuns.get(runId);
    if (runInfo) {
      const executionTime = Date.now() - runInfo.startTime;
      
      // End the trace span
      this.tracer.endSpan(runId, { executionTime });
      
      // Record performance metrics
      this.metricsCollector.timing('chain.execution_time', executionTime, {
        chainType: runInfo.type
      });
      
      this.metricsCollector.increment('chain.completed', {
        chainType: runInfo.type
      });
      
      this.activeRuns.delete(runId);
    }
  }
  
  async onChainError(error: Error, runId: string): Promise<void> {
    const runInfo = this.activeRuns.get(runId);
    if (runInfo) {
      const executionTime = Date.now() - runInfo.startTime;
      
      // Record the error on the trace span
      this.tracer.errorSpan(runId, error, { executionTime });
      
      // Record error metrics
      this.metricsCollector.increment('chain.failed', {
        chainType: runInfo.type,
        errorType: error.constructor.name
      });
      
      this.metricsCollector.increment('chain.error_count');
      
      this.activeRuns.delete(runId);
    }
  }
  
  async onLLMStart(llm: BaseLanguageModel, prompts: string[], runId: string): Promise<void> {
    const startTime = Date.now();
    // Store the concrete model class name so later metric tags reflect it
    this.activeRuns.set(runId, { startTime, type: llm.constructor.name });
    
    // Start a trace span
    this.tracer.startSpan(`llm.${llm.constructor.name}`, {
      runId,
      startTime,
      promptCount: prompts.length
    });
    
    // Record metrics
    this.metricsCollector.increment('llm.started', {
      modelType: llm.constructor.name
    });
  }
  
  async onLLMEnd(output: LLMResult, runId: string): Promise<void> {
    const runInfo = this.activeRuns.get(runId);
    if (runInfo) {
      const executionTime = Date.now() - runInfo.startTime;
      const tokenUsage = output.llmOutput?.tokenUsage;
      
      // End the trace span
      this.tracer.endSpan(runId, { 
        executionTime,
        tokenUsage
      });
      
      // Record performance metrics
      this.metricsCollector.timing('llm.execution_time', executionTime, {
        modelType: runInfo.type
      });
      
      if (tokenUsage) {
        this.metricsCollector.increment('llm.total_tokens', tokenUsage.totalTokens || 0);
        this.metricsCollector.increment('llm.prompt_tokens', tokenUsage.promptTokens || 0);
        this.metricsCollector.increment('llm.completion_tokens', tokenUsage.completionTokens || 0);
      }
      
      this.metricsCollector.increment('llm.completed', {
        modelType: runInfo.type
      });
      
      this.activeRuns.delete(runId);
    }
  }
  
  async onLLMError(error: Error, runId: string): Promise<void> {
    const runInfo = this.activeRuns.get(runId);
    if (runInfo) {
      const executionTime = Date.now() - runInfo.startTime;
      
      // Record the error on the trace span
      this.tracer.errorSpan(runId, error, { executionTime });
      
      // Record error metrics
      this.metricsCollector.increment('llm.failed', {
        modelType: runInfo.type,
        errorType: error.constructor.name
      });
      
      this.metricsCollector.increment('llm.error_count');
      
      this.activeRuns.delete(runId);
    }
  }
}
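The `MetricsCollector` used above only needs counters and timing samples. As a design sketch (an assumed interface written for this chapter, not a real metrics library), an in-memory collector can also aggregate the timing samples it receives into summary statistics such as the mean and an approximate p95:

```typescript
// A sketch of an in-memory metrics collector that aggregates timing samples.
class InMemoryMetrics {
  private counters = new Map<string, number>();
  private timings = new Map<string, number[]>();

  increment(metric: string, value: number = 1): void {
    this.counters.set(metric, (this.counters.get(metric) ?? 0) + value);
  }

  timing(metric: string, ms: number): void {
    const samples = this.timings.get(metric) ?? [];
    samples.push(ms);
    this.timings.set(metric, samples);
  }

  // Summarize all samples recorded for a metric so far
  summary(metric: string): { count: number; mean: number; p95: number } {
    const sorted = [...(this.timings.get(metric) ?? [])].sort((a, b) => a - b);
    const count = sorted.length;
    if (count === 0) return { count: 0, mean: 0, p95: 0 };
    const mean = sorted.reduce((sum, v) => sum + v, 0) / count;
    // Nearest-rank style p95: index floor(0.95 * count), clamped to the last sample
    const p95 = sorted[Math.min(count - 1, Math.floor(count * 0.95))];
    return { count, mean, p95 };
  }

  counter(metric: string): number {
    return this.counters.get(metric) ?? 0;
  }
}

const metrics = new InMemoryMetrics();
[120, 80, 200, 95, 400].forEach(ms => metrics.timing("chain.execution_time", ms));
metrics.increment("chain.completed");
const stats = metrics.summary("chain.execution_time");
```

In production you would typically forward these values to a real metrics backend instead of keeping them in memory; the aggregation logic stays the same.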

LangSmith Integration

LangSmith is the official monitoring and evaluation platform for LangChain, and it integrates seamlessly with the callback mechanism:

typescript
class LangSmithCallbackHandler {
  private client: LangSmithClient;
  private projectId: string;
  
  constructor(client: LangSmithClient, projectId: string) {
    this.client = client;
    this.projectId = projectId;
  }
  
  async onChainStart(chain: Runnable, inputs: any, runId: string): Promise<void> {
    try {
      await this.client.createRun({
        id: runId,
        name: chain.constructor.name,
        start_time: new Date().toISOString(),
        run_type: "chain",
        inputs,
        project_id: this.projectId
      });
    } catch (error) {
      console.warn('LangSmith run creation failed:', error);
    }
  }
  
  async onChainEnd(outputs: any, runId: string): Promise<void> {
    try {
      await this.client.updateRun(runId, {
        end_time: new Date().toISOString(),
        outputs,
        status: "success"
      });
    } catch (error) {
      console.warn('LangSmith run update failed:', error);
    }
  }
  
  async onChainError(error: Error, runId: string): Promise<void> {
    try {
      await this.client.updateRun(runId, {
        end_time: new Date().toISOString(),
        error: {
          message: error.message,
          stack: error.stack
        },
        status: "error"
      });
    } catch (error) {
      console.warn('LangSmith error update failed:', error);
    }
  }
  
  async onLLMStart(llm: BaseLanguageModel, prompts: string[], runId: string): Promise<void> {
    try {
      await this.client.createRun({
        id: runId,
        name: llm.constructor.name,
        start_time: new Date().toISOString(),
        run_type: "llm",
        inputs: { prompts },
        project_id: this.projectId
      });
    } catch (error) {
      console.warn('LangSmith LLM run creation failed:', error);
    }
  }
  
  async onLLMEnd(output: LLMResult, runId: string): Promise<void> {
    try {
      await this.client.updateRun(runId, {
        end_time: new Date().toISOString(),
        outputs: {
          generations: output.generations,
          llm_output: output.llmOutput
        },
        status: "success"
      });
    } catch (error) {
      console.warn('LangSmith LLM run update failed:', error);
    }
  }
  
  async onLLMError(error: Error, runId: string): Promise<void> {
    try {
      await this.client.updateRun(runId, {
        end_time: new Date().toISOString(),
        error: {
          message: error.message,
          stack: error.stack
        },
        status: "error"
      });
    } catch (error) {
      console.warn('LangSmith LLM error update failed:', error);
    }
  }
}
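Note that `LangSmithClient` above is a simplified stand-in used for this chapter. In practice, LangSmith tracing can usually be enabled without writing a custom handler at all: LangChain's built-in tracer activates when the LangSmith environment variables are set. The variable names below are from recent LangSmith documentation; verify them against the current docs before relying on them:

```shell
# Enable LangChain's built-in LangSmith tracing via environment variables
# (names per recent LangSmith docs; check the current documentation).
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY="your-langsmith-api-key"
export LANGCHAIN_PROJECT="my-qa-project"
```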

Custom Business Monitoring

Next, implement a monitoring callback tailored to specific business needs:

typescript
interface BusinessMetrics {
  userSatisfaction?: number;
  cost?: number;
  accuracy?: number;
}

class BusinessMonitoringCallbackHandler {
  private businessMetricsCollector: BusinessMetricsCollector;
  private costCalculator: CostCalculator;
  
  constructor(
    businessMetricsCollector: BusinessMetricsCollector,
    costCalculator: CostCalculator
  ) {
    this.businessMetricsCollector = businessMetricsCollector;
    this.costCalculator = costCalculator;
  }
  
  async onChainEnd(outputs: any, runId: string): Promise<void> {
    // Compute business metrics
    const metrics = this.calculateBusinessMetrics(outputs);
    
    // Record business metrics
    if (metrics.userSatisfaction !== undefined) {
      this.businessMetricsCollector.record('user_satisfaction', metrics.userSatisfaction);
    }
    
    if (metrics.cost !== undefined) {
      this.businessMetricsCollector.record('operation_cost', metrics.cost);
    }
    
    if (metrics.accuracy !== undefined) {
      this.businessMetricsCollector.record('answer_accuracy', metrics.accuracy);
    }
  }
  
  private calculateBusinessMetrics(outputs: any): BusinessMetrics {
    const metrics: BusinessMetrics = {};
    
    // Estimate user satisfaction (based on model confidence)
    if (outputs.confidence !== undefined) {
      metrics.userSatisfaction = outputs.confidence;
    }
    
    // Compute cost (based on token usage)
    if (outputs.llmOutput?.tokenUsage) {
      metrics.cost = this.costCalculator.calculateCost(outputs.llmOutput.tokenUsage);
    }
    
    // Compute accuracy (when labeled data is available)
    if (outputs.expectedAnswer && outputs.generatedAnswer) {
      metrics.accuracy = this.calculateAccuracy(
        outputs.expectedAnswer,
        outputs.generatedAnswer
      );
    }
    
    return metrics;
  }
  
  private calculateAccuracy(expected: string, generated: string): number {
    // Simplified accuracy calculation; real applications can use
    // more sophisticated similarity measures
    const expectedWords = expected.toLowerCase().split(/\s+/);
    const generatedWords = generated.toLowerCase().split(/\s+/);
    
    const commonWords = expectedWords.filter(word => generatedWords.includes(word));
    return commonWords.length / Math.max(expectedWords.length, generatedWords.length);
  }
}
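The word-overlap ratio above is intentionally crude. A common, somewhat stronger alternative is token-level F1, the overlap metric popularized by QA benchmarks such as SQuAD; a self-contained sketch:

```typescript
// Token-level F1: harmonic mean of precision and recall over bag-of-words
// overlap between the expected and generated answers.
function tokenF1(expected: string, generated: string): number {
  const expTokens = expected.toLowerCase().split(/\s+/).filter(Boolean);
  const genTokens = generated.toLowerCase().split(/\s+/).filter(Boolean);
  if (expTokens.length === 0 || genTokens.length === 0) return 0;

  // Multiset intersection: count each expected token at most as often as it appears
  const remaining = new Map<string, number>();
  for (const token of expTokens) {
    remaining.set(token, (remaining.get(token) ?? 0) + 1);
  }
  let overlap = 0;
  for (const token of genTokens) {
    const count = remaining.get(token) ?? 0;
    if (count > 0) {
      overlap++;
      remaining.set(token, count - 1);
    }
  }
  if (overlap === 0) return 0;

  const precision = overlap / genTokens.length;
  const recall = overlap / expTokens.length;
  return (2 * precision * recall) / (precision + recall);
}
```

Unlike the max-denominator ratio, F1 separately penalizes missing expected tokens (low recall) and extra generated tokens (low precision), then balances the two.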

A Multi-Callback Manager

Finally, implement a callback manager that coordinates multiple callback handlers:

typescript
class CallbackManager {
  private handlers: Callbacks[];
  
  constructor(handlers: Callbacks[]) {
    this.handlers = handlers;
  }
  
  async onChainStart(chain: Runnable, inputs: any, runId: string): Promise<void> {
    await Promise.all(
      this.handlers.map(handler => 
        handler.onChainStart?.(chain, inputs, runId)
      )
    );
  }
  
  async onChainEnd(outputs: any, runId: string): Promise<void> {
    await Promise.all(
      this.handlers.map(handler => 
        handler.onChainEnd?.(outputs, runId)
      )
    );
  }
  
  async onChainError(error: Error, runId: string): Promise<void> {
    await Promise.all(
      this.handlers.map(handler => 
        handler.onChainError?.(error, runId)
      )
    );
  }
  
  // Implementations for the remaining callback methods...
  
  static fromHandlers(...handlers: Callbacks[]): CallbackManager {
    return new CallbackManager(handlers);
  }
}
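One design point worth noting: `Promise.all` rejects as soon as any handler throws, so a single buggy logging handler could fail the whole dispatch. `Promise.allSettled` isolates handler failures instead; a sketch of that variant for one of the hooks:

```typescript
// Fault-isolated dispatch: a failing handler is counted, not propagated.
type ChainEndHandler = {
  onChainEnd?: (outputs: unknown, runId: string) => Promise<void> | void;
};

async function dispatchChainEnd(
  handlers: ChainEndHandler[],
  outputs: unknown,
  runId: string
): Promise<number> {
  const results = await Promise.allSettled(
    // Promise.resolve().then(...) normalizes sync handlers (and sync throws)
    // into promises so allSettled can capture every outcome
    handlers.map(handler =>
      Promise.resolve().then(() => handler.onChainEnd?.(outputs, runId))
    )
  );
  // Return how many handlers failed; the caller's workflow continues regardless
  return results.filter(r => r.status === "rejected").length;
}
```

Whether monitoring failures should be silently tolerated (as here) or surfaced is an application-level decision; for observability code, isolation is usually the safer default.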

A Complete Application Example

Let's look at a complete example that shows how to integrate multiple monitoring and logging capabilities into a question-answering system:

typescript
// A fully monitored QA system
class MonitoredQASystem {
  private workflow: Runnable<string, string>;
  private callbackManager: CallbackManager;
  
  constructor(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel
  ) {
    this.workflow = this.buildWorkflow(retriever, llm);
    this.callbackManager = this.buildCallbackManager();
  }
  
  private buildWorkflow(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel
  ): Runnable<string, string> {
    const prompt = new PromptTemplate({
      template: `Answer the question using the following documents. If the documents contain no relevant information, say that you cannot answer.

Documents:
{context}

Question: {question}

Answer:`,
      inputVariables: ["context", "question"]
    });
    
    return RunnableMap.from({
      question: (input: string) => input,
      context: retriever.pipe((docs: Document[]) => 
        docs.map(d => d.pageContent).join('\n\n')
      )
    })
    .pipe(prompt)
    .pipe(llm)
    .pipe(new StringOutputParser())
    .withRetry({ maxAttempts: 3 });
  }
  
  private buildCallbackManager(): CallbackManager {
    // Create the individual callback handlers
    const loggingHandler = new LoggingCallbackHandler(console);
    const monitoringHandler = new MonitoringCallbackHandler(
      new MetricsCollector(),
      new Tracer()
    );
    
    // Add the LangSmith handler only when LangSmith is configured
    let langSmithHandler: LangSmithCallbackHandler | null = null;
    if (process.env.LANGSMITH_API_KEY) {
      const langSmithClient = new LangSmithClient({
        apiKey: process.env.LANGSMITH_API_KEY
      });
      langSmithHandler = new LangSmithCallbackHandler(
        langSmithClient,
        process.env.LANGSMITH_PROJECT_ID || 'default'
      );
    }
    
    // Create the business monitoring handler
    const businessHandler = new BusinessMonitoringCallbackHandler(
      new BusinessMetricsCollector(),
      new CostCalculator()
    );
    
    // Combine all handlers
    const handlers = [loggingHandler, monitoringHandler, businessHandler];
    if (langSmithHandler) {
      handlers.push(langSmithHandler);
    }
    
    return CallbackManager.fromHandlers(...handlers);
  }
  
  async answerQuestion(question: string): Promise<string> {
    try {
      // Execute the workflow with the callback configuration
      const config: RunnableConfig = {
        callbacks: this.callbackManager,
        metadata: {
          userId: 'user123',
          sessionId: this.generateSessionId(),
          questionType: this.classifyQuestion(question)
        },
        tags: ['qa-system', 'production']
      };
      
      const answer = await this.workflow.invoke(question, config);
      return answer;
    } catch (error) {
      console.error('QA system error:', error);
      return "Sorry, an error occurred while processing your question.";
    }
  }
  
  private generateSessionId(): string {
    return Math.random().toString(36).substring(2, 15) + 
           Math.random().toString(36).substring(2, 15);
  }
  
  private classifyQuestion(question: string): string {
    const lowerQuestion = question.toLowerCase();
    if (lowerQuestion.includes('how')) {
      return 'how-to';
    } else if (lowerQuestion.includes('what') || lowerQuestion.includes('who')) {
      return 'what';
    } else if (lowerQuestion.includes('why')) {
      return 'why';
    } else {
      return 'other';
    }
  }
}

// Mock monitoring components
class MetricsCollector {
  // Accept either (metric, numericValue, tags) or (metric, tags), since the
  // handlers above often pass a tags object directly as the second argument
  increment(metric: string, valueOrTags: number | Record<string, string> = 1, tags: Record<string, string> = {}): void {
    const value = typeof valueOrTags === 'number' ? valueOrTags : 1;
    const mergedTags = typeof valueOrTags === 'number' ? tags : valueOrTags;
    console.log(`Counter: ${metric} += ${value}`, mergedTags);
  }
  
  timing(metric: string, value: number, tags: Record<string, string> = {}): void {
    console.log(`Timing: ${metric} = ${value}ms`, tags);
  }
  
  record(metric: string, value: number, tags: Record<string, string> = {}): void {
    console.log(`Recorded: ${metric} = ${value}`, tags);
  }
}

class Tracer {
  startSpan(name: string, attributes: any = {}): void {
    console.log(`Span started: ${name}`, attributes);
  }
  
  endSpan(spanId: string, attributes: any = {}): void {
    console.log(`Span ended: ${spanId}`, attributes);
  }
  
  errorSpan(spanId: string, error: Error, attributes: any = {}): void {
    console.log(`Span error: ${spanId}`, { ...attributes, error: error.message });
  }
}

class BusinessMetricsCollector {
  record(metric: string, value: number): void {
    console.log(`Business metric: ${metric} = ${value}`);
  }
}

class CostCalculator {
  calculateCost(tokenUsage: any): number {
    const promptTokens = tokenUsage.promptTokens || 0;
    const completionTokens = tokenUsage.completionTokens || 0;
    
    // Simplified cost calculation (based on GPT-3.5-turbo pricing)
    const promptCost = (promptTokens / 1000) * 0.0015;
    const completionCost = (completionTokens / 1000) * 0.002;
    
    return promptCost + completionCost;
  }
}
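As a quick sanity check of the arithmetic above (the per-1K-token rates are illustrative, not current OpenAI pricing), consider 1000 prompt tokens plus 500 completion tokens:

```typescript
// Worked example of the simplified pricing formula:
// cost = promptTokens/1000 * 0.0015 + completionTokens/1000 * 0.002
const promptTokens = 1000;
const completionTokens = 500;

const promptCost = (promptTokens / 1000) * 0.0015;        // 0.0015
const completionCost = (completionTokens / 1000) * 0.002; // 0.001
const totalCost = promptCost + completionCost;            // ≈ 0.0025
```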

class LangSmithClient {
  private apiKey: string;
  
  constructor(config: { apiKey: string }) {
    this.apiKey = config.apiKey;
  }
  
  async createRun(runData: any): Promise<void> {
    console.log('LangSmith createRun:', runData);
  }
  
  async updateRun(runId: string, updateData: any): Promise<void> {
    console.log('LangSmith updateRun:', runId, updateData);
  }
}

// Usage example
async function demonstrateMonitoredQA() {
  // Create the vector store and retriever
  const vectorStore = new MemoryVectorStore();
  
  const sampleDocuments: Document[] = [
    {
      pageContent: "LangChain is a framework for developing applications powered by language models. It helps developers combine language models with other data sources and computational logic.",
      metadata: { source: "langchain-docs" }
    },
    {
      pageContent: "LCEL (LangChain Expression Language) is the expression language introduced in LangChain V3. It lets developers compose different components using the pipe operator (|).",
      metadata: { source: "lcel-docs" }
    }
  ];
  
  await vectorStore.addDocuments(sampleDocuments);
  
  const retriever = new VectorStoreRetriever({ vectorStore, k: 2 });
  
  // Create the monitored QA system
  const qaSystem = new MonitoredQASystem(
    retriever,
    new ChatOpenAI({ modelName: "gpt-3.5-turbo" })
  );
  
  console.log('=== Monitored QA System Demo ===\n');
  
  const answer = await qaSystem.answerQuestion("What is LangChain?");
  console.log('Question: What is LangChain?');
  console.log('Answer:', answer);
  console.log();
}

Summary

Through the RunnableConfig.callbacks injection mechanism, LangChain V3 provides powerful logging and monitoring capabilities:

  1. Multi-level monitoring - supports monitoring at the chain, LLM, and tool levels
  2. Performance metrics - collects execution time, token usage, and other performance data
  3. Error tracking - complete error recording and tracing
  4. Business metrics - business-specific monitoring metrics can be integrated
  5. Platform integration - supports monitoring platforms such as LangSmith
  6. Flexible configuration - different monitoring components can be enabled as needed

This callback mechanism gives developers full visibility into the application's runtime behavior, helps them detect and resolve problems promptly, and provides a solid foundation for building production-grade LLM applications.

In the next chapter, we will explore batch processing optimization: how .batch() parallelizes requests, and how batching can improve system performance.