Skip to content

Commit

Permalink
fix(worker): should cap update progress events
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Nov 24, 2023
1 parent bec84e8 commit 2cab9e9
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 1 deletion.
6 changes: 5 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,11 @@ export class Scripts {
): Promise<void> {
const client = await this.queue.client;

const keys = [this.queue.toKey(jobId), this.queue.keys.events];
const keys = [
this.queue.toKey(jobId),
this.queue.keys.events,
this.queue.keys.meta,
];
const progressJson = JSON.stringify(progress);

const result = await (<any>client).updateProgress(
Expand Down
30 changes: 30 additions & 0 deletions src/commands/updateProgress-3.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
--[[
Update job progress
Input:
KEYS[1] Job id key
KEYS[2] event stream key
KEYS[3] meta key
ARGV[1] id
ARGV[2] progress
Output:
0 - OK
-1 - Missing job.
Event:
progress(jobId, progress)
]]
local rcall = redis.call

if rcall("EXISTS", KEYS[1]) == 1 then -- // Make sure job exists
local maxEvents = rcall("HGET", KEYS[3], "opts.maxLenEvents") or 10000

rcall("HSET", KEYS[1], "progress", ARGV[2])
rcall("XADD", KEYS[2], "MAXLEN", "~", maxEvents, "*", "event", "progress",
"jobId", ARGV[1], "data", ARGV[2]);
return 0
else
return -1
end
46 changes: 46 additions & 0 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,52 @@ describe('workers', function () {
await worker.close();
});

it('should cap progress events', async () => {
let processor;

const maxEvents = 10;
const numUpdateProgress = 500;

const trimmedEventsQueue = new Queue(queueName, {
connection,
prefix,
streams: { events: { maxLen: maxEvents } },
});

const job = await trimmedEventsQueue.add('test', { foo: 'bar' });
expect(job.id).to.be.ok;
expect(job.data.foo).to.be.eql('bar');

const processing = new Promise<void>((resolve, reject) => {
processor = async (job: Job) => {
try {
expect(job.data.foo).to.be.equal('bar');

for (let i = 0; i < numUpdateProgress; i++) {
await job.updateProgress(42);
}
resolve();
} catch (err) {
reject(err);
}
};
});

const worker = new Worker(queueName, processor, { connection, prefix });
await worker.waitUntilReady();

await processing;

const eventsLength = await (
await trimmedEventsQueue.client
).xlen(trimmedEventsQueue.keys.events);

expect(eventsLength).to.be.lt(numUpdateProgress);
expect(eventsLength).to.be.gte(maxEvents);

await worker.close();
});

it('process a job that updates progress as object', async () => {
let processor;

Expand Down

0 comments on commit 2cab9e9

Please sign in to comment.