From 027d52cc4d02e49c50c6f198167beb000928e626 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Thu, 6 Jun 2024 20:12:41 -0500 Subject: [PATCH 1/2] feat(queue): add getCountsPerPriority method --- src/classes/queue-getters.ts | 17 +++++++++++++ src/classes/scripts.ts | 18 ++++++++++++++ src/commands/getCountsPerPriority-2.lua | 25 ++++++++++++++++++++ src/commands/includes/addJobWithPriority.lua | 2 +- tests/test_getters.ts | 24 +++++++++++++++++++ 5 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 src/commands/getCountsPerPriority-2.lua diff --git a/src/classes/queue-getters.ts b/src/classes/queue-getters.ts index 4b40395c8f..ba3690e89a 100644 --- a/src/classes/queue-getters.ts +++ b/src/classes/queue-getters.ts @@ -190,6 +190,23 @@ export class QueueGetters< return this.getJobCountByTypes('prioritized'); } + /** + * Returns the number of jobs per priority. + */ + async getCountsPerPriority(priorities: number[]): Promise<{ + [index: string]: number; + }> { + const uniquePriorities = [...new Set(priorities)]; + const responses = await this.scripts.getCountsPerPriority(uniquePriorities); + + const counts: { [index: string]: number } = {}; + responses.forEach((res, index) => { + counts[`${uniquePriorities[index]}`] = res || 0; + }); + + return counts; + } + /** * Returns the number of jobs in waiting or paused statuses. */ diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index f8a029d744..5a86b30bf6 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -614,6 +614,24 @@ export class Scripts { return (client).getCounts(args); } + private getCountsPerPriorityArgs(priorities: number[]): (string | number)[] { + const keys: (string | number)[] = [ + this.queue.keys.wait, + this.queue.keys.prioritized, + ]; + + const args = priorities; + + return keys.concat(args); + } + + async getCountsPerPriority(priorities: number[]): Promise { + const client = await this.queue.client; + const args = this.getCountsPerPriorityArgs(priorities); + + return (client).getCountsPerPriority(args); + } + moveToCompletedArgs( job: MinimalJob, returnvalue: R, diff --git a/src/commands/getCountsPerPriority-2.lua b/src/commands/getCountsPerPriority-2.lua new file mode 100644 index 0000000000..a6e4347593 --- /dev/null +++ b/src/commands/getCountsPerPriority-2.lua @@ -0,0 +1,25 @@ +--[[ + Get counts per provided states + + Input: + KEYS[1] wait key + KEYS[2] prioritized key + + ARGV[1...] priorities +]] +local rcall = redis.call +local results = {} +local waitKey = KEYS[1] +local prioritizedKey = KEYS[2] + +for i = 1, #ARGV do + local priority = tonumber(ARGV[i]) + if priority == 0 then + results[#results+1] = rcall("LLEN", waitKey) + else + results[#results+1] = rcall("ZCOUNT", prioritizedKey, + priority * 0x100000000, (priority + 1) * 0x100000000 - 1) + end +end + +return results diff --git a/src/commands/includes/addJobWithPriority.lua b/src/commands/includes/addJobWithPriority.lua index 639b1efc2d..594c96347d 100644 --- a/src/commands/includes/addJobWithPriority.lua +++ b/src/commands/includes/addJobWithPriority.lua @@ -7,7 +7,7 @@ local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused) local prioCounter = rcall("INCR", priorityCounterKey) - local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff) + local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffff) rcall("ZADD", prioritizedKey, score, jobId) addBaseMarkerIfNeeded(markerKey, isPaused) end diff --git a/tests/test_getters.ts b/tests/test_getters.ts index 4c22cfab96..ecc853c6f1 100644 --- a/tests/test_getters.ts +++ b/tests/test_getters.ts @@ -844,6 +844,30 @@ describe('Jobs getters', function () { }); }); + describe('.getCountsPerPriority', () => { + it('returns job counts per priority', async () => { + await queue.waitUntilReady(); + + const jobs = Array.from(Array(42).keys()).map(index => ({ + name: 'test', + data: {}, + opts: { + priority: index % 4, + }, + })); + await queue.addBulk(jobs); + + const counts = await queue.getCountsPerPriority([0, 1, 2, 3]); + + expect(counts).to.be.eql({ + '0': 11, + '1': 11, + '2': 10, + '3': 10, + }); + }); + }); + describe('.getDependencies', () => { it('return unprocessed jobs that are dependencies of a given parent job', async () => { const flowProducer = new FlowProducer({ connection, prefix }); From f3f9dd9becc4ca97aab150384318ae0b2b23279e Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Fri, 7 Jun 2024 08:31:38 -0600 Subject: [PATCH 2/2] chore: restore previous priority number --- src/commands/includes/addJobWithPriority.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commands/includes/addJobWithPriority.lua b/src/commands/includes/addJobWithPriority.lua index 594c96347d..639b1efc2d 100644 --- a/src/commands/includes/addJobWithPriority.lua +++ b/src/commands/includes/addJobWithPriority.lua @@ -7,7 +7,7 @@ local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused) local prioCounter = rcall("INCR", priorityCounterKey) - local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffff) + local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff) rcall("ZADD", prioritizedKey, score, jobId) addBaseMarkerIfNeeded(markerKey, isPaused) end