Skip to content

Commit

Permalink
src,lib: the handle keeps loop alive in cluster rr mode
Browse files Browse the repository at this point in the history
PR-URL: #46161
Reviewed-By: Ben Noordhuis <[email protected]>
Reviewed-By: Minwoo Jung <[email protected]>
  • Loading branch information
theanarkh authored and juanarbol committed Jan 26, 2023
1 parent c7abebf commit aebdef8
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 7 deletions.
34 changes: 27 additions & 7 deletions lib/internal/cluster/child.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const EventEmitter = require('events');
const { owner_symbol } = require('internal/async_hooks').symbols;
const Worker = require('internal/cluster/worker');
const { internal, sendHelper } = require('internal/cluster/utils');
const { TIMEOUT_MAX } = require('internal/timers');
const { setInterval, clearInterval } = require('timers');

const cluster = new EventEmitter();
const handles = new SafeMap();
const indexes = new SafeMap();
Expand Down Expand Up @@ -160,6 +163,21 @@ function rr(message, { indexesKey, index }, cb) {

let key = message.key;

let fakeHandle = null;

function ref() {
if (!fakeHandle) {
fakeHandle = setInterval(noop, TIMEOUT_MAX);
}
}

function unref() {
if (fakeHandle) {
clearInterval(fakeHandle);
fakeHandle = null;
}
}

function listen(backlog) {
// TODO(bnoordhuis) Send a message to the primary that tells it to
// update the backlog size. The actual backlog should probably be
Expand All @@ -175,7 +193,11 @@ function rr(message, { indexesKey, index }, cb) {
// the primary.
if (key === undefined)
return;

unref();
// If the handle is the last handle in process,
// the parent process will delete the handle when worker process exits.
// So it is ok if the close message get lost.
// See the comments of https://github.com/nodejs/node/pull/46161
send({ act: 'close', key });
handles.delete(key);
removeIndexesKey(indexesKey, index);
Expand All @@ -189,12 +211,10 @@ function rr(message, { indexesKey, index }, cb) {
return 0;
}

// Faux handle. Mimics a TCPWrap with just enough fidelity to get away
// with it. Fools net.Server into thinking that it's backed by a real
// handle. Use a noop function for ref() and unref() because the control
// channel is going to keep the worker alive anyway.
const handle = { close, listen, ref: noop, unref: noop };

// Faux handle. net.Server is not associated with handle,
// so we control its state(ref or unref) by setInterval.
const handle = { close, listen, ref, unref };
handle.ref();
if (message.sockname) {
handle.getsockname = getsockname; // TCP handles only.
}
Expand Down
18 changes: 18 additions & 0 deletions test/parallel/test-cluster-rr-handle-close.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
'use strict';

const common = require('../common');
const cluster = require('cluster');
const net = require('net');

cluster.schedulingPolicy = cluster.SCHED_RR;

if (cluster.isPrimary) {
const worker = cluster.fork();
worker.on('exit', common.mustCall());
} else {
const server = net.createServer(common.mustNotCall());
server.listen(0, common.mustCall(() => {
process.channel.unref();
server.close();
}));
}
23 changes: 23 additions & 0 deletions test/parallel/test-cluster-rr-handle-keep-loop-alive.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict';

const common = require('../common');
const cluster = require('cluster');
const net = require('net');
const assert = require('assert');

cluster.schedulingPolicy = cluster.SCHED_RR;

if (cluster.isPrimary) {
let exited = false;
const worker = cluster.fork();
worker.on('exit', () => {
exited = true;
});
setTimeout(() => {
assert.ok(!exited);
worker.kill();
}, 3000);
} else {
const server = net.createServer(common.mustNotCall());
server.listen(0, common.mustCall(() => process.channel.unref()));
}
20 changes: 20 additions & 0 deletions test/parallel/test-cluster-rr-handle-ref-unref.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
'use strict';

const common = require('../common');
const cluster = require('cluster');
const net = require('net');

cluster.schedulingPolicy = cluster.SCHED_RR;

if (cluster.isPrimary) {
const worker = cluster.fork();
worker.on('exit', common.mustCall());
} else {
const server = net.createServer(common.mustNotCall());
server.listen(0, common.mustCall(() => {
server.ref();
server.unref();
process.channel.unref();
}));
server.unref();
}

0 comments on commit aebdef8

Please sign in to comment.