Skip to content

Commit

Permalink
Prefilter polled queues
Browse files Browse the repository at this point in the history
  • Loading branch information
neob91-close committed Dec 1, 2022
1 parent 38f5f17 commit 887f2b7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 7 deletions.
25 changes: 22 additions & 3 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import errno
import fcntl
import random
import re
import select
import signal
import socket
Expand Down Expand Up @@ -1156,9 +1157,27 @@ def _queue_periodic_tasks(self):
)

def _refresh_queue_set(self):
self._queue_set = set(
self._filter_queues(self.connection.smembers(self._key(QUEUED)))
)
self._queue_set = set(self._filter_queues(self._retrieve_queues()))

def _retrieve_queues(self):
key = self._key(QUEUED)

if len(self.only_queues) != 1:
return self.connection.smembers(key)

# Escape special characters in the queue
match = re.sub(r"([?*\[\]])", r"\\\1", list(self.only_queues)[0]) + "*"

result = set()
cursor = None

while cursor != 0:
cursor, items = self.connection.sscan(
key, cursor=cursor or 0, match=match, count=100000
)
result.update(items)

return result

def run(self, once=False, force_once=False):
"""
Expand Down
22 changes: 18 additions & 4 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,19 +739,33 @@ def test_batch_lock_key(self):
Worker(self.tiger).run(once=True)
self._ensure_queues(queued={"batch": 0})

def test_only_queues(self):
@pytest.mark.parametrize(
"only_queues,expected_unprocessed",
[
(["a"], {"b": 1, "b.a": 1, "c": 2}),
(["a", "b"], {"c": 2}),
(["a", "c"], {"b": 1, "b.a": 1}),
(["a", "b", "c"], {}), # all queues selected, all queues processed
([], {}), # no queue restriction, all queues processed
],
)
def test_only_queues(self, only_queues, expected_unprocessed):
self.tiger.delay(simple_task, queue="a")
self.tiger.delay(simple_task, queue="a.a")
self.tiger.delay(simple_task, queue="b")
self.tiger.delay(simple_task, queue="b.a")
self.tiger.delay(simple_task, queue="c")
self.tiger.delay(simple_task, queue="c")

self._ensure_queues(queued={"a": 1, "a.a": 1, "b": 1, "b.a": 1})
self._ensure_queues(
queued={"a": 1, "a.a": 1, "b": 1, "b.a": 1, "c": 2}
)

self.tiger.config["ONLY_QUEUES"] = ["a"]
self.tiger.config["ONLY_QUEUES"] = only_queues

Worker(self.tiger).run(once=True)

self._ensure_queues(queued={"b": 1, "b.a": 1})
self._ensure_queues(queued=expected_unprocessed)

def test_exclude_queues(self):
"""
Expand Down

0 comments on commit 887f2b7

Please sign in to comment.