Skip to content

Commit

Permalink
feat(queue): enhance getJobSchedulers method to include template info…
Browse files Browse the repository at this point in the history
…rmation (#2956) ref #2875
  • Loading branch information
roggervalf authored Dec 8, 2024
1 parent 6b5c3de commit 5b005cd
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 70 deletions.
55 changes: 26 additions & 29 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Job } from './job';
import { QueueBase } from './queue-base';
import { RedisConnection } from './redis-connection';
import { SpanKind, TelemetryAttributes } from '../enums';
import { optsAsJSON, optsFromJSON } from '../utils';
import { array2obj, optsAsJSON, optsFromJSON } from '../utils';

export class JobScheduler extends QueueBase {
private repeatStrategy: RepeatStrategy;
Expand Down Expand Up @@ -202,13 +202,21 @@ export class JobScheduler extends QueueBase {
return this.scripts.removeJobScheduler(jobSchedulerId);
}

private async getSchedulerData(
private async getSchedulerData<D>(
client: RedisClient,
key: string,
next?: number,
): Promise<JobSchedulerJson> {
): Promise<JobSchedulerJson<D>> {
const jobData = await client.hgetall(this.toKey('repeat:' + key));

return this.transformSchedulerData<D>(key, jobData, next);
}

private async transformSchedulerData<D>(
key: string,
jobData: any,
next?: number,
): Promise<JobSchedulerJson<D>> {
if (jobData) {
return {
key,
Expand All @@ -217,6 +225,11 @@ export class JobScheduler extends QueueBase {
tz: jobData.tz || null,
pattern: jobData.pattern || null,
every: jobData.every || null,
...(jobData.data || jobData.opts
? {
template: this.getTemplateFromJSON<D>(jobData.data, jobData.opts),
}
: {}),
next,
};
}
Expand All @@ -239,30 +252,14 @@ export class JobScheduler extends QueueBase {
};
}

async getJobScheduler<D = any>(id: string): Promise<JobSchedulerJson<D>> {
const client = await this.client;
const schedulerAttributes = await client.hgetall(
this.toKey('repeat:' + id),
);
async getScheduler<D = any>(id: string): Promise<JobSchedulerJson<D>> {
const [rawJobData, next] = await this.scripts.getJobScheduler(id);

if (schedulerAttributes) {
return {
key: id,
name: schedulerAttributes.name,
endDate: parseInt(schedulerAttributes.endDate) || null,
tz: schedulerAttributes.tz || null,
pattern: schedulerAttributes.pattern || null,
every: schedulerAttributes.every || null,
...(schedulerAttributes.data || schedulerAttributes.opts
? {
template: this.getTemplateFromJSON<D>(
schedulerAttributes.data,
schedulerAttributes.opts,
),
}
: {}),
};
}
return this.transformSchedulerData<D>(
id,
rawJobData ? array2obj(rawJobData) : null,
next ? parseInt(next) : null,
);
}

private getTemplateFromJSON<D = any>(
Expand All @@ -279,11 +276,11 @@ export class JobScheduler extends QueueBase {
return template;
}

async getJobSchedulers(
async getJobSchedulers<D = any>(
start = 0,
end = -1,
asc = false,
): Promise<JobSchedulerJson[]> {
): Promise<JobSchedulerJson<D>[]> {
const client = await this.client;
const jobSchedulersKey = this.keys.repeat;

Expand All @@ -294,7 +291,7 @@ export class JobScheduler extends QueueBase {
const jobs = [];
for (let i = 0; i < result.length; i += 2) {
jobs.push(
this.getSchedulerData(client, result[i], parseInt(result[i + 1])),
this.getSchedulerData<D>(client, result[i], parseInt(result[i + 1])),
);
}
return Promise.all(jobs);
Expand Down
10 changes: 7 additions & 3 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ export class Queue<
* @param id - identifier of scheduler.
*/
async getJobScheduler(id: string): Promise<JobSchedulerJson<DataType>> {
return (await this.jobScheduler).getJobScheduler(id);
return (await this.jobScheduler).getScheduler<DataType>(id);
}

/**
Expand All @@ -588,8 +588,12 @@ export class Queue<
start?: number,
end?: number,
asc?: boolean,
): Promise<RepeatableJob[]> {
return (await this.jobScheduler).getJobSchedulers(start, end, asc);
): Promise<JobSchedulerJson<DataType>[]> {
return (await this.jobScheduler).getJobSchedulers<DataType>(
start,
end,
asc,
);
}

/**
Expand Down
14 changes: 14 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,20 @@ export class Scripts {
]);
}

getJobSchedulerArgs(id: string): string[] {
const keys: string[] = [this.queue.keys.repeat];

return keys.concat([id]);
}

async getJobScheduler(id: string): Promise<[any, string | null]> {
const client = await this.queue.client;

const args = this.getJobSchedulerArgs(id);

return this.execCommand(client, 'getJobScheduler', args);
}

retryJobArgs(
jobId: string,
lifo: boolean,
Expand Down
19 changes: 19 additions & 0 deletions src/commands/getJobScheduler-1.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
--[[
Get job scheduler record.
Input:
KEYS[1] 'repeat' key
ARGV[1] id
]]

local rcall = redis.call
local jobSchedulerKey = KEYS[1] .. ":" .. ARGV[1]

local score = rcall("ZSCORE", KEYS[1], ARGV[1])

if score then
return {rcall("HGETALL", jobSchedulerKey), score} -- get job data
end

return {nil, nil}
26 changes: 0 additions & 26 deletions src/commands/updateJobOption-1.lua

This file was deleted.

16 changes: 8 additions & 8 deletions src/interfaces/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ export interface ContextManager<Context = any> {
/**
* Creates a new context and sets it as active for the fn passed as last argument
*
* @param context
* @param fn
* @param context -
* @param fn -
*/
with<A extends (...args: any[]) => any>(
context: Context,
Expand All @@ -54,16 +54,16 @@ export interface ContextManager<Context = any> {
* is the mechanism used to propagate the context across a distributed
* application.
*
* @param context
* @param context -
*/
getMetadata(context: Context): string;

/**
* Creates a new context from a serialized version effectively
* linking the new context to the parent context.
*
* @param activeContext
* @param metadata
* @param activeContext -
* @param metadata -
*/
fromMetadata(activeContext: Context, metadata: string): Context;
}
Expand All @@ -78,9 +78,9 @@ export interface Tracer<Context = any> {
* context. If the context is not provided, the current active context should be
* used.
*
* @param name
* @param options
* @param context
* @param name -
* @param options -
* @param context -
*/
startSpan(name: string, options?: SpanOptions, context?: Context): Span;
}
Expand Down
34 changes: 31 additions & 3 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ describe('Job Scheduler', function () {
);
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {});

const date = new Date('2017-02-07 9:24:00');
const date = new Date('2017-02-07T15:24:00.000Z');
this.clock.setSystemTime(date);

await queue.upsertJobScheduler(
Expand All @@ -360,6 +360,7 @@ describe('Job Scheduler', function () {
tz: null,
pattern: '*/2 * * * * *',
every: null,
next: 1486481042000,
template: {
data: {
foo: 'bar',
Expand Down Expand Up @@ -682,7 +683,7 @@ describe('Job Scheduler', function () {
);
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {});

const date = new Date('2017-02-07 9:24:00');
const date = new Date('2017-02-07T15:24:00.000Z');
this.clock.setSystemTime(date);

const repeat = {
Expand All @@ -698,6 +699,7 @@ describe('Job Scheduler', function () {
key: 'rrule',
name: 'rrule',
endDate: null,
next: 1486481042000,
tz: null,
pattern: 'RRULE:FREQ=SECONDLY;INTERVAL=2;WKST=MO',
every: null,
Expand Down Expand Up @@ -1424,8 +1426,11 @@ describe('Job Scheduler', function () {

describe('when repeatable job fails', function () {
it('should continue repeating', async function () {
const date = new Date('2017-02-07T15:24:00.000Z');
this.clock.setSystemTime(date);
const repeatOpts = {
pattern: '0 * 1 * *',
tz: 'Asia/Calcutta',
};

const worker = new Worker(
Expand All @@ -1445,7 +1450,11 @@ describe('Job Scheduler', function () {
});
});

const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts);
const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts, {
name: 'a',
data: { foo: 'bar' },
opts: { priority: 1 },
});
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

Expand All @@ -1463,6 +1472,25 @@ describe('Job Scheduler', function () {
const count = await queue.count();
expect(count).to.be.equal(1);
expect(jobSchedulers).to.have.length(1);

expect(jobSchedulers[0]).to.deep.equal({
key: 'test',
name: 'a',
endDate: null,
tz: 'Asia/Calcutta',
pattern: '0 * 1 * *',
every: null,
next: 1488310200000,
template: {
data: {
foo: 'bar',
},
opts: {
priority: 1,
},
},
});

await worker.close();
});

Expand Down
2 changes: 1 addition & 1 deletion tests/test_metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe('metrics', function () {
});

beforeEach(function () {
this.clock = sinon.useFakeTimers();
this.clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
});

beforeEach(async function () {
Expand Down

0 comments on commit 5b005cd

Please sign in to comment.