Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
introduce node cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Oct 12, 2011
1 parent 99757cb commit 87339a2
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 25 deletions.
199 changes: 199 additions & 0 deletions lib/cluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

var assert = require('assert');
var fork = require('child_process').fork;
var net = require('net');
var amMaster; // Used for asserts


var debug;
if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
debug = function(x) {
var prefix = process.pid + ',' +
(process.env.NODE_WORKER_ID ? 'Worker' : 'Master');
console.error(prefix, x);
};
} else {
debug = function() { };
}


// Used in the master:
var ids = 0;
var workers = [];
var servers = {};

// Used in the worker:
var workerId = 0;
var queryIds = 0;
var queryCallbacks = {};


exports.start = function() {
amMaster = true;

if (process.argv.length < 1) {
console.error('Usage: node cluster script.js');
process.exit(1);
}

var args = process.argv.slice(2);
var scriptFilename = args.shift();

var cpus = require('os').cpus().length;
console.error("Detected " + cpus + " cpus");

for (var i = 0; i < cpus; i++) {
forkWorker(scriptFilename, args);
}

process.on('uncaughtException', function(e) {

This comment has been minimized.

Copy link
@windyrobin

windyrobin Oct 13, 2011

I think we shouldn't set the uncaughtException here ,
the same to process.on("message") of worker process
these handler/setting should left to users to decide

for example ,when uncaughtException occurred ,I ususally log a error but not exit

// Quickly try to kill all the workers.
// TODO: be session leader - will cause auto SIGHUP to the children.
for (var id in workers) {
if (workers[id]) {
debug("kill worker " + id);
workers[id].kill();
}
}

console.error("Exception in cluster master process: " +
e.message + '\n' + e.stack);
console.error("Please report this bug.");
process.exit(1);
});
}


function handleWorkerMessage(worker, message) {
assert.ok(amMaster);

debug("recv " + JSON.stringify(message));

switch (message.cmd) {
case 'online':
console.log("Worker " + worker.pid + " online");
workers[message._workerId] = worker;
break;

case 'queryServer':
var key = message.address + ":" +
message.port + ":" +
message.addressType;

This comment has been minimized.

Copy link
@ThisIsMissEm

ThisIsMissEm Oct 12, 2011

Why don't you just use an Array join here? might be a little bit cleaner.

This comment has been minimized.

Copy link
@broofa

broofa Oct 12, 2011

Perfomance concerns of (new Array()).join() .vs. simple string concat? (Not sure if that's an issue here)

var response = { _queryId: message._queryId };

if (key in servers == false) {
// Create a new server.
debug('create new server ' + key);
servers[key] = net._createServerHandle(message.address,
message.port,
message.addressType);
}
worker.send(response, servers[key]);
break;

default:
// Ignore.
break;
}
}


function forkWorker(scriptFilename, args) {
var id = ++ids;
var envCopy = {};

for (var x in process.env) {
envCopy[x] = process.env[x];
}

envCopy['NODE_WORKER_ID'] = id;

var worker = fork(scriptFilename, args, {
env: envCopy
});

worker.on('message', function(message) {
handleWorkerMessage(worker, message);
});

worker.on('exit', function() {
debug('worker id=' + id + ' died');
delete workers[id];
});

return worker;
}


exports.startWorker = function() {
assert.ok(!amMaster);
amMaster = false;
workerId = parseInt(process.env.NODE_WORKER_ID);

queryMaster({ cmd: 'online' });

// Make callbacks from queryMaster()
process.on('message', function(msg, handle) {
debug("recv " + JSON.stringify(msg));
if (msg._queryId && msg._queryId in queryCallbacks) {
var cb = queryCallbacks[msg._queryId];
if (typeof cb == 'function') {
cb(msg, handle);
}
delete queryCallbacks[msg._queryId]
}
});
};


function queryMaster(msg, cb) {
assert.ok(!amMaster);

debug('send ' + JSON.stringify(msg));

// Grab some random queryId
msg._queryId = (++queryIds);
msg._workerId = workerId;

// Store callback for later. Callback called in startWorker.
if (cb) {
queryCallbacks[msg._queryId] = cb;
}

// Send message to master.
process.send(msg);
}


exports.getServer = function(address, port, addressType, cb) {
assert.ok(!amMaster);

queryMaster({
cmd: "queryServer",
address: address,
port: port,
addressType: addressType
}, function(msg, handle) {
cb(handle);
});
};
72 changes: 48 additions & 24 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -606,34 +606,45 @@ exports.Server = Server;
function toPort(x) { return (x = Number(x)) >= 0 ? x : false; }


function listen(self, address, port, addressType) {
var createServerHandle = exports._createServerHandle =
function(address, port, addressType) {
var r = 0;
// assign handle in listen, and clean up if bind or listen fails
var handle =
(port == -1 && addressType == -1) ? createPipe() : createTCP();

if (address || port) {
debug('bind to ' + address);
if (addressType == 6) {
r = handle.bind6(address, port);
} else {
r = handle.bind(address, port);
}
}

if (r) {
handle.close();
handle = null;

process.nextTick(function() {
self.emit('error', errnoException(errno, 'listen'));
});
return;
}

return handle;
};


Server.prototype._listen2 = function(address, port, addressType) {
var self = this;
var r = 0;

// If there is not yet a handle, we need to create one and bind.
// In the case of a server sent via IPC, we don't need to do this.
if (!self._handle) {
// assign handle in listen, and clean up if bind or listen fails
self._handle =
(port == -1 && addressType == -1) ? createPipe() : createTCP();

if (address || port) {
debug('bind to ' + address);
if (addressType == 6) {
r = self._handle.bind6(address, port);
} else {
r = self._handle.bind(address, port);
}
}

if (r) {
self._handle.close();
self._handle = null;

process.nextTick(function() {
self.emit('error', errnoException(errno, 'listen'));
});
return;
}
self._handle = createServerHandle(address, port, addressType);
if (!self._handle) return;
}

self._handle.onconnection = onconnection;
Expand All @@ -644,7 +655,6 @@ function listen(self, address, port, addressType) {
if (r) {
self._handle.close();
self._handle = null;

process.nextTick(function() {
self.emit('error', errnoException(errno, 'listen'));
});
Expand All @@ -657,6 +667,20 @@ function listen(self, address, port, addressType) {
}


function listen(self, address, port, addressType) {
if (process.env.NODE_WORKER_ID) {
require('cluster').getServer(address, port, addressType, function(handle) {
self._handle = handle;
self._listen2(address, port, addressType);
});
} else {
process.nextTick(function() {
self._listen2(address, port, addressType);
});
}
}


Server.prototype.listen = function() {
var self = this;

Expand Down
3 changes: 2 additions & 1 deletion src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2292,7 +2292,8 @@ static void ParseDebugOpt(const char* arg) {

static void PrintHelp() {
printf("Usage: node [options] [ -e script | script.js ] [arguments] \n"
" node debug [ -e script | script.js ] [arguments] \n"
" node debug script.js [arguments] \n"
" node cluster script.js [arguments] \n"
"\n"
"Options:\n"
" -v, --version print node's version\n"
Expand Down
11 changes: 11 additions & 0 deletions src/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@
var d = NativeModule.require('_debugger');
d.start();

} else if (process.argv[1] == 'cluster') {
var cluster = NativeModule.require('cluster');
cluster.start();

} else if (process._eval != null) {
// User passed '-e' or '--eval' arguments to Node.
var Module = NativeModule.require('module');
Expand All @@ -84,6 +88,13 @@
var path = NativeModule.require('path');
process.argv[1] = path.resolve(process.argv[1]);

// If this is a worker in cluster mode, start up the communiction
// channel.
if (process.env.NODE_WORKER_ID) {
var cluster = NativeModule.require('cluster');
cluster.startWorker();
}

var Module = NativeModule.require('module');
// REMOVEME: nextTick should not be necessary. This hack to get
// test/simple/test-exception-handler2.js working.
Expand Down

12 comments on commit 87339a2

@ry
Copy link
Author

@ry ry commented on 87339a2 Oct 12, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feedback welcome

@indutny
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too high level for core ;) while changes to net that'll allow 3rd-parties to implement this functionality are welcome.

@indutny
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we'll implement node cluster - we may end up having node proxy, node daemon, and etc someday.

@skenqbx
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i like the new net._createServerHandle(), the rest really seems too high level for core.

@aikar
Copy link

@aikar aikar commented on 87339a2 Oct 12, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While it may seem high level, Id much rather have an officially supported method of doing it thats maintained and kept in mind going forward.

It also will help resolve some of the criticism node gets.

@chriso
Copy link

@chriso chriso commented on 87339a2 Oct 12, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aikar agreed. From the homepage: "Node's goal is to provide an easy way to build scalable network programs". The cluster functionality supports this goal and should be included

@windyrobin
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@indutny agreed,NodeJs is a framework, not a server like Nginx ,
NodeJs should provide something like video-card ,memory ,disk ,dispaly or something like that to us ,
but not a enclosd "Mac" computer
then the user could compose them freely,

so it should just provide core-interface, for example , process.send (handle) is a good interface ,
if NodeJs provide cluster ,well,when we use it ,maybe it's hard for us to change it freely which maybe a neccesay...

@superstructor
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does something like this really belong in core when there are existing solutions that can be installed via NPM e.g. the excellent learnboost/cluster ? Shouldn't the user have the choice via NPM of the solution they deploy ?

Ryan once tweeted "I imagine software as a functionality vector space in which each feature is a basis vector. The dev's goal is to find a min spanning set."

@ry
Copy link
Author

@ry ry commented on 87339a2 Nov 2, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@superstructor Indeed adding this feature is not increasing the dimensionality of our feature space but the existing feature vector for multiprocess load balancing had a shallow angle to the rest of the space. This vector is much steeper.

@sleeplessinc
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think addressing the criticisms in a practical way are enough justification for it, even if a philosophical reason for keeping it out of the core can be made. There's always a messy point where philosophy and reality have to get married and live together. - j.h.

@superstructor
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for points well made. I concede this may have a place in core after all.

@ry
Copy link
Author

@ry ry commented on 87339a2 Nov 2, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should note that the existing API doesn't work on Windows. So we're not really adding functionality - we're migrating an API to a new platform and making it simpler at the same time.

Please sign in to comment.