Skip to content

Commit

Permalink
fix(move-to-finished): consider addition of prioritized jobs when pro…
Browse files Browse the repository at this point in the history
…cessing last active job (#2176) (python)
  • Loading branch information
roggervalf authored Sep 11, 2023
1 parent 09bae92 commit 4b01f35
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 6 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@
{
"releaseRules": [
{
"message": "*python*",
"message": "*[python]*",
"release": false
}
]
Expand Down
9 changes: 8 additions & 1 deletion src/commands/moveToFinished-13.lua
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,17 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists

jobId = rcall("RPOPLPUSH", KEYS[1], KEYS[2])

-- If jobId is special ID 0:delay, then there is no job to process
if jobId then
if string.sub(jobId, 1, 2) == "0:" then
rcall("LREM", KEYS[2], 1, jobId)

-- If jobId is special ID 0:delay (delay greater than 0), then there is no job to process
-- but if ID is 0:0, then there is at least 1 prioritized job to process
if jobId == "0:0" then
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])
return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, timestamp,
maxJobs, expireTime, opts)
end
else
return prepareJobForProcessing(KEYS, ARGV[8], target, jobId, timestamp, maxJobs,
expireTime, opts)
Expand Down
42 changes: 38 additions & 4 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -624,10 +624,10 @@ describe('workers', function () {
await worker.close();
});

it('should processes jobs by priority', async () => {
const normalPriority = [];
const mediumPriority = [];
const highPriority = [];
it('should process jobs by priority', async () => {
const normalPriority: Promise<Job>[] = [];
const mediumPriority: Promise<Job>[] = [];
const highPriority: Promise<Job>[] = [];

let processor;

Expand Down Expand Up @@ -677,6 +677,40 @@ describe('workers', function () {
await worker.close();
});

describe('when prioritized job is added while processing last active job', () => {
it('should process prioritized job whithout delay', async function () {
this.timeout(1000);
await queue.add('test1', { p: 2 }, { priority: 2 });
let counter = 0;
let processor;
const processing = new Promise<void>((resolve, reject) => {
processor = async (job: Job) => {
try {
if (job.name == 'test1') {
await queue.add('test', { p: 2 }, { priority: 2 });
}

expect(job.id).to.be.ok;
expect(job.data.p).to.be.eql(2);
} catch (err) {
reject(err);
}

if (++counter === 2) {
resolve();
}
};
});

const worker = new Worker(queueName, processor, { connection });
await worker.waitUntilReady();

await processing;

await worker.close();
});
});

it('process several jobs serially', async () => {
let counter = 1;
const maxJobs = 35;
Expand Down

0 comments on commit 4b01f35

Please sign in to comment.