diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 8770547c8c..56176bafdc 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -1,130 +1,130 @@ # Table of contents -* [What is BullMQ](README.md) -* [Quick Start]() -* [API Reference](https://api.docs.bullmq.io) -* [Changelogs](changelog.md) - * [v4](changelogs/changelog-v4.md) - * [v3](changelogs/changelog-v3.md) - * [v2](changelogs/changelog-v2.md) - * [v1](changelogs/changelog-v1.md) +- [What is BullMQ](README.md) +- [Quick Start]() +- [API Reference](https://api.docs.bullmq.io) +- [Changelogs](changelog.md) + - [v4](changelogs/changelog-v4.md) + - [v3](changelogs/changelog-v3.md) + - [v2](changelogs/changelog-v2.md) + - [v1](changelogs/changelog-v1.md) ## Guide -* [Introduction](guide/introduction.md) -* [Connections](guide/connections.md) -* [Queues](guide/queues/README.md) - * [Auto-removal of jobs](guide/queues/auto-removal-of-jobs.md) - * [Adding jobs in bulk](guide/queues/adding-bulks.md) - * [Global Concurrency](guide/queues/global-concurrency.md) - * [Removing Jobs](guide/queues/removing-jobs.md) -* [Workers](guide/workers/README.md) - * [Auto-removal of jobs](guide/workers/auto-removal-of-jobs.md) - * [Concurrency](guide/workers/concurrency.md) - * [Graceful shutdown](guide/workers/graceful-shutdown.md) - * [Stalled Jobs](guide/workers/stalled-jobs.md) - * [Sandboxed processors](guide/workers/sandboxed-processors.md) - * [Pausing queues](guide/workers/pausing-queues.md) -* [Jobs](guide/jobs/README.md) - * [FIFO](guide/jobs/fifo.md) - * [LIFO](guide/jobs/lifo.md) - * [Job Ids](guide/jobs/job-ids.md) - * [Job Data](guide/jobs/job-data.md) - * [Debouncing](guide/jobs/debouncing.md) - * [Delayed](guide/jobs/delayed.md) - * [Repeatable](guide/jobs/repeatable.md) - * [Prioritized](guide/jobs/prioritized.md) - * [Removing jobs](guide/jobs/removing-job.md) - * [Stalled](guide/jobs/stalled.md) - * [Getters](guide/jobs/getters.md) -* [Job Schedulers](guide/job-schedulers/README.md) - * [Repeat Strategies](guide/job-schedulers/repeat-strategies.md) - * [Repeat options](guide/job-schedulers/repeat-options.md) - * [Manage Job Schedulers](guide/job-schedulers/manage-job-schedulers.md) -* [Flows](guide/flows/README.md) - * [Adding flows in bulk](guide/flows/adding-bulks.md) - * [Get Flow Tree](guide/flows/get-flow-tree.md) - * [Fail Parent](guide/flows/fail-parent.md) - * [Remove Dependency](guide/flows/remove-dependency.md) - * [Ignore Dependency](guide/flows/ignore-dependency.md) - * [Remove Child Dependency](guide/flows/remove-child-dependency.md) -* [Metrics](guide/metrics/metrics.md) -* [Rate limiting](guide/rate-limiting.md) -* [Parallelism and Concurrency](guide/parallelism-and-concurrency.md) -* [Retrying failing jobs](guide/retrying-failing-jobs.md) -* [Returning job data](guide/returning-job-data.md) -* [Events](guide/events.md) -* [QueueScheduler](guide/queuescheduler.md) -* [Redis™ Compatibility](guide/redis-tm-compatibility/README.md) - * [Dragonfly](guide/redis-tm-compatibility/dragonfly.md) -* [Redis™ hosting](guide/redis-tm-hosting/README.md) - * [AWS MemoryDB](guide/redis-tm-hosting/aws-memorydb.md) - * [AWS Elasticache](guide/redis-tm-hosting/aws-elasticache.md) -* [Architecture](guide/architecture.md) -* [NestJs](guide/nestjs/README.md) - * [Producers](guide/nestjs/producers.md) - * [Queue Events Listeners](guide/nestjs/queue-events-listeners.md) -* [Going to production](guide/going-to-production.md) -* [Migration to newer versions](guide/migration-to-newer-versions.md) -* [Troubleshooting](guide/troubleshooting.md) +- [Introduction](guide/introduction.md) +- [Connections](guide/connections.md) +- [Queues](guide/queues/README.md) + - [Auto-removal of jobs](guide/queues/auto-removal-of-jobs.md) + - [Adding jobs in bulk](guide/queues/adding-bulks.md) + - [Global Concurrency](guide/queues/global-concurrency.md) + - [Removing Jobs](guide/queues/removing-jobs.md) +- [Workers](guide/workers/README.md) + - [Auto-removal of jobs](guide/workers/auto-removal-of-jobs.md) + - [Concurrency](guide/workers/concurrency.md) + - [Graceful shutdown](guide/workers/graceful-shutdown.md) + - [Stalled Jobs](guide/workers/stalled-jobs.md) + - [Sandboxed processors](guide/workers/sandboxed-processors.md) + - [Pausing queues](guide/workers/pausing-queues.md) +- [Jobs](guide/jobs/README.md) + - [FIFO](guide/jobs/fifo.md) + - [LIFO](guide/jobs/lifo.md) + - [Job Ids](guide/jobs/job-ids.md) + - [Job Data](guide/jobs/job-data.md) + - [Debouncing](guide/jobs/debouncing.md) + - [Delayed](guide/jobs/delayed.md) + - [Repeatable](guide/jobs/repeatable.md) + - [Prioritized](guide/jobs/prioritized.md) + - [Removing jobs](guide/jobs/removing-job.md) + - [Stalled](guide/jobs/stalled.md) + - [Getters](guide/jobs/getters.md) +- [Job Schedulers](guide/job-schedulers/README.md) + - [Repeat Strategies](guide/job-schedulers/repeat-strategies.md) + - [Repeat options](guide/job-schedulers/repeat-options.md) + - [Manage Job Schedulers](guide/job-schedulers/manage-job-schedulers.md) +- [Flows](guide/flows/README.md) + - [Adding flows in bulk](guide/flows/adding-bulks.md) + - [Get Flow Tree](guide/flows/get-flow-tree.md) + - [Fail Parent](guide/flows/fail-parent.md) + - [Remove Dependency](guide/flows/remove-dependency.md) + - [Ignore Dependency](guide/flows/ignore-dependency.md) + - [Remove Child Dependency](guide/flows/remove-child-dependency.md) +- [Metrics](guide/metrics/metrics.md) +- [Rate limiting](guide/rate-limiting.md) +- [Parallelism and Concurrency](guide/parallelism-and-concurrency.md) +- [Retrying failing jobs](guide/retrying-failing-jobs.md) +- [Returning job data](guide/returning-job-data.md) +- [Events](guide/events.md) +- [QueueScheduler](guide/queuescheduler.md) +- [Redis™ Compatibility](guide/redis-tm-compatibility/README.md) + - [Dragonfly](guide/redis-tm-compatibility/dragonfly.md) +- [Redis™ hosting](guide/redis-tm-hosting/README.md) + - [AWS MemoryDB](guide/redis-tm-hosting/aws-memorydb.md) + - [AWS Elasticache](guide/redis-tm-hosting/aws-elasticache.md) +- [Architecture](guide/architecture.md) +- [NestJs](guide/nestjs/README.md) + - [Producers](guide/nestjs/producers.md) + - [Queue Events Listeners](guide/nestjs/queue-events-listeners.md) +- [Going to production](guide/going-to-production.md) +- [Migration to newer versions](guide/migration-to-newer-versions.md) +- [Troubleshooting](guide/troubleshooting.md) ## Patterns -* [Adding jobs in bulk across different queues](patterns/adding-bulks.md) -* [Manually processing jobs](patterns/manually-fetching-jobs.md) -* [Named Processor](patterns/named-processor.md) -* [Flows](patterns/flows.md) -* [Idempotent jobs](patterns/idempotent-jobs.md) -* [Throttle jobs](patterns/throttle-jobs.md) -* [Process Step Jobs](patterns/process-step-jobs.md) -* [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md) -* [Stop retrying jobs](patterns/stop-retrying-jobs.md) -* [Timeout jobs](patterns/timeout-jobs.md) -* [Redis Cluster](patterns/redis-cluster.md) +- [Adding jobs in bulk across different queues](patterns/adding-bulks.md) +- [Manually processing jobs](patterns/manually-fetching-jobs.md) +- [Named Processor](patterns/named-processor.md) +- [Flows](patterns/flows.md) +- [Idempotent jobs](patterns/idempotent-jobs.md) +- [Throttle jobs](patterns/throttle-jobs.md) +- [Process Step Jobs](patterns/process-step-jobs.md) +- [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md) +- [Stop retrying jobs](patterns/stop-retrying-jobs.md) +- [Timeout jobs](patterns/timeout-jobs.md) +- [Redis Cluster](patterns/redis-cluster.md) ## BullMQ Pro -* [Introduction](bullmq-pro/introduction.md) -* [Install](bullmq-pro/install.md) -* [Observables](bullmq-pro/observables/README.md) - * [Cancelation](bullmq-pro/observables/cancelation.md) -* [Groups](bullmq-pro/groups/README.md) - * [Getters](bullmq-pro/groups/getters.md) - * [Rate limiting](bullmq-pro/groups/rate-limiting.md) - * [Concurrency](bullmq-pro/groups/concurrency.md) - * [Local group concurrency](bullmq-pro/groups/local-group-concurrency.md) - * [Max group size](bullmq-pro/groups/max-group-size.md) - * [Pausing groups](bullmq-pro/groups/pausing-groups.md) - * [Prioritized intra-groups](bullmq-pro/groups/prioritized.md) - * [Sandboxes for groups](bullmq-pro/groups/sandboxes-for-groups.md) -* [Batches](bullmq-pro/batches.md) -* [NestJs](bullmq-pro/nestjs/README.md) - * [Producers](bullmq-pro/nestjs/producers.md) - * [Queue Events Listeners](bullmq-pro/nestjs/queue-events-listeners.md) - * [API Reference](https://nestjs.bullmq.pro/) - * [Changelog](bullmq-pro/nestjs/changelog.md) -* [API Reference](https://api.bullmq.pro) -* [Changelog](bullmq-pro/changelog.md) -* [Support](bullmq-pro/support.md) +- [Introduction](bullmq-pro/introduction.md) +- [Install](bullmq-pro/install.md) +- [Observables](bullmq-pro/observables/README.md) + - [Cancelation](bullmq-pro/observables/cancelation.md) +- [Groups](bullmq-pro/groups/README.md) + - [Getters](bullmq-pro/groups/getters.md) + - [Rate limiting](bullmq-pro/groups/rate-limiting.md) + - [Concurrency](bullmq-pro/groups/concurrency.md) + - [Local group concurrency](bullmq-pro/groups/local-group-concurrency.md) + - [Max group size](bullmq-pro/groups/max-group-size.md) + - [Pausing groups](bullmq-pro/groups/pausing-groups.md) + - [Prioritized intra-groups](bullmq-pro/groups/prioritized.md) + - [Sandboxes for groups](bullmq-pro/groups/sandboxes-for-groups.md) +- [Batches](bullmq-pro/batches.md) +- [NestJs](bullmq-pro/nestjs/README.md) + - [Producers](bullmq-pro/nestjs/producers.md) + - [Queue Events Listeners](bullmq-pro/nestjs/queue-events-listeners.md) + - [API Reference](https://nestjs.bullmq.pro/) + - [Changelog](bullmq-pro/nestjs/changelog.md) +- [API Reference](https://api.bullmq.pro) +- [Changelog](bullmq-pro/changelog.md) +- [Support](bullmq-pro/support.md) ## Bull -* [Introduction](bull/introduction.md) -* [Install](bull/install.md) -* [Quick Guide](bull/quick-guide.md) -* [Important Notes](bull/important-notes.md) -* [Reference](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md) -* [Patterns](bull/patterns/README.md) - * [Persistent connections](bull/patterns/persistent-connections.md) - * [Message queue](bull/patterns/message-queue.md) - * [Returning Job Completions](bull/patterns/returning-job-completions.md) - * [Reusing Redis Connections](bull/patterns/reusing-redis-connections.md) - * [Redis cluster](bull/patterns/redis-cluster.md) - * [Custom backoff strategy](bull/patterns/custom-backoff-strategy.md) - * [Debugging](bull/patterns/debugging.md) - * [Manually fetching jobs](bull/patterns/manually-fetching-jobs.md) +- [Introduction](bull/introduction.md) +- [Install](bull/install.md) +- [Quick Guide](bull/quick-guide.md) +- [Important Notes](bull/important-notes.md) +- [Reference](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md) +- [Patterns](bull/patterns/README.md) + - [Persistent connections](bull/patterns/persistent-connections.md) + - [Message queue](bull/patterns/message-queue.md) + - [Returning Job Completions](bull/patterns/returning-job-completions.md) + - [Reusing Redis Connections](bull/patterns/reusing-redis-connections.md) + - [Redis cluster](bull/patterns/redis-cluster.md) + - [Custom backoff strategy](bull/patterns/custom-backoff-strategy.md) + - [Debugging](bull/patterns/debugging.md) + - [Manually fetching jobs](bull/patterns/manually-fetching-jobs.md) ## Python -* [Introduction](python/introduction.md) -* [Changelog](python/changelog.md) +- [Introduction](python/introduction.md) +- [Changelog](python/changelog.md) diff --git a/docs/gitbook/guide/job-schedulers/README.md b/docs/gitbook/guide/job-schedulers/README.md index 5f61c3cf90..f7bb35af6d 100644 --- a/docs/gitbook/guide/job-schedulers/README.md +++ b/docs/gitbook/guide/job-schedulers/README.md @@ -12,16 +12,18 @@ To create a scheduler, simply use the "upsertJobScheduler" method as demonstrate ```typescript // Creates a new Job Scheduler that generates a job every 1000 milliseconds (1 second) -const firstJob = await queue.upsertJobScheduler("my-scheduler-id", { every: 1000 }); +const firstJob = await queue.upsertJobScheduler('my-scheduler-id', { + every: 1000, +}); ``` This example will create a new Job Scheduler that will produce a new job every second. It will also return the first job created for this Job Scheduler, which will be in "delayed" status waiting to be processed after 1 second. Now there are also a few important considerations that need to be explained here.: -* **Upsert vs. Add:** the 'upsert' is used instead of 'add' to simplify management of recurring jobs, especially in production deployments. It ensures the scheduler is updated or created without duplications. -* **Job Production Rate:** The scheduler will only generate new jobs when the last job begins processing. Therefore, if your queue is very busy, or if you do not have enough workers or concurrency, it is possible that you will get the jobs less frequently than the specified repetition interval. -* **Job Status:** As long as a Job Scheduler is producing jobs, there will be always one job associated to the scheduler in the "Delayed" status. +- **Upsert vs. Add:** the 'upsert' is used instead of 'add' to simplify management of recurring jobs, especially in production deployments. It ensures the scheduler is updated or created without duplications. +- **Job Production Rate:** The scheduler will only generate new jobs when the last job begins processing. Therefore, if your queue is very busy, or if you do not have enough workers or concurrency, it is possible that you will get the jobs less frequently than the specified repetition interval. +- **Job Status:** As long as a Job Scheduler is producing jobs, there will be always one job associated to the scheduler in the "Delayed" status. ### Using Job Templates @@ -30,21 +32,21 @@ You can also define a template with standard names, data, and options for jobs a ```typescript // Create jobs every day at 3:15 (am) const firstJob = await queue.upsertJobScheduler( - "my-scheduler-id", + 'my-scheduler-id', { pattern: '0 15 3 * * *' }, { - name: "my-job-name", - data: { foo: "bar" }, + name: 'my-job-name', + data: { foo: 'bar' }, opts: { backoff: 3, attempts: 5, - removeOnFail: 1000 + removeOnFail: 1000, }, - }); - + }, +); ``` -All jobs produced by this scheduler will use the given settings. Note that in the future you could call "upsertJobScheduler" again with the given "my-scheduler-id" in order to update any settings of this particular job scheduler, such as the repeat options or/and the job's template settings. +All jobs produced by this scheduler will use the given settings. Note that in the future you could call "upsertJobScheduler" again with the given "my-scheduler-id" in order to update any settings of this particular job scheduler, such as the repeat options or/and the job's template settings. {% hint style="info" %} Since jobs produced by the Job Scheduler will get a special job ID in order to guarantee that jobs will never be created more often than the given repeat settings, you cannot choose a custom job id. However you can use the job's name if you need to discriminate these jobs from other jobs. diff --git a/docs/gitbook/guide/job-schedulers/manage-job-schedulers.md b/docs/gitbook/guide/job-schedulers/manage-job-schedulers.md index fabf85307d..0347772ec6 100644 --- a/docs/gitbook/guide/job-schedulers/manage-job-schedulers.md +++ b/docs/gitbook/guide/job-schedulers/manage-job-schedulers.md @@ -9,7 +9,9 @@ The removeJobScheduler method is designed to delete a specific job scheduler fro ```typescript // Remove a job scheduler with ID 'scheduler-123' const result = await queue.removeJobScheduler('scheduler-123'); -console.log(result ? 'Scheduler removed successfully' : 'Failed to remove scheduler'); +console.log( + result ? 'Scheduler removed successfully' : 'Failed to remove scheduler', +); ``` #### Get Job Schedulers @@ -23,4 +25,3 @@ console.log('Current job schedulers:', schedulers); ``` This method can be particularly useful for generating reports or dashboards that provide insights into when jobs are scheduled to run, aiding in system monitoring and troubleshooting. - diff --git a/docs/gitbook/guide/job-schedulers/repeat-options.md b/docs/gitbook/guide/job-schedulers/repeat-options.md index 95ed397069..394e483082 100644 --- a/docs/gitbook/guide/job-schedulers/repeat-options.md +++ b/docs/gitbook/guide/job-schedulers/repeat-options.md @@ -11,13 +11,17 @@ const { Queue } = require('bullmq'); const connection = { host: 'localhost', port: 6379 }; const myQueue = new Queue('my-dated-jobs', { connection }); -await myQueue.upsertJobScheduler('start-later-job', { - every: 60000, // every minute - startDate: new Date('2024-10-15T00:00:00Z') // start on October 15, 2024 -}, { - name: 'timed-start-job', - data: { message: 'Starting later' } -}); +await myQueue.upsertJobScheduler( + 'start-later-job', + { + every: 60000, // every minute + startDate: new Date('2024-10-15T00:00:00Z'), // start on October 15, 2024 + }, + { + name: 'timed-start-job', + data: { message: 'Starting later' }, + }, +); ``` #### End Date @@ -25,13 +29,17 @@ await myQueue.upsertJobScheduler('start-later-job', { Use this to specify when the job should stop being scheduled, effectively setting an expiration date for the job repetitions. ```typescript -await myQueue.upsertJobScheduler('end-soon-job', { - every: 60000, // every minute - endDate: new Date('2024-11-01T00:00:00Z') // end on November 1, 2024 -}, { - name: 'timed-end-job', - data: { message: 'Ending soon' } -}); +await myQueue.upsertJobScheduler( + 'end-soon-job', + { + every: 60000, // every minute + endDate: new Date('2024-11-01T00:00:00Z'), // end on November 1, 2024 + }, + { + name: 'timed-end-job', + data: { message: 'Ending soon' }, + }, +); ``` #### Limit @@ -39,13 +47,17 @@ await myQueue.upsertJobScheduler('end-soon-job', { This setting is used to limit the number of times a job will be repeated. When the count reaches this limit, no more jobs will be produced for the given job scheculer. ```typescript -await myQueue.upsertJobScheduler('limited-job', { - every: 10000, // every 10 seconds - limit: 10 // limit to 10 executions -}, { - name: 'limited-execution-job', - data: { message: 'Limited runs' } -}); +await myQueue.upsertJobScheduler( + 'limited-job', + { + every: 10000, // every 10 seconds + limit: 10, // limit to 10 executions + }, + { + name: 'limited-execution-job', + data: { message: 'Limited runs' }, + }, +); ``` #### immediately @@ -57,12 +69,15 @@ When you use the every option in BullMQ, it schedules jobs based on fixed time i If you need a job to begin processing immediately after you add a job scheduler, regardless of the interval’s alignment with the clock, you can use the immediately setting. This is especially crucial for long intervals. For example, if you set the job to repeat monthly, normally it would wait to start until the first second of the next month. If you add the job mid-month, it would not start until the beginning of the following month. Using immediately ensures the first instance of the job runs as soon as it’s added, bypassing the wait until the scheduled interval begins. ```typescript -await myQueue.upsertJobScheduler('immediate-job', { - every: 86400000, // once a day - immediately: true // execute the first one immediately -}, { - name: 'instant-job', - data: { message: 'Immediate start' } -}); +await myQueue.upsertJobScheduler( + 'immediate-job', + { + every: 86400000, // once a day + immediately: true, // execute the first one immediately + }, + { + name: 'instant-job', + data: { message: 'Immediate start' }, + }, +); ``` - diff --git a/docs/gitbook/guide/job-schedulers/repeat-strategies.md b/docs/gitbook/guide/job-schedulers/repeat-strategies.md index 9d324e890e..bde4da51f2 100644 --- a/docs/gitbook/guide/job-schedulers/repeat-strategies.md +++ b/docs/gitbook/guide/job-schedulers/repeat-strategies.md @@ -1,6 +1,6 @@ # Repeat Strategies -BullMQ comes with two predefined strategies for creating repeatable jobs. The ‘every’ strategy is straightforward, allowing you to schedule jobs to repeat at specific intervals, measured in seconds. The more complex ‘cron’ strategy uses cron expressions, as defined by the [cron-parser](https://www.npmjs.com/package/cron-parser) to schedule jobs in intricate patterns. Additionally, BullMQ lets you create custom strategies, giving you the flexibility to define your own logic for setting job intervals. +BullMQ comes with two predefined strategies for creating repeatable jobs. The ‘every’ strategy is straightforward, allowing you to schedule jobs to repeat at specific intervals, measured in seconds. The more complex ‘cron’ strategy uses cron expressions, as defined by the [cron-parser](https://www.npmjs.com/package/cron-parser) to schedule jobs in intricate patterns. Additionally, BullMQ lets you create custom strategies, giving you the flexibility to define your own logic for setting job intervals. ### "Every" strategy @@ -11,24 +11,32 @@ const { Queue, Worker } = require('bullmq'); const connection = { host: 'localhost', - port: 6379 + port: 6379, }; const myQueue = new Queue('my-repeatable-jobs', { connection }); // Upserting a repeatable job in the queue -await myQueue.upsertJobScheduler('repeat-every-10s', { - every: 10000 // Job will repeat every 10000 milliseconds (10 seconds) -}, { - name: 'every-job', - data: { jobData: 'data' }, - opts: {} // Optional additional job options -}); +await myQueue.upsertJobScheduler( + 'repeat-every-10s', + { + every: 10000, // Job will repeat every 10000 milliseconds (10 seconds) + }, + { + name: 'every-job', + data: { jobData: 'data' }, + opts: {}, // Optional additional job options + }, +); // Worker to process the jobs -const worker = new Worker('my-repeatable-jobs', async job => { - console.log(`Processing job ${job.id} with data: ${job.data.jobData}`); -}, { connection }); +const worker = new Worker( + 'my-repeatable-jobs', + async job => { + console.log(`Processing job ${job.id} with data: ${job.data.jobData}`); + }, + { connection }, +); ``` ### "Cron" strategy @@ -62,29 +70,41 @@ const { Queue, Worker } = require('bullmq'); const connection = { host: 'localhost', - port: 6379 + port: 6379, }; const myQueue = new Queue('my-cron-jobs', { connection }); // Upserting a job with a cron expression -await myQueue.upsertJobScheduler('weekday-morning-job', { - cron: '0 0 9 * * 1-5' // Runs at 9:00 AM every Monday to Friday -}, { - name: 'cron-job', - data: { jobData: 'morning data' }, - opts: {} // Optional additional job options -}); +await myQueue.upsertJobScheduler( + 'weekday-morning-job', + { + cron: '0 0 9 * * 1-5', // Runs at 9:00 AM every Monday to Friday + }, + { + name: 'cron-job', + data: { jobData: 'morning data' }, + opts: {}, // Optional additional job options + }, +); // Worker to process the jobs -const worker = new Worker('my-cron-jobs', async job => { - console.log(`Processing job ${job.id} at ${new Date()} with data: ${job.data.jobData}`); -}, { connection }); +const worker = new Worker( + 'my-cron-jobs', + async job => { + console.log( + `Processing job ${job.id} at ${new Date()} with data: ${ + job.data.jobData + }`, + ); + }, + { connection }, +); ``` ### Custom Strategy -It is possible to define a different strategy to schedule repeatable jobs. The idea is that the repeat strategy, based on a pattern and the latest job's milliseconds, return the next desired timestamp. Although not used in the following example, you could have different behaviours on your repeat strategies based on the current job's name if you want to. However not that **only** **one** repeatStrategy can be defined for a given queue. +It is possible to define a different strategy to schedule repeatable jobs. The idea is that the repeat strategy, based on a pattern and the latest job's milliseconds, return the next desired timestamp. Although not used in the following example, you could have different behaviours on your repeat strategies based on the current job's name if you want to. However not that **only** **one** repeatStrategy can be defined for a given queue. For example we can create a custom one for [RRULE](https://jkbrzt.github.io/rrule/) like this: @@ -98,9 +118,9 @@ const settings = { opts.startDate && new Date(opts.startDate) > new Date(millis) ? new Date(opts.startDate) : new Date(millis); - + const rrule = rrulestr(opts.pattern); - + if (rrule.origOptions.count && !rrule.origOptions.dtstart) { throw new Error('DTSTART must be defined to use COUNT with rrule'); } @@ -130,7 +150,7 @@ await myQueue.upsertJobScheduler( pattern: 'RRULE:FREQ=SECONDLY;INTERVAL=20;WKST=MO', }, { - data: { color: 'gray' } + data: { color: 'gray' }, }, ); diff --git a/docs/gitbook/guide/jobs/repeatable.md b/docs/gitbook/guide/jobs/repeatable.md index cc0f94d95b..a48d22cad2 100644 --- a/docs/gitbook/guide/jobs/repeatable.md +++ b/docs/gitbook/guide/jobs/repeatable.md @@ -52,9 +52,9 @@ await myQueue.add( There are some important considerations regarding repeatable jobs: -* Bull is smart enough not to add the same repeatable job if the repeat options are the same. -* If there are no workers running, repeatable jobs will not accumulate next time a worker is online. -* Repeatable jobs can be removed using the [`removeRepeatable`](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRepeatable) or [`removeRepeatableByKey`](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRepeatableByKey) methods. +- Bull is smart enough not to add the same repeatable job if the repeat options are the same. +- If there are no workers running, repeatable jobs will not accumulate next time a worker is online. +- Repeatable jobs can be removed using the [`removeRepeatable`](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRepeatable) or [`removeRepeatableByKey`](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRepeatableByKey) methods. ```typescript import { Queue } from 'bullmq'; @@ -65,7 +65,7 @@ const myQueue = new Queue('Paint'); const job1 = await myQueue.add('red', { foo: 'bar' }, { repeat }); const job2 = await myQueue.add('blue', { foo: 'baz' }, { repeat }); - + const isRemoved1 = await myQueue.removeRepeatableByKey(job1.repeatJobKey); const isRemoved2 = await queue.removeRepeatable('blue', repeat); ``` @@ -227,7 +227,6 @@ await myQueue.add( }, }, ); - ``` #### Updating repeatable job's options @@ -252,6 +251,6 @@ The code above will not create a new repeatable meta job, it will just update th ### Read more: -* 💡 [Repeat Strategy API Reference](https://api.docs.bullmq.io/types/v5.RepeatStrategy.html) -* 💡 [Remove Repeatable Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRepeatable) -* 💡 [Remove Repeatable Job by Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRepeatableByKey) +- 💡 [Repeat Strategy API Reference](https://api.docs.bullmq.io/types/v5.RepeatStrategy.html) +- 💡 [Remove Repeatable Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRepeatable) +- 💡 [Remove Repeatable Job by Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRepeatableByKey) diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index f59aa261dd..d3c18232d0 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -36,7 +36,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection "addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")), "addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")), "changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")), - "cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-2.lua")), + "cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-3.lua")), "extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")), "getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")), "getCountsPerPriority": self.redisClient.register_script(self.getScript("getCountsPerPriority-4.lua")), @@ -159,7 +159,8 @@ def addParentJob(self, job: Job, waiting_children_key: str, pipe = None): def cleanJobsInSetArgs(self, set: str, grace: int, limit:int = 0): keys = [self.toKey(set), - self.keys['events']] + self.keys['events'], + self.keys['repeat']] args = [self.keys[''], round(time.time() * 1000) - grace, limit, set] return (keys, args) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts new file mode 100644 index 0000000000..75819c6223 --- /dev/null +++ b/src/classes/job-scheduler.ts @@ -0,0 +1,250 @@ +import { parseExpression } from 'cron-parser'; +import { RedisClient, RepeatBaseOptions, RepeatOptions } from '../interfaces'; +import { JobsOptions, RepeatStrategy } from '../types'; +import { Job } from './job'; +import { Scripts } from './scripts'; +import { QueueBase } from './queue-base'; +import { RedisConnection } from './redis-connection'; + +export interface JobSchedulerJson { + key: string; // key is actually the job scheduler id + name: string; + id?: string | null; + endDate: number | null; + tz: string | null; + pattern: string | null; + every?: string | null; + next: number; +} + +export class JobScheduler extends QueueBase { + private repeatStrategy: RepeatStrategy; + + constructor( + name: string, + opts: RepeatBaseOptions, + Connection?: typeof RedisConnection, + ) { + super(name, opts, Connection); + + this.repeatStrategy = + (opts.settings && opts.settings.repeatStrategy) || getNextMillis; + } + + async upsertJobScheduler( + jobSchedulerId: string, + repeatOpts: Omit, + jobName: N, + jobData: T, + opts: Omit, + { override }: { override: boolean }, + ): Promise | undefined> { + // Check if we reached the limit of the repeatable job's iterations + const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1; + if ( + typeof repeatOpts.limit !== 'undefined' && + iterationCount > repeatOpts.limit + ) { + return; + } + + // Check if we reached the end date of the repeatable job + let now = Date.now(); + const { endDate } = repeatOpts; + if (!(typeof endDate === undefined) && now > new Date(endDate!).getTime()) { + return; + } + + const prevMillis = opts.prevMillis || 0; + now = prevMillis < now ? now : prevMillis; + + const nextMillis = await this.repeatStrategy(now, repeatOpts, jobName); + const { every, pattern } = repeatOpts; + + const hasImmediately = Boolean( + (every || pattern) && repeatOpts.immediately, + ); + const offset = hasImmediately && every ? now - nextMillis : undefined; + if (nextMillis) { + if (override) { + await this.scripts.addJobScheduler(jobSchedulerId, nextMillis, { + name: jobName, + endDate: endDate ? new Date(endDate).getTime() : undefined, + tz: repeatOpts.tz, + pattern, + every, + }); + } else { + await this.scripts.updateJobSchedulerNextMillis( + jobSchedulerId, + nextMillis, + ); + } + + const { immediately, ...filteredRepeatOpts } = repeatOpts; + + return this.createNextJob( + jobName, + nextMillis, + jobSchedulerId, + { ...opts, repeat: { offset, ...filteredRepeatOpts } }, + jobData, + iterationCount, + hasImmediately, + ); + } + } + + private async createNextJob( + name: N, + nextMillis: number, + jobSchedulerId: string, + opts: JobsOptions, + data: T, + currentCount: number, + hasImmediately: boolean, + ) { + // + // Generate unique job id for this iteration. + // + const jobId = this.getSchedulerNextJobId({ + jobSchedulerId: jobSchedulerId, + nextMillis, + }); + + const now = Date.now(); + const delay = + nextMillis + (opts.repeat.offset ? opts.repeat.offset : 0) - now; + + const mergedOpts = { + ...opts, + jobId, + delay: delay < 0 || hasImmediately ? 0 : delay, + timestamp: now, + prevMillis: nextMillis, + repeatJobKey: jobSchedulerId, + }; + + mergedOpts.repeat = { ...opts.repeat, count: currentCount }; + + return this.Job.create(this, name, data, mergedOpts); + } + + async removeJobScheduler(jobSchedulerId: string): Promise { + return this.scripts.removeJobScheduler(jobSchedulerId); + } + + private async getSchedulerData( + client: RedisClient, + key: string, + next?: number, + ): Promise { + const jobData = await client.hgetall(this.toKey('repeat:' + key)); + + if (jobData) { + return { + key, + name: jobData.name, + endDate: parseInt(jobData.endDate) || null, + tz: jobData.tz || null, + pattern: jobData.pattern || null, + every: jobData.every || null, + next, + }; + } + + return this.keyToData(key, next); + } + + private keyToData(key: string, next?: number): JobSchedulerJson { + const data = key.split(':'); + const pattern = data.slice(4).join(':') || null; + + return { + key, + name: data[0], + id: data[1] || null, + endDate: parseInt(data[2]) || null, + tz: data[3] || null, + pattern, + next, + }; + } + + async getJobSchedulers( + start = 0, + end = -1, + asc = false, + ): Promise { + const client = await this.client; + const jobSchedulersKey = this.keys.repeat; + + const result = asc + ? await client.zrange(jobSchedulersKey, start, end, 'WITHSCORES') + : await client.zrevrange(jobSchedulersKey, start, end, 'WITHSCORES'); + + const jobs = []; + for (let i = 0; i < result.length; i += 2) { + jobs.push( + this.getSchedulerData(client, result[i], parseInt(result[i + 1])), + ); + } + return Promise.all(jobs); + } + + async getSchedulersCount( + client: RedisClient, + prefix: string, + queueName: string, + ): Promise { + return client.zcard(`${prefix}:${queueName}:repeat`); + } + + private getSchedulerNextJobId({ + nextMillis, + jobSchedulerId, + }: { + jobSchedulerId: string; + nextMillis: number | string; + }) { + return `repeat:${jobSchedulerId}:${nextMillis}`; + } +} + +export const getNextMillis = ( + millis: number, + opts: RepeatOptions, +): number | undefined => { + const pattern = opts.pattern; + if (pattern && opts.every) { + throw new Error( + 'Both .pattern and .every options are defined for this repeatable job', + ); + } + + if (opts.every) { + return ( + Math.floor(millis / opts.every) * opts.every + + (opts.immediately ? 0 : opts.every) + ); + } + + const currentDate = + opts.startDate && new Date(opts.startDate) > new Date(millis) + ? new Date(opts.startDate) + : new Date(millis); + const interval = parseExpression(pattern, { + ...opts, + currentDate, + }); + + try { + if (opts.immediately) { + return new Date().getTime(); + } else { + return interval.next().getTime(); + } + } catch (e) { + // Ignore error + } +}; diff --git a/src/classes/job.ts b/src/classes/job.ts index 4d335f6c11..ff08fed570 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -29,6 +29,7 @@ import { lengthInUtf8Bytes, parseObjectValues, tryCatch, + finishedErrors, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -726,7 +727,7 @@ export class Job< const result = results[results.length - 1][1] as number; if (result < 0) { - throw this.scripts.finishedErrors({ + throw finishedErrors({ code: result, jobId: this.id, command, diff --git a/src/classes/queue.ts b/src/classes/queue.ts index db29ed03e1..9a4a114fe2 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -12,6 +12,7 @@ import { Job } from './job'; import { QueueGetters } from './queue-getters'; import { Repeat } from './repeat'; import { RedisConnection } from './redis-connection'; +import { JobScheduler } from './job-scheduler'; export interface ObliterateOpts { /** @@ -96,7 +97,9 @@ export class Queue< token = v4(); jobsOpts: BaseJobOptions; opts: QueueOptions; - private _repeat?: Repeat; + private _repeat?: Repeat; // To be deprecated in v6 in favor of JobScheduler + + private _jobScheduler?: JobScheduler; constructor( name: string, @@ -183,6 +186,19 @@ export class Queue< }); } + get jobScheduler(): Promise { + return new Promise(async resolve => { + if (!this._jobScheduler) { + this._jobScheduler = new JobScheduler(this.name, { + ...this.opts, + connection: await this.client, + }); + this._jobScheduler.on('error', e => this.emit.bind(this, e)); + } + resolve(this._jobScheduler); + }); + } + /** * Get global concurrency value. * Returns null in case no value is set. @@ -278,6 +294,49 @@ export class Queue< ); } + /** + * Upserts a scheduler. + * + * A scheduler is a job factory that creates jobs at a given interval. + * Upserting a scheduler will create a new job scheduler or update an existing one. + * It will also create the first job based on the repeat options and delayed accordingly. + * + * @param key - Unique key for the repeatable job meta. + * @param repeatOpts - Repeat options + * @param jobTemplate - Job template. If provided it will be used for all the jobs + * created by the scheduler. + * + * @returns The next job to be scheduled (would normally be in delayed state). + */ + async upsertJobScheduler( + jobSchedulerId: NameType, + repeatOpts: Omit, + jobTemplate?: { + name?: NameType; + data?: DataType; + opts?: Omit; + }, + ) { + if (repeatOpts.endDate) { + if (+new Date(repeatOpts.endDate) < Date.now()) { + throw new Error('End date must be greater than current timestamp'); + } + } + + return (await this.jobScheduler).upsertJobScheduler< + DataType, + ResultType, + NameType + >( + jobSchedulerId, + repeatOpts, + jobTemplate?.name ?? jobSchedulerId, + jobTemplate?.data ?? {}, + { ...this.jobsOpts, ...jobTemplate?.opts }, + { override: true }, + ); + } + /** * Pauses the processing of this queue globally. * @@ -336,6 +395,9 @@ export class Queue< /** * Get all repeatable meta jobs. * + * + * @deprecated This method is deprecated and will be removed in v6. Use getJobSchedulers instead. + * * @param start - Offset of first job to return. * @param end - Offset of last job to return. * @param asc - Determine the order in which jobs are returned based on their @@ -349,12 +411,30 @@ export class Queue< return (await this.repeat).getRepeatableJobs(start, end, asc); } + /** + * Get all Job Schedulers + * + * @param start - Offset of first scheduler to return. + * @param end - Offset of last scheduler to return. + * @param asc - Determine the order in which schedulers are returned based on their + * next execution time. + */ + async getJobSchedulers( + start?: number, + end?: number, + asc?: boolean, + ): Promise { + return (await this.jobScheduler).getJobSchedulers(start, end, asc); + } + /** * Removes a repeatable job. * * Note: you need to use the exact same repeatOpts when deleting a repeatable job * than when adding it. * + * @deprecated This method is deprecated and will be removed in v6. Use removeJobScheduler instead. + * * @see removeRepeatableByKey * * @param name - Job name @@ -373,6 +453,21 @@ export class Queue< return !removed; } + /** + * + * Removes a job scheduler. + * + * @param jobSchedulerId + * + * @returns + */ + async removeJobScheduler(jobSchedulerId: string): Promise { + const jobScheduler = await this.jobScheduler; + const removed = await jobScheduler.removeJobScheduler(jobSchedulerId); + + return !removed; + } + /** * Removes a debounce key. * @@ -391,6 +486,8 @@ export class Queue< * * @see getRepeatableJobs * + * @deprecated This method is deprecated and will be removed in v6. Use removeJobScheduler instead. + * * @param repeatJobKey - To the repeatable job. * @returns */ diff --git a/src/classes/repeat.ts b/src/classes/repeat.ts index 7ebae338d0..4d14ff265e 100644 --- a/src/classes/repeat.ts +++ b/src/classes/repeat.ts @@ -65,19 +65,20 @@ export class Repeat extends QueueBase { const hasImmediately = Boolean( (every || pattern) && repeatOpts.immediately, ); - const offset = (hasImmediately && every) ? now - nextMillis : undefined; + const offset = hasImmediately && every ? now - nextMillis : undefined; if (nextMillis) { // We store the undecorated opts.jobId into the repeat options if (!prevMillis && opts.jobId) { repeatOpts.jobId = opts.jobId; } - const repeatConcatOptions = getRepeatConcatOptions(name, repeatOpts); + const legacyRepeatKey = getRepeatConcatOptions(name, repeatOpts); + const newRepeatKey = opts.repeat.key ?? this.hash(legacyRepeatKey); let repeatJobKey; if (override) { repeatJobKey = await this.scripts.addRepeatableJob( - opts.repeat.key ?? this.hash(repeatConcatOptions), + newRepeatKey, nextMillis, { name, @@ -86,32 +87,30 @@ export class Repeat extends QueueBase { pattern, every, }, - repeatConcatOptions, + legacyRepeatKey, ); } else { const client = await this.client; repeatJobKey = await this.scripts.updateRepeatableJobMillis( client, - opts.repeat.key ?? this.hash(repeatConcatOptions), + newRepeatKey, nextMillis, - repeatConcatOptions, + legacyRepeatKey, ); } - if (repeatJobKey) { - const { immediately, ...filteredRepeatOpts } = repeatOpts; + const { immediately, ...filteredRepeatOpts } = repeatOpts; - return this.createNextJob( - name, - nextMillis, - repeatJobKey, - { ...opts, repeat: { offset, ...filteredRepeatOpts } }, - data, - iterationCount, - hasImmediately, - ); - } + return this.createNextJob( + name, + nextMillis, + repeatJobKey, + { ...opts, repeat: { offset, ...filteredRepeatOpts } }, + data, + iterationCount, + hasImmediately, + ); } } @@ -340,7 +339,7 @@ export const getNextMillis = ( }); try { - if(opts.immediately){ + if (opts.immediately) { return new Date().getTime(); } else { return interval.next().getTime(); diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 0c21578640..6de3256b45 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -34,7 +34,12 @@ import { RedisJobOptions, } from '../types'; import { ErrorCode } from '../enums'; -import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils'; +import { + array2obj, + finishedErrors, + getParentKey, + isRedisVersionLowerThan, +} from '../utils'; import { ChainableCommander } from 'ioredis'; export type JobData = [JobJsonRaw | number, string?]; @@ -220,7 +225,7 @@ export class Scripts { } if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, parentKey: parentOpts.parentKey, command: 'addJob', @@ -302,6 +307,23 @@ export class Scripts { return (client).addRepeatableJob(args); } + async addJobScheduler( + jobSchedulerId: string, + nextMillis: number, + opts: RepeatableOptions, + ): Promise { + const queueKeys = this.queue.keys; + const client = await this.queue.client; + + const keys: (string | number | Buffer)[] = [ + queueKeys.repeat, + queueKeys.delayed, + ]; + const args = [nextMillis, pack(opts), jobSchedulerId, queueKeys['']]; + + return (client).addJobScheduler(keys.concat(args)); + } + async updateRepeatableJobMillis( client: RedisClient, customKey: string, @@ -317,6 +339,15 @@ export class Scripts { return (client).updateRepeatableJobMillis(args); } + async updateJobSchedulerNextMillis( + jobSchedulerId: string, + nextMillis: number, + ): Promise { + const client = await this.queue.client; + + return client.zadd(this.queue.keys.repeat, nextMillis, jobSchedulerId); + } + private removeRepeatableArgs( legacyRepeatJobId: string, repeatConcatOptions: string, @@ -360,15 +391,37 @@ export class Scripts { return (client).removeRepeatable(args); } + async removeJobScheduler(jobSchedulerId: string): Promise { + const client = await this.queue.client; + + const queueKeys = this.queue.keys; + + const keys = [queueKeys.repeat, queueKeys.delayed, queueKeys.events]; + + const args = [jobSchedulerId, queueKeys['']]; + + return (client).removeJobScheduler(keys.concat(args)); + } + async remove(jobId: string, removeChildren: boolean): Promise { const client = await this.queue.client; const keys: (string | number)[] = ['', 'meta'].map(name => this.queue.toKey(name), ); - return (client).removeJob( + const result = await (client).removeJob( keys.concat([jobId, removeChildren ? 1 : 0]), ); + + if (result == ErrorCode.JobBelongsToJobScheduler) { + throw finishedErrors({ + code: ErrorCode.JobBelongsToJobScheduler, + jobId, + command: 'remove', + }); + } + + return result; } async extendLock( @@ -400,7 +453,7 @@ export class Scripts { const result = await (client).updateData(keys.concat([dataJson])); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId: job.id, command: 'updateData', @@ -426,7 +479,7 @@ export class Scripts { ); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'updateProgress', @@ -451,7 +504,7 @@ export class Scripts { ); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'addLog', @@ -535,7 +588,7 @@ export class Scripts { const result = await (client).moveToFinished(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'moveToFinished', @@ -548,45 +601,6 @@ export class Scripts { } } - finishedErrors({ - code, - jobId, - parentKey, - command, - state, - }: { - code: number; - jobId?: string; - parentKey?: string; - command: string; - state?: string; - }): Error { - switch (code) { - case ErrorCode.JobNotExist: - return new Error(`Missing key for job ${jobId}. ${command}`); - case ErrorCode.JobLockNotExist: - return new Error(`Missing lock for job ${jobId}. ${command}`); - case ErrorCode.JobNotInState: - return new Error( - `Job ${jobId} is not in the ${state} state. ${command}`, - ); - case ErrorCode.JobPendingDependencies: - return new Error(`Job ${jobId} has pending dependencies. ${command}`); - case ErrorCode.ParentJobNotExist: - return new Error(`Missing key for parent job ${parentKey}. ${command}`); - case ErrorCode.JobLockMismatch: - return new Error( - `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, - ); - case ErrorCode.ParentJobCannotBeReplaced: - return new Error( - `The parent job ${parentKey} cannot be replaced. ${command}`, - ); - default: - return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); - } - } - private drainArgs(delayed: boolean): (string | number)[] { const queueKeys = this.queue.keys; @@ -595,6 +609,7 @@ export class Scripts { queueKeys.paused, delayed ? queueKeys.delayed : '', queueKeys.prioritized, + queueKeys.repeat, ]; const args = [queueKeys['']]; @@ -637,7 +652,7 @@ export class Scripts { case 1: return false; default: - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, parentKey, @@ -801,7 +816,7 @@ export class Scripts { const args = this.changeDelayArgs(jobId, delay); const result = await (client).changeDelay(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'changeDelay', @@ -838,7 +853,7 @@ export class Scripts { const args = this.changePriorityArgs(jobId, priority, lifo); const result = await (client).changePriority(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'changePriority', @@ -861,12 +876,7 @@ export class Scripts { this.queue.keys.marker, ]; - return keys.concat([ - priority, - this.queue.toKey(''), - jobId, - lifo ? 1 : 0, - ]); + return keys.concat([priority, this.queue.toKey(''), jobId, lifo ? 1 : 0]); } moveToDelayedArgs( @@ -961,7 +971,7 @@ export class Scripts { const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts); const result = await (client).moveToDelayed(args); if (result < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'moveToDelayed', @@ -997,7 +1007,7 @@ export class Scripts { case 1: return false; default: - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId, command: 'moveToWaitingChildren', @@ -1034,6 +1044,7 @@ export class Scripts { return (client).cleanJobsInSet([ this.queue.toKey(set), this.queue.toKey('events'), + this.queue.toKey('repeat'), this.queue.toKey(''), timestamp, limit, @@ -1155,7 +1166,7 @@ export class Scripts { case 1: return; default: - throw this.finishedErrors({ + throw finishedErrors({ code: result, jobId: job.id, command: 'reprocessJob', @@ -1219,7 +1230,7 @@ export class Scripts { const code = await (client).promote(keys.concat(args)); if (code < 0) { - throw this.finishedErrors({ + throw finishedErrors({ code, jobId, command: 'promote', diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 7c96552956..57156d9414 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -35,6 +35,7 @@ import { RATE_LIMIT_ERROR, WaitingChildrenError, } from './errors'; +import { JobScheduler } from './job-scheduler'; // 10 seconds is the maximum time a BRPOPLPUSH can block. const maximumBlockTimeout = 10; @@ -184,7 +185,9 @@ export class Worker< private resumeWorker: () => void; private stalledCheckTimer: NodeJS.Timeout; private waiting: Promise | null = null; - private _repeat: Repeat; + private _repeat: Repeat; // To be deprecated in v6 in favor of Job Scheduler + + private _jobScheduler: JobScheduler; protected paused: Promise; protected processFn: Processor; @@ -403,6 +406,20 @@ export class Worker< }); } + get jobScheduler(): Promise { + return new Promise(async resolve => { + if (!this._jobScheduler) { + const connection = await this.client; + this._jobScheduler = new JobScheduler(this.name, { + ...this.opts, + connection, + }); + this._jobScheduler.on('error', e => this.emit.bind(this, e)); + } + resolve(this._jobScheduler); + }); + } + async run() { if (!this.processFn) { throw new Error('No process function is defined.'); @@ -731,11 +748,26 @@ will never work with more accuracy than 1ms. */ this.drained = false; const job = this.createJob(jobData, jobId); job.token = token; + + // Add next scheduled job if necessary. if (job.opts.repeat) { - const repeat = await this.repeat; - await repeat.updateRepeatableJob(job.name, job.data, job.opts, { - override: false, - }); + // Use new job scheduler if possible + if (job.repeatJobKey) { + const jobScheduler = await this.jobScheduler; + await jobScheduler.upsertJobScheduler( + job.repeatJobKey, + job.opts.repeat, + job.name, + job.data, + job.opts, + { override: false }, + ); + } else { + const repeat = await this.repeat; + await repeat.updateRepeatableJob(job.name, job.data, job.opts, { + override: false, + }); + } } return job; } diff --git a/src/commands/addJobScheduler-2.lua b/src/commands/addJobScheduler-2.lua new file mode 100644 index 0000000000..4583e6b223 --- /dev/null +++ b/src/commands/addJobScheduler-2.lua @@ -0,0 +1,75 @@ +--[[ + Adds a job scheduler, i.e. a job factory that creates jobs based on a given schedule (repeat options). + + Input: + KEYS[1] 'repeat' key + KEYS[2] 'delayed' key + + ARGV[1] next milliseconds + ARGV[2] msgpacked options + [1] name + [2] tz? + [3] patten? + [4] endDate? + [5] every? + ARGV[3] jobs scheduler id + ARGV[4] prefix key + + Output: + repeatableKey - OK +]] +local rcall = redis.call +local repeatKey = KEYS[1] +local delayedKey = KEYS[2] + +local nextMillis = ARGV[1] +local jobSchedulerId = ARGV[3] +local prefixKey = ARGV[4] + +-- Includes +--- @include "includes/removeJob" + +local function storeRepeatableJob(repeatKey, nextMillis, rawOpts) + rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) + local opts = cmsgpack.unpack(rawOpts) + + local optionalValues = {} + if opts['tz'] then + table.insert(optionalValues, "tz") + table.insert(optionalValues, opts['tz']) + end + + if opts['pattern'] then + table.insert(optionalValues, "pattern") + table.insert(optionalValues, opts['pattern']) + end + + if opts['endDate'] then + table.insert(optionalValues, "endDate") + table.insert(optionalValues, opts['endDate']) + end + + if opts['every'] then + table.insert(optionalValues, "every") + table.insert(optionalValues, opts['every']) + end + + rcall("HMSET", repeatKey .. ":" .. jobSchedulerId, "name", opts['name'], + unpack(optionalValues)) +end + +-- If we are overriding a repeatable job we must delete the delayed job for +-- the next iteration. +local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId) +if prevMillis ~= false then + local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis + local nextDelayedJobId = repeatKey .. ":" .. jobSchedulerId .. ":" .. nextMillis + + if rcall("ZSCORE", delayedKey, delayedJobId) ~= false + and rcall("EXISTS", nextDelayedJobId) ~= 1 then + removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]]) + rcall("ZREM", delayedKey, delayedJobId) + end +end + +return storeRepeatableJob(repeatKey, nextMillis, ARGV[2]) diff --git a/src/commands/cleanJobsInSet-2.lua b/src/commands/cleanJobsInSet-3.lua similarity index 82% rename from src/commands/cleanJobsInSet-2.lua rename to src/commands/cleanJobsInSet-3.lua index c33c64eb11..6e95032300 100644 --- a/src/commands/cleanJobsInSet-2.lua +++ b/src/commands/cleanJobsInSet-3.lua @@ -4,6 +4,7 @@ Input: KEYS[1] set key, KEYS[2] events stream key + KEYS[3] job schedulers key ARGV[1] jobKey prefix ARGV[2] timestamp @@ -32,21 +33,21 @@ end local result if ARGV[4] == "active" then - result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false) + result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false --[[ hasFinished ]]) elseif ARGV[4] == "delayed" then rangeEnd = "+inf" result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, - {"processedOn", "timestamp"}, false) + {"processedOn", "timestamp"}, false --[[ hasFinished ]], KEYS[3]) elseif ARGV[4] == "prioritized" then rangeEnd = "+inf" result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, - {"timestamp"}, false) + {"timestamp"}, false --[[ hasFinished ]]) elseif ARGV[4] == "wait" or ARGV[4] == "paused" then - result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true) + result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true --[[ hasFinished ]]) else rangeEnd = ARGV[2] result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit, - {"finishedOn"}, true) + {"finishedOn"}, true --[[ hasFinished ]]) end rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2]) diff --git a/src/commands/drain-4.lua b/src/commands/drain-4.lua deleted file mode 100644 index 4d6ef7939a..0000000000 --- a/src/commands/drain-4.lua +++ /dev/null @@ -1,26 +0,0 @@ ---[[ - Drains the queue, removes all jobs that are waiting - or delayed, but not active, completed or failed - - Input: - KEYS[1] 'wait', - KEYS[2] 'paused' - KEYS[3] 'delayed' - KEYS[4] 'prioritized' - - ARGV[1] queue key prefix -]] -local rcall = redis.call -local queueBaseKey = ARGV[1] - ---- @include "includes/removeListJobs" ---- @include "includes/removeZSetJobs" - -removeListJobs(KEYS[1], true, queueBaseKey, 0) --wait -removeListJobs(KEYS[2], true, queueBaseKey, 0) --paused - -if KEYS[3] ~= "" then - removeZSetJobs(KEYS[3], true, queueBaseKey, 0) --delayed -end - -removeZSetJobs(KEYS[4], true, queueBaseKey, 0) --prioritized diff --git a/src/commands/drain-5.lua b/src/commands/drain-5.lua new file mode 100644 index 0000000000..d08fe41998 --- /dev/null +++ b/src/commands/drain-5.lua @@ -0,0 +1,41 @@ +--[[ + Drains the queue, removes all jobs that are waiting + or delayed, but not active, completed or failed + + Input: + KEYS[1] 'wait', + KEYS[2] 'paused' + KEYS[3] 'delayed' + KEYS[4] 'prioritized' + KEYS[5] 'jobschedulers' (repeat) + + ARGV[1] queue key prefix +]] +local rcall = redis.call +local queueBaseKey = ARGV[1] + +--- @include "includes/removeListJobs" +--- @include "includes/removeZSetJobs" + +removeListJobs(KEYS[1], true, queueBaseKey, 0) -- wait +removeListJobs(KEYS[2], true, queueBaseKey, 0) -- paused + +if KEYS[3] ~= "" then + + -- We must not remove delayed jobs if they are associated to a job scheduler. + local scheduledJobs = {} + local jobSchedulers = rcall("ZRANGE", KEYS[5], 0, -1, "WITHSCORES") + + -- For every job scheduler, get the current delayed job id. + for i = 1, #jobSchedulers, 2 do + local jobSchedulerId = jobSchedulers[i] + local jobSchedulerMillis = jobSchedulers[i + 1] + + local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. jobSchedulerMillis + scheduledJobs[delayedJobId] = true + end + + removeZSetJobs(KEYS[3], true, queueBaseKey, 0, scheduledJobs) -- delayed +end + +removeZSetJobs(KEYS[4], true, queueBaseKey, 0) -- prioritized diff --git a/src/commands/includes/cleanSet.lua b/src/commands/includes/cleanSet.lua index c48b098693..f85d41c259 100644 --- a/src/commands/includes/cleanSet.lua +++ b/src/commands/includes/cleanSet.lua @@ -1,45 +1,66 @@ --[[ Function to clean job set. Returns jobIds and deleted count number. -]] +]] -- Includes --- @include "batches" --- @include "getJobsInZset" --- @include "getTimestamp" --- @include "removeJob" - -local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attributes, isFinished) - local jobs = getJobsInZset(setKey, rangeEnd, limit) - local deleted = {} - local deletedCount = 0 - local jobTS - for i, job in ipairs(jobs) do - if limit > 0 and deletedCount >= limit then - break +local function isJobSchedulerJob(jobId, jobSchedulersKey) + if jobSchedulersKey then + local jobSchedulerId = jobId:match("repeat:(.*):%d+") + if jobSchedulerId then + return rcall("ZSCORE", jobSchedulersKey, jobSchedulerId) + end end + return false +end + +local function cleanSet( + setKey, + jobKeyPrefix, + rangeEnd, + timestamp, + limit, + attributes, + isFinished, + jobSchedulersKey) + local jobs = getJobsInZset(setKey, rangeEnd, limit) + local deleted = {} + local deletedCount = 0 + local jobTS + for i, job in ipairs(jobs) do + if limit > 0 and deletedCount >= limit then + break + end - local jobKey = jobKeyPrefix .. job - if isFinished then - removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]]) - deletedCount = deletedCount + 1 - table.insert(deleted, job) - else - -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed - jobTS = getTimestamp(jobKey, attributes) - if (not jobTS or jobTS <= timestamp) then - removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]]) - deletedCount = deletedCount + 1 - table.insert(deleted, job) - end + -- Extract a Job Scheduler Id from jobId ("repeat:job-scheduler-id:millis") + -- and check if it is in the scheduled jobs + if not isJobSchedulerJob(job, jobSchedulersKey) then + local jobKey = jobKeyPrefix .. job + if isFinished then + removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]] ) + deletedCount = deletedCount + 1 + table.insert(deleted, job) + else + -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed + jobTS = getTimestamp(jobKey, attributes) + if (not jobTS or jobTS <= timestamp) then + removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]] ) + deletedCount = deletedCount + 1 + table.insert(deleted, job) + end + end + end end - end - if(#deleted > 0) then - for from, to in batches(#deleted, 7000) do - rcall("ZREM", setKey, unpack(deleted, from, to)) + if (#deleted > 0) then + for from, to in batches(#deleted, 7000) do + rcall("ZREM", setKey, unpack(deleted, from, to)) + end end - end - return {deleted, deletedCount} + return {deleted, deletedCount} end diff --git a/src/commands/includes/removeZSetJobs.lua b/src/commands/includes/removeZSetJobs.lua index 5858b191b5..79100e78e6 100644 --- a/src/commands/includes/removeZSetJobs.lua +++ b/src/commands/includes/removeZSetJobs.lua @@ -3,8 +3,20 @@ --- @include "getZSetItems" --- @include "removeJobs" -local function removeZSetJobs(keyName, hard, baseKey, max) +local function removeZSetJobs(keyName, hard, baseKey, max, jobsToIgnore) local jobs = getZSetItems(keyName, max) + + -- filter out jobs to ignore + if jobsToIgnore then + local filteredJobs = {} + for i = 1, #jobs do + if not jobsToIgnore[jobs[i]] then + table.insert(filteredJobs, jobs[i]) + end + end + jobs = filteredJobs + end + local count = removeJobs(jobs, hard, baseKey, max) if(#jobs > 0) then for from, to in batches(#jobs, 7000) do diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index 0bb292068d..a882ed0445 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -44,7 +44,7 @@ local metaKey = KEYS[6] local pausedKey = KEYS[7] local markerKey = KEYS[8] local eventStreamKey = KEYS[9] -local maxStalledJobCount = ARGV[1] +local maxStalledJobCount = tonumber(ARGV[1]) local queueKeyPrefix = ARGV[2] local timestamp = ARGV[3] local maxCheckTime = ARGV[4] @@ -63,8 +63,6 @@ local failed = {} if (#stalling > 0) then rcall('DEL', stalledKey) - local MAX_STALLED_JOB_COUNT = tonumber(maxStalledJobCount) - -- Remove from active list for i, jobId in ipairs(stalling) do -- Markers in waitlist DEPRECATED in v5: Remove in v6. @@ -83,7 +81,7 @@ if (#stalling > 0) then -- If this job has been stalled too many times, such as if it crashes the worker, then fail it. local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1) - if (stalledCount > MAX_STALLED_JOB_COUNT) then + if (stalledCount > maxStalledJobCount) then local jobAttributes = rcall("HMGET", jobKey, "opts", "parent", "deid") local rawOpts = jobAttributes[1] local rawParentData = jobAttributes[2] diff --git a/src/commands/removeJob-2.lua b/src/commands/removeJob-2.lua index 19c62de1d3..0e64df436f 100644 --- a/src/commands/removeJob-2.lua +++ b/src/commands/removeJob-2.lua @@ -24,7 +24,7 @@ local rcall = redis.call --- @include "includes/removeJobKeys" --- @include "includes/removeParentDependencyKey" -local function removeJob( prefix, jobId, parentKey, removeChildren) +local function removeJob(prefix, jobId, parentKey, removeChildren) local jobKey = prefix .. jobId; removeParentDependencyKey(jobKey, false, parentKey, nil) @@ -33,14 +33,14 @@ local function removeJob( prefix, jobId, parentKey, removeChildren) -- Check if this job has children -- If so, we are going to try to remove the children recursively in deep first way because -- if some job is locked we must exit with and error. - --local countProcessed = rcall("HLEN", jobKey .. ":processed") + -- local countProcessed = rcall("HLEN", jobKey .. ":processed") local processed = rcall("HGETALL", jobKey .. ":processed") if (#processed > 0) then for i = 1, #processed, 2 do local childJobId = getJobIdFromKey(processed[i]) local childJobPrefix = getJobKeyPrefix(processed[i], childJobId) - removeJob( childJobPrefix, childJobId, jobKey, removeChildren ) + removeJob(childJobPrefix, childJobId, jobKey, removeChildren) end end @@ -50,7 +50,7 @@ local function removeJob( prefix, jobId, parentKey, removeChildren) -- We need to get the jobId for this job. local childJobId = getJobIdFromKey(childJobKey) local childJobPrefix = getJobKeyPrefix(childJobKey, childJobId) - removeJob( childJobPrefix, childJobId, jobKey, removeChildren ) + removeJob(childJobPrefix, childJobId, jobKey, removeChildren) end end @@ -60,7 +60,7 @@ local function removeJob( prefix, jobId, parentKey, removeChildren) for i = 1, #failed, 2 do local childJobId = getJobIdFromKey(failed[i]) local childJobPrefix = getJobKeyPrefix(failed[i], childJobId) - removeJob( childJobPrefix, childJobId, jobKey, removeChildren ) + removeJob(childJobPrefix, childJobId, jobKey, removeChildren) end end end @@ -70,15 +70,23 @@ local function removeJob( prefix, jobId, parentKey, removeChildren) removeDebounceKey(prefix, jobKey) if removeJobKeys(jobKey) > 0 then local maxEvents = getOrSetMaxEvents(KEYS[2]) - rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed", - "jobId", jobId, "prev", prev) + rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed", "jobId", jobId, "prev", + prev) end end local prefix = KEYS[1] +local jobId = ARGV[1] +local shouldRemoveChildren = ARGV[2] +local jobKey = prefix .. jobId -if not isLocked(prefix, ARGV[1], ARGV[2]) then - removeJob(prefix, ARGV[1], nil, ARGV[2]) +-- Check if the job belongs to a job scheduler and it is in delayed state. +if rcall("ZSCORE", prefix .. "delayed", jobId) and rcall("HGET", jobKey, "rjk") then + return -8 -- Return error code as the job is part of a job scheduler and is in delayed state. +end + +if not isLocked(prefix, jobId, shouldRemoveChildren) then + removeJob(prefix, jobId, nil, shouldRemoveChildren) return 1 end return 0 diff --git a/src/commands/removeJobScheduler-3.lua b/src/commands/removeJobScheduler-3.lua new file mode 100644 index 0000000000..ebcc98b69a --- /dev/null +++ b/src/commands/removeJobScheduler-3.lua @@ -0,0 +1,43 @@ + +--[[ + Removes a repeatable job + Input: + KEYS[1] job schedulers key + KEYS[2] delayed jobs key + KEYS[3] events key + + ARGV[1] job scheduler id + ARGV[2] prefix key + + Output: + 0 - OK + 1 - Missing repeat job + + Events: + 'removed' +]] +local rcall = redis.call + +-- Includes +--- @include "includes/removeJobKeys" + +local jobSchedulerId = ARGV[1] +local prefix = ARGV[2] + +local millis = rcall("ZSCORE", KEYS[1], jobSchedulerId) + +if millis then + -- Delete next programmed job. + local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. millis + if(rcall("ZREM", KEYS[2], delayedJobId) == 1) then + removeJobKeys(prefix .. delayedJobId) + rcall("XADD", KEYS[3], "*", "event", "removed", "jobId", delayedJobId, "prev", "delayed") + end +end + +if(rcall("ZREM", KEYS[1], jobSchedulerId) == 1) then + rcall("DEL", KEYS[1] .. ":" .. jobSchedulerId) + return 0 +end + +return 1 diff --git a/src/enums/error-code.ts b/src/enums/error-code.ts index e0fa41d673..f2691386bb 100644 --- a/src/enums/error-code.ts +++ b/src/enums/error-code.ts @@ -6,4 +6,5 @@ export enum ErrorCode { ParentJobNotExist = -5, JobLockMismatch = -6, ParentJobCannotBeReplaced = -7, + JobBelongsToJobScheduler = -8, } diff --git a/src/utils.ts b/src/utils.ts index cceecb438c..835fffe3a2 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -9,6 +9,7 @@ import { CONNECTION_CLOSED_ERROR_MSG } from 'ioredis/built/utils'; import { ChildMessage, RedisClient } from './interfaces'; import { EventEmitter } from 'events'; import * as semver from 'semver'; +import { ErrorCode } from './enums'; export const errorObject: { [index: string]: any } = { value: null }; @@ -247,3 +248,44 @@ export const toString = (value: any): string => { }; export const QUEUE_EVENT_SUFFIX = ':qe'; + +export const finishedErrors = ({ + code, + jobId, + parentKey, + command, + state, +}: { + code: number; + jobId?: string; + parentKey?: string; + command: string; + state?: string; +}): Error => { + switch (code) { + case ErrorCode.JobNotExist: + return new Error(`Missing key for job ${jobId}. ${command}`); + case ErrorCode.JobLockNotExist: + return new Error(`Missing lock for job ${jobId}. ${command}`); + case ErrorCode.JobNotInState: + return new Error(`Job ${jobId} is not in the ${state} state. ${command}`); + case ErrorCode.JobPendingDependencies: + return new Error(`Job ${jobId} has pending dependencies. ${command}`); + case ErrorCode.ParentJobNotExist: + return new Error(`Missing key for parent job ${parentKey}. ${command}`); + case ErrorCode.JobLockMismatch: + return new Error( + `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`, + ); + case ErrorCode.ParentJobCannotBeReplaced: + return new Error( + `The parent job ${parentKey} cannot be replaced. ${command}`, + ); + case ErrorCode.JobBelongsToJobScheduler: + return new Error( + `Job ${jobId} belongs to a job scheduler and cannot be removed directly`, + ); + default: + return new Error(`Unknown code ${code} error for ${jobId}. ${command}`); + } +}; diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts new file mode 100644 index 0000000000..f22f80f253 --- /dev/null +++ b/tests/test_job_scheduler.ts @@ -0,0 +1,1615 @@ +import { expect } from 'chai'; +import { default as IORedis } from 'ioredis'; +import { beforeEach, describe, it, before, after as afterAll } from 'mocha'; + +import * as sinon from 'sinon'; +import { v4 } from 'uuid'; +import { rrulestr } from 'rrule'; +import { + Job, + Queue, + QueueEvents, + Repeat, + getNextMillis, + Worker, +} from '../src/classes'; +import { JobsOptions } from '../src/types'; +import { removeAllQueueData, delay, finishedErrors } from '../src/utils'; +import { ErrorCode } from '../src/enums'; + +const moment = require('moment'); + +const ONE_SECOND = 1000; +const ONE_MINUTE = 60 * ONE_SECOND; +const ONE_HOUR = 60 * ONE_MINUTE; +const ONE_DAY = 24 * ONE_HOUR; + +const NoopProc = async (job: Job) => {}; + +describe('Job Scheduler', function () { + const redisHost = process.env.REDIS_HOST || 'localhost'; + const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull'; + this.timeout(10000); + let repeat: Repeat; + let queue: Queue; + let queueEvents: QueueEvents; + let queueName: string; + + let connection; + before(async function () { + connection = new IORedis(redisHost, { maxRetriesPerRequest: null }); + }); + + beforeEach(async function () { + this.clock = sinon.useFakeTimers(); + queueName = `test-${v4()}`; + queue = new Queue(queueName, { connection, prefix }); + repeat = new Repeat(queueName, { connection, prefix }); + queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queue.waitUntilReady(); + await queueEvents.waitUntilReady(); + }); + + afterEach(async function () { + this.clock.restore(); + await queue.close(); + await repeat.close(); + await queueEvents.close(); + await removeAllQueueData(new IORedis(redisHost), queueName); + }); + + afterAll(async function () { + await connection.quit(); + }); + + // NOTE: This test seems to be misplaced, it is not related to the repeatable jobs + describe('when exponential backoff is applied', () => { + it('should retry a job respecting exponential backoff strategy', async function () { + let delay = 10000; + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const worker = new Worker( + queueName, + async () => { + throw Error('error'); + }, + { autorun: false, connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => { + console.log('delay'); + }); + await worker.waitUntilReady(); + + const failing = new Promise(resolve => { + worker.on('failed', async job => { + this.clock.tick(delay + 10); + delay = delay * 2; + + if (job!.attemptsMade === 10) { + resolve(); + } + }); + }); + + await queue.add( + 'test', + { foo: 'bar' }, + { + attempts: 10, + backoff: { + type: 'exponential', + delay, + }, + }, + ); + + worker.run(); + + await failing; + + await worker.close(); + delayStub.restore(); + }); + }); + + describe('when endDate is not greater than current timestamp', () => { + it('throws an error', async function () { + await expect( + queue.upsertJobScheduler('test-scheduler', { + endDate: Date.now() - 1000, + every: 100, + }), + ).to.be.rejectedWith('End date must be greater than current timestamp'); + }); + }); + + it('it should stop repeating after endDate', async function () { + const every = 100; + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const worker = new Worker( + queueName, + async () => { + this.clock.tick(every); + }, + { autorun: false, connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + await worker.waitUntilReady(); + + let processed = 0; + const completing = new Promise(resolve => { + worker.on('completed', async () => { + processed++; + if (processed === 10) { + resolve(); + } + }); + }); + + const job = await queue.upsertJobScheduler('test-scheduler', { + endDate: Date.now() + 1000, + every: 100, + }); + + expect(job!.repeatJobKey).to.not.be.undefined; + + this.clock.tick(every + 1); + + worker.run(); + + await completing; + + const delayed = await queue.getDelayed(); + + expect(delayed).to.have.length(0); + expect(processed).to.be.equal(10); + + await worker.close(); + delayStub.restore(); + }); + + describe('when jobs have the same cron pattern and different job scheduler id', function () { + it('should create multiple jobs', async function () { + const cron = '*/10 * * * * *'; + + await Promise.all([ + queue.upsertJobScheduler('test-scheduler1', { pattern: cron }), + queue.upsertJobScheduler('test-scheduler2', { pattern: cron }), + queue.upsertJobScheduler('test-scheduler3', { pattern: cron }), + ]); + + const count = await queue.count(); + expect(count).to.be.eql(3); + + const delayed = await queue.getDelayed(); + expect(delayed).to.have.length(3); + }); + }); + + describe('when job schedulers have same id and different every pattern', function () { + it('should create only one job scheduler', async function () { + await Promise.all([ + queue.upsertJobScheduler('test-scheduler1', { every: 1000 }), + queue.upsertJobScheduler('test-scheduler1', { every: 2000 }), + queue.upsertJobScheduler('test-scheduler1', { every: 3000 }), + ]); + + const repeatableJobs = await queue.getJobSchedulers(); + expect(repeatableJobs.length).to.be.eql(1); + }); + }); + + it('should create job schedulers with different cron patterns', async function () { + const crons = [ + '10 * * * * *', + '2 10 * * * *', + '1 * * 5 * *', + '2 * * 4 * *', + ]; + + await Promise.all([ + queue.upsertJobScheduler('first', { + pattern: crons[0], + endDate: 12345, + }), + queue.upsertJobScheduler('second', { + pattern: crons[1], + endDate: 610000, + }), + queue.upsertJobScheduler('third', { + pattern: crons[2], + tz: 'Africa/Abidjan', + }), + queue.upsertJobScheduler('fourth', { + pattern: crons[3], + tz: 'Africa/Accra', + }), + queue.upsertJobScheduler('fifth', { + every: 5000, + tz: 'Europa/Copenhaguen', + }), + ]); + const count = await repeat.getRepeatableCount(); + expect(count).to.be.eql(5); + + const jobs = await repeat.getRepeatableJobs(0, -1, true); + + expect(jobs) + .to.be.and.an('array') + .and.have.length(5) + .and.to.deep.include({ + key: 'fifth', + name: 'fifth', + endDate: null, + tz: 'Europa/Copenhaguen', + pattern: null, + every: '5000', + next: 5000, + }) + .and.to.deep.include({ + key: 'first', + name: 'first', + endDate: 12345, + tz: null, + pattern: '10 * * * * *', + every: null, + next: 10000, + }) + .and.to.deep.include({ + key: 'second', + name: 'second', + endDate: 610000, + tz: null, + pattern: '2 10 * * * *', + every: null, + next: 602000, + }) + .and.to.deep.include({ + key: 'fourth', + name: 'fourth', + endDate: null, + tz: 'Africa/Accra', + pattern: '2 * * 4 * *', + every: null, + next: 259202000, + }) + .and.to.deep.include({ + key: 'third', + name: 'third', + endDate: null, + tz: 'Africa/Abidjan', + pattern: '1 * * 5 * *', + every: null, + next: 345601000, + }); + }); + + it('should repeat every 2 seconds', async function () { + this.timeout(10000); + + const nextTick = 2 * ONE_SECOND + 100; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { autorun: false, connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + + await queue.upsertJobScheduler( + 'test', + { pattern: '*/2 * * * * *' }, + { data: { foo: 'bar' } }, + ); + + this.clock.tick(nextTick); + + let prev: any; + let counter = 0; + + const completing = new Promise(resolve => { + worker.on('completed', async job => { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); + } + prev = job; + counter++; + if (counter == 5) { + resolve(); + } + }); + }); + + worker.run(); + + await completing; + await worker.close(); + delayStub.restore(); + }); + + it('should repeat every 2 seconds with startDate in future', async function () { + this.timeout(10000); + + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND + 500; + const delay = 5 * ONE_SECOND + 500; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { autorun: false, connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + await queue.upsertJobScheduler( + 'test', + { + pattern: '*/2 * * * * *', + startDate: new Date('2017-02-07 9:24:05'), + }, + { data: { foo: 'bar' } }, + ); + + this.clock.tick(nextTick + delay); + + let prev: Job; + let counter = 0; + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); + } + prev = job; + counter++; + if (counter == 5) { + resolve(); + } + }); + }); + + worker.run(); + + await completing; + + await worker.close(); + delayStub.restore(); + }); + + it('should repeat every 2 seconds with startDate in past', async function () { + this.timeout(10000); + + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND + 500; + const delay = 5 * ONE_SECOND + 500; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { autorun: false, connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + await queue.upsertJobScheduler( + 'repeat', + { + pattern: '*/2 * * * * *', + startDate: new Date('2017-02-07 9:22:00'), + }, + { data: { foo: 'bar' } }, + ); + + this.clock.tick(nextTick + delay); + + let prev: Job; + let counter = 0; + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); + } + prev = job; + counter++; + if (counter == 5) { + resolve(); + } + }); + }); + + worker.run(); + + await completing; + await worker.close(); + delayStub.restore(); + }); + + describe('when using removeOnComplete', function () { + it('should remove repeated job', async function () { + this.timeout(10000); + const queueName2 = `test-${v4()}`; + const queue2 = new Queue(queueName2, { + connection, + prefix, + defaultJobOptions: { + removeOnComplete: true, + }, + }); + + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND + 500; + const delay = 5 * ONE_SECOND + 500; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { autorun: false, connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + await queue.upsertJobScheduler( + 'test', + { + pattern: '*/2 * * * * *', + startDate: new Date('2017-02-07 9:24:05'), + }, + { data: { foo: 'bar' } }, + ); + + this.clock.tick(nextTick + delay); + + let prev: Job; + let counter = 0; + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); + } + prev = job; + counter++; + if (counter == 5) { + const counts = await queue2.getJobCounts('completed'); + expect(counts.completed).to.be.equal(0); + resolve(); + } + }); + }); + + worker.run(); + + await completing; + + await queue2.close(); + await worker.close(); + await removeAllQueueData(new IORedis(redisHost), queueName2); + delayStub.restore(); + }); + }); + + describe('when custom cron strategy is provided', function () { + it('should repeat every 2 seconds', async function () { + this.timeout(15000); + const settings = { + repeatStrategy: (millis, opts) => { + const currentDate = + opts.startDate && new Date(opts.startDate) > new Date(millis) + ? new Date(opts.startDate) + : new Date(millis); + const rrule = rrulestr(opts.pattern); + if (rrule.origOptions.count && !rrule.origOptions.dtstart) { + throw new Error('DTSTART must be defined to use COUNT with rrule'); + } + + const next_occurrence = rrule.after(currentDate, false); + return next_occurrence?.getTime(); + }, + }; + const currentQueue = new Queue(queueName, { + connection, + prefix, + settings, + }); + + const nextTick = 2 * ONE_SECOND + 100; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { connection, prefix, settings }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + + await currentQueue.upsertJobScheduler( + 'test', + { + pattern: 'RRULE:FREQ=SECONDLY;INTERVAL=2;WKST=MO', + }, + { data: { foo: 'bar' } }, + ); + + this.clock.tick(nextTick); + + let prev: any; + let counter = 0; + + const completing = new Promise(resolve => { + worker.on('completed', async job => { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); + } + prev = job; + counter++; + if (counter == 5) { + resolve(); + } + }); + }); + + await completing; + await currentQueue.close(); + await worker.close(); + delayStub.restore(); + }); + + describe('when differentiating strategy by job name', function () { + it('should repeat every 2 seconds', async function () { + this.timeout(10000); + const settings = { + repeatStrategy: (millis, opts, name) => { + if (name === 'rrule') { + const currentDate = + opts.startDate && new Date(opts.startDate) > new Date(millis) + ? new Date(opts.startDate) + : new Date(millis); + const rrule = rrulestr(opts.pattern); + if (rrule.origOptions.count && !rrule.origOptions.dtstart) { + throw new Error( + 'DTSTART must be defined to use COUNT with rrule', + ); + } + + const next_occurrence = rrule.after(currentDate, false); + return next_occurrence?.getTime(); + } else { + return getNextMillis(millis, opts); + } + }, + }; + const currentQueue = new Queue(queueName, { + connection, + prefix, + settings, + }); + + const nextTick = 2 * ONE_SECOND + 100; + + const worker = new Worker( + queueName, + async job => { + this.clock.tick(nextTick); + + if (job.opts.repeat!.count == 5) { + const removed = await queue.removeJobScheduler('rrule'); + expect(removed).to.be.true; + } + }, + { connection, prefix, settings }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + + const repeat = { + pattern: 'RRULE:FREQ=SECONDLY;INTERVAL=2;WKST=MO', + }; + await currentQueue.upsertJobScheduler('rrule', repeat, { + name: 'rrule', + }); + + this.clock.tick(nextTick); + + let prev: any; + let counter = 0; + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); + } + prev = job; + counter++; + if (counter == 5) { + resolve(); + } + } catch (error) { + reject(error); + } + }); + }); + + await completing; + + let prev2: any; + let counter2 = 0; + + const completing2 = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev2) { + expect(prev2.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev2.timestamp).to.be.gte(2000); + } + prev2 = job; + counter2++; + if (counter2 == 5) { + resolve(); + } + } catch (error) { + reject(error); + } + }); + }); + + await queue.upsertJobScheduler( + 'rrule', + { + pattern: '*/2 * * * * *', + startDate: new Date('2017-02-07 9:24:05'), + }, + { + name: 'standard', + }, + ); + + this.clock.tick(nextTick); + + await completing2; + + await currentQueue.close(); + await worker.close(); + delayStub.restore(); + }); + }); + }); + + it('should repeat every 2 seconds and start immediately', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + let prev: Job; + let counter = 0; + + const completing = new Promise(resolve => { + worker.on('completed', async job => { + if (prev && counter === 1) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(100); + } else if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); + } + prev = job; + counter++; + if (counter === 5) { + resolve(); + } + }); + }); + + await queue.upsertJobScheduler( + 'repeat', + { + every: 2000, + immediately: true, + }, + { data: { foo: 'bar' } }, + ); + + this.clock.tick(100); + + await completing; + await worker.close(); + delayStub.restore(); + }); + + it('should repeat once a day for 5 days and start immediately using endDate', async function () { + this.timeout(8000); + + const date = new Date('2017-05-05 01:01:00'); + this.clock.setSystemTime(date); + + const nextTick = ONE_DAY + 10 * ONE_SECOND; + const delay = 5 * ONE_SECOND + 500; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { + autorun: false, + connection, + prefix, + skipStalledCheck: true, + skipLockRenewal: true, + }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => { + console.log('delay'); + }); + + let prev: Job; + let counter = 0; + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + if (counter === 1) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(delay); + } else if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(ONE_DAY); + } + prev = job; + + counter++; + if (counter == 5) { + resolve(); + } + }); + }); + + await queue.upsertJobScheduler( + 'repeat', + { + pattern: '0 1 * * *', + immediately: true, + endDate: new Date('2017-05-10 13:13:00'), + }, + { data: { foo: 'bar' } }, + ); + this.clock.tick(delay); + + worker.run(); + + await completing; + await worker.close(); + delayStub.restore(); + }); + + it('should repeat once a day for 5 days and start immediately', async function () { + this.timeout(8000); + + const date = new Date('2017-05-05 01:01:00'); + this.clock.setSystemTime(date); + + const nextTick = ONE_DAY + 10 * ONE_SECOND; + const delay = 5 * ONE_SECOND + 500; + + let counter = 0; + const worker = new Worker( + queueName, + async () => { + if (counter === 0) { + this.clock.tick(6 * ONE_HOUR); + } else { + this.clock.tick(nextTick); + } + }, + { + autorun: false, + connection, + prefix, + skipStalledCheck: true, + skipLockRenewal: true, + }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => { + console.log('delay'); + }); + + let prev: Job; + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + if (counter === 1) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.processedOn! - prev.timestamp).to.be.gte(delay); + } else if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.processedOn! - prev.timestamp).to.be.gte(ONE_DAY); + } + prev = job; + + counter++; + if (counter == 5) { + resolve(); + } + }); + }); + + await queue.upsertJobScheduler( + 'repeat', + { + pattern: '0 0 7 * * *', + immediately: true, + }, + { data: { foo: 'bar' } }, + ); + this.clock.tick(delay); + + worker.run(); + + await completing; + await worker.close(); + delayStub.restore(); + }); + + it('should repeat once a day for 5 days', async function () { + this.timeout(8000); + + const date = new Date('2017-05-05 13:12:00'); + this.clock.setSystemTime(date); + + const nextTick = ONE_DAY + 10 * ONE_SECOND; + const delay = 5 * ONE_SECOND + 500; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { + autorun: false, + connection, + prefix, + skipStalledCheck: true, + skipLockRenewal: true, + }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => { + console.log('delay'); + }); + + let prev: Job; + let counter = 0; + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(ONE_DAY); + } + prev = job; + + counter++; + if (counter == 5) { + resolve(); + } + } catch (error) { + console.log(error); + } + }); + }); + + await queue.upsertJobScheduler( + 'repeat', + { + pattern: '0 1 * * *', + endDate: new Date('2017-05-10 01:00:00'), + }, + { data: { foo: 'bar' } }, + ); + + this.clock.tick(nextTick + delay); + + worker.run(); + + await completing; + await worker.close(); + delayStub.restore(); + }); + + describe('when utc option is provided', function () { + it('repeats once a day for 5 days', async function () { + this.timeout(8000); + + const date = new Date('2017-05-05 13:12:00'); + this.clock.setSystemTime(date); + + const nextTick = ONE_DAY + 10 * ONE_SECOND; + const delay = 5 * ONE_SECOND + 500; + + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { autorun: false, connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => { + console.log('delay'); + }); + + let prev: Job; + let counter = 0; + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(ONE_DAY); + } + prev = job; + + counter++; + if (counter == 5) { + resolve(); + } + } catch (err) { + reject(err); + } + }); + }); + + await queue.upsertJobScheduler('repeat', { + pattern: '0 1 * * *', + endDate: new Date('2017-05-10 13:13:00'), + tz: 'Europe/Athens', + utc: true, + }); + this.clock.tick(nextTick + delay); + + worker.run(); + + await completing; + await worker.close(); + delayStub.restore(); + }); + }); + + it('should repeat 7:th day every month at 9:25', async function () { + this.timeout(15000); + + const date = new Date('2017-02-02 7:21:42'); + this.clock.setSystemTime(date); + + const nextTick = () => { + const now = moment(); + const nextMonth = moment().add(1, 'months'); + this.clock.tick(nextMonth - now); + }; + + const worker = new Worker( + queueName, + async () => { + nextTick(); + }, + { autorun: false, connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + let counter = 25; + let prev: Job; + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + const diff = moment(job.timestamp).diff( + moment(prev.timestamp), + 'months', + true, + ); + expect(diff).to.be.gte(1); + } + prev = job; + + counter--; + if (counter == 0) { + resolve(); + } + } catch (error) { + console.log(error); + reject(error); + } + }); + }); + + worker.run(); + + await queue.upsertJobScheduler('repeat', { pattern: '* 25 9 7 * *' }); + nextTick(); + + await completing; + await worker.close(); + delayStub.restore(); + }); + + describe('when 2 jobs with the same options are added', function () { + it('creates only one job', async function () { + const repeatOpts = { + pattern: '0 1 * * *', + }; + + const p1 = queue.upsertJobScheduler('test', repeatOpts); + const p2 = queue.upsertJobScheduler('test', repeatOpts); + + const jobs = await Promise.all([p1, p2]); + const configs = await repeat.getRepeatableJobs(0, -1, true); + + const count = await queue.count(); + + expect(count).to.be.equal(1); + expect(configs).to.have.length(1); + expect(jobs.length).to.be.eql(2); + expect(jobs[0]!.id).to.be.eql(jobs[1]!.id); + }); + }); + + describe('when repeatable job is promoted', function () { + it('keeps one repeatable and one delayed after being processed', async function () { + const repeatOpts = { + pattern: '0 * 1 * *', + }; + + const worker = new Worker(queueName, async () => {}, { + connection, + prefix, + }); + + const completing = new Promise(resolve => { + worker.on('completed', () => { + resolve(); + }); + }); + + const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.equal(1); + + await repeatableJob!.promote(); + await completing; + + const delayedCount2 = await queue.getDelayedCount(); + expect(delayedCount2).to.be.equal(1); + + const configs = await repeat.getRepeatableJobs(0, -1, true); + + expect(delayedCount).to.be.equal(1); + + const count = await queue.count(); + + expect(count).to.be.equal(1); + expect(configs).to.have.length(1); + await worker.close(); + }); + }); + + it('should allow removing a named repeatable job', async function () { + const numJobs = 3; + const date = new Date('2017-02-07 9:24:00'); + let prev: Job; + let counter = 0; + + this.clock.setSystemTime(date); + + const nextTick = ONE_SECOND + 1; + let processor; + + const processing = new Promise((resolve, reject) => { + processor = async () => { + counter++; + try { + if (counter == numJobs) { + const removed = await queue.removeJobScheduler('remove'); + //expect(removed).to.be.true; + this.clock.tick(nextTick); + //const delayed = await queue.getDelayed(); + //expect(delayed).to.be.empty; + resolve(); + } else if (counter > numJobs) { + reject(Error(`should not repeat more than ${numJobs} times`)); + } + } catch (err) { + reject(err); + } + }; + }); + + const worker = new Worker(queueName, processor, { connection, prefix }); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + await queue.upsertJobScheduler('remove', { pattern: '*/1 * * * * *' }); + this.clock.tick(nextTick); + + worker.on('completed', job => { + this.clock.tick(nextTick); + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(ONE_SECOND); + } + prev = job; + }); + + await processing; + await worker.close(); + delayStub.restore(); + }); + + it('should be able to remove repeatable jobs by key', async () => { + const client = await queue.client; + const repeat = { pattern: '*/2 * * * * *' }; + + const createdJob = await queue.upsertJobScheduler('remove', repeat); + const delayedCount1 = await queue.getJobCountByTypes('delayed'); + expect(delayedCount1).to.be.equal(1); + const job = await queue.getJob(createdJob!.id!); + const repeatableJobs = await queue.getRepeatableJobs(); + expect(repeatableJobs).to.have.length(1); + const existBeforeRemoval = await client.exists( + `${prefix}:${queue.name}:repeat:${createdJob!.repeatJobKey!}`, + ); + expect(existBeforeRemoval).to.be.equal(1); + const removed = await queue.removeRepeatableByKey( + createdJob!.repeatJobKey!, + ); + const delayedCount = await queue.getJobCountByTypes('delayed'); + expect(delayedCount).to.be.equal(0); + const existAfterRemoval = await client.exists( + `${prefix}:${queue.name}:repeat:${createdJob!.repeatJobKey!}`, + ); + expect(existAfterRemoval).to.be.equal(0); + expect(job!.repeatJobKey).to.not.be.undefined; + expect(removed).to.be.true; + const repeatableJobsAfterRemove = await queue.getRepeatableJobs(); + expect(repeatableJobsAfterRemove).to.have.length(0); + }); + + describe('when repeatable job does not exist', function () { + it('returns false', async () => { + const repeat = { pattern: '*/2 * * * * *' }; + + await queue.upsertJobScheduler('remove', repeat); + const repeatableJobs = await queue.getJobSchedulers(); + expect(repeatableJobs).to.have.length(1); + const removed = await queue.removeJobScheduler(repeatableJobs[0].key); + expect(removed).to.be.true; + const removed2 = await queue.removeJobScheduler(repeatableJobs[0].key); + expect(removed2).to.be.false; + }); + }); + + it('should keep only one delayed job if adding a new repeatable job with the same id', async function () { + const date = new Date('2017-02-07 9:24:00'); + const key = 'mykey'; + + this.clock.setSystemTime(date); + + const nextTick = 2 * ONE_SECOND; + + await queue.upsertJobScheduler(key, { + every: 10_000, + }); + + this.clock.tick(nextTick); + + let jobs = await queue.getJobSchedulers(); + expect(jobs).to.have.length(1); + + let delayedJobs = await queue.getDelayed(); + expect(delayedJobs).to.have.length(1); + + await queue.upsertJobScheduler(key, { + every: 35_160, + }); + + jobs = await queue.getJobSchedulers(); + expect(jobs).to.have.length(1); + + delayedJobs = await queue.getDelayed(); + expect(delayedJobs).to.have.length(1); + }); + + // This test is flaky and too complex we need something simpler that tests the same thing + it.skip('should not re-add a repeatable job after it has been removed', async function () { + const repeat = await queue.repeat; + + let worker: Worker; + const jobId = 'xxxx'; + const date = new Date('2017-02-07 9:24:00'); + const nextTick = 2 * ONE_SECOND + 100; + const addNextRepeatableJob = repeat.updateRepeatableJob; + this.clock.setSystemTime(date); + + const repeatOpts = { pattern: '*/2 * * * * *' }; + + const afterRemoved = new Promise(async resolve => { + worker = new Worker( + queueName, + async () => { + const repeatWorker = await worker.repeat; + (repeatWorker.updateRepeatableJob) = async ( + ...args: [string, unknown, JobsOptions, boolean?] + ) => { + // In order to simulate race condition + // Make removeRepeatables happen any time after a moveToX is called + await queue.removeRepeatable('test', repeatOpts, jobId); + + // addNextRepeatableJob will now re-add the removed repeatable + const result = await addNextRepeatableJob.apply(repeat, args); + resolve(); + return result; + }; + }, + { connection, prefix }, + ); + + worker.on('completed', () => { + this.clock.tick(nextTick); + }); + }); + + await queue.add('test', { foo: 'bar' }, { repeat: repeatOpts, jobId }); + + this.clock.tick(nextTick); + + await afterRemoved; + + const jobs = await queue.getRepeatableJobs(); + // Repeatable job was recreated + expect(jobs.length).to.eql(0); + + await worker!.close(); + }); + + it('should allow adding a repeatable job after removing it', async function () { + const repeat = { + pattern: '*/5 * * * *', + }; + + const worker = new Worker(queueName, NoopProc, { connection, prefix }); + await worker.waitUntilReady(); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + await queue.upsertJobScheduler('myTestJob', repeat, { + data: { + data: '2', + }, + }); + let delayed = await queue.getDelayed(); + expect(delayed.length).to.be.eql(1); + + await new Promise(async (resolve, reject) => { + queueEvents.on('removed', async ({ jobId, prev }) => { + try { + expect(jobId).to.be.equal(delayed[0].id); + expect(prev).to.be.equal('delayed'); + resolve(); + } catch (err) { + reject(err); + } + }); + + try { + await queue.removeJobScheduler('myTestJob'); + } catch (err) { + reject(err); + } + }); + + delayed = await queue.getDelayed(); + expect(delayed.length).to.be.eql(0); + + await queue.upsertJobScheduler('myTestJob', repeat, { + data: { data: '2' }, + }); + + delayed = await queue.getDelayed(); + expect(delayed.length).to.be.eql(1); + + // We need to force close in this case, as closing is too slow in Dragonfly. + await worker.close(true); + delayStub.restore(); + }).timeout(8000); + + it('should not allow to remove a delayed job if it belongs to a repeatable job', async function () { + const repeat = { + every: 1000, + }; + + await queue.upsertJobScheduler('myTestJob', repeat); + + // Get delayed jobs + const delayed = await queue.getDelayed(); + expect(delayed.length).to.be.eql(1); + + // Try to remove the delayed job + const job = delayed[0]; + try { + await job.remove(); + const delayed = await queue.getDelayed(); + console.log({ delayed }); + expect.fail( + 'Should not be able to remove a delayed job that belongs to a repeatable job', + ); + } catch (err) { + const expectedErrMessage = finishedErrors({ + code: ErrorCode.JobBelongsToJobScheduler, + jobId: job.id, + command: 'remove', + }).message; + + expect(err.message).to.be.eql(expectedErrMessage); + } + }); + + it('should not remove delayed jobs if they belong to a repeatable job when using drain', async function () { + await queue.upsertJobScheduler('myTestJob', { every: 5000 }); + await queue.add('test', { foo: 'bar' }, { delay: 1000 }); + + // Get delayed jobs + let delayed = await queue.getDelayed(); + expect(delayed.length).to.be.eql(2); + + // Drain the queue + await queue.drain(true); + + delayed = await queue.getDelayed(); + expect(delayed.length).to.be.eql(1); + + expect(delayed[0].name).to.be.eql('myTestJob'); + }); + + it('should not remove delayed jobs if they belong to a repeatable job when using clean', async function () { + await queue.upsertJobScheduler('myTestJob', { every: 5000 }); + await queue.add('test', { foo: 'bar' }, { delay: 1000 }); + + // Get delayed jobs + let delayed = await queue.getDelayed(); + expect(delayed.length).to.be.eql(2); + + // Clean delayed jobs + await queue.clean(0, 100, 'delayed'); + + delayed = await queue.getDelayed(); + expect(delayed.length).to.be.eql(1); + + expect(delayed[0].name).to.be.eql('myTestJob'); + }); + + it("should keep one delayed job if updating a repeatable job's every option", async function () { + await queue.upsertJobScheduler('myTestJob', { every: 5000 }); + await queue.upsertJobScheduler('myTestJob', { every: 4000 }); + await queue.upsertJobScheduler('myTestJob', { every: 5000 }); + + // Get delayed jobs + const delayed = await queue.getDelayed(); + expect(delayed.length).to.be.eql(1); + }); + + it('should not repeat more than 5 times', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const nextTick = ONE_SECOND + 500; + + const worker = new Worker(queueName, NoopProc, { connection, prefix }); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + await queue.upsertJobScheduler('repeat', { + limit: 5, + pattern: '*/1 * * * * *', + }); + this.clock.tick(nextTick); + + let counter = 0; + + const completing = new Promise((resolve, reject) => { + worker.on('completed', () => { + this.clock.tick(nextTick); + counter++; + if (counter == 5) { + resolve(); + } else if (counter > 5) { + reject(Error('should not repeat more than 5 times')); + } + }); + }); + + await completing; + await worker.close(); + delayStub.restore(); + }); + + // This test is not releated to repeatable jobs + it('should processes delayed jobs by priority', async function () { + let currentPriority = 1; + const nextTick = 1000; + + let processor; + this.clock.setSystemTime(new Date('2017-02-02 7:21:42')); + + const processing = new Promise((resolve, reject) => { + processor = async (job: Job) => { + try { + expect(job.id).to.be.ok; + expect(job.data.p).to.be.eql(currentPriority++); + } catch (err) { + reject(err); + } + + if (currentPriority > 3) { + resolve(); + } + }; + }); + + await Promise.all([ + queue.add('test', { p: 1 }, { priority: 1, delay: nextTick * 3 }), + queue.add('test', { p: 2 }, { priority: 2, delay: nextTick * 2 }), + queue.add('test', { p: 3 }, { priority: 3, delay: nextTick }), + ]); + + this.clock.tick(nextTick * 3 + 100); + + const worker = new Worker(queueName, processor, { connection, prefix }); + await worker.waitUntilReady(); + + await processing; + + await worker.close(); + }); + + it('should use ".every" as a valid interval', async function () { + const interval = ONE_SECOND * 2; + const date = new Date('2017-02-07 9:24:00'); + + this.clock.setSystemTime(date); + + const nextTick = ONE_SECOND * 2 + 500; + + await queue.upsertJobScheduler( + 'repeat m', + { every: interval }, + { data: { type: 'm' } }, + ); + await queue.upsertJobScheduler( + 'repeat s', + { every: interval }, + { data: { type: 's' } }, + ); + this.clock.tick(nextTick); + + const worker = new Worker(queueName, async () => {}, { + connection, + prefix, + }); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + await worker.waitUntilReady(); + + let prevType: string; + let counter = 0; + + const completing = new Promise(resolve => { + worker.on('completed', job => { + this.clock.tick(nextTick); + if (prevType) { + expect(prevType).to.not.be.eql(job.data.type); + } + prevType = job.data.type; + counter++; + if (counter == 20) { + resolve(); + } + }); + }); + + await completing; + await worker.close(); + delayStub.restore(); + }); + + it('should throw an error when using .pattern and .every simultaneously', async function () { + await expect( + queue.upsertJobScheduler('repeat', { + every: 5000, + pattern: '* /1 * * * * *', + }), + ).to.be.rejectedWith( + 'Both .pattern and .every options are defined for this repeatable job', + ); + }); + + it('should emit a waiting event when adding a repeatable job to the waiting list', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const nextTick = 1 * ONE_SECOND + 500; + const jobSchedulerId = 'test'; + + const worker = new Worker(queueName, async job => {}, { + connection, + prefix, + }); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + + const waiting = new Promise((resolve, reject) => { + queueEvents.on('waiting', function ({ jobId }) { + try { + expect(jobId).to.be.equal( + `repeat:${jobSchedulerId}:${date.getTime() + 1 * ONE_SECOND}`, + ); + resolve(); + } catch (err) { + reject(err); + } + }); + }); + + await queue.upsertJobScheduler(jobSchedulerId, { + pattern: '*/1 * * * * *', + }); + this.clock.tick(nextTick); + + await waiting; + await worker.close(); + delayStub.restore(); + }); + + it('should have the right count value', async function () { + await queue.upsertJobScheduler('test', { every: 1000 }); + this.clock.tick(ONE_SECOND + 100); + + let processor; + const processing = new Promise((resolve, reject) => { + processor = async (job: Job) => { + if (job.opts.repeat!.count === 1) { + resolve(); + } else { + reject(new Error('repeatable job got the wrong repeat count')); + } + }; + }); + + const worker = new Worker(queueName, processor, { connection, prefix }); + + await processing; + await worker.close(); + }); +}); diff --git a/tests/test_repeat.ts b/tests/test_repeat.ts index 155b574610..6d2f846ee4 100644 --- a/tests/test_repeat.ts +++ b/tests/test_repeat.ts @@ -13,13 +13,14 @@ import { Worker, } from '../src/classes'; import { JobsOptions } from '../src/types'; -import { removeAllQueueData } from '../src/utils'; +import { removeAllQueueData, delay, finishedErrors } from '../src/utils'; import { createRepeatableJobKey, extractRepeatableJobChecksumFromRedisKey, getRepeatableJobKeyPrefix, getRepeatJobIdCheckum, } from './utils/repeat_utils'; +import { ErrorCode } from '../src/enums'; const moment = require('moment'); @@ -248,7 +249,7 @@ describe('repeat', function () { let jobs = await repeat.getRepeatableJobs(0, -1, true); jobs = await jobs.sort(function (a, b) { - return crons.indexOf(a.cron) - crons.indexOf(b.cron); + return crons.indexOf(a.pattern!) - crons.indexOf(b.pattern!); }); expect(jobs) .to.be.and.an('array') @@ -1766,6 +1767,37 @@ describe('repeat', function () { delayStub.restore(); }).timeout(8000); + it('should not allow to remove a delayed job if it belongs to a repeatable job', async function () { + const repeat = { + every: 1000, + }; + + await queue.add('myTestJob', { data: 'foo' }, { repeat }); + + // Get delayed jobs + const delayed = await queue.getDelayed(); + expect(delayed.length).to.be.eql(1); + + // Try to remove the delayed job + const job = delayed[0]; + try { + await job.remove(); + const delayed = await queue.getDelayed(); + console.log({ delayed }); + expect.fail( + 'Should not be able to remove a delayed job that belongs to a repeatable job', + ); + } catch (err) { + const expectedErrMessage = finishedErrors({ + code: ErrorCode.JobBelongsToJobScheduler, + jobId: job.id, + command: 'remove', + }).message; + + expect(err.message).to.be.eql(expectedErrMessage); + } + }); + it('should not repeat more than 5 times', async function () { const date = new Date('2017-02-07 9:24:00'); this.clock.setSystemTime(date);