Token Counting and Truncation: How to Integrate tiktoken and sentencepiece

When building LLM applications, correctly counting and managing tokens is essential. Different language models have different token limits, and exceeding them can cause requests to fail or incur extra cost. LangChain V3 integrates tools such as tiktoken and sentencepiece to provide robust token counting and truncation. This chapter takes a deep look at how to implement token management in LangChain.

Why Token Counting Matters

Token counting is a critical link in LLM applications, for several reasons:

  1. Model limits - most LLMs enforce a context-length limit
  2. Cost control - token count directly drives API cost (see the sketch after this list)
  3. Performance - keeping token counts reasonable improves response speed
  4. Quality assurance - it avoids truncation problems caused by exceeding limits
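
To make the cost point concrete, below is a minimal sketch of a cost estimator built on token counts. The model name and per-1K-token prices are hypothetical placeholders, not real pricing; substitute your provider's current rates.

```typescript
// Hypothetical pricing table: USD per 1K tokens (placeholder numbers, not real rates)
const PRICE_PER_1K: Record<string, { input: number; output: number }> = {
  "example-model": { input: 0.0005, output: 0.0015 },
};

// Estimate the spend for one call from measured input/output token counts
function estimateCost(model: string, inputTokens: number, outputTokens: number): number {
  const price = PRICE_PER_1K[model];
  if (!price) throw new Error(`No pricing configured for ${model}`);
  return (inputTokens / 1000) * price.input + (outputTokens / 1000) * price.output;
}

// e.g. estimateCost("example-model", 3000, 500)
//      = 3 * 0.0005 + 0.5 * 0.0015 = 0.00225 (USD)
```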

Tiktoken Integration

tiktoken is the token-counting library provided by OpenAI, with support for OpenAI's various models.

Basic Integration

```typescript
// Install: npm install tiktoken
import { encoding_for_model, get_encoding, Tiktoken, TiktokenModel } from "tiktoken";

class TiktokenTokenizer {
  private encoder: Tiktoken;

  constructor(modelName: string) {
    try {
      this.encoder = encoding_for_model(modelName as TiktokenModel);
    } catch (error) {
      console.warn(`Model ${modelName} is not supported; falling back to cl100k_base`);
      this.encoder = get_encoding("cl100k_base");
    }
  }

  encode(text: string): number[] {
    // tiktoken returns a Uint32Array; convert it to a plain number array
    return Array.from(this.encoder.encode(text));
  }

  decode(tokens: number[]): string {
    // decode() returns UTF-8 bytes, so run them through TextDecoder
    return new TextDecoder().decode(this.encoder.decode(new Uint32Array(tokens)));
  }

  countTokens(text: string): number {
    return this.encode(text).length;
  }

  truncateText(text: string, maxTokens: number): string {
    const tokens = this.encode(text);
    if (tokens.length <= maxTokens) {
      return text;
    }

    const truncatedTokens = tokens.slice(0, maxTokens);
    return this.decode(truncatedTokens);
  }

  free(): void {
    // The WASM-backed encoder holds native memory; free it when finished
    this.encoder.free();
  }
}
```
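
A quick usage sketch of the wrapper above; the token counts in the comments are illustrative only, since the exact numbers depend on the encoding.

```typescript
const tokenizer = new TiktokenTokenizer("gpt-3.5-turbo");

const text = "LangChain makes it easier to build LLM applications.";
console.log(tokenizer.countTokens(text));     // e.g. around 10 tokens (encoding-dependent)
console.log(tokenizer.truncateText(text, 5)); // the first 5 tokens, decoded back to text

tokenizer.free(); // release the WASM-backed encoder when done
```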

Integrating Tiktoken into LangChain

```typescript
// Note: for readability, these examples treat BaseMessage as a simple
// { role, content } shape rather than LangChain's actual message classes.
class TokenAwareChatModel extends BaseChatModel {
  private tokenizer: TiktokenTokenizer;
  private maxContextTokens: number;

  constructor(config: {
    modelName: string;
    apiKey: string;
    maxContextTokens?: number;
  }) {
    super();
    this.tokenizer = new TiktokenTokenizer(config.modelName);
    this.maxContextTokens = config.maxContextTokens || 4096;
  }

  async generate(
    messageSets: BaseMessage[][],
    options?: BaseLanguageModelCallOptions
  ): Promise<LLMResult> {
    // Count the tokens in each message set
    const tokenCounts = await Promise.all(
      messageSets.map(messages => this.countMessageTokens(messages))
    );

    // Check whether any set exceeds the limit
    for (let i = 0; i < tokenCounts.length; i++) {
      if (tokenCounts[i] > this.maxContextTokens) {
        // Truncate the messages to fit within the limit
        messageSets[i] = await this.truncateMessages(
          messageSets[i],
          this.maxContextTokens
        );
      }
    }

    // Delegate to the parent class's generate method
    return await super.generate(messageSets, options);
  }

  private async countMessageTokens(messages: BaseMessage[]): Promise<number> {
    let totalTokens = 0;

    for (const message of messages) {
      // Count the role and the content
      totalTokens += this.tokenizer.countTokens(message.role);
      totalTokens += this.tokenizer.countTokens(message.content);

      // Per-message formatting overhead (model-dependent; the 4 here follows
      // OpenAI's published counting recipe for early gpt-3.5-turbo snapshots)
      totalTokens += 4;
    }

    // Overhead for priming the assistant's reply
    totalTokens += 3;

    return totalTokens;
  }

  private async truncateMessages(
    messages: BaseMessage[],
    maxTokens: number
  ): Promise<BaseMessage[]> {
    // Keep system messages (if any)
    const systemMessages = messages.filter(m => m.role === 'system');
    const nonSystemMessages = messages.filter(m => m.role !== 'system');

    // Count the tokens used by system messages
    let systemTokens = 0;
    for (const msg of systemMessages) {
      systemTokens += this.tokenizer.countTokens(msg.role);
      systemTokens += this.tokenizer.countTokens(msg.content);
      systemTokens += 4;
    }

    // Budget left over after system messages and reply priming
    const availableTokens = maxTokens - systemTokens - 3;

    // Keep the newest messages first
    const kept: BaseMessage[] = [];
    let usedTokens = 0; // system tokens are already excluded from availableTokens

    // Walk the messages from newest to oldest
    for (let i = nonSystemMessages.length - 1; i >= 0; i--) {
      const message = nonSystemMessages[i];
      const messageTokens = this.tokenizer.countTokens(message.role) +
                           this.tokenizer.countTokens(message.content) + 4;

      if (usedTokens + messageTokens <= availableTokens) {
        kept.unshift(message);
        usedTokens += messageTokens;
      } else {
        // Partially truncate the last message that can still fit
        const remainingTokens = availableTokens - usedTokens;
        if (remainingTokens > 10) { // keep at least a few tokens
          const truncatedContent = this.tokenizer.truncateText(
            message.content,
            remainingTokens - 5
          );

          kept.unshift({
            role: message.role,
            content: truncatedContent
          });
        }
        break;
      }
    }

    // System messages stay at the front of the conversation
    return [...systemMessages, ...kept];
  }
}
```

SentencePiece Integration

For models that use SentencePiece (such as the LLaMA family), you can integrate the corresponding library:

```typescript
// Install: npm install sentencepiece-js
// Note: the exact sentencepiece-js API differs between versions; the
// load()/encodeIds()/decodeIds() calls below follow its README and should
// be checked against the version you install.
import { SentencePieceProcessor } from 'sentencepiece-js';

class SentencePieceTokenizer {
  private processor: SentencePieceProcessor;
  private ready: Promise<void>;

  constructor(modelPath: string) {
    this.processor = new SentencePieceProcessor();
    // Loading the .model file is asynchronous
    this.ready = this.processor.load(modelPath);
  }

  async encode(text: string): Promise<number[]> {
    await this.ready;
    return this.processor.encodeIds(text);
  }

  async decode(tokens: number[]): Promise<string> {
    await this.ready;
    return this.processor.decodeIds(tokens);
  }

  async countTokens(text: string): Promise<number> {
    const tokens = await this.encode(text);
    return tokens.length;
  }

  async truncateText(text: string, maxTokens: number): Promise<string> {
    const tokens = await this.encode(text);
    if (tokens.length <= maxTokens) {
      return text;
    }

    const truncatedTokens = tokens.slice(0, maxTokens);
    return await this.decode(truncatedTokens);
  }
}

class SentencePieceChatModel extends BaseChatModel {
  private tokenizer: SentencePieceTokenizer;
  private maxContextTokens: number;

  constructor(config: {
    modelPath: string;
    modelName: string;
    maxContextTokens?: number;
  }) {
    super();
    this.tokenizer = new SentencePieceTokenizer(config.modelPath);
    this.maxContextTokens = config.maxContextTokens || 2048;
  }

  // The rest mirrors the Tiktoken implementation...
}
```
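
A brief usage sketch; the tokenizer model path below is hypothetical, and the counts depend on the specific model file.

```typescript
// "./llama/tokenizer.model" is a hypothetical path to a SentencePiece model file
const spTokenizer = new SentencePieceTokenizer("./llama/tokenizer.model");

const text = "LLaMA-family models tokenize text with SentencePiece.";
console.log(await spTokenizer.countTokens(text));     // token count under this model
console.log(await spTokenizer.truncateText(text, 8)); // the first 8 tokens, decoded
```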

A Universal Token Manager

Create a generic token manager that supports multiple tokenizers:

```typescript
interface Tokenizer {
  encode(text: string): Promise<number[]> | number[];
  decode(tokens: number[]): Promise<string> | string;
  countTokens(text: string): Promise<number> | number;
  truncateText(text: string, maxTokens: number): Promise<string> | string;
}

class UniversalTokenManager {
  private tokenizer: Tokenizer;
  private modelName: string;

  constructor(modelName: string, tokenizer?: Tokenizer) {
    this.modelName = modelName;

    if (tokenizer) {
      this.tokenizer = tokenizer;
    } else if (modelName.startsWith('gpt') || modelName.includes('openai')) {
      this.tokenizer = new TiktokenTokenizer(modelName);
    } else {
      // Fall back to rough character-based estimation (inaccurate, but usable)
      this.tokenizer = new BasicTokenizer();
    }
  }

  async countTokens(text: string): Promise<number> {
    // Promise.resolve normalizes sync and async tokenizers
    return await Promise.resolve(this.tokenizer.countTokens(text));
  }

  async truncateText(text: string, maxTokens: number): Promise<string> {
    return await Promise.resolve(this.tokenizer.truncateText(text, maxTokens));
  }

  async countMessageTokens(messages: BaseMessage[]): Promise<number> {
    let totalTokens = 0;

    for (const message of messages) {
      const roleTokens = await this.countTokens(message.role);
      const contentTokens = await this.countTokens(message.content);
      totalTokens += roleTokens + contentTokens;

      // Per-message formatting overhead, which varies by model family
      if (this.modelName.startsWith('gpt')) {
        totalTokens += 4; // overhead for OpenAI chat models
      } else {
        totalTokens += 3; // rough default for other models
      }
    }

    // Overhead for priming the reply
    totalTokens += this.modelName.startsWith('gpt') ? 3 : 1;

    return totalTokens;
  }

  async fitMessagesToLimit(
    messages: BaseMessage[],
    maxTokens: number
  ): Promise<BaseMessage[]> {
    // Truncate the conversation so it fits within maxTokens
    const currentTokens = await this.countMessageTokens(messages);

    if (currentTokens <= maxTokens) {
      return messages;
    }

    // System messages are kept with priority
    const systemMessages = messages.filter(m => m.role === 'system');
    const otherMessages = messages.filter(m => m.role !== 'system');

    let systemTokens = 0;
    for (const msg of systemMessages) {
      systemTokens += await this.countTokens(msg.role);
      systemTokens += await this.countTokens(msg.content);
      systemTokens += 4;
    }

    // Budget left over after system messages and reply priming
    const availableTokens = maxTokens - systemTokens - 3;
    const kept: BaseMessage[] = [];
    let usedTokens = 0; // system tokens are already excluded from the budget

    // Keep messages from newest to oldest
    for (let i = otherMessages.length - 1; i >= 0; i--) {
      const message = otherMessages[i];
      const messageTokens = (await this.countTokens(message.role)) +
                           (await this.countTokens(message.content)) + 4;

      if (usedTokens + messageTokens <= availableTokens) {
        kept.unshift(message);
        usedTokens += messageTokens;
      } else {
        // Try truncating this message's content to fill the remaining budget
        const remainingTokens = availableTokens - usedTokens;
        if (remainingTokens > 20) {
          const contentTokens = await this.countTokens(message.content);
          const maxContentTokens = remainingTokens - 10;

          if (contentTokens > maxContentTokens) {
            const truncatedContent = await this.truncateText(
              message.content,
              maxContentTokens
            );

            kept.unshift({
              role: message.role,
              content: truncatedContent
            });
          } else {
            kept.unshift(message);
          }
        }
        break;
      }
    }

    // System messages stay at the front of the conversation
    return [...systemMessages, ...kept];
  }
}

class BasicTokenizer implements Tokenizer {
  encode(text: string): number[] {
    // Naive whitespace-based "tokenization" (for demonstration only)
    return text.split(/\s+/).map((_, index) => index);
  }

  decode(tokens: number[]): string {
    // Cannot genuinely decode; return placeholders
    return tokens.map(() => "token").join(" ");
  }

  countTokens(text: string): number {
    // Crude estimate: roughly 4 characters per token for English, about
    // 1 token per Chinese character, and 2 characters per token otherwise.
    // Real tokenizers will differ, so treat this as an approximation.
    const englishChars = (text.match(/[a-zA-Z]/g) || []).length;
    const chineseChars = (text.match(/[\u4e00-\u9fa5]/g) || []).length;
    const otherChars = text.length - englishChars - chineseChars;

    return Math.ceil(englishChars / 4 + chineseChars + otherChars / 2);
  }

  truncateText(text: string, maxTokens: number): string {
    // Naive character-based truncation, assuming ~2 characters per token
    const estimatedChars = maxTokens * 2;
    return text.slice(0, estimatedChars);
  }
}
```
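
To see the manager's truncation policy in action, here is a minimal sketch using the simplified { role, content } message shape from the earlier examples; the 1000-token budget is arbitrary.

```typescript
const manager = new UniversalTokenManager("gpt-3.5-turbo"); // selects TiktokenTokenizer

const history: BaseMessage[] = [
  { role: "system", content: "You are a helpful assistant." },
  { role: "user", content: "First question..." },
  { role: "assistant", content: "First answer..." },
  { role: "user", content: "Latest question..." },
];

// The system message is kept; older turns are dropped (or truncated)
// before newer ones until the conversation fits the budget.
const fitted = await manager.fitMessagesToLimit(history, 1000);
console.log(await manager.countMessageTokens(fitted)); // <= 1000
```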

Integrating Token Management into PromptTemplate

Token management can also be built into a PromptTemplate:

```typescript
class TokenAwarePromptTemplate extends PromptTemplate {
  private tokenManager: UniversalTokenManager;
  private maxTokens?: number;

  constructor(config: {
    template: string;
    inputVariables: string[];
    modelName: string;
    maxTokens?: number;
  }) {
    super({ template: config.template, inputVariables: config.inputVariables });
    this.tokenManager = new UniversalTokenManager(config.modelName);
    this.maxTokens = config.maxTokens;
  }

  async format(input: Record<string, any>): Promise<string> {
    // PromptTemplate.format is asynchronous, so it must be awaited
    let formatted = await super.format(input);

    // If a token limit is configured, truncate the formatted prompt
    if (this.maxTokens) {
      formatted = await this.tokenManager.truncateText(formatted, this.maxTokens);
    }

    return formatted;
  }

  async countTokens(input: Record<string, any>): Promise<number> {
    const formatted = await super.format(input);
    return await this.tokenManager.countTokens(formatted);
  }
}

// Usage example
const tokenAwareTemplate = new TokenAwarePromptTemplate({
  template: "Summarize the following text:\n{text}\n\nSummary:",
  inputVariables: ["text"],
  modelName: "gpt-3.5-turbo",
  maxTokens: 2000
});

const longText = "A very long text..."; // assume this is a very long document
const prompt = await tokenAwareTemplate.format({ text: longText });
const tokenCount = await tokenAwareTemplate.countTokens({ text: longText });
```

A Practical Example: Intelligent Document Summarization

Let's look at a complete, practical example showing how to use token management in a document summarization application:

```typescript
class DocumentSummarizer {
  private model: BaseChatModel;
  private tokenManager: UniversalTokenManager;
  private maxContextTokens: number;

  constructor(model: BaseChatModel, modelName: string, maxContextTokens: number = 4096) {
    this.model = model;
    this.tokenManager = new UniversalTokenManager(modelName);
    this.maxContextTokens = maxContextTokens;
  }

  async summarize(document: string, targetLength: 'short' | 'medium' | 'long' = 'medium'): Promise<string> {
    // Count the document's tokens
    const docTokens = await this.tokenManager.countTokens(document);

    // Token budget for the summarization prompt
    const maxPromptTokens = this.maxContextTokens - 500; // reserve room for the output

    let prompt: string;

    if (docTokens <= maxPromptTokens) {
      // The document fits and can be processed directly
      prompt = this.createSummaryPrompt(document, targetLength);
    } else {
      // Too large: fall back to chunked summarization
      return await this.summarizeByChunks(document, targetLength, maxPromptTokens);
    }

    // Generate the summary
    const messages: BaseMessage[] = [
      { role: 'system', content: 'You are a professional document summarization assistant.' },
      { role: 'user', content: prompt }
    ];

    const result = await this.model.invoke(messages);
    return result.content as string; // invoke returns a message; extract its text
  }

  private createSummaryPrompt(document: string, targetLength: 'short' | 'medium' | 'long'): string {
    const lengthInstructions = {
      short: 'Provide a very concise summary of no more than 3 sentences.',
      medium: 'Provide a medium-length summary of roughly 5-8 sentences.',
      long: 'Provide a detailed summary covering the document\'s main points.'
    };

    return `Read the following document. ${lengthInstructions[targetLength]}

Document:
${document}

Summary:`;
  }

  private async summarizeByChunks(
    document: string,
    targetLength: 'short' | 'medium' | 'long',
    maxTokens: number
  ): Promise<string> {
    // Split the document into chunks that fit the budget
    const chunks = await this.splitDocumentIntoChunks(document, maxTokens - 500);

    // Summarize each chunk independently
    const chunkSummaries = await Promise.all(
      chunks.map(async (chunk, index) => {
        console.log(`Processing chunk ${index + 1}/${chunks.length}`);
        return await this.summarize(chunk, targetLength);
      })
    );

    // With a single chunk, return its summary directly
    if (chunkSummaries.length === 1) {
      return chunkSummaries[0];
    }

    // Merge the per-chunk summaries into a final summary
    const combinedSummary = chunkSummaries.join('\n\n');
    const finalPrompt = `Below are summaries of the sections of a long document. Produce one consolidated summary based on them:

Section summaries:
${combinedSummary}

Consolidated summary:`;

    const messages: BaseMessage[] = [
      { role: 'system', content: 'You are a professional document summarization assistant.' },
      { role: 'user', content: finalPrompt }
    ];

    const result = await this.model.invoke(messages);
    return result.content as string;
  }

  private async splitDocumentIntoChunks(document: string, maxTokens: number): Promise<string[]> {
    const paragraphs = document.split('\n\n');
    const chunks: string[] = [];
    let currentChunk = '';
    let currentTokens = 0;

    for (const paragraph of paragraphs) {
      const paragraphTokens = await this.tokenManager.countTokens(paragraph);

      if (currentTokens + paragraphTokens <= maxTokens) {
        currentChunk += (currentChunk ? '\n\n' : '') + paragraph;
        currentTokens += paragraphTokens;
      } else {
        // This paragraph would push the current chunk over the limit
        if (currentChunk) {
          chunks.push(currentChunk);
        }

        if (paragraphTokens <= maxTokens) {
          // The paragraph can stand alone as its own chunk
          currentChunk = paragraph;
          currentTokens = paragraphTokens;
        } else {
          // The paragraph itself is too large and must be split further
          // (naive sentence split on Western and CJK end-of-sentence punctuation)
          const sentences = paragraph
            .split(/(?<=[.!?])\s+|(?<=[。!?])/)
            .filter(s => s.length > 0);
          let subChunk = '';
          let subTokens = 0;

          for (const sentence of sentences) {
            const sentenceTokens = await this.tokenManager.countTokens(sentence);

            if (subTokens + sentenceTokens <= maxTokens) {
              subChunk += (subChunk ? ' ' : '') + sentence;
              subTokens += sentenceTokens;
            } else {
              if (subChunk) {
                chunks.push(subChunk);
              }
              subChunk = sentence;
              subTokens = sentenceTokens;
            }
          }

          currentChunk = subChunk;
          currentTokens = subTokens;
        }
      }
    }

    if (currentChunk) {
      chunks.push(currentChunk);
    }

    return chunks;
  }
}

// Usage example
const summarizer = new DocumentSummarizer(
  new ChatOpenAI({ modelName: "gpt-3.5-turbo" }),
  "gpt-3.5-turbo",
  4096
);

const longDocument = `
This is a very long document...
with many paragraphs and a great deal of content...
that needs to be summarized intelligently...
`;

const summary = await summarizer.summarize(longDocument, 'medium');
console.log('Document summary:', summary);
```

Summary

Token counting and truncation are key techniques for building production-grade LLM applications. By integrating tools such as tiktoken and sentencepiece, LangChain V3 provides powerful token-management capabilities:

  1. Accurate counting - compute the token count of a text precisely
  2. Smart truncation - truncate content intelligently when limits are exceeded
  3. Context management - manage message history and context sensibly
  4. Multi-model support - accommodate the tokenization strategies of different models
  5. Performance and cost - avoid the errors and extra cost caused by exceeding limits

With these mechanisms, developers can build more robust and efficient LLM applications that work reliably across scenarios while keeping costs under control.

In the next chapter, we will explore OutputParser: from "post-processing strings" to "structured output generators", and learn how to turn an LLM's text output into structured data.