Skip to content

Commit

Permalink
fix(job): remove debounce option in favor of deduplication (#2924)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Nov 22, 2024
1 parent 3b451d2 commit 10bf0fb
Show file tree
Hide file tree
Showing 25 changed files with 130 additions and 361 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ Since there are a few job queue solutions, here is a table comparing them:
| Group Support || | | | | |
| Batches Support || | | | | |
| Parent/Child Dependencies ||| | | | |
| Debouncing |||| | | |
| Deduplication |||| | | |
| Priorities ||||| ||
| Concurrency |||||||
| Delayed jobs ||||| ||
Expand Down
7 changes: 7 additions & 0 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# [5.29.0](https://github.com/taskforcesh/bullmq/compare/v5.28.2...v5.29.0) (2024-11-22)


### Features

* **queue:** refactor a protected addJob method allowing telemetry extensions ([09f2571](https://github.com/taskforcesh/bullmq/commit/09f257196f6d5a6690edbf55f12d585cec86ee8f))

## [5.28.2](https://github.com/taskforcesh/bullmq/compare/v5.28.1...v5.28.2) (2024-11-22)


Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "5.28.2",
"version": "5.29.0",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
Expand Down
14 changes: 2 additions & 12 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ const optsDecodeMap = {
kl: 'keepLogs',
ocf: 'onChildFailure',
rdof: 'removeDependencyOnFailure',
tm: 'telemetryMetadata'
tm: 'telemetryMetadata',
};

const optsEncodeMap = invertObject(optsDecodeMap);
optsEncodeMap.debounce = 'de';

export const PRIORITY_LIMIT = 2 ** 21;

Expand Down Expand Up @@ -150,12 +149,6 @@ export class Job<
*/
parent?: ParentKeys;

/**
* Debounce identifier.
* @deprecated use deduplicationId
*/
debounceId?: string;

/**
* Deduplication identifier.
*/
Expand Down Expand Up @@ -225,10 +218,9 @@ export class Job<
? { id: opts.parent.id, queueKey: opts.parent.queue }
: undefined;

this.debounceId = opts.debounce ? opts.debounce.id : undefined;
this.deduplicationId = opts.deduplication
? opts.deduplication.id
: this.debounceId;
: undefined;

this.toKey = queue.toKey.bind(queue);
this.setScripts();
Expand Down Expand Up @@ -354,7 +346,6 @@ export class Job<
}

if (json.deid) {
job.debounceId = json.deid;
job.deduplicationId = json.deid;
}

Expand Down Expand Up @@ -481,7 +472,6 @@ export class Job<
timestamp: this.timestamp,
failedReason: JSON.stringify(this.failedReason),
stacktrace: JSON.stringify(this.stacktrace),
debounceId: this.debounceId,
deduplicationId: this.deduplicationId,
repeatJobKey: this.repeatJobKey,
returnvalue: JSON.stringify(this.returnvalue),
Expand Down
8 changes: 0 additions & 8 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@ export interface QueueEventsListener extends IoredisListener {
id: string,
) => void;

/**
* Listen to 'debounced' event.
* @deprecated use deduplicated event
*
* This event is triggered when a job is debounced because debounceId still existed.
*/
debounced: (args: { jobId: string; debounceId: string }, id: string) => void;

/**
* Listen to 'deduplicated' event.
*
Expand Down
12 changes: 0 additions & 12 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,6 @@ export class QueueGetters<JobBase extends Job = Job> extends QueueBase {
return this.scripts.getRateLimitTtl(maxJobs);
}

/**
* Get jobId that starts debounced state.
* @deprecated use getDeduplicationJobId method
*
* @param id - debounce identifier
*/
async getDebounceJobId(id: string): Promise<string | null> {
const client = await this.client;

return client.get(`${this.keys.de}:${id}`);
}

/**
* Get jobId from deduplicated state.
*
Expand Down
114 changes: 55 additions & 59 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,47 +310,66 @@ export class Queue<
opts = { ...opts, telemetryMetadata: srcPropagationMedatada };
}

if (opts && opts.repeat) {
if (opts.repeat.endDate) {
if (+new Date(opts.repeat.endDate) < Date.now()) {
throw new Error(
'End date must be greater than current timestamp',
);
}
}
const job = await this.addJob(name, data, opts);

return (await this.repeat).updateRepeatableJob<
DataType,
ResultType,
NameType
>(name, data, { ...this.jobsOpts, ...opts }, { override: true });
} else {
const jobId = opts?.jobId;
span?.setAttributes({
[TelemetryAttributes.JobName]: name,
[TelemetryAttributes.JobId]: job.id,
});

if (jobId == '0' || jobId?.includes(':')) {
throw new Error("JobId cannot be '0' or contain :");
}
return job;
},
);
}

const job = await this.Job.create<DataType, ResultType, NameType>(
this as MinimalQueue,
name,
data,
{
...this.jobsOpts,
...opts,
jobId,
},
);
this.emit('waiting', job as JobBase<DataType, ResultType, NameType>);
/**
* addJob is a telemetry free version of the add method, useful in order to wrap it
* with custom telemetry on subclasses.
*
* @param name
* @param data
* @param opts
*
* @returns Job
*/
protected async addJob(
name: NameType,
data: DataType,
opts?: JobsOptions,
): Promise<Job<DataType, ResultType, NameType>> {
if (opts && opts.repeat) {
if (opts.repeat.endDate) {
if (+new Date(opts.repeat.endDate) < Date.now()) {
throw new Error('End date must be greater than current timestamp');
}
}

span?.setAttributes({
[TelemetryAttributes.JobId]: job.id,
});
return (await this.repeat).updateRepeatableJob<
DataType,
ResultType,
NameType
>(name, data, { ...this.jobsOpts, ...opts }, { override: true });
} else {
const jobId = opts?.jobId;

return job;
}
},
);
if (jobId == '0' || jobId?.includes(':')) {
throw new Error("JobId cannot be '0' or contain :");
}

const job = await this.Job.create<DataType, ResultType, NameType>(
this as MinimalQueue,
name,
data,
{
...this.jobsOpts,
...opts,
jobId,
},
);
this.emit('waiting', job as JobBase<DataType, ResultType, NameType>);

return job;
}
}

/**
Expand Down Expand Up @@ -624,29 +643,6 @@ export class Queue<
return !removed;
}

/**
* Removes a debounce key.
* @deprecated use removeDeduplicationKey
*
* @param id - identifier
*/
async removeDebounceKey(id: string): Promise<number> {
return this.trace<number>(
SpanKind.INTERNAL,
'removeDebounceKey',
`${this.name}`,
async span => {
span?.setAttributes({
[TelemetryAttributes.JobKey]: id,
});

const client = await this.client;

return await client.del(`${this.keys.de}:${id}`);
},
);
}

/**
* Removes a deduplication key.
*
Expand Down
2 changes: 1 addition & 1 deletion src/commands/addJobScheduler-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ if prevMillis ~= false then

if rcall("ZSCORE", delayedKey, delayedJobId) ~= false
and rcall("EXISTS", nextDelayedJobId) ~= 1 then
removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]])
removeJob(delayedJobId, true, prefixKey, true --[[remove deduplication key]])
rcall("ZREM", delayedKey, delayedJobId)
end
end
Expand Down
2 changes: 1 addition & 1 deletion src/commands/addRepeatableJob-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ if prevMillis ~= false then

if rcall("ZSCORE", delayedKey, delayedJobId) ~= false
and rcall("EXISTS", nextDelayedJobId) ~= 1 then
removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]])
removeJob(delayedJobId, true, prefixKey, true --[[remove deduplication key]])
rcall("ZREM", delayedKey, delayedJobId)
end
end
Expand Down
2 changes: 1 addition & 1 deletion src/commands/includes/cleanList.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd,
-- replace the entry with a deletion marker; the actual deletion will
-- occur at the end of the script
rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker)
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
removeJob(job, true, jobKeyPrefix, true --[[remove deduplication key]])
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
Expand Down
4 changes: 2 additions & 2 deletions src/commands/includes/cleanSet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ local function cleanSet(
if not isJobSchedulerJob(job, jobSchedulersKey) then
local jobKey = jobKeyPrefix .. job
if isFinished then
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]] )
removeJob(job, true, jobKeyPrefix, true --[[remove deduplication key]] )
deletedCount = deletedCount + 1
table.insert(deleted, job)
else
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
jobTS = getTimestamp(jobKey, attributes)
if (not jobTS or jobTS <= timestamp) then
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]] )
removeJob(job, true, jobKeyPrefix, true --[[remove deduplication key]] )
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
Expand Down
10 changes: 4 additions & 6 deletions src/commands/includes/deduplicateJob.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
--[[
Function to debounce a job.
Function to deduplicate a job.
]]

local function deduplicateJob(prefixKey, deduplicationOpts, jobId, deduplicationKey, eventsKey, maxEvents)
Expand All @@ -13,12 +13,10 @@ local function deduplicateJob(prefixKey, deduplicationOpts, jobId, deduplication
deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
end
if deduplicationKeyExists then
local currentDebounceJobId = rcall('GET', deduplicationKey)
local currentDeduplicationJobId = rcall('GET', deduplicationKey)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
"debounced", "jobId", currentDebounceJobId, "debounceId", deduplicationId)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
"deduplicated", "jobId", currentDebounceJobId, "deduplicationId", deduplicationId)
return currentDebounceJobId
"deduplicated", "jobId", currentDeduplicationJobId, "deduplicationId", deduplicationId)
return currentDeduplicationJobId
end
end
end
2 changes: 1 addition & 1 deletion src/commands/includes/removeJobs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

local function removeJobs(keys, hard, baseKey, max)
for i, key in ipairs(keys) do
removeJob(key, hard, baseKey, true --[[remove debounce key]])
removeJob(key, hard, baseKey, true --[[remove deduplication key]])
end
return max - #keys
end
5 changes: 2 additions & 3 deletions src/commands/includes/removeJobsByMaxAge.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
-- Includes
--- @include "removeJob"

local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix,
shouldRemoveDebounceKey)
local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix)
local start = timestamp - maxAge * 1000
local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf")
for i, jobId in ipairs(jobIds) do
removeJob(jobId, false, prefix, false --[[remove debounce key]])
removeJob(jobId, false, prefix, false --[[remove deduplication key]])
end
rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start)
end
2 changes: 1 addition & 1 deletion src/commands/includes/removeJobsByMaxCount.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ local function removeJobsByMaxCount(maxCount, targetSet, prefix)
local start = maxCount
local jobIds = rcall("ZREVRANGE", targetSet, start, -1)
for i, jobId in ipairs(jobIds) do
removeJob(jobId, false, prefix, false --[[remove debounce key]])
removeJob(jobId, false, prefix, false --[[remove deduplication key]])
end
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1))
end
6 changes: 3 additions & 3 deletions src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ local function moveParentToWait(parentPrefix, parentId, emitEvent)
end
end

local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, deduplicationId)
if parentKey then
local parentDependenciesKey = parentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
Expand All @@ -37,8 +37,8 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debou
if parentPrefix == baseKey then
removeParentDependencyKey(parentKey, hard, nil, baseKey, nil)
removeJobKeys(parentKey)
if debounceId then
rcall("DEL", parentPrefix .. "de:" .. debounceId)
if deduplicationId then
rcall("DEL", parentPrefix .. "de:" .. deduplicationId)
end
else
moveParentToWait(parentPrefix, parentId)
Expand Down
6 changes: 3 additions & 3 deletions src/commands/includes/storeJob.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
local jsonOpts = cjson.encode(opts)
local delay = opts['delay'] or 0
local priority = opts['priority'] or 0
local debounceId = opts['de'] and opts['de']['id']
local deduplicationId = opts['de'] and opts['de']['id']

local optionalValues = {}
if parentKey ~= nil then
Expand All @@ -21,9 +21,9 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
table.insert(optionalValues, repeatJobKey)
end

if debounceId then
if deduplicationId then
table.insert(optionalValues, "deid")
table.insert(optionalValues, debounceId)
table.insert(optionalValues, deduplicationId)
end

rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
Expand Down
2 changes: 1 addition & 1 deletion src/commands/moveStalledJobsToWait-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ if (#stalling > 0) then
elseif removeOnFailType == "boolean" then
if opts["removeOnFail"] then
removeJob(jobId, false, queueKeyPrefix,
false --[[remove debounce key]])
false --[[remove deduplication key]])
rcall("ZREM", failedKey, jobId)
end
elseif removeOnFailType ~= "nil" then
Expand Down
Loading

0 comments on commit 10bf0fb

Please sign in to comment.