From d1f32c5d3625723757ae9986a59f32bd05ca2e45 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 8 Jun 2024 00:00:07 -0500 Subject: [PATCH] fix(priority): use module instead of bit.band to keep order --- src/commands/includes/addJobWithPriority.lua | 2 +- tests/test_worker.ts | 61 ++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/src/commands/includes/addJobWithPriority.lua b/src/commands/includes/addJobWithPriority.lua index 639b1efc2d..f83e887368 100644 --- a/src/commands/includes/addJobWithPriority.lua +++ b/src/commands/includes/addJobWithPriority.lua @@ -7,7 +7,7 @@ local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused) local prioCounter = rcall("INCR", priorityCounterKey) - local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff) + local score = priority * 0x100000000 + prioCounter % 0x100000000 rcall("ZADD", prioritizedKey, score, jobId) addBaseMarkerIfNeeded(markerKey, isPaused) end diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 4cc39ca466..2d34c47c19 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -1517,6 +1517,67 @@ describe('workers', function () { await worker.close(); }); + describe('when priority counter is having a high number', () => { + it('should process jobs by priority', async () => { + let processor; + + const numJobsPerPriority = 6; + + const jobs = Array.from(Array(18).keys()).map(index => ({ + name: 'test', + data: { p: (index % 3) + 1 }, + opts: { + priority: (index % 3) + 1, + }, + })); + await queue.addBulk(jobs); + const client = await queue.client; + await client.incrby(`${prefix}:${queue.name}:pc`, 2147483648); + await queue.addBulk(jobs); + + let currentPriority = 1; + let counter = 0; + let total = 0; + const countersPerPriority = {}; + + const processing = new Promise((resolve, reject) => { + processor = async (job: Job) => { + await delay(10); + try { + if (countersPerPriority[job.data.p]) { + expect(countersPerPriority[job.data.p]).to.be.lessThan( + +job.id!, + ); + } + + countersPerPriority[job.data.p] = +job.id!; + expect(job.id).to.be.ok; + expect(job.data.p).to.be.eql(currentPriority); + } catch (err) { + reject(err); + } + + total++; + if (++counter === numJobsPerPriority * 2) { + currentPriority++; + counter = 0; + + if (currentPriority === 4 && total === numJobsPerPriority * 6) { + resolve(); + } + } + }; + }); + + const worker = new Worker(queueName, processor, { connection, prefix }); + await worker.waitUntilReady(); + + await processing; + + await worker.close(); + }); + }); + describe('while processing last active job', () => { it('should process prioritized job whithout delay', async function () { this.timeout(1000);