diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index d23432190f..38172d4744 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -40,6 +40,7 @@ * [Get Flow Tree](guide/flows/get-flow-tree.md) * [Fail Parent](guide/flows/fail-parent.md) * [Remove Dependency](guide/flows/remove-dependency.md) + * [Ignore Dependency](guide/flows/ignore-dependency.md) * [Metrics](guide/metrics/metrics.md) * [Rate limiting](guide/rate-limiting.md) * [Retrying failing jobs](guide/retrying-failing-jobs.md) diff --git a/docs/gitbook/guide/flows/ignore-dependency.md b/docs/gitbook/guide/flows/ignore-dependency.md new file mode 100644 index 0000000000..8dcd628f18 --- /dev/null +++ b/docs/gitbook/guide/flows/ignore-dependency.md @@ -0,0 +1,54 @@ +# Ignore Dependency + +In some situations, you may have a parent job and need to ignore when one of its children fail. + +The pattern to solve this requirement consists on using the **ignoreDependencyOnFailure** option. This option will make sure that when a job fails, the dependency is ignored from the parent, so the parent will complete without waiting for the failed children. + +```typescript +const flow = new FlowProducer({ connection }); + +const originalTree = await flow.add({ + name: 'root-job', + queueName: 'topQueueName', + data: {}, + children: [ + { + name, + data: { idx: 0, foo: 'bar' }, + queueName: 'childrenQueueName', + opts: { ignoreDependencyOnFailure: true }, + children: [ + { + name, + data: { idx: 1, foo: 'bah' }, + queueName: 'grandChildrenQueueName', + }, + { + name, + data: { idx: 2, foo: 'baz' }, + queueName: 'grandChildrenQueueName', + }, + ], + }, + { + name, + data: { idx: 3, foo: 'foo' }, + queueName: 'childrenQueueName', + }, + ], +}); +``` + +{% hint style="info" %} +As soon as a **child** with this option fails, the parent job will be moved to a waiting state only if there are no more pending children. +{% endhint %} + +Failed children using this option can be retrieved by **getFailedChildrenValues** method: + +```typescript +const failedChildrenValues = await originalTree.job.getFailedChildrenValues(); +``` + +## Read more: + +- 💡 [Add Flow API Reference](https://api.docs.bullmq.io/classes/v5.FlowProducer.html#add) diff --git a/docs/gitbook/guide/flows/remove-dependency.md b/docs/gitbook/guide/flows/remove-dependency.md index 223c09f4c7..ced85f147e 100644 --- a/docs/gitbook/guide/flows/remove-dependency.md +++ b/docs/gitbook/guide/flows/remove-dependency.md @@ -1,6 +1,6 @@ # Remove Dependency -In some situations, you may have a parent job and need to ignore when one of its children fail. +In some situations, you may have a parent job and need to remove the relationship when one of its children fail. The pattern to solve this requirement consists on using the **removeDependencyOnFailure** option. This option will make sure that when a job fails, the dependency is removed from the parent, so the parent will complete without waiting for the failed children. diff --git a/src/classes/job.ts b/src/classes/job.ts index 9797367315..806508c34c 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -40,6 +40,7 @@ const logger = debuglog('bull'); const optsDecodeMap = { fpof: 'failParentOnFailure', + idof: 'ignoreDependencyOnFailure', kl: 'keepLogs', rdof: 'removeDependencyOnFailure', }; @@ -815,6 +816,17 @@ export class Job< } } + /** + * Get this jobs children failure values if any. + * + * @returns Object mapping children job keys with their failure values. + */ + async getFailedChildrenValues(): Promise<{ [jobKey: string]: string }> { + const client = await this.queue.client; + + return client.hgetall(this.toKey(`${this.id}:failed`)); + } + /** * Get children job keys if this job is a parent and has children. * @remarks diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 392ad8081d..6f65edd48c 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -383,6 +383,7 @@ export class Scripts { ? opts.metrics?.maxDataPoints : '', fpof: !!job.opts?.failParentOnFailure, + idof: !!job.opts?.ignoreDependencyOnFailure, rdof: !!job.opts?.removeDependencyOnFailure, }), ]; diff --git a/src/commands/includes/removeJob.lua b/src/commands/includes/removeJob.lua index a24812d4d4..c9c02a182c 100644 --- a/src/commands/includes/removeJob.lua +++ b/src/commands/includes/removeJob.lua @@ -3,11 +3,11 @@ ]] -- Includes +--- @include "removeJobKeys" --- @include "removeParentDependencyKey" local function removeJob(jobId, hard, baseKey) local jobKey = baseKey .. jobId removeParentDependencyKey(jobKey, hard, nil, baseKey) - rcall("DEL", jobKey, jobKey .. ':logs', - jobKey .. ':dependencies', jobKey .. ':processed') + removeJobKeys(jobKey) end diff --git a/src/commands/includes/removeJobKeys.lua b/src/commands/includes/removeJobKeys.lua new file mode 100644 index 0000000000..aa98aee139 --- /dev/null +++ b/src/commands/includes/removeJobKeys.lua @@ -0,0 +1,8 @@ +--[[ + Function to remove job keys. +]] + +local function removeJobKeys(jobKey) + return rcall("DEL", jobKey, jobKey .. ':logs', + jobKey .. ':dependencies', jobKey .. ':processed', jobKey .. ':failed') +end diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index c61a93ebf3..f3a0cff11e 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -8,6 +8,7 @@ --- @include "addJobInTargetList" --- @include "destructureJobKey" --- @include "getTargetQueueList" +--- @include "removeJobKeys" local function moveParentToWait(parentPrefix, parentId, emitEvent) local parentTarget, isPaused = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "wait", @@ -36,8 +37,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) if hard then -- remove parent in same queue if parentPrefix == baseKey then removeParentDependencyKey(parentKey, hard, nil, baseKey) - rcall("DEL", parentKey, parentKey .. ':logs', - parentKey .. ':dependencies', parentKey .. ':processed') + removeJobKeys(parentKey) else moveParentToWait(parentPrefix, parentId) end @@ -65,8 +65,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) if hard then if parentPrefix == baseKey then removeParentDependencyKey(missedParentKey, hard, nil, baseKey) - rcall("DEL", missedParentKey, missedParentKey .. ':logs', - missedParentKey .. ':dependencies', missedParentKey .. ':processed') + removeJobKeys(missedParentKey) else moveParentToWait(parentPrefix, parentId) end diff --git a/src/commands/moveToFinished-14.lua b/src/commands/moveToFinished-14.lua index e0d49585b1..d73e91fe64 100644 --- a/src/commands/moveToFinished-14.lua +++ b/src/commands/moveToFinished-14.lua @@ -39,6 +39,7 @@ opts - attempts max attempts opts - maxMetricsSize opts - fpof - fail parent on fail + opts - idof - ignore dependency on fail opts - rdof - remove dependency on fail Output: @@ -147,11 +148,15 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey, parentId, jobIdKey, timestamp) - elseif opts['rdof'] then + elseif opts['idof'] or opts['rdof'] then local dependenciesSet = parentKey .. ":dependencies" if rcall("SREM", dependenciesSet, jobIdKey) == 1 then moveParentToWaitIfNeeded(parentQueueKey, dependenciesSet, parentKey, parentId, timestamp) + if opts['idof'] then + local failedSet = parentKey .. ":failed" + rcall("HSET", failedSet, jobIdKey, ARGV[4]) + end end end end diff --git a/src/commands/removeJob-1.lua b/src/commands/removeJob-1.lua index cb0bd3d013..5ce1c01c1e 100644 --- a/src/commands/removeJob-1.lua +++ b/src/commands/removeJob-1.lua @@ -19,6 +19,7 @@ local rcall = redis.call --- @include "includes/getOrSetMaxEvents" --- @include "includes/isLocked" --- @include "includes/removeJobFromAnyState" +--- @include "includes/removeJobKeys" --- @include "includes/removeParentDependencyKey" local function removeJob( prefix, jobId, parentKey, removeChildren) @@ -50,11 +51,21 @@ local function removeJob( prefix, jobId, parentKey, removeChildren) removeJob( childJobPrefix, childJobId, jobKey, removeChildren ) end end + + local failed = rcall("HGETALL", jobKey .. ":failed") + + if (#failed > 0) then + for i = 1, #failed, 2 do + local childJobId = getJobIdFromKey(failed[i]) + local childJobPrefix = getJobKeyPrefix(failed[i], childJobId) + removeJob( childJobPrefix, childJobId, jobKey, removeChildren ) + end + end end local prev = removeJobFromAnyState(prefix, jobId) - if rcall("DEL", jobKey, jobKey .. ":logs", jobKey .. ":dependencies", jobKey .. ":processed") > 0 then + if removeJobKeys(jobKey) > 0 then local maxEvents = getOrSetMaxEvents(prefix .. "meta") rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed", "jobId", jobId, "prev", prev) diff --git a/src/types/job-options.ts b/src/types/job-options.ts index 9a88d7e604..51d5416312 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -6,6 +6,11 @@ export type JobsOptions = BaseJobOptions & { */ failParentOnFailure?: boolean; + /** + * If true, moves the jobId from its parent dependencies to failed dependencies when it fails after all attempts. + */ + ignoreDependencyOnFailure?: boolean; + /** * If true, removes the job from its parent dependencies when it fails after all attempts. */ @@ -21,6 +26,11 @@ export type RedisJobOptions = BaseJobOptions & { */ fpof?: boolean; + /** + * If true, moves the jobId from its parent dependencies to failed dependencies when it fails after all attempts. + */ + idof?: boolean; + /** * Maximum amount of log entries that will be preserved */ diff --git a/tests/test_flow.ts b/tests/test_flow.ts index ca52a4b28e..9020f92176 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -460,6 +460,111 @@ describe('flows', () => { await removeAllQueueData(new IORedis(redisHost), parentQueueName); }); + describe('when ignoreDependencyOnFailure is provided', async () => { + it('moves parent to wait after children fail', async () => { + const parentQueueName = `parent-queue-${v4()}`; + const parentQueue = new Queue(parentQueueName, { connection, prefix }); + const name = 'child-job'; + + const parentProcessor = async (job: Job) => { + const values = await job.getDependencies({ + processed: {}, + }); + expect(values).to.deep.equal({ + processed: {}, + nextProcessedCursor: 0, + }); + }; + + const parentWorker = new Worker(parentQueueName, parentProcessor, { + connection, + prefix, + }); + const childrenWorker = new Worker( + queueName, + async () => { + await delay(10); + throw new Error('error'); + }, + { + connection, + prefix, + }, + ); + await parentWorker.waitUntilReady(); + await childrenWorker.waitUntilReady(); + + const completed = new Promise(resolve => { + parentWorker.on('completed', async (job: Job) => { + expect(job.finishedOn).to.be.string; + const counts = await parentQueue.getJobCounts('completed'); + expect(counts.completed).to.be.equal(1); + resolve(); + }); + }); + + const flow = new FlowProducer({ connection, prefix }); + const tree = await flow.add({ + name: 'parent-job', + queueName: parentQueueName, + data: {}, + children: [ + { + name, + data: { idx: 0, foo: 'bar' }, + queueName, + opts: { ignoreDependencyOnFailure: true }, + }, + { + name, + data: { idx: 1, foo: 'baz' }, + queueName, + opts: { ignoreDependencyOnFailure: true }, + }, + { + name, + data: { idx: 2, foo: 'qux' }, + queueName, + opts: { ignoreDependencyOnFailure: true }, + }, + ], + }); + + expect(tree).to.have.property('job'); + expect(tree).to.have.property('children'); + + const { children, job } = tree; + const parentState = await job.getState(); + + expect(parentState).to.be.eql('waiting-children'); + expect(children).to.have.length(3); + + expect(children[0].job.id).to.be.ok; + expect(children[0].job.data.foo).to.be.eql('bar'); + expect(children[1].job.id).to.be.ok; + expect(children[1].job.data.foo).to.be.eql('baz'); + expect(children[2].job.id).to.be.ok; + expect(children[2].job.data.foo).to.be.eql('qux'); + + await completed; + + const failedChildrenValues = await job.getFailedChildrenValues(); + + expect(failedChildrenValues).to.deep.equal({ + [`${queue.qualifiedName}:${children[0].job.id}`]: 'error', + [`${queue.qualifiedName}:${children[1].job.id}`]: 'error', + [`${queue.qualifiedName}:${children[2].job.id}`]: 'error', + }); + + await childrenWorker.close(); + await parentWorker.close(); + await flow.close(); + await parentQueue.close(); + + await removeAllQueueData(new IORedis(redisHost), parentQueueName); + }).timeout(8000); + }); + describe('when removeDependencyOnFailure is provided', async () => { it('moves parent to wait after children fail', async () => { const parentQueueName = `parent-queue-${v4()}`;