Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful Shutdown [isolated commits] #121

Closed
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 46 additions & 11 deletions lib/kue.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ exports.createQueue = function(){

function Queue() {
this.client = redis.createClient();
this.workers = [];
}

/**
Expand Down Expand Up @@ -179,23 +180,57 @@ Queue.prototype.setting = function(name, fn){
*/

Queue.prototype.process = function(type, n, fn){
var self = this;

if ('function' == typeof n) fn = n, n = 1;

while (n--) {
(function(worker){
worker.on('error', function(err){
self.emit('error', err);
});

worker.on('job complete', function(job){
self.client.incrby('q:stats:work-time', job.duration);
});
})(new Worker(this, type).start(fn));
var worker = new Worker(this, type).start(fn);

worker.on('error', function(err){
this.emit('error', err);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, node binds this to worker here so this will be cyclic


worker.on('job complete', function(job){
this.client.incrby('q:stats:work-time', job.duration);
});

// Save worker so we can access it later
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more thing, we'll need to remove the completed jobs from this.workers otherwise we'll leak a ton of mem

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if I'm being dense, but I don't see the leak. Won't each worker be GC'd when Queue is destroyed? Each Worker sets this.job to null as soon as the job is done (whether failed or complete), so those should get GC'd right away.

I'm probably just missing it. I don't have as strong an intuition about Node/JS memory management as I'd like (and I couldn't get v8-profiler to work on my machine :-/)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just mean because we're holding a reference to every single one until the process goes down

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Latest commit sets this.workers = [] when all workers have shut down. does that do the trick?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm actually im on crack, this should be fine

this.workers.push(worker);
}
};

/**
* Graceful shutdown
*
* @param {Function} fn callback
* @return {Queue} for chaining
* @api public
*/

Queue.prototype.shutdown = function(fn, timeout) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newline after */ :D

var origFn = fn || function(){}
, n = this.workers.length;
, self = this

// Wrap `fn` to only call after all workers finished
fn = function(err) {
if (err) return origFn(err);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will cause issues if you have multiple errors, we should use
https://github.com/visionmedia/batch

to delegate

if (! --n) {
self.workers = [];
origFn.apply(null, arguments);
}
}

// Shut down workers 1 by 1
this.workers.forEach(function(worker) {
worker.shutdown(function(err) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nitpick worker.shutdown(fn) would work fine since we're just delegating

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True story. Changed it.

if (err) return fn(err);
fn();
}, timeout);
});

return this;
};

/**
* Get the job types present and callback `fn(err, types)`.
*
Expand Down
35 changes: 35 additions & 0 deletions lib/queue/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ function Worker(queue, type) {
this.type = type;
this.client = Worker.client || (Worker.client = redis.createClient());
this.interval = 1000;
this.running = true;
this.job = null;
}

/**
Expand All @@ -59,6 +61,7 @@ Worker.prototype.__proto__ = EventEmitter.prototype;

Worker.prototype.start = function(fn){
var self = this;
if (!this.running) return;
self.getJob(function(err, job){
if (err) self.error(err, job);
if (!job || err) return process.nextTick(function(){ self.start(fn); });
Expand Down Expand Up @@ -123,7 +126,9 @@ Worker.prototype.process = function(job, fn){
var self = this
, start = new Date;
job.active();
this.job = job;
fn(job, function(err){
self.job = null;
if (err) return self.failed(job, err, fn);
job.complete();
job.set('duration', job.duration = new Date - start);
Expand Down Expand Up @@ -177,3 +182,33 @@ Worker.prototype.getJob = function(fn){
});
});
};

/**
* Gracefully shut down the worker
*
* @param {Function} fn
* @api private
*/

Worker.prototype.shutdown = function(fn, timeout) {
var self = this;

if (!this.running) return fn();
this.running = false;

// Close redis connection (if zpopping, break it)
this.client.end();

// As soon as we're free, signal that we're done
if (!this.job) return fn();
this.once('job complete', fn);

if (timeout) {
setTimeout(function() {
self.job.failed();
self.job = null;
fn();
}, timeout);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

depending on the application and what you're doing in the shutdown callback this might be a potential issue,
because we're not flagging that the callback has occurred. After invoking fn() we might want to fn = function(){}
so that when "job complete" etc still figure you're not getting the callback invoked a bunch more, though this might
not be an issue for most if people are just exiting etc

}
};