Streaming Implementation: stream() Returns ReadableStream<ChatCompletionChunk>
Parsing SSE responses chunk by chunk and yielding tokens one at a time
Streaming responses are a key feature of modern LLM applications: they let users watch results appear as they are generated, which makes for a far better interactive experience. LangChain V3 provides full streaming support through the stream() method, which handles Server-Sent Events (SSE) responses and yields tokens one at a time. This chapter takes a deep look at how the streaming implementation works and at best practices for using it.
Why streaming responses matter
Streaming responses serve several important purposes in LLM applications:
- User experience - users see partial results immediately instead of waiting for the full generation
- Perceived performance - even when total latency is the same, progressive display feels faster
- Real-time interaction - generations can be interrupted or redirected while they are in flight
- Resource efficiency - downstream processing can begin before the complete response has arrived
ReadableStream and AsyncGenerator
LangChain V3's streaming implementation builds on two main approaches to handling streamed data:
ReadableStream
ReadableStream is part of the Web Streams API and provides a standardized interface for consuming streamed data:
interface ReadableStream<R = any> {
getReader(): ReadableStreamDefaultReader<R>;
tee(): [ReadableStream<R>, ReadableStream<R>];
  // other methods...
}
interface ReadableStreamDefaultReader<R = any> {
read(): Promise<ReadableStreamReadResult<R>>;
releaseLock(): void;
cancel(reason?: any): Promise<void>;
}
type ReadableStreamReadResult<T> =
  | { done: false; value: T; }
  | { done: true; value?: undefined; };
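To make the reader protocol concrete, here is a minimal sketch of draining such a stream by hand; the string element type and the function name are assumptions for illustration:
async function drainStream(stream: ReadableStream<string>): Promise<string> {
  const reader = stream.getReader();
  let result = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break; // the producer has closed the stream
      result += value; // value is a string chunk here
    }
  } finally {
    reader.releaseLock(); // release the lock so another consumer can read
  }
  return result;
}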
AsyncGenerator
AsyncGenerator is JavaScript's native mechanism for asynchronous iteration:
interface AsyncGenerator<T = unknown, TReturn = any, TNext = unknown>
extends AsyncIterator<T, TReturn, TNext> {
next(...args: [] | [TNext]): Promise<IteratorResult<T, TReturn>>;
return(value: TReturn | PromiseLike<TReturn>): Promise<IteratorResult<T, TReturn>>;
throw(e: any): Promise<IteratorResult<T, TReturn>>;
}
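As a quick illustration (every name here is made up), a minimal async generator that simulates a token stream, and the for-await-of loop that consumes it; this is the same pattern the stream() method below relies on:
// Simulate a token stream with an async generator
async function* fakeTokens(): AsyncGenerator<string> {
  for (const token of ['Hello', ', ', 'world', '!']) {
    await new Promise((resolve) => setTimeout(resolve, 50)); // simulated latency
    yield token;
  }
}

async function demo() {
  for await (const token of fakeTokens()) {
    process.stdout.write(token); // tokens print as they "arrive"
  }
}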
The ChatCompletionChunk interface
In chat models, the basic unit of a streaming response is the ChatCompletionChunk:
interface ChatCompletionChunk {
id: string;
object: string;
created: number;
model: string;
choices: Array<{
index: number;
delta: {
role?: string;
content?: string;
};
finish_reason: string | null;
}>;
}
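Each chunk carries only the newly generated fragment in choices[n].delta, and finish_reason stays null until the final chunk. A representative chunk (all field values are illustrative) might look like this:
const chunk: ChatCompletionChunk = {
  id: 'chatcmpl-abc123', // illustrative id
  object: 'chat.completion.chunk',
  created: 1700000000,
  model: 'gpt-4o-mini', // illustrative model name
  choices: [
    {
      index: 0,
      delta: { content: 'Hel' }, // only the new fragment, not the full text
      finish_reason: null // becomes 'stop' on the final chunk
    }
  ]
};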
Core mechanics of the streaming implementation
Let's dig into the core mechanics of streaming in LangChain V3:
The base streaming implementation
class BaseChatModel {
  async *stream(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<string> {
    const stream = this.streamGenerate([messages], options);
    for await (const chunk of stream) {
      // Extract the text content from this chunk
      const text = this.extractTextFromChunk(chunk);
      if (text) {
        yield text;
      }
    }
  }

  protected extractTextFromChunk(chunk: LLMResult): string {
    // Pull the text out of the LLMResult
    return chunk.generations[0]?.[0]?.text || '';
  }
}
OpenAI streaming implementation
class ChatOpenAI extends BaseChatModel {
  protected async *_streamCall(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<ChatCompletionChunk> {
    // Issue the streaming request
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: this.modelName,
        messages,
        temperature: options?.temperature,
        max_tokens: options?.maxTokens,
        stream: true // enable streaming
      }),
      signal: options?.signal
    });
    if (!response.ok) {
      throw new Error(`OpenAI request failed: ${response.status} ${response.statusText}`);
    }
    // Parse the SSE response
    yield* this.processSSEStream(response);
  }
  private async *processSSEStream(response: Response): AsyncGenerator<ChatCompletionChunk> {
    const reader = response.body?.getReader();
    const decoder = new TextDecoder();
    if (!reader) {
      throw new Error('Unable to obtain the response stream');
    }
    try {
      let buffer = '';
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          break;
        }
        // Decode the bytes we just received
        buffer += decoder.decode(value, { stream: true });
        // Process only complete lines
        const lines = buffer.split('\n');
        buffer = lines.pop() || ''; // keep any incomplete trailing line
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = line.slice(6); // strip the 'data: ' prefix
            if (data === '[DONE]') {
              // End-of-stream marker
              return;
            }
            try {
              const chunk: ChatCompletionChunk = JSON.parse(data);
              yield chunk;
            } catch (error) {
              console.warn('Failed to parse SSE data:', error);
            }
          }
        }
      }
    } finally {
      reader.releaseLock();
    }
  }
}
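For reference, the raw SSE body that processSSEStream parses looks roughly like this: each event is a data: line followed by a blank line, and the literal [DONE] sentinel marks the end of the stream (payloads abbreviated here):
data: {"object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hel"},"finish_reason":null}]}

data: {"object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"lo"},"finish_reason":null}]}

data: [DONE]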
Handling chunked text
In a streaming response, text content typically arrives in small incremental pieces:
class StreamingTextProcessor {
  private buffer: string = '';

  processChunk(chunk: ChatCompletionChunk): string[] {
    const texts: string[] = [];
    for (const choice of chunk.choices) {
      if (choice.delta?.content) {
        const content = choice.delta.content;
        // Normalize the fragment and handle split code points
        const processedContent = this.processContent(content);
        texts.push(processedContent);
      }
    }
    return texts;
  }

  private processContent(content: string): string {
    let processed = content;
    // Prepend any high surrogate held back from the previous chunk
    if (this.buffer) {
      processed = this.buffer + processed;
      this.buffer = '';
    }
    // Check whether this chunk ends in the middle of a surrogate pair
    const lastChar = processed.charCodeAt(processed.length - 1);
    if (lastChar >= 0xD800 && lastChar <= 0xDBFF) {
      // High surrogate: hold it back until the low surrogate arrives
      this.buffer = processed.slice(-1);
      processed = processed.slice(0, -1);
    }
    return processed;
  }
}
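To see why the surrogate-pair buffering above matters: characters outside the Basic Multilingual Plane, such as emoji, occupy two UTF-16 code units, and a chunk boundary can fall between them. A small self-contained demonstration:
// '😀' (U+1F600) is stored as the UTF-16 surrogate pair 0xD83D 0xDE00
const emoji = '😀';
const high = emoji[0]; // lone high surrogate, not displayable on its own
const low = emoji[1];  // lone low surrogate

console.log(high.charCodeAt(0).toString(16)); // 'd83d'
console.log(high + low === emoji);            // true
// If one chunk ends with `high` and the next begins with `low`, rendering
// them separately produces a replacement glyph; holding the high surrogate
// back until its partner arrives keeps the character intact.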
Consuming streaming responses
Consuming the stream in Node.js
async function consumeStreamInNode(model: BaseChatModel, messages: BaseMessage[]) {
  const stream = await model.stream(messages);
  // Option 1: for-await-of (the idiomatic choice)
  for await (const chunk of stream) {
    process.stdout.write(chunk);
  }
  // Option 2: manual iteration. This is an alternative to option 1, not a
  // follow-up: a generator can only be consumed once, so use one or the other.
  // const iterator = stream[Symbol.asyncIterator]();
  // while (true) {
  //   const { value, done } = await iterator.next();
  //   if (done) break;
  //   process.stdout.write(value);
  // }
}
Consuming the stream in a browser
async function consumeStreamInBrowser(model: BaseChatModel, messages: BaseMessage[]) {
  const stream = await model.stream(messages);
  const textElement = document.getElementById('output');
  if (!textElement) return;
  for await (const chunk of stream) {
    textElement.textContent += chunk;
    // Keep the newest content in view
    textElement.scrollTop = textElement.scrollHeight;
  }
}
Converting to a ReadableStream
class BaseChatModel {
  toReadableStream(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): ReadableStream<string> {
    // Note: `start` must be an arrow function (or `this` must be captured
    // beforehand); with method shorthand, `this` inside `start` would refer
    // to the underlying-source object rather than the model
    return new ReadableStream({
      start: async (controller) => {
        try {
          const stream = this.stream(messages, options);
          for await (const chunk of stream) {
            controller.enqueue(chunk);
          }
          controller.close();
        } catch (error) {
          controller.error(error);
        }
      }
    });
  }
}
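One reason to perform this conversion is interoperability with other Web APIs. For instance, in a runtime that provides the Fetch API's Response and TextEncoderStream (a browser service worker or a typical edge runtime), the converted stream can back an HTTP response directly. A minimal sketch under those assumptions (streamAsResponse is a hypothetical helper):
function streamAsResponse(model: BaseChatModel, messages: BaseMessage[]): Response {
  const textStream = model.toReadableStream(messages);
  // Response bodies carry bytes, so encode the string chunks first
  return new Response(textStream.pipeThrough(new TextEncoderStream()), {
    headers: { 'Content-Type': 'text/plain; charset=utf-8' }
  });
}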
Error handling and cancellation
Robust error handling and cancellation mechanisms are essential when processing streams:
class RobustStreamingProcessor {
  async *processStream(
    model: BaseChatModel,
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<string> {
    const controller = new AbortController();
    const timeoutId = options?.timeout
      ? setTimeout(() => controller.abort(), options.timeout)
      : null;
    try {
      const streamOptions = {
        ...options,
        signal: controller.signal
      };
      const stream = await model.stream(messages, streamOptions);
      for await (const chunk of stream) {
        // Stop if the stream has been aborted in the meantime
        if (controller.signal.aborted) {
          throw new Error('Stream processing was aborted');
        }
        yield chunk;
      }
    } catch (error) {
      if ((error as Error).name === 'AbortError') {
        throw new Error('Stream processing timed out');
      }
      throw error;
    } finally {
      if (timeoutId) {
        clearTimeout(timeoutId);
      }
    }
  }
}
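Illustrative usage of the processor above, assuming model and messages are constructed elsewhere: cap the generation at 30 seconds and surface the timeout to the caller:
const processor = new RobustStreamingProcessor();
try {
  // (inside an async context)
  for await (const token of processor.processStream(model, messages, { timeout: 30_000 })) {
    process.stdout.write(token);
  }
} catch (error) {
  console.error('Streaming failed:', (error as Error).message);
}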
A complete application example
Let's walk through a complete example that shows how to use streaming responses in a web application:
// A streaming handler for a chat application
class ChatStreamHandler {
  private model: BaseChatModel;
  private messageHistory: BaseMessage[] = [];

  constructor(model: BaseChatModel) {
    this.model = model;
  }

  async *processUserMessage(userMessage: string): AsyncGenerator<{
    type: 'content' | 'complete' | 'error';
    data: string;
  }> {
    try {
      // Append the user message to the history
      this.messageHistory.push({
        role: 'user',
        content: userMessage
      });
      // Generate the streaming response
      const stream = await this.model.stream(this.messageHistory);
      let fullResponse = '';
      // Process the response chunk by chunk
      for await (const chunk of stream) {
        fullResponse += chunk;
        yield { type: 'content', data: chunk };
      }
      // Append the assistant's response to the history
      this.messageHistory.push({
        role: 'assistant',
        content: fullResponse
      });
      yield { type: 'complete', data: '' };
    } catch (error) {
      yield { type: 'error', data: (error as Error).message };
    }
  }
}

// Using the handler in a front-end component
class ChatComponent {
  private chatHandler: ChatStreamHandler;
  private outputElement: HTMLElement;

  constructor(model: BaseChatModel) {
    this.chatHandler = new ChatStreamHandler(model);
    this.outputElement = document.getElementById('chat-output')!;
  }

  async sendMessage(userMessage: string) {
    // Render the user's message
    this.displayMessage('user', userMessage);
    // Create a fresh container for the assistant's response
    const responseContainer = this.createResponseContainer();
    // Handle the streaming response
    let fullResponse = '';
    for await (const { type, data } of this.chatHandler.processUserMessage(userMessage)) {
      switch (type) {
        case 'content':
          fullResponse += data;
          responseContainer.textContent = fullResponse;
          // Keep the newest content in view
          responseContainer.scrollIntoView({ behavior: 'smooth' });
          break;
        case 'complete':
          console.log('Response complete');
          break;
        case 'error':
          this.displayError(data);
          break;
      }
    }
  }

  private displayMessage(role: string, content: string) {
    const messageElement = document.createElement('div');
    messageElement.className = `message ${role}`;
    messageElement.textContent = content;
    this.outputElement.appendChild(messageElement);
  }

  private createResponseContainer(): HTMLElement {
    const container = document.createElement('div');
    container.className = 'message assistant';
    this.outputElement.appendChild(container);
    return container;
  }

  private displayError(error: string) {
    const errorElement = document.createElement('div');
    errorElement.className = 'message error';
    errorElement.textContent = `Error: ${error}`;
    this.outputElement.appendChild(errorElement);
  }
}
Performance optimization considerations
When implementing streaming, the following performance optimizations are worth considering, in particular batching output so the UI does not re-render on every single token:
class OptimizedStreamingProcessor {
  private chunkBuffer: string[] = [];
  private bufferSize: number = 5;
  private flushIntervalMs: number = 100;

  async *processWithBuffering(
    stream: AsyncGenerator<string>
  ): AsyncGenerator<string> {
    // Note: the buffer is flushed inline rather than from a setTimeout
    // callback, because `yield` is only legal directly inside the generator
    // body, never inside a nested callback function
    let lastFlush = Date.now();
    for await (const chunk of stream) {
      this.chunkBuffer.push(chunk);
      // Flush when the buffer is full or enough time has passed
      if (
        this.chunkBuffer.length >= this.bufferSize ||
        Date.now() - lastFlush >= this.flushIntervalMs
      ) {
        yield this.flushBuffer();
        lastFlush = Date.now();
      }
    }
    // Flush whatever is left in the buffer
    if (this.chunkBuffer.length > 0) {
      yield this.flushBuffer();
    }
  }

  private flushBuffer(): string {
    const content = this.chunkBuffer.join('');
    this.chunkBuffer = [];
    return content;
  }
}
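Illustrative usage, assuming model, messages, and textElement from the earlier examples: wrap the raw token stream so the UI re-renders once per flushed batch instead of on every token:
const optimizer = new OptimizedStreamingProcessor();
const rawStream = model.stream(messages);

// (inside an async context)
for await (const batch of optimizer.processWithBuffering(rawStream)) {
  textElement.textContent += batch; // one DOM update per flushed batch
}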
Summary
LangChain V3's streaming support centers on the stream() method: the provider's SSE response is parsed into ChatCompletionChunk objects and tokens are yielded as they arrive, with toReadableStream() available when a Web Streams ReadableStream is more convenient. The key points:
- SSE parsing - correctly parsing Server-Sent Events responses
- Chunked processing - handling and yielding tokens one at a time
- Error handling - robust error handling and cancellation mechanisms
- Performance - optimizations such as buffering and throttled flushing
- Cross-environment support - working in both Node.js and the browser
With these mechanisms, LangChain V3 delivers smooth, responsive LLM interactions that meet modern applications' real-time requirements.
In the next chapter we will look at token counting and truncation: integrating tiktoken or sentencepiece to further optimize model calls.