Skip to content

Commit

Permalink
fix(priority): use module instead of bit.band to keep order
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jun 12, 2024
1 parent 74e7cce commit d1f32c5
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/commands/includes/addJobWithPriority.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused)
local prioCounter = rcall("INCR", priorityCounterKey)
local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff)
local score = priority * 0x100000000 + prioCounter % 0x100000000
rcall("ZADD", prioritizedKey, score, jobId)
addBaseMarkerIfNeeded(markerKey, isPaused)
end
61 changes: 61 additions & 0 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,67 @@ describe('workers', function () {
await worker.close();
});

describe('when priority counter is having a high number', () => {
it('should process jobs by priority', async () => {
let processor;

const numJobsPerPriority = 6;

const jobs = Array.from(Array(18).keys()).map(index => ({
name: 'test',
data: { p: (index % 3) + 1 },
opts: {
priority: (index % 3) + 1,
},
}));
await queue.addBulk(jobs);
const client = await queue.client;
await client.incrby(`${prefix}:${queue.name}:pc`, 2147483648);
await queue.addBulk(jobs);

let currentPriority = 1;
let counter = 0;
let total = 0;
const countersPerPriority = {};

const processing = new Promise<void>((resolve, reject) => {
processor = async (job: Job) => {
await delay(10);
try {
if (countersPerPriority[job.data.p]) {
expect(countersPerPriority[job.data.p]).to.be.lessThan(
+job.id!,
);
}

countersPerPriority[job.data.p] = +job.id!;
expect(job.id).to.be.ok;
expect(job.data.p).to.be.eql(currentPriority);
} catch (err) {
reject(err);
}

total++;
if (++counter === numJobsPerPriority * 2) {
currentPriority++;
counter = 0;

if (currentPriority === 4 && total === numJobsPerPriority * 6) {
resolve();
}
}
};
});

const worker = new Worker(queueName, processor, { connection, prefix });
await worker.waitUntilReady();

await processing;

await worker.close();
});
});

describe('while processing last active job', () => {
it('should process prioritized job whithout delay', async function () {
this.timeout(1000);
Expand Down

0 comments on commit d1f32c5

Please sign in to comment.