QoS extensions #105

Closed
wants to merge 9 commits into from
Changes from 8 commits
3 changes: 3 additions & 0 deletions .gitignore
@@ -3,3 +3,6 @@ node_modules
*.sock
testing
lib/http/public/stylesheets/main.css

.project
.settings/
40 changes: 40 additions & 0 deletions Readme.md
@@ -9,6 +9,7 @@
## Features

- delayed jobs
- auto-restart of stuck/crashed jobs
- job event and progress pubsub
- rich integrated UI
- infinite scrolling
@@ -90,13 +91,31 @@ job.log('$%d sent to %s', amount, user.name);
job.progress(frames, totalFrames);
```

### Job Heartbeat

Jobs, especially long-running ones, can get stuck or be terminated before they complete. A heartbeat ensures that such jobs are restarted. To set a heartbeat interval when creating a job, invoke `job.heartbeat(ms)`:

```js
job.heartbeat(60000).restarts(2); // heart must beat at least every 60s; the job is restarted at most twice (default is one restart)
```

During job execution, invoke `job.heartbeat()` at least once per heartbeat interval (more often is fine; a longer gap triggers a restart).

When using heartbeats, the watchdog that checks every job's heartbeat must be started. The watchdog checks for dead jobs at the interval passed to `Queue#watchdog(ms)`, which defaults to 5 seconds.

```js
jobs.watchdog();
```
When a dead job is restarted, the `restarted` event is fired and the job is moved back to the `inactive` state. If the job has already used up its allowed restarts, it is moved to the `failed` state instead and the `failed` event is fired.
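
The restart rule described above can be modelled without Redis. The following is a minimal sketch, not the code in this pull request: the function `checkHeartbeat` and the fields `lastBeat`, `interval`, `restartsMade` and `maxRestarts` are hypothetical names, chosen only to illustrate how one watchdog pass might decide between `restarted` and `failed`.

```js
// Hypothetical in-memory model of one watchdog pass; not Kue's actual code.
// A job counts as dead when its last heartbeat is older than its interval.
function checkHeartbeat(job, now) {
  if (job.state !== 'active') return null;              // only running jobs are watched
  if (now - job.lastBeat <= job.interval) return null;  // heart is still beating

  if (job.restartsMade < job.maxRestarts) {
    job.restartsMade += 1;
    job.state = 'inactive';   // back in the queue, waiting for a worker
    return 'restarted';
  }
  job.state = 'failed';       // restart budget exhausted
  return 'failed';
}

var job = { state: 'active', lastBeat: 0, interval: 60000, restartsMade: 0, maxRestarts: 2 };
var onTime = checkHeartbeat(job, 30000);  // checked within the interval
var late = checkHeartbeat(job, 61000);    // checked after a missed beat
console.log(onTime, late, job.state);
```

The return value stands in for the event name that would be published; a real watchdog would run a check like this on a timer for every active job.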

### Job Events

Job-specific events are fired on the `Job` instances via Redis pubsub. The following events are currently supported:

- `failed` the job has failed
- `complete` the job has completed
- `promotion` the job (when delayed) is now queued
- `restarted` the job has been restarted
- `progress` the job's progress ranging from 0-100

For example this may look something like the following:
@@ -155,6 +174,24 @@ When using delayed jobs, we must also check the delayed jobs with a timer, promo
jobs.promote();
```

### Job Serialization

In some cases, two related jobs must not be processed at the same time, regardless of which worker picks them up. To handle this, jobs can be placed in a named group with the `.serialize(name)` method. Only one member of a group executes at a time across all workers, even when the workers are distributed or handle different queues. Jobs that are held back because another member of their group is executing are in the `staged` state.

```js
var rjob1 = jobs.create('sellstocks', {
  stocks: [ 'ORCL', 'MSFT' ]
}).serialize('[email protected]').save(); // ensure this user can't buy and sell stocks at the same time

var rjob2 = jobs.create('buystocks', {
  stocks: [ 'FB', 'LNKD' ]
}).serialize('[email protected]').save();
```
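
To make the staging behaviour concrete, here is a minimal in-memory sketch of a serialization group. It is an illustration under stated assumptions, not the implementation in this pull request: `SerialGroups`, `enter` and `finish` are hypothetical names, and the first-in, first-out release order is assumed rather than taken from the source.

```js
// Hypothetical in-memory model of serialization groups; not Kue's actual code.
function SerialGroups() {
  this.running = Object.create(null);  // group name -> id of the job allowed to run
  this.staged = Object.create(null);   // group name -> FIFO list of held-back job ids
}

// Returns the state the job enters: 'inactive' (free to run) or 'staged' (held back).
SerialGroups.prototype.enter = function (group, id) {
  if (!(group in this.running)) {
    this.running[group] = id;
    return 'inactive';
  }
  (this.staged[group] = this.staged[group] || []).push(id);
  return 'staged';
};

// Called when the running job of a group finishes; returns the id released next, if any.
SerialGroups.prototype.finish = function (group) {
  var next = (this.staged[group] || []).shift();
  if (next === undefined) delete this.running[group];
  else this.running[group] = next;
  return next;
};

var groups = new SerialGroups();
var sellState = groups.enter('user-1', 'sellstocks');  // first job in the group
var buyState = groups.enter('user-1', 'buystocks');    // second job must wait
var released = groups.finish('user-1');                // sell job done, buy job released
console.log(sellState, buyState, released);
```

In the real queue this bookkeeping would have to live in Redis so that every worker sees the same group state; the sketch only shows the ordering rule.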

### Job States

Jobs can combine `delay`, `after`, and `serialize`. When they are combined, the delay (if any) is handled first, and the job stays in the `delayed` state until the delay has finished. The job is then put in the `staged` state along with the other jobs in the same serialization group. Once the other jobs from that group have finished, the job moves to the `inactive` state to wait for a worker. Once a worker takes on the job, it moves to the `active` state.
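
The ordering described above can be written down as a small helper. This is a sketch only: `statePath` is a hypothetical function, and `after` is left out because the text does not say which state a job waits in while its precursors run.

```js
// Hypothetical helper listing the states a new job passes through before it runs.
// The state names are the ones used in the text above.
function statePath(opts) {
  var path = [];
  if (opts.delay) path.push('delayed');      // the delay is waited out first
  if (opts.serialize) path.push('staged');   // then the job waits for its group
  path.push('inactive', 'active');           // queued, then picked up by a worker
  return path;
}

var path = statePath({ delay: 5000, serialize: 'analysis' });
console.log(path.join(' -> '));  // prints "delayed -> staged -> inactive -> active"
```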

## Processing Jobs

Processing jobs is simple with Kue. First create a `Queue` instance, much as we do when creating jobs, which gives us access to Redis etc.; then invoke `jobs.process()` with the associated type.
@@ -177,6 +214,9 @@ jobs.process('email', function(job, done){
```js
jobs.process('email', 20, function(job, done){
  // ...
},
function(level, msg, data){
  // Print out worker error message here
});
```

106 changes: 106 additions & 0 deletions examples/QoS.js
@@ -0,0 +1,106 @@

var kue = require('../')
  , express = require('express');

// create our job queue

var jobs = kue.createQueue();
jobs.watchdog();

// start redis with $ redis-server

// create some jobs at random,
// usually you would create these
// in your http processes upon
// user input etc.

var count = 1000;

function create() {
  if(count-- <= 0 ) return;

  var name = ['tobi', 'loki', 'jane', 'manny'][Math.random() * 4 | 0];
  console.log('- creating job for %s', name);

  var stage1 = null, stage2 = null;

  stage1 = jobs.create('video conversion', {
      title: 'converting ' + name + '\'s to avi'
    , user: 1
    , frames: 200
  }).heartbeat(10000).save(function(err){
    stage2 = jobs.create('video conversion', {
        title: 'converting ' + name + '\'s to mpeg'
      , user: 1
      , frames: 200
    }).heartbeat(10000).save(function(err){
      jobs.create('video analysis', {
          title: 'analyzing ' + name + '\'s avi'
        , user: 1
        , frames: 200
      }).heartbeat(10000).serialize('analysis').save();

      jobs.create('video analysis', {
          title: 'analyzing ' + name + '\'s avi and mpeg'
        , user: 1
        , frames: 200
      }).heartbeat(10000).serialize('analysis').save();
    });
  });

  setTimeout(create, Math.random() * 3000 | 0);
}

if(process.argv.length > 2) {
  count = Number(process.argv[2]);
  create();
}
else
  console.log('usage: node QoS.js [<number-of-jobs>]');

// process video analysis jobs, 6 at a time.

jobs.process('video analysis', 6, function(job, done){
  var frames = job.data.frames;
  console.log("job process %d", job.id);
  function next(i) {
    // pretend we are doing some work
    convertFrame(i, function(err){
      if (err) return done(err);
      // report progress, i/frames complete
      job.progress(i, frames);
      if (i == frames) done();
      else next(i + 1);
    });
  }
  next(0);
});

// process video conversion jobs, 4 at a time.

jobs.process('video conversion', 4, function(job, done){
  var frames = job.data.frames;
  console.log("job process %d", job.id);
  function next(i) {
    // pretend we are doing some work
    convertFrame(i, function(err){
      if (err) return done(err);
      // report progress, i/frames complete
      job.progress(i, frames);
      if (i == frames) done();
      else next(i + 1);
    });
  }
  next(0);
});

function convertFrame(i, fn) {
  setTimeout(fn, Math.random() * 100);
}

// start the UI
var app = express.createServer();
app.use(express.basicAuth('foo', 'bar'));
app.use(kue.app);
app.listen(3000);
console.log('UI started on port 3000');
2 changes: 1 addition & 1 deletion examples/video.js
@@ -37,7 +37,7 @@ jobs.process('video conversion', 3, function(job, done){
if (err) return done(err);
// report progress, i/frames complete
job.progress(i, frames);
- if (i == frames) done()
+ if (i == frames) done();
else next(i + 1);
});
}
5 changes: 3 additions & 2 deletions lib/http/index.js
@@ -65,8 +65,9 @@ app.del('/job/:id', provides('json'), json.remove);
// routes

app.get('/', routes.jobs('active'));
- app.get('/active', routes.jobs('active'));
+ app.get('/delayed', routes.jobs('delayed'));
+ app.get('/staged', routes.jobs('staged'));
  app.get('/inactive', routes.jobs('inactive'));
+ app.get('/active', routes.jobs('active'));
  app.get('/failed', routes.jobs('failed'));
  app.get('/complete', routes.jobs('complete'));
- app.get('/delayed', routes.jobs('delayed'));
2 changes: 1 addition & 1 deletion lib/http/middleware/provides.js
@@ -11,5 +11,5 @@ module.exports = function(type){
return function(req, res, next){
if (req.accepts(type)) return next();
next('route');
- }
+ };
};
2 changes: 1 addition & 1 deletion lib/http/public/javascripts/caustic.js
@@ -230,7 +230,7 @@ View.prototype.visitDIV = function(el, name){
var self = this;
this[name] = function(val){
if (0 == arguments.length) return el;
- el.empty().append(val.el || val);
+ el.empty().append((val && val.el) ? val.el : val);
return this;
};
};
20 changes: 20 additions & 0 deletions lib/http/public/javascripts/job.js
@@ -192,6 +192,26 @@ Job.prototype.renderUpdate = function(){
    view.attempts().parent().remove();
  }

  // restarts
  if (this.restarts.made) {
    view.restarts(this.restarts.made + '/' + this.restarts.max);
  } else {
    view.restarts().parent().remove();
  }

  // precursors
  if (this.precursors) {
    view.precursors(this.precursors);
  } else {
    view.precursors().parent().remove();
  }

  if (this.after) {
    view.after(this.after);
  } else {
    view.after().parent().remove();
  }

  // title
  view.title(this.data.title
    ? this.data.title
14 changes: 9 additions & 5 deletions lib/http/public/javascripts/main.js
@@ -45,6 +45,8 @@ var sort = 'asc';

var loading;

var pollInterval = 1000;

/**
* Initialize UI.
*/
@@ -57,13 +59,14 @@ function init(state) {
loading.ctx = ctx;
loading.size(canvas.width);

- pollStats(1000);
+ pollStats(pollInterval);
  show(state)();
+ o('li.delayed a').click(show('delayed'));
+ o('li.staged a').click(show('staged'));
  o('li.inactive a').click(show('inactive'));
  o('li.complete a').click(show('complete'));
  o('li.active a').click(show('active'));
  o('li.failed a').click(show('failed'));
- o('li.delayed a').click(show('delayed'));

o('#filter').change(function(){
filter = $(this).val();
@@ -141,7 +144,7 @@ function show(state) {
o('#jobs .job').remove();
o('#menu li a').removeClass('active');
o('#menu li.' + state + ' a').addClass('active');
- pollForJobs(state, 2000);
+ pollForJobs(state, pollInterval);
return false;
}
}
@@ -159,7 +162,7 @@ function pollForJobs(state, ms) {
infiniteScroll();
pollForJobs.timer = setTimeout(function(){
pollForJobs(state, ms);
- }, 1000);
+ }, pollInterval);
});
};

@@ -236,11 +239,12 @@ function refreshJobs(state, fn) {

function pollStats(ms) {
request('./stats', function(data){
+ o('li.delayed .count').text(data.delayedCount);
+ o('li.staged .count').text(data.stagedCount);
  o('li.inactive .count').text(data.inactiveCount);
  o('li.active .count').text(data.activeCount);
  o('li.complete .count').text(data.completeCount);
  o('li.failed .count').text(data.failedCount);
- o('li.delayed .count').text(data.delayedCount);
setTimeout(function(){
pollStats(ms);
}, ms);
5 changes: 3 additions & 2 deletions lib/http/public/javascripts/utils.js
@@ -36,11 +37,12 @@ function relative(ms) {
*/

var states = {
-   active: 'active'
+   delayed: 'delayed'
  , inactive: 'inactive'
+ , staged: 'staged'
+ , active: 'active'
  , failed: 'failed'
  , complete: 'complete'
- , delayed: 'delayed'
};

/**