WebSocket实现
WebSocket简介
WebSocket是一种在单个TCP连接上进行全双工通信的协议。它允许服务端主动向客户端推送数据,解决了HTTP协议无法实现服务器主动推送的问题。WebSocket协议在2011年被IETF定为标准RFC 6455,后被W3C定为标准。
WebSocket与HTTP对比
HTTP协议是无状态的,每次请求都需要建立新的连接,而WebSocket只需要建立一次连接:
- HTTP是半双工,WebSocket是全双工
- HTTP每次请求都包含头部信息,WebSocket建立连接后数据传输量小
- HTTP需要客户端轮询获取新数据,WebSocket支持服务端主动推送
// HTTP轮询示例
setInterval(() => {
fetch('/api/data')
.then(res => res.json())
.then(data => console.log(data))
}, 1000);
// WebSocket示例
const ws = new WebSocket('ws://example.com');
ws.onmessage = (event) => {
console.log(event.data);
};
Node.js中的WebSocket实现
使用ws模块
ws是Node.js中最流行的WebSocket库,轻量且高效:
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
console.log('新客户端连接');
ws.on('message', (message) => {
console.log(`收到消息: ${message}`);
// 广播给所有客户端
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
});
ws.send('欢迎连接WebSocket服务器');
});
使用Socket.io
Socket.io提供了更高级的功能,如自动重连、房间支持等:
const server = require('http').createServer();
const io = require('socket.io')(server);
io.on('connection', (socket) => {
console.log('用户连接');
socket.on('chat message', (msg) => {
io.emit('chat message', msg);
});
socket.on('disconnect', () => {
console.log('用户断开连接');
});
});
server.listen(3000);
WebSocket握手过程
WebSocket连接通过HTTP升级请求建立:
- 客户端发送握手请求:
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
- 服务端响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
消息帧格式
WebSocket数据以帧的形式传输,包含:
- FIN: 是否是最后一帧
- Opcode: 操作码(0x1文本,0x2二进制)
- Mask: 是否掩码
- Payload length: 数据长度
- Masking-key: 掩码键(如果有)
- Payload data: 实际数据
// 手动解析WebSocket帧(简化版)
function parseFrame(buffer) {
const firstByte = buffer.readUInt8(0);
const secondByte = buffer.readUInt8(1);
const opcode = firstByte & 0x0F;
const isMasked = Boolean((secondByte >>> 7) & 0x01);
let payloadLength = secondByte & 0x7F;
let offset = 2;
if (payloadLength === 126) {
payloadLength = buffer.readUInt16BE(offset);
offset += 2;
} else if (payloadLength === 127) {
payloadLength = buffer.readBigUInt64BE(offset);
offset += 8;
}
// 省略掩码处理...
return {
opcode,
payloadLength,
payloadData: buffer.slice(offset)
};
}
心跳机制
保持连接活跃的心跳机制实现:
// 服务端心跳
function setupHeartbeat(ws) {
ws.isAlive = true;
ws.on('pong', () => { ws.isAlive = true; });
const interval = setInterval(() => {
if (!ws.isAlive) return ws.terminate();
ws.isAlive = false;
ws.ping(null, false);
}, 30000);
ws.on('close', () => clearInterval(interval));
}
// 客户端心跳
const ws = new WebSocket('ws://example.com');
const heartbeatInterval = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.ping();
}
}, 25000);
ws.on('close', () => clearInterval(heartbeatInterval));
安全考虑
认证授权
// Token认证示例
const wss = new WebSocket.Server({ noServer: true });
server.on('upgrade', (request, socket, head) => {
const token = request.url.split('token=')[1];
if (!validateToken(token)) {
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
return;
}
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request);
});
});
消息验证
ws.on('message', (message) => {
try {
const data = JSON.parse(message);
if (!isValid(data)) {
throw new Error('Invalid message');
}
// 处理有效消息
} catch (err) {
ws.close(1008, 'Invalid message format');
}
});
性能优化
消息压缩
const WebSocket = require('ws');
const zlib = require('zlib');
const wss = new WebSocket.Server({
port: 8080,
perMessageDeflate: {
zlibDeflateOptions: {
chunkSize: 1024,
memLevel: 7,
level: 3
},
zlibInflateOptions: {
chunkSize: 10 * 1024
},
threshold: 1024 // 仅压缩大于此字节的消息
}
});
连接限制
const wss = new WebSocket.Server({ port: 8080 });
const connections = new Set();
wss.on('connection', (ws) => {
if (connections.size >= 1000) {
ws.close(1013, 'Server too busy');
return;
}
connections.add(ws);
ws.on('close', () => connections.delete(ws));
});
实际应用场景
实时聊天应用
// 服务端
wss.on('connection', (ws) => {
ws.userData = { id: generateId() };
ws.on('message', (message) => {
const data = JSON.parse(message);
switch(data.type) {
case 'join':
ws.userData.room = data.room;
break;
case 'message':
broadcastToRoom(ws.userData.room, {
type: 'message',
user: ws.userData.id,
text: data.text,
timestamp: Date.now()
});
break;
}
});
});
function broadcastToRoom(room, message) {
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN &&
client.userData.room === room) {
client.send(JSON.stringify(message));
}
});
}
实时数据监控
// 服务端推送传感器数据
const sensorData = {
temperature: 0,
humidity: 0
};
setInterval(() => {
// 模拟数据变化
sensorData.temperature = 20 + 5 * Math.random();
sensorData.humidity = 50 + 10 * Math.random();
sensorData.timestamp = Date.now();
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({
type: 'sensorUpdate',
data: sensorData
}));
}
});
}, 1000);
错误处理与调试
常见错误代码
- 1006: 异常关闭
- 1001: 端点离开
- 1002: 协议错误
- 1008: 策略违规
- 1011: 服务器错误
ws.on('close', (code, reason) => {
console.log(`连接关闭,代码: ${code}, 原因: ${reason}`);
if (code === 1006) {
console.log('可能是网络问题导致连接异常断开');
}
});
ws.on('error', (error) => {
console.error('WebSocket错误:', error);
});
调试工具
使用浏览器开发者工具查看WebSocket流量:
- Chrome开发者工具 → Network → WS
- 点击WebSocket连接查看消息帧
- 可以过滤和查看每条消息内容
浏览器兼容性
现代浏览器都支持WebSocket API,但需要考虑降级方案:
function connectWebSocket() {
if ('WebSocket' in window) {
return new WebSocket('ws://example.com');
} else if ('MozWebSocket' in window) {
return new MozWebSocket('ws://example.com');
} else {
// 降级为长轮询
setupPolling();
return null;
}
}
function setupPolling() {
// 实现HTTP长轮询
let timestamp = 0;
function poll() {
fetch(`/poll?since=${timestamp}`)
.then(res => res.json())
.then(data => {
timestamp = data.timestamp;
processMessages(data.messages);
poll();
});
}
poll();
}
WebSocket扩展
自定义协议
// 定义简单协议
// 格式: type|payload
const PROTOCOL = {
MESSAGE: 'msg',
NOTIFICATION: 'noti',
COMMAND: 'cmd'
};
// 发送消息
function sendMessage(ws, type, payload) {
ws.send(`${type}|${JSON.stringify(payload)}`);
}
// 接收处理
ws.on('message', (data) => {
const [type, payload] = data.split('|');
const parsedPayload = JSON.parse(payload);
switch(type) {
case PROTOCOL.MESSAGE:
handleMessage(parsedPayload);
break;
case PROTOCOL.NOTIFICATION:
handleNotification(parsedPayload);
break;
}
});
二进制数据传输
// 发送二进制数据
const buffer = new ArrayBuffer(8);
const view = new Uint8Array(buffer);
for (let i = 0; i < 8; i++) view[i] = i;
ws.onopen = () => {
ws.send(buffer);
};
// 接收二进制数据
ws.onmessage = (event) => {
if (event.data instanceof ArrayBuffer) {
const view = new Uint8Array(event.data);
console.log('收到二进制数据:', view);
}
};
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn