Skip to content

Commit

Permalink
Merge branch 'nextorigin-issue-185'
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Nov 16, 2016
2 parents 8459ecf + 6cf1ca2 commit 82a90b6
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 331 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ sudo: false

# test on node.js versions
node_js:
- '6'
- '4'

services:
Expand Down
71 changes: 29 additions & 42 deletions lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ var Promise = require('bluebird');
var _ = require('lodash');
var scripts = require('./scripts');
var debuglog = require('debuglog')('bull');
var uuid = require('node-uuid');

/**
interface JobOptions
Expand Down Expand Up @@ -59,11 +58,11 @@ Job.fromId = function(queue, jobId){
if(!jobId) {
return Promise.resolve();
}
return queue.client.hgetallAsync(queue.toKey(jobId)).then(function(jobData){
if(jobData){
return queue.client.hgetall(queue.toKey(jobId)).then(function(jobData){
if(!_.isEmpty(jobData)){
return Job.fromData(queue, jobId, jobData);
}else{
return jobData;
return null;
}
});
};
Expand All @@ -85,7 +84,7 @@ Job.prototype.toData = function(){
Job.prototype.progress = function(progress){
if(progress){
var _this = this;
return this.queue.client.hsetAsync(this.queue.toKey(this.jobId), 'progress', progress).then(function(){
return this.queue.client.hset(this.queue.toKey(this.jobId), 'progress', progress).then(function(){
_this.queue.distEmit('progress', _this.toJSON(), progress);
});
}else{
Expand Down Expand Up @@ -123,24 +122,29 @@ Job.prototype.lockKey = function(){
Takes a lock for this job so that no other queue worker can process it at the
same time.
*/
Job.prototype.takeLock = function(token, renew, ensureActive){
return scripts.takeLock(this.queue, this, token, renew, ensureActive).then(function(res){
return res === 1; // Indicates successful lock.
Job.prototype.takeLock = function(renew, ensureActive){
var _this = this;
return scripts.takeLock(this.queue, this, renew, ensureActive)
.then(function(lock) {
if (lock) _this.lock = lock;
return lock || false;
});
};

/**
Renews a lock so that it gets some more time before expiring.
*/
Job.prototype.renewLock = function(token){
return this.takeLock(token, true /* Renew */);
Job.prototype.renewLock = function(){
return this.takeLock(true /* Renew */);
};

/**
Releases the lock. Only locks owned by the queue instance can be released.
*/
Job.prototype.releaseLock = function(token){
return scripts.releaseLock(this, token);
Job.prototype.releaseLock = function(){
var _this = this;
return scripts.releaseLock(this)
.then(function() { _this.lock = null; });
};

Job.prototype.delayIfNeeded = function(){
Expand All @@ -155,19 +159,19 @@ Job.prototype.delayIfNeeded = function(){
return Promise.resolve(false);
};

Job.prototype.moveToCompleted = function(returnValue, token){
Job.prototype.moveToCompleted = function(returnValue){
this.returnvalue = returnValue || 0;
return scripts.moveToCompleted(this, token || 0, this.opts.removeOnComplete);
return scripts.moveToCompleted(this || 0, this.opts.removeOnComplete);
};

Job.prototype.move = function(src, target, token, returnValue){
Job.prototype.move = function(src, target, returnValue){
if(target === 'completed'){
this.returnvalue = returnValue || 0;
if(this.opts.removeOnComplete){
target = void 0;
}
}
return scripts.move(this, token || 0, src, target);
return scripts.move(this || 0, src, target);
}

Job.prototype.moveToFailed = function(err){
Expand Down Expand Up @@ -211,7 +215,7 @@ Job.prototype.promote = function(){
return queue.toKey(name);
});

return queue.client.evalAsync(
return queue.client.eval(
script,
keys.length,
keys[0],
Expand Down Expand Up @@ -256,7 +260,7 @@ Job.prototype.isFailed = function(){

Job.prototype.isDelayed = function() {
return this.queue.client
.zrankAsync(this.queue.toKey('delayed'), this.jobId).then(function(rank) {
.zrank(this.queue.toKey('delayed'), this.jobId).then(function(rank) {
return rank !== null;
});
};
Expand Down Expand Up @@ -305,13 +309,11 @@ Job.prototype.getState = function() {
/**
Removes a job from the queue and from all the lists where it may be stored.
*/
Job.prototype.remove = function(token){
Job.prototype.remove = function(){
var queue = this.queue;
var job = this;

var token = token || uuid();

return job.takeLock(token).then(function(lock) {
return job.takeLock().then(function(lock) {
if (!lock) {
throw new Error('Could not get lock for job: ' + job.jobId + '. Cannot remove job.');
}
Expand All @@ -320,7 +322,7 @@ Job.prototype.remove = function(token){
queue.emit('removed', job.toJSON());
})
.finally(function () {
return job.releaseLock(token);
return job.releaseLock();
});
});
};
Expand Down Expand Up @@ -399,28 +401,13 @@ Job.prototype.finished = function(){
// -----------------------------------------------------------------------------
Job.prototype._isDone = function(list){
return this.queue.client
.sismemberAsync(this.queue.toKey(list), this.jobId).then(function(isMember){
.sismember(this.queue.toKey(list), this.jobId).then(function(isMember){
return isMember === 1;
});
};

Job.prototype._isInList = function(list) {
var script = [
'local function item_in_list (list, item)',
' for _, v in pairs(list) do',
' if v == item then',
' return 1',
' end',
' end',
' return nil',
'end',
'local items = redis.call("LRANGE", KEYS[1], 0, -1)',
'return item_in_list(items, ARGV[1])'
].join('\n');

return this.queue.client.evalAsync(script, 1, this.queue.toKey(list), this.jobId).then(function(result){
return result === 1;
});
return scripts.isJobInList(this.queue.client, this.queue.toKey(list), this.jobId);
};

Job.prototype._moveToSet = function(set, context){
Expand Down Expand Up @@ -466,7 +453,7 @@ Job.prototype._retryAtOnce = function(){

var pushCmd = (this.opts.lifo ? 'R' : 'L') + 'PUSH';

return queue.client.evalAsync(
return queue.client.eval(
script,
keys.length,
keys[0],
Expand Down Expand Up @@ -495,7 +482,7 @@ Job.prototype._saveAttempt = function(err){

params.failedReason = err.message;

return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), params);
return this.queue.client.hmset(this.queue.toKey(this.jobId), params);
};

/**
Expand Down
Loading

0 comments on commit 82a90b6

Please sign in to comment.