From 7ba27293615e443903cfdf7d0ff8be0052d061c4 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Mon, 19 Feb 2024 23:03:47 +0100 Subject: [PATCH] feat(worker): add support for naming workers --- src/classes/job.ts | 9 +++++ src/classes/queue-events.ts | 1 + src/classes/queue-getters.ts | 30 ++++++++------ src/classes/scripts.ts | 3 +- src/classes/worker.ts | 30 ++++++++------ .../includes/prepareJobForProcessing.lua | 5 +++ src/interfaces/job-json.ts | 2 + src/interfaces/worker-options.ts | 7 ++++ src/utils.ts | 2 - tests/test_getters.ts | 39 +++++++++++++++++++ tests/test_worker.ts | 29 ++++++++++++++ 11 files changed, 131 insertions(+), 26 deletions(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index 806508c34c..b10bf70084 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -147,6 +147,11 @@ export class Job< */ token?: string; + /** + * The worker name that is processing or processed this job. + */ + processedBy?: string; + protected toKey: (type: string) => string; protected discarded: boolean; @@ -338,6 +343,10 @@ export class Job< job.parent = JSON.parse(json.parent); } + if (json.pb) { + job.processedBy = json.pb; + } + return job; } diff --git a/src/classes/queue-events.ts b/src/classes/queue-events.ts index 4e329c44ba..5ed3b25423 100644 --- a/src/classes/queue-events.ts +++ b/src/classes/queue-events.ts @@ -236,6 +236,7 @@ export class QueueEvents extends QueueBase { this.running = true; const client = await this.client; + // Planed for deprecation as it has no really a use case try { await client.client('SETNAME', this.clientName(QUEUE_EVENT_SUFFIX)); } catch (err) { diff --git a/src/classes/queue-getters.ts b/src/classes/queue-getters.ts index a734e89bb8..928099b2be 100644 --- a/src/classes/queue-getters.ts +++ b/src/classes/queue-getters.ts @@ -3,11 +3,7 @@ import { QueueBase } from './queue-base'; import { Job } from './job'; -import { - clientCommandMessageReg, - QUEUE_EVENT_SUFFIX, - WORKER_SUFFIX, -} from '../utils'; +import { clientCommandMessageReg, QUEUE_EVENT_SUFFIX } from '../utils'; import { JobState, JobType } from '../types'; import { JobJsonRaw, Metrics } from '../interfaces'; @@ -432,7 +428,7 @@ export class QueueGetters< }; } - private async baseGetClients(suffix: string): Promise< + private async baseGetClients(matcher: (name: string) => boolean): Promise< { [index: string]: string; }[] @@ -440,7 +436,7 @@ export class QueueGetters< const client = await this.client; const clients = (await client.client('LIST')) as string; try { - const list = this.parseClientList(clients, suffix); + const list = this.parseClientList(clients, matcher); return list; } catch (err) { if (!clientCommandMessageReg.test((err).message)) { @@ -463,13 +459,23 @@ export class QueueGetters< [index: string]: string; }[] > { - return this.baseGetClients(WORKER_SUFFIX); + const unnamedWorkerClientName = `${this.clientName()}`; + const namedWorkerClientName = `${this.clientName()}:w:`; + + const matcher = (name: string) => + name && + (name === unnamedWorkerClientName || + name.startsWith(namedWorkerClientName)); + + return this.baseGetClients(matcher); } /** * Get queue events list related to the queue. * Note: GCP does not support SETNAME, so this call will not work * + * @deprecated do not use this method, it will be removed in the future. + * * @returns - Returns an array with queue events info. */ async getQueueEvents(): Promise< @@ -477,7 +483,8 @@ export class QueueGetters< [index: string]: string; }[] > { - return this.baseGetClients(QUEUE_EVENT_SUFFIX); + const clientName = `${this.clientName()}${QUEUE_EVENT_SUFFIX}`; + return this.baseGetClients((name: string) => name === clientName); } /** @@ -531,7 +538,7 @@ export class QueueGetters< }; } - private parseClientList(list: string, suffix = '') { + private parseClientList(list: string, matcher: (name: string) => boolean) { const lines = list.split('\n'); const clients: { [index: string]: string }[] = []; @@ -545,8 +552,9 @@ export class QueueGetters< client[key] = value; }); const name = client['name']; - if (name && name === `${this.clientName()}${suffix ? `${suffix}` : ''}`) { + if (matcher(name)) { client['name'] = this.name; + client['rawname'] = name; clients.push(client); } }); diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 6f65edd48c..bd23d2e46a 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -943,7 +943,7 @@ export class Scripts { } } - async moveToActive(client: RedisClient, token: string) { + async moveToActive(client: RedisClient, token: string, name?: string) { const opts = this.queue.opts as WorkerOptions; const queueKeys = this.queue.keys; @@ -968,6 +968,7 @@ export class Scripts { token, lockDuration: opts.lockDuration, limiter: opts.limiter, + name, }), ]; diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 519f643bf9..d929eb0069 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -21,7 +21,6 @@ import { DELAY_TIME_1, isNotConnectionError, isRedisInstance, - WORKER_SUFFIX, } from '../utils'; import { QueueBase } from './queue-base'; import { Repeat } from './repeat'; @@ -256,16 +255,21 @@ export class Worker< } } - const mainFile = this.opts.useWorkerThreads - ? 'main-worker.js' - : 'main.js'; - let mainFilePath = path.join( - path.dirname(module.filename), - `${mainFile}`, - ); + // Separate paths so that bundling tools can resolve dependencies easier + const dirname = path.dirname(module.filename || __filename); + const workerThreadsMainFile = path.join(dirname, 'main-worker.js'); + const spawnProcessMainFile = path.join(dirname, 'main.js'); + + let mainFilePath = this.opts.useWorkerThreads + ? workerThreadsMainFile + : spawnProcessMainFile; + try { fs.statSync(mainFilePath); // would throw if file not exists } catch (_) { + const mainFile = this.opts.useWorkerThreads + ? 'main-worker.js' + : 'main.js'; mainFilePath = path.join( process.cwd(), `dist/cjs/classes/${mainFile}`, @@ -289,7 +293,8 @@ export class Worker< } } - const connectionName = this.clientName(WORKER_SUFFIX); + const connectionName = + this.clientName() + (this.opts.name ? `:w:${this.opts.name}` : ''); this.blockingConnection = new RedisConnection( isRedisInstance(opts.connection) ? (opts.connection).duplicate({ connectionName }) @@ -530,7 +535,7 @@ export class Worker< this.blockUntil = await this.waiting; if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 10) { - return this.moveToActive(client, token); + return this.moveToActive(client, token, this.opts.name); } } catch (err) { // Swallow error if locally paused or closing since we did force a disconnection @@ -549,7 +554,7 @@ export class Worker< this.abortDelayController = new AbortController(); await this.delay(this.limitUntil, this.abortDelayController); } - return this.moveToActive(client, token); + return this.moveToActive(client, token, this.opts.name); } } @@ -572,9 +577,10 @@ export class Worker< protected async moveToActive( client: RedisClient, token: string, + name?: string, ): Promise> { const [jobData, id, limitUntil, delayUntil] = - await this.scripts.moveToActive(client, token); + await this.scripts.moveToActive(client, token, name); this.updateDelays(limitUntil, delayUntil); return this.nextJobFromJobData(jobData, id, token); diff --git a/src/commands/includes/prepareJobForProcessing.lua b/src/commands/includes/prepareJobForProcessing.lua index e6a36094eb..4786e5fa38 100644 --- a/src/commands/includes/prepareJobForProcessing.lua +++ b/src/commands/includes/prepareJobForProcessing.lua @@ -29,6 +29,11 @@ local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration']) end + if opts['name'] then + -- Set "processedBy" field to the worker name + rcall("HSET", jobKey, "pb", opts['name']) + end + rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting") rcall("HSET", jobKey, "processedOn", processedOn) rcall("HINCRBY", jobKey, "ats", 1) diff --git a/src/interfaces/job-json.ts b/src/interfaces/job-json.ts index 6d601f88ae..38d27e6d8a 100644 --- a/src/interfaces/job-json.ts +++ b/src/interfaces/job-json.ts @@ -18,6 +18,7 @@ export interface JobJson { parent?: ParentKeys; parentKey?: string; repeatJobKey?: string; + processedBy?: string; } export interface JobJsonRaw { @@ -39,4 +40,5 @@ export interface JobJsonRaw { rjk?: string; atm?: string; ats?: string; + pb?: string; // Worker name } diff --git a/src/interfaces/worker-options.ts b/src/interfaces/worker-options.ts index 4eb33ed687..77a204a23f 100644 --- a/src/interfaces/worker-options.ts +++ b/src/interfaces/worker-options.ts @@ -14,6 +14,13 @@ export type Processor = ( ) => Promise; export interface WorkerOptions extends QueueBaseOptions { + /** + * Optional worker name. The name will be stored on every job + * processed by this worker instance, and can be used to monitor + * which worker is processing or has processed a given job. + */ + name?: string; + /** * Condition to start processor at instance creation. * diff --git a/src/utils.ts b/src/utils.ts index 39ee09802a..ed19d84621 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -212,6 +212,4 @@ export const errorToJSON = (value: any): Record => { return error; }; -export const WORKER_SUFFIX = ''; - export const QUEUE_EVENT_SUFFIX = ':qe'; diff --git a/tests/test_getters.ts b/tests/test_getters.ts index 2da7da45df..3df22701f0 100644 --- a/tests/test_getters.ts +++ b/tests/test_getters.ts @@ -89,6 +89,45 @@ describe('Jobs getters', function () { await worker2.close(); }); + it('gets all workers including their names', async function () { + const worker = new Worker(queueName, async () => {}, { + autorun: false, + connection, + prefix, + name: 'worker1', + }); + await new Promise(resolve => { + worker.on('ready', () => { + resolve(); + }); + }); + + const workers = await queue.getWorkers(); + expect(workers).to.have.length(1); + + const worker2 = new Worker(queueName, async () => {}, { + autorun: false, + connection, + prefix, + name: 'worker2', + }); + await new Promise(resolve => { + worker2.on('ready', () => { + resolve(); + }); + }); + + const nextWorkers = await queue.getWorkers(); + expect(nextWorkers).to.have.length(2); + + // Check that the worker names are included in the response on the rawname property + expect(nextWorkers[0].rawname.endsWith('worker1')).to.be.true; + expect(nextWorkers[1].rawname.endsWith('worker2')).to.be.true; + + await worker.close(); + await worker2.close(); + }); + it('gets only workers related only to one queue', async function () { const queueName2 = `${queueName}2`; const queue2 = new Queue(queueName2, { connection, prefix }); diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 6b339ef546..ded040f315 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -695,6 +695,35 @@ describe('workers', function () { await worker.close(); }); + it('sets the worker name on the job upon processing', async () => { + let worker; + const processing = new Promise(async (resolve, reject) => { + worker = new Worker( + queueName, + async job => { + const fetchedJob = await queue.getJob(job.id!); + + try { + expect(fetchedJob).to.be.ok; + expect(fetchedJob!.processedBy).to.be.equal(worker.opts.name); + } catch (err) { + reject(err); + } + + resolve(); + }, + { connection, prefix, name: 'foobar' }, + ); + await worker.waitUntilReady(); + }); + + await queue.add('test', { foo: 'bar' }); + + await processing; + + await worker.close(); + }); + it('retry a job that fails', async () => { let failedOnce = false; const notEvenErr = new Error('Not even!');