The Unified LLM & ChatModel Interface: BaseLanguageModel's generate and streamGenerate Abstractions

In LangChain's evolution, unifying the language model interface was an important step forward. LangChain V3 provides a single interface for all kinds of language models through the BaseLanguageModel abstract class, covering both traditional LLMs and modern ChatModels. This unification not only reduces the learning and usage cost for developers, it also improves the system's extensibility and maintainability. This chapter explores the design and implementation of BaseLanguageModel in depth.

The Design Philosophy of BaseLanguageModel

The core design idea of BaseLanguageModel is to provide a unified abstraction layer that hides the differences between language model implementations:

typescript
abstract class BaseLanguageModel implements Runnable<string | BaseMessage[], string> {
  // Core generation method
  abstract generate(
    prompts: string[] | BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): Promise<LLMResult>;
  
  // Streaming generation method
  abstract streamGenerate(
    prompts: string[] | BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<LLMResult>;
  
  // Runnable interface implementation
  async invoke(input: string | BaseMessage[]): Promise<string> {
    // The cast is needed because [input] widens to (string | BaseMessage[])[]
    const result = await this.generate([input] as string[] | BaseMessage[][]);
    return this.extractOutput(result);
  }
  
  async batch(inputs: (string | BaseMessage[])[]): Promise<string[]> {
    const result = await this.generate(inputs as string[] | BaseMessage[][]);
    return this.extractOutputs(result);
  }
  
  async *stream(input: string | BaseMessage[]): AsyncGenerator<string> {
    const stream = this.streamGenerate([input] as string[] | BaseMessage[][]);
    for await (const chunk of stream) {
      const output = this.extractOutput(chunk);
      yield output;
    }
  }
  
  protected extractOutput(result: LLMResult): string {
    return result.generations[0][0].text;
  }
  
  protected extractOutputs(result: LLMResult): string[] {
    return result.generations.map(generations => generations[0].text);
  }
}

Core Methods of the Unified Interface

The generate Method

The generate method is the core of BaseLanguageModel, responsible for text generation tasks:

typescript
interface BaseLanguageModelCallOptions {
  stop?: string[];
  timeout?: number;
  signal?: AbortSignal;
  temperature?: number;
  maxTokens?: number;
  [key: string]: any;
}

interface LLMResult {
  generations: Generation[][];
  llmOutput?: Record<string, any>;
}

interface Generation {
  text: string;
  message?: BaseMessage;
  generationInfo?: Record<string, any>;
}

abstract class BaseLanguageModel {
  abstract generate(
    prompts: string[] | BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): Promise<LLMResult>;
}
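
The two-dimensional generations array deserves a note: the outer index corresponds to the input prompts, the inner index to the candidate generations produced for each prompt. A hypothetical result for a two-prompt call (the values are illustrative, not real API output) might look like this:

typescript
// Hypothetical LLMResult for generate(["Hello", "Goodbye"]), one candidate each
const result: LLMResult = {
  generations: [
    // Candidates for the first prompt, "Hello"
    [{ text: "Hi there!", generationInfo: { finishReason: "stop" } }],
    // Candidates for the second prompt, "Goodbye"
    [{ text: "See you soon!", generationInfo: { finishReason: "stop" } }]
  ],
  // Provider-specific metadata, e.g. token usage (illustrative values)
  llmOutput: { tokenUsage: { totalTokens: 12 } }
};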

The streamGenerate Method

The streamGenerate method supports streaming generation, allowing results to be retrieved incrementally:

typescript
abstract class BaseLanguageModel {
  abstract streamGenerate(
    prompts: string[] | BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<LLMResult>;
}
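
From the caller's point of view, each yielded LLMResult is a partial chunk rather than a complete answer. A minimal consumption sketch (assuming any concrete subclass that implements streamGenerate) looks like this:

typescript
// Minimal sketch: consume streamGenerate chunk by chunk
async function printStream(model: BaseLanguageModel, prompt: string) {
  for await (const chunk of model.streamGenerate([prompt])) {
    // Each chunk carries an incremental piece of the generation
    process.stdout.write(chunk.generations[0][0].text);
  }
}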

LLM Implementation

Traditional language models (LLMs) mainly operate on string inputs:

typescript
class BaseLLM extends BaseLanguageModel {
  async generate(
    prompts: string[],
    options?: BaseLanguageModelCallOptions
  ): Promise<LLMResult> {
    // Delegate to the concrete LLM implementation
    const generations: Generation[][] = [];
    
    for (const prompt of prompts) {
      const text = await this._call(prompt, options);
      generations.push([{
        text,
        generationInfo: {}
      }]);
    }
    
    return {
      generations,
      llmOutput: {}
    };
  }
  
  protected abstract _call(
    prompt: string,
    options?: BaseLanguageModelCallOptions
  ): Promise<string>;
  
  async *streamGenerate(
    prompts: string[],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<LLMResult> {
    // Traditional completion APIs often lack native streaming, so we
    // simulate it by yielding the complete result as a single chunk
    const result = await this.generate(prompts, options);
    yield result;
  }
}

// Example of a concrete LLM implementation
class OpenAI extends BaseLLM {
  private apiKey: string;
  private modelName: string;
  
  constructor(config: { apiKey: string; modelName: string }) {
    super();
    this.apiKey = config.apiKey;
    this.modelName = config.modelName;
  }
  
  protected async _call(
    prompt: string,
    options?: BaseLanguageModelCallOptions
  ): Promise<string> {
    const response = await fetch('https://api.openai.com/v1/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: this.modelName,
        prompt,
        temperature: options?.temperature,
        max_tokens: options?.maxTokens,
        stop: options?.stop
      }),
      signal: options?.signal
    });
    
    const data = await response.json();
    return data.choices[0].text;
  }
}
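
Because BaseLLM only asks subclasses to implement _call, adding a new backend is cheap. As an illustration, here is a hypothetical FakeLLM test double (not part of LangChain) that echoes its prompt, which is handy for unit-testing chains without network access:

typescript
// Hypothetical test double: no network calls, just echoes the prompt
class FakeLLM extends BaseLLM {
  protected async _call(
    prompt: string,
    options?: BaseLanguageModelCallOptions
  ): Promise<string> {
    const text = `Echo: ${prompt}`;
    // Honor the stop option the way a real backend would
    const stop = options?.stop?.find(s => text.includes(s));
    return stop ? text.split(stop)[0] : text;
  }
}

// The unified interface works unchanged:
// const fake = new FakeLLM();
// console.log(await fake.invoke("ping")); // "Echo: ping"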

ChatModel Implementation

Modern chat models (ChatModels) operate on lists of messages:

typescript
type BaseMessage = {
  role: string;
  content: string;
};

class BaseChatModel extends BaseLanguageModel {
  async generate(
    messageSets: BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): Promise<LLMResult> {
    const generations: Generation[][] = [];
    
    for (const messages of messageSets) {
      const response = await this._call(messages, options);
      generations.push([{
        text: response.content,
        message: response,
        generationInfo: {}
      }]);
    }
    
    return {
      generations,
      llmOutput: {}
    };
  }
  
  protected abstract _call(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): Promise<BaseMessage>;
  
  async *streamGenerate(
    messageSets: BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<LLMResult> {
    for (const messages of messageSets) {
      const stream = this._streamCall(messages, options);
      
      for await (const chunk of stream) {
        yield {
          generations: [[{
            text: chunk.content,
            message: chunk,
            generationInfo: {}
          }]],
          llmOutput: {}
        };
      }
    }
  }
  
  protected abstract _streamCall(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<BaseMessage>;
}

// Example of a concrete ChatModel implementation
class ChatOpenAI extends BaseChatModel {
  private apiKey: string;
  private modelName: string;
  
  constructor(config: { apiKey: string; modelName: string }) {
    super();
    this.apiKey = config.apiKey;
    this.modelName = config.modelName;
  }
  
  protected async _call(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): Promise<BaseMessage> {
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: this.modelName,
        messages,
        temperature: options?.temperature,
        max_tokens: options?.maxTokens,
        stop: options?.stop
      }),
      signal: options?.signal
    });
    
    const data = await response.json();
    return {
      role: 'assistant',
      content: data.choices[0].message.content
    };
  }
  
  protected async *_streamCall(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<BaseMessage> {
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: this.modelName,
        messages,
        temperature: options?.temperature,
        max_tokens: options?.maxTokens,
        stop: options?.stop,
        stream: true
      }),
      signal: options?.signal
    });
    
    // Process the streaming response
    const reader = response.body?.getReader();
    const decoder = new TextDecoder();
    
    if (reader) {
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          
          const chunk = decoder.decode(value, { stream: true });
          // Parse the SSE-formatted response (simplified: assumes each
          // "data:" line arrives whole within a single chunk)
          const lines = chunk.split('\n');
          
          for (const line of lines) {
            if (line.startsWith('data: ')) {
              const data = line.slice(6);
              if (data === '[DONE]') {
                return;
              }
              
              try {
                const parsed = JSON.parse(data);
                const content = parsed.choices[0]?.delta?.content;
                if (content) {
                  yield {
                    role: 'assistant',
                    content
                  };
                }
              } catch (e) {
                // Ignore parse errors from incomplete JSON fragments
              }
            }
          }
        }
      } finally {
        reader.releaseLock();
      }
    }
  }
}
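
One caveat about the parser above: it splits each network chunk on newlines independently, so a data: line that straddles two chunks will fail to parse and be silently dropped. A more robust approach (a sketch of my own, not the shape of any official implementation) buffers partial lines across reads:

typescript
// Sketch: yield complete SSE data lines, buffering partial lines across chunks
async function* sseDataLines(body: ReadableStream<Uint8Array>): AsyncGenerator<string> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      // Keep the last, possibly incomplete, line in the buffer
      buffer = lines.pop() ?? '';
      for (const line of lines) {
        if (line.startsWith('data: ')) yield line.slice(6);
      }
    }
  } finally {
    reader.releaseLock();
  }
}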

Local Model Support

The unified interface also supports local models, such as Ollama:

typescript
class Ollama extends BaseChatModel {
  private baseUrl: string;
  private modelName: string;
  
  constructor(config: { baseUrl?: string; modelName: string }) {
    super();
    this.baseUrl = config.baseUrl || 'http://localhost:11434';
    this.modelName = config.modelName;
  }
  
  protected async _call(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): Promise<BaseMessage> {
    const response = await fetch(`${this.baseUrl}/api/chat`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: this.modelName,
        messages,
        stream: false,
        options: {
          temperature: options?.temperature,
          num_predict: options?.maxTokens
        }
      }),
      signal: options?.signal
    });
    
    const data = await response.json();
    return {
      role: 'assistant',
      content: data.message.content
    };
  }
  
  protected async *_streamCall(
    messages: BaseMessage[],
    options?: BaseLanguageModelCallOptions
  ): AsyncGenerator<BaseMessage> {
    const response = await fetch(`${this.baseUrl}/api/chat`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: this.modelName,
        messages,
        stream: true,
        options: {
          temperature: options?.temperature,
          num_predict: options?.maxTokens
        }
      }),
      signal: options?.signal
    });
    
    const reader = response.body?.getReader();
    const decoder = new TextDecoder();
    
    if (reader) {
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          
          const chunk = decoder.decode(value, { stream: true });
          // Ollama streams newline-delimited JSON: one object per line
          const lines = chunk.split('\n');
          
          for (const line of lines) {
            if (line.trim()) {
              try {
                const data = JSON.parse(line);
                if (data.message?.content) {
                  yield {
                    role: 'assistant',
                    content: data.message.content
                  };
                }
              } catch (e) {
                // Ignore parse errors from incomplete JSON lines
              }
            }
          }
        }
      } finally {
        reader.releaseLock();
      }
    }
  }
}
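
Because Ollama implements the same BaseChatModel contract, streaming from a local model uses exactly the same calling convention as the hosted models. A quick sketch (assuming a local Ollama server with the llama2 model pulled):

typescript
// Sketch: stream from a local Ollama server via the unified interface
const localModel = new Ollama({ modelName: 'llama2' });

for await (const chunk of localModel.stream([
  { role: 'user', content: 'Explain what a unified model interface is.' }
])) {
  process.stdout.write(chunk);
}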

A Practical Application Example

Let's look at a practical example built on the unified interface:

typescript
// Create a generic language model processor
class LanguageModelProcessor {
  private model: BaseLanguageModel;
  
  constructor(model: BaseLanguageModel) {
    this.model = model;
  }
  
  async processText(input: string): Promise<string> {
    // Use the unified invoke interface
    return await this.model.invoke(input);
  }
  
  async processBatch(inputs: string[]): Promise<string[]> {
    // Use the unified batch interface
    return await this.model.batch(inputs);
  }
  
  async *processStream(input: string): AsyncGenerator<string> {
    // Use the unified stream interface
    for await (const chunk of this.model.stream(input)) {
      yield chunk;
    }
  }
  
  async processChat(messages: BaseMessage[]): Promise<BaseMessage> {
    // Use the unified invoke interface to handle chat messages
    const response = await this.model.invoke(messages);
    return {
      role: 'assistant',
      content: response
    };
  }
}

// Use different models
const openAIProcessor = new LanguageModelProcessor(
  new ChatOpenAI({
    apiKey: process.env.OPENAI_API_KEY!,
    modelName: 'gpt-3.5-turbo'
  })
);

const ollamaProcessor = new LanguageModelProcessor(
  new Ollama({
    modelName: 'llama2'
  })
);

// Same interface, different implementations
const openAIResult = await openAIProcessor.processText("Write a poem about spring");
const ollamaResult = await ollamaProcessor.processText("Write a poem about spring");

// Streaming
for await (const chunk of openAIProcessor.processStream("Tell me a long story")) {
  process.stdout.write(chunk);
}

// Chat handling
const chatMessages: BaseMessage[] = [
  { role: 'system', content: 'You are a helpful assistant' },
  { role: 'user', content: "Hello, what's the weather like today?" }
];
];

const chatResponse = await openAIProcessor.processChat(chatMessages);
console.log(chatResponse.content);

Error Handling and Configuration Management

The unified interface also provides consistent error handling and configuration management:

typescript
abstract class BaseLanguageModel {
  protected timeout?: number;
  protected maxRetries: number = 3;
  protected retryDelay: number = 1000;
  
  constructor(config?: { 
    timeout?: number; 
    maxRetries?: number; 
    retryDelay?: number 
  }) {
    this.timeout = config?.timeout;
    this.maxRetries = config?.maxRetries ?? this.maxRetries;
    this.retryDelay = config?.retryDelay ?? this.retryDelay;
  }
  
  async generate(
    prompts: string[] | BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): Promise<LLMResult> {
    // Apply default configuration
    const mergedOptions = {
      timeout: this.timeout,
      ...options
    };
    
    // Retry loop with exponential backoff
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        return await this._generateWithTimeout(prompts, mergedOptions);
      } catch (error) {
        if (attempt === this.maxRetries) {
          throw error;
        }
        
        // Exponential backoff
        await new Promise(resolve => 
          setTimeout(resolve, this.retryDelay * Math.pow(2, attempt))
        );
      }
    }
    
    throw new Error("Unreachable");
  }
  
  private async _generateWithTimeout(
    prompts: string[] | BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): Promise<LLMResult> {
    if (options?.timeout) {
      const controller = new AbortController();
      const timeoutId = setTimeout(() => controller.abort(), options.timeout);
      
      try {
        const result = await this._generate(prompts, {
          ...options,
          signal: controller.signal
        });
        return result;
      } finally {
        clearTimeout(timeoutId);
      }
    }
    
    return await this._generate(prompts, options);
  }
  
  protected abstract _generate(
    prompts: string[] | BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): Promise<LLMResult>;
}
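
To see the retry logic in action, consider a hypothetical FlakyModel whose _generate fails on its first two attempts; with the default maxRetries of 3, the third attempt succeeds after two backoff delays (illustrative only, not part of the library):

typescript
// Hypothetical subclass used only to demonstrate the retry mechanism
class FlakyModel extends BaseLanguageModel {
  private attempts = 0;

  protected async _generate(
    prompts: string[] | BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): Promise<LLMResult> {
    this.attempts++;
    if (this.attempts < 3) {
      throw new Error(`Simulated failure #${this.attempts}`);
    }
    return { generations: [[{ text: 'ok after retries' }]], llmOutput: {} };
  }
}

// Attempts are spaced at roughly 1s and 2s thanks to exponential backoff:
// const model = new FlakyModel({ retryDelay: 1000 });
// await model.generate(['hello']); // succeeds on the 3rd attempt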

Summary

By providing a unified abstract interface, BaseLanguageModel successfully brings traditional LLMs and modern ChatModels together. This design offers the following advantages:

  1. Unified interface - developers can drive different kinds of models through the same interface
  2. Strong extensibility - new model implementations can be added with little effort
  3. Complete functionality - supports generate, streamGenerate, batch, and other modes of operation
  4. Error handling - provides a unified error handling and retry mechanism
  5. Configuration management - supports unified configuration and timeout control

Through this unified interface, LangChain V3 provides a solid foundation for building flexible, extensible LLM applications.

In the next chapter, we will look at the streaming implementation: stream() returning ReadableStream<ChatCompletionChunk>, and dive into how SSE responses are parsed chunk by chunk.