From 4ec119817b8f67d0f567eb5bdc3eca64d877ce4e Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 23 Nov 2024 13:51:14 -0500 Subject: [PATCH 1/9] feat(queue): add getJobSchedulerTemplate method --- src/classes/job-scheduler.ts | 4 ++++ src/classes/queue.ts | 27 ++++++++++++++++++++++ src/classes/scripts.ts | 23 ++++++++++++++++++ src/commands/getJobSchedulerTemplate-2.lua | 26 +++++++++++++++++++++ tests/test_job_scheduler.ts | 7 ++++++ 5 files changed, 87 insertions(+) create mode 100644 src/commands/getJobSchedulerTemplate-2.lua diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 1cec4caa5c..66bda6abc9 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -257,6 +257,10 @@ export class JobScheduler extends QueueBase { } } + async getJobSchedulerTemplate(id: string): Promise { + return this.scripts.getJobSchedulerTemplate(id); + } + async getJobSchedulers( start = 0, end = -1, diff --git a/src/classes/queue.ts b/src/classes/queue.ts index f642233ac3..ebc321ddf6 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -3,6 +3,7 @@ import { BaseJobOptions, BulkJobOptions, IoredisListener, + JobJsonRaw, QueueOptions, RepeatableJob, RepeatOptions, @@ -239,6 +240,17 @@ export class Queue< return await client.hget(this.keys.meta, 'version'); } + protected createJob( + data: JobJsonRaw, + jobId: string, + ): Job { + return this.Job.fromJSON(this as MinimalQueue, data, jobId) as Job< + DataType, + ResultType, + NameType + >; + } + get repeat(): Promise { return new Promise(async resolve => { if (!this._repeat) { @@ -575,6 +587,21 @@ export class Queue< return (await this.jobScheduler).getJobScheduler(id); } + /** + * Get Job Scheduler template by id + * + * @param id - identifier of scheduler. + */ + async getJobSchedulerTemplate( + id: string, + ): Promise> { + const jobScheduler = await this.jobScheduler; + const [jobData, jobId] = await jobScheduler.getJobSchedulerTemplate(id); + + console.log('gg', jobData, jobId); + return this.createJob(jobData, jobId); + } + /** * Get all Job Schedulers * diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index c9156b291d..d3b7f800f0 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -267,6 +267,29 @@ export class Scripts { return this.execCommand(client, 'pause', args); } + protected getJobSchedulerTemplateArgs(id: string) { + const queueKeys = this.queue.keys; + const keys: string[] = [queueKeys.repeat, queueKeys['']]; + + const args = [id]; + + return keys.concat(args); + } + + async getJobSchedulerTemplate(id: string): Promise { + const client = await this.queue.client; + + const args = this.getJobSchedulerTemplateArgs(id); + + const result = await this.execCommand( + client, + 'getJobSchedulerTemplate', + args, + ); + + return raw2NextJobData(result); + } + protected addRepeatableJobArgs( customKey: string, nextMillis: number, diff --git a/src/commands/getJobSchedulerTemplate-2.lua b/src/commands/getJobSchedulerTemplate-2.lua new file mode 100644 index 0000000000..40bdf84345 --- /dev/null +++ b/src/commands/getJobSchedulerTemplate-2.lua @@ -0,0 +1,26 @@ + +--[[ + Get job scheduler template + Input: + KEYS[1] repeat key + KEYS[2] prefix key + + ARGV[1] job scheduler id +]] +local rcall = redis.call + +local millis = rcall("ZSCORE", KEYS[1], ARGV[1]) + +rcall('SET', 'DEBUG1', 'here') + +if millis ~= false then + rcall('SET', 'DEBUG', millis) + local templateJobId = "repeat:" .. ARGV[1] .. ":" .. millis + + rcall('SET', 'DEBUG2', templateJobId) + rcall('SET', 'DEBUG3', KEYS[2] .. templateJobId) + + return {rcall("HGETALL", KEYS[2] .. templateJobId), templateJobId} -- get job data +end + +return {0, 0} \ No newline at end of file diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index a1093895fd..6fabbddc5f 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -362,6 +362,13 @@ describe('Job Scheduler', function () { every: null, }); + const jobSchedulerTemplate = await queue.getJobSchedulerTemplate('test'); + + expect(jobSchedulerTemplate.id?.startsWith('repeat:test')).to.be.true; + expect(jobSchedulerTemplate.data).to.deep.equal({ + foo: 'bar', + }); + this.clock.tick(nextTick); let prev: any; From db7ac3f17e15cc19b3679764a6a9cc0d830554bc Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 23 Nov 2024 13:54:38 -0500 Subject: [PATCH 2/9] chore: remove console --- src/classes/queue.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index ebc321ddf6..2bda7f94cc 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -598,7 +598,6 @@ export class Queue< const jobScheduler = await this.jobScheduler; const [jobData, jobId] = await jobScheduler.getJobSchedulerTemplate(id); - console.log('gg', jobData, jobId); return this.createJob(jobData, jobId); } From 7fc51e2e33468854c3beb460077489cec5ce64dc Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sat, 23 Nov 2024 14:33:28 -0500 Subject: [PATCH 3/9] chore: remove debug statements --- src/commands/getJobSchedulerTemplate-2.lua | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/commands/getJobSchedulerTemplate-2.lua b/src/commands/getJobSchedulerTemplate-2.lua index 40bdf84345..e84092d76a 100644 --- a/src/commands/getJobSchedulerTemplate-2.lua +++ b/src/commands/getJobSchedulerTemplate-2.lua @@ -11,15 +11,9 @@ local rcall = redis.call local millis = rcall("ZSCORE", KEYS[1], ARGV[1]) -rcall('SET', 'DEBUG1', 'here') - if millis ~= false then - rcall('SET', 'DEBUG', millis) local templateJobId = "repeat:" .. ARGV[1] .. ":" .. millis - rcall('SET', 'DEBUG2', templateJobId) - rcall('SET', 'DEBUG3', KEYS[2] .. templateJobId) - return {rcall("HGETALL", KEYS[2] .. templateJobId), templateJobId} -- get job data end From ce1d9a4a7cd7af9b60b02cc34b7b9f7059b3b166 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 25 Nov 2024 22:45:01 -0600 Subject: [PATCH 4/9] refactor(scheduler): change method name --- src/classes/job-scheduler.ts | 4 ++-- src/classes/job.ts | 2 +- src/classes/queue.ts | 11 ++++++++--- src/commands/getJobSchedulerTemplate-2.lua | 5 +++-- tests/test_job_scheduler.ts | 16 +++++++++++++--- 5 files changed, 27 insertions(+), 11 deletions(-) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 66bda6abc9..b4af853c15 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -257,8 +257,8 @@ export class JobScheduler extends QueueBase { } } - async getJobSchedulerTemplate(id: string): Promise { - return this.scripts.getJobSchedulerTemplate(id); + async getJobTemplate(schedulerId: string): Promise { + return this.scripts.getJobSchedulerTemplate(schedulerId); } async getJobSchedulers( diff --git a/src/classes/job.ts b/src/classes/job.ts index b63e5cdf49..74487bf2e1 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -388,7 +388,7 @@ export class Job< this.scripts = new Scripts(this.queue); } - private static optsFromJSON(rawOpts?: string): JobsOptions { + static optsFromJSON(rawOpts?: string): JobsOptions { const opts = JSON.parse(rawOpts || '{}'); const optionEntries = Object.entries(opts) as Array< diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 2bda7f94cc..7bf72a863b 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -594,11 +594,16 @@ export class Queue< */ async getJobSchedulerTemplate( id: string, - ): Promise> { + ): Promise<{ data: DataType; opts: JobsOptions }> { const jobScheduler = await this.jobScheduler; - const [jobData, jobId] = await jobScheduler.getJobSchedulerTemplate(id); + const [jobData] = await jobScheduler.getJobTemplate(id); - return this.createJob(jobData, jobId); + const data = JSON.parse(jobData.data || '{}'); + const opts = this.Job.optsFromJSON(jobData.opts); + return { + data, + opts, + }; } /** diff --git a/src/commands/getJobSchedulerTemplate-2.lua b/src/commands/getJobSchedulerTemplate-2.lua index e84092d76a..f978330278 100644 --- a/src/commands/getJobSchedulerTemplate-2.lua +++ b/src/commands/getJobSchedulerTemplate-2.lua @@ -1,6 +1,7 @@ --[[ - Get job scheduler template + Get job scheduler template. Taking the last iterations job's data and options + TODO: return a stored template in the job scheduler itself Input: KEYS[1] repeat key KEYS[2] prefix key @@ -14,7 +15,7 @@ local millis = rcall("ZSCORE", KEYS[1], ARGV[1]) if millis ~= false then local templateJobId = "repeat:" .. ARGV[1] .. ":" .. millis - return {rcall("HGETALL", KEYS[2] .. templateJobId), templateJobId} -- get job data + return {rcall("HGETALL", KEYS[2] .. templateJobId)} -- get job data end return {0, 0} \ No newline at end of file diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 6fabbddc5f..44f9bce370 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -362,12 +362,22 @@ describe('Job Scheduler', function () { every: null, }); - const jobSchedulerTemplate = await queue.getJobSchedulerTemplate('test'); + const { data, opts } = await queue.getJobSchedulerTemplate('test'); - expect(jobSchedulerTemplate.id?.startsWith('repeat:test')).to.be.true; - expect(jobSchedulerTemplate.data).to.deep.equal({ + expect(data).to.deep.equal({ foo: 'bar', }); + expect(opts).to.deep.equal({ + attempts: 0, + delay: 2000, + jobId: 'repeat:test:1486481042000', + prevMillis: 1486481042000, + repeat: { + count: 1, + pattern: '*/2 * * * * *', + }, + timestamp: 1486481040000, + }); this.clock.tick(nextTick); From 826ed0ff7a3a4b2c390978f6cb2c6720604db13e Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 26 Nov 2024 20:08:35 -0600 Subject: [PATCH 5/9] refactor(scheduler): save template data in scheduler --- src/classes/job-scheduler.ts | 7 ++- src/classes/job.ts | 45 ++----------------- src/classes/queue.ts | 14 ++++-- src/classes/scripts.ts | 35 +++++---------- src/commands/addJobScheduler-2.lua | 18 +++++--- src/commands/getJobSchedulerTemplate-2.lua | 21 --------- src/utils.ts | 52 ++++++++++++++++++++++ tests/test_job_scheduler.ts | 12 +---- 8 files changed, 92 insertions(+), 112 deletions(-) delete mode 100644 src/commands/getJobSchedulerTemplate-2.lua diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index b4af853c15..57649fc329 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -5,6 +5,7 @@ import { Job } from './job'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; +import { optsAsJSON, removeUndefinedFields } from '../utils'; export interface JobSchedulerJson { key: string; // key is actually the job scheduler id @@ -103,6 +104,8 @@ export class JobScheduler extends QueueBase { (multi) as RedisClient, jobSchedulerId, nextMillis, + JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), + optsAsJSON(opts), { name: jobName, endDate: endDate ? new Date(endDate).getTime() : undefined, @@ -257,10 +260,6 @@ export class JobScheduler extends QueueBase { } } - async getJobTemplate(schedulerId: string): Promise { - return this.scripts.getJobSchedulerTemplate(schedulerId); - } - async getJobSchedulers( start = 0, end = -1, diff --git a/src/classes/job.ts b/src/classes/job.ts index 74487bf2e1..d0ced3c6a0 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -30,6 +30,8 @@ import { parseObjectValues, tryCatch, removeUndefinedFields, + optsAsJSON, + optsFromJSON, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -324,7 +326,7 @@ export class Job< jobId?: string, ): Job { const data = JSON.parse(json.data || '{}'); - const opts = Job.optsFromJSON(json.opts); + const opts = optsFromJSON(json.opts); const job = new this( queue, @@ -388,27 +390,6 @@ export class Job< this.scripts = new Scripts(this.queue); } - static optsFromJSON(rawOpts?: string): JobsOptions { - const opts = JSON.parse(rawOpts || '{}'); - - const optionEntries = Object.entries(opts) as Array< - [keyof RedisJobOptions, any] - >; - - const options: Partial> = {}; - for (const item of optionEntries) { - const [attributeName, value] = item; - if ((optsDecodeMap as Record)[attributeName]) { - options[(optsDecodeMap as Record)[attributeName]] = - value; - } else { - options[attributeName] = value; - } - } - - return options as JobsOptions; - } - /** * Fetches a Job from the queue given the passed job id. * @@ -469,7 +450,7 @@ export class Job< id: this.id, name: this.name, data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data), - opts: removeUndefinedFields(this.optsAsJSON(this.opts)), + opts: removeUndefinedFields(optsAsJSON(this.opts)), parent: this.parent ? { ...this.parent } : undefined, parentKey: this.parentKey, progress: this.progress, @@ -487,24 +468,6 @@ export class Job< }); } - private optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { - const optionEntries = Object.entries(opts) as Array< - [keyof JobsOptions, any] - >; - const options: Partial> = {}; - for (const item of optionEntries) { - const [attributeName, value] = item; - if ((optsEncodeMap as Record)[attributeName]) { - options[(optsEncodeMap as Record)[attributeName]] = - value; - } else { - options[attributeName] = value; - } - } - - return options as RedisJobOptions; - } - /** * Prepares a job to be passed to Sandbox. * @returns diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 7bf72a863b..f4bd88c9ed 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -16,6 +16,7 @@ import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; import { JobScheduler } from './job-scheduler'; import { version } from '../version'; +import { optsFromJSON } from '../utils'; export interface ObliterateOpts { /** @@ -595,11 +596,16 @@ export class Queue< async getJobSchedulerTemplate( id: string, ): Promise<{ data: DataType; opts: JobsOptions }> { - const jobScheduler = await this.jobScheduler; - const [jobData] = await jobScheduler.getJobTemplate(id); + const client = await this.client; + + const [templateData, templateOpts] = await client.hmget( + `${this.keys.repeat}:${id}`, + 'data', + 'opts', + ); - const data = JSON.parse(jobData.data || '{}'); - const opts = this.Job.optsFromJSON(jobData.opts); + const data = JSON.parse(templateData || '{}'); + const opts = optsFromJSON(templateOpts); return { data, opts, diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index d3b7f800f0..0897d905a9 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -267,29 +267,6 @@ export class Scripts { return this.execCommand(client, 'pause', args); } - protected getJobSchedulerTemplateArgs(id: string) { - const queueKeys = this.queue.keys; - const keys: string[] = [queueKeys.repeat, queueKeys['']]; - - const args = [id]; - - return keys.concat(args); - } - - async getJobSchedulerTemplate(id: string): Promise { - const client = await this.queue.client; - - const args = this.getJobSchedulerTemplateArgs(id); - - const result = await this.execCommand( - client, - 'getJobSchedulerTemplate', - args, - ); - - return raw2NextJobData(result); - } - protected addRepeatableJobArgs( customKey: string, nextMillis: number, @@ -334,6 +311,8 @@ export class Scripts { client: RedisClient, jobSchedulerId: string, nextMillis: number, + templateData: string, + templateOpts: RedisJobOptions, opts: RepeatableOptions, ): Promise { const queueKeys = this.queue.keys; @@ -342,7 +321,15 @@ export class Scripts { queueKeys.repeat, queueKeys.delayed, ]; - const args = [nextMillis, pack(opts), jobSchedulerId, queueKeys['']]; + + const args = [ + nextMillis, + pack(opts), + jobSchedulerId, + templateData, + pack(templateOpts), + queueKeys[''], + ]; return this.execCommand(client, 'addJobScheduler', keys.concat(args)); } diff --git a/src/commands/addJobScheduler-2.lua b/src/commands/addJobScheduler-2.lua index 8b7a8084eb..69ee57d63d 100644 --- a/src/commands/addJobScheduler-2.lua +++ b/src/commands/addJobScheduler-2.lua @@ -13,7 +13,9 @@ [4] endDate? [5] every? ARGV[3] jobs scheduler id - ARGV[4] prefix key + ARGV[4] Json stringified template data + ARGV[5] mspacked template opts + ARGV[6] prefix key Output: repeatableKey - OK @@ -24,13 +26,14 @@ local delayedKey = KEYS[2] local nextMillis = ARGV[1] local jobSchedulerId = ARGV[3] -local prefixKey = ARGV[4] +local templateOpts = cmsgpack.unpack(ARGV[5]) +local prefixKey = ARGV[6] -- Includes --- @include "includes/removeJob" -local function storeRepeatableJob(repeatKey, nextMillis, rawOpts) - rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) +local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, templateData, templateOpts) + rcall("ZADD", repeatKey, nextMillis, schedulerId) local opts = cmsgpack.unpack(rawOpts) local optionalValues = {} @@ -54,8 +57,9 @@ local function storeRepeatableJob(repeatKey, nextMillis, rawOpts) table.insert(optionalValues, opts['every']) end - rcall("HMSET", repeatKey .. ":" .. jobSchedulerId, "name", opts['name'], - unpack(optionalValues)) + local jsonTemplateOpts = cjson.encode(templateOpts) + rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'], "data", templateData, + "opts", jsonTemplateOpts, unpack(optionalValues)) end -- If we are overriding a repeatable job we must delete the delayed job for @@ -74,4 +78,4 @@ if prevMillis ~= false then end end -return storeRepeatableJob(repeatKey, nextMillis, ARGV[2]) +return storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts) diff --git a/src/commands/getJobSchedulerTemplate-2.lua b/src/commands/getJobSchedulerTemplate-2.lua deleted file mode 100644 index f978330278..0000000000 --- a/src/commands/getJobSchedulerTemplate-2.lua +++ /dev/null @@ -1,21 +0,0 @@ - ---[[ - Get job scheduler template. Taking the last iterations job's data and options - TODO: return a stored template in the job scheduler itself - Input: - KEYS[1] repeat key - KEYS[2] prefix key - - ARGV[1] job scheduler id -]] -local rcall = redis.call - -local millis = rcall("ZSCORE", KEYS[1], ARGV[1]) - -if millis ~= false then - local templateJobId = "repeat:" .. ARGV[1] .. ":" .. millis - - return {rcall("HGETALL", KEYS[2] .. templateJobId)} -- get job data -end - -return {0, 0} \ No newline at end of file diff --git a/src/utils.ts b/src/utils.ts index 4efa2e3467..1bf2eac69c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -17,6 +17,7 @@ import { EventEmitter } from 'events'; import * as semver from 'semver'; import { SpanKind, TelemetryAttributes } from './enums'; +import { JobsOptions, RedisJobOptions } from './types'; export const errorObject: { [index: string]: any } = { value: null }; @@ -270,6 +271,57 @@ export const toString = (value: any): string => { export const QUEUE_EVENT_SUFFIX = ':qe'; +const optsDecodeMap = { + de: 'deduplication', + fpof: 'failParentOnFailure', + idof: 'ignoreDependencyOnFailure', + kl: 'keepLogs', + rdof: 'removeDependencyOnFailure', + tm: 'telemetryMetadata', +}; + +const optsEncodeMap = invertObject(optsDecodeMap); +optsEncodeMap.debounce = 'de'; + +export function optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { + const optionEntries = Object.entries(opts) as Array<[keyof JobsOptions, any]>; + const options: Partial> = {}; + for (const item of optionEntries) { + const [attributeName, value] = item; + if (value !== undefined) { + if ((optsEncodeMap as Record)[attributeName]) { + options[(optsEncodeMap as Record)[attributeName]] = + value; + } else { + options[attributeName] = value; + } + } + } + + return options as RedisJobOptions; +} + +export function optsFromJSON(rawOpts?: string): JobsOptions { + const opts = JSON.parse(rawOpts || '{}'); + + const optionEntries = Object.entries(opts) as Array< + [keyof RedisJobOptions, any] + >; + + const options: Partial> = {}; + for (const item of optionEntries) { + const [attributeName, value] = item; + if ((optsDecodeMap as Record)[attributeName]) { + options[(optsDecodeMap as Record)[attributeName]] = + value; + } else { + options[attributeName] = value; + } + } + + return options as JobsOptions; +} + export function removeUndefinedFields>( obj: Record, ) { diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 44f9bce370..0db1f98120 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -367,17 +367,7 @@ describe('Job Scheduler', function () { expect(data).to.deep.equal({ foo: 'bar', }); - expect(opts).to.deep.equal({ - attempts: 0, - delay: 2000, - jobId: 'repeat:test:1486481042000', - prevMillis: 1486481042000, - repeat: { - count: 1, - pattern: '*/2 * * * * *', - }, - timestamp: 1486481040000, - }); + expect(opts).to.deep.equal({}); this.clock.tick(nextTick); From a769497595f6cc78a02c5a81489a32562483c8a5 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 26 Nov 2024 20:11:44 -0600 Subject: [PATCH 6/9] chore: remove unused variable --- src/classes/job-scheduler.ts | 2 +- src/classes/job.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 57649fc329..9d41564972 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -5,7 +5,7 @@ import { Job } from './job'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; -import { optsAsJSON, removeUndefinedFields } from '../utils'; +import { optsAsJSON } from '../utils'; export interface JobSchedulerJson { key: string; // key is actually the job scheduler id diff --git a/src/classes/job.ts b/src/classes/job.ts index d0ced3c6a0..21a73cff81 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -450,7 +450,7 @@ export class Job< id: this.id, name: this.name, data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data), - opts: removeUndefinedFields(optsAsJSON(this.opts)), + opts: optsAsJSON(this.opts), parent: this.parent ? { ...this.parent } : undefined, parentKey: this.parentKey, progress: this.progress, From 8cf25d19a41a24c160f02e2efaa036ad668630e9 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 26 Nov 2024 21:31:59 -0600 Subject: [PATCH 7/9] refactor: enhance getJobScheduler method --- src/classes/job-scheduler.ts | 34 ++++++++++++++++++++++++---------- src/classes/queue.ts | 28 ++-------------------------- tests/test_job_scheduler.ts | 13 ++++++------- 3 files changed, 32 insertions(+), 43 deletions(-) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 9d41564972..d473fdc73b 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -5,9 +5,9 @@ import { Job } from './job'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; -import { optsAsJSON } from '../utils'; +import { optsAsJSON, optsFromJSON } from '../utils'; -export interface JobSchedulerJson { +export interface JobSchedulerJson { key: string; // key is actually the job scheduler id name: string; id?: string | null; @@ -16,6 +16,10 @@ export interface JobSchedulerJson { pattern: string | null; every?: string | null; next?: number; + template?: { + data: D; + opts: JobsOptions; + }; } export class JobScheduler extends QueueBase { @@ -244,18 +248,28 @@ export class JobScheduler extends QueueBase { }; } - async getJobScheduler(id: string): Promise { + async getJobScheduler(id: string): Promise> { const client = await this.client; - const jobData = await client.hgetall(this.toKey('repeat:' + id)); + const schedulerAttributes = await client.hgetall( + this.toKey('repeat:' + id), + ); - if (jobData) { + if (schedulerAttributes) { return { key: id, - name: jobData.name, - endDate: parseInt(jobData.endDate) || null, - tz: jobData.tz || null, - pattern: jobData.pattern || null, - every: jobData.every || null, + name: schedulerAttributes.name, + endDate: parseInt(schedulerAttributes.endDate) || null, + tz: schedulerAttributes.tz || null, + pattern: schedulerAttributes.pattern || null, + every: schedulerAttributes.every || null, + ...(schedulerAttributes.data + ? { + template: { + data: JSON.parse(schedulerAttributes.data || '{}'), + opts: optsFromJSON(schedulerAttributes.opts), + }, + } + : {}), }; } } diff --git a/src/classes/queue.ts b/src/classes/queue.ts index f4bd88c9ed..4b9ef454ce 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -14,7 +14,7 @@ import { QueueGetters } from './queue-getters'; import { Repeat } from './repeat'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; -import { JobScheduler } from './job-scheduler'; +import { JobScheduler, JobSchedulerJson } from './job-scheduler'; import { version } from '../version'; import { optsFromJSON } from '../utils'; @@ -584,34 +584,10 @@ export class Queue< * * @param id - identifier of scheduler. */ - async getJobScheduler(id: string): Promise { + async getJobScheduler(id: string): Promise> { return (await this.jobScheduler).getJobScheduler(id); } - /** - * Get Job Scheduler template by id - * - * @param id - identifier of scheduler. - */ - async getJobSchedulerTemplate( - id: string, - ): Promise<{ data: DataType; opts: JobsOptions }> { - const client = await this.client; - - const [templateData, templateOpts] = await client.hmget( - `${this.keys.repeat}:${id}`, - 'data', - 'opts', - ); - - const data = JSON.parse(templateData || '{}'); - const opts = optsFromJSON(templateOpts); - return { - data, - opts, - }; - } - /** * Get all Job Schedulers * diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 0db1f98120..dae92f9cf1 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -360,15 +360,14 @@ describe('Job Scheduler', function () { tz: null, pattern: '*/2 * * * * *', every: null, + template: { + data: { + foo: 'bar', + }, + opts: {}, + }, }); - const { data, opts } = await queue.getJobSchedulerTemplate('test'); - - expect(data).to.deep.equal({ - foo: 'bar', - }); - expect(opts).to.deep.equal({}); - this.clock.tick(nextTick); let prev: any; From a660b1e9067b1283480b1ea6a6a88f58e61641f1 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 26 Nov 2024 21:34:41 -0600 Subject: [PATCH 8/9] chore: remove unused method --- src/classes/queue.ts | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 4b9ef454ce..8e8126343b 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -241,17 +241,6 @@ export class Queue< return await client.hget(this.keys.meta, 'version'); } - protected createJob( - data: JobJsonRaw, - jobId: string, - ): Job { - return this.Job.fromJSON(this as MinimalQueue, data, jobId) as Job< - DataType, - ResultType, - NameType - >; - } - get repeat(): Promise { return new Promise(async resolve => { if (!this._repeat) { From 9d4779e09d1320f63ab5179a3af24a11efe0a123 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 2 Dec 2024 11:56:27 -0600 Subject: [PATCH 9/9] refactor: save template attributes if necessary --- src/classes/job-scheduler.ts | 41 ++++++++++++++-------------- src/classes/queue.ts | 5 ++-- src/commands/addJobScheduler-2.lua | 14 ++++++++-- src/interfaces/index.ts | 1 + src/interfaces/job-scheduler-json.ts | 18 ++++++++++++ tests/test_job_scheduler.ts | 12 +++++++- 6 files changed, 65 insertions(+), 26 deletions(-) create mode 100644 src/interfaces/job-scheduler-json.ts diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index d473fdc73b..344c96014e 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -2,26 +2,12 @@ import { parseExpression } from 'cron-parser'; import { RedisClient, RepeatBaseOptions, RepeatOptions } from '../interfaces'; import { JobsOptions, RepeatStrategy } from '../types'; import { Job } from './job'; +import { JobSchedulerJson, JobSchedulerTemplateJson } from '../interfaces'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; import { optsAsJSON, optsFromJSON } from '../utils'; -export interface JobSchedulerJson { - key: string; // key is actually the job scheduler id - name: string; - id?: string | null; - endDate: number | null; - tz: string | null; - pattern: string | null; - every?: string | null; - next?: number; - template?: { - data: D; - opts: JobsOptions; - }; -} - export class JobScheduler extends QueueBase { private repeatStrategy: RepeatStrategy; @@ -262,18 +248,33 @@ export class JobScheduler extends QueueBase { tz: schedulerAttributes.tz || null, pattern: schedulerAttributes.pattern || null, every: schedulerAttributes.every || null, - ...(schedulerAttributes.data + ...(schedulerAttributes.data || schedulerAttributes.opts ? { - template: { - data: JSON.parse(schedulerAttributes.data || '{}'), - opts: optsFromJSON(schedulerAttributes.opts), - }, + template: this.getTemplateFromJSON( + schedulerAttributes.data, + schedulerAttributes.opts, + ), } : {}), }; } } + private getTemplateFromJSON( + rawData?: string, + rawOpts?: string, + ): JobSchedulerTemplateJson { + console.log(typeof rawOpts); + const template: JobSchedulerTemplateJson = {}; + if (rawData) { + template.data = JSON.parse(rawData); + } + if (rawOpts) { + template.opts = optsFromJSON(rawOpts); + } + return template; + } + async getJobSchedulers( start = 0, end = -1, diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 8e8126343b..3afa5691c0 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -3,7 +3,7 @@ import { BaseJobOptions, BulkJobOptions, IoredisListener, - JobJsonRaw, + JobSchedulerJson, QueueOptions, RepeatableJob, RepeatOptions, @@ -14,9 +14,8 @@ import { QueueGetters } from './queue-getters'; import { Repeat } from './repeat'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; -import { JobScheduler, JobSchedulerJson } from './job-scheduler'; +import { JobScheduler } from './job-scheduler'; import { version } from '../version'; -import { optsFromJSON } from '../utils'; export interface ObliterateOpts { /** diff --git a/src/commands/addJobScheduler-2.lua b/src/commands/addJobScheduler-2.lua index 69ee57d63d..c5a84a641c 100644 --- a/src/commands/addJobScheduler-2.lua +++ b/src/commands/addJobScheduler-2.lua @@ -58,8 +58,18 @@ local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, t end local jsonTemplateOpts = cjson.encode(templateOpts) - rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'], "data", templateData, - "opts", jsonTemplateOpts, unpack(optionalValues)) + if jsonTemplateOpts and jsonTemplateOpts ~= '{}' then + table.insert(optionalValues, "opts") + table.insert(optionalValues, jsonTemplateOpts) + end + + if templateData and templateData ~= '{}' then + table.insert(optionalValues, "data") + table.insert(optionalValues, templateData) + end + + rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'], + unpack(optionalValues)) end -- If we are overriding a repeatable job we must delete the delayed job for diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index bf8a6ac0be..44a4945006 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -7,6 +7,7 @@ export * from './debounce-options'; export * from './flow-job'; export * from './ioredis-events'; export * from './job-json'; +export * from './job-scheduler-json'; export * from './keep-jobs'; export * from './metrics-options'; export * from './metrics'; diff --git a/src/interfaces/job-scheduler-json.ts b/src/interfaces/job-scheduler-json.ts new file mode 100644 index 0000000000..90796f5dd5 --- /dev/null +++ b/src/interfaces/job-scheduler-json.ts @@ -0,0 +1,18 @@ +import { JobsOptions } from '../types'; + +export interface JobSchedulerTemplateJson { + data?: D; + opts?: Omit; +} + +export interface JobSchedulerJson { + key: string; // key is actually the job scheduler id + name: string; + id?: string | null; + endDate: number | null; + tz: string | null; + pattern: string | null; + every?: string | null; + next?: number; + template?: JobSchedulerTemplateJson; +} diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index dae92f9cf1..f0f3244af8 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -364,7 +364,6 @@ describe('Job Scheduler', function () { data: { foo: 'bar', }, - opts: {}, }, }); @@ -693,6 +692,17 @@ describe('Job Scheduler', function () { name: 'rrule', }); + const scheduler = await queue.getJobScheduler('rrule'); + + expect(scheduler).to.deep.equal({ + key: 'rrule', + name: 'rrule', + endDate: null, + tz: null, + pattern: 'RRULE:FREQ=SECONDLY;INTERVAL=2;WKST=MO', + every: null, + }); + this.clock.tick(nextTick); let prev: any;