diff --git a/README.md b/README.md index c61b96e356..212adfe6a1 100644 --- a/README.md +++ b/README.md @@ -217,27 +217,27 @@ This is just scratching the surface, check all the features and more in the offi Since there are a few job queue solutions, here is a table comparing them: -| Feature | [BullMQ-Pro](https://bullmq.io/#bullmq-pro) | [BullMQ](https://bullmq.io) | Bull | Kue | Bee | Agenda | -| :------------------------ | :-------------: | :-------------: | :-------------: | :---: | -------- | ------ | -| Backend | redis | redis | redis | redis | redis | mongo | -| Observables | ✓ | | | | | | -| Group Rate Limit | ✓ | | | | | | -| Group Support | ✓ | | | | | | -| Batches Support | ✓ | | | | | | -| Parent/Child Dependencies | ✓ | ✓ | | | | | -| Debouncing | ✓ | ✓ | ✓ | | | | -| Priorities | ✓ | ✓ | ✓ | ✓ | | ✓ | -| Concurrency | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | -| Delayed jobs | ✓ | ✓ | ✓ | ✓ | | ✓ | -| Global events | ✓ | ✓ | ✓ | ✓ | | | -| Rate Limiter | ✓ | ✓ | ✓ | | | | -| Pause/Resume | ✓ | ✓ | ✓ | ✓ | | | -| Sandboxed worker | ✓ | ✓ | ✓ | | | | -| Repeatable jobs | ✓ | ✓ | ✓ | | | ✓ | -| Atomic ops | ✓ | ✓ | ✓ | | ✓ | | -| Persistence | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | -| UI | ✓ | ✓ | ✓ | ✓ | | ✓ | -| Optimized for | Jobs / Messages | Jobs / Messages | Jobs / Messages | Jobs | Messages | Jobs | +| Feature | [BullMQ-Pro](https://bullmq.io/#bullmq-pro) | [BullMQ](https://bullmq.io) | Bull | Kue | Bee | Agenda | +| :------------------------ | :-----------------------------------------: | :-------------------------: | :-------------: | :---: | -------- | ------ | +| Backend | redis | redis | redis | redis | redis | mongo | +| Observables | ✓ | | | | | | +| Group Rate Limit | ✓ | | | | | | +| Group Support | ✓ | | | | | | +| Batches Support | ✓ | | | | | | +| Parent/Child Dependencies | ✓ | ✓ | | | | | +| Debouncing | ✓ | ✓ | ✓ | | | | +| Priorities | ✓ | ✓ | ✓ | ✓ | | ✓ | +| Concurrency | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +| Delayed jobs | ✓ | ✓ | ✓ | ✓ | | ✓ | +| Global events | ✓ | ✓ | ✓ | ✓ | | | +| Rate Limiter | ✓ | ✓ | ✓ | | | | +| Pause/Resume | ✓ | ✓ | ✓ | ✓ | | | +| Sandboxed worker | ✓ | ✓ | ✓ | | | | +| Repeatable jobs | ✓ | ✓ | ✓ | | | ✓ | +| Atomic ops | ✓ | ✓ | ✓ | | ✓ | | +| Persistence | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +| UI | ✓ | ✓ | ✓ | ✓ | | ✓ | +| Optimized for | Jobs / Messages | Jobs / Messages | Jobs / Messages | Jobs | Messages | Jobs | ## Contributing diff --git a/docs/gitbook/bullmq-pro/groups/concurrency.md b/docs/gitbook/bullmq-pro/groups/concurrency.md index 3d107aa092..6ff3e18516 100644 --- a/docs/gitbook/bullmq-pro/groups/concurrency.md +++ b/docs/gitbook/bullmq-pro/groups/concurrency.md @@ -10,11 +10,11 @@ The concurrency factor is configured as follows: import { WorkerPro } from '@taskforcesh/bullmq-pro'; const worker = new WorkerPro('myQueue', processFn, { - group: { - concurrency: 3 // Limit to max 3 parallel jobs per group - }, - concurrency: 100, - connection + group: { + concurrency: 3, // Limit to max 3 parallel jobs per group + }, + concurrency: 100, + connection, }); ``` diff --git a/src/classes/flow-producer.ts b/src/classes/flow-producer.ts index 5753aaec07..c7d70fa6dc 100644 --- a/src/classes/flow-producer.ts +++ b/src/classes/flow-producer.ts @@ -456,6 +456,7 @@ export class FlowProducer extends EventEmitter { emit: this.emit.bind(this) as any, on: this.on.bind(this) as any, redisVersion: this.connection.redisVersion, + trace: async (): Promise => {}, }; } diff --git a/src/classes/job.ts b/src/classes/job.ts index b236139552..313bc283d0 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -34,6 +34,7 @@ import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; import { UnrecoverableError } from './errors/unrecoverable-error'; import type { QueueEvents } from './queue-events'; +import { SpanKind } from '../enums'; const logger = debuglog('bull'); @@ -43,6 +44,7 @@ const optsDecodeMap = { idof: 'ignoreDependencyOnFailure', kl: 'keepLogs', rdof: 'removeDependencyOnFailure', + tm: 'telemetryMetadata', }; const optsEncodeMap = invertObject(optsDecodeMap); @@ -656,6 +658,28 @@ export class Job< return result; } + private async shouldRetryJob(err: Error): Promise<[boolean, number]> { + if ( + this.attemptsMade + 1 < this.opts.attempts && + !this.discarded && + !(err instanceof UnrecoverableError || err.name == 'UnrecoverableError') + ) { + const opts = this.queue.opts as WorkerOptions; + + const delay = await Backoffs.calculate( + this.opts.backoff, + this.attemptsMade + 1, + err, + this, + opts.settings && opts.settings.backoffStrategy, + ); + + return [delay == -1 ? false : true, delay == -1 ? 0 : delay]; + } else { + return [false, 0]; + } + } + /** * Moves a job to the failed queue. * @@ -672,7 +696,6 @@ export class Job< const client = await this.queue.client; const message = err?.message; - const queue = this.queue; this.failedReason = message; let command: string; @@ -683,32 +706,15 @@ export class Job< // // Check if an automatic retry should be performed // - let moveToFailed = false; - let finishedOn, delay; - if ( - this.attemptsMade + 1 < this.opts.attempts && - !this.discarded && - !(err instanceof UnrecoverableError || err.name == 'UnrecoverableError') - ) { - const opts = queue.opts as WorkerOptions; - - // Check if backoff is needed - delay = await Backoffs.calculate( - this.opts.backoff, - this.attemptsMade + 1, - err, - this, - opts.settings && opts.settings.backoffStrategy, - ); - - if (delay === -1) { - moveToFailed = true; - } else if (delay) { + let finishedOn: number; + const [shouldRetry, retryDelay] = await this.shouldRetryJob(err); + if (shouldRetry) { + if (retryDelay) { const args = this.scripts.moveToDelayedArgs( this.id, Date.now(), token, - delay, + retryDelay, ); this.scripts.execCommand(multi, 'moveToDelayed', args); command = 'moveToDelayed'; @@ -722,11 +728,6 @@ export class Job< command = 'retryJob'; } } else { - // If not, move to failed - moveToFailed = true; - } - - if (moveToFailed) { const args = this.scripts.moveToFailedArgs( this, message, @@ -740,36 +741,62 @@ export class Job< command = 'moveToFinished'; } - const results = await multi.exec(); - const anyError = results.find(result => result[0]); - if (anyError) { - throw new Error( - `Error "moveToFailed" with command ${command}: ${anyError}`, - ); - } + return this.queue.trace>( + SpanKind.INTERNAL, + this.getSpanOperation(command), + this.queue.name, + async (span, dstPropagationMedatadata) => { + if (dstPropagationMedatadata) { + this.scripts.execCommand(multi, 'updateJobOption', [ + this.toKey(this.id), + 'tm', + dstPropagationMedatadata, + ]); + } - const result = results[results.length - 1][1] as number; - if (result < 0) { - throw this.scripts.finishedErrors({ - code: result, - jobId: this.id, - command, - state: 'active', - }); - } + const results = await multi.exec(); + const anyError = results.find(result => result[0]); + if (anyError) { + throw new Error( + `Error "moveToFailed" with command ${command}: ${anyError}`, + ); + } - if (finishedOn && typeof finishedOn === 'number') { - this.finishedOn = finishedOn; - } + const result = results[results.length - 1][1] as number; + if (result < 0) { + throw this.scripts.finishedErrors({ + code: result, + jobId: this.id, + command, + state: 'active', + }); + } - if (delay && typeof delay === 'number') { - this.delay = delay; - } + if (finishedOn && typeof finishedOn === 'number') { + this.finishedOn = finishedOn; + } - this.attemptsMade += 1; + if (retryDelay && typeof retryDelay === 'number') { + this.delay = retryDelay; + } + + this.attemptsMade += 1; + + if (Array.isArray(result)) { + return raw2NextJobData(result); + } + }, + ); + } - if (Array.isArray(result)) { - return raw2NextJobData(result); + private getSpanOperation(command: string) { + switch (command) { + case 'moveToDelayed': + return 'delay'; + case 'retryJob': + return 'retry'; + case 'moveToFinished': + return 'fail'; } } diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index 41097e966d..6b5ef9d385 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -1,5 +1,5 @@ import { EventEmitter } from 'events'; -import { QueueBaseOptions, RedisClient } from '../interfaces'; +import { QueueBaseOptions, RedisClient, Span, Tracer } from '../interfaces'; import { MinimalQueue } from '../types'; import { delay, @@ -11,6 +11,7 @@ import { RedisConnection } from './redis-connection'; import { Job } from './job'; import { KeysMap, QueueKeys } from './queue-keys'; import { Scripts } from './scripts'; +import { TelemetryAttributes, SpanKind } from '../enums'; /** * @class QueueBase @@ -30,6 +31,13 @@ export class QueueBase extends EventEmitter implements MinimalQueue { protected connection: RedisConnection; public readonly qualifiedName: string; + /** + * Instance of a telemetry client + * To use it wrap the code with trace helper + * It will check if tracer is provided and if not it will continue as is + */ + private tracer: Tracer | undefined; + /** * * @param name - The name of the queue. @@ -76,6 +84,10 @@ export class QueueBase extends EventEmitter implements MinimalQueue { this.keys = queueKeys.getKeys(name); this.toKey = (type: string) => queueKeys.toKey(name, type); this.setScripts(); + + if (opts?.telemetry) { + this.tracer = opts.telemetry.tracer; + } } /** @@ -175,4 +187,75 @@ export class QueueBase extends EventEmitter implements MinimalQueue { } } } + + /** + * Wraps the code with telemetry and provides a span for configuration. + * + * @param spanKind - kind of the span: Producer, Consumer, Internal + * @param operation - operation name (such as add, process, etc) + * @param destination - destination name (normally the queue name) + * @param callback - code to wrap with telemetry + * @param srcPropagationMedatada - + * @returns + */ + async trace( + spanKind: SpanKind, + operation: string, + destination: string, + callback: (span?: Span, dstPropagationMetadata?: string) => Promise | T, + srcPropagationMetadata?: string, + ) { + if (!this.tracer) { + return callback(); + } + + const currentContext = this.opts.telemetry.contextManager.active(); + + let parentContext; + if (srcPropagationMetadata) { + parentContext = this.opts.telemetry.contextManager.fromMetadata( + currentContext, + srcPropagationMetadata, + ); + } + + const spanName = `${operation} ${destination}`; + const span = this.tracer.startSpan( + spanName, + { + kind: spanKind, + }, + parentContext, + ); + + try { + span.setAttributes({ + [TelemetryAttributes.QueueName]: this.name, + [TelemetryAttributes.QueueOperation]: operation, + }); + + let messageContext; + let dstPropagationMetadata: undefined | string; + + if (spanKind === SpanKind.CONSUMER) { + messageContext = span.setSpanOnContext(parentContext); + } else { + messageContext = span.setSpanOnContext(currentContext); + } + + if (callback.length == 2) { + dstPropagationMetadata = + this.opts.telemetry.contextManager.getMetadata(messageContext); + } + + return await this.opts.telemetry.contextManager.with(messageContext, () => + callback(span, dstPropagationMetadata), + ); + } catch (err) { + span.recordException(err as Error); + throw err; + } finally { + span.end(); + } + } } diff --git a/src/classes/queue-events.ts b/src/classes/queue-events.ts index 1f5c6c41ee..76e977fa21 100644 --- a/src/classes/queue-events.ts +++ b/src/classes/queue-events.ts @@ -58,7 +58,10 @@ export interface QueueEventsListener extends IoredisListener { * * This event is triggered when a job is deduplicated because deduplicatedId still existed. */ - deduplicated: (args: { jobId: string; deduplicationId: string }, id: string) => void; + deduplicated: ( + args: { jobId: string; deduplicationId: string }, + id: string, + ) => void; /** * Listen to 'delayed' event. diff --git a/src/classes/queue-getters.ts b/src/classes/queue-getters.ts index 66b094f89f..edaf192812 100644 --- a/src/classes/queue-getters.ts +++ b/src/classes/queue-getters.ts @@ -135,7 +135,7 @@ export class QueueGetters< return client.get(`${this.keys.de}:${id}`); } - + /** * Job counts by type * diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 15a70bd8ce..ddbadb9cd5 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -12,6 +12,7 @@ import { Job } from './job'; import { QueueGetters } from './queue-getters'; import { Repeat } from './repeat'; import { RedisConnection } from './redis-connection'; +import { SpanKind, TelemetryAttributes } from '../enums'; import { JobScheduler } from './job-scheduler'; import { version } from '../version'; @@ -250,38 +251,56 @@ export class Queue< data: DataType, opts?: JobsOptions, ): Promise> { - if (opts && opts.repeat) { - if (opts.repeat.endDate) { - if (+new Date(opts.repeat.endDate) < Date.now()) { - throw new Error('End date must be greater than current timestamp'); + return this.trace>( + SpanKind.PRODUCER, + 'add', + `${this.name}.${name}`, + async (span, srcPropagationMedatada) => { + if (srcPropagationMedatada) { + opts = { ...opts, telemetryMetadata: srcPropagationMedatada }; } - } - - return (await this.repeat).updateRepeatableJob< - DataType, - ResultType, - NameType - >(name, data, { ...this.jobsOpts, ...opts }, { override: true }); - } else { - const jobId = opts?.jobId; - if (jobId == '0' || jobId?.startsWith('0:')) { - throw new Error("JobId cannot be '0' or start with 0:"); - } - - const job = await this.Job.create( - this as MinimalQueue, - name, - data, - { - ...this.jobsOpts, - ...opts, - jobId, - }, - ); - this.emit('waiting', job); - return job; - } + if (opts && opts.repeat) { + if (opts.repeat.endDate) { + if (+new Date(opts.repeat.endDate) < Date.now()) { + throw new Error( + 'End date must be greater than current timestamp', + ); + } + } + + return (await this.repeat).updateRepeatableJob< + DataType, + ResultType, + NameType + >(name, data, { ...this.jobsOpts, ...opts }, { override: true }); + } else { + const jobId = opts?.jobId; + + if (jobId == '0' || jobId?.startsWith('0:')) { + throw new Error("JobId cannot be '0' or start with 0:"); + } + + const job = await this.Job.create( + this as MinimalQueue, + name, + data, + { + ...this.jobsOpts, + ...opts, + jobId, + }, + ); + this.emit('waiting', job); + + span?.setAttributes({ + [TelemetryAttributes.JobId]: job.id, + }); + + return job; + } + }, + ); } /** @@ -291,20 +310,35 @@ export class Queue< * @param jobs - The array of jobs to add to the queue. Each job is defined by 3 * properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'. */ - addBulk( + async addBulk( jobs: { name: NameType; data: DataType; opts?: BulkJobOptions }[], ): Promise[]> { - return this.Job.createBulk( - this as MinimalQueue, - jobs.map(job => ({ - name: job.name, - data: job.data, - opts: { - ...this.jobsOpts, - ...job.opts, - jobId: job.opts?.jobId, - }, - })), + return this.trace[]>( + SpanKind.PRODUCER, + 'addBulk', + this.name, + async (span, srcPropagationMedatada) => { + if (span) { + span.setAttributes({ + [TelemetryAttributes.BulkNames]: jobs.map(job => job.name), + [TelemetryAttributes.BulkCount]: jobs.length, + }); + } + + return await this.Job.createBulk( + this as MinimalQueue, + jobs.map(job => ({ + name: job.name, + data: job.data, + opts: { + ...this.jobsOpts, + ...job.opts, + jobId: job.opts?.jobId, + tm: span && srcPropagationMedatada, + }, + })), + ); + }, ); } @@ -363,8 +397,11 @@ export class Queue< * and in that case it will add it there instead of the wait list. */ async pause(): Promise { - await this.scripts.pause(true); - this.emit('paused'); + await this.trace(SpanKind.INTERNAL, 'pause', this.name, async () => { + await this.scripts.pause(true); + + this.emit('paused'); + }); } /** @@ -372,12 +409,15 @@ export class Queue< * */ async close(): Promise { - if (!this.closing) { - if (this._repeat) { - await this._repeat.close(); + await this.trace(SpanKind.INTERNAL, 'close', this.name, async () => { + if (!this.closing) { + if (this._repeat) { + await this._repeat.close(); + } } - } - return super.close(); + + await super.close(); + }); } /** * Resumes the processing of this queue globally. @@ -386,8 +426,11 @@ export class Queue< * queue. */ async resume(): Promise { - await this.scripts.pause(false); - this.emit('resumed'); + await this.trace(SpanKind.INTERNAL, 'resume', this.name, async () => { + await this.scripts.pause(false); + + this.emit('resumed'); + }); } /** @@ -461,10 +504,22 @@ export class Queue< repeatOpts: RepeatOptions, jobId?: string, ): Promise { - const repeat = await this.repeat; - const removed = await repeat.removeRepeatable(name, repeatOpts, jobId); + return this.trace( + SpanKind.INTERNAL, + 'removeRepeatable', + `${this.name}.${name}`, + async span => { + span?.setAttributes({ + [TelemetryAttributes.JobName]: name, + [TelemetryAttributes.JobId]: jobId, + }); - return !removed; + const repeat = await this.repeat; + const removed = await repeat.removeRepeatable(name, repeatOpts, jobId); + + return !removed; + }, + ); } /** @@ -489,9 +544,20 @@ export class Queue< * @param id - identifier */ async removeDebounceKey(id: string): Promise { - const client = await this.client; + return this.trace( + SpanKind.INTERNAL, + 'removeDebounceKey', + `${this.name}`, + async span => { + span?.setAttributes({ + [TelemetryAttributes.JobKey]: id, + }); + + const client = await this.client; - return client.del(`${this.keys.de}:${id}`); + return await client.del(`${this.keys.de}:${id}`); + }, + ); } /** @@ -500,9 +566,19 @@ export class Queue< * @param id - identifier */ async removeDeduplicationKey(id: string): Promise { - const client = await this.client; + return this.trace( + SpanKind.INTERNAL, + 'removeDeduplicationKey', + `${this.name}`, + async span => { + span?.setAttributes({ + [TelemetryAttributes.DeduplicationKey]: id, + }); - return client.del(`${this.keys.de}:${id}`); + const client = await this.client; + return client.del(`${this.keys.de}:${id}`); + }, + ); } /** @@ -518,10 +594,21 @@ export class Queue< * @returns */ async removeRepeatableByKey(key: string): Promise { - const repeat = await this.repeat; - const removed = await repeat.removeRepeatableByKey(key); + return this.trace( + SpanKind.INTERNAL, + 'removeRepeatableByKey', + `${this.name}`, + async span => { + span?.setAttributes({ + [TelemetryAttributes.JobKey]: key, + }); - return !removed; + const repeat = await this.repeat; + const removed = await repeat.removeRepeatableByKey(key); + + return !removed; + }, + ); } /** @@ -533,8 +620,22 @@ export class Queue< * @returns 1 if it managed to remove the job or 0 if the job or * any of its dependencies were locked. */ - remove(jobId: string, { removeChildren = true } = {}): Promise { - return this.scripts.remove(jobId, removeChildren); + async remove(jobId: string, { removeChildren = true } = {}): Promise { + return this.trace( + SpanKind.INTERNAL, + 'remove', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.JobId]: jobId, + [TelemetryAttributes.JobOptions]: JSON.stringify({ + removeChildren, + }), + }); + + return await this.scripts.remove(jobId, removeChildren); + }, + ); } /** @@ -547,7 +648,19 @@ export class Queue< jobId: string, progress: number | object, ): Promise { - return this.scripts.updateProgress(jobId, progress); + await this.trace( + SpanKind.INTERNAL, + 'updateJobProgress', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.JobId]: jobId, + [TelemetryAttributes.JobProgress]: JSON.stringify(progress), + }); + + await this.scripts.updateProgress(jobId, progress); + }, + ); } /** @@ -574,8 +687,19 @@ export class Queue< * @param delayed - Pass true if it should also clean the * delayed jobs. */ - drain(delayed = false): Promise { - return this.scripts.drain(delayed); + async drain(delayed = false): Promise { + await this.trace( + SpanKind.INTERNAL, + 'drain', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.QueueDrainDelay]: delayed, + }); + + await this.scripts.drain(delayed); + }, + ); } /** @@ -600,28 +724,43 @@ export class Queue< | 'delayed' | 'failed' = 'completed', ): Promise { - const maxCount = limit || Infinity; - const maxCountPerCall = Math.min(10000, maxCount); - const timestamp = Date.now() - grace; - let deletedCount = 0; - const deletedJobsIds: string[] = []; - - while (deletedCount < maxCount) { - const jobsIds = await this.scripts.cleanJobsInSet( - type, - timestamp, - maxCountPerCall, - ); - - this.emit('cleaned', jobsIds, type); - deletedCount += jobsIds.length; - deletedJobsIds.push(...jobsIds); - - if (jobsIds.length < maxCountPerCall) { - break; - } - } - return deletedJobsIds; + return this.trace( + SpanKind.INTERNAL, + 'clean', + this.name, + async span => { + const maxCount = limit || Infinity; + const maxCountPerCall = Math.min(10000, maxCount); + const timestamp = Date.now() - grace; + let deletedCount = 0; + const deletedJobsIds: string[] = []; + + while (deletedCount < maxCount) { + const jobsIds = await this.scripts.cleanJobsInSet( + type, + timestamp, + maxCountPerCall, + ); + + this.emit('cleaned', jobsIds, type); + deletedCount += jobsIds.length; + deletedJobsIds.push(...jobsIds); + + if (jobsIds.length < maxCountPerCall) { + break; + } + } + + span?.setAttributes({ + [TelemetryAttributes.QueueGrace]: grace, + [TelemetryAttributes.JobType]: type, + [TelemetryAttributes.QueueCleanLimit]: maxCount, + [TelemetryAttributes.JobIds]: deletedJobsIds, + }); + + return deletedJobsIds; + }, + ); } /** @@ -636,16 +775,23 @@ export class Queue< * @param opts - Obliterate options. */ async obliterate(opts?: ObliterateOpts): Promise { - await this.pause(); - - let cursor = 0; - do { - cursor = await this.scripts.obliterate({ - force: false, - count: 1000, - ...opts, - }); - } while (cursor); + await this.trace( + SpanKind.INTERNAL, + 'obliterate', + this.name, + async () => { + await this.pause(); + + let cursor = 0; + do { + cursor = await this.scripts.obliterate({ + force: false, + count: 1000, + ...opts, + }); + } while (cursor); + }, + ); } /** @@ -661,14 +807,25 @@ export class Queue< async retryJobs( opts: { count?: number; state?: FinishedStatus; timestamp?: number } = {}, ): Promise { - let cursor = 0; - do { - cursor = await this.scripts.retryJobs( - opts.state, - opts.count, - opts.timestamp, - ); - } while (cursor); + await this.trace( + SpanKind.PRODUCER, + 'retryJobs', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.QueueOptions]: JSON.stringify(opts), + }); + + let cursor = 0; + do { + cursor = await this.scripts.retryJobs( + opts.state, + opts.count, + opts.timestamp, + ); + } while (cursor); + }, + ); } /** @@ -680,10 +837,21 @@ export class Queue< * @returns */ async promoteJobs(opts: { count?: number } = {}): Promise { - let cursor = 0; - do { - cursor = await this.scripts.promoteJobs(opts.count); - } while (cursor); + await this.trace( + SpanKind.INTERNAL, + 'promoteJobs', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.QueueOptions]: JSON.stringify(opts), + }); + + let cursor = 0; + do { + cursor = await this.scripts.promoteJobs(opts.count); + } while (cursor); + }, + ); } /** @@ -692,8 +860,19 @@ export class Queue< * @param maxLength - */ async trimEvents(maxLength: number): Promise { - const client = await this.client; - return client.xtrim(this.keys.events, 'MAXLEN', '~', maxLength); + return this.trace( + SpanKind.INTERNAL, + 'trimEvents', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.QueueEventMaxLength]: maxLength, + }); + + const client = await this.client; + return await client.xtrim(this.keys.events, 'MAXLEN', '~', maxLength); + }, + ); } /** diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 57156d9414..f354c94253 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -13,6 +13,7 @@ import { JobJsonRaw, Processor, RedisClient, + Span, WorkerOptions, } from '../interfaces'; import { MinimalQueue } from '../types'; @@ -35,6 +36,7 @@ import { RATE_LIMIT_ERROR, WaitingChildrenError, } from './errors'; +import { SpanKind, TelemetryAttributes } from '../enums'; import { JobScheduler } from './job-scheduler'; // 10 seconds is the maximum time a BRPOPLPUSH can block. @@ -183,7 +185,8 @@ export class Worker< private extendLocksTimer: NodeJS.Timeout | null = null; private limitUntil = 0; private resumeWorker: () => void; - private stalledCheckTimer: NodeJS.Timeout; + + private stalledCheckStopper?: () => void; private waiting: Promise | null = null; private _repeat: Repeat; // To be deprecated in v6 in favor of Job Scheduler @@ -526,7 +529,7 @@ export class Worker< } this.running = false; - return asyncFifoQueue.waitAll(); + return await asyncFifoQueue.waitAll(); } catch (error) { this.running = false; throw error; @@ -539,12 +542,30 @@ export class Worker< * @returns a Job or undefined if no job was available in the queue. */ async getNextJob(token: string, { block = true }: GetNextJobOptions = {}) { - return this._getNextJob( + const nextJob = await this._getNextJob( await this.client, await this.blockingConnection.client, token, { block }, ); + + return this.trace>( + SpanKind.INTERNAL, + 'getNextJob', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.QueueName]: this.name, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.WorkerOptions]: JSON.stringify({ block }), + [TelemetryAttributes.JobId]: nextJob?.id, + }); + + return nextJob; + }, + nextJob?.opts.telemetryMetadata, + ); } private async _getNextJob( @@ -604,13 +625,25 @@ export class Worker< * @param expireTimeMs - expire time in ms of this rate limit. */ async rateLimit(expireTimeMs: number): Promise { - await this.client.then(client => - client.set( - this.keys.limiter, - Number.MAX_SAFE_INTEGER, - 'PX', - expireTimeMs, - ), + await this.trace( + SpanKind.INTERNAL, + 'rateLimit', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerRateLimit]: expireTimeMs, + }); + + await this.client.then(client => + client.set( + this.keys.limiter, + Number.MAX_SAFE_INTEGER, + 'PX', + expireTimeMs, + ), + ); + }, ); } @@ -783,68 +816,102 @@ will never work with more accuracy than 1ms. */ return; } - const handleCompleted = async (result: ResultType) => { - if (!this.connection.closing) { - const completed = await job.moveToCompleted( - result, - token, - fetchNextCallback() && !(this.closing || this.paused), - ); - this.emit('completed', job, result, 'active'); - const [jobData, jobId, limitUntil, delayUntil] = completed || []; - this.updateDelays(limitUntil, delayUntil); - - return this.nextJobFromJobData(jobData, jobId, token); - } - }; - - const handleFailed = async (err: Error) => { - if (!this.connection.closing) { - try { - if (err.message == RATE_LIMIT_ERROR) { - this.limitUntil = await this.moveLimitedBackToWait(job, token); - return; - } + const { telemetryMetadata: srcPropagationMedatada } = job.opts; + + return this.trace>( + SpanKind.CONSUMER, + 'process', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.JobId]: job.id, + }); - if ( - err instanceof DelayedError || - err.name == 'DelayedError' || - err instanceof WaitingChildrenError || - err.name == 'WaitingChildrenError' - ) { - return; - } + const handleCompleted = async (result: ResultType) => { + if (!this.connection.closing) { + const completed = await job.moveToCompleted( + result, + token, + fetchNextCallback() && !(this.closing || this.paused), + ); + this.emit('completed', job, result, 'active'); - const result = await job.moveToFailed(err, token, true); - this.emit('failed', job, err, 'active'); + span?.addEvent('job completed', { + [TelemetryAttributes.JobResult]: JSON.stringify(result), + }); - if (result) { - const [jobData, jobId, limitUntil, delayUntil] = result; + const [jobData, jobId, limitUntil, delayUntil] = completed || []; this.updateDelays(limitUntil, delayUntil); + return this.nextJobFromJobData(jobData, jobId, token); } - } catch (err) { - this.emit('error', err); - // It probably means that the job has lost the lock before completion - // A worker will (or already has) moved the job back - // to the waiting list (as stalled) - } - } - }; + }; - this.emit('active', job, 'waiting'); + const handleFailed = async (err: Error) => { + if (!this.connection.closing) { + try { + // Check if the job was manually rate-limited + if (err.message == RATE_LIMIT_ERROR) { + this.limitUntil = await this.moveLimitedBackToWait(job, token); + return; + } + + if ( + err instanceof DelayedError || + err.name == 'DelayedError' || + err instanceof WaitingChildrenError || + err.name == 'WaitingChildrenError' + ) { + return; + } + + const result = await job.moveToFailed(err, token, true); + this.emit('failed', job, err, 'active'); + + span?.addEvent('job failed', { + [TelemetryAttributes.JobFailedReason]: err.message, + }); + + if (result) { + const [jobData, jobId, limitUntil, delayUntil] = result; + this.updateDelays(limitUntil, delayUntil); + return this.nextJobFromJobData(jobData, jobId, token); + } + } catch (err) { + this.emit('error', err); + // It probably means that the job has lost the lock before completion + // A worker will (or already has) moved the job back + // to the waiting list (as stalled) + span?.recordException((err).message); + } + } + }; - const inProgressItem = { job, ts: Date.now() }; + this.emit('active', job, 'waiting'); - try { - jobsInProgress.add(inProgressItem); - const result = await this.callProcessJob(job, token); - return await handleCompleted(result); - } catch (err) { - return handleFailed(err); - } finally { - jobsInProgress.delete(inProgressItem); - } + const processedOn = Date.now(); + const inProgressItem = { job, ts: processedOn }; + + try { + jobsInProgress.add(inProgressItem); + const result = await this.callProcessJob(job, token); + return await handleCompleted(result); + } catch (err) { + const failed = await handleFailed(err); + return failed; + } finally { + span?.setAttributes({ + [TelemetryAttributes.JobFinishedTimestamp]: Date.now(), + [TelemetryAttributes.JobProcessedTimestamp]: processedOn, + }); + + jobsInProgress.delete(inProgressItem); + } + }, + srcPropagationMedatada, + ); } /** @@ -852,17 +919,30 @@ will never work with more accuracy than 1ms. */ * Pauses the processing of this queue only for this worker. */ async pause(doNotWaitActive?: boolean): Promise { - if (!this.paused) { - this.paused = new Promise(resolve => { - this.resumeWorker = function () { - resolve(); - this.paused = null; // Allow pause to be checked externally for paused state. - this.resumeWorker = null; - }; - }); - await (!doNotWaitActive && this.whenCurrentJobsFinished()); - this.emit('paused'); - } + await this.trace( + SpanKind.INTERNAL, + 'pause', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.WorkerDoNotWaitActive]: doNotWaitActive, + }); + + if (!this.paused) { + this.paused = new Promise(resolve => { + this.resumeWorker = function () { + resolve(); + this.paused = null; // Allow pause to be checked externally for paused state. + this.resumeWorker = null; + }; + }); + await (!doNotWaitActive && this.whenCurrentJobsFinished()); + this.emit('paused'); + } + }, + ); } /** @@ -871,8 +951,15 @@ will never work with more accuracy than 1ms. */ */ resume(): void { if (this.resumeWorker) { - this.resumeWorker(); - this.emit('resumed'); + this.trace(SpanKind.INTERNAL, 'resume', this.name, span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + }); + + this.resumeWorker(); + this.emit('resumed'); + }); } } @@ -907,42 +994,58 @@ will never work with more accuracy than 1ms. */ * * @returns Promise that resolves when the worker has been closed. */ - close(force = false): Promise { + async close(force = false): Promise { if (this.closing) { return this.closing; } - this.closing = (async () => { - this.emit('closing', 'closing queue'); - this.abortDelayController?.abort(); - - this.resume(); - - // Define the async cleanup functions - const asyncCleanups = [ - () => { - return force || this.whenCurrentJobsFinished(false); - }, - () => this.childPool?.clean(), - () => this.blockingConnection.close(force), - () => this.connection.close(force), - ]; - - // Run cleanup functions sequentially and make sure all are run despite any errors - for (const cleanup of asyncCleanups) { - try { - await cleanup(); - } catch (err) { - this.emit('error', err); - } - } - clearTimeout(this.extendLocksTimer); - clearTimeout(this.stalledCheckTimer); + await this.trace( + SpanKind.INTERNAL, + 'close', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.WorkerForceClose]: force, + }); + + this.closing = (async () => { + this.emit('closing', 'closing queue'); + this.abortDelayController?.abort(); + + this.resume(); + + // Define the async cleanup functions + const asyncCleanups = [ + () => { + return force || this.whenCurrentJobsFinished(false); + }, + () => this.childPool?.clean(), + () => this.blockingConnection.close(force), + () => this.connection.close(force), + ]; + + // Run cleanup functions sequentially and make sure all are run despite any errors + for (const cleanup of asyncCleanups) { + try { + await cleanup(); + } catch (err) { + this.emit('error', err); + } + } - this.closed = true; - this.emit('closed'); - })(); - return this.closing; + clearTimeout(this.extendLocksTimer); + //clearTimeout(this.stalledCheckTimer); + this.stalledCheckStopper?.(); + + this.closed = true; + this.emit('closed'); + })(); + + return await this.closing; + }, + ); } /** @@ -959,18 +1062,41 @@ will never work with more accuracy than 1ms. */ */ async startStalledCheckTimer(): Promise { if (!this.opts.skipStalledCheck) { - clearTimeout(this.stalledCheckTimer); - if (!this.closing) { - try { - await this.checkConnectionError(() => this.moveStalledJobsToWait()); - this.stalledCheckTimer = setTimeout(async () => { - await this.startStalledCheckTimer(); - }, this.opts.stalledInterval); - } catch (err) { - this.emit('error', err); - } + await this.trace( + SpanKind.INTERNAL, + 'startStalledCheckTimer', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + }); + + this.stalledChecker().catch(err => { + this.emit('error', err); + }); + }, + ); + } + } + } + + private async stalledChecker() { + while (!this.closing) { + try { + await this.checkConnectionError(() => this.moveStalledJobsToWait()); + } catch (err) { + this.emit('error', err); } + + await new Promise(resolve => { + const timeout = setTimeout(resolve, this.opts.stalledInterval); + this.stalledCheckStopper = () => { + clearTimeout(timeout); + resolve(); + }; + }); } } @@ -1053,65 +1179,104 @@ will never work with more accuracy than 1ms. */ } protected async extendLocks(jobs: Job[]) { - try { - const pipeline = (await this.client).pipeline(); - for (const job of jobs) { - await this.scripts.extendLock( - job.id, - job.token, - this.opts.lockDuration, - pipeline, - ); - } - const result = (await pipeline.exec()) as [Error, string][]; - - for (const [err, jobId] of result) { - if (err) { - // TODO: signal process function that the job has been lost. - this.emit( - 'error', - new Error(`could not renew lock for job ${jobId}`), - ); + await this.trace( + SpanKind.INTERNAL, + 'extendLocks', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.WorkerJobsToExtendLocks]: jobs.map( + job => job.id, + ), + }); + + try { + const pipeline = (await this.client).pipeline(); + for (const job of jobs) { + await this.scripts.extendLock( + job.id, + job.token, + this.opts.lockDuration, + pipeline, + ); + } + const result = (await pipeline.exec()) as [Error, string][]; + + for (const [err, jobId] of result) { + if (err) { + // TODO: signal process function that the job has been lost. + this.emit( + 'error', + new Error(`could not renew lock for job ${jobId}`), + ); + } + } + } catch (err) { + this.emit('error', err); } - } - } catch (err) { - this.emit('error', err); - } + }, + ); } private async moveStalledJobsToWait() { - const chunkSize = 50; - const [failed, stalled] = await this.scripts.moveStalledJobsToWait(); + await this.trace( + SpanKind.INTERNAL, + 'moveStalledJobsToWait', + this.name, + async span => { + const chunkSize = 50; + const [failed, stalled] = await this.scripts.moveStalledJobsToWait(); + + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.WorkerStalledJobs]: stalled, + [TelemetryAttributes.WorkerFailedJobs]: failed, + }); + + stalled.forEach((jobId: string) => { + span?.addEvent('job stalled', { + [TelemetryAttributes.JobId]: jobId, + }); + this.emit('stalled', jobId, 'active'); + }); - stalled.forEach((jobId: string) => this.emit('stalled', jobId, 'active')); + const jobPromises: Promise>[] = []; + for (let i = 0; i < failed.length; i++) { + jobPromises.push( + Job.fromId( + this as MinimalQueue, + failed[i], + ), + ); - const jobPromises: Promise>[] = []; - for (let i = 0; i < failed.length; i++) { - jobPromises.push( - Job.fromId( - this as MinimalQueue, - failed[i], - ), - ); + if ((i + 1) % chunkSize === 0) { + this.notifyFailedJobs(await Promise.all(jobPromises)); + jobPromises.length = 0; + } + } - if ((i + 1) % chunkSize === 0) { this.notifyFailedJobs(await Promise.all(jobPromises)); - jobPromises.length = 0; - } - } - - this.notifyFailedJobs(await Promise.all(jobPromises)); + }, + ); } - private notifyFailedJobs(failedJobs: Job[]) { - failedJobs.forEach((job: Job) => - this.emit( - 'failed', - job, - new Error('job stalled more than allowable limit'), - 'active', - ), - ); + private notifyFailedJobs( + failedJobs: Job[], + span?: Span, + ) { + const failedReason = 'job stalled more than allowable limit'; + + failedJobs.forEach((job: Job) => { + span?.addEvent('job failed', { + [TelemetryAttributes.JobId]: job.id, + [TelemetryAttributes.JobName]: job.name, + [TelemetryAttributes.JobFailedReason]: failedReason, + }); + this.emit('failed', job, new Error(failedReason), 'active'); + }); } private moveLimitedBackToWait( diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index 0c35adb541..2e6161ebee 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -149,7 +149,7 @@ if (#stalling > 0) then table.insert(failed, jobId) else - local target, isPausedOrMaxed= + local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey) -- Move the job back to the wait queue, to immediately be picked up by a waiting worker. diff --git a/src/commands/updateJobOption-1.lua b/src/commands/updateJobOption-1.lua new file mode 100644 index 0000000000..03949faf29 --- /dev/null +++ b/src/commands/updateJobOption-1.lua @@ -0,0 +1,26 @@ +--[[ + Update a job option + + Input: + KEYS[1] Job id key + + ARGV[1] field + ARGV[2] value + + Output: + 0 - OK + -1 - Missing job. +]] +local rcall = redis.call + +if rcall("EXISTS", KEYS[1]) == 1 then -- // Make sure job exists + + local opts = rcall("HGET", KEYS[1], "opts") + local jsonOpts = cjson.decode(opts) + jsonOpts[ARGV[1]] = ARGV[2] + + rcall("HSET", KEYS[1], "opts", cjson.encode(jsonOpts)) + return 0 +else + return -1 +end diff --git a/src/enums/index.ts b/src/enums/index.ts index d6bae934f4..3cab38de6c 100644 --- a/src/enums/index.ts +++ b/src/enums/index.ts @@ -2,3 +2,4 @@ export * from './child-command'; export * from './error-code'; export * from './parent-command'; export * from './metrics-time'; +export * from './telemetry-attributes'; diff --git a/src/enums/telemetry-attributes.ts b/src/enums/telemetry-attributes.ts new file mode 100644 index 0000000000..3806242e1b --- /dev/null +++ b/src/enums/telemetry-attributes.ts @@ -0,0 +1,40 @@ +export enum TelemetryAttributes { + QueueName = 'bullmq.queue.name', + QueueOperation = 'bullmq.queue.operation', + BulkCount = 'bullmq.job.bulk.count', + BulkNames = 'bullmq.job.bulk.names', + JobName = 'bullmq.job.name', + JobId = 'bullmq.job.id', + JobKey = 'bullmq.job.key', + JobIds = 'bullmq.job.ids', + DeduplicationKey = 'bullmq.job.deduplication.key', + JobOptions = 'bullmq.job.options', + JobProgress = 'bullmq.job.progress', + QueueDrainDelay = 'bullmq.queue.drain.delay', + QueueGrace = 'bullmq.queue.grace', + QueueCleanLimit = 'bullmq.queue.clean.limit', + JobType = 'bullmq.job.type', + QueueOptions = 'bullmq.queue.options', + QueueEventMaxLength = 'bullmq.queue.event.max.length', + WorkerOptions = 'bullmq.worker.options', + WorkerName = 'bullmq.worker.name', + WorkerId = 'bullmq.worker.id', + WorkerRateLimit = 'bullmq.worker.rate.limit', + WorkerDoNotWaitActive = 'bullmq.worker.do.not.wait.active', + WorkerForceClose = 'bullmq.worker.force.close', + WorkerStalledJobs = 'bullmq.worker.stalled.jobs', + WorkerFailedJobs = 'bullmq.worker.failed.jobs', + WorkerJobsToExtendLocks = 'bullmq.worker.jobs.to.extend.locks', + JobFinishedTimestamp = 'bullmq.job.finished.timestamp', + JobProcessedTimestamp = 'bullmq.job.processed.timestamp', + JobResult = 'bullmq.job.result', + JobFailedReason = 'bullmq.job.failed.reason', +} + +export enum SpanKind { + INTERNAL = 0, + SERVER = 1, + CLIENT = 2, + PRODUCER = 3, + CONSUMER = 4, +} diff --git a/src/interfaces/base-job-options.ts b/src/interfaces/base-job-options.ts index 7d5d7d4c2a..bb10f1caa3 100644 --- a/src/interfaces/base-job-options.ts +++ b/src/interfaces/base-job-options.ts @@ -112,4 +112,9 @@ export interface BaseJobOptions extends DefaultJobOptions { * Internal property used by repeatable jobs. */ prevMillis?: number; + + /** + * TelemetryMetadata, provide for context propagation. + */ + telemetryMetadata?: string; } diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index 132c9bd6ed..bf8a6ac0be 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -24,3 +24,4 @@ export * from './sandboxed-job-processor'; export * from './sandboxed-job'; export * from './sandboxed-options'; export * from './worker-options'; +export * from './telemetry'; diff --git a/src/interfaces/queue-options.ts b/src/interfaces/queue-options.ts index 0f771403b9..b3fc75c3dc 100644 --- a/src/interfaces/queue-options.ts +++ b/src/interfaces/queue-options.ts @@ -1,6 +1,7 @@ import { AdvancedRepeatOptions } from './advanced-options'; import { DefaultJobOptions } from './base-job-options'; import { ConnectionOptions } from './redis-options'; +import { Telemetry } from './telemetry'; export enum ClientType { blocking = 'blocking', @@ -31,6 +32,11 @@ export interface QueueBaseOptions { * @defaultValue false */ skipVersionCheck?: boolean; + + /** + * Telemetry client + */ + telemetry?: Telemetry; } /** @@ -68,6 +74,11 @@ export interface QueueOptions extends QueueBaseOptions { * Advanced options for the repeatable jobs. */ settings?: AdvancedRepeatOptions; + + /** + * Telemetry client + */ + telemetry?: Telemetry; } /** diff --git a/src/interfaces/telemetry.ts b/src/interfaces/telemetry.ts new file mode 100644 index 0000000000..e55c0990dc --- /dev/null +++ b/src/interfaces/telemetry.ts @@ -0,0 +1,181 @@ +import { SpanKind } from '../enums'; + +/** + * Telemetry interface + * + * This interface allows third-party libraries to integrate their own telemetry + * system. The interface is heavily inspired by OpenTelemetry but it's not + * limited to it. + * + */ +export interface Telemetry { + /** + * Tracer instance + * + * The tracer is responsible for creating spans and propagating the context + * across the application. + */ + tracer: Tracer; + + /** + * Context manager instance + * + * The context manager is responsible for managing the context and propagating + * it across the application. + */ + contextManager: ContextManager; +} + +/** + * Context manager interface + * + * The context manager is responsible for managing the context and propagating + * it across the application. + */ +export interface ContextManager { + /** + * Creates a new context and sets it as active for the fn passed as last argument + * + * @param context + * @param fn + */ + with any>( + context: Context, + fn: A, + ): ReturnType; + + /** + * Returns the active context + */ + active(): Context; + + /** + * Returns a serialized version of the current context. The metadata + * is the mechanism used to propagate the context across a distributed + * application. + * + * @param context + */ + getMetadata(context: Context): string; + + /** + * Creates a new context from a serialized version effectively + * linking the new context to the parent context. + * + * @param activeContext + * @param metadata + */ + fromMetadata(activeContext: Context, metadata: string): Context; +} + +/** + * Tracer interface + * + */ +export interface Tracer { + /** + * startSpan creates a new Span with the given name and options on an optional + * context. If the context is not provided, the current active context should be + * used. + * + * @param name + * @param options + * @param context + */ + startSpan(name: string, options?: SpanOptions, context?: Context): Span; +} + +export interface SpanOptions { + kind: SpanKind; +} + +/** + * Span interface + */ +export interface Span { + /** + * setSpanOnContext sets the span on the context. This is useful when you want + * to propagate the span across the application. + * + * @param ctx + */ + setSpanOnContext(ctx: Context): Context; + + /** + * setAttribute sets an attribute on the span. + * + * @param ctx + */ + setAttribute(key: string, value: AttributeValue): void; + + /** + * setAttributes sets multiple attributes on the span. + * + * @param attributes + */ + setAttributes(attributes: Attributes): void; + + /** + * addEvent adds an event to the span. + * + * @param name + * @param attributes + */ + addEvent(name: string, attributes?: Attributes): void; + + /** + * recordException records an exception on the span. + * + * @param exception + * @param time + */ + recordException(exception: Exception, time?: Time): void; + + /** + * end ends the span. + * + * Note: spans must be ended so that they can be exported. + */ + end(): void; +} + +export interface Attributes { + [attribute: string]: AttributeValue | undefined; +} + +export type AttributeValue = + | string + | number + | boolean + | Array + | Array + | Array; + +export type Exception = string | ExceptionType; + +export type ExceptionType = CodeException | MessageException | NameException; + +interface CodeException { + code: string | number; + name?: string; + message?: string; + stack?: string; +} + +interface MessageException { + code?: string | number; + name?: string; + message: string; + stack?: string; +} + +interface NameException { + code?: string | number; + name: string; + message?: string; + stack?: string; +} + +export type Time = HighResolutionTime | number | Date; + +type HighResolutionTime = [number, number]; diff --git a/src/interfaces/worker-options.ts b/src/interfaces/worker-options.ts index 73324e145d..13ea4047a4 100644 --- a/src/interfaces/worker-options.ts +++ b/src/interfaces/worker-options.ts @@ -4,6 +4,7 @@ import { QueueBaseOptions } from './queue-options'; import { RateLimiterOptions } from './rate-limiter-options'; import { MetricsOptions } from './metrics-options'; import { KeepJobs } from './keep-jobs'; +import { Telemetry } from './telemetry'; import { SandboxedOptions } from './sandboxed-options'; /** @@ -136,6 +137,20 @@ export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions { * More advanced options. */ settings?: AdvancedOptions; + + /** + * Use Worker Threads instead of Child Processes. + * Note: This option can only be used when specifying + * a file for the processor argument. + * + * @default false + */ + useWorkerThreads?: boolean; + + /** + * Telemetry Addon + */ + telemetry?: Telemetry; } export interface GetNextJobOptions { diff --git a/src/types/job-options.ts b/src/types/job-options.ts index 15cdd8ad16..fb8b74d264 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -56,4 +56,9 @@ export type RedisJobOptions = BaseJobOptions & { * If true, removes the job from its parent dependencies when it fails after all attempts. */ rdof?: boolean; + + /** + * TelemetryMetadata, provide for context propagation. + */ + tm?: string; }; diff --git a/src/types/minimal-queue.ts b/src/types/minimal-queue.ts index 2f435c07da..f27f0c1b0d 100644 --- a/src/types/minimal-queue.ts +++ b/src/types/minimal-queue.ts @@ -14,4 +14,5 @@ export type MinimalQueue = Pick< | 'emit' | 'on' | 'redisVersion' + | 'trace' >; diff --git a/tests/test_telemetry_interface.ts b/tests/test_telemetry_interface.ts new file mode 100644 index 0000000000..125db85216 --- /dev/null +++ b/tests/test_telemetry_interface.ts @@ -0,0 +1,292 @@ +import { expect, assert } from 'chai'; +import { default as IORedis } from 'ioredis'; +import { after, beforeEach, describe, it, before } from 'mocha'; +import { v4 } from 'uuid'; +import { Queue, Worker } from '../src/classes'; +import { removeAllQueueData } from '../src/utils'; +import { + Telemetry, + ContextManager, + Tracer, + Span, + SpanOptions, + Attributes, + Exception, + Time, +} from '../src/interfaces'; +import * as sinon from 'sinon'; +import { SpanKind, TelemetryAttributes } from '../src/enums'; + +describe('Telemetry', () => { + type ExtendedException = Exception & { + message: string; + }; + + const redisHost = process.env.REDIS_HOST || 'localhost'; + const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull'; + + class MockTelemetry implements Telemetry { + public tracer: Tracer; + public contextManager: ContextManager; + + constructor(name: string) { + this.tracer = new MockTracer(); + this.contextManager = new MockContextManager(); + } + } + + class MockTracer implements Tracer { + startSpan(name: string, options?: SpanOptions): Span { + return new MockSpan(name, options); + } + } + + class MockContextManager implements ContextManager { + private activeContext: Context = {} as Context; + + with any>( + context: Context, + fn: A, + ): ReturnType { + this.activeContext = context; + return fn(); + } + + active(): Context { + return this.activeContext; + } + + getMetadata(context: Context): string { + if (!context) { + return ''; + } + const metadata: Record = {}; + Object.keys(context as object).forEach(key => { + if (key.startsWith('getMetadata_')) { + const value = context[key]; + metadata[key] = value; + } + }); + return JSON.stringify(metadata); + } + + fromMetadata(activeContext: Context, metadataString: string): Context { + const newContext = { ...activeContext }; + if (metadataString) { + const metadata = JSON.parse(metadataString); + Object.keys(metadata).forEach(key => { + newContext[key] = () => metadata[key]; + }); + } + return newContext; + } + } + + class MockSpan implements Span { + attributes: Attributes = {}; + name: string; + options: SpanOptions | undefined; + exception: ExtendedException | undefined; + + constructor(name: string, options?: SpanOptions) { + this.name = name; + this.options = options; + } + + setSpanOnContext(ctx: any): any { + context['getSpan'] = () => this; + return { ...context, getMetadata_span: this['name'] }; + } + + addEvent(name: string, attributes?: Attributes): void {} + + setAttribute(key: string, value: any): void { + this.attributes[key] = value; + } + + setAttributes(attributes: Attributes): void { + this.attributes = { ...this.attributes, ...attributes }; + } + + recordException(exception: ExtendedException, time?: Time): void { + this.exception = exception; + } + + end(): void {} + } + + let telemetryClient; + + let queue: Queue; + let queueName: string; + + let connection; + before(async function () { + connection = new IORedis(redisHost, { maxRetriesPerRequest: null }); + }); + + beforeEach(async function () { + queueName = `test-${v4()}`; + telemetryClient = new MockTelemetry('mockTracer'); + + queue = new Queue(queueName, { + connection, + prefix, + telemetry: telemetryClient, + }); + }); + + afterEach(async function () { + await queue.close(); + await removeAllQueueData(new IORedis(redisHost), queueName); + }); + + after(async function () { + await connection.quit(); + }); + + describe('Queue.add', () => { + it('should correctly interact with telemetry when adding a job', async () => { + await queue.add('testJob', { foo: 'bar' }); + + const activeContext = telemetryClient.contextManager.active(); + + const span = activeContext.getSpan?.() as MockSpan; + expect(span).to.be.an.instanceOf(MockSpan); + expect(span.name).to.equal(`add ${queueName}.testJob`); + expect(span.options?.kind).to.equal(SpanKind.PRODUCER); + expect(span.attributes[TelemetryAttributes.QueueName]).to.equal( + queueName, + ); + }); + + it('should correctly handle errors and record them in telemetry', async () => { + const opts = { + repeat: { + endDate: 1, + }, + }; + + const recordExceptionSpy = sinon.spy( + MockSpan.prototype, + 'recordException', + ); + + try { + await queue.add('testJob', { someData: 'testData' }, opts); + } catch (e) { + assert(recordExceptionSpy.calledOnce); + const recordedError = recordExceptionSpy.firstCall.args[0]; + assert.equal( + recordedError.message, + 'End date must be greater than current timestamp', + ); + } finally { + recordExceptionSpy.restore(); + } + }); + }); + + describe('Queue.addBulk', () => { + it('should correctly interact with telemetry when adding multiple jobs', async () => { + const jobs = [ + { name: 'job1', data: { foo: 'bar' } }, + { name: 'job2', data: { baz: 'qux' } }, + ]; + + await queue.addBulk(jobs); + + const activeContext = telemetryClient.contextManager.active(); + const span = activeContext.getSpan?.() as MockSpan; + expect(span).to.be.an.instanceOf(MockSpan); + expect(span.name).to.equal(`addBulk ${queueName}`); + expect(span.options?.kind).to.equal(SpanKind.PRODUCER); + expect(span.attributes[TelemetryAttributes.BulkNames]).to.deep.equal( + jobs.map(job => job.name), + ); + expect(span.attributes[TelemetryAttributes.BulkCount]).to.equal( + jobs.length, + ); + }); + + it('should correctly handle errors and record them in telemetry for addBulk', async () => { + const recordExceptionSpy = sinon.spy( + MockSpan.prototype, + 'recordException', + ); + + try { + await queue.addBulk([ + { name: 'testJob1', data: { someData: 'testData1' } }, + { + name: 'testJob2', + data: { someData: 'testData2' }, + opts: { jobId: '0' }, + }, + ]); + } catch (e) { + assert(recordExceptionSpy.calledOnce); + const recordedError = recordExceptionSpy.firstCall.args[0]; + assert.equal(recordedError.message, 'Custom Ids cannot be integers'); + } finally { + recordExceptionSpy.restore(); + } + }); + }); + + describe('Worker.processJob', async () => { + it('should correctly interact with telemetry when processing a job', async () => { + const job = await queue.add('testJob', { foo: 'bar' }); + + const worker = new Worker(queueName, async () => 'some result', { + connection, + telemetry: telemetryClient, + name: 'testWorker', + }); + + await worker.waitUntilReady(); + const moveToCompletedStub = sinon.stub(job, 'moveToCompleted').resolves(); + + const startSpanSpy = sinon.spy(worker['tracer'], 'startSpan'); + + const token = 'some-token'; + + await worker.processJob(job, token, () => false, new Set()); + + const span = startSpanSpy.returnValues[0] as MockSpan; + + expect(span).to.be.an.instanceOf(MockSpan); + expect(span.name).to.equal(`process ${queueName}`); + expect(span.options?.kind).to.equal(SpanKind.CONSUMER); + expect(span.attributes[TelemetryAttributes.WorkerId]).to.equal(worker.id); + expect(span.attributes[TelemetryAttributes.WorkerName]).to.equal( + 'testWorker', + ); + expect(span.attributes[TelemetryAttributes.JobId]).to.equal(job.id); + + moveToCompletedStub.restore(); + await worker.close(); + }); + + it('should propagate context correctly between queue and worker using telemetry', async () => { + const job = await queue.add('testJob', { foo: 'bar' }); + + const worker = new Worker(queueName, async () => 'some result', { + connection, + telemetry: telemetryClient, + }); + await worker.waitUntilReady(); + + const moveToCompletedStub = sinon.stub(job, 'moveToCompleted').resolves(); + + await worker.processJob(job, 'some-token', () => false, new Set()); + + const workerActiveContext = telemetryClient.contextManager.active(); + const queueActiveContext = telemetryClient.contextManager.active(); + expect(workerActiveContext).to.equal(queueActiveContext); + + moveToCompletedStub.restore(); + await worker.close(); + }); + }); +});