From a175b557aff6ddd034e23772f040b5f870f225b0 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 20 Jul 2024 00:21:03 -0500 Subject: [PATCH 01/12] feat(job): allow passing a debounceId as option --- src/classes/job.ts | 5 +++ src/classes/queue-events.ts | 7 +++++ src/commands/addDelayedJob-6.lua | 18 ++++++++++- src/types/job-options.ts | 10 ++++++ tests/test_events.ts | 54 ++++++++++++++++++++++++++++++++ 5 files changed, 93 insertions(+), 1 deletion(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index d13b895059..b13625dac0 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -38,6 +38,7 @@ import type { QueueEvents } from './queue-events'; const logger = debuglog('bull'); const optsDecodeMap = { + deid: 'debounceId', fpof: 'failParentOnFailure', idof: 'ignoreDependencyOnFailure', kl: 'keepLogs', @@ -1197,6 +1198,10 @@ export class Job< throw new Error(`Delay and repeat options could not be used together`); } + if (this.opts.debounceId && !this.opts.delay) { + throw new Error(`DebounceId and delay options must be used together`); + } + if (this.opts.removeDependencyOnFailure && this.opts.failParentOnFailure) { throw new Error( `RemoveDependencyOnFailure and failParentOnFailure options can not be used together`, diff --git a/src/classes/queue-events.ts b/src/classes/queue-events.ts index 44dae24740..27201c26ca 100644 --- a/src/classes/queue-events.ts +++ b/src/classes/queue-events.ts @@ -45,6 +45,13 @@ export interface QueueEventsListener extends IoredisListener { id: string, ) => void; + /** + * Listen to 'debounced' event. + * + * This event is triggered when a job is debounced because debouncedId still existed. + */ + debounced: (args: { jobId: string }, id: string) => void; + /** * Listen to 'delayed' event. * diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index 4a42362534..0c9e1e0362 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -25,6 +25,7 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key + [10] debounce id ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -49,8 +50,8 @@ local args = cmsgpack.unpack(ARGV[1]) local data = ARGV[2] local parentKey = args[5] -local repeatJobKey = args[9] local parent = args[8] +local repeatJobKey = args[9] local parentData -- Includes @@ -73,6 +74,7 @@ local opts = cmsgpack.unpack(ARGV[3]) local parentDependenciesKey = args[7] local timestamp = args[4] + if args[2] == "" then jobId = jobCounter jobIdKey = args[1] .. jobId @@ -86,6 +88,20 @@ else end end +local debounceId = opts['deid'] + +if debounceId then + local currentDebounceJobId = rcall('SET', + args[1] .. "debounce:" .. debounceId, jobId, 'PX', + opts['delay'] or 0, 'NX', 'GET') + if currentDebounceJobId then + rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", + "debounced", "jobId", currentDebounceJobId) + + return args[1] .. currentDebounceJobId + end +end + -- Store the job. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, diff --git a/src/types/job-options.ts b/src/types/job-options.ts index 51d5416312..5af7acd6dc 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -1,6 +1,11 @@ import { BaseJobOptions } from '../interfaces'; export type JobsOptions = BaseJobOptions & { + /** + * Debounce identifier. + */ + debounceId?: string; + /** * If true, moves parent to failed. */ @@ -21,6 +26,11 @@ export type JobsOptions = BaseJobOptions & { * These fields are the ones stored in Redis with smaller keys for compactness. */ export type RedisJobOptions = BaseJobOptions & { + /** + * Debounce identifier. + */ + deid?: string; + /** * If true, moves parent to failed. */ diff --git a/tests/test_events.ts b/tests/test_events.ts index 69477a7a07..e6ff7fd8a5 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -398,6 +398,60 @@ describe('events', function () { }); }); + describe('when job is debounced when added again with same debounceId', function () { + it('emits debounced event', async function () { + const testName = 'test'; + + const job = await queue.add( + testName, + { foo: 'bar' }, + { debounceId: 'a1', delay: 2000 }, + ); + + let debouncedCounter = 0; + let secondJob; + queueEvents.on('debounced', ({ jobId }) => { + if (debouncedCounter > 1) { + expect(jobId).to.be.equal(secondJob.id); + } else { + expect(jobId).to.be.equal(job.id); + } + debouncedCounter++; + }); + + await delay(1000); + await queue.add( + testName, + { foo: 'bar' }, + { debounceId: 'a1', delay: 2000 }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { debounceId: 'a1', delay: 2000 }, + ); + await delay(1100); + secondJob = await queue.add( + testName, + { foo: 'bar' }, + { debounceId: 'a1', delay: 2000 }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { debounceId: 'a1', delay: 2000 }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { debounceId: 'a1', delay: 2000 }, + ); + await delay(100); + + expect(debouncedCounter).to.be.equal(4); + }); + }); + it('should emit an event when a job becomes active', async () => { const worker = new Worker(queueName, async job => {}, { connection, From d9c0abf1f0e1a02f34bfdb700ae0409a3309f9ef Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 20 Jul 2024 09:50:13 -0500 Subject: [PATCH 02/12] fix: separate get flag in set call bc not supported in redis v6 --- src/commands/addDelayedJob-6.lua | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index 0c9e1e0362..71c0ca9508 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -91,13 +91,13 @@ end local debounceId = opts['deid'] if debounceId then - local currentDebounceJobId = rcall('SET', - args[1] .. "debounce:" .. debounceId, jobId, 'PX', - opts['delay'] or 0, 'NX', 'GET') - if currentDebounceJobId then + local debounceKey = args[1] .. "debounce:" .. debounceId + local isFirstSet = rcall('SET', debounceKey, jobId, 'PX', + opts['delay'] or 0, 'NX') + if not isFirstSet then + local currentDebounceJobId = rcall('GET', debounceKey) rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId", currentDebounceJobId) - return args[1] .. currentDebounceJobId end end From 87399a96e688a4ef30f5877d8a28c0125f435427 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sun, 21 Jul 2024 21:08:55 -0500 Subject: [PATCH 03/12] docs(jobs): add debouncing section --- docs/gitbook/SUMMARY.md | 1 + docs/gitbook/guide/jobs/debouncing.md | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 docs/gitbook/guide/jobs/debouncing.md diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 012b930218..fa3416a74c 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -30,6 +30,7 @@ * [LIFO](guide/jobs/lifo.md) * [Job Ids](guide/jobs/job-ids.md) * [Job Data](guide/jobs/job-data.md) + * [Debouncing](guide/jobs/debouncing.md) * [Delayed](guide/jobs/delayed.md) * [Repeatable](guide/jobs/repeatable.md) * [Prioritized](guide/jobs/prioritized.md) diff --git a/docs/gitbook/guide/jobs/debouncing.md b/docs/gitbook/guide/jobs/debouncing.md new file mode 100644 index 0000000000..f1bf496a4c --- /dev/null +++ b/docs/gitbook/guide/jobs/debouncing.md @@ -0,0 +1,24 @@ +# Debouncing + +Debouncing a job implies delaying and deduplicating it. + +```typescript +import { Queue } from 'bullmq'; + +const myQueue = new Queue('Paint'); + +// Add a job that will be debounced for 5 seconds. +await myQueue.add('house', { color: 'white' }, { debouncedId: 'customValue', delay: 5000 }); +``` + +For the next 5 seconds, after adding this job, next jobs added with same **debouncedId** will be ignored and a _debounced_ event will be triggered by our QueueEvent class. + +Note that apart of passing a delay value, you must provide a debouncedId that should represent your job. You can hash your entire job data or a subset of attributes for creating a debouncedId. + +{% hint style="warning" %} +Passing `debouncedId` without a `delay` value greater than 0 will throw an Error. +{% endhint %} + +## Read more: + +* 💡 [Add Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#add) From d916f471c2046bcbfcff23944d003562e465f482 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 23 Jul 2024 00:32:48 -0500 Subject: [PATCH 04/12] feat: extend debounce logic --- docs/gitbook/guide/jobs/debouncing.md | 29 +++- src/classes/job.ts | 6 +- src/classes/queue.ts | 26 ++- src/commands/addDelayedJob-6.lua | 17 +- src/commands/addParentJob-4.lua | 13 +- src/commands/addPrioritizedJob-8.lua | 13 +- src/commands/addStandardJob-8.lua | 13 +- src/commands/includes/debounceJob.lua | 20 +++ .../moveParentFromWaitingChildrenToFailed.lua | 9 +- .../includes/removeDebounceKeyIfNeeded.lua | 15 ++ src/commands/includes/storeJob.lua | 7 +- src/commands/moveStalledJobsToWait-9.lua | 5 +- src/commands/moveToFinished-14.lua | 23 +-- src/interfaces/debouncing-options.ts | 14 ++ src/interfaces/index.ts | 1 + src/types/job-options.ts | 6 +- tests/test_events.ts | 158 +++++++++++++----- 17 files changed, 278 insertions(+), 97 deletions(-) create mode 100644 src/commands/includes/debounceJob.lua create mode 100644 src/commands/includes/removeDebounceKeyIfNeeded.lua create mode 100644 src/interfaces/debouncing-options.ts diff --git a/docs/gitbook/guide/jobs/debouncing.md b/docs/gitbook/guide/jobs/debouncing.md index f1bf496a4c..e2a21012ab 100644 --- a/docs/gitbook/guide/jobs/debouncing.md +++ b/docs/gitbook/guide/jobs/debouncing.md @@ -2,23 +2,38 @@ Debouncing a job implies delaying and deduplicating it. +## Fixed Mode + ```typescript import { Queue } from 'bullmq'; const myQueue = new Queue('Paint'); // Add a job that will be debounced for 5 seconds. -await myQueue.add('house', { color: 'white' }, { debouncedId: 'customValue', delay: 5000 }); +await myQueue.add( + 'house', + { color: 'white' }, + { debouncing: { id: 'customValue', ttl: 5000 } }, +); ``` -For the next 5 seconds, after adding this job, next jobs added with same **debouncedId** will be ignored and a _debounced_ event will be triggered by our QueueEvent class. +For the next 5 seconds, after adding this job, next jobs added with same **debounce id** will be ignored and a _debounced_ event will be triggered by our QueueEvent class. + +Note that you must provide a debounce id that should represent your job. You can hash your entire job data or a subset of attributes for creating this identifier. + +## Extended Mode -Note that apart of passing a delay value, you must provide a debouncedId that should represent your job. You can hash your entire job data or a subset of attributes for creating a debouncedId. +```typescript +// Add a job that will be debounced as this record is not finished (completed or failed). +await myQueue.add( + 'house', + { color: 'white' }, + { debouncing: { id: 'customValue' } }, +); +``` -{% hint style="warning" %} -Passing `debouncedId` without a `delay` value greater than 0 will throw an Error. -{% endhint %} +While this job is not moved to completed or failed state, next jobs added with same **debounce id** will be ignored and a _debounced_ event will be triggered by our QueueEvent class. ## Read more: -* 💡 [Add Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#add) +- 💡 [Add Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#add) diff --git a/src/classes/job.ts b/src/classes/job.ts index b13625dac0..83b3402495 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -38,7 +38,7 @@ import type { QueueEvents } from './queue-events'; const logger = debuglog('bull'); const optsDecodeMap = { - deid: 'debounceId', + debo: 'debouncing', fpof: 'failParentOnFailure', idof: 'ignoreDependencyOnFailure', kl: 'keepLogs', @@ -1198,10 +1198,6 @@ export class Job< throw new Error(`Delay and repeat options could not be used together`); } - if (this.opts.debounceId && !this.opts.delay) { - throw new Error(`DebounceId and delay options must be used together`); - } - if (this.opts.removeDependencyOnFailure && this.opts.failParentOnFailure) { throw new Error( `RemoveDependencyOnFailure and failParentOnFailure options can not be used together`, diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 834aa53eab..f61349d463 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -187,10 +187,10 @@ export class Queue< * Get global concurrency value. * Returns null in case no value is set. */ - async getGlobalConcurrency():Promise { + async getGlobalConcurrency(): Promise { const client = await this.client; const concurrency = await client.hget(this.keys.meta, 'concurrency'); - if(concurrency){ + if (concurrency) { return Number(concurrency); } return null; @@ -203,12 +203,11 @@ export class Queue< * is processed at any given time. If this limit is not defined, there will be no * restriction on the number of concurrent jobs. */ - async setGlobalConcurrency(concurrency: number) { - const client = await this.client; - return client.hset(this.keys.meta, 'concurrency', concurrency); - } - - + async setGlobalConcurrency(concurrency: number) { + const client = await this.client; + return client.hset(this.keys.meta, 'concurrency', concurrency); + } + /** * Adds a new job to the queue. * @@ -374,6 +373,17 @@ export class Queue< return !removed; } + /** + * Removes a debounce key. + * + * @param id - identifier + */ + async removeDebounceKey(id: string): Promise { + const client = await this.client; + + return client.del(this.toKey('debounce:') + id); + } + /** * Removes a repeatable job by its key. Note that the key is the one used * to store the repeatable job metadata and not one of the job iterations diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index 71c0ca9508..bdff4d2aa7 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -56,6 +56,7 @@ local parentData -- Includes --- @include "includes/addDelayMarkerIfNeeded" +--- @include "includes/debounceJob" --- @include "includes/getDelayedScore" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" @@ -88,24 +89,20 @@ else end end -local debounceId = opts['deid'] +local debounceId = opts['debo'] and opts['debo']['id'] if debounceId then - local debounceKey = args[1] .. "debounce:" .. debounceId - local isFirstSet = rcall('SET', debounceKey, jobId, 'PX', - opts['delay'] or 0, 'NX') - if not isFirstSet then - local currentDebounceJobId = rcall('GET', debounceKey) - rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", - "debounced", "jobId", currentDebounceJobId) - return args[1] .. currentDebounceJobId + local debouncedJobKey = debounceJob(args[1], debounceId, opts['debo']['ttl'], + jobId, eventsKey, maxEvents) + if debouncedJobKey then + return debouncedJobKey end end -- Store the job. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, - repeatJobKey) + repeatJobKey, debounceId) local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay)) diff --git a/src/commands/addParentJob-4.lua b/src/commands/addParentJob-4.lua index 4dc0c87ae9..9d2d054f9f 100644 --- a/src/commands/addParentJob-4.lua +++ b/src/commands/addParentJob-4.lua @@ -49,6 +49,7 @@ local parent = args[8] local parentData -- Includes +--- @include "includes/debounceJob" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" --- @include "includes/storeJob" @@ -78,9 +79,19 @@ else end end +local debounceId = opts['debo'] and opts['debo']['id'] + +if debounceId then + local debouncedJobKey = debounceJob(args[1], debounceId, opts['debo']['ttl'], + jobId, eventsKey, maxEvents) + if debouncedJobKey then + return debouncedJobKey + end +end + -- Store the job. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, - parentKey, parentData, repeatJobKey) + parentKey, parentData, repeatJobKey, debounceId) local waitChildrenKey = args[6] rcall("ZADD", waitChildrenKey, timestamp, jobId) diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-8.lua index 53b6410c97..4001140452 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -57,6 +57,7 @@ local parentData -- Includes --- @include "includes/addJobWithPriority" +--- @include "includes/debounceJob" --- @include "includes/storeJob" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" @@ -87,10 +88,20 @@ else end end +local debounceId = opts['debo'] and opts['debo']['id'] + +if debounceId then + local debouncedJobKey = debounceJob(args[1], debounceId, opts['debo']['ttl'], + jobId, eventsKey, maxEvents) + if debouncedJobKey then + return debouncedJobKey + end +end + -- Store the job. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, - repeatJobKey) + repeatJobKey, debounceId) -- Add the job to the prioritized set local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey) diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index ed07744abe..a8f8afa23d 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -60,6 +60,7 @@ local parentData -- Includes --- @include "includes/addJobInTargetList" +--- @include "includes/debounceJob" --- @include "includes/getOrSetMaxEvents" --- @include "includes/getTargetQueueList" --- @include "includes/handleDuplicatedJob" @@ -91,9 +92,19 @@ else end end +local debounceId = opts['debo'] and opts['debo']['id'] + +if debounceId then + local debouncedJobId = debounceJob(args[1], debounceId, opts['debo']['ttl'], + jobId, eventsKey, maxEvents) + if debouncedJobId then + return debouncedJobId + end +end + -- Store the job. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, - parentKey, parentData, repeatJobKey) + parentKey, parentData, repeatJobKey, debounceId) local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KEYS[2]) diff --git a/src/commands/includes/debounceJob.lua b/src/commands/includes/debounceJob.lua new file mode 100644 index 0000000000..09e09f65bd --- /dev/null +++ b/src/commands/includes/debounceJob.lua @@ -0,0 +1,20 @@ +--[[ + Function to debounce a job. +]] + +local function debounceJob(prefixKey, debounceId, ttl, jobId, eventsKey, maxEvents) + local debounceKey = prefixKey .. "debounce:" .. debounceId + local isFirstSet + if ttl then + isFirstSet = rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX') + else + isFirstSet = rcall('SET', debounceKey, jobId, 'NX') + end + if not isFirstSet then + local currentDebounceJobId = rcall('GET', debounceKey) + rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", + "debounced", "jobId", currentDebounceJobId) + return currentDebounceJobId + end +end + \ No newline at end of file diff --git a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua index bd6a148a6a..da10722d7c 100644 --- a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua +++ b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua @@ -4,6 +4,7 @@ -- Includes --- @include "moveParentToWaitIfNeeded" +--- @include "removeDebounceKeyIfNeeded" local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, parentId, jobIdKey, timestamp) if rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) == 1 then @@ -13,10 +14,12 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, rcall("XADD", parentQueueKey .. ":events", "*", "event", "failed", "jobId", parentId, "failedReason", failedReason, "prev", "waiting-children") - local rawParentData = rcall("HGET", parentKey, "parent") + local jobAttributes = rcall("HMGET", parentKey, "parent", "deid") - if rawParentData ~= false then - local parentData = cjson.decode(rawParentData) + removeDebounceKeyIfNeeded(parentQueueKey, jobAttributes[2]) + + if jobAttributes[1] then + local parentData = cjson.decode(jobAttributes[1]) if parentData['fpof'] then moveParentFromWaitingChildrenToFailed( parentData['queueKey'], diff --git a/src/commands/includes/removeDebounceKeyIfNeeded.lua b/src/commands/includes/removeDebounceKeyIfNeeded.lua new file mode 100644 index 0000000000..8044452168 --- /dev/null +++ b/src/commands/includes/removeDebounceKeyIfNeeded.lua @@ -0,0 +1,15 @@ +--[[ + Function to remove debounce key if needed. +]] + +local function removeDebounceKeyIfNeeded(prefixKey, debounceId) + if debounceId then + local debounceKey = prefixKey .. "debounce:" .. debounceId + local pttl = rcall("PTTL", debounceKey) + + if pttl == 0 or pttl == -1 then + rcall("DEL", debounceKey) + end + end +end + \ No newline at end of file diff --git a/src/commands/includes/storeJob.lua b/src/commands/includes/storeJob.lua index f9c4728052..7a52785263 100644 --- a/src/commands/includes/storeJob.lua +++ b/src/commands/includes/storeJob.lua @@ -2,7 +2,7 @@ Function to store a job ]] local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, - parentKey, parentData, repeatJobKey) + parentKey, parentData, repeatJobKey, debounceId) local jsonOpts = cjson.encode(opts) local delay = opts['delay'] or 0 local priority = opts['priority'] or 0 @@ -20,6 +20,11 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, table.insert(optionalValues, repeatJobKey) end + if debounceId ~= nil then + table.insert(optionalValues, "deid") + table.insert(optionalValues, debounceId) + end + rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts, "timestamp", timestamp, "delay", delay, "priority", priority, unpack(optionalValues)) diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index 9660e77fcf..f4aa35fc9a 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -29,6 +29,7 @@ local rcall = redis.call --- @include "includes/getTargetQueueList" --- @include "includes/moveParentFromWaitingChildrenToFailed" --- @include "includes/moveParentToWaitIfNeeded" +--- @include "includes/removeDebounceKeyIfNeeded" --- @include "includes/removeJob" --- @include "includes/removeJobsByMaxAge" --- @include "includes/removeJobsByMaxCount" @@ -83,12 +84,14 @@ if (#stalling > 0) then local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1) if (stalledCount > MAX_STALLED_JOB_COUNT) then - local jobAttributes = rcall("HMGET", jobKey, "opts", "parent") + local jobAttributes = rcall("HMGET", jobKey, "opts", "parent", "deid") local rawOpts = jobAttributes[1] local rawParentData = jobAttributes[2] local opts = cjson.decode(rawOpts) local removeOnFailType = type(opts["removeOnFail"]) rcall("ZADD", failedKey, timestamp, jobId) + removeDebounceKeyIfNeeded(queueKeyPrefix, jobAttributes[3]) + local failedReason = "job stalled more than allowable limit" rcall("HMSET", jobKey, "failedReason", failedReason, diff --git a/src/commands/moveToFinished-14.lua b/src/commands/moveToFinished-14.lua index 2edbe03835..af2dd4f624 100644 --- a/src/commands/moveToFinished-14.lua +++ b/src/commands/moveToFinished-14.lua @@ -65,6 +65,7 @@ local rcall = redis.call --- @include "includes/moveParentToWaitIfNeeded" --- @include "includes/prepareJobForProcessing" --- @include "includes/promoteDelayedJobs" +--- @include "includes/removeDebounceKeyIfNeeded" --- @include "includes/removeJobKeys" --- @include "includes/removeJobsByMaxAge" --- @include "includes/removeJobsByMaxCount" @@ -93,12 +94,12 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists return -4 end - local parentReferences = rcall("HMGET", jobIdKey, "parentKey", "parent") - local parentKey = parentReferences[1] or "" + local jobAttributes = rcall("HMGET", jobIdKey, "parentKey", "parent", "deid") + local parentKey = jobAttributes[1] or "" local parentId = "" local parentQueueKey = "" - if parentReferences[2] ~= false then - local jsonDecodedParent = cjson.decode(parentReferences[2]) + if jobAttributes[2] ~= false then + local jsonDecodedParent = cjson.decode(jobAttributes[2]) parentId = jsonDecodedParent['id'] parentQueueKey = jsonDecodedParent['queueKey'] end @@ -116,6 +117,10 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists -- Trim events before emiting them to avoid trimming events emitted in this script trimEvents(metaKey, eventStreamKey) + local prefix = ARGV[7] + + removeDebounceKeyIfNeeded(prefix, jobAttributes[3]) + -- If job has a parent we need to -- 1) remove this job id from parents dependencies -- 2) move the job Id to parent "processed" set @@ -164,8 +169,6 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists -- "returnvalue" / "failedReason" and "finishedOn" -- Remove old jobs? - local prefix = ARGV[7] - if maxAge ~= nil then removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix) end @@ -205,7 +208,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[2], KEYS[1], KEYS[8]) -- Check if there are delayed jobs that can be promoted - promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], eventStreamKey, ARGV[7], + promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], eventStreamKey, prefix, timestamp, KEYS[10], isPausedOrMaxed) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) @@ -229,19 +232,19 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists if jobId == "0:0" then jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) - return prepareJobForProcessing(ARGV[7], KEYS[6], eventStreamKey, jobId, + return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, timestamp, maxJobs, opts) end else - return prepareJobForProcessing(ARGV[7], KEYS[6], eventStreamKey, jobId, + return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, timestamp, maxJobs, opts) end else jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) if jobId then - return prepareJobForProcessing(ARGV[7], KEYS[6], eventStreamKey, jobId, + return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, timestamp, maxJobs, opts) end diff --git a/src/interfaces/debouncing-options.ts b/src/interfaces/debouncing-options.ts new file mode 100644 index 0000000000..accbf960da --- /dev/null +++ b/src/interfaces/debouncing-options.ts @@ -0,0 +1,14 @@ +/** + * Debouncing options + */ +export interface DebouncingOptions { + /** + * ttl in milliseconds + */ + ttl?: number; + + /** + * Identifier + */ + id: string; +} diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index 533fa81b31..4839fc4fd4 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -3,6 +3,7 @@ export * from './backoff-options'; export * from './base-job-options'; export * from './child-message'; export * from './connection'; +export * from './debouncing-options'; export * from './flow-job'; export * from './ioredis-events'; export * from './job-json'; diff --git a/src/types/job-options.ts b/src/types/job-options.ts index 5af7acd6dc..1c4dc95c88 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -1,10 +1,10 @@ -import { BaseJobOptions } from '../interfaces'; +import { BaseJobOptions, DebouncingOptions } from '../interfaces'; export type JobsOptions = BaseJobOptions & { /** - * Debounce identifier. + * Debouncing options. */ - debounceId?: string; + debouncing?: DebouncingOptions; /** * If true, moves parent to failed. diff --git a/tests/test_events.ts b/tests/test_events.ts index e6ff7fd8a5..d1a00c7f54 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -398,57 +398,123 @@ describe('events', function () { }); }); - describe('when job is debounced when added again with same debounceId', function () { - it('emits debounced event', async function () { - const testName = 'test'; + describe('when job is debounced when added again with same debounce id', function () { + describe('when ttl is provided', function () { + it('used a fixed time period and emits debounced event', async function () { + const testName = 'test'; + + const job = await queue.add( + testName, + { foo: 'bar' }, + { debouncing: { id: 'a1', ttl: 2000 } }, + ); - const job = await queue.add( - testName, - { foo: 'bar' }, - { debounceId: 'a1', delay: 2000 }, - ); + let debouncedCounter = 0; + let secondJob; + queueEvents.on('debounced', ({ jobId }) => { + if (debouncedCounter > 1) { + expect(jobId).to.be.equal(secondJob.id); + } else { + expect(jobId).to.be.equal(job.id); + } + debouncedCounter++; + }); - let debouncedCounter = 0; - let secondJob; - queueEvents.on('debounced', ({ jobId }) => { - if (debouncedCounter > 1) { - expect(jobId).to.be.equal(secondJob.id); - } else { - expect(jobId).to.be.equal(job.id); - } - debouncedCounter++; + await delay(1000); + await queue.add( + testName, + { foo: 'bar' }, + { debouncing: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { debouncing: { id: 'a1', ttl: 2000 } }, + ); + await delay(1100); + secondJob = await queue.add( + testName, + { foo: 'bar' }, + { debouncing: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { debouncing: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { debouncing: { id: 'a1', ttl: 2000 } }, + ); + await delay(100); + + expect(debouncedCounter).to.be.equal(4); }); + }); - await delay(1000); - await queue.add( - testName, - { foo: 'bar' }, - { debounceId: 'a1', delay: 2000 }, - ); - await queue.add( - testName, - { foo: 'bar' }, - { debounceId: 'a1', delay: 2000 }, - ); - await delay(1100); - secondJob = await queue.add( - testName, - { foo: 'bar' }, - { debounceId: 'a1', delay: 2000 }, - ); - await queue.add( - testName, - { foo: 'bar' }, - { debounceId: 'a1', delay: 2000 }, - ); - await queue.add( - testName, - { foo: 'bar' }, - { debounceId: 'a1', delay: 2000 }, - ); - await delay(100); + describe('when ttl is not provided', function () { + it('waits until job is finished before removing debounce key', async function () { + const testName = 'test'; + + const worker = new Worker( + queueName, + async () => { + await delay(100); + await queue.add( + testName, + { foo: 'bar' }, + { debouncing: { id: 'a1' } }, + ); + await delay(100); + await queue.add( + testName, + { foo: 'bar' }, + { debouncing: { id: 'a1' } }, + ); + await delay(100); + }, + { + autorun: false, + connection, + prefix, + }, + ); + await worker.waitUntilReady(); + + let debouncedCounter = 0; - expect(debouncedCounter).to.be.equal(4); + const completing = new Promise(resolve => { + queueEvents.once('completed', ({ jobId }) => { + expect(jobId).to.be.equal('1'); + resolve(); + }); + + queueEvents.on('debounced', ({ jobId }) => { + debouncedCounter++; + }); + }); + + worker.run(); + + await queue.add(testName, { foo: 'bar' }, { debouncing: { id: 'a1' } }); + + await completing; + + const secondJob = await queue.add( + testName, + { foo: 'bar' }, + { debouncing: { id: 'a1' } }, + ); + + const count = await queue.getJobCountByTypes(); + + expect(count).to.be.eql(2); + + expect(debouncedCounter).to.be.equal(2); + expect(secondJob.id).to.be.equal('4'); + await worker.close(); + }); }); }); From dc3837adc72e0aafec4e33f1fefa1fdeb56d1e36 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 23 Jul 2024 00:40:34 -0500 Subject: [PATCH 05/12] chore: update job attributes --- src/classes/job.ts | 11 +++++++++++ src/commands/addDelayedJob-6.lua | 1 - src/interfaces/job-json.ts | 1 + 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index 83b3402495..220e7a32a3 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -137,6 +137,11 @@ export class Job< */ parent?: ParentKeys; + /** + * Debounce identifier. + */ + debounceId?: string; + /** * Base repeat job key. */ @@ -200,6 +205,8 @@ export class Job< ? { id: opts.parent.id, queueKey: opts.parent.queue } : undefined; + this.debounceId = opts.debouncing ? opts.debouncing.id : undefined; + this.toKey = queue.toKey.bind(queue); this.setScripts(); @@ -323,6 +330,10 @@ export class Job< job.repeatJobKey = json.rjk; } + if (json.deid) { + job.debounceId = json.deid; + } + job.failedReason = json.failedReason; job.attemptsStarted = parseInt(json.ats || '0'); diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index bdff4d2aa7..55a7b8dbaa 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -25,7 +25,6 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key - [10] debounce id ARGV[2] Json stringified job data ARGV[3] msgpacked options diff --git a/src/interfaces/job-json.ts b/src/interfaces/job-json.ts index 38d27e6d8a..6ccac834bb 100644 --- a/src/interfaces/job-json.ts +++ b/src/interfaces/job-json.ts @@ -37,6 +37,7 @@ export interface JobJsonRaw { returnvalue: string; parentKey?: string; parent?: string; + deid?: string; rjk?: string; atm?: string; ats?: string; From bb163078bcc3c6ec1a78b73d0597e4f55e716b5e Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 23 Jul 2024 08:40:03 -0500 Subject: [PATCH 06/12] refactor: address comments --- docs/gitbook/guide/jobs/debouncing.md | 14 ++++++--- src/classes/job.ts | 4 +-- src/classes/queue-keys.ts | 1 + src/classes/queue.ts | 2 +- src/classes/scripts.ts | 1 + src/commands/addDelayedJob-6.lua | 14 ++++----- src/commands/addParentJob-4.lua | 16 +++++----- src/commands/addPrioritizedJob-8.lua | 20 ++++++------- src/commands/addStandardJob-8.lua | 20 ++++++------- src/commands/includes/debounceJob.lua | 29 ++++++++++--------- src/commands/includes/storeJob.lua | 7 +++-- ...ouncing-options.ts => debounce-options.ts} | 4 +-- src/interfaces/job-json.ts | 1 + src/types/job-options.ts | 6 ++-- tests/test_events.ts | 20 ++++++------- 15 files changed, 84 insertions(+), 75 deletions(-) rename src/interfaces/{debouncing-options.ts => debounce-options.ts} (64%) diff --git a/docs/gitbook/guide/jobs/debouncing.md b/docs/gitbook/guide/jobs/debouncing.md index e2a21012ab..65c2377f90 100644 --- a/docs/gitbook/guide/jobs/debouncing.md +++ b/docs/gitbook/guide/jobs/debouncing.md @@ -1,9 +1,11 @@ # Debouncing -Debouncing a job implies delaying and deduplicating it. +Debouncing in BullMQ is a process where job execution is delayed and deduplicated based on specific identifiers. It ensures that within a specified period, or until a specific job is completed or failed, no new jobs with the same identifier will be added to the queue. Instead, these attempts will trigger a debounced event. ## Fixed Mode +In the Fixed Mode, debouncing works by assigning a delay (Time to Live, TTL) to a job upon its creation. If a similar job (identified by a unique debouncer ID) is added during this delay period, it is ignored. This prevents the queue from being overwhelmed with multiple instances of the same task, thus optimizing the processing time and resource utilization. + ```typescript import { Queue } from 'bullmq'; @@ -13,27 +15,31 @@ const myQueue = new Queue('Paint'); await myQueue.add( 'house', { color: 'white' }, - { debouncing: { id: 'customValue', ttl: 5000 } }, + { debounce: { id: 'customValue', ttl: 5000 } }, ); ``` -For the next 5 seconds, after adding this job, next jobs added with same **debounce id** will be ignored and a _debounced_ event will be triggered by our QueueEvent class. +In this example, after adding the house painting job with the debouncing parameters (id and ttl), any subsequent job with the same debouncing ID customValue added within 5 seconds will be ignored. This is useful for scenarios where rapid, repetitive requests are made, such as multiple users or processes attempting to trigger the same job. Note that you must provide a debounce id that should represent your job. You can hash your entire job data or a subset of attributes for creating this identifier. ## Extended Mode +The Extended Mode takes a different approach by extending the debouncing duration until the job's completion or failure. This means as long as the job remains in an incomplete state (neither succeeded nor failed), any subsequent job with the same debouncer ID will be ignored. + ```typescript // Add a job that will be debounced as this record is not finished (completed or failed). await myQueue.add( 'house', { color: 'white' }, - { debouncing: { id: 'customValue' } }, + { debounce: { id: 'customValue' } }, ); ``` While this job is not moved to completed or failed state, next jobs added with same **debounce id** will be ignored and a _debounced_ event will be triggered by our QueueEvent class. +This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress. + ## Read more: - 💡 [Add Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#add) diff --git a/src/classes/job.ts b/src/classes/job.ts index 220e7a32a3..60a9cffb6c 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -38,7 +38,7 @@ import type { QueueEvents } from './queue-events'; const logger = debuglog('bull'); const optsDecodeMap = { - debo: 'debouncing', + de: 'debounce', fpof: 'failParentOnFailure', idof: 'ignoreDependencyOnFailure', kl: 'keepLogs', @@ -205,7 +205,7 @@ export class Job< ? { id: opts.parent.id, queueKey: opts.parent.queue } : undefined; - this.debounceId = opts.debouncing ? opts.debouncing.id : undefined; + this.debounceId = opts.debounce ? opts.debounce.id : undefined; this.toKey = queue.toKey.bind(queue); this.setScripts(); diff --git a/src/classes/queue-keys.ts b/src/classes/queue-keys.ts index 29224c2bbd..9109ae8dba 100644 --- a/src/classes/queue-keys.ts +++ b/src/classes/queue-keys.ts @@ -24,6 +24,7 @@ export class QueueKeys { 'events', 'pc', // priority counter key 'marker', // marker key + 'de', // debounce key ].forEach(key => { keys[key] = this.toKey(name, key); }); diff --git a/src/classes/queue.ts b/src/classes/queue.ts index f61349d463..0e6b6cded4 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -381,7 +381,7 @@ export class Queue< async removeDebounceKey(id: string): Promise { const client = await this.client; - return client.del(this.toKey('debounce:') + id); + return client.del(`${this.keys.de}:${id}`); } /** diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index ec66c5a366..df3f722c28 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -183,6 +183,7 @@ export class Scripts { parentOpts.parentDependenciesKey || null, parent, job.repeatJobKey, + job.debounceId ? `${queueKeys.de}:${job.debounceId}` : null, ]; let encodedOpts; diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index 55a7b8dbaa..b6c5f5712f 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -25,6 +25,7 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key + [10] debounce key ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -51,6 +52,7 @@ local data = ARGV[2] local parentKey = args[5] local parent = args[8] local repeatJobKey = args[9] +local debounceKey = args[10] local parentData -- Includes @@ -90,18 +92,16 @@ end local debounceId = opts['debo'] and opts['debo']['id'] -if debounceId then - local debouncedJobKey = debounceJob(args[1], debounceId, opts['debo']['ttl'], - jobId, eventsKey, maxEvents) - if debouncedJobKey then - return debouncedJobKey - end +local debouncedJobId = debounceJob(args[1], opts['debo'], + jobId, debounceKey, eventsKey, maxEvents) +if debouncedJobId then + return debouncedJobId end -- Store the job. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, - repeatJobKey, debounceId) + repeatJobKey) local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay)) diff --git a/src/commands/addParentJob-4.lua b/src/commands/addParentJob-4.lua index 9d2d054f9f..3c0fd50b44 100644 --- a/src/commands/addParentJob-4.lua +++ b/src/commands/addParentJob-4.lua @@ -20,6 +20,7 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key + [10] debounce key ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -44,8 +45,9 @@ local data = ARGV[2] local opts = cmsgpack.unpack(ARGV[3]) local parentKey = args[5] -local repeatJobKey = args[9] local parent = args[8] +local repeatJobKey = args[9] +local debounceKey = args[10] local parentData -- Includes @@ -81,17 +83,15 @@ end local debounceId = opts['debo'] and opts['debo']['id'] -if debounceId then - local debouncedJobKey = debounceJob(args[1], debounceId, opts['debo']['ttl'], - jobId, eventsKey, maxEvents) - if debouncedJobKey then - return debouncedJobKey - end +local debouncedJobId = debounceJob(args[1], opts['debo'], + jobId, debounceKey, eventsKey, maxEvents) +if debouncedJobId then + return debouncedJobId end -- Store the job. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, - parentKey, parentData, repeatJobKey, debounceId) + parentKey, parentData, repeatJobKey) local waitChildrenKey = args[6] rcall("ZADD", waitChildrenKey, timestamp, jobId) diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-8.lua index 4001140452..dc4b679bda 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -24,7 +24,8 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key - + [10] debounce key + ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -51,8 +52,9 @@ local data = ARGV[2] local opts = cmsgpack.unpack(ARGV[3]) local parentKey = args[5] -local repeatJobKey = args[9] local parent = args[8] +local repeatJobKey = args[9] +local debounceKey = args[10] local parentData -- Includes @@ -88,20 +90,16 @@ else end end -local debounceId = opts['debo'] and opts['debo']['id'] - -if debounceId then - local debouncedJobKey = debounceJob(args[1], debounceId, opts['debo']['ttl'], - jobId, eventsKey, maxEvents) - if debouncedJobKey then - return debouncedJobKey - end +local debouncedJobId = debounceJob(args[1], opts['debo'], + jobId, debounceKey, eventsKey, maxEvents) +if debouncedJobId then + return debouncedJobId end -- Store the job. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, - repeatJobKey, debounceId) + repeatJobKey) -- Add the job to the prioritized set local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey) diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index a8f8afa23d..a3a7a792f0 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -34,7 +34,8 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key - + [10] debounce key + ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -54,8 +55,9 @@ local data = ARGV[2] local opts = cmsgpack.unpack(ARGV[3]) local parentKey = args[5] -local repeatJobKey = args[9] local parent = args[8] +local repeatJobKey = args[9] +local debounceKey = args[10] local parentData -- Includes @@ -92,19 +94,15 @@ else end end -local debounceId = opts['debo'] and opts['debo']['id'] - -if debounceId then - local debouncedJobId = debounceJob(args[1], debounceId, opts['debo']['ttl'], - jobId, eventsKey, maxEvents) - if debouncedJobId then - return debouncedJobId - end +local debouncedJobId = debounceJob(args[1], opts['debo'], + jobId, debounceKey, eventsKey, maxEvents) +if debouncedJobId then + return debouncedJobId end -- Store the job. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, - parentKey, parentData, repeatJobKey, debounceId) + parentKey, parentData, repeatJobKey) local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KEYS[2]) diff --git a/src/commands/includes/debounceJob.lua b/src/commands/includes/debounceJob.lua index 09e09f65bd..534e49c609 100644 --- a/src/commands/includes/debounceJob.lua +++ b/src/commands/includes/debounceJob.lua @@ -2,19 +2,22 @@ Function to debounce a job. ]] -local function debounceJob(prefixKey, debounceId, ttl, jobId, eventsKey, maxEvents) - local debounceKey = prefixKey .. "debounce:" .. debounceId - local isFirstSet - if ttl then - isFirstSet = rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX') - else - isFirstSet = rcall('SET', debounceKey, jobId, 'NX') - end - if not isFirstSet then - local currentDebounceJobId = rcall('GET', debounceKey) - rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", - "debounced", "jobId", currentDebounceJobId) - return currentDebounceJobId +local function debounceJob(prefixKey, debounceOpts, jobId, debounceKey, eventsKey, maxEvents) + local debounceId = debounceOpts and debounceOpts['id'] + if debounceId then + local ttl = debounceOpts['ttl'] + local isFirstSet + if ttl then + isFirstSet = rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX') + else + isFirstSet = rcall('SET', debounceKey, jobId, 'NX') + end + if not isFirstSet then + local currentDebounceJobId = rcall('GET', debounceKey) + rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", + "debounced", "jobId", currentDebounceJobId) + return currentDebounceJobId + end end end \ No newline at end of file diff --git a/src/commands/includes/storeJob.lua b/src/commands/includes/storeJob.lua index 7a52785263..3c798a9c48 100644 --- a/src/commands/includes/storeJob.lua +++ b/src/commands/includes/storeJob.lua @@ -2,11 +2,12 @@ Function to store a job ]] local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, - parentKey, parentData, repeatJobKey, debounceId) + parentKey, parentData, repeatJobKey) local jsonOpts = cjson.encode(opts) local delay = opts['delay'] or 0 local priority = opts['priority'] or 0 - + local debounceId = opts['debo'] and opts['debo']['id'] + local optionalValues = {} if parentKey ~= nil then table.insert(optionalValues, "parentKey") @@ -20,7 +21,7 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, table.insert(optionalValues, repeatJobKey) end - if debounceId ~= nil then + if debounceId then table.insert(optionalValues, "deid") table.insert(optionalValues, debounceId) end diff --git a/src/interfaces/debouncing-options.ts b/src/interfaces/debounce-options.ts similarity index 64% rename from src/interfaces/debouncing-options.ts rename to src/interfaces/debounce-options.ts index accbf960da..02a34fa2fa 100644 --- a/src/interfaces/debouncing-options.ts +++ b/src/interfaces/debounce-options.ts @@ -1,7 +1,7 @@ /** - * Debouncing options + * Debounce options */ -export interface DebouncingOptions { +export interface DebounceOptions { /** * ttl in milliseconds */ diff --git a/src/interfaces/job-json.ts b/src/interfaces/job-json.ts index 6ccac834bb..79f104d934 100644 --- a/src/interfaces/job-json.ts +++ b/src/interfaces/job-json.ts @@ -18,6 +18,7 @@ export interface JobJson { parent?: ParentKeys; parentKey?: string; repeatJobKey?: string; + debounceId?: string; processedBy?: string; } diff --git a/src/types/job-options.ts b/src/types/job-options.ts index 1c4dc95c88..4b0eea7b78 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -1,10 +1,10 @@ -import { BaseJobOptions, DebouncingOptions } from '../interfaces'; +import { BaseJobOptions, DebounceOptions } from '../interfaces'; export type JobsOptions = BaseJobOptions & { /** - * Debouncing options. + * Debounce options. */ - debouncing?: DebouncingOptions; + debounce?: DebounceOptions; /** * If true, moves parent to failed. diff --git a/tests/test_events.ts b/tests/test_events.ts index d1a00c7f54..8e1141eb27 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -406,7 +406,7 @@ describe('events', function () { const job = await queue.add( testName, { foo: 'bar' }, - { debouncing: { id: 'a1', ttl: 2000 } }, + { debounce: { id: 'a1', ttl: 2000 } }, ); let debouncedCounter = 0; @@ -424,28 +424,28 @@ describe('events', function () { await queue.add( testName, { foo: 'bar' }, - { debouncing: { id: 'a1', ttl: 2000 } }, + { debounce: { id: 'a1', ttl: 2000 } }, ); await queue.add( testName, { foo: 'bar' }, - { debouncing: { id: 'a1', ttl: 2000 } }, + { debounce: { id: 'a1', ttl: 2000 } }, ); await delay(1100); secondJob = await queue.add( testName, { foo: 'bar' }, - { debouncing: { id: 'a1', ttl: 2000 } }, + { debounce: { id: 'a1', ttl: 2000 } }, ); await queue.add( testName, { foo: 'bar' }, - { debouncing: { id: 'a1', ttl: 2000 } }, + { debounce: { id: 'a1', ttl: 2000 } }, ); await queue.add( testName, { foo: 'bar' }, - { debouncing: { id: 'a1', ttl: 2000 } }, + { debounce: { id: 'a1', ttl: 2000 } }, ); await delay(100); @@ -464,13 +464,13 @@ describe('events', function () { await queue.add( testName, { foo: 'bar' }, - { debouncing: { id: 'a1' } }, + { debounce: { id: 'a1' } }, ); await delay(100); await queue.add( testName, { foo: 'bar' }, - { debouncing: { id: 'a1' } }, + { debounce: { id: 'a1' } }, ); await delay(100); }, @@ -497,14 +497,14 @@ describe('events', function () { worker.run(); - await queue.add(testName, { foo: 'bar' }, { debouncing: { id: 'a1' } }); + await queue.add(testName, { foo: 'bar' }, { debounce: { id: 'a1' } }); await completing; const secondJob = await queue.add( testName, { foo: 'bar' }, - { debouncing: { id: 'a1' } }, + { debounce: { id: 'a1' } }, ); const count = await queue.getJobCountByTypes(); From 0755ced6edc5d46d4117442d9b26e212c06c0805 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 23 Jul 2024 08:43:54 -0500 Subject: [PATCH 07/12] chore: change debo to de --- src/commands/addDelayedJob-6.lua | 4 +--- src/commands/addParentJob-4.lua | 4 +--- src/commands/addPrioritizedJob-8.lua | 2 +- src/commands/addStandardJob-8.lua | 2 +- src/commands/includes/storeJob.lua | 2 +- 5 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index b6c5f5712f..cbec0dcf63 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -90,9 +90,7 @@ else end end -local debounceId = opts['debo'] and opts['debo']['id'] - -local debouncedJobId = debounceJob(args[1], opts['debo'], +local debouncedJobId = debounceJob(args[1], opts['de'], jobId, debounceKey, eventsKey, maxEvents) if debouncedJobId then return debouncedJobId diff --git a/src/commands/addParentJob-4.lua b/src/commands/addParentJob-4.lua index 3c0fd50b44..0cb89372dc 100644 --- a/src/commands/addParentJob-4.lua +++ b/src/commands/addParentJob-4.lua @@ -81,9 +81,7 @@ else end end -local debounceId = opts['debo'] and opts['debo']['id'] - -local debouncedJobId = debounceJob(args[1], opts['debo'], +local debouncedJobId = debounceJob(args[1], opts['de'], jobId, debounceKey, eventsKey, maxEvents) if debouncedJobId then return debouncedJobId diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-8.lua index dc4b679bda..288e779154 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -90,7 +90,7 @@ else end end -local debouncedJobId = debounceJob(args[1], opts['debo'], +local debouncedJobId = debounceJob(args[1], opts['de'], jobId, debounceKey, eventsKey, maxEvents) if debouncedJobId then return debouncedJobId diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index a3a7a792f0..54fb363d73 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -94,7 +94,7 @@ else end end -local debouncedJobId = debounceJob(args[1], opts['debo'], +local debouncedJobId = debounceJob(args[1], opts['de'], jobId, debounceKey, eventsKey, maxEvents) if debouncedJobId then return debouncedJobId diff --git a/src/commands/includes/storeJob.lua b/src/commands/includes/storeJob.lua index 3c798a9c48..ef53c9cba1 100644 --- a/src/commands/includes/storeJob.lua +++ b/src/commands/includes/storeJob.lua @@ -6,7 +6,7 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, local jsonOpts = cjson.encode(opts) local delay = opts['delay'] or 0 local priority = opts['priority'] or 0 - local debounceId = opts['debo'] and opts['debo']['id'] + local debounceId = opts['de'] and opts['de']['id'] local optionalValues = {} if parentKey ~= nil then From a8516813e46c439383e8e411c2a5faa5cb94d6aa Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 23 Jul 2024 08:49:02 -0500 Subject: [PATCH 08/12] chore: change export references --- src/interfaces/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index 4839fc4fd4..65dfed0396 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -3,7 +3,7 @@ export * from './backoff-options'; export * from './base-job-options'; export * from './child-message'; export * from './connection'; -export * from './debouncing-options'; +export * from './debounce-options'; export * from './flow-job'; export * from './ioredis-events'; export * from './job-json'; From a28cc43c730f8ad45d706f624ecdd30b3befb480 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 23 Jul 2024 09:06:34 -0500 Subject: [PATCH 09/12] chore: fix debo ref --- src/classes/job.ts | 1 + src/commands/includes/removeDebounceKeyIfNeeded.lua | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index 60a9cffb6c..06035eefd9 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -457,6 +457,7 @@ export class Job< timestamp: this.timestamp, failedReason: JSON.stringify(this.failedReason), stacktrace: JSON.stringify(this.stacktrace), + debounceId: this.debounceId, repeatJobKey: this.repeatJobKey, returnvalue: JSON.stringify(this.returnvalue), }; diff --git a/src/commands/includes/removeDebounceKeyIfNeeded.lua b/src/commands/includes/removeDebounceKeyIfNeeded.lua index 8044452168..fbc058e20f 100644 --- a/src/commands/includes/removeDebounceKeyIfNeeded.lua +++ b/src/commands/includes/removeDebounceKeyIfNeeded.lua @@ -4,7 +4,7 @@ local function removeDebounceKeyIfNeeded(prefixKey, debounceId) if debounceId then - local debounceKey = prefixKey .. "debounce:" .. debounceId + local debounceKey = prefixKey .. "de:" .. debounceId local pttl = rcall("PTTL", debounceKey) if pttl == 0 or pttl == -1 then @@ -12,4 +12,3 @@ local function removeDebounceKeyIfNeeded(prefixKey, debounceId) end end end - \ No newline at end of file From 34f1fed49c2cd5c5859964ccef6cb3c5fda794a4 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 23 Jul 2024 22:18:54 -0500 Subject: [PATCH 10/12] chore: remove debounceKey when manual removal --- docs/gitbook/guide/jobs/debouncing.md | 4 ++++ src/commands/includes/cleanList.lua | 2 +- src/commands/includes/cleanSet.lua | 4 ++-- src/commands/includes/removeDebounceKey.lua | 12 ++++++++++++ src/commands/includes/removeJob.lua | 6 +++++- src/commands/includes/removeJobs.lua | 2 +- src/commands/includes/removeJobsByMaxAge.lua | 5 +++-- src/commands/includes/removeJobsByMaxCount.lua | 2 +- .../includes/removeParentDependencyKey.lua | 15 +++++++++++---- src/commands/moveStalledJobsToWait-9.lua | 2 +- src/commands/moveToFinished-14.lua | 2 +- src/commands/removeChildDependency-1.lua | 2 +- src/commands/removeJob-2.lua | 4 +++- 13 files changed, 46 insertions(+), 16 deletions(-) create mode 100644 src/commands/includes/removeDebounceKey.lua diff --git a/docs/gitbook/guide/jobs/debouncing.md b/docs/gitbook/guide/jobs/debouncing.md index 65c2377f90..fa8ec1d774 100644 --- a/docs/gitbook/guide/jobs/debouncing.md +++ b/docs/gitbook/guide/jobs/debouncing.md @@ -40,6 +40,10 @@ While this job is not moved to completed or failed state, next jobs added with s This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress. +{% hint style="warning" %} +Any manual deletion will remove the debounce key. For example, when calling _job.remove_ method. +{% endhint %} + ## Read more: - 💡 [Add Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#add) diff --git a/src/commands/includes/cleanList.lua b/src/commands/includes/cleanList.lua index 99b397ef53..90e465f613 100644 --- a/src/commands/includes/cleanList.lua +++ b/src/commands/includes/cleanList.lua @@ -34,7 +34,7 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd, -- replace the entry with a deletion marker; the actual deletion will -- occur at the end of the script rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker) - removeJob(job, true, jobKeyPrefix) + removeJob(job, true, jobKeyPrefix, true) deletedCount = deletedCount + 1 table.insert(deleted, job) end diff --git a/src/commands/includes/cleanSet.lua b/src/commands/includes/cleanSet.lua index f83393d50a..78048a1a1b 100644 --- a/src/commands/includes/cleanSet.lua +++ b/src/commands/includes/cleanSet.lua @@ -21,14 +21,14 @@ local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attrib local jobKey = jobKeyPrefix .. job if isFinished then - removeJob(job, true, jobKeyPrefix) + removeJob(job, true, jobKeyPrefix, true) deletedCount = deletedCount + 1 table.insert(deleted, job) else -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed jobTS = getTimestamp(jobKey, attributes) if (not jobTS or jobTS <= timestamp) then - removeJob(job, true, jobKeyPrefix) + removeJob(job, true, jobKeyPrefix, true) deletedCount = deletedCount + 1 table.insert(deleted, job) end diff --git a/src/commands/includes/removeDebounceKey.lua b/src/commands/includes/removeDebounceKey.lua new file mode 100644 index 0000000000..5c87385824 --- /dev/null +++ b/src/commands/includes/removeDebounceKey.lua @@ -0,0 +1,12 @@ +--[[ + Function to remove debounce key. +]] + +local function removeDebounceKey(prefixKey, jobKey) + local debounceId = rcall("HGET", jobKey, "deid") + if debounceId then + local debounceKey = prefixKey .. "de:" .. debounceId + rcall("DEL", debounceKey) + end +end + \ No newline at end of file diff --git a/src/commands/includes/removeJob.lua b/src/commands/includes/removeJob.lua index c9c02a182c..8eb245feac 100644 --- a/src/commands/includes/removeJob.lua +++ b/src/commands/includes/removeJob.lua @@ -3,11 +3,15 @@ ]] -- Includes +--- @include "removeDebounceKey" --- @include "removeJobKeys" --- @include "removeParentDependencyKey" -local function removeJob(jobId, hard, baseKey) +local function removeJob(jobId, hard, baseKey, shouldRemoveDebounceKey) local jobKey = baseKey .. jobId removeParentDependencyKey(jobKey, hard, nil, baseKey) + if shouldRemoveDebounceKey then + removeDebounceKey(baseKey, jobKey) + end removeJobKeys(jobKey) end diff --git a/src/commands/includes/removeJobs.lua b/src/commands/includes/removeJobs.lua index 120bc7e00c..59f669d789 100644 --- a/src/commands/includes/removeJobs.lua +++ b/src/commands/includes/removeJobs.lua @@ -7,7 +7,7 @@ local function removeJobs(keys, hard, baseKey, max) for i, key in ipairs(keys) do - removeJob(key, hard, baseKey) + removeJob(key, hard, baseKey, true) end return max - #keys end diff --git a/src/commands/includes/removeJobsByMaxAge.lua b/src/commands/includes/removeJobsByMaxAge.lua index 2acd93b11c..85292bca8c 100644 --- a/src/commands/includes/removeJobsByMaxAge.lua +++ b/src/commands/includes/removeJobsByMaxAge.lua @@ -5,11 +5,12 @@ -- Includes --- @include "removeJob" -local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix) +local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix, + shouldRemoveDebounceKey) local start = timestamp - maxAge * 1000 local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf") for i, jobId in ipairs(jobIds) do - removeJob(jobId, false, prefix) + removeJob(jobId, false, prefix, false) end rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start) end diff --git a/src/commands/includes/removeJobsByMaxCount.lua b/src/commands/includes/removeJobsByMaxCount.lua index 9fd31d3025..cf1023e4ae 100644 --- a/src/commands/includes/removeJobsByMaxCount.lua +++ b/src/commands/includes/removeJobsByMaxCount.lua @@ -9,7 +9,7 @@ local function removeJobsByMaxCount(maxCount, targetSet, prefix) local start = maxCount local jobIds = rcall("ZREVRANGE", targetSet, start, -1) for i, jobId in ipairs(jobIds) do - removeJob(jobId, false, prefix) + removeJob(jobId, false, prefix, false) end rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1)) end diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index dee16ca994..87262850c9 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -21,7 +21,7 @@ local function moveParentToWait(parentPrefix, parentId, emitEvent) end end -local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) +local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId) if parentKey then local parentDependenciesKey = parentKey .. ":dependencies" local result = rcall("SREM", parentDependenciesKey, jobKey) @@ -36,8 +36,11 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) if numRemovedElements == 1 then if hard then -- remove parent in same queue if parentPrefix == baseKey then - removeParentDependencyKey(parentKey, hard, nil, baseKey) + removeParentDependencyKey(parentKey, hard, nil, baseKey, nil) removeJobKeys(parentKey) + if debounceId then + rcall("DEL", parentPrefix .. "de:" .. debounceId) + end else moveParentToWait(parentPrefix, parentId) end @@ -49,7 +52,8 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) return true end else - local missedParentKey = rcall("HGET", jobKey, "parentKey") + local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid") + local missedParentKey = parentAttributes[1] if( (type(missedParentKey) == "string") and missedParentKey ~= "" and (rcall("EXISTS", missedParentKey) == 1)) then local parentDependenciesKey = missedParentKey .. ":dependencies" @@ -65,8 +69,11 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) if numRemovedElements == 1 then if hard then if parentPrefix == baseKey then - removeParentDependencyKey(missedParentKey, hard, nil, baseKey) + removeParentDependencyKey(missedParentKey, hard, nil, baseKey, nil) removeJobKeys(missedParentKey) + if parentAttributes[2] then + rcall("DEL", parentPrefix .. "de:" .. parentAttributes[2]) + end else moveParentToWait(parentPrefix, parentId) end diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index f4aa35fc9a..91546d4c76 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -125,7 +125,7 @@ if (#stalling > 0) then failedKey, queueKeyPrefix) elseif removeOnFailType == "boolean" then if opts["removeOnFail"] then - removeJob(jobId, false, queueKeyPrefix) + removeJob(jobId, false, queueKeyPrefix, false) rcall("ZREM", failedKey, jobId) end elseif removeOnFailType ~= "nil" then diff --git a/src/commands/moveToFinished-14.lua b/src/commands/moveToFinished-14.lua index af2dd4f624..dc396da706 100644 --- a/src/commands/moveToFinished-14.lua +++ b/src/commands/moveToFinished-14.lua @@ -182,7 +182,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists -- TODO: when a child is removed when finished, result or failure in parent -- must not be deleted, those value references should be deleted when the parent -- is deleted - removeParentDependencyKey(jobIdKey, false, parentKey) + removeParentDependencyKey(jobIdKey, false, parentKey, jobAttributes[3]) end end diff --git a/src/commands/removeChildDependency-1.lua b/src/commands/removeChildDependency-1.lua index 6eddcb2e60..5caa9aad43 100644 --- a/src/commands/removeChildDependency-1.lua +++ b/src/commands/removeChildDependency-1.lua @@ -25,7 +25,7 @@ if rcall("EXISTS", jobKey) ~= 1 then return -1 end if rcall("EXISTS", parentKey) ~= 1 then return -5 end -if removeParentDependencyKey(jobKey, false, parentKey, KEYS[1]) then +if removeParentDependencyKey(jobKey, false, parentKey, KEYS[1], nil) then rcall("HDEL", jobKey, "parentKey", "parent") return 0 diff --git a/src/commands/removeJob-2.lua b/src/commands/removeJob-2.lua index aaee237727..19c62de1d3 100644 --- a/src/commands/removeJob-2.lua +++ b/src/commands/removeJob-2.lua @@ -19,6 +19,7 @@ local rcall = redis.call --- @include "includes/destructureJobKey" --- @include "includes/getOrSetMaxEvents" --- @include "includes/isLocked" +--- @include "includes/removeDebounceKey" --- @include "includes/removeJobFromAnyState" --- @include "includes/removeJobKeys" --- @include "includes/removeParentDependencyKey" @@ -26,7 +27,7 @@ local rcall = redis.call local function removeJob( prefix, jobId, parentKey, removeChildren) local jobKey = prefix .. jobId; - removeParentDependencyKey(jobKey, false, parentKey) + removeParentDependencyKey(jobKey, false, parentKey, nil) if removeChildren == "1" then -- Check if this job has children @@ -66,6 +67,7 @@ local function removeJob( prefix, jobId, parentKey, removeChildren) local prev = removeJobFromAnyState(prefix, jobId) + removeDebounceKey(prefix, jobKey) if removeJobKeys(jobKey) > 0 then local maxEvents = getOrSetMaxEvents(KEYS[2]) rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed", From 375641aa625f5f93f948116783ceb9d8af2dca09 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 23 Jul 2024 22:28:51 -0500 Subject: [PATCH 11/12] test: add test case when removing debounced job --- docs/gitbook/guide/jobs/debouncing.md | 2 +- tests/test_events.ts | 51 +++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/docs/gitbook/guide/jobs/debouncing.md b/docs/gitbook/guide/jobs/debouncing.md index fa8ec1d774..276bf3a503 100644 --- a/docs/gitbook/guide/jobs/debouncing.md +++ b/docs/gitbook/guide/jobs/debouncing.md @@ -41,7 +41,7 @@ While this job is not moved to completed or failed state, next jobs added with s This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress. {% hint style="warning" %} -Any manual deletion will remove the debounce key. For example, when calling _job.remove_ method. +Any manual deletion will disable the debouncing. For example, when calling _job.remove_ method. {% endhint %} ## Read more: diff --git a/tests/test_events.ts b/tests/test_events.ts index 8e1141eb27..41d99c2839 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -451,6 +451,57 @@ describe('events', function () { expect(debouncedCounter).to.be.equal(4); }); + + describe('when removing debounced job', function () { + it('removes debounce key', async function () { + const testName = 'test'; + + const job = await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + + let debouncedCounter = 0; + queueEvents.on('debounced', ({ jobId }) => { + debouncedCounter++; + }); + await job.remove(); + + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(1000); + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(1100); + const secondJob = await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await secondJob.remove(); + + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { debounce: { id: 'a1', ttl: 2000 } }, + ); + await delay(100); + + expect(debouncedCounter).to.be.equal(2); + }); + }); }); describe('when ttl is not provided', function () { From bbfa2e2809b01eda444d4cf71676e558e05c7fa4 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 24 Jul 2024 07:36:25 -0500 Subject: [PATCH 12/12] chore: address comments --- src/commands/includes/cleanList.lua | 2 +- src/commands/includes/cleanSet.lua | 4 ++-- src/commands/includes/debounceJob.lua | 8 ++++---- src/commands/includes/removeJobs.lua | 2 +- src/commands/includes/removeJobsByMaxAge.lua | 2 +- src/commands/includes/removeJobsByMaxCount.lua | 2 +- src/commands/moveStalledJobsToWait-9.lua | 3 ++- tests/test_worker.ts | 2 +- 8 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/commands/includes/cleanList.lua b/src/commands/includes/cleanList.lua index 90e465f613..5dbae4abc3 100644 --- a/src/commands/includes/cleanList.lua +++ b/src/commands/includes/cleanList.lua @@ -34,7 +34,7 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd, -- replace the entry with a deletion marker; the actual deletion will -- occur at the end of the script rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker) - removeJob(job, true, jobKeyPrefix, true) + removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]]) deletedCount = deletedCount + 1 table.insert(deleted, job) end diff --git a/src/commands/includes/cleanSet.lua b/src/commands/includes/cleanSet.lua index 78048a1a1b..c48b098693 100644 --- a/src/commands/includes/cleanSet.lua +++ b/src/commands/includes/cleanSet.lua @@ -21,14 +21,14 @@ local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attrib local jobKey = jobKeyPrefix .. job if isFinished then - removeJob(job, true, jobKeyPrefix, true) + removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]]) deletedCount = deletedCount + 1 table.insert(deleted, job) else -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed jobTS = getTimestamp(jobKey, attributes) if (not jobTS or jobTS <= timestamp) then - removeJob(job, true, jobKeyPrefix, true) + removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]]) deletedCount = deletedCount + 1 table.insert(deleted, job) end diff --git a/src/commands/includes/debounceJob.lua b/src/commands/includes/debounceJob.lua index 534e49c609..b40f4cb8db 100644 --- a/src/commands/includes/debounceJob.lua +++ b/src/commands/includes/debounceJob.lua @@ -6,13 +6,13 @@ local function debounceJob(prefixKey, debounceOpts, jobId, debounceKey, eventsKe local debounceId = debounceOpts and debounceOpts['id'] if debounceId then local ttl = debounceOpts['ttl'] - local isFirstSet + local debounceKeyExists if ttl then - isFirstSet = rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX') + debounceKeyExists = not rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX') else - isFirstSet = rcall('SET', debounceKey, jobId, 'NX') + debounceKeyExists = not rcall('SET', debounceKey, jobId, 'NX') end - if not isFirstSet then + if debounceKeyExists then local currentDebounceJobId = rcall('GET', debounceKey) rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId", currentDebounceJobId) diff --git a/src/commands/includes/removeJobs.lua b/src/commands/includes/removeJobs.lua index 59f669d789..58b67abd97 100644 --- a/src/commands/includes/removeJobs.lua +++ b/src/commands/includes/removeJobs.lua @@ -7,7 +7,7 @@ local function removeJobs(keys, hard, baseKey, max) for i, key in ipairs(keys) do - removeJob(key, hard, baseKey, true) + removeJob(key, hard, baseKey, true --[[remove debounce key]]) end return max - #keys end diff --git a/src/commands/includes/removeJobsByMaxAge.lua b/src/commands/includes/removeJobsByMaxAge.lua index 85292bca8c..ca24fad3a7 100644 --- a/src/commands/includes/removeJobsByMaxAge.lua +++ b/src/commands/includes/removeJobsByMaxAge.lua @@ -10,7 +10,7 @@ local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix, local start = timestamp - maxAge * 1000 local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf") for i, jobId in ipairs(jobIds) do - removeJob(jobId, false, prefix, false) + removeJob(jobId, false, prefix, false --[[remove debounce key]]) end rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start) end diff --git a/src/commands/includes/removeJobsByMaxCount.lua b/src/commands/includes/removeJobsByMaxCount.lua index cf1023e4ae..af52c612c4 100644 --- a/src/commands/includes/removeJobsByMaxCount.lua +++ b/src/commands/includes/removeJobsByMaxCount.lua @@ -9,7 +9,7 @@ local function removeJobsByMaxCount(maxCount, targetSet, prefix) local start = maxCount local jobIds = rcall("ZREVRANGE", targetSet, start, -1) for i, jobId in ipairs(jobIds) do - removeJob(jobId, false, prefix, false) + removeJob(jobId, false, prefix, false --[[remove debounce key]]) end rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1)) end diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index 91546d4c76..fe39587ebc 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -125,7 +125,8 @@ if (#stalling > 0) then failedKey, queueKeyPrefix) elseif removeOnFailType == "boolean" then if opts["removeOnFail"] then - removeJob(jobId, false, queueKeyPrefix, false) + removeJob(jobId, false, queueKeyPrefix, + false --[[remove debounce key]]) rcall("ZREM", failedKey, jobId) end elseif removeOnFailType ~= "nil" then diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 066252c5f6..34cd191905 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -1948,7 +1948,7 @@ describe('workers', function () { await worker.waitUntilReady(); - const jobs = await Promise.all( + await Promise.all( Array.from({ length: concurrency }).map(() => queue.add('test', { bar: 'baz' }), ),