From 5b005cd94ba0f98677bed4a44f8669c81f073f26 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Sun, 8 Dec 2024 11:56:32 -0600 Subject: [PATCH] feat(queue): enhance getJobSchedulers method to include template information (#2956) ref #2875 --- src/classes/job-scheduler.ts | 55 ++++++++++++++---------------- src/classes/queue.ts | 10 ++++-- src/classes/scripts.ts | 14 ++++++++ src/commands/getJobScheduler-1.lua | 19 +++++++++++ src/commands/updateJobOption-1.lua | 26 -------------- src/interfaces/telemetry.ts | 16 ++++----- tests/test_job_scheduler.ts | 34 ++++++++++++++++-- tests/test_metrics.ts | 2 +- 8 files changed, 106 insertions(+), 70 deletions(-) create mode 100644 src/commands/getJobScheduler-1.lua delete mode 100644 src/commands/updateJobOption-1.lua diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 1f3493c6e6..bd47466ce5 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -11,7 +11,7 @@ import { Job } from './job'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; -import { optsAsJSON, optsFromJSON } from '../utils'; +import { array2obj, optsAsJSON, optsFromJSON } from '../utils'; export class JobScheduler extends QueueBase { private repeatStrategy: RepeatStrategy; @@ -202,13 +202,21 @@ export class JobScheduler extends QueueBase { return this.scripts.removeJobScheduler(jobSchedulerId); } - private async getSchedulerData( + private async getSchedulerData( client: RedisClient, key: string, next?: number, - ): Promise { + ): Promise> { const jobData = await client.hgetall(this.toKey('repeat:' + key)); + return this.transformSchedulerData(key, jobData, next); + } + + private async transformSchedulerData( + key: string, + jobData: any, + next?: number, + ): Promise> { if (jobData) { return { key, @@ -217,6 +225,11 @@ export class JobScheduler extends QueueBase { tz: jobData.tz || null, pattern: jobData.pattern || null, every: jobData.every || null, + ...(jobData.data || jobData.opts + ? { + template: this.getTemplateFromJSON(jobData.data, jobData.opts), + } + : {}), next, }; } @@ -239,30 +252,14 @@ export class JobScheduler extends QueueBase { }; } - async getJobScheduler(id: string): Promise> { - const client = await this.client; - const schedulerAttributes = await client.hgetall( - this.toKey('repeat:' + id), - ); + async getScheduler(id: string): Promise> { + const [rawJobData, next] = await this.scripts.getJobScheduler(id); - if (schedulerAttributes) { - return { - key: id, - name: schedulerAttributes.name, - endDate: parseInt(schedulerAttributes.endDate) || null, - tz: schedulerAttributes.tz || null, - pattern: schedulerAttributes.pattern || null, - every: schedulerAttributes.every || null, - ...(schedulerAttributes.data || schedulerAttributes.opts - ? { - template: this.getTemplateFromJSON( - schedulerAttributes.data, - schedulerAttributes.opts, - ), - } - : {}), - }; - } + return this.transformSchedulerData( + id, + rawJobData ? array2obj(rawJobData) : null, + next ? parseInt(next) : null, + ); } private getTemplateFromJSON( @@ -279,11 +276,11 @@ export class JobScheduler extends QueueBase { return template; } - async getJobSchedulers( + async getJobSchedulers( start = 0, end = -1, asc = false, - ): Promise { + ): Promise[]> { const client = await this.client; const jobSchedulersKey = this.keys.repeat; @@ -294,7 +291,7 @@ export class JobScheduler extends QueueBase { const jobs = []; for (let i = 0; i < result.length; i += 2) { jobs.push( - this.getSchedulerData(client, result[i], parseInt(result[i + 1])), + this.getSchedulerData(client, result[i], parseInt(result[i + 1])), ); } return Promise.all(jobs); diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 3afa5691c0..1525d13254 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -573,7 +573,7 @@ export class Queue< * @param id - identifier of scheduler. */ async getJobScheduler(id: string): Promise> { - return (await this.jobScheduler).getJobScheduler(id); + return (await this.jobScheduler).getScheduler(id); } /** @@ -588,8 +588,12 @@ export class Queue< start?: number, end?: number, asc?: boolean, - ): Promise { - return (await this.jobScheduler).getJobSchedulers(start, end, asc); + ): Promise[]> { + return (await this.jobScheduler).getJobSchedulers( + start, + end, + asc, + ); } /** diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 0897d905a9..dc487b42d1 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -1088,6 +1088,20 @@ export class Scripts { ]); } + getJobSchedulerArgs(id: string): string[] { + const keys: string[] = [this.queue.keys.repeat]; + + return keys.concat([id]); + } + + async getJobScheduler(id: string): Promise<[any, string | null]> { + const client = await this.queue.client; + + const args = this.getJobSchedulerArgs(id); + + return this.execCommand(client, 'getJobScheduler', args); + } + retryJobArgs( jobId: string, lifo: boolean, diff --git a/src/commands/getJobScheduler-1.lua b/src/commands/getJobScheduler-1.lua new file mode 100644 index 0000000000..324bdb58eb --- /dev/null +++ b/src/commands/getJobScheduler-1.lua @@ -0,0 +1,19 @@ +--[[ + Get job scheduler record. + + Input: + KEYS[1] 'repeat' key + + ARGV[1] id +]] + +local rcall = redis.call +local jobSchedulerKey = KEYS[1] .. ":" .. ARGV[1] + +local score = rcall("ZSCORE", KEYS[1], ARGV[1]) + +if score then + return {rcall("HGETALL", jobSchedulerKey), score} -- get job data +end + +return {nil, nil} diff --git a/src/commands/updateJobOption-1.lua b/src/commands/updateJobOption-1.lua deleted file mode 100644 index 03949faf29..0000000000 --- a/src/commands/updateJobOption-1.lua +++ /dev/null @@ -1,26 +0,0 @@ ---[[ - Update a job option - - Input: - KEYS[1] Job id key - - ARGV[1] field - ARGV[2] value - - Output: - 0 - OK - -1 - Missing job. -]] -local rcall = redis.call - -if rcall("EXISTS", KEYS[1]) == 1 then -- // Make sure job exists - - local opts = rcall("HGET", KEYS[1], "opts") - local jsonOpts = cjson.decode(opts) - jsonOpts[ARGV[1]] = ARGV[2] - - rcall("HSET", KEYS[1], "opts", cjson.encode(jsonOpts)) - return 0 -else - return -1 -end diff --git a/src/interfaces/telemetry.ts b/src/interfaces/telemetry.ts index e55c0990dc..d39794d34a 100644 --- a/src/interfaces/telemetry.ts +++ b/src/interfaces/telemetry.ts @@ -36,8 +36,8 @@ export interface ContextManager { /** * Creates a new context and sets it as active for the fn passed as last argument * - * @param context - * @param fn + * @param context - + * @param fn - */ with any>( context: Context, @@ -54,7 +54,7 @@ export interface ContextManager { * is the mechanism used to propagate the context across a distributed * application. * - * @param context + * @param context - */ getMetadata(context: Context): string; @@ -62,8 +62,8 @@ export interface ContextManager { * Creates a new context from a serialized version effectively * linking the new context to the parent context. * - * @param activeContext - * @param metadata + * @param activeContext - + * @param metadata - */ fromMetadata(activeContext: Context, metadata: string): Context; } @@ -78,9 +78,9 @@ export interface Tracer { * context. If the context is not provided, the current active context should be * used. * - * @param name - * @param options - * @param context + * @param name - + * @param options - + * @param context - */ startSpan(name: string, options?: SpanOptions, context?: Context): Span; } diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index ee329207aa..427cb99cee 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -342,7 +342,7 @@ describe('Job Scheduler', function () { ); const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); - const date = new Date('2017-02-07 9:24:00'); + const date = new Date('2017-02-07T15:24:00.000Z'); this.clock.setSystemTime(date); await queue.upsertJobScheduler( @@ -360,6 +360,7 @@ describe('Job Scheduler', function () { tz: null, pattern: '*/2 * * * * *', every: null, + next: 1486481042000, template: { data: { foo: 'bar', @@ -682,7 +683,7 @@ describe('Job Scheduler', function () { ); const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); - const date = new Date('2017-02-07 9:24:00'); + const date = new Date('2017-02-07T15:24:00.000Z'); this.clock.setSystemTime(date); const repeat = { @@ -698,6 +699,7 @@ describe('Job Scheduler', function () { key: 'rrule', name: 'rrule', endDate: null, + next: 1486481042000, tz: null, pattern: 'RRULE:FREQ=SECONDLY;INTERVAL=2;WKST=MO', every: null, @@ -1424,8 +1426,11 @@ describe('Job Scheduler', function () { describe('when repeatable job fails', function () { it('should continue repeating', async function () { + const date = new Date('2017-02-07T15:24:00.000Z'); + this.clock.setSystemTime(date); const repeatOpts = { pattern: '0 * 1 * *', + tz: 'Asia/Calcutta', }; const worker = new Worker( @@ -1445,7 +1450,11 @@ describe('Job Scheduler', function () { }); }); - const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); + const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts, { + name: 'a', + data: { foo: 'bar' }, + opts: { priority: 1 }, + }); const delayedCount = await queue.getDelayedCount(); expect(delayedCount).to.be.equal(1); @@ -1463,6 +1472,25 @@ describe('Job Scheduler', function () { const count = await queue.count(); expect(count).to.be.equal(1); expect(jobSchedulers).to.have.length(1); + + expect(jobSchedulers[0]).to.deep.equal({ + key: 'test', + name: 'a', + endDate: null, + tz: 'Asia/Calcutta', + pattern: '0 * 1 * *', + every: null, + next: 1488310200000, + template: { + data: { + foo: 'bar', + }, + opts: { + priority: 1, + }, + }, + }); + await worker.close(); }); diff --git a/tests/test_metrics.ts b/tests/test_metrics.ts index 6433afc8b2..f59d4e44d7 100644 --- a/tests/test_metrics.ts +++ b/tests/test_metrics.ts @@ -28,7 +28,7 @@ describe('metrics', function () { }); beforeEach(function () { - this.clock = sinon.useFakeTimers(); + this.clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); }); beforeEach(async function () {