WebSocket 网关:@WebSocketGateway() 如何集成 Socket.IO?
在现代 Web 应用中,实时通信变得越来越重要。NestJS 通过 @WebSocketGateway() 装饰器提供了强大的 WebSocket 支持,并默认集成了 Socket.IO 库。本文将深入探讨 WebSocket 网关的工作机制,包括连接管理、会话状态和广播机制的实现。
1. WebSocket 基础概念
1.1 什么是 WebSocket?
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,允许服务器主动向客户端推送数据:
typescript
// 传统的 HTTP 请求-响应模式
// Client → Request → Server
// Client ← Response ← Server
// WebSocket 全双工通信
// Client ↔ Data ↔ Server (双向实时通信)1.2 Socket.IO 简介
Socket.IO 是一个基于 WebSocket 的实时应用框架,提供了更多高级特性:
typescript
// Socket.IO 的主要特性
// 1. 自动重连
// 2. 心跳检测
// 3. 房间和广播
// 4. 跨浏览器兼容
// 5. 多路复用2. 基本 WebSocket 网关
2.1 创建 WebSocket 网关
typescript
// gateways/chat.gateway.ts
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
OnGatewayConnection,
OnGatewayDisconnect,
OnGatewayInit
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
@WebSocketGateway({
cors: {
origin: '*',
},
})
export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
private logger: Logger = new Logger('ChatGateway');
afterInit(server: Server) {
this.logger.log('WebSocket Gateway initialized');
}
handleConnection(client: Socket, ...args: any[]) {
this.logger.log(`Client connected: ${client.id}`);
}
handleDisconnect(client: Socket) {
this.logger.log(`Client disconnected: ${client.id}`);
}
@SubscribeMessage('message')
handleMessage(client: Socket, payload: any): void {
this.server.emit('message', {
...payload,
clientId: client.id,
timestamp: new Date(),
});
}
}2.2 网关配置选项
typescript
// 详细的网关配置
@WebSocketGateway({
namespace: '/chat', // 命名空间
cors: { // CORS 配置
origin: ['http://localhost:3000', 'https://myapp.com'],
credentials: true,
},
transports: ['websocket', 'polling'], // 传输方式
allowEIO3: true, // 兼容 Socket.IO v3
pingInterval: 10000, // 心跳间隔
pingTimeout: 5000, // 心跳超时
})
export class ChatGateway {
// 网关实现
}3. 连接管理
3.1 客户端连接处理
typescript
// 连接管理实现
@WebSocketGateway()
export class ConnectionGateway implements OnGatewayConnection, OnGatewayDisconnect {
private readonly logger = new Logger(ConnectionGateway.name);
private connectedClients = new Map<string, Socket>();
handleConnection(client: Socket, ...args: any[]) {
// 存储客户端连接
this.connectedClients.set(client.id, client);
// 获取连接信息
const handshake = client.handshake;
this.logger.log(`Client connected: ${client.id}`);
this.logger.log(`Query params: ${JSON.stringify(handshake.query)}`);
this.logger.log(`Headers: ${JSON.stringify(handshake.headers)}`);
// 发送欢迎消息
client.emit('welcome', {
message: 'Welcome to the WebSocket server!',
clientId: client.id,
timestamp: new Date(),
});
}
handleDisconnect(client: Socket) {
// 清理客户端连接
this.connectedClients.delete(client.id);
this.logger.log(`Client disconnected: ${client.id}`);
// 通知其他客户端
this.server.emit('user_disconnected', {
clientId: client.id,
timestamp: new Date(),
});
}
@WebSocketServer()
server: Server;
// 获取在线客户端数量
getConnectedClientsCount(): number {
return this.connectedClients.size;
}
// 向特定客户端发送消息
sendToClient(clientId: string, event: string, data: any) {
const client = this.connectedClients.get(clientId);
if (client) {
client.emit(event, data);
}
}
}3.2 身份验证和授权
typescript
// 身份验证网关
@WebSocketGateway()
export class AuthGateway implements OnGatewayConnection {
private readonly logger = new Logger(AuthGateway.name);
handleConnection(client: Socket, ...args: any[]) {
try {
// 从查询参数中获取令牌
const token = client.handshake.query.token as string;
if (!token) {
this.logger.warn(`Unauthorized connection attempt from ${client.id}`);
client.disconnect();
return;
}
// 验证令牌
const user = this.validateToken(token);
if (!user) {
client.disconnect();
return;
}
// 将用户信息附加到客户端
(client as any).user = user;
this.logger.log(`User ${user.id} connected with client ID: ${client.id}`);
} catch (error) {
this.logger.error('Error during connection authentication', error);
client.disconnect();
}
}
private validateToken(token: string): User | null {
// 实现令牌验证逻辑
try {
// 这里可以使用 JWT 验证或其他认证机制
return jwt.verify(token, process.env.JWT_SECRET) as User;
} catch (error) {
return null;
}
}
}4. 会话状态管理
4.1 会话数据存储
typescript
// 会话状态管理
interface ClientSession {
userId: string;
username: string;
joinTime: Date;
rooms: Set<string>;
lastActivity: Date;
}
@WebSocketGateway()
export class SessionGateway {
private readonly logger = new Logger(SessionGateway.name);
private readonly sessions = new Map<string, ClientSession>();
handleConnection(client: Socket, ...args: any[]) {
// 初始化会话
const session: ClientSession = {
userId: (client as any).user?.id || 'anonymous',
username: (client as any).user?.username || 'Guest',
joinTime: new Date(),
rooms: new Set<string>(),
lastActivity: new Date(),
};
this.sessions.set(client.id, session);
this.logger.log(`Session created for client ${client.id}`);
}
handleDisconnect(client: Socket) {
// 清理会话
this.sessions.delete(client.id);
this.logger.log(`Session removed for client ${client.id}`);
}
@SubscribeMessage('join_room')
handleJoinRoom(client: Socket, room: string): void {
const session = this.sessions.get(client.id);
if (session) {
client.join(room);
session.rooms.add(room);
session.lastActivity = new Date();
client.emit('room_joined', {
room,
timestamp: new Date(),
});
}
}
@SubscribeMessage('leave_room')
handleLeaveRoom(client: Socket, room: string): void {
const session = this.sessions.get(client.id);
if (session) {
client.leave(room);
session.rooms.delete(room);
session.lastActivity = new Date();
client.emit('room_left', {
room,
timestamp: new Date(),
});
}
}
// 获取会话信息
getSessionInfo(clientId: string): ClientSession | undefined {
return this.sessions.get(clientId);
}
// 获取所有会话
getAllSessions(): Map<string, ClientSession> {
return new Map(this.sessions);
}
}4.2 用户状态同步
typescript
// 用户状态同步
@WebSocketGateway()
export class UserStatusGateway {
private readonly logger = new Logger(UserStatusGateway.name);
private readonly userConnections = new Map<string, Set<string>>(); // userId -> Set<clientIds>
handleConnection(client: Socket, ...args: any[]) {
const userId = (client as any).user?.id;
if (userId) {
// 记录用户连接
if (!this.userConnections.has(userId)) {
this.userConnections.set(userId, new Set());
}
this.userConnections.get(userId).add(client.id);
// 通知其他用户该用户上线
this.broadcastUserStatus(userId, 'online');
}
}
handleDisconnect(client: Socket) {
const userId = (client as any).user?.id;
if (userId) {
const userClients = this.userConnections.get(userId);
if (userClients) {
userClients.delete(client.id);
// 如果用户没有其他连接,通知下线
if (userClients.size === 0) {
this.userConnections.delete(userId);
this.broadcastUserStatus(userId, 'offline');
}
}
}
}
private broadcastUserStatus(userId: string, status: 'online' | 'offline') {
this.server.emit('user_status_changed', {
userId,
status,
timestamp: new Date(),
});
}
// 获取用户在线状态
getUserStatus(userId: string): 'online' | 'offline' {
return this.userConnections.has(userId) ? 'online' : 'offline';
}
// 获取在线用户列表
getOnlineUsers(): string[] {
return Array.from(this.userConnections.keys());
}
}5. 广播机制
5.1 基本广播功能
typescript
// 广播机制实现
@WebSocketGateway()
export class BroadcastGateway {
@WebSocketServer()
server: Server;
@SubscribeMessage('broadcast_message')
handleBroadcastMessage(client: Socket, payload: any): void {
// 向所有客户端广播消息
this.server.emit('broadcast', {
...payload,
senderId: client.id,
timestamp: new Date(),
});
}
@SubscribeMessage('room_message')
handleRoomMessage(client: Socket, payload: { room: string; message: string }): void {
// 向特定房间广播消息
this.server.to(payload.room).emit('room_message', {
room: payload.room,
message: payload.message,
senderId: client.id,
timestamp: new Date(),
});
}
@SubscribeMessage('private_message')
handlePrivateMessage(client: Socket, payload: { targetId: string; message: string }): void {
// 向特定客户端发送私信
client.to(payload.targetId).emit('private_message', {
message: payload.message,
senderId: client.id,
timestamp: new Date(),
});
}
}5.2 房间管理
typescript
// 房间管理实现
@WebSocketGateway()
export class RoomGateway {
@WebSocketServer()
server: Server;
private readonly logger = new Logger(RoomGateway.name);
private readonly rooms = new Map<string, Set<string>>(); // roomName -> Set<clientIds>
@SubscribeMessage('join_room')
handleJoinRoom(client: Socket, room: string): void {
client.join(room);
// 更新房间成员列表
if (!this.rooms.has(room)) {
this.rooms.set(room, new Set());
}
this.rooms.get(room).add(client.id);
// 通知房间内其他成员
client.to(room).emit('user_joined', {
userId: (client as any).user?.id,
clientId: client.id,
room,
timestamp: new Date(),
});
// 向客户端确认加入成功
client.emit('room_joined', {
room,
members: Array.from(this.rooms.get(room)),
timestamp: new Date(),
});
this.logger.log(`Client ${client.id} joined room ${room}`);
}
@SubscribeMessage('leave_room')
handleLeaveRoom(client: Socket, room: string): void {
client.leave(room);
// 更新房间成员列表
const roomMembers = this.rooms.get(room);
if (roomMembers) {
roomMembers.delete(client.id);
// 如果房间为空,删除房间
if (roomMembers.size === 0) {
this.rooms.delete(room);
}
}
// 通知房间内其他成员
client.to(room).emit('user_left', {
userId: (client as any).user?.id,
clientId: client.id,
room,
timestamp: new Date(),
});
client.emit('room_left', {
room,
timestamp: new Date(),
});
this.logger.log(`Client ${client.id} left room ${room}`);
}
// 获取房间信息
getRoomInfo(room: string): {
name: string;
memberCount: number;
members: string[]
} | null {
const members = this.rooms.get(room);
if (!members) {
return null;
}
return {
name: room,
memberCount: members.size,
members: Array.from(members),
};
}
// 获取所有房间
getAllRooms(): { name: string; memberCount: number }[] {
return Array.from(this.rooms.entries()).map(([name, members]) => ({
name,
memberCount: members.size,
}));
}
}6. 错误处理和监控
6.1 错误处理
typescript
// 错误处理实现
@WebSocketGateway()
export class ErrorHandlingGateway {
private readonly logger = new Logger(ErrorHandlingGateway.name);
@SubscribeMessage('risky_operation')
async handleRiskyOperation(client: Socket, payload: any) {
try {
// 执行可能出错的操作
const result = await this.performRiskyOperation(payload);
client.emit('operation_success', {
result,
timestamp: new Date(),
});
} catch (error) {
this.logger.error('Error in risky operation', error);
client.emit('operation_error', {
message: error.message,
code: error.code,
timestamp: new Date(),
});
}
}
private async performRiskyOperation(payload: any): Promise<any> {
// 模拟可能出错的操作
if (Math.random() < 0.3) {
throw new Error('Random operation failure');
}
return { success: true, data: payload };
}
// 全局错误处理
@UseFilters(new WsExceptionFilter())
@SubscribeMessage('another_operation')
handleAnotherOperation(client: Socket, payload: any) {
// 处理逻辑
}
}6.2 性能监控
typescript
// 性能监控实现
@WebSocketGateway()
export class MonitoringGateway {
private readonly logger = new Logger(MonitoringGateway.name);
private readonly messageStats = new Map<string, { count: number; totalTime: number }>();
@SubscribeMessage('tracked_message')
handleTrackedMessage(client: Socket, payload: { type: string; data: any }) {
const startTime = Date.now();
// 处理消息
this.processMessage(payload);
const endTime = Date.now();
const duration = endTime - startTime;
// 更新统计信息
if (!this.messageStats.has(payload.type)) {
this.messageStats.set(payload.type, { count: 0, totalTime: 0 });
}
const stats = this.messageStats.get(payload.type);
stats.count++;
stats.totalTime += duration;
// 记录性能日志
if (duration > 1000) { // 超过1秒的慢操作
this.logger.warn(`Slow message processing: ${payload.type} took ${duration}ms`);
}
}
private processMessage(payload: any) {
// 消息处理逻辑
}
// 获取消息统计信息
getMessageStats(): any {
const stats = {};
for (const [type, data] of this.messageStats.entries()) {
stats[type] = {
count: data.count,
averageTime: data.totalTime / data.count,
totalTime: data.totalTime,
};
}
return stats;
}
}7. 总结
NestJS 的 @WebSocketGateway() 通过集成 Socket.IO 提供了完整的 WebSocket 解决方案:
- 连接管理:处理客户端连接和断开连接
- 会话状态:维护客户端和用户状态信息
- 广播机制:支持全局、房间和私信广播
- 房间管理:实现灵活的房间分组功能
- 错误处理:提供完善的错误处理机制
- 性能监控:支持性能统计和监控
通过合理使用 WebSocket 网关,我们可以构建出:
- 实时聊天应用:支持消息广播和私信
- 在线游戏:实现实时游戏状态同步
- 协作工具:支持实时文档编辑和状态共享
- 通知系统:实现实时推送通知
- 监控面板:实时显示系统状态和指标
WebSocket 网关的核心优势:
- 全双工通信:支持服务器主动推送数据
- 低延迟:相比轮询方式延迟更低
- 高并发:支持大量并发连接
- 灵活广播:支持多种消息广播模式
- 状态管理:提供完整的连接和会话管理
至此,我们已经完成了 NestJS 专栏第四阶段的所有内容。在下一篇文章中,我们将进入第五阶段,探讨 Nest CLI 的代码生成原理:nest g service user 发生了什么?