Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queue): enhance getJobScheduler method to include template information #2929

Merged
merged 9 commits into from
Dec 2, 2024
56 changes: 37 additions & 19 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,11 @@ import { parseExpression } from 'cron-parser';
import { RedisClient, RepeatBaseOptions, RepeatOptions } from '../interfaces';
import { JobsOptions, RepeatStrategy } from '../types';
import { Job } from './job';
import { JobSchedulerJson, JobSchedulerTemplateJson } from '../interfaces';
import { QueueBase } from './queue-base';
import { RedisConnection } from './redis-connection';
import { SpanKind, TelemetryAttributes } from '../enums';

export interface JobSchedulerJson {
key: string; // key is actually the job scheduler id
name: string;
id?: string | null;
endDate: number | null;
tz: string | null;
pattern: string | null;
every?: string | null;
next?: number;
}
import { optsAsJSON, optsFromJSON } from '../utils';

export class JobScheduler extends QueueBase {
private repeatStrategy: RepeatStrategy;
Expand Down Expand Up @@ -103,6 +94,8 @@ export class JobScheduler extends QueueBase {
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
JSON.stringify(typeof jobData === 'undefined' ? {} : jobData),
optsAsJSON(opts),
{
name: jobName,
endDate: endDate ? new Date(endDate).getTime() : undefined,
Expand Down Expand Up @@ -241,22 +234,47 @@ export class JobScheduler extends QueueBase {
};
}

async getJobScheduler(id: string): Promise<JobSchedulerJson> {
async getJobScheduler<D = any>(id: string): Promise<JobSchedulerJson<D>> {
const client = await this.client;
const jobData = await client.hgetall(this.toKey('repeat:' + id));
const schedulerAttributes = await client.hgetall(
this.toKey('repeat:' + id),
);

if (jobData) {
if (schedulerAttributes) {
return {
key: id,
name: jobData.name,
endDate: parseInt(jobData.endDate) || null,
tz: jobData.tz || null,
pattern: jobData.pattern || null,
every: jobData.every || null,
name: schedulerAttributes.name,
endDate: parseInt(schedulerAttributes.endDate) || null,
tz: schedulerAttributes.tz || null,
pattern: schedulerAttributes.pattern || null,
every: schedulerAttributes.every || null,
...(schedulerAttributes.data || schedulerAttributes.opts
? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are checking if data is defined here, why having a default in the next line?:
data: JSON.parse(schedulerAttributes.data || '{}')
Also, could it potentially be the case that you have an empty data but options?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I refactored a little bit to not save this data if it's empty, same for template opts

template: this.getTemplateFromJSON<D>(
schedulerAttributes.data,
schedulerAttributes.opts,
),
}
: {}),
};
}
}

private getTemplateFromJSON<D = any>(
rawData?: string,
rawOpts?: string,
): JobSchedulerTemplateJson<D> {
console.log(typeof rawOpts);
const template: JobSchedulerTemplateJson<D> = {};
if (rawData) {
template.data = JSON.parse(rawData);
}
if (rawOpts) {
template.opts = optsFromJSON(rawOpts);
}
return template;
}

async getJobSchedulers(
start = 0,
end = -1,
Expand Down
45 changes: 4 additions & 41 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import {
parseObjectValues,
tryCatch,
removeUndefinedFields,
optsAsJSON,
optsFromJSON,
} from '../utils';
import { Backoffs } from './backoffs';
import { Scripts, raw2NextJobData } from './scripts';
Expand Down Expand Up @@ -324,7 +326,7 @@ export class Job<
jobId?: string,
): Job<T, R, N> {
const data = JSON.parse(json.data || '{}');
const opts = Job.optsFromJSON(json.opts);
const opts = optsFromJSON(json.opts);

const job = new this<T, R, N>(
queue,
Expand Down Expand Up @@ -388,27 +390,6 @@ export class Job<
this.scripts = new Scripts(this.queue);
}

private static optsFromJSON(rawOpts?: string): JobsOptions {
const opts = JSON.parse(rawOpts || '{}');

const optionEntries = Object.entries(opts) as Array<
[keyof RedisJobOptions, any]
>;

const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if ((optsDecodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsDecodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}

return options as JobsOptions;
}

/**
* Fetches a Job from the queue given the passed job id.
*
Expand Down Expand Up @@ -469,7 +450,7 @@ export class Job<
id: this.id,
name: this.name,
data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data),
opts: removeUndefinedFields<RedisJobOptions>(this.optsAsJSON(this.opts)),
opts: optsAsJSON(this.opts),
parent: this.parent ? { ...this.parent } : undefined,
parentKey: this.parentKey,
progress: this.progress,
Expand All @@ -487,24 +468,6 @@ export class Job<
});
}

private optsAsJSON(opts: JobsOptions = {}): RedisJobOptions {
const optionEntries = Object.entries(opts) as Array<
[keyof JobsOptions, any]
>;
const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if ((optsEncodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsEncodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}

return options as RedisJobOptions;
}

/**
* Prepares a job to be passed to Sandbox.
* @returns
Expand Down
3 changes: 2 additions & 1 deletion src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
BaseJobOptions,
BulkJobOptions,
IoredisListener,
JobSchedulerJson,
QueueOptions,
RepeatableJob,
RepeatOptions,
Expand Down Expand Up @@ -571,7 +572,7 @@ export class Queue<
*
* @param id - identifier of scheduler.
*/
async getJobScheduler(id: string): Promise<RepeatableJob> {
async getJobScheduler(id: string): Promise<JobSchedulerJson<DataType>> {
return (await this.jobScheduler).getJobScheduler(id);
}

Expand Down
12 changes: 11 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ export class Scripts {
client: RedisClient,
jobSchedulerId: string,
nextMillis: number,
templateData: string,
templateOpts: RedisJobOptions,
opts: RepeatableOptions,
): Promise<string> {
const queueKeys = this.queue.keys;
Expand All @@ -319,7 +321,15 @@ export class Scripts {
queueKeys.repeat,
queueKeys.delayed,
];
const args = [nextMillis, pack(opts), jobSchedulerId, queueKeys['']];

const args = [
nextMillis,
pack(opts),
jobSchedulerId,
templateData,
pack(templateOpts),
queueKeys[''],
];
return this.execCommand(client, 'addJobScheduler', keys.concat(args));
}

Expand Down
26 changes: 20 additions & 6 deletions src/commands/addJobScheduler-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
[4] endDate?
[5] every?
ARGV[3] jobs scheduler id
ARGV[4] prefix key
ARGV[4] Json stringified template data
ARGV[5] mspacked template opts
ARGV[6] prefix key

Output:
repeatableKey - OK
Expand All @@ -24,13 +26,14 @@ local delayedKey = KEYS[2]

local nextMillis = ARGV[1]
local jobSchedulerId = ARGV[3]
local prefixKey = ARGV[4]
local templateOpts = cmsgpack.unpack(ARGV[5])
local prefixKey = ARGV[6]

-- Includes
--- @include "includes/removeJob"

local function storeRepeatableJob(repeatKey, nextMillis, rawOpts)
rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)
local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, templateData, templateOpts)
rcall("ZADD", repeatKey, nextMillis, schedulerId)
local opts = cmsgpack.unpack(rawOpts)

local optionalValues = {}
Expand All @@ -54,7 +57,18 @@ local function storeRepeatableJob(repeatKey, nextMillis, rawOpts)
table.insert(optionalValues, opts['every'])
end

rcall("HMSET", repeatKey .. ":" .. jobSchedulerId, "name", opts['name'],
local jsonTemplateOpts = cjson.encode(templateOpts)
if jsonTemplateOpts and jsonTemplateOpts ~= '{}' then
table.insert(optionalValues, "opts")
table.insert(optionalValues, jsonTemplateOpts)
end

if templateData and templateData ~= '{}' then
table.insert(optionalValues, "data")
table.insert(optionalValues, templateData)
end

rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'],
unpack(optionalValues))
end

Expand All @@ -74,4 +88,4 @@ if prevMillis ~= false then
end
end

return storeRepeatableJob(repeatKey, nextMillis, ARGV[2])
return storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts)
1 change: 1 addition & 0 deletions src/interfaces/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export * from './debounce-options';
export * from './flow-job';
export * from './ioredis-events';
export * from './job-json';
export * from './job-scheduler-json';
export * from './keep-jobs';
export * from './metrics-options';
export * from './metrics';
Expand Down
18 changes: 18 additions & 0 deletions src/interfaces/job-scheduler-json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { JobsOptions } from '../types';

export interface JobSchedulerTemplateJson<D = any> {
data?: D;
opts?: Omit<JobsOptions, 'jobId' | 'repeat' | 'delay'>;
}

export interface JobSchedulerJson<D = any> {
key: string; // key is actually the job scheduler id
name: string;
id?: string | null;
endDate: number | null;
tz: string | null;
pattern: string | null;
every?: string | null;
next?: number;
template?: JobSchedulerTemplateJson<D>;
}
52 changes: 52 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { EventEmitter } from 'events';
import * as semver from 'semver';

import { SpanKind, TelemetryAttributes } from './enums';
import { JobsOptions, RedisJobOptions } from './types';

export const errorObject: { [index: string]: any } = { value: null };

Expand Down Expand Up @@ -270,6 +271,57 @@ export const toString = (value: any): string => {

export const QUEUE_EVENT_SUFFIX = ':qe';

const optsDecodeMap = {
de: 'deduplication',
fpof: 'failParentOnFailure',
idof: 'ignoreDependencyOnFailure',
kl: 'keepLogs',
rdof: 'removeDependencyOnFailure',
tm: 'telemetryMetadata',
};

const optsEncodeMap = invertObject(optsDecodeMap);
optsEncodeMap.debounce = 'de';

export function optsAsJSON(opts: JobsOptions = {}): RedisJobOptions {
const optionEntries = Object.entries(opts) as Array<[keyof JobsOptions, any]>;
const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if (value !== undefined) {
if ((optsEncodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsEncodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}
}

return options as RedisJobOptions;
}

export function optsFromJSON(rawOpts?: string): JobsOptions {
const opts = JSON.parse(rawOpts || '{}');

const optionEntries = Object.entries(opts) as Array<
[keyof RedisJobOptions, any]
>;

const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if ((optsDecodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsDecodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}

return options as JobsOptions;
}

export function removeUndefinedFields<T extends Record<string, any>>(
obj: Record<string, any>,
) {
Expand Down
16 changes: 16 additions & 0 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ describe('Job Scheduler', function () {
tz: null,
pattern: '*/2 * * * * *',
every: null,
template: {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add cases where data / opts are undefined?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed as well

data: {
foo: 'bar',
},
},
});

this.clock.tick(nextTick);
Expand Down Expand Up @@ -687,6 +692,17 @@ describe('Job Scheduler', function () {
name: 'rrule',
});

const scheduler = await queue.getJobScheduler('rrule');

expect(scheduler).to.deep.equal({
key: 'rrule',
name: 'rrule',
endDate: null,
tz: null,
pattern: 'RRULE:FREQ=SECONDLY;INTERVAL=2;WKST=MO',
every: null,
});

this.clock.tick(nextTick);

let prev: any;
Expand Down
Loading