diff --git a/README.md b/README.md index 7c2a941c41..6e8f056881 100644 --- a/README.md +++ b/README.md @@ -228,7 +228,7 @@ Since there are a few job queue solutions, here is a table comparing them: | Group Support | ✓ | | | | | | | Batches Support | ✓ | | | | | | | Parent/Child Dependencies | ✓ | ✓ | | | | | -| Debouncing | ✓ | ✓ | ✓ | | | | +| Deduplication | ✓ | ✓ | ✓ | | | | | Priorities | ✓ | ✓ | ✓ | ✓ | | ✓ | | Concurrency | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | | Delayed jobs | ✓ | ✓ | ✓ | ✓ | | ✓ | diff --git a/docs/gitbook/changelog.md b/docs/gitbook/changelog.md index e54c683c55..e483290cde 100644 --- a/docs/gitbook/changelog.md +++ b/docs/gitbook/changelog.md @@ -1,3 +1,10 @@ +# [5.29.0](https://github.com/taskforcesh/bullmq/compare/v5.28.2...v5.29.0) (2024-11-22) + + +### Features + +* **queue:** refactor a protected addJob method allowing telemetry extensions ([09f2571](https://github.com/taskforcesh/bullmq/commit/09f257196f6d5a6690edbf55f12d585cec86ee8f)) + ## [5.28.2](https://github.com/taskforcesh/bullmq/compare/v5.28.1...v5.28.2) (2024-11-22) diff --git a/package.json b/package.json index db4e0ff1cf..e9a733ea4d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bullmq", - "version": "5.28.2", + "version": "5.29.0", "description": "Queue for messages and jobs based on Redis", "homepage": "https://bullmq.io/", "main": "./dist/cjs/index.js", diff --git a/src/classes/job.ts b/src/classes/job.ts index 1a16bc3c13..deb1a713b2 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -46,11 +46,10 @@ const optsDecodeMap = { kl: 'keepLogs', ocf: 'onChildFailure', rdof: 'removeDependencyOnFailure', - tm: 'telemetryMetadata' + tm: 'telemetryMetadata', }; const optsEncodeMap = invertObject(optsDecodeMap); -optsEncodeMap.debounce = 'de'; export const PRIORITY_LIMIT = 2 ** 21; @@ -150,12 +149,6 @@ export class Job< */ parent?: ParentKeys; - /** - * Debounce identifier. - * @deprecated use deduplicationId - */ - debounceId?: string; - /** * Deduplication identifier. */ @@ -225,10 +218,9 @@ export class Job< ? { id: opts.parent.id, queueKey: opts.parent.queue } : undefined; - this.debounceId = opts.debounce ? opts.debounce.id : undefined; this.deduplicationId = opts.deduplication ? opts.deduplication.id - : this.debounceId; + : undefined; this.toKey = queue.toKey.bind(queue); this.setScripts(); @@ -354,7 +346,6 @@ export class Job< } if (json.deid) { - job.debounceId = json.deid; job.deduplicationId = json.deid; } @@ -481,7 +472,6 @@ export class Job< timestamp: this.timestamp, failedReason: JSON.stringify(this.failedReason), stacktrace: JSON.stringify(this.stacktrace), - debounceId: this.debounceId, deduplicationId: this.deduplicationId, repeatJobKey: this.repeatJobKey, returnvalue: JSON.stringify(this.returnvalue), diff --git a/src/classes/queue-events.ts b/src/classes/queue-events.ts index ea875b8706..17e498bd86 100644 --- a/src/classes/queue-events.ts +++ b/src/classes/queue-events.ts @@ -45,14 +45,6 @@ export interface QueueEventsListener extends IoredisListener { id: string, ) => void; - /** - * Listen to 'debounced' event. - * @deprecated use deduplicated event - * - * This event is triggered when a job is debounced because debounceId still existed. - */ - debounced: (args: { jobId: string; debounceId: string }, id: string) => void; - /** * Listen to 'deduplicated' event. * diff --git a/src/classes/queue-getters.ts b/src/classes/queue-getters.ts index 29e0e9d1c3..0f3d7e20e1 100644 --- a/src/classes/queue-getters.ts +++ b/src/classes/queue-getters.ts @@ -98,18 +98,6 @@ export class QueueGetters extends QueueBase { return this.scripts.getRateLimitTtl(maxJobs); } - /** - * Get jobId that starts debounced state. - * @deprecated use getDeduplicationJobId method - * - * @param id - debounce identifier - */ - async getDebounceJobId(id: string): Promise { - const client = await this.client; - - return client.get(`${this.keys.de}:${id}`); - } - /** * Get jobId from deduplicated state. * diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 5862765647..67ecf8fb78 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -310,47 +310,66 @@ export class Queue< opts = { ...opts, telemetryMetadata: srcPropagationMedatada }; } - if (opts && opts.repeat) { - if (opts.repeat.endDate) { - if (+new Date(opts.repeat.endDate) < Date.now()) { - throw new Error( - 'End date must be greater than current timestamp', - ); - } - } + const job = await this.addJob(name, data, opts); - return (await this.repeat).updateRepeatableJob< - DataType, - ResultType, - NameType - >(name, data, { ...this.jobsOpts, ...opts }, { override: true }); - } else { - const jobId = opts?.jobId; + span?.setAttributes({ + [TelemetryAttributes.JobName]: name, + [TelemetryAttributes.JobId]: job.id, + }); - if (jobId == '0' || jobId?.includes(':')) { - throw new Error("JobId cannot be '0' or contain :"); - } + return job; + }, + ); + } - const job = await this.Job.create( - this as MinimalQueue, - name, - data, - { - ...this.jobsOpts, - ...opts, - jobId, - }, - ); - this.emit('waiting', job as JobBase); + /** + * addJob is a telemetry free version of the add method, useful in order to wrap it + * with custom telemetry on subclasses. + * + * @param name + * @param data + * @param opts + * + * @returns Job + */ + protected async addJob( + name: NameType, + data: DataType, + opts?: JobsOptions, + ): Promise> { + if (opts && opts.repeat) { + if (opts.repeat.endDate) { + if (+new Date(opts.repeat.endDate) < Date.now()) { + throw new Error('End date must be greater than current timestamp'); + } + } - span?.setAttributes({ - [TelemetryAttributes.JobId]: job.id, - }); + return (await this.repeat).updateRepeatableJob< + DataType, + ResultType, + NameType + >(name, data, { ...this.jobsOpts, ...opts }, { override: true }); + } else { + const jobId = opts?.jobId; - return job; - } - }, - ); + if (jobId == '0' || jobId?.includes(':')) { + throw new Error("JobId cannot be '0' or contain :"); + } + + const job = await this.Job.create( + this as MinimalQueue, + name, + data, + { + ...this.jobsOpts, + ...opts, + jobId, + }, + ); + this.emit('waiting', job as JobBase); + + return job; + } } /** @@ -624,29 +643,6 @@ export class Queue< return !removed; } - /** - * Removes a debounce key. - * @deprecated use removeDeduplicationKey - * - * @param id - identifier - */ - async removeDebounceKey(id: string): Promise { - return this.trace( - SpanKind.INTERNAL, - 'removeDebounceKey', - `${this.name}`, - async span => { - span?.setAttributes({ - [TelemetryAttributes.JobKey]: id, - }); - - const client = await this.client; - - return await client.del(`${this.keys.de}:${id}`); - }, - ); - } - /** * Removes a deduplication key. * diff --git a/src/commands/addJobScheduler-2.lua b/src/commands/addJobScheduler-2.lua index 4583e6b223..687776f288 100644 --- a/src/commands/addJobScheduler-2.lua +++ b/src/commands/addJobScheduler-2.lua @@ -67,7 +67,7 @@ if prevMillis ~= false then if rcall("ZSCORE", delayedKey, delayedJobId) ~= false and rcall("EXISTS", nextDelayedJobId) ~= 1 then - removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]]) + removeJob(delayedJobId, true, prefixKey, true --[[remove deduplication key]]) rcall("ZREM", delayedKey, delayedJobId) end end diff --git a/src/commands/addRepeatableJob-2.lua b/src/commands/addRepeatableJob-2.lua index 4046df5381..0762cf6128 100644 --- a/src/commands/addRepeatableJob-2.lua +++ b/src/commands/addRepeatableJob-2.lua @@ -71,7 +71,7 @@ if prevMillis ~= false then if rcall("ZSCORE", delayedKey, delayedJobId) ~= false and rcall("EXISTS", nextDelayedJobId) ~= 1 then - removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]]) + removeJob(delayedJobId, true, prefixKey, true --[[remove deduplication key]]) rcall("ZREM", delayedKey, delayedJobId) end end diff --git a/src/commands/includes/cleanList.lua b/src/commands/includes/cleanList.lua index 5dbae4abc3..11b086940c 100644 --- a/src/commands/includes/cleanList.lua +++ b/src/commands/includes/cleanList.lua @@ -34,7 +34,7 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd, -- replace the entry with a deletion marker; the actual deletion will -- occur at the end of the script rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker) - removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]]) + removeJob(job, true, jobKeyPrefix, true --[[remove deduplication key]]) deletedCount = deletedCount + 1 table.insert(deleted, job) end diff --git a/src/commands/includes/cleanSet.lua b/src/commands/includes/cleanSet.lua index f85d41c259..30daf3a226 100644 --- a/src/commands/includes/cleanSet.lua +++ b/src/commands/includes/cleanSet.lua @@ -41,14 +41,14 @@ local function cleanSet( if not isJobSchedulerJob(job, jobSchedulersKey) then local jobKey = jobKeyPrefix .. job if isFinished then - removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]] ) + removeJob(job, true, jobKeyPrefix, true --[[remove deduplication key]] ) deletedCount = deletedCount + 1 table.insert(deleted, job) else -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed jobTS = getTimestamp(jobKey, attributes) if (not jobTS or jobTS <= timestamp) then - removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]] ) + removeJob(job, true, jobKeyPrefix, true --[[remove deduplication key]] ) deletedCount = deletedCount + 1 table.insert(deleted, job) end diff --git a/src/commands/includes/deduplicateJob.lua b/src/commands/includes/deduplicateJob.lua index ff873e599c..03f92ffb8b 100644 --- a/src/commands/includes/deduplicateJob.lua +++ b/src/commands/includes/deduplicateJob.lua @@ -1,5 +1,5 @@ --[[ - Function to debounce a job. + Function to deduplicate a job. ]] local function deduplicateJob(prefixKey, deduplicationOpts, jobId, deduplicationKey, eventsKey, maxEvents) @@ -13,12 +13,10 @@ local function deduplicateJob(prefixKey, deduplicationOpts, jobId, deduplication deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX') end if deduplicationKeyExists then - local currentDebounceJobId = rcall('GET', deduplicationKey) + local currentDeduplicationJobId = rcall('GET', deduplicationKey) rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", - "debounced", "jobId", currentDebounceJobId, "debounceId", deduplicationId) - rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", - "deduplicated", "jobId", currentDebounceJobId, "deduplicationId", deduplicationId) - return currentDebounceJobId + "deduplicated", "jobId", currentDeduplicationJobId, "deduplicationId", deduplicationId) + return currentDeduplicationJobId end end end diff --git a/src/commands/includes/removeJobs.lua b/src/commands/includes/removeJobs.lua index 58b67abd97..f3882a7cb4 100644 --- a/src/commands/includes/removeJobs.lua +++ b/src/commands/includes/removeJobs.lua @@ -7,7 +7,7 @@ local function removeJobs(keys, hard, baseKey, max) for i, key in ipairs(keys) do - removeJob(key, hard, baseKey, true --[[remove debounce key]]) + removeJob(key, hard, baseKey, true --[[remove deduplication key]]) end return max - #keys end diff --git a/src/commands/includes/removeJobsByMaxAge.lua b/src/commands/includes/removeJobsByMaxAge.lua index ca24fad3a7..13ff15944e 100644 --- a/src/commands/includes/removeJobsByMaxAge.lua +++ b/src/commands/includes/removeJobsByMaxAge.lua @@ -5,12 +5,11 @@ -- Includes --- @include "removeJob" -local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix, - shouldRemoveDebounceKey) +local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix) local start = timestamp - maxAge * 1000 local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf") for i, jobId in ipairs(jobIds) do - removeJob(jobId, false, prefix, false --[[remove debounce key]]) + removeJob(jobId, false, prefix, false --[[remove deduplication key]]) end rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start) end diff --git a/src/commands/includes/removeJobsByMaxCount.lua b/src/commands/includes/removeJobsByMaxCount.lua index af52c612c4..f4e165d40e 100644 --- a/src/commands/includes/removeJobsByMaxCount.lua +++ b/src/commands/includes/removeJobsByMaxCount.lua @@ -9,7 +9,7 @@ local function removeJobsByMaxCount(maxCount, targetSet, prefix) local start = maxCount local jobIds = rcall("ZREVRANGE", targetSet, start, -1) for i, jobId in ipairs(jobIds) do - removeJob(jobId, false, prefix, false --[[remove debounce key]]) + removeJob(jobId, false, prefix, false --[[remove deduplication key]]) end rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1)) end diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index 7e5b34ce90..8544db0538 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -20,7 +20,7 @@ local function moveParentToWait(parentPrefix, parentId, emitEvent) end end -local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId) +local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, deduplicationId) if parentKey then local parentDependenciesKey = parentKey .. ":dependencies" local result = rcall("SREM", parentDependenciesKey, jobKey) @@ -37,8 +37,8 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debou if parentPrefix == baseKey then removeParentDependencyKey(parentKey, hard, nil, baseKey, nil) removeJobKeys(parentKey) - if debounceId then - rcall("DEL", parentPrefix .. "de:" .. debounceId) + if deduplicationId then + rcall("DEL", parentPrefix .. "de:" .. deduplicationId) end else moveParentToWait(parentPrefix, parentId) diff --git a/src/commands/includes/storeJob.lua b/src/commands/includes/storeJob.lua index ef53c9cba1..e0c5a254dc 100644 --- a/src/commands/includes/storeJob.lua +++ b/src/commands/includes/storeJob.lua @@ -6,7 +6,7 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, local jsonOpts = cjson.encode(opts) local delay = opts['delay'] or 0 local priority = opts['priority'] or 0 - local debounceId = opts['de'] and opts['de']['id'] + local deduplicationId = opts['de'] and opts['de']['id'] local optionalValues = {} if parentKey ~= nil then @@ -21,9 +21,9 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, table.insert(optionalValues, repeatJobKey) end - if debounceId then + if deduplicationId then table.insert(optionalValues, "deid") - table.insert(optionalValues, debounceId) + table.insert(optionalValues, deduplicationId) end rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts, diff --git a/src/commands/moveStalledJobsToWait-8.lua b/src/commands/moveStalledJobsToWait-8.lua index 123118b5d0..c6e82aa03a 100644 --- a/src/commands/moveStalledJobsToWait-8.lua +++ b/src/commands/moveStalledJobsToWait-8.lua @@ -102,7 +102,7 @@ if (#stalling > 0) then elseif removeOnFailType == "boolean" then if opts["removeOnFail"] then removeJob(jobId, false, queueKeyPrefix, - false --[[remove debounce key]]) + false --[[remove deduplication key]]) rcall("ZREM", failedKey, jobId) end elseif removeOnFailType ~= "nil" then diff --git a/src/interfaces/debounce-options.ts b/src/interfaces/deduplication-options.ts similarity index 61% rename from src/interfaces/debounce-options.ts rename to src/interfaces/deduplication-options.ts index 02a34fa2fa..81ecd7c47f 100644 --- a/src/interfaces/debounce-options.ts +++ b/src/interfaces/deduplication-options.ts @@ -1,7 +1,7 @@ /** - * Debounce options + * Deduplication options */ -export interface DebounceOptions { +export interface DeduplicationOptions { /** * ttl in milliseconds */ diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index bf8a6ac0be..9c4e9c4f2a 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -3,7 +3,7 @@ export * from './backoff-options'; export * from './base-job-options'; export * from './child-message'; export * from './connection'; -export * from './debounce-options'; +export * from './deduplication-options'; export * from './flow-job'; export * from './ioredis-events'; export * from './job-json'; diff --git a/src/interfaces/job-json.ts b/src/interfaces/job-json.ts index 25ad335145..e446a8f822 100644 --- a/src/interfaces/job-json.ts +++ b/src/interfaces/job-json.ts @@ -18,7 +18,6 @@ export interface JobJson { parent?: ParentKeys; parentKey?: string; repeatJobKey?: string; - debounceId?: string; deduplicationId?: string; processedBy?: string; } diff --git a/src/types/job-options.ts b/src/types/job-options.ts index 0055dad9a0..f81f92255c 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -1,16 +1,10 @@ -import { BaseJobOptions, DebounceOptions } from '../interfaces'; +import { BaseJobOptions, DeduplicationOptions } from '../interfaces'; export type JobsOptions = BaseJobOptions & { - /** - * Debounce options. - * @deprecated use deduplication option - */ - debounce?: DebounceOptions; - /** * Deduplication options. */ - deduplication?: DebounceOptions; + deduplication?: DeduplicationOptions; /** * Modes when a child fails: fail, ignore, remove, wait. @@ -24,7 +18,7 @@ export type JobsOptions = BaseJobOptions & { */ export type RedisJobOptions = BaseJobOptions & { /** - * Debounce identifier. + * Deduplication identifier. */ deid?: string; diff --git a/tests/test_events.ts b/tests/test_events.ts index a1fd791d99..6cff5ea349 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -405,213 +405,9 @@ describe('events', function () { }); }); - describe('when job is debounced when added again with same debounce id', function () { + describe('when job is deduplicated when added again with same deduplication id', function () { describe('when ttl is provided', function () { - it('used a fixed time period and emits debounced event', async function () { - const testName = 'test'; - - const job = await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - - let debouncedCounter = 0; - let secondJob; - queueEvents.on('debounced', ({ jobId, debounceId }) => { - if (debouncedCounter > 1) { - expect(jobId).to.be.equal(secondJob.id); - expect(debounceId).to.be.equal('a1'); - } else { - expect(jobId).to.be.equal(job.id); - expect(debounceId).to.be.equal('a1'); - } - debouncedCounter++; - }); - - await delay(1000); - await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - await delay(1100); - secondJob = await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - await delay(100); - - expect(debouncedCounter).to.be.equal(4); - }); - - describe('when removing debounced job', function () { - it('removes debounce key', async function () { - const testName = 'test'; - - const job = await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - - let debouncedCounter = 0; - queueEvents.on('debounced', ({ jobId }) => { - debouncedCounter++; - }); - await job.remove(); - - await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - await delay(1000); - await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - await delay(1100); - const secondJob = await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - await secondJob.remove(); - - await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1', ttl: 2000 } }, - ); - await delay(100); - - expect(debouncedCounter).to.be.equal(2); - }); - }); - }); - - describe('when ttl is not provided', function () { - it('waits until job is finished before removing debounce key', async function () { - const testName = 'test'; - - const worker = new Worker( - queueName, - async () => { - await delay(100); - await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1' } }, - ); - await delay(100); - await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1' } }, - ); - await delay(100); - }, - { - autorun: false, - connection, - prefix, - }, - ); - await worker.waitUntilReady(); - - let debouncedCounter = 0; - - const completing = new Promise(resolve => { - queueEvents.once('completed', ({ jobId }) => { - expect(jobId).to.be.equal('1'); - resolve(); - }); - - queueEvents.on('debounced', ({ jobId }) => { - debouncedCounter++; - }); - }); - - worker.run(); - - await queue.add(testName, { foo: 'bar' }, { debounce: { id: 'a1' } }); - - await completing; - - const secondJob = await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1' } }, - ); - - const count = await queue.getJobCountByTypes(); - - expect(count).to.be.eql(2); - - expect(debouncedCounter).to.be.equal(2); - expect(secondJob.id).to.be.equal('4'); - await worker.close(); - }); - - describe('when removing debounced job', function () { - it('removes debounce key', async function () { - const testName = 'test'; - - const job = await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1' } }, - ); - - let debouncedCounter = 0; - queueEvents.on('debounced', ({ jobId }) => { - debouncedCounter++; - }); - await job.remove(); - - await queue.add(testName, { foo: 'bar' }, { debounce: { id: 'a1' } }); - - await queue.add(testName, { foo: 'bar' }, { debounce: { id: 'a1' } }); - await delay(100); - const secondJob = await queue.add( - testName, - { foo: 'bar' }, - { debounce: { id: 'a1' } }, - ); - await secondJob.remove(); - - expect(debouncedCounter).to.be.equal(2); - }); - }); - }); - }); - - describe('when job is deduplicated when added again with same debounce id', function () { - describe('when ttl is provided', function () { - it('used a fixed time period and emits debounced event', async function () { + it('used a fixed time period and emits deduplicated event', async function () { const testName = 'test'; const job = await queue.add( @@ -718,7 +514,7 @@ describe('events', function () { }); describe('when ttl is not provided', function () { - it('waits until job is finished before removing debounce key', async function () { + it('waits until job is finished before removing deduplication key', async function () { const testName = 'test'; const worker = new Worker( @@ -761,7 +557,11 @@ describe('events', function () { worker.run(); - await queue.add(testName, { foo: 'bar' }, { debounce: { id: 'a1' } }); + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1' } }, + ); await completing; diff --git a/tests/test_flow.ts b/tests/test_flow.ts index 84f56fe7bd..3669c00e54 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -254,9 +254,9 @@ describe('flows', () => { }).timeout(8000); }); - describe('when child is debounced when added again with same debounce id', function () { + describe('when child is deduplicated when added again with same deduplication id', function () { describe('when ttl is not provided', function () { - it('waits until job is finished before removing debounce key', async function () { + it('waits until job is finished before removing deduplication key', async function () { const parentQueueName = `parent-queue-${v4()}`; const flow = new FlowProducer({ connection, prefix }); @@ -268,10 +268,10 @@ describe('flows', () => { async job => { await delay(100); - const jobIdFromDebounceKey = await queue.getDebounceJobId( - 'debounce_id', + const jobIdFromDeduplicationKey = await queue.getDeduplicationJobId( + 'deduplication_id', ); - expect(jobIdFromDebounceKey).to.be.equal(job.id); + expect(jobIdFromDeduplicationKey).to.be.equal(job.id); await flow.add({ name: 'parent', @@ -283,8 +283,8 @@ describe('flows', () => { name: 'child0', data: {}, opts: { - debounce: { - id: 'debounce_id', + deduplication: { + id: 'deduplication_id', }, }, }, @@ -311,15 +311,15 @@ describe('flows', () => { name: 'child0', data: {}, opts: { - debounce: { - id: 'debounce_id', + deduplication: { + id: 'deduplication_id', }, }, }, ], }); - let debouncedCounter = 0; + let deduplicatedCounter = 0; const completing = new Promise(resolve => { queueEvents.once('completed', ({ jobId }) => { @@ -327,8 +327,8 @@ describe('flows', () => { resolve(); }); - queueEvents.on('debounced', ({ jobId }) => { - debouncedCounter++; + queueEvents.on('deduplicated', ({ jobId }) => { + deduplicatedCounter++; }); }); @@ -336,12 +336,12 @@ describe('flows', () => { await completing; - const jobIdFromDebounceKey = await queue.getDebounceJobId( - 'debounce_id', + const jobIdFromDeduplicationKey = await queue.getDeduplicationJobId( + 'deduplication_id', ); - expect(jobIdFromDebounceKey).to.be.null; + expect(jobIdFromDeduplicationKey).to.be.null; - expect(debouncedCounter).to.be.equal(1); + expect(deduplicatedCounter).to.be.equal(1); await worker.close(); await queueEvents.close(); diff --git a/tests/test_worker.ts b/tests/test_worker.ts index a7f7616855..4ed21e6d6d 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -505,9 +505,17 @@ describe('workers', function () { await worker.close(); }); - it('do not call moveToActive more than number of jobs + 1', async () => { + it('do not call moveToActive more than number of jobs + 2', async () => { const numJobs = 50; let completedJobs = 0; + + const jobs: Promise[] = []; + for (let i = 0; i < numJobs; i++) { + jobs.push(queue.add('test', { foo: 'bar' })); + } + + await Promise.all(jobs); + const worker = new Worker( queueName, async job => { @@ -516,7 +524,6 @@ describe('workers', function () { }, { connection, prefix, concurrency: 100 }, ); - await worker.waitUntilReady(); // Add spy to worker.moveToActive const spy = sinon.spy(worker, 'moveToActive'); @@ -524,12 +531,9 @@ describe('workers', function () { await (worker as any).blockingConnection.client, 'bzpopmin', ); + await worker.waitUntilReady(); - for (let i = 0; i < numJobs; i++) { - const job = await queue.add('test', { foo: 'bar' }); - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.eql('bar'); - } + expect(bclientSpy.callCount).to.be.equal(0); await new Promise((resolve, reject) => { worker.on('completed', (job: Job, result: any) => { @@ -540,9 +544,11 @@ describe('workers', function () { }); }); + expect(completedJobs).to.be.equal(numJobs); + expect(bclientSpy.callCount).to.be.equal(2); + // Check moveToActive was called numJobs + 2 times expect(spy.callCount).to.be.equal(numJobs + 2); - expect(bclientSpy.callCount).to.be.equal(3); await worker.close(); });