OutputParser 错误处理:.parse() 成为 .invoke() 的一部分,错误时抛出 OutputParserException
在上一章中,我们探讨了 OutputParser 如何将 LLM 的文本输出转换为结构化数据。本章将深入探讨 OutputParser 的错误处理机制,特别是 .parse() 方法如何成为 .invoke() 的一部分,以及在解析失败时如何抛出 OutputParserException。
OutputParserException 的设计
OutputParserException 是 LangChain V3 中专门用于处理输出解析错误的异常类型:
typescript
class OutputParserException extends BaseLLMError {
constructor(
message: string,
public llmInput?: string,
public originalError?: Error
) {
super(message, llmInput, originalError);
this.name = 'OutputParserException';
}
}这种设计有几个重要优势:
- 上下文信息 - 保留原始输入和原始错误信息
- 类型区分 - 可以通过 instanceof 进行类型检查
- 继承体系 - 继承自 BaseLLMError,保持错误处理的一致性
- 调试友好 - 提供丰富的调试信息
.parse() 与 .invoke() 的关系
在 LangChain V3 中,.parse() 方法成为 .invoke() 的核心实现:
typescript
abstract class BaseOutputParser<T> implements Runnable<string, T> {
// 核心解析方法
abstract parse(text: string): Promise<T> | T;
// Runnable 接口的实现
async invoke(input: string, _options?: RunnableConfig): Promise<T> {
try {
return await this.parse(input);
} catch (error) {
// 将所有解析错误包装为 OutputParserException
if (error instanceof OutputParserException) {
throw error; // 如果已经是 OutputParserException,直接抛出
} else {
throw new OutputParserException(
`输出解析失败: ${error.message}`,
input,
error
);
}
}
}
// 批量处理
async batch(inputs: string[], _options?: RunnableConfig | RunnableConfig[]): Promise<T[]> {
return await Promise.all(inputs.map(input => this.invoke(input)));
}
// 流式处理
async *stream(input: string, _options?: RunnableConfig): AsyncGenerator<T> {
const result = await this.invoke(input);
yield result;
}
}JsonOutputParser 的错误处理实现
让我们详细看看 JsonOutputParser 如何处理各种错误情况:
typescript
class JsonOutputParser<T extends Record<string, any> = Record<string, any>>
extends BaseOutputParser<T> {
private schema?: any;
constructor(schema?: any) {
super();
this.schema = schema;
}
async parse(text: string): Promise<T> {
// 步骤1: 提取 JSON 内容
const jsonText = this.extractJson(text);
// 步骤2: 解析 JSON
let parsed: any;
try {
parsed = JSON.parse(jsonText);
} catch (parseError) {
throw new OutputParserException(
`JSON 解析失败: ${parseError.message}. 原始文本: ${text.substring(0, 100)}...`,
text,
parseError
);
}
// 步骤3: 验证 schema(如果提供了 schema)
if (this.schema) {
try {
return this.validateWithSchema(parsed, jsonText);
} catch (validationError) {
throw new OutputParserException(
`JSON 验证失败: ${validationError.message}. 解析结果: ${JSON.stringify(parsed)}`,
text,
validationError
);
}
}
return parsed;
}
private extractJson(text: string): string {
// 尝试从代码块中提取 JSON
const codeBlockMatch = text.match(/```(?:json)?\s*([\s\S]*?)\s*```/i);
if (codeBlockMatch) {
return codeBlockMatch[1];
}
// 尝试提取 JSON 对象或数组
const jsonObjectMatch = text.match(/(\{[\s\S]*\})/);
const jsonArrayMatch = text.match(/($$\s*[\s\S]*\s*$$)/);
if (jsonObjectMatch) {
return jsonObjectMatch[1];
}
if (jsonArrayMatch) {
return jsonArrayMatch[1];
}
// 如果没有找到明确的 JSON 结构,返回原文本
return text.trim();
}
private validateWithSchema(data: any, originalText: string): T {
try {
// 如果使用 Zod 进行验证
if (this.schema && typeof this.schema.parse === 'function') {
return this.schema.parse(data);
}
// 如果使用自定义验证逻辑
if (typeof this.schema === 'function') {
return this.schema(data);
}
// 如果没有有效的验证器,返回数据
return data;
} catch (error) {
throw new OutputParserException(
`Schema 验证失败: ${error.message}`,
originalText,
error
);
}
}
}多层错误处理示例
让我们看一个复杂的错误处理示例,展示多层嵌套的错误处理:
typescript
// 定义数据结构
interface ComplexData {
id: string;
metadata: {
createdAt: string;
updatedAt: string;
tags: string[];
};
content: {
title: string;
body: string;
references: Array<{
id: string;
url: string;
title: string;
}>;
};
}
// 使用 Zod 定义 schema
import { z } from 'zod';
const ComplexDataSchema = z.object({
id: z.string().uuid(),
metadata: z.object({
createdAt: z.string().datetime(),
updatedAt: z.string().datetime(),
tags: z.array(z.string()).min(1)
}),
content: z.object({
title: z.string().min(1).max(200),
body: z.string().min(1),
references: z.array(z.object({
id: z.string().uuid(),
url: z.string().url(),
title: z.string().min(1)
}))
})
});
// 创建带有详细错误处理的解析器
class DetailedJsonOutputParser<T> extends BaseOutputParser<T> {
private schema: any;
constructor(schema: any) {
super();
this.schema = schema;
}
async parse(text: string): Promise<T> {
console.log('开始解析输出:', text.substring(0, 50) + '...');
// 第一层:提取 JSON
const jsonText = this.extractJsonWithLogging(text);
// 第二层:解析 JSON
const parsedData = this.parseJsonWithLogging(jsonText, text);
// 第三层:验证 schema
const validatedData = this.validateWithLogging(parsedData, jsonText);
console.log('解析完成');
return validatedData;
}
private extractJsonWithLogging(text: string): string {
try {
console.log('提取 JSON 内容...');
const result = this.extractJson(text);
console.log('JSON 提取成功');
return result;
} catch (error) {
console.error('JSON 提取失败:', error.message);
throw new OutputParserException(
`无法从输出中提取 JSON: ${error.message}`,
text,
error
);
}
}
private parseJsonWithLogging(jsonText: string, originalText: string): any {
try {
console.log('解析 JSON...');
const result = JSON.parse(jsonText);
console.log('JSON 解析成功');
return result;
} catch (error) {
console.error('JSON 解析失败:', error.message);
throw new OutputParserException(
`JSON 格式错误: ${error.message}. 提取的文本: ${jsonText.substring(0, 100)}...`,
originalText,
error
);
}
}
private validateWithLogging(data: any, jsonText: string): T {
try {
console.log('验证数据 schema...');
const result = this.schema.parse(data);
console.log('Schema 验证成功');
return result;
} catch (error) {
console.error('Schema 验证失败:', error.message);
// 提供更详细的错误信息
const errorDetails = {
validationErrors: error.errors || [error.message],
parsedData: data,
jsonData: jsonText
};
throw new OutputParserException(
`数据验证失败: ${JSON.stringify(errorDetails)}`,
jsonText,
error
);
}
}
private extractJson(text: string): string {
// 实现 JSON 提取逻辑
const codeBlockMatch = text.match(/```(?:json)?\s*([\s\S]*?)\s*```/i);
if (codeBlockMatch) {
return codeBlockMatch[1];
}
const jsonMatch = text.match(/(\{[\s\S]*\}|\[[\s\S]*\])/);
if (jsonMatch) {
return jsonMatch[1];
}
return text.trim();
}
}错误恢复和重试机制
OutputParser 还可以实现错误恢复和重试机制:
typescript
class ResilientOutputParser<T> extends BaseOutputParser<T> {
private parser: BaseOutputParser<T>;
private maxRetries: number;
private retryDelay: number;
constructor(
parser: BaseOutputParser<T>,
maxRetries: number = 3,
retryDelay: number = 1000
) {
super();
this.parser = parser;
this.maxRetries = maxRetries;
this.retryDelay = retryDelay;
}
async parse(text: string): Promise<T> {
let lastError: Error | null = null;
for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
console.log(`解析尝试 ${attempt}/${this.maxRetries}`);
const result = await this.parser.parse(text);
console.log(`解析成功,第 ${attempt} 次尝试`);
return result;
} catch (error) {
lastError = error;
console.warn(`解析失败 (尝试 ${attempt}):`, error.message);
if (attempt < this.maxRetries) {
console.log(`等待 ${this.retryDelay}ms 后重试...`);
await new Promise(resolve => setTimeout(resolve, this.retryDelay));
}
}
}
// 所有重试都失败了
throw new OutputParserException(
`在 ${this.maxRetries} 次尝试后解析仍然失败: ${lastError?.message}`,
text,
lastError || undefined
);
}
}实际应用中的错误处理
让我们看一个实际应用示例,展示如何在应用中处理 OutputParser 错误:
typescript
// 创建一个智能问答系统
class QASystem {
private llm: BaseChatModel;
private parser: BaseOutputParser<Answer>;
constructor(llm: BaseChatModel) {
this.llm = llm;
// 定义答案结构
const AnswerSchema = z.object({
answer: z.string().min(1),
confidence: z.number().min(0).max(1),
sources: z.array(z.string().url()).optional(),
followUpQuestions: z.array(z.string()).optional()
});
this.parser = new DetailedJsonOutputParser(AnswerSchema);
}
async answerQuestion(question: string): Promise<Answer | null> {
try {
const prompt = new PromptTemplate({
template: `请回答以下问题并以指定的 JSON 格式返回:
问题: {question}
请严格按照以下格式返回 JSON:
{
"answer": "你的答案",
"confidence": 0.95,
"sources": ["https://example.com"],
"followUpQuestions": ["相关问题1", "相关问题2"]
}`,
inputVariables: ["question"]
});
const chain = prompt.pipe(this.llm).pipe(this.parser);
const result = await chain.invoke({ question });
console.log('问题回答成功:', result);
return result;
} catch (error) {
if (error instanceof OutputParserException) {
console.error('输出解析失败:', error.message);
console.error('原始输入:', error.llmInput);
// 记录详细错误信息用于调试
this.logErrorDetails(error);
// 尝试恢复或返回默认值
return this.recoverFromParseError(question, error);
} else {
console.error('未预期的错误:', error);
throw error;
}
}
}
private logErrorDetails(error: OutputParserException): void {
const errorInfo = {
message: error.message,
name: error.name,
stack: error.stack,
llmInput: error.llmInput,
originalError: error.originalError ? {
name: error.originalError.name,
message: error.originalError.message,
stack: error.originalError.stack
} : null
};
// 在生产环境中,这应该发送到日志系统
console.error('详细错误信息:', JSON.stringify(errorInfo, null, 2));
}
private async recoverFromParseError(
question: string,
error: OutputParserException
): Promise<Answer | null> {
try {
// 尝试使用更简单的解析器
console.log('尝试使用简单解析器恢复...');
const simpleParser = new JsonOutputParser();
const simpleResult = await simpleParser.parse(error.llmInput || '');
// 尝试映射到期望的格式
if (typeof simpleResult === 'object' && simpleResult !== null) {
return {
answer: simpleResult.answer || simpleResult.response || '无法生成答案',
confidence: simpleResult.confidence || 0.5,
sources: Array.isArray(simpleResult.sources) ? simpleResult.sources : [],
followUpQuestions: Array.isArray(simpleResult.followUpQuestions)
? simpleResult.followUpQuestions
: []
} as Answer;
}
} catch (recoveryError) {
console.warn('恢复尝试失败:', recoveryError.message);
}
// 如果恢复失败,返回 null 或默认答案
return null;
}
}
interface Answer {
answer: string;
confidence: number;
sources?: string[];
followUpQuestions?: string[];
}
// 使用示例
async function demonstrateErrorHandling() {
const qaSystem = new QASystem(new ChatOpenAI());
try {
const answer = await qaSystem.answerQuestion("什么是量子计算?");
if (answer) {
console.log('答案:', answer);
} else {
console.log('无法获取答案');
}
} catch (error) {
console.error('系统错误:', error);
}
}与监控和日志系统的集成
OutputParserException 可以很好地与监控和日志系统集成:
typescript
class MonitoredOutputParser<T> extends BaseOutputParser<T> {
private parser: BaseOutputParser<T>;
private logger: Logger;
private metrics: MetricsCollector;
constructor(
parser: BaseOutputParser<T>,
logger: Logger,
metrics: MetricsCollector
) {
super();
this.parser = parser;
this.logger = logger;
this.metrics = metrics;
}
async parse(text: string): Promise<T> {
const startTime = Date.now();
let success = false;
try {
const result = await this.parser.parse(text);
success = true;
return result;
} catch (error) {
// 记录解析错误
this.logger.error('OutputParser 错误', {
error: error.message,
name: error.name,
llmInput: error.llmInput ? error.llmInput.substring(0, 200) : undefined,
stack: error.stack,
timestamp: new Date().toISOString()
});
// 收集指标
this.metrics.increment('output_parser.errors');
if (error instanceof OutputParserException) {
this.metrics.increment(`output_parser.errors.${error.name}`);
}
throw error;
} finally {
const duration = Date.now() - startTime;
this.metrics.timing('output_parser.duration', duration);
if (success) {
this.metrics.increment('output_parser.success');
}
}
}
}总结
OutputParser 的错误处理机制通过将 .parse() 方法作为 .invoke() 的核心实现,提供了一套完整且一致的错误处理方案:
- 统一异常类型 - 使用
OutputParserException统一处理所有解析错误 - 丰富的上下文信息 - 保留原始输入和原始错误信息用于调试
- 层级化处理 - 支持多层嵌套的错误处理和恢复机制
- 可扩展性 - 支持重试、恢复和监控等高级功能
- 与生态系统集成 - 可以很好地与日志和监控系统集成
这种设计使得 LangChain V3 的 OutputParser 不仅功能强大,而且具有很好的可维护性和可观察性,为构建生产级 LLM 应用提供了坚实的基础。
在下一章中,我们将探讨流式解析:transform() 方法如何处理增量文本,深入了解实时数据处理的机制。