Skip to content

Commit

Permalink
refactor(scheduler): save template data in scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Nov 27, 2024
1 parent 16a8475 commit be863bd
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 112 deletions.
7 changes: 3 additions & 4 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Job } from './job';
import { QueueBase } from './queue-base';
import { RedisConnection } from './redis-connection';
import { SpanKind, TelemetryAttributes } from '../enums';
import { optsAsJSON, removeUndefinedFields } from '../utils';

export interface JobSchedulerJson {
key: string; // key is actually the job scheduler id
Expand Down Expand Up @@ -103,6 +104,8 @@ export class JobScheduler extends QueueBase {
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
JSON.stringify(typeof jobData === 'undefined' ? {} : jobData),
optsAsJSON(opts),
{
name: jobName,
endDate: endDate ? new Date(endDate).getTime() : undefined,
Expand Down Expand Up @@ -257,10 +260,6 @@ export class JobScheduler extends QueueBase {
}
}

async getJobTemplate(schedulerId: string): Promise<any[]> {
return this.scripts.getJobSchedulerTemplate(schedulerId);
}

async getJobSchedulers(
start = 0,
end = -1,
Expand Down
45 changes: 4 additions & 41 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import {
parseObjectValues,
tryCatch,
removeUndefinedFields,
optsAsJSON,
optsFromJSON,
} from '../utils';
import { Backoffs } from './backoffs';
import { Scripts, raw2NextJobData } from './scripts';
Expand Down Expand Up @@ -324,7 +326,7 @@ export class Job<
jobId?: string,
): Job<T, R, N> {
const data = JSON.parse(json.data || '{}');
const opts = Job.optsFromJSON(json.opts);
const opts = optsFromJSON(json.opts);

const job = new this<T, R, N>(
queue,
Expand Down Expand Up @@ -388,27 +390,6 @@ export class Job<
this.scripts = new Scripts(this.queue);
}

static optsFromJSON(rawOpts?: string): JobsOptions {
const opts = JSON.parse(rawOpts || '{}');

const optionEntries = Object.entries(opts) as Array<
[keyof RedisJobOptions, any]
>;

const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if ((optsDecodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsDecodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}

return options as JobsOptions;
}

/**
* Fetches a Job from the queue given the passed job id.
*
Expand Down Expand Up @@ -469,7 +450,7 @@ export class Job<
id: this.id,
name: this.name,
data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data),
opts: removeUndefinedFields<RedisJobOptions>(this.optsAsJSON(this.opts)),
opts: removeUndefinedFields<RedisJobOptions>(optsAsJSON(this.opts)),
parent: this.parent ? { ...this.parent } : undefined,
parentKey: this.parentKey,
progress: this.progress,
Expand All @@ -487,24 +468,6 @@ export class Job<
});
}

private optsAsJSON(opts: JobsOptions = {}): RedisJobOptions {
const optionEntries = Object.entries(opts) as Array<
[keyof JobsOptions, any]
>;
const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if ((optsEncodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsEncodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}

return options as RedisJobOptions;
}

/**
* Prepares a job to be passed to Sandbox.
* @returns
Expand Down
14 changes: 10 additions & 4 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { RedisConnection } from './redis-connection';
import { SpanKind, TelemetryAttributes } from '../enums';
import { JobScheduler } from './job-scheduler';
import { version } from '../version';
import { optsFromJSON } from '../utils';

export interface ObliterateOpts {
/**
Expand Down Expand Up @@ -595,11 +596,16 @@ export class Queue<
async getJobSchedulerTemplate(
id: string,
): Promise<{ data: DataType; opts: JobsOptions }> {
const jobScheduler = await this.jobScheduler;
const [jobData] = await jobScheduler.getJobTemplate(id);
const client = await this.client;

const [templateData, templateOpts] = await client.hmget(
`${this.keys.repeat}:${id}`,
'data',
'opts',
);

const data = JSON.parse(jobData.data || '{}');
const opts = this.Job.optsFromJSON(jobData.opts);
const data = JSON.parse(templateData || '{}');
const opts = optsFromJSON(templateOpts);
return {
data,
opts,
Expand Down
35 changes: 11 additions & 24 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,29 +267,6 @@ export class Scripts {
return this.execCommand(client, 'pause', args);
}

protected getJobSchedulerTemplateArgs(id: string) {
const queueKeys = this.queue.keys;
const keys: string[] = [queueKeys.repeat, queueKeys['']];

const args = [id];

return keys.concat(args);
}

async getJobSchedulerTemplate(id: string): Promise<any[]> {
const client = await this.queue.client;

const args = this.getJobSchedulerTemplateArgs(id);

const result = await this.execCommand(
client,
'getJobSchedulerTemplate',
args,
);

return raw2NextJobData(result);
}

protected addRepeatableJobArgs(
customKey: string,
nextMillis: number,
Expand Down Expand Up @@ -334,6 +311,8 @@ export class Scripts {
client: RedisClient,
jobSchedulerId: string,
nextMillis: number,
templateData: string,
templateOpts: RedisJobOptions,
opts: RepeatableOptions,
): Promise<string> {
const queueKeys = this.queue.keys;
Expand All @@ -342,7 +321,15 @@ export class Scripts {
queueKeys.repeat,
queueKeys.delayed,
];
const args = [nextMillis, pack(opts), jobSchedulerId, queueKeys['']];

const args = [
nextMillis,
pack(opts),
jobSchedulerId,
templateData,
pack(templateOpts),
queueKeys[''],
];
return this.execCommand(client, 'addJobScheduler', keys.concat(args));
}

Expand Down
18 changes: 11 additions & 7 deletions src/commands/addJobScheduler-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
[4] endDate?
[5] every?
ARGV[3] jobs scheduler id
ARGV[4] prefix key
ARGV[4] Json stringified template data
ARGV[5] mspacked template opts
ARGV[6] prefix key
Output:
repeatableKey - OK
Expand All @@ -24,13 +26,14 @@ local delayedKey = KEYS[2]

local nextMillis = ARGV[1]
local jobSchedulerId = ARGV[3]
local prefixKey = ARGV[4]
local templateOpts = cmsgpack.unpack(ARGV[5])
local prefixKey = ARGV[6]

-- Includes
--- @include "includes/removeJob"

local function storeRepeatableJob(repeatKey, nextMillis, rawOpts)
rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)
local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, templateData, templateOpts)
rcall("ZADD", repeatKey, nextMillis, schedulerId)
local opts = cmsgpack.unpack(rawOpts)

local optionalValues = {}
Expand All @@ -54,8 +57,9 @@ local function storeRepeatableJob(repeatKey, nextMillis, rawOpts)
table.insert(optionalValues, opts['every'])
end

rcall("HMSET", repeatKey .. ":" .. jobSchedulerId, "name", opts['name'],
unpack(optionalValues))
local jsonTemplateOpts = cjson.encode(templateOpts)
rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'], "data", templateData,
"opts", jsonTemplateOpts, unpack(optionalValues))
end

-- If we are overriding a repeatable job we must delete the delayed job for
Expand All @@ -72,4 +76,4 @@ if prevMillis ~= false then
end
end

return storeRepeatableJob(repeatKey, nextMillis, ARGV[2])
return storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts)
21 changes: 0 additions & 21 deletions src/commands/getJobSchedulerTemplate-2.lua

This file was deleted.

52 changes: 52 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { EventEmitter } from 'events';
import * as semver from 'semver';

import { SpanKind, TelemetryAttributes } from './enums';
import { JobsOptions, RedisJobOptions } from './types';

export const errorObject: { [index: string]: any } = { value: null };

Expand Down Expand Up @@ -270,6 +271,57 @@ export const toString = (value: any): string => {

export const QUEUE_EVENT_SUFFIX = ':qe';

const optsDecodeMap = {
de: 'deduplication',
fpof: 'failParentOnFailure',
idof: 'ignoreDependencyOnFailure',
kl: 'keepLogs',
rdof: 'removeDependencyOnFailure',
tm: 'telemetryMetadata',
};

const optsEncodeMap = invertObject(optsDecodeMap);
optsEncodeMap.debounce = 'de';

export function optsAsJSON(opts: JobsOptions = {}): RedisJobOptions {
const optionEntries = Object.entries(opts) as Array<[keyof JobsOptions, any]>;
const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if (value !== undefined) {
if ((optsEncodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsEncodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}
}

return options as RedisJobOptions;
}

export function optsFromJSON(rawOpts?: string): JobsOptions {
const opts = JSON.parse(rawOpts || '{}');

const optionEntries = Object.entries(opts) as Array<
[keyof RedisJobOptions, any]
>;

const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if ((optsDecodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsDecodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}

return options as JobsOptions;
}

export function removeUndefinedFields<T extends Record<string, any>>(
obj: Record<string, any>,
) {
Expand Down
12 changes: 1 addition & 11 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,17 +364,7 @@ describe('Job Scheduler', function () {
expect(data).to.deep.equal({
foo: 'bar',
});
expect(opts).to.deep.equal({
attempts: 0,
delay: 2000,
jobId: 'repeat:test:1486481042000',
prevMillis: 1486481042000,
repeat: {
count: 1,
pattern: '*/2 * * * * *',
},
timestamp: 1486481040000,
});
expect(opts).to.deep.equal({});

this.clock.tick(nextTick);

Expand Down

0 comments on commit be863bd

Please sign in to comment.