Caching: How Does RunnableCache Avoid Redundant Calls?
Cache outputs keyed by input hash, with Redis and in-memory backends
When building high-performance LLM applications, caching is a crucial optimization. By avoiding repeated expensive computations (such as LLM calls), caching significantly improves response time and reduces operating costs. LangChain V3 provides flexible caching support through the RunnableCache mechanism: outputs are cached under a hash of their input, with multiple storage backends including Redis and in-memory stores. This chapter explores the implementation and application of the caching mechanism in depth.
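The whole mechanism rests on deriving a deterministic key from the input before looking anything up. A minimal sketch of such a key function, using Node's built-in crypto module (this is an illustrative standalone helper, not part of the LangChain API):

```typescript
import { createHash } from "node:crypto";

// Recursively serialize with sorted object keys so that key order
// does not change the hash: {a:1,b:2} and {b:2,a:1} produce the same key.
function canonicalize(value: unknown): string {
  if (value === null || typeof value !== "object") return JSON.stringify(value);
  if (Array.isArray(value)) return `[${value.map(canonicalize).join(",")}]`;
  const obj = value as Record<string, unknown>;
  return `{${Object.keys(obj)
    .sort()
    .map((k) => `${JSON.stringify(k)}:${canonicalize(obj[k])}`)
    .join(",")}}`;
}

// SHA-256 of the canonical form gives a collision-resistant cache key.
function cacheKey(input: unknown): string {
  return createHash("sha256").update(canonicalize(input)).digest("hex");
}
```

Any two inputs that are structurally equal map to the same key, so a lookup for one will find a result computed for the other.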
RunnableCache Basics
RunnableCache is the mechanism LangChain V3 uses to cache the output of Runnable components:
```typescript
interface BaseCache {
  lookup(key: string): Promise<any | null>;
  update(key: string, value: any): Promise<void>;
  clear(): Promise<void>;
}

abstract class RunnableCache implements BaseCache {
  abstract lookup(key: string): Promise<any | null>;
  abstract update(key: string, value: any): Promise<void>;
  abstract clear(): Promise<void>;

  // Generate a cache key from the input
  protected hashInput(input: any): string {
    const inputString = typeof input === 'string'
      ? input
      : JSON.stringify(input, Object.keys(input).sort());
    // Simple hash for illustration (use a stronger hash such as SHA-256 in production)
    let hash = 0;
    for (let i = 0; i < inputString.length; i++) {
      const char = inputString.charCodeAt(i);
      hash = ((hash << 5) - hash) + char;
      hash = hash & hash; // truncate to a 32-bit integer
    }
    return Math.abs(hash).toString(16);
  }
}
```

In-Memory Cache Implementation
Let's start with a memory-based cache:
```typescript
class InMemoryCache extends RunnableCache {
  private cache: Map<string, { value: any; timestamp: number }>;
  private ttl: number; // time to live in milliseconds
  private maxSize: number;

  // Defaults: 5-minute TTL, at most 1000 entries
  constructor(ttl: number = 5 * 60 * 1000, maxSize: number = 1000) {
    super();
    this.cache = new Map();
    this.ttl = ttl;
    this.maxSize = maxSize;
  }

  async lookup(key: string): Promise<any | null> {
    this.cleanupExpired();
    const entry = this.cache.get(key);
    if (entry) {
      return entry.value;
    }
    return null;
  }

  async update(key: string, value: any): Promise<void> {
    this.cleanupExpired();
    // If the cache is full, evict the oldest entry
    if (this.cache.size >= this.maxSize) {
      const oldestKey = this.getOldestKey();
      if (oldestKey) {
        this.cache.delete(oldestKey);
      }
    }
    this.cache.set(key, {
      value,
      timestamp: Date.now()
    });
  }

  async clear(): Promise<void> {
    this.cache.clear();
  }

  private cleanupExpired(): void {
    const now = Date.now();
    for (const [key, entry] of this.cache.entries()) {
      if (now - entry.timestamp > this.ttl) {
        this.cache.delete(key);
      }
    }
  }

  private getOldestKey(): string | null {
    let oldestKey: string | null = null;
    let oldestTimestamp = Infinity;
    for (const [key, entry] of this.cache.entries()) {
      if (entry.timestamp < oldestTimestamp) {
        oldestTimestamp = entry.timestamp;
        oldestKey = key;
      }
    }
    return oldestKey;
  }

  // Report cache statistics
  getStats(): { size: number; maxSize: number; ttl: number } {
    return {
      size: this.cache.size,
      maxSize: this.maxSize,
      ttl: this.ttl
    };
  }
}
```

Redis Cache Implementation
Next, a Redis-backed cache suitable for distributed environments:
```typescript
// Uses the ioredis library: npm install ioredis
import Redis from 'ioredis';

class RedisCache extends RunnableCache {
  private redis: Redis;
  private ttl: number;
  private prefix: string;

  constructor(
    redis: Redis,
    ttl: number = 5 * 60, // Redis TTL is in seconds
    prefix: string = 'langchain:cache:'
  ) {
    super();
    this.redis = redis;
    this.ttl = ttl;
    this.prefix = prefix;
  }

  async lookup(key: string): Promise<any | null> {
    try {
      const value = await this.redis.get(`${this.prefix}${key}`);
      if (value) {
        return JSON.parse(value);
      }
    } catch (error) {
      console.warn('Redis cache lookup failed:', error);
    }
    return null;
  }

  async update(key: string, value: any): Promise<void> {
    try {
      const cacheKey = `${this.prefix}${key}`;
      await this.redis.setex(
        cacheKey,
        this.ttl,
        JSON.stringify(value)
      );
    } catch (error) {
      console.warn('Redis cache update failed:', error);
    }
  }

  async clear(): Promise<void> {
    try {
      // Note: KEYS blocks the Redis server; prefer SCAN on large production datasets
      const keys = await this.redis.keys(`${this.prefix}*`);
      if (keys.length > 0) {
        await this.redis.del(...keys);
      }
    } catch (error) {
      console.warn('Redis cache clear failed:', error);
    }
  }

  // Clear cache entries matching a pattern
  async clearPattern(pattern: string): Promise<void> {
    try {
      const keys = await this.redis.keys(`${this.prefix}${pattern}`);
      if (keys.length > 0) {
        await this.redis.del(...keys);
      }
    } catch (error) {
      console.warn('Redis cache pattern clear failed:', error);
    }
  }
}
```

Cached Runnable Implementation
A wrapper that adds caching to any Runnable:
```typescript
class CachedRunnable<Input, Output> extends Runnable<Input, Output> {
  private runnable: Runnable<Input, Output>;
  private cache: BaseCache;
  private shouldCache: (input: Input, output: Output) => boolean;

  constructor(
    runnable: Runnable<Input, Output>,
    cache: BaseCache,
    shouldCache?: (input: Input, output: Output) => boolean
  ) {
    super();
    this.runnable = runnable;
    this.cache = cache;
    this.shouldCache = shouldCache || (() => true); // cache every result by default
  }

  async invoke(input: Input, options?: RunnableConfig): Promise<Output> {
    // Generate the cache key
    const cacheKey = this.generateCacheKey(input, options);

    // Try the cache first
    let cachedResult: Output | null = null;
    try {
      cachedResult = await this.cache.lookup(cacheKey);
    } catch (error) {
      console.warn('Cache lookup failed:', error);
    }
    if (cachedResult !== null) {
      console.log(`Cache hit: ${cacheKey}`);
      return cachedResult;
    }

    // Cache miss: run the underlying Runnable
    console.log(`Cache miss: ${cacheKey}`);
    const result = await this.runnable.invoke(input, options);

    // Store the result if the caching policy allows it
    if (this.shouldCache(input, result)) {
      try {
        await this.cache.update(cacheKey, result);
        console.log(`Cache updated: ${cacheKey}`);
      } catch (error) {
        console.warn('Cache update failed:', error);
      }
    }
    return result;
  }

  async batch(inputs: Input[], options?: RunnableConfig): Promise<Output[]> {
    // Cache-aware batch processing
    const cacheKeys = inputs.map(input => this.generateCacheKey(input, options));

    // Try to fetch every result from the cache
    const cachedResults: (Output | null)[] = await Promise.all(
      cacheKeys.map(key => this.cache.lookup(key).catch(() => null))
    );

    // Determine which inputs actually need processing
    const uncachedInputs: Input[] = [];
    const uncachedIndices: number[] = [];
    for (let i = 0; i < inputs.length; i++) {
      if (cachedResults[i] === null) {
        uncachedInputs.push(inputs[i]);
        uncachedIndices.push(i);
      }
    }

    // Process the uncached inputs
    let uncachedResults: Output[] = [];
    if (uncachedInputs.length > 0) {
      console.log(`Batch contains ${uncachedInputs.length} uncached items`);
      uncachedResults = await this.runnable.batch(uncachedInputs, options);
      // Update the cache
      for (let i = 0; i < uncachedInputs.length; i++) {
        const index = uncachedIndices[i];
        const cacheKey = cacheKeys[index];
        const result = uncachedResults[i];
        if (this.shouldCache(uncachedInputs[i], result)) {
          try {
            await this.cache.update(cacheKey, result);
          } catch (error) {
            console.warn('Batch cache update failed:', error);
          }
        }
      }
    }

    // Merge cached and freshly computed results
    const finalResults: Output[] = new Array(inputs.length);
    let uncachedResultIndex = 0;
    for (let i = 0; i < inputs.length; i++) {
      if (cachedResults[i] !== null) {
        finalResults[i] = cachedResults[i] as Output;
      } else {
        finalResults[i] = uncachedResults[uncachedResultIndex++];
      }
    }
    return finalResults;
  }

  private generateCacheKey(input: Input, options?: RunnableConfig): string {
    // Build a composite key from the input and the relevant config
    const keyData = {
      input,
      config: {
        tags: options?.tags,
        metadata: options?.metadata
      }
    };
    const keyString = JSON.stringify(keyData, Object.keys(keyData).sort());
    // Hash the key string
    let hash = 0;
    for (let i = 0; i < keyString.length; i++) {
      const char = keyString.charCodeAt(i);
      hash = ((hash << 5) - hash) + char;
      hash = hash & hash;
    }
    return Math.abs(hash).toString(16);
  }
}
```

Multi-Level Cache Implementation
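The idea behind a multi-level cache is read-through with promotion: check the fastest tier first and, on a hit in a slower tier, copy the entry into every faster tier so the next lookup is cheap. The pattern can be sketched with plain Maps standing in for the tiers (a deliberate simplification, not the full class):

```typescript
type Tier = Map<string, string>;

// Search tiers from fastest to slowest; promote a hit into faster tiers.
function multiLookup(tiers: Tier[], key: string): string | null {
  for (let i = 0; i < tiers.length; i++) {
    const hit = tiers[i].get(key);
    if (hit !== undefined) {
      for (let j = i - 1; j >= 0; j--) tiers[j].set(key, hit); // promote
      return hit;
    }
  }
  return null;
}
```

After a lookup hits the slow tier, the entry also lives in the fast tier, so a repeated lookup never reaches the slow tier again.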
A multi-level strategy combining the in-memory and Redis caches:
```typescript
class MultiLevelCache extends RunnableCache {
  private caches: BaseCache[];
  private cacheNames: string[];

  constructor(caches: BaseCache[], cacheNames: string[]) {
    super();
    if (caches.length !== cacheNames.length) {
      throw new Error('caches and cacheNames must have the same length');
    }
    this.caches = caches;
    this.cacheNames = cacheNames;
  }

  async lookup(key: string): Promise<any | null> {
    // Search from the fastest tier down
    for (let i = 0; i < this.caches.length; i++) {
      try {
        const result = await this.caches[i].lookup(key);
        if (result !== null) {
          console.log(`Multi-level cache hit: ${this.cacheNames[i]}`);
          // Promote the result into every faster tier
          for (let j = i - 1; j >= 0; j--) {
            try {
              await this.caches[j].update(key, result);
            } catch (error) {
              console.warn(`Cache promotion failed (${this.cacheNames[j]}):`, error);
            }
          }
          return result;
        }
      } catch (error) {
        console.warn(`Cache lookup failed (${this.cacheNames[i]}):`, error);
      }
    }
    return null;
  }

  async update(key: string, value: any): Promise<void> {
    // Write through to every tier
    await Promise.all(
      this.caches.map((cache, index) =>
        cache.update(key, value).catch(error => {
          console.warn(`Cache update failed (${this.cacheNames[index]}):`, error);
        })
      )
    );
  }

  async clear(): Promise<void> {
    // Clear every tier
    await Promise.all(
      this.caches.map((cache, index) =>
        cache.clear().catch(error => {
          console.warn(`Cache clear failed (${this.cacheNames[index]}):`, error);
        })
      )
    );
  }
}
```

A Complete Example
Let's look at a complete example showing how to use the caching mechanism in a question-answering system:
```typescript
// A question-answering system with caching
class CachedQASystem {
  private qaChain: CachedRunnable<string, string>;
  private cache: BaseCache;

  constructor(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel,
    cache: BaseCache
  ) {
    this.cache = cache;
    this.qaChain = this.buildQAChain(retriever, llm, cache);
  }

  private buildQAChain(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel,
    cache: BaseCache
  ): CachedRunnable<string, string> {
    // Build the base QA chain
    const baseChain = RunnableMap.from({
      question: (input: string) => input,
      context: retriever.pipe((docs: Document[]) =>
        docs.map(d => d.pageContent).join('\n\n')
      )
    })
      .pipe(new PromptTemplate({
        template: `Answer the question using the documents below. If the documents contain no relevant information, say that you cannot answer.

Documents:
{context}

Question: {question}
Answer:`,
        inputVariables: ["context", "question"]
      }))
      .pipe(llm)
      .pipe(new StringOutputParser())
      .withRetry({ maxAttempts: 3 });

    // Caching policy: only cache high-confidence answers
    const shouldCache = (input: string, output: string): boolean => {
      // Simple heuristic: only cache answers above a minimum length
      return output.length > 50;
    };

    // Wrap the chain in a caching Runnable
    return new CachedRunnable(baseChain, cache, shouldCache);
  }

  async answerQuestion(question: string): Promise<{
    answer: string;
    cacheHit: boolean;
    processingTime: number;
  }> {
    const startTime = Date.now();
    try {
      // Record whether the answer was already cached
      const cacheKey = this.generateCacheKey(question);
      const wasCached = await this.cache.lookup(cacheKey) !== null;
      const answer = await this.qaChain.invoke(question);
      const processingTime = Date.now() - startTime;
      return {
        answer,
        cacheHit: wasCached,
        processingTime
      };
    } catch (error) {
      console.error('QA system error:', error);
      return {
        answer: "Sorry, an error occurred while processing your question.",
        cacheHit: false,
        processingTime: Date.now() - startTime
      };
    }
  }

  async answerQuestions(questions: string[]): Promise<Array<{
    question: string;
    answer: string;
    cacheHit: boolean;
    processingTime: number;
  }>> {
    const startTime = Date.now();
    try {
      // Process the batch
      const answers = await this.qaChain.batch(questions);
      const processingTime = Date.now() - startTime;
      const avgTime = processingTime / questions.length;
      // Check cache-hit status (simplified implementation)
      const cacheHits = await Promise.all(
        questions.map(async q => {
          const key = this.generateCacheKey(q);
          return await this.cache.lookup(key) !== null;
        })
      );
      return questions.map((question, index) => ({
        question,
        answer: answers[index],
        cacheHit: cacheHits[index],
        processingTime: avgTime
      }));
    } catch (error) {
      console.error('Batch QA system error:', error);
      return questions.map(question => ({
        question,
        answer: "Sorry, an error occurred while processing your question.",
        cacheHit: false,
        processingTime: Date.now() - startTime
      }));
    }
  }

  private generateCacheKey(question: string): string {
    let hash = 0;
    for (let i = 0; i < question.length; i++) {
      const char = question.charCodeAt(i);
      hash = ((hash << 5) - hash) + char;
      hash = hash & hash;
    }
    return Math.abs(hash).toString(16);
  }

  // Cache statistics
  async getCacheStats(): Promise<any> {
    if (this.cache instanceof InMemoryCache) {
      return (this.cache as InMemoryCache).getStats();
    } else if (this.cache instanceof MultiLevelCache) {
      // Simplified; real applications may need richer statistics
      return { type: 'multi-level' };
    } else {
      return { type: 'unknown' };
    }
  }

  // Clear the cache
  async clearCache(): Promise<void> {
    await this.cache.clear();
  }
}

// Create a QA system with caching
async function createCachedQASystem() {
  // Create the vector store and retriever
  const vectorStore = new MemoryVectorStore();
  // Add sample documents
  const sampleDocuments: Document[] = [
    {
      pageContent: `LangChain is a framework for developing applications powered by language models. It provides a set of tools and components that help developers integrate language models with other data sources, computational logic, and external systems. Key features include component-based design, chained invocation, data connectivity, and intelligent agents.`,
      metadata: { source: "langchain-intro", category: "framework" }
    },
    {
      pageContent: `LCEL (LangChain Expression Language) is a key feature of LangChain V3. It is a declarative language that lets developers connect components with the pipe operator (|). LCEL offers composability, type safety, streaming, and other advantages.`,
      metadata: { source: "lcel-docs", category: "language" }
    },
    {
      pageContent: `VectorStoreRetriever is the retrieval component in LangChain, responsible for fetching documents relevant to a query from a vector store. It implements the Runnable<string, Document[]> interface and integrates seamlessly into LCEL pipelines. It supports vector similarity search, multiple search strategies, and document filtering.`,
      metadata: { source: "retriever-docs", category: "retrieval" }
    }
  ];
  await vectorStore.addDocuments(sampleDocuments);
  const retriever = new VectorStoreRetriever({
    vectorStore,
    k: 3
  });

  // Create the multi-level cache
  const memoryCache = new InMemoryCache(5 * 60 * 1000, 1000); // 5-minute TTL, up to 1000 entries
  const redis = new Redis({ host: 'localhost', port: 6379 }); // assumes a local Redis server
  const redisCache = new RedisCache(redis, 30 * 60); // 30-minute TTL
  const multiLevelCache = new MultiLevelCache(
    [memoryCache, redisCache],
    ['memory', 'redis']
  );

  // Create the cached QA system
  const cachedQASystem = new CachedQASystem(
    retriever,
    new ChatOpenAI({ modelName: "gpt-3.5-turbo" }),
    multiLevelCache
  );
  return { cachedQASystem, memoryCache, redisCache };
}

// Usage example
async function demonstrateCaching() {
  console.log('=== Cached QA System Demo ===\n');
  // Create the system
  const { cachedQASystem } = await createCachedQASystem();

  // First question
  console.log('First ask:');
  let result = await cachedQASystem.answerQuestion("What is LangChain?");
  console.log(`Question: What is LangChain?`);
  console.log(`Answer: ${result.answer.substring(0, 100)}...`);
  console.log(`Cache hit: ${result.cacheHit}`);
  console.log(`Processing time: ${result.processingTime}ms\n`);

  // Same question again (should hit the cache)
  console.log('Same question again:');
  result = await cachedQASystem.answerQuestion("What is LangChain?");
  console.log(`Question: What is LangChain?`);
  console.log(`Answer: ${result.answer.substring(0, 100)}...`);
  console.log(`Cache hit: ${result.cacheHit}`);
  console.log(`Processing time: ${result.processingTime}ms\n`);

  // Batch processing
  console.log('Batch processing:');
  const questions = [
    "What are the advantages of LCEL?",
    "What does VectorStoreRetriever do?",
    "How do I build a RAG application?"
  ];
  const batchResults = await cachedQASystem.answerQuestions(questions);
  batchResults.forEach((result, index) => {
    console.log(`Question ${index + 1}: ${result.question}`);
    console.log(`Answer: ${result.answer.substring(0, 80)}...`);
    console.log(`Cache hit: ${result.cacheHit}`);
    console.log(`Processing time: ${result.processingTime.toFixed(2)}ms\n`);
  });

  // Inspect cache statistics
  const cacheStats = await cachedQASystem.getCacheStats();
  console.log('Cache stats:', cacheStats);
}
```

Summary
Through the RunnableCache mechanism, LangChain V3 provides flexible and powerful caching support:
- Multiple storage backends - in-memory and Redis caches
- Smart caching policies - decide whether to cache based on the input and output
- Multi-level caching - tiered architectures such as memory + Redis
- Batch optimization - cache-aware batch processing
- Lifecycle management - TTL and size limits
- Statistics and monitoring - insight into cache usage
Caching significantly improves the performance and cost-efficiency of LLM applications, especially for repeated queries. With a well-designed caching strategy, developers can build high-performance, low-cost production applications.
In the next chapter, we will explore A/B testing: using RunnableParallel to run multiple versions in parallel and compare their results.