-
Notifications
You must be signed in to change notification settings - Fork 867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Graceful Shutdown [isolated commits] #121
Changes from 8 commits
6c15599
39f5b9d
5ba44d3
1f0c176
7c13b83
93d4dde
5a34b72
0b19220
b7e1120
b724235
363efe6
9cc34fa
147837c
3ddc35a
034cef6
4184b28
0149898
504f53a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,6 +74,7 @@ exports.createQueue = function(){ | |
|
||
function Queue() { | ||
this.client = redis.createClient(); | ||
this.workers = []; | ||
} | ||
|
||
/** | ||
|
@@ -179,23 +180,56 @@ Queue.prototype.setting = function(name, fn){ | |
*/ | ||
|
||
Queue.prototype.process = function(type, n, fn){ | ||
var self = this; | ||
|
||
if ('function' == typeof n) fn = n, n = 1; | ||
|
||
while (n--) { | ||
(function(worker){ | ||
worker.on('error', function(err){ | ||
self.emit('error', err); | ||
}); | ||
|
||
worker.on('job complete', function(job){ | ||
self.client.incrby('q:stats:work-time', job.duration); | ||
}); | ||
})(new Worker(this, type).start(fn)); | ||
var worker = new Worker(this, type).start(fn); | ||
|
||
worker.on('error', function(err){ | ||
this.emit('error', err); | ||
}); | ||
|
||
worker.on('job complete', function(job){ | ||
this.client.incrby('q:stats:work-time', job.duration); | ||
}); | ||
|
||
// Save worker so we can access it later | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. one more thing, we'll need to remove the completed jobs from this.workers otherwise we'll leak a ton of mem There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry if I'm being dense, but I don't see the leak. Won't each worker be GC'd when Queue is destroyed? Each Worker sets I'm probably just missing it. I don't have as strong an intuition about Node/JS memory management as I'd like (and I couldn't get v8-profiler to work on my machine :-/) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just mean because we're holding a reference to every single one until the process goes down There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Latest commit sets There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nvm actually im on crack, this should be fine |
||
this.workers.push(worker); | ||
} | ||
}; | ||
|
||
/** | ||
* Graceful shutdown | ||
* | ||
* @param {Function} fn callback | ||
* @return {Queue} for chaining | ||
* @api public | ||
*/ | ||
|
||
Queue.prototype.shutdown = function(fn, timeout) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. newline after |
||
var origFn = fn || function(){} | ||
, n = this.workers.length; | ||
|
||
// Wrap `fn` to only call after all workers finished | ||
fn = function(err) { | ||
if (err) return origFn(err); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this will cause issues if you have multiple errors, we should use to delegate |
||
if (! --n) { | ||
this.workers = []; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oops. you're right. fixed. |
||
origFn.apply(null, arguments); | ||
} | ||
} | ||
|
||
// Shut down workers 1 by 1 | ||
this.workers.forEach(function(worker) { | ||
worker.shutdown(function(err) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. small nitpick There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True story. Changed it. |
||
if (err) return fn(err); | ||
fn(); | ||
}, timeout); | ||
}); | ||
|
||
return this; | ||
}; | ||
|
||
/** | ||
* Get the job types present and callback `fn(err, types)`. | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,8 @@ function Worker(queue, type) { | |
this.type = type; | ||
this.client = Worker.client || (Worker.client = redis.createClient()); | ||
this.interval = 1000; | ||
this.running = true; | ||
this.job = null; | ||
} | ||
|
||
/** | ||
|
@@ -59,6 +61,7 @@ Worker.prototype.__proto__ = EventEmitter.prototype; | |
|
||
Worker.prototype.start = function(fn){ | ||
var self = this; | ||
if (!this.running) return; | ||
self.getJob(function(err, job){ | ||
if (err) self.error(err, job); | ||
if (!job || err) return process.nextTick(function(){ self.start(fn); }); | ||
|
@@ -123,7 +126,9 @@ Worker.prototype.process = function(job, fn){ | |
var self = this | ||
, start = new Date; | ||
job.active(); | ||
this.job = job; | ||
fn(job, function(err){ | ||
self.job = null; | ||
if (err) return self.failed(job, err, fn); | ||
job.complete(); | ||
job.set('duration', job.duration = new Date - start); | ||
|
@@ -177,3 +182,33 @@ Worker.prototype.getJob = function(fn){ | |
}); | ||
}); | ||
}; | ||
|
||
/** | ||
* Gracefully shut down the worker | ||
* | ||
* @param {Function} fn | ||
* @api private | ||
*/ | ||
|
||
Worker.prototype.shutdown = function(fn, timeout) { | ||
var self = this; | ||
|
||
if (!this.running) return fn(); | ||
this.running = false; | ||
|
||
// Close redis connection (if zpopping, break it) | ||
this.client.end(); | ||
|
||
// As soon as we're free, signal that we're done | ||
if (!this.job) return fn(); | ||
this.once('job complete', fn); | ||
|
||
if (timeout) { | ||
setTimeout(function() { | ||
self.job.failed(); | ||
self.job = null; | ||
fn(); | ||
}, timeout); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. depending on the application and what you're doing in the shutdown callback this might be a potential issue, |
||
} | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, node binds
this
toworker
here so this will be cyclic