Skip to content

Commit

Permalink
[major] Do not close existing connections
Browse files Browse the repository at this point in the history
When `WebSocketServer.prototype.close()` is called, stop accepting new
connections but do not close the existing ones.

If the HTTP/S server was created internally, then close it and emit the
`'close'` event when it closes. Otherwise, if client tracking is
enabled, then emit the `'close'` event when the number of connections
goes down to zero. Otherwise, emit it in the next tick.

Refs: #1902
  • Loading branch information
lpinca committed Jul 14, 2021
1 parent 78adf5f commit df7de57
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 141 deletions.
11 changes: 8 additions & 3 deletions doc/ws.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,14 @@ added when the `clientTracking` is truthy.

### server.close([callback])

Close the HTTP server if created internally, terminate all clients and call
callback when done. If an external HTTP server is used via the `server` or
`noServer` constructor options, it must be closed manually.
Prevent the server from accepting new connections and close the HTTP server if
created internally. If an external HTTP server is used via the `server` or
`noServer` constructor options, it must be closed manually. Existing connections
are not closed automatically. The server emits a `'close'` event when all
connections are closed unless an external HTTP server is used and client
tracking is disabled. In this case the `'close'` event is emitted in the next
tick. The optional callback is called when the `'close'` event occurs and
receives an `Error` if the server is already closed.

### server.handleUpgrade(request, socket, head, callback)

Expand Down
55 changes: 36 additions & 19 deletions lib/websocket-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ class WebSocketServer extends EventEmitter {
}

if (options.perMessageDeflate === true) options.perMessageDeflate = {};
if (options.clientTracking) this.clients = new Set();
if (options.clientTracking) {
this.clients = new Set();
this._shouldEmitClose = false;
}

this.options = options;
this._state = RUNNING;
}
Expand All @@ -135,9 +139,10 @@ class WebSocketServer extends EventEmitter {
}

/**
* Close the server.
* Stop the server from accepting new connections and emit the `'close'` event
* when all existing connections are closed.
*
* @param {Function} [cb] Callback
* @param {Function} [cb] A one-time listener for the `'close'` event
* @public
*/
close(cb) {
Expand All @@ -151,29 +156,35 @@ class WebSocketServer extends EventEmitter {
if (this._state === CLOSING) return;
this._state = CLOSING;

//
// Terminate all associated clients.
//
if (this.clients) {
for (const client of this.clients) client.terminate();
}
if (this.options.noServer || this.options.server) {
if (this._server) {
this._removeListeners();
this._removeListeners = this._server = null;
}

const server = this._server;
if (this.clients) {
if (!this.clients.size) {
process.nextTick(emitClose, this);
} else {
this._shouldEmitClose = true;
}
} else {
process.nextTick(emitClose, this);
}
} else {
const server = this._server;

if (server) {
this._removeListeners();
this._removeListeners = this._server = null;

//
// Close the http server if it was internally created.
// The HTTP/S server was created internally. Close it, and rely on its
// `'close'` event.
//
if (this.options.port != null) {
server.close(emitClose.bind(undefined, this));
return;
}
server.close(() => {
emitClose(this);
});
}

process.nextTick(emitClose, this);
}

/**
Expand Down Expand Up @@ -373,7 +384,13 @@ class WebSocketServer extends EventEmitter {

if (this.clients) {
this.clients.add(ws);
ws.on('close', () => this.clients.delete(ws));
ws.on('close', () => {
this.clients.delete(ws);

if (this._shouldEmitClose && !this.clients.size) {
process.nextTick(emitClose, this);
}
});
}

cb(ws, req);
Expand Down
25 changes: 25 additions & 0 deletions test/sender.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,31 @@ describe('Sender', () => {
});

describe('#close', () => {
it('throws an error if the first argument is invalid', () => {
const mockSocket = new MockSocket();
const sender = new Sender(mockSocket);

assert.throws(
() => sender.close('error'),
/^TypeError: First argument must be a valid error code number$/
);

assert.throws(
() => sender.close(1004),
/^TypeError: First argument must be a valid error code number$/
);
});

it('throws an error if the message is greater than 123 bytes', () => {
const mockSocket = new MockSocket();
const sender = new Sender(mockSocket);

assert.throws(
() => sender.close(1000, 'a'.repeat(124)),
/^RangeError: The message must not be greater than 123 bytes$/
);
});

it('should consume all data before closing', (done) => {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });

Expand Down
100 changes: 62 additions & 38 deletions test/websocket-server.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ describe('WebSocketServer', () => {
},
() => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

ws.on('open', ws.close);
}
);

Expand Down Expand Up @@ -103,6 +105,8 @@ describe('WebSocketServer', () => {
const port = 1337;
const wss = new WebSocket.Server({ port }, () => {
const ws = new WebSocket(`ws://localhost:${port}`);

ws.on('open', ws.close);
});

wss.on('connection', () => wss.close(done));
Expand All @@ -120,12 +124,14 @@ describe('WebSocketServer', () => {

server.listen(0, () => {
const wss = new WebSocket.Server({ server });
const ws = new WebSocket(`ws://localhost:${server.address().port}`);

wss.on('connection', () => {
wss.close();
server.close(done);
});

const ws = new WebSocket(`ws://localhost:${server.address().port}`);

ws.on('open', ws.close);
});
});

Expand Down Expand Up @@ -169,7 +175,11 @@ describe('WebSocketServer', () => {
assert.strictEqual(req.url, '/foo?bar=bar');
} else {
assert.strictEqual(req.url, '/');
wss.close();

for (const client of wss.clients) {
client.close();
}

server.close(done);
}
});
Expand Down Expand Up @@ -209,30 +219,13 @@ describe('WebSocketServer', () => {
});

describe('#close', () => {
it('does not throw when called twice', (done) => {
it('does not throw if called multiple times', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
wss.on('close', done);

wss.close();
wss.close();
wss.close();

done();
});
});

it('closes all clients', (done) => {
let closes = 0;
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
ws.on('close', () => {
if (++closes === 2) done();
});
});

wss.on('connection', (ws) => {
ws.on('close', () => {
if (++closes === 2) done();
});
wss.close();
});
});

Expand All @@ -254,6 +247,8 @@ describe('WebSocketServer', () => {

server.listen(0, () => {
const ws = new WebSocket(`ws://localhost:${server.address().port}`);

ws.on('open', ws.close);
});
});

Expand Down Expand Up @@ -309,6 +304,16 @@ describe('WebSocketServer', () => {
});
});

it("emits the 'close' event if client tracking is disabled", (done) => {
const wss = new WebSocket.Server({
noServer: true,
clientTracking: false
});

wss.on('close', done);
wss.close();
});

it("emits the 'close' event if the server is already closed", (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
wss.close(() => {
Expand All @@ -324,7 +329,10 @@ describe('WebSocketServer', () => {
it('returns a list of connected clients', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
assert.strictEqual(wss.clients.size, 0);

const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

ws.on('open', ws.close);
});

wss.on('connection', () => {
Expand Down Expand Up @@ -404,17 +412,17 @@ describe('WebSocketServer', () => {
const wss = new WebSocket.Server({ noServer: true });

server.on('upgrade', (req, socket, head) => {
wss.handleUpgrade(req, socket, head, (client) =>
client.send('hello')
);
wss.handleUpgrade(req, socket, head, (ws) => {
ws.send('hello');
ws.close();
});
});

const ws = new WebSocket(`ws://localhost:${server.address().port}`);

ws.on('message', (message, isBinary) => {
assert.deepStrictEqual(message, Buffer.from('hello'));
assert.ok(!isBinary);
wss.close();
server.close(done);
});
});
Expand Down Expand Up @@ -683,6 +691,7 @@ describe('WebSocketServer', () => {

socket.once('data', (chunk) => {
assert.strictEqual(chunk[0], 0x88);
socket.destroy();
wss.close(done);
});
});
Expand Down Expand Up @@ -742,7 +751,6 @@ describe('WebSocketServer', () => {
});

wss.on('connection', () => {
wss.close();
server.close(done);
});

Expand All @@ -751,6 +759,8 @@ describe('WebSocketServer', () => {
headers: { Origin: 'https://example.com', foo: 'bar' },
rejectUnauthorized: false
});

ws.on('open', ws.close);
});
});

Expand All @@ -762,6 +772,8 @@ describe('WebSocketServer', () => {
},
() => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

ws.on('open', ws.close);
}
);

Expand Down Expand Up @@ -959,24 +971,30 @@ describe('WebSocketServer', () => {
wss.close(done);
});
});

wss.on('connection', (ws) => {
ws.close();
});
});
});

it("emits the 'headers' event", (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

wss.on('headers', (headers, request) => {
assert.deepStrictEqual(headers.slice(0, 3), [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade'
]);
assert.ok(request instanceof http.IncomingMessage);
assert.strictEqual(request.url, '/');
ws.on('open', ws.close);
});

wss.on('connection', () => wss.close(done));
});
wss.on('headers', (headers, request) => {
assert.deepStrictEqual(headers.slice(0, 3), [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade'
]);
assert.ok(request instanceof http.IncomingMessage);
assert.strictEqual(request.url, '/');

wss.on('connection', () => wss.close(done));
});
});
});
Expand All @@ -985,6 +1003,8 @@ describe('WebSocketServer', () => {
it('is disabled by default', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

ws.on('open', ws.close);
});

wss.on('connection', (ws, req) => {
Expand Down Expand Up @@ -1016,6 +1036,10 @@ describe('WebSocketServer', () => {
});
}
);

wss.on('connection', (ws) => {
ws.close();
});
});
});
});
Loading

0 comments on commit df7de57

Please sign in to comment.