Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: allow shared reused dgram sockets #2548

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Worker.prototype.isConnected = function isConnected() {

// Master/worker specific methods are defined in the *Init() functions.

function SharedHandle(key, address, port, addressType, backlog, fd) {
function SharedHandle(key, address, port, addressType, backlog, fd, flags) {
this.key = key;
this.workers = [];
this.handle = null;
Expand All @@ -66,7 +66,7 @@ function SharedHandle(key, address, port, addressType, backlog, fd) {
// FIXME(bnoordhuis) Polymorphic return type for lack of a better solution.
var rval;
if (addressType === 'udp4' || addressType === 'udp6')
rval = dgram._createSocketHandle(address, port, addressType, fd);
rval = dgram._createSocketHandle(address, port, addressType, fd, flags);
else
rval = net._createServerHandle(address, port, addressType, fd);

Expand Down Expand Up @@ -438,7 +438,8 @@ function masterInit() {
var args = [message.address,
message.port,
message.addressType,
message.fd];
message.fd,
message.index];
var key = args.join(':');
var handle = handles[key];
if (handle === undefined) {
Expand All @@ -456,7 +457,8 @@ function masterInit() {
message.port,
message.addressType,
message.backlog,
message.fd);
message.fd,
message.flags);
}
if (!handle.data) handle.data = message.data;

Expand Down Expand Up @@ -485,7 +487,7 @@ function masterInit() {
cluster.emit('listening', worker, info);
}

// Round-robin only. Server in worker is closing, remove from list.
// Server in worker is closing, remove from list.
function close(worker, message) {
var key = message.key;
var handle = handles[key];
Expand All @@ -500,6 +502,7 @@ function masterInit() {

function workerInit() {
var handles = {};
var indexes = {};

// Called from src/node.js
cluster._setupWorker = function() {
Expand Down Expand Up @@ -528,15 +531,22 @@ function workerInit() {
};

// obj is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, address, port, addressType, fd, cb) {
var message = {
addressType: addressType,
address: address,
port: port,
cluster._getServer = function(obj, options, cb) {
var key = [ options.address,
options.port,
options.addressType,
options.fd ].join(':');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const key = ...?

if (indexes[key] === undefined)
indexes[key] = 0;
else
indexes[key]++;

var message = util._extend({
act: 'queryServer',
fd: fd,
index: indexes[key],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can avoid the extra lookup if you write the initial one like this: const index = indexes[key] = (indexes[key] || -1) + 1;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No extra lookup, v8 is using GVN! :) Also, I'm kind of against a = b = c.

data: null
};
}, options);

// Set custom data on handle (i.e. tls tickets key)
if (obj._getServerData) message.data = obj._getServerData();
send(message, function(reply, handle) {
Expand All @@ -551,7 +561,7 @@ function workerInit() {
cluster.worker.state = 'listening';
var address = obj.address();
message.act = 'listening';
message.port = address && address.port || port;
message.port = address && address.port || options.port;
send(message);
});
};
Expand All @@ -563,6 +573,7 @@ function workerInit() {
// closed. Avoids resource leaks when the handle is short-lived.
var close = handle.close;
handle.close = function() {
send({ act: 'close', key: key });
delete handles[key];
return close.apply(this, arguments);
};
Expand Down
23 changes: 15 additions & 8 deletions lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ function newHandle(type) {
}


exports._createSocketHandle = function(address, port, addressType, fd) {
exports._createSocketHandle = function(address, port, addressType, fd, flags) {
// Opening an existing fd is not supported for UDP handles.
assert(typeof fd !== 'number' || fd < 0);

var handle = newHandle(addressType);

if (port || address) {
var err = handle.bind(address, port || 0, 0);
var err = handle.bind(address, port || 0, flags);
if (err) {
handle.close();
return err;
Expand Down Expand Up @@ -176,8 +176,12 @@ Socket.prototype.bind = function(port /*, address, callback*/) {
if (!cluster)
cluster = require('cluster');

var flags = 0;
if (self._reuseAddr)
flags |= constants.UV_UDP_REUSEADDR;

if (cluster.isWorker && !exclusive) {
cluster._getServer(self, ip, port, self.type, -1, function(err, handle) {
function onHandle(err, handle) {
if (err) {
var ex = exceptionWithHostPort(err, 'bind', ip, port);
self.emit('error', ex);
Expand All @@ -191,16 +195,19 @@ Socket.prototype.bind = function(port /*, address, callback*/) {

replaceHandle(self, handle);
startListening(self);
});
}
cluster._getServer(self, {
address: ip,
port: port,
addressType: self.type,
fd: -1,
flags: flags
}, onHandle);

} else {
if (!self._handle)
return; // handle has been closed in the mean time

var flags = 0;
if (self._reuseAddr)
flags |= constants.UV_UDP_REUSEADDR;

var err = self._handle.bind(ip, port || 0, flags);
if (err) {
var ex = exceptionWithHostPort(err, 'bind', ip, port);
Expand Down
8 changes: 7 additions & 1 deletion lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,13 @@ function listen(self, address, port, addressType, backlog, fd, exclusive) {
return;
}

cluster._getServer(self, address, port, addressType, fd, cb);
cluster._getServer(self, {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags: 0
}, cb);

function cb(err, handle) {
// EADDRINUSE may not be reported until we call listen(). To complicate
Expand Down
34 changes: 34 additions & 0 deletions test/parallel/test-cluster-dgram-reuse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use strict';
var common = require('../common');
var assert = require('assert');
var cluster = require('cluster');
var dgram = require('dgram');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const?


if (cluster.isMaster) {
cluster.fork().on('exit', function(code) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

common.mustCall(function(code) {

assert.equal(code, 0);
});
return;
}

var sockets = [];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const?

function next() {
sockets.push(this);
if (sockets.length !== 2)
return;

// Work around health check issue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment could go into a little more detail.

process.nextTick(function() {
for (var i = 0; i < sockets.length; i++)
sockets[i].close(close);
});
}

var waiting = 2;
function close() {
if (--waiting === 0)
cluster.worker.disconnect();
}

for (var i = 0; i < 2; i++)
dgram.createSocket({ type: 'udp4', reuseAddr: true }).bind(common.PORT, next);