diff --git a/src/classes/job.ts b/src/classes/job.ts index b10bf70084..9da2462d52 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -523,6 +523,25 @@ export class Job< return Job.addJobLog(this.queue, this.id, logRow, this.opts.keepLogs); } + /** + * Breaks parent-child relationship when child is not yet finished + * + * @returns True if the relationship existed and if it was removed. + */ + async breakRelationship(): Promise { + const relationshipIsBroken = await this.scripts.breakRelationship( + this.id, + this.parentKey, + ); + if (relationshipIsBroken) { + this.parent = undefined; + this.parentKey = undefined; + return true; + } + + return false; + } + /** * Clears job's logs * diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index bd23d2e46a..00410c18d1 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -476,6 +476,35 @@ export class Scripts { return (client).drain(args); } + private breakRelationshipArgs( + jobId: string, + parentKey: string, + ): (string | number)[] { + const queueKeys = this.queue.keys; + + const keys: string[] = [queueKeys['']]; + + const args = [this.queue.toKey(jobId), parentKey]; + + return keys.concat(args); + } + + async breakRelationship(jobId: string, parentKey: string): Promise { + const client = await this.queue.client; + const args = this.breakRelationshipArgs(jobId, parentKey); + + const result = await (client).breakRelationship(args); + + switch (result) { + case 0: + return true; + case 1: + return false; + default: + throw this.finishedErrors(result, jobId, 'breakRelationship'); + } + } + private getRangesArgs( types: JobType[], start: number, diff --git a/src/commands/breakRelationship-1.lua b/src/commands/breakRelationship-1.lua new file mode 100644 index 0000000000..6eddcb2e60 --- /dev/null +++ b/src/commands/breakRelationship-1.lua @@ -0,0 +1,34 @@ +--[[ + Break parent-child dependency by removing + child reference from parent + + Input: + KEYS[1] 'key' prefix, + + ARGV[1] job key + ARGV[2] parent key + + Output: + 0 - OK + 1 - There is not relationship. + -1 - Missing job key + -5 - Missing parent key +]] +local rcall = redis.call +local jobKey = ARGV[1] +local parentKey = ARGV[2] + +-- Includes +--- @include "includes/removeParentDependencyKey" + +if rcall("EXISTS", jobKey) ~= 1 then return -1 end + +if rcall("EXISTS", parentKey) ~= 1 then return -5 end + +if removeParentDependencyKey(jobKey, false, parentKey, KEYS[1]) then + rcall("HDEL", jobKey, "parentKey", "parent") + + return 0 +else + return 1 +end \ No newline at end of file diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index f3a0cff11e..fb75120cb8 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -46,6 +46,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) end end end + return true end else local missedParentKey = rcall("HGET", jobKey, "parentKey") @@ -74,7 +75,9 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey) end end end + return true end end end + return false end diff --git a/tests/test_flow.ts b/tests/test_flow.ts index 9020f92176..c9871689ee 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -460,6 +460,69 @@ describe('flows', () => { await removeAllQueueData(new IORedis(redisHost), parentQueueName); }); + describe('when breakRelationship is called', () => { + describe('when last child call this method', () => { + it('moves parent to wait', async () => { + const flow = new FlowProducer({ connection, prefix }); + const { job, children } = await flow.add({ + name: 'parent', + data: {}, + queueName, + children: [ + { + queueName, + name: 'child0', + data: {}, + opts: { + removeOnFail: true, + }, + }, + ], + }); + + const relationshipIsBroken = await children![0].job.breakRelationship(); + + expect(relationshipIsBroken).to.be.true; + expect(children![0].job.parent).to.be.undefined; + expect(children![0].job.parentKey).to.be.undefined; + + const parentState = await job.getState(); + + expect(parentState).to.be.equal('waiting'); + + const worker = new Worker( + queueName, + async () => { + await delay(100); + }, + { connection, prefix }, + ); + await worker.waitUntilReady(); + + const completed = new Promise((resolve, reject) => { + worker.on('completed', async (job: Job) => { + try { + if (job.name === 'parent') { + const { unprocessed, processed } = + await job.getDependenciesCount(); + expect(unprocessed).to.equal(0); + expect(processed).to.equal(0); + resolve(); + } + } catch (err) { + reject(err); + } + }); + }); + + await completed; + + await flow.close(); + await worker.close(); + }); + }); + }); + describe('when ignoreDependencyOnFailure is provided', async () => { it('moves parent to wait after children fail', async () => { const parentQueueName = `parent-queue-${v4()}`;