Skip to content

Commit

Permalink
cluster: fix edge cases that throw ERR_INTERNAL_ASSERTION
Browse files Browse the repository at this point in the history
Some cases use both `cluster` and `net`/`cluser` will throw
ERR_INTERNAL_ASSERTION when `listen`/`bind` to the port of `0`. This
PR maitains a separate map of the index to fix the issue. See the new
tests added for the detail cases.

PR-URL: #36764
Reviewed-By: Antoine du Hamel <[email protected]>
  • Loading branch information
oyyd authored and aduh95 committed Jan 11, 2021
1 parent 6b18987 commit 8e3f606
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 13 deletions.
39 changes: 26 additions & 13 deletions lib/internal/cluster/child.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {
ObjectAssign,
ReflectApply,
SafeMap,
SafeSet,
} = primordials;

const assert = require('internal/assert');
Expand Down Expand Up @@ -74,14 +75,14 @@ cluster._getServer = function(obj, options, cb) {
options.fd,
], ':');

let index = indexes.get(indexesKey);
let indexSet = indexes.get(indexesKey);

if (index === undefined)
index = 0;
else
index++;

indexes.set(indexesKey, index);
if (indexSet === undefined) {
indexSet = { nextIndex: 0, set: new SafeSet() };
indexes.set(indexesKey, indexSet);
}
const index = indexSet.nextIndex++;
indexSet.set.add(index);

const message = {
act: 'queryServer',
Expand All @@ -101,9 +102,9 @@ cluster._getServer = function(obj, options, cb) {
obj._setServerData(reply.data);

if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
shared(reply, handle, indexesKey, index, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
rr(reply, indexesKey, index, cb); // Round-robin.
});

obj.once('listening', () => {
Expand All @@ -115,8 +116,20 @@ cluster._getServer = function(obj, options, cb) {
});
};

function removeIndexesKey(indexesKey, index) {
const indexSet = indexes.get(indexesKey);
if (!indexSet) {
return;
}

indexSet.set.delete(index);
if (indexSet.set.size === 0) {
indexes.delete(indexesKey);
}
}

// Shared listen socket.
function shared(message, handle, indexesKey, cb) {
function shared(message, handle, indexesKey, index, cb) {
const key = message.key;
// Monkey-patch the close() method so we can keep track of when it's
// closed. Avoids resource leaks when the handle is short-lived.
Expand All @@ -125,7 +138,7 @@ function shared(message, handle, indexesKey, cb) {
handle.close = function() {
send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
removeIndexesKey(indexesKey, index);
return ReflectApply(close, handle, arguments);
};
assert(handles.has(key) === false);
Expand All @@ -134,7 +147,7 @@ function shared(message, handle, indexesKey, cb) {
}

// Round-robin. Primary distributes handles across workers.
function rr(message, indexesKey, cb) {
function rr(message, indexesKey, index, cb) {
if (message.errno)
return cb(message.errno, null);

Expand All @@ -158,7 +171,7 @@ function rr(message, indexesKey, cb) {

send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
removeIndexesKey(indexesKey, index);
key = undefined;
}

Expand Down
40 changes: 40 additions & 0 deletions test/parallel/test-cluster-child-index-dgram.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';
const common = require('../common');
const Countdown = require('../common/countdown');
if (common.isWindows)
common.skip('dgram clustering is currently not supported on Windows.');

const cluster = require('cluster');
const dgram = require('dgram');

// Test an edge case when using `cluster` and `dgram.Socket.bind()`
// the port of `0`.
const kPort = 0;

function child() {
const kTime = 2;
const countdown = new Countdown(kTime * 2, () => {
process.exit(0);
});
for (let i = 0; i < kTime; i += 1) {
const socket = new dgram.Socket('udp4');
socket.bind(kPort, common.mustCall(() => {
// `process.nextTick()` or `socket2.close()` would throw
// ERR_SOCKET_DGRAM_NOT_RUNNING
process.nextTick(() => {
socket.close(countdown.dec());
const socket2 = new dgram.Socket('udp4');
socket2.bind(kPort, common.mustCall(() => {
process.nextTick(() => {
socket2.close(countdown.dec());
});
}));
});
}));
}
}

if (cluster.isMaster)
cluster.fork(__filename);
else
child();
31 changes: 31 additions & 0 deletions test/parallel/test-cluster-child-index-net.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use strict';
const common = require('../common');
const Countdown = require('../common/countdown');
const cluster = require('cluster');
const net = require('net');

// Test an edge case when using `cluster` and `net.Server.listen()` to
// the port of `0`.
const kPort = 0;

function child() {
const kTime = 2;
const countdown = new Countdown(kTime * 2, () => {
process.exit(0);
});
for (let i = 0; i < kTime; i += 1) {
const server = net.createServer();
server.listen(kPort, common.mustCall(() => {
server.close(countdown.dec());
const server2 = net.createServer();
server2.listen(kPort, common.mustCall(() => {
server2.close(countdown.dec());
}));
}));
}
}

if (cluster.isMaster)
cluster.fork(__filename);
else
child();

0 comments on commit 8e3f606

Please sign in to comment.