Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/taskforcesh/bullmq into f…
Browse files Browse the repository at this point in the history
…eat/telemetry-option-omit-context
  • Loading branch information
manast committed Dec 10, 2024
2 parents 6350576 + 0e3c2e5 commit 8dc79f9
Show file tree
Hide file tree
Showing 18 changed files with 239 additions and 124 deletions.
14 changes: 14 additions & 0 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
## [5.33.1](https://github.com/taskforcesh/bullmq/compare/v5.33.0...v5.33.1) (2024-12-10)


### Bug Fixes

* **job-scheduler:** omit deduplication and debounce options from template options ([#2960](https://github.com/taskforcesh/bullmq/issues/2960)) ([b5fa6a3](https://github.com/taskforcesh/bullmq/commit/b5fa6a3208a8f2a39777dc30c2db2f498addb907))

# [5.33.0](https://github.com/taskforcesh/bullmq/compare/v5.32.0...v5.33.0) (2024-12-09)


### Features

* replace multi by lua scripts in moveToFailed ([#2958](https://github.com/taskforcesh/bullmq/issues/2958)) ([c19c914](https://github.com/taskforcesh/bullmq/commit/c19c914969169c660a3e108126044c5152faf0cd))

# [5.32.0](https://github.com/taskforcesh/bullmq/compare/v5.31.2...v5.32.0) (2024-12-08)


Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "5.32.0",
"version": "5.33.1",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
Expand Down
8 changes: 6 additions & 2 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import {
RepeatBaseOptions,
RepeatOptions,
} from '../interfaces';
import { JobsOptions, RepeatStrategy } from '../types';
import {
JobSchedulerTemplateOptions,
JobsOptions,
RepeatStrategy,
} from '../types';
import { Job } from './job';
import { QueueBase } from './queue-base';
import { RedisConnection } from './redis-connection';
Expand All @@ -32,7 +36,7 @@ export class JobScheduler extends QueueBase {
repeatOpts: Omit<RepeatOptions, 'key' | 'prevMillis' | 'offset'>,
jobName: N,
jobData: T,
opts: Omit<JobsOptions, 'jobId' | 'repeat' | 'delay'>,
opts: JobSchedulerTemplateOptions,
{ override }: { override: boolean },
): Promise<Job<T, R, N> | undefined> {
const { every, pattern } = repeatOpts;
Expand Down
135 changes: 55 additions & 80 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,83 +716,68 @@ export class Job<
token: string,
fetchNext = false,
): Promise<void | any[]> {
const client = await this.queue.client;
const message = err?.message;

this.failedReason = message;

let command: string;
const multi = client.multi();

this.saveStacktrace(multi, err);

//
// Check if an automatic retry should be performed
//
let finishedOn: number;
const [shouldRetry, retryDelay] = await this.shouldRetryJob(err);
if (shouldRetry) {
if (retryDelay) {
const args = this.scripts.moveToDelayedArgs(
this.id,
Date.now(),
token,
retryDelay,
);
this.scripts.execCommand(multi, 'moveToDelayed', args);
command = 'moveToDelayed';
} else {
// Retry immediately
this.scripts.execCommand(
multi,
'retryJob',
this.scripts.retryJobArgs(this.id, this.opts.lifo, token),
);
command = 'retryJob';
}
} else {
const args = this.scripts.moveToFailedArgs(
this,
message,
this.opts.removeOnFail,
token,
fetchNext,
);

this.scripts.execCommand(multi, 'moveToFinished', args);
finishedOn = args[this.scripts.moveToFinishedKeys.length + 1] as number;
command = 'moveToFinished';
}
this.failedReason = err?.message;

return this.queue.trace<Promise<void | any[]>>(
SpanKind.INTERNAL,
this.getSpanOperation(command),
this.getSpanOperation('moveToFailed'),
this.queue.name,
async (span, dstPropagationMedatadata) => {
let tm;
if (!this.opts?.telemetry?.omitContext && dstPropagationMedatadata) {
this.scripts.execCommand(multi, 'updateJobOption', [
this.toKey(this.id),
'tm',
dstPropagationMedatadata,
]);
tm = dstPropagationMedatadata;
}

const results = await multi.exec();
const anyError = results.find(result => result[0]);
if (anyError) {
throw new Error(
`Error "moveToFailed" with command ${command}: ${anyError}`,
let result;

this.updateStacktrace(err);

const fieldsToUpdate = {
failedReason: this.failedReason,
stacktrace: JSON.stringify(this.stacktrace),
tm,
};

//
// Check if an automatic retry should be performed
//
let finishedOn: number;
const [shouldRetry, retryDelay] = await this.shouldRetryJob(err);

if (shouldRetry) {
if (retryDelay) {
// Retry with delay
result = await this.scripts.moveToDelayed(
this.id,
Date.now(),
retryDelay,
token,
{ fieldsToUpdate },
);
} else {
// Retry immediately
result = await this.scripts.retryJob(
this.id,
this.opts.lifo,
token,
{
fieldsToUpdate,
},
);
}
} else {
const args = this.scripts.moveToFailedArgs(
this,
this.failedReason,
this.opts.removeOnFail,
token,
fetchNext,
fieldsToUpdate,
);
}

const result = results[results.length - 1][1] as number;
if (result < 0) {
throw this.scripts.finishedErrors({
code: result,
jobId: this.id,
command,
state: 'active',
});
result = await this.scripts.moveToFinished(this.id, args);
finishedOn = args[
this.scripts.moveToFinishedKeys.length + 1
] as number;
}

if (finishedOn && typeof finishedOn === 'number') {
Expand All @@ -805,9 +790,7 @@ export class Job<

this.attemptsMade += 1;

if (Array.isArray(result)) {
return raw2NextJobData(result);
}
return result;
},
);
}
Expand Down Expand Up @@ -1322,7 +1305,7 @@ export class Job<
}
}

protected saveStacktrace(multi: ChainableCommander, err: Error): void {
protected updateStacktrace(err: Error) {
this.stacktrace = this.stacktrace || [];

if (err?.stack) {
Expand All @@ -1333,14 +1316,6 @@ export class Job<
this.stacktrace = this.stacktrace.slice(-this.opts.stackTraceLimit);
}
}

const args = this.scripts.saveStacktraceArgs(
this.id,
JSON.stringify(this.stacktrace),
err?.message,
);

this.scripts.execCommand(multi, 'saveStacktrace', args);
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ import {
RepeatableJob,
RepeatOptions,
} from '../interfaces';
import { FinishedStatus, JobsOptions, MinimalQueue } from '../types';
import {
FinishedStatus,
JobsOptions,
JobSchedulerTemplateOptions,
MinimalQueue,
} from '../types';
import { Job } from './job';
import { QueueGetters } from './queue-getters';
import { Repeat } from './repeat';
Expand Down Expand Up @@ -453,7 +458,7 @@ export class Queue<
jobTemplate?: {
name?: NameType;
data?: DataType;
opts?: Omit<JobsOptions, 'jobId' | 'repeat' | 'delay'>;
opts?: JobSchedulerTemplateOptions;
},
) {
if (repeatOpts.endDate) {
Expand Down
71 changes: 56 additions & 15 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ import {
RedisJobOptions,
} from '../types';
import { ErrorCode } from '../enums';
import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils';
import {
array2obj,
getParentKey,
isRedisVersionLowerThan,
objectToFlatArray,
} from '../utils';
import { ChainableCommander } from 'ioredis';
import { version as packageVersion } from '../version';
export type JobData = [JobJsonRaw | number, string?];
Expand Down Expand Up @@ -457,6 +462,23 @@ export class Scripts {
return this.execCommand(client, 'extendLock', args);
}

async extendLocks(
jobIds: string[],
tokens: string[],
duration: number,
): Promise<string[]> {
const client = await this.queue.client;

const args = [
this.queue.keys.stalled,
this.queue.toKey(''),
pack(tokens),
pack(jobIds),
duration,
];
return this.execCommand(client, 'extendLocks', args);
}

async updateData<T = any, R = any, N extends string = string>(
job: MinimalJob<T, R, N>,
data: T,
Expand Down Expand Up @@ -547,6 +569,7 @@ export class Scripts {
token: string,
timestamp: number,
fetchNext = true,
fieldsToUpdate?: Record<string, any>,
): (string | number | boolean | Buffer)[] {
const queueKeys = this.queue.keys;
const opts: WorkerOptions = <WorkerOptions>this.queue.opts;
Expand Down Expand Up @@ -584,6 +607,7 @@ export class Scripts {
idof: !!job.opts?.ignoreDependencyOnFailure,
rdof: !!job.opts?.removeDependencyOnFailure,
}),
fieldsToUpdate ? pack(objectToFlatArray(fieldsToUpdate)) : void 0,
];

return keys.concat(args);
Expand Down Expand Up @@ -787,6 +811,7 @@ export class Scripts {
removeOnFailed: boolean | number | KeepJobs,
token: string,
fetchNext = false,
fieldsToUpdate?: Record<string, any>,
): (string | number | boolean | Buffer)[] {
const timestamp = Date.now();
return this.moveToFinishedArgs(
Expand All @@ -798,6 +823,7 @@ export class Scripts {
token,
timestamp,
fetchNext,
fieldsToUpdate,
);
}

Expand Down Expand Up @@ -916,9 +942,9 @@ export class Scripts {
token: string,
delay: number,
opts: MoveToDelayedOpts = {},
): (string | number)[] {
): (string | number | Buffer)[] {
const queueKeys = this.queue.keys;
const keys: (string | number)[] = [
const keys: (string | number | Buffer)[] = [
queueKeys.marker,
queueKeys.active,
queueKeys.prioritized,
Expand All @@ -936,19 +962,12 @@ export class Scripts {
token,
delay,
opts.skipAttempt ? '1' : '0',
opts.fieldsToUpdate
? pack(objectToFlatArray(opts.fieldsToUpdate))
: void 0,
]);
}

saveStacktraceArgs(
jobId: string,
stacktrace: string,
failedReason: string,
): string[] {
const keys: string[] = [this.queue.toKey(jobId)];

return keys.concat([stacktrace, failedReason]);
}

moveToWaitingChildrenArgs(
jobId: string,
token: string,
Expand Down Expand Up @@ -1106,8 +1125,9 @@ export class Scripts {
jobId: string,
lifo: boolean,
token: string,
): (string | number)[] {
const keys: (string | number)[] = [
fieldsToUpdate?: Record<string, any>,
): (string | number | Buffer)[] {
const keys: (string | number | Buffer)[] = [
this.queue.keys.active,
this.queue.keys.wait,
this.queue.keys.paused,
Expand All @@ -1129,9 +1149,30 @@ export class Scripts {
pushCmd,
jobId,
token,
fieldsToUpdate ? pack(objectToFlatArray(fieldsToUpdate)) : void 0,
]);
}

async retryJob(
jobId: string,
lifo: boolean,
token: string,
fieldsToUpdate?: Record<string, any>,
): Promise<void> {
const client = await this.queue.client;

const args = this.retryJobArgs(jobId, lifo, token, fieldsToUpdate);
const result = await this.execCommand(client, 'retryJob', args);
if (result < 0) {
throw this.finishedErrors({
code: result,
jobId,
command: 'retryJob',
state: 'active',
});
}
}

protected moveJobsToWaitArgs(
state: FinishedStatus | 'delayed',
count: number,
Expand Down
Loading

0 comments on commit 8dc79f9

Please sign in to comment.