Unified LLM & ChatModel Interface: BaseLanguageModel Abstracts generate and streamGenerate

A major step in LangChain's evolution has been unifying the language-model interface. LangChain V3 uses the BaseLanguageModel abstract class to give every kind of language model a single interface, covering both traditional LLMs and modern ChatModels. This unification lowers the learning and usage cost for developers and improves the system's extensibility and maintainability. This chapter takes a close look at the design and implementation of BaseLanguageModel.

Design Philosophy of BaseLanguageModel
The core design idea of BaseLanguageModel is to provide a unified abstraction layer that hides the differences between individual language-model implementations:
```typescript
abstract class BaseLanguageModel implements Runnable<string | BaseMessage[], string> {
// Core generation method
abstract generate(
prompts: string[] | BaseMessage[][],
options?: BaseLanguageModelCallOptions
): Promise<LLMResult>;
// Streaming generation method
abstract streamGenerate(
prompts: string[] | BaseMessage[][],
options?: BaseLanguageModelCallOptions
): AsyncGenerator<LLMResult>;
// Runnable interface implementation
async invoke(input: string | BaseMessage[]): Promise<string> {
const result = await this.generate([input]);
return this.extractOutput(result);
}
async batch(inputs: (string | BaseMessage[])[]): Promise<string[]> {
const result = await this.generate(inputs);
return this.extractOutputs(result);
}
async *stream(input: string | BaseMessage[]): AsyncGenerator<string> {
const stream = this.streamGenerate([input]);
for await (const chunk of stream) {
const output = this.extractOutput(chunk);
yield output;
}
}
protected extractOutput(result: LLMResult): string {
return result.generations[0][0].text;
}
protected extractOutputs(result: LLMResult): string[] {
return result.generations.map(generations => generations[0].text);
}
}
```
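The payoff of this design is that a concrete model only needs to implement generate and streamGenerate; invoke, batch, and stream are inherited. A minimal sketch built on the abstraction above (the EchoModel class and its canned output are hypothetical, for illustration only):

```typescript
// Minimal illustrative subclass: EchoModel is hypothetical, not part of LangChain.
class EchoModel extends BaseLanguageModel {
  async generate(prompts: string[] | BaseMessage[][]): Promise<LLMResult> {
    const generations: Generation[][] = [];
    for (const p of prompts) {
      // Echo the prompt back; a real model would call an API here.
      const text = typeof p === 'string' ? p : p.map(m => m.content).join('\n');
      generations.push([{ text: `echo: ${text}` }]);
    }
    return { generations, llmOutput: {} };
  }

  async *streamGenerate(prompts: string[] | BaseMessage[][]): AsyncGenerator<LLMResult> {
    // Simulate streaming by emitting the full result as a single chunk.
    yield await this.generate(prompts);
  }
}

// invoke, batch and stream are inherited from BaseLanguageModel:
const echo = new EchoModel();
console.log(await echo.invoke('hello'));     // "echo: hello"
console.log(await echo.batch(['a', 'b']));   // ["echo: a", "echo: b"]
```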
Core Methods of the Unified Interface

The generate Method
The generate method is at the heart of BaseLanguageModel and is responsible for text generation:
```typescript
interface BaseLanguageModelCallOptions {
stop?: string[];
timeout?: number;
signal?: AbortSignal;
temperature?: number;
maxTokens?: number;
[key: string]: any;
}
interface LLMResult {
generations: Generation[][];
llmOutput?: Record<string, any>;
}
interface Generation {
text: string;
message?: BaseMessage;
generationInfo?: Record<string, any>;
}
abstract class BaseLanguageModel {
abstract generate(
prompts: string[] | BaseMessage[][],
options?: BaseLanguageModelCallOptions
): Promise<LLMResult>;
}
```
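Note that generate accepts a whole batch of prompts in one call and returns one Generation[] per prompt. A consumption sketch using the interfaces defined above (model stands for an arbitrary concrete implementation, an assumption made for illustration):

```typescript
// `model` stands for any concrete BaseLanguageModel (assumption for illustration).
declare const model: BaseLanguageModel;

const result: LLMResult = await model.generate(
  ["Translate 'hello' into French", "Translate 'hello' into German"],
  { temperature: 0, maxTokens: 32 }
);

// generations[i] holds the candidates for the i-th prompt; [0] is the first candidate.
result.generations.forEach((candidates, i) => {
  console.log(`prompt ${i}:`, candidates[0].text);
});

// Provider-specific metadata (token usage, model name, ...) may appear in llmOutput.
console.log(result.llmOutput);
```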
The streamGenerate Method
The streamGenerate method supports streaming generation, letting callers obtain results incrementally:
```typescript
abstract class BaseLanguageModel {
abstract streamGenerate(
prompts: string[] | BaseMessage[][],
options?: BaseLanguageModelCallOptions
): AsyncGenerator<LLMResult>;
}
```
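Since streamGenerate is declared as an async generator, consuming it is a plain for await loop, and each chunk is itself a partial LLMResult. A small sketch (model again stands for any concrete implementation):

```typescript
// `model` stands for any concrete BaseLanguageModel (assumption for illustration).
declare const model: BaseLanguageModel;

for await (const chunk of model.streamGenerate(["Tell me a short story"])) {
  // Each chunk carries the newly generated fragment in generations[0][0].text.
  process.stdout.write(chunk.generations[0][0].text);
}
```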
LLM Implementation
Traditional language models (LLMs) primarily work with string input:
```typescript
abstract class BaseLLM extends BaseLanguageModel {
async generate(
prompts: string[],
options?: BaseLanguageModelCallOptions
): Promise<LLMResult> {
// Call the concrete LLM implementation for each prompt
const generations: Generation[][] = [];
for (const prompt of prompts) {
const text = await this._call(prompt, options);
generations.push([{
text,
generationInfo: {}
}]);
}
return {
generations,
llmOutput: {}
};
}
protected abstract _call(
prompt: string,
options?: BaseLanguageModelCallOptions
): Promise<string>;
async *streamGenerate(
prompts: string[],
options?: BaseLanguageModelCallOptions
): AsyncGenerator<LLMResult> {
// A traditional LLM may have no native streaming, so simulate it by emitting the full result as one chunk
const result = await this.generate(prompts, options);
yield result;
}
}
// Example of a concrete LLM implementation
class OpenAI extends BaseLLM {
private apiKey: string;
private modelName: string;
constructor(config: { apiKey: string; modelName: string }) {
super();
this.apiKey = config.apiKey;
this.modelName = config.modelName;
}
protected async _call(
prompt: string,
options?: BaseLanguageModelCallOptions
): Promise<string> {
const response = await fetch('https://api.openai.com/v1/completions', {
method: 'POST',
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: this.modelName,
prompt,
temperature: options?.temperature,
max_tokens: options?.maxTokens,
stop: options?.stop
}),
signal: options?.signal
});
const data = await response.json();
return data.choices[0].text;
}
}
```
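With _call in place, the OpenAI class above can be driven entirely through the unified Runnable surface. A usage sketch, assuming OPENAI_API_KEY is set in the environment; the model name is only an example:

```typescript
// Usage sketch for the string-in / string-out LLM path.
// Assumes OPENAI_API_KEY is set; the model name is an example, not a requirement.
const llm = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY!,
  modelName: 'gpt-3.5-turbo-instruct'
});

// A single prompt through the unified invoke interface.
const answer = await llm.invoke("Summarize the benefits of a unified model interface in one sentence.");
console.log(answer);

// Several prompts in one batch call.
const answers = await llm.batch([
  "Name one advantage of streaming responses.",
  "Name one advantage of request batching."
]);
console.log(answers);
```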
ChatModel Implementation
Modern chat models (ChatModels) take a list of messages as input:
```typescript
type BaseMessage = {
role: string;
content: string;
};
abstract class BaseChatModel extends BaseLanguageModel {
async generate(
messageSets: BaseMessage[][],
options?: BaseLanguageModelCallOptions
): Promise<LLMResult> {
const generations: Generation[][] = [];
for (const messages of messageSets) {
const response = await this._call(messages, options);
generations.push([{
text: response.content,
message: response,
generationInfo: {}
}]);
}
return {
generations,
llmOutput: {}
};
}
protected abstract _call(
messages: BaseMessage[],
options?: BaseLanguageModelCallOptions
): Promise<BaseMessage>;
async *streamGenerate(
messageSets: BaseMessage[][],
options?: BaseLanguageModelCallOptions
): AsyncGenerator<LLMResult> {
for (const messages of messageSets) {
const stream = this._streamCall(messages, options);
for await (const chunk of stream) {
yield {
generations: [[{
text: chunk.content,
message: chunk,
generationInfo: {}
}]],
llmOutput: {}
};
}
}
}
protected abstract _streamCall(
messages: BaseMessage[],
options?: BaseLanguageModelCallOptions
): AsyncGenerator<BaseMessage>;
}
// Example of a concrete ChatModel implementation
class ChatOpenAI extends BaseChatModel {
private apiKey: string;
private modelName: string;
constructor(config: { apiKey: string; modelName: string }) {
super();
this.apiKey = config.apiKey;
this.modelName = config.modelName;
}
protected async _call(
messages: BaseMessage[],
options?: BaseLanguageModelCallOptions
): Promise<BaseMessage> {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: this.modelName,
messages,
temperature: options?.temperature,
max_tokens: options?.maxTokens,
stop: options?.stop
}),
signal: options?.signal
});
const data = await response.json();
return {
role: 'assistant',
content: data.choices[0].message.content
};
}
protected async *_streamCall(
messages: BaseMessage[],
options?: BaseLanguageModelCallOptions
): AsyncGenerator<BaseMessage> {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: this.modelName,
messages,
temperature: options?.temperature,
max_tokens: options?.maxTokens,
stop: options?.stop,
stream: true
}),
signal: options?.signal
});
// Process the streaming response
const reader = response.body?.getReader();
const decoder = new TextDecoder();
if (reader) {
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
// Parse the SSE-formatted response
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
return;
}
try {
const parsed = JSON.parse(data);
const content = parsed.choices[0]?.delta?.content;
if (content) {
yield {
role: 'assistant',
content
};
}
} catch (e) {
// Ignore parse errors for incomplete chunks
}
}
}
}
} finally {
reader.releaseLock();
}
}
}
}
```
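The chat path is used the same way; the only difference is that the input is a list of messages. A usage sketch for ChatOpenAI (the model name is an example, and OPENAI_API_KEY is again assumed to be set):

```typescript
// Usage sketch for the chat path. Assumes OPENAI_API_KEY is set.
const chat = new ChatOpenAI({
  apiKey: process.env.OPENAI_API_KEY!,
  modelName: 'gpt-4o-mini'   // example model name, swap in whatever you use
});

const messages: BaseMessage[] = [
  { role: 'system', content: 'You are a concise assistant.' },
  { role: 'user', content: 'Explain what an abstract base class is in two sentences.' }
];

// Non-streaming: invoke returns the extracted text of the first generation.
console.log(await chat.invoke(messages));

// Streaming: stream() yields text fragments as they arrive.
for await (const piece of chat.stream(messages)) {
  process.stdout.write(piece);
}
```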
Local Model Support
The unified interface also supports local models, such as Ollama:
```typescript
class Ollama extends BaseChatModel {
private baseUrl: string;
private modelName: string;
constructor(config: { baseUrl?: string; modelName: string }) {
super();
this.baseUrl = config.baseUrl || 'http://localhost:11434';
this.modelName = config.modelName;
}
protected async _call(
messages: BaseMessage[],
options?: BaseLanguageModelCallOptions
): Promise<BaseMessage> {
const response = await fetch(`${this.baseUrl}/api/chat`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: this.modelName,
messages,
stream: false,
options: {
temperature: options?.temperature,
num_predict: options?.maxTokens
}
}),
signal: options?.signal
});
const data = await response.json();
return {
role: 'assistant',
content: data.message.content
};
}
protected async *_streamCall(
messages: BaseMessage[],
options?: BaseLanguageModelCallOptions
): AsyncGenerator<BaseMessage> {
const response = await fetch(`${this.baseUrl}/api/chat`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: this.modelName,
messages,
stream: true,
options: {
temperature: options?.temperature,
num_predict: options?.maxTokens
}
}),
signal: options?.signal
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
if (reader) {
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.trim()) {
try {
const data = JSON.parse(line);
if (data.message?.content) {
yield {
role: 'assistant',
content: data.message.content
};
}
} catch (e) {
// Ignore parse errors for incomplete chunks
}
}
}
}
} finally {
reader.releaseLock();
}
}
}
}
```

Practical Usage Example
Let's look at a practical application of the unified interface:
```typescript
// A generic processor that works with any language model
class LanguageModelProcessor {
private model: BaseLanguageModel;
constructor(model: BaseLanguageModel) {
this.model = model;
}
async processText(input: string): Promise<string> {
// Use the unified invoke interface
return await this.model.invoke(input);
}
async processBatch(inputs: string[]): Promise<string[]> {
// Use the unified batch interface
return await this.model.batch(inputs);
}
async *processStream(input: string): AsyncGenerator<string> {
// Use the unified stream interface
for await (const chunk of this.model.stream(input)) {
yield chunk;
}
}
async processChat(messages: BaseMessage[]): Promise<BaseMessage> {
// Use the unified invoke interface to handle chat messages
const response = await this.model.invoke(messages);
return {
role: 'assistant',
content: response
};
}
}
// Use different models behind the same interface
const openAIProcessor = new LanguageModelProcessor(
new ChatOpenAI({
apiKey: process.env.OPENAI_API_KEY!,
modelName: 'gpt-3.5-turbo'
})
);
const ollamaProcessor = new LanguageModelProcessor(
new Ollama({
modelName: 'llama2'
})
);
// Same interface, different implementations
const openAIResult = await openAIProcessor.processText("Write a poem about spring");
const ollamaResult = await ollamaProcessor.processText("Write a poem about spring");
// Streaming
for await (const chunk of openAIProcessor.processStream("Tell me a long story")) {
process.stdout.write(chunk);
}
// Chat handling
const chatMessages: BaseMessage[] = [
  { role: 'system', content: 'You are a helpful assistant' },
  { role: 'user', content: 'Hello, how is the weather today?' }
];
const chatResponse = await openAIProcessor.processChat(chatMessages);
console.log(chatResponse.content);
```

Error Handling and Configuration Management
The unified interface also provides consistent error handling and configuration management:
```typescript
abstract class BaseLanguageModel {
protected timeout?: number;
protected maxRetries: number = 3;
protected retryDelay: number = 1000;
constructor(config?: {
timeout?: number;
maxRetries?: number;
retryDelay?: number
}) {
this.timeout = config?.timeout;
this.maxRetries = config?.maxRetries ?? this.maxRetries;
this.retryDelay = config?.retryDelay ?? this.retryDelay;
}
async generate(
prompts: string[] | BaseMessage[][],
options?: BaseLanguageModelCallOptions
): Promise<LLMResult> {
// Apply default configuration
const mergedOptions = {
timeout: this.timeout,
...options
};
// Retry mechanism
for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
try {
return await this._generateWithTimeout(prompts, mergedOptions);
} catch (error) {
if (attempt === this.maxRetries) {
throw error;
}
// Exponential backoff
await new Promise(resolve =>
setTimeout(resolve, this.retryDelay * Math.pow(2, attempt))
);
}
}
throw new Error("Unreachable");
}
private async _generateWithTimeout(
prompts: string[] | BaseMessage[][],
options?: BaseLanguageModelCallOptions
): Promise<LLMResult> {
if (options?.timeout) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), options.timeout);
try {
const result = await this._generate(prompts, {
...options,
signal: controller.signal
});
return result;
} finally {
clearTimeout(timeoutId);
}
}
return await this._generate(prompts, options);
}
protected abstract _generate(
prompts: string[] | BaseMessage[][],
options?: BaseLanguageModelCallOptions
): Promise<LLMResult>;
}
```
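A concrete model only has to implement _generate to inherit the retry, exponential-backoff, and timeout behaviour above. A sketch under the assumptions of that snippet (FlakyModel and its simulated failures are purely illustrative):

```typescript
// Purely illustrative subclass of the error-handling base class sketched above.
class FlakyModel extends BaseLanguageModel {
  private attempts = 0;

  protected async _generate(
    prompts: string[] | BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): Promise<LLMResult> {
    // Fail the first two calls to demonstrate the inherited retry logic.
    if (this.attempts++ < 2) {
      throw new Error('transient upstream error');
    }
    return {
      generations: Array.from({ length: prompts.length }, () => [{ text: 'ok' }]),
      llmOutput: { attempts: this.attempts }
    };
  }
}

// Retries, backoff, and timeout all come from BaseLanguageModel's generate().
const flaky = new FlakyModel({ maxRetries: 5, retryDelay: 200, timeout: 30_000 });
const out = await flaky.generate(["ping"]);
console.log(out.llmOutput); // { attempts: 3 } after two retried failures
```

Because the default derived constructor forwards its arguments to the base class, the retry and timeout settings can be passed straight through when constructing the concrete model.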
Summary
By providing a unified abstract interface, BaseLanguageModel successfully brings traditional LLMs and modern ChatModels together. This design offers the following advantages:
- Unified interface - developers can drive different kinds of models through the same interface
- Strong extensibility - new model implementations can be added easily
- Complete functionality - generate, streamGenerate, batch, and other modes of operation are all supported
- Error handling - a consistent error-handling and retry mechanism is provided
- Configuration management - unified configuration and timeout control are supported
Through this unified interface, LangChain V3 lays a solid foundation for building flexible, extensible LLM applications.
In the next chapter we will look at the streaming implementation: stream() returning a ReadableStream<ChatCompletionChunk>, and dive into how SSE responses are parsed chunk by chunk.