Node.js 网络编程与协议实现
Node.js 在网络编程方面表现出色,提供了丰富的 API 来处理各种网络协议。从底层的 TCP/UDP 到高层的 HTTP/HTTPS,Node.js 都有相应的模块支持。本文将深入探讨 Node.js 的网络编程能力和协议实现机制。
一、TCP 编程
1. TCP 服务器实现
javascript
// TCP 服务器示例
const net = require('net');
const { EventEmitter } = require('events');
class TCPServer extends EventEmitter {
constructor(options = {}) {
super();
this.port = options.port || 8080;
this.host = options.host || 'localhost';
this.connections = new Map();
this.server = null;
}
start() {
this.server = net.createServer((socket) => {
this.handleConnection(socket);
});
this.server.listen(this.port, this.host, () => {
console.log(`TCP 服务器运行在 ${this.host}:${this.port}`);
this.emit('listening', { port: this.port, host: this.host });
});
this.server.on('error', (err) => {
this.emit('error', err);
});
return this.server;
}
handleConnection(socket) {
const clientId = `${socket.remoteAddress}:${socket.remotePort}`;
console.log(`客户端 ${clientId} 已连接`);
// 存储连接信息
this.connections.set(clientId, {
socket,
connectedAt: new Date(),
bytesReceived: 0,
bytesSent: 0
});
// 设置 socket 选项
socket.setEncoding('utf8');
socket.setTimeout(30000); // 30秒超时
// 处理数据接收
socket.on('data', (data) => {
const connection = this.connections.get(clientId);
connection.bytesReceived += data.length;
console.log(`从 ${clientId} 接收到: ${data.trim()}`);
this.emit('data', { clientId, data });
// 回显数据
this.sendToClient(clientId, `Echo: ${data}`);
});
// 处理连接关闭
socket.on('end', () => {
console.log(`客户端 ${clientId} 断开连接`);
this.connections.delete(clientId);
this.emit('disconnect', { clientId });
});
// 处理错误
socket.on('error', (err) => {
console.error(`客户端 ${clientId} 发生错误:`, err);
this.connections.delete(clientId);
this.emit('error', { clientId, error: err });
});
// 处理超时
socket.on('timeout', () => {
console.log(`客户端 ${clientId} 连接超时`);
socket.end('连接超时\r\n');
});
// 发送欢迎消息
socket.write('欢迎连接到 TCP 服务器!\r\n');
this.emit('connect', { clientId, socket });
}
sendToClient(clientId, message) {
const connection = this.connections.get(clientId);
if (connection && connection.socket.writable) {
connection.socket.write(message + '\r\n');
connection.bytesSent += Buffer.byteLength(message + '\r\n');
return true;
}
return false;
}
broadcast(message) {
let count = 0;
for (const [clientId, connection] of this.connections) {
if (this.sendToClient(clientId, message)) {
count++;
}
}
return count;
}
getConnectionInfo(clientId) {
return this.connections.get(clientId);
}
getAllConnections() {
return Array.from(this.connections.keys());
}
close() {
if (this.server) {
// 关闭所有连接
for (const [clientId, connection] of this.connections) {
connection.socket.end('服务器关闭\r\n');
}
this.server.close(() => {
console.log('TCP 服务器已关闭');
this.emit('close');
});
}
}
}
// 使用 TCP 服务器
const server = new TCPServer({ port: 8080, host: 'localhost' });
server.on('listening', (info) => {
console.log(`服务器开始监听: ${info.host}:${info.port}`);
});
server.on('connect', (info) => {
console.log(`新连接: ${info.clientId}`);
});
server.on('data', (info) => {
console.log(`处理数据: ${info.data}`);
});
server.on('disconnect', (info) => {
console.log(`连接断开: ${info.clientId}`);
});
server.start();
// 优雅关闭
process.on('SIGINT', () => {
console.log('正在关闭服务器...');
server.close();
process.exit(0);
});2. TCP 客户端实现
javascript
// TCP 客户端示例
const net = require('net');
class TCPClient extends EventEmitter {
constructor(options = {}) {
super();
this.host = options.host || 'localhost';
this.port = options.port || 8080;
this.socket = null;
this.connected = false;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
this.reconnectDelay = options.reconnectDelay || 1000;
}
connect() {
return new Promise((resolve, reject) => {
this.socket = net.createConnection(this.port, this.host, () => {
this.connected = true;
this.reconnectAttempts = 0;
console.log(`已连接到 ${this.host}:${this.port}`);
this.emit('connect');
resolve(this.socket);
});
this.socket.setEncoding('utf8');
this.socket.on('data', (data) => {
console.log(`接收到数据: ${data.trim()}`);
this.emit('data', data);
});
this.socket.on('end', () => {
this.connected = false;
console.log('连接已断开');
this.emit('disconnect');
// 尝试重连
this.attemptReconnect();
});
this.socket.on('error', (err) => {
this.connected = false;
console.error('连接错误:', err);
this.emit('error', err);
reject(err);
// 尝试重连
this.attemptReconnect();
});
});
}
attemptReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})...`);
setTimeout(() => {
this.connect().catch(err => {
console.error('重连失败:', err);
});
}, this.reconnectDelay * this.reconnectAttempts);
} else {
console.log('达到最大重连次数,停止重连');
this.emit('reconnectFailed');
}
}
send(message) {
if (this.connected && this.socket && this.socket.writable) {
this.socket.write(message + '\r\n');
return true;
}
return false;
}
disconnect() {
if (this.socket) {
this.socket.end();
this.connected = false;
}
}
}
// 使用 TCP 客户端
const client = new TCPClient({ host: 'localhost', port: 8080 });
client.on('connect', () => {
console.log('客户端已连接');
// 发送消息
client.send('Hello Server!');
});
client.on('data', (data) => {
console.log('客户端收到:', data.trim());
});
client.on('disconnect', () => {
console.log('客户端断开连接');
});
client.on('error', (err) => {
console.error('客户端错误:', err);
});
// 连接服务器
client.connect().catch(err => {
console.error('连接失败:', err);
});二、UDP 编程
1. UDP 服务器实现
javascript
// UDP 服务器示例
const dgram = require('dgram');
class UDPServer {
constructor(options = {}) {
this.port = options.port || 8081;
this.host = options.host || 'localhost';
this.socket = null;
this.clients = new Map();
}
start() {
this.socket = dgram.createSocket('udp4');
this.socket.on('message', (msg, rinfo) => {
this.handleMessage(msg, rinfo);
});
this.socket.on('listening', () => {
const address = this.socket.address();
console.log(`UDP 服务器监听 ${address.address}:${address.port}`);
});
this.socket.on('error', (err) => {
console.error('UDP 服务器错误:', err);
this.socket.close();
});
this.socket.bind(this.port, this.host);
}
handleMessage(msg, rinfo) {
const clientId = `${rinfo.address}:${rinfo.port}`;
console.log(`从 ${clientId} 接收到: ${msg}`);
// 更新客户端信息
this.clients.set(clientId, {
address: rinfo.address,
port: rinfo.port,
lastSeen: new Date()
});
// 回复消息
const response = Buffer.from(`Echo: ${msg}`);
this.socket.send(response, rinfo.port, rinfo.address, (err) => {
if (err) {
console.error('发送回复失败:', err);
}
});
}
broadcast(message) {
const msg = Buffer.from(message);
for (const [clientId, client] of this.clients) {
this.socket.send(msg, client.port, client.address, (err) => {
if (err) {
console.error(`向 ${clientId} 广播失败:`, err);
}
});
}
}
close() {
if (this.socket) {
this.socket.close(() => {
console.log('UDP 服务器已关闭');
});
}
}
}
// 使用 UDP 服务器
const udpServer = new UDPServer({ port: 8081, host: 'localhost' });
udpServer.start();
// 优雅关闭
process.on('SIGINT', () => {
udpServer.close();
process.exit(0);
});2. UDP 客户端实现
javascript
// UDP 客户端示例
const dgram = require('dgram');
class UDPClient {
constructor(options = {}) {
this.host = options.host || 'localhost';
this.port = options.port || 8081;
this.socket = null;
}
connect() {
this.socket = dgram.createSocket('udp4');
this.socket.on('message', (msg, rinfo) => {
console.log(`从服务器接收到: ${msg}`);
});
this.socket.on('error', (err) => {
console.error('UDP 客户端错误:', err);
});
return this.socket;
}
send(message) {
const msg = Buffer.from(message);
this.socket.send(msg, this.port, this.host, (err) => {
if (err) {
console.error('发送消息失败:', err);
}
});
}
close() {
if (this.socket) {
this.socket.close();
}
}
}
// 使用 UDP 客户端
const udpClient = new UDPClient({ host: 'localhost', port: 8081 });
udpClient.connect();
// 发送消息
udpClient.send('Hello UDP Server!');
// 5秒后关闭
setTimeout(() => {
udpClient.close();
}, 5000);三、HTTP 协议实现
1. HTTP 服务器实现
javascript
// HTTP 服务器示例
const http = require('http');
const url = require('url');
const querystring = require('querystring');
class HTTPServer {
constructor(options = {}) {
this.port = options.port || 3000;
this.host = options.host || 'localhost';
this.routes = new Map();
this.middlewares = [];
this.server = null;
}
use(middleware) {
this.middlewares.push(middleware);
}
route(method, path, handler) {
const key = `${method.toUpperCase()}:${path}`;
this.routes.set(key, handler);
}
get(path, handler) {
this.route('GET', path, handler);
}
post(path, handler) {
this.route('POST', path, handler);
}
put(path, handler) {
this.route('PUT', path, handler);
}
delete(path, handler) {
this.route('DELETE', path, handler);
}
start() {
this.server = http.createServer((req, res) => {
this.handleRequest(req, res);
});
this.server.listen(this.port, this.host, () => {
console.log(`HTTP 服务器运行在 http://${this.host}:${this.port}`);
});
this.server.on('error', (err) => {
console.error('HTTP 服务器错误:', err);
});
return this.server;
}
async handleRequest(req, res) {
const parsedUrl = url.parse(req.url, true);
const pathname = parsedUrl.pathname;
const method = req.method;
// 设置响应头
res.setHeader('Content-Type', 'application/json');
res.setHeader('X-Powered-By', 'Custom HTTP Server');
// 创建上下文对象
const ctx = {
req,
res,
url: parsedUrl,
params: {},
query: parsedUrl.query,
body: null,
status: 200,
headers: {}
};
try {
// 解析请求体(仅对 POST/PUT 请求)
if (['POST', 'PUT', 'PATCH'].includes(method)) {
ctx.body = await this.parseBody(req);
}
// 执行中间件
for (const middleware of this.middlewares) {
await middleware(ctx, () => Promise.resolve());
}
// 查找路由处理器
const routeKey = `${method}:${pathname}`;
const handler = this.routes.get(routeKey);
if (handler) {
await handler(ctx);
} else {
// 404 处理
ctx.status = 404;
ctx.body = { error: 'Not Found' };
}
} catch (err) {
console.error('处理请求时出错:', err);
ctx.status = 500;
ctx.body = { error: 'Internal Server Error' };
}
// 发送响应
this.sendResponse(ctx);
}
parseBody(req) {
return new Promise((resolve, reject) => {
let body = '';
req.on('data', chunk => {
body += chunk.toString();
});
req.on('end', () => {
try {
if (req.headers['content-type'] === 'application/json') {
resolve(JSON.parse(body));
} else if (req.headers['content-type'] === 'application/x-www-form-urlencoded') {
resolve(querystring.parse(body));
} else {
resolve(body);
}
} catch (err) {
reject(err);
}
});
req.on('error', reject);
});
}
sendResponse(ctx) {
const { res, status, body, headers } = ctx;
// 设置状态码
res.statusCode = status;
// 设置自定义头
Object.keys(headers).forEach(key => {
res.setHeader(key, headers[key]);
});
// 发送响应体
if (body !== null && body !== undefined) {
if (typeof body === 'object') {
res.end(JSON.stringify(body, null, 2));
} else {
res.end(body.toString());
}
} else {
res.end();
}
}
close() {
if (this.server) {
this.server.close(() => {
console.log('HTTP 服务器已关闭');
});
}
}
}
// 使用 HTTP 服务器
const server = new HTTPServer({ port: 3000, host: 'localhost' });
// 添加中间件
server.use(async (ctx, next) => {
console.log(`${ctx.req.method} ${ctx.url.pathname}`);
const start = Date.now();
await next();
const ms = Date.now() - start;
console.log(`请求处理耗时: ${ms}ms`);
});
// 定义路由
server.get('/', (ctx) => {
ctx.body = { message: 'Hello World!', timestamp: new Date() };
});
server.get('/users', (ctx) => {
ctx.body = [
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' }
];
});
server.post('/users', (ctx) => {
const newUser = ctx.body;
newUser.id = Date.now();
ctx.status = 201;
ctx.body = { message: 'User created', user: newUser };
});
server.start();
// 优雅关闭
process.on('SIGINT', () => {
server.close();
process.exit(0);
});2. HTTP 客户端实现
javascript
// HTTP 客户端示例
const http = require('http');
const https = require('https');
const { URL } = require('url');
class HTTPClient {
constructor(options = {}) {
this.timeout = options.timeout || 5000;
this.retryAttempts = options.retryAttempts || 3;
}
async request(url, options = {}) {
const parsedUrl = new URL(url);
const isHttps = parsedUrl.protocol === 'https:';
const client = isHttps ? https : http;
const requestOptions = {
hostname: parsedUrl.hostname,
port: parsedUrl.port || (isHttps ? 443 : 80),
path: parsedUrl.pathname + parsedUrl.search,
method: options.method || 'GET',
headers: options.headers || {},
timeout: this.timeout
};
return this.makeRequest(client, requestOptions, options.body, 0);
}
makeRequest(client, options, body, attempt) {
return new Promise((resolve, reject) => {
const req = client.request(options, (res) => {
let data = '';
res.on('data', (chunk) => {
data += chunk;
});
res.on('end', () => {
try {
const response = {
statusCode: res.statusCode,
headers: res.headers,
body: this.parseResponseBody(data, res.headers['content-type'])
};
resolve(response);
} catch (err) {
reject(err);
}
});
});
req.on('error', (err) => {
if (attempt < this.retryAttempts) {
console.log(`请求失败,正在重试 (${attempt + 1}/${this.retryAttempts})...`);
setTimeout(() => {
this.makeRequest(client, options, body, attempt + 1)
.then(resolve)
.catch(reject);
}, 1000 * (attempt + 1));
} else {
reject(err);
}
});
req.on('timeout', () => {
req.destroy();
const err = new Error('请求超时');
if (attempt < this.retryAttempts) {
console.log(`请求超时,正在重试 (${attempt + 1}/${this.retryAttempts})...`);
setTimeout(() => {
this.makeRequest(client, options, body, attempt + 1)
.then(resolve)
.catch(reject);
}, 1000 * (attempt + 1));
} else {
reject(err);
}
});
// 发送请求体
if (body) {
if (typeof body === 'object') {
req.write(JSON.stringify(body));
} else {
req.write(body.toString());
}
}
req.end();
});
}
parseResponseBody(body, contentType) {
if (contentType && contentType.includes('application/json')) {
return JSON.parse(body);
}
return body;
}
async get(url, options = {}) {
return this.request(url, { ...options, method: 'GET' });
}
async post(url, body, options = {}) {
const headers = {
'Content-Type': 'application/json',
...options.headers
};
return this.request(url, {
...options,
method: 'POST',
headers,
body
});
}
async put(url, body, options = {}) {
const headers = {
'Content-Type': 'application/json',
...options.headers
};
return this.request(url, {
...options,
method: 'PUT',
headers,
body
});
}
async delete(url, options = {}) {
return this.request(url, { ...options, method: 'DELETE' });
}
}
// 使用 HTTP 客户端
const client = new HTTPClient({ timeout: 5000, retryAttempts: 3 });
async function example() {
try {
// GET 请求
const getResponse = await client.get('http://localhost:3000/users');
console.log('GET 响应:', getResponse);
// POST 请求
const postResponse = await client.post('http://localhost:3000/users', {
name: 'Charlie',
email: 'charlie@example.com'
});
console.log('POST 响应:', postResponse);
} catch (err) {
console.error('请求失败:', err);
}
}
// example();四、WebSocket 实现
1. WebSocket 服务器
javascript
// 简单的 WebSocket 服务器实现
const http = require('http');
const crypto = require('crypto');
class WebSocketServer {
constructor(options = {}) {
this.port = options.port || 8080;
this.clients = new Set();
}
start() {
this.server = http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('WebSocket server is running here');
});
this.server.on('upgrade', (req, socket, head) => {
this.handleUpgrade(req, socket, head);
});
this.server.listen(this.port, () => {
console.log(`WebSocket 服务器运行在端口 ${this.port}`);
});
}
handleUpgrade(req, socket, head) {
// 检查是否为 WebSocket 升级请求
if (req.headers.upgrade !== 'websocket') {
socket.destroy();
return;
}
// 验证 WebSocket 密钥
const key = req.headers['sec-websocket-key'];
if (!key) {
socket.destroy();
return;
}
// 生成接受密钥
const acceptKey = this.generateAcceptKey(key);
// 发送升级响应
const headers = [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
`Sec-WebSocket-Accept: ${acceptKey}`,
'\r\n'
];
socket.write(headers.join('\r\n'));
// 处理 WebSocket 连接
this.handleConnection(socket, req);
}
generateAcceptKey(key) {
const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
const sha1 = crypto.createHash('sha1');
sha1.update(key + GUID);
return sha1.digest('base64');
}
handleConnection(socket, req) {
const clientId = `${req.socket.remoteAddress}:${req.socket.remotePort}`;
console.log(`WebSocket 客户端 ${clientId} 已连接`);
this.clients.add(socket);
socket.on('data', (data) => {
try {
const message = this.parseWebSocketFrame(data);
console.log(`从 ${clientId} 接收到:`, message);
// 广播消息给所有客户端
this.broadcast(message, socket);
} catch (err) {
console.error('解析 WebSocket 帧时出错:', err);
}
});
socket.on('close', () => {
console.log(`WebSocket 客户端 ${clientId} 已断开`);
this.clients.delete(socket);
});
socket.on('error', (err) => {
console.error(`WebSocket 客户端 ${clientId} 错误:`, err);
this.clients.delete(socket);
});
}
parseWebSocketFrame(buffer) {
// 简化的 WebSocket 帧解析
const opcode = buffer[0] & 0x0F;
const masked = (buffer[1] & 0x80) === 0x80;
let payloadLength = buffer[1] & 0x7F;
let offset = 2;
if (payloadLength === 126) {
payloadLength = buffer.readUInt16BE(2);
offset = 4;
} else if (payloadLength === 127) {
payloadLength = buffer.readUInt32BE(6);
offset = 10;
}
let maskKey;
if (masked) {
maskKey = buffer.slice(offset, offset + 4);
offset += 4;
}
let payload = buffer.slice(offset, offset + payloadLength);
if (masked) {
for (let i = 0; i < payload.length; i++) {
payload[i] ^= maskKey[i % 4];
}
}
return payload.toString();
}
createWebSocketFrame(message) {
const payload = Buffer.from(message);
const payloadLength = payload.length;
let frame;
if (payloadLength <= 125) {
frame = Buffer.alloc(2 + payloadLength);
frame[1] = payloadLength;
} else if (payloadLength <= 65535) {
frame = Buffer.alloc(4 + payloadLength);
frame[1] = 126;
frame.writeUInt16BE(payloadLength, 2);
frame = Buffer.concat([frame, payload]);
} else {
frame = Buffer.alloc(10 + payloadLength);
frame[1] = 127;
frame.writeUInt32BE(0, 2);
frame.writeUInt32BE(payloadLength, 6);
frame = Buffer.concat([frame, payload]);
}
frame[0] = 0x81; // FIN + text frame
if (frame.length === 2 + payloadLength) {
payload.copy(frame, 2);
}
return frame;
}
broadcast(message, excludeSocket = null) {
const frame = this.createWebSocketFrame(message);
for (const client of this.clients) {
if (client !== excludeSocket && client.readyState === 'open') {
client.write(frame);
}
}
}
close() {
if (this.server) {
this.server.close(() => {
console.log('WebSocket 服务器已关闭');
});
}
}
}
// 使用 WebSocket 服务器
const wsServer = new WebSocketServer({ port: 8080 });
wsServer.start();
// 优雅关闭
process.on('SIGINT', () => {
wsServer.close();
process.exit(0);
});五、总结
Node.js 提供了强大的网络编程能力,支持多种网络协议:
- TCP:可靠的面向连接的协议,适用于需要保证数据完整性的应用
- UDP:无连接的协议,适用于实时性要求高但可以容忍少量数据丢失的应用
- HTTP/HTTPS:应用层协议,是 Web 开发的基础
- WebSocket:全双工通信协议,适用于实时通信应用
通过深入理解这些协议的实现机制,我们可以:
- 构建高性能的网络应用
- 选择合适的协议满足不同业务需求
- 实现自定义协议处理器
- 优化网络通信性能
在实际开发中,应根据具体需求选择合适的协议和实现方式,同时注意处理异常情况、实现重连机制、优化性能等,以确保网络应用的稳定性和可靠性。