Skip to content

Commit

Permalink
lib: add UV_TCP_REUSEPORT for tcp
Browse files Browse the repository at this point in the history
  • Loading branch information
theanarkh committed Oct 17, 2024
1 parent 87da1f3 commit 9615473
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 9 deletions.
8 changes: 8 additions & 0 deletions doc/api/net.md
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,9 @@ Listening on a file descriptor is not supported on Windows.
<!-- YAML
added: v0.11.14
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/55408
description: The `reusePort` option is supported.
- version: v15.6.0
pr-url: https://github.com/nodejs/node/pull/36623
description: AbortSignal support was added.
Expand All @@ -487,6 +490,11 @@ changes:
* `ipv6Only` {boolean} For TCP servers, setting `ipv6Only` to `true` will
disable dual-stack support, i.e., binding to host `::` won't make
`0.0.0.0` be bound. **Default:** `false`.
* `reusePort` {boolean} For TCP servers, setting `reusePort` to `true` allows
bind to an identical socket address cross multiple sockets if they all
set `reusePort`. Incoming connections are distributed by OS across the listener
sockets. This flag is available only on some platforms, such as Linux 3.9+,
DragonFlyBSD 3.6+, FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+. **Default:** `false`.
* `path` {string} Will be ignored if `port` is specified. See
[Identifying paths for IPC connections][].
* `port` {number}
Expand Down
22 changes: 16 additions & 6 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,15 @@ const {
} = require('internal/perf/observe');
const { getDefaultHighWaterMark } = require('internal/streams/state');

function getFlags(ipv6Only) {
return ipv6Only === true ? TCPConstants.UV_TCP_IPV6ONLY : 0;
function getFlags(options) {
let flags = 0;
if (options.ipv6Only === true) {
flags |= TCPConstants.UV_TCP_IPV6ONLY;
}
if (options.reusePort === true) {
flags |= TCPConstants.UV_TCP_REUSEPORT;
}
return flags;
}

function createHandle(fd, is_server) {
Expand Down Expand Up @@ -1833,12 +1840,12 @@ function createServerHandle(address, port, addressType, fd, flags) {
if (err) {
handle.close();
// Fallback to ipv4
return createServerHandle(DEFAULT_IPV4_ADDR, port);
return createServerHandle(DEFAULT_IPV4_ADDR, port, undefined, undefined, flags);
}
} else if (addressType === 6) {
err = handle.bind6(address, port, flags);
} else {
err = handle.bind(address, port);
err = handle.bind(address, port, flags);
}
}

Expand Down Expand Up @@ -2022,7 +2029,7 @@ Server.prototype.listen = function(...args) {
toNumber(args.length > 2 && args[2]); // (port, host, backlog)

options = options._handle || options.handle || options;
const flags = getFlags(options.ipv6Only);
const flags = getFlags(options);
// Refresh the id to make the previous call invalid
this._listeningId++;
// (handle[, backlog][, cb]) where handle is an object with a handle
Expand Down Expand Up @@ -2055,14 +2062,17 @@ Server.prototype.listen = function(...args) {
if (typeof options.port === 'number' || typeof options.port === 'string') {
validatePort(options.port, 'options.port');
backlog = options.backlog || backlogFromArgs;
if (options.reusePort === true) {
options.exclusive = true;
}
// start TCP server listening on host:port
if (options.host) {
lookupAndListen(this, options.port | 0, options.host, backlog,
options.exclusive, flags);
} else { // Undefined host, listens on unspecified address
// Default addressType 4 will be used to search for primary server
listenInCluster(this, null, options.port | 0, 4,
backlog, undefined, options.exclusive);
backlog, undefined, options.exclusive, flags);
}
return this;
}
Expand Down
10 changes: 7 additions & 3 deletions src/tcp_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ void TCPWrap::Initialize(Local<Object> target,
NODE_DEFINE_CONSTANT(constants, SOCKET);
NODE_DEFINE_CONSTANT(constants, SERVER);
NODE_DEFINE_CONSTANT(constants, UV_TCP_IPV6ONLY);
NODE_DEFINE_CONSTANT(constants, UV_TCP_REUSEPORT);
target->Set(context,
env->constants_string(),
constants).Check();
Expand Down Expand Up @@ -246,9 +247,12 @@ void TCPWrap::Bind(
int port;
unsigned int flags = 0;
if (!args[1]->Int32Value(env->context()).To(&port)) return;
if (family == AF_INET6 &&
!args[2]->Uint32Value(env->context()).To(&flags)) {
return;
if (args.Length() >= 3 && args[2]->IsUint32()) {
if (!args[2]->Uint32Value(env->context()).To(&flags)) return;
// Can not set IPV6 flags on IPV4 socket
if (family == AF_INET) {
flags &= ~UV_TCP_IPV6ONLY;
}
}

T addr;
Expand Down
24 changes: 24 additions & 0 deletions test/common/net.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';
const net = require('net');

const options = { port: 0, reusePort: true };

function checkSupportReusePort() {
return new Promise((resolve, reject) => {
const server = net.createServer().listen(options);
server.on('listening', () => {
server.close();
resolve();
});
server.on('error', (err) => {
console.log('don not support reusePort flag: ', err.message);
server.close();
reject();
});
});
}

module.exports = {
checkSupportReusePort,
options,
};
36 changes: 36 additions & 0 deletions test/parallel/test-child-process-net-reuseport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict';
const common = require('../common');
const { checkSupportReusePort, options } = require('../common/net');
const assert = require('assert');
const child_process = require('child_process');
const net = require('net');

if (!process.env.isWorker) {
checkSupportReusePort().then(() => {
const server = net.createServer();
server.listen(options, common.mustCall(() => {
const port = server.address().port;
const workerOptions = { env: { 'isWorker': 1, port } };
let count = 2;
for (let i = 0; i < 2; i++) {
const worker = child_process.fork(__filename, workerOptions);
worker.on('exit', common.mustCall((code) => {
assert.strictEqual(code, 0);
if (--count === 0) {
server.close();
}
}));
}
}));
server.on('error', common.mustNotCall());
}, process.exit);
return;
}

const server = net.createServer();

server.listen({ ...options, port: +process.env.port }, common.mustCall(() => {
server.close(common.mustCall(() => {
process.exit(0);
}));
})).on('error', common.mustNotCall());
39 changes: 39 additions & 0 deletions test/parallel/test-cluster-net-reuseport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict';
const common = require('../common');

const { checkSupportReusePort, options } = require('../common/net');
const assert = require('assert');
const cluster = require('cluster');
const net = require('net');

if (cluster.isPrimary) {
checkSupportReusePort().then(() => {
cluster.fork().on('exit', common.mustCall((code) => {
assert.strictEqual(code, 0);
}));
}, process.exit);
return;
}

let waiting = 2;
function close() {
if (--waiting === 0)
cluster.worker.disconnect();
}

const server1 = net.createServer();
const server2 = net.createServer();

// Test if the worker requests the main process to create a socket
cluster._getServer = common.mustNotCall();

server1.listen(options, common.mustCall(() => {
const port = server1.address().port;
server2.listen({ ...options, port }, common.mustCall(() => {
server1.close(close);
server2.close(close);
}));
}));

server1.on('error', common.mustNotCall());
server2.on('error', common.mustNotCall());
24 changes: 24 additions & 0 deletions test/parallel/test-net-reuseport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';
const common = require('../common');
const { checkSupportReusePort, options } = require('../common/net');
const net = require('net');

function test(host) {
const server1 = net.createServer();
const server2 = net.createServer();
server1.listen({ ...options, host }, common.mustCall(() => {
const port = server1.address().port;
server2.listen({ ...options, host, port }, common.mustCall(() => {
server1.close();
server2.close();
}));
}));
server1.on('error', common.mustNotCall());
server2.on('error', common.mustNotCall());
}

checkSupportReusePort()
.then(() => {
test();
common.hasIPv6 && test('::');
}, process.exit);

0 comments on commit 9615473

Please sign in to comment.