Skip to content

ContextualCompressionRetriever:在检索后压缩上下文

使用 LLMChainExtractor 移除无关句子

在检索增强生成(RAG)应用中,检索到的文档往往包含大量信息,其中有些可能与当前查询无关。ContextualCompressionRetriever 通过在检索后压缩上下文,移除无关内容,提高生成质量。本章将深入探讨 ContextualCompressionRetriever 的设计和实现。

ContextualCompressionRetriever 的基本概念

ContextualCompressionRetriever 在传统的检索器基础上增加了一个压缩步骤,使用 LLM 来提取与查询最相关的内容:

typescript
interface BaseCompressor {
  compressDocuments(
    documents: Document[],
    query: string
  ): Promise<Document[]>;
}

class ContextualCompressionRetriever implements Runnable<string, Document[]> {
  private baseRetriever: Runnable<string, Document[]>;
  private baseCompressor: BaseCompressor;
  
  constructor(
    baseRetriever: Runnable<string, Document[]>,
    baseCompressor: BaseCompressor
  ) {
    this.baseRetriever = baseRetriever;
    this.baseCompressor = baseCompressor;
  }
  
  async invoke(query: string, options?: RunnableConfig): Promise<Document[]> {
    // 第一步:使用基础检索器检索文档
    const documents = await this.baseRetriever.invoke(query, options);
    
    // 第二步:使用压缩器压缩文档
    const compressedDocuments = await this.baseCompressor.compressDocuments(documents, query);
    
    return compressedDocuments;
  }
  
  async batch(queries: string[], options?: RunnableConfig): Promise<Document[][]> {
    // 批量处理查询
    return await Promise.all(queries.map(query => this.invoke(query, options)));
  }
}

LLMChainExtractor 的实现

LLMChainExtractor 是最常用的压缩器之一,它使用 LLM 来提取与查询相关的内容:

typescript
class LLMChainExtractor implements BaseCompressor {
  private llm: BaseLanguageModel;
  private promptTemplate: PromptTemplate;
  
  constructor(llm: BaseLanguageModel, promptTemplate?: PromptTemplate) {
    this.llm = llm;
    this.promptTemplate = promptTemplate || this.getDefaultPromptTemplate();
  }
  
  async compressDocuments(documents: Document[], query: string): Promise<Document[]> {
    const compressedDocuments: Document[] = [];
    
    // 对每个文档进行压缩
    for (const document of documents) {
      try {
        const compressedContent = await this.extractRelevantContent(
          document.pageContent,
          query
        );
        
        if (compressedContent.trim()) {
          compressedDocuments.push({
            pageContent: compressedContent,
            metadata: {
              ...document.metadata,
              originalContentLength: document.pageContent.length,
              compressedContentLength: compressedContent.length,
              compressionRatio: compressedContent.length / document.pageContent.length
            }
          });
        }
      } catch (error) {
        console.warn('文档压缩失败,保留原始文档:', error);
        compressedDocuments.push(document);
      }
    }
    
    return compressedDocuments;
  }
  
  private async extractRelevantContent(content: string, query: string): Promise<string> {
    const prompt = await this.promptTemplate.format({
      content,
      query
    });
    
    const response = await this.llm.invoke(prompt);
    return response.trim();
  }
  
  private getDefaultPromptTemplate(): PromptTemplate {
    return new PromptTemplate({
      template: `从以下文档内容中提取与查询问题直接相关的信息。
如果文档内容与查询无关,则返回空字符串。

文档内容:
{content}

查询问题:
{query}

相关内容:`,
      inputVariables: ["content", "query"]
    });
  }
}

更智能的压缩策略

让我们实现一个更智能的压缩器,能够处理复杂场景:

typescript
interface CompressionOptions {
  maxTokens?: number;
  minCompressionRatio?: number;
  preserveMetadata?: boolean;
}

class SmartDocumentCompressor implements BaseCompressor {
  private llm: BaseLanguageModel;
  private maxTokens: number;
  private minCompressionRatio: number;
  private preserveMetadata: boolean;
  
  constructor(
    llm: BaseLanguageModel,
    options: CompressionOptions = {}
  ) {
    this.llm = llm;
    this.maxTokens = options.maxTokens || 1000;
    this.minCompressionRatio = options.minCompressionRatio || 0.3;
    this.preserveMetadata = options.preserveMetadata ?? true;
  }
  
  async compressDocuments(documents: Document[], query: string): Promise<Document[]> {
    const compressedDocuments: Document[] = [];
    
    for (const document of documents) {
      try {
        const compressed = await this.compressDocument(document, query);
        if (compressed) {
          compressedDocuments.push(compressed);
        }
      } catch (error) {
        console.warn('文档压缩失败:', error);
        // 如果压缩失败,根据配置决定是否保留原始文档
        if (this.shouldPreserveFailedDocuments()) {
          compressedDocuments.push(document);
        }
      }
    }
    
    return compressedDocuments;
  }
  
  private async compressDocument(document: Document, query: string): Promise<Document | null> {
    const { pageContent, metadata } = document;
    
    // 对于很短的文档,直接返回
    if (pageContent.length < 100) {
      return document;
    }
    
    // 使用 LLM 提取相关内容
    const extractedContent = await this.extractRelevantContent(pageContent, query);
    
    // 如果提取的内容太少,可能提取失败
    if (extractedContent.length < 10) {
      // 尝试另一种策略:摘要
      const summary = await this.summarizeContent(pageContent, query);
      if (summary.length > 10) {
        return {
          pageContent: summary,
          metadata: this.preserveMetadata ? {
            ...metadata,
            compressionType: 'summary',
            originalContentLength: pageContent.length,
            compressedContentLength: summary.length,
            compressionRatio: summary.length / pageContent.length
          } : {}
        };
      } else {
        return null; // 内容太不相关
      }
    }
    
    // 检查压缩比
    const compressionRatio = extractedContent.length / pageContent.length;
    if (compressionRatio < this.minCompressionRatio) {
      // 压缩比过低,可能丢失了太多信息,使用摘要代替
      const summary = await this.summarizeContent(pageContent, query);
      return {
        pageContent: summary,
        metadata: this.preserveMetadata ? {
          ...metadata,
          compressionType: 'summary',
          originalContentLength: pageContent.length,
          compressedContentLength: summary.length,
          compressionRatio: summary.length / pageContent.length
        } : {}
      };
    }
    
    return {
      pageContent: extractedContent,
      metadata: this.preserveMetadata ? {
        ...metadata,
        compressionType: 'extraction',
        originalContentLength: pageContent.length,
        compressedContentLength: extractedContent.length,
        compressionRatio
      } : {}
    };
  }
  
  private async extractRelevantContent(content: string, query: string): Promise<string> {
    const prompt = new PromptTemplate({
      template: `从以下文档中提取与查询问题直接相关的内容片段。
只返回相关的内容,不要添加解释或其他文本。

文档内容:
{content}

查询问题:
{query}

相关内容:`,
      inputVariables: ["content", "query"]
    });
    
    const response = await prompt.pipe(this.llm).pipe(new StringOutputParser()).invoke({
      content,
      query
    });
    
    return response;
  }
  
  private async summarizeContent(content: string, query: string): Promise<string> {
    const prompt = new PromptTemplate({
      template: `为以下查询问题总结文档内容。
提供一个简洁但信息丰富的摘要。

文档内容:
{content}

查询相关性:
{query}

文档摘要:`,
      inputVariables: ["content", "query"]
    });
    
    const response = await prompt.pipe(this.llm).pipe(new StringOutputParser()).invoke({
      content,
      query
    });
    
    return response;
  }
  
  private shouldPreserveFailedDocuments(): boolean {
    // 可以根据配置或上下文决定是否保留压缩失败的文档
    return true;
  }
}

多阶段压缩策略

实现一个多阶段的压缩策略,逐步优化压缩效果:

typescript
class MultiStageCompressor implements BaseCompressor {
  private compressors: BaseCompressor[];
  private strategy: 'sequential' | 'parallel' | 'adaptive';
  
  constructor(
    compressors: BaseCompressor[],
    strategy: 'sequential' | 'parallel' | 'adaptive' = 'sequential'
  ) {
    this.compressors = compressors;
    this.strategy = strategy;
  }
  
  async compressDocuments(documents: Document[], query: string): Promise<Document[]> {
    switch (this.strategy) {
      case 'sequential':
        return await this.compressSequential(documents, query);
      
      case 'parallel':
        return await this.compressParallel(documents, query);
      
      case 'adaptive':
        return await this.compressAdaptive(documents, query);
      
      default:
        return await this.compressSequential(documents, query);
    }
  }
  
  private async compressSequential(documents: Document[], query: string): Promise<Document[]> {
    let compressedDocs = documents;
    
    // 依次应用每个压缩器
    for (const compressor of this.compressors) {
      compressedDocs = await compressor.compressDocuments(compressedDocs, query);
    }
    
    return compressedDocs;
  }
  
  private async compressParallel(documents: Document[], query: string): Promise<Document[]> {
    // 并行应用所有压缩器
    const results = await Promise.all(
      this.compressors.map(compressor => 
        compressor.compressDocuments(documents, query)
      )
    );
    
    // 合并结果,去重并选择最佳内容
    return this.mergeCompressedResults(results, query);
  }
  
  private async compressAdaptive(documents: Document[], query: string): Promise<Document[]> {
    let currentDocs = documents;
    
    for (const compressor of this.compressors) {
      const compressed = await compressor.compressDocuments(currentDocs, query);
      
      // 评估压缩效果
      const quality = this.evaluateCompressionQuality(currentDocs, compressed);
      
      // 如果质量足够好,继续使用压缩结果
      if (quality > 0.7) {
        currentDocs = compressed;
      }
      // 否则保留原始文档或尝试其他策略
    }
    
    return currentDocs;
  }
  
  private mergeCompressedResults(results: Document[][], query: string): Document[] {
    // 实现结果合并逻辑
    // 这里简化处理,实际应用中可能需要更复杂的合并策略
    const merged: Document[] = [];
    const docMap = new Map<string, Document[]>();
    
    // 按文档ID分组
    results.flat().forEach(doc => {
      const docId = this.getDocumentId(doc);
      if (!docMap.has(docId)) {
        docMap.set(docId, []);
      }
      docMap.get(docId)!.push(doc);
    });
    
    // 对每组文档选择最佳版本
    for (const [_, docs] of docMap) {
      const bestDoc = this.selectBestDocument(docs, query);
      if (bestDoc) {
        merged.push(bestDoc);
      }
    }
    
    return merged;
  }
  
  private evaluateCompressionQuality(original: Document[], compressed: Document[]): number {
    if (original.length === 0) return 1;
    if (compressed.length === 0) return 0;
    
    // 计算平均压缩比
    const totalRatio = compressed.reduce((sum, doc) => {
      const originalDoc = original.find(d => this.getDocumentId(d) === this.getDocumentId(doc));
      if (originalDoc) {
        return sum + (doc.pageContent.length / originalDoc.pageContent.length);
      }
      return sum;
    }, 0);
    
    const avgRatio = totalRatio / compressed.length;
    
    // 压缩比在 0.3-0.7 之间被认为是较好的
    if (avgRatio >= 0.3 && avgRatio <= 0.7) {
      return 1;
    } else if (avgRatio > 0.7) {
      return 0.5; // 压缩效果一般
    } else {
      return 0.8; // 过度压缩
    }
  }
  
  private selectBestDocument(documents: Document[], query: string): Document | null {
    if (documents.length === 0) return null;
    if (documents.length === 1) return documents[0];
    
    // 选择压缩比最合适的文档
    return documents.reduce((best, current) => {
      const bestRatio = best.metadata?.compressionRatio || 1;
      const currentRatio = current.metadata?.compressionRatio || 1;
      
      // 寻找压缩比在 0.3-0.7 之间的最佳文档
      const bestScore = Math.abs(0.5 - bestRatio);
      const currentScore = Math.abs(0.5 - currentRatio);
      
      return currentScore < bestScore ? current : best;
    });
  }
  
  private getDocumentId(doc: Document): string {
    return doc.metadata?.id || this.hashCode(doc.pageContent);
  }
  
  private hashCode(str: string): string {
    let hash = 0;
    for (let i = 0; i < str.length; i++) {
      const char = str.charCodeAt(i);
      hash = ((hash << 5) - hash) + char;
      hash = hash & hash;
    }
    return Math.abs(hash).toString();
  }
}

实际应用示例

让我们看一个完整的实际应用示例:

typescript
// 创建一个智能文档问答系统
class SmartDocumentQASystem {
  private retriever: ContextualCompressionRetriever;
  private llm: BaseChatModel;
  
  constructor(retriever: ContextualCompressionRetriever, llm: BaseChatModel) {
    this.retriever = retriever;
    this.llm = llm;
  }
  
  async answerQuestion(question: string): Promise<{ answer: string; sources: Document[] }> {
    try {
      // 检索并压缩相关文档
      console.log('检索并压缩相关文档...');
      const documents = await this.retriever.invoke(question);
      
      console.log(`检索到 ${documents.length} 个相关文档`);
      documents.forEach((doc, index) => {
        console.log(`文档 ${index + 1}: 压缩比 ${doc.metadata.compressionRatio?.toFixed(2) || 'N/A'}`);
      });
      
      // 构建上下文
      const context = this.formatContext(documents);
      
      // 生成答案
      const prompt = new PromptTemplate({
        template: `基于以下文档内容回答问题。只使用文档中提供的信息,如果文档中没有相关信息,请说明无法回答。
        
文档内容:
{context}

问题: {question}

答案:`,
        inputVariables: ["context", "question"]
      });
      
      const answer = await prompt
        .pipe(this.llm)
        .pipe(new StringOutputParser())
        .invoke({
          context,
          question
        });
      
      return {
        answer,
        sources: documents
      };
    } catch (error) {
      console.error('问答系统错误:', error);
      return {
        answer: "抱歉,处理您的问题时出现了错误。",
        sources: []
      };
    }
  }
  
  private formatContext(documents: Document[]): string {
    return documents
      .map((doc, index) => {
        let docText = `[文档 ${index + 1}]\n${doc.pageContent}`;
        if (doc.metadata.source) {
          docText += `\n来源: ${doc.metadata.source}`;
        }
        if (doc.metadata.compressionType) {
          docText += `\n压缩类型: ${doc.metadata.compressionType}`;
        }
        return docText;
      })
      .join('\n\n');
  }
  
  // 流式问答
  async *streamAnswer(question: string): AsyncGenerator<{ 
    type: 'content' | 'sources' | 'complete'; 
    data: string | Document[] 
  }> {
    try {
      // 检索文档
      const documents = await this.retriever.invoke(question);
      yield { type: 'sources', data: documents };
      
      const context = this.formatContext(documents);
      
      // 流式生成答案
      const prompt = new PromptTemplate({
        template: `基于以下文档内容回答问题。只使用文档中提供的信息,如果文档中没有相关信息,请说明无法回答。
        
文档内容:
{context}

问题: {question}

答案:`,
        inputVariables: ["context", "question"]
      });
      
      const stream = await prompt
        .pipe(this.llm)
        .pipe(new StringOutputParser())
        .stream({
          context,
          question
        });
      
      for await (const chunk of stream) {
        yield { type: 'content', data: chunk };
      }
      
      yield { type: 'complete', data: '' };
    } catch (error) {
      console.error('流式问答错误:', error);
      yield { type: 'content', data: "抱歉,处理您的问题时出现了错误。" };
    }
  }
}

// 创建系统组件
async function createSmartQASystem() {
  // 创建基础向量存储和检索器
  const vectorStore = new MemoryVectorStore();
  
  // 添加示例文档
  const sampleDocuments: Document[] = [
    {
      pageContent: `LangChain框架介绍:
LangChain是一个开源框架,专门用于构建由大型语言模型(LLM)驱动的应用程序。它提供了一套工具和组件,帮助开发者将语言模型与其他数据源、计算逻辑和外部系统集成。

主要特性包括:
1. 组件化设计:提供可重用的组件,如提示模板、模型接口、索引等
2. 链式调用:支持将多个组件链接在一起形成处理链
3. 数据连接:支持连接各种数据源,包括文件、数据库、API等
4. 智能代理:提供构建自主代理的能力,可以执行复杂任务

LangChain支持多种编程语言,包括Python和JavaScript/TypeScript。`,
      metadata: { source: "langchain-intro", category: "framework" }
    },
    {
      pageContent: `LCEL (LangChain Expression Language) 是LangChain V3的重要特性:
LCEL是一种声明式语言,允许开发者使用管道操作符(|)将不同的组件连接在一起。这种设计使得构建复杂的处理链变得更加直观和简洁。

LCEL的核心概念:
1. Runnable接口:所有组件都实现Runnable接口
2. 管道操作:使用|操作符连接组件
3. 类型安全:提供完整的TypeScript类型支持
4. 流式处理:原生支持流式数据处理

示例代码:
const chain = promptTemplate | llm | outputParser;`,
      metadata: { source: "lcel-docs", category: "language" }
    },
    {
      pageContent: `VectorStoreRetriever是LangChain中的检索组件:
VectorStoreRetriever负责从向量存储中检索与查询相关的文档。它实现了Runnable<string, Document[]>接口,可以无缝集成到LCEL管道中。

主要功能:
1. 向量相似度搜索:基于嵌入向量进行相似度匹配
2. 多种搜索策略:支持相似度搜索、MMR等
3. 文档过滤:支持基于元数据的文档过滤
4. 批量处理:支持批量查询处理

使用示例:
const retriever = new VectorStoreRetriever({ vectorStore, k: 4 });`,
      metadata: { source: "retriever-docs", category: "retrieval" }
    }
  ];
  
  await vectorStore.addDocuments(sampleDocuments);
  
  const baseRetriever = new VectorStoreRetriever({
    vectorStore,
    k: 3
  });
  
  // 创建压缩器
  const compressor = new SmartDocumentCompressor(
    new ChatOpenAI({ modelName: "gpt-3.5-turbo" }),
    {
      maxTokens: 500,
      minCompressionRatio: 0.2
    }
  );
  
  // 创建上下文压缩检索器
  const compressionRetriever = new ContextualCompressionRetriever(
    baseRetriever,
    compressor
  );
  
  // 创建问答系统
  const qaSystem = new SmartDocumentQASystem(
    compressionRetriever,
    new ChatOpenAI({ modelName: "gpt-3.5-turbo" })
  );
  
  return qaSystem;
}

// 使用示例
async function demonstrateSmartQASystem() {
  const qaSystem = await createSmartQASystem();
  
  // 普通问答
  console.log('=== 智能文档问答系统演示 ===\n');
  
  const result = await qaSystem.answerQuestion("LangChain的主要特性是什么?");
  console.log('问题: LangChain的主要特性是什么?');
  console.log('答案:', result.answer);
  console.log('使用的文档数量:', result.sources.length);
  console.log();
  
  // 流式问答
  console.log('流式回答 "什么是LCEL?":');
  for await (const { type, data } of qaSystem.streamAnswer("什么是LCEL?")) {
    if (type === 'content') {
      process.stdout.write(data as string);
    }
  }
  console.log('\n');
}

与 LCEL 的集成

ContextualCompressionRetriever 可以无缝集成到 LCEL 管道中:

typescript
// 创建完整的压缩 RAG 链
async function createCompressedRAGChain() {
  // 创建组件
  const vectorStore = new MemoryVectorStore();
  const baseRetriever = new VectorStoreRetriever({ vectorStore, k: 4 });
  const compressor = new LLMChainExtractor(new ChatOpenAI());
  const compressionRetriever = new ContextualCompressionRetriever(baseRetriever, compressor);
  
  const prompt = new PromptTemplate({
    template: `使用以下文档内容回答问题:
    
文档:
{context}

问题: {question}

答案:`,
    inputVariables: ["context", "question"]
  });
  
  const llm = new ChatOpenAI({ modelName: "gpt-3.5-turbo" });
  const parser = new StringOutputParser();
  
  // 构建压缩 RAG 链
  const compressedRAGChain = 
    RunnableMap.from({
      question: (input: string) => input,
      context: compressionRetriever.pipe((docs: Document[]) => 
        docs.map(doc => doc.pageContent).join('\n\n')
      )
    })
    .pipe(prompt)
    .pipe(llm)
    .pipe(parser);
  
  return compressedRAGChain;
}

总结

ContextualCompressionRetriever 通过在检索后压缩上下文,显著提高了 RAG 应用的效果:

  1. 智能压缩 - 使用 LLMChainExtractor 等压缩器提取相关内容
  2. 多策略支持 - 支持提取、摘要等多种压缩策略
  3. 质量控制 - 通过压缩比等指标控制压缩质量
  4. 灵活集成 - 可以与任何基础检索器集成
  5. LCEL 兼容 - 无缝集成到 LCEL 管道中

通过这些特性,ContextualCompressionRetriever 使得 RAG 应用能够更精准地使用相关文档,提高生成质量和准确性。

在下一章中,我们将探讨 MultiQueryRetriever:如何提升召回率,了解如何通过生成多个查询来改善检索效果。