From e544be3f559e935091101305c288dfa237d1d5e4 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Fri, 18 Oct 2024 23:40:21 -0500 Subject: [PATCH] perf(queue): allow to add more than one base marker --- src/classes/queue.ts | 2 + src/classes/scripts.ts | 2 + src/commands/addStandardJob-8.lua | 4 +- .../includes/addBaseMarkerIfNeeded.lua | 4 +- src/commands/includes/addJobInTargetList.lua | 4 +- src/interfaces/queue-options.ts | 7 +++ tests/test_bulk.ts | 47 ++++++++++++++++++- 7 files changed, 63 insertions(+), 7 deletions(-) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 557319c30f..1389cb1d79 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -101,6 +101,7 @@ export class Queue< private _repeat?: Repeat; // To be deprecated in v6 in favor of JobScheduler private _jobScheduler?: JobScheduler; + private markerCount: number; constructor( name: string, @@ -118,6 +119,7 @@ export class Queue< this.jobsOpts = opts?.defaultJobOptions ?? {}; + this.markerCount = opts?.markerCount || 1; this.waitUntilReady() .then(client => { if (!this.closing && !opts?.skipMetasUpdate) { diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 786874cfa8..c6f35b5c40 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -183,6 +183,7 @@ export class Scripts { parentOpts: ParentOpts = {}, ): Promise { const queueKeys = this.queue.keys; + const markerCount = (this.queue as any).markerCount as number; const parent: Record = job.parent ? { ...job.parent, fpof: opts.fpof, rdof: opts.rdof, idof: opts.idof } @@ -199,6 +200,7 @@ export class Scripts { parent, job.repeatJobKey, job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null, + markerCount || 1, ]; let encodedOpts; diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index 7005e91af7..5f4e39ea05 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -35,6 +35,7 @@ [8] parent? {id, queueKey} [9] repeat job key [10] deduplication key + [11] marker count ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -58,6 +59,7 @@ local parentKey = args[5] local parent = args[8] local repeatJobKey = args[9] local deduplicationKey = args[10] +local markerCount = args[11] local parentData -- Includes @@ -108,7 +110,7 @@ local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KE -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' -addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId) +addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId, jobCounter, markerCount) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/includes/addBaseMarkerIfNeeded.lua b/src/commands/includes/addBaseMarkerIfNeeded.lua index af10026589..5b36c62358 100644 --- a/src/commands/includes/addBaseMarkerIfNeeded.lua +++ b/src/commands/includes/addBaseMarkerIfNeeded.lua @@ -2,8 +2,8 @@ Add marker if needed when a job is available. ]] -local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) +local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, jobCounter, markerCount) if not isPausedOrMaxed then - rcall("ZADD", markerKey, 0, "0") + rcall("ZADD", markerKey, (jobCounter or 1) % (markerCount or 1), "0") end end diff --git a/src/commands/includes/addJobInTargetList.lua b/src/commands/includes/addJobInTargetList.lua index 80f7bc0173..08e8e35946 100644 --- a/src/commands/includes/addJobInTargetList.lua +++ b/src/commands/includes/addJobInTargetList.lua @@ -5,7 +5,7 @@ -- Includes --- @include "addBaseMarkerIfNeeded" -local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId) +local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, jobCounter, markerCount) rcall(pushCmd, targetKey, jobId) - addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) + addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, jobCounter, markerCount) end diff --git a/src/interfaces/queue-options.ts b/src/interfaces/queue-options.ts index 0f771403b9..5a2e43f29f 100644 --- a/src/interfaces/queue-options.ts +++ b/src/interfaces/queue-options.ts @@ -39,6 +39,13 @@ export interface QueueBaseOptions { export interface QueueOptions extends QueueBaseOptions { defaultJobOptions?: DefaultJobOptions; + /** + * Max quantity of base markers to be added. It's recommend to be the same + * as the quantity of worker instances for this specific queue + * @default 1 + */ + markerCount?: number; + /** * Options for the streams used internally in BullMQ. */ diff --git a/tests/test_bulk.ts b/tests/test_bulk.ts index 5cff0f2122..1d6b8eb4ec 100644 --- a/tests/test_bulk.ts +++ b/tests/test_bulk.ts @@ -1,9 +1,10 @@ import { expect } from 'chai'; import { default as IORedis } from 'ioredis'; +import { after as afterNumExecutions } from 'lodash'; import { after, beforeEach, describe, it, before } from 'mocha'; import { v4 } from 'uuid'; -import { Queue, Worker, Job } from '../src/classes'; -import { removeAllQueueData } from '../src/utils'; +import { Queue, QueueEvents, Worker, Job } from '../src/classes'; +import { removeAllQueueData, delay } from '../src/utils'; describe('bulk jobs', () => { const redisHost = process.env.REDIS_HOST || 'localhost'; @@ -119,6 +120,48 @@ describe('bulk jobs', () => { await removeAllQueueData(new IORedis(redisHost), parentQueueName); }); + it('should keep workers busy', async () => { + const numJobs = 6; + const queue2 = new Queue(queueName, { connection, markerCount: 2, prefix }); + + const queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + + const worker = new Worker( + queueName, + async () => { + await delay(1000); + }, + { connection, prefix }, + ); + const worker2 = new Worker( + queueName, + async () => { + await delay(1000); + }, + { connection, prefix }, + ); + await worker.waitUntilReady(); + await worker2.waitUntilReady(); + + const completed = new Promise(resolve => { + queueEvents.on('completed', afterNumExecutions(numJobs, resolve)); + }); + + const jobs = Array.from(Array(numJobs).keys()).map(index => ({ + name: 'test', + data: { index }, + })); + + await queue2.addBulk(jobs); + + await completed; + await queue2.close(); + await worker.close(); + await worker2.close(); + await queueEvents.close(); + }); + it('should process jobs with custom ids', async () => { const name = 'test'; let processor;