詳解 WebSocket 實(shí)現(xiàn)
當(dāng)前位置:點(diǎn)晴教程→知識(shí)管理交流
→『 技術(shù)文檔交流 』
前言為什么寫(xiě)這篇文章?對(duì)于應(yīng)用協(xié)議的了解,相信大部分👨🎓始終停留在使用上,可能讀了如 《圖解HTTP》、《計(jì)算機(jī)網(wǎng)絡(luò)》 一類的書(shū)籍,了解了更深層次的理論,但理論始終是理論,我們很難有機(jī)會(huì)能在工作場(chǎng)景里面去觸碰到協(xié)議的實(shí)現(xiàn)上,對(duì)應(yīng)用層協(xié)議的理解是片面的。 WebSocket 的實(shí)現(xiàn)非常適合前端的同學(xué)學(xué)習(xí),通過(guò)了解 WebSocket Node版實(shí)現(xiàn),站在更高的維度上去看待這些應(yīng)用層協(xié)議,無(wú)論是WebRTC,還是HTTP都好,有了對(duì)一種協(xié)議實(shí)現(xiàn)的整體思路,就有了面對(duì)各種協(xié)議的技術(shù)自信。 WebSocket簡(jiǎn)介WebSocket 是 HTML5 開(kāi)始提供的一種在單個(gè) TCP 連接上進(jìn)行全雙工(full-duplex)通訊的協(xié)議。沒(méi)有了 Request 和 Response 的概念,兩者地位完全平等,連接一旦建立,就建立了真•持久性連接,雙方可以隨時(shí)向?qū)Ψ桨l(fā)送數(shù)據(jù)。 注:全雙工(Full Duplex)是一種通信方式,指通信的雙方可以同時(shí)發(fā)送和接收數(shù)據(jù),而且在同一時(shí)刻,發(fā)送和 接收是獨(dú)立進(jìn)行的。 概念老生常談了,不妨思考兩個(gè)問(wèn)題: WebSocket 為什么能進(jìn)行全雙工通信?HTTP 卻不行? HTTP是非持久化連接,每次客戶端向服務(wù)器發(fā)送請(qǐng)求時(shí),都需要建立一個(gè)新的 TCP 連接。這個(gè)連接在響應(yīng)結(jié)束后就會(huì)被關(guān)閉,不保留在系統(tǒng)中,而 websocket 會(huì)保留,所以可以繼續(xù)保持通信。 HTTP 為什么是非持久化連接而 WebSocket 是持久化連接? HTTP 協(xié)議的設(shè)計(jì)初衷是為了傳輸靜態(tài)文本信息,如 HTML、CSS、JS 等等,傳輸靜態(tài)文本大部分時(shí)間連接并不會(huì)被頻繁地打開(kāi)和關(guān)閉,所以使用持久化連接帶來(lái)的復(fù)雜度可能會(huì)超過(guò)它的收益。而不像 websocket 更多是為了實(shí)時(shí)通信的場(chǎng)景,所以采用持久化連接。 WebSocket握手過(guò)程我們經(jīng)常聽(tīng)到一種說(shuō)法,WebSocket 基于 HTTP,實(shí)際上只有在建立握手時(shí),數(shù)據(jù)是通過(guò) HTTP 傳輸?shù)摹5墙⒅?,在真正傳輸時(shí)候是不需要 HTTP協(xié)議。握手具體過(guò)程如圖所示: 注:websocket 會(huì)保留 HTTP 握手后的 socket 連接,后續(xù)即可利用這個(gè) socket 進(jìn)行通訊,無(wú)需再關(guān)注 HTTP。 websocket 為什么采用 HTTP 握手? WebSocket 是相對(duì)較新的協(xié)議,可能并不是所有的網(wǎng)絡(luò)設(shè)備和服務(wù)器都支持。因此,在瀏覽器請(qǐng)求服務(wù)器進(jìn)行 WebSocket 握手時(shí),使用基于 HTTP 協(xié)議的握手方式可以避免協(xié)議兼容性問(wèn)題。 WebSocket實(shí)現(xiàn)原理WebSocket數(shù)據(jù)幀說(shuō)明WebSocket 以幀的形式進(jìn)行數(shù)據(jù)傳輸,幀組成包括以下幾個(gè)部分:
各字段含義如表格所示:
把數(shù)據(jù)幀組成搞清楚,可以說(shuō) websocket 你就了解了一大半,協(xié)議實(shí)現(xiàn)里大部分操作都是對(duì)于數(shù)據(jù)幀的處理。 構(gòu)造幀websocket 的數(shù)據(jù)是以幀的形式傳輸,那么我們就需要了解如何構(gòu)造幀。構(gòu)造幀只是聽(tīng)起來(lái)很復(fù)雜,構(gòu)造一個(gè)數(shù)據(jù)幀我們只需要遵守協(xié)議規(guī)則填寫(xiě)即可,按照WebSocket的協(xié)議標(biāo)準(zhǔn),構(gòu)造一個(gè)最短數(shù)據(jù)幀我們只需要三個(gè)字節(jié)就能完成,構(gòu)成如表格所示下:
代碼實(shí)現(xiàn): 注:本質(zhì)就是做一些字節(jié)拼接操作,把對(duì)應(yīng)的標(biāo)識(shí)放到對(duì)應(yīng)的位置即可。 數(shù)據(jù)傳輸了解往構(gòu)造幀的過(guò)程,那 websocket 是如何把幀發(fā)送出去的? WebSocket 在握手過(guò)程中會(huì)保留 HTTP 握手后的 socket 連接,這在前面有提到,所以我們可以通過(guò)這個(gè) socket 連接進(jìn)行數(shù)據(jù)的傳輸。 代碼實(shí)現(xiàn)如下: 心跳機(jī)制在連接過(guò)中,防止連接因長(zhǎng)時(shí)間無(wú)數(shù)據(jù)傳輸而被提前關(guān)閉,WebSocket 還引入了心跳機(jī)制,原理可以概括為:定期發(fā)送心跳包,以確認(rèn)客戶端與服務(wù)器的連接狀態(tài),并避免連接因長(zhǎng)時(shí)間空閑而被中斷。 代碼實(shí)現(xiàn)如下: mini-ws以下為一個(gè) websocket server 的簡(jiǎn)易實(shí)現(xiàn)(代碼來(lái)源): 測(cè)試代碼: var crypto = require("crypto"); var { EventEmitter } = require("events"); var MAX_FRAME_SIZE = 1024; // 最長(zhǎng)長(zhǎng)度限制 var MAGIC_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; /** * 數(shù)據(jù)類型操作碼 TEXT 字符串 * BINARY 二進(jìn)制數(shù)據(jù) 常用來(lái)保存照片 * PING,PONG 用作心跳檢測(cè) * CLOSE 關(guān)閉連接的數(shù)據(jù)幀 (有很多關(guān)閉連接的代碼 1001,1009,1007,1002) */ var OPCODES = { CONTINUE: 0, TEXT: 1, BINARY: 2, CLOSE: 8, PING: 9, PONG: 10, }; var hashWebSocketKey = function (key) { var sha1 = crypto.createHash("sha1"); sha1.update(key + MAGIC_STRING, "ascii"); return sha1.digest("base64"); }; /** * 解掩碼 * @param maskBytes 掩碼數(shù)據(jù) * @param data payload * @returns {Buffer} */ var unmask = function (maskBytes, data) { var payload = Buffer.alloc(data.length); for (var i = 0; i < data.length; i++) { payload[i] = maskBytes[i % 4] ^ data[i]; } return payload; }; /** * 編碼數(shù)據(jù) * @param opcode 操作碼 * @param payload 數(shù)據(jù) * @returns {*} */ var encodeMessage = function (opcode, payload, isFinal = true) { var buf; var b1 = (isFinal ? 0x80 : 0x00) | opcode; var b2; var length = payload.length; if (length < 126) { buf = Buffer.alloc(payload.length + 2 + 0); b2 |= length; //buffer ,offset buf.writeUInt8(b1, 0); //讀前8bit buf.writeUInt8(b2, 1); //讀8―15bit payload.copy(buf, 2); //復(fù)制數(shù)據(jù),從2(第三)字節(jié)開(kāi)始 } else if (length < 1 << 16) { buf = Buffer.alloc(payload.length + 2 + 2); b2 |= 126; buf.writeUInt8(b1, 0); buf.writeUInt8(b2, 1); buf.writeUInt16BE(length, 2); payload.copy(buf, 4); } else { buf = Buffer.alloc(payload.length + 2 + 8); b2 |= 127; buf.writeUInt8(b1, 0); buf.writeUInt8(b2, 1); buf.writeUInt32BE(0, 2); buf.writeUInt32BE(length, 6); payload.copy(buf, 10); } return buf; }; class WebSocket extends EventEmitter { constructor(req, socket, upgradeHead) { super(); var resKey = hashWebSocketKey(req.headers["sec-websocket-key"]); // 構(gòu)造響應(yīng)頭 var resHeaders = [ "HTTP/1.1 101 Switching Protocols", "Upgrade: websocket", "Connection: Upgrade", "Sec-WebSocket-Accept: " + resKey, ] .concat("", "") .join("\r\n"); socket.on("data", (data) => { this.buffer = Buffer.concat([this.buffer, data]); while (this._processBuffer()) {} }); socket.on("close", (had_error) => { if (!this.closed) { this.emit("close", 1006); this.closed = true; } }); socket.write(resHeaders); this.socket = socket; this.buffer = Buffer.alloc(0); this.closed = false; this.frames = Buffer.alloc(0); this.frameOpcode = 0; this.keepLiveTimer = null; } /* 發(fā)送數(shù)據(jù)函數(shù) * */ send(obj) { var opcode; var payload; // 如果是二進(jìn)制 if (Buffer.isBuffer(obj)) { opcode = OPCODES.BINARY; payload = obj; } else if (typeof obj) { // 承載的文本內(nèi)容 opcode = OPCODES.TEXT; //創(chuàng)造一個(gè)utf8的編碼,可以被編碼為字符串 payload = Buffer.from(obj, "utf8"); } else { throw new Error("cannot send object.Must be string of Buffer"); } this._doSend(opcode, payload); } // 默認(rèn) 45 秒 保持發(fā)送心跳 keepLive(timeout = 45000) { var self = this; function keepit() { self._doSend(OPCODES.PING, Buffer.from("ping")); console.log("server send ping..."); // 在關(guān)閉連接的情況下就不再需要發(fā)送 ping 請(qǐng)求了 if (!self.closed) { self.keepLiveTimer = setTimeout(keepit, timeout); } } keepit(); } /* 關(guān)閉連接函數(shù) * */ close(code, reason) { var opcode = OPCODES.CLOSE; var buffer; if (code) { buffer = Buffer.alloc(Buffer.byteLength(reason) + 2); buffer.writeUInt16BE(code, 0); buffer.write(reason, 2); } else { buffer = Buffer.alloc(0); } this._doSend(opcode, buffer); this.closed = true; } _processBuffer() { var buf = this.buffer; if (buf.length < 2) { return; } var idx = 2; var byte1 = buf.readUInt8(0); // 讀取數(shù)據(jù)幀的前 8 bit var FIN = byte1 & 0x80; // 如果為0x80,則標(biāo)志傳輸結(jié)束,獲取高位 bit var opcode = byte1 & 0x0f; //截取第一個(gè)字節(jié)的后 4 位,即 opcode 碼 // 如果是 0 的話,說(shuō)明是延續(xù)幀,需要保存好 opCode if (!FIN) { this.frameOpcode = opcode || this.frameOpcode; // 確保不為 0; } var byte2 = buf.readUInt8(1); // 讀取數(shù)據(jù)幀第二個(gè)字節(jié) var MASK = byte2 & 0x80; // 判斷是否有掩碼,客戶端必須要有,獲取高位 bit var length = byte2 & 0x7f; //獲取length屬性,也是小于126數(shù)據(jù)長(zhǎng)度的數(shù)據(jù)真實(shí)值 if (length > 125) { if (buf.length < 8) { return; // 如果大于125,而字節(jié)數(shù)小于 8,則顯然不合規(guī)范要求 } } if (length === 126) { //獲取的值為126 ,表示后兩個(gè)字節(jié)(16位)用于表示數(shù)據(jù)長(zhǎng)度 length = buf.readUInt16BE(2); // 讀取 16bit 的值 idx += 2; // +2 } else if (length === 127) { //獲取的值為 127 ,表示后 8 個(gè)字節(jié)(64位)用于表示數(shù)據(jù)長(zhǎng)度,其中高 4 字節(jié)是 0 var highBits = buf.readUInt32BE(2); //(1/0)1111111,切記 MSB 最高位是 0 if (highBits != 0) { this.close(1009, ""); //1009 關(guān)閉代碼,說(shuō)明數(shù)據(jù)太大; 協(xié)議里是支持 63 位長(zhǎng)度,不過(guò)這里我們自己實(shí)現(xiàn)的話,只支持 32 位長(zhǎng)度,防止數(shù)據(jù)過(guò)大; } length = buf.readUInt32BE(6); // 從第 6 到第 10 個(gè)字節(jié)(32位)為真實(shí)存放的數(shù)據(jù)長(zhǎng)度 idx += 8; } if (buf.length < idx + 4 + length) { //不夠長(zhǎng) 4為掩碼字節(jié)數(shù) return; } // 如果有 mask 標(biāo)志位,默認(rèn)都是有的 if (MASK) { var maskBytes = buf.slice(idx, idx + 4); //獲取掩碼數(shù)據(jù) idx += 4; //指針前移到真實(shí)數(shù)據(jù)段 var payload = buf.slice(idx, idx + length); // 數(shù)據(jù)長(zhǎng)度的單位是字節(jié) payload = unmask(maskBytes, payload); //解碼真實(shí)數(shù)據(jù) } else { payload = buf.slice(idx, idx + length); } this.buffer = buf.slice(idx + length); // 緩存 buffer // 有可能是分幀,需要拼接數(shù)據(jù) this.frames = Buffer.concat([this.frames, payload]); // 保存到 frames 中 if (!FIN) { console.log( "server detect fragment, sizeof payload:", Buffer.byteLength(payload) ); } if (FIN) { payload = this.frames.slice(0); // 獲取所有拼接完整的數(shù)據(jù) opcode = opcode || this.frameOpcode; // 如果是 0 ,則保持獲取之前保存的 code this.frames = Buffer.alloc(0); // 清空 frames this.frameOpcode = 0; // 清空 opcode this._handleFrame(opcode, payload); // 處理操作碼 } return true; // 繼續(xù)處理 } /** * 針對(duì)不同操作碼進(jìn)行不同處理 * @param 操作碼 * @param 數(shù)據(jù) */ _handleFrame(opcode, buffer) { var payload; switch (opcode) { case OPCODES.TEXT: payload = buffer.toString("utf8"); //如果是文本需要轉(zhuǎn)化為utf8的編碼 this.emit("data", opcode, payload); //Buffer.toString()默認(rèn)utf8 這里是故意指示的 break; case OPCODES.BINARY: //二進(jìn)制文件直接交付 payload = buffer; this.emit("data", opcode, payload); break; case OPCODES.PING: // 發(fā)送 pong 做響應(yīng) this._doSend(OPCODES.PONG, buffer); break; case OPCODES.PONG: //不做處理 console.log("server receive pong"); break; case OPCODES.CLOSE: // close有很多關(guān)閉碼 let code, reason; // 用于獲取關(guān)閉碼和關(guān)閉原因 if (buffer.length >= 2) { code = buffer.readUInt16BE(0); reason = buffer.toString("utf8", 2); } this.close(code, reason); this.emit("close", code, reason); break; default: this.close(1002, "unhandle opcode:" + opcode); } } // 這里可以針對(duì) payload 的長(zhǎng)度做分片 _doSend(opcode, payload) { var len = Buffer.byteLength(payload); // 分片的距離邏輯 var count = 0; while (len > MAX_FRAME_SIZE) { var framePayload = payload.slice(0, MAX_FRAME_SIZE); payload = payload.slice(MAX_FRAME_SIZE); this.socket.write( encodeMessage( count > 0 ? OPCODES.CONTINUE : opcode, framePayload, false ) ); //編碼后直接通過(guò)socket發(fā)送 count++; len = Buffer.byteLength(payload); } this.socket.write( encodeMessage(count > 0 ? OPCODES.CONTINUE : opcode, payload) ); //編碼后直接通過(guò)socket發(fā)送 } } module.exports = WebSocket; 運(yùn)行測(cè)試代碼,打開(kāi)瀏覽器訪問(wèn):http://localhost:3000/,在控制臺(tái)輸入: var http = require('http'); var WebSocket = require('./websocket'); // HTTP服務(wù)器部分 var server = http.createServer(function(req, res) { res.end('websocket test\r\n'); }); console.log('starting...'); // Upgrade請(qǐng)求處理 server.on('upgrade', callback); function callback(req, socket, upgradeHead) { var ws = new WebSocket(req, socket, upgradeHead); // ws.keepLive(); // 保持心跳連接,否則一般經(jīng)過(guò)一定的時(shí)間沒(méi)有數(shù)據(jù)交互,瀏覽器端會(huì)主動(dòng)關(guān)閉 ws 鏈接 ws.on('data', function(opcode, payload) { console.log('receive data:', opcode, payload.length); ws.send('good job'); }); ws.on('close', function(code, reason) { console.log('close:', code, reason); }); } server.listen(3000); 建立連接后發(fā)送消息: ws.send('hello world'); 成功發(fā)送并收到回復(fù)! 也可以通過(guò)報(bào)文查看: 參考文檔———————————————— https://juejin.cn/post/7236954203555151933 該文章在 2023/5/30 10:09:07 編輯過(guò) |
關(guān)鍵字查詢
相關(guān)文章
正在查詢... |