Skip to content

Commit

Permalink
feat(job): add breakRelationship method
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Feb 20, 2024
1 parent 571b417 commit ebb1e93
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,25 @@ export class Job<
return Job.addJobLog(this.queue, this.id, logRow, this.opts.keepLogs);
}

/**
* Breaks parent-child relationship when child is not yet finished
*
* @returns True if the relationship existed and if it was removed.
*/
async breakRelationship(): Promise<boolean> {
const relationshipIsBroken = await this.scripts.breakRelationship(
this.id,
this.parentKey,
);
if (relationshipIsBroken) {
this.parent = undefined;
this.parentKey = undefined;
return true;
}

return false;
}

/**
* Clears job's logs
*
Expand Down
29 changes: 29 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,35 @@ export class Scripts {
return (<any>client).drain(args);
}

private breakRelationshipArgs(
jobId: string,
parentKey: string,
): (string | number)[] {
const queueKeys = this.queue.keys;

const keys: string[] = [queueKeys['']];

const args = [this.queue.toKey(jobId), parentKey];

return keys.concat(args);
}

async breakRelationship(jobId: string, parentKey: string): Promise<boolean> {
const client = await this.queue.client;
const args = this.breakRelationshipArgs(jobId, parentKey);

const result = await (<any>client).breakRelationship(args);

switch (result) {
case 0:
return true;
case 1:
return false;
default:
throw this.finishedErrors(result, jobId, 'breakRelationship');
}
}

private getRangesArgs(
types: JobType[],
start: number,
Expand Down
34 changes: 34 additions & 0 deletions src/commands/breakRelationship-1.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
--[[
Break parent-child dependency by removing
child reference from parent
Input:
KEYS[1] 'key' prefix,
ARGV[1] job key
ARGV[2] parent key
Output:
0 - OK
1 - There is not relationship.
-1 - Missing job key
-5 - Missing parent key
]]
local rcall = redis.call
local jobKey = ARGV[1]
local parentKey = ARGV[2]

-- Includes
--- @include "includes/removeParentDependencyKey"

if rcall("EXISTS", jobKey) ~= 1 then return -1 end

if rcall("EXISTS", parentKey) ~= 1 then return -5 end

if removeParentDependencyKey(jobKey, false, parentKey, KEYS[1]) then
rcall("HDEL", jobKey, "parentKey", "parent")

return 0
else
return 1
end
3 changes: 3 additions & 0 deletions src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
end
end
end
return true
end
else
local missedParentKey = rcall("HGET", jobKey, "parentKey")
Expand Down Expand Up @@ -74,7 +75,9 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
end
end
end
return true
end
end
end
return false
end
63 changes: 63 additions & 0 deletions tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,69 @@ describe('flows', () => {
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
});

describe('when breakRelationship is called', () => {
describe('when last child call this method', () => {
it('moves parent to wait', async () => {
const flow = new FlowProducer({ connection, prefix });
const { job, children } = await flow.add({
name: 'parent',
data: {},
queueName,
children: [
{
queueName,
name: 'child0',
data: {},
opts: {
removeOnFail: true,
},
},
],
});

const relationshipIsBroken = await children![0].job.breakRelationship();

expect(relationshipIsBroken).to.be.true;
expect(children![0].job.parent).to.be.undefined;
expect(children![0].job.parentKey).to.be.undefined;

const parentState = await job.getState();

expect(parentState).to.be.equal('waiting');

const worker = new Worker(
queueName,
async () => {
await delay(100);
},
{ connection, prefix },
);
await worker.waitUntilReady();

const completed = new Promise<void>((resolve, reject) => {
worker.on('completed', async (job: Job) => {
try {
if (job.name === 'parent') {
const { unprocessed, processed } =
await job.getDependenciesCount();
expect(unprocessed).to.equal(0);
expect(processed).to.equal(0);
resolve();
}
} catch (err) {
reject(err);
}
});
});

await completed;

await flow.close();
await worker.close();
});
});
});

describe('when ignoreDependencyOnFailure is provided', async () => {
it('moves parent to wait after children fail', async () => {
const parentQueueName = `parent-queue-${v4()}`;
Expand Down

0 comments on commit ebb1e93

Please sign in to comment.