Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job): allow passing debounce as option #2666

Merged
merged 12 commits into from
Jul 29, 2024
1 change: 1 addition & 0 deletions docs/gitbook/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* [LIFO](guide/jobs/lifo.md)
* [Job Ids](guide/jobs/job-ids.md)
* [Job Data](guide/jobs/job-data.md)
* [Debouncing](guide/jobs/debouncing.md)
* [Delayed](guide/jobs/delayed.md)
* [Repeatable](guide/jobs/repeatable.md)
* [Prioritized](guide/jobs/prioritized.md)
Expand Down
49 changes: 49 additions & 0 deletions docs/gitbook/guide/jobs/debouncing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Debouncing

Debouncing in BullMQ is a process where job execution is delayed and deduplicated based on specific identifiers. It ensures that within a specified period, or until a specific job is completed or failed, no new jobs with the same identifier will be added to the queue. Instead, these attempts will trigger a debounced event.

## Fixed Mode

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> In the Fixed Mode, debouncing works by assigning a delay (Time to Live, TTL) to a job upon its creation. If a similar job (identified by a unique debouncer ID) is added during this delay period, it is ignored. This prevents the queue from being overwhelmed with multiple instances of the same task, thus optimizing the processing time and resource utilization.

In the Fixed Mode, debouncing works by assigning a delay (Time to Live, TTL) to a job upon its creation. If a similar job (identified by a unique debouncer ID) is added during this delay period, it is ignored. This prevents the queue from being overwhelmed with multiple instances of the same task, thus optimizing the processing time and resource utilization.

```typescript
import { Queue } from 'bullmq';

const myQueue = new Queue('Paint');

// Add a job that will be debounced for 5 seconds.
await myQueue.add(
'house',
{ color: 'white' },
{ debounce: { id: 'customValue', ttl: 5000 } },
);
```

In this example, after adding the house painting job with the debouncing parameters (id and ttl), any subsequent job with the same debouncing ID customValue added within 5 seconds will be ignored. This is useful for scenarios where rapid, repetitive requests are made, such as multiple users or processes attempting to trigger the same job.

Note that you must provide a debounce id that should represent your job. You can hash your entire job data or a subset of attributes for creating this identifier.

## Extended Mode

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Extended Mode takes a different approach by extending the debouncing duration until the job's completion or failure. This means as long as the job remains in an incomplete state (neither succeeded nor failed), any subsequent job with the same debouncer ID will be ignored.

The Extended Mode takes a different approach by extending the debouncing duration until the job's completion or failure. This means as long as the job remains in an incomplete state (neither succeeded nor failed), any subsequent job with the same debouncer ID will be ignored.

```typescript
// Add a job that will be debounced as this record is not finished (completed or failed).
await myQueue.add(
'house',
{ color: 'white' },
{ debounce: { id: 'customValue' } },
);
```

While this job is not moved to completed or failed state, next jobs added with same **debounce id** will be ignored and a _debounced_ event will be triggered by our QueueEvent class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress.

This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress.

{% hint style="warning" %}
Any manual deletion will disable the debouncing. For example, when calling _job.remove_ method.
{% endhint %}

## Read more:

- 💡 [Add Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#add)
13 changes: 13 additions & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import type { QueueEvents } from './queue-events';
const logger = debuglog('bull');

const optsDecodeMap = {
de: 'debounce',
fpof: 'failParentOnFailure',
idof: 'ignoreDependencyOnFailure',
kl: 'keepLogs',
Expand Down Expand Up @@ -136,6 +137,11 @@ export class Job<
*/
parent?: ParentKeys;

/**
* Debounce identifier.
*/
debounceId?: string;

/**
* Base repeat job key.
*/
Expand Down Expand Up @@ -199,6 +205,8 @@ export class Job<
? { id: opts.parent.id, queueKey: opts.parent.queue }
: undefined;

this.debounceId = opts.debounce ? opts.debounce.id : undefined;

this.toKey = queue.toKey.bind(queue);
this.setScripts();

Expand Down Expand Up @@ -322,6 +330,10 @@ export class Job<
job.repeatJobKey = json.rjk;
}

if (json.deid) {
job.debounceId = json.deid;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't it be better to use the same opts structure instead of having a new debounceId field in the job class? This will keep the class cleaner, specially if we add more options to the debounce option.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't save all the option because ttl is not reuse as debounceId, similar to repeatable jobs, we only save repeatJobKey instead of saving all repeat options in job instance.

}

job.failedReason = json.failedReason;

job.attemptsStarted = parseInt(json.ats || '0');
Expand Down Expand Up @@ -445,6 +457,7 @@ export class Job<
timestamp: this.timestamp,
failedReason: JSON.stringify(this.failedReason),
stacktrace: JSON.stringify(this.stacktrace),
debounceId: this.debounceId,
repeatJobKey: this.repeatJobKey,
returnvalue: JSON.stringify(this.returnvalue),
};
Expand Down
7 changes: 7 additions & 0 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ export interface QueueEventsListener extends IoredisListener {
id: string,
) => void;

/**
* Listen to 'debounced' event.
*
* This event is triggered when a job is debounced because debouncedId still existed.
*/
debounced: (args: { jobId: string }, id: string) => void;

/**
* Listen to 'delayed' event.
*
Expand Down
1 change: 1 addition & 0 deletions src/classes/queue-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export class QueueKeys {
'events',
'pc', // priority counter key
'marker', // marker key
'de', // debounce key
].forEach(key => {
keys[key] = this.toKey(name, key);
});
Expand Down
26 changes: 18 additions & 8 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ export class Queue<
* Get global concurrency value.
* Returns null in case no value is set.
*/
async getGlobalConcurrency():Promise<number|null> {
async getGlobalConcurrency(): Promise<number | null> {
const client = await this.client;
const concurrency = await client.hget(this.keys.meta, 'concurrency');
if(concurrency){
if (concurrency) {
return Number(concurrency);
}
return null;
Expand All @@ -203,12 +203,11 @@ export class Queue<
* is processed at any given time. If this limit is not defined, there will be no
* restriction on the number of concurrent jobs.
*/
async setGlobalConcurrency(concurrency: number) {
const client = await this.client;
return client.hset(this.keys.meta, 'concurrency', concurrency);
}


async setGlobalConcurrency(concurrency: number) {
const client = await this.client;
return client.hset(this.keys.meta, 'concurrency', concurrency);
}

/**
* Adds a new job to the queue.
*
Expand Down Expand Up @@ -374,6 +373,17 @@ export class Queue<
return !removed;
}

/**
* Removes a debounce key.
*
* @param id - identifier
*/
async removeDebounceKey(id: string): Promise<number> {
const client = await this.client;

return client.del(`${this.keys.de}:${id}`);
}

/**
* Removes a repeatable job by its key. Note that the key is the one used
* to store the repeatable job metadata and not one of the job iterations
Expand Down
1 change: 1 addition & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ export class Scripts {
parentOpts.parentDependenciesKey || null,
parent,
job.repeatJobKey,
job.debounceId ? `${queueKeys.de}:${job.debounceId}` : null,
];

let encodedOpts;
Expand Down
12 changes: 11 additions & 1 deletion src/commands/addDelayedJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
[7] parent dependencies key.
[8] parent? {id, queueKey}
[9] repeat job key
[10] debounce key

ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand All @@ -49,12 +50,14 @@ local args = cmsgpack.unpack(ARGV[1])
local data = ARGV[2]

local parentKey = args[5]
local repeatJobKey = args[9]
local parent = args[8]
local repeatJobKey = args[9]
local debounceKey = args[10]
local parentData

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/debounceJob"
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
Expand All @@ -73,6 +76,7 @@ local opts = cmsgpack.unpack(ARGV[3])

local parentDependenciesKey = args[7]
local timestamp = args[4]

if args[2] == "" then
jobId = jobCounter
jobIdKey = args[1] .. jobId
Expand All @@ -86,6 +90,12 @@ else
end
end

local debouncedJobId = debounceJob(args[1], opts['de'],
jobId, debounceKey, eventsKey, maxEvents)
if debouncedJobId then
return debouncedJobId
end

-- Store the job.
local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we are storing the opts with the debounce options, why do we need to pass the debounceId again?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I can refactor it

opts, timestamp, parentKey, parentData,
Expand Down
11 changes: 10 additions & 1 deletion src/commands/addParentJob-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
[7] parent dependencies key.
[8] parent? {id, queueKey}
[9] repeat job key
[10] debounce key

ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand All @@ -44,11 +45,13 @@ local data = ARGV[2]
local opts = cmsgpack.unpack(ARGV[3])

local parentKey = args[5]
local repeatJobKey = args[9]
local parent = args[8]
local repeatJobKey = args[9]
local debounceKey = args[10]
local parentData

-- Includes
--- @include "includes/debounceJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
--- @include "includes/storeJob"
Expand Down Expand Up @@ -78,6 +81,12 @@ else
end
end

local debouncedJobId = debounceJob(args[1], opts['de'],
jobId, debounceKey, eventsKey, maxEvents)
if debouncedJobId then
return debouncedJobId
end

-- Store the job.
storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
parentKey, parentData, repeatJobKey)
Expand Down
13 changes: 11 additions & 2 deletions src/commands/addPrioritizedJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
[7] parent dependencies key.
[8] parent? {id, queueKey}
[9] repeat job key

[10] debounce key

ARGV[2] Json stringified job data
ARGV[3] msgpacked options

Expand All @@ -51,12 +52,14 @@ local data = ARGV[2]
local opts = cmsgpack.unpack(ARGV[3])

local parentKey = args[5]
local repeatJobKey = args[9]
local parent = args[8]
local repeatJobKey = args[9]
local debounceKey = args[10]
local parentData

-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/debounceJob"
--- @include "includes/storeJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
Expand Down Expand Up @@ -87,6 +90,12 @@ else
end
end

local debouncedJobId = debounceJob(args[1], opts['de'],
jobId, debounceKey, eventsKey, maxEvents)
if debouncedJobId then
return debouncedJobId
end

-- Store the job.
local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
opts, timestamp, parentKey, parentData,
Expand Down
13 changes: 11 additions & 2 deletions src/commands/addStandardJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
[7] parent dependencies key.
[8] parent? {id, queueKey}
[9] repeat job key

[10] debounce key

ARGV[2] Json stringified job data
ARGV[3] msgpacked options

Expand All @@ -54,12 +55,14 @@ local data = ARGV[2]
local opts = cmsgpack.unpack(ARGV[3])

local parentKey = args[5]
local repeatJobKey = args[9]
local parent = args[8]
local repeatJobKey = args[9]
local debounceKey = args[10]
local parentData

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/debounceJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
--- @include "includes/handleDuplicatedJob"
Expand Down Expand Up @@ -91,6 +94,12 @@ else
end
end

local debouncedJobId = debounceJob(args[1], opts['de'],
jobId, debounceKey, eventsKey, maxEvents)
if debouncedJobId then
return debouncedJobId
end

-- Store the job.
storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
parentKey, parentData, repeatJobKey)
Expand Down
2 changes: 1 addition & 1 deletion src/commands/includes/cleanList.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd,
-- replace the entry with a deletion marker; the actual deletion will
-- occur at the end of the script
rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker)
removeJob(job, true, jobKeyPrefix)
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
Expand Down
4 changes: 2 additions & 2 deletions src/commands/includes/cleanSet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attrib

local jobKey = jobKeyPrefix .. job
if isFinished then
removeJob(job, true, jobKeyPrefix)
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
deletedCount = deletedCount + 1
table.insert(deleted, job)
else
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
jobTS = getTimestamp(jobKey, attributes)
if (not jobTS or jobTS <= timestamp) then
removeJob(job, true, jobKeyPrefix)
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
Expand Down
23 changes: 23 additions & 0 deletions src/commands/includes/debounceJob.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
--[[
Function to debounce a job.
]]

local function debounceJob(prefixKey, debounceOpts, jobId, debounceKey, eventsKey, maxEvents)
local debounceId = debounceOpts and debounceOpts['id']
if debounceId then
local ttl = debounceOpts['ttl']
local debounceKeyExists
if ttl then
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX')
else
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'NX')
end
if debounceKeyExists then
local currentDebounceJobId = rcall('GET', debounceKey)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
"debounced", "jobId", currentDebounceJobId)
return currentDebounceJobId
end
end
end

Loading
Loading