Skip to content

Commit

Permalink
fix(marker): differentiate standard and delayed markers
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Feb 3, 2024
1 parent b3ab285 commit cbbb502
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/commands/includes/addDelayMarkerIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ local function addDelayMarkerIfNeeded(markerKey, delayedKey)
if nextTimestamp ~= nil then
-- Replace the score of the marker with the newest known
-- next timestamp.
rcall("ZADD", markerKey, nextTimestamp, "0")
rcall("ZADD", markerKey, nextTimestamp, "1")
end
end
56 changes: 56 additions & 0 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { v4 } from 'uuid';
import { Job, Queue, QueueEvents, Worker } from '../src/classes';
import { JobsOptions } from '../src/types';
import { delay, getParentKey, removeAllQueueData } from '../src/utils';
import { SSL_OP_NO_TLSv1_2 } from 'constants';

describe('Job', function () {
const redisHost = process.env.REDIS_HOST || 'localhost';
Expand Down Expand Up @@ -882,6 +883,61 @@ describe('Job', function () {
`Job ${job.id} is not in the delayed state. changeDelay`,
);
});

describe('when adding delayed job after standard one when worker is drained', () => {
it('pick standard job without delay', async function () {
this.timeout(6000);

await Job.create(queue, 'test1', { foo: 'bar' });

const worker = new Worker(
queueName,
async job => {
await delay(1000);
},
{
connection,
prefix,
},
);
await worker.waitUntilReady();

// after this event, worker should be drained
const completing = new Promise<void>(resolve => {
worker.once('completed', async () => {
await queue.addBulk([
{ name: 'test1', data: { idx: 0, foo: 'bar' } },
{
name: 'test2',
data: { idx: 1, foo: 'baz' },
opts: { delay: 3000 },
},
]);

resolve();
});
});

await completing;

const now = Date.now();
const completing2 = new Promise<void>(resolve => {
worker.on(
'completed',
after(2, job => {
const timeDiff = Date.now() - now;
expect(timeDiff).to.be.greaterThanOrEqual(4000);
expect(timeDiff).to.be.lessThan(4500);
expect(job.delay).to.be.equal(0);
resolve();
}),
);
});

await completing2;
await worker.close();
});
});
});

describe('.changePriority', () => {
Expand Down

0 comments on commit cbbb502

Please sign in to comment.