From 09f257196f6d5a6690edbf55f12d585cec86ee8f Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Thu, 21 Nov 2024 15:54:25 +0100 Subject: [PATCH] feat(queue): refactor a protected addJob method allowing telemetry extensions --- src/classes/queue.ts | 91 ++++++++++++++++++++++++++----------------- src/classes/worker.ts | 2 +- 2 files changed, 56 insertions(+), 37 deletions(-) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 2042b180eb..89373010ae 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -311,47 +311,66 @@ export class Queue< opts = { ...opts, telemetryMetadata: srcPropagationMedatada }; } - if (opts && opts.repeat) { - if (opts.repeat.endDate) { - if (+new Date(opts.repeat.endDate) < Date.now()) { - throw new Error( - 'End date must be greater than current timestamp', - ); - } - } + const job = await this.addJob(name, data, opts); - return (await this.repeat).updateRepeatableJob< - DataType, - ResultType, - NameType - >(name, data, { ...this.jobsOpts, ...opts }, { override: true }); - } else { - const jobId = opts?.jobId; + span?.setAttributes({ + [TelemetryAttributes.JobName]: name, + [TelemetryAttributes.JobId]: job.id, + }); - if (jobId == '0' || jobId?.startsWith('0:')) { - throw new Error("JobId cannot be '0' or start with 0:"); - } + return job; + }, + ); + } - const job = await this.Job.create( - this as MinimalQueue, - name, - data, - { - ...this.jobsOpts, - ...opts, - jobId, - }, - ); - this.emit('waiting', job as JobBase); + /** + * addJob is a telemetry free version of the add method, useful in order to wrap it + * with custom telemetry on subclasses. + * + * @param name + * @param data + * @param opts + * + * @returns Job + */ + protected async addJob( + name: NameType, + data: DataType, + opts?: JobsOptions, + ): Promise> { + if (opts && opts.repeat) { + if (opts.repeat.endDate) { + if (+new Date(opts.repeat.endDate) < Date.now()) { + throw new Error('End date must be greater than current timestamp'); + } + } - span?.setAttributes({ - [TelemetryAttributes.JobId]: job.id, - }); + return (await this.repeat).updateRepeatableJob< + DataType, + ResultType, + NameType + >(name, data, { ...this.jobsOpts, ...opts }, { override: true }); + } else { + const jobId = opts?.jobId; - return job; - } - }, - ); + if (jobId == '0' || jobId?.startsWith('0:')) { + throw new Error("JobId cannot be '0' or start with 0:"); + } + + const job = await this.Job.create( + this as MinimalQueue, + name, + data, + { + ...this.jobsOpts, + ...opts, + jobId, + }, + ); + this.emit('waiting', job as JobBase); + + return job; + } } /** diff --git a/src/classes/worker.ts b/src/classes/worker.ts index e5092ee772..58fc702f9d 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -995,7 +995,7 @@ will never work with more accuracy than 1ms. */ * This method waits for current jobs to finalize before returning. * * @param force - Use force boolean parameter if you do not want to wait for - * current jobs to be processed. When using telemetry, be mindful that it can + * current jobs to be processed. When using telemetry, be mindful that it can * interfere with the proper closure of spans, potentially preventing them from being exported. * * @returns Promise that resolves when the worker has been closed.