Streaming Implementation: stream() Returns a ReadableStream<ChatCompletionChunk>

Parsing SSE responses chunk by chunk and yielding tokens one at a time

In modern LLM applications, streaming responses are a key feature: they let users see results incrementally as they are generated, providing a much better interactive experience. LangChain V3 implements full support for streaming through the stream() method, which handles responses in the Server-Sent Events (SSE) format and yields tokens one at a time. This chapter takes a deep look at how the streaming implementation works and at best practices for using it.

Why Streaming Responses Matter

Streaming responses serve several important purposes in LLM applications:

  1. User experience - users see partial results immediately instead of waiting for the full generation
  2. Perceived performance - even when total time is unchanged, progressive display feels faster
  3. Real-time interaction - supports interrupting, redirecting, and other live interactions
  4. Resource efficiency - downstream processing can start before the full response arrives

ReadableStream and AsyncGenerator

LangChain V3's streaming implementation builds on two main approaches to handling streaming data:

ReadableStream

ReadableStream is part of the Web Streams API and provides a standardized interface for working with streaming data:

typescript
interface ReadableStream<R = any> {
  getReader(): ReadableStreamDefaultReader<R>;
  tee(): [ReadableStream<R>, ReadableStream<R>];
  // other methods...
}

interface ReadableStreamDefaultReader<R = any> {
  read(): Promise<ReadableStreamReadResult<R>>;
  releaseLock(): void;
  cancel(reason?: any): Promise<void>;
}

type ReadableStreamReadResult<T> = 
  | { done: false; value: T; }
  | { done: true; value?: undefined; };
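
A minimal sketch of draining such a stream via getReader() (the function name drainStream is illustrative, not part of any API):

typescript
async function drainStream(stream: ReadableStream<string>): Promise<string> {
  const reader = stream.getReader();
  let result = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break; // stream exhausted
      result += value; // accumulate each emitted chunk
    }
  } finally {
    reader.releaseLock(); // always release the lock when finished
  }
  return result;
}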

AsyncGenerator

AsyncGenerator is JavaScript's native mechanism for asynchronous iteration:

typescript
interface AsyncGenerator<T = unknown, TReturn = any, TNext = unknown> 
  extends AsyncIterator<T, TReturn, TNext> {
  next(...args: [] | [TNext]): Promise<IteratorResult<T, TReturn>>;
  return(value: TReturn | PromiseLike<TReturn>): Promise<IteratorResult<T, TReturn>>;
  throw(e: any): Promise<IteratorResult<T, TReturn>>;
}
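
For intuition, here's a tiny async generator that yields tokens with artificial latency, consumed with for-await-of (purely illustrative, not LangChain code):

typescript
async function* fakeTokens(): AsyncGenerator<string> {
  for (const token of ['Hello', ' ', 'world']) {
    // Simulate network latency between tokens
    await new Promise((resolve) => setTimeout(resolve, 50));
    yield token;
  }
}

(async () => {
  for await (const token of fakeTokens()) {
    process.stdout.write(token); // prints 'Hello world' incrementally
  }
})();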

The ChatCompletionChunk Interface

In chat models, the basic unit of a streaming response is the ChatCompletionChunk:

typescript
interface ChatCompletionChunk {
  id: string;
  object: string;
  created: number;
  model: string;
  choices: Array<{
    index: number;
    delta: {
      role?: string;
      content?: string;
    };
    finish_reason: string | null;
  }>;
}
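
Each chunk carries only the delta relative to the previous one; the full message is the concatenation of all deltas. A minimal accumulator sketch:

typescript
function accumulateChunks(chunks: ChatCompletionChunk[]): string {
  let text = '';
  for (const chunk of chunks) {
    // Only the content delta matters for the text; the role typically arrives
    // in the first chunk and finish_reason in the last
    text += chunk.choices[0]?.delta?.content ?? '';
  }
  return text;
}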

Core Mechanics of the Streaming Implementation

Let's dig into the core mechanics of streaming in LangChain V3:

Base Streaming Implementation

typescript
class BaseChatModel {
  async *stream(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<string> {
    // streamGenerate is assumed here to be the provider-specific hook that
    // yields raw LLMResult chunks; subclasses implement it
    const stream = this.streamGenerate([messages], options);
    
    for await (const chunk of stream) {
      // Extract the text content from the chunk
      const text = this.extractTextFromChunk(chunk);
      if (text) {
        yield text;
      }
    }
  }
  
  protected extractTextFromChunk(chunk: LLMResult): string {
    // Extract the text from the LLMResult
    return chunk.generations[0]?.[0]?.text || '';
  }
}

OpenAI Streaming Implementation

typescript
class ChatOpenAI extends BaseChatModel {
  protected async *_streamCall(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<ChatCompletionChunk> {
    // Issue the streaming request
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: this.modelName,
        messages, // assumes messages are already in the OpenAI {role, content} wire format
        temperature: options?.temperature,
        max_tokens: options?.maxTokens,
        stream: true // enable streaming responses
      }),
      signal: options?.signal
    });
    
    // Process the SSE response
    yield* this.processSSEStream(response);
  }
  
  private async *processSSEStream(response: Response): AsyncGenerator<ChatCompletionChunk> {
    const reader = response.body?.getReader();
    const decoder = new TextDecoder();
    
    if (!reader) {
      throw new Error('Unable to obtain the response body stream');
    }
    
    try {
      let buffer = '';
      
      while (true) {
        const { done, value } = await reader.read();
        
        if (done) {
          break;
        }
        
        // Decode the received bytes
        buffer += decoder.decode(value, { stream: true });
        
        // Process only the complete lines
        const lines = buffer.split('\n');
        buffer = lines.pop() || ''; // keep the incomplete trailing line
        
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = line.slice(6); // strip the 'data: ' prefix
            
            if (data === '[DONE]') {
              // end-of-stream marker
              return;
            }
            
            try {
              const chunk: ChatCompletionChunk = JSON.parse(data);
              yield chunk;
            } catch (error) {
              console.warn('Failed to parse SSE data:', error);
            }
          }
        }
      }
    } finally {
      reader.releaseLock();
    }
  }
}
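
For reference, the raw SSE payload that processSSEStream consumes looks roughly like the sample below — one 'data:' line per JSON-encoded chunk, terminated by a literal [DONE] marker (field values are representative, not captured output):

typescript
const sampleSSE =
  'data: {"id":"c1","object":"chat.completion.chunk","created":0,"model":"gpt-4","choices":[{"index":0,"delta":{"role":"assistant","content":"He"},"finish_reason":null}]}\n' +
  '\n' +
  'data: {"id":"c1","object":"chat.completion.chunk","created":0,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"llo"},"finish_reason":null}]}\n' +
  '\n' +
  'data: {"id":"c1","object":"chat.completion.chunk","created":0,"model":"gpt-4","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}\n' +
  '\n' +
  'data: [DONE]\n';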

Handling Chunked Text

In a streaming response, text content typically arrives incrementally in small pieces:

typescript
class StreamingTextProcessor {
  private buffer: string = '';
  
  processChunk(chunk: ChatCompletionChunk): string[] {
    const texts: string[] = [];
    
    for (const choice of chunk.choices) {
      if (choice.delta?.content) {
        const content = choice.delta.content;
        
        // Normalize special characters and formatting
        const processedContent = this.processContent(content);
        texts.push(processedContent);
      }
    }
    
    return texts;
  }
  
  private processContent(content: string): string {
    // Handle special characters
    let processed = content;
    
    // Prepend any surrogate half buffered from the previous chunk
    if (this.buffer) {
      processed = this.buffer + processed;
      this.buffer = '';
    }
    
    // Check for an incomplete UTF-16 surrogate pair at the end
    const lastChar = processed.charCodeAt(processed.length - 1);
    if (lastChar >= 0xD800 && lastChar <= 0xDBFF) {
      // High surrogate: wait for the low surrogate in the next chunk
      this.buffer = processed.slice(-1);
      processed = processed.slice(0, -1);
    }
    
    return processed;
  }
}
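
A usage sketch showing why the surrogate-pair buffering matters: '😀' (U+1F600) occupies two UTF-16 code units, and a chunk boundary can split them (makeChunk is a hypothetical helper defined only for this demo):

typescript
function makeChunk(content: string): ChatCompletionChunk {
  return {
    id: 'demo', object: 'chat.completion.chunk', created: 0, model: 'demo',
    choices: [{ index: 0, delta: { content }, finish_reason: null }]
  };
}

const processor = new StreamingTextProcessor();
const high = '😀'.charAt(0); // high surrogate
const low = '😀'.charAt(1);  // low surrogate

processor.processChunk(makeChunk(high)); // [''] — the high surrogate is held back
processor.processChunk(makeChunk(low));  // ['😀'] — the complete character is emitted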

Consuming Streaming Responses

Consuming a Stream in Node.js

typescript
async function consumeStreamInNode(model: BaseChatModel, messages: BaseMessage[]) {
  const stream = await model.stream(messages);
  
  // Option 1: for-await-of
  for await (const chunk of stream) {
    process.stdout.write(chunk);
  }
  
  // Option 2: manual iteration (illustrative only — the generator above is
  // already exhausted at this point, so in practice use a fresh stream)
  const reader = stream[Symbol.asyncIterator]();
  while (true) {
    const { value, done } = await reader.next();
    if (done) break;
    process.stdout.write(value);
  }
}

Consuming a Stream in the Browser

typescript
async function consumeStreamInBrowser(model: BaseChatModel, messages: BaseMessage[]) {
  const stream = await model.stream(messages);
  
  const textElement = document.getElementById('output');
  if (!textElement) return;
  
  for await (const chunk of stream) {
    textElement.textContent += chunk;
    
    // Scroll to the latest content
    textElement.scrollTop = textElement.scrollHeight;
  }
}

Converting to a ReadableStream

typescript
class BaseChatModel {
  toReadableStream(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): ReadableStream<string> {
    return new ReadableStream({
      // An arrow function keeps `this` bound to the model instance; the method
      // shorthand `async start(controller)` would rebind `this` to the
      // underlying source object and break `this.stream(...)`
      start: async (controller) => {
        try {
          const stream = this.stream(messages, options);
          for await (const chunk of stream) {
            controller.enqueue(chunk);
          }
          controller.close();
        } catch (error) {
          controller.error(error);
        }
      }
    });
  }
}
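
One place this conversion pays off is serving a streaming HTTP response. A sketch, assuming a runtime with the Web-standard Response and TextEncoderStream (e.g. a modern edge or serverless handler):

typescript
function handleChatRequest(model: BaseChatModel, messages: BaseMessage[]): Response {
  // Encode the string stream into bytes and hand it to Response as the body
  const byteStream = model
    .toReadableStream(messages)
    .pipeThrough(new TextEncoderStream());

  return new Response(byteStream, {
    headers: { 'Content-Type': 'text/plain; charset=utf-8' }
  });
}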

Error Handling and Cancellation

Error handling and cancellation mechanisms are critical in stream processing:

typescript
class RobustStreamingProcessor {
  async *processStream(
    model: BaseChatModel,
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<string> {
    const controller = new AbortController();
    const timeoutId = options?.timeout 
      ? setTimeout(() => controller.abort(), options.timeout)
      : null;
    
    try {
      const streamOptions = {
        ...options,
        signal: controller.signal
      };
      
      const stream = await model.stream(messages, streamOptions);
      
      for await (const chunk of stream) {
        // Check whether the stream has been aborted
        if (controller.signal.aborted) {
          throw new Error('Stream processing was aborted');
        }
        
        yield chunk;
      }
    } catch (error) {
      if (error.name === 'AbortError') {
        throw new Error('Stream processing timed out');
      }
      throw error;
    } finally {
      if (timeoutId) {
        clearTimeout(timeoutId);
      }
    }
  }
}
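
A usage sketch — this assumes options carries a timeout in milliseconds, matching how processStream reads it above:

typescript
async function runWithTimeout(model: BaseChatModel, messages: BaseMessage[]) {
  const processor = new RobustStreamingProcessor();
  try {
    for await (const token of processor.processStream(model, messages, { timeout: 10_000 })) {
      process.stdout.write(token);
    }
  } catch (error) {
    console.error('Streaming failed:', error);
  }
}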

A Complete Application Example

Let's look at a complete, realistic example of using streaming responses in a web application:

typescript
// A streaming handler for a chat application
class ChatStreamHandler {
  private model: BaseChatModel;
  private messageHistory: BaseMessage[] = [];
  
  constructor(model: BaseChatModel) {
    this.model = model;
  }
  
  async *processUserMessage(userMessage: string): AsyncGenerator<{
    type: 'content' | 'complete' | 'error';
    data: string;
  }> {
    try {
      // Append the user message to the history
      this.messageHistory.push({
        role: 'user',
        content: userMessage
      });
      
      // Generate the streaming response
      const stream = await this.model.stream(this.messageHistory);
      
      let fullResponse = '';
      
      // Process response chunks one by one
      for await (const chunk of stream) {
        fullResponse += chunk;
        yield { type: 'content', data: chunk };
      }
      
      // Append the assistant response to the history
      this.messageHistory.push({
        role: 'assistant',
        content: fullResponse
      });
      
      yield { type: 'complete', data: '' };
    } catch (error) {
      yield { type: 'error', data: error.message };
    }
  }
}

// Using the handler in a front-end component
class ChatComponent {
  private chatHandler: ChatStreamHandler;
  private outputElement: HTMLElement;
  
  constructor(model: BaseChatModel) {
    this.chatHandler = new ChatStreamHandler(model);
    this.outputElement = document.getElementById('chat-output')!;
  }
  
  async sendMessage(userMessage: string) {
    // Display the user message
    this.displayMessage('user', userMessage);
    
    // Create a container for the new response
    const responseContainer = this.createResponseContainer();
    
    // Handle the streaming response
    let fullResponse = '';
    
    for await (const { type, data } of this.chatHandler.processUserMessage(userMessage)) {
      switch (type) {
        case 'content':
          fullResponse += data;
          responseContainer.textContent = fullResponse;
          // Scroll to the latest content
          responseContainer.scrollIntoView({ behavior: 'smooth' });
          break;
          
        case 'complete':
          // Generation finished
          console.log('Response complete');
          break;
          
        case 'error':
          // Surface the error
          this.displayError(data);
          break;
      }
    }
  }
  
  private displayMessage(role: string, content: string) {
    const messageElement = document.createElement('div');
    messageElement.className = `message ${role}`;
    messageElement.textContent = content;
    this.outputElement.appendChild(messageElement);
  }
  
  private createResponseContainer(): HTMLElement {
    const container = document.createElement('div');
    container.className = 'message assistant';
    this.outputElement.appendChild(container);
    return container;
  }
  
  private displayError(error: string) {
    const errorElement = document.createElement('div');
    errorElement.className = 'message error';
    errorElement.textContent = `Error: ${error}`;
    this.outputElement.appendChild(errorElement);
  }
}

Performance Considerations

When implementing stream processing, the following optimizations are worth considering — batching small chunks reduces downstream work such as DOM updates:

typescript
class OptimizedStreamingProcessor {
  private chunkBuffer: string[] = [];
  private bufferSize: number = 5;
  private flushIntervalMs: number = 100;
  private lastFlush: number = Date.now();
  
  async *processWithBuffering(
    stream: AsyncGenerator<string>
  ): AsyncGenerator<string> {
    for await (const chunk of stream) {
      this.chunkBuffer.push(chunk);
      
      // Flush when the buffer is full, or when enough time has passed since
      // the last flush. (A setTimeout-based debounce cannot be used here:
      // `yield` is only valid directly in the generator body, never inside
      // a timer callback.)
      if (
        this.chunkBuffer.length >= this.bufferSize ||
        Date.now() - this.lastFlush >= this.flushIntervalMs
      ) {
        yield this.flushBuffer();
      }
    }
    
    // Flush whatever is left in the buffer
    if (this.chunkBuffer.length > 0) {
      yield this.flushBuffer();
    }
  }
  
  private flushBuffer(): string {
    const content = this.chunkBuffer.join('');
    this.chunkBuffer = [];
    this.lastFlush = Date.now();
    return content;
  }
}
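
A usage sketch of the buffered processor in a UI context (outputElement is an assumed DOM node): batching yields one DOM write per flush instead of one per token.

typescript
async function renderBuffered(
  model: BaseChatModel,
  messages: BaseMessage[],
  outputElement: HTMLElement
) {
  const optimizer = new OptimizedStreamingProcessor();
  for await (const batch of optimizer.processWithBuffering(model.stream(messages))) {
    outputElement.textContent += batch; // single DOM update per batch
  }
}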

Summary

LangChain V3's streaming implementation, with stream() returning a ReadableStream<ChatCompletionChunk>, gives developers powerful stream-processing capabilities. Key takeaways:

  1. SSE parsing - correctly parse responses in the Server-Sent Events format
  2. Chunked processing - handle and yield tokens one at a time
  3. Error handling - robust error handling and cancellation mechanisms
  4. Performance optimization - techniques such as buffering and batching
  5. Cross-environment support - works in both Node.js and the browser

With these mechanisms, LangChain V3 delivers a smooth, responsive LLM interaction experience that meets the real-time demands of modern applications.

In the next chapter, we'll look at token counting and truncation: how to integrate tiktoken and sentencepiece to further optimize model calls.