Skip to content

OutputParser 错误处理:.parse() 成为 .invoke() 的一部分,错误时抛出 OutputParserException

在上一章中,我们探讨了 OutputParser 如何将 LLM 的文本输出转换为结构化数据。本章将深入探讨 OutputParser 的错误处理机制,特别是 .parse() 方法如何成为 .invoke() 的一部分,以及在解析失败时如何抛出 OutputParserException

OutputParserException 的设计

OutputParserException 是 LangChain V3 中专门用于处理输出解析错误的异常类型:

typescript
class OutputParserException extends BaseLLMError {
  constructor(
    message: string,
    public llmInput?: string,
    public originalError?: Error
  ) {
    super(message, llmInput, originalError);
    this.name = 'OutputParserException';
  }
}

这种设计有几个重要优势:

  1. 上下文信息 - 保留原始输入和原始错误信息
  2. 类型区分 - 可以通过 instanceof 进行类型检查
  3. 继承体系 - 继承自 BaseLLMError,保持错误处理的一致性
  4. 调试友好 - 提供丰富的调试信息

.parse() 与 .invoke() 的关系

在 LangChain V3 中,.parse() 方法成为 .invoke() 的核心实现:

typescript
abstract class BaseOutputParser<T> implements Runnable<string, T> {
  // 核心解析方法
  abstract parse(text: string): Promise<T> | T;
  
  // Runnable 接口的实现
  async invoke(input: string, _options?: RunnableConfig): Promise<T> {
    try {
      return await this.parse(input);
    } catch (error) {
      // 将所有解析错误包装为 OutputParserException
      if (error instanceof OutputParserException) {
        throw error; // 如果已经是 OutputParserException,直接抛出
      } else {
        throw new OutputParserException(
          `输出解析失败: ${error.message}`,
          input,
          error
        );
      }
    }
  }
  
  // 批量处理
  async batch(inputs: string[], _options?: RunnableConfig | RunnableConfig[]): Promise<T[]> {
    return await Promise.all(inputs.map(input => this.invoke(input)));
  }
  
  // 流式处理
  async *stream(input: string, _options?: RunnableConfig): AsyncGenerator<T> {
    const result = await this.invoke(input);
    yield result;
  }
}

JsonOutputParser 的错误处理实现

让我们详细看看 JsonOutputParser 如何处理各种错误情况:

typescript
class JsonOutputParser<T extends Record<string, any> = Record<string, any>> 
  extends BaseOutputParser<T> {
  
  private schema?: any;
  
  constructor(schema?: any) {
    super();
    this.schema = schema;
  }
  
  async parse(text: string): Promise<T> {
    // 步骤1: 提取 JSON 内容
    const jsonText = this.extractJson(text);
    
    // 步骤2: 解析 JSON
    let parsed: any;
    try {
      parsed = JSON.parse(jsonText);
    } catch (parseError) {
      throw new OutputParserException(
        `JSON 解析失败: ${parseError.message}. 原始文本: ${text.substring(0, 100)}...`,
        text,
        parseError
      );
    }
    
    // 步骤3: 验证 schema(如果提供了 schema)
    if (this.schema) {
      try {
        return this.validateWithSchema(parsed, jsonText);
      } catch (validationError) {
        throw new OutputParserException(
          `JSON 验证失败: ${validationError.message}. 解析结果: ${JSON.stringify(parsed)}`,
          text,
          validationError
        );
      }
    }
    
    return parsed;
  }
  
  private extractJson(text: string): string {
    // 尝试从代码块中提取 JSON
    const codeBlockMatch = text.match(/```(?:json)?\s*([\s\S]*?)\s*```/i);
    if (codeBlockMatch) {
      return codeBlockMatch[1];
    }
    
    // 尝试提取 JSON 对象或数组
    const jsonObjectMatch = text.match(/(\{[\s\S]*\})/);
    const jsonArrayMatch = text.match(/($$\s*[\s\S]*\s*$$)/);
    
    if (jsonObjectMatch) {
      return jsonObjectMatch[1];
    }
    
    if (jsonArrayMatch) {
      return jsonArrayMatch[1];
    }
    
    // 如果没有找到明确的 JSON 结构,返回原文本
    return text.trim();
  }
  
  private validateWithSchema(data: any, originalText: string): T {
    try {
      // 如果使用 Zod 进行验证
      if (this.schema && typeof this.schema.parse === 'function') {
        return this.schema.parse(data);
      }
      
      // 如果使用自定义验证逻辑
      if (typeof this.schema === 'function') {
        return this.schema(data);
      }
      
      // 如果没有有效的验证器,返回数据
      return data;
    } catch (error) {
      throw new OutputParserException(
        `Schema 验证失败: ${error.message}`,
        originalText,
        error
      );
    }
  }
}

多层错误处理示例

让我们看一个复杂的错误处理示例,展示多层嵌套的错误处理:

typescript
// 定义数据结构
interface ComplexData {
  id: string;
  metadata: {
    createdAt: string;
    updatedAt: string;
    tags: string[];
  };
  content: {
    title: string;
    body: string;
    references: Array<{
      id: string;
      url: string;
      title: string;
    }>;
  };
}

// 使用 Zod 定义 schema
import { z } from 'zod';

const ComplexDataSchema = z.object({
  id: z.string().uuid(),
  metadata: z.object({
    createdAt: z.string().datetime(),
    updatedAt: z.string().datetime(),
    tags: z.array(z.string()).min(1)
  }),
  content: z.object({
    title: z.string().min(1).max(200),
    body: z.string().min(1),
    references: z.array(z.object({
      id: z.string().uuid(),
      url: z.string().url(),
      title: z.string().min(1)
    }))
  })
});

// 创建带有详细错误处理的解析器
class DetailedJsonOutputParser<T> extends BaseOutputParser<T> {
  private schema: any;
  
  constructor(schema: any) {
    super();
    this.schema = schema;
  }
  
  async parse(text: string): Promise<T> {
    console.log('开始解析输出:', text.substring(0, 50) + '...');
    
    // 第一层:提取 JSON
    const jsonText = this.extractJsonWithLogging(text);
    
    // 第二层:解析 JSON
    const parsedData = this.parseJsonWithLogging(jsonText, text);
    
    // 第三层:验证 schema
    const validatedData = this.validateWithLogging(parsedData, jsonText);
    
    console.log('解析完成');
    return validatedData;
  }
  
  private extractJsonWithLogging(text: string): string {
    try {
      console.log('提取 JSON 内容...');
      const result = this.extractJson(text);
      console.log('JSON 提取成功');
      return result;
    } catch (error) {
      console.error('JSON 提取失败:', error.message);
      throw new OutputParserException(
        `无法从输出中提取 JSON: ${error.message}`,
        text,
        error
      );
    }
  }
  
  private parseJsonWithLogging(jsonText: string, originalText: string): any {
    try {
      console.log('解析 JSON...');
      const result = JSON.parse(jsonText);
      console.log('JSON 解析成功');
      return result;
    } catch (error) {
      console.error('JSON 解析失败:', error.message);
      throw new OutputParserException(
        `JSON 格式错误: ${error.message}. 提取的文本: ${jsonText.substring(0, 100)}...`,
        originalText,
        error
      );
    }
  }
  
  private validateWithLogging(data: any, jsonText: string): T {
    try {
      console.log('验证数据 schema...');
      const result = this.schema.parse(data);
      console.log('Schema 验证成功');
      return result;
    } catch (error) {
      console.error('Schema 验证失败:', error.message);
      
      // 提供更详细的错误信息
      const errorDetails = {
        validationErrors: error.errors || [error.message],
        parsedData: data,
        jsonData: jsonText
      };
      
      throw new OutputParserException(
        `数据验证失败: ${JSON.stringify(errorDetails)}`,
        jsonText,
        error
      );
    }
  }
  
  private extractJson(text: string): string {
    // 实现 JSON 提取逻辑
    const codeBlockMatch = text.match(/```(?:json)?\s*([\s\S]*?)\s*```/i);
    if (codeBlockMatch) {
      return codeBlockMatch[1];
    }
    
    const jsonMatch = text.match(/(\{[\s\S]*\}|\[[\s\S]*\])/);
    if (jsonMatch) {
      return jsonMatch[1];
    }
    
    return text.trim();
  }
}

错误恢复和重试机制

OutputParser 还可以实现错误恢复和重试机制:

typescript
class ResilientOutputParser<T> extends BaseOutputParser<T> {
  private parser: BaseOutputParser<T>;
  private maxRetries: number;
  private retryDelay: number;
  
  constructor(
    parser: BaseOutputParser<T>,
    maxRetries: number = 3,
    retryDelay: number = 1000
  ) {
    super();
    this.parser = parser;
    this.maxRetries = maxRetries;
    this.retryDelay = retryDelay;
  }
  
  async parse(text: string): Promise<T> {
    let lastError: Error | null = null;
    
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        console.log(`解析尝试 ${attempt}/${this.maxRetries}`);
        const result = await this.parser.parse(text);
        console.log(`解析成功,第 ${attempt} 次尝试`);
        return result;
      } catch (error) {
        lastError = error;
        console.warn(`解析失败 (尝试 ${attempt}):`, error.message);
        
        if (attempt < this.maxRetries) {
          console.log(`等待 ${this.retryDelay}ms 后重试...`);
          await new Promise(resolve => setTimeout(resolve, this.retryDelay));
        }
      }
    }
    
    // 所有重试都失败了
    throw new OutputParserException(
      `在 ${this.maxRetries} 次尝试后解析仍然失败: ${lastError?.message}`,
      text,
      lastError || undefined
    );
  }
}

实际应用中的错误处理

让我们看一个实际应用示例,展示如何在应用中处理 OutputParser 错误:

typescript
// 创建一个智能问答系统
class QASystem {
  private llm: BaseChatModel;
  private parser: BaseOutputParser<Answer>;
  
  constructor(llm: BaseChatModel) {
    this.llm = llm;
    
    // 定义答案结构
    const AnswerSchema = z.object({
      answer: z.string().min(1),
      confidence: z.number().min(0).max(1),
      sources: z.array(z.string().url()).optional(),
      followUpQuestions: z.array(z.string()).optional()
    });
    
    this.parser = new DetailedJsonOutputParser(AnswerSchema);
  }
  
  async answerQuestion(question: string): Promise<Answer | null> {
    try {
      const prompt = new PromptTemplate({
        template: `请回答以下问题并以指定的 JSON 格式返回:
问题: {question}

请严格按照以下格式返回 JSON:
{
  "answer": "你的答案",
  "confidence": 0.95,
  "sources": ["https://example.com"],
  "followUpQuestions": ["相关问题1", "相关问题2"]
}`,
        inputVariables: ["question"]
      });
      
      const chain = prompt.pipe(this.llm).pipe(this.parser);
      const result = await chain.invoke({ question });
      
      console.log('问题回答成功:', result);
      return result;
      
    } catch (error) {
      if (error instanceof OutputParserException) {
        console.error('输出解析失败:', error.message);
        console.error('原始输入:', error.llmInput);
        
        // 记录详细错误信息用于调试
        this.logErrorDetails(error);
        
        // 尝试恢复或返回默认值
        return this.recoverFromParseError(question, error);
      } else {
        console.error('未预期的错误:', error);
        throw error;
      }
    }
  }
  
  private logErrorDetails(error: OutputParserException): void {
    const errorInfo = {
      message: error.message,
      name: error.name,
      stack: error.stack,
      llmInput: error.llmInput,
      originalError: error.originalError ? {
        name: error.originalError.name,
        message: error.originalError.message,
        stack: error.originalError.stack
      } : null
    };
    
    // 在生产环境中,这应该发送到日志系统
    console.error('详细错误信息:', JSON.stringify(errorInfo, null, 2));
  }
  
  private async recoverFromParseError(
    question: string, 
    error: OutputParserException
  ): Promise<Answer | null> {
    try {
      // 尝试使用更简单的解析器
      console.log('尝试使用简单解析器恢复...');
      const simpleParser = new JsonOutputParser();
      const simpleResult = await simpleParser.parse(error.llmInput || '');
      
      // 尝试映射到期望的格式
      if (typeof simpleResult === 'object' && simpleResult !== null) {
        return {
          answer: simpleResult.answer || simpleResult.response || '无法生成答案',
          confidence: simpleResult.confidence || 0.5,
          sources: Array.isArray(simpleResult.sources) ? simpleResult.sources : [],
          followUpQuestions: Array.isArray(simpleResult.followUpQuestions) 
            ? simpleResult.followUpQuestions 
            : []
        } as Answer;
      }
    } catch (recoveryError) {
      console.warn('恢复尝试失败:', recoveryError.message);
    }
    
    // 如果恢复失败,返回 null 或默认答案
    return null;
  }
}

interface Answer {
  answer: string;
  confidence: number;
  sources?: string[];
  followUpQuestions?: string[];
}

// 使用示例
async function demonstrateErrorHandling() {
  const qaSystem = new QASystem(new ChatOpenAI());
  
  try {
    const answer = await qaSystem.answerQuestion("什么是量子计算?");
    if (answer) {
      console.log('答案:', answer);
    } else {
      console.log('无法获取答案');
    }
  } catch (error) {
    console.error('系统错误:', error);
  }
}

与监控和日志系统的集成

OutputParserException 可以很好地与监控和日志系统集成:

typescript
class MonitoredOutputParser<T> extends BaseOutputParser<T> {
  private parser: BaseOutputParser<T>;
  private logger: Logger;
  private metrics: MetricsCollector;
  
  constructor(
    parser: BaseOutputParser<T>,
    logger: Logger,
    metrics: MetricsCollector
  ) {
    super();
    this.parser = parser;
    this.logger = logger;
    this.metrics = metrics;
  }
  
  async parse(text: string): Promise<T> {
    const startTime = Date.now();
    let success = false;
    
    try {
      const result = await this.parser.parse(text);
      success = true;
      return result;
    } catch (error) {
      // 记录解析错误
      this.logger.error('OutputParser 错误', {
        error: error.message,
        name: error.name,
        llmInput: error.llmInput ? error.llmInput.substring(0, 200) : undefined,
        stack: error.stack,
        timestamp: new Date().toISOString()
      });
      
      // 收集指标
      this.metrics.increment('output_parser.errors');
      
      if (error instanceof OutputParserException) {
        this.metrics.increment(`output_parser.errors.${error.name}`);
      }
      
      throw error;
    } finally {
      const duration = Date.now() - startTime;
      this.metrics.timing('output_parser.duration', duration);
      
      if (success) {
        this.metrics.increment('output_parser.success');
      }
    }
  }
}

总结

OutputParser 的错误处理机制通过将 .parse() 方法作为 .invoke() 的核心实现,提供了一套完整且一致的错误处理方案:

  1. 统一异常类型 - 使用 OutputParserException 统一处理所有解析错误
  2. 丰富的上下文信息 - 保留原始输入和原始错误信息用于调试
  3. 层级化处理 - 支持多层嵌套的错误处理和恢复机制
  4. 可扩展性 - 支持重试、恢复和监控等高级功能
  5. 与生态系统集成 - 可以很好地与日志和监控系统集成

这种设计使得 LangChain V3 的 OutputParser 不仅功能强大,而且具有很好的可维护性和可观察性,为构建生产级 LLM 应用提供了坚实的基础。

在下一章中,我们将探讨流式解析:transform() 方法如何处理增量文本,深入了解实时数据处理的机制。