-
Notifications
You must be signed in to change notification settings - Fork 29.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
cluster: refactor module into multiple files #10746
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,224 @@ | ||
'use strict'; | ||
const assert = require('assert'); | ||
const util = require('util'); | ||
const EventEmitter = require('events'); | ||
const Worker = require('internal/cluster/worker'); | ||
const { internal, sendHelper } = require('internal/cluster/utils'); | ||
const cluster = new EventEmitter(); | ||
const handles = {}; | ||
const indexes = {}; | ||
const noop = () => {}; | ||
|
||
module.exports = cluster; | ||
|
||
cluster.isWorker = true; | ||
cluster.isMaster = false; | ||
cluster.worker = null; | ||
cluster.Worker = Worker; | ||
|
||
cluster._setupWorker = function() { | ||
const worker = new Worker({ | ||
id: +process.env.NODE_UNIQUE_ID | 0, | ||
process: process, | ||
state: 'online' | ||
}); | ||
|
||
cluster.worker = worker; | ||
|
||
process.once('disconnect', () => { | ||
worker.emit('disconnect'); | ||
|
||
if (!worker.exitedAfterDisconnect) { | ||
// Unexpected disconnect, master exited, or some such nastiness, so | ||
// worker exits immediately. | ||
process.exit(0); | ||
} | ||
}); | ||
|
||
process.on('internalMessage', internal(worker, onmessage)); | ||
send({ act: 'online' }); | ||
|
||
function onmessage(message, handle) { | ||
if (message.act === 'newconn') | ||
onconnection(message, handle); | ||
else if (message.act === 'disconnect') | ||
_disconnect.call(worker, true); | ||
} | ||
}; | ||
|
||
// obj is a net#Server or a dgram#Socket object. | ||
cluster._getServer = function(obj, options, cb) { | ||
const indexesKey = [options.address, | ||
options.port, | ||
options.addressType, | ||
options.fd ].join(':'); | ||
|
||
if (indexes[indexesKey] === undefined) | ||
indexes[indexesKey] = 0; | ||
else | ||
indexes[indexesKey]++; | ||
|
||
const message = util._extend({ | ||
act: 'queryServer', | ||
index: indexes[indexesKey], | ||
data: null | ||
}, options); | ||
|
||
// Set custom data on handle (i.e. tls tickets key) | ||
if (obj._getServerData) | ||
message.data = obj._getServerData(); | ||
|
||
send(message, (reply, handle) => { | ||
if (typeof obj._setServerData === 'function') | ||
obj._setServerData(reply.data); | ||
|
||
if (handle) | ||
shared(reply, handle, indexesKey, cb); // Shared listen socket. | ||
else | ||
rr(reply, indexesKey, cb); // Round-robin. | ||
}); | ||
|
||
obj.once('listening', () => { | ||
cluster.worker.state = 'listening'; | ||
const address = obj.address(); | ||
message.act = 'listening'; | ||
message.port = address && address.port || options.port; | ||
send(message); | ||
}); | ||
}; | ||
|
||
// Shared listen socket. | ||
function shared(message, handle, indexesKey, cb) { | ||
const key = message.key; | ||
// Monkey-patch the close() method so we can keep track of when it's | ||
// closed. Avoids resource leaks when the handle is short-lived. | ||
const close = handle.close; | ||
|
||
handle.close = function() { | ||
send({ act: 'close', key }); | ||
delete handles[key]; | ||
delete indexes[indexesKey]; | ||
return close.apply(this, arguments); | ||
}; | ||
assert(handles[key] === undefined); | ||
handles[key] = handle; | ||
cb(message.errno, handle); | ||
} | ||
|
||
// Round-robin. Master distributes handles across workers. | ||
function rr(message, indexesKey, cb) { | ||
if (message.errno) | ||
return cb(message.errno, null); | ||
|
||
var key = message.key; | ||
|
||
function listen(backlog) { | ||
// TODO(bnoordhuis) Send a message to the master that tells it to | ||
// update the backlog size. The actual backlog should probably be | ||
// the largest requested size by any worker. | ||
return 0; | ||
} | ||
|
||
function close() { | ||
// lib/net.js treats server._handle.close() as effectively synchronous. | ||
// That means there is a time window between the call to close() and | ||
// the ack by the master process in which we can still receive handles. | ||
// onconnection() below handles that by sending those handles back to | ||
// the master. | ||
if (key === undefined) | ||
return; | ||
|
||
send({ act: 'close', key }); | ||
delete handles[key]; | ||
delete indexes[indexesKey]; | ||
key = undefined; | ||
} | ||
|
||
function getsockname(out) { | ||
if (key) | ||
util._extend(out, message.sockname); | ||
|
||
return 0; | ||
} | ||
|
||
// Faux handle. Mimics a TCPWrap with just enough fidelity to get away | ||
// with it. Fools net.Server into thinking that it's backed by a real | ||
// handle. Use a noop function for ref() and unref() because the control | ||
// channel is going to keep the worker alive anyway. | ||
const handle = { close, listen, ref: noop, unref: noop }; | ||
|
||
if (message.sockname) { | ||
handle.getsockname = getsockname; // TCP handles only. | ||
} | ||
|
||
assert(handles[key] === undefined); | ||
handles[key] = handle; | ||
cb(0, handle); | ||
} | ||
|
||
// Round-robin connection. | ||
function onconnection(message, handle) { | ||
const key = message.key; | ||
const server = handles[key]; | ||
const accepted = server !== undefined; | ||
|
||
send({ ack: message.seq, accepted }); | ||
|
||
if (accepted) | ||
server.onconnection(0, handle); | ||
} | ||
|
||
function send(message, cb) { | ||
return sendHelper(process, message, null, cb); | ||
} | ||
|
||
function _disconnect(masterInitiated) { | ||
this.exitedAfterDisconnect = true; | ||
let waitingCount = 1; | ||
|
||
function checkWaitingCount() { | ||
waitingCount--; | ||
|
||
if (waitingCount === 0) { | ||
// If disconnect is worker initiated, wait for ack to be sure | ||
// exitedAfterDisconnect is properly set in the master, otherwise, if | ||
// it's master initiated there's no need to send the | ||
// exitedAfterDisconnect message | ||
if (masterInitiated) { | ||
process.disconnect(); | ||
} else { | ||
send({ act: 'exitedAfterDisconnect' }, () => process.disconnect()); | ||
} | ||
} | ||
} | ||
|
||
for (const key in handles) { | ||
const handle = handles[key]; | ||
delete handles[key]; | ||
waitingCount++; | ||
|
||
if (handle.owner) | ||
handle.owner.close(checkWaitingCount); | ||
else | ||
handle.close(checkWaitingCount); | ||
} | ||
|
||
checkWaitingCount(); | ||
} | ||
|
||
// Extend generic Worker with methods specific to worker processes. | ||
Worker.prototype.disconnect = function() { | ||
_disconnect.call(this); | ||
return this; | ||
}; | ||
|
||
Worker.prototype.destroy = function() { | ||
this.exitedAfterDisconnect = true; | ||
|
||
if (!this.isConnected()) { | ||
process.exit(0); | ||
} else { | ||
send({ act: 'exitedAfterDisconnect' }, () => process.disconnect()); | ||
process.once('disconnect', () => process.exit(0)); | ||
} | ||
}; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sort requires?