流式解析:transform() 方法如何处理增量文本?
在现代 LLM 应用中,流式处理是一个关键特性,它允许用户在生成过程中逐步看到结果。然而,流式处理不仅仅是逐个输出 token,还需要能够流式解析增量文本。LangChain V3 通过 transform() 方法实现了这一功能,使得 OutputParser 能够处理逐步到达的文本片段。本章将深入探讨流式解析的机制和实现。
流式解析的重要性
流式解析在 LLM 应用中有以下几个重要作用:
- 实时反馈 - 用户可以立即看到部分解析结果
- 低延迟 - 不需要等待完整输出就可以开始处理(注意:下文的实现仍会累积完整文本,因此流式解析本身并不节省内存)
- 用户体验 - 提供更流畅的交互体验
- 错误检测 - 可以在早期检测和处理解析错误
Transform 方法的设计
在 LangChain V3 中,transform() 方法是实现流式解析的关键。下面的 TypeScript 代码是一个简化的示意实现:每收到一个文本片段,就重新解析累积的全部文本;解析成功就输出结果,失败则继续等待更多内容:
/**
 * Base class for output parsers: turns raw LLM text into a structured value `T`.
 *
 * Subclasses implement `parse()`. `transform()` adds a generic streaming strategy
 * on top of it: after every chunk, it re-parses the whole accumulated text.
 */
abstract class BaseOutputParser<T> implements Runnable<string, T> {
  /**
   * Parses a complete piece of text.
   *
   * `transform()` treats an `OutputParserException` from this method as
   * "not parseable yet". Any other error is treated as a genuine failure.
   */
  abstract parse(text: string): Promise<T> | T;

  /**
   * Streaming parse: accumulates chunks and yields a result each time the
   * accumulated text parses successfully.
   *
   * @param inputGenerator - stream of text fragments (e.g. LLM tokens).
   * @returns async generator of successive parse results. If the stream ends while
   *   the latest text has not parsed (or yields no chunks at all), one final
   *   `parse()` attempt is made. A result the last chunk already yielded is not repeated.
   * @throws OutputParserException if that final attempt fails. Non-`OutputParserException`
   *   errors raised mid-stream are rethrown unchanged.
   */
  async *transform(
    inputGenerator: AsyncGenerator<string>
  ): AsyncGenerator<T> {
    let accumulatedText = '';
    // True when the current accumulatedText has already been parsed and yielded.
    let latestYielded = false;

    for await (const chunk of inputGenerator) {
      accumulatedText += chunk;

      let result: T;
      try {
        result = await this.parse(accumulatedText);
      } catch (error) {
        // Only OutputParserException means "incomplete so far"; anything else is a real error.
        if (!(error instanceof OutputParserException)) {
          throw error;
        }
        latestYielded = false;
        continue;
      }

      latestYielded = true;
      // Yield outside the try so errors thrown into the generator by the consumer
      // are not mistaken for parse failures.
      yield result;
    }

    if (latestYielded) {
      return;
    }

    // The stream ended on text that did not parse (or was empty).
    // Make one last attempt and report failure together with the full text.
    let finalResult: T;
    try {
      finalResult = await this.parse(accumulatedText);
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      throw new OutputParserException(
        `流式解析最终失败: ${reason}`,
        accumulatedText,
        error
      );
    }
    yield finalResult;
  }

  /** Runnable entry point: parses a complete input in a single call. */
  async invoke(input: string): Promise<T> {
    return await this.parse(input);
  }
}
JsonOutputParser 的流式实现
让我们详细看看 JsonOutputParser 如何实现流式解析(TypeScript 示例):
/**
 * Parses the JSON object or array embedded in LLM text.
 *
 * Optionally validates the value against a schema exposing `parse(data)`
 * (e.g. a zod schema or a plain `{ parse }` object).
 *
 * NOTE(review): `partialParsing` is stored but not read anywhere in this class.
 * `transform()` only yields once the extracted text is complete, valid JSON.
 */
class JsonOutputParser<T extends Record<string, any> = Record<string, any>>
  extends BaseOutputParser<T> {
  private schema?: any;
  private partialParsing: boolean = true;

  /**
   * @param schema - optional validator. When it has a `parse` function, its return value becomes the result.
   * @param options.partialParsing - defaults to `true` (currently unused, see class note).
   */
  constructor(schema?: any, options?: { partialParsing?: boolean }) {
    super();
    this.schema = schema;
    this.partialParsing = options?.partialParsing ?? true;
  }

  /**
   * Parses the JSON embedded in `text`.
   *
   * @throws OutputParserException "JSON 解析失败" when no valid JSON is found.
   * @throws OutputParserException "JSON 验证失败" when schema validation fails.
   */
  async parse(text: string): Promise<T> {
    const jsonText = this.extractJson(text);
    let parsed: unknown;
    try {
      parsed = JSON.parse(jsonText);
    } catch (error) {
      throw new OutputParserException(
        `JSON 解析失败: ${JsonOutputParser.describeError(error)}`,
        text,
        error
      );
    }
    // validate() raises its own OutputParserException. Let it propagate instead of
    // re-labelling a validation failure as a JSON syntax failure.
    return this.schema ? this.validate(parsed) : (parsed as T);
  }

  /**
   * Streaming parse: yields each distinct, schema-valid JSON value that appears
   * while text accumulates.
   *
   * Incomplete JSON (SyntaxError) and values the schema rejects are skipped
   * until more text arrives.
   *
   * @throws OutputParserException for unexpected errors mid-stream, or when the
   *   stream ends without a valid value ever having been yielded.
   */
  async *transform(
    inputGenerator: AsyncGenerator<string>
  ): AsyncGenerator<T> {
    let accumulatedText = '';
    let lastYielded: T | null = null;
    let hasYieldedValid = false;

    for await (const chunk of inputGenerator) {
      accumulatedText += chunk;
      const jsonText = this.extractJson(accumulatedText);
      if (!jsonText) {
        continue;
      }

      let validated: T;
      try {
        const parsed: unknown = JSON.parse(jsonText);
        validated = this.schema ? this.validate(parsed) : (parsed as T);
      } catch (error) {
        // Incomplete JSON, or an intermediate value the schema rejects: wait for more text.
        if (error instanceof SyntaxError || error instanceof OutputParserException) {
          continue;
        }
        throw new OutputParserException(
          `流式解析错误: ${JsonOutputParser.describeError(error)}`,
          accumulatedText,
          error
        );
      }

      // Deduplicate: a new chunk often leaves the extracted JSON unchanged.
      if (!this.deepEquals(lastYielded, validated)) {
        lastYielded = validated;
        hasYieldedValid = true;
        yield validated;
      }
    }

    // Nothing valid was ever emitted: re-run the strict parse so the caller
    // receives a descriptive OutputParserException.
    if (!hasYieldedValid && accumulatedText) {
      let finalResult: T;
      try {
        finalResult = await this.parse(accumulatedText);
      } catch (error) {
        throw new OutputParserException(
          `流式解析最终失败: ${JsonOutputParser.describeError(error)}`,
          accumulatedText,
          error
        );
      }
      yield finalResult;
    }
  }

  /**
   * Finds the JSON candidate in `text`.
   *
   * Looks for the greedy `{...}` span and the greedy `[...]` span (first opener
   * to last closer, across newlines) and returns the longer one, or '' when
   * neither exists.
   */
  private extractJson(text: string): string {
    const objectMatch = text.match(/\{[\s\S]*\}/);
    const arrayMatch = text.match(/\[[\s\S]*\]/);
    if (objectMatch && arrayMatch) {
      return objectMatch[0].length > arrayMatch[0].length ? objectMatch[0] : arrayMatch[0];
    }
    return objectMatch?.[0] ?? arrayMatch?.[0] ?? '';
  }

  /** Runs `schema.parse` when available, wrapping its failure as an OutputParserException. */
  private validate(data: unknown): T {
    if (this.schema && typeof this.schema.parse === 'function') {
      try {
        return this.schema.parse(data);
      } catch (error) {
        throw new OutputParserException(
          `JSON 验证失败: ${JsonOutputParser.describeError(error)}`,
          JSON.stringify(data),
          error
        );
      }
    }
    return data as T;
  }

  /** Structural equality for JSON-like values. Arrays never compare equal to plain objects. */
  private deepEquals(a: unknown, b: unknown): boolean {
    if (a === b) return true;
    if (typeof a !== 'object' || typeof b !== 'object' || a === null || b === null) {
      return false;
    }
    if (Array.isArray(a) !== Array.isArray(b)) return false;

    const recordA = a as Record<string, unknown>;
    const recordB = b as Record<string, unknown>;
    const keysA = Object.keys(recordA);
    const keysB = new Set(Object.keys(recordB));
    if (keysA.length !== keysB.size) return false;
    return keysA.every(key => keysB.has(key) && this.deepEquals(recordA[key], recordB[key]));
  }

  /** Message text for an unknown thrown value. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
流式解析的实际应用
让我们看一个完整的实际应用示例,展示如何使用流式解析(TypeScript 示例):
// Streaming question-answering system built on JsonOutputParser.
/**
 * Streams answers to a question as successive `Answer` snapshots, by piping
 * the chat model's stream through a `JsonOutputParser`.
 */
class StreamingQASystem {
  private llm: BaseChatModel;
  private parser: JsonOutputParser<Answer>;

  constructor(llm: BaseChatModel) {
    this.llm = llm;
    // Hand-rolled schema: coerces each field to its expected type and defaults missing ones.
    const AnswerSchema = {
      parse: (data: any) => {
        if (typeof data !== 'object' || data === null) {
          throw new Error('数据必须是对象');
        }
        return {
          answer: typeof data.answer === 'string' ? data.answer : '',
          confidence: typeof data.confidence === 'number' ? data.confidence : 0,
          sources: Array.isArray(data.sources) ? data.sources : [],
          followUpQuestions: Array.isArray(data.followUpQuestions)
            ? data.followUpQuestions
            : []
        };
      }
    };
    this.parser = new JsonOutputParser<Answer>(AnswerSchema);
  }

  /**
   * Streams the answer to `question`.
   *
   * @param question - user question, substituted into the prompt.
   * @returns async generator of answer snapshots. On failure it does not throw:
   *   it logs the error and yields one final `{ answer: <error text> }`.
   */
  async *streamAnswer(question: string): AsyncGenerator<Partial<Answer>> {
    try {
      // The braces of the JSON example are doubled. In f-string templates a single
      // `{...}` denotes an input variable, so the example would otherwise be taken as one.
      const prompt = new PromptTemplate({
        template: `请逐步回答以下问题并以 JSON 格式返回:
问题: {question}
请按照以下格式逐步返回 JSON (可以是不完整的 JSON):
{{
"answer": "你的答案",
"confidence": 0.95,
"sources": ["https://example.com"],
"followUpQuestions": ["相关问题1", "相关问题2"]
}}`,
        inputVariables: ["question"]
      });

      const messages: BaseMessage[] = [
        { role: 'user', content: await prompt.format({ question }) }
      ];
      const stream = await this.llm.stream(messages);

      // The parser consumes plain strings; normalize whatever the model streams.
      const textStream = StreamingQASystem.toTextStream(stream);
      for await (const parsedAnswer of this.parser.transform(textStream)) {
        yield parsedAnswer;
      }
    } catch (error) {
      if (error instanceof OutputParserException) {
        console.error('流式解析错误:', error.message);
        yield { answer: `解析错误: ${error.message}` } as Partial<Answer>;
      } else {
        console.error('未预期错误:', error);
        yield { answer: '系统错误' } as Partial<Answer>;
      }
    }
  }

  /**
   * Converts a model stream into text fragments.
   *
   * Strings pass through unchanged. Objects contribute their `content` when it
   * is a string. Anything else is skipped.
   * NOTE(review): non-string (e.g. multimodal) message content is dropped — confirm that is acceptable.
   */
  private static async *toTextStream(stream: AsyncIterable<unknown>): AsyncGenerator<string> {
    for await (const chunk of stream) {
      if (typeof chunk === 'string') {
        yield chunk;
      } else if (
        chunk !== null &&
        typeof chunk === 'object' &&
        typeof (chunk as { content?: unknown }).content === 'string'
      ) {
        yield (chunk as { content: string }).content;
      }
    }
  }
}
/** Structured answer the QA prompt asks the model to return as JSON. */
interface Answer {
  /** The answer text itself. */
  answer: string;
  /** Confidence score reported by the model (the prompt's example uses 0.95). */
  confidence: number;
  /** Source URLs backing the answer. */
  sources: string[];
  /** Suggested related questions. */
  followUpQuestions: string[];
}
// Using the streaming parser from a front-end component.
/**
 * Minimal DOM front end for `StreamingQASystem`.
 *
 * Reads elements with ids `answer` and `sources` in the constructor. Elements
 * with ids `confidence` and `follow-up` are optional and skipped when absent.
 */
class StreamingAnswerComponent {
  private qaSystem: StreamingQASystem;
  private answerContainer: HTMLElement;
  private sourcesContainer: HTMLElement;
  // Bumped by every askQuestion() call. An older stream that sees a newer id stops writing to the DOM.
  private latestRequestId = 0;

  constructor(qaSystem: StreamingQASystem) {
    this.qaSystem = qaSystem;
    this.answerContainer = document.getElementById('answer')!;
    this.sourcesContainer = document.getElementById('sources')!;
  }

  /**
   * Streams the answer for `question` into the page, replacing the previous one.
   *
   * Asking again while a stream is still running (e.g. by clicking a follow-up
   * button) abandons the older stream, so two streams never write to the same
   * elements.
   */
  async askQuestion(question: string): Promise<void> {
    const requestId = ++this.latestRequestId;

    // Clear the previous answer, including the optional panels, so stale values do not linger.
    this.answerContainer.textContent = '';
    this.sourcesContainer.innerHTML = '';
    const confidenceElement = document.getElementById('confidence');
    if (confidenceElement) {
      confidenceElement.textContent = '';
    }
    const followUpContainer = document.getElementById('follow-up');
    if (followUpContainer) {
      followUpContainer.innerHTML = '';
    }

    for await (const partialAnswer of this.qaSystem.streamAnswer(question)) {
      if (requestId !== this.latestRequestId) {
        // Superseded by a newer question; leaving the loop closes this stream's generator.
        return;
      }
      if (partialAnswer.answer) {
        this.answerContainer.textContent = partialAnswer.answer;
      }
      if (partialAnswer.confidence !== undefined) {
        this.updateConfidence(partialAnswer.confidence);
      }
      if (partialAnswer.sources) {
        this.updateSources(partialAnswer.sources);
      }
      if (partialAnswer.followUpQuestions) {
        this.updateFollowUpQuestions(partialAnswer.followUpQuestions);
      }
    }
  }

  /** Shows `confidence` as a percentage and picks a CSS class by threshold (> 0.8, > 0.5). */
  private updateConfidence(confidence: number) {
    const confidenceElement = document.getElementById('confidence');
    if (confidenceElement) {
      confidenceElement.textContent = `置信度: ${(confidence * 100).toFixed(1)}%`;
      confidenceElement.className = confidence > 0.8 ? 'high-confidence' :
        confidence > 0.5 ? 'medium-confidence' : 'low-confidence';
    }
  }

  /**
   * Renders the source list.
   *
   * Sources come from model output and must be treated as untrusted. They are
   * rendered as text, and only http(s) URLs become links, so markup or
   * `javascript:` URLs cannot inject script.
   */
  private updateSources(sources: string[]) {
    this.sourcesContainer.innerHTML = '';
    for (const source of sources) {
      const sourceElement = document.createElement('div');
      sourceElement.className = 'source';
      const link = document.createElement('a');
      link.textContent = source;
      if (StreamingAnswerComponent.isHttpUrl(source)) {
        link.href = source;
        link.target = '_blank';
        link.rel = 'noopener noreferrer';
      }
      sourceElement.appendChild(link);
      this.sourcesContainer.appendChild(sourceElement);
    }
  }

  /** Renders follow-up questions as buttons that start a new question when clicked. */
  private updateFollowUpQuestions(questions: string[]) {
    const followUpContainer = document.getElementById('follow-up');
    if (followUpContainer) {
      followUpContainer.innerHTML = '';
      questions.forEach(question => {
        const questionElement = document.createElement('button');
        questionElement.className = 'follow-up-question';
        questionElement.textContent = question;
        questionElement.onclick = () => {
          void this.askQuestion(question);
        };
        followUpContainer.appendChild(questionElement);
      });
    }
  }

  /** True when `value` parses as an absolute http: or https: URL. */
  private static isHttpUrl(value: string): boolean {
    try {
      const { protocol } = new URL(value);
      return protocol === 'http:' || protocol === 'https:';
    } catch {
      return false;
    }
  }
}
复杂结构的流式解析
对于更复杂的结构,流式解析需要更智能的处理。下面以列表为例(TypeScript 示例):
// Streaming parser for list (JSON array) output.
/**
 * Parses a JSON array from LLM text and converts each element with `itemParser`.
 * Each element is re-serialized with JSON.stringify before being passed to `itemParser`.
 */
class ListOutputParser<T> extends BaseOutputParser<T[]> {
  // Greedy: from the first '[' to the last ']' in the text, across newlines.
  private static readonly ARRAY_PATTERN = /\[[\s\S]*\]/;
  private itemParser: BaseOutputParser<T>;

  constructor(itemParser: BaseOutputParser<T>) {
    super();
    this.itemParser = itemParser;
  }

  /**
   * Parses the whole list at once.
   *
   * @throws OutputParserException "未找到列表结构" when the text has no bracketed span.
   * @throws OutputParserException "列表解析失败" when that span is not valid JSON,
   *   is not an array, or an element fails `itemParser`.
   */
  async parse(text: string): Promise<T[]> {
    const listMatch = text.match(ListOutputParser.ARRAY_PATTERN);
    if (!listMatch) {
      throw new OutputParserException('未找到列表结构', text);
    }
    try {
      const parsedArray: unknown = JSON.parse(listMatch[0]);
      if (!Array.isArray(parsedArray)) {
        throw new Error('解析结果不是数组');
      }
      return await this.parseItems(parsedArray);
    } catch (error) {
      throw new OutputParserException(
        `列表解析失败: ${error instanceof Error ? error.message : String(error)}`,
        text,
        error
      );
    }
  }

  /**
   * Streaming parse: yields a growing snapshot of parsed items whenever the
   * bracketed span parses as a longer array than before.
   *
   * Only newly appeared elements go through `itemParser`. Because JSON.parse
   * needs the closing ']', a snapshot appears only when the accumulated text
   * contains a complete array.
   *
   * @throws OutputParserException if the stream ends without an up-to-date
   *   snapshot and the final parse fails.
   */
  async *transform(inputGenerator: AsyncGenerator<string>): AsyncGenerator<T[]> {
    let accumulatedText = '';
    const parsedItems: T[] = [];
    let hasYielded = false;
    // True when the current text parsed cleanly, i.e. the last snapshot is up to date.
    let latestParsed = false;

    for await (const chunk of inputGenerator) {
      accumulatedText += chunk;
      latestParsed = false;

      const arrayMatch = accumulatedText.match(ListOutputParser.ARRAY_PATTERN);
      if (!arrayMatch) {
        continue;
      }

      let newItems: T[];
      try {
        const parsedArray: unknown = JSON.parse(arrayMatch[0]);
        if (!Array.isArray(parsedArray)) {
          continue;
        }
        // Only convert elements that were not part of an earlier snapshot.
        newItems = await this.parseItems(parsedArray.slice(parsedItems.length));
      } catch (error) {
        // Incomplete JSON: keep accumulating.
        if (error instanceof SyntaxError) {
          continue;
        }
        throw error;
      }

      latestParsed = true;
      if (newItems.length === 0) {
        continue;
      }
      parsedItems.push(...newItems);
      hasYielded = true;
      // Copy so a consumer holding an earlier snapshot never sees it mutate.
      yield [...parsedItems];
    }

    // The last chunk already produced an up-to-date snapshot; a final parse would only repeat it.
    if (hasYielded && latestParsed) {
      return;
    }

    let finalResult: T[];
    try {
      finalResult = await this.parse(accumulatedText);
    } catch (error) {
      throw new OutputParserException(
        `列表流式解析最终失败: ${error instanceof Error ? error.message : String(error)}`,
        accumulatedText,
        error
      );
    }
    yield finalResult;
  }

  /** Converts raw array elements with `itemParser`, concurrently, preserving order. */
  private async parseItems(rawItems: readonly unknown[]): Promise<T[]> {
    const results = await Promise.all(
      rawItems.map(item => this.itemParser.parse(JSON.stringify(item)))
    );
    return results as T[];
  }
}
// Usage example
/** Shape of one element in the product-list demo. */
interface Product {
  /** Product display name. */
  name: string;
  /** Price as a plain number (the demo data uses 100 and 200; units unspecified). */
  price: number;
  /** Free-text product description. */
  description: string;
}
/** Parser for a single product object. No schema is passed, so values are not validated. */
const productParser: JsonOutputParser<Product> = new JsonOutputParser<Product>();
/** Parser for a JSON array of products; each element is handed to `productParser`. */
const productListParser = new ListOutputParser(productParser);
/**
 * Simulates an LLM token stream for demos and tests.
 *
 * @param response - full text to emit. Defaults to a two-product JSON array.
 * @param chunkSize - characters per chunk, counted in Unicode code points so that
 *   surrogate pairs (e.g. emoji) are never split. Defaults to 10.
 * @param delayMs - pause between consecutive chunks, in milliseconds. Defaults to 100.
 * @throws RangeError (on first iteration) when `chunkSize` is not a positive integer.
 */
async function* mockStream(
  response = `[{"name": "产品A", "price": 100, "description": "描述A"}, {"name": "产品B", "price": 200, "description": "描述B"}]`,
  chunkSize = 10,
  delayMs = 100
): AsyncGenerator<string> {
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    // A zero/negative step would never advance and loop forever.
    throw new RangeError('chunkSize must be a positive integer');
  }
  const characters = Array.from(response);
  for (let start = 0; start < characters.length; start += chunkSize) {
    // Delay only between chunks; waiting after the last one merely postpones completion.
    if (start > 0 && delayMs > 0) {
      await new Promise(resolve => setTimeout(resolve, delayMs)); // simulate network latency
    }
    yield characters.slice(start, start + chunkSize).join('');
  }
}
/** Logs every snapshot the list parser emits while consuming the mock product stream. */
async function demonstrateListStreaming(): Promise<void> {
  console.log('开始流式解析产品列表...');
  const snapshots = productListParser.transform(mockStream());
  for await (const snapshot of snapshots) {
    console.log('当前解析结果:', snapshot);
  }
  console.log('流式解析完成');
}
错误处理和恢复
下面的包装器演示了流式解析中的错误处理和恢复机制(TypeScript 示例):
/**
 * Wraps another parser and re-parses a bounded buffer of the stream after every chunk.
 *
 * Once the output has parsed at least once, it tolerates a limited number of
 * consecutive parse failures.
 *
 * NOTE(review): the buffer keeps only the LAST `bufferWindow` characters. For formats
 * whose beginning matters (e.g. JSON), output longer than the window becomes
 * unparseable, so size the window above the expected output length.
 */
class ResilientStreamingParser<T> extends BaseOutputParser<T> {
  private parser: BaseOutputParser<T>;
  private bufferWindow: number = 1000; // maximum buffered characters
  private recoveryAttempts: number = 3;

  /**
   * @param parser - parser applied to the buffered text.
   * @param options.bufferWindow - maximum characters kept in the buffer (default 1000).
   * @param options.recoveryAttempts - consecutive failures tolerated after the first
   *   successful parse before giving up (default 3).
   */
  constructor(
    parser: BaseOutputParser<T>,
    options?: { bufferWindow?: number; recoveryAttempts?: number }
  ) {
    super();
    this.parser = parser;
    this.bufferWindow = options?.bufferWindow ?? this.bufferWindow;
    this.recoveryAttempts = options?.recoveryAttempts ?? this.recoveryAttempts;
  }

  /**
   * Streaming parse over a sliding buffer.
   *
   * Failures before the first success mean "not enough text yet" and are not
   * counted. After a success, more than `recoveryAttempts` consecutive failures
   * abort the stream.
   *
   * @throws OutputParserException "流式解析持续失败" when recovery attempts run out.
   * @throws OutputParserException "流式解析最终失败" when the final parse fails.
   */
  async *transform(inputGenerator: AsyncGenerator<string>): AsyncGenerator<T> {
    let buffer = '';
    let recoveryCount = 0;
    let hasParsed = false;
    // True when the current buffer has already been parsed and yielded.
    let latestYielded = false;

    for await (const chunk of inputGenerator) {
      buffer += chunk;
      if (buffer.length > this.bufferWindow) {
        buffer = buffer.slice(-this.bufferWindow);
      }

      let result: T;
      try {
        result = await this.parser.parse(buffer);
      } catch (error) {
        if (!(error instanceof OutputParserException)) {
          throw error;
        }
        latestYielded = false;
        if (!hasParsed) {
          // Still accumulating the first complete output; this is not a failure to recover from.
          continue;
        }
        if (recoveryCount < this.recoveryAttempts) {
          recoveryCount++;
          console.warn(`流式解析失败,尝试恢复 (${recoveryCount}/${this.recoveryAttempts}):`, error.message);
          continue;
        }
        console.error('流式解析恢复尝试用尽:', error.message);
        throw new OutputParserException(
          `流式解析持续失败: ${error.message}`,
          buffer,
          error
        );
      }

      hasParsed = true;
      recoveryCount = 0; // a success resets the consecutive-failure budget
      latestYielded = true;
      yield result;
    }

    // Skip the final attempt when there is nothing buffered, or when the last chunk
    // already yielded the result for this exact buffer.
    if (!buffer || latestYielded) {
      return;
    }

    let finalResult: T;
    try {
      finalResult = await this.parser.parse(buffer);
    } catch (error) {
      throw new OutputParserException(
        `流式解析最终失败: ${error instanceof Error ? error.message : String(error)}`,
        buffer,
        error
      );
    }
    yield finalResult;
  }

  /** Non-streaming parse: delegates directly to the wrapped parser. */
  async parse(text: string): Promise<T> {
    return await this.parser.parse(text);
  }
}
总结
流式解析通过 transform() 方法为 LangChain V3 提供了强大的增量文本处理能力。关键要点包括:
- 增量处理 - 逐步处理到达的文本片段
- 智能解析 - 在文本不完整时继续累积,完整时进行解析
- 去重机制 - 避免重复 yield 相同的结果
- 错误恢复 - 在解析失败时继续尝试而不是立即报错
- 内存效率 - 通过缓冲区管理控制内存使用(注意:从头部截断缓冲区会丢弃输出的开头,对 JSON 等结构化输出而言,缓冲区上限必须大于完整输出的长度)
- 用户体验 - 提供实时反馈和流畅的交互体验
通过这些机制,LangChain V3 的流式解析功能使得开发者能够构建更加响应迅速和用户友好的 LLM 应用。
在接下来的章节中,我们将探讨 Retrieval Chain 相关内容,包括 VectorStoreRetriever 作为 Runnable<Query, Document[]> 的实现。