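Consuming Streaming Responses on the Frontend: Response.body.pipeTo(ReadableStreamDefaultReader)

Implementing a Typewriter Effect

In modern web applications, streaming responses give users a better interactive experience. By displaying LLM-generated content incrementally, the application lets users see partial results immediately instead of waiting for the complete response. This chapter looks at how to consume streaming responses on the frontend and how to build a typewriter effect on top of them.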

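Basic Concepts of Streaming Responses

Streaming responses on the web are built on the ReadableStream API, which lets you receive and process data incrementally: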

typescript
// Basic ReadableStream usage
async function consumeStreamBasic(response: Response) {
  const reader = response.body?.getReader();
  const decoder = new TextDecoder();
  
  if (!reader) {
    throw new Error('The response has no readable body');
  }
  
  try {
    while (true) {
      const { done, value } = await reader.read();
      
      if (done) {
        console.log('Stream complete');
        break;
      }
      
      // Decode and process the chunk. `stream: true` keeps a multi-byte
      // character that is split across chunks buffered until it is complete.
      const chunk = decoder.decode(value, { stream: true });
      console.log('Received chunk:', chunk);
    }
  } finally {
    reader.releaseLock();
  }
}
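
The chapter title refers to `pipeTo`. Note that `ReadableStream.pipeTo()` takes a `WritableStream` as its destination rather than a reader, so it is an alternative to the manual `getReader()` loop, not something combined with it. The following is a minimal sketch of that alternative; `pipeTextTo` and `onText` are illustrative names that do not appear elsewhere in this chapter.

```typescript
// Sketch: consume a byte stream with pipeTo() and a WritableStream sink.
// `pipeTextTo` and `onText` are hypothetical names used for illustration.
async function pipeTextTo(
  body: ReadableStream<Uint8Array>,
  onText: (text: string) => void
): Promise<void> {
  const decoder = new TextDecoder();

  const sink = new WritableStream<Uint8Array>({
    write(chunk) {
      // stream: true keeps an incomplete multi-byte sequence buffered
      const text = decoder.decode(chunk, { stream: true });
      if (text) onText(text);
    },
    close() {
      // Flush any bytes the decoder is still holding
      const tail = decoder.decode();
      if (tail) onText(tail);
    },
  });

  // pipeTo() locks the source stream and resolves once the source has closed
  await body.pipeTo(sink);
}
```

With a fetched response, this could be called as `await pipeTextTo(response.body!, (text) => { output.textContent += text; })`, where `output` stands for whatever element displays the result. The manual reader loop remains the better fit when each chunk needs custom control flow, as in the SSE handling that follows.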

Handling Server-Sent Events (SSE)

LangChain streaming APIs are typically served over SSE, which needs dedicated parsing:

typescript
// SSE data processing
class SSEProcessor {
  private decoder: TextDecoder;
  private buffer: string;
  
  constructor() {
    this.decoder = new TextDecoder();
    this.buffer = '';
  }
  
  processChunk(value: Uint8Array): Array<{ type: string; data: string }> {
    const text = this.decoder.decode(value, { stream: true });
    this.buffer += text;
    
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop() || ''; // Keep the incomplete last line
    
    return this.parseLines(lines);
  }
  
  flush(): Array<{ type: string; data: string }> {
    // Flush bytes still held by the decoder, then parse the leftover line
    // like any other line instead of emitting it with its prefix intact
    const rest = this.buffer + this.decoder.decode();
    this.buffer = '';
    return rest ? this.parseLines(rest.split('\n')) : [];
  }
  
  private parseLines(lines: string[]): Array<{ type: string; data: string }> {
    const events: Array<{ type: string; data: string }> = [];
    
    for (const rawLine of lines) {
      const line = rawLine.replace(/\r$/, ''); // Tolerate CRLF line endings
      if (line.startsWith('data:')) {
        // Strip the 'data:' prefix and the single optional space after it
        events.push({ type: 'data', data: line.slice(5).replace(/^ /, '') });
      }
      // 'event:' lines carry the event type and an empty line marks the end
      // of an event; this simplified parser ignores both
    }
    
    return events;
  }
}
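
The processor above treats every `data:` line as its own event. In the SSE wire format, an event ends at a blank line, may carry an `event:` field, and may spread its payload over several `data:` lines that are joined with newlines. The following sketch shows one way to honor those rules; `parseSSE` and `SSEEvent` are hypothetical names, and the `id:` and `retry:` fields are deliberately left out.

```typescript
// Sketch: parse complete SSE events out of a text buffer.
// `SSEEvent` and `parseSSE` are illustrative names, not part of any library.
interface SSEEvent {
  event: string; // event type, "message" when no event: field is present
  data: string;  // all data: lines of the event joined with "\n"
}

function parseSSE(buffer: string): { events: SSEEvent[]; rest: string } {
  const events: SSEEvent[] = [];
  // Normalize CRLF and lone CR so that events are separated by "\n\n"
  const blocks = buffer.replace(/\r\n?/g, '\n').split('\n\n');
  // The last block has not been terminated by a blank line yet
  const rest = blocks.pop() ?? '';

  for (const block of blocks) {
    let event = 'message';
    const dataLines: string[] = [];

    for (const line of block.split('\n')) {
      if (line === '' || line.startsWith(':')) continue; // comment or keep-alive
      const colon = line.indexOf(':');
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? '' : line.slice(colon + 1);
      if (value.startsWith(' ')) value = value.slice(1); // one optional space

      if (field === 'event') event = value;
      else if (field === 'data') dataLines.push(value);
    }

    // Blocks without any data: line do not produce an event
    if (dataLines.length > 0) {
      events.push({ event, data: dataLines.join('\n') });
    }
  }

  return { events, rest };
}
```

A caller would append each decoded chunk to `rest` from the previous call and pass the result back in, so that an event split across network chunks is only emitted once its terminating blank line has arrived.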

Implementing the Typewriter Effect

Implement the classic typewriter effect to reveal text gradually:

typescript
// Typewriter effect class
class TypewriterEffect {
  private targetElement: HTMLElement;
  private speed: number;
  private callback?: () => void;
  
  constructor(
    targetElement: HTMLElement,
    speed: number = 50,
    callback?: () => void
  ) {
    this.targetElement = targetElement;
    this.speed = speed;
    this.callback = callback;
  }
  
  async typeText(text: string): Promise<void> {
    return new Promise((resolve) => {
      let i = 0;
      const timer = setInterval(() => {
        if (i < text.length) {
          this.targetElement.textContent += text.charAt(i);
          i++;
        } else {
          clearInterval(timer);
          if (this.callback) {
            this.callback();
          }
          resolve();
        }
      }, this.speed);
    });
  }
  
  async typeStream(stream: AsyncGenerator<string>): Promise<void> {
    for await (const chunk of stream) {
      await this.typeText(chunk);
    }
  }
  
  clear(): void {
    this.targetElement.textContent = '';
  }
}

// Usage example
/*
const outputElement = document.getElementById('output')!;
const typewriter = new TypewriterEffect(outputElement, 30);

// Reveal text gradually (awaited so it finishes before the stream starts)
await typewriter.typeText('Hello, World!');

// Or process streamed data
async function* mockStream() {
  const text = 'This is a streaming text example';
  for (const char of text) {
    yield char;
    await new Promise(resolve => setTimeout(resolve, 100));
  }
}

await typewriter.typeStream(mockStream());
*/
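
One detail matters for LLM output in particular: `charAt(i)` walks UTF-16 code units, so an emoji or other character outside the Basic Multilingual Plane is typed as two broken halves. A minimal sketch of a workaround, assuming it is acceptable to split by Unicode code point, is shown below. `splitForTyping` and `typePieces` are illustrative helpers, not part of `TypewriterEffect`, and sequences made of several code points (flags, ZWJ emoji) would still be split.

```typescript
// Sketch: split text by code point before typing it.
// `splitForTyping` and `typePieces` are hypothetical helper names.
function splitForTyping(text: string): string[] {
  // Array.from iterates a string by Unicode code point, so a surrogate
  // pair such as an emoji stays in one piece
  return Array.from(text);
}

// Hands the pieces to `append` one at a time, waiting `delayMs` between them
async function typePieces(
  pieces: string[],
  append: (piece: string) => void,
  delayMs = 30
): Promise<void> {
  for (const piece of pieces) {
    append(piece);
    await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
  }
}
```

For example, `typePieces(splitForTyping(chunk), (piece) => { element.textContent += piece; })` would type a chunk into a hypothetical `element` without ever displaying half of an emoji.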

A Modern Stream Processing Implementation

Use modern Web APIs for more efficient stream processing:

typescript
// Modern stream processor
class ModernStreamProcessor {
  private targetElement: HTMLElement;
  private onChunk?: (chunk: string) => void;
  private onComplete?: () => void;
  
  constructor(
    targetElement: HTMLElement,
    onChunk?: (chunk: string) => void,
    onComplete?: () => void
  ) {
    this.targetElement = targetElement;
    this.onChunk = onChunk;
    this.onComplete = onComplete;
  }
  
  async processResponse(response: Response): Promise<void> {
    if (!response.body) {
      throw new Error('The response body is empty');
    }
    
    const reader = response.body.getReader();
    const sseProcessor = new SSEProcessor(); // decodes the bytes internally
    
    try {
      while (true) {
        const { done, value } = await reader.read();
        
        if (done) {
          // Process any data left in the buffer
          const remainingEvents = sseProcessor.flush();
          for (const event of remainingEvents) {
            this.handleEvent(event);
          }
          break;
        }
        
        // Process the received chunk
        const events = sseProcessor.processChunk(value);
        for (const event of events) {
          this.handleEvent(event);
        }
      }
      
      if (this.onComplete) {
        this.onComplete();
      }
    } finally {
      reader.releaseLock();
    }
  }
  
  private handleEvent(event: { type: string; data: string }): void {
    switch (event.type) {
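      case 'data': {
        let parsedData: unknown;
        try {
          parsedData = JSON.parse(event.data);
        } catch {
          // Not JSON: treat the payload as plain text
          this.handleTextData(event.data);
          break;
        }
        // Called outside the try block so that a rendering error is not
        // mistaken for invalid JSON
        this.handleStreamData(parsedData);
        break;
      }
        
      default:
        console.warn('Unknown event type:', event.type);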
    }
  }
  
  private handleStreamData(data: any): void {
    if (typeof data === 'object' && data !== null) {
      switch (data.type) {
        case 'text':
          this.appendText(data.content);
          if (this.onChunk) {
            this.onChunk(data.content);
          }
          break;
          
        case 'thinking':
          this.showThinking(data.content);
          break;
          
        case 'done':
          this.showCompletion(data.content);
          break;
          
        case 'error':
          this.showError(data.content);
          break;
          
        default:
          // Handle other kinds of data
          this.appendText(JSON.stringify(data));
      }
    } else {
      this.appendText(String(data));
    }
  }
  
  private handleTextData(text: string): void {
    this.appendText(text);
    if (this.onChunk) {
      this.onChunk(text);
    }
  }
  
  private appendText(text: string): void {
    this.targetElement.textContent += text;
    // Scroll to the latest content
    this.targetElement.scrollTop = this.targetElement.scrollHeight;
  }
  
  private showThinking(message: string): void {
    const thinkingElement = document.getElementById('thinking');
    if (thinkingElement) {
      thinkingElement.textContent = message;
      thinkingElement.style.display = 'block';
    }
  }
  
  private showCompletion(data: any): void {
    const completionElement = document.getElementById('completion');
    if (completionElement) {
      completionElement.textContent = typeof data === 'string' ? data : JSON.stringify(data, null, 2);
    }
  }
  
  private showError(message: string): void {
    const errorElement = document.getElementById('error');
    if (errorElement) {
      errorElement.textContent = message;
      errorElement.style.display = 'block';
    }
  }
}
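
`handleStreamData` accepts `any`, so a malformed payload is only noticed when it renders incorrectly. A minimal sketch of validating the payload first is shown below, assuming the server sends the four message types used in this chapter (`text`, `thinking`, `done`, `error`). `StreamMessage` and `parseStreamMessage` are illustrative names, and the exact payload shape is an assumption.

```typescript
// Sketch: turn a raw SSE data string into a typed message.
// The message shapes mirror the switch statement above and are assumptions.
type StreamMessage =
  | { type: 'text'; content: string }
  | { type: 'thinking'; content: string }
  | { type: 'error'; content: string }
  | { type: 'done'; content?: unknown };

function parseStreamMessage(raw: string): StreamMessage {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    // Not JSON: show the payload as plain text
    return { type: 'text', content: raw };
  }

  if (typeof value === 'object' && value !== null) {
    const { type, content } = value as { type?: unknown; content?: unknown };
    if (type === 'done') return { type, content };
    if (typeof content === 'string') {
      if (type === 'text') return { type, content };
      if (type === 'thinking') return { type, content };
      if (type === 'error') return { type, content };
    }
  }

  // Unknown shape: fall back to showing the raw payload as text
  return { type: 'text', content: raw };
}
```

With a discriminated union like this, a `switch (message.type)` in the handler is checked by the compiler, and `message.content` is known to be a string in the `text`, `thinking`, and `error` branches.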

Stream Processing in a React Component

Implement stream processing and the typewriter effect in a React component:

typescript
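// React stream processing component
// (assumes the SSEProcessor class defined earlier in this chapter is in scope)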
import React, { useState, useEffect, useRef } from 'react';

interface StreamData {
  type: 'text' | 'thinking' | 'done' | 'error';
  content: string;
  timestamp: number;
}

const StreamProcessor: React.FC = () => {
  const [streamContent, setStreamContent] = useState<string>('');
  const [thinkingMessage, setThinkingMessage] = useState<string>('');
  const [isStreaming, setIsStreaming] = useState<boolean>(false);
  const [error, setError] = useState<string>('');
  const contentRef = useRef<HTMLDivElement>(null);
  
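  // Typewriter effect: chunks are queued and typed by a single timer, so text
  // that arrives while earlier text is still being typed cannot interleave
  const pendingRef = useRef<string>('');
  const timerRef = useRef<ReturnType<typeof setInterval> | null>(null);
  
  const typeWriter = (text: string) => {
    pendingRef.current += text;
    if (timerRef.current !== null) return; // a timer is already draining the queue
    
    timerRef.current = setInterval(() => {
      // Read the character before calling setState: the updater runs later,
      // so it must not depend on a value that changes in the meantime
      const char = pendingRef.current.charAt(0);
      pendingRef.current = pendingRef.current.slice(1);
      
      if (char) {
        setStreamContent(prev => prev + char);
      } else if (timerRef.current !== null) {
        clearInterval(timerRef.current);
        timerRef.current = null;
      }
    }, 30);
  };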
  
  // Process the streaming response
  const processStream = async (prompt: string) => {
    setIsStreaming(true);
    setStreamContent('');
    setThinkingMessage('');
    setError('');
    
    try {
      const response = await fetch('/api/advanced-langchain/stream-chat', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          messages: [{ role: 'user', content: prompt }],
          useRAG: true,
        }),
      });
      
      if (!response.ok) {
        throw new Error(`Request failed with status ${response.status}`);
      }
      if (!response.body) {
        throw new Error('The response body is empty');
      }
      
      const reader = response.body.getReader();
      const sseProcessor = new SSEProcessor(); // decodes the bytes internally
      
      try {
        while (true) {
          const { done, value } = await reader.read();
          
          if (done) {
            // Process the remaining data
            const remainingEvents = sseProcessor.flush();
            for (const event of remainingEvents) {
              handleStreamEvent(event);
            }
            break;
          }
          
          // Process the chunk
          const events = sseProcessor.processChunk(value);
          for (const event of events) {
            handleStreamEvent(event);
          }
        }
      } finally {
        reader.releaseLock();
      }
    } catch (err) {
      setError(err instanceof Error ? err.message : 'An unknown error occurred');
    } finally {
      setIsStreaming(false);
    }
  };
  
  const handleStreamEvent = (event: { type: string; data: string }) => {
    if (event.type === 'data') {
      try {
        const data: StreamData = JSON.parse(event.data);
        handleStreamData(data);
      } catch (error) {
        // Handle non-JSON data
        setStreamContent(prev => prev + event.data);
      }
    }
  };
  
  const handleStreamData = (data: StreamData) => {
    switch (data.type) {
      case 'text':
        // Show the text with the typewriter effect
        typeWriter(data.content);
        break;
        
      case 'thinking':
        setThinkingMessage(data.content);
        break;
        
      case 'done':
        // Stream finished
        break;
        
      case 'error':
        setError(data.content);
        break;
    }
  };
  
  // Auto-scroll to the bottom
  useEffect(() => {
    if (contentRef.current) {
      contentRef.current.scrollTop = contentRef.current.scrollHeight;
    }
  }, [streamContent]);
  
  return (
    <div className="stream-processor">
      <div className="controls">
        <button 
          onClick={() => processStream('Hello, can you introduce LangChain?')}
          disabled={isStreaming}
        >
          {isStreaming ? 'Streaming...' : 'Start streaming'}
        </button>
      </div>
      
      {thinkingMessage && (
        <div className="thinking">
          <span className="thinking-icon">💡</span>
          {thinkingMessage}
        </div>
      )}
      
      <div 
        ref={contentRef}
        className="content"
        style={{ 
          minHeight: '200px',
          maxHeight: '400px',
          overflowY: 'auto',
          border: '1px solid #ccc',
          padding: '10px',
          fontFamily: 'monospace'
        }}
      >
        {streamContent}
      </div>
      
      {error && (
        <div className="error" style={{ color: 'red' }}>
          Error: {error}
        </div>
      )}
      
      {isStreaming && (
        <div className="streaming-indicator">
          <span className="spinner"></span>
          Receiving streamed data...
        </div>
      )}
    </div>
  );
};

export default StreamProcessor;
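
A component can unmount, or the user may want to stop generation, while a stream is still being read. A minimal sketch of a cancellable read loop built on the standard `AbortController` / `AbortSignal` API is shown below. `readUntilAborted` is a hypothetical helper that does not appear in the component above; how it is wired into the component is left as an assumption.

```typescript
// Sketch: read a stream until it ends or the signal is aborted.
// Resolves to true if the stream completed, false if it was aborted.
async function readUntilAborted(
  stream: ReadableStream<Uint8Array>,
  signal: AbortSignal,
  onChunk: (chunk: Uint8Array) => void
): Promise<boolean> {
  const reader = stream.getReader();

  const onAbort = () => {
    // Cancelling the reader settles a pending read() with done: true
    reader.cancel().catch(() => {});
  };
  if (signal.aborted) {
    onAbort();
  } else {
    signal.addEventListener('abort', onAbort, { once: true });
  }

  try {
    while (!signal.aborted) {
      const { done, value } = await reader.read();
      if (done) break;
      onChunk(value);
    }
    return !signal.aborted;
  } finally {
    signal.removeEventListener('abort', onAbort);
    reader.releaseLock();
  }
}
```

In a React component, one option would be to create the `AbortController` inside a `useEffect`, pass `controller.signal` both to `fetch` and to this helper, and call `controller.abort()` from the effect's cleanup function.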

Advanced Typewriter Effect

Implement a more advanced typewriter effect with support for HTML content and custom styles:

typescript
// Advanced typewriter effect class
class AdvancedTypewriter {
  private targetElement: HTMLElement;
  private speed: number;
  private htmlMode: boolean;
  private cursorElement: HTMLElement | null;
  
  constructor(
    targetElement: HTMLElement,
    options: {
      speed?: number;
      htmlMode?: boolean;
      showCursor?: boolean;
    } = {}
  ) {
    this.targetElement = targetElement;
    this.speed = options.speed ?? 50; // ?? keeps an explicit speed of 0
    this.htmlMode = options.htmlMode ?? false;
    
    if (options.showCursor) {
      this.cursorElement = this.createCursor();
      this.targetElement.appendChild(this.cursorElement);
    } else {
      this.cursorElement = null;
    }
  }
  
  private createCursor(): HTMLElement {
    const cursor = document.createElement('span');
    cursor.className = 'typewriter-cursor';
    cursor.textContent = '|';
    cursor.style.animation = 'blink 1s infinite';
    
    // Add the blink animation styles
    if (!document.querySelector('#typewriter-styles')) {
      const style = document.createElement('style');
      style.id = 'typewriter-styles';
      style.textContent = `
        @keyframes blink {
          0%, 100% { opacity: 1; }
          50% { opacity: 0; }
        }
        .typewriter-cursor {
          margin-left: 2px;
        }
      `;
      document.head.appendChild(style);
    }
    
    return cursor;
  }
  
  async typeText(text: string): Promise<void> {
    return new Promise((resolve) => {
      if (this.htmlMode) {
        this.typeHtmlText(text, resolve);
      } else {
        this.typePlainText(text, resolve);
      }
    });
  }
  
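  private typePlainText(text: string, resolve: () => void): void {
    let i = 0;
    const timer = setInterval(() => {
      if (i < text.length) {
        // Insert the character before the cursor. Assigning to textContent
        // would replace every child node, including the cursor element.
        const cursor =
          this.cursorElement?.parentNode === this.targetElement ? this.cursorElement : null;
        this.targetElement.insertBefore(document.createTextNode(text.charAt(i)), cursor);
        i++;
      } else {
        clearInterval(timer);
        resolve();
      }
    }, this.speed);
  }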
  
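  // Raw HTML typed so far, kept across calls so that streamed chunks accumulate
  private htmlBuffer = '';
  
  private typeHtmlText(text: string, resolve: () => void): void {
    let i = 0;
    
    const timer = setInterval(() => {
      if (i < text.length) {
        // Emit a whole tag at once so a half-written tag is never rendered
        const tagEnd = text.charAt(i) === '<' ? text.indexOf('>', i) : -1;
        const next = tagEnd === -1 ? i + 1 : tagEnd + 1;
        this.htmlBuffer += text.slice(i, next);
        i = next;
        // innerHTML does not sanitize: only pass trusted or sanitized markup
        this.targetElement.innerHTML = this.htmlBuffer;
        // Assigning innerHTML removed the cursor, so put it back
        if (this.cursorElement) {
          this.targetElement.appendChild(this.cursorElement);
        }
      } else {
        clearInterval(timer);
        resolve();
      }
    }, this.speed);
  }
  
  async typeStream(stream: AsyncIterable<string>): Promise<void> {
    for await (const chunk of stream) {
      await this.typeText(chunk);
    }
  }
  
  clear(): void {
    this.targetElement.textContent = '';
    this.htmlBuffer = '';
    if (this.cursorElement) {
      this.targetElement.appendChild(this.cursorElement);
    }
  }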
}

// Usage example
/*
const outputElement = document.getElementById('output')!;
const advancedTypewriter = new AdvancedTypewriter(outputElement, {
  speed: 30,
  htmlMode: true,
  showCursor: true
});

// Show formatted text (awaited so it finishes before the stream starts)
await advancedTypewriter.typeText('<strong>Important:</strong> <em>This is formatted text</em>');

// Or process streamed data
async function* htmlStream() {
  yield '<p>First paragraph</p>';
  await new Promise(resolve => setTimeout(resolve, 1000));
  yield '<p>Second paragraph</p>';
  await new Promise(resolve => setTimeout(resolve, 1000));
  yield '<ul><li>List item 1</li><li>List item 2</li></ul>';
}

await advancedTypewriter.typeStream(htmlStream());
*/
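
Typing HTML one character at a time means the element is repeatedly assigned markup such as `<stro`, which the browser renders as stray text until the tag is complete. One way around this, sketched below, is to split the markup into tokens first, so that a tag or a character entity is always emitted in a single step. `tokenizeHtml` is an illustrative helper based on a simple regular expression; it is not an HTML parser and does not sanitize anything, so the input is assumed to be trusted or already sanitized.

```typescript
// Sketch: split markup into typing steps (whole tags, whole entities,
// or single characters). `tokenizeHtml` is a hypothetical helper name.
function tokenizeHtml(html: string): string[] {
  // Alternatives, in order: a tag, a character entity, a surrogate pair
  // (for example an emoji), or any other single character
  const pattern = /<[^>]*>|&[#\w]+;|[\uD800-\uDBFF][\uDC00-\uDFFF]|[\s\S]/g;
  return html.match(pattern) ?? [];
}
```

A typewriter could then append one token per tick to a buffer and assign the buffer to `innerHTML`. Elements that are still open at a given step are closed automatically by the browser's parser, so intermediate frames remain well-formed.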

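A Practical Example

Let's look at a complete, practical example that shows how to implement streaming response handling and the typewriter effect in a web application: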

typescript
// A complete streaming chat application
class StreamChatApp {
  private chatContainer: HTMLElement;
  private inputElement: HTMLTextAreaElement;
  private sendButton: HTMLButtonElement;
  private isStreaming: boolean = false;
  
  constructor() {
    this.chatContainer = document.getElementById('chat-container')!;
    this.inputElement = document.getElementById('chat-input') as HTMLTextAreaElement;
    this.sendButton = document.getElementById('send-button') as HTMLButtonElement;
    
    this.bindEvents();
  }
  
  private bindEvents(): void {
    this.sendButton.addEventListener('click', () => this.sendMessage());
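    this.inputElement.addEventListener('keydown', (e) => {
      // keydown replaces the deprecated keypress event; Enter presses that
      // only confirm an IME composition are skipped
      if (e.key === 'Enter' && !e.shiftKey && !e.isComposing) {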
        e.preventDefault();
        this.sendMessage();
      }
    });
  }
  
  private async sendMessage(): Promise<void> {
    if (this.isStreaming) return;
    
    const message = this.inputElement.value.trim();
    if (!message) return;
    
    // Add the user's message to the chat history
    this.addMessage('user', message);
    this.inputElement.value = '';
    
    // Add a container for the AI message
    const aiMessageElement = this.addMessage('ai', '');
    
    // Create the typewriter effect
    const typewriter = new AdvancedTypewriter(aiMessageElement, {
      speed: 20,
      showCursor: true
    });
    
    try {
      this.isStreaming = true;
      this.sendButton.disabled = true;
      this.inputElement.disabled = true;
      
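      // Send the request and process the streaming response
      await this.processStreamResponse(message, typewriter);
      
    } catch (error) {
      console.error('Stream processing error:', error);
      this.addMessage('error', 'An error occurred while processing the message: ' + (error as Error).message);
    } finally {
      this.isStreaming = false;
      this.sendButton.disabled = false;
      this.inputElement.disabled = false;
      // Remove the cursor but keep the typed text; clear() would erase the message
      aiMessageElement.querySelector('.typewriter-cursor')?.remove();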
    }
  }
  
  private async processStreamResponse(
    message: string,
    typewriter: AdvancedTypewriter
  ): Promise<void> {
    const response = await fetch('/api/advanced-langchain/stream-chat', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        messages: [{ role: 'user', content: message }],
        useRAG: true,
      }),
    });
    
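    if (!response.ok) {
      throw new Error(`Request failed with status ${response.status}`);
    }
    if (!response.body) {
      throw new Error('The response body is empty');
    }
    
    const reader = response.body.getReader();
    const sseProcessor = new SSEProcessor(); // decodes the bytes internally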
    
    try {
      while (true) {
        const { done, value } = await reader.read();
        
        if (done) {
          const remainingEvents = sseProcessor.flush();
          for (const event of remainingEvents) {
            await this.handleStreamEvent(event, typewriter);
          }
          break;
        }
        
        const events = sseProcessor.processChunk(value);
        for (const event of events) {
          await this.handleStreamEvent(event, typewriter);
        }
      }
    } finally {
      reader.releaseLock();
    }
  }
  
  private async handleStreamEvent(
    event: { type: string; data: string },
    typewriter: AdvancedTypewriter
  ): Promise<void> {
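    if (event.type === 'data') {
      let data: unknown;
      try {
        data = JSON.parse(event.data);
      } catch {
        // Not JSON: type the payload as plain text
        await typewriter.typeText(event.data);
        return;
      }
      // Called outside the try block so that a handler error is not
      // mistaken for invalid JSON
      await this.handleStreamData(data, typewriter);
    }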
  }
  
  private async handleStreamData(
    data: any,
    typewriter: AdvancedTypewriter
  ): Promise<void> {
    switch (data.type) {
      case 'text':
        await typewriter.typeText(data.content);
        break;
        
      case 'thinking':
        this.showThinkingIndicator(data.content);
        break;
        
      case 'done':
        this.hideThinkingIndicator();
        break;
        
      case 'error':
        this.addMessage('error', data.content);
        break;
    }
  }
  
  private addMessage(role: string, content: string): HTMLElement {
    const messageElement = document.createElement('div');
    messageElement.className = `message message-${role}`;
    
    const roleElement = document.createElement('div');
    roleElement.className = 'message-role';
    roleElement.textContent = role === 'user' ? 'You' : role === 'ai' ? 'AI' : 'Error';
    
    const contentElement = document.createElement('div');
    contentElement.className = 'message-content';
    contentElement.textContent = content;
    
    messageElement.appendChild(roleElement);
    messageElement.appendChild(contentElement);
    this.chatContainer.appendChild(messageElement);
    
    // Scroll to the bottom
    this.chatContainer.scrollTop = this.chatContainer.scrollHeight;
    
    return contentElement;
  }
  
  private showThinkingIndicator(message: string): void {
    let thinkingElement = document.getElementById('thinking-indicator');
    if (!thinkingElement) {
      thinkingElement = document.createElement('div');
      thinkingElement.id = 'thinking-indicator';
      thinkingElement.className = 'thinking-indicator';
      this.chatContainer.appendChild(thinkingElement);
    }
    thinkingElement.textContent = `💡 ${message}`;
    thinkingElement.style.display = 'block';
    this.chatContainer.scrollTop = this.chatContainer.scrollHeight;
  }
  
  private hideThinkingIndicator(): void {
    const thinkingElement = document.getElementById('thinking-indicator');
    if (thinkingElement) {
      thinkingElement.style.display = 'none';
    }
  }
}

// Initialize the app
document.addEventListener('DOMContentLoaded', () => {
  new StreamChatApp();
});

// Add the required CSS styles
const styles = `
  .chat-container {
    max-width: 800px;
    margin: 0 auto;
    border: 1px solid #ddd;
    border-radius: 8px;
    overflow: hidden;
  }
  
  .message {
    padding: 12px 16px;
    border-bottom: 1px solid #eee;
    display: flex;
  }
  
  .message-role {
    font-weight: bold;
    width: 80px;
    flex-shrink: 0;
  }
  
  .message-user .message-role {
    color: #007bff;
  }
  
  .message-ai .message-role {
    color: #28a745;
  }
  
  .message-error .message-role {
    color: #dc3545;
  }
  
  .message-content {
    flex-grow: 1;
    white-space: pre-wrap;
    line-height: 1.5;
  }
  
  .thinking-indicator {
    padding: 8px 16px;
    background-color: #fff3cd;
    border-bottom: 1px solid #eee;
    color: #856404;
    font-style: italic;
  }
  
  #chat-input {
    width: 100%;
    padding: 12px;
    border: none;
    border-top: 1px solid #ddd;
    resize: none;
    min-height: 60px;
  }
  
  #send-button {
    width: 100%;
    padding: 12px;
    background-color: #007bff;
    color: white;
    border: none;
    cursor: pointer;
  }
  
  #send-button:disabled {
    background-color: #6c757d;
    cursor: not-allowed;
  }
`;

// Add the styles to the page
const styleSheet = document.createElement('style');
styleSheet.textContent = styles;
document.head.appendChild(styleSheet);

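Summary

By reading Response.body through a ReadableStreamDefaultReader (obtained with getReader()) and applying the related techniques, we can implement smooth streaming response handling and a typewriter effect on the frontend:

  1. Basic stream handling - use the ReadableStream API to process data as it arrives
  2. SSE parsing - correctly parse responses in the Server-Sent Events format
  3. Typewriter effect - implement the classic character-by-character display
  4. Modern implementation - use modern Web APIs for better efficiency
  5. React integration - implement stream processing in a React component
  6. Advanced effects - support HTML content and custom styles
  7. Complete application - build a complete streaming chat application

These techniques let the frontend offer a more responsive and user-friendly experience, especially when displaying LLM-generated content.

In the next chapter, we will look at authentication and rate limiting: how to pass user information in RunnableConfig, and how to integrate security and access control into stream processing.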