Skip to content

持久化会话:RunnableWithMessageHistory 如何管理 chat_history

需要实现 BaseChatMessageHistory 存储适配器

在构建对话式 AI 应用时,维护对话历史是提供连贯用户体验的关键。LangChain V3 通过 RunnableWithMessageHistory 提供了管理对话历史的机制,并支持通过 BaseChatMessageHistory 存储适配器实现持久化。本章将深入探讨如何实现和使用对话历史管理功能。

BaseChatMessageHistory 存储适配器

BaseChatMessageHistory 是所有对话历史存储适配器的基础接口:

typescript
// Message type definitions

/** Common shape shared by every chat message kept in a history. */
interface BaseMessage {
  /** Speaker of the message; the sub-interfaces below narrow it to a literal. */
  role: string;
  /** Text body of the message. */
  content: string;
  /** Optional free-form extra data attached to the message. */
  additional_kwargs?: Record<string, any>;
  /** Optional numeric timestamp of the message. */
  timestamp?: number;
}

/** A message whose role is fixed to 'user'. */
interface HumanMessage extends BaseMessage {
  role: 'user';
}

/** A message whose role is fixed to 'assistant'. */
interface AIMessage extends BaseMessage {
  role: 'assistant';
}

/** A message whose role is fixed to 'system'. */
interface SystemMessage extends BaseMessage {
  role: 'system';
}

/**
 * Base contract for chat-history storage adapters.
 *
 * Subclasses must implement {@link getMessages}, {@link addMessage} and
 * {@link clear}. The remaining methods are convenience helpers built on top
 * of those three and may be overridden when a backend can do better.
 */
abstract class BaseChatMessageHistory {
  /** Returns every stored message, oldest first. */
  abstract getMessages(): Promise<BaseMessage[]>;
  /** Appends a single message to the history. */
  abstract addMessage(message: BaseMessage): Promise<void>;
  /** Removes every stored message. */
  abstract clear(): Promise<void>;

  /**
   * Appends several messages one after another, preserving their order.
   *
   * @param messages Messages to append.
   */
  async addMessages(messages: BaseMessage[]): Promise<void> {
    for (const message of messages) {
      await this.addMessage(message);
    }
  }

  /**
   * Returns the last `count` messages of the history.
   *
   * @param count Maximum number of trailing messages to return.
   * @returns At most `count` messages; an empty array when `count` is zero,
   *          negative or NaN.
   */
  async getLatestMessages(count: number): Promise<BaseMessage[]> {
    // slice(-0) is slice(0) and would return the WHOLE history, so a
    // non-positive (or NaN) count must be answered explicitly.
    if (!(count > 0)) {
      return [];
    }
    const messages = await this.getMessages();
    return messages.slice(-count);
  }

  /**
   * Returns only the messages whose `role` equals the given value.
   *
   * @param role Role to match exactly (e.g. 'user', 'assistant').
   */
  async getMessagesByRole(role: string): Promise<BaseMessage[]> {
    const messages = await this.getMessages();
    return messages.filter(message => message.role === role);
  }
}

内存存储适配器

实现基于内存的聊天历史存储:

typescript
/**
 * In-memory chat history adapter.
 *
 * Messages live in a plain array owned by the instance and are lost when the
 * instance is discarded. The history is capped at `maxSize` entries; once the
 * cap is exceeded the oldest messages are dropped.
 */
class ChatMessageHistory extends BaseChatMessageHistory {
  private messages: BaseMessage[];
  private sessionId: string;
  private maxSize: number;

  /**
   * @param sessionId Identifier of the conversation this history belongs to.
   * @param maxSize Maximum number of messages retained (default 50). A value
   *                of zero or less retains nothing.
   */
  constructor(sessionId: string, maxSize: number = 50) {
    super();
    this.sessionId = sessionId;
    this.messages = [];
    this.maxSize = maxSize;
  }

  /** Returns a shallow copy so callers cannot mutate the internal array. */
  async getMessages(): Promise<BaseMessage[]> {
    return [...this.messages];
  }

  /**
   * Appends a message, stamping it with the current time when it carries no
   * timestamp, then trims the history back to `maxSize`.
   *
   * @param message Message to store; it is copied, not stored by reference.
   */
  async addMessage(message: BaseMessage): Promise<void> {
    this.messages.push({
      ...message,
      // `??` rather than `||` so an explicit timestamp of 0 is preserved.
      timestamp: message.timestamp ?? Date.now()
    });

    if (this.messages.length > this.maxSize) {
      // slice(-0) would keep everything, so a non-positive cap is handled
      // explicitly instead of being passed to slice.
      this.messages = this.maxSize > 0 ? this.messages.slice(-this.maxSize) : [];
    }
  }

  /** Drops every stored message. */
  async clear(): Promise<void> {
    this.messages = [];
  }

  /** Returns the session identifier given to the constructor. */
  getSessionId(): string {
    return this.sessionId;
  }

  /**
   * Summarises the session: its id, how many messages are currently held and
   * the timestamps of the first and last of them (undefined when empty).
   */
  async getSessionInfo(): Promise<{
    sessionId: string;
    messageCount: number;
    firstMessageTime?: number;
    lastMessageTime?: number;
  }> {
    return {
      sessionId: this.sessionId,
      messageCount: this.messages.length,
      firstMessageTime: this.messages[0]?.timestamp,
      lastMessageTime: this.messages[this.messages.length - 1]?.timestamp
    };
  }
}

持久化存储适配器

实现基于文件系统的持久化存储适配器:

typescript
// 文件系统存储适配器
import * as fs from 'fs/promises';
import * as path from 'path';

/**
 * Chat history adapter persisted as a JSON array in a single file.
 *
 * The file is read lazily on first access and the parsed messages are cached
 * in memory; every mutation rewrites the whole file.
 */
class FileChatMessageHistory extends BaseChatMessageHistory {
  private filePath: string;
  private messages: BaseMessage[];
  private loaded: boolean;

  /**
   * @param filePath Path of the JSON file backing this history. Missing
   *                 parent directories are created on first write.
   */
  constructor(filePath: string) {
    super();
    this.filePath = filePath;
    this.messages = [];
    this.loaded = false;
  }

  /**
   * Loads the file into memory once. A missing file is treated as an empty
   * history and created on the spot.
   *
   * @throws Any read error other than ENOENT, a JSON syntax error, or an
   *         Error when the file does not contain a JSON array.
   */
  private async loadMessages(): Promise<void> {
    if (this.loaded) return;

    let raw: string;
    try {
      raw = await fs.readFile(this.filePath, 'utf8');
    } catch (error) {
      if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
        throw error;
      }
      // File does not exist yet: start with an empty history and create it.
      this.messages = [];
      await this.saveMessages();
      // Mark as loaded here too; otherwise every later call re-reads the file.
      this.loaded = true;
      return;
    }

    const parsed: unknown = JSON.parse(raw);
    if (!Array.isArray(parsed)) {
      // Fail loudly instead of letting a later push() blow up on a non-array.
      throw new Error(`聊天历史文件格式无效: ${this.filePath}`);
    }
    this.messages = parsed as BaseMessage[];
    this.loaded = true;
  }

  /** Writes the in-memory messages to disk as pretty-printed JSON. */
  private async saveMessages(): Promise<void> {
    await fs.mkdir(path.dirname(this.filePath), { recursive: true });
    await fs.writeFile(this.filePath, JSON.stringify(this.messages, null, 2));
  }

  /** Returns a shallow copy of all persisted messages. */
  async getMessages(): Promise<BaseMessage[]> {
    await this.loadMessages();
    return [...this.messages];
  }

  /**
   * Appends a message (stamped with the current time when it has no
   * timestamp) and rewrites the file.
   *
   * @param message Message to persist.
   */
  async addMessage(message: BaseMessage): Promise<void> {
    await this.loadMessages();

    this.messages.push({
      ...message,
      // `??` rather than `||` so an explicit timestamp of 0 is preserved.
      timestamp: message.timestamp ?? Date.now()
    });
    await this.saveMessages();
  }

  /** Empties the history and persists the empty array. */
  async clear(): Promise<void> {
    this.messages = [];
    await this.saveMessages();
    // Memory and disk now agree, so no reload is needed afterwards.
    this.loaded = true;
  }
}

// Redis 存储适配器示例
// npm install ioredis
import Redis from 'ioredis';

/**
 * Chat history adapter that stores the whole conversation as one JSON string
 * under the Redis key `chat_history:<sessionId>`.
 *
 * NOTE(review): addMessage is a read-modify-write of that single key, so two
 * concurrent writers to the same session can overwrite each other's message.
 */
class RedisChatMessageHistory extends BaseChatMessageHistory {
  private redis: Redis;
  private sessionId: string;
  private key: string;
  private maxMessages: number;
  private ttlSeconds: number;

  /**
   * @param redis ioredis client used for all commands.
   * @param sessionId Conversation identifier; becomes part of the Redis key.
   * @param options Optional tuning knobs.
   * @param options.maxMessages Number of most recent messages kept (default 50).
   * @param options.ttlSeconds Expiry applied on every write, in seconds
   *                           (default 3600, i.e. one hour).
   */
  constructor(
    redis: Redis,
    sessionId: string,
    options: { maxMessages?: number; ttlSeconds?: number } = {}
  ) {
    super();
    this.redis = redis;
    this.sessionId = sessionId;
    this.key = `chat_history:${sessionId}`;
    this.maxMessages = options.maxMessages ?? 50;
    this.ttlSeconds = options.ttlSeconds ?? 3600;
  }

  /**
   * Reads and parses the stored history.
   *
   * @returns The stored messages, or an empty array when the key is absent
   *          or its payload is not a JSON array (the problem is logged).
   */
  async getMessages(): Promise<BaseMessage[]> {
    const data = await this.redis.get(this.key);
    if (!data) return [];

    try {
      const parsed: unknown = JSON.parse(data);
      if (!Array.isArray(parsed)) {
        // Handled by the catch below like any other corrupt payload.
        throw new TypeError('expected a JSON array');
      }
      return parsed as BaseMessage[];
    } catch (error) {
      console.error('解析 Redis 数据失败:', error);
      return [];
    }
  }

  /**
   * Appends a message, keeps only the newest `maxMessages` entries and
   * rewrites the key with a fresh expiry.
   *
   * @param message Message to store; stamped with the current time when it
   *                has no timestamp.
   */
  async addMessage(message: BaseMessage): Promise<void> {
    const messages = await this.getMessages();

    messages.push({
      ...message,
      // `??` rather than `||` so an explicit timestamp of 0 is preserved.
      timestamp: message.timestamp ?? Date.now()
    });

    // slice(-0) would keep everything, so a non-positive cap is handled
    // explicitly instead of being passed to slice.
    const limitedMessages =
      this.maxMessages > 0 ? messages.slice(-this.maxMessages) : [];

    await this.redis.setex(
      this.key,
      this.ttlSeconds,
      JSON.stringify(limitedMessages)
    );
  }

  /** Deletes the Redis key holding this session's history. */
  async clear(): Promise<void> {
    await this.redis.del(this.key);
  }
}

RunnableWithMessageHistory 实现

实现带消息历史的 Runnable 包装器:

typescript
/**
 * Runnable decorator that injects a session's stored messages into the input
 * before delegating to the wrapped runnable, then records the new exchange.
 */
class RunnableWithMessageHistory<Input, Output> extends Runnable<Input, Output> {
  private runnable: Runnable<Input, Output>;
  private getMessageHistory: (sessionId: string) => Promise<BaseChatMessageHistory>;
  private inputMessagesKey: string;
  private outputMessagesKey: string;

  /**
   * @param runnable The runnable being wrapped.
   * @param getMessageHistory Resolves the history store for a session id.
   * @param config Optional key names: where the history is placed on the
   *               input ('messages' by default) and where the reply is read
   *               from an object output ('message' by default).
   */
  constructor(
    runnable: Runnable<Input, Output>,
    getMessageHistory: (sessionId: string) => Promise<BaseChatMessageHistory>,
    config: {
      inputMessagesKey?: string;
      outputMessagesKey?: string;
    } = {}
  ) {
    super();
    const { inputMessagesKey, outputMessagesKey } = config;
    this.runnable = runnable;
    this.getMessageHistory = getMessageHistory;
    this.inputMessagesKey = inputMessagesKey ? inputMessagesKey : 'messages';
    this.outputMessagesKey = outputMessagesKey ? outputMessagesKey : 'message';
  }

  /**
   * Runs the wrapped runnable with the session history merged into the input.
   *
   * @param input Caller input; its own properties are kept and the history is
   *              added under the configured input key.
   * @param options Run options; `sessionId` must be present and non-empty.
   * @returns Whatever the wrapped runnable returns.
   * @throws Error when no session id is supplied.
   */
  async invoke(input: Input, options?: RunnableConfig & { sessionId: string }): Promise<Output> {
    const sessionId = options?.sessionId;
    if (!sessionId) {
      throw new Error('sessionId 是必需的');
    }

    const store = await this.getMessageHistory(sessionId);
    const priorMessages = await store.getMessages();

    const enrichedInput = { ...input, [this.inputMessagesKey]: priorMessages };
    const output = await this.runnable.invoke(enrichedInput, options);

    await this.saveMessagesToHistory(input, output, store);
    return output;
  }

  /**
   * Best-effort persistence of the exchange: the user text is taken from
   * `input.input` when present, the reply from a string output or from the
   * configured output key of an object output. Failures are only logged.
   */
  private async saveMessagesToHistory(
    input: Input,
    output: Output,
    messageHistory: BaseChatMessageHistory
  ): Promise<void> {
    const isObject = (value: unknown): boolean =>
      typeof value === 'object' && value !== null;
    // Timestamp is taken at the moment each message is written.
    const record = (role: 'user' | 'assistant', content: any): Promise<void> =>
      messageHistory.addMessage({ role, content, timestamp: Date.now() });

    try {
      if (isObject(input) && 'input' in (input as any)) {
        await record('user', (input as any).input);
      }

      if (typeof output === 'string') {
        await record('assistant', output);
        return;
      }
      if (isObject(output) && this.outputMessagesKey in (output as any)) {
        await record('assistant', (output as any)[this.outputMessagesKey]);
      }
    } catch (err) {
      console.warn('保存消息历史失败:', err);
    }
  }
}

高级消息历史管理

实现更高级的消息历史管理功能:

typescript
/**
 * History decorator that layers a size cap, time-based expiry, summary
 * statistics and a simple compression routine over another history store.
 */
class AdvancedChatMessageHistory extends BaseChatMessageHistory {
  private storage: BaseChatMessageHistory;
  private maxSize: number;
  private ttl: number; // time to live, in milliseconds

  /**
   * @param storage Underlying store that actually holds the messages.
   * @param options `maxSize` (falls back to 50) and `ttl` in milliseconds
   *                (falls back to 3600000, one hour); falsy values use the
   *                fallback.
   */
  constructor(
    storage: BaseChatMessageHistory,
    options: { maxSize?: number; ttl?: number } = {}
  ) {
    super();
    const { maxSize, ttl } = options;
    this.storage = storage;
    this.maxSize = maxSize ? maxSize : 50;
    this.ttl = ttl ? ttl : 3600000;
  }

  /** True when the message has no timestamp or is younger than the TTL. */
  private isWithinTtl(message: BaseMessage, now: number): boolean {
    if (!message.timestamp) {
      return true;
    }
    return now - message.timestamp < this.ttl;
  }

  /**
   * Returns the non-expired messages, limited to the newest `maxSize`.
   */
  async getMessages(): Promise<BaseMessage[]> {
    const stored = await this.storage.getMessages();
    const now = Date.now();
    const fresh = stored.filter(message => this.isWithinTtl(message, now));

    return fresh.length > this.maxSize ? fresh.slice(-this.maxSize) : fresh;
  }

  /**
   * Purges expired messages from the underlying store, then appends the new one.
   */
  async addMessage(message: BaseMessage): Promise<void> {
    await this.cleanupExpiredMessages();
    await this.storage.addMessage(message);
  }

  /** Clears the underlying store. */
  async clear(): Promise<void> {
    await this.storage.clear();
  }

  /**
   * Rewrites the underlying store without expired messages; does nothing
   * when nothing has expired.
   */
  private async cleanupExpiredMessages(): Promise<void> {
    const stored = await this.storage.getMessages();
    const now = Date.now();
    const fresh = stored.filter(message => this.isWithinTtl(message, now));

    if (fresh.length >= stored.length) {
      return;
    }
    await this.storage.clear();
    await this.storage.addMessages(fresh);
  }

  /**
   * Computes counts per role plus the time span between the first and last
   * visible message. Messages lacking a timestamp count as "now".
   */
  async getSessionSummary(): Promise<{
    totalMessages: number;
    userMessages: number;
    assistantMessages: number;
    duration: number;
    lastActivity: number;
  }> {
    const history = await this.getMessages();
    const total = history.length;
    if (total === 0) {
      return {
        totalMessages: 0,
        userMessages: 0,
        assistantMessages: 0,
        duration: 0,
        lastActivity: 0
      };
    }

    let fromUser = 0;
    let fromAssistant = 0;
    for (const { role } of history) {
      if (role === 'user') {
        fromUser += 1;
      } else if (role === 'assistant') {
        fromAssistant += 1;
      }
    }

    const startedAt = history[0].timestamp || Date.now();
    const endedAt = history[total - 1].timestamp || Date.now();

    return {
      totalMessages: total,
      userMessages: fromUser,
      assistantMessages: fromAssistant,
      duration: endedAt - startedAt,
      lastActivity: endedAt
    };
  }

  /**
   * Shrinks histories longer than five messages down to the first message
   * followed by the last four, replacing the stored content.
   */
  async compressHistory(): Promise<void> {
    const history = await this.getMessages();
    if (history.length <= 5) {
      return; // short enough already
    }

    const opening = history[0];
    const tail = history.slice(-4);

    await this.storage.clear();
    await this.storage.addMessages([opening].concat(tail));
  }
}

实际应用示例

让我们看一个完整的实际应用示例,展示如何在对话系统中使用消息历史管理:

typescript
/**
 * Conversational question-answering service.
 *
 * For each question it loads the session history, builds a single text
 * prompt from it (optionally with retrieved documents), asks the model and
 * appends the question and the answer to the history.
 */
class ConversationalQASystem {
  private getMessageHistory: (sessionId: string) => Promise<BaseChatMessageHistory>;
  private llm: BaseChatModel;

  /**
   * @param getMessageHistory Resolves the history store for a session id.
   * @param llm Chat model used to generate answers.
   */
  constructor(
    getMessageHistory: (sessionId: string) => Promise<BaseChatMessageHistory>,
    llm: BaseChatModel
  ) {
    this.getMessageHistory = getMessageHistory;
    this.llm = llm;
  }

  /**
   * Answers a question within a session and records the exchange.
   *
   * @param question The user's question.
   * @param sessionId Session whose history is read and extended.
   * @param options `useRAG` adds retrieved documents to the prompt.
   * @returns The answer, the session id and per-role message counts taken
   *          after the exchange was stored.
   * @throws Error prefixed with "对话处理失败: " wrapping any failure.
   */
  async answerQuestion(
    question: string,
    sessionId: string,
    options?: { useRAG?: boolean }
  ): Promise<{
    answer: string;
    sessionId: string;
    messageHistory?: {
      totalMessages: number;
      userMessages: number;
      assistantMessages: number;
    };
  }> {
    try {
      const messageHistory = await this.getMessageHistory(sessionId);

      // The prompt is built BEFORE the question is stored, so the history
      // section only contains earlier turns.
      const prompt = await this.buildPromptWithHistory(
        question,
        messageHistory,
        options?.useRAG
      );

      // NOTE(review): the result is stored as message content and returned as
      // `answer: string`; confirm this model's invoke() resolves to a string.
      const answer = await this.llm.invoke([{ role: 'user', content: prompt }]);

      await messageHistory.addMessage({
        role: 'user',
        content: question,
        timestamp: Date.now()
      });

      await messageHistory.addMessage({
        role: 'assistant',
        content: answer,
        timestamp: Date.now()
      });

      const historySummary = await this.getHistorySummary(messageHistory);

      return {
        answer,
        sessionId,
        messageHistory: historySummary
      };
    } catch (error: unknown) {
      // A thrown value is not guaranteed to be an Error; narrow before reading
      // `.message` so non-Error throws do not become "undefined".
      const reason = error instanceof Error ? error.message : String(error);
      throw new Error(`对话处理失败: ${reason}`);
    }
  }

  /**
   * Assembles the prompt text: system instruction, up to the ten most recent
   * history messages, optional documents, then the current question.
   */
  private async buildPromptWithHistory(
    question: string,
    messageHistory: BaseChatMessageHistory,
    useRAG?: boolean
  ): Promise<string> {
    let prompt = '';

    // System instruction
    prompt += `你是一个专业的问答助手。请基于对话历史和相关文档回答用户问题。\n\n`;

    // Conversation history; any role other than 'user' is labelled as assistant
    const historyMessages = await messageHistory.getLatestMessages(10);
    if (historyMessages.length > 0) {
      prompt += '对话历史:\n';
      for (const msg of historyMessages) {
        const role = msg.role === 'user' ? '用户' : '助手';
        prompt += `${role}: ${msg.content}\n`;
      }
      prompt += '\n';
    }

    // Retrieved documents, only when RAG is enabled
    if (useRAG) {
      const relevantDocs = await this.getRelevantDocuments(question);
      if (relevantDocs.length > 0) {
        prompt += '相关文档:\n';
        prompt += relevantDocs.map(doc => doc.pageContent).join('\n\n');
        prompt += '\n\n';
      }
    }

    // Current question
    prompt += `当前问题: ${question}\n\n`;
    prompt += '请提供准确、简洁的回答:';

    return prompt;
  }

  /**
   * Placeholder retrieval: always returns one fixed document regardless of
   * the question. A real application would query a retriever here.
   */
  private async getRelevantDocuments(question: string): Promise<Document[]> {
    return [
      {
        pageContent: "LangChain 是一个用于开发由语言模型驱动的应用程序的框架。",
        metadata: { source: "langchain-docs" }
      }
    ];
  }

  /** Counts total, user and assistant messages in the given history. */
  private async getHistorySummary(
    messageHistory: BaseChatMessageHistory
  ): Promise<{
    totalMessages: number;
    userMessages: number;
    assistantMessages: number;
  }> {
    const messages = await messageHistory.getMessages();
    const userMessages = messages.filter(msg => msg.role === 'user').length;
    const assistantMessages = messages.filter(msg => msg.role === 'assistant').length;

    return {
      totalMessages: messages.length,
      userMessages,
      assistantMessages
    };
  }

  /**
   * Returns session statistics: the full summary when the store is an
   * AdvancedChatMessageHistory, otherwise the message count and the first and
   * last timestamps.
   */
  async getSessionInfo(sessionId: string): Promise<any> {
    const messageHistory = await this.getMessageHistory(sessionId);
    if (messageHistory instanceof AdvancedChatMessageHistory) {
      return await messageHistory.getSessionSummary();
    }

    const messages = await messageHistory.getMessages();
    return {
      messageCount: messages.length,
      firstMessage: messages[0]?.timestamp,
      lastMessage: messages[messages.length - 1]?.timestamp
    };
  }

  /** Removes every message stored for the session. */
  async clearSession(sessionId: string): Promise<void> {
    const messageHistory = await this.getMessageHistory(sessionId);
    await messageHistory.clear();
  }
}

/**
 * Creates, caches and prunes per-session chat histories.
 *
 * Each session gets one backing store (memory, file or Redis, chosen once for
 * the whole manager) wrapped in an AdvancedChatMessageHistory with a cap of
 * 50 messages and a one-hour TTL.
 */
class SessionManager {
  private histories: Map<string, BaseChatMessageHistory>;
  private defaultStorageType: 'memory' | 'file' | 'redis';
  private storagePath?: string;
  private redisClient?: Redis;

  /**
   * @param defaultStorageType Backend used for every new session.
   * @param storageOptions `storagePath` is required for 'file',
   *                       `redisClient` for 'redis'.
   */
  constructor(
    defaultStorageType: 'memory' | 'file' | 'redis' = 'memory',
    storageOptions?: { storagePath?: string; redisClient?: Redis }
  ) {
    this.histories = new Map();
    this.defaultStorageType = defaultStorageType;
    this.storagePath = storageOptions?.storagePath;
    this.redisClient = storageOptions?.redisClient;
  }

  /**
   * Returns the cached history for the session, creating it on first use.
   *
   * @param sessionId Session identifier.
   * @throws Error when the selected backend lacks its required option.
   */
  async getMessageHistory(sessionId: string): Promise<BaseChatMessageHistory> {
    const cached = this.histories.get(sessionId);
    if (cached) {
      return cached;
    }

    let history: BaseChatMessageHistory;

    switch (this.defaultStorageType) {
      case 'memory':
        history = new ChatMessageHistory(sessionId, 50);
        break;

      case 'file':
        if (!this.storagePath) {
          throw new Error('文件存储需要指定存储路径');
        }
        // The session id may come from an untrusted caller. Percent-encoding
        // it removes path separators, so the result is always a single file
        // name inside storagePath. Ids made of letters, digits, '_' and '-'
        // are left unchanged.
        history = new FileChatMessageHistory(
          `${this.storagePath}/chat_history_${encodeURIComponent(sessionId)}.json`
        );
        break;

      case 'redis':
        if (!this.redisClient) {
          throw new Error('Redis 存储需要 Redis 客户端');
        }
        history = new RedisChatMessageHistory(this.redisClient, sessionId);
        break;

      default:
        // Defensive fallback for values that bypass the type system.
        history = new ChatMessageHistory(sessionId, 50);
    }

    // Wrap the raw store so expiry and size limits apply uniformly.
    const advancedHistory = new AdvancedChatMessageHistory(history, {
      maxSize: 50,
      ttl: 3600000 // one hour
    });

    this.histories.set(sessionId, advancedHistory);

    return advancedHistory;
  }

  /** Lists the ids of all sessions currently held in the cache. */
  getActiveSessions(): string[] {
    return Array.from(this.histories.keys());
  }

  /**
   * Drops cached sessions whose history reports no messages, and sessions
   * whose history cannot be read (the error is logged). Only the cache entry
   * is removed; the backing store is not cleared.
   */
  async cleanupExpiredSessions(): Promise<void> {
    for (const [sessionId, history] of this.histories.entries()) {
      try {
        const messages = await history.getMessages();
        if (messages.length === 0) {
          this.histories.delete(sessionId);
        }
      } catch (error) {
        console.warn(`清理会话 ${sessionId} 时出错:`, error);
        this.histories.delete(sessionId);
      }
    }
  }
}

/**
 * Usage example: runs a three-turn conversation against an in-memory session
 * (the last turn with RAG enabled), printing each question, answer and the
 * history counters, then prints the session information.
 */
async function demonstrateConversationalQA() {
  console.log('=== 对话式问答系统演示 ===\n');

  // Wire the QA system to an in-memory session manager.
  const sessionManager = new SessionManager('memory');
  const qaSystem = new ConversationalQASystem(
    (id) => sessionManager.getMessageHistory(id),
    new ChatOpenAI({ modelName: 'gpt-3.5-turbo' })
  );

  const sessionId = 'session_' + Date.now();

  // The scripted conversation, one entry per turn.
  const turns: Array<{
    heading: string;
    question: string;
    options?: { useRAG?: boolean };
  }> = [
    { heading: '第一轮对话:', question: '你好,能介绍一下 LangChain 吗?' },
    { heading: '第二轮对话:', question: '它有什么主要特性?' },
    { heading: '第三轮对话:', question: '能详细说说 LCEL 吗?', options: { useRAG: true } }
  ];

  for (const { heading, question, options } of turns) {
    console.log(heading);
    const reply = await qaSystem.answerQuestion(question, sessionId, options);

    console.log(`问题: ${question}`);
    console.log(`答案: ${reply.answer}`);
    console.log(`消息历史: ${JSON.stringify(reply.messageHistory)}`);
    console.log();
  }

  // Session statistics after the conversation.
  console.log('会话信息:');
  console.log(await qaSystem.getSessionInfo(sessionId));
}

// NestJS 集成示例
/*
// chat-history.service.ts
import { Injectable } from '@nestjs/common';

@Injectable()
export class ChatHistoryService {
  private sessionManager: SessionManager;
  
  constructor() {
    this.sessionManager = new SessionManager('redis', {
      redisClient: new Redis({
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379')
      })
    });
  }
  
  async getMessageHistory(sessionId: string) {
    return await this.sessionManager.getMessageHistory(sessionId);
  }
}

// chat.controller.ts
import { Controller, Post, Body, Query } from '@nestjs/common';

interface ChatMessageDto {
  message: string;
  sessionId: string;
  useRAG?: boolean;
}

@Controller('chat')
export class ChatController {
  constructor(private readonly chatHistoryService: ChatHistoryService) {}
  
  @Post('message')
  async handleMessage(@Body() body: ChatMessageDto) {
    const qaSystem = new ConversationalQASystem(
      (sessionId) => this.chatHistoryService.getMessageHistory(sessionId),
      new ChatOpenAI({ modelName: 'gpt-3.5-turbo' })
    );
    
    return await qaSystem.answerQuestion(
      body.message,
      body.sessionId,
      { useRAG: body.useRAG }
    );
  }
  
  @Post('session/info')
  async getSessionInfo(@Query('sessionId') sessionId: string) {
    const qaSystem = new ConversationalQASystem(
      (sessionId) => this.chatHistoryService.getMessageHistory(sessionId),
      new ChatOpenAI({ modelName: 'gpt-3.5-turbo' })
    );
    
    return await qaSystem.getSessionInfo(sessionId);
  }
  
  @Post('session/clear')
  async clearSession(@Query('sessionId') sessionId: string) {
    const messageHistory = await this.chatHistoryService.getMessageHistory(sessionId);
    await messageHistory.clear();
    return { success: true, message: '会话历史已清除' };
  }
}
*/

总结

通过 RunnableWithMessageHistory 和 BaseChatMessageHistory 存储适配器,LangChain V3 提供了强大的对话历史管理能力:

  1. 基础存储接口 - BaseChatMessageHistory 定义了标准的存储接口
  2. 多种存储实现 - 支持内存、文件系统和 Redis 等多种存储方式
  3. 持久化支持 - 实现会话数据的持久化存储
  4. 高级管理功能 - 提供历史记录清理、压缩和统计功能
  5. 会话管理 - 支持多会话管理和过期清理
  6. 灵活集成 - 可以轻松集成到各种应用框架中

这些功能使得开发者能够构建具有连贯对话体验的 AI 应用,为用户提供更加自然和智能的交互体验。

在下一章中,我们将探讨微服务化:将不同 Runnable 部署为独立服务,通过 HTTP/gRPC 调用,了解如何将 LangChain 组件拆分为独立的微服务。