diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index e873a84e59..fc38f0fa86 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -5,6 +5,7 @@ import { Job } from './job'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; +import { optsAsJSON, removeUndefinedFields } from '../utils'; export interface JobSchedulerJson { key: string; // key is actually the job scheduler id @@ -103,6 +104,8 @@ export class JobScheduler extends QueueBase { (multi) as RedisClient, jobSchedulerId, nextMillis, + JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), + optsAsJSON(opts), { name: jobName, endDate: endDate ? new Date(endDate).getTime() : undefined, @@ -257,10 +260,6 @@ export class JobScheduler extends QueueBase { } } - async getJobTemplate(schedulerId: string): Promise { - return this.scripts.getJobSchedulerTemplate(schedulerId); - } - async getJobSchedulers( start = 0, end = -1, diff --git a/src/classes/job.ts b/src/classes/job.ts index 74487bf2e1..d0ced3c6a0 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -30,6 +30,8 @@ import { parseObjectValues, tryCatch, removeUndefinedFields, + optsAsJSON, + optsFromJSON, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -324,7 +326,7 @@ export class Job< jobId?: string, ): Job { const data = JSON.parse(json.data || '{}'); - const opts = Job.optsFromJSON(json.opts); + const opts = optsFromJSON(json.opts); const job = new this( queue, @@ -388,27 +390,6 @@ export class Job< this.scripts = new Scripts(this.queue); } - static optsFromJSON(rawOpts?: string): JobsOptions { - const opts = JSON.parse(rawOpts || '{}'); - - const optionEntries = Object.entries(opts) as Array< - [keyof RedisJobOptions, any] - >; - - const options: Partial> = {}; - for (const item of optionEntries) { - const [attributeName, value] = item; - if ((optsDecodeMap as Record)[attributeName]) { - options[(optsDecodeMap as Record)[attributeName]] = - value; - } else { - options[attributeName] = value; - } - } - - return options as JobsOptions; - } - /** * Fetches a Job from the queue given the passed job id. * @@ -469,7 +450,7 @@ export class Job< id: this.id, name: this.name, data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data), - opts: removeUndefinedFields(this.optsAsJSON(this.opts)), + opts: removeUndefinedFields(optsAsJSON(this.opts)), parent: this.parent ? { ...this.parent } : undefined, parentKey: this.parentKey, progress: this.progress, @@ -487,24 +468,6 @@ export class Job< }); } - private optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { - const optionEntries = Object.entries(opts) as Array< - [keyof JobsOptions, any] - >; - const options: Partial> = {}; - for (const item of optionEntries) { - const [attributeName, value] = item; - if ((optsEncodeMap as Record)[attributeName]) { - options[(optsEncodeMap as Record)[attributeName]] = - value; - } else { - options[attributeName] = value; - } - } - - return options as RedisJobOptions; - } - /** * Prepares a job to be passed to Sandbox. * @returns diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 211c632177..2f34c22139 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -16,6 +16,7 @@ import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; import { JobScheduler } from './job-scheduler'; import { version } from '../version'; +import { optsFromJSON } from '../utils'; export interface ObliterateOpts { /** @@ -595,11 +596,16 @@ export class Queue< async getJobSchedulerTemplate( id: string, ): Promise<{ data: DataType; opts: JobsOptions }> { - const jobScheduler = await this.jobScheduler; - const [jobData] = await jobScheduler.getJobTemplate(id); + const client = await this.client; + + const [templateData, templateOpts] = await client.hmget( + `${this.keys.repeat}:${id}`, + 'data', + 'opts', + ); - const data = JSON.parse(jobData.data || '{}'); - const opts = this.Job.optsFromJSON(jobData.opts); + const data = JSON.parse(templateData || '{}'); + const opts = optsFromJSON(templateOpts); return { data, opts, diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index d3b7f800f0..0897d905a9 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -267,29 +267,6 @@ export class Scripts { return this.execCommand(client, 'pause', args); } - protected getJobSchedulerTemplateArgs(id: string) { - const queueKeys = this.queue.keys; - const keys: string[] = [queueKeys.repeat, queueKeys['']]; - - const args = [id]; - - return keys.concat(args); - } - - async getJobSchedulerTemplate(id: string): Promise { - const client = await this.queue.client; - - const args = this.getJobSchedulerTemplateArgs(id); - - const result = await this.execCommand( - client, - 'getJobSchedulerTemplate', - args, - ); - - return raw2NextJobData(result); - } - protected addRepeatableJobArgs( customKey: string, nextMillis: number, @@ -334,6 +311,8 @@ export class Scripts { client: RedisClient, jobSchedulerId: string, nextMillis: number, + templateData: string, + templateOpts: RedisJobOptions, opts: RepeatableOptions, ): Promise { const queueKeys = this.queue.keys; @@ -342,7 +321,15 @@ export class Scripts { queueKeys.repeat, queueKeys.delayed, ]; - const args = [nextMillis, pack(opts), jobSchedulerId, queueKeys['']]; + + const args = [ + nextMillis, + pack(opts), + jobSchedulerId, + templateData, + pack(templateOpts), + queueKeys[''], + ]; return this.execCommand(client, 'addJobScheduler', keys.concat(args)); } diff --git a/src/commands/addJobScheduler-2.lua b/src/commands/addJobScheduler-2.lua index 4583e6b223..bf4eed0c7d 100644 --- a/src/commands/addJobScheduler-2.lua +++ b/src/commands/addJobScheduler-2.lua @@ -13,7 +13,9 @@ [4] endDate? [5] every? ARGV[3] jobs scheduler id - ARGV[4] prefix key + ARGV[4] Json stringified template data + ARGV[5] mspacked template opts + ARGV[6] prefix key Output: repeatableKey - OK @@ -24,13 +26,14 @@ local delayedKey = KEYS[2] local nextMillis = ARGV[1] local jobSchedulerId = ARGV[3] -local prefixKey = ARGV[4] +local templateOpts = cmsgpack.unpack(ARGV[5]) +local prefixKey = ARGV[6] -- Includes --- @include "includes/removeJob" -local function storeRepeatableJob(repeatKey, nextMillis, rawOpts) - rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) +local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, templateData, templateOpts) + rcall("ZADD", repeatKey, nextMillis, schedulerId) local opts = cmsgpack.unpack(rawOpts) local optionalValues = {} @@ -54,8 +57,9 @@ local function storeRepeatableJob(repeatKey, nextMillis, rawOpts) table.insert(optionalValues, opts['every']) end - rcall("HMSET", repeatKey .. ":" .. jobSchedulerId, "name", opts['name'], - unpack(optionalValues)) + local jsonTemplateOpts = cjson.encode(templateOpts) + rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'], "data", templateData, + "opts", jsonTemplateOpts, unpack(optionalValues)) end -- If we are overriding a repeatable job we must delete the delayed job for @@ -72,4 +76,4 @@ if prevMillis ~= false then end end -return storeRepeatableJob(repeatKey, nextMillis, ARGV[2]) +return storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts) diff --git a/src/commands/getJobSchedulerTemplate-2.lua b/src/commands/getJobSchedulerTemplate-2.lua deleted file mode 100644 index f978330278..0000000000 --- a/src/commands/getJobSchedulerTemplate-2.lua +++ /dev/null @@ -1,21 +0,0 @@ - ---[[ - Get job scheduler template. Taking the last iterations job's data and options - TODO: return a stored template in the job scheduler itself - Input: - KEYS[1] repeat key - KEYS[2] prefix key - - ARGV[1] job scheduler id -]] -local rcall = redis.call - -local millis = rcall("ZSCORE", KEYS[1], ARGV[1]) - -if millis ~= false then - local templateJobId = "repeat:" .. ARGV[1] .. ":" .. millis - - return {rcall("HGETALL", KEYS[2] .. templateJobId)} -- get job data -end - -return {0, 0} \ No newline at end of file diff --git a/src/utils.ts b/src/utils.ts index 4efa2e3467..1bf2eac69c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -17,6 +17,7 @@ import { EventEmitter } from 'events'; import * as semver from 'semver'; import { SpanKind, TelemetryAttributes } from './enums'; +import { JobsOptions, RedisJobOptions } from './types'; export const errorObject: { [index: string]: any } = { value: null }; @@ -270,6 +271,57 @@ export const toString = (value: any): string => { export const QUEUE_EVENT_SUFFIX = ':qe'; +const optsDecodeMap = { + de: 'deduplication', + fpof: 'failParentOnFailure', + idof: 'ignoreDependencyOnFailure', + kl: 'keepLogs', + rdof: 'removeDependencyOnFailure', + tm: 'telemetryMetadata', +}; + +const optsEncodeMap = invertObject(optsDecodeMap); +optsEncodeMap.debounce = 'de'; + +export function optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { + const optionEntries = Object.entries(opts) as Array<[keyof JobsOptions, any]>; + const options: Partial> = {}; + for (const item of optionEntries) { + const [attributeName, value] = item; + if (value !== undefined) { + if ((optsEncodeMap as Record)[attributeName]) { + options[(optsEncodeMap as Record)[attributeName]] = + value; + } else { + options[attributeName] = value; + } + } + } + + return options as RedisJobOptions; +} + +export function optsFromJSON(rawOpts?: string): JobsOptions { + const opts = JSON.parse(rawOpts || '{}'); + + const optionEntries = Object.entries(opts) as Array< + [keyof RedisJobOptions, any] + >; + + const options: Partial> = {}; + for (const item of optionEntries) { + const [attributeName, value] = item; + if ((optsDecodeMap as Record)[attributeName]) { + options[(optsDecodeMap as Record)[attributeName]] = + value; + } else { + options[attributeName] = value; + } + } + + return options as JobsOptions; +} + export function removeUndefinedFields>( obj: Record, ) { diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 6554a19954..bd2d616a02 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -364,17 +364,7 @@ describe('Job Scheduler', function () { expect(data).to.deep.equal({ foo: 'bar', }); - expect(opts).to.deep.equal({ - attempts: 0, - delay: 2000, - jobId: 'repeat:test:1486481042000', - prevMillis: 1486481042000, - repeat: { - count: 1, - pattern: '*/2 * * * * *', - }, - timestamp: 1486481040000, - }); + expect(opts).to.deep.equal({}); this.clock.tick(nextTick);