# Microservices: Deploying Runnables as Independent Services over HTTP/gRPC

When building large LLM applications, a microservice architecture offers better scalability, maintainability, and deployment flexibility. The Runnable interface in LangChain V3 lends itself naturally to this style: components can be deployed independently and communicate over HTTP or gRPC. This chapter explores how to split LangChain components into microservices.
## Runnable Microservice Basics

The standardized Runnable interface makes exposing components as services straightforward:
```typescript
// Base contract for a Runnable exposed as a microservice
interface RunnableService<Input, Output> {
  invoke(input: Input): Promise<Output>;
  batch(inputs: Input[]): Promise<Output[]>;
  stream?(input: Input): AsyncGenerator<Output>;
}

// HTTP client that proxies a remote Runnable
class HttpRunnableClient<Input, Output> implements Runnable<Input, Output> {
  private baseUrl: string;
  private serviceName: string;

  constructor(baseUrl: string, serviceName: string) {
    this.baseUrl = baseUrl;
    this.serviceName = serviceName;
  }

  async invoke(input: Input): Promise<Output> {
    const response = await fetch(`${this.baseUrl}/${this.serviceName}/invoke`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ input }),
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    const result = await response.json();
    return result.output;
  }

  async batch(inputs: Input[]): Promise<Output[]> {
    const response = await fetch(`${this.baseUrl}/${this.serviceName}/batch`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ inputs }),
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    const result = await response.json();
    return result.outputs;
  }

  async *stream(input: Input): AsyncGenerator<Output> {
    const response = await fetch(`${this.baseUrl}/${this.serviceName}/stream`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ input }),
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    if (!response.body) {
      throw new Error('Response body is empty');
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          break;
        }
        // Buffer partial lines: an SSE event may be split across chunks
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? '';
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const result = JSON.parse(line.slice(6));
            yield result.output;
          }
        }
      }
    } finally {
      reader.releaseLock();
    }
  }
}
```
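The client above presumes a server exposing matching `/invoke`, `/batch`, and `/stream` endpoints. As a minimal sketch of that server side, assuming Express and a hypothetical `serveRunnable` helper (the route layout and SSE framing simply mirror the client above; adapt them to your own framework):

```typescript
// Minimal Express sketch of the server side HttpRunnableClient expects.
// `serveRunnable` is a hypothetical helper; the routes mirror the client.
import express from 'express';

function serveRunnable(runnable: Runnable<any, any>, serviceName: string, port: number) {
  const app = express();
  app.use(express.json());

  app.post(`/${serviceName}/invoke`, async (req, res) => {
    const output = await runnable.invoke(req.body.input);
    res.json({ output });
  });

  app.post(`/${serviceName}/batch`, async (req, res) => {
    const outputs = await runnable.batch(req.body.inputs);
    res.json({ outputs });
  });

  // Stream results as Server-Sent Events, matching the client's "data: " parsing
  app.post(`/${serviceName}/stream`, async (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    for await (const chunk of await runnable.stream(req.body.input)) {
      res.write(`data: ${JSON.stringify({ output: chunk })}\n\n`);
    }
    res.end();
  });

  // Health endpoint used by the service registry introduced later in this chapter
  app.get('/health', (_req, res) => res.json({ status: 'ok' }));

  app.listen(port, () => console.log(`${serviceName} listening on port ${port}`));
}
```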
## gRPC Service Implementation

gRPC can make inter-service communication more efficient:
```typescript
// gRPC service definition (protobuf)
/*
syntax = "proto3";

package langchain;

service RunnableService {
  rpc Invoke(InvokeRequest) returns (InvokeResponse);
  rpc Batch(BatchRequest) returns (BatchResponse);
  rpc Stream(StreamRequest) returns (stream StreamResponse);
}

message InvokeRequest {
  bytes input = 1;
}

message InvokeResponse {
  bytes output = 1;
}

message BatchRequest {
  repeated bytes inputs = 1;
}

message BatchResponse {
  repeated bytes outputs = 1;
}

message StreamRequest {
  bytes input = 1;
}

message StreamResponse {
  bytes output = 1;
}
*/

// gRPC server-side handlers
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

class GrpcRunnableService {
  private runnable: Runnable<any, any>;

  constructor(runnable: Runnable<any, any>) {
    this.runnable = runnable;
  }

  async invoke(call: any, callback: any): Promise<void> {
    try {
      // `input` is a bytes field, so it arrives as a Buffer
      const input = JSON.parse(call.request.input.toString());
      const output = await this.runnable.invoke(input);
      callback(null, { output: Buffer.from(JSON.stringify(output)) });
    } catch (error) {
      callback(error, null);
    }
  }

  async batch(call: any, callback: any): Promise<void> {
    try {
      const inputs = call.request.inputs.map((input: Buffer) => JSON.parse(input.toString()));
      const outputs = await this.runnable.batch(inputs);
      callback(null, {
        outputs: outputs.map(output => Buffer.from(JSON.stringify(output)))
      });
    } catch (error) {
      callback(error, null);
    }
  }

  // Server-streaming handler: write chunks to the call, then end it
  async stream(call: any): Promise<void> {
    try {
      const input = JSON.parse(call.request.input.toString());
      const stream = await this.runnable.stream(input);
      for await (const output of stream) {
        call.write({ output: Buffer.from(JSON.stringify(output)) });
      }
      call.end();
    } catch (error) {
      call.emit('error', error);
    }
  }
}

// gRPC client (assumes the definition above is saved as runnable.proto)
class GrpcRunnableClient<Input, Output> implements Runnable<Input, Output> {
  private client: any;

  // serviceName is kept for parity with the HTTP client
  constructor(address: string, serviceName: string) {
    const packageDefinition = protoLoader.loadSync('runnable.proto');
    const protoDescriptor = grpc.loadPackageDefinition(packageDefinition) as any;
    this.client = new protoDescriptor.langchain.RunnableService(
      address,
      grpc.credentials.createInsecure()
    );
  }

  async invoke(input: Input): Promise<Output> {
    return new Promise((resolve, reject) => {
      this.client.invoke(
        { input: Buffer.from(JSON.stringify(input)) },
        (error: grpc.ServiceError | null, response: any) => {
          if (error) {
            reject(error);
          } else {
            resolve(JSON.parse(response.output.toString()));
          }
        }
      );
    });
  }

  async batch(inputs: Input[]): Promise<Output[]> {
    return new Promise((resolve, reject) => {
      this.client.batch(
        { inputs: inputs.map(input => Buffer.from(JSON.stringify(input))) },
        (error: grpc.ServiceError | null, response: any) => {
          if (error) {
            reject(error);
          } else {
            resolve(response.outputs.map((output: Buffer) => JSON.parse(output.toString())));
          }
        }
      );
    });
  }

  async *stream(input: Input): AsyncGenerator<Output> {
    // Server-streaming calls are async-iterable readable streams
    const call = this.client.stream({ input: Buffer.from(JSON.stringify(input)) });
    for await (const response of call) {
      yield JSON.parse(response.output.toString());
    }
  }
}
```
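To actually serve `GrpcRunnableService`, its handlers still need to be bound to a `grpc.Server`. A minimal bootstrap sketch, reusing the `grpc` and `protoLoader` imports above and assuming the proto definition is saved as `runnable.proto` (the `serveGrpcRunnable` helper is illustrative):

```typescript
// Hypothetical bootstrap: bind GrpcRunnableService handlers to a gRPC server.
function serveGrpcRunnable(runnable: Runnable<any, any>, address: string) {
  const packageDefinition = protoLoader.loadSync('runnable.proto');
  const proto = grpc.loadPackageDefinition(packageDefinition) as any;
  const handlers = new GrpcRunnableService(runnable);

  const server = new grpc.Server();
  server.addService(proto.langchain.RunnableService.service, {
    invoke: handlers.invoke.bind(handlers),
    batch: handlers.batch.bind(handlers),
    stream: handlers.stream.bind(handlers),
  });
  server.bindAsync(address, grpc.ServerCredentials.createInsecure(), (err, port) => {
    if (err) throw err;
    console.log(`gRPC Runnable service listening on port ${port}`);
  });
}
```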
## Microservices for Concrete Components

Wrapping concrete LangChain components as services:
```typescript
import { SimpleChatModel } from '@langchain/core/language_models/chat_models';
import { AIMessageChunk, BaseMessage } from '@langchain/core/messages';
import { ChatGenerationChunk } from '@langchain/core/outputs';
import { PromptTemplate } from '@langchain/core/prompts';
import { VectorStore } from '@langchain/core/vectorstores';
import { Document } from '@langchain/core/documents';

// LLM service: a simplified chat-model wrapper around the OpenAI API
class LLMService extends SimpleChatModel {
  private modelName: string;
  private apiKey: string;

  constructor(modelName: string, apiKey: string) {
    super({});
    this.modelName = modelName;
    this.apiKey = apiKey;
  }

  _llmType(): string {
    return 'llm-microservice';
  }

  // Convert LangChain messages to the OpenAI wire format (simplified)
  private toOpenAIMessages(messages: BaseMessage[]) {
    return messages.map(m => ({
      role: m._getType() === 'human' ? 'user'
        : m._getType() === 'ai' ? 'assistant'
        : 'system',
      content: m.content,
    }));
  }

  async _call(messages: BaseMessage[]): Promise<string> {
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        model: this.modelName,
        messages: this.toOpenAIMessages(messages),
        stream: false,
      }),
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    const data = await response.json();
    return data.choices[0].message.content;
  }

  async *_streamResponseChunks(
    messages: BaseMessage[]
  ): AsyncGenerator<ChatGenerationChunk> {
    // Streaming response via SSE
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        model: this.modelName,
        messages: this.toOpenAIMessages(messages),
        stream: true,
      }),
    });
    if (!response.body) {
      throw new Error('Response body is empty');
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          break;
        }
        // Buffer partial lines: an SSE event may be split across chunks
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? '';
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = line.slice(6);
            if (data === '[DONE]') {
              return;
            }
            try {
              const parsed = JSON.parse(data);
              const content = parsed.choices[0]?.delta?.content;
              if (content) {
                yield new ChatGenerationChunk({
                  text: content,
                  message: new AIMessageChunk({ content }),
                });
              }
            } catch (error) {
              // Ignore malformed chunks
            }
          }
        }
      }
    } finally {
      reader.releaseLock();
    }
  }
}

// Prompt-template service
class PromptTemplateService extends PromptTemplate {
  // A template service can carry extra business logic,
  // e.g. template versioning or A/B testing
  async formatWithVersion(input: any, version?: string): Promise<string> {
    if (version) {
      // Version-selection logic would go here
      console.log(`Using template version: ${version}`);
    }
    return await this.format(input);
  }
}

// Retrieval service
class RetrievalService {
  private vectorStore: VectorStore;

  constructor(vectorStore: VectorStore) {
    this.vectorStore = vectorStore;
  }

  async search(query: string, k: number = 4): Promise<Document[]> {
    return await this.vectorStore.similaritySearch(query, k);
  }

  async mmrSearch(
    query: string,
    k: number = 4,
    fetchK: number = 20
  ): Promise<Document[]> {
    // maxMarginalRelevanceSearch takes an options object in LangChain JS
    return await this.vectorStore.maxMarginalRelevanceSearch(query, { k, fetchK });
  }
}
```

## Service Registration and Discovery

Implementing a simple service registration and discovery mechanism:
```typescript
// Service registry
interface ServiceInfo {
  name: string;
  address: string;
  port: number;
  protocol: 'http' | 'grpc';
  version: string;
  tags: string[];
  healthCheckUrl: string;
}

class ServiceRegistry {
  private services: Map<string, ServiceInfo>;
  private healthChecks: Map<string, () => Promise<boolean>>;

  constructor() {
    this.services = new Map();
    this.healthChecks = new Map();
  }

  registerService(serviceInfo: ServiceInfo): void {
    const serviceId = `${serviceInfo.name}:${serviceInfo.version}`;
    this.services.set(serviceId, serviceInfo);
    // Register a health check for the service
    this.healthChecks.set(serviceId, () => this.checkHealth(serviceInfo));
    console.log(`Service registered: ${serviceId} at ${serviceInfo.address}:${serviceInfo.port}`);
  }

  getService(name: string, version?: string): ServiceInfo | undefined {
    if (version) {
      return this.services.get(`${name}:${version}`);
    }
    // Return the latest version (naive lexicographic comparison;
    // use a proper semver comparison in production)
    const services = Array.from(this.services.values())
      .filter(service => service.name === name)
      .sort((a, b) => b.version.localeCompare(a.version));
    return services[0];
  }

  getAllServices(): ServiceInfo[] {
    return Array.from(this.services.values());
  }

  async checkHealth(serviceInfo: ServiceInfo): Promise<boolean> {
    try {
      // fetch has no `timeout` option; use an AbortSignal instead
      const response = await fetch(serviceInfo.healthCheckUrl, {
        signal: AbortSignal.timeout(5000),
      });
      return response.ok;
    } catch (error) {
      console.warn(`Health check failed (${serviceInfo.name}):`, error);
      return false;
    }
  }

  async checkAllServices(): Promise<Record<string, boolean>> {
    const results: Record<string, boolean> = {};
    for (const [serviceId, serviceInfo] of this.services.entries()) {
      const isHealthy = await this.checkHealth(serviceInfo);
      results[serviceId] = isHealthy;
      if (!isHealthy) {
        console.warn(`Unhealthy service: ${serviceId}`);
      }
    }
    return results;
  }
}

// Service discovery client
class ServiceDiscoveryClient {
  private registry: ServiceRegistry;

  constructor(registry: ServiceRegistry) {
    this.registry = registry;
  }

  async getRunnableClient<Input, Output>(
    serviceName: string,
    version?: string
  ): Promise<Runnable<Input, Output>> {
    const serviceInfo = this.registry.getService(serviceName, version);
    if (!serviceInfo) {
      throw new Error(`Service not found: ${serviceName}`);
    }
    if (serviceInfo.protocol === 'http') {
      return new HttpRunnableClient<Input, Output>(
        `http://${serviceInfo.address}:${serviceInfo.port}`,
        serviceName
      );
    } else {
      return new GrpcRunnableClient<Input, Output>(
        `${serviceInfo.address}:${serviceInfo.port}`,
        serviceName
      );
    }
  }
}
```
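A quick usage sketch tying the two classes together (the address and query are hypothetical):

```typescript
// Register an HTTP retrieval service, then resolve it into a typed client.
async function demoDiscovery() {
  const registry = new ServiceRegistry();
  registry.registerService({
    name: 'retrieval-service',
    address: 'retrieval-service.internal', // assumed internal DNS name
    port: 8081,
    protocol: 'http',
    version: '1.0.0',
    tags: ['retrieval'],
    healthCheckUrl: 'http://retrieval-service.internal:8081/health',
  });

  const discovery = new ServiceDiscoveryClient(registry);
  const retriever = await discovery.getRunnableClient<string, Document[]>('retrieval-service');
  const docs = await retriever.invoke('What is LCEL?');
  console.log(`Retrieved ${docs.length} documents`);
}
```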
## Microservice Orchestration

Orchestrating and composing the services:
```typescript
import { RunnableLambda, RunnableMap, RunnableWithMessageHistory } from '@langchain/core/runnables';
import { BaseChatMessageHistory } from '@langchain/core/chat_history';
import { HumanMessage } from '@langchain/core/messages';

// Microservice orchestrator
class MicroserviceOrchestrator {
  private serviceDiscovery: ServiceDiscoveryClient;
  private serviceRegistry: ServiceRegistry;

  constructor(
    serviceDiscovery: ServiceDiscoveryClient,
    serviceRegistry: ServiceRegistry
  ) {
    this.serviceDiscovery = serviceDiscovery;
    this.serviceRegistry = serviceRegistry;
  }

  async createRAGChain(sessionId?: string): Promise<Runnable<string, string>> {
    try {
      // Client for the retrieval service
      const retrievalService = await this.serviceDiscovery.getRunnableClient<
        string,
        Document[]
      >('retrieval-service');
      // Client for the prompt-template service
      const promptService = await this.serviceDiscovery.getRunnableClient<
        { context: string; question: string },
        string
      >('prompt-template-service');
      // Client for the LLM service
      const llmService = await this.serviceDiscovery.getRunnableClient<
        BaseMessage[],
        string
      >('llm-service');

      // Assemble the RAG chain
      const ragChain = RunnableMap.from({
        question: (input: string) => input,
        context: async (input: string) => {
          const documents = await retrievalService.invoke(input);
          return documents.map(doc => doc.pageContent).join('\n\n');
        }
      }).pipe(
        async (input: { question: string; context: string }) => {
          return await promptService.invoke({
            context: input.context,
            question: input.question
          });
        }
      ).pipe(
        async (prompt: string) => {
          return await llmService.invoke([new HumanMessage(prompt)]);
        }
      );

      // If session history is needed, wrap the chain with message history
      // (simplified; keys and config depend on the chain's input shape)
      if (sessionId) {
        const messageHistoryService = await this.getMessageHistoryService(sessionId);
        return new RunnableWithMessageHistory({
          runnable: ragChain,
          getMessageHistory: async () => messageHistoryService,
          config: { configurable: { sessionId } },
        });
      }
      return ragChain;
    } catch (error) {
      throw new Error(`Failed to create RAG chain: ${(error as Error).message}`);
    }
  }

  private async getMessageHistoryService(sessionId: string): Promise<BaseChatMessageHistory> {
    // Client for the message-history service
    const historyService = await this.serviceDiscovery.getRunnableClient<
      any,
      BaseMessage[]
    >('message-history-service');
    // Adapter exposing the remote service as a BaseChatMessageHistory (simplified)
    return new class extends BaseChatMessageHistory {
      lc_namespace = ['langchain', 'stores', 'message', 'remote'];
      async getMessages(): Promise<BaseMessage[]> {
        return await historyService.invoke({ sessionId, action: 'getMessages' });
      }
      async addMessage(message: BaseMessage): Promise<void> {
        await historyService.invoke({ sessionId, action: 'addMessage', message });
      }
      async clear(): Promise<void> {
        await historyService.invoke({ sessionId, action: 'clear' });
      }
    };
  }

  async createChatChain(): Promise<Runnable<BaseMessage[], string>> {
    try {
      // Client for the LLM service
      const llmService = await this.serviceDiscovery.getRunnableClient<
        BaseMessage[],
        string
      >('llm-service');
      return llmService;
    } catch (error) {
      throw new Error(`Failed to create chat chain: ${(error as Error).message}`);
    }
  }
}

// Service composition example
class CompositeAIService {
  private orchestrator: MicroserviceOrchestrator;

  constructor(orchestrator: MicroserviceOrchestrator) {
    this.orchestrator = orchestrator;
  }

  async answerQuestion(
    question: string,
    options?: { useRAG?: boolean; sessionId?: string }
  ): Promise<string> {
    try {
      let chain: Runnable<string, string>;
      if (options?.useRAG) {
        chain = await this.orchestrator.createRAGChain(options.sessionId);
      } else {
        const chatChain = await this.orchestrator.createChatChain();
        chain = RunnableLambda.from(async (input: string) => {
          return await chatChain.invoke([new HumanMessage(input)]);
        });
      }
      return await chain.invoke(question);
    } catch (error) {
      throw new Error(`Failed to answer question: ${(error as Error).message}`);
    }
  }

  async *streamAnswer(
    question: string,
    options?: { useRAG?: boolean; sessionId?: string }
  ): AsyncGenerator<string> {
    try {
      let chain: Runnable<string, string>;
      if (options?.useRAG) {
        chain = await this.orchestrator.createRAGChain(options.sessionId);
      } else {
        const chatChain = await this.orchestrator.createChatChain();
        chain = RunnableLambda.from(async (input: string) => {
          return await chatChain.invoke([new HumanMessage(input)]);
        });
      }
      const stream = await chain.stream(question);
      for await (const chunk of stream) {
        yield chunk;
      }
    } catch (error) {
      throw new Error(`Failed to stream answer: ${(error as Error).message}`);
    }
  }
}
```

## A Complete Application Example

Let's walk through a complete example that builds and uses a microservice-based LangChain question-answering application:
```typescript
// Microservice-based Q&A system
class MicroserviceQASystem {
  private serviceRegistry: ServiceRegistry;
  private serviceDiscovery: ServiceDiscoveryClient;
  private orchestrator: MicroserviceOrchestrator;
  private compositeAI: CompositeAIService;

  constructor() {
    this.serviceRegistry = new ServiceRegistry();
    this.serviceDiscovery = new ServiceDiscoveryClient(this.serviceRegistry);
    this.orchestrator = new MicroserviceOrchestrator(
      this.serviceDiscovery,
      this.serviceRegistry
    );
    this.compositeAI = new CompositeAIService(this.orchestrator);
    // Register services
    this.registerServices();
  }

  private registerServices(): void {
    // Register the LLM service
    this.serviceRegistry.registerService({
      name: 'llm-service',
      address: 'llm-service.internal',
      port: 8080,
      protocol: 'http',
      version: '1.0.0',
      tags: ['llm', 'openai'],
      healthCheckUrl: 'http://llm-service.internal:8080/health'
    });
    // Register the retrieval service
    this.serviceRegistry.registerService({
      name: 'retrieval-service',
      address: 'retrieval-service.internal',
      port: 8081,
      protocol: 'http',
      version: '1.0.0',
      tags: ['retrieval', 'vector'],
      healthCheckUrl: 'http://retrieval-service.internal:8081/health'
    });
    // Register the prompt-template service
    this.serviceRegistry.registerService({
      name: 'prompt-template-service',
      address: 'prompt-service.internal',
      port: 8082,
      protocol: 'http',
      version: '1.0.0',
      tags: ['prompt', 'template'],
      healthCheckUrl: 'http://prompt-service.internal:8082/health'
    });
    // Register the message-history service
    this.serviceRegistry.registerService({
      name: 'message-history-service',
      address: 'history-service.internal',
      port: 8083,
      protocol: 'http',
      version: '1.0.0',
      tags: ['history', 'chat'],
      healthCheckUrl: 'http://history-service.internal:8083/health'
    });
  }

  async answerQuestion(
    question: string,
    options?: {
      useRAG?: boolean;
      sessionId?: string;
      stream?: boolean;
    }
  ): Promise<string | AsyncGenerator<string>> {
    try {
      if (options?.stream) {
        return this.compositeAI.streamAnswer(question, {
          useRAG: options.useRAG,
          sessionId: options.sessionId
        });
      } else {
        return await this.compositeAI.answerQuestion(question, {
          useRAG: options?.useRAG,
          sessionId: options?.sessionId
        });
      }
    } catch (error) {
      throw new Error(`Q&A processing failed: ${(error as Error).message}`);
    }
  }

  async checkSystemHealth(): Promise<Record<string, boolean>> {
    return await this.serviceRegistry.checkAllServices();
  }

  getAvailableServices(): ServiceInfo[] {
    return this.serviceRegistry.getAllServices();
  }
}

// Usage example
async function demonstrateMicroserviceQA() {
  console.log('=== Microservice Q&A System Demo ===\n');
  const qaSystem = new MicroserviceQASystem();

  // Check system health
  console.log('System health:');
  const healthStatus = await qaSystem.checkSystemHealth();
  console.log(healthStatus);
  console.log();

  // List available services
  console.log('Available services:');
  const services = qaSystem.getAvailableServices();
  services.forEach(service => {
    console.log(`- ${service.name} (${service.version}) at ${service.address}:${service.port}`);
  });
  console.log();

  try {
    // Plain Q&A
    console.log('Plain Q&A:');
    const answer1 = await qaSystem.answerQuestion('Hello, tell me about LangChain');
    console.log(`Answer: ${answer1}`);
    console.log();

    // RAG Q&A
    console.log('RAG Q&A:');
    const answer2 = await qaSystem.answerQuestion(
      'What are the main features of LangChain?',
      { useRAG: true }
    );
    console.log(`Answer: ${answer2}`);
    console.log();

    // Q&A with session history
    console.log('Q&A with session history:');
    const sessionId = 'session_' + Date.now();
    const answer3 = await qaSystem.answerQuestion(
      'Can you explain LCEL in detail?',
      { useRAG: true, sessionId }
    );
    console.log(`Answer: ${answer3}`);
    console.log();

    // Streaming Q&A
    console.log('Streaming Q&A:');
    const stream = await qaSystem.answerQuestion(
      'Explain step by step how LangChain works',
      { stream: true }
    ) as AsyncGenerator<string>;
    console.log('Incremental response:');
    for await (const chunk of stream) {
      process.stdout.write(chunk);
    }
    console.log('\n');
  } catch (error) {
    console.error('Error:', (error as Error).message);
  }
}

// NestJS microservice integration example
/*
// llm-service.controller.ts
import { Controller, Post, Body } from '@nestjs/common';

@Controller('llm-service')
export class LLMServiceController {
  private readonly llmService: LLMService;

  constructor() {
    this.llmService = new LLMService(
      process.env.MODEL_NAME || 'gpt-3.5-turbo',
      process.env.OPENAI_API_KEY || ''
    );
  }

  @Post('invoke')
  async invoke(@Body() body: { input: BaseMessage[] }) {
    const output = await this.llmService.invoke(body.input);
    return { output };
  }

  @Post('batch')
  async batch(@Body() body: { inputs: BaseMessage[][] }) {
    const outputs = await this.llmService.batch(body.inputs);
    return { outputs };
  }
}

// main.ts (LLM service)
import { NestFactory } from '@nestjs/core';
import { LLMServiceModule } from './llm-service.module';

async function bootstrap() {
  const app = await NestFactory.create(LLMServiceModule);
  await app.listen(8080);
  console.log('LLM service listening on port 8080');
}
bootstrap();
*/
```

## Summary

By deploying different Runnable components as independent microservices, a LangChain V3 application gains the following advantages:
- Independent deployment - each component can be developed, tested, and deployed on its own
- Scalability - individual services can be scaled independently as needed
- Technology diversity - each service can use the stack that suits it best
- Fault isolation - a failure in one service does not take down the whole system
- Team collaboration - different teams can own different services
- Version management - service versions can be managed independently
Microservices make LangChain applications more flexible and maintainable, but they also introduce the complexity of distributed systems: inter-service communication, data consistency, monitoring, and debugging all need careful attention. A sketch of one common mitigation follows.
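For instance, remote calls fail transiently, so each service client is usually wrapped with timeouts and retries. A minimal sketch (this `ResilientRunnable` class and its policy defaults are illustrative assumptions, not a LangChain API; LangChain's built-in `withRetry()` covers the retry part on real Runnables):

```typescript
// Illustrative resilience wrapper: per-call timeout plus exponential backoff.
class ResilientRunnable<Input, Output> {
  constructor(
    private inner: Runnable<Input, Output>,
    private maxRetries = 3,
    private timeoutMs = 10_000,
  ) {}

  async invoke(input: Input): Promise<Output> {
    let lastError: unknown;
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        // Race the remote call against a timeout
        return await Promise.race([
          this.inner.invoke(input),
          new Promise<never>((_, reject) =>
            setTimeout(() => reject(new Error('Call timed out')), this.timeoutMs),
          ),
        ]);
      } catch (error) {
        lastError = error;
        // Exponential backoff before the next attempt
        await new Promise(resolve => setTimeout(resolve, 2 ** attempt * 100));
      }
    }
    throw lastError;
  }
}
```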
In the next chapter, we turn to Agent 2.0: autonomous agents built on LCEL, and look at how to build smarter autonomous AI agents.