
Streaming Parsing: How Does transform() Handle Incremental Text?

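Streaming is a key feature of modern LLM applications because it lets users see results while they are still being generated. Streaming is not only about emitting tokens one at a time, though: the incremental text also has to be parsed as it arrives. LangChain V3 supports this through the transform() method, which lets an OutputParser process text fragments as they come in. This chapter examines how streaming parsing works and how it is implemented.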

Why Streaming Parsing Matters

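Streaming parsing serves several purposes in LLM applications:

  1. Real-time feedback - users see partial parse results immediately
  2. Lower latency - downstream processing can start before the complete output has arrived
  3. User experience - the interaction feels smoother and more responsive
  4. Early error detection - malformed output can be detected and handled before generation finishes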

Designing the transform() Method

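In LangChain V3, transform() is the key to streaming parsing. The simplified base class below accumulates incoming chunks and tries to parse the text after each one: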

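```typescript
// Simplified illustration of the design (not the verbatim @langchain/core source)

// Error type thrown when text cannot (yet) be parsed
class OutputParserException extends Error {
  llmOutput?: string;
  originalError?: unknown;

  constructor(message: string, llmOutput?: string, originalError?: unknown) {
    super(message);
    this.name = 'OutputParserException';
    this.llmOutput = llmOutput;
    this.originalError = originalError;
  }
}

abstract class BaseOutputParser<T> {
  // Traditional one-shot parsing of the complete text
  abstract parse(text: string): Promise<T> | T;

  // Streaming parse: re-parse the accumulated text as each chunk arrives
  async *transform(inputGenerator: AsyncIterable<string>): AsyncGenerator<T> {
    let accumulatedText = '';
    let latestParsed = false; // did the current accumulated text parse successfully?

    for await (const chunk of inputGenerator) {
      accumulatedText += chunk;

      let result: T;
      try {
        result = await this.parse(accumulatedText);
      } catch (error) {
        // Anything other than a parse failure is a real error
        if (!(error instanceof OutputParserException)) {
          throw error;
        }
        // The text is incomplete so far: wait for more content
        latestParsed = false;
        continue;
      }
      latestParsed = true;
      yield result;
    }

    // If the complete text never parsed, try once more and surface the failure.
    // (Skipping this when the last parse succeeded avoids yielding the final result twice.)
    if (!latestParsed) {
      let finalResult: T;
      try {
        finalResult = await this.parse(accumulatedText);
      } catch (error) {
        throw new OutputParserException(
          `Streaming parse failed on the final text: ${(error as Error).message}`,
          accumulatedText,
          error
        );
      }
      yield finalResult;
    }
  }

  // Runnable-style entry point for non-streaming use
  async invoke(input: string): Promise<T> {
    return await this.parse(input);
  }
}
```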
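Stripped of the class hierarchy, the accumulate-and-reparse loop fits in a few lines. The sketch below is illustrative only: `accumulateAndParse` and `fromChunks` are hypothetical helpers, and `JSON.parse` stands in for a parser's `parse()` method.

```typescript
// Hypothetical helper: accumulate chunks and re-parse the whole text after each one.
async function* accumulateAndParse<T>(
  chunks: AsyncIterable<string>,
  parse: (text: string) => T // expected to throw while the text is still incomplete
): AsyncGenerator<T> {
  let text = '';
  for await (const chunk of chunks) {
    text += chunk;
    let result: T;
    try {
      result = parse(text);
    } catch {
      continue; // not parseable yet: wait for more text
    }
    yield result;
  }
}

// Hypothetical helper: expose a fixed list of chunks as an async stream.
async function* fromChunks(chunks: string[]): AsyncGenerator<string> {
  for (const chunk of chunks) yield chunk;
}

// Collect every value the loop yields for a three-chunk JSON stream.
async function demo(): Promise<unknown[]> {
  const results: unknown[] = [];
  for await (const value of accumulateAndParse(fromChunks(['{"a":', ' 1}', '\n']), JSON.parse)) {
    results.push(value);
  }
  return results;
}
```

Because `JSON.parse` accepts trailing whitespace, the final newline chunk makes the loop yield `{ a: 1 }` a second time. Comparing each result with the previous one before yielding, as `JsonOutputParser` does with `deepEquals`, avoids such repeats.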

Streaming Implementation of JsonOutputParser

Let's look in detail at how JsonOutputParser implements streaming parsing:

```typescript
// Builds on BaseOutputParser and OutputParserException from the previous listing
class JsonOutputParser<T extends Record<string, any> = Record<string, any>>
  extends BaseOutputParser<T> {

  private schema?: any; // any object with a parse(data) method, e.g. a zod schema
  private partialParsing: boolean = true; // stored option; not consulted by this simplified version

  constructor(schema?: any, options?: { partialParsing?: boolean }) {
    super();
    this.schema = schema;
    this.partialParsing = options?.partialParsing ?? true;
  }

  async parse(text: string): Promise<T> {
    const jsonText = this.extractJson(text);

    let parsed: unknown;
    try {
      parsed = JSON.parse(jsonText);
    } catch (error) {
      throw new OutputParserException(
        `JSON parsing failed: ${(error as Error).message}`,
        text,
        error
      );
    }

    // validate() throws its own OutputParserException, so it is not re-wrapped here
    return this.schema ? this.validate(parsed) : (parsed as T);
  }

  async *transform(inputGenerator: AsyncIterable<string>): AsyncGenerator<T> {
    let accumulatedText = '';
    let lastYielded: T | null = null;
    let hasYieldedValid = false;

    for await (const chunk of inputGenerator) {
      accumulatedText += chunk;

      // Try to extract a JSON value; nothing to do until one is present
      const jsonText = this.extractJson(accumulatedText);
      if (!jsonText) continue;

      let validated: T;
      try {
        const parsed = JSON.parse(jsonText);
        // Validate if a schema was provided
        validated = this.schema ? this.validate(parsed) : parsed;
      } catch (error) {
        // Incomplete JSON (SyntaxError) or a value that does not validate yet: keep accumulating
        if (error instanceof SyntaxError || error instanceof OutputParserException) {
          continue;
        }
        // Any other error is unexpected
        throw new OutputParserException(
          `Streaming parse error: ${(error as Error).message}`,
          accumulatedText,
          error
        );
      }

      // Only yield when the new result differs from the previous one
      if (!this.deepEquals(lastYielded, validated)) {
        lastYielded = validated;
        hasYieldedValid = true;
        yield validated;
      }
    }

    // If nothing valid was produced while streaming, parse once more and surface the error
    if (!hasYieldedValid && accumulatedText) {
      let finalResult: T;
      try {
        finalResult = await this.parse(accumulatedText);
      } catch (error) {
        throw new OutputParserException(
          `Streaming parse failed on the final text: ${(error as Error).message}`,
          accumulatedText,
          error
        );
      }
      yield finalResult;
    }
  }

  private extractJson(text: string): string {
    // The outermost JSON value starts at the first "{" or "["
    const start = text.search(/[{[]/);
    if (start === -1) return '';

    // Take everything up to the last matching closing bracket; picking by position
    // (not by length) avoids returning an inner array while the outer object is incomplete
    const close = text[start] === '{' ? '}' : ']';
    const end = text.lastIndexOf(close);
    return end > start ? text.slice(start, end + 1) : '';
  }

  private validate(data: unknown): T {
    if (this.schema && typeof this.schema.parse === 'function') {
      try {
        return this.schema.parse(data);
      } catch (error) {
        throw new OutputParserException(
          `JSON validation failed: ${(error as Error).message}`,
          JSON.stringify(data),
          error
        );
      }
    }

    return data as T;
  }

  // Structural equality for plain JSON values (objects, arrays, primitives)
  private deepEquals(a: any, b: any): boolean {
    if (a === b) return true;
    if (a == null || b == null) return a === b;
    if (typeof a !== 'object' || typeof b !== 'object') return a === b;

    const keysA = Object.keys(a);
    const keysB = Object.keys(b);

    if (keysA.length !== keysB.length) return false;

    for (const key of keysA) {
      if (!keysB.includes(key) || !this.deepEquals(a[key], b[key])) {
        return false;
      }
    }

    return true;
  }
}
```
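A parser that only calls `JSON.parse` on the accumulated text cannot produce anything for a single top-level object until its closing brace arrives, because `JSON.parse` rejects every incomplete document. To surface fields while they are still streaming, the prefix can be "repaired" first. The following is a minimal sketch under that assumption, with hypothetical names `closeJsonPrefix` and `parsePartialJsonSketch`: it closes an unterminated string and any open brackets, and if the result still does not parse (for example because of a dangling key or a trailing comma), it drops characters from the end and retries.

```typescript
// Close an unterminated string and any open objects/arrays in a JSON prefix.
function closeJsonPrefix(prefix: string): string {
  const closers: string[] = []; // stack of pending "}" / "]"
  let inString = false;
  let escaped = false;

  for (const ch of prefix) {
    if (inString) {
      if (escaped) escaped = false;
      else if (ch === '\\') escaped = true;
      else if (ch === '"') inString = false;
      continue;
    }
    if (ch === '"') inString = true;
    else if (ch === '{') closers.push('}');
    else if (ch === '[') closers.push(']');
    else if (ch === '}' || ch === ']') closers.pop();
  }

  let repaired = prefix;
  if (inString) {
    // A dangling backslash would escape the closing quote, so drop it first
    repaired = (escaped ? repaired.slice(0, -1) : repaired) + '"';
  }
  return repaired + closers.reverse().join('');
}

// Parse the longest prefix of `text` that can be repaired into valid JSON.
function parsePartialJsonSketch(text: string): unknown {
  for (let end = text.length; end > 0; end--) {
    try {
      return JSON.parse(closeJsonPrefix(text.slice(0, end)));
    } catch {
      // Dangling key, colon, comma or partial literal: drop a character and retry
    }
  }
  return undefined; // nothing recoverable yet
}
```

The retry loop is quadratic in the worst case, which is acceptable for an illustration but worth replacing with a single-pass repair for long outputs. Using a function like this in `transform()` in place of the bare `JSON.parse` call is what would let a UI show the `answer` field growing as tokens arrive.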

Streaming Parsing in Practice

Here is a complete, practical example of streaming parsing:

```typescript
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import { HumanMessage } from '@langchain/core/messages';
import { PromptTemplate } from '@langchain/core/prompts';
// JsonOutputParser and OutputParserException are the classes defined earlier in this
// chapter, not the @langchain/core exports of the same name.

// A streaming question-answering system
class StreamingQASystem {
  private llm: BaseChatModel;
  private parser: JsonOutputParser<Answer>;

  constructor(llm: BaseChatModel) {
    this.llm = llm;

    // Answer structure: normalize each field and fill defaults for missing ones
    const AnswerSchema = {
      parse: (data: any): Answer => {
        if (typeof data !== 'object' || data === null) {
          throw new Error('Data must be an object');
        }

        return {
          answer: typeof data.answer === 'string' ? data.answer : '',
          confidence: typeof data.confidence === 'number' ? data.confidence : 0,
          sources: Array.isArray(data.sources) ? data.sources : [],
          followUpQuestions: Array.isArray(data.followUpQuestions)
            ? data.followUpQuestions
            : []
        };
      }
    };

    this.parser = new JsonOutputParser<Answer>(AnswerSchema);
  }

  async *streamAnswer(question: string): AsyncGenerator<Partial<Answer>> {
    try {
      // Prompt template; literal JSON braces are doubled ({{ }}) so the f-string
      // template does not treat them as variables
      const prompt = new PromptTemplate({
        template: `Answer the following question step by step and return the result as JSON.
Question: {question}

Return only JSON in the following format:
{{
  "answer": "your answer",
  "confidence": 0.95,
  "sources": ["https://example.com"],
  "followUpQuestions": ["Related question 1", "Related question 2"]
}}`,
        inputVariables: ['question']
      });

      // Build the message list
      const messages = [new HumanMessage(await prompt.format({ question }))];

      // The chat model streams message chunks, but the parser expects plain text
      const stream = await this.llm.stream(messages);
      const textStream = (async function* () {
        for await (const chunk of stream) {
          if (typeof chunk.content === 'string') yield chunk.content;
        }
      })();

      // Yield each parsed result as soon as it is available
      for await (const parsedAnswer of this.parser.transform(textStream)) {
        yield parsedAnswer;
      }
    } catch (error) {
      if (error instanceof OutputParserException) {
        console.error('Streaming parse error:', error.message);
        yield { answer: `Parse error: ${error.message}` };
      } else {
        console.error('Unexpected error:', error);
        yield { answer: 'System error' };
      }
    }
  }
}

interface Answer {
  answer: string;
  confidence: number;
  sources: string[];
  followUpQuestions: string[];
}

// Using streaming parsing in a front-end component
class StreamingAnswerComponent {
  private qaSystem: StreamingQASystem;
  private answerContainer: HTMLElement;
  private sourcesContainer: HTMLElement;

  constructor(qaSystem: StreamingQASystem) {
    this.qaSystem = qaSystem;
    this.answerContainer = document.getElementById('answer')!;
    this.sourcesContainer = document.getElementById('sources')!;
  }

  async askQuestion(question: string) {
    // Clear previous content
    this.answerContainer.textContent = '';
    this.sourcesContainer.innerHTML = '';

    // Process the streamed results
    for await (const partialAnswer of this.qaSystem.streamAnswer(question)) {
      // Update the answer
      if (partialAnswer.answer) {
        this.answerContainer.textContent = partialAnswer.answer;
      }

      // Update the confidence display
      if (partialAnswer.confidence !== undefined) {
        this.updateConfidence(partialAnswer.confidence);
      }

      // Update the sources
      if (partialAnswer.sources) {
        this.updateSources(partialAnswer.sources);
      }

      // Update the follow-up questions
      if (partialAnswer.followUpQuestions) {
        this.updateFollowUpQuestions(partialAnswer.followUpQuestions);
      }
    }
  }

  private updateConfidence(confidence: number) {
    const confidenceElement = document.getElementById('confidence');
    if (confidenceElement) {
      confidenceElement.textContent = `Confidence: ${(confidence * 100).toFixed(1)}%`;
      confidenceElement.className = confidence > 0.8 ? 'high-confidence' :
                                   confidence > 0.5 ? 'medium-confidence' : 'low-confidence';
    }
  }

  private updateSources(sources: string[]) {
    this.sourcesContainer.innerHTML = '';
    sources.forEach((source) => {
      // Model output is untrusted: render only http(s) links, built with DOM APIs
      // instead of innerHTML so no markup can be injected
      if (!/^https?:\/\//i.test(source)) return;
      const sourceElement = document.createElement('div');
      sourceElement.className = 'source';
      const link = document.createElement('a');
      link.href = source;
      link.target = '_blank';
      link.rel = 'noopener noreferrer';
      link.textContent = source;
      sourceElement.appendChild(link);
      this.sourcesContainer.appendChild(sourceElement);
    });
  }

  private updateFollowUpQuestions(questions: string[]) {
    const followUpContainer = document.getElementById('follow-up');
    if (followUpContainer) {
      followUpContainer.innerHTML = '';
      questions.forEach((question) => {
        const questionElement = document.createElement('button');
        questionElement.className = 'follow-up-question';
        questionElement.textContent = question;
        questionElement.onclick = () => this.askQuestion(question);
        followUpContainer.appendChild(questionElement);
      });
    }
  }
}
```
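`streamAnswer` needs a live chat model. For local testing, one option is a fake stream of message-like chunks. In LangChain.js, chat-model streams yield message chunks whose `content` may be a string or an array of content parts, so a small adapter that forwards only string content is needed before the text reaches a `transform()` method. `ContentChunk`, `fakeMessageStream`, `chunksToText` and `collectText` are hypothetical names used only for this sketch.

```typescript
// Shape assumed for streamed chat chunks: only the `content` field is used here.
interface ContentChunk {
  content: string | unknown[];
}

// Hypothetical fake model stream: splits a canned reply into fixed-size chunks.
async function* fakeMessageStream(reply: string, size: number): AsyncGenerator<ContentChunk> {
  for (let i = 0; i < reply.length; i += size) {
    yield { content: reply.slice(i, i + size) };
  }
}

// Adapter: forward plain-text content only, so a string-based parser can consume it.
async function* chunksToText(stream: AsyncIterable<ContentChunk>): AsyncGenerator<string> {
  for await (const chunk of stream) {
    if (typeof chunk.content === 'string') yield chunk.content;
  }
}

// Reassemble a canned answer from 7-character chunks.
async function collectText(): Promise<string> {
  const reply = '{"answer": "42", "confidence": 0.9, "sources": [], "followUpQuestions": []}';
  let text = '';
  for await (const piece of chunksToText(fakeMessageStream(reply, 7))) {
    text += piece;
  }
  return text;
}
```

The generator returned by `chunksToText` can be handed directly to a parser's `transform()`, which makes it easy to check how many intermediate results a given chunk size produces.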

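Streaming Parsing of Complex Structures

More complex structures need smarter handling when streamed. For a list, the goal is to emit each element as soon as it is complete: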

```typescript
// Streaming parser for JSON lists; each element is handed to itemParser
class ListOutputParser<T> extends BaseOutputParser<T[]> {
  private itemParser: BaseOutputParser<T>;

  constructor(itemParser: BaseOutputParser<T>) {
    super();
    this.itemParser = itemParser;
  }

  async parse(text: string): Promise<T[]> {
    // Extract the list: from the first "[" to the last "]"
    const listMatch = text.match(/\[[\s\S]*\]/);
    if (!listMatch) {
      throw new OutputParserException('No list structure found', text);
    }

    try {
      const parsedArray = JSON.parse(listMatch[0]);
      if (!Array.isArray(parsedArray)) {
        throw new Error('Parsed result is not an array');
      }

      return await Promise.all(
        parsedArray.map((item) => this.itemParser.parse(JSON.stringify(item)))
      );
    } catch (error) {
      throw new OutputParserException(
        `List parsing failed: ${(error as Error).message}`,
        text,
        error
      );
    }
  }

  async *transform(inputGenerator: AsyncIterable<string>): AsyncGenerator<T[]> {
    let accumulatedText = '';
    const parsedItems: T[] = []; // item-parser results for elements already yielded

    for await (const chunk of inputGenerator) {
      accumulatedText += chunk;

      const rawItems = this.parseCompletePrefix(accumulatedText);
      if (rawItems && rawItems.length > parsedItems.length) {
        // Only run the item parser on the newly completed elements
        const newItems = rawItems.slice(parsedItems.length);
        const parsedNewItems = await Promise.all(
          newItems.map((item) => this.itemParser.parse(JSON.stringify(item)))
        );
        parsedItems.push(...parsedNewItems);
        yield [...parsedItems];
      }
    }

    // Final strict parse; yield only if it differs from what was already streamed
    let finalResult: T[];
    try {
      finalResult = await this.parse(accumulatedText);
    } catch (error) {
      throw new OutputParserException(
        `List streaming parse failed on the final text: ${(error as Error).message}`,
        accumulatedText,
        error
      );
    }
    if (finalResult.length !== parsedItems.length) {
      yield finalResult;
    }
  }

  // Heuristic for lists of objects: parse the list as-is, or close it right after the
  // last "}" seen so far. A "}" inside a string makes the attempt fail, so we just wait.
  private parseCompletePrefix(text: string): unknown[] | null {
    const start = text.indexOf('[');
    if (start === -1) return null;

    const candidate = text.slice(start);
    const closedAtLastBrace = candidate.slice(0, candidate.lastIndexOf('}') + 1) + ']';
    for (const attempt of [candidate, closedAtLastBrace]) {
      try {
        const parsed = JSON.parse(attempt);
        if (Array.isArray(parsed)) return parsed;
      } catch {
        // Still incomplete: try the next candidate or wait for more text
      }
    }
    return null;
  }
}

// Usage example
interface Product {
  name: string;
  price: number;
  description: string;
}

const productParser = new JsonOutputParser<Product>();
const productListParser = new ListOutputParser<Product>(productParser);

// Simulated stream: emit the response 10 characters at a time
async function* mockStream(): AsyncGenerator<string> {
  const response = `[{"name": "Product A", "price": 100, "description": "Description A"}, {"name": "Product B", "price": 200, "description": "Description B"}]`;

  for (let i = 0; i < response.length; i += 10) {
    yield response.slice(i, i + 10);
    await new Promise((resolve) => setTimeout(resolve, 100)); // simulated latency
  }
}

async function demonstrateListStreaming() {
  console.log('Starting streaming parse of the product list...');

  for await (const productList of productListParser.transform(mockStream())) {
    console.log('Current result:', productList);
  }

  console.log('Streaming parse complete');
}

demonstrateListStreaming().catch(console.error);
```
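Detecting which array elements are already complete is the crux of incremental list parsing. A more robust option than searching for brackets is a single pass that tracks string state and nesting depth. The sketch below (hypothetical helper `completedArrayItems`) returns the raw text of each top-level element that is known to be complete; a trailing primitive such as `3` in `[1, 2, 3` is held back because more digits may still arrive.

```typescript
// Return the raw text of every top-level array element that is already complete.
function completedArrayItems(text: string): string[] {
  const items: string[] = [];
  const start = text.indexOf('[');
  if (start === -1) return items;

  let depth = 0; // nesting depth inside the top-level array
  let inString = false;
  let escaped = false;
  let itemStart = -1; // index where the current element began, or -1

  for (let i = start + 1; i < text.length; i++) {
    const ch = text.charAt(i);
    if (inString) {
      if (escaped) escaped = false;
      else if (ch === '\\') escaped = true;
      else if (ch === '"') inString = false;
      continue;
    }
    if (depth === 0 && (ch === ',' || ch === ']')) {
      // A top-level delimiter ends a pending primitive element
      if (itemStart !== -1) items.push(text.slice(itemStart, i).trim());
      itemStart = -1;
      if (ch === ']') break; // end of the top-level array
      continue;
    }
    if (depth === 0 && itemStart === -1 && !/\s/.test(ch)) itemStart = i;
    if (ch === '"') inString = true;
    else if (ch === '{' || ch === '[') depth++;
    else if (ch === '}' || ch === ']') {
      depth--;
      if (depth === 0) {
        // A nested object/array just closed, so that element is complete
        items.push(text.slice(itemStart, i + 1));
        itemStart = -1;
      }
    }
  }
  return items;
}
```

Each returned string can be passed to `JSON.parse` or to an item parser, and comparing the count with the number of elements already emitted gives exactly the newly completed ones.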

Error Handling and Recovery

Streaming parsers also need a strategy for handling errors and recovering from them:

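```typescript
class ResilientStreamingParser<T> extends BaseOutputParser<T> {
  private parser: BaseOutputParser<T>;
  private maxBufferSize: number = 1000; // upper bound on buffered characters
  private recoveryAttempts: number = 3; // consecutive failures tolerated after a success

  constructor(parser: BaseOutputParser<T>) {
    super();
    this.parser = parser;
  }

  async *transform(inputGenerator: AsyncIterable<string>): AsyncGenerator<T> {
    let buffer = '';
    let hasParsed = false; // has any prefix parsed successfully yet?
    let latestParsed = false; // did the current buffer parse successfully?
    let recoveryCount = 0;

    for await (const chunk of inputGenerator) {
      buffer += chunk;

      // Bound memory. Dropping the head of a structured document would make it
      // unparseable, so fail fast instead of silently trimming the buffer.
      if (buffer.length > this.maxBufferSize) {
        throw new OutputParserException(
          `Streamed output exceeded ${this.maxBufferSize} characters`,
          buffer
        );
      }

      let result: T;
      try {
        result = await this.parser.parse(buffer);
      } catch (error) {
        if (!(error instanceof OutputParserException)) {
          throw error;
        }
        latestParsed = false;
        // Before the first success, a failure only means the text is incomplete
        if (!hasParsed) continue;

        // After a success, count consecutive failures against the recovery budget
        recoveryCount++;
        if (recoveryCount > this.recoveryAttempts) {
          console.error('Streaming parse recovery attempts exhausted:', error.message);
          throw new OutputParserException(
            `Streaming parse kept failing: ${error.message}`,
            buffer,
            error
          );
        }
        console.warn(
          `Streaming parse failed, waiting for more text (${recoveryCount}/${this.recoveryAttempts}):`,
          error.message
        );
        continue;
      }

      hasParsed = true;
      latestParsed = true;
      recoveryCount = 0; // reset the recovery count after a success
      yield result;
    }

    // Final attempt, only if the complete text has not already been parsed
    if (buffer && !latestParsed) {
      let finalResult: T;
      try {
        finalResult = await this.parser.parse(buffer);
      } catch (error) {
        throw new OutputParserException(
          `Streaming parse failed on the final text: ${(error as Error).message}`,
          buffer,
          error
        );
      }
      yield finalResult;
    }
  }

  async parse(text: string): Promise<T> {
    return await this.parser.parse(text);
  }
}
```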
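The recovery policy itself, tolerating a bounded number of consecutive failures and resetting on success, can be pulled out into a small helper so that it is easy to unit-test. `FailureBudget` is a hypothetical name, and the sketch assumes, as a design choice, that failures before the first successful parse only mean the text is still incomplete and are therefore not counted.

```typescript
// Hypothetical helper: tolerate at most `maxConsecutive` failures in a row.
// Failures before the first success are ignored (the text is assumed incomplete).
class FailureBudget {
  private readonly maxConsecutive: number;
  private armed = false; // becomes true after the first success
  private consecutive = 0;

  constructor(maxConsecutive: number) {
    this.maxConsecutive = maxConsecutive;
  }

  // Call after every successful parse: arms the budget and resets the count.
  recordSuccess(): void {
    this.armed = true;
    this.consecutive = 0;
  }

  // Call after every failed parse; returns true when the caller should give up.
  recordFailure(): boolean {
    if (!this.armed) return false;
    this.consecutive++;
    return this.consecutive > this.maxConsecutive;
  }
}
```

Inside a `transform()` loop, `recordSuccess()` would be called after each successful parse and `recordFailure()` in the catch block, throwing once it returns true.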

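Summary

The transform() method gives LangChain V3 output parsers the ability to process text incrementally. The key points are:

  1. Incremental processing - text fragments are handled as they arrive
  2. Smart parsing - keep accumulating while the text is incomplete, parse as soon as it is complete
  3. Deduplication - avoid yielding the same result twice
  4. Error recovery - tolerate transient parse failures instead of failing immediately
  5. Memory efficiency - bound memory use through buffer management
  6. User experience - real-time feedback and smoother interaction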

Together, these mechanisms let developers use LangChain V3's streaming parsers to build more responsive and user-friendly LLM applications.

In the next chapter, we turn to Retrieval Chains, including how VectorStoreRetriever is implemented as a Runnable<Query, Document[]>.