feat(flow): add ignoreDependencyOnFailure option #2426

Merged 3 commits on Feb 17, 2024
1 change: 1 addition & 0 deletions docs/gitbook/SUMMARY.md
@@ -40,6 +40,7 @@
* [Get Flow Tree](guide/flows/get-flow-tree.md)
* [Fail Parent](guide/flows/fail-parent.md)
* [Remove Dependency](guide/flows/remove-dependency.md)
* [Ignore Dependency](guide/flows/ignore-dependency.md)
* [Metrics](guide/metrics/metrics.md)
* [Rate limiting](guide/rate-limiting.md)
* [Retrying failing jobs](guide/retrying-failing-jobs.md)
54 changes: 54 additions & 0 deletions docs/gitbook/guide/flows/ignore-dependency.md
@@ -0,0 +1,54 @@
# Ignore Dependency

In some situations, you may have a parent job that should ignore the failure of one of its children.

The pattern to solve this requirement consists of using the **ignoreDependencyOnFailure** option. When a child job with this option fails, its dependency is ignored by the parent, so the parent can complete without waiting for the failed child.

```typescript
import { FlowProducer } from 'bullmq';

// `connection` and `name` are assumed to be defined elsewhere.
const flow = new FlowProducer({ connection });

const originalTree = await flow.add({
  name: 'root-job',
  queueName: 'topQueueName',
  data: {},
  children: [
    {
      name,
      data: { idx: 0, foo: 'bar' },
      queueName: 'childrenQueueName',
      // If this child fails, the parent ignores it instead of waiting for it.
      opts: { ignoreDependencyOnFailure: true },
      children: [
        {
          name,
          data: { idx: 1, foo: 'bah' },
          queueName: 'grandChildrenQueueName',
        },
        {
          name,
          data: { idx: 2, foo: 'baz' },
          queueName: 'grandChildrenQueueName',
        },
      ],
    },
    {
      name,
      data: { idx: 3, foo: 'foo' },
      queueName: 'childrenQueueName',
    },
  ],
});
```

{% hint style="info" %}
When a **child** with this option fails, the parent job is moved to the waiting state only if it has no other pending children.
{% endhint %}

Children that failed with this option can be retrieved with the **getFailedChildrenValues** method, which returns an object mapping each failed child's job key to its failure value:

```typescript
const failedChildrenValues = await originalTree.job.getFailedChildrenValues();
```
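
The test added in this PR shows that the keys of the returned object have the form `${queue.qualifiedName}:${jobId}`. As a minimal sketch, assuming the job id itself contains no colon, such a map could be split into per-child records as follows. The `FailedChild` type, the `parseFailedChildren` helper and the sample keys are hypothetical and not part of BullMQ.

```typescript
// Hypothetical record type for one failed child (not a BullMQ type).
interface FailedChild {
  queueKey: string; // e.g. "bull:childrenQueueName"
  jobId: string;
  failedReason: string;
}

// Hypothetical helper: splits each job key at its last colon into a queue key
// and a job id, and pairs them with the stored failure value.
function parseFailedChildren(values: { [jobKey: string]: string }): FailedChild[] {
  const result: FailedChild[] = [];
  const jobKeys = Object.keys(values);
  for (let i = 0; i < jobKeys.length; i++) {
    const jobKey = jobKeys[i];
    const sep = jobKey.lastIndexOf(':');
    result.push({
      queueKey: jobKey.slice(0, sep),
      jobId: jobKey.slice(sep + 1),
      failedReason: values[jobKey],
    });
  }
  return result;
}

// Sample data shaped like the return value of getFailedChildrenValues().
const sampleFailedValues = {
  'bull:childrenQueueName:1': 'error',
  'bull:childrenQueueName:2': 'timeout',
};
const parsedChildren = parseFailedChildren(sampleFailedValues);
```

In a real parent processor, the input would come from `await job.getFailedChildrenValues()` rather than from a literal.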

## Read more:

- 💡 [Add Flow API Reference](https://api.docs.bullmq.io/classes/v5.FlowProducer.html#add)
2 changes: 1 addition & 1 deletion docs/gitbook/guide/flows/remove-dependency.md
@@ -1,6 +1,6 @@
# Remove Dependency

- In some situations, you may have a parent job and need to ignore when one of its children fail.
+ In some situations, you may have a parent job and need to remove the relationship when one of its children fails.

The pattern to solve this requirement consists of using the **removeDependencyOnFailure** option. This option makes sure that when a job fails, its dependency is removed from the parent, so the parent will complete without waiting for the failed children.

12 changes: 12 additions & 0 deletions src/classes/job.ts
@@ -40,6 +40,7 @@ const logger = debuglog('bull');

const optsDecodeMap = {
fpof: 'failParentOnFailure',
idof: 'ignoreDependencyOnFailure',
kl: 'keepLogs',
rdof: 'removeDependencyOnFailure',
};
@@ -815,6 +816,17 @@ export class Job<
}
}

  /**
   * Get this job's children failure values, if any.
   *
   * @returns Object mapping children job keys to their failure values.
   */
  async getFailedChildrenValues(): Promise<{ [jobKey: string]: string }> {
    const client = await this.queue.client;

    return client.hgetall(this.toKey(`${this.id}:failed`));
  }

/**
* Get children job keys if this job is a parent and has children.
* @remarks
1 change: 1 addition & 0 deletions src/classes/scripts.ts
@@ -383,6 +383,7 @@ export class Scripts {
? opts.metrics?.maxDataPoints
: '',
fpof: !!job.opts?.failParentOnFailure,
idof: !!job.opts?.ignoreDependencyOnFailure,
rdof: !!job.opts?.removeDependencyOnFailure,
}),
];
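
The `!!` coercion in this hunk turns an unset option into a strict `false`, and the `optsDecodeMap` entry added in `job.ts` maps the short key `idof` back to its long name. The sketch below illustrates both steps in isolation. The helper names `encodeFailureFlags` and `decodeOptNames` and the `FlowFailureOpts` type are hypothetical, and the real code paths in BullMQ may connect these pieces differently.

```typescript
// Short keys and long names as they appear in this PR's diff.
const shortToLongOptName: { [short: string]: string } = {
  fpof: 'failParentOnFailure',
  idof: 'ignoreDependencyOnFailure',
  kl: 'keepLogs',
  rdof: 'removeDependencyOnFailure',
};

// Hypothetical subset of the job options touched by this PR.
interface FlowFailureOpts {
  failParentOnFailure?: boolean;
  ignoreDependencyOnFailure?: boolean;
  removeDependencyOnFailure?: boolean;
}

// Hypothetical helper: `!!` turns undefined into false, so every flag is a strict boolean.
function encodeFailureFlags(opts?: FlowFailureOpts): { fpof: boolean; idof: boolean; rdof: boolean } {
  return {
    fpof: !!(opts && opts.failParentOnFailure),
    idof: !!(opts && opts.ignoreDependencyOnFailure),
    rdof: !!(opts && opts.removeDependencyOnFailure),
  };
}

// Hypothetical helper: renames known short keys and passes unknown keys through.
function decodeOptNames(raw: { [key: string]: unknown }): { [key: string]: unknown } {
  const decoded: { [key: string]: unknown } = {};
  const rawKeys = Object.keys(raw);
  for (let i = 0; i < rawKeys.length; i++) {
    const key = rawKeys[i];
    const hasLongName = Object.prototype.hasOwnProperty.call(shortToLongOptName, key);
    decoded[hasLongName ? shortToLongOptName[key] : key] = raw[key];
  }
  return decoded;
}

const encodedFlags = encodeFailureFlags({ ignoreDependencyOnFailure: true });
const decodedFlags = decodeOptNames(encodedFlags);
```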
4 changes: 2 additions & 2 deletions src/commands/includes/removeJob.lua
@@ -3,11 +3,11 @@
]]

-- Includes
--- @include "removeJobKeys"
--- @include "removeParentDependencyKey"

local function removeJob(jobId, hard, baseKey)
local jobKey = baseKey .. jobId
removeParentDependencyKey(jobKey, hard, nil, baseKey)
- rcall("DEL", jobKey, jobKey .. ':logs',
-   jobKey .. ':dependencies', jobKey .. ':processed')
+ removeJobKeys(jobKey)
end
8 changes: 8 additions & 0 deletions src/commands/includes/removeJobKeys.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
--[[
  Function to remove job keys.
]]

local function removeJobKeys(jobKey)
  return rcall("DEL", jobKey, jobKey .. ':logs',
    jobKey .. ':dependencies', jobKey .. ':processed', jobKey .. ':failed')
end
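
For reference, the set of keys removed per job can be sketched outside Lua as below. The suffixes are taken from the `DEL` call in this file; the `relatedJobKeys` helper and the sample job key are hypothetical.

```typescript
// Suffixes taken from the DEL call in removeJobKeys.lua; '' is the job hash itself.
const jobKeySuffixes = ['', ':logs', ':dependencies', ':processed', ':failed'];

// Hypothetical helper mirroring the key list passed to DEL.
function relatedJobKeys(jobKey: string): string[] {
  const keys: string[] = [];
  for (let i = 0; i < jobKeySuffixes.length; i++) {
    keys.push(jobKey + jobKeySuffixes[i]);
  }
  return keys;
}

const keysForJob42 = relatedJobKeys('bull:myQueue:42');
```

The `:failed` entry is the one this PR adds, so the new hash is cleaned up together with the other per-job keys.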
7 changes: 3 additions & 4 deletions src/commands/includes/removeParentDependencyKey.lua
@@ -8,6 +8,7 @@
--- @include "addJobInTargetList"
--- @include "destructureJobKey"
--- @include "getTargetQueueList"
--- @include "removeJobKeys"

local function moveParentToWait(parentPrefix, parentId, emitEvent)
local parentTarget, isPaused = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "wait",
@@ -36,8 +37,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
if hard then -- remove parent in same queue
if parentPrefix == baseKey then
removeParentDependencyKey(parentKey, hard, nil, baseKey)
- rcall("DEL", parentKey, parentKey .. ':logs',
-   parentKey .. ':dependencies', parentKey .. ':processed')
+ removeJobKeys(parentKey)
else
moveParentToWait(parentPrefix, parentId)
end
@@ -65,8 +65,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
if hard then
if parentPrefix == baseKey then
removeParentDependencyKey(missedParentKey, hard, nil, baseKey)
- rcall("DEL", missedParentKey, missedParentKey .. ':logs',
-   missedParentKey .. ':dependencies', missedParentKey .. ':processed')
+ removeJobKeys(missedParentKey)
else
moveParentToWait(parentPrefix, parentId)
end
7 changes: 6 additions & 1 deletion src/commands/moveToFinished-14.lua
@@ -39,6 +39,7 @@
opts - attempts max attempts
opts - maxMetricsSize
opts - fpof - fail parent on fail
opts - idof - ignore dependency on fail
opts - rdof - remove dependency on fail

Output:
@@ -147,11 +148,15 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey,
parentId, jobIdKey,
timestamp)
- elseif opts['rdof'] then
+ elseif opts['idof'] or opts['rdof'] then
local dependenciesSet = parentKey .. ":dependencies"
if rcall("SREM", dependenciesSet, jobIdKey) == 1 then
moveParentToWaitIfNeeded(parentQueueKey, dependenciesSet,
parentKey, parentId, timestamp)
if opts['idof'] then
local failedSet = parentKey .. ":failed"
rcall("HSET", failedSet, jobIdKey, ARGV[4])
end
end
end
end
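
The branch above can be modelled in memory to make the difference between `idof` and `rdof` explicit. This is a sketch, not the Lua implementation: the `ParentState` shape and the `onChildFailed` helper are hypothetical, and it assumes, following the hint in the new guide, that `moveParentToWaitIfNeeded` only moves the parent once no pending dependencies remain.

```typescript
// In-memory stand-ins for the parent's Redis keys (hypothetical shape).
interface ParentState {
  dependencies: string[]; // models the "<parentKey>:dependencies" set
  failed: { [childKey: string]: string }; // models the "<parentKey>:failed" hash
  movedToWait: boolean;
}

interface FailureOpts {
  idof?: boolean; // ignoreDependencyOnFailure
  rdof?: boolean; // removeDependencyOnFailure
}

function onChildFailed(
  parentState: ParentState,
  childKey: string,
  failedReason: string,
  opts: FailureOpts,
): void {
  if (!opts.idof && !opts.rdof) {
    return;
  }
  const index = parentState.dependencies.indexOf(childKey);
  if (index === -1) {
    return; // mirrors SREM returning 0: the child was not a pending dependency
  }
  parentState.dependencies.splice(index, 1);
  // Assumed behaviour of moveParentToWaitIfNeeded: move the parent only
  // when no pending dependencies remain.
  if (parentState.dependencies.length === 0) {
    parentState.movedToWait = true;
  }
  if (opts.idof) {
    parentState.failed[childKey] = failedReason; // mirrors HSET on ":failed"
  }
}

const demoParent: ParentState = {
  dependencies: ['bull:children:1', 'bull:children:2'],
  failed: {},
  movedToWait: false,
};
onChildFailed(demoParent, 'bull:children:1', 'error', { idof: true });
const waitingAfterFirst = demoParent.movedToWait;
onChildFailed(demoParent, 'bull:children:2', 'error', { rdof: true });
```

In this model both options release the parent, but only `idof` records the failure so that it can be read back later.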
13 changes: 12 additions & 1 deletion src/commands/removeJob-1.lua
@@ -19,6 +19,7 @@ local rcall = redis.call
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/isLocked"
--- @include "includes/removeJobFromAnyState"
--- @include "includes/removeJobKeys"
--- @include "includes/removeParentDependencyKey"

local function removeJob( prefix, jobId, parentKey, removeChildren)
@@ -50,11 +51,21 @@ local function removeJob( prefix, jobId, parentKey, removeChildren)
removeJob( childJobPrefix, childJobId, jobKey, removeChildren )
end
end

local failed = rcall("HGETALL", jobKey .. ":failed")
Review comment (Contributor):

This call could require a lot of memory and resources if the number of failed jobs is very large. One solution would be to call HLEN (https://redis.io/commands/hlen/) first and, if the length exceeds a specific size, maybe 100 or so, start an iteration process. This will be complicated in general, as the iteration would need to call "removeJob" repeatedly. It may not be critical to do this in this PR, but we should record it in an issue as a known issue/future enhancement.

Reply (Collaborator Author):

I referenced an issue to track this improvement.

if (#failed > 0) then
for i = 1, #failed, 2 do
local childJobId = getJobIdFromKey(failed[i])
local childJobPrefix = getJobKeyPrefix(failed[i], childJobId)
removeJob( childJobPrefix, childJobId, jobKey, removeChildren )
end
end
end

local prev = removeJobFromAnyState(prefix, jobId)

- if rcall("DEL", jobKey, jobKey .. ":logs", jobKey .. ":dependencies", jobKey .. ":processed") > 0 then
+ if removeJobKeys(jobKey) > 0 then
local maxEvents = getOrSetMaxEvents(prefix .. "meta")
rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed",
"jobId", jobId, "prev", prev)
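
One possible reading of the review suggestion in this file (check the hash length first, then page through large hashes) is sketched below. It is not what this PR implements; the merged Lua reads the whole hash with `HGETALL`. The `scanFields` and `forEachFailedChild` names are hypothetical, and the cursor is simplified to an array index, whereas real Redis cursors are opaque and `COUNT` is only a hint. Because the sample data already sits in an array, the sketch shows only the control flow, not the memory savings.

```typescript
interface FieldPage {
  nextCursor: number; // 0 means the scan is finished
  fields: string[];
}

// Simplified stand-in for a cursor-based read such as HSCAN.
function scanFields(allFields: string[], cursor: number, count: number): FieldPage {
  const fields = allFields.slice(cursor, cursor + count);
  const end = cursor + fields.length;
  return { nextCursor: end >= allFields.length ? 0 : end, fields: fields };
}

// Hypothetical: visit everything at once for small hashes, page through large ones.
function forEachFailedChild(
  allFields: string[],
  threshold: number,
  pageSize: number,
  visit: (childKey: string) => void,
): void {
  if (pageSize < 1) {
    throw new Error('pageSize must be at least 1');
  }
  if (allFields.length <= threshold) {
    // Models the HLEN check: small enough to handle in one pass.
    for (let i = 0; i < allFields.length; i++) {
      visit(allFields[i]);
    }
    return;
  }
  let cursor = 0;
  do {
    const page = scanFields(allFields, cursor, pageSize);
    for (let i = 0; i < page.fields.length; i++) {
      visit(page.fields[i]);
    }
    cursor = page.nextCursor;
  } while (cursor !== 0);
}

const failedChildKeys = ['q:1', 'q:2', 'q:3', 'q:4', 'q:5'];
const visitedPaged: string[] = [];
forEachFailedChild(failedChildKeys, 2, 2, function (key) {
  visitedPaged.push(key);
});
```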
10 changes: 10 additions & 0 deletions src/types/job-options.ts
@@ -6,6 +6,11 @@ export type JobsOptions = BaseJobOptions & {
*/
failParentOnFailure?: boolean;

/**
* If true, moves the jobId from its parent dependencies to failed dependencies when it fails after all attempts.
*/
ignoreDependencyOnFailure?: boolean;

/**
* If true, removes the job from its parent dependencies when it fails after all attempts.
*/
@@ -21,6 +26,11 @@ export type RedisJobOptions = BaseJobOptions & {
*/
fpof?: boolean;

/**
* If true, moves the jobId from its parent dependencies to failed dependencies when it fails after all attempts.
*/
idof?: boolean;

/**
* Maximum amount of log entries that will be preserved
*/
105 changes: 105 additions & 0 deletions tests/test_flow.ts
@@ -460,6 +460,111 @@ describe('flows', () => {
    await removeAllQueueData(new IORedis(redisHost), parentQueueName);
  });

  describe('when ignoreDependencyOnFailure is provided', async () => {
    it('moves parent to wait after children fail', async () => {
      const parentQueueName = `parent-queue-${v4()}`;
      const parentQueue = new Queue(parentQueueName, { connection, prefix });
      const name = 'child-job';

      const parentProcessor = async (job: Job) => {
        const values = await job.getDependencies({
          processed: {},
        });
        expect(values).to.deep.equal({
          processed: {},
          nextProcessedCursor: 0,
        });
      };

      const parentWorker = new Worker(parentQueueName, parentProcessor, {
        connection,
        prefix,
      });
      const childrenWorker = new Worker(
        queueName,
        async () => {
          await delay(10);
          throw new Error('error');
        },
        {
          connection,
          prefix,
        },
      );
      await parentWorker.waitUntilReady();
      await childrenWorker.waitUntilReady();

      const completed = new Promise<void>(resolve => {
        parentWorker.on('completed', async (job: Job) => {
          expect(job.finishedOn).to.be.string;
          const counts = await parentQueue.getJobCounts('completed');
          expect(counts.completed).to.be.equal(1);
          resolve();
        });
      });

      const flow = new FlowProducer({ connection, prefix });
      const tree = await flow.add({
        name: 'parent-job',
        queueName: parentQueueName,
        data: {},
        children: [
          {
            name,
            data: { idx: 0, foo: 'bar' },
            queueName,
            opts: { ignoreDependencyOnFailure: true },
          },
          {
            name,
            data: { idx: 1, foo: 'baz' },
            queueName,
            opts: { ignoreDependencyOnFailure: true },
          },
          {
            name,
            data: { idx: 2, foo: 'qux' },
            queueName,
            opts: { ignoreDependencyOnFailure: true },
          },
        ],
      });

      expect(tree).to.have.property('job');
      expect(tree).to.have.property('children');

      const { children, job } = tree;
      const parentState = await job.getState();

      expect(parentState).to.be.eql('waiting-children');
      expect(children).to.have.length(3);

      expect(children[0].job.id).to.be.ok;
      expect(children[0].job.data.foo).to.be.eql('bar');
      expect(children[1].job.id).to.be.ok;
      expect(children[1].job.data.foo).to.be.eql('baz');
      expect(children[2].job.id).to.be.ok;
      expect(children[2].job.data.foo).to.be.eql('qux');

      await completed;

      const failedChildrenValues = await job.getFailedChildrenValues();

      expect(failedChildrenValues).to.deep.equal({
        [`${queue.qualifiedName}:${children[0].job.id}`]: 'error',
        [`${queue.qualifiedName}:${children[1].job.id}`]: 'error',
        [`${queue.qualifiedName}:${children[2].job.id}`]: 'error',
      });

      await childrenWorker.close();
      await parentWorker.close();
      await flow.close();
      await parentQueue.close();

      await removeAllQueueData(new IORedis(redisHost), parentQueueName);
    }).timeout(8000);
  });

  describe('when removeDependencyOnFailure is provided', async () => {
    it('moves parent to wait after children fail', async () => {
      const parentQueueName = `parent-queue-${v4()}`;