
Caching: How RunnableCache Avoids Redundant Calls

Cache outputs keyed by an input hash, with Redis and in-memory backends

When building high-performance LLM applications, caching is an essential optimization. By avoiding repeated expensive computations (such as LLM calls), a cache can significantly improve response times and reduce operating costs. LangChain V3 provides flexible caching through the RunnableCache mechanism: outputs are cached under a hash of their inputs, and multiple storage backends are supported, including Redis and in-memory stores. This chapter takes a close look at how the caching mechanism is implemented and applied.

Basic concepts of RunnableCache

RunnableCache is the mechanism LangChain V3 uses to cache the outputs of Runnable components:

typescript
interface BaseCache {
  lookup(key: string): Promise<any | null>;
  update(key: string, value: any): Promise<void>;
  clear(): Promise<void>;
}

abstract class RunnableCache implements BaseCache {
  abstract lookup(key: string): Promise<any | null>;
  abstract update(key: string, value: any): Promise<void>;
  abstract clear(): Promise<void>;
  
  // Generate a cache key from the input
  protected hashInput(input: any): string {
    // Serialize with sorted keys at every level so that logically-equal
    // objects produce identical strings. (An array passed as the second
    // argument to JSON.stringify only filters properties -- at every
    // nesting level -- and does not reorder them, so it must not be used here.)
    const inputString = typeof input === 'string'
      ? input
      : JSON.stringify(input, (_key, value) =>
          value !== null && typeof value === 'object' && !Array.isArray(value)
            ? Object.keys(value).sort().reduce((acc: any, k) => {
                acc[k] = value[k];
                return acc;
              }, {})
            : value
        );

    // Simple hash implementation (use a cryptographic hash in production)
    let hash = 0;
    for (let i = 0; i < inputString.length; i++) {
      const char = inputString.charCodeAt(i);
      hash = ((hash << 5) - hash) + char;
      hash = hash & hash; // clamp to a 32-bit integer
    }
    return Math.abs(hash).toString(16);
  }
}
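The stable-serialization step is worth verifying in isolation: two logically-equal inputs must produce the same key regardless of property insertion order. A self-contained sketch of sorted-key stringification plus the same simple 32-bit hash (the `stableStringify` and `hashString` names are ours, for illustration only):

```typescript
// Illustrative helpers, not LangChain API: a deterministic cache key
// requires that logically-equal inputs serialize to identical strings.

// Rebuild plain objects with sorted keys so serialization ignores
// insertion order; arrays and primitives pass through unchanged.
function stableStringify(value: unknown): string {
  return JSON.stringify(value, (_k, v) =>
    v !== null && typeof v === "object" && !Array.isArray(v)
      ? Object.keys(v as Record<string, unknown>)
          .sort()
          .reduce((acc, key) => {
            acc[key] = (v as Record<string, unknown>)[key];
            return acc;
          }, {} as Record<string, unknown>)
      : v
  );
}

// The same simple 32-bit string hash as above; prefer a crypto hash in production.
function hashString(s: string): string {
  let hash = 0;
  for (let i = 0; i < s.length; i++) {
    hash = (hash << 5) - hash + s.charCodeAt(i);
    hash = hash & hash; // clamp to a 32-bit integer
  }
  return Math.abs(hash).toString(16);
}

const keyA = hashString(stableStringify({ model: "gpt", temperature: 0 }));
const keyB = hashString(stableStringify({ temperature: 0, model: "gpt" }));
// keyA === keyB: insertion order no longer affects the cache key
```

Because the replacer function is applied recursively by `JSON.stringify`, nested objects are canonicalized as well, not just the top level.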

In-memory cache implementation

First, a cache backed by process memory:

typescript
class InMemoryCache extends RunnableCache {
  private cache: Map<string, { value: any; timestamp: number }>;
  private ttl: number; // time-to-live in milliseconds
  private maxSize: number;
  
  constructor(ttl: number = 5 * 60 * 1000, maxSize: number = 1000) { // default: 5-minute TTL, at most 1000 entries
    super();
    this.cache = new Map();
    this.ttl = ttl;
    this.maxSize = maxSize;
  }
  
  async lookup(key: string): Promise<any | null> {
    this.cleanupExpired();
    
    const entry = this.cache.get(key);
    if (entry) {
      return entry.value;
    }
    
    return null;
  }
  
  async update(key: string, value: any): Promise<void> {
    this.cleanupExpired();
    
    // If the cache is full, evict the oldest entry
    if (this.cache.size >= this.maxSize) {
      const oldestKey = this.getOldestKey();
      if (oldestKey) {
        this.cache.delete(oldestKey);
      }
    }
    
    this.cache.set(key, {
      value,
      timestamp: Date.now()
    });
  }
  
  async clear(): Promise<void> {
    this.cache.clear();
  }
  
  private cleanupExpired(): void {
    const now = Date.now();
    for (const [key, entry] of this.cache.entries()) {
      if (now - entry.timestamp > this.ttl) {
        this.cache.delete(key);
      }
    }
  }
  
  private getOldestKey(): string | null {
    let oldestKey: string | null = null;
    let oldestTimestamp = Infinity;
    
    for (const [key, entry] of this.cache.entries()) {
      if (entry.timestamp < oldestTimestamp) {
        oldestTimestamp = entry.timestamp;
        oldestKey = key;
      }
    }
    
    return oldestKey;
  }
  
  // Report cache statistics
  getStats(): { size: number; maxSize: number; ttl: number } {
    return {
      size: this.cache.size,
      maxSize: this.maxSize,
      ttl: this.ttl
    };
  }
}
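TTL behavior is easiest to observe with an injectable clock, so that expiry can be exercised without real waiting. Below is a condensed, self-contained variant of the in-memory cache above (a sketch for illustration; the full class also adds size bounds and eager cleanup):

```typescript
// A condensed in-memory TTL cache. `now` is injectable so expiry is
// testable deterministically; it defaults to the real clock.
class TinyTTLCache {
  private store = new Map<string, { value: unknown; timestamp: number }>();
  constructor(private ttlMs: number, private now: () => number = Date.now) {}

  lookup(key: string): unknown | null {
    const entry = this.store.get(key);
    if (!entry) return null;
    if (this.now() - entry.timestamp > this.ttlMs) {
      this.store.delete(key); // lazily drop expired entries on access
      return null;
    }
    return entry.value;
  }

  update(key: string, value: unknown): void {
    this.store.set(key, { value, timestamp: this.now() });
  }
}

// Drive the clock by hand to observe expiry:
let t = 0;
const demo = new TinyTTLCache(1000, () => t);
demo.update("q", "answer");
// at t = 500 the entry is still fresh; at t = 2000 it has expired
```

This lazy-expiry variant only pays cleanup cost on access, whereas the `cleanupExpired` sweep above bounds memory more aggressively; both are common trade-offs.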

Redis cache implementation

Next, a Redis-backed cache, suitable for distributed environments:

typescript
// Assumes the ioredis client library
// npm install ioredis
import Redis from 'ioredis';

class RedisCache extends RunnableCache {
  private redis: Redis;
  private ttl: number;
  private prefix: string;
  
  constructor(
    redis: Redis, 
    ttl: number = 5 * 60, // Redis TTLs are in seconds
    prefix: string = 'langchain:cache:'
  ) {
    super();
    this.redis = redis;
    this.ttl = ttl;
    this.prefix = prefix;
  }
  
  async lookup(key: string): Promise<any | null> {
    try {
      const value = await this.redis.get(`${this.prefix}${key}`);
      if (value) {
        return JSON.parse(value);
      }
    } catch (error) {
      console.warn('Redis cache lookup failed:', error);
    }
    
    return null;
  }
  
  async update(key: string, value: any): Promise<void> {
    try {
      const cacheKey = `${this.prefix}${key}`;
      await this.redis.setex(
        cacheKey,
        this.ttl,
        JSON.stringify(value)
      );
    } catch (error) {
      console.warn('Redis cache update failed:', error);
    }
  }
  
  async clear(): Promise<void> {
    try {
      // Note: KEYS blocks the Redis server while it scans the keyspace;
      // prefer SCAN (e.g. ioredis scanStream) for large keyspaces in production.
      const keys = await this.redis.keys(`${this.prefix}*`);
      if (keys.length > 0) {
        await this.redis.del(...keys);
      }
    } catch (error) {
      console.warn('Redis cache clear failed:', error);
    }
  }
  
  // Clear cache entries matching a pattern
  async clearPattern(pattern: string): Promise<void> {
    try {
      const keys = await this.redis.keys(`${this.prefix}${pattern}`);
      if (keys.length > 0) {
        await this.redis.del(...keys);
      }
    } catch (error) {
      console.warn('Redis cache pattern clear failed:', error);
    }
  }
}

A cache-aware Runnable

A wrapper that adds caching to any Runnable:

typescript
class CachedRunnable<Input, Output> extends Runnable<Input, Output> {
  private runnable: Runnable<Input, Output>;
  private cache: BaseCache;
  private shouldCache: (input: Input, output: Output) => boolean;
  
  constructor(
    runnable: Runnable<Input, Output>,
    cache: BaseCache,
    shouldCache?: (input: Input, output: Output) => boolean
  ) {
    super();
    this.runnable = runnable;
    this.cache = cache;
    this.shouldCache = shouldCache || (() => true); // by default, cache every result
  }
  
  async invoke(input: Input, options?: RunnableConfig): Promise<Output> {
    // Generate the cache key
    const cacheKey = this.generateCacheKey(input, options);
    
    // Try the cache first
    let cachedResult: Output | null = null;
    try {
      cachedResult = await this.cache.lookup(cacheKey);
    } catch (error) {
      console.warn('Cache lookup failed:', error);
    }
    
    // Note: a legitimate null output is indistinguishable from a miss here;
    // wrap outputs (or track presence separately) if null is a valid result.
    if (cachedResult !== null) {
      console.log(`Cache hit: ${cacheKey}`);
      return cachedResult;
    }
    
    // Cache miss: run the underlying Runnable
    console.log(`Cache miss: ${cacheKey}`);
    const result = await this.runnable.invoke(input, options);
    
    // Update the cache if this result should be cached
    if (this.shouldCache(input, result)) {
      try {
        await this.cache.update(cacheKey, result);
        console.log(`Cache updated: ${cacheKey}`);
      } catch (error) {
        console.warn('Cache update failed:', error);
      }
    }
    
    return result;
  }
  
  async batch(inputs: Input[], options?: RunnableConfig): Promise<Output[]> {
    // Cache-aware batching
    const cacheKeys = inputs.map(input => this.generateCacheKey(input, options));
    
    // Try to fetch every result from the cache
    const cachedResults: (Output | null)[] = await Promise.all(
      cacheKeys.map(key => this.cache.lookup(key).catch(() => null))
    );
    
    // Work out which inputs still need real processing
    const uncachedInputs: Input[] = [];
    const uncachedIndices: number[] = [];
    
    for (let i = 0; i < inputs.length; i++) {
      if (cachedResults[i] === null) {
        uncachedInputs.push(inputs[i]);
        uncachedIndices.push(i);
      }
    }
    
    // Process only the uncached inputs
    let uncachedResults: Output[] = [];
    if (uncachedInputs.length > 0) {
      console.log(`Batch contains ${uncachedInputs.length} uncached items`);
      uncachedResults = await this.runnable.batch(uncachedInputs, options);
      
      // Update the cache
      for (let i = 0; i < uncachedInputs.length; i++) {
        const index = uncachedIndices[i];
        const cacheKey = cacheKeys[index];
        const result = uncachedResults[i];
        
        if (this.shouldCache(uncachedInputs[i], result)) {
          try {
            await this.cache.update(cacheKey, result);
          } catch (error) {
            console.warn('Batch cache update failed:', error);
          }
        }
      }
    }
    
    // Merge cached and fresh results, preserving the input order
    const finalResults: Output[] = new Array(inputs.length);
    let uncachedResultIndex = 0;
    
    for (let i = 0; i < inputs.length; i++) {
      if (cachedResults[i] !== null) {
        finalResults[i] = cachedResults[i] as Output;
      } else {
        finalResults[i] = uncachedResults[uncachedResultIndex++];
      }
    }
    
    return finalResults;
  }
  
  private generateCacheKey(input: Input, options?: RunnableConfig): string {
    // Build a composite key from the input plus the relevant config
    const keyData = {
      input,
      config: {
        tags: options?.tags,
        metadata: options?.metadata
      }
    };
    
    // Serialize with sorted keys at every level; a replacer array would
    // only filter properties, not make the ordering deterministic.
    const keyString = JSON.stringify(keyData, (_key, value) =>
      value !== null && typeof value === 'object' && !Array.isArray(value)
        ? Object.keys(value).sort().reduce((acc: any, k) => {
            acc[k] = value[k];
            return acc;
          }, {})
        : value
    );
    
    // Hash the serialized key (same simple hash as above)
    let hash = 0;
    for (let i = 0; i < keyString.length; i++) {
      const char = keyString.charCodeAt(i);
      hash = ((hash << 5) - hash) + char;
      hash = hash & hash;
    }
    
    return Math.abs(hash).toString(16);
  }
}
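The invoke path (hash, lookup, compute on miss, update) reduces to a generic memoizing wrapper around any async function. A self-contained sketch (names are ours, not LangChain API) that counts underlying calls, so the second identical invocation demonstrably never reaches the wrapped function:

```typescript
// Sketch of the lookup -> compute -> update flow from CachedRunnable.invoke,
// reduced to a plain async-function wrapper. Illustrative only.
type AsyncFn<I, O> = (input: I) => Promise<O>;

function withCache<I, O>(fn: AsyncFn<I, O>): AsyncFn<I, O> & { calls: () => number } {
  const cache = new Map<string, O>();
  let underlyingCalls = 0;

  const wrapped = async (input: I): Promise<O> => {
    const key = JSON.stringify(input); // stand-in for the hashed composite key
    if (cache.has(key)) return cache.get(key)!; // cache hit: skip the real call
    underlyingCalls++;
    const result = await fn(input); // cache miss: run the real computation
    cache.set(key, result);
    return result;
  };
  return Object.assign(wrapped, { calls: () => underlyingCalls });
}

// Usage: an "expensive" call is executed only once per distinct input.
const answer = withCache(async (q: string) => `echo: ${q}`);
```

Note the `Map.has` check instead of comparing the looked-up value to null: this sidesteps the null-result ambiguity that a `lookup() !== null` test has.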

Multi-level caching

A multi-level strategy that combines the memory and Redis caches:

typescript
class MultiLevelCache extends RunnableCache {
  private caches: BaseCache[];
  private cacheNames: string[];
  
  constructor(caches: BaseCache[], cacheNames: string[]) {
    super();
    if (caches.length !== cacheNames.length) {
      throw new Error('The number of caches and cache names must match');
    }
    
    this.caches = caches;
    this.cacheNames = cacheNames;
  }
  
  async lookup(key: string): Promise<any | null> {
    // Search from the fastest (highest) level down
    for (let i = 0; i < this.caches.length; i++) {
      try {
        const result = await this.caches[i].lookup(key);
        if (result !== null) {
          console.log(`Multi-level cache hit: ${this.cacheNames[i]}`);
          
          // Promote the result into the faster levels above this one
          for (let j = i - 1; j >= 0; j--) {
            try {
              await this.caches[j].update(key, result);
            } catch (error) {
              console.warn(`Cache promotion failed (${this.cacheNames[j]}):`, error);
            }
          }
          
          return result;
        }
      } catch (error) {
        console.warn(`Cache lookup failed (${this.cacheNames[i]}):`, error);
      }
    }
    
    return null;
  }
  
  async update(key: string, value: any): Promise<void> {
    // Write through to every level
    await Promise.all(
      this.caches.map((cache, index) => 
        cache.update(key, value).catch(error => {
          console.warn(`Cache update failed (${this.cacheNames[index]}):`, error);
        })
      )
    );
  }
  
  async clear(): Promise<void> {
    // Clear every level
    await Promise.all(
      this.caches.map((cache, index) => 
        cache.clear().catch(error => {
          console.warn(`Cache clear failed (${this.cacheNames[index]}):`, error);
        })
      )
    );
  }
}
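The promotion step is the interesting part: a value found only in a slower level is copied forward, so the next lookup hits the fast level. A self-contained sketch with two plain Maps standing in for the memory and Redis tiers (illustrative only; class and method names are ours):

```typescript
// Two Map-backed levels stand in for memory (L1) and Redis (L2).
// On an L2 hit the value is promoted into L1, mirroring MultiLevelCache.lookup.
class TwoLevel {
  constructor(
    private l1 = new Map<string, unknown>(),
    private l2 = new Map<string, unknown>()
  ) {}

  seedL2(key: string, value: unknown): void {
    this.l2.set(key, value); // simulate a value that only the slow level holds
  }

  lookup(key: string): unknown | null {
    if (this.l1.has(key)) return this.l1.get(key)!; // fast-level hit
    if (this.l2.has(key)) {
      const value = this.l2.get(key)!;
      this.l1.set(key, value); // promote into the faster level
      return value;
    }
    return null;
  }

  inL1(key: string): boolean {
    return this.l1.has(key);
  }
}
```

After one lookup of a value seeded only into L2, `inL1` reports true: subsequent lookups no longer touch the slow tier.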

A complete example

A full worked example, showing the caching machinery inside a question-answering system:

typescript
// A question-answering system with caching
class CachedQASystem {
  private qaChain: CachedRunnable<string, string>;
  private cache: BaseCache;
  
  constructor(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel,
    cache: BaseCache
  ) {
    this.cache = cache;
    this.qaChain = this.buildQAChain(retriever, llm, cache);
  }
  
  private buildQAChain(
    retriever: Runnable<string, Document[]>,
    llm: BaseLanguageModel,
    cache: BaseCache
  ): CachedRunnable<string, string> {
    // Build the base QA chain
    const baseChain = RunnableMap.from({
      question: (input: string) => input,
      context: retriever.pipe((docs: Document[]) => 
        docs.map(d => d.pageContent).join('\n\n')
      )
    })
    .pipe(new PromptTemplate({
      template: `Answer the question using the documents below. If the documents contain no relevant information, say that you cannot answer.

Documents:
{context}

Question: {question}

Answer:`,
      inputVariables: ["context", "question"]
    }))
    .pipe(llm)
    .pipe(new StringOutputParser())
    .withRetry({ maxAttempts: 3 });
    
    // Caching policy: only cache high-confidence answers
    const shouldCache = (input: string, output: string): boolean => {
      // Simple heuristic: cache only answers longer than 50 characters
      return output.length > 50;
    };
    };
    
    // Wrap the chain in the caching Runnable
    return new CachedRunnable(baseChain, cache, shouldCache);
  }
  
  async answerQuestion(question: string): Promise<{ 
    answer: string; 
    cacheHit: boolean;
    processingTime: number;
  }> {
    const startTime = Date.now();
    
    try {
      // Record whether the answer was already cached. Caveat: this uses a
      // simplified key (question only), while CachedRunnable hashes a
      // composite of input and config, so treat cacheHit as approximate.
      const cacheKey = this.generateCacheKey(question);
      const wasCached = await this.cache.lookup(cacheKey) !== null;
      
      const answer = await this.qaChain.invoke(question);
      
      const processingTime = Date.now() - startTime;
      
      return {
        answer,
        cacheHit: wasCached,
        processingTime
      };
    } catch (error) {
      console.error('QA system error:', error);
      return {
        answer: "Sorry, an error occurred while processing your question.",
        cacheHit: false,
        processingTime: Date.now() - startTime
      };
    }
  }
  
  async answerQuestions(questions: string[]): Promise<Array<{ 
    question: string;
    answer: string; 
    cacheHit: boolean;
    processingTime: number;
  }>> {
    const startTime = Date.now();
    
    try {
      // Batch processing
      const answers = await this.qaChain.batch(questions);
      
      const processingTime = Date.now() - startTime;
      const avgTime = processingTime / questions.length;
      
      // Check cache hits (simplified implementation using question-only keys)
      const cacheHits = await Promise.all(
        questions.map(async q => {
          const key = this.generateCacheKey(q);
          return await this.cache.lookup(key) !== null;
        })
      );
      
      return questions.map((question, index) => ({
        question,
        answer: answers[index],
        cacheHit: cacheHits[index],
        processingTime: avgTime
      }));
    } catch (error) {
      console.error('Batch QA system error:', error);
      return questions.map(question => ({
        question,
        answer: "Sorry, an error occurred while processing your question.",
        cacheHit: false,
        processingTime: Date.now() - startTime
      }));
    }
  }
  
  private generateCacheKey(question: string): string {
    let hash = 0;
    for (let i = 0; i < question.length; i++) {
      const char = question.charCodeAt(i);
      hash = ((hash << 5) - hash) + char;
      hash = hash & hash;
    }
    return Math.abs(hash).toString(16);
  }
  
  // Cache statistics
  async getCacheStats(): Promise<any> {
    if (this.cache instanceof InMemoryCache) {
      return (this.cache as InMemoryCache).getStats();
    } else if (this.cache instanceof MultiLevelCache) {
      // Simplified; production code may need richer per-level statistics
      return { type: 'multi-level' };
    } else {
      return { type: 'unknown' };
    }
  }
  
  // Clear the cache
  async clearCache(): Promise<void> {
    await this.cache.clear();
  }
}

// Build a QA system with caching
async function createCachedQASystem() {
  // Create the vector store and retriever
  const vectorStore = new MemoryVectorStore();
  
  // Add sample documents
  const sampleDocuments: Document[] = [
    {
      pageContent: `LangChain is a framework for developing applications powered by language models. It provides a set of tools and components that help developers integrate language models with other data sources, computation logic, and external systems. Key features include a component-based design, chain composition, data connectors, and intelligent agents.`,
      metadata: { source: "langchain-intro", category: "framework" }
    },
    {
      pageContent: `LCEL (LangChain Expression Language) is a key feature of LangChain V3. It is a declarative language that lets developers connect components with the pipe operator (|). LCEL offers composability, type safety, streaming, and more.`,
      metadata: { source: "lcel-docs", category: "language" }
    },
    {
      pageContent: `VectorStoreRetriever is LangChain's retrieval component, responsible for fetching documents relevant to a query from a vector store. It implements the Runnable<string, Document[]> interface and slots seamlessly into LCEL pipelines. It supports vector similarity search, multiple search strategies, and document filtering.`,
      metadata: { source: "retriever-docs", category: "retrieval" }
    }
  ];
  
  await vectorStore.addDocuments(sampleDocuments);
  
  const retriever = new VectorStoreRetriever({ 
    vectorStore, 
    k: 3
  });
  
  // Create the multi-level cache
  const memoryCache = new InMemoryCache(5 * 60 * 1000, 1000); // 5-minute TTL, at most 1000 entries
  const redis = new Redis({ host: 'localhost', port: 6379 }); // assumes Redis is running locally
  const redisCache = new RedisCache(redis, 30 * 60); // 30-minute TTL
  
  const multiLevelCache = new MultiLevelCache(
    [memoryCache, redisCache],
    ['memory', 'redis']
  );
  
  // Create the cached QA system
  const cachedQASystem = new CachedQASystem(
    retriever,
    new ChatOpenAI({ modelName: "gpt-3.5-turbo" }),
    multiLevelCache
  );
  
  return { cachedQASystem, memoryCache, redisCache };
}

// Usage example
async function demonstrateCaching() {
  console.log('=== Cached QA system demo ===\n');
  
  // Create the system
  const { cachedQASystem } = await createCachedQASystem();
  
  // First question
  console.log('First question:');
  let result = await cachedQASystem.answerQuestion("What is LangChain?");
  console.log(`Question: What is LangChain?`);
  console.log(`Answer: ${result.answer.substring(0, 100)}...`);
  console.log(`Cache hit: ${result.cacheHit}`);
  console.log(`Processing time: ${result.processingTime}ms\n`);
  
  // Same question again (should hit the cache)
  console.log('Same question again:');
  result = await cachedQASystem.answerQuestion("What is LangChain?");
  console.log(`Question: What is LangChain?`);
  console.log(`Answer: ${result.answer.substring(0, 100)}...`);
  console.log(`Cache hit: ${result.cacheHit}`);
  console.log(`Processing time: ${result.processingTime}ms\n`);
  
  // Batch processing
  console.log('Batch processing:');
  const questions = [
    "What are the advantages of LCEL?",
    "What does VectorStoreRetriever do?",
    "How do I build a RAG application?"
  ];
  
  const batchResults = await cachedQASystem.answerQuestions(questions);
  batchResults.forEach((result, index) => {
    console.log(`Question ${index + 1}: ${result.question}`);
    console.log(`Answer: ${result.answer.substring(0, 80)}...`);
    console.log(`Cache hit: ${result.cacheHit}`);
    console.log(`Processing time: ${result.processingTime.toFixed(2)}ms\n`);
  });
  
  // Inspect cache statistics
  const cacheStats = await cachedQASystem.getCacheStats();
  console.log('Cache statistics:', cacheStats);
}

Summary

Through the RunnableCache mechanism, LangChain V3 provides flexible and powerful caching support:

  1. Multiple storage backends - in-memory and Redis caches
  2. Smart caching policies - decide per input/output whether to cache
  3. Multi-level caching - e.g. memory + Redis tiers
  4. Batch optimization - cache-aware batch processing
  5. Lifecycle management - TTL and size limits
  6. Statistics and monitoring - insight into cache usage

Caching significantly improves the performance and cost-efficiency of LLM applications, especially for repeated queries. With a sensible caching policy, developers can build fast, low-cost production-grade applications.

In the next chapter we look at A/B testing: using RunnableParallel to run multiple versions side by side and compare their results.