WebSocket(WebSocket)
一、WebSocket 概述
1.1 为什么需要 WebSocket
HTTP 是请求-响应模型,客户端发起请求,服务器返回响应。这种模式不适合需要服务器主动推送的场景,比如:
- 实时聊天应用
- 股票行情更新
- 在线游戏
- 协作编辑
- 进度通知
传统解决方案如轮询(polling)和长轮询(long polling)都有性能或延迟问题。WebSocket 提供了真正的双向通信能力。
1.2 WebSocket 与 HTTP 的对比
| 特性 | HTTP | WebSocket |
|---|---|---|
| 连接方向 | 客户端发起请求 | 双向对等 |
| 服务器推送 | 不支持 | 完全支持 |
| 头部开销 | 大(每个请求携带完整 Header) | 小(握手后几乎没有额外头部) |
| 连接次数 | HTTP/1.0 每次请求建立新连接;HTTP/1.1 起默认复用连接(keep-alive),但每次交互仍须由客户端发起请求 | 握手一次后保持长连接,双方可随时发送 |
| 数据格式 | 请求-响应 | 帧(frame) |
| 适用场景 | RESTful API | 实时应用 |
1.3 协议升级过程
WebSocket 使用 HTTP 的 Upgrade 机制建立连接:
GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
服务器响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
二、WebSocket API
2.1 建立连接
// Open the connection. ws:// is plaintext; production deployments should use the
// TLS-protected wss:// scheme instead:
//   const ws = new WebSocket('wss://example.com/ws');
const ws = new WebSocket('ws://example.com/ws');

// The handshake has completed here, so a first message can be sent.
ws.addEventListener('open', () => {
  console.log('Connection opened');
  ws.send('Hello, server!');
});

// Runs for every message delivered by the server; the payload is on `data`.
ws.addEventListener('message', ({ data }) => {
  console.log('Message received:', data);
});

// Log the raw error event.
ws.addEventListener('error', (errorEvent) => {
  console.error('Error:', errorEvent);
});

// The close event carries the close status code and the reason string.
ws.addEventListener('close', ({ code, reason }) => {
  console.log('Connection closed');
  console.log('Code:', code);
  console.log('Reason:', reason);
});
2.2 发送和接收数据
// Text message
ws.send('Hello, world!');

// Structured data goes over the wire as a JSON string
ws.send(JSON.stringify({ type: 'message', content: 'Hello!' }));

// Binary message from an ArrayBuffer
const buffer = new ArrayBuffer(8);
ws.send(buffer);

// Binary message from a Blob
const blob = new Blob(['Hello'], { type: 'text/plain' });
ws.send(blob);

// Incoming messages: `data` is a string for text messages and a Blob or an
// ArrayBuffer for binary ones, so branch on the runtime type.
ws.addEventListener('message', ({ data }) => {
  if (typeof data === 'string') {
    console.log('Text message:', data);
    return;
  }

  if (data instanceof Blob) {
    // Blob contents are read asynchronously.
    data.text().then((text) => {
      console.log('Blob message:', text);
    });
    return;
  }

  if (data instanceof ArrayBuffer) {
    console.log('ArrayBuffer message:', data);
  }
});
2.3 连接状态
const ws = new WebSocket('ws://example.com/ws');

// readyState is numeric:
//   0 = CONNECTING, 1 = OPEN, 2 = CLOSING, 3 = CLOSED
console.log(ws.readyState);

// Guard every send with a state check.
if (WebSocket.OPEN === ws.readyState) {
  ws.send('Hello!');
}

// Track state transitions through the open/close events.
for (const [eventName, label] of [
  ['open', 'Connected'],
  ['close', 'Disconnected'],
]) {
  ws.addEventListener(eventName, () => {
    console.log(label);
  });
}
2.4 关闭连接
// Client-initiated close: status code 1000 plus a human-readable reason string.
ws.close(1000, 'Normal closure');
// Close status codes (RFC 6455, section 7.4.1):
// 1000: normal closure
// 1001: going away (e.g. the server is shutting down, or the browser navigated away from the page)
// 1002: protocol error
// 1003: unsupported data type
// 1005: no status code present (reserved; never sent on the wire)
// 1006: abnormal closure (reserved; must not be set by application code)
// 1007: invalid payload data (e.g. a text message that is not valid UTF-8)
// 1008: policy violation
// 1009: message too big
// 1010: mandatory extension missing (the client expected an extension the server did not negotiate)
// 1011: internal server error
三、重连策略
3.1 基本重连实现
/**
 * WebSocket wrapper that re-establishes the connection with exponential
 * backoff whenever the underlying socket closes.
 *
 * Consumers override (or assign) `onopen`, `onmessage`, `onclose` and
 * `onerror`; they are invoked with the native event of the current socket.
 */
class ReconnectingWebSocket {
  /**
   * @param {string} url - Endpoint passed to the native WebSocket constructor.
   * @param {object} [options]
   * @param {number} [options.reconnectInterval=1000] - Base delay in ms before the first retry.
   * @param {number} [options.maxReconnectInterval=30000] - Upper bound in ms for any retry delay.
   * @param {number} [options.reconnectDecay=1.5] - Multiplier applied to the delay per failed attempt.
   * @param {number} [options.maxReconnectAttempts=Infinity] - Retries allowed before giving up.
   *   Falsy values (including 0) fall back to the default.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.reconnectInterval = options.reconnectInterval || 1000;
    this.maxReconnectInterval = options.maxReconnectInterval || 30000;
    this.reconnectDecay = options.reconnectDecay || 1.5;
    this.maxReconnectAttempts = options.maxReconnectAttempts || Infinity;
    this.readyState = WebSocket.CONNECTING;
    this.reconnectAttempts = 0;
    this.shouldReconnect = true;
    this.reconnectTimer = null;
    this.ws = null;
    this.connect();
  }

  /** Open a fresh native socket and wire its events to this wrapper. */
  connect() {
    // Every attempt starts in CONNECTING, not only the first one.
    this.readyState = WebSocket.CONNECTING;

    let socket;
    try {
      socket = new WebSocket(this.url);
    } catch (error) {
      console.error('WebSocket construction failed:', error);
      this.readyState = WebSocket.CLOSED;
      this.scheduleReconnect();
      return;
    }
    this.ws = socket;

    socket.addEventListener('open', (event) => {
      this.readyState = WebSocket.OPEN;
      // A successful connection resets the backoff sequence.
      this.reconnectAttempts = 0;
      this.onopen(event);
    });
    socket.addEventListener('message', (event) => {
      this.onmessage(event);
    });
    socket.addEventListener('close', (event) => {
      this.readyState = WebSocket.CLOSED;
      if (this.shouldReconnect) {
        this.scheduleReconnect();
      }
      this.onclose(event);
    });
    socket.addEventListener('error', (event) => {
      this.onerror(event);
    });
  }

  /**
   * Queue the next connection attempt. The delay grows geometrically with the
   * number of attempts and is capped at `maxReconnectInterval`.
   */
  scheduleReconnect() {
    if (!this.shouldReconnect) {
      return;
    }
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.log('Max reconnect attempts reached');
      return;
    }

    const backoff =
      this.reconnectInterval * Math.pow(this.reconnectDecay, this.reconnectAttempts);
    const delay = Math.min(backoff, this.maxReconnectInterval);
    console.log(`Reconnecting in ${delay}ms...`);

    // Never keep two retries pending at once.
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // close() may have been called while the timer was pending.
      if (!this.shouldReconnect) {
        return;
      }
      this.reconnectAttempts++;
      this.connect();
    }, delay);
  }

  /**
   * Send data if the connection is open; otherwise log an error and drop it.
   * @param {string|ArrayBuffer|Blob|ArrayBufferView} data
   */
  send(data) {
    if (this.ws && this.readyState === WebSocket.OPEN) {
      this.ws.send(data);
    } else {
      console.error('WebSocket is not open');
    }
  }

  /**
   * Close the connection permanently: cancels any pending retry and disables
   * further reconnects.
   * @param {number} [code] - Optional close status code forwarded to the socket.
   * @param {string} [reason] - Optional close reason forwarded to the socket.
   */
  close(code, reason) {
    this.shouldReconnect = false;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    // The socket is absent when every construction attempt has thrown.
    if (!this.ws) {
      return;
    }
    if (code === undefined) {
      this.ws.close();
    } else {
      this.ws.close(code, reason);
    }
  }

  // Overridable callbacks (no-ops by default).
  onopen(event) {}
  onmessage(event) {}
  onclose(event) {}
  onerror(event) {}
}
3.2 使用示例
// Retry from a 1 s base delay, giving up after 10 attempts.
const ws = new ReconnectingWebSocket('wss://example.com/ws', {
  reconnectInterval: 1000,
  maxReconnectAttempts: 10,
});

// Replace the wrapper's no-op callbacks with logging handlers.
Object.assign(ws, {
  onopen: () => console.log('Connected'),
  onmessage: ({ data }) => console.log('Message:', data),
  onclose: ({ code, reason }) => console.log('Closed:', code, reason),
  onerror: (errorEvent) => console.error('Error:', errorEvent),
});
四、心跳机制
4.1 心跳检测连接状态
/**
 * WebSocket wrapper that detects dead connections with an application-level
 * heartbeat: it sends the text frame 'ping' every `pingInterval` ms and closes
 * the socket when no 'pong' text frame arrives within `pongTimeout` ms.
 *
 * All frames other than 'pong' are forwarded to `onmessage`.
 */
class HeartbeatWebSocket {
  /**
   * @param {string} url - Endpoint passed to the native WebSocket constructor.
   * @param {object} [options]
   * @param {number} [options.pingInterval=30000] - Delay in ms between pings.
   * @param {number} [options.pongTimeout=10000] - Max wait in ms for a pong after a ping.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.pingInterval = options.pingInterval || 30000;
    this.pongTimeout = options.pongTimeout || 10000;
    this.ws = null;
    this.pingTimer = null;
    this.pongTimer = null;
    this.connect();
  }

  /** Open the socket and tie the heartbeat to its open/close lifecycle. */
  connect() {
    this.ws = new WebSocket(this.url);
    this.ws.addEventListener('open', () => {
      this.startHeartbeat();
    });
    this.ws.addEventListener('message', (event) => {
      if (event.data === 'pong') {
        this.onPong();
      } else {
        this.onmessage(event);
      }
    });
    this.ws.addEventListener('close', () => {
      this.stopHeartbeat();
    });
  }

  /** (Re)start the periodic ping. */
  startHeartbeat() {
    this.stopHeartbeat();
    this.pingTimer = setInterval(() => {
      this.send('ping');
      // Arm the deadline only if none is pending. Re-arming on every ping would
      // keep postponing it when pongTimeout > pingInterval, so a dead peer
      // would never be detected.
      if (this.pongTimer === null) {
        this.resetPongTimer();
      }
    }, this.pingInterval);
  }

  /** Stop pinging and cancel any pending pong deadline. */
  stopHeartbeat() {
    if (this.pingTimer) {
      clearInterval(this.pingTimer);
      this.pingTimer = null;
    }
    this.stopPongTimer();
  }

  /** Start (or restart) the deadline by which a pong must arrive. */
  resetPongTimer() {
    this.stopPongTimer();
    this.pongTimer = setTimeout(() => {
      this.pongTimer = null;
      // This class only closes the socket; it does not reconnect by itself.
      console.log('Pong timeout, closing connection');
      this.ws.close();
    }, this.pongTimeout);
  }

  /** Cancel the pending pong deadline, if any. */
  stopPongTimer() {
    if (this.pongTimer) {
      clearTimeout(this.pongTimer);
      this.pongTimer = null;
    }
  }

  /**
   * A pong proves the peer is alive, so the deadline is cleared. Re-arming it
   * here would close a healthy connection `pongTimeout` ms after every pong.
   */
  onPong() {
    this.stopPongTimer();
  }

  /**
   * Send data if the socket is open; silently dropped otherwise.
   * @param {string|ArrayBuffer|Blob|ArrayBufferView} data
   */
  send(data) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(data);
    }
  }

  /**
   * Stop the heartbeat and close the socket.
   * @param {number} [code] - Optional close status code forwarded to the socket.
   * @param {string} [reason] - Optional close reason forwarded to the socket.
   */
  close(code, reason) {
    this.stopHeartbeat();
    if (!this.ws) {
      return;
    }
    if (code === undefined) {
      this.ws.close();
    } else {
      this.ws.close(code, reason);
    }
  }

  // Overridable callback for non-heartbeat frames (no-op by default).
  onmessage(event) {}
}
五、消息协议设计
5.1 JSON 消息格式
// Message kinds understood by both ends of the connection.
const MessageType = {
  CHAT: 'chat',
  NOTIFICATION: 'notification',
  PRESENCE: 'presence',
  ERROR: 'error',
  PING: 'ping',
  PONG: 'pong',
};

/**
 * Wrap `payload` in the `{ type, payload, timestamp }` envelope and send it as
 * a JSON string.
 *
 * @param {{ send: (data: string) => void }} ws - Socket (or compatible object) to send on.
 * @param {string} type - Message kind, normally one of the `MessageType` values.
 * @param {*} payload - JSON-serialisable message body.
 */
function sendMessage(ws, type, payload) {
  // timestamp is the sender's clock in milliseconds since the Unix epoch.
  const envelope = JSON.stringify({ type, payload, timestamp: Date.now() });
  ws.send(envelope);
}
// Receive: parse the JSON envelope, then dispatch on its `type`.
ws.addEventListener('message', (event) => {
  // Parsing and handling are guarded separately so that a bug inside a handler
  // is not misreported as a malformed message.
  let message;
  try {
    message = JSON.parse(event.data);
  } catch (error) {
    console.error('Failed to parse message:', error);
    return;
  }

  // JSON.parse accepts `null` and primitives; those have no `type` to dispatch on.
  if (message === null || typeof message !== 'object') {
    console.error('Failed to parse message:', new TypeError('Message is not a JSON object'));
    return;
  }

  try {
    switch (message.type) {
      case MessageType.CHAT:
        handleChat(message.payload);
        break;
      case MessageType.NOTIFICATION:
        handleNotification(message.payload);
        break;
      case MessageType.ERROR:
        handleError(message.payload);
        break;
      case MessageType.PING:
        // Answer the peer's application-level ping.
        ws.send(JSON.stringify({ type: MessageType.PONG }));
        break;
      default:
        // Surface unknown or not-yet-handled types instead of dropping them silently.
        console.warn('Unhandled message type:', message.type);
    }
  } catch (error) {
    console.error('Failed to handle message:', error);
  }
});
5.2 二进制协议设计
// Fixed 8-byte header; multi-byte fields are big-endian (the DataView default):
//
//   | 1 byte  | 4 bytes                  | 2 bytes    | 1 byte |
//   | version | timestamp (Unix seconds) | payloadLen | type   |
//
// The timestamp is stored in whole seconds because a millisecond timestamp does
// not fit in 32 bits: setUint32 would silently keep only the low 32 bits.
const PROTOCOL_VERSION = 1;
const HEADER_LENGTH = 8;
const MAX_PAYLOAD_LENGTH = 0xffff; // largest value the 2-byte length field can hold

/**
 * Serialise a message into the binary frame layout described above.
 *
 * @param {number} type - Message type; must be an integer in [0, 255].
 * @param {*} payload - JSON-serialisable body, encoded as UTF-8 JSON.
 * @returns {ArrayBuffer} Header followed by the payload bytes.
 * @throws {RangeError} If `type` is not a byte value or the encoded payload
 *   exceeds 65535 bytes.
 */
function encodeMessage(type, payload) {
  if (!Number.isInteger(type) || type < 0 || type > 0xff) {
    throw new RangeError(`Message type must be an integer in [0, 255], got ${type}`);
  }

  const payloadBytes = new TextEncoder().encode(JSON.stringify(payload));
  if (payloadBytes.length > MAX_PAYLOAD_LENGTH) {
    throw new RangeError(
      `Payload is ${payloadBytes.length} bytes; the maximum is ${MAX_PAYLOAD_LENGTH}`
    );
  }

  const buffer = new ArrayBuffer(HEADER_LENGTH + payloadBytes.length);
  const view = new DataView(buffer);
  view.setUint8(0, PROTOCOL_VERSION);
  view.setUint32(1, Math.floor(Date.now() / 1000));
  view.setUint16(5, payloadBytes.length);
  view.setUint8(7, type);

  new Uint8Array(buffer, HEADER_LENGTH).set(payloadBytes);
  return buffer;
}

/**
 * Parse a frame produced by `encodeMessage`.
 *
 * @param {ArrayBuffer} buffer - Complete frame (header plus payload).
 * @returns {{ version: number, timestamp: number, type: number, payload: * }}
 *   `timestamp` is converted back to milliseconds since the Unix epoch (with
 *   one-second resolution) so it can be compared with `Date.now()`.
 * @throws {RangeError} If the buffer is shorter than the header or than the
 *   payload length the header declares.
 * @throws {SyntaxError} If the payload is not valid JSON.
 */
function decodeMessage(buffer) {
  if (buffer.byteLength < HEADER_LENGTH) {
    throw new RangeError(
      `Frame is ${buffer.byteLength} bytes; the header alone needs ${HEADER_LENGTH}`
    );
  }

  const view = new DataView(buffer);
  const version = view.getUint8(0);
  const timestampSeconds = view.getUint32(1);
  const payloadLen = view.getUint16(5);
  const type = view.getUint8(7);

  if (HEADER_LENGTH + payloadLen > buffer.byteLength) {
    throw new RangeError(
      `Header declares ${payloadLen} payload bytes but only ${buffer.byteLength - HEADER_LENGTH} are present`
    );
  }

  const payloadBytes = new Uint8Array(buffer, HEADER_LENGTH, payloadLen);
  const payload = JSON.parse(new TextDecoder().decode(payloadBytes));
  return { version, timestamp: timestampSeconds * 1000, type, payload };
}
六、安全考虑
6.1 验证与认证
// Passing the token as a URL query parameter is discouraged, because URLs are
// commonly recorded in server and proxy access logs:
//   const ws = new WebSocket(`wss://example.com/ws?token=${token}`);

// Preferred: authenticate with the first message once the connection is open.
const ws = new WebSocket('wss://example.com/ws');

ws.addEventListener('open', () => {
  ws.send(JSON.stringify({
    type: 'auth',
    token: 'your-jwt-token', // placeholder; supply the real token at runtime
  }));
});

ws.addEventListener('message', (event) => {
  // A non-JSON frame must not crash the listener.
  let message;
  try {
    message = JSON.parse(event.data);
  } catch (error) {
    console.error('Failed to parse message:', error);
    return;
  }

  // JSON.parse accepts `null` and primitives, which carry no `type`.
  if (message === null || typeof message !== 'object') {
    return;
  }

  if (message.type === 'auth_success') {
    console.log('Authenticated');
  } else if (message.type === 'auth_error') {
    console.error('Authentication failed');
    ws.close();
  }
});
6.2 跨域 WebSocket
// Client side: connect to a WebSocket endpoint on a different origin.
const ws = new WebSocket('wss://api.example.com/ws');
// NOTE: WebSocket handshakes are not governed by CORS, so
// Access-Control-Allow-Origin response headers have no effect here.
// The browser sends an `Origin` request header with the handshake; the server is
// expected to compare it against an allow-list (e.g. https://your-domain.com)
// and refuse the upgrade for any other origin.
七、性能优化
7.1 消息分片
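WebSocket 协议本身支持帧级分片(fragmentation),但浏览器 API 不暴露这一能力,`send()` 的分帧由浏览器自动完成。下面的示例是应用层分片:把大消息切成多条独立消息发送,由接收方按 `offset` 和 `isLast` 重新拼接: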
// Messages longer than this many UTF-16 code units are split into fragments.
const MESSAGE_THRESHOLD = 64 * 1024; // 64K code units (not bytes)

/**
 * Send `data`, splitting it into application-level fragments when it is longer
 * than `threshold`.
 *
 * Short messages are sent unchanged. Long ones are sent as a sequence of JSON
 * envelopes `{ type: 'fragment', data, isLast, offset }`; the receiver
 * concatenates the `data` strings in `offset` order until `isLast` is true.
 *
 * @param {{ send: (data: string) => void }} ws - Socket (or compatible object) to send on.
 * @param {*} data - A string, or any JSON-serialisable value.
 * @param {number} [threshold=MESSAGE_THRESHOLD] - Maximum fragment length, measured
 *   in UTF-16 code units of the serialised message.
 * @throws {RangeError} If `threshold` is not a positive integer.
 * @throws {TypeError} If `data` cannot be serialised to JSON.
 */
function sendLargeMessage(ws, data, threshold = MESSAGE_THRESHOLD) {
  // A non-positive threshold would produce empty chunks and never terminate.
  if (!Number.isInteger(threshold) || threshold <= 0) {
    throw new RangeError(`threshold must be a positive integer, got ${threshold}`);
  }

  const message = typeof data === 'string' ? data : JSON.stringify(data);
  // JSON.stringify returns undefined for undefined, functions and symbols.
  if (typeof message !== 'string') {
    throw new TypeError('data cannot be serialised to JSON');
  }

  if (message.length <= threshold) {
    ws.send(message);
    return;
  }

  for (let offset = 0; offset < message.length; offset += threshold) {
    const chunk = message.slice(offset, offset + threshold);
    ws.send(JSON.stringify({
      type: 'fragment',
      data: chunk,
      isLast: offset + chunk.length >= message.length,
      offset,
    }));
  }
}
7.2 连接池
/**
 * Fixed-size pool of WebSocket connections to a single URL.
 *
 * `available` holds idle sockets and `inUse` the ones handed out by
 * `acquire()`. Idle sockets that close are replaced immediately.
 *
 * NOTE: replacement happens without any delay, so an unreachable server causes
 * back-to-back reconnect attempts; add backoff before using this in production.
 */
class WebSocketPool {
  /**
   * @param {string} url - Endpoint every pooled socket connects to.
   * @param {number} [poolSize=5] - Number of idle connections to keep.
   */
  constructor(url, poolSize = 5) {
    this.url = url;
    this.poolSize = poolSize;
    this.available = [];
    this.inUse = new Set();

    // Pre-open the pool.
    for (let i = 0; i < poolSize; i++) {
      this.available.push(this.createConnection());
    }
  }

  /**
   * Open one socket and register the bookkeeping that runs when it closes.
   * @returns {WebSocket}
   */
  createConnection() {
    const ws = new WebSocket(this.url);
    ws.addEventListener('close', () => {
      // An idle socket that dies is swapped for a fresh one.
      const index = this.available.indexOf(ws);
      if (index > -1) {
        this.available.splice(index, 1);
        this.available.push(this.createConnection());
      }
      this.inUse.delete(ws);
    });
    return ws;
  }

  /**
   * Hand out a socket, opening an extra one when the pool is exhausted.
   * The returned socket may still be connecting; check `readyState` before sending.
   * @returns {WebSocket}
   */
  acquire() {
    const ws = this.available.length > 0 ? this.available.pop() : this.createConnection();
    // Overflow sockets are tracked too, so release() can recognise them.
    this.inUse.add(ws);
    return ws;
  }

  /**
   * Return a socket obtained from `acquire()`.
   *
   * Usable sockets go back to the idle list, and surplus ones (opened while the
   * pool was exhausted) are closed so the pool never grows past `poolSize`.
   * Dead sockets are replaced only while the pool is below `poolSize`.
   * Releasing the same socket twice, or a socket this pool never handed out,
   * is ignored.
   *
   * @param {WebSocket} ws
   */
  release(ws) {
    const wasInUse = this.inUse.delete(ws);
    const isUsable =
      ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING;

    if (isUsable) {
      if (!wasInUse) {
        return;
      }
      if (this.available.length >= this.poolSize) {
        ws.close();
        return;
      }
      this.available.push(ws);
      return;
    }

    // The socket is closing or closed: top the pool back up if it is short.
    if (this.available.length + this.inUse.size < this.poolSize) {
      this.available.push(this.createConnection());
    }
  }
}