Skip to content

Commit

Permalink
perf(queue): allow to add more than one base marker
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Oct 19, 2024
1 parent 98e1ec6 commit e544be3
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 7 deletions.
2 changes: 2 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ export class Queue<
private _repeat?: Repeat; // To be deprecated in v6 in favor of JobScheduler

private _jobScheduler?: JobScheduler;
private markerCount: number;

constructor(
name: string,
Expand All @@ -118,6 +119,7 @@ export class Queue<

this.jobsOpts = opts?.defaultJobOptions ?? {};

this.markerCount = opts?.markerCount || 1;
this.waitUntilReady()
.then(client => {
if (!this.closing && !opts?.skipMetasUpdate) {
Expand Down
2 changes: 2 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ export class Scripts {
parentOpts: ParentOpts = {},
): Promise<string> {
const queueKeys = this.queue.keys;
const markerCount = (this.queue as any).markerCount as number;

const parent: Record<string, any> = job.parent
? { ...job.parent, fpof: opts.fpof, rdof: opts.rdof, idof: opts.idof }
Expand All @@ -199,6 +200,7 @@ export class Scripts {
parent,
job.repeatJobKey,
job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null,
markerCount || 1,
];

let encodedOpts;
Expand Down
4 changes: 3 additions & 1 deletion src/commands/addStandardJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
[8] parent? {id, queueKey}
[9] repeat job key
[10] deduplication key
[11] marker count
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand All @@ -58,6 +59,7 @@ local parentKey = args[5]
local parent = args[8]
local repeatJobKey = args[9]
local deduplicationKey = args[10]
local markerCount = args[11]
local parentData

-- Includes
Expand Down Expand Up @@ -108,7 +110,7 @@ local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KE

-- LIFO or FIFO
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId)
addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId, jobCounter, markerCount)

-- Emit waiting event
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
Expand Down
4 changes: 2 additions & 2 deletions src/commands/includes/addBaseMarkerIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
Add marker if needed when a job is available.
]]

local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, jobCounter, markerCount)
if not isPausedOrMaxed then
rcall("ZADD", markerKey, 0, "0")
rcall("ZADD", markerKey, (jobCounter or 1) % (markerCount or 1), "0")
end
end
4 changes: 2 additions & 2 deletions src/commands/includes/addJobInTargetList.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
-- Includes
--- @include "addBaseMarkerIfNeeded"

local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, jobCounter, markerCount)
rcall(pushCmd, targetKey, jobId)
addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, jobCounter, markerCount)
end
7 changes: 7 additions & 0 deletions src/interfaces/queue-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ export interface QueueBaseOptions {
export interface QueueOptions extends QueueBaseOptions {
defaultJobOptions?: DefaultJobOptions;

/**
* Max quantity of base markers to be added. It's recommend to be the same
* as the quantity of worker instances for this specific queue
* @default 1
*/
markerCount?: number;

/**
* Options for the streams used internally in BullMQ.
*/
Expand Down
47 changes: 45 additions & 2 deletions tests/test_bulk.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { expect } from 'chai';
import { default as IORedis } from 'ioredis';
import { after as afterNumExecutions } from 'lodash';
import { after, beforeEach, describe, it, before } from 'mocha';
import { v4 } from 'uuid';
import { Queue, Worker, Job } from '../src/classes';
import { removeAllQueueData } from '../src/utils';
import { Queue, QueueEvents, Worker, Job } from '../src/classes';
import { removeAllQueueData, delay } from '../src/utils';

describe('bulk jobs', () => {
const redisHost = process.env.REDIS_HOST || 'localhost';
Expand Down Expand Up @@ -119,6 +120,48 @@ describe('bulk jobs', () => {
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
});

it('should keep workers busy', async () => {
const numJobs = 6;
const queue2 = new Queue(queueName, { connection, markerCount: 2, prefix });

const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();

const worker = new Worker(
queueName,
async () => {
await delay(1000);
},
{ connection, prefix },
);
const worker2 = new Worker(
queueName,
async () => {
await delay(1000);
},
{ connection, prefix },
);
await worker.waitUntilReady();
await worker2.waitUntilReady();

const completed = new Promise(resolve => {
queueEvents.on('completed', afterNumExecutions(numJobs, resolve));
});

const jobs = Array.from(Array(numJobs).keys()).map(index => ({
name: 'test',
data: { index },
}));

await queue2.addBulk(jobs);

await completed;
await queue2.close();
await worker.close();
await worker2.close();
await queueEvents.close();
});

it('should process jobs with custom ids', async () => {
const name = 'test';
let processor;
Expand Down

0 comments on commit e544be3

Please sign in to comment.