前端消费流式响应:Response.body.getReader()(ReadableStreamDefaultReader)
实现打字机效果
在现代 Web 应用中,流式响应为用户提供更好的交互体验。通过逐步显示 LLM 生成的内容,用户可以立即看到部分结果,而不必等待完整响应。本章将深入探讨如何在前端消费流式响应,并实现打字机效果来提升用户体验。
流式响应的基础概念
Web 流式响应基于 ReadableStream API,允许逐步接收和处理数据:
typescript
// ReadableStream 基础使用
/**
 * Reads a fetch `Response` body to the end and logs each decoded text chunk.
 *
 * @param response - Response whose body stream is consumed.
 * @throws Error when the response exposes no body stream to read from.
 */
async function consumeStreamBasic(response: Response) {
  const reader = response.body?.getReader();
  if (!reader) {
    throw new Error('响应体没有可用的读取器');
  }
  const decoder = new TextDecoder();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        // Flush whatever the decoder is still holding (an unfinished trailing sequence).
        const tail = decoder.decode();
        if (tail) {
          console.log('接收到数据块:', tail);
        }
        console.log('流完成');
        break;
      }
      // `stream: true` keeps a multi-byte character that straddles two network
      // chunks intact instead of turning each half into U+FFFD.
      const chunk = decoder.decode(value, { stream: true });
      if (chunk) {
        console.log('接收到数据块:', chunk);
      }
    }
  } finally {
    reader.releaseLock();
  }
}
// Server-Sent Events (SSE) 处理
LangChain 流式 API 通常通过 SSE 提供服务,需要特殊处理:
typescript
// SSE 数据处理
/**
 * Incremental parser for a Server-Sent Events byte stream.
 *
 * Bytes are fed in with `processChunk`; every complete `data:` line yields one
 * `{ type: 'data', data }` entry. Other fields (`event:`, `id:`, `retry:`),
 * comment lines and blank separator lines are skipped. Call `flush` once the
 * stream has ended to parse a final line that had no trailing newline.
 */
class SSEProcessor {
  private decoder: TextDecoder;
  private buffer: string;

  constructor() {
    this.decoder = new TextDecoder();
    this.buffer = '';
  }

  /**
   * Decodes a chunk and returns the data events completed by it.
   *
   * @param value - Raw bytes read from the response stream.
   * @returns One entry per complete `data:` line, in arrival order.
   */
  processChunk(value: Uint8Array): Array<{ type: string; data: string }> {
    // Streaming decode so a character split across chunks is not corrupted.
    this.buffer += this.decoder.decode(value, { stream: true });
    const lines = this.buffer.split('\n');
    // The last piece has no terminating newline yet; keep it for the next chunk.
    this.buffer = lines.pop() ?? '';
    return this.parseLines(lines);
  }

  /**
   * Parses whatever is still buffered and resets the processor.
   *
   * @returns Data events found in the unterminated final line, or an empty array.
   */
  flush(): Array<{ type: string; data: string }> {
    // decode() without arguments releases bytes the decoder was still holding.
    const pending = this.buffer + this.decoder.decode();
    this.buffer = '';
    return pending ? this.parseLines(pending.split('\n')) : [];
  }

  /** Extracts the payload of every `data:` line; all other lines are ignored. */
  private parseLines(lines: string[]): Array<{ type: string; data: string }> {
    const events: Array<{ type: string; data: string }> = [];
    for (const rawLine of lines) {
      // SSE allows CRLF line endings; drop the CR left over after splitting on LF.
      const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
      if (!line.startsWith('data:')) {
        continue;
      }
      const payload = line.slice('data:'.length);
      // A single space after the colon is optional and not part of the value.
      events.push({
        type: 'data',
        data: payload.startsWith(' ') ? payload.slice(1) : payload,
      });
    }
    return events;
  }
}
// 打字机效果实现
实现经典的打字机效果来逐步显示文本:
typescript
// 打字机效果类
/**
 * Appends text to an element one character at a time on a fixed interval.
 */
class TypewriterEffect {
  private targetElement: HTMLElement;
  private speed: number;
  private callback?: () => void;

  /**
   * @param targetElement - Element whose `textContent` receives the typed text.
   * @param speed - Delay between characters in milliseconds.
   * @param callback - Invoked each time a `typeText` call finishes.
   */
  constructor(
    targetElement: HTMLElement,
    speed: number = 50,
    callback?: () => void
  ) {
    this.targetElement = targetElement;
    this.speed = speed;
    this.callback = callback;
  }

  /**
   * Types `text` after the element's current content.
   *
   * @returns Promise resolved one interval after the last character was written.
   */
  async typeText(text: string): Promise<void> {
    // Iterate by code point so a surrogate pair (emoji etc.) is never written
    // to the DOM as two broken halves.
    const glyphs = Array.from(text);
    return new Promise((resolve) => {
      let index = 0;
      const timer = setInterval(() => {
        const glyph = glyphs[index];
        if (glyph !== undefined) {
          this.targetElement.textContent += glyph;
          index++;
          return;
        }
        clearInterval(timer);
        this.callback?.();
        resolve();
      }, this.speed);
    });
  }

  /** Types every chunk of `stream` in order, finishing one before starting the next. */
  async typeStream(stream: AsyncGenerator<string>): Promise<void> {
    for await (const chunk of stream) {
      await this.typeText(chunk);
    }
  }

  /** Empties the target element. */
  clear(): void {
    this.targetElement.textContent = '';
  }
}
// 使用示例
/*
const outputElement = document.getElementById('output');
const typewriter = new TypewriterEffect(outputElement, 30);
// 逐步显示文本
typewriter.typeText('Hello, World!');
// 或者处理流式数据
async function* mockStream() {
const text = '这是一个流式文本示例';
for (let i = 0; i < text.length; i++) {
yield text[i];
await new Promise(resolve => setTimeout(resolve, 100));
}
}
typewriter.typeStream(mockStream());
*/

现代流式处理实现
使用现代 Web API 实现更高效的流式处理:
typescript
// 现代流式处理器
/**
 * Reads an SSE-framed fetch `Response` and renders its events into the DOM.
 *
 * Each `data:` payload is parsed as JSON when possible and dispatched on its
 * `type` field; payloads that are not JSON are appended as plain text.
 */
class ModernStreamProcessor {
  private targetElement: HTMLElement;
  private onChunk?: (chunk: string) => void;
  private onComplete?: () => void;

  /**
   * @param targetElement - Element that receives the streamed text.
   * @param onChunk - Called with every piece of text appended as a text chunk.
   * @param onComplete - Called once after the stream was read to the end.
   */
  constructor(
    targetElement: HTMLElement,
    onChunk?: (chunk: string) => void,
    onComplete?: () => void
  ) {
    this.targetElement = targetElement;
    this.onChunk = onChunk;
    this.onComplete = onComplete;
  }

  /**
   * Consumes the response body until it ends.
   *
   * @throws Error when the response has no body; rethrows read or handler errors.
   */
  async processResponse(response: Response): Promise<void> {
    if (!response.body) {
      throw new Error('响应体为空');
    }
    const reader = response.body.getReader();
    const sseProcessor = new SSEProcessor();
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          // Parse whatever is left in the SSE buffer.
          for (const event of sseProcessor.flush()) {
            this.handleEvent(event);
          }
          break;
        }
        for (const event of sseProcessor.processChunk(value)) {
          this.handleEvent(event);
        }
      }
      this.onComplete?.();
    } catch (error) {
      // Stop the download instead of leaving the body half-read; a failure of
      // cancel() itself must not mask the original error.
      await reader.cancel(error).catch(() => undefined);
      throw error;
    } finally {
      reader.releaseLock();
    }
  }

  /** Routes one SSE event: JSON payloads to `handleStreamData`, anything else to text. */
  private handleEvent(event: { type: string; data: string }): void {
    if (event.type !== 'data') {
      console.warn('未知事件类型:', event.type);
      return;
    }
    let parsed: unknown;
    try {
      parsed = JSON.parse(event.data);
    } catch {
      // Not JSON: show the payload as plain text.
      this.handleTextData(event.data);
      return;
    }
    // Kept outside the try so an error thrown by a handler or callback is not
    // mistaken for a JSON parse failure (which would append the raw JSON).
    this.handleStreamData(parsed);
  }

  /** Dispatches a parsed payload on its `type` field. */
  private handleStreamData(data: unknown): void {
    if (typeof data !== 'object' || data === null) {
      this.appendText(String(data));
      return;
    }
    const { type, content } = data as { type?: unknown; content?: unknown };
    switch (type) {
      case 'text': {
        const text = ModernStreamProcessor.toText(content);
        this.appendText(text);
        this.onChunk?.(text);
        break;
      }
      case 'thinking':
        this.showThinking(ModernStreamProcessor.toText(content));
        break;
      case 'done':
        this.showCompletion(content);
        break;
      case 'error':
        this.showError(ModernStreamProcessor.toText(content));
        break;
      default:
        // Unknown payload type: show it verbatim.
        this.appendText(JSON.stringify(data));
    }
  }

  /** Converts a payload's `content` to display text; a missing value becomes ''. */
  private static toText(value: unknown): string {
    if (typeof value === 'string') {
      return value;
    }
    return value == null ? '' : JSON.stringify(value);
  }

  private handleTextData(text: string): void {
    this.appendText(text);
    this.onChunk?.(text);
  }

  private appendText(text: string): void {
    this.targetElement.textContent += text;
    // Keep the newest content in view.
    this.targetElement.scrollTop = this.targetElement.scrollHeight;
  }

  /** Shows `message` in the element with id `thinking`, if the page has one. */
  private showThinking(message: string): void {
    const thinkingElement = document.getElementById('thinking');
    if (thinkingElement) {
      thinkingElement.textContent = message;
      thinkingElement.style.display = 'block';
    }
  }

  /** Writes the completion payload into the element with id `completion`, if present. */
  private showCompletion(data: unknown): void {
    const completionElement = document.getElementById('completion');
    if (completionElement) {
      completionElement.textContent =
        typeof data === 'string' ? data : JSON.stringify(data, null, 2) ?? '';
    }
  }

  /** Shows `message` in the element with id `error`, if the page has one. */
  private showError(message: string): void {
    const errorElement = document.getElementById('error');
    if (errorElement) {
      errorElement.textContent = message;
      errorElement.style.display = 'block';
    }
  }
}
// React 组件中的流式处理
在 React 组件中实现流式处理和打字机效果:
typescript
// React 流式处理组件
import React, { useState, useEffect, useRef } from 'react';
/** Shape the component expects after JSON-parsing the payload of one `data:` event. */
interface StreamData {
/** Discriminator that selects how `content` is handled. */
type: 'text' | 'thinking' | 'done' | 'error';
/** Text carried by the event. */
content: string;
/** Not read anywhere in the visible code; presumably an event time — TODO confirm units. */
timestamp: number;
}
/**
 * Demo component that posts a prompt to the streaming chat endpoint and shows
 * the answer with a typewriter effect.
 *
 * All text chunks share one character queue and one timer, so characters are
 * displayed in arrival order even when chunks arrive faster than they are typed.
 * Note: `isStreaming` reflects the network read only; typing may still be
 * draining the queue briefly after it turns false.
 */
const StreamProcessor: React.FC = () => {
  const [streamContent, setStreamContent] = useState<string>('');
  const [thinkingMessage, setThinkingMessage] = useState<string>('');
  const [isStreaming, setIsStreaming] = useState<boolean>(false);
  const [error, setError] = useState<string>('');
  const contentRef = useRef<HTMLDivElement>(null);
  // Characters waiting to be typed, shared by every chunk.
  const pendingCharsRef = useRef<string[]>([]);
  // Handle of the single typing interval, or null while idle.
  const typeTimerRef = useRef<ReturnType<typeof setInterval> | null>(null);

  const stopTypeWriter = () => {
    if (typeTimerRef.current !== null) {
      clearInterval(typeTimerRef.current);
      typeTimerRef.current = null;
    }
  };

  // Queue text for typing; starts the timer if it is not already running.
  const typeWriter = (text: string) => {
    // for...of walks code points, so emoji are never split in half.
    for (const char of text) {
      pendingCharsRef.current.push(char);
    }
    if (typeTimerRef.current !== null) {
      return;
    }
    typeTimerRef.current = setInterval(() => {
      const nextChar = pendingCharsRef.current.shift();
      if (nextChar === undefined) {
        stopTypeWriter();
        return;
      }
      // The character is captured in a const: React may run the updater later,
      // so it must not read a counter that has been advanced in the meantime.
      setStreamContent((prev) => prev + nextChar);
    }, 30);
  };

  // Stop typing when the component unmounts.
  useEffect(() => {
    return () => {
      stopTypeWriter();
    };
  }, []);

  // Send the prompt and consume the streamed answer.
  const processStream = async (prompt: string) => {
    stopTypeWriter();
    pendingCharsRef.current = [];
    setIsStreaming(true);
    setStreamContent('');
    setThinkingMessage('');
    setError('');
    try {
      const response = await fetch('/api/advanced-langchain/stream-chat', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          messages: [{ role: 'user', content: prompt }],
          useRAG: true,
        }),
      });
      if (!response.ok) {
        // An HTTP error body is not a chat stream; do not type it out.
        throw new Error(`请求失败: ${response.status} ${response.statusText}`);
      }
      if (!response.body) {
        throw new Error('响应体为空');
      }
      const reader = response.body.getReader();
      const sseProcessor = new SSEProcessor();
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) {
            // Parse whatever is left in the SSE buffer.
            for (const event of sseProcessor.flush()) {
              handleStreamEvent(event);
            }
            break;
          }
          for (const event of sseProcessor.processChunk(value)) {
            handleStreamEvent(event);
          }
        }
      } finally {
        reader.releaseLock();
      }
    } catch (err) {
      setError(err instanceof Error ? err.message : '发生未知错误');
    } finally {
      setIsStreaming(false);
    }
  };

  const handleStreamEvent = (event: { type: string; data: string }) => {
    if (event.type !== 'data') {
      return;
    }
    let parsed: unknown;
    try {
      parsed = JSON.parse(event.data);
    } catch {
      // Non-JSON payload: queue it as text so it keeps its place in the output.
      typeWriter(event.data);
      return;
    }
    if (typeof parsed === 'object' && parsed !== null) {
      handleStreamData(parsed as StreamData);
    } else {
      typeWriter(String(parsed));
    }
  };

  const handleStreamData = (data: StreamData) => {
    switch (data.type) {
      case 'text':
        // The payload is unvalidated at runtime; tolerate a missing content field.
        typeWriter(String(data.content ?? ''));
        break;
      case 'thinking':
        setThinkingMessage(data.content);
        break;
      case 'done':
        // End of stream; nothing to render.
        break;
      case 'error':
        setError(data.content);
        break;
    }
  };

  // Keep the newest content in view.
  useEffect(() => {
    if (contentRef.current) {
      contentRef.current.scrollTop = contentRef.current.scrollHeight;
    }
  }, [streamContent]);

  return (
    <div className="stream-processor">
      <div className="controls">
        <button
          onClick={() => void processStream('你好,能介绍一下 LangChain 吗?')}
          disabled={isStreaming}
        >
          {isStreaming ? '流式处理中...' : '开始流式处理'}
        </button>
      </div>
      {thinkingMessage && (
        <div className="thinking">
          <span className="thinking-icon">💡</span>
          {thinkingMessage}
        </div>
      )}
      <div
        ref={contentRef}
        className="content"
        style={{
          minHeight: '200px',
          maxHeight: '400px',
          overflowY: 'auto',
          border: '1px solid #ccc',
          padding: '10px',
          fontFamily: 'monospace'
        }}
      >
        {streamContent}
      </div>
      {error && (
        <div className="error" style={{ color: 'red' }}>
          错误: {error}
        </div>
      )}
      {isStreaming && (
        <div className="streaming-indicator">
          <span className="spinner">⏳</span>
          正在接收流式数据...
        </div>
      )}
    </div>
  );
};
export default StreamProcessor;

高级打字机效果
实现更高级的打字机效果,支持 HTML 内容和自定义样式:
typescript
// 高级打字机效果类
/**
 * Typewriter effect with optional HTML rendering and a blinking cursor.
 *
 * Typed output lives in a dedicated `span.typewriter-content` child of the
 * target; the cursor, when enabled, is a sibling placed after it. Keeping the
 * two apart means writing text never destroys the cursor element.
 */
class AdvancedTypewriter {
  private targetElement: HTMLElement;
  private speed: number;
  private htmlMode: boolean;
  private cursorElement: HTMLElement | null;
  private contentElement: HTMLElement;
  // HTML typed so far; kept across typeText calls so streamed chunks accumulate.
  private htmlBuffer: string;

  /**
   * @param targetElement - Container that receives the typed content.
   * @param options.speed - Delay between steps in milliseconds (default 50).
   * @param options.htmlMode - Render the text as HTML instead of plain text.
   * @param options.showCursor - Append a blinking cursor after the content.
   */
  constructor(
    targetElement: HTMLElement,
    options: {
      speed?: number;
      htmlMode?: boolean;
      showCursor?: boolean;
    } = {}
  ) {
    this.targetElement = targetElement;
    // `??` rather than `||` so an explicit speed of 0 is honoured.
    this.speed = options.speed ?? 50;
    this.htmlMode = options.htmlMode ?? false;
    this.htmlBuffer = '';
    this.contentElement = document.createElement('span');
    this.contentElement.className = 'typewriter-content';
    this.targetElement.appendChild(this.contentElement);
    this.cursorElement = options.showCursor ? this.createCursor() : null;
    if (this.cursorElement) {
      this.targetElement.appendChild(this.cursorElement);
    }
  }

  /** Builds the cursor span and installs the blink keyframes once per document. */
  private createCursor(): HTMLElement {
    const cursor = document.createElement('span');
    cursor.className = 'typewriter-cursor';
    cursor.textContent = '|';
    cursor.style.animation = 'blink 1s infinite';
    if (!document.querySelector('#typewriter-styles')) {
      const style = document.createElement('style');
      style.id = 'typewriter-styles';
      style.textContent = `
@keyframes blink {
0%, 100% { opacity: 1; }
50% { opacity: 0; }
}
.typewriter-cursor {
margin-left: 2px;
}
`;
      document.head.appendChild(style);
    }
    return cursor;
  }

  /**
   * Types `text` after the content typed so far.
   *
   * @returns Promise resolved one interval after the last unit was written.
   */
  async typeText(text: string): Promise<void> {
    const units = this.htmlMode
      ? AdvancedTypewriter.splitHtml(text)
      : Array.from(text); // by code point, so emoji are not split
    return new Promise((resolve) => {
      let index = 0;
      const timer = setInterval(() => {
        const unit = units[index];
        if (unit === undefined) {
          clearInterval(timer);
          resolve();
          return;
        }
        index++;
        this.appendUnit(unit);
      }, this.speed);
    });
  }

  /**
   * Splits HTML into typing steps: a whole tag or character entity is one
   * step, so half-written markup such as `<stro` is never rendered.
   */
  private static splitHtml(html: string): string[] {
    return html.match(/<[^>]*>|&#?\w+;|[\s\S]/gu) ?? [];
  }

  /** Writes one typing step into the content element. */
  private appendUnit(unit: string): void {
    if (!this.htmlMode) {
      this.contentElement.textContent += unit;
      return;
    }
    this.htmlBuffer += unit;
    // SECURITY(review): innerHTML executes whatever markup it is given. Only
    // enable htmlMode for trusted or sanitized text — raw model output is neither.
    this.contentElement.innerHTML = this.htmlBuffer;
  }

  /** Types every chunk of `stream` in order, finishing one before starting the next. */
  async typeStream(stream: AsyncIterable<string>): Promise<void> {
    for await (const chunk of stream) {
      await this.typeText(chunk);
    }
  }

  /** Detaches the cursor while leaving the typed content in place. */
  removeCursor(): void {
    const cursor = this.cursorElement;
    if (cursor && cursor.parentNode) {
      cursor.parentNode.removeChild(cursor);
    }
  }

  /** Removes all typed content and shows the cursor again (when one is configured). */
  clear(): void {
    this.htmlBuffer = '';
    this.contentElement.textContent = '';
    this.targetElement.textContent = '';
    this.targetElement.appendChild(this.contentElement);
    if (this.cursorElement) {
      this.targetElement.appendChild(this.cursorElement);
    }
  }
}
// 使用示例
/*
const outputElement = document.getElementById('output');
const advancedTypewriter = new AdvancedTypewriter(outputElement, {
speed: 30,
htmlMode: true,
showCursor: true
});
// 显示带格式的文本
advancedTypewriter.typeText('<strong>重要信息:</strong> <em>这是一段格式化文本</em>');
// 或者处理流式数据
async function* htmlStream() {
yield '<p>第一段内容</p>';
await new Promise(resolve => setTimeout(resolve, 1000));
yield '<p>第二段内容</p>';
await new Promise(resolve => setTimeout(resolve, 1000));
yield '<ul><li>列表项1</li><li>列表项2</li></ul>';
}
advancedTypewriter.typeStream(htmlStream());
*/

实际应用示例
让我们看一个完整的实际应用示例,展示如何在 Web 应用中实现流式响应处理和打字机效果:
typescript
// 完整的流式聊天应用
/**
 * Minimal streaming chat UI wired to three page elements: `#chat-container`,
 * `#chat-input` and `#send-button`.
 *
 * Sending a message posts it to the streaming endpoint and types the SSE
 * response into a new AI message bubble.
 */
class StreamChatApp {
  private chatContainer: HTMLElement;
  private inputElement: HTMLTextAreaElement;
  private sendButton: HTMLButtonElement;
  private isStreaming: boolean = false;

  /** @throws Error when one of the required page elements is missing. */
  constructor() {
    this.chatContainer = StreamChatApp.requireElement<HTMLElement>('chat-container');
    this.inputElement = StreamChatApp.requireElement<HTMLTextAreaElement>('chat-input');
    this.sendButton = StreamChatApp.requireElement<HTMLButtonElement>('send-button');
    this.bindEvents();
  }

  /** Looks up an element by id and fails with a clear message instead of a later null crash. */
  private static requireElement<T extends HTMLElement>(id: string): T {
    const element = document.getElementById(id);
    if (!element) {
      throw new Error(`找不到必需的页面元素: #${id}`);
    }
    return element as T;
  }

  private bindEvents(): void {
    this.sendButton.addEventListener('click', () => {
      void this.sendMessage();
    });
    // Enter sends; Shift+Enter inserts a newline.
    this.inputElement.addEventListener('keypress', (e) => {
      if (e.key === 'Enter' && !e.shiftKey) {
        e.preventDefault();
        void this.sendMessage();
      }
    });
  }

  /** Sends the current input and streams the reply; ignored while a reply is in flight. */
  private async sendMessage(): Promise<void> {
    if (this.isStreaming) return;
    const message = this.inputElement.value.trim();
    if (!message) return;

    this.addMessage('user', message);
    this.inputElement.value = '';

    // Empty AI bubble that the typewriter fills in.
    const aiMessageElement = this.addMessage('ai', '');
    const typewriter = new AdvancedTypewriter(aiMessageElement, {
      speed: 20,
      showCursor: true
    });

    try {
      this.isStreaming = true;
      this.sendButton.disabled = true;
      this.inputElement.disabled = true;
      await this.processStreamResponse(message, typewriter);
    } catch (error) {
      console.error('流式处理错误:', error);
      const reason = error instanceof Error ? error.message : String(error);
      this.addMessage('error', '处理消息时发生错误: ' + reason);
    } finally {
      this.isStreaming = false;
      this.sendButton.disabled = false;
      this.inputElement.disabled = false;
      // Do not leave the indicator up when the stream ended without a 'done' event.
      this.hideThinkingIndicator();
      // Remove only the cursor. typewriter.clear() would erase the reply that
      // was just typed and put the cursor back.
      aiMessageElement.querySelector('.typewriter-cursor')?.remove();
    }
  }

  /**
   * Posts the message and feeds every SSE event of the response to the typewriter.
   *
   * @throws Error on a non-2xx status or when the response has no body.
   */
  private async processStreamResponse(
    message: string,
    typewriter: AdvancedTypewriter
  ): Promise<void> {
    const response = await fetch('/api/advanced-langchain/stream-chat', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        messages: [{ role: 'user', content: message }],
        useRAG: true,
      }),
    });
    if (!response.ok) {
      // An HTTP error body is not a chat stream; do not type it into the bubble.
      throw new Error(`请求失败: ${response.status} ${response.statusText}`);
    }
    if (!response.body) {
      throw new Error('响应体为空');
    }
    const reader = response.body.getReader();
    const sseProcessor = new SSEProcessor();
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          for (const event of sseProcessor.flush()) {
            await this.handleStreamEvent(event, typewriter);
          }
          break;
        }
        for (const event of sseProcessor.processChunk(value)) {
          await this.handleStreamEvent(event, typewriter);
        }
      }
    } finally {
      reader.releaseLock();
    }
  }

  /** Handles one SSE event: JSON payloads are dispatched, anything else is typed as text. */
  private async handleStreamEvent(
    event: { type: string; data: string },
    typewriter: AdvancedTypewriter
  ): Promise<void> {
    if (event.type !== 'data') {
      return;
    }
    let parsed: unknown;
    try {
      parsed = JSON.parse(event.data);
    } catch {
      // Non-JSON payload: show it as-is.
      await typewriter.typeText(event.data);
      return;
    }
    // Outside the try so a handler error is not mistaken for a parse failure.
    await this.handleStreamData(parsed, typewriter);
  }

  /** Dispatches a parsed payload on its `type` field; unknown types are ignored. */
  private async handleStreamData(
    data: any,
    typewriter: AdvancedTypewriter
  ): Promise<void> {
    if (typeof data !== 'object' || data === null) {
      return;
    }
    const content = typeof data.content === 'string' ? data.content : '';
    switch (data.type) {
      case 'text':
        await typewriter.typeText(content);
        break;
      case 'thinking':
        this.showThinkingIndicator(content);
        break;
      case 'done':
        this.hideThinkingIndicator();
        break;
      case 'error':
        this.addMessage('error', content);
        break;
    }
  }

  /**
   * Appends a message row to the chat.
   *
   * @returns The element holding the message text (not the whole row).
   */
  private addMessage(role: string, content: string): HTMLElement {
    const messageElement = document.createElement('div');
    messageElement.className = `message message-${role}`;

    const roleElement = document.createElement('div');
    roleElement.className = 'message-role';
    roleElement.textContent = role === 'user' ? '你' : role === 'ai' ? 'AI' : '错误';

    const contentElement = document.createElement('div');
    contentElement.className = 'message-content';
    contentElement.textContent = content;

    messageElement.appendChild(roleElement);
    messageElement.appendChild(contentElement);
    this.chatContainer.appendChild(messageElement);
    // Keep the newest message in view.
    this.chatContainer.scrollTop = this.chatContainer.scrollHeight;
    return contentElement;
  }

  /** Shows (creating on first use) the `#thinking-indicator` row with `message`. */
  private showThinkingIndicator(message: string): void {
    let thinkingElement = document.getElementById('thinking-indicator');
    if (!thinkingElement) {
      thinkingElement = document.createElement('div');
      thinkingElement.id = 'thinking-indicator';
      thinkingElement.className = 'thinking-indicator';
      this.chatContainer.appendChild(thinkingElement);
    }
    thinkingElement.textContent = `💡 ${message}`;
    thinkingElement.style.display = 'block';
    this.chatContainer.scrollTop = this.chatContainer.scrollHeight;
  }

  private hideThinkingIndicator(): void {
    const thinkingElement = document.getElementById('thinking-indicator');
    if (thinkingElement) {
      thinkingElement.style.display = 'none';
    }
  }
}
// Start the app once the DOM is usable. When this script runs after
// DOMContentLoaded has already fired (e.g. it was injected dynamically), a
// listener registered now would never be called, so initialise right away.
if (document.readyState === 'loading') {
  document.addEventListener(
    'DOMContentLoaded',
    () => {
      new StreamChatApp();
    },
    { once: true }
  );
} else {
  new StreamChatApp();
}
// CSS rules for the chat UI, kept in a string so they can be injected at runtime.
// NOTE(review): `.chat-container` is a class selector, while StreamChatApp looks the
// container up by the id `chat-container` — confirm the markup carries both.
const styles = `
.chat-container {
max-width: 800px;
margin: 0 auto;
border: 1px solid #ddd;
border-radius: 8px;
overflow: hidden;
}
.message {
padding: 12px 16px;
border-bottom: 1px solid #eee;
display: flex;
}
.message-role {
font-weight: bold;
width: 80px;
flex-shrink: 0;
}
.message-user .message-role {
color: #007bff;
}
.message-ai .message-role {
color: #28a745;
}
.message-error .message-role {
color: #dc3545;
}
.message-content {
flex-grow: 1;
white-space: pre-wrap;
line-height: 1.5;
}
.thinking-indicator {
padding: 8px 16px;
background-color: #fff3cd;
border-bottom: 1px solid #eee;
color: #856404;
font-style: italic;
}
#chat-input {
width: 100%;
padding: 12px;
border: none;
border-top: 1px solid #ddd;
resize: none;
min-height: 60px;
}
#send-button {
width: 100%;
padding: 12px;
background-color: #007bff;
color: white;
border: none;
cursor: pointer;
}
#send-button:disabled {
background-color: #6c757d;
cursor: not-allowed;
}
`;
// Build the <style> element that carries the rules; the statement that follows
// attaches it to document.head.
const styleSheet = document.createElement('style');
styleSheet.textContent = styles;
document.head.appendChild(styleSheet);

总结
通过 Response.body.getReader()(返回 ReadableStreamDefaultReader)和相关技术,我们可以在前端实现流畅的流式响应处理和打字机效果:
- 基础流处理 - 使用 ReadableStream API 处理逐步到达的数据
- SSE 解析 - 正确解析 Server-Sent Events 格式的响应
- 打字机效果 - 实现经典的逐字符显示效果
- 现代实现 - 使用现代 Web API 提高效率
- React 集成 - 在 React 组件中实现流式处理
- 高级效果 - 支持 HTML 内容和自定义样式
- 完整应用 - 构建完整的流式聊天应用
这些技术使得前端能够提供更加响应迅速和用户友好的交互体验,特别是在处理 LLM 生成内容时。
在下一章中,我们将探讨身份验证与速率限制:如何在 RunnableConfig 中传递用户信息,了解如何在流式处理中集成安全和访问控制机制。