diff --git a/lib/websocket.js b/lib/websocket.js index 7f1e3bcfa..b37d24fa1 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -1002,10 +1002,13 @@ function socketOnClose() { const websocket = this[kWebSocket]; this.removeListener('close', socketOnClose); + this.removeListener('data', socketOnData); this.removeListener('end', socketOnEnd); websocket._readyState = WebSocket.CLOSING; + let chunk; + // // The close frame might not have been received or the `'end'` event emitted, // for example, if the socket was destroyed due to an error. Ensure that the @@ -1013,13 +1016,19 @@ function socketOnClose() { // it. If the readable side of the socket is in flowing mode then there is no // buffered data as everything has been already written and `readable.read()` // will return `null`. If instead, the socket is paused, any possible buffered - // data will be read as a single chunk and emitted synchronously in a single - // `'data'` event. + // data will be read as a single chunk. // - websocket._socket.read(); + if ( + !this._readableState.endEmitted && + !websocket._closeFrameReceived && + !websocket._receiver._writableState.errorEmitted && + (chunk = websocket._socket.read()) !== null + ) { + websocket._receiver.write(chunk); + } + websocket._receiver.end(); - this.removeListener('data', socketOnData); this[kWebSocket] = undefined; clearTimeout(websocket._closeTimer); diff --git a/test/websocket.test.js b/test/websocket.test.js index a1a1cda43..994debe19 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -10,6 +10,7 @@ const tls = require('tls'); const fs = require('fs'); const { URL } = require('url'); +const Sender = require('../lib/sender'); const WebSocket = require('..'); const { GUID, NOOP } = require('../lib/constants'); @@ -2735,15 +2736,21 @@ describe('WebSocket', () => { }); }); - it('consumes all received data when connection is closed abnormally', (done) => { + it('consumes all received data when connection is closed (1/2)', (done) => { const wss = new WebSocket.Server( { perMessageDeflate: { threshold: 0 }, port: 0 }, () => { - const ws = new WebSocket(`ws://localhost:${wss.address().port}`); const messages = []; + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + ws.on('open', () => { + ws._socket.on('close', () => { + assert.strictEqual(ws._receiver._state, 5); + }); + }); ws.on('message', (message) => messages.push(message)); ws.on('close', (code) => { @@ -2762,6 +2769,76 @@ describe('WebSocket', () => { }); }); + it('consumes all received data when connection is closed (2/2)', (done) => { + const payload1 = Buffer.alloc(15 * 1024); + const payload2 = Buffer.alloc(1); + + const opts = { + fin: true, + opcode: 0x02, + mask: false, + readOnly: false + }; + + const list = [ + ...Sender.frame(payload1, { rsv1: false, ...opts }), + ...Sender.frame(payload2, { rsv1: true, ...opts }) + ]; + + for (let i = 0; i < 399; i++) { + list.push(list[list.length - 2], list[list.length - 1]); + } + + const data = Buffer.concat(list); + + const wss = new WebSocket.Server( + { + perMessageDeflate: true, + port: 0 + }, + () => { + const messageLengths = []; + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + ws.on('open', () => { + ws._socket.prependListener('close', () => { + assert.strictEqual(ws._receiver._state, 5); + assert.strictEqual(ws._socket._readableState.length, 3); + }); + + const push = ws._socket.push; + + ws._socket.push = (data) => { + ws._socket.push = push; + ws._socket.push(data); + ws.terminate(); + }; + + // This hack is used because there is no guarantee that more than + // 16 KiB will be sent as a single TCP packet. + push.call(ws._socket, data); + + wss.clients + .values() + .next() + .value.send(payload2, { compress: false }); + }); + + ws.on('message', (message) => { + messageLengths.push(message.length); + }); + + ws.on('close', (code) => { + assert.strictEqual(code, 1006); + assert.strictEqual(messageLengths.length, 402); + assert.strictEqual(messageLengths[0], 15360); + assert.strictEqual(messageLengths[messageLengths.length - 1], 1); + wss.close(done); + }); + } + ); + }); + it('handles a close frame received while compressing data', (done) => { const wss = new WebSocket.Server( {