Skip to content

Commit

Permalink
fix(job-scheculer): avoid hazards when upserting job schedulers concu…
Browse files Browse the repository at this point in the history
…rrently
  • Loading branch information
manast authored Nov 11, 2024
1 parent 293df08 commit 022f7b7
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 136 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ jobs:

services:
dragonflydb:
image: docker.dragonflydb.io/dragonflydb/dragonfly
image: docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0
env:
DFLY_cluster_mode: emulated
DFLY_lock_on_hashtags: true
Expand Down
236 changes: 118 additions & 118 deletions docs/gitbook/SUMMARY.md
Original file line number Diff line number Diff line change
@@ -1,135 +1,135 @@
# Table of contents

* [What is BullMQ](README.md)
* [Quick Start](<README (1).md>)
* [API Reference](https://api.docs.bullmq.io)
* [Changelogs](changelog.md)
* [v4](changelogs/changelog-v4.md)
* [v3](changelogs/changelog-v3.md)
* [v2](changelogs/changelog-v2.md)
* [v1](changelogs/changelog-v1.md)
- [What is BullMQ](README.md)
- [Quick Start](<README (1).md>)
- [API Reference](https://api.docs.bullmq.io)
- [Changelogs](changelog.md)
- [v4](changelogs/changelog-v4.md)
- [v3](changelogs/changelog-v3.md)
- [v2](changelogs/changelog-v2.md)
- [v1](changelogs/changelog-v1.md)

## Guide

* [Introduction](guide/introduction.md)
* [Connections](guide/connections.md)
* [Queues](guide/queues/README.md)
* [Auto-removal of jobs](guide/queues/auto-removal-of-jobs.md)
* [Adding jobs in bulk](guide/queues/adding-bulks.md)
* [Global Concurrency](guide/queues/global-concurrency.md)
* [Removing Jobs](guide/queues/removing-jobs.md)
* [Workers](guide/workers/README.md)
* [Auto-removal of jobs](guide/workers/auto-removal-of-jobs.md)
* [Concurrency](guide/workers/concurrency.md)
* [Graceful shutdown](guide/workers/graceful-shutdown.md)
* [Stalled Jobs](guide/workers/stalled-jobs.md)
* [Sandboxed processors](guide/workers/sandboxed-processors.md)
* [Pausing queues](guide/workers/pausing-queues.md)
* [Jobs](guide/jobs/README.md)
* [FIFO](guide/jobs/fifo.md)
* [LIFO](guide/jobs/lifo.md)
* [Job Ids](guide/jobs/job-ids.md)
* [Job Data](guide/jobs/job-data.md)
* [Deduplication](guide/jobs/deduplication.md)
* [Delayed](guide/jobs/delayed.md)
* [Repeatable](guide/jobs/repeatable.md)
* [Prioritized](guide/jobs/prioritized.md)
* [Removing jobs](guide/jobs/removing-job.md)
* [Stalled](guide/jobs/stalled.md)
* [Getters](guide/jobs/getters.md)
* [Job Schedulers](guide/job-schedulers/README.md)
* [Repeat Strategies](guide/job-schedulers/repeat-strategies.md)
* [Repeat options](guide/job-schedulers/repeat-options.md)
* [Manage Job Schedulers](guide/job-schedulers/manage-job-schedulers.md)
* [Flows](guide/flows/README.md)
* [Adding flows in bulk](guide/flows/adding-bulks.md)
* [Get Flow Tree](guide/flows/get-flow-tree.md)
* [Fail Parent](guide/flows/fail-parent.md)
* [Remove Dependency](guide/flows/remove-dependency.md)
* [Ignore Dependency](guide/flows/ignore-dependency.md)
* [Remove Child Dependency](guide/flows/remove-child-dependency.md)
* [Metrics](guide/metrics/metrics.md)
* [Rate limiting](guide/rate-limiting.md)
* [Parallelism and Concurrency](guide/parallelism-and-concurrency.md)
* [Retrying failing jobs](guide/retrying-failing-jobs.md)
* [Returning job data](guide/returning-job-data.md)
* [Events](guide/events/README.md)
* [Create Custom Events](guide/events/create-custom-events.md)
* [Telemetry](guide/telemetry/README.md)
* [Getting started](guide/telemetry/getting-started.md)
* [Running Jaeger](guide/telemetry/running-jaeger.md)
* [Running a simple example](guide/telemetry/running-a-simple-example.md)
* [QueueScheduler](guide/queuescheduler.md)
* [Redis™ Compatibility](guide/redis-tm-compatibility/README.md)
* [Dragonfly](guide/redis-tm-compatibility/dragonfly.md)
* [Redis™ hosting](guide/redis-tm-hosting/README.md)
* [AWS MemoryDB](guide/redis-tm-hosting/aws-memorydb.md)
* [AWS Elasticache](guide/redis-tm-hosting/aws-elasticache.md)
* [Architecture](guide/architecture.md)
* [NestJs](guide/nestjs/README.md)
* [Producers](guide/nestjs/producers.md)
* [Queue Events Listeners](guide/nestjs/queue-events-listeners.md)
* [Going to production](guide/going-to-production.md)
* [Migration to newer versions](guide/migration-to-newer-versions.md)
* [Troubleshooting](guide/troubleshooting.md)
- [Introduction](guide/introduction.md)
- [Connections](guide/connections.md)
- [Queues](guide/queues/README.md)
- [Auto-removal of jobs](guide/queues/auto-removal-of-jobs.md)
- [Adding jobs in bulk](guide/queues/adding-bulks.md)
- [Global Concurrency](guide/queues/global-concurrency.md)
- [Removing Jobs](guide/queues/removing-jobs.md)
- [Workers](guide/workers/README.md)
- [Auto-removal of jobs](guide/workers/auto-removal-of-jobs.md)
- [Concurrency](guide/workers/concurrency.md)
- [Graceful shutdown](guide/workers/graceful-shutdown.md)
- [Stalled Jobs](guide/workers/stalled-jobs.md)
- [Sandboxed processors](guide/workers/sandboxed-processors.md)
- [Pausing queues](guide/workers/pausing-queues.md)
- [Jobs](guide/jobs/README.md)
- [FIFO](guide/jobs/fifo.md)
- [LIFO](guide/jobs/lifo.md)
- [Job Ids](guide/jobs/job-ids.md)
- [Job Data](guide/jobs/job-data.md)
- [Deduplication](guide/jobs/deduplication.md)
- [Delayed](guide/jobs/delayed.md)
- [Repeatable](guide/jobs/repeatable.md)
- [Prioritized](guide/jobs/prioritized.md)
- [Removing jobs](guide/jobs/removing-job.md)
- [Stalled](guide/jobs/stalled.md)
- [Getters](guide/jobs/getters.md)
- [Job Schedulers](guide/job-schedulers/README.md)
- [Repeat Strategies](guide/job-schedulers/repeat-strategies.md)
- [Repeat options](guide/job-schedulers/repeat-options.md)
- [Manage Job Schedulers](guide/job-schedulers/manage-job-schedulers.md)
- [Flows](guide/flows/README.md)
- [Adding flows in bulk](guide/flows/adding-bulks.md)
- [Get Flow Tree](guide/flows/get-flow-tree.md)
- [Fail Parent](guide/flows/fail-parent.md)
- [Remove Dependency](guide/flows/remove-dependency.md)
- [Ignore Dependency](guide/flows/ignore-dependency.md)
- [Remove Child Dependency](guide/flows/remove-child-dependency.md)
- [Metrics](guide/metrics/metrics.md)
- [Rate limiting](guide/rate-limiting.md)
- [Parallelism and Concurrency](guide/parallelism-and-concurrency.md)
- [Retrying failing jobs](guide/retrying-failing-jobs.md)
- [Returning job data](guide/returning-job-data.md)
- [Events](guide/events/README.md)
- [Create Custom Events](guide/events/create-custom-events.md)
- [Telemetry](guide/telemetry/README.md)
- [Getting started](guide/telemetry/getting-started.md)
- [Running Jaeger](guide/telemetry/running-jaeger.md)
- [Running a simple example](guide/telemetry/running-a-simple-example.md)
- [QueueScheduler](guide/queuescheduler.md)
- [Redis™ Compatibility](guide/redis-tm-compatibility/README.md)
- [Dragonfly](guide/redis-tm-compatibility/dragonfly.md)
- [Redis™ hosting](guide/redis-tm-hosting/README.md)
- [AWS MemoryDB](guide/redis-tm-hosting/aws-memorydb.md)
- [AWS Elasticache](guide/redis-tm-hosting/aws-elasticache.md)
- [Architecture](guide/architecture.md)
- [NestJs](guide/nestjs/README.md)
- [Producers](guide/nestjs/producers.md)
- [Queue Events Listeners](guide/nestjs/queue-events-listeners.md)
- [Going to production](guide/going-to-production.md)
- [Migration to newer versions](guide/migration-to-newer-versions.md)
- [Troubleshooting](guide/troubleshooting.md)

## Patterns

* [Adding jobs in bulk across different queues](patterns/adding-bulks.md)
* [Manually processing jobs](patterns/manually-fetching-jobs.md)
* [Named Processor](patterns/named-processor.md)
* [Flows](patterns/flows.md)
* [Idempotent jobs](patterns/idempotent-jobs.md)
* [Throttle jobs](patterns/throttle-jobs.md)
* [Process Step Jobs](patterns/process-step-jobs.md)
* [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md)
* [Stop retrying jobs](patterns/stop-retrying-jobs.md)
* [Timeout jobs](patterns/timeout-jobs.md)
* [Redis Cluster](patterns/redis-cluster.md)
- [Adding jobs in bulk across different queues](patterns/adding-bulks.md)
- [Manually processing jobs](patterns/manually-fetching-jobs.md)
- [Named Processor](patterns/named-processor.md)
- [Flows](patterns/flows.md)
- [Idempotent jobs](patterns/idempotent-jobs.md)
- [Throttle jobs](patterns/throttle-jobs.md)
- [Process Step Jobs](patterns/process-step-jobs.md)
- [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md)
- [Stop retrying jobs](patterns/stop-retrying-jobs.md)
- [Timeout jobs](patterns/timeout-jobs.md)
- [Redis Cluster](patterns/redis-cluster.md)

## BullMQ Pro

* [Introduction](bullmq-pro/introduction.md)
* [Install](bullmq-pro/install.md)
* [Observables](bullmq-pro/observables/README.md)
* [Cancelation](bullmq-pro/observables/cancelation.md)
* [Groups](bullmq-pro/groups/README.md)
* [Getters](bullmq-pro/groups/getters.md)
* [Rate limiting](bullmq-pro/groups/rate-limiting.md)
* [Concurrency](bullmq-pro/groups/concurrency.md)
* [Local group concurrency](bullmq-pro/groups/local-group-concurrency.md)
* [Max group size](bullmq-pro/groups/max-group-size.md)
* [Pausing groups](bullmq-pro/groups/pausing-groups.md)
* [Prioritized intra-groups](bullmq-pro/groups/prioritized.md)
* [Sandboxes for groups](bullmq-pro/groups/sandboxes-for-groups.md)
* [Batches](bullmq-pro/batches.md)
* [NestJs](bullmq-pro/nestjs/README.md)
* [Producers](bullmq-pro/nestjs/producers.md)
* [Queue Events Listeners](bullmq-pro/nestjs/queue-events-listeners.md)
* [API Reference](https://nestjs.bullmq.pro/)
* [Changelog](bullmq-pro/nestjs/changelog.md)
* [API Reference](https://api.bullmq.pro)
* [Changelog](bullmq-pro/changelog.md)
* [Support](bullmq-pro/support.md)
- [Introduction](bullmq-pro/introduction.md)
- [Install](bullmq-pro/install.md)
- [Observables](bullmq-pro/observables/README.md)
- [Cancelation](bullmq-pro/observables/cancelation.md)
- [Groups](bullmq-pro/groups/README.md)
- [Getters](bullmq-pro/groups/getters.md)
- [Rate limiting](bullmq-pro/groups/rate-limiting.md)
- [Concurrency](bullmq-pro/groups/concurrency.md)
- [Local group concurrency](bullmq-pro/groups/local-group-concurrency.md)
- [Max group size](bullmq-pro/groups/max-group-size.md)
- [Pausing groups](bullmq-pro/groups/pausing-groups.md)
- [Prioritized intra-groups](bullmq-pro/groups/prioritized.md)
- [Sandboxes for groups](bullmq-pro/groups/sandboxes-for-groups.md)
- [Batches](bullmq-pro/batches.md)
- [NestJs](bullmq-pro/nestjs/README.md)
- [Producers](bullmq-pro/nestjs/producers.md)
- [Queue Events Listeners](bullmq-pro/nestjs/queue-events-listeners.md)
- [API Reference](https://nestjs.bullmq.pro/)
- [Changelog](bullmq-pro/nestjs/changelog.md)
- [API Reference](https://api.bullmq.pro)
- [Changelog](bullmq-pro/changelog.md)
- [Support](bullmq-pro/support.md)

## Bull

* [Introduction](bull/introduction.md)
* [Install](bull/install.md)
* [Quick Guide](bull/quick-guide.md)
* [Important Notes](bull/important-notes.md)
* [Reference](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md)
* [Patterns](bull/patterns/README.md)
* [Persistent connections](bull/patterns/persistent-connections.md)
* [Message queue](bull/patterns/message-queue.md)
* [Returning Job Completions](bull/patterns/returning-job-completions.md)
* [Reusing Redis Connections](bull/patterns/reusing-redis-connections.md)
* [Redis cluster](bull/patterns/redis-cluster.md)
* [Custom backoff strategy](bull/patterns/custom-backoff-strategy.md)
* [Debugging](bull/patterns/debugging.md)
* [Manually fetching jobs](bull/patterns/manually-fetching-jobs.md)
- [Introduction](bull/introduction.md)
- [Install](bull/install.md)
- [Quick Guide](bull/quick-guide.md)
- [Important Notes](bull/important-notes.md)
- [Reference](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md)
- [Patterns](bull/patterns/README.md)
- [Persistent connections](bull/patterns/persistent-connections.md)
- [Message queue](bull/patterns/message-queue.md)
- [Returning Job Completions](bull/patterns/returning-job-completions.md)
- [Reusing Redis Connections](bull/patterns/reusing-redis-connections.md)
- [Redis cluster](bull/patterns/redis-cluster.md)
- [Custom backoff strategy](bull/patterns/custom-backoff-strategy.md)
- [Debugging](bull/patterns/debugging.md)
- [Manually fetching jobs](bull/patterns/manually-fetching-jobs.md)

## Python

* [Introduction](python/introduction.md)
* [Changelog](python/changelog.md)
- [Introduction](python/introduction.md)
- [Changelog](python/changelog.md)
57 changes: 47 additions & 10 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,34 +89,58 @@ export class JobScheduler extends QueueBase {
nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
}

const multi = (await this.client).multi();
if (nextMillis) {
if (override) {
await this.scripts.addJobScheduler(jobSchedulerId, nextMillis, {
name: jobName,
endDate: endDate ? new Date(endDate).getTime() : undefined,
tz: repeatOpts.tz,
pattern,
every,
});
await this.scripts.addJobScheduler(
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
{
name: jobName,
endDate: endDate ? new Date(endDate).getTime() : undefined,
tz: repeatOpts.tz,
pattern,
every,
},
);
} else {
await this.scripts.updateJobSchedulerNextMillis(
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
);
}

return this.createNextJob<T, R, N>(
const job = this.createNextJob<T, R, N>(
(<unknown>multi) as RedisClient,
jobName,
nextMillis,
jobSchedulerId,
{ ...opts, repeat: filteredRepeatOpts },
jobData,
iterationCount,
);

const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][]

// Check if there are any errors
const erroredResult = results.find(result => result[0]);
if (erroredResult) {
throw new Error(
`Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`,
);
}

// Get last result with the job id
const lastResult = results.pop();
job.id = lastResult[1] as string;
return job;
}
}

private async createNextJob<T = any, R = any, N extends string = string>(
private createNextJob<T = any, R = any, N extends string = string>(
client: RedisClient,
name: N,
nextMillis: number,
jobSchedulerId: string,
Expand Down Expand Up @@ -146,7 +170,10 @@ export class JobScheduler extends QueueBase {

mergedOpts.repeat = { ...opts.repeat, count: currentCount };

return this.Job.create<T, R, N>(this, name, data, mergedOpts);
const job = new Job<T, R, N>(this, name, data, mergedOpts, jobId);
job.addJob(client);

return job;
}

async removeJobScheduler(jobSchedulerId: string): Promise<number> {
Expand Down Expand Up @@ -268,3 +295,13 @@ export const defaultRepeatStrategy = (
// Ignore error
}
};

function removeUndefinedFields(obj: Record<string, any>) {
const newObj: Record<string, any> = {};
for (const key in obj) {
if (obj[key] !== undefined) {
newObj[key] = obj[key];
}
}
return newObj;
}
7 changes: 4 additions & 3 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
lengthInUtf8Bytes,
parseObjectValues,
tryCatch,
removeUndefinedFields,
} from '../utils';
import { Backoffs } from './backoffs';
import { Scripts, raw2NextJobData } from './scripts';
Expand Down Expand Up @@ -464,11 +465,11 @@ export class Job<
* @returns
*/
asJSON(): JobJson {
return {
return removeUndefinedFields<JobJson>({
id: this.id,
name: this.name,
data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data),
opts: this.optsAsJSON(this.opts),
opts: removeUndefinedFields<RedisJobOptions>(this.optsAsJSON(this.opts)),
parent: this.parent ? { ...this.parent } : undefined,
parentKey: this.parentKey,
progress: this.progress,
Expand All @@ -483,7 +484,7 @@ export class Job<
deduplicationId: this.deduplicationId,
repeatJobKey: this.repeatJobKey,
returnvalue: JSON.stringify(this.returnvalue),
};
});
}

private optsAsJSON(opts: JobsOptions = {}): RedisJobOptions {
Expand Down
Loading

0 comments on commit 022f7b7

Please sign in to comment.