Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job): add removeDependencyOnFail option #1100

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/gitbook/guide/workers/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Workers

Workers are the actual instances that perform some job based on the jobs that are added in the queue. A worker is equivalent to a "message" receiver in a traditional message queue. The worker duty is to complete the job, if it succeeds the job will be moved to the "completed" status. If the worker throws an exception during its processing, the job will automatically be moved to the "failed" status.
Workers are the actual instances that perform some job based on the jobs that are added in the queue. A worker is equivalent to a "message" receiver in a traditional message queue. The worker duty is to complete the job, if it succeeds, the job will be moved to the "completed" status. If the worker throws an exception during its processing, the job will automatically be moved to the "failed" status.

{% hint style="info" %}
Failed jobs can be automatically retried, see [Retrying failing jobs](../retrying-failing-jobs.md)
Expand All @@ -13,10 +13,10 @@ import { Worker, Job } from 'bullmq';

const worker = new Worker(queueName, async (job: Job) => {
// Optionally report some progress
job.updateProgress(42);
await job.updateProgress(42);

// Optionally sending an object as progress
job.updateProgress({ foo: 'bar' });
await job.updateProgress({ foo: 'bar' });

// Do something with job
return 'some value';
Expand Down
2 changes: 1 addition & 1 deletion src/classes/compat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import { Queue } from './queue';
import { Worker } from './worker';
import { QueueEvents } from './queue-events';
import {
JobsOptions,
QueueOptions,
RepeatOptions,
QueueEventsOptions,
QueueSchedulerOptions,
WorkerOptions,
Processor,
} from '../interfaces';
import { JobsOptions } from '../types';

type CommonOptions = QueueSchedulerOptions &
QueueOptions &
Expand Down
55 changes: 51 additions & 4 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ import {
BackoffOptions,
JobJson,
JobJsonRaw,
JobsOptions,
ParentKeys,
RedisClient,
WorkerOptions,
} from '../interfaces';
import { JobState, JobJsonSandbox } from '../types';
import {
JobsOptions,
JobState,
JobJsonSandbox,
RedisJobOptions,
} from '../types';
import {
errorObject,
isEmpty,
Expand All @@ -35,6 +39,7 @@ export interface MoveToChildrenOpts {
};
}

type ValueOf<T> = T[keyof T];
export interface DependenciesOpts {
processed?: {
cursor?: number;
Expand Down Expand Up @@ -233,7 +238,7 @@ export class Job<
jobId?: string,
): Job<T, R, N> {
const data = JSON.parse(json.data || '{}');
const opts = JSON.parse(json.opts || '{}');
const opts = Job.optsFromJSON(json.opts);

const job = new this<T, R, N>(
queue,
Expand Down Expand Up @@ -276,6 +281,28 @@ export class Job<
return job;
}

static optsFromJSON(rawOpts?: string): JobsOptions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this method should be private since it has no use outside of this class.

const opts = JSON.parse(rawOpts || '{}');

const optionEntries = Object.entries(opts) as Array<
[keyof RedisJobOptions, any]
>;
const options = optionEntries.reduce<Partial<Record<string, any>>>(
(acc, item) => {
const [attributeName, value] = item;
if (attributeName === 'rdof') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for now but I guess in the future we will have a map object for translating all the options from Redis representation to library representation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is the beginning of it

acc.removeDependencyOnFail = value;
} else {
acc[attributeName] = value;
}
return acc;
},
{},
);

return options as JobsOptions;
}

/**
* Fetches a Job from the queue given the passed job id.
*
Expand Down Expand Up @@ -311,7 +338,7 @@ export class Job<
id: this.id,
name: this.name,
data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data),
opts: this.opts,
opts: this.optsAsJSON(this.opts),
progress: this.progress,
attemptsMade: this.attemptsMade,
finishedOn: this.finishedOn,
Expand All @@ -323,6 +350,26 @@ export class Job<
};
}

private optsAsJSON(opts: JobsOptions = {}): RedisJobOptions {
const optionEntries = Object.entries(opts) as Array<
[keyof JobsOptions, any]
>;
const options = optionEntries.reduce<Partial<Record<string, any>>>(
(acc, item) => {
const [attributeName, value] = item;
if (attributeName === 'removeDependencyOnFail') {
acc.rdof = value;
} else {
acc[attributeName] = value;
}
return acc;
},
{},
);

return options as RedisJobOptions;
}

/**
* Prepares a job to be passed to Sandbox.
* @returns
Expand Down
3 changes: 2 additions & 1 deletion src/classes/queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { get } from 'lodash';
import { v4 } from 'uuid';
import { JobsOptions, QueueOptions, RepeatOptions } from '../interfaces';
import { QueueOptions, RepeatOptions } from '../interfaces';
import { JobsOptions } from '../types';
import { isRedisInstance, jobIdForGroup } from '../utils';
import { BulkJobOptions, Job } from './job';
import { QueueGetters } from './queue-getters';
Expand Down
3 changes: 2 additions & 1 deletion src/classes/repeat.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createHash } from 'crypto';
import { JobsOptions, RepeatOptions } from '../interfaces';
import { RepeatOptions } from '../interfaces';
import { JobsOptions } from '../types';
import { QueueBase } from './queue-base';
import { Job } from './job';
import { Scripts } from './scripts';
Expand Down
3 changes: 2 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import * as semver from 'semver';
import {
JobJson,
JobJsonRaw,
JobsOptions,
QueueSchedulerOptions,
RedisClient,
WorkerOptions,
KeepJobs,
} from '../interfaces';
import { JobsOptions } from '../types';
import { JobState, FinishedStatus, FinishedPropValAttribute } from '../types';
import { ErrorCode } from '../enums';
import { array2obj, getParentKey } from '../utils';
Expand Down Expand Up @@ -290,6 +290,7 @@ export class Scripts {
job.parentKey,
job.opts.attempts,
job.attemptsMade,
job.opts?.removeDependencyOnFail ? '1' : '0',
];

return keys.concat(args);
Expand Down
17 changes: 5 additions & 12 deletions src/commands/includes/updateParentDepsIfNeeded.lua
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
--[[
Validate and move or add dependencies to parent.
Add processed results, validate and move parent to active if needed.
]]

-- Includes
--- @include "updateParentIfNeeded"

local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
parentId, jobIdKey, returnvalue )
local processedSet = parentKey .. ":processed"
rcall("HSET", processedSet, jobIdKey, returnvalue)
local activeParent = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId)
if rcall("SCARD", parentDependenciesKey) == 0 and activeParent then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
if rcall("HEXISTS", parentQueueKey .. ":meta", "paused") ~= 1 then
rcall("RPUSH", parentQueueKey .. ":wait", parentId)
else
rcall("RPUSH", parentQueueKey .. ":paused", parentId)
end
local parentEventStream = parentQueueKey .. ":events"
rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
end
updateParentIfNeeded(parentQueueKey, parentDependenciesKey, parentId )
end
17 changes: 17 additions & 0 deletions src/commands/includes/updateParentIfNeeded.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
--[[
Validate and move parent to active if needed.
]]

local function updateParentIfNeeded(parentQueueKey, parentDependenciesKey, parentId )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this function be renamed to "moveParentToWaitifNeeded" ? seems like that is what it does, "update" it is a big vague and makes it more difficult to understand.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good to me

local activeParent = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, shouldn't "activeParent" variable name be called instead something like: "isParentWaiting" ?

if rcall("SCARD", parentDependenciesKey) == 0 and activeParent then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
if rcall("HEXISTS", parentQueueKey .. ":meta", "paused") ~= 1 then
rcall("RPUSH", parentQueueKey .. ":wait", parentId)
else
rcall("RPUSH", parentQueueKey .. ":paused", parentId)
end
local parentEventStream = parentQueueKey .. ":events"
rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
end
end
14 changes: 10 additions & 4 deletions src/commands/moveToFinished-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
ARGV[14] parentKey
ARGV[15] max attempts
ARGV[16] attemptsMade
ARGV[17] removeDependencyOnFail

Output:
0 OK
Expand All @@ -44,11 +45,12 @@
local rcall = redis.call

-- Includes
--- @include "includes/updateParentDepsIfNeeded"
--- @include "includes/updateParentIfNeeded"
--- @include "includes/destructureJobKey"
--- @include "includes/moveJobFromWaitToActive"
--- @include "includes/removeJob"
--- @include "includes/trimEvents"
--- @include "includes/updateParentDepsIfNeeded"

local jobIdKey = KEYS[3]
if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
Expand Down Expand Up @@ -91,13 +93,17 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
parentId = getJobIdFromKey(ARGV[14])
parentQueueKey = getJobKeyPrefix(ARGV[14], ":" .. parentId)
end
if parentId ~= "" and ARGV[5] == "completed" then
if parentId ~= "" and (ARGV[5] == "completed" or ARGV[17] == "1")then
local parentKey = parentQueueKey .. ":" .. parentId
local dependenciesSet = parentKey .. ":dependencies"
local result = rcall("SREM", dependenciesSet, jobIdKey)
if result == 1 then
updateParentDepsIfNeeded(parentKey, parentQueueKey, dependenciesSet,
parentId, jobIdKey, ARGV[4])
if ARGV[5] == "completed" then
updateParentDepsIfNeeded(parentKey, parentQueueKey, dependenciesSet,
parentId, jobIdKey, ARGV[4])
elseif ARGV[17] == "1" then
updateParentIfNeeded(parentQueueKey, dependenciesSet, parentId )
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/flow-job.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JobsOptions } from './jobs-options';
import { JobsOptions } from '../types';
import { QueueOptions } from './queue-options';

export interface FlowJob {
Expand Down
4 changes: 2 additions & 2 deletions src/interfaces/job-json.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { JobsOptions } from './jobs-options';
import { RedisJobOptions } from '../types';

export interface JobJson {
id: string;
name: string;
data: string;
opts: JobsOptions;
opts: RedisJobOptions;
progress: number | object;
attemptsMade: number;
finishedOn?: number;
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/jobs-options.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { RepeatOptions, KeepJobs, BackoffOptions } from './';

export interface JobsOptions {
export interface JobOptionsBase {
/**
* Timestamp when the job was created. Defaults to `Date.now()`.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/parent.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JobsOptions } from './jobs-options';
import { JobsOptions } from '../types';

/**
* Describes the parent for a Job.
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/queue-options.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JobsOptions } from './jobs-options';
import { JobsOptions } from '../types';
import { ConnectionOptions } from './redis-options';

export enum ClientType {
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/sandboxed-job.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { JobsOptions } from '../types';
import { JobJson } from './job-json';
import { JobsOptions } from './jobs-options';

/**
* @see {@link https://docs.bullmq.io/guide/workers/sandboxed-processors}
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './finished-status';
export * from './job-json-sandbox';
export * from './job-options';
export * from './job-type';
15 changes: 15 additions & 0 deletions src/types/job-options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { JobOptionsBase } from '../interfaces';

export type JobsOptions = JobOptionsBase & {
/**
* If true, removes the job from its parent dependencies when it fails after all attempts.
*/
removeDependencyOnFail?: boolean;
};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a comment here clarifying that these fields are the ones stored in Redis and thus with smaller keys for compactness.

export type RedisJobOptions = JobOptionsBase & {
/**
* If true, removes the job from its parent dependencies when it fails after all attempts.
*/
rdof?: boolean;
};
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import { v4 } from 'uuid';
import { get } from 'lodash';
import {
RedisClient,
JobsOptions,
QueueOptions,
ChildMessage,
ParentMessage,
} from './interfaces';
import { JobsOptions } from './types';
import { ChildProcess } from 'child_process';

export const errorObject: { [index: string]: any } = { value: null };
Expand Down
Loading