Preferred pattern to limit jobs #51

Open
hardcodet opened this issue Jun 27, 2023 · 21 comments
Assignees
Labels
enhancement New feature or request

Comments

@hardcodet

hardcodet commented Jun 27, 2023

Hello Bull team

Edit: I think I found the answer here - it's actually a documented pattern :)
https://docs.bullmq.io/patterns/throttle-jobs

Seems there be dragons though, since I would have to activate the removeOnComplete flags, which might affect the whole system in a bad way. I would like to keep the system's default behaviour, but be able to run a maintenance job over and over again (just in a rate-limited way).

--

I have a building with lots of IoT devices (1:n), and on the building side, we maintain an "overall" health status flag. This means that if a device changes status, we want to re-evaluate the overall status and store it with the building.

Now, we could trigger a bull job every time a device changes status, but I would like to limit this a bit: if, let's say, 5 devices change status in a short amount of time, I would like to run this only once. So basically, what I would like to do:

  • if I have a device status change, enqueue a job to run within the next xx seconds
  • if I have another status change before that job runs, that should be a no-op; otherwise, a new job should be scheduled
  • preferably, my code should not have to worry about this :)

Looking at the docs, I was wondering if this is feasible:

  • on the first device event, I schedule a job with the ID of the building, and a delay of xx seconds
  • if another device event occurs, I schedule the job again with the same job ID and the same delay of xx seconds

If I got the docs right, that second job should not be added if the first job hasn't run yet (since the job ID already exists). But I guess that if the first job already ran, that job ID doesn't exist anymore, and the second job would be scheduled for execution.
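For illustration, a minimal sketch of that pattern as I understand it (queue name, job name, and the 30-second delay are all made up for the example):

```ts
import { Queue } from 'bullmq';

// Hypothetical queue for the building health recalculation.
const healthQueue = new Queue('building-health', {
  connection: { host: 'localhost', port: 6379 },
});

async function scheduleHealthCheck(buildingId: string) {
  // Re-use the building ID as the job ID: if a job with this ID is still
  // waiting or delayed, this add() is effectively a no-op; otherwise a new
  // delayed job is created.
  await healthQueue.add(
    'recalculate-health',
    { buildingId },
    { jobId: `health-${buildingId}`, delay: 30_000 } // "xx seconds"
  );
}
```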

The question is whether this is a dirty hack or a reasonable pattern :)

Thanks for your advice!

@dginzbourg

It isn't clear what you mean by "if the first job already ran" — until the job has finished, it still exists. But once it has finished, a new job with the same ID will be scheduled. Btw, at a high level it seems there might be a race condition when your job is "almost finished" (i.e. at the end of its processing) and a status change event arrives. If there is work to be done for that status change, it would be missed (unprocessed).

@hardcodet
Author

But once it has finished, a new job with the same ID will be scheduled

The docs on throttling imply that a finished job may still count as existing as long as the removeOnComplete flag is not used (it would run again if it were used):

Hint: Be careful if using removeOnComplete/removeOnFailed options, since a removed job will not count as existing and a new job with the same job ID would indeed be added to the queue.

However, the race condition is indeed a problem. If Bull doesn't allow me to handle this, I guess I'll need to build something myself; shouldn't be too hard. Thanks for the quick response!

@dginzbourg

Actually, I was referring to the race condition in your logic (at least as I understood it from your description). I'm not a BullMQ developer, but I've been using it for quite a while. I don't remember how removeOnComplete == false works with a unique ID. BTW, you might take a look at group or general rate-limiting as another/additional mechanism to control the number of running jobs.

@manast
Contributor

manast commented Jun 27, 2023

@hardcodet you can choose to only enable removeOnComplete for those jobs where you need to activate throttling. One case to keep in mind is that if a job fails, the failed job that is still in the failed set will prevent new jobs from being added to the queue (if using the same ID, which is the basic principle of the whole throttle mechanism).
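To make that concrete, a hedged sketch continuing the hypothetical example above: the removal flags are set per job, so only the throttled maintenance job opts in.

```ts
import { Queue } from 'bullmq';

const healthQueue = new Queue('building-health', {
  connection: { host: 'localhost', port: 6379 },
});

// Only this throttled job opts into removal; the rest of the system keeps its
// default retention. removeOnFail is included because a job lingering in the
// failed set would otherwise block new adds with the same jobId.
await healthQueue.add(
  'recalculate-health',
  { buildingId: 'building-42' },
  {
    jobId: 'health-building-42',
    delay: 30_000,
    removeOnComplete: true,
    removeOnFail: true,
  }
);
```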

Regarding the edge case, yes, you are right: in this case you would want to ignore new jobs unless the existing job has already started, in which case you would want to accept the new job. I would need to think more about this case; it probably needs some internal support in BullMQ. The problem is that it is not possible to have two jobs with the same ID, so something else must be used to identify jobs of the same type so that they can be throttled. Maybe this could be something added to groups, like a throttle setting for groups that does not allow more than 1 job in a group, but as soon as a job starts to be processed, a new job for that group is accepted. 🤔

@manast manast added the enhancement New feature or request label Jun 27, 2023
@manast
Contributor

manast commented Jun 27, 2023

Something along these lines may work (rough sketch below):

  • introduce a new "max" setting for groups that prevents a group from having more than max jobs in the group.
  • set a rate-limit on the groups that reflects the desired throttle, for example, 1 job per x seconds in your case.
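The rate-limit half of this already exists for groups in BullMQ Pro; a hedged sketch of how that part could be configured (option shape per the Pro rate-limiting docs as I recall them; queue name, limits, and processor body are illustrative, and the "max" group setting itself is only a proposal at this point):

```ts
import { WorkerPro } from '@taskforcesh/bullmq-pro';

// Group rate limiting: at most 1 job per group every 30 seconds.
const worker = new WorkerPro(
  'building-health',
  async job => {
    // re-evaluate and store the building's overall health status here
  },
  {
    connection: { host: 'localhost', port: 6379 },
    group: {
      limit: { max: 1, duration: 30_000 },
    },
  }
);
```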

@hardcodet
Author

hardcodet commented Jun 27, 2023

Thanks for the feedback, guys, and thanks for being willing to put in some custom work, @manast!

Regarding the groups, and aligned with your proposal, I think this could work:

  • use a group (ID) instead of custom job IDs. The group has a max concurrency of 1, as you said
  • before scheduling a new job, I'd check the number of jobs in the group
  • if the job count is 0 or 1, I add a new job

Not sure how expensive querying for jobs is, but if possible, it would still be much cheaper than triggering a job every time (rough sketch of what I mean below). And in case of race conditions, I'd just run one job too many, which also wouldn't be the end of the world.
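For what it's worth, the check-then-add variant might look roughly like this (assuming getGroupJobsCount takes the group ID; queue and job names are made up):

```ts
import { QueuePro } from '@taskforcesh/bullmq-pro';

const healthQueue = new QueuePro('building-health', {
  connection: { host: 'localhost', port: 6379 },
});

// Check-then-add: racy by design, but as discussed an occasional extra run
// would be harmless for this use case.
async function maybeScheduleHealthCheck(buildingId: string) {
  const count = await healthQueue.getGroupJobsCount(buildingId);
  if (count <= 1) {
    await healthQueue.add(
      'recalculate-health',
      { buildingId },
      { group: { id: buildingId } }
    );
  }
}
```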

@manast
Contributor

manast commented Jun 28, 2023

@hardcodet Yes, even though the query for the group count is fast, it will introduce race conditions, so the best is that we introduce a new option that limits the number of jobs in a group; with that new feature and the rate-limit you should be able to fulfill your requirement in a robust way.

@hardcodet
Author

@manast I'll gladly take that offer then - thanks! :)

@hardcodet
Author

Not sure a race condition would be much of an issue though:

  • if the # of jobs is 0, I add a job. Even if that would just change to 1 in a millisecond, I would have just one redundant run
  • if the # of jobs is 1, I add a job. If it dropped to 0, I would still want to add it. If it went to two (a very rare corner case), it would just be redundant
  • if the # of jobs is > 1, then the 2nd job isn't even running yet. And if it started running a millisecond later, that's exactly what I want.

But as said, I'll gladly use a built-in feature :)

@dginzbourg

@hardcodet Yes, even though the query for the group count is fast, it will introduce race conditions, so the best is that we introduce a new option that limits the number of jobs in a group; with that new feature and the rate-limit you should be able to fulfill your requirement in a robust way.

@manast when do you plan to add the limit on jobs in a group? Will it return or throw an error on addJob()? I find that it would be beneficial for me as well. Currently, we have custom logic that gets the group size before each job submission to a specific group, and it seems to affect Redis performance (not confirmed) when combined with other load. And even if the problem is on our side, it's still a very valuable feature. Since everything is based on Redis, the application has to protect itself from reaching 100% Redis memory usage.

@manast
Contributor

manast commented Jul 14, 2023

I think this could be one of the first features after summer vacation, so in a couple of weeks. Probably throwing an exception would be the solution, as we do not have any other means to signal this kind of "error" without introducing new APIs.

@manast manast self-assigned this Jul 16, 2023
@dginzbourg

An exception sounds good. It'd be beneficial to add a max-jobs limit at the queue level in addition to max per group.

@manast
Contributor

manast commented Jul 17, 2023

A problem with throwing an exception is with addBulk, where you could add several jobs, some of which could exceed the limit while others do not. 🤔

@dginzbourg

You could return an array of statuses. If it's easy to support either behavior, maybe add an option for partial|full addition. In the partial case, return a statuses array or the number of submitted jobs (assuming they are submitted sequentially). In case of error, return an error status. An exception is not a requirement; a status code or array should work fine.

@hardcodet
Author

While I do not have insight into the application architecture (and the related challenges), I think an exception would be semantically problematic. After all, this is rather status information, and an absolutely valid flow, no?

@manast
Contributor

manast commented Jul 17, 2023

The problem is that anything other than an exception would imply a change in the API. The exception may not be the most elegant solution, but it would be easy to use, for add at least. For addBulk I can keep the feature disabled for now. In fact, I have an implementation almost ready.

@manast
Contributor

manast commented Jul 18, 2023

Latest version of BullMQ-Pro supports limiting the size of the groups: https://docs.bullmq.io/bullmq-pro/groups/max-group-size
Let me know how it goes.
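As I read that page, usage is roughly along these lines (error class and maxSize option per the linked docs; queue and group names are placeholders):

```ts
import { QueuePro, GroupMaxSizeExceededError } from '@taskforcesh/bullmq-pro';

const queue = new QueuePro('building-health', {
  connection: { host: 'localhost', port: 6379 },
});

try {
  // maxSize caps how many jobs the group may hold; exceeding it throws.
  await queue.add(
    'recalculate-health',
    { buildingId: 'building-42' },
    { group: { id: 'building-42', maxSize: 1 } }
  );
} catch (err) {
  if (err instanceof GroupMaxSizeExceededError) {
    // group is already full: safe to ignore for this throttling use case
  } else {
    throw err;
  }
}
```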

@hardcodet
Author

hardcodet commented Aug 6, 2023

Hi Manuel
Sorry for the late reply, I only just got back to this task. The feature works like a charm for me. One minor thing, not sure if that's by design or a potential issue:

  • I created a dummy job that just sleeps for two seconds. The group always runs 1 job at a time, so I force sequential execution.
  • Scheduling two jobs at once (the first one already running) also runs the second job (which is great), and only throws the exception on the 3rd job. So all good!

However, if I add a delay to the 2nd and 3rd job (let's say 500ms), all three jobs execute successfully even though the first job is already running (and will still be running when jobs 2 and 3 become active).

  • I think it's a good thing that it doesn't result in an error after 500ms (after all, the job has been enqueued with the system, so I would indeed expect it to run).
  • It's debatable whether it should throw the exception during job creation. Yes, there may be no job running in 500ms, but that doesn't change the fact that a job is running now. Potentially, since Bull can't predict the future, setting the max group size and a delay at the same time could be considered mutually exclusive features. I'll probably enforce this in my façade.
  • Interestingly enough, if jobs 1 and 2 are scheduled without a delay, then job 3, scheduled with a delay, will fail.

So bottom line: delays may be a corner case; they don't play a role for me though (rough repro sketched below). Thanks for the quick turnaround, and sorry again for the delayed response!
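A rough repro of the scenario described above (names, timings, and the maxSize value are illustrative, not the exact test):

```ts
import { QueuePro, WorkerPro } from '@taskforcesh/bullmq-pro';

const connection = { host: 'localhost', port: 6379 };
const queue = new QueuePro('building-health', { connection });

// Dummy processor that just sleeps for two seconds, so jobs in the same
// group run strictly one after another.
const worker = new WorkerPro(
  'building-health',
  async () => {
    await new Promise(resolve => setTimeout(resolve, 2000));
  },
  { connection }
);

const opts = { group: { id: 'building-42', maxSize: 1 } };

await queue.add('check', {}, opts);                    // job 1: picked up immediately
await queue.add('check', {}, { ...opts, delay: 500 }); // job 2: delayed, still runs
await queue.add('check', {}, { ...opts, delay: 500 }); // job 3: delayed, also runs without an exception
```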

@hardcodet
Author

hardcodet commented Aug 6, 2023

Another observation I just made:

  • After submitting the first job, the group status is maxed, and the group job count is 0.
  • After submitting the second job (while the first job is running still), the group status is maxed and the group job count is 1.

Is the group job count (https://api.bullmq.pro/classes/v6.Queue.html#getGroupJobsCount) always the number of pending tasks minus any that are currently executing?

@manast
Contributor

manast commented Aug 6, 2023

Is the group job count (https://api.bullmq.pro/classes/v6.Queue.html#getGroupJobsCount) always the number of pending tasks minus any that are currently executing?

When using groups, we always try to have 1 job in wait status, so I think in your case, since you only tested with 1 group, one of the jobs was already in wait status while the other was in the group. I thought we had a test case for this edge case actually, and getGroupJobsCount should return the correct number; I will need to come back after confirming this.

@manast
Contributor

manast commented Aug 6, 2023


Thanks for the insights. Indeed, using a delay would break this logic, and maybe we should throw an exception if you try to combine these two features. I will need to think more about it.
