Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job): expose priority value #2804

Merged
merged 4 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import {
lengthInUtf8Bytes,
parseObjectValues,
tryCatch,
finishedErrors,
} from '../utils';
import { Backoffs } from './backoffs';
import { Scripts, raw2NextJobData } from './scripts';
Expand Down Expand Up @@ -95,7 +94,15 @@ export class Job<
* An amount of milliseconds to wait until this job can be processed.
* @defaultValue 0
*/
delay: number;
delay = 0;

/**
* Ranges from 0 (highest priority) to 2 097 152 (lowest priority). Note that
* using priorities has a slight impact on performance,
* so do not use it if not required.
* @defaultValue 0
*/
priority = 0;

/**
* Timestamp when the job was created (unless overridden with job options).
Expand Down Expand Up @@ -201,6 +208,8 @@ export class Job<

this.delay = this.opts.delay;

this.priority = this.opts.priority || 0;

this.repeatJobKey = repeatJobKey;

this.timestamp = opts.timestamp ? opts.timestamp : Date.now();
Expand All @@ -214,7 +223,9 @@ export class Job<
: undefined;

this.debounceId = opts.debounce ? opts.debounce.id : undefined;
this.deduplicationId = opts.deduplication ? opts.deduplication.id : this.debounceId;
this.deduplicationId = opts.deduplication
? opts.deduplication.id
: this.debounceId;

this.toKey = queue.toKey.bind(queue);
this.setScripts();
Expand Down Expand Up @@ -737,7 +748,7 @@ export class Job<

const result = results[results.length - 1][1] as number;
if (result < 0) {
throw finishedErrors({
throw this.scripts.finishedErrors({
code: result,
jobId: this.id,
command,
Expand Down Expand Up @@ -840,13 +851,15 @@ export class Job<
/**
* Change job priority.
*
* @param opts - options containing priority and lifo values.
* @returns void
*/
async changePriority(opts: {
priority?: number;
lifo?: boolean;
}): Promise<void> {
await this.scripts.changePriority(this.id, opts.priority, opts.lifo);
this.priority = opts.priority || 0;
}

/**
Expand Down
76 changes: 57 additions & 19 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ import {
RedisJobOptions,
} from '../types';
import { ErrorCode } from '../enums';
import {
array2obj,
finishedErrors,
getParentKey,
isRedisVersionLowerThan,
} from '../utils';
import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils';
import { ChainableCommander } from 'ioredis';

export type JobData = [JobJsonRaw | number, string?];
Expand Down Expand Up @@ -225,7 +220,7 @@ export class Scripts {
}

if (<number>result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: <number>result,
parentKey: parentOpts.parentKey,
command: 'addJob',
Expand Down Expand Up @@ -414,7 +409,7 @@ export class Scripts {
);

if (result == ErrorCode.JobBelongsToJobScheduler) {
throw finishedErrors({
throw this.finishedErrors({
code: ErrorCode.JobBelongsToJobScheduler,
jobId,
command: 'remove',
Expand Down Expand Up @@ -453,7 +448,7 @@ export class Scripts {
const result = await (<any>client).updateData(keys.concat([dataJson]));

if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId: job.id,
command: 'updateData',
Expand All @@ -479,7 +474,7 @@ export class Scripts {
);

if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'updateProgress',
Expand All @@ -504,7 +499,7 @@ export class Scripts {
);

if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'addLog',
Expand Down Expand Up @@ -588,7 +583,7 @@ export class Scripts {

const result = await (<any>client).moveToFinished(args);
if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'moveToFinished',
Expand All @@ -601,6 +596,49 @@ export class Scripts {
}
}

finishedErrors = ({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error => {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
case ErrorCode.JobLockNotExist:
return new Error(`Missing lock for job ${jobId}. ${command}`);
case ErrorCode.JobNotInState:
return new Error(
`Job ${jobId} is not in the ${state} state. ${command}`,
);
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
case ErrorCode.JobBelongsToJobScheduler:
return new Error(
`Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
}
};

private drainArgs(delayed: boolean): (string | number)[] {
const queueKeys = this.queue.keys;

Expand Down Expand Up @@ -652,7 +690,7 @@ export class Scripts {
case 1:
return false;
default:
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
parentKey,
Expand Down Expand Up @@ -816,7 +854,7 @@ export class Scripts {
const args = this.changeDelayArgs(jobId, delay);
const result = await (<any>client).changeDelay(args);
if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'changeDelay',
Expand Down Expand Up @@ -853,7 +891,7 @@ export class Scripts {
const args = this.changePriorityArgs(jobId, priority, lifo);
const result = await (<any>client).changePriority(args);
if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'changePriority',
Expand Down Expand Up @@ -971,7 +1009,7 @@ export class Scripts {
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts);
const result = await (<any>client).moveToDelayed(args);
if (result < 0) {
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'moveToDelayed',
Expand Down Expand Up @@ -1007,7 +1045,7 @@ export class Scripts {
case 1:
return false;
default:
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId,
command: 'moveToWaitingChildren',
Expand Down Expand Up @@ -1166,7 +1204,7 @@ export class Scripts {
case 1:
return;
default:
throw finishedErrors({
throw this.finishedErrors({
code: result,
jobId: job.id,
command: 'reprocessJob',
Expand Down Expand Up @@ -1230,7 +1268,7 @@ export class Scripts {

const code = await (<any>client).promote(keys.concat(args));
if (code < 0) {
throw finishedErrors({
throw this.finishedErrors({
code,
jobId,
command: 'promote',
Expand Down
1 change: 0 additions & 1 deletion src/commands/includes/deduplicateJob.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,3 @@ local function deduplicateJob(prefixKey, deduplicationOpts, jobId, deduplication
end
end
end

3 changes: 2 additions & 1 deletion src/interfaces/base-job-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ export interface DefaultJobOptions {
timestamp?: number;

/**
* Ranges from 1 (highest priority) to 2 097 152 (lowest priority). Note that
* Ranges from 0 (highest priority) to 2 097 152 (lowest priority). Note that
* using priorities has a slight impact on performance,
* so do not use it if not required.
* @defaultValue 0
*/
priority?: number;

Expand Down
42 changes: 0 additions & 42 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { CONNECTION_CLOSED_ERROR_MSG } from 'ioredis/built/utils';
import { ChildMessage, RedisClient } from './interfaces';
import { EventEmitter } from 'events';
import * as semver from 'semver';
import { ErrorCode } from './enums';

export const errorObject: { [index: string]: any } = { value: null };

Expand Down Expand Up @@ -248,44 +247,3 @@ export const toString = (value: any): string => {
};

export const QUEUE_EVENT_SUFFIX = ':qe';

export const finishedErrors = ({
code,
jobId,
parentKey,
command,
state,
}: {
code: number;
jobId?: string;
parentKey?: string;
command: string;
state?: string;
}): Error => {
switch (code) {
case ErrorCode.JobNotExist:
return new Error(`Missing key for job ${jobId}. ${command}`);
case ErrorCode.JobLockNotExist:
return new Error(`Missing lock for job ${jobId}. ${command}`);
case ErrorCode.JobNotInState:
return new Error(`Job ${jobId} is not in the ${state} state. ${command}`);
case ErrorCode.JobPendingDependencies:
return new Error(`Job ${jobId} has pending dependencies. ${command}`);
case ErrorCode.ParentJobNotExist:
return new Error(`Missing key for parent job ${parentKey}. ${command}`);
case ErrorCode.JobLockMismatch:
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
);
case ErrorCode.ParentJobCannotBeReplaced:
return new Error(
`The parent job ${parentKey} cannot be replaced. ${command}`,
);
case ErrorCode.JobBelongsToJobScheduler:
return new Error(
`Job ${jobId} belongs to a job scheduler and cannot be removed directly`,
);
default:
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
}
};
Loading
Loading