From 1151022e4825fbb20cf1ef6ce1ff3e7fe929de5c Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Mon, 26 Feb 2024 21:07:16 -0600 Subject: [PATCH] feat(job): add removeChildDependency method (#2435) --- docs/gitbook/SUMMARY.md | 1 + .../guide/flows/remove-child-dependency.md | 31 +++ src/classes/job.ts | 26 ++- src/classes/scripts.ts | 129 +++++++++-- .../includes/removeParentDependencyKey.lua | 3 + src/commands/removeChildDependency-1.lua | 34 +++ tests/test_flow.ts | 200 ++++++++++++++++++ 7 files changed, 401 insertions(+), 23 deletions(-) create mode 100644 docs/gitbook/guide/flows/remove-child-dependency.md create mode 100644 src/commands/removeChildDependency-1.lua diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 24933aed35..8cb60e381f 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -41,6 +41,7 @@ * [Fail Parent](guide/flows/fail-parent.md) * [Remove Dependency](guide/flows/remove-dependency.md) * [Ignore Dependency](guide/flows/ignore-dependency.md) + * [Remove Child Dependency](guide/flows/remove-child-dependency.md) * [Metrics](guide/metrics/metrics.md) * [Rate limiting](guide/rate-limiting.md) * [Retrying failing jobs](guide/retrying-failing-jobs.md) diff --git a/docs/gitbook/guide/flows/remove-child-dependency.md b/docs/gitbook/guide/flows/remove-child-dependency.md new file mode 100644 index 0000000000..d254b30965 --- /dev/null +++ b/docs/gitbook/guide/flows/remove-child-dependency.md @@ -0,0 +1,31 @@ +# Remove Child Dependency + +In some situations, you may have a parent job and need to remove the dependency of one of its children. + +The pattern to solve this requirement consists on using the **removeChildDependency** method. It will make sure that if the job is the last pending child, to move its parent to _waiting_ and it won't be listed in unprocessed list of the parent. + +```typescript +const flow = new FlowProducer({ connection }); + +const originalTree = await flow.add({ + name: 'root-job', + queueName: 'topQueueName', + data: {}, + children: [ + { + name, + data: { idx: 0, foo: 'bar' }, + queueName: 'childrenQueueName', + opts: {}, + }, + ], +}); + +await originalTree.children[0].job.removeChildDependency(); +``` + +{% hint style="waring" %} +As soon as a **child** calls this method, it will verify if it has an existing parent, if not, it'll throw an error. +{% endhint %} + +Failed or completed children using this option won't generate any removal as they won't be part of unprocessed list: diff --git a/src/classes/job.ts b/src/classes/job.ts index 48f1d22564..532f03c038 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -522,6 +522,25 @@ export class Job< return Job.addJobLog(this.queue, this.id, logRow, this.opts.keepLogs); } + /** + * Removes child dependency from parent when child is not yet finished + * + * @returns True if the relationship existed and if it was removed. + */ + async removeChildDependency(): Promise { + const childDependencyIsRemoved = await this.scripts.removeChildDependency( + this.id, + this.parentKey, + ); + if (childDependencyIsRemoved) { + this.parent = undefined; + this.parentKey = undefined; + return true; + } + + return false; + } + /** * Clears job's logs * @@ -704,7 +723,12 @@ export class Job< const code = results[results.length - 1][1] as number; if (code < 0) { - throw this.scripts.finishedErrors(code, this.id, command, 'active'); + throw this.scripts.finishedErrors({ + code, + jobId: this.id, + command, + state: 'active', + }); } if (finishedOn && typeof finishedOn === 'number') { diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 938bebeed0..5a34adebe9 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -216,7 +216,11 @@ export class Scripts { } if (result < 0) { - throw this.finishedErrors(result, parentOpts.parentKey, 'addJob'); + throw this.finishedErrors({ + code: result, + parentKey: parentOpts.parentKey, + command: 'addJob', + }); } return result; @@ -314,7 +318,11 @@ export class Scripts { const result = await (client).updateData(keys.concat([dataJson])); if (result < 0) { - throw this.finishedErrors(result, job.id, 'updateData'); + throw this.finishedErrors({ + code: result, + jobId: job.id, + command: 'updateData', + }); } } @@ -336,7 +344,11 @@ export class Scripts { ); if (result < 0) { - throw this.finishedErrors(result, jobId, 'updateProgress'); + throw this.finishedErrors({ + code: result, + jobId, + command: 'updateProgress', + }); } } @@ -414,7 +426,12 @@ export class Scripts { const result = await (client).moveToFinished(args); if (result < 0) { - throw this.finishedErrors(result, jobId, 'moveToFinished', 'active'); + throw this.finishedErrors({ + code: result, + jobId, + command: 'moveToFinished', + state: 'active', + }); } else { if (typeof result !== 'undefined') { return raw2NextJobData(result); @@ -422,12 +439,19 @@ export class Scripts { } } - finishedErrors( - code: number, - jobId: string, - command: string, - state?: string, - ): Error { + finishedErrors({ + code, + jobId, + parentKey, + command, + state, + }: { + code: number; + jobId?: string; + parentKey?: string; + command: string; + state?: string; + }): Error { switch (code) { case ErrorCode.JobNotExist: return new Error(`Missing key for job ${jobId}. ${command}`); @@ -440,14 +464,14 @@ export class Scripts { case ErrorCode.JobPendingDependencies: return new Error(`Job ${jobId} has pending dependencies. ${command}`); case ErrorCode.ParentJobNotExist: - return new Error(`Missing key for parent job ${jobId}. ${command}`); + return new Error(`Missing key for parent job ${parentKey}. ${command}`); case ErrorCode.JobLockMismatch: return new Error( `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, ); case ErrorCode.ParentJobCannotBeReplaced: return new Error( - `The parent job ${jobId} cannot be replaced. ${command}`, + `The parent job ${parentKey} cannot be replaced. ${command}`, ); default: return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); @@ -476,6 +500,43 @@ export class Scripts { return (client).drain(args); } + private removeChildDependencyArgs( + jobId: string, + parentKey: string, + ): (string | number)[] { + const queueKeys = this.queue.keys; + + const keys: string[] = [queueKeys['']]; + + const args = [this.queue.toKey(jobId), parentKey]; + + return keys.concat(args); + } + + async removeChildDependency( + jobId: string, + parentKey: string, + ): Promise { + const client = await this.queue.client; + const args = this.removeChildDependencyArgs(jobId, parentKey); + + const result = await (client).removeChildDependency(args); + + switch (result) { + case 0: + return true; + case 1: + return false; + default: + throw this.finishedErrors({ + code: result, + jobId, + parentKey, + command: 'removeChildDependency', + }); + } + } + private getRangesArgs( types: JobType[], start: number, @@ -609,7 +670,12 @@ export class Scripts { const args = this.changeDelayArgs(jobId, delay); const result = await (client).changeDelay(args); if (result < 0) { - throw this.finishedErrors(result, jobId, 'changeDelay', 'delayed'); + throw this.finishedErrors({ + code: result, + jobId, + command: 'changeDelay', + state: 'delayed', + }); } } @@ -652,7 +718,11 @@ export class Scripts { const args = this.changePriorityArgs(jobId, priority, lifo); const result = await (client).changePriority(args); if (result < 0) { - throw this.finishedErrors(result, jobId, 'changePriority'); + throw this.finishedErrors({ + code: result, + jobId, + command: 'changePriority', + }); } } @@ -766,7 +836,12 @@ export class Scripts { const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts); const result = await (client).moveToDelayed(args); if (result < 0) { - throw this.finishedErrors(result, jobId, 'moveToDelayed', 'active'); + throw this.finishedErrors({ + code: result, + jobId, + command: 'moveToDelayed', + state: 'active', + }); } } @@ -797,12 +872,12 @@ export class Scripts { case 1: return false; default: - throw this.finishedErrors( - result, + throw this.finishedErrors({ + code: result, jobId, - 'moveToWaitingChildren', - 'active', - ); + command: 'moveToWaitingChildren', + state: 'active', + }); } } @@ -939,7 +1014,12 @@ export class Scripts { case 1: return; default: - throw this.finishedErrors(result, job.id, 'reprocessJob', state); + throw this.finishedErrors({ + code: result, + jobId: job.id, + command: 'reprocessJob', + state, + }); } } @@ -997,7 +1077,12 @@ export class Scripts { const code = await (client).promote(keys.concat(args)); if (code < 0) { - throw this.finishedErrors(code, jobId, 'promote', 'delayed'); + throw this.finishedErrors({ + code, + jobId, + command: 'promote', + state: 'delayed', + }); } } diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index f3a0cff11e..fb75120cb8 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -46,6 +46,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) end end end + return true end else local missedParentKey = rcall("HGET", jobKey, "parentKey") @@ -74,7 +75,9 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) end end end + return true end end end + return false end diff --git a/src/commands/removeChildDependency-1.lua b/src/commands/removeChildDependency-1.lua new file mode 100644 index 0000000000..6eddcb2e60 --- /dev/null +++ b/src/commands/removeChildDependency-1.lua @@ -0,0 +1,34 @@ +--[[ + Break parent-child dependency by removing + child reference from parent + + Input: + KEYS[1] 'key' prefix, + + ARGV[1] job key + ARGV[2] parent key + + Output: + 0 - OK + 1 - There is not relationship. + -1 - Missing job key + -5 - Missing parent key +]] +local rcall = redis.call +local jobKey = ARGV[1] +local parentKey = ARGV[2] + +-- Includes +--- @include "includes/removeParentDependencyKey" + +if rcall("EXISTS", jobKey) ~= 1 then return -1 end + +if rcall("EXISTS", parentKey) ~= 1 then return -5 end + +if removeParentDependencyKey(jobKey, false, parentKey, KEYS[1]) then + rcall("HDEL", jobKey, "parentKey", "parent") + + return 0 +else + return 1 +end \ No newline at end of file diff --git a/tests/test_flow.ts b/tests/test_flow.ts index b97ffb8e35..0da0b1e7f3 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -460,6 +460,206 @@ describe('flows', () => { await removeAllQueueData(new IORedis(redisHost), parentQueueName); }); + describe('when removeChildDependency is called', () => { + describe('when last child call this method', () => { + it('moves parent to wait', async () => { + const flow = new FlowProducer({ connection, prefix }); + const { job, children } = await flow.add({ + name: 'parent', + data: {}, + queueName, + children: [ + { + queueName, + name: 'child0', + data: {}, + opts: {}, + }, + ], + }); + + const relationshipIsBroken = + await children![0].job.removeChildDependency(); + + expect(relationshipIsBroken).to.be.true; + expect(children![0].job.parent).to.be.undefined; + expect(children![0].job.parentKey).to.be.undefined; + + const parentState = await job.getState(); + + expect(parentState).to.be.equal('waiting'); + + const worker = new Worker( + queueName, + async () => { + await delay(100); + }, + { connection, prefix }, + ); + await worker.waitUntilReady(); + + const completed = new Promise((resolve, reject) => { + worker.on('completed', async (job: Job) => { + try { + if (job.name === 'parent') { + const { unprocessed, processed } = + await job.getDependenciesCount(); + expect(unprocessed).to.equal(0); + expect(processed).to.equal(0); + resolve(); + } + } catch (err) { + reject(err); + } + }); + }); + + await completed; + + await flow.close(); + await worker.close(); + }); + }); + + describe('when there are pending children when calling this method', () => { + it('keeps parent in waiting-children state', async () => { + const flow = new FlowProducer({ connection, prefix }); + const { job, children } = await flow.add({ + name: 'parent', + data: {}, + queueName, + children: [ + { + queueName, + name: 'child0', + data: {}, + opts: {}, + }, + { + queueName, + name: 'child1', + data: {}, + opts: {}, + }, + ], + }); + + const relationshipIsBroken = + await children![0].job.removeChildDependency(); + + expect(relationshipIsBroken).to.be.true; + expect(children![0].job.parent).to.be.undefined; + expect(children![0].job.parentKey).to.be.undefined; + + const parentState = await job.getState(); + + expect(parentState).to.be.equal('waiting-children'); + + const worker = new Worker( + queueName, + async () => { + await delay(100); + }, + { connection, prefix }, + ); + await worker.waitUntilReady(); + + const completed = new Promise((resolve, reject) => { + worker.on('completed', async (job: Job) => { + try { + if (job.name === 'parent') { + const { unprocessed, processed } = + await job.getDependenciesCount(); + expect(unprocessed).to.equal(0); + expect(processed).to.equal(1); + resolve(); + } + } catch (err) { + reject(err); + } + }); + }); + + await completed; + + await flow.close(); + await worker.close(); + }); + }); + + describe('when parent does not exist', () => { + it('throws an error', async () => { + const flow = new FlowProducer({ connection, prefix }); + const { job, children } = await flow.add({ + name: 'parent', + data: {}, + queueName, + children: [ + { + queueName, + name: 'child0', + data: {}, + opts: {}, + }, + { + queueName, + name: 'child1', + data: {}, + opts: {}, + }, + ], + }); + + await job.remove({ removeChildren: false }); + + await expect( + children![0].job.removeChildDependency(), + ).to.be.rejectedWith( + `Missing key for parent job ${ + children![0].job.parentKey + }. removeChildDependency`, + ); + + await flow.close(); + }); + }); + + describe('when child does not exist', () => { + it('throws an error', async () => { + const flow = new FlowProducer({ connection, prefix }); + const { children } = await flow.add({ + name: 'parent', + data: {}, + queueName, + children: [ + { + queueName, + name: 'child0', + data: {}, + opts: {}, + }, + { + queueName, + name: 'child1', + data: {}, + opts: {}, + }, + ], + }); + + await children![0].job.remove(); + + await expect( + children![0].job.removeChildDependency(), + ).to.be.rejectedWith( + `Missing key for job ${children![0].job.id}. removeChildDependency`, + ); + + await flow.close(); + }); + }); + }); + describe('when ignoreDependencyOnFailure is provided', async () => { it('moves parent to wait after children fail', async () => { const parentQueueName = `parent-queue-${v4()}`;