Skip to content

Commit

Permalink
Prefilter polled queues
Browse files Browse the repository at this point in the history
  • Loading branch information
neob91-close committed Nov 30, 2022
1 parent 38f5f17 commit 8586fe7
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
3 changes: 3 additions & 0 deletions tasktiger/_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
SCHEDULED = "scheduled"
ERROR = "error"

# Used in a mechanism for prefiltering queues
IGNORED = "ignored"

# This lock is acquired in the main process when forking, and must be acquired
# in any thread of the main process when performing an operation that triggers a
# lock that a child process might want to acquire.
Expand Down
19 changes: 17 additions & 2 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1156,10 +1156,25 @@ def _queue_periodic_tasks(self):
)

def _refresh_queue_set(self):
self._queue_set = set(
self._filter_queues(self.connection.smembers(self._key(QUEUED)))
queued_key = self._key(QUEUED)
ignored_key = self._ignored_queues_key()

queues = self.connection.sdiff(queued_key, ignored_key)
self._queue_set = set(self._filter_queues(queues))

ignore_queues = queues - self._queue_set
if len(ignore_queues) > 100:
self.connection.sadd(ignored_key, *ignore_queues)
self.connection.sinterstore(ignored_key, ignored_key, queued_key)

def _ignored_queues_key(self):
signature = ",".join(
*sorted(self.only_queues),
*sorted(f"-{queue}" for queue in self.exclude_queues),
)

return self._key(IGNORED, signature)

def run(self, once=False, force_once=False):
"""
Main loop of the worker.
Expand Down

0 comments on commit 8586fe7

Please sign in to comment.