持久化会话:RunnableWithMessageHistory 如何管理 chat_history?
需要实现 BaseChatMessageHistory 存储适配器
在构建对话式 AI 应用时,维护对话历史是提供连贯用户体验的关键。LangChain V3 通过 RunnableWithMessageHistory 提供了管理对话历史的机制,并支持通过 BaseChatMessageHistory 存储适配器实现持久化。本章将深入探讨如何实现和使用对话历史管理功能。
BaseChatMessageHistory 存储适配器
BaseChatMessageHistory 是所有对话历史存储适配器的基础接口:
typescript
// 消息类型定义
interface BaseMessage {
role: string;
content: string;
additional_kwargs?: Record<string, any>;
timestamp?: number;
}
interface HumanMessage extends BaseMessage {
role: 'user';
}
interface AIMessage extends BaseMessage {
role: 'assistant';
}
interface SystemMessage extends BaseMessage {
role: 'system';
}
// 基础聊天历史接口
abstract class BaseChatMessageHistory {
abstract getMessages(): Promise<BaseMessage[]>;
abstract addMessage(message: BaseMessage): Promise<void>;
abstract clear(): Promise<void>;
// 可选方法
async addMessages(messages: BaseMessage[]): Promise<void> {
for (const message of messages) {
await this.addMessage(message);
}
}
async getLatestMessages(count: number): Promise<BaseMessage[]> {
const messages = await this.getMessages();
return messages.slice(-count);
}
async getMessagesByRole(role: string): Promise<BaseMessage[]> {
const messages = await this.getMessages();
return messages.filter(message => message.role === role);
}
}内存存储适配器
实现基于内存的聊天历史存储:
typescript
// 内存存储适配器
class ChatMessageHistory extends BaseChatMessageHistory {
private messages: BaseMessage[];
private sessionId: string;
private maxSize: number;
constructor(sessionId: string, maxSize: number = 50) {
super();
this.sessionId = sessionId;
this.messages = [];
this.maxSize = maxSize;
}
async getMessages(): Promise<BaseMessage[]> {
return [...this.messages]; // 返回副本以防止外部修改
}
async addMessage(message: BaseMessage): Promise<void> {
// 添加时间戳
const messageWithTimestamp = {
...message,
timestamp: message.timestamp || Date.now()
};
this.messages.push(messageWithTimestamp);
// 限制历史记录大小
if (this.messages.length > this.maxSize) {
this.messages = this.messages.slice(-this.maxSize);
}
}
async clear(): Promise<void> {
this.messages = [];
}
getSessionId(): string {
return this.sessionId;
}
// 扩展方法:获取会话信息
async getSessionInfo(): Promise<{
sessionId: string;
messageCount: number;
firstMessageTime?: number;
lastMessageTime?: number;
}> {
return {
sessionId: this.sessionId,
messageCount: this.messages.length,
firstMessageTime: this.messages[0]?.timestamp,
lastMessageTime: this.messages[this.messages.length - 1]?.timestamp
};
}
}持久化存储适配器
实现基于文件系统的持久化存储适配器:
typescript
// 文件系统存储适配器
import * as fs from 'fs/promises';
import * as path from 'path';
class FileChatMessageHistory extends BaseChatMessageHistory {
private filePath: string;
private messages: BaseMessage[];
private loaded: boolean;
constructor(filePath: string) {
super();
this.filePath = filePath;
this.messages = [];
this.loaded = false;
}
private async loadMessages(): Promise<void> {
if (this.loaded) return;
try {
const data = await fs.readFile(this.filePath, 'utf8');
this.messages = JSON.parse(data);
this.loaded = true;
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
// 文件不存在,创建空数组
this.messages = [];
await this.saveMessages();
} else {
throw error;
}
}
}
private async saveMessages(): Promise<void> {
await fs.mkdir(path.dirname(this.filePath), { recursive: true });
await fs.writeFile(this.filePath, JSON.stringify(this.messages, null, 2));
}
async getMessages(): Promise<BaseMessage[]> {
await this.loadMessages();
return [...this.messages];
}
async addMessage(message: BaseMessage): Promise<void> {
await this.loadMessages();
const messageWithTimestamp = {
...message,
timestamp: message.timestamp || Date.now()
};
this.messages.push(messageWithTimestamp);
await this.saveMessages();
}
async clear(): Promise<void> {
this.messages = [];
await this.saveMessages();
}
}
// Redis 存储适配器示例
// npm install ioredis
import Redis from 'ioredis';
class RedisChatMessageHistory extends BaseChatMessageHistory {
private redis: Redis;
private sessionId: string;
private key: string;
constructor(redis: Redis, sessionId: string) {
super();
this.redis = redis;
this.sessionId = sessionId;
this.key = `chat_history:${sessionId}`;
}
async getMessages(): Promise<BaseMessage[]> {
const data = await this.redis.get(this.key);
if (!data) return [];
try {
return JSON.parse(data);
} catch (error) {
console.error('解析 Redis 数据失败:', error);
return [];
}
}
async addMessage(message: BaseMessage): Promise<void> {
const messages = await this.getMessages();
const messageWithTimestamp = {
...message,
timestamp: message.timestamp || Date.now()
};
messages.push(messageWithTimestamp);
// 限制历史记录大小(保留最近 50 条消息)
const limitedMessages = messages.slice(-50);
await this.redis.setex(
this.key,
3600, // 1小时过期
JSON.stringify(limitedMessages)
);
}
async clear(): Promise<void> {
await this.redis.del(this.key);
}
}RunnableWithMessageHistory 实现
实现带消息历史的 Runnable 包装器:
typescript
// 带消息历史的 Runnable
class RunnableWithMessageHistory<Input, Output> extends Runnable<Input, Output> {
private runnable: Runnable<Input, Output>;
private getMessageHistory: (sessionId: string) => Promise<BaseChatMessageHistory>;
private inputMessagesKey: string;
private outputMessagesKey: string;
constructor(
runnable: Runnable<Input, Output>,
getMessageHistory: (sessionId: string) => Promise<BaseChatMessageHistory>,
config: {
inputMessagesKey?: string;
outputMessagesKey?: string;
} = {}
) {
super();
this.runnable = runnable;
this.getMessageHistory = getMessageHistory;
this.inputMessagesKey = config.inputMessagesKey || 'messages';
this.outputMessagesKey = config.outputMessagesKey || 'message';
}
async invoke(input: Input, options?: RunnableConfig & { sessionId: string }): Promise<Output> {
if (!options?.sessionId) {
throw new Error('sessionId 是必需的');
}
// 获取消息历史
const messageHistory = await this.getMessageHistory(options.sessionId);
const historyMessages = await messageHistory.getMessages();
// 将历史消息添加到输入中
const inputWithHistory = {
...input,
[this.inputMessagesKey]: historyMessages
};
// 执行 Runnable
const result = await this.runnable.invoke(inputWithHistory, options);
// 保存新消息到历史记录
await this.saveMessagesToHistory(input, result, messageHistory);
return result;
}
private async saveMessagesToHistory(
input: Input,
output: Output,
messageHistory: BaseChatMessageHistory
): Promise<void> {
try {
// 保存输入消息(假设输入包含用户消息)
if (typeof input === 'object' && input !== null && 'input' in input) {
await messageHistory.addMessage({
role: 'user',
content: (input as any).input,
timestamp: Date.now()
});
}
// 保存输出消息
if (typeof output === 'string') {
await messageHistory.addMessage({
role: 'assistant',
content: output,
timestamp: Date.now()
});
} else if (typeof output === 'object' && output !== null && this.outputMessagesKey in output) {
await messageHistory.addMessage({
role: 'assistant',
content: (output as any)[this.outputMessagesKey],
timestamp: Date.now()
});
}
} catch (error) {
console.warn('保存消息历史失败:', error);
}
}
}高级消息历史管理
实现更高级的消息历史管理功能:
typescript
// 高级聊天历史管理器
class AdvancedChatMessageHistory extends BaseChatMessageHistory {
private storage: BaseChatMessageHistory;
private maxSize: number;
private ttl: number; // 生存时间(毫秒)
constructor(
storage: BaseChatMessageHistory,
options: { maxSize?: number; ttl?: number } = {}
) {
super();
this.storage = storage;
this.maxSize = options.maxSize || 50;
this.ttl = options.ttl || 3600000; // 默认1小时
}
async getMessages(): Promise<BaseMessage[]> {
const messages = await this.storage.getMessages();
const now = Date.now();
// 过滤过期消息
const validMessages = messages.filter(
msg => !msg.timestamp || (now - msg.timestamp) < this.ttl
);
// 如果消息数量超过最大值,只返回最新的消息
if (validMessages.length > this.maxSize) {
return validMessages.slice(-this.maxSize);
}
return validMessages;
}
async addMessage(message: BaseMessage): Promise<void> {
// 清理过期消息
await this.cleanupExpiredMessages();
// 添加新消息
await this.storage.addMessage(message);
}
async clear(): Promise<void> {
await this.storage.clear();
}
private async cleanupExpiredMessages(): Promise<void> {
const messages = await this.storage.getMessages();
const now = Date.now();
const validMessages = messages.filter(
msg => !msg.timestamp || (now - msg.timestamp) < this.ttl
);
if (validMessages.length < messages.length) {
await this.storage.clear();
await this.storage.addMessages(validMessages);
}
}
// 获取会话摘要
async getSessionSummary(): Promise<{
totalMessages: number;
userMessages: number;
assistantMessages: number;
duration: number;
lastActivity: number;
}> {
const messages = await this.getMessages();
if (messages.length === 0) {
return {
totalMessages: 0,
userMessages: 0,
assistantMessages: 0,
duration: 0,
lastActivity: 0
};
}
const userMessages = messages.filter(msg => msg.role === 'user').length;
const assistantMessages = messages.filter(msg => msg.role === 'assistant').length;
const firstMessageTime = messages[0].timestamp || Date.now();
const lastMessageTime = messages[messages.length - 1].timestamp || Date.now();
return {
totalMessages: messages.length,
userMessages,
assistantMessages,
duration: lastMessageTime - firstMessageTime,
lastActivity: lastMessageTime
};
}
// 压缩历史记录(移除冗余信息)
async compressHistory(): Promise<void> {
const messages = await this.getMessages();
if (messages.length <= 5) return; // 如果消息很少,不需要压缩
// 简单的压缩策略:保留开始、结束和关键转折点
const compressedMessages = [
messages[0], // 第一条消息
...messages.slice(-4) // 最后4条消息
];
await this.storage.clear();
await this.storage.addMessages(compressedMessages);
}
}实际应用示例
让我们看一个完整的实际应用示例,展示如何在对话系统中使用消息历史管理:
typescript
// 对话式问答系统
class ConversationalQASystem {
private getMessageHistory: (sessionId: string) => Promise<BaseChatMessageHistory>;
private llm: BaseChatModel;
constructor(
getMessageHistory: (sessionId: string) => Promise<BaseChatMessageHistory>,
llm: BaseChatModel
) {
this.getMessageHistory = getMessageHistory;
this.llm = llm;
}
async answerQuestion(
question: string,
sessionId: string,
options?: { useRAG?: boolean }
): Promise<{
answer: string;
sessionId: string;
messageHistory?: {
totalMessages: number;
userMessages: number;
assistantMessages: number;
};
}> {
try {
// 获取消息历史
const messageHistory = await this.getMessageHistory(sessionId);
// 构建带历史的提示
const prompt = await this.buildPromptWithHistory(
question,
messageHistory,
options?.useRAG
);
// 生成答案
const answer = await this.llm.invoke([{ role: 'user', content: prompt }]);
// 保存对话历史
await messageHistory.addMessage({
role: 'user',
content: question,
timestamp: Date.now()
});
await messageHistory.addMessage({
role: 'assistant',
content: answer,
timestamp: Date.now()
});
// 获取历史统计信息
const historySummary = await this.getHistorySummary(messageHistory);
return {
answer,
sessionId,
messageHistory: historySummary
};
} catch (error) {
throw new Error(`对话处理失败: ${error.message}`);
}
}
private async buildPromptWithHistory(
question: string,
messageHistory: BaseChatMessageHistory,
useRAG?: boolean
): Promise<string> {
let prompt = '';
// 添加系统提示
prompt += `你是一个专业的问答助手。请基于对话历史和相关文档回答用户问题。\n\n`;
// 添加对话历史
const historyMessages = await messageHistory.getLatestMessages(10);
if (historyMessages.length > 0) {
prompt += '对话历史:\n';
for (const msg of historyMessages) {
const role = msg.role === 'user' ? '用户' : '助手';
prompt += `${role}: ${msg.content}\n`;
}
prompt += '\n';
}
// 如果启用 RAG,添加相关文档
if (useRAG) {
const relevantDocs = await this.getRelevantDocuments(question);
if (relevantDocs.length > 0) {
prompt += '相关文档:\n';
prompt += relevantDocs.map(doc => doc.pageContent).join('\n\n');
prompt += '\n\n';
}
}
// 添加当前问题
prompt += `当前问题: ${question}\n\n`;
prompt += '请提供准确、简洁的回答:';
return prompt;
}
private async getRelevantDocuments(question: string): Promise<Document[]> {
// 这里简化处理,实际应用中应该实现真正的文档检索
return [
{
pageContent: "LangChain 是一个用于开发由语言模型驱动的应用程序的框架。",
metadata: { source: "langchain-docs" }
}
];
}
private async getHistorySummary(
messageHistory: BaseChatMessageHistory
): Promise<{
totalMessages: number;
userMessages: number;
assistantMessages: number;
}> {
const messages = await messageHistory.getMessages();
const userMessages = messages.filter(msg => msg.role === 'user').length;
const assistantMessages = messages.filter(msg => msg.role === 'assistant').length;
return {
totalMessages: messages.length,
userMessages,
assistantMessages
};
}
// 获取会话信息
async getSessionInfo(sessionId: string): Promise<any> {
const messageHistory = await this.getMessageHistory(sessionId);
if (messageHistory instanceof AdvancedChatMessageHistory) {
return await messageHistory.getSessionSummary();
}
const messages = await messageHistory.getMessages();
return {
messageCount: messages.length,
firstMessage: messages[0]?.timestamp,
lastMessage: messages[messages.length - 1]?.timestamp
};
}
// 清除会话历史
async clearSession(sessionId: string): Promise<void> {
const messageHistory = await this.getMessageHistory(sessionId);
await messageHistory.clear();
}
}
// 会话管理器
class SessionManager {
private histories: Map<string, BaseChatMessageHistory>;
private defaultStorageType: 'memory' | 'file' | 'redis';
private storagePath?: string;
private redisClient?: Redis;
constructor(
defaultStorageType: 'memory' | 'file' | 'redis' = 'memory',
storageOptions?: { storagePath?: string; redisClient?: Redis }
) {
this.histories = new Map();
this.defaultStorageType = defaultStorageType;
this.storagePath = storageOptions?.storagePath;
this.redisClient = storageOptions?.redisClient;
}
async getMessageHistory(sessionId: string): Promise<BaseChatMessageHistory> {
// 检查缓存
if (this.histories.has(sessionId)) {
return this.histories.get(sessionId)!;
}
// 创建新的消息历史存储
let history: BaseChatMessageHistory;
switch (this.defaultStorageType) {
case 'memory':
history = new ChatMessageHistory(sessionId, 50);
break;
case 'file':
if (!this.storagePath) {
throw new Error('文件存储需要指定存储路径');
}
history = new FileChatMessageHistory(
`${this.storagePath}/chat_history_${sessionId}.json`
);
break;
case 'redis':
if (!this.redisClient) {
throw new Error('Redis 存储需要 Redis 客户端');
}
history = new RedisChatMessageHistory(this.redisClient, sessionId);
break;
default:
history = new ChatMessageHistory(sessionId, 50);
}
// 包装为高级历史管理器
const advancedHistory = new AdvancedChatMessageHistory(history, {
maxSize: 50,
ttl: 3600000 // 1小时
});
// 缓存历史管理器
this.histories.set(sessionId, advancedHistory);
return advancedHistory;
}
// 获取所有活跃会话
getActiveSessions(): string[] {
return Array.from(this.histories.keys());
}
// 清理过期会话
async cleanupExpiredSessions(): Promise<void> {
for (const [sessionId, history] of this.histories.entries()) {
try {
const messages = await history.getMessages();
if (messages.length === 0) {
this.histories.delete(sessionId);
}
} catch (error) {
console.warn(`清理会话 ${sessionId} 时出错:`, error);
this.histories.delete(sessionId);
}
}
}
}
// 使用示例
async function demonstrateConversationalQA() {
console.log('=== 对话式问答系统演示 ===\n');
// 创建会话管理器
const sessionManager = new SessionManager('memory');
// 创建对话系统
const qaSystem = new ConversationalQASystem(
(sessionId) => sessionManager.getMessageHistory(sessionId),
new ChatOpenAI({ modelName: 'gpt-3.5-turbo' })
);
// 模拟对话
const sessionId = 'session_' + Date.now();
console.log('第一轮对话:');
let result = await qaSystem.answerQuestion(
'你好,能介绍一下 LangChain 吗?',
sessionId
);
console.log(`问题: 你好,能介绍一下 LangChain 吗?`);
console.log(`答案: ${result.answer}`);
console.log(`消息历史: ${JSON.stringify(result.messageHistory)}`);
console.log();
console.log('第二轮对话:');
result = await qaSystem.answerQuestion(
'它有什么主要特性?',
sessionId
);
console.log(`问题: 它有什么主要特性?`);
console.log(`答案: ${result.answer}`);
console.log(`消息历史: ${JSON.stringify(result.messageHistory)}`);
console.log();
console.log('第三轮对话:');
result = await qaSystem.answerQuestion(
'能详细说说 LCEL 吗?',
sessionId,
{ useRAG: true }
);
console.log(`问题: 能详细说说 LCEL 吗?`);
console.log(`答案: ${result.answer}`);
console.log(`消息历史: ${JSON.stringify(result.messageHistory)}`);
console.log();
// 获取会话信息
console.log('会话信息:');
const sessionInfo = await qaSystem.getSessionInfo(sessionId);
console.log(sessionInfo);
}
// NestJS 集成示例
/*
// chat-history.service.ts
import { Injectable } from '@nestjs/common';
@Injectable()
export class ChatHistoryService {
private sessionManager: SessionManager;
constructor() {
this.sessionManager = new SessionManager('redis', {
redisClient: new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379')
})
});
}
async getMessageHistory(sessionId: string) {
return await this.sessionManager.getMessageHistory(sessionId);
}
}
// chat.controller.ts
import { Controller, Post, Body, Query } from '@nestjs/common';
interface ChatMessageDto {
message: string;
sessionId: string;
useRAG?: boolean;
}
@Controller('chat')
export class ChatController {
constructor(private readonly chatHistoryService: ChatHistoryService) {}
@Post('message')
async handleMessage(@Body() body: ChatMessageDto) {
const qaSystem = new ConversationalQASystem(
(sessionId) => this.chatHistoryService.getMessageHistory(sessionId),
new ChatOpenAI({ modelName: 'gpt-3.5-turbo' })
);
return await qaSystem.answerQuestion(
body.message,
body.sessionId,
{ useRAG: body.useRAG }
);
}
@Post('session/info')
async getSessionInfo(@Query('sessionId') sessionId: string) {
const qaSystem = new ConversationalQASystem(
(sessionId) => this.chatHistoryService.getMessageHistory(sessionId),
new ChatOpenAI({ modelName: 'gpt-3.5-turbo' })
);
return await qaSystem.getSessionInfo(sessionId);
}
@Post('session/clear')
async clearSession(@Query('sessionId') sessionId: string) {
const messageHistory = await this.chatHistoryService.getMessageHistory(sessionId);
await messageHistory.clear();
return { success: true, message: '会话历史已清除' };
}
}
*/总结
通过 RunnableWithMessageHistory 和 BaseChatMessageHistory 存储适配器,LangChain V3 提供了强大的对话历史管理能力:
- 基础存储接口 -
BaseChatMessageHistory定义了标准的存储接口 - 多种存储实现 - 支持内存、文件系统和 Redis 等多种存储方式
- 持久化支持 - 实现会话数据的持久化存储
- 高级管理功能 - 提供历史记录清理、压缩和统计功能
- 会话管理 - 支持多会话管理和过期清理
- 灵活集成 - 可以轻松集成到各种应用框架中
这些功能使得开发者能够构建具有连贯对话体验的 AI 应用,为用户提供更加自然和智能的交互体验。
在下一章中,我们将探讨微服务化:将不同 Runnable 部署为独立服务,通过 HTTP/gRPC 调用,了解如何将 LangChain 组件拆分为独立的微服务。