微服务通信:ClientProxy 如何封装 TCP/RMQ/Kafka?
在现代分布式系统中,微服务架构已成为主流。NestJS 提供了强大的微服务支持,通过 ClientProxy 抽象层封装了多种传输协议,包括 TCP、Redis、RabbitMQ、Kafka 等。本文将深入探讨 ClientProxy 的工作机制,以及 emit() 与 send() 方法在事件驱动与请求响应模式中的区别。
1. 微服务基础概念
1.1 什么是微服务通信?
微服务通信是指在分布式系统中,不同服务之间进行数据交换和协调的机制:
typescript
// 传统的单体应用
// 所有功能都在一个应用中
// 用户服务 <-> 订单服务 <-> 支付服务 (都在同一进程中)
// 微服务架构
// 用户服务 (独立进程)
// 订单服务 (独立进程)
// 支付服务 (独立进程)
// 服务间通过网络进行通信1.2 NestJS 微服务支持
NestJS 提供了统一的微服务抽象层:
typescript
// 微服务配置示例
@Module({
imports: [
// TCP 微服务
ClientsModule.register([
{
name: 'USER_SERVICE',
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3001,
},
},
]),
// RabbitMQ 微服务
ClientsModule.register([
{
name: 'ORDER_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'orders_queue',
queueOptions: {
durable: false,
},
},
},
]),
],
})
export class AppModule {}2. ClientProxy 核心概念
2.1 ClientProxy 简介
ClientProxy 是 NestJS 中用于微服务通信的抽象客户端:
typescript
import { ClientProxy } from '@nestjs/microservices';
@Controller('users')
export class UserController {
constructor(
@Inject('USER_SERVICE') private readonly userService: ClientProxy,
@Inject('ORDER_SERVICE') private readonly orderService: ClientProxy,
) {}
@Get(':id')
async getUser(@Param('id') id: string) {
// 使用 send() 发送请求并等待响应
const user = await this.userService.send({ cmd: 'get_user' }, { id }).toPromise();
// 使用 emit() 发送事件(无需等待响应)
this.orderService.emit('user_viewed', { userId: id });
return user;
}
}2.2 send() vs emit() 的区别
typescript
// send() - 请求-响应模式
// 等待服务响应,返回 Observable
const response = client.send(pattern, data);
// emit() - 事件驱动模式
// 发送事件不等待响应,返回 Observable
const ack = client.emit(pattern, data);3. TCP 传输协议
3.1 TCP 客户端配置
typescript
// TCP 客户端配置
@Module({
imports: [
ClientsModule.register([
{
name: 'TCP_SERVICE',
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3001,
retryAttempts: 5,
retryDelay: 3000,
},
},
]),
],
})
export class AppModule {}
// 使用 TCP 客户端
@Controller()
export class AppController {
constructor(
@Inject('TCP_SERVICE') private readonly client: ClientProxy,
) {}
@Get('data')
async getData() {
return this.client.send('get_data', {}).toPromise();
}
}3.2 TCP 服务端实现
typescript
// TCP 服务端
@Controller()
export class TcpController {
@MessagePattern('get_data')
getData() {
return { message: 'Hello from TCP service', timestamp: new Date() };
}
@EventPattern('user_created')
async handleUserCreated(data: any) {
console.log('User created:', data);
// 处理用户创建事件
}
}4. RabbitMQ 传输协议
4.1 RabbitMQ 客户端配置
typescript
// RabbitMQ 客户端配置
@Module({
imports: [
ClientsModule.register([
{
name: 'RMQ_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'messages_queue',
queueOptions: {
durable: true, // 队列持久化
},
prefetchCount: 1, // 预取消息数量
},
},
]),
],
})
export class AppModule {}4.2 RabbitMQ 服务端实现
typescript
// RabbitMQ 服务端
@Controller()
export class RmqController {
@MessagePattern('process_order')
async processOrder(data: any) {
console.log('Processing order:', data);
// 模拟处理时间
await new Promise(resolve => setTimeout(resolve, 1000));
return {
orderId: data.orderId,
status: 'processed',
processedAt: new Date(),
};
}
@EventPattern('order_placed')
async handleOrderPlaced(data: any) {
console.log('Order placed event received:', data);
// 处理订单放置事件
}
}5. Kafka 传输协议
5.1 Kafka 客户端配置
typescript
// Kafka 客户端配置
@Module({
imports: [
ClientsModule.register([
{
name: 'KAFKA_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'my-app-consumer-group',
},
},
},
]),
],
})
export class AppModule {}5.2 Kafka 服务端实现
typescript
// Kafka 服务端
@Controller()
export class KafkaController {
@MessagePattern('user.registered')
async handleUserRegistered(data: any) {
console.log('User registered:', data);
// 处理用户注册事件
return { processed: true };
}
@EventPattern('order.created')
async handleOrderCreated(data: any) {
console.log('Order created event:', data);
// 处理订单创建事件
}
}6. ClientProxy 内部实现
6.1 ClientProxy 抽象类
typescript
// ClientProxy 抽象实现
export abstract class ClientProxy implements Closeable {
protected readonly logger = new Logger(ClientProxy.name);
// 发送请求并等待响应
abstract send<T>(pattern: any, data: any): Observable<T>;
// 发送事件不等待响应
abstract emit<TResult = any, TInput = any>(
pattern: any,
data: TInput,
): Observable<TResult>;
// 连接到微服务
abstract connect(): Promise<any>;
// 关闭连接
abstract close(): any;
// 处理连接错误
protected handleError(error: any): Observable<never> {
this.logger.error(error);
return throwError(error);
}
}6.2 TCP 客户端实现
typescript
// TCP 客户端实现示例
export class TcpClient extends ClientProxy {
private readonly logger = new Logger(TcpClient.name);
private socket: net.Socket;
constructor(private readonly options: TcpClientOptions) {
super();
}
send<T>(pattern: any, data: any): Observable<T> {
return new Observable(observer => {
// 建立连接
this.connect()
.then(() => {
// 发送消息
const id = randomString();
const message = { id, pattern, data };
this.socket.write(JSON.stringify(message));
// 监听响应
const handler = (response: any) => {
if (response.id === id) {
observer.next(response.data);
observer.complete();
}
};
this.socket.on('data', handler);
// 清理资源
return () => {
this.socket.removeListener('data', handler);
};
})
.catch(error => observer.error(error));
});
}
emit(pattern: any, data: any): Observable<any> {
return new Observable(observer => {
this.connect()
.then(() => {
const message = { pattern, data, isEvent: true };
this.socket.write(JSON.stringify(message));
observer.next(true);
observer.complete();
})
.catch(error => observer.error(error));
});
}
async connect(): Promise<any> {
if (this.socket && !this.socket.destroyed) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
this.socket = net.createConnection(this.options);
this.socket.on('connect', () => {
this.logger.log('Connected to TCP server');
resolve();
});
this.socket.on('error', reject);
});
}
close() {
if (this.socket) {
this.socket.destroy();
}
}
}7. 高级用法和模式
7.1 负载均衡和故障转移
typescript
// 负载均衡配置
@Module({
imports: [
ClientsModule.register([
{
name: 'LOAD_BALANCED_SERVICE',
transport: Transport.RMQ,
options: {
urls: [
'amqp://node1:5672',
'amqp://node2:5672',
'amqp://node3:5672',
],
queue: 'service_queue',
queueOptions: {
durable: true,
},
},
},
]),
],
})
export class AppModule {}7.2 消息确认和重试机制
typescript
// 带重试机制的客户端使用
@Controller()
export class AppController {
constructor(
@Inject('RMQ_SERVICE') private readonly client: ClientProxy,
) {}
@Post('order')
async createOrder(@Body() orderData: any) {
try {
// 发送订单处理请求,带超时和重试
const result = await this.client
.send('process_order', orderData)
.pipe(
timeout(5000), // 5秒超时
retry(3), // 重试3次
)
.toPromise();
return result;
} catch (error) {
throw new HttpException('Order processing failed', 500);
}
}
}7.3 批量消息处理
typescript
// 批量消息处理
@Injectable()
export class BatchMessageService {
constructor(
@Inject('KAFKA_SERVICE') private readonly client: ClientProxy,
) {}
async sendBatchMessages(messages: any[]) {
// 并行发送多个消息
const observables = messages.map(message =>
this.client.emit('batch_event', message)
);
// 等待所有消息发送完成
return forkJoin(observables).toPromise();
}
}8. 总结
NestJS 的 ClientProxy 通过统一的抽象层封装了多种微服务传输协议:
- TCP:适用于高性能、低延迟的场景
- RabbitMQ:适用于需要消息队列和复杂路由的场景
- Kafka:适用于高吞吐量、流处理的场景
两种通信模式的区别:
- send():请求-响应模式,等待服务响应
- emit():事件驱动模式,发送事件不等待响应
通过合理选择传输协议和通信模式,我们可以构建出:
- 高可用性:通过负载均衡和故障转移
- 高可靠性:通过消息确认和重试机制
- 高性能:通过批量处理和异步通信
- 可扩展性:通过微服务架构和消息队列
在下一篇文章中,我们将探讨 WebSocket 网关:@WebSocketGateway() 如何集成 Socket.IO,了解连接管理、会话状态、广播机制的实现。