Token 计算与截断:如何集成 tiktoken 或 sentencepiece?
在构建 LLM 应用时,正确计算和管理 token 数量是至关重要的。不同的语言模型有不同的 token 限制,超出限制可能导致请求失败或额外的成本。LangChain V3 通过集成 tiktoken 和 sentencepiece 等工具,提供了强大的 token 计算和截断功能。本章将深入探讨如何在 LangChain 中实现 token 管理。
Token 计算的重要性
Token 计算是 LLM 应用中的关键环节,原因包括:
- 模型限制 - 大多数 LLM 都有上下文长度限制
- 成本控制 - token 数量直接影响 API 调用成本(可参考本列表后的估算示例)
- 性能优化 - 合理控制 token 数量可以提升响应速度
- 质量保证 - 避免因超出限制导致的截断问题
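举一个简单的例子:假设某模型的计费为每 1K 输入 token 0.0005 美元、每 1K 输出 token 0.0015 美元(单价仅为示意,实际请以所用模型的价目表为准),就可以根据 token 数量直接估算一次调用的成本。下面是一个最小的估算示例:

```typescript
// 假设性的单价(美元 / 1K token),实际数值请查阅所用模型的计费说明
const PRICE_PER_1K_INPUT_TOKENS = 0.0005;
const PRICE_PER_1K_OUTPUT_TOKENS = 0.0015;

// 根据输入 / 输出 token 数估算单次调用成本
function estimateCost(inputTokens: number, outputTokens: number): number {
  return (
    (inputTokens / 1000) * PRICE_PER_1K_INPUT_TOKENS +
    (outputTokens / 1000) * PRICE_PER_1K_OUTPUT_TOKENS
  );
}

// 例如:3000 个输入 token、500 个输出 token
console.log(estimateCost(3000, 500).toFixed(6)); // ≈ 0.002250
```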
Tiktoken 集成
tiktoken 是 OpenAI 官方提供的 token 计算库,支持 OpenAI 的各种模型。
基础集成
```typescript
// 安装: npm install tiktoken
import { encoding_for_model, get_encoding } from "tiktoken";
import type { TiktokenModel } from "tiktoken";
class TiktokenTokenizer {
private encoder: any;
constructor(modelName: string) {
try {
this.encoder = encoding_for_model(modelName as TiktokenModel);
} catch (error) {
console.warn(`模型 ${modelName} 不支持,使用默认编码`);
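// 回退到 cl100k_base(gpt-3.5-turbo / gpt-4 系列模型所使用的编码)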
this.encoder = get_encoding("cl100k_base");
}
}
encode(text: string): number[] {
return this.encoder.encode(text);
}
decode(tokens: number[]): string {
return new TextDecoder().decode(this.encoder.decode(tokens)); // tiktoken 的 decode 返回 Uint8Array,需再转换为字符串
}
countTokens(text: string): number {
return this.encode(text).length;
}
truncateText(text: string, maxTokens: number): string {
const tokens = this.encode(text);
if (tokens.length <= maxTokens) {
return text;
}
const truncatedTokens = tokens.slice(0, maxTokens);
return this.decode(truncatedTokens);
}
}
```

在 LangChain 中集成 Tiktoken
```typescript
class TokenAwareChatModel extends BaseChatModel {
private tokenizer: TiktokenTokenizer;
private maxContextTokens: number;
constructor(config: {
modelName: string;
apiKey: string;
maxContextTokens?: number;
}) {
super();
this.tokenizer = new TiktokenTokenizer(config.modelName);
this.maxContextTokens = config.maxContextTokens || 4096;
}
async generate(
messageSets: BaseMessage[][],
options?: BaseLanguageModelCallOptions
): Promise<LLMResult> {
// 计算消息的 token 数量
const tokenCounts = await Promise.all(
messageSets.map(messages => this.countMessageTokens(messages))
);
// 检查是否超出限制
for (let i = 0; i < tokenCounts.length; i++) {
if (tokenCounts[i] > this.maxContextTokens) {
// 截断消息以适应限制
messageSets[i] = await this.truncateMessages(
messageSets[i],
this.maxContextTokens
);
}
}
// 调用父类的生成方法
return await super.generate(messageSets, options);
}
private async countMessageTokens(messages: BaseMessage[]): Promise<number> {
let totalTokens = 0;
for (const message of messages) {
// 计算角色和内容的 token
totalTokens += this.tokenizer.countTokens(message.role);
totalTokens += this.tokenizer.countTokens(message.content);
// 添加每个消息的格式化开销(根据模型而定)
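// 参考 OpenAI 官方的 token 计数示例:gpt-3.5 / gpt-4 系列每条消息约有 3~4 个 token 的格式开销,此处取 4 作为保守估计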
totalTokens += 4; // 每个消息的开销
}
// 添加回复的开销
totalTokens += 3;
return totalTokens;
}
private async truncateMessages(
messages: BaseMessage[],
maxTokens: number
): Promise<BaseMessage[]> {
// 保留系统消息(如果存在)
const systemMessages = messages.filter(m => m.role === 'system');
const nonSystemMessages = messages.filter(m => m.role !== 'system');
// 计算系统消息的 token 数量
let systemTokens = 0;
for (const msg of systemMessages) {
systemTokens += this.tokenizer.countTokens(msg.role);
systemTokens += this.tokenizer.countTokens(msg.content);
systemTokens += 4;
}
// 为系统消息保留空间
const availableTokens = maxTokens - systemTokens - 3;
// 从最新的消息开始保留
const truncatedMessages = [...systemMessages];
let usedTokens = 0; // 系统消息的占用已在 availableTokens 中扣除,这里从 0 开始累计
// 从后往前处理消息(保留最新的)
for (let i = nonSystemMessages.length - 1; i >= 0; i--) {
const message = nonSystemMessages[i];
const messageTokens = this.tokenizer.countTokens(message.role) +
this.tokenizer.countTokens(message.content) + 4;
if (usedTokens + messageTokens <= availableTokens) {
truncatedMessages.unshift(message);
usedTokens += messageTokens;
} else {
// 截断最后一个可以添加的消息
const remainingTokens = availableTokens - usedTokens;
if (remainingTokens > 10) { // 至少保留一些 token
const truncatedContent = this.tokenizer.truncateText(
message.content,
remainingTokens - 5
);
truncatedMessages.unshift({
role: message.role,
content: truncatedContent
});
}
break;
}
}
return truncatedMessages;
}
}
```

SentencePiece 集成
对于使用 SentencePiece 的模型(如 LLaMA 系列),可以集成相应的库:
```typescript
// 安装: npm install sentencepiece-js
import { SentencePieceProcessor } from 'sentencepiece-js';
class SentencePieceTokenizer {
private processor: SentencePieceProcessor;
constructor(modelPath: string) {
this.processor = new SentencePieceProcessor(modelPath);
}
async encode(text: string): Promise<number[]> {
return await this.processor.encode(text);
}
async decode(tokens: number[]): Promise<string> {
return await this.processor.decode(tokens);
}
async countTokens(text: string): Promise<number> {
const tokens = await this.encode(text);
return tokens.length;
}
async truncateText(text: string, maxTokens: number): Promise<string> {
const tokens = await this.encode(text);
if (tokens.length <= maxTokens) {
return text;
}
const truncatedTokens = tokens.slice(0, maxTokens);
return await this.decode(truncatedTokens);
}
}
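// 使用示例(示意性代码):假设本地已有一个 SentencePiece 模型文件;
// 具体的模型加载方式(例如是否需要先异步 load)请以 sentencepiece-js 的文档为准
async function demoSentencePieceTokenizer() {
  const tokenizer = new SentencePieceTokenizer("./models/tokenizer.model"); // 假设的模型路径
  const count = await tokenizer.countTokens("LLaMA 系列模型通常使用 SentencePiece 进行分词");
  console.log(`token 数量: ${count}`);
  const truncated = await tokenizer.truncateText("一段可能超出限制的长文本……", 128);
  console.log(`截断后的文本: ${truncated}`);
}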
class SentencePieceChatModel extends BaseChatModel {
private tokenizer: SentencePieceTokenizer;
private maxContextTokens: number;
constructor(config: {
modelPath: string;
modelName: string;
maxContextTokens?: number;
}) {
super();
this.tokenizer = new SentencePieceTokenizer(config.modelPath);
this.maxContextTokens = config.maxContextTokens || 2048;
}
// 类似于 Tiktoken 的实现...
}
```

通用 Token 管理器
创建一个通用的 token 管理器来支持多种 tokenizer:
```typescript
interface Tokenizer {
encode(text: string): Promise<number[]> | number[];
decode(tokens: number[]): Promise<string> | string;
countTokens(text: string): Promise<number> | number;
truncateText(text: string, maxTokens: number): Promise<string> | string;
}
class UniversalTokenManager {
private tokenizer: Tokenizer;
private modelName: string;
constructor(modelName: string, tokenizer?: Tokenizer) {
this.modelName = modelName;
if (tokenizer) {
this.tokenizer = tokenizer;
} else if (modelName.startsWith('gpt') || modelName.includes('openai')) {
this.tokenizer = new TiktokenTokenizer(modelName);
} else {
// 默认使用基本的字符计数(不准确但可用)
this.tokenizer = new BasicTokenizer();
}
}
async countTokens(text: string): Promise<number> {
return await Promise.resolve(this.tokenizer.countTokens(text));
}
async truncateText(text: string, maxTokens: number): Promise<string> {
return await Promise.resolve(this.tokenizer.truncateText(text, maxTokens));
}
async countMessageTokens(messages: BaseMessage[]): Promise<number> {
let totalTokens = 0;
for (const message of messages) {
const roleTokens = await this.countTokens(message.role);
const contentTokens = await this.countTokens(message.content);
totalTokens += roleTokens + contentTokens;
// 根据模型添加格式化开销
if (this.modelName.startsWith('gpt')) {
totalTokens += 4; // OpenAI 模型的开销
} else {
totalTokens += 3; // 其他模型的开销
}
}
// 添加回复开销
totalTokens += this.modelName.startsWith('gpt') ? 3 : 1;
return totalTokens;
}
async fitMessagesToLimit(
messages: BaseMessage[],
maxTokens: number
): Promise<BaseMessage[]> {
// 实现消息截断逻辑
let currentTokens = await this.countMessageTokens(messages);
if (currentTokens <= maxTokens) {
return messages;
}
// 优先保留系统消息
const systemMessages = messages.filter(m => m.role === 'system');
const otherMessages = messages.filter(m => m.role !== 'system');
let systemTokens = 0;
for (const msg of systemMessages) {
systemTokens += await this.countTokens(msg.role);
systemTokens += await this.countTokens(msg.content);
systemTokens += 4;
}
const availableTokens = maxTokens - systemTokens - 3;
const truncatedMessages = [...systemMessages];
let usedTokens = 0; // availableTokens 已扣除系统消息的占用,这里从 0 开始累计
// 从新到旧保留消息
for (let i = otherMessages.length - 1; i >= 0; i--) {
const message = otherMessages[i];
const messageTokens = (await this.countTokens(message.role)) +
(await this.countTokens(message.content)) + 4;
if (usedTokens + messageTokens <= availableTokens) {
truncatedMessages.unshift(message);
usedTokens += messageTokens;
} else {
// 尝试截断消息内容
const remainingTokens = availableTokens - usedTokens;
if (remainingTokens > 20) {
const contentTokens = await this.countTokens(message.content);
const maxContentTokens = remainingTokens - 10;
if (contentTokens > maxContentTokens) {
const truncatedContent = await this.tokenizer.truncateText(
message.content,
maxContentTokens
);
truncatedMessages.unshift({
role: message.role,
content: truncatedContent
});
} else {
truncatedMessages.unshift(message);
}
}
break;
}
}
return truncatedMessages;
}
}
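// 使用示例(示意性代码):统计消息的 token 数,并在超过上限时裁剪历史消息
// 注意:BasicTokenizer 在下方定义,这个函数仅作演示,不会在模块加载时执行
async function demoUniversalTokenManager(messages: BaseMessage[]): Promise<BaseMessage[]> {
  const manager = new UniversalTokenManager("gpt-3.5-turbo");
  const total = await manager.countMessageTokens(messages);
  console.log(`当前消息共约 ${total} 个 token`);
  // 将消息裁剪到 4096 token 以内(优先保留系统消息和最新的对话)
  return await manager.fitMessagesToLimit(messages, 4096);
}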
class BasicTokenizer implements Tokenizer {
encode(text: string): number[] {
// 简单的基于空格的分词(仅用于演示)
return text.split(/\s+/).map((_, index) => index);
}
decode(tokens: number[]): string {
// 无法真正解码,返回占位符
return tokens.map(() => "token").join(" ");
}
countTokens(text: string): number {
// 粗略估算:先把英文字符按 1.3:1、中文及其他字符按 1:1 折算成单位数,再假设约每 2 个单位对应 1 个 token(仅作兜底估算,并不精确)
const englishChars = (text.match(/[a-zA-Z]/g) || []).length;
const chineseChars = (text.match(/[\u4e00-\u9fa5]/g) || []).length;
const otherChars = text.length - englishChars - chineseChars;
return Math.ceil(
(englishChars / 1.3 + chineseChars + otherChars) / 2
);
}
truncateText(text: string, maxTokens: number): string {
// 简单的字符截断
const estimatedChars = maxTokens * 2;
return text.slice(0, estimatedChars);
}
}
```

在 PromptTemplate 中集成 Token 管理
将 token 管理集成到 PromptTemplate 中:
```typescript
class TokenAwarePromptTemplate extends PromptTemplate {
private tokenManager: UniversalTokenManager;
private maxTokens?: number;
constructor(config: {
template: string;
inputVariables: string[];
modelName: string;
maxTokens?: number;
}) {
super({ template: config.template, inputVariables: config.inputVariables });
this.tokenManager = new UniversalTokenManager(config.modelName);
this.maxTokens = config.maxTokens;
}
async format(input: Record<string, any>): Promise<string> {
let formatted = await super.format(input);
// 如果设置了最大 token 数量,则进行截断
if (this.maxTokens) {
formatted = await this.tokenManager.truncateText(formatted, this.maxTokens);
}
return formatted;
}
async countTokens(input: Record<string, any>): Promise<number> {
const formatted = await super.format(input);
return await this.tokenManager.countTokens(formatted);
}
}
// 使用示例
const tokenAwareTemplate = new TokenAwarePromptTemplate({
template: "请总结以下文本:\n{text}\n\n总结:",
inputVariables: ["text"],
modelName: "gpt-3.5-turbo",
maxTokens: 2000
});
const longText = "很长的文本..."; // 假设这是一个很长的文本
const prompt = await tokenAwareTemplate.format({ text: longText });
const tokenCount = await tokenAwareTemplate.countTokens({ text: longText });
```

实际应用示例:智能文档摘要
让我们看一个完整的实际应用示例,展示如何在文档摘要应用中使用 token 管理:
```typescript
class DocumentSummarizer {
private model: BaseChatModel;
private tokenManager: UniversalTokenManager;
private maxContextTokens: number;
constructor(model: BaseChatModel, modelName: string, maxContextTokens: number = 4096) {
this.model = model;
this.tokenManager = new UniversalTokenManager(modelName);
this.maxContextTokens = maxContextTokens;
}
async summarize(document: string, targetLength: 'short' | 'medium' | 'long' = 'medium'): Promise<string> {
// 计算文档的 token 数量
const docTokens = await this.tokenManager.countTokens(document);
// 确定摘要提示的最大 token 数量
const maxPromptTokens = this.maxContextTokens - 500; // 为输出保留空间
let prompt: string;
if (docTokens <= maxPromptTokens) {
// 文档可以直接处理
prompt = this.createSummaryPrompt(document, targetLength);
} else {
// 需要分块处理
return await this.summarizeByChunks(document, targetLength, maxPromptTokens);
}
// 生成摘要
const messages: BaseMessage[] = [
{ role: 'system', content: '你是一个专业的文档摘要助手。' },
{ role: 'user', content: prompt }
];
const result = await this.model.invoke(messages);
// invoke 返回的是消息对象而非字符串,这里取出其文本内容作为摘要返回
return result.content as string;
}
private createSummaryPrompt(document: string, targetLength: 'short' | 'medium' | 'long'): string {
const lengthInstructions = {
short: '请提供一个非常简洁的摘要,不超过3句话。',
medium: '请提供一个中等长度的摘要,大约5-8句话。',
long: '请提供一个详细的摘要,涵盖文档的主要观点。'
};
return `请阅读以下文档并${lengthInstructions[targetLength]}
文档内容:
${document}
摘要:`;
}
private async summarizeByChunks(
document: string,
targetLength: 'short' | 'medium' | 'long',
maxTokens: number
): Promise<string> {
// 将文档分割成适合的块
const chunks = await this.splitDocumentIntoChunks(document, maxTokens - 500);
// 分别摘要每个块
const chunkSummaries = await Promise.all(
chunks.map(async (chunk, index) => {
console.log(`处理第 ${index + 1}/${chunks.length} 块`);
return await this.summarize(chunk, targetLength);
})
);
// 如果只有一个块,直接返回
if (chunkSummaries.length === 1) {
return chunkSummaries[0];
}
// 合并摘要并生成最终摘要
const combinedSummary = chunkSummaries.join('\n\n');
const finalPrompt = `以下是对一个长文档各部分的摘要,请基于这些摘要生成一个综合的摘要:
各部分摘要:
${combinedSummary}
综合摘要:`;
const messages: BaseMessage[] = [
{ role: 'system', content: '你是一个专业的文档摘要助手。' },
{ role: 'user', content: finalPrompt }
];
const finalResult = await this.model.invoke(messages);
return finalResult.content as string;
}
private async splitDocumentIntoChunks(document: string, maxTokens: number): Promise<string[]> {
const paragraphs = document.split('\n\n');
const chunks: string[] = [];
let currentChunk = '';
let currentTokens = 0;
for (const paragraph of paragraphs) {
const paragraphTokens = await this.tokenManager.countTokens(paragraph);
if (currentTokens + paragraphTokens <= maxTokens) {
currentChunk += (currentChunk ? '\n\n' : '') + paragraph;
currentTokens += paragraphTokens;
} else {
// 当前段落会使块超出限制
if (currentChunk) {
chunks.push(currentChunk);
}
if (paragraphTokens <= maxTokens) {
// 段落可以单独作为一个块
currentChunk = paragraph;
currentTokens = paragraphTokens;
} else {
// 段落太大,需要进一步分割
const sentences = paragraph.split(/(?<=[.!?。!?])\s*/).filter(s => s.length > 0); // 兼容中英文句末标点
let subChunk = '';
let subTokens = 0;
for (const sentence of sentences) {
const sentenceTokens = await this.tokenManager.countTokens(sentence);
if (subTokens + sentenceTokens <= maxTokens) {
subChunk += (subChunk ? ' ' : '') + sentence;
subTokens += sentenceTokens;
} else {
if (subChunk) {
chunks.push(subChunk);
}
subChunk = sentence;
subTokens = sentenceTokens;
}
}
currentChunk = subChunk;
currentTokens = subTokens;
}
}
}
if (currentChunk) {
chunks.push(currentChunk);
}
return chunks;
}
}
// 使用示例
const summarizer = new DocumentSummarizer(
new ChatOpenAI({ modelName: "gpt-3.5-turbo" }),
"gpt-3.5-turbo",
4096
);
const longDocument = `
这是一个很长的文档...
包含多个段落和大量内容...
需要被智能地摘要...
`;
const summary = await summarizer.summarize(longDocument, 'medium');
console.log('文档摘要:', summary);
```

总结
Token 计算与截断是构建生产级 LLM 应用的关键技术。通过集成 tiktoken、sentencepiece 等工具,LangChain V3 提供了强大的 token 管理能力:
- 精确计算 - 准确计算文本的 token 数量
- 智能截断 - 在超出限制时智能截断内容
- 上下文管理 - 合理管理消息历史和上下文
- 多模型支持 - 支持不同模型的 token 化策略
- 性能优化 - 避免因超出限制导致的错误和额外成本
通过这些机制,开发者可以构建更加健壮和高效的 LLM 应用,确保在各种场景下都能正常工作并控制成本。
在下一章中,我们将探讨 OutputParser:从"后处理字符串"到"结构化输出生成器",了解如何将 LLM 的文本输出转换为结构化数据。