Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job): add removeDependencyOnFail option #1100

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/classes/compat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import { Queue } from './queue';
import { Worker } from './worker';
import { QueueEvents } from './queue-events';
import {
JobsOptions,
Processor,
QueueOptions,
QueueEventsOptions,
QueueSchedulerOptions,
RepeatOptions,
WorkerOptions,
} from '../interfaces';
import { JobsOptions } from '../types';

type CommonOptions = QueueSchedulerOptions &
QueueOptions &
Expand Down
55 changes: 51 additions & 4 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ import {
BackoffOptions,
JobJson,
JobJsonRaw,
JobsOptions,
ParentKeys,
RedisClient,
WorkerOptions,
} from '../interfaces';
import { FinishedStatus, JobState, JobJsonSandbox } from '../types';
import {
FinishedStatus,
JobsOptions,
JobState,
JobJsonSandbox,
RedisJobOptions,
} from '../types';
import {
errorObject,
isEmpty,
Expand Down Expand Up @@ -258,7 +263,7 @@ export class Job<
jobId?: string,
): Job<T, R, N> {
const data = JSON.parse(json.data || '{}');
const opts = JSON.parse(json.opts || '{}');
const opts = Job.optsFromJSON(json.opts);

const job = new this<T, R, N>(
queue,
Expand Down Expand Up @@ -305,6 +310,28 @@ export class Job<
return job;
}

private static optsFromJSON(rawOpts?: string): JobsOptions {
const opts = JSON.parse(rawOpts || '{}');

const optionEntries = Object.entries(opts) as Array<
[keyof RedisJobOptions, any]
>;
const options = optionEntries.reduce<Partial<Record<string, any>>>(
(acc, item) => {
const [attributeName, value] = item;
if (attributeName === 'rdof') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for now but I guess in the future we will have a map object for translating all the options from Redis representation to library representation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is the beginning of it

acc.removeDependencyOnFail = value;
} else {
acc[attributeName] = value;
}
return acc;
},
{},
);

return options as JobsOptions;
}

/**
* Fetches a Job from the queue given the passed job id.
*
Expand Down Expand Up @@ -344,7 +371,7 @@ export class Job<
id: this.id,
name: this.name,
data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data),
opts: this.opts,
opts: this.optsAsJSON(this.opts),
progress: this.progress,
attemptsMade: this.attemptsMade,
finishedOn: this.finishedOn,
Expand All @@ -357,6 +384,26 @@ export class Job<
};
}

private optsAsJSON(opts: JobsOptions = {}): RedisJobOptions {
const optionEntries = Object.entries(opts) as Array<
[keyof JobsOptions, any]
>;
const options = optionEntries.reduce<Partial<Record<string, any>>>(
(acc, item) => {
const [attributeName, value] = item;
if (attributeName === 'removeDependencyOnFail') {
acc.rdof = value;
} else {
acc[attributeName] = value;
}
return acc;
},
{},
);

return options as RedisJobOptions;
}

/**
* Prepares a job to be passed to Sandbox.
* @returns
Expand Down
3 changes: 1 addition & 2 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ import { v4 } from 'uuid';
import {
BaseJobOptions,
IoredisListener,
JobsOptions,
QueueOptions,
RepeatOptions,
} from '../interfaces';
import { FinishedStatus } from '../types';
import { FinishedStatus, JobsOptions } from '../types';
import { isRedisInstance, jobIdForGroup } from '../utils';
import { BulkJobOptions, Job } from './job';
import { QueueGetters } from './queue-getters';
Expand Down
3 changes: 2 additions & 1 deletion src/classes/repeat.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createHash } from 'crypto';
import { JobsOptions, RepeatOptions } from '../interfaces';
import { RepeatOptions } from '../interfaces';
import { JobsOptions } from '../types';
import { Job } from './job';
import { QueueBase } from './queue-base';
import { parseExpression } from 'cron-parser';
Expand Down
3 changes: 2 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ const pack = packer.pack;
import {
JobJson,
JobJsonRaw,
JobsOptions,
QueueSchedulerOptions,
RedisClient,
WorkerOptions,
KeepJobs,
} from '../interfaces';
import { JobsOptions } from '../types';
import { JobState, FinishedStatus, FinishedPropValAttribute } from '../types';
import { ErrorCode } from '../enums';
import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils';
Expand Down Expand Up @@ -291,6 +291,7 @@ export class Scripts {
maxMetricsSize: opts.metrics?.maxDataPoints
? opts.metrics?.maxDataPoints
: '',
rdof: !!job.opts?.removeDependencyOnFail,
}),
];

Expand Down
17 changes: 17 additions & 0 deletions src/commands/includes/moveParentToWaitIfNeeded.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
--[[
Validate and move parent to active if needed.
]]

-- Includes
--- @include "getTargetQueueList"

local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentId )
local isParentActive = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId)
if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
local parentTarget = getTargetQueueList(parentQueueKey .. ":meta", parentQueueKey .. ":wait", parentQueueKey .. ":paused")
rcall("RPUSH", parentTarget, parentId)

rcall("XADD", parentQueueKey .. ":events", "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
end
end
17 changes: 5 additions & 12 deletions src/commands/includes/updateParentDepsIfNeeded.lua
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
--[[
Validate and move or add dependencies to parent.
Add processed results, validate and move parent to active if needed.
]]

-- Includes
--- @include "moveParentToWaitIfNeeded"

local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
parentId, jobIdKey, returnvalue )
local processedSet = parentKey .. ":processed"
rcall("HSET", processedSet, jobIdKey, returnvalue)
local activeParent = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId)
if rcall("SCARD", parentDependenciesKey) == 0 and activeParent then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
if rcall("HEXISTS", parentQueueKey .. ":meta", "paused") ~= 1 then
rcall("RPUSH", parentQueueKey .. ":wait", parentId)
else
rcall("RPUSH", parentQueueKey .. ":paused", parentId)
end

rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
end
moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentId )
end
17 changes: 12 additions & 5 deletions src/commands/moveToFinished-12.lua
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
ARGV[6] event data (? maybe just send jobid).
ARGV[7] fetch next?
ARGV[8] keys prefix
ARGV[9] opts
ARGV[9] opts

opts - token - lock token
opts - keepJobs
Expand All @@ -41,6 +41,7 @@
opts - attempts max attempts
opts - attemptsMade
opts - maxMetricsSize
opts - rdof -remove dependency on fail

Output:
0 OK
Expand All @@ -55,13 +56,14 @@
local rcall = redis.call

--- Includes
--- @include "includes/collectMetrics"
--- @include "includes/destructureJobKey"
--- @include "includes/moveJobFromWaitToActive"
--- @include "includes/moveParentToWaitIfNeeded"
--- @include "includes/removeJobsByMaxAge"
--- @include "includes/removeJobsByMaxCount"
--- @include "includes/trimEvents"
--- @include "includes/updateParentDepsIfNeeded"
--- @include "includes/collectMetrics"

local jobIdKey = KEYS[10]
if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
Expand Down Expand Up @@ -112,13 +114,18 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
parentId = getJobIdFromKey(parentKey)
parentQueueKey = getJobKeyPrefix(parentKey, ":" .. parentId)
end
if parentId ~= "" and ARGV[5] == "completed" then
local removeDependencyOnFail = opts['rdof']
if parentId ~= "" and (ARGV[5] == "completed" or removeDependencyOnFail) then
local parentKey = parentQueueKey .. ":" .. parentId
local dependenciesSet = parentKey .. ":dependencies"
local result = rcall("SREM", dependenciesSet, jobIdKey)
if result == 1 then
updateParentDepsIfNeeded(parentKey, parentQueueKey, dependenciesSet,
parentId, jobIdKey, ARGV[4])
if ARGV[5] == "completed" then
updateParentDepsIfNeeded(parentKey, parentQueueKey, dependenciesSet,
parentId, jobIdKey, ARGV[4])
elseif removeDependencyOnFail then
moveParentToWaitIfNeeded(parentQueueKey, dependenciesSet, parentId )
end
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { RepeatOptions, KeepJobs, BackoffOptions } from './';
import { RepeatOptions, KeepJobs, BackoffOptions } from '.';

export interface BaseJobOptions {
export interface DefaultJobOptions {
/**
* Timestamp when the job was created.
* @defaultValue Date.now()
Expand Down Expand Up @@ -75,7 +75,7 @@ export interface BaseJobOptions {
sizeLimit?: number;
}

export interface JobsOptions extends BaseJobOptions {
export interface BaseJobOptions extends DefaultJobOptions {
/**
* Repeat this job, for example based on a `cron` schedule.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/flow-job.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JobsOptions } from './jobs-options';
import { JobsOptions } from '../types';
import { QueueOptions } from './queue-options';

export interface FlowJob {
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
export * from './advanced-options';
export * from './backoff-options';
export * from './base-job-options';
export * from './child-command';
export * from './child-message';
export * from './connection';
export * from './flow-job';
export * from './ioredis-events';
export * from './job-json';
export * from './jobs-options';
export * from './keep-jobs';
export * from './metrics-options';
export * from './metrics';
Expand Down
4 changes: 2 additions & 2 deletions src/interfaces/job-json.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { JobsOptions } from './jobs-options';
import { RedisJobOptions } from '../types';

export interface JobJson {
id: string;
name: string;
data: string;
opts: JobsOptions;
opts: RedisJobOptions;
progress: number | object;
attemptsMade: number;
finishedOn?: number;
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/parent.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JobsOptions } from './jobs-options';
import { JobsOptions } from '../types';

/**
* Describes the parent for a Job.
Expand Down
4 changes: 2 additions & 2 deletions src/interfaces/queue-options.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BaseJobOptions } from './jobs-options';
import { DefaultJobOptions } from './base-job-options';
import { ConnectionOptions } from './redis-options';

export enum ClientType {
Expand Down Expand Up @@ -35,7 +35,7 @@ export interface QueueBaseOptions {
* Options for the Queue class.
*/
export interface QueueOptions extends QueueBaseOptions {
defaultJobOptions?: BaseJobOptions;
defaultJobOptions?: DefaultJobOptions;

/**
* Options for the rate limiter.
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/sandboxed-job.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { JobsOptions } from '../types';
import { JobJson } from './job-json';
import { JobsOptions } from './jobs-options';

/**
* @see {@link https://docs.bullmq.io/guide/workers/sandboxed-processors}
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './finished-status';
export * from './job-json-sandbox';
export * from './job-options';
export * from './job-type';
18 changes: 18 additions & 0 deletions src/types/job-options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { BaseJobOptions } from '../interfaces';

export type JobsOptions = BaseJobOptions & {
/**
* If true, removes the job from its parent dependencies when it fails after all attempts.
*/
removeDependencyOnFail?: boolean;
};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a comment here clarifying that these fields are the ones stored in Redis and thus with smaller keys for compactness.

/**
* These fields are the ones stored in Redis with smaller keys for compactness.
*/
export type RedisJobOptions = BaseJobOptions & {
/**
* If true, removes the job from its parent dependencies when it fails after all attempts.
*/
rdof?: boolean;
};
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import { get } from 'lodash';
import * as semver from 'semver';
import {
ChildMessage,
JobsOptions,
ParentMessage,
QueueOptions,
RedisClient,
} from './interfaces';
import { JobsOptions } from './types';
import { ChildProcess } from 'child_process';

export const errorObject: { [index: string]: any } = { value: null };
Expand Down
Loading