From d39dca9c09b7f2d2063380d536780af04bb89098 Mon Sep 17 00:00:00 2001 From: fgozdz Date: Tue, 5 Nov 2024 12:00:36 +0100 Subject: [PATCH 1/4] feat(worker): getter for a concurrency --- src/classes/worker.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index f354c94253..86b67da1aa 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -395,6 +395,10 @@ export class Worker< this.opts.concurrency = concurrency; } + get concurrency() { + return this.opts.concurrency; + } + get repeat(): Promise { return new Promise(async resolve => { if (!this._repeat) { From 2e3c54d15a3ab327494675c7ec4ff8749e94d3af Mon Sep 17 00:00:00 2001 From: fgozdz Date: Tue, 5 Nov 2024 15:31:35 +0100 Subject: [PATCH 2/4] feat(worker): test for the getter --- tests/test_worker.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/test_worker.ts b/tests/test_worker.ts index a97c70f5b7..d8542039a9 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -4497,4 +4497,13 @@ describe('workers', function () { await worker.close(); }); + + it('should retrieve concurrency from getter', async () => { + const worker = new Worker(queueName, async () => {}, { connection, concurrency: 100 }); + worker.concurrency = 10; + + expect(worker.concurrency).to.equal(10); + + await worker.close(); + }); }); From 7d5c1c4ef4dd01bcc95720e8196ba88754e611b9 Mon Sep 17 00:00:00 2001 From: fgozdz Date: Thu, 7 Nov 2024 10:06:20 +0100 Subject: [PATCH 3/4] feat(worker): create private value for concurrency --- src/classes/worker.ts | 2585 +++++++++++++++++++++-------------------- 1 file changed, 1293 insertions(+), 1292 deletions(-) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 86b67da1aa..a78e84d4ac 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -1,1292 +1,1293 @@ -import * as fs from 'fs'; -import { URL } from 'url'; -import { Redis } from 'ioredis'; -import * as path from 'path'; -import { v4 } from 
'uuid'; - -// Note: this Polyfill is only needed for Node versions < 15.4.0 -import { AbortController } from 'node-abort-controller'; - -import { - GetNextJobOptions, - IoredisListener, - JobJsonRaw, - Processor, - RedisClient, - Span, - WorkerOptions, -} from '../interfaces'; -import { MinimalQueue } from '../types'; -import { - delay, - DELAY_TIME_1, - isNotConnectionError, - isRedisInstance, -} from '../utils'; -import { QueueBase } from './queue-base'; -import { Repeat } from './repeat'; -import { ChildPool } from './child-pool'; -import { Job } from './job'; -import { RedisConnection } from './redis-connection'; -import sandbox from './sandbox'; -import { AsyncFifoQueue } from './async-fifo-queue'; -import { - DelayedError, - RateLimitError, - RATE_LIMIT_ERROR, - WaitingChildrenError, -} from './errors'; -import { SpanKind, TelemetryAttributes } from '../enums'; -import { JobScheduler } from './job-scheduler'; - -// 10 seconds is the maximum time a BRPOPLPUSH can block. -const maximumBlockTimeout = 10; - -// 30 seconds is the maximum limit until. -const maximumLimitUntil = 30000; - -// note: sandboxed processors would also like to define concurrency per process -// for better resource utilization. - -export interface WorkerListener< - DataType = any, - ResultType = any, - NameType extends string = string, -> extends IoredisListener { - /** - * Listen to 'active' event. - * - * This event is triggered when a job enters the 'active' state. - */ - active: (job: Job, prev: string) => void; - - /** - * Listen to 'closing' event. - * - * This event is triggered when the worker is closed. - */ - closed: () => void; - - /** - * Listen to 'closing' event. - * - * This event is triggered when the worker is closing. - */ - closing: (msg: string) => void; - - /** - * Listen to 'completed' event. - * - * This event is triggered when a job has successfully completed. 
- */ - completed: ( - job: Job, - result: ResultType, - prev: string, - ) => void; - - /** - * Listen to 'drained' event. - * - * This event is triggered when the queue has drained the waiting list. - * Note that there could still be delayed jobs waiting their timers to expire - * and this event will still be triggered as long as the waiting list has emptied. - */ - drained: () => void; - - /** - * Listen to 'error' event. - * - * This event is triggered when an error is throw. - */ - error: (failedReason: Error) => void; - - /** - * Listen to 'failed' event. - * - * This event is triggered when a job has thrown an exception. - * Note: job parameter could be received as undefined when an stalled job - * reaches the stalled limit and it is deleted by the removeOnFail option. - */ - failed: ( - job: Job | undefined, - error: Error, - prev: string, - ) => void; - - /** - * Listen to 'paused' event. - * - * This event is triggered when the queue is paused. - */ - paused: () => void; - - /** - * Listen to 'progress' event. - * - * This event is triggered when a job updates it progress, i.e. the - * Job##updateProgress() method is called. This is useful to notify - * progress or any other data from within a processor to the rest of the - * world. - */ - progress: ( - job: Job, - progress: number | object, - ) => void; - - /** - * Listen to 'ready' event. - * - * This event is triggered when blockingConnection is ready. - */ - ready: () => void; - - /** - * Listen to 'resumed' event. - * - * This event is triggered when the queue is resumed. - */ - resumed: () => void; - - /** - * Listen to 'stalled' event. - * - * This event is triggered when a job has stalled and - * has been moved back to the wait list. - */ - stalled: (jobId: string, prev: string) => void; -} - -/** - * - * This class represents a worker that is able to process jobs from the queue. - * As soon as the class is instantiated and a connection to Redis is established - * it will start processing jobs. 
- * - */ -export class Worker< - DataType = any, - ResultType = any, - NameType extends string = string, -> extends QueueBase { - readonly opts: WorkerOptions; - readonly id: string; - - private abortDelayController: AbortController | null = null; - private asyncFifoQueue: AsyncFifoQueue>; - private blockingConnection: RedisConnection; - private blockUntil = 0; - private childPool: ChildPool; - private drained: boolean = false; - private extendLocksTimer: NodeJS.Timeout | null = null; - private limitUntil = 0; - private resumeWorker: () => void; - - private stalledCheckStopper?: () => void; - private waiting: Promise | null = null; - private _repeat: Repeat; // To be deprecated in v6 in favor of Job Scheduler - - private _jobScheduler: JobScheduler; - - protected paused: Promise; - protected processFn: Processor; - protected running = false; - - static RateLimitError(): Error { - return new RateLimitError(); - } - - constructor( - name: string, - processor?: string | URL | null | Processor, - opts?: WorkerOptions, - Connection?: typeof RedisConnection, - ) { - super( - name, - { - ...opts, - blockingConnection: true, - }, - Connection, - ); - - if (!opts || !opts.connection) { - throw new Error('Worker requires a connection'); - } - - this.opts = { - drainDelay: 5, - concurrency: 1, - lockDuration: 30000, - maxStalledCount: 1, - stalledInterval: 30000, - autorun: true, - runRetryDelay: 15000, - ...this.opts, - }; - - if (this.opts.stalledInterval <= 0) { - throw new Error('stalledInterval must be greater than 0'); - } - - if (this.opts.drainDelay <= 0) { - throw new Error('drainDelay must be greater than 0'); - } - - this.concurrency = this.opts.concurrency; - - this.opts.lockRenewTime = - this.opts.lockRenewTime || this.opts.lockDuration / 2; - - this.id = v4(); - - if (processor) { - if (typeof processor === 'function') { - this.processFn = processor; - } else { - // SANDBOXED - if (processor instanceof URL) { - if (!fs.existsSync(processor)) { - throw new Error( 
- `URL ${processor} does not exist in the local file system`, - ); - } - processor = processor.href; - } else { - const supportedFileTypes = ['.js', '.ts', '.flow', '.cjs']; - const processorFile = - processor + - (supportedFileTypes.includes(path.extname(processor)) ? '' : '.js'); - - if (!fs.existsSync(processorFile)) { - throw new Error(`File ${processorFile} does not exist`); - } - } - - // Separate paths so that bundling tools can resolve dependencies easier - const dirname = path.dirname(module.filename || __filename); - const workerThreadsMainFile = path.join(dirname, 'main-worker.js'); - const spawnProcessMainFile = path.join(dirname, 'main.js'); - - let mainFilePath = this.opts.useWorkerThreads - ? workerThreadsMainFile - : spawnProcessMainFile; - - try { - fs.statSync(mainFilePath); // would throw if file not exists - } catch (_) { - const mainFile = this.opts.useWorkerThreads - ? 'main-worker.js' - : 'main.js'; - mainFilePath = path.join( - process.cwd(), - `dist/cjs/classes/${mainFile}`, - ); - fs.statSync(mainFilePath); - } - - this.childPool = new ChildPool({ - mainFile: mainFilePath, - useWorkerThreads: this.opts.useWorkerThreads, - workerForkOptions: this.opts.workerForkOptions, - workerThreadsOptions: this.opts.workerThreadsOptions, - }); - - this.processFn = sandbox( - processor, - this.childPool, - ).bind(this); - } - - if (this.opts.autorun) { - this.run().catch(error => this.emit('error', error)); - } - } - - const connectionName = - this.clientName() + (this.opts.name ? `:w:${this.opts.name}` : ''); - this.blockingConnection = new RedisConnection( - isRedisInstance(opts.connection) - ? 
(opts.connection).duplicate({ connectionName }) - : { ...opts.connection, connectionName }, - false, - true, - opts.skipVersionCheck, - ); - this.blockingConnection.on('error', error => this.emit('error', error)); - this.blockingConnection.on('ready', () => - setTimeout(() => this.emit('ready'), 0), - ); - } - - emit>( - event: U, - ...args: Parameters[U]> - ): boolean { - return super.emit(event, ...args); - } - - off>( - eventName: U, - listener: WorkerListener[U], - ): this { - super.off(eventName, listener); - return this; - } - - on>( - event: U, - listener: WorkerListener[U], - ): this { - super.on(event, listener); - return this; - } - - once>( - event: U, - listener: WorkerListener[U], - ): this { - super.once(event, listener); - return this; - } - - protected callProcessJob( - job: Job, - token: string, - ): Promise { - return this.processFn(job, token); - } - - protected createJob( - data: JobJsonRaw, - jobId: string, - ): Job { - return this.Job.fromJSON(this as MinimalQueue, data, jobId) as Job< - DataType, - ResultType, - NameType - >; - } - - /** - * - * Waits until the worker is ready to start processing jobs. - * In general only useful when writing tests. 
- * - */ - async waitUntilReady(): Promise { - await super.waitUntilReady(); - return this.blockingConnection.client; - } - - set concurrency(concurrency: number) { - if ( - typeof concurrency !== 'number' || - concurrency < 1 || - !isFinite(concurrency) - ) { - throw new Error('concurrency must be a finite number greater than 0'); - } - this.opts.concurrency = concurrency; - } - - get concurrency() { - return this.opts.concurrency; - } - - get repeat(): Promise { - return new Promise(async resolve => { - if (!this._repeat) { - const connection = await this.client; - this._repeat = new Repeat(this.name, { - ...this.opts, - connection, - }); - this._repeat.on('error', e => this.emit.bind(this, e)); - } - resolve(this._repeat); - }); - } - - get jobScheduler(): Promise { - return new Promise(async resolve => { - if (!this._jobScheduler) { - const connection = await this.client; - this._jobScheduler = new JobScheduler(this.name, { - ...this.opts, - connection, - }); - this._jobScheduler.on('error', e => this.emit.bind(this, e)); - } - resolve(this._jobScheduler); - }); - } - - async run() { - if (!this.processFn) { - throw new Error('No process function is defined.'); - } - - if (this.running) { - throw new Error('Worker is already running.'); - } - - try { - this.running = true; - - if (this.closing) { - return; - } - - await this.startStalledCheckTimer(); - - const jobsInProgress = new Set<{ job: Job; ts: number }>(); - this.startLockExtenderTimer(jobsInProgress); - - const asyncFifoQueue = (this.asyncFifoQueue = - new AsyncFifoQueue>()); - - let tokenPostfix = 0; - - const client = await this.client; - const bclient = await this.blockingConnection.client; - - /** - * This is the main loop in BullMQ. Its goals are to fetch jobs from the queue - * as efficiently as possible, providing concurrency and minimal unnecessary calls - * to Redis. 
- */ - while (!this.closing) { - let numTotal = asyncFifoQueue.numTotal(); - - /** - * This inner loop tries to fetch jobs concurrently, but if we are waiting for a job - * to arrive at the queue we should not try to fetch more jobs (as it would be pointless) - */ - while ( - !this.waiting && - numTotal < this.opts.concurrency && - (!this.limitUntil || numTotal == 0) - ) { - const token = `${this.id}:${tokenPostfix++}`; - - const fetchedJob = this.retryIfFailed>( - () => this._getNextJob(client, bclient, token, { block: true }), - this.opts.runRetryDelay, - ); - asyncFifoQueue.add(fetchedJob); - - numTotal = asyncFifoQueue.numTotal(); - - if (this.waiting && numTotal > 1) { - // We are waiting for jobs but we have others that we could start processing already - break; - } - - // We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls - // to Redis in high concurrency scenarios. - const job = await fetchedJob; - - // No more jobs waiting but we have others that could start processing already - if (!job && numTotal > 1) { - break; - } - - // If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting - // for processing this job. - if (this.blockUntil) { - break; - } - } - - // Since there can be undefined jobs in the queue (when a job fails or queue is empty) - // we iterate until we find a job. - let job: Job | void; - do { - job = await asyncFifoQueue.fetch(); - } while (!job && asyncFifoQueue.numQueued() > 0); - - if (job) { - const token = job.token; - asyncFifoQueue.add( - this.retryIfFailed>( - () => - this.processJob( - >job, - token, - () => asyncFifoQueue.numTotal() <= this.opts.concurrency, - jobsInProgress, - ), - this.opts.runRetryDelay, - ), - ); - } - } - - this.running = false; - return await asyncFifoQueue.waitAll(); - } catch (error) { - this.running = false; - throw error; - } - } - - /** - * Returns a promise that resolves to the next job in queue. 
- * @param token - worker token to be assigned to retrieved job - * @returns a Job or undefined if no job was available in the queue. - */ - async getNextJob(token: string, { block = true }: GetNextJobOptions = {}) { - const nextJob = await this._getNextJob( - await this.client, - await this.blockingConnection.client, - token, - { block }, - ); - - return this.trace>( - SpanKind.INTERNAL, - 'getNextJob', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.QueueName]: this.name, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.WorkerOptions]: JSON.stringify({ block }), - [TelemetryAttributes.JobId]: nextJob?.id, - }); - - return nextJob; - }, - nextJob?.opts.telemetryMetadata, - ); - } - - private async _getNextJob( - client: RedisClient, - bclient: RedisClient, - token: string, - { block = true }: GetNextJobOptions = {}, - ): Promise | undefined> { - if (this.paused) { - if (block) { - await this.paused; - } else { - return; - } - } - - if (this.closing) { - return; - } - - if (this.drained && block && !this.limitUntil && !this.waiting) { - this.waiting = this.waitForJob(bclient, this.blockUntil); - try { - this.blockUntil = await this.waiting; - - if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 1) { - return this.moveToActive(client, token, this.opts.name); - } - } catch (err) { - // Swallow error if locally paused or closing since we did force a disconnection - if ( - !(this.paused || this.closing) && - isNotConnectionError(err) - ) { - throw err; - } - } finally { - this.waiting = null; - } - } else { - const limitUntil = this.limitUntil; - if (limitUntil) { - this.abortDelayController?.abort(); - this.abortDelayController = new AbortController(); - await this.delay( - this.getLimitUntil(limitUntil), - this.abortDelayController, - ); - } - return this.moveToActive(client, token, this.opts.name); - } - } - - /** - * Overrides the rate limit to be active for 
the next jobs. - * - * @param expireTimeMs - expire time in ms of this rate limit. - */ - async rateLimit(expireTimeMs: number): Promise { - await this.trace( - SpanKind.INTERNAL, - 'rateLimit', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerRateLimit]: expireTimeMs, - }); - - await this.client.then(client => - client.set( - this.keys.limiter, - Number.MAX_SAFE_INTEGER, - 'PX', - expireTimeMs, - ), - ); - }, - ); - } - - get minimumBlockTimeout(): number { - return this.blockingConnection.capabilities.canBlockFor1Ms - ? /* 1 millisecond is chosen because the granularity of our timestamps are milliseconds. -Obviously we can still process much faster than 1 job per millisecond but delays and rate limits -will never work with more accuracy than 1ms. */ - 0.001 - : 0.002; - } - - protected async moveToActive( - client: RedisClient, - token: string, - name?: string, - ): Promise> { - const [jobData, id, limitUntil, delayUntil] = - await this.scripts.moveToActive(client, token, name); - this.updateDelays(limitUntil, delayUntil); - - return this.nextJobFromJobData(jobData, id, token); - } - - private async waitForJob( - bclient: RedisClient, - blockUntil: number, - ): Promise { - if (this.paused) { - return Infinity; - } - - let timeout: NodeJS.Timeout; - try { - if (!this.closing && !this.limitUntil) { - let blockTimeout = this.getBlockTimeout(blockUntil); - - if (blockTimeout > 0) { - blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout - ? blockTimeout - : Math.ceil(blockTimeout); - - // We cannot trust that the blocking connection stays blocking forever - // due to issues in Redis and IORedis, so we will reconnect if we - // don't get a response in the expected time. 
- timeout = setTimeout(async () => { - bclient.disconnect(!this.closing); - }, blockTimeout * 1000 + 1000); - - this.updateDelays(); // reset delays to avoid reusing same values in next iteration - - // Markers should only be used for un-blocking, so we will handle them in this - // function only. - const result = await bclient.bzpopmin(this.keys.marker, blockTimeout); - - if (result) { - const [_key, member, score] = result; - - if (member) { - return parseInt(score); - } - } - } - - return 0; - } - } catch (error) { - if (isNotConnectionError(error)) { - this.emit('error', error); - } - if (!this.closing) { - await this.delay(); - } - } finally { - clearTimeout(timeout); - } - return Infinity; - } - - protected getBlockTimeout(blockUntil: number): number { - const opts: WorkerOptions = this.opts; - - // when there are delayed jobs - if (blockUntil) { - const blockDelay = blockUntil - Date.now(); - // when we reach the time to get new jobs - if (blockDelay <= 0) { - return blockDelay; - } else if (blockDelay < this.minimumBlockTimeout * 1000) { - return this.minimumBlockTimeout; - } else { - // We restrict the maximum block timeout to 10 second to avoid - // blocking the connection for too long in the case of reconnections - // reference: https://github.com/taskforcesh/bullmq/issues/1658 - return Math.min(blockDelay / 1000, maximumBlockTimeout); - } - } else { - return Math.max(opts.drainDelay, this.minimumBlockTimeout); - } - } - - protected getLimitUntil(limitUntil: number): number { - // We restrict the maximum limit until to 30 second to - // be able to promote delayed jobs while queue is rate limited - return Math.min(limitUntil, maximumLimitUntil); - } - - /** - * - * This function is exposed only for testing purposes. 
- */ - async delay( - milliseconds?: number, - abortController?: AbortController, - ): Promise { - await delay(milliseconds || DELAY_TIME_1, abortController); - } - - private updateDelays(limitUntil = 0, delayUntil = 0) { - this.limitUntil = Math.max(limitUntil, 0) || 0; - this.blockUntil = Math.max(delayUntil, 0) || 0; - } - - protected async nextJobFromJobData( - jobData?: JobJsonRaw, - jobId?: string, - token?: string, - ): Promise> { - if (!jobData) { - if (!this.drained) { - this.emit('drained'); - this.drained = true; - } - } else { - this.drained = false; - const job = this.createJob(jobData, jobId); - job.token = token; - - // Add next scheduled job if necessary. - if (job.opts.repeat) { - // Use new job scheduler if possible - if (job.repeatJobKey) { - const jobScheduler = await this.jobScheduler; - await jobScheduler.upsertJobScheduler( - job.repeatJobKey, - job.opts.repeat, - job.name, - job.data, - job.opts, - { override: false }, - ); - } else { - const repeat = await this.repeat; - await repeat.updateRepeatableJob(job.name, job.data, job.opts, { - override: false, - }); - } - } - return job; - } - } - - async processJob( - job: Job, - token: string, - fetchNextCallback = () => true, - jobsInProgress: Set<{ job: Job; ts: number }>, - ): Promise> { - if (!job || this.closing || this.paused) { - return; - } - - const { telemetryMetadata: srcPropagationMedatada } = job.opts; - - return this.trace>( - SpanKind.CONSUMER, - 'process', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.JobId]: job.id, - }); - - const handleCompleted = async (result: ResultType) => { - if (!this.connection.closing) { - const completed = await job.moveToCompleted( - result, - token, - fetchNextCallback() && !(this.closing || this.paused), - ); - this.emit('completed', job, result, 'active'); - - span?.addEvent('job completed', { - 
[TelemetryAttributes.JobResult]: JSON.stringify(result), - }); - - const [jobData, jobId, limitUntil, delayUntil] = completed || []; - this.updateDelays(limitUntil, delayUntil); - - return this.nextJobFromJobData(jobData, jobId, token); - } - }; - - const handleFailed = async (err: Error) => { - if (!this.connection.closing) { - try { - // Check if the job was manually rate-limited - if (err.message == RATE_LIMIT_ERROR) { - this.limitUntil = await this.moveLimitedBackToWait(job, token); - return; - } - - if ( - err instanceof DelayedError || - err.name == 'DelayedError' || - err instanceof WaitingChildrenError || - err.name == 'WaitingChildrenError' - ) { - return; - } - - const result = await job.moveToFailed(err, token, true); - this.emit('failed', job, err, 'active'); - - span?.addEvent('job failed', { - [TelemetryAttributes.JobFailedReason]: err.message, - }); - - if (result) { - const [jobData, jobId, limitUntil, delayUntil] = result; - this.updateDelays(limitUntil, delayUntil); - return this.nextJobFromJobData(jobData, jobId, token); - } - } catch (err) { - this.emit('error', err); - // It probably means that the job has lost the lock before completion - // A worker will (or already has) moved the job back - // to the waiting list (as stalled) - span?.recordException((err).message); - } - } - }; - - this.emit('active', job, 'waiting'); - - const processedOn = Date.now(); - const inProgressItem = { job, ts: processedOn }; - - try { - jobsInProgress.add(inProgressItem); - const result = await this.callProcessJob(job, token); - return await handleCompleted(result); - } catch (err) { - const failed = await handleFailed(err); - return failed; - } finally { - span?.setAttributes({ - [TelemetryAttributes.JobFinishedTimestamp]: Date.now(), - [TelemetryAttributes.JobProcessedTimestamp]: processedOn, - }); - - jobsInProgress.delete(inProgressItem); - } - }, - srcPropagationMedatada, - ); - } - - /** - * - * Pauses the processing of this queue only for this worker. 
- */ - async pause(doNotWaitActive?: boolean): Promise { - await this.trace( - SpanKind.INTERNAL, - 'pause', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.WorkerDoNotWaitActive]: doNotWaitActive, - }); - - if (!this.paused) { - this.paused = new Promise(resolve => { - this.resumeWorker = function () { - resolve(); - this.paused = null; // Allow pause to be checked externally for paused state. - this.resumeWorker = null; - }; - }); - await (!doNotWaitActive && this.whenCurrentJobsFinished()); - this.emit('paused'); - } - }, - ); - } - - /** - * - * Resumes processing of this worker (if paused). - */ - resume(): void { - if (this.resumeWorker) { - this.trace(SpanKind.INTERNAL, 'resume', this.name, span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - }); - - this.resumeWorker(); - this.emit('resumed'); - }); - } - } - - /** - * - * Checks if worker is paused. - * - * @returns true if worker is paused, false otherwise. - */ - isPaused(): boolean { - return !!this.paused; - } - - /** - * - * Checks if worker is currently running. - * - * @returns true if worker is running, false otherwise. - */ - isRunning(): boolean { - return this.running; - } - - /** - * - * Closes the worker and related redis connections. - * - * This method waits for current jobs to finalize before returning. - * - * @param force - Use force boolean parameter if you do not want to wait for - * current jobs to be processed. - * - * @returns Promise that resolves when the worker has been closed. 
- */ - async close(force = false): Promise { - if (this.closing) { - return this.closing; - } - - await this.trace( - SpanKind.INTERNAL, - 'close', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.WorkerForceClose]: force, - }); - - this.closing = (async () => { - this.emit('closing', 'closing queue'); - this.abortDelayController?.abort(); - - this.resume(); - - // Define the async cleanup functions - const asyncCleanups = [ - () => { - return force || this.whenCurrentJobsFinished(false); - }, - () => this.childPool?.clean(), - () => this.blockingConnection.close(force), - () => this.connection.close(force), - ]; - - // Run cleanup functions sequentially and make sure all are run despite any errors - for (const cleanup of asyncCleanups) { - try { - await cleanup(); - } catch (err) { - this.emit('error', err); - } - } - - clearTimeout(this.extendLocksTimer); - //clearTimeout(this.stalledCheckTimer); - this.stalledCheckStopper?.(); - - this.closed = true; - this.emit('closed'); - })(); - - return await this.closing; - }, - ); - } - - /** - * - * Manually starts the stalled checker. - * The check will run once as soon as this method is called, and - * then every opts.stalledInterval milliseconds until the worker is closed. - * Note: Normally you do not need to call this method, since the stalled checker - * is automatically started when the worker starts processing jobs after - * calling run. However if you want to process the jobs manually you need - * to call this method to start the stalled checker. 
- * - * @see {@link https://docs.bullmq.io/patterns/manually-fetching-jobs} - */ - async startStalledCheckTimer(): Promise { - if (!this.opts.skipStalledCheck) { - if (!this.closing) { - await this.trace( - SpanKind.INTERNAL, - 'startStalledCheckTimer', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - }); - - this.stalledChecker().catch(err => { - this.emit('error', err); - }); - }, - ); - } - } - } - - private async stalledChecker() { - while (!this.closing) { - try { - await this.checkConnectionError(() => this.moveStalledJobsToWait()); - } catch (err) { - this.emit('error', err); - } - - await new Promise(resolve => { - const timeout = setTimeout(resolve, this.opts.stalledInterval); - this.stalledCheckStopper = () => { - clearTimeout(timeout); - resolve(); - }; - }); - } - } - - private startLockExtenderTimer( - jobsInProgress: Set<{ job: Job; ts: number }>, - ): void { - if (!this.opts.skipLockRenewal) { - clearTimeout(this.extendLocksTimer); - - if (!this.closed) { - this.extendLocksTimer = setTimeout(async () => { - // Get all the jobs whose locks expire in less than 1/2 of the lockRenewTime - const now = Date.now(); - const jobsToExtend = []; - - for (const item of jobsInProgress) { - const { job, ts } = item; - if (!ts) { - item.ts = now; - continue; - } - - if (ts + this.opts.lockRenewTime / 2 < now) { - item.ts = now; - jobsToExtend.push(job); - } - } - - try { - if (jobsToExtend.length) { - await this.extendLocks(jobsToExtend); - } - } catch (err) { - this.emit('error', err); - } - - this.startLockExtenderTimer(jobsInProgress); - }, this.opts.lockRenewTime / 2); - } - } - } - - /** - * Returns a promise that resolves when active jobs are cleared - * - * @returns - */ - private async whenCurrentJobsFinished(reconnect = true) { - // - // Force reconnection of blocking connection to abort blocking redis call immediately. 
- // - if (this.waiting) { - // If we are not going to reconnect, we will not wait for the disconnection. - await this.blockingConnection.disconnect(reconnect); - } else { - reconnect = false; - } - - if (this.asyncFifoQueue) { - await this.asyncFifoQueue.waitAll(); - } - - reconnect && (await this.blockingConnection.reconnect()); - } - - private async retryIfFailed(fn: () => Promise, delayInMs: number) { - const retry = 1; - do { - try { - return await fn(); - } catch (err) { - this.emit('error', err); - if (delayInMs) { - await this.delay(delayInMs); - } else { - return; - } - } - } while (retry); - } - - protected async extendLocks(jobs: Job[]) { - await this.trace( - SpanKind.INTERNAL, - 'extendLocks', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.WorkerJobsToExtendLocks]: jobs.map( - job => job.id, - ), - }); - - try { - const pipeline = (await this.client).pipeline(); - for (const job of jobs) { - await this.scripts.extendLock( - job.id, - job.token, - this.opts.lockDuration, - pipeline, - ); - } - const result = (await pipeline.exec()) as [Error, string][]; - - for (const [err, jobId] of result) { - if (err) { - // TODO: signal process function that the job has been lost. 
- this.emit( - 'error', - new Error(`could not renew lock for job ${jobId}`), - ); - } - } - } catch (err) { - this.emit('error', err); - } - }, - ); - } - - private async moveStalledJobsToWait() { - await this.trace( - SpanKind.INTERNAL, - 'moveStalledJobsToWait', - this.name, - async span => { - const chunkSize = 50; - const [failed, stalled] = await this.scripts.moveStalledJobsToWait(); - - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.WorkerStalledJobs]: stalled, - [TelemetryAttributes.WorkerFailedJobs]: failed, - }); - - stalled.forEach((jobId: string) => { - span?.addEvent('job stalled', { - [TelemetryAttributes.JobId]: jobId, - }); - this.emit('stalled', jobId, 'active'); - }); - - const jobPromises: Promise>[] = []; - for (let i = 0; i < failed.length; i++) { - jobPromises.push( - Job.fromId( - this as MinimalQueue, - failed[i], - ), - ); - - if ((i + 1) % chunkSize === 0) { - this.notifyFailedJobs(await Promise.all(jobPromises)); - jobPromises.length = 0; - } - } - - this.notifyFailedJobs(await Promise.all(jobPromises)); - }, - ); - } - - private notifyFailedJobs( - failedJobs: Job[], - span?: Span, - ) { - const failedReason = 'job stalled more than allowable limit'; - - failedJobs.forEach((job: Job) => { - span?.addEvent('job failed', { - [TelemetryAttributes.JobId]: job.id, - [TelemetryAttributes.JobName]: job.name, - [TelemetryAttributes.JobFailedReason]: failedReason, - }); - this.emit('failed', job, new Error(failedReason), 'active'); - }); - } - - private moveLimitedBackToWait( - job: Job, - token: string, - ) { - return this.scripts.moveJobFromActiveToWait(job.id, token); - } -} +import * as fs from 'fs'; +import { URL } from 'url'; +import { Redis } from 'ioredis'; +import * as path from 'path'; +import { v4 } from 'uuid'; + +// Note: this Polyfill is only needed for Node versions < 15.4.0 +import { AbortController } from 'node-abort-controller'; + 
+import { + GetNextJobOptions, + IoredisListener, + JobJsonRaw, + Processor, + RedisClient, + Span, + WorkerOptions, +} from '../interfaces'; +import { MinimalQueue } from '../types'; +import { + delay, + DELAY_TIME_1, + isNotConnectionError, + isRedisInstance, +} from '../utils'; +import { QueueBase } from './queue-base'; +import { Repeat } from './repeat'; +import { ChildPool } from './child-pool'; +import { Job } from './job'; +import { RedisConnection } from './redis-connection'; +import sandbox from './sandbox'; +import { AsyncFifoQueue } from './async-fifo-queue'; +import { + DelayedError, + RateLimitError, + RATE_LIMIT_ERROR, + WaitingChildrenError, +} from './errors'; +import { SpanKind, TelemetryAttributes } from '../enums'; +import { JobScheduler } from './job-scheduler'; + +// 10 seconds is the maximum time a BRPOPLPUSH can block. +const maximumBlockTimeout = 10; + +// 30 seconds is the maximum limit until. +const maximumLimitUntil = 30000; + +// note: sandboxed processors would also like to define concurrency per process +// for better resource utilization. + +export interface WorkerListener< + DataType = any, + ResultType = any, + NameType extends string = string, +> extends IoredisListener { + /** + * Listen to 'active' event. + * + * This event is triggered when a job enters the 'active' state. + */ + active: (job: Job, prev: string) => void; + + /** + * Listen to 'closing' event. + * + * This event is triggered when the worker is closed. + */ + closed: () => void; + + /** + * Listen to 'closing' event. + * + * This event is triggered when the worker is closing. + */ + closing: (msg: string) => void; + + /** + * Listen to 'completed' event. + * + * This event is triggered when a job has successfully completed. + */ + completed: ( + job: Job, + result: ResultType, + prev: string, + ) => void; + + /** + * Listen to 'drained' event. + * + * This event is triggered when the queue has drained the waiting list. 
+ * Note that there could still be delayed jobs waiting their timers to expire + * and this event will still be triggered as long as the waiting list has emptied. + */ + drained: () => void; + + /** + * Listen to 'error' event. + * + * This event is triggered when an error is throw. + */ + error: (failedReason: Error) => void; + + /** + * Listen to 'failed' event. + * + * This event is triggered when a job has thrown an exception. + * Note: job parameter could be received as undefined when an stalled job + * reaches the stalled limit and it is deleted by the removeOnFail option. + */ + failed: ( + job: Job | undefined, + error: Error, + prev: string, + ) => void; + + /** + * Listen to 'paused' event. + * + * This event is triggered when the queue is paused. + */ + paused: () => void; + + /** + * Listen to 'progress' event. + * + * This event is triggered when a job updates it progress, i.e. the + * Job##updateProgress() method is called. This is useful to notify + * progress or any other data from within a processor to the rest of the + * world. + */ + progress: ( + job: Job, + progress: number | object, + ) => void; + + /** + * Listen to 'ready' event. + * + * This event is triggered when blockingConnection is ready. + */ + ready: () => void; + + /** + * Listen to 'resumed' event. + * + * This event is triggered when the queue is resumed. + */ + resumed: () => void; + + /** + * Listen to 'stalled' event. + * + * This event is triggered when a job has stalled and + * has been moved back to the wait list. + */ + stalled: (jobId: string, prev: string) => void; +} + +/** + * + * This class represents a worker that is able to process jobs from the queue. + * As soon as the class is instantiated and a connection to Redis is established + * it will start processing jobs. 
+ * + */ +export class Worker< + DataType = any, + ResultType = any, + NameType extends string = string, +> extends QueueBase { + readonly opts: WorkerOptions; + readonly id: string; + + private abortDelayController: AbortController | null = null; + private asyncFifoQueue: AsyncFifoQueue>; + private blockingConnection: RedisConnection; + private blockUntil = 0; + private _concurrency: number; + private childPool: ChildPool; + private drained: boolean = false; + private extendLocksTimer: NodeJS.Timeout | null = null; + private limitUntil = 0; + private resumeWorker: () => void; + + private stalledCheckStopper?: () => void; + private waiting: Promise | null = null; + private _repeat: Repeat; // To be deprecated in v6 in favor of Job Scheduler + + private _jobScheduler: JobScheduler; + + protected paused: Promise; + protected processFn: Processor; + protected running = false; + + static RateLimitError(): Error { + return new RateLimitError(); + } + + constructor( + name: string, + processor?: string | URL | null | Processor, + opts?: WorkerOptions, + Connection?: typeof RedisConnection, + ) { + super( + name, + { + ...opts, + blockingConnection: true, + }, + Connection, + ); + + if (!opts || !opts.connection) { + throw new Error('Worker requires a connection'); + } + + this.opts = { + drainDelay: 5, + concurrency: 1, + lockDuration: 30000, + maxStalledCount: 1, + stalledInterval: 30000, + autorun: true, + runRetryDelay: 15000, + ...this.opts, + }; + + if (this.opts.stalledInterval <= 0) { + throw new Error('stalledInterval must be greater than 0'); + } + + if (this.opts.drainDelay <= 0) { + throw new Error('drainDelay must be greater than 0'); + } + + this.concurrency = this.opts.concurrency; + + this.opts.lockRenewTime = + this.opts.lockRenewTime || this.opts.lockDuration / 2; + + this.id = v4(); + + if (processor) { + if (typeof processor === 'function') { + this.processFn = processor; + } else { + // SANDBOXED + if (processor instanceof URL) { + if 
(!fs.existsSync(processor)) { + throw new Error( + `URL ${processor} does not exist in the local file system`, + ); + } + processor = processor.href; + } else { + const supportedFileTypes = ['.js', '.ts', '.flow', '.cjs']; + const processorFile = + processor + + (supportedFileTypes.includes(path.extname(processor)) ? '' : '.js'); + + if (!fs.existsSync(processorFile)) { + throw new Error(`File ${processorFile} does not exist`); + } + } + + // Separate paths so that bundling tools can resolve dependencies easier + const dirname = path.dirname(module.filename || __filename); + const workerThreadsMainFile = path.join(dirname, 'main-worker.js'); + const spawnProcessMainFile = path.join(dirname, 'main.js'); + + let mainFilePath = this.opts.useWorkerThreads + ? workerThreadsMainFile + : spawnProcessMainFile; + + try { + fs.statSync(mainFilePath); // would throw if file not exists + } catch (_) { + const mainFile = this.opts.useWorkerThreads + ? 'main-worker.js' + : 'main.js'; + mainFilePath = path.join( + process.cwd(), + `dist/cjs/classes/${mainFile}`, + ); + fs.statSync(mainFilePath); + } + + this.childPool = new ChildPool({ + mainFile: mainFilePath, + useWorkerThreads: this.opts.useWorkerThreads, + workerForkOptions: this.opts.workerForkOptions, + workerThreadsOptions: this.opts.workerThreadsOptions, + }); + + this.processFn = sandbox( + processor, + this.childPool, + ).bind(this); + } + + if (this.opts.autorun) { + this.run().catch(error => this.emit('error', error)); + } + } + + const connectionName = + this.clientName() + (this.opts.name ? `:w:${this.opts.name}` : ''); + this.blockingConnection = new RedisConnection( + isRedisInstance(opts.connection) + ? 
(opts.connection).duplicate({ connectionName }) + : { ...opts.connection, connectionName }, + false, + true, + opts.skipVersionCheck, + ); + this.blockingConnection.on('error', error => this.emit('error', error)); + this.blockingConnection.on('ready', () => + setTimeout(() => this.emit('ready'), 0), + ); + } + + emit>( + event: U, + ...args: Parameters[U]> + ): boolean { + return super.emit(event, ...args); + } + + off>( + eventName: U, + listener: WorkerListener[U], + ): this { + super.off(eventName, listener); + return this; + } + + on>( + event: U, + listener: WorkerListener[U], + ): this { + super.on(event, listener); + return this; + } + + once>( + event: U, + listener: WorkerListener[U], + ): this { + super.once(event, listener); + return this; + } + + protected callProcessJob( + job: Job, + token: string, + ): Promise { + return this.processFn(job, token); + } + + protected createJob( + data: JobJsonRaw, + jobId: string, + ): Job { + return this.Job.fromJSON(this as MinimalQueue, data, jobId) as Job< + DataType, + ResultType, + NameType + >; + } + + /** + * + * Waits until the worker is ready to start processing jobs. + * In general only useful when writing tests. 
+ * + */ + async waitUntilReady(): Promise { + await super.waitUntilReady(); + return this.blockingConnection.client; + } + + set concurrency(concurrency: number) { + if ( + typeof concurrency !== 'number' || + concurrency < 1 || + !isFinite(concurrency) + ) { + throw new Error('concurrency must be a finite number greater than 0'); + } + this._concurrency = concurrency; + } + + get concurrency() { + return this._concurrency; + } + + get repeat(): Promise { + return new Promise(async resolve => { + if (!this._repeat) { + const connection = await this.client; + this._repeat = new Repeat(this.name, { + ...this.opts, + connection, + }); + this._repeat.on('error', e => this.emit.bind(this, e)); + } + resolve(this._repeat); + }); + } + + get jobScheduler(): Promise { + return new Promise(async resolve => { + if (!this._jobScheduler) { + const connection = await this.client; + this._jobScheduler = new JobScheduler(this.name, { + ...this.opts, + connection, + }); + this._jobScheduler.on('error', e => this.emit.bind(this, e)); + } + resolve(this._jobScheduler); + }); + } + + async run() { + if (!this.processFn) { + throw new Error('No process function is defined.'); + } + + if (this.running) { + throw new Error('Worker is already running.'); + } + + try { + this.running = true; + + if (this.closing) { + return; + } + + await this.startStalledCheckTimer(); + + const jobsInProgress = new Set<{ job: Job; ts: number }>(); + this.startLockExtenderTimer(jobsInProgress); + + const asyncFifoQueue = (this.asyncFifoQueue = + new AsyncFifoQueue>()); + + let tokenPostfix = 0; + + const client = await this.client; + const bclient = await this.blockingConnection.client; + + /** + * This is the main loop in BullMQ. Its goals are to fetch jobs from the queue + * as efficiently as possible, providing concurrency and minimal unnecessary calls + * to Redis. 
+ */ + while (!this.closing) { + let numTotal = asyncFifoQueue.numTotal(); + + /** + * This inner loop tries to fetch jobs concurrently, but if we are waiting for a job + * to arrive at the queue we should not try to fetch more jobs (as it would be pointless) + */ + while ( + !this.waiting && + numTotal < this._concurrency && + (!this.limitUntil || numTotal == 0) + ) { + const token = `${this.id}:${tokenPostfix++}`; + + const fetchedJob = this.retryIfFailed>( + () => this._getNextJob(client, bclient, token, { block: true }), + this.opts.runRetryDelay, + ); + asyncFifoQueue.add(fetchedJob); + + numTotal = asyncFifoQueue.numTotal(); + + if (this.waiting && numTotal > 1) { + // We are waiting for jobs but we have others that we could start processing already + break; + } + + // We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls + // to Redis in high concurrency scenarios. + const job = await fetchedJob; + + // No more jobs waiting but we have others that could start processing already + if (!job && numTotal > 1) { + break; + } + + // If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting + // for processing this job. + if (this.blockUntil) { + break; + } + } + + // Since there can be undefined jobs in the queue (when a job fails or queue is empty) + // we iterate until we find a job. + let job: Job | void; + do { + job = await asyncFifoQueue.fetch(); + } while (!job && asyncFifoQueue.numQueued() > 0); + + if (job) { + const token = job.token; + asyncFifoQueue.add( + this.retryIfFailed>( + () => + this.processJob( + >job, + token, + () => asyncFifoQueue.numTotal() <= this._concurrency, + jobsInProgress, + ), + this.opts.runRetryDelay, + ), + ); + } + } + + this.running = false; + return await asyncFifoQueue.waitAll(); + } catch (error) { + this.running = false; + throw error; + } + } + + /** + * Returns a promise that resolves to the next job in queue. 
+ * @param token - worker token to be assigned to retrieved job + * @returns a Job or undefined if no job was available in the queue. + */ + async getNextJob(token: string, { block = true }: GetNextJobOptions = {}) { + const nextJob = await this._getNextJob( + await this.client, + await this.blockingConnection.client, + token, + { block }, + ); + + return this.trace>( + SpanKind.INTERNAL, + 'getNextJob', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.QueueName]: this.name, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.WorkerOptions]: JSON.stringify({ block }), + [TelemetryAttributes.JobId]: nextJob?.id, + }); + + return nextJob; + }, + nextJob?.opts.telemetryMetadata, + ); + } + + private async _getNextJob( + client: RedisClient, + bclient: RedisClient, + token: string, + { block = true }: GetNextJobOptions = {}, + ): Promise | undefined> { + if (this.paused) { + if (block) { + await this.paused; + } else { + return; + } + } + + if (this.closing) { + return; + } + + if (this.drained && block && !this.limitUntil && !this.waiting) { + this.waiting = this.waitForJob(bclient, this.blockUntil); + try { + this.blockUntil = await this.waiting; + + if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 1) { + return this.moveToActive(client, token, this.opts.name); + } + } catch (err) { + // Swallow error if locally paused or closing since we did force a disconnection + if ( + !(this.paused || this.closing) && + isNotConnectionError(err) + ) { + throw err; + } + } finally { + this.waiting = null; + } + } else { + const limitUntil = this.limitUntil; + if (limitUntil) { + this.abortDelayController?.abort(); + this.abortDelayController = new AbortController(); + await this.delay( + this.getLimitUntil(limitUntil), + this.abortDelayController, + ); + } + return this.moveToActive(client, token, this.opts.name); + } + } + + /** + * Overrides the rate limit to be active for 
the next jobs. + * + * @param expireTimeMs - expire time in ms of this rate limit. + */ + async rateLimit(expireTimeMs: number): Promise { + await this.trace( + SpanKind.INTERNAL, + 'rateLimit', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerRateLimit]: expireTimeMs, + }); + + await this.client.then(client => + client.set( + this.keys.limiter, + Number.MAX_SAFE_INTEGER, + 'PX', + expireTimeMs, + ), + ); + }, + ); + } + + get minimumBlockTimeout(): number { + return this.blockingConnection.capabilities.canBlockFor1Ms + ? /* 1 millisecond is chosen because the granularity of our timestamps are milliseconds. +Obviously we can still process much faster than 1 job per millisecond but delays and rate limits +will never work with more accuracy than 1ms. */ + 0.001 + : 0.002; + } + + protected async moveToActive( + client: RedisClient, + token: string, + name?: string, + ): Promise> { + const [jobData, id, limitUntil, delayUntil] = + await this.scripts.moveToActive(client, token, name); + this.updateDelays(limitUntil, delayUntil); + + return this.nextJobFromJobData(jobData, id, token); + } + + private async waitForJob( + bclient: RedisClient, + blockUntil: number, + ): Promise { + if (this.paused) { + return Infinity; + } + + let timeout: NodeJS.Timeout; + try { + if (!this.closing && !this.limitUntil) { + let blockTimeout = this.getBlockTimeout(blockUntil); + + if (blockTimeout > 0) { + blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout + ? blockTimeout + : Math.ceil(blockTimeout); + + // We cannot trust that the blocking connection stays blocking forever + // due to issues in Redis and IORedis, so we will reconnect if we + // don't get a response in the expected time. 
+ timeout = setTimeout(async () => { + bclient.disconnect(!this.closing); + }, blockTimeout * 1000 + 1000); + + this.updateDelays(); // reset delays to avoid reusing same values in next iteration + + // Markers should only be used for un-blocking, so we will handle them in this + // function only. + const result = await bclient.bzpopmin(this.keys.marker, blockTimeout); + + if (result) { + const [_key, member, score] = result; + + if (member) { + return parseInt(score); + } + } + } + + return 0; + } + } catch (error) { + if (isNotConnectionError(error)) { + this.emit('error', error); + } + if (!this.closing) { + await this.delay(); + } + } finally { + clearTimeout(timeout); + } + return Infinity; + } + + protected getBlockTimeout(blockUntil: number): number { + const opts: WorkerOptions = this.opts; + + // when there are delayed jobs + if (blockUntil) { + const blockDelay = blockUntil - Date.now(); + // when we reach the time to get new jobs + if (blockDelay <= 0) { + return blockDelay; + } else if (blockDelay < this.minimumBlockTimeout * 1000) { + return this.minimumBlockTimeout; + } else { + // We restrict the maximum block timeout to 10 second to avoid + // blocking the connection for too long in the case of reconnections + // reference: https://github.com/taskforcesh/bullmq/issues/1658 + return Math.min(blockDelay / 1000, maximumBlockTimeout); + } + } else { + return Math.max(opts.drainDelay, this.minimumBlockTimeout); + } + } + + protected getLimitUntil(limitUntil: number): number { + // We restrict the maximum limit until to 30 second to + // be able to promote delayed jobs while queue is rate limited + return Math.min(limitUntil, maximumLimitUntil); + } + + /** + * + * This function is exposed only for testing purposes. 
+ */ + async delay( + milliseconds?: number, + abortController?: AbortController, + ): Promise { + await delay(milliseconds || DELAY_TIME_1, abortController); + } + + private updateDelays(limitUntil = 0, delayUntil = 0) { + this.limitUntil = Math.max(limitUntil, 0) || 0; + this.blockUntil = Math.max(delayUntil, 0) || 0; + } + + protected async nextJobFromJobData( + jobData?: JobJsonRaw, + jobId?: string, + token?: string, + ): Promise> { + if (!jobData) { + if (!this.drained) { + this.emit('drained'); + this.drained = true; + } + } else { + this.drained = false; + const job = this.createJob(jobData, jobId); + job.token = token; + + // Add next scheduled job if necessary. + if (job.opts.repeat) { + // Use new job scheduler if possible + if (job.repeatJobKey) { + const jobScheduler = await this.jobScheduler; + await jobScheduler.upsertJobScheduler( + job.repeatJobKey, + job.opts.repeat, + job.name, + job.data, + job.opts, + { override: false }, + ); + } else { + const repeat = await this.repeat; + await repeat.updateRepeatableJob(job.name, job.data, job.opts, { + override: false, + }); + } + } + return job; + } + } + + async processJob( + job: Job, + token: string, + fetchNextCallback = () => true, + jobsInProgress: Set<{ job: Job; ts: number }>, + ): Promise> { + if (!job || this.closing || this.paused) { + return; + } + + const { telemetryMetadata: srcPropagationMedatada } = job.opts; + + return this.trace>( + SpanKind.CONSUMER, + 'process', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.JobId]: job.id, + }); + + const handleCompleted = async (result: ResultType) => { + if (!this.connection.closing) { + const completed = await job.moveToCompleted( + result, + token, + fetchNextCallback() && !(this.closing || this.paused), + ); + this.emit('completed', job, result, 'active'); + + span?.addEvent('job completed', { + 
[TelemetryAttributes.JobResult]: JSON.stringify(result), + }); + + const [jobData, jobId, limitUntil, delayUntil] = completed || []; + this.updateDelays(limitUntil, delayUntil); + + return this.nextJobFromJobData(jobData, jobId, token); + } + }; + + const handleFailed = async (err: Error) => { + if (!this.connection.closing) { + try { + // Check if the job was manually rate-limited + if (err.message == RATE_LIMIT_ERROR) { + this.limitUntil = await this.moveLimitedBackToWait(job, token); + return; + } + + if ( + err instanceof DelayedError || + err.name == 'DelayedError' || + err instanceof WaitingChildrenError || + err.name == 'WaitingChildrenError' + ) { + return; + } + + const result = await job.moveToFailed(err, token, true); + this.emit('failed', job, err, 'active'); + + span?.addEvent('job failed', { + [TelemetryAttributes.JobFailedReason]: err.message, + }); + + if (result) { + const [jobData, jobId, limitUntil, delayUntil] = result; + this.updateDelays(limitUntil, delayUntil); + return this.nextJobFromJobData(jobData, jobId, token); + } + } catch (err) { + this.emit('error', err); + // It probably means that the job has lost the lock before completion + // A worker will (or already has) moved the job back + // to the waiting list (as stalled) + span?.recordException((err).message); + } + } + }; + + this.emit('active', job, 'waiting'); + + const processedOn = Date.now(); + const inProgressItem = { job, ts: processedOn }; + + try { + jobsInProgress.add(inProgressItem); + const result = await this.callProcessJob(job, token); + return await handleCompleted(result); + } catch (err) { + const failed = await handleFailed(err); + return failed; + } finally { + span?.setAttributes({ + [TelemetryAttributes.JobFinishedTimestamp]: Date.now(), + [TelemetryAttributes.JobProcessedTimestamp]: processedOn, + }); + + jobsInProgress.delete(inProgressItem); + } + }, + srcPropagationMedatada, + ); + } + + /** + * + * Pauses the processing of this queue only for this worker. 
+ */ + async pause(doNotWaitActive?: boolean): Promise { + await this.trace( + SpanKind.INTERNAL, + 'pause', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.WorkerDoNotWaitActive]: doNotWaitActive, + }); + + if (!this.paused) { + this.paused = new Promise(resolve => { + this.resumeWorker = function () { + resolve(); + this.paused = null; // Allow pause to be checked externally for paused state. + this.resumeWorker = null; + }; + }); + await (!doNotWaitActive && this.whenCurrentJobsFinished()); + this.emit('paused'); + } + }, + ); + } + + /** + * + * Resumes processing of this worker (if paused). + */ + resume(): void { + if (this.resumeWorker) { + this.trace(SpanKind.INTERNAL, 'resume', this.name, span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + }); + + this.resumeWorker(); + this.emit('resumed'); + }); + } + } + + /** + * + * Checks if worker is paused. + * + * @returns true if worker is paused, false otherwise. + */ + isPaused(): boolean { + return !!this.paused; + } + + /** + * + * Checks if worker is currently running. + * + * @returns true if worker is running, false otherwise. + */ + isRunning(): boolean { + return this.running; + } + + /** + * + * Closes the worker and related redis connections. + * + * This method waits for current jobs to finalize before returning. + * + * @param force - Use force boolean parameter if you do not want to wait for + * current jobs to be processed. + * + * @returns Promise that resolves when the worker has been closed. 
+ */ + async close(force = false): Promise { + if (this.closing) { + return this.closing; + } + + await this.trace( + SpanKind.INTERNAL, + 'close', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.WorkerForceClose]: force, + }); + + this.closing = (async () => { + this.emit('closing', 'closing queue'); + this.abortDelayController?.abort(); + + this.resume(); + + // Define the async cleanup functions + const asyncCleanups = [ + () => { + return force || this.whenCurrentJobsFinished(false); + }, + () => this.childPool?.clean(), + () => this.blockingConnection.close(force), + () => this.connection.close(force), + ]; + + // Run cleanup functions sequentially and make sure all are run despite any errors + for (const cleanup of asyncCleanups) { + try { + await cleanup(); + } catch (err) { + this.emit('error', err); + } + } + + clearTimeout(this.extendLocksTimer); + //clearTimeout(this.stalledCheckTimer); + this.stalledCheckStopper?.(); + + this.closed = true; + this.emit('closed'); + })(); + + return await this.closing; + }, + ); + } + + /** + * + * Manually starts the stalled checker. + * The check will run once as soon as this method is called, and + * then every opts.stalledInterval milliseconds until the worker is closed. + * Note: Normally you do not need to call this method, since the stalled checker + * is automatically started when the worker starts processing jobs after + * calling run. However if you want to process the jobs manually you need + * to call this method to start the stalled checker. 
+ * + * @see {@link https://docs.bullmq.io/patterns/manually-fetching-jobs} + */ + async startStalledCheckTimer(): Promise { + if (!this.opts.skipStalledCheck) { + if (!this.closing) { + await this.trace( + SpanKind.INTERNAL, + 'startStalledCheckTimer', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + }); + + this.stalledChecker().catch(err => { + this.emit('error', err); + }); + }, + ); + } + } + } + + private async stalledChecker() { + while (!this.closing) { + try { + await this.checkConnectionError(() => this.moveStalledJobsToWait()); + } catch (err) { + this.emit('error', err); + } + + await new Promise(resolve => { + const timeout = setTimeout(resolve, this.opts.stalledInterval); + this.stalledCheckStopper = () => { + clearTimeout(timeout); + resolve(); + }; + }); + } + } + + private startLockExtenderTimer( + jobsInProgress: Set<{ job: Job; ts: number }>, + ): void { + if (!this.opts.skipLockRenewal) { + clearTimeout(this.extendLocksTimer); + + if (!this.closed) { + this.extendLocksTimer = setTimeout(async () => { + // Get all the jobs whose locks expire in less than 1/2 of the lockRenewTime + const now = Date.now(); + const jobsToExtend = []; + + for (const item of jobsInProgress) { + const { job, ts } = item; + if (!ts) { + item.ts = now; + continue; + } + + if (ts + this.opts.lockRenewTime / 2 < now) { + item.ts = now; + jobsToExtend.push(job); + } + } + + try { + if (jobsToExtend.length) { + await this.extendLocks(jobsToExtend); + } + } catch (err) { + this.emit('error', err); + } + + this.startLockExtenderTimer(jobsInProgress); + }, this.opts.lockRenewTime / 2); + } + } + } + + /** + * Returns a promise that resolves when active jobs are cleared + * + * @returns + */ + private async whenCurrentJobsFinished(reconnect = true) { + // + // Force reconnection of blocking connection to abort blocking redis call immediately. 
+ // + if (this.waiting) { + // If we are not going to reconnect, we will not wait for the disconnection. + await this.blockingConnection.disconnect(reconnect); + } else { + reconnect = false; + } + + if (this.asyncFifoQueue) { + await this.asyncFifoQueue.waitAll(); + } + + reconnect && (await this.blockingConnection.reconnect()); + } + + private async retryIfFailed(fn: () => Promise, delayInMs: number) { + const retry = 1; + do { + try { + return await fn(); + } catch (err) { + this.emit('error', err); + if (delayInMs) { + await this.delay(delayInMs); + } else { + return; + } + } + } while (retry); + } + + protected async extendLocks(jobs: Job[]) { + await this.trace( + SpanKind.INTERNAL, + 'extendLocks', + this.name, + async span => { + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.WorkerJobsToExtendLocks]: jobs.map( + job => job.id, + ), + }); + + try { + const pipeline = (await this.client).pipeline(); + for (const job of jobs) { + await this.scripts.extendLock( + job.id, + job.token, + this.opts.lockDuration, + pipeline, + ); + } + const result = (await pipeline.exec()) as [Error, string][]; + + for (const [err, jobId] of result) { + if (err) { + // TODO: signal process function that the job has been lost. 
+ this.emit( + 'error', + new Error(`could not renew lock for job ${jobId}`), + ); + } + } + } catch (err) { + this.emit('error', err); + } + }, + ); + } + + private async moveStalledJobsToWait() { + await this.trace( + SpanKind.INTERNAL, + 'moveStalledJobsToWait', + this.name, + async span => { + const chunkSize = 50; + const [failed, stalled] = await this.scripts.moveStalledJobsToWait(); + + span?.setAttributes({ + [TelemetryAttributes.WorkerId]: this.id, + [TelemetryAttributes.WorkerName]: this.opts.name, + [TelemetryAttributes.WorkerStalledJobs]: stalled, + [TelemetryAttributes.WorkerFailedJobs]: failed, + }); + + stalled.forEach((jobId: string) => { + span?.addEvent('job stalled', { + [TelemetryAttributes.JobId]: jobId, + }); + this.emit('stalled', jobId, 'active'); + }); + + const jobPromises: Promise>[] = []; + for (let i = 0; i < failed.length; i++) { + jobPromises.push( + Job.fromId( + this as MinimalQueue, + failed[i], + ), + ); + + if ((i + 1) % chunkSize === 0) { + this.notifyFailedJobs(await Promise.all(jobPromises)); + jobPromises.length = 0; + } + } + + this.notifyFailedJobs(await Promise.all(jobPromises)); + }, + ); + } + + private notifyFailedJobs( + failedJobs: Job[], + span?: Span, + ) { + const failedReason = 'job stalled more than allowable limit'; + + failedJobs.forEach((job: Job) => { + span?.addEvent('job failed', { + [TelemetryAttributes.JobId]: job.id, + [TelemetryAttributes.JobName]: job.name, + [TelemetryAttributes.JobFailedReason]: failedReason, + }); + this.emit('failed', job, new Error(failedReason), 'active'); + }); + } + + private moveLimitedBackToWait( + job: Job, + token: string, + ) { + return this.scripts.moveJobFromActiveToWait(job.id, token); + } +} From cb1fb5e1dcbea49126e9d8ea1c7ef1e0cdaa198b Mon Sep 17 00:00:00 2001 From: fgozdz Date: Thu, 7 Nov 2024 10:19:21 +0100 Subject: [PATCH 4/4] feat(worker): fix line endings --- src/classes/worker.ts | 2586 ++++++++++++++++++++--------------------- 1 file changed, 1293 
insertions(+), 1293 deletions(-) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index a78e84d4ac..05146e9f7b 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -1,1293 +1,1293 @@ -import * as fs from 'fs'; -import { URL } from 'url'; -import { Redis } from 'ioredis'; -import * as path from 'path'; -import { v4 } from 'uuid'; - -// Note: this Polyfill is only needed for Node versions < 15.4.0 -import { AbortController } from 'node-abort-controller'; - -import { - GetNextJobOptions, - IoredisListener, - JobJsonRaw, - Processor, - RedisClient, - Span, - WorkerOptions, -} from '../interfaces'; -import { MinimalQueue } from '../types'; -import { - delay, - DELAY_TIME_1, - isNotConnectionError, - isRedisInstance, -} from '../utils'; -import { QueueBase } from './queue-base'; -import { Repeat } from './repeat'; -import { ChildPool } from './child-pool'; -import { Job } from './job'; -import { RedisConnection } from './redis-connection'; -import sandbox from './sandbox'; -import { AsyncFifoQueue } from './async-fifo-queue'; -import { - DelayedError, - RateLimitError, - RATE_LIMIT_ERROR, - WaitingChildrenError, -} from './errors'; -import { SpanKind, TelemetryAttributes } from '../enums'; -import { JobScheduler } from './job-scheduler'; - -// 10 seconds is the maximum time a BRPOPLPUSH can block. -const maximumBlockTimeout = 10; - -// 30 seconds is the maximum limit until. -const maximumLimitUntil = 30000; - -// note: sandboxed processors would also like to define concurrency per process -// for better resource utilization. - -export interface WorkerListener< - DataType = any, - ResultType = any, - NameType extends string = string, -> extends IoredisListener { - /** - * Listen to 'active' event. - * - * This event is triggered when a job enters the 'active' state. - */ - active: (job: Job, prev: string) => void; - - /** - * Listen to 'closing' event. - * - * This event is triggered when the worker is closed. 
- */ - closed: () => void; - - /** - * Listen to 'closing' event. - * - * This event is triggered when the worker is closing. - */ - closing: (msg: string) => void; - - /** - * Listen to 'completed' event. - * - * This event is triggered when a job has successfully completed. - */ - completed: ( - job: Job, - result: ResultType, - prev: string, - ) => void; - - /** - * Listen to 'drained' event. - * - * This event is triggered when the queue has drained the waiting list. - * Note that there could still be delayed jobs waiting their timers to expire - * and this event will still be triggered as long as the waiting list has emptied. - */ - drained: () => void; - - /** - * Listen to 'error' event. - * - * This event is triggered when an error is throw. - */ - error: (failedReason: Error) => void; - - /** - * Listen to 'failed' event. - * - * This event is triggered when a job has thrown an exception. - * Note: job parameter could be received as undefined when an stalled job - * reaches the stalled limit and it is deleted by the removeOnFail option. - */ - failed: ( - job: Job | undefined, - error: Error, - prev: string, - ) => void; - - /** - * Listen to 'paused' event. - * - * This event is triggered when the queue is paused. - */ - paused: () => void; - - /** - * Listen to 'progress' event. - * - * This event is triggered when a job updates it progress, i.e. the - * Job##updateProgress() method is called. This is useful to notify - * progress or any other data from within a processor to the rest of the - * world. - */ - progress: ( - job: Job, - progress: number | object, - ) => void; - - /** - * Listen to 'ready' event. - * - * This event is triggered when blockingConnection is ready. - */ - ready: () => void; - - /** - * Listen to 'resumed' event. - * - * This event is triggered when the queue is resumed. - */ - resumed: () => void; - - /** - * Listen to 'stalled' event. 
- * - * This event is triggered when a job has stalled and - * has been moved back to the wait list. - */ - stalled: (jobId: string, prev: string) => void; -} - -/** - * - * This class represents a worker that is able to process jobs from the queue. - * As soon as the class is instantiated and a connection to Redis is established - * it will start processing jobs. - * - */ -export class Worker< - DataType = any, - ResultType = any, - NameType extends string = string, -> extends QueueBase { - readonly opts: WorkerOptions; - readonly id: string; - - private abortDelayController: AbortController | null = null; - private asyncFifoQueue: AsyncFifoQueue>; - private blockingConnection: RedisConnection; - private blockUntil = 0; - private _concurrency: number; - private childPool: ChildPool; - private drained: boolean = false; - private extendLocksTimer: NodeJS.Timeout | null = null; - private limitUntil = 0; - private resumeWorker: () => void; - - private stalledCheckStopper?: () => void; - private waiting: Promise | null = null; - private _repeat: Repeat; // To be deprecated in v6 in favor of Job Scheduler - - private _jobScheduler: JobScheduler; - - protected paused: Promise; - protected processFn: Processor; - protected running = false; - - static RateLimitError(): Error { - return new RateLimitError(); - } - - constructor( - name: string, - processor?: string | URL | null | Processor, - opts?: WorkerOptions, - Connection?: typeof RedisConnection, - ) { - super( - name, - { - ...opts, - blockingConnection: true, - }, - Connection, - ); - - if (!opts || !opts.connection) { - throw new Error('Worker requires a connection'); - } - - this.opts = { - drainDelay: 5, - concurrency: 1, - lockDuration: 30000, - maxStalledCount: 1, - stalledInterval: 30000, - autorun: true, - runRetryDelay: 15000, - ...this.opts, - }; - - if (this.opts.stalledInterval <= 0) { - throw new Error('stalledInterval must be greater than 0'); - } - - if (this.opts.drainDelay <= 0) { - throw new 
Error('drainDelay must be greater than 0'); - } - - this.concurrency = this.opts.concurrency; - - this.opts.lockRenewTime = - this.opts.lockRenewTime || this.opts.lockDuration / 2; - - this.id = v4(); - - if (processor) { - if (typeof processor === 'function') { - this.processFn = processor; - } else { - // SANDBOXED - if (processor instanceof URL) { - if (!fs.existsSync(processor)) { - throw new Error( - `URL ${processor} does not exist in the local file system`, - ); - } - processor = processor.href; - } else { - const supportedFileTypes = ['.js', '.ts', '.flow', '.cjs']; - const processorFile = - processor + - (supportedFileTypes.includes(path.extname(processor)) ? '' : '.js'); - - if (!fs.existsSync(processorFile)) { - throw new Error(`File ${processorFile} does not exist`); - } - } - - // Separate paths so that bundling tools can resolve dependencies easier - const dirname = path.dirname(module.filename || __filename); - const workerThreadsMainFile = path.join(dirname, 'main-worker.js'); - const spawnProcessMainFile = path.join(dirname, 'main.js'); - - let mainFilePath = this.opts.useWorkerThreads - ? workerThreadsMainFile - : spawnProcessMainFile; - - try { - fs.statSync(mainFilePath); // would throw if file not exists - } catch (_) { - const mainFile = this.opts.useWorkerThreads - ? 'main-worker.js' - : 'main.js'; - mainFilePath = path.join( - process.cwd(), - `dist/cjs/classes/${mainFile}`, - ); - fs.statSync(mainFilePath); - } - - this.childPool = new ChildPool({ - mainFile: mainFilePath, - useWorkerThreads: this.opts.useWorkerThreads, - workerForkOptions: this.opts.workerForkOptions, - workerThreadsOptions: this.opts.workerThreadsOptions, - }); - - this.processFn = sandbox( - processor, - this.childPool, - ).bind(this); - } - - if (this.opts.autorun) { - this.run().catch(error => this.emit('error', error)); - } - } - - const connectionName = - this.clientName() + (this.opts.name ? 
`:w:${this.opts.name}` : ''); - this.blockingConnection = new RedisConnection( - isRedisInstance(opts.connection) - ? (opts.connection).duplicate({ connectionName }) - : { ...opts.connection, connectionName }, - false, - true, - opts.skipVersionCheck, - ); - this.blockingConnection.on('error', error => this.emit('error', error)); - this.blockingConnection.on('ready', () => - setTimeout(() => this.emit('ready'), 0), - ); - } - - emit>( - event: U, - ...args: Parameters[U]> - ): boolean { - return super.emit(event, ...args); - } - - off>( - eventName: U, - listener: WorkerListener[U], - ): this { - super.off(eventName, listener); - return this; - } - - on>( - event: U, - listener: WorkerListener[U], - ): this { - super.on(event, listener); - return this; - } - - once>( - event: U, - listener: WorkerListener[U], - ): this { - super.once(event, listener); - return this; - } - - protected callProcessJob( - job: Job, - token: string, - ): Promise { - return this.processFn(job, token); - } - - protected createJob( - data: JobJsonRaw, - jobId: string, - ): Job { - return this.Job.fromJSON(this as MinimalQueue, data, jobId) as Job< - DataType, - ResultType, - NameType - >; - } - - /** - * - * Waits until the worker is ready to start processing jobs. - * In general only useful when writing tests. 
- * - */ - async waitUntilReady(): Promise { - await super.waitUntilReady(); - return this.blockingConnection.client; - } - - set concurrency(concurrency: number) { - if ( - typeof concurrency !== 'number' || - concurrency < 1 || - !isFinite(concurrency) - ) { - throw new Error('concurrency must be a finite number greater than 0'); - } - this._concurrency = concurrency; - } - - get concurrency() { - return this._concurrency; - } - - get repeat(): Promise { - return new Promise(async resolve => { - if (!this._repeat) { - const connection = await this.client; - this._repeat = new Repeat(this.name, { - ...this.opts, - connection, - }); - this._repeat.on('error', e => this.emit.bind(this, e)); - } - resolve(this._repeat); - }); - } - - get jobScheduler(): Promise { - return new Promise(async resolve => { - if (!this._jobScheduler) { - const connection = await this.client; - this._jobScheduler = new JobScheduler(this.name, { - ...this.opts, - connection, - }); - this._jobScheduler.on('error', e => this.emit.bind(this, e)); - } - resolve(this._jobScheduler); - }); - } - - async run() { - if (!this.processFn) { - throw new Error('No process function is defined.'); - } - - if (this.running) { - throw new Error('Worker is already running.'); - } - - try { - this.running = true; - - if (this.closing) { - return; - } - - await this.startStalledCheckTimer(); - - const jobsInProgress = new Set<{ job: Job; ts: number }>(); - this.startLockExtenderTimer(jobsInProgress); - - const asyncFifoQueue = (this.asyncFifoQueue = - new AsyncFifoQueue>()); - - let tokenPostfix = 0; - - const client = await this.client; - const bclient = await this.blockingConnection.client; - - /** - * This is the main loop in BullMQ. Its goals are to fetch jobs from the queue - * as efficiently as possible, providing concurrency and minimal unnecessary calls - * to Redis. 
- */ - while (!this.closing) { - let numTotal = asyncFifoQueue.numTotal(); - - /** - * This inner loop tries to fetch jobs concurrently, but if we are waiting for a job - * to arrive at the queue we should not try to fetch more jobs (as it would be pointless) - */ - while ( - !this.waiting && - numTotal < this._concurrency && - (!this.limitUntil || numTotal == 0) - ) { - const token = `${this.id}:${tokenPostfix++}`; - - const fetchedJob = this.retryIfFailed>( - () => this._getNextJob(client, bclient, token, { block: true }), - this.opts.runRetryDelay, - ); - asyncFifoQueue.add(fetchedJob); - - numTotal = asyncFifoQueue.numTotal(); - - if (this.waiting && numTotal > 1) { - // We are waiting for jobs but we have others that we could start processing already - break; - } - - // We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls - // to Redis in high concurrency scenarios. - const job = await fetchedJob; - - // No more jobs waiting but we have others that could start processing already - if (!job && numTotal > 1) { - break; - } - - // If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting - // for processing this job. - if (this.blockUntil) { - break; - } - } - - // Since there can be undefined jobs in the queue (when a job fails or queue is empty) - // we iterate until we find a job. - let job: Job | void; - do { - job = await asyncFifoQueue.fetch(); - } while (!job && asyncFifoQueue.numQueued() > 0); - - if (job) { - const token = job.token; - asyncFifoQueue.add( - this.retryIfFailed>( - () => - this.processJob( - >job, - token, - () => asyncFifoQueue.numTotal() <= this._concurrency, - jobsInProgress, - ), - this.opts.runRetryDelay, - ), - ); - } - } - - this.running = false; - return await asyncFifoQueue.waitAll(); - } catch (error) { - this.running = false; - throw error; - } - } - - /** - * Returns a promise that resolves to the next job in queue. 
- * @param token - worker token to be assigned to retrieved job - * @returns a Job or undefined if no job was available in the queue. - */ - async getNextJob(token: string, { block = true }: GetNextJobOptions = {}) { - const nextJob = await this._getNextJob( - await this.client, - await this.blockingConnection.client, - token, - { block }, - ); - - return this.trace>( - SpanKind.INTERNAL, - 'getNextJob', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.QueueName]: this.name, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.WorkerOptions]: JSON.stringify({ block }), - [TelemetryAttributes.JobId]: nextJob?.id, - }); - - return nextJob; - }, - nextJob?.opts.telemetryMetadata, - ); - } - - private async _getNextJob( - client: RedisClient, - bclient: RedisClient, - token: string, - { block = true }: GetNextJobOptions = {}, - ): Promise | undefined> { - if (this.paused) { - if (block) { - await this.paused; - } else { - return; - } - } - - if (this.closing) { - return; - } - - if (this.drained && block && !this.limitUntil && !this.waiting) { - this.waiting = this.waitForJob(bclient, this.blockUntil); - try { - this.blockUntil = await this.waiting; - - if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 1) { - return this.moveToActive(client, token, this.opts.name); - } - } catch (err) { - // Swallow error if locally paused or closing since we did force a disconnection - if ( - !(this.paused || this.closing) && - isNotConnectionError(err) - ) { - throw err; - } - } finally { - this.waiting = null; - } - } else { - const limitUntil = this.limitUntil; - if (limitUntil) { - this.abortDelayController?.abort(); - this.abortDelayController = new AbortController(); - await this.delay( - this.getLimitUntil(limitUntil), - this.abortDelayController, - ); - } - return this.moveToActive(client, token, this.opts.name); - } - } - - /** - * Overrides the rate limit to be active for 
the next jobs. - * - * @param expireTimeMs - expire time in ms of this rate limit. - */ - async rateLimit(expireTimeMs: number): Promise { - await this.trace( - SpanKind.INTERNAL, - 'rateLimit', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerRateLimit]: expireTimeMs, - }); - - await this.client.then(client => - client.set( - this.keys.limiter, - Number.MAX_SAFE_INTEGER, - 'PX', - expireTimeMs, - ), - ); - }, - ); - } - - get minimumBlockTimeout(): number { - return this.blockingConnection.capabilities.canBlockFor1Ms - ? /* 1 millisecond is chosen because the granularity of our timestamps are milliseconds. -Obviously we can still process much faster than 1 job per millisecond but delays and rate limits -will never work with more accuracy than 1ms. */ - 0.001 - : 0.002; - } - - protected async moveToActive( - client: RedisClient, - token: string, - name?: string, - ): Promise> { - const [jobData, id, limitUntil, delayUntil] = - await this.scripts.moveToActive(client, token, name); - this.updateDelays(limitUntil, delayUntil); - - return this.nextJobFromJobData(jobData, id, token); - } - - private async waitForJob( - bclient: RedisClient, - blockUntil: number, - ): Promise { - if (this.paused) { - return Infinity; - } - - let timeout: NodeJS.Timeout; - try { - if (!this.closing && !this.limitUntil) { - let blockTimeout = this.getBlockTimeout(blockUntil); - - if (blockTimeout > 0) { - blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout - ? blockTimeout - : Math.ceil(blockTimeout); - - // We cannot trust that the blocking connection stays blocking forever - // due to issues in Redis and IORedis, so we will reconnect if we - // don't get a response in the expected time. 
- timeout = setTimeout(async () => { - bclient.disconnect(!this.closing); - }, blockTimeout * 1000 + 1000); - - this.updateDelays(); // reset delays to avoid reusing same values in next iteration - - // Markers should only be used for un-blocking, so we will handle them in this - // function only. - const result = await bclient.bzpopmin(this.keys.marker, blockTimeout); - - if (result) { - const [_key, member, score] = result; - - if (member) { - return parseInt(score); - } - } - } - - return 0; - } - } catch (error) { - if (isNotConnectionError(error)) { - this.emit('error', error); - } - if (!this.closing) { - await this.delay(); - } - } finally { - clearTimeout(timeout); - } - return Infinity; - } - - protected getBlockTimeout(blockUntil: number): number { - const opts: WorkerOptions = this.opts; - - // when there are delayed jobs - if (blockUntil) { - const blockDelay = blockUntil - Date.now(); - // when we reach the time to get new jobs - if (blockDelay <= 0) { - return blockDelay; - } else if (blockDelay < this.minimumBlockTimeout * 1000) { - return this.minimumBlockTimeout; - } else { - // We restrict the maximum block timeout to 10 second to avoid - // blocking the connection for too long in the case of reconnections - // reference: https://github.com/taskforcesh/bullmq/issues/1658 - return Math.min(blockDelay / 1000, maximumBlockTimeout); - } - } else { - return Math.max(opts.drainDelay, this.minimumBlockTimeout); - } - } - - protected getLimitUntil(limitUntil: number): number { - // We restrict the maximum limit until to 30 second to - // be able to promote delayed jobs while queue is rate limited - return Math.min(limitUntil, maximumLimitUntil); - } - - /** - * - * This function is exposed only for testing purposes. 
- */ - async delay( - milliseconds?: number, - abortController?: AbortController, - ): Promise { - await delay(milliseconds || DELAY_TIME_1, abortController); - } - - private updateDelays(limitUntil = 0, delayUntil = 0) { - this.limitUntil = Math.max(limitUntil, 0) || 0; - this.blockUntil = Math.max(delayUntil, 0) || 0; - } - - protected async nextJobFromJobData( - jobData?: JobJsonRaw, - jobId?: string, - token?: string, - ): Promise> { - if (!jobData) { - if (!this.drained) { - this.emit('drained'); - this.drained = true; - } - } else { - this.drained = false; - const job = this.createJob(jobData, jobId); - job.token = token; - - // Add next scheduled job if necessary. - if (job.opts.repeat) { - // Use new job scheduler if possible - if (job.repeatJobKey) { - const jobScheduler = await this.jobScheduler; - await jobScheduler.upsertJobScheduler( - job.repeatJobKey, - job.opts.repeat, - job.name, - job.data, - job.opts, - { override: false }, - ); - } else { - const repeat = await this.repeat; - await repeat.updateRepeatableJob(job.name, job.data, job.opts, { - override: false, - }); - } - } - return job; - } - } - - async processJob( - job: Job, - token: string, - fetchNextCallback = () => true, - jobsInProgress: Set<{ job: Job; ts: number }>, - ): Promise> { - if (!job || this.closing || this.paused) { - return; - } - - const { telemetryMetadata: srcPropagationMedatada } = job.opts; - - return this.trace>( - SpanKind.CONSUMER, - 'process', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.JobId]: job.id, - }); - - const handleCompleted = async (result: ResultType) => { - if (!this.connection.closing) { - const completed = await job.moveToCompleted( - result, - token, - fetchNextCallback() && !(this.closing || this.paused), - ); - this.emit('completed', job, result, 'active'); - - span?.addEvent('job completed', { - 
[TelemetryAttributes.JobResult]: JSON.stringify(result), - }); - - const [jobData, jobId, limitUntil, delayUntil] = completed || []; - this.updateDelays(limitUntil, delayUntil); - - return this.nextJobFromJobData(jobData, jobId, token); - } - }; - - const handleFailed = async (err: Error) => { - if (!this.connection.closing) { - try { - // Check if the job was manually rate-limited - if (err.message == RATE_LIMIT_ERROR) { - this.limitUntil = await this.moveLimitedBackToWait(job, token); - return; - } - - if ( - err instanceof DelayedError || - err.name == 'DelayedError' || - err instanceof WaitingChildrenError || - err.name == 'WaitingChildrenError' - ) { - return; - } - - const result = await job.moveToFailed(err, token, true); - this.emit('failed', job, err, 'active'); - - span?.addEvent('job failed', { - [TelemetryAttributes.JobFailedReason]: err.message, - }); - - if (result) { - const [jobData, jobId, limitUntil, delayUntil] = result; - this.updateDelays(limitUntil, delayUntil); - return this.nextJobFromJobData(jobData, jobId, token); - } - } catch (err) { - this.emit('error', err); - // It probably means that the job has lost the lock before completion - // A worker will (or already has) moved the job back - // to the waiting list (as stalled) - span?.recordException((err).message); - } - } - }; - - this.emit('active', job, 'waiting'); - - const processedOn = Date.now(); - const inProgressItem = { job, ts: processedOn }; - - try { - jobsInProgress.add(inProgressItem); - const result = await this.callProcessJob(job, token); - return await handleCompleted(result); - } catch (err) { - const failed = await handleFailed(err); - return failed; - } finally { - span?.setAttributes({ - [TelemetryAttributes.JobFinishedTimestamp]: Date.now(), - [TelemetryAttributes.JobProcessedTimestamp]: processedOn, - }); - - jobsInProgress.delete(inProgressItem); - } - }, - srcPropagationMedatada, - ); - } - - /** - * - * Pauses the processing of this queue only for this worker. 
- */ - async pause(doNotWaitActive?: boolean): Promise { - await this.trace( - SpanKind.INTERNAL, - 'pause', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.WorkerDoNotWaitActive]: doNotWaitActive, - }); - - if (!this.paused) { - this.paused = new Promise(resolve => { - this.resumeWorker = function () { - resolve(); - this.paused = null; // Allow pause to be checked externally for paused state. - this.resumeWorker = null; - }; - }); - await (!doNotWaitActive && this.whenCurrentJobsFinished()); - this.emit('paused'); - } - }, - ); - } - - /** - * - * Resumes processing of this worker (if paused). - */ - resume(): void { - if (this.resumeWorker) { - this.trace(SpanKind.INTERNAL, 'resume', this.name, span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - }); - - this.resumeWorker(); - this.emit('resumed'); - }); - } - } - - /** - * - * Checks if worker is paused. - * - * @returns true if worker is paused, false otherwise. - */ - isPaused(): boolean { - return !!this.paused; - } - - /** - * - * Checks if worker is currently running. - * - * @returns true if worker is running, false otherwise. - */ - isRunning(): boolean { - return this.running; - } - - /** - * - * Closes the worker and related redis connections. - * - * This method waits for current jobs to finalize before returning. - * - * @param force - Use force boolean parameter if you do not want to wait for - * current jobs to be processed. - * - * @returns Promise that resolves when the worker has been closed. 
- */ - async close(force = false): Promise { - if (this.closing) { - return this.closing; - } - - await this.trace( - SpanKind.INTERNAL, - 'close', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.WorkerForceClose]: force, - }); - - this.closing = (async () => { - this.emit('closing', 'closing queue'); - this.abortDelayController?.abort(); - - this.resume(); - - // Define the async cleanup functions - const asyncCleanups = [ - () => { - return force || this.whenCurrentJobsFinished(false); - }, - () => this.childPool?.clean(), - () => this.blockingConnection.close(force), - () => this.connection.close(force), - ]; - - // Run cleanup functions sequentially and make sure all are run despite any errors - for (const cleanup of asyncCleanups) { - try { - await cleanup(); - } catch (err) { - this.emit('error', err); - } - } - - clearTimeout(this.extendLocksTimer); - //clearTimeout(this.stalledCheckTimer); - this.stalledCheckStopper?.(); - - this.closed = true; - this.emit('closed'); - })(); - - return await this.closing; - }, - ); - } - - /** - * - * Manually starts the stalled checker. - * The check will run once as soon as this method is called, and - * then every opts.stalledInterval milliseconds until the worker is closed. - * Note: Normally you do not need to call this method, since the stalled checker - * is automatically started when the worker starts processing jobs after - * calling run. However if you want to process the jobs manually you need - * to call this method to start the stalled checker. 
- * - * @see {@link https://docs.bullmq.io/patterns/manually-fetching-jobs} - */ - async startStalledCheckTimer(): Promise { - if (!this.opts.skipStalledCheck) { - if (!this.closing) { - await this.trace( - SpanKind.INTERNAL, - 'startStalledCheckTimer', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - }); - - this.stalledChecker().catch(err => { - this.emit('error', err); - }); - }, - ); - } - } - } - - private async stalledChecker() { - while (!this.closing) { - try { - await this.checkConnectionError(() => this.moveStalledJobsToWait()); - } catch (err) { - this.emit('error', err); - } - - await new Promise(resolve => { - const timeout = setTimeout(resolve, this.opts.stalledInterval); - this.stalledCheckStopper = () => { - clearTimeout(timeout); - resolve(); - }; - }); - } - } - - private startLockExtenderTimer( - jobsInProgress: Set<{ job: Job; ts: number }>, - ): void { - if (!this.opts.skipLockRenewal) { - clearTimeout(this.extendLocksTimer); - - if (!this.closed) { - this.extendLocksTimer = setTimeout(async () => { - // Get all the jobs whose locks expire in less than 1/2 of the lockRenewTime - const now = Date.now(); - const jobsToExtend = []; - - for (const item of jobsInProgress) { - const { job, ts } = item; - if (!ts) { - item.ts = now; - continue; - } - - if (ts + this.opts.lockRenewTime / 2 < now) { - item.ts = now; - jobsToExtend.push(job); - } - } - - try { - if (jobsToExtend.length) { - await this.extendLocks(jobsToExtend); - } - } catch (err) { - this.emit('error', err); - } - - this.startLockExtenderTimer(jobsInProgress); - }, this.opts.lockRenewTime / 2); - } - } - } - - /** - * Returns a promise that resolves when active jobs are cleared - * - * @returns - */ - private async whenCurrentJobsFinished(reconnect = true) { - // - // Force reconnection of blocking connection to abort blocking redis call immediately. 
- // - if (this.waiting) { - // If we are not going to reconnect, we will not wait for the disconnection. - await this.blockingConnection.disconnect(reconnect); - } else { - reconnect = false; - } - - if (this.asyncFifoQueue) { - await this.asyncFifoQueue.waitAll(); - } - - reconnect && (await this.blockingConnection.reconnect()); - } - - private async retryIfFailed(fn: () => Promise, delayInMs: number) { - const retry = 1; - do { - try { - return await fn(); - } catch (err) { - this.emit('error', err); - if (delayInMs) { - await this.delay(delayInMs); - } else { - return; - } - } - } while (retry); - } - - protected async extendLocks(jobs: Job[]) { - await this.trace( - SpanKind.INTERNAL, - 'extendLocks', - this.name, - async span => { - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.WorkerJobsToExtendLocks]: jobs.map( - job => job.id, - ), - }); - - try { - const pipeline = (await this.client).pipeline(); - for (const job of jobs) { - await this.scripts.extendLock( - job.id, - job.token, - this.opts.lockDuration, - pipeline, - ); - } - const result = (await pipeline.exec()) as [Error, string][]; - - for (const [err, jobId] of result) { - if (err) { - // TODO: signal process function that the job has been lost. 
- this.emit( - 'error', - new Error(`could not renew lock for job ${jobId}`), - ); - } - } - } catch (err) { - this.emit('error', err); - } - }, - ); - } - - private async moveStalledJobsToWait() { - await this.trace( - SpanKind.INTERNAL, - 'moveStalledJobsToWait', - this.name, - async span => { - const chunkSize = 50; - const [failed, stalled] = await this.scripts.moveStalledJobsToWait(); - - span?.setAttributes({ - [TelemetryAttributes.WorkerId]: this.id, - [TelemetryAttributes.WorkerName]: this.opts.name, - [TelemetryAttributes.WorkerStalledJobs]: stalled, - [TelemetryAttributes.WorkerFailedJobs]: failed, - }); - - stalled.forEach((jobId: string) => { - span?.addEvent('job stalled', { - [TelemetryAttributes.JobId]: jobId, - }); - this.emit('stalled', jobId, 'active'); - }); - - const jobPromises: Promise>[] = []; - for (let i = 0; i < failed.length; i++) { - jobPromises.push( - Job.fromId( - this as MinimalQueue, - failed[i], - ), - ); - - if ((i + 1) % chunkSize === 0) { - this.notifyFailedJobs(await Promise.all(jobPromises)); - jobPromises.length = 0; - } - } - - this.notifyFailedJobs(await Promise.all(jobPromises)); - }, - ); - } - - private notifyFailedJobs( - failedJobs: Job[], - span?: Span, - ) { - const failedReason = 'job stalled more than allowable limit'; - - failedJobs.forEach((job: Job) => { - span?.addEvent('job failed', { - [TelemetryAttributes.JobId]: job.id, - [TelemetryAttributes.JobName]: job.name, - [TelemetryAttributes.JobFailedReason]: failedReason, - }); - this.emit('failed', job, new Error(failedReason), 'active'); - }); - } - - private moveLimitedBackToWait( - job: Job, - token: string, - ) { - return this.scripts.moveJobFromActiveToWait(job.id, token); - } -} +import * as fs from 'fs'; +import { URL } from 'url'; +import { Redis } from 'ioredis'; +import * as path from 'path'; +import { v4 } from 'uuid'; + +// Note: this Polyfill is only needed for Node versions < 15.4.0 +import { AbortController } from 'node-abort-controller'; + 
import {
  GetNextJobOptions,
  IoredisListener,
  JobJsonRaw,
  Processor,
  RedisClient,
  Span,
  WorkerOptions,
} from '../interfaces';
import { MinimalQueue } from '../types';
import {
  delay,
  DELAY_TIME_1,
  isNotConnectionError,
  isRedisInstance,
} from '../utils';
import { QueueBase } from './queue-base';
import { Repeat } from './repeat';
import { ChildPool } from './child-pool';
import { Job } from './job';
import { RedisConnection } from './redis-connection';
import sandbox from './sandbox';
import { AsyncFifoQueue } from './async-fifo-queue';
import {
  DelayedError,
  RateLimitError,
  RATE_LIMIT_ERROR,
  WaitingChildrenError,
} from './errors';
import { SpanKind, TelemetryAttributes } from '../enums';
import { JobScheduler } from './job-scheduler';

// Upper bound, in seconds, for a single blocking call (BZPOPMIN) on the
// blocking connection. See getBlockTimeout.
const maximumBlockTimeout = 10;

// Upper bound, in milliseconds, for a single rate-limit wait (30 seconds).
// See getLimitUntil.
const maximumLimitUntil = 30000;

// note: sandboxed processors would also like to define concurrency per process
// for better resource utilization.

/**
 * Signatures of the listeners accepted by {@link Worker.on},
 * {@link Worker.once} and {@link Worker.off}, keyed by event name.
 */
export interface WorkerListener<
  DataType = any,
  ResultType = any,
  NameType extends string = string,
> extends IoredisListener {
  /**
   * Listen to 'active' event.
   *
   * This event is triggered when a job enters the 'active' state.
   */
  active: (job: Job<DataType, ResultType, NameType>, prev: string) => void;

  /**
   * Listen to 'closed' event.
   *
   * This event is triggered when the worker is closed.
   */
  closed: () => void;

  /**
   * Listen to 'closing' event.
   *
   * This event is triggered when the worker is closing.
   */
  closing: (msg: string) => void;

  /**
   * Listen to 'completed' event.
   *
   * This event is triggered when a job has successfully completed.
   */
  completed: (
    job: Job<DataType, ResultType, NameType>,
    result: ResultType,
    prev: string,
  ) => void;

  /**
   * Listen to 'drained' event.
   *
   * This event is triggered when the queue has drained the waiting list.
   * Note that there could still be delayed jobs waiting their timers to expire
   * and this event will still be triggered as long as the waiting list has emptied.
   */
  drained: () => void;

  /**
   * Listen to 'error' event.
   *
   * This event is triggered when an error is thrown.
   */
  error: (failedReason: Error) => void;

  /**
   * Listen to 'failed' event.
   *
   * This event is triggered when a job has thrown an exception.
   * Note: job parameter could be received as undefined when a stalled job
   * reaches the stalled limit and it is deleted by the removeOnFail option.
   */
  failed: (
    job: Job<DataType, ResultType, NameType> | undefined,
    error: Error,
    prev: string,
  ) => void;

  /**
   * Listen to 'paused' event.
   *
   * This event is triggered when the queue is paused.
   */
  paused: () => void;

  /**
   * Listen to 'progress' event.
   *
   * This event is triggered when a job updates its progress, i.e. the
   * Job##updateProgress() method is called. This is useful to notify
   * progress or any other data from within a processor to the rest of the
   * world.
   */
  progress: (
    job: Job<DataType, ResultType, NameType>,
    progress: number | object,
  ) => void;

  /**
   * Listen to 'ready' event.
   *
   * This event is triggered when blockingConnection is ready.
   */
  ready: () => void;

  /**
   * Listen to 'resumed' event.
   *
   * This event is triggered when the queue is resumed.
   */
  resumed: () => void;

  /**
   * Listen to 'stalled' event.
   *
   * This event is triggered when a job has stalled and
   * has been moved back to the wait list.
   */
  stalled: (jobId: string, prev: string) => void;
}

/**
 *
 * This class represents a worker that is able to process jobs from the queue.
 * As soon as the class is instantiated and a connection to Redis is established
 * it will start processing jobs.
 *
 */
export class Worker<
  DataType = any,
  ResultType = any,
  NameType extends string = string,
> extends QueueBase {
  readonly opts: WorkerOptions;
  readonly id: string;

  // Controller used to abort the rate-limit delay started in _getNextJob.
  private abortDelayController: AbortController | null = null;
  // Holds both the "fetch next job" and the "process job" promises of run().
  private asyncFifoQueue: AsyncFifoQueue<void | Job<
    DataType,
    ResultType,
    NameType
  >>;
  private blockingConnection: RedisConnection;
  // Timestamp (ms) taken from the delayUntil value returned by the scripts; 0 when unset.
  private blockUntil = 0;
  // Validated concurrency value; only written through the `concurrency` setter.
  private _concurrency: number;
  private childPool: ChildPool;
  // True after a fetch returned no job data; reset when a job is received.
  private drained: boolean = false;
  private extendLocksTimer: NodeJS.Timeout | null = null;
  // Rate-limit value returned by the scripts; 0 when not rate limited.
  private limitUntil = 0;
  // Resolves `paused`; only defined while the worker is paused.
  private resumeWorker: () => void;

  // Wakes up the stalled checker early (used by close()).
  private stalledCheckStopper?: () => void;
  // Pending waitForJob call, if any. Non-null means we are blocked waiting for jobs.
  private waiting: Promise<number> | null = null;
  private _repeat: Repeat; // To be deprecated in v6 in favor of Job Scheduler

  private _jobScheduler: JobScheduler;

  // Pending promise while the worker is paused, falsy otherwise.
  protected paused: Promise<void>;
  protected processFn: Processor<DataType, ResultType, NameType>;
  protected running = false;

  /**
   * Creates the error instance a processor can throw to signal a manual
   * rate limit.
   */
  static RateLimitError(): Error {
    return new RateLimitError();
  }

  /**
   * @param name - name of the queue this worker consumes.
   * @param processor - a processor function, or the path / URL of a file that
   * is run through the sandbox (child pool). When omitted, no processing is
   * started by the constructor.
   * @param opts - worker options; `opts.connection` is mandatory.
   * @param Connection - optional RedisConnection class forwarded to QueueBase.
   *
   * @throws if no connection is given, if `stalledInterval` or `drainDelay`
   * are not greater than 0, if `concurrency` is invalid, or if a processor
   * file / URL does not exist.
   */
  constructor(
    name: string,
    processor?: string | URL | null | Processor<DataType, ResultType, NameType>,
    opts?: WorkerOptions,
    Connection?: typeof RedisConnection,
  ) {
    super(
      name,
      {
        ...opts,
        blockingConnection: true,
      },
      Connection,
    );

    if (!opts || !opts.connection) {
      throw new Error('Worker requires a connection');
    }

    // Defaults first, user supplied options win.
    this.opts = {
      drainDelay: 5,
      concurrency: 1,
      lockDuration: 30000,
      maxStalledCount: 1,
      stalledInterval: 30000,
      autorun: true,
      runRetryDelay: 15000,
      ...this.opts,
    };

    if (this.opts.stalledInterval <= 0) {
      throw new Error('stalledInterval must be greater than 0');
    }

    if (this.opts.drainDelay <= 0) {
      throw new Error('drainDelay must be greater than 0');
    }

    // Goes through the setter so the value gets validated.
    this.concurrency = this.opts.concurrency;

    this.opts.lockRenewTime =
      this.opts.lockRenewTime || this.opts.lockDuration / 2;

    this.id = v4();

    if (processor) {
      if (typeof processor === 'function') {
        this.processFn = processor;
      } else {
        // SANDBOXED
        if (processor instanceof URL) {
          if (!fs.existsSync(processor)) {
            throw new Error(
              `URL ${processor} does not exist in the local file system`,
            );
          }
          processor = processor.href;
        } else {
          const supportedFileTypes = ['.js', '.ts', '.flow', '.cjs'];
          const processorFile =
            processor +
            (supportedFileTypes.includes(path.extname(processor)) ? '' : '.js');

          if (!fs.existsSync(processorFile)) {
            throw new Error(`File ${processorFile} does not exist`);
          }
        }

        // Separate paths so that bundling tools can resolve dependencies easier
        const dirname = path.dirname(module.filename || __filename);
        const workerThreadsMainFile = path.join(dirname, 'main-worker.js');
        const spawnProcessMainFile = path.join(dirname, 'main.js');

        let mainFilePath = this.opts.useWorkerThreads
          ? workerThreadsMainFile
          : spawnProcessMainFile;

        try {
          fs.statSync(mainFilePath); // would throw if file not exists
        } catch (_) {
          // Fall back to the build output relative to the working directory.
          const mainFile = this.opts.useWorkerThreads
            ? 'main-worker.js'
            : 'main.js';
          mainFilePath = path.join(
            process.cwd(),
            `dist/cjs/classes/${mainFile}`,
          );
          fs.statSync(mainFilePath);
        }

        this.childPool = new ChildPool({
          mainFile: mainFilePath,
          useWorkerThreads: this.opts.useWorkerThreads,
          workerForkOptions: this.opts.workerForkOptions,
          workerThreadsOptions: this.opts.workerThreadsOptions,
        });

        this.processFn = sandbox<DataType, ResultType, NameType>(
          processor,
          this.childPool,
        ).bind(this);
      }

      if (this.opts.autorun) {
        this.run().catch(error => this.emit('error', error));
      }
    }

    const connectionName =
      this.clientName() + (this.opts.name ? `:w:${this.opts.name}` : '');
    this.blockingConnection = new RedisConnection(
      isRedisInstance(opts.connection)
        ? (<Redis>opts.connection).duplicate({ connectionName })
        : { ...opts.connection, connectionName },
      false,
      true,
      opts.skipVersionCheck,
    );
    this.blockingConnection.on('error', error => this.emit('error', error));
    this.blockingConnection.on('ready', () =>
      setTimeout(() => this.emit('ready'), 0),
    );
  }

  /** Typed wrapper around the inherited `emit`. */
  emit<U extends keyof WorkerListener<DataType, ResultType, NameType>>(
    event: U,
    ...args: Parameters<WorkerListener<DataType, ResultType, NameType>[U]>
  ): boolean {
    return super.emit(event, ...args);
  }

  /** Typed wrapper around the inherited `off`; returns `this` for chaining. */
  off<U extends keyof WorkerListener<DataType, ResultType, NameType>>(
    eventName: U,
    listener: WorkerListener<DataType, ResultType, NameType>[U],
  ): this {
    super.off(eventName, listener);
    return this;
  }

  /** Typed wrapper around the inherited `on`; returns `this` for chaining. */
  on<U extends keyof WorkerListener<DataType, ResultType, NameType>>(
    event: U,
    listener: WorkerListener<DataType, ResultType, NameType>[U],
  ): this {
    super.on(event, listener);
    return this;
  }

  /** Typed wrapper around the inherited `once`; returns `this` for chaining. */
  once<U extends keyof WorkerListener<DataType, ResultType, NameType>>(
    event: U,
    listener: WorkerListener<DataType, ResultType, NameType>[U],
  ): this {
    super.once(event, listener);
    return this;
  }

  /** Invokes the configured processor for the given job. */
  protected callProcessJob(
    job: Job<DataType, ResultType, NameType>,
    token: string,
  ): Promise<ResultType> {
    return this.processFn(job, token);
  }

  /** Builds a Job instance from the raw job data returned by Redis. */
  protected createJob(
    data: JobJsonRaw,
    jobId: string,
  ): Job<DataType, ResultType, NameType> {
    return this.Job.fromJSON(this as MinimalQueue, data, jobId) as Job<
      DataType,
      ResultType,
      NameType
    >;
  }

  /**
   *
   * Waits until the worker is ready to start processing jobs.
   * In general only useful when writing tests.
   *
   */
  async waitUntilReady(): Promise<RedisClient> {
    await super.waitUntilReady();
    return this.blockingConnection.client;
  }

  /**
   * Sets the maximum number of jobs this worker handles concurrently.
   *
   * @throws if the value is not a finite number greater than or equal to 1.
   */
  set concurrency(concurrency: number) {
    if (
      typeof concurrency !== 'number' ||
      concurrency < 1 ||
      !isFinite(concurrency)
    ) {
      throw new Error('concurrency must be a finite number greater than 0');
    }
    this._concurrency = concurrency;
  }

  /** Current concurrency of this worker. */
  get concurrency(): number {
    return this._concurrency;
  }

  /**
   * Lazily created Repeat instance sharing this worker's client connection.
   * Errors emitted by it are forwarded as 'error' events of this worker.
   */
  get repeat(): Promise<Repeat> {
    return (async () => {
      if (!this._repeat) {
        const connection = await this.client;
        // Re-check after the await: a concurrent caller may have created it.
        if (!this._repeat) {
          this._repeat = new Repeat(this.name, {
            ...this.opts,
            connection,
          });
          this._repeat.on('error', e => this.emit('error', e));
        }
      }
      return this._repeat;
    })();
  }

  /**
   * Lazily created JobScheduler instance sharing this worker's client
   * connection. Errors emitted by it are forwarded as 'error' events of this
   * worker.
   */
  get jobScheduler(): Promise<JobScheduler> {
    return (async () => {
      if (!this._jobScheduler) {
        const connection = await this.client;
        // Re-check after the await: a concurrent caller may have created it.
        if (!this._jobScheduler) {
          this._jobScheduler = new JobScheduler(this.name, {
            ...this.opts,
            connection,
          });
          this._jobScheduler.on('error', e => this.emit('error', e));
        }
      }
      return this._jobScheduler;
    })();
  }

  /**
   * Runs the main processing loop until the worker is closing, then waits for
   * every promise still held by the internal queue.
   *
   * @throws if no process function is defined or the worker is already running.
   */
  async run() {
    if (!this.processFn) {
      throw new Error('No process function is defined.');
    }

    if (this.running) {
      throw new Error('Worker is already running.');
    }

    try {
      this.running = true;

      if (this.closing) {
        return;
      }

      await this.startStalledCheckTimer();

      const jobsInProgress = new Set<{ job: Job; ts: number }>();
      this.startLockExtenderTimer(jobsInProgress);

      const asyncFifoQueue = (this.asyncFifoQueue = new AsyncFifoQueue<void | Job<
        DataType,
        ResultType,
        NameType
      >>());

      let tokenPostfix = 0;

      const client = await this.client;
      const bclient = await this.blockingConnection.client;

      /**
       * This is the main loop in BullMQ. Its goals are to fetch jobs from the queue
       * as efficiently as possible, providing concurrency and minimal unnecessary calls
       * to Redis.
       */
      while (!this.closing) {
        let numTotal = asyncFifoQueue.numTotal();

        /**
         * This inner loop tries to fetch jobs concurrently, but if we are waiting for a job
         * to arrive at the queue we should not try to fetch more jobs (as it would be pointless)
         */
        while (
          !this.waiting &&
          numTotal < this._concurrency &&
          (!this.limitUntil || numTotal === 0)
        ) {
          const token = `${this.id}:${tokenPostfix++}`;

          const fetchedJob = this.retryIfFailed<void | Job<
            DataType,
            ResultType,
            NameType
          >>(
            () => this._getNextJob(client, bclient, token, { block: true }),
            this.opts.runRetryDelay,
          );
          asyncFifoQueue.add(fetchedJob);

          numTotal = asyncFifoQueue.numTotal();

          if (this.waiting && numTotal > 1) {
            // We are waiting for jobs but we have others that we could start processing already
            break;
          }

          // We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls
          // to Redis in high concurrency scenarios.
          const job = await fetchedJob;

          // No more jobs waiting but we have others that could start processing already
          if (!job && numTotal > 1) {
            break;
          }

          // If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting
          // for processing this job.
          if (this.blockUntil) {
            break;
          }
        }

        // Since there can be undefined jobs in the queue (when a job fails or queue is empty)
        // we iterate until we find a job.
        let job: Job<DataType, ResultType, NameType> | void;
        do {
          job = await asyncFifoQueue.fetch();
        } while (!job && asyncFifoQueue.numQueued() > 0);

        if (job) {
          const token = job.token;
          asyncFifoQueue.add(
            this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
              () =>
                this.processJob(
                  <Job<DataType, ResultType, NameType>>job,
                  token,
                  () => asyncFifoQueue.numTotal() <= this._concurrency,
                  jobsInProgress,
                ),
              this.opts.runRetryDelay,
            ),
          );
        }
      }

      this.running = false;
      return await asyncFifoQueue.waitAll();
    } catch (error) {
      this.running = false;
      throw error;
    }
  }

  /**
   * Returns a promise that resolves to the next job in queue.
   * @param token - worker token to be assigned to retrieved job
   * @returns a Job or undefined if no job was available in the queue.
   */
  async getNextJob(token: string, { block = true }: GetNextJobOptions = {}) {
    const nextJob = await this._getNextJob(
      await this.client,
      await this.blockingConnection.client,
      token,
      { block },
    );

    return this.trace<Job<DataType, ResultType, NameType>>(
      SpanKind.INTERNAL,
      'getNextJob',
      this.name,
      async span => {
        span?.setAttributes({
          [TelemetryAttributes.WorkerId]: this.id,
          [TelemetryAttributes.QueueName]: this.name,
          [TelemetryAttributes.WorkerName]: this.opts.name,
          [TelemetryAttributes.WorkerOptions]: JSON.stringify({ block }),
          [TelemetryAttributes.JobId]: nextJob?.id,
        });

        return nextJob;
      },
      nextJob?.opts.telemetryMetadata,
    );
  }

  /**
   * Fetches the next job, blocking on the marker key when the queue was
   * previously drained, or honouring the current rate limit otherwise.
   *
   * @returns the next job, or undefined when paused with `block` disabled,
   * when closing, or when no job could be moved to active.
   */
  private async _getNextJob(
    client: RedisClient,
    bclient: RedisClient,
    token: string,
    { block = true }: GetNextJobOptions = {},
  ): Promise<Job<DataType, ResultType, NameType> | undefined> {
    if (this.paused) {
      if (block) {
        await this.paused;
      } else {
        return;
      }
    }

    if (this.closing) {
      return;
    }

    if (this.drained && block && !this.limitUntil && !this.waiting) {
      this.waiting = this.waitForJob(bclient, this.blockUntil);
      try {
        this.blockUntil = await this.waiting;

        if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 1) {
          return this.moveToActive(client, token, this.opts.name);
        }
      } catch (err) {
        // Swallow error if locally paused or closing since we did force a disconnection
        if (
          !(this.paused || this.closing) &&
          isNotConnectionError(<Error>err)
        ) {
          throw err;
        }
      } finally {
        this.waiting = null;
      }
    } else {
      const limitUntil = this.limitUntil;
      if (limitUntil) {
        // Only one rate-limit delay is kept alive: abort the previous one.
        this.abortDelayController?.abort();
        this.abortDelayController = new AbortController();
        await this.delay(
          this.getLimitUntil(limitUntil),
          this.abortDelayController,
        );
      }
      return this.moveToActive(client, token, this.opts.name);
    }
  }

  /**
   * Overrides the rate limit to be active for the next jobs.
   *
   * @param expireTimeMs - expire time in ms of this rate limit.
   */
  async rateLimit(expireTimeMs: number): Promise<void> {
    await this.trace<void>(
      SpanKind.INTERNAL,
      'rateLimit',
      this.name,
      async span => {
        span?.setAttributes({
          [TelemetryAttributes.WorkerId]: this.id,
          [TelemetryAttributes.WorkerRateLimit]: expireTimeMs,
        });

        await this.client.then(client =>
          client.set(
            this.keys.limiter,
            Number.MAX_SAFE_INTEGER,
            'PX',
            expireTimeMs,
          ),
        );
      },
    );
  }

  /**
   * Smallest timeout, in seconds, used for a blocking call on the blocking
   * connection.
   */
  get minimumBlockTimeout(): number {
    return this.blockingConnection.capabilities.canBlockFor1Ms
      ? /* 1 millisecond is chosen because the granularity of our timestamps are milliseconds.
Obviously we can still process much faster than 1 job per millisecond but delays and rate limits
will never work with more accuracy than 1ms. */
        0.001
      : 0.002;
  }

  /**
   * Runs the moveToActive script, stores the returned limit / delay values
   * and converts the returned job data (if any) into a Job.
   */
  protected async moveToActive(
    client: RedisClient,
    token: string,
    name?: string,
  ): Promise<Job<DataType, ResultType, NameType>> {
    const [jobData, id, limitUntil, delayUntil] =
      await this.scripts.moveToActive(client, token, name);
    this.updateDelays(limitUntil, delayUntil);

    return this.nextJobFromJobData(jobData, id, token);
  }

  /**
   * Blocks on the marker key until a marker arrives or the timeout expires.
   *
   * @returns the score of the received marker, 0 when no blocking call was
   * made or it returned nothing, or Infinity when paused or after an error.
   */
  private async waitForJob(
    bclient: RedisClient,
    blockUntil: number,
  ): Promise<number> {
    if (this.paused) {
      return Infinity;
    }

    let timeout: NodeJS.Timeout;
    try {
      if (!this.closing && !this.limitUntil) {
        let blockTimeout = this.getBlockTimeout(blockUntil);

        if (blockTimeout > 0) {
          blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
            ? blockTimeout
            : Math.ceil(blockTimeout);

          // We cannot trust that the blocking connection stays blocking forever
          // due to issues in Redis and IORedis, so we will reconnect if we
          // don't get a response in the expected time.
          timeout = setTimeout(async () => {
            bclient.disconnect(!this.closing);
          }, blockTimeout * 1000 + 1000);

          this.updateDelays(); // reset delays to avoid reusing same values in next iteration

          // Markers should only be used for un-blocking, so we will handle them in this
          // function only.
          const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);

          if (result) {
            const [_key, member, score] = result;

            if (member) {
              return parseInt(score, 10);
            }
          }
        }

        return 0;
      }
    } catch (error) {
      if (isNotConnectionError(<Error>error)) {
        this.emit('error', <Error>error);
      }
      if (!this.closing) {
        await this.delay();
      }
    } finally {
      clearTimeout(timeout);
    }
    return Infinity;
  }

  /**
   * Computes the timeout, in seconds, for the next blocking call.
   *
   * @param blockUntil - timestamp (ms) until which we may block, or 0.
   * @returns a value less than or equal to 0 when `blockUntil` has already
   * been reached, otherwise a timeout between minimumBlockTimeout and
   * maximumBlockTimeout; `drainDelay` is used when `blockUntil` is not set.
   */
  protected getBlockTimeout(blockUntil: number): number {
    const opts: WorkerOptions = <WorkerOptions>this.opts;

    // when there are delayed jobs
    if (blockUntil) {
      const blockDelay = blockUntil - Date.now();
      // when we reach the time to get new jobs
      if (blockDelay <= 0) {
        return blockDelay;
      } else if (blockDelay < this.minimumBlockTimeout * 1000) {
        return this.minimumBlockTimeout;
      } else {
        // We restrict the maximum block timeout to 10 second to avoid
        // blocking the connection for too long in the case of reconnections
        // reference: https://github.com/taskforcesh/bullmq/issues/1658
        return Math.min(blockDelay / 1000, maximumBlockTimeout);
      }
    } else {
      return Math.max(opts.drainDelay, this.minimumBlockTimeout);
    }
  }

  /** Caps the given rate-limit wait (ms) at maximumLimitUntil. */
  protected getLimitUntil(limitUntil: number): number {
    // We restrict the maximum limit until to 30 second to
    // be able to promote delayed jobs while queue is rate limited
    return Math.min(limitUntil, maximumLimitUntil);
  }

  /**
   *
   * This function is exposed only for testing purposes.
   */
  async delay(
    milliseconds?: number,
    abortController?: AbortController,
  ): Promise<void> {
    await delay(milliseconds || DELAY_TIME_1, abortController);
  }

  /**
   * Stores the limit / delay values returned by the scripts. Negative or
   * non-numeric values are normalized to 0.
   */
  private updateDelays(limitUntil = 0, delayUntil = 0) {
    this.limitUntil = Math.max(limitUntil, 0) || 0;
    this.blockUntil = Math.max(delayUntil, 0) || 0;
  }

  /**
   * Converts raw job data into a Job and, for repeatable jobs, schedules the
   * next iteration. When there is no job data the worker is flagged as
   * drained and 'drained' is emitted once.
   *
   * @returns the job, or undefined when no job data was given.
   */
  protected async nextJobFromJobData(
    jobData?: JobJsonRaw,
    jobId?: string,
    token?: string,
  ): Promise<Job<DataType, ResultType, NameType>> {
    if (!jobData) {
      if (!this.drained) {
        this.emit('drained');
        this.drained = true;
      }
    } else {
      this.drained = false;
      const job = this.createJob(jobData, jobId);
      job.token = token;

      // Add next scheduled job if necessary.
      if (job.opts.repeat) {
        // Use new job scheduler if possible
        if (job.repeatJobKey) {
          const jobScheduler = await this.jobScheduler;
          await jobScheduler.upsertJobScheduler(
            job.repeatJobKey,
            job.opts.repeat,
            job.name,
            job.data,
            job.opts,
            { override: false },
          );
        } else {
          const repeat = await this.repeat;
          await repeat.updateRepeatableJob(job.name, job.data, job.opts, {
            override: false,
          });
        }
      }
      return job;
    }
  }

  /**
   * Processes a job and moves it to completed or failed.
   *
   * @param job - job to process.
   * @param token - lock token of the job.
   * @param fetchNextCallback - tells whether the next job may be fetched
   * together with the move to completed.
   * @param jobsInProgress - set used by the lock extender; the job is
   * registered in it while it is being processed.
   * @returns the next job when one was fetched while finishing this one.
   */
  async processJob(
    job: Job<DataType, ResultType, NameType>,
    token: string,
    fetchNextCallback = () => true,
    jobsInProgress: Set<{ job: Job; ts: number }>,
  ): Promise<void | Job<DataType, ResultType, NameType>> {
    if (!job || this.closing || this.paused) {
      return;
    }

    const { telemetryMetadata: srcPropagationMetadata } = job.opts;

    return this.trace<void | Job<DataType, ResultType, NameType>>(
      SpanKind.CONSUMER,
      'process',
      this.name,
      async span => {
        span?.setAttributes({
          [TelemetryAttributes.WorkerId]: this.id,
          [TelemetryAttributes.WorkerName]: this.opts.name,
          [TelemetryAttributes.JobId]: job.id,
        });

        const handleCompleted = async (result: ResultType) => {
          if (!this.connection.closing) {
            const completed = await job.moveToCompleted(
              result,
              token,
              fetchNextCallback() && !(this.closing || this.paused),
            );
            this.emit('completed', job, result, 'active');

            span?.addEvent('job completed', {
              [TelemetryAttributes.JobResult]: JSON.stringify(result),
            });

            const [jobData, jobId, limitUntil, delayUntil] = completed || [];
            this.updateDelays(limitUntil, delayUntil);

            return this.nextJobFromJobData(jobData, jobId, token);
          }
        };

        const handleFailed = async (err: Error) => {
          if (!this.connection.closing) {
            try {
              // Check if the job was manually rate-limited
              if (err.message === RATE_LIMIT_ERROR) {
                this.limitUntil = await this.moveLimitedBackToWait(job, token);
                return;
              }

              // These errors are not treated as failures here: the job is
              // neither moved to failed nor reported through 'failed'.
              if (
                err instanceof DelayedError ||
                err.name === 'DelayedError' ||
                err instanceof WaitingChildrenError ||
                err.name === 'WaitingChildrenError'
              ) {
                return;
              }

              const result = await job.moveToFailed(err, token, true);
              this.emit('failed', job, err, 'active');

              span?.addEvent('job failed', {
                [TelemetryAttributes.JobFailedReason]: err.message,
              });

              if (result) {
                const [jobData, jobId, limitUntil, delayUntil] = result;
                this.updateDelays(limitUntil, delayUntil);
                return this.nextJobFromJobData(jobData, jobId, token);
              }
            } catch (error) {
              this.emit('error', <Error>error);
              // It probably means that the job has lost the lock before completion
              // A worker will (or already has) moved the job back
              // to the waiting list (as stalled)
              span?.recordException((<Error>error).message);
            }
          }
        };

        this.emit('active', job, 'waiting');

        const processedOn = Date.now();
        const inProgressItem = { job, ts: processedOn };

        try {
          jobsInProgress.add(inProgressItem);
          const result = await this.callProcessJob(job, token);
          return await handleCompleted(result);
        } catch (err) {
          const failed = await handleFailed(<Error>err);
          return failed;
        } finally {
          span?.setAttributes({
            [TelemetryAttributes.JobFinishedTimestamp]: Date.now(),
            [TelemetryAttributes.JobProcessedTimestamp]: processedOn,
          });

          jobsInProgress.delete(inProgressItem);
        }
      },
      srcPropagationMetadata,
    );
  }

  /**
   *
   * Pauses the processing of this queue only for this worker.
   *
   * @param doNotWaitActive - when true, does not wait for the jobs currently
   * being processed to finish.
   */
  async pause(doNotWaitActive?: boolean): Promise<void> {
    await this.trace<void>(
      SpanKind.INTERNAL,
      'pause',
      this.name,
      async span => {
        span?.setAttributes({
          [TelemetryAttributes.WorkerId]: this.id,
          [TelemetryAttributes.WorkerName]: this.opts.name,
          [TelemetryAttributes.WorkerDoNotWaitActive]: doNotWaitActive,
        });

        if (!this.paused) {
          this.paused = new Promise<void>(resolve => {
            this.resumeWorker = () => {
              resolve();
              this.paused = null; // Allow pause to be checked externally for paused state.
              this.resumeWorker = null;
            };
          });
          await (!doNotWaitActive && this.whenCurrentJobsFinished());
          this.emit('paused');
        }
      },
    );
  }

  /**
   *
   * Resumes processing of this worker (if paused).
   */
  resume(): void {
    if (this.resumeWorker) {
      this.trace<void>(SpanKind.INTERNAL, 'resume', this.name, span => {
        span?.setAttributes({
          [TelemetryAttributes.WorkerId]: this.id,
          [TelemetryAttributes.WorkerName]: this.opts.name,
        });

        this.resumeWorker();
        this.emit('resumed');
      });
    }
  }

  /**
   *
   * Checks if worker is paused.
   *
   * @returns true if worker is paused, false otherwise.
   */
  isPaused(): boolean {
    return !!this.paused;
  }

  /**
   *
   * Checks if worker is currently running.
   *
   * @returns true if worker is running, false otherwise.
   */
  isRunning(): boolean {
    return this.running;
  }

  /**
   *
   * Closes the worker and related redis connections.
   *
   * This method waits for current jobs to finalize before returning.
   *
   * @param force - Use force boolean parameter if you do not want to wait for
   * current jobs to be processed.
   *
   * @returns Promise that resolves when the worker has been closed.
   */
  async close(force = false): Promise<void> {
    if (this.closing) {
      return this.closing;
    }

    await this.trace<void>(
      SpanKind.INTERNAL,
      'close',
      this.name,
      async span => {
        span?.setAttributes({
          [TelemetryAttributes.WorkerId]: this.id,
          [TelemetryAttributes.WorkerName]: this.opts.name,
          [TelemetryAttributes.WorkerForceClose]: force,
        });

        this.closing = (async () => {
          this.emit('closing', 'closing queue');
          this.abortDelayController?.abort();

          this.resume();

          // Define the async cleanup functions
          const asyncCleanups = [
            () => {
              return force || this.whenCurrentJobsFinished(false);
            },
            () => this.childPool?.clean(),
            () => this.blockingConnection.close(force),
            () => this.connection.close(force),
          ];

          // Run cleanup functions sequentially and make sure all are run despite any errors
          for (const cleanup of asyncCleanups) {
            try {
              await cleanup();
            } catch (err) {
              this.emit('error', <Error>err);
            }
          }

          clearTimeout(this.extendLocksTimer);
          //clearTimeout(this.stalledCheckTimer);
          this.stalledCheckStopper?.();

          this.closed = true;
          this.emit('closed');
        })();

        return await this.closing;
      },
    );
  }

  /**
   *
   * Manually starts the stalled checker.
   * The check will run once as soon as this method is called, and
   * then every opts.stalledInterval milliseconds until the worker is closed.
   * Note: Normally you do not need to call this method, since the stalled checker
   * is automatically started when the worker starts processing jobs after
   * calling run. However if you want to process the jobs manually you need
   * to call this method to start the stalled checker.
   *
   * @see {@link https://docs.bullmq.io/patterns/manually-fetching-jobs}
   */
  async startStalledCheckTimer(): Promise<void> {
    if (!this.opts.skipStalledCheck) {
      if (!this.closing) {
        await this.trace<void>(
          SpanKind.INTERNAL,
          'startStalledCheckTimer',
          this.name,
          async span => {
            span?.setAttributes({
              [TelemetryAttributes.WorkerId]: this.id,
              [TelemetryAttributes.WorkerName]: this.opts.name,
            });

            // Not awaited on purpose: the checker loops until closing.
            this.stalledChecker().catch(err => {
              this.emit('error', <Error>err);
            });
          },
        );
      }
    }
  }

  /**
   * Moves stalled jobs back to wait every `opts.stalledInterval` milliseconds
   * until the worker is closing. Errors are reported through 'error'.
   */
  private async stalledChecker() {
    while (!this.closing) {
      try {
        await this.checkConnectionError(() => this.moveStalledJobsToWait());
      } catch (err) {
        this.emit('error', <Error>err);
      }

      await new Promise<void>(resolve => {
        const timeout = setTimeout(resolve, this.opts.stalledInterval);
        // Lets close() cut the current wait short.
        this.stalledCheckStopper = () => {
          clearTimeout(timeout);
          resolve();
        };
      });
    }
  }

  /**
   * (Re)arms the timer that extends the locks of the jobs in progress. The
   * timer fires every `lockRenewTime / 2` ms and re-arms itself until the
   * worker is closed. Does nothing when `opts.skipLockRenewal` is set.
   */
  private startLockExtenderTimer(
    jobsInProgress: Set<{ job: Job; ts: number }>,
  ): void {
    if (!this.opts.skipLockRenewal) {
      clearTimeout(this.extendLocksTimer);

      if (!this.closed) {
        this.extendLocksTimer = setTimeout(async () => {
          // Get all the jobs whose locks expire in less than 1/2 of the lockRenewTime
          const now = Date.now();
          const jobsToExtend = [];

          for (const item of jobsInProgress) {
            const { job, ts } = item;
            if (!ts) {
              item.ts = now;
              continue;
            }

            if (ts + this.opts.lockRenewTime / 2 < now) {
              item.ts = now;
              jobsToExtend.push(job);
            }
          }

          try {
            if (jobsToExtend.length) {
              await this.extendLocks(jobsToExtend);
            }
          } catch (err) {
            this.emit('error', <Error>err);
          }

          this.startLockExtenderTimer(jobsInProgress);
        }, this.opts.lockRenewTime / 2);
      }
    }
  }

  /**
   * Returns a promise that resolves when active jobs are cleared
   *
   * @param reconnect - whether the blocking connection should be reconnected
   * afterwards; ignored when the worker was not blocked waiting for jobs.
   * @returns
   */
  private async whenCurrentJobsFinished(reconnect = true) {
    //
    // Force reconnection of blocking connection to abort blocking redis call immediately.
    //
    if (this.waiting) {
      // If we are not going to reconnect, we will not wait for the disconnection.
      await this.blockingConnection.disconnect(reconnect);
    } else {
      reconnect = false;
    }

    if (this.asyncFifoQueue) {
      await this.asyncFifoQueue.waitAll();
    }

    reconnect && (await this.blockingConnection.reconnect());
  }

  /**
   * Calls `fn` until it succeeds. Every failure is reported through 'error';
   * the call is retried after `delayInMs` ms, or abandoned (resolving to
   * undefined) when `delayInMs` is falsy.
   */
  private async retryIfFailed<T>(fn: () => Promise<T>, delayInMs: number) {
    const retry = 1;
    do {
      try {
        return await fn();
      } catch (err) {
        this.emit('error', <Error>err);
        if (delayInMs) {
          await this.delay(delayInMs);
        } else {
          return;
        }
      }
    } while (retry);
  }

  /**
   * Extends the locks of the given jobs by `opts.lockDuration` using a single
   * pipeline. Failures are reported through 'error' and never thrown.
   */
  protected async extendLocks(jobs: Job[]) {
    await this.trace<void>(
      SpanKind.INTERNAL,
      'extendLocks',
      this.name,
      async span => {
        span?.setAttributes({
          [TelemetryAttributes.WorkerId]: this.id,
          [TelemetryAttributes.WorkerName]: this.opts.name,
          [TelemetryAttributes.WorkerJobsToExtendLocks]: jobs.map(
            job => job.id,
          ),
        });

        try {
          const pipeline = (await this.client).pipeline();
          for (const job of jobs) {
            await this.scripts.extendLock(
              job.id,
              job.token,
              this.opts.lockDuration,
              pipeline,
            );
          }
          const result = (await pipeline.exec()) as [Error, string][];

          for (const [err, jobId] of result) {
            if (err) {
              // TODO: signal process function that the job has been lost.
              this.emit(
                'error',
                new Error(`could not renew lock for job ${jobId}`),
              );
            }
          }
        } catch (err) {
          this.emit('error', <Error>err);
        }
      },
    );
  }

  /**
   * Runs the moveStalledJobsToWait script, emits 'stalled' for every stalled
   * job id and 'failed' for every job id reported as failed.
   */
  private async moveStalledJobsToWait() {
    await this.trace<void>(
      SpanKind.INTERNAL,
      'moveStalledJobsToWait',
      this.name,
      async span => {
        const chunkSize = 50;
        const [failed, stalled] = await this.scripts.moveStalledJobsToWait();

        span?.setAttributes({
          [TelemetryAttributes.WorkerId]: this.id,
          [TelemetryAttributes.WorkerName]: this.opts.name,
          [TelemetryAttributes.WorkerStalledJobs]: stalled,
          [TelemetryAttributes.WorkerFailedJobs]: failed,
        });

        stalled.forEach((jobId: string) => {
          span?.addEvent('job stalled', {
            [TelemetryAttributes.JobId]: jobId,
          });
          this.emit('stalled', jobId, 'active');
        });

        // Failed jobs are loaded and notified in chunks of `chunkSize`.
        const jobPromises: Promise<Job<DataType, ResultType, NameType>>[] = [];
        for (let i = 0; i < failed.length; i++) {
          jobPromises.push(
            Job.fromId<DataType, ResultType, NameType>(
              this as MinimalQueue,
              failed[i],
            ),
          );

          if ((i + 1) % chunkSize === 0) {
            this.notifyFailedJobs(await Promise.all(jobPromises), span);
            jobPromises.length = 0;
          }
        }

        this.notifyFailedJobs(await Promise.all(jobPromises), span);
      },
    );
  }

  /**
   * Emits 'failed' (and records a 'job failed' span event when a span is
   * given) for every job that stalled more than the allowable limit.
   */
  private notifyFailedJobs(
    failedJobs: Job<DataType, ResultType, NameType>[],
    span?: Span,
  ) {
    const failedReason = 'job stalled more than allowable limit';

    failedJobs.forEach((job: Job<DataType, ResultType, NameType>) => {
      span?.addEvent('job failed', {
        [TelemetryAttributes.JobId]: job.id,
        [TelemetryAttributes.JobName]: job.name,
        [TelemetryAttributes.JobFailedReason]: failedReason,
      });
      this.emit('failed', job, new Error(failedReason), 'active');
    });
  }

  /** Moves a manually rate-limited job from active back to wait. */
  private moveLimitedBackToWait(
    job: Job<DataType, ResultType, NameType>,
    token: string,
  ) {
    return this.scripts.moveJobFromActiveToWait(job.id, token);
  }
}