-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Poll for new tasks #176
Poll for new tasks #176
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be publishing here if polling is enabled?
Line 209 in 6064c95
self.connection.publish(self._key('activity'), queue) |
tasktiger/worker.py
Outdated
@@ -222,6 +222,14 @@ def _wait_for_new_tasks(self, timeout=0, batch_timeout=0): | |||
3. Timeout seconds have passed, this is the maximum time to stay in | |||
this method | |||
""" | |||
if not self._pubsub: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might pull this new code into a _poll_for_queues
function and the old code into _pubsub_for_queues
function so the two approaches aren't mixed together in here.
tasktiger/worker.py
Outdated
if self._did_work: | ||
self._refresh_queue_set() | ||
else: | ||
if not self._queue_set: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we not self._queue_set
? Is the theory that if we already have some queues we know about we should drain the set before getting new ones? If so we should have a comment here to clarify the logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I can remove the check since the condition not self._did_work and self._queue_set
should not happen.
In _worker_run
we call _process_from_queue
for each queue, which takes a task (or multiple with batch queues) and processes it. It removes the queue from _queue_set
if it's empty.
In order for _did_work
to be false, we must have not processed any tasks (or scheduled any tasks). In this case we would have looped through all the queues in the _queue_set
and removed them.
@jkemp101 Take another look. Logic should be equivalent. |
Ability to configure a polling interval for new task to reduce load when there are many workers & tasks. Instead of subscribing to the activity channel, which results in N_WORKERS messages for each task (and then each of them polling for the task), we just poll periodically. Downside is that with polling enabled and few workers it might take up to the polling interval to pick up a new task.