diff --git a/package.json b/package.json index c6e945cbd0..af8a314221 100644 --- a/package.json +++ b/package.json @@ -174,7 +174,7 @@ { "releaseRules": [ { - "message": "*python*", + "message": "*[python]*", "release": false } ] diff --git a/src/commands/moveToFinished-13.lua b/src/commands/moveToFinished-13.lua index 02293cbffd..9d924ccb38 100644 --- a/src/commands/moveToFinished-13.lua +++ b/src/commands/moveToFinished-13.lua @@ -216,10 +216,17 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2]) - -- If jobId is special ID 0:delay, then there is no job to process if jobId then if string.sub(jobId, 1, 2) == "0:" then rcall("LREM", KEYS[2], 1, jobId) + + -- If jobId is special ID 0:delay (delay greater than 0), then there is no job to process + -- but if ID is 0:0, then there is at least 1 prioritized job to process + if jobId == "0:0" then + jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) + return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, timestamp, + maxJobs, expireTime, opts) + end else return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, timestamp, maxJobs, expireTime, opts) diff --git a/tests/test_worker.ts b/tests/test_worker.ts index c63418b4e4..26a68c9173 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -624,10 +624,10 @@ describe('workers', function () { await worker.close(); }); - it('should processes jobs by priority', async () => { - const normalPriority = []; - const mediumPriority = []; - const highPriority = []; + it('should process jobs by priority', async () => { + const normalPriority: Promise[] = []; + const mediumPriority: Promise[] = []; + const highPriority: Promise[] = []; let processor; @@ -677,6 +677,40 @@ describe('workers', function () { await worker.close(); }); + describe('when prioritized job is added while processing last active job', () => { + it('should process prioritized job whithout delay', async function () { + this.timeout(1000); + await queue.add('test1', { p: 2 }, { priority: 2 }); + let counter = 0; + let processor; + const processing = new Promise((resolve, reject) => { + processor = async (job: Job) => { + try { + if (job.name == 'test1') { + await queue.add('test', { p: 2 }, { priority: 2 }); + } + + expect(job.id).to.be.ok; + expect(job.data.p).to.be.eql(2); + } catch (err) { + reject(err); + } + + if (++counter === 2) { + resolve(); + } + }; + }); + + const worker = new Worker(queueName, processor, { connection }); + await worker.waitUntilReady(); + + await processing; + + await worker.close(); + }); + }); + it('process several jobs serially', async () => { let counter = 1; const maxJobs = 35;