Skip to content

Commit

Permalink
perf(job-scheduler): add delayed job and update scheduler in same scr…
Browse files Browse the repository at this point in the history
…ipt (#2997)
  • Loading branch information
roggervalf authored Jan 8, 2025
1 parent acd8bd7 commit 9be28a0
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 176 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:

strategy:
matrix:
node-version: [current, lts/*, lts/-1, lts/-2]
node-version: [lts/*, lts/-1, lts/-2]
redis-version: [7-alpine]
include:
- node-version: 'lts/*'
Expand Down
187 changes: 53 additions & 134 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,42 +115,40 @@ export class JobScheduler extends QueueBase {
}
}

const multi = (await this.client).multi();
if (nextMillis) {
if (override) {
return this.trace<Job<T, R, N>>(
SpanKind.PRODUCER,
'add',
`${this.name}.${jobName}`,
async (span, srcPropagationMedatada) => {
let telemetry = opts.telemetry;

if (srcPropagationMedatada) {
const omitContext = opts.telemetry?.omitContext;
const telemetryMetadata =
opts.telemetry?.metadata ||
(!omitContext && srcPropagationMedatada);

if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
}
return this.trace<Job<T, R, N>>(
SpanKind.PRODUCER,
'add',
`${this.name}.${jobName}`,
async (span, srcPropagationMedatada) => {
let telemetry = opts.telemetry;

const mergedOpts = this.getNextJobOpts(
nextMillis,
jobSchedulerId,
{
...opts,
repeat: filteredRepeatOpts,
telemetry,
},
iterationCount,
newOffset,
);
if (srcPropagationMedatada) {
const omitContext = opts.telemetry?.omitContext;
const telemetryMetadata =
opts.telemetry?.metadata ||
(!omitContext && srcPropagationMedatada);

if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
}
const mergedOpts = this.getNextJobOpts(
nextMillis,
jobSchedulerId,
{
...opts,
repeat: filteredRepeatOpts,
telemetry,
},
iterationCount,
newOffset,
);

if (override) {
const jobId = await this.scripts.addJobScheduler(
jobSchedulerId,
nextMillis,
Expand Down Expand Up @@ -183,117 +181,38 @@ export class JobScheduler extends QueueBase {
});

return job;
},
);
} else {
this.scripts.updateJobSchedulerNextMillis(
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
);
}

return this.trace<Job<T, R, N>>(
SpanKind.PRODUCER,
'add',
`${this.name}.${jobName}`,
async (span, srcPropagationMedatada) => {
let telemetry = opts.telemetry;

if (srcPropagationMedatada) {
const omitContext = opts.telemetry?.omitContext;
const telemetryMetadata =
opts.telemetry?.metadata ||
(!omitContext && srcPropagationMedatada);
} else {
const jobId = await this.scripts.updateJobSchedulerNextMillis(
jobSchedulerId,
nextMillis,
Job.optsAsJSON(mergedOpts),
producerId,
);

if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
}
if (jobId) {
const job = new this.Job<T, R, N>(
this,
jobName,
jobData,
mergedOpts,
jobId,
);

const job = this.createNextJob<T, R, N>(
(<unknown>multi) as RedisClient,
jobName,
nextMillis,
newOffset,
jobSchedulerId,
{
...opts,
repeat: { ...filteredRepeatOpts, offset: newOffset },
telemetry,
},
jobData,
iterationCount,
producerId,
);
job.id = jobId;

const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][]
span?.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});

// Check if there are any errors
const erroredResult = results.find(result => result[0]);
if (erroredResult) {
throw new Error(
`Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`,
);
return job;
}
}

// Get last result with the job id
const lastResult = results.pop();
job.id = lastResult[1] as string;

span?.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});

return job;
},
);
}
}

private createNextJob<T = any, R = any, N extends string = string>(
client: RedisClient,
name: N,
nextMillis: number,
offset: number,
jobSchedulerId: string,
opts: JobsOptions,
data: T,
currentCount: number,
// The job id of the job that produced this next iteration
producerId?: string,
) {
//
// Generate unique job id for this iteration.
//
const jobId = this.getSchedulerNextJobId({
jobSchedulerId,
nextMillis,
});

const mergedOpts = this.getNextJobOpts(
nextMillis,
jobSchedulerId,
opts,
currentCount,
offset,
);

const job = new this.Job<T, R, N>(this, name, data, mergedOpts, jobId);
job.addJob(client);

if (producerId) {
const producerJobKey = this.toKey(producerId);
client.hset(producerJobKey, 'nrjid', job.id);
}

return job;
}

private getNextJobOpts(
nextMillis: number,
jobSchedulerId: string,
Expand Down
40 changes: 32 additions & 8 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,38 @@ export class Scripts {
return this.execCommand(client, 'addJobScheduler', keys.concat(args));
}

async updateJobSchedulerNextMillis(
jobSchedulerId: string,
nextMillis: number,
delayedJobOpts: JobsOptions,
// The job id of the job that produced this next iteration
producerId?: string,
): Promise<string | null> {
const client = await this.queue.client;

const queueKeys = this.queue.keys;

const keys: (string | number | Buffer)[] = [
queueKeys.marker,
queueKeys.meta,
queueKeys.id,
queueKeys.delayed,
queueKeys.events,
queueKeys.repeat,
];

const args = [
nextMillis,
jobSchedulerId,
pack(delayedJobOpts),
Date.now(),
queueKeys[''],
producerId ? this.queue.toKey(producerId) : '',
];

return this.execCommand(client, 'updateJobScheduler', keys.concat(args));
}

async updateRepeatableJobMillis(
client: RedisClient,
customKey: string,
Expand All @@ -366,14 +398,6 @@ export class Scripts {
return this.execCommand(client, 'updateRepeatableJobMillis', args);
}

async updateJobSchedulerNextMillis(
client: RedisClient,
jobSchedulerId: string,
nextMillis: number,
): Promise<number> {
return client.zadd(this.queue.keys.repeat, nextMillis, jobSchedulerId);
}

private removeRepeatableArgs(
legacyRepeatJobId: string,
repeatConcatOptions: string,
Expand Down
5 changes: 4 additions & 1 deletion src/commands/includes/addDelayedJob.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
--[[
Add marker if needed when a job is available.
Adds a delayed job to the queue by doing the following:
- Creates a new job key with the job data.
- adds to delayed zset.
- Emits a global event 'delayed' if the job is delayed.
]]

-- Includes
Expand Down
61 changes: 61 additions & 0 deletions src/commands/updateJobScheduler-6.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
--[[
Updates a job scheduler and adds next delayed job
Input:
KEYS[1] 'marker',
KEYS[2] 'meta'
KEYS[3] 'id'
KEYS[4] 'delayed'
KEYS[5] events stream key
KEYS[6] 'repeat' key
ARGV[1] next milliseconds
ARGV[2] jobs scheduler id
ARGV[3] msgpacked delayed opts
ARGV[4] timestamp
ARGV[5] prefix key
ARGV[6] producer key
Output:
next delayed job id - OK
]]
local rcall = redis.call
local repeatKey = KEYS[6]
local delayedKey = KEYS[4]
local timestamp = ARGV[4]
local nextMillis = ARGV[1]
local jobSchedulerId = ARGV[2]
local prefixKey = ARGV[5]

-- Includes
--- @include "includes/addDelayedJob"
--- @include "includes/getOrSetMaxEvents"

local schedulerKey = repeatKey .. ":" .. jobSchedulerId
local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis

-- Validate that scheduler exists.
local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
if prevMillis ~= false then
local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data")

rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)

local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)

rcall("INCR", KEYS[3])

local delayedOpts = cmsgpack.unpack(ARGV[3])

addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1],
schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)

if ARGV[6] ~= "" then
rcall("HSET", ARGV[6], "nrjid", nextDelayedJobId)
end

return nextDelayedJobId .. "" -- convert to string
end
Loading

0 comments on commit 9be28a0

Please sign in to comment.