diff --git a/tasktiger/worker.py b/tasktiger/worker.py index f8e2b839..b9d06ce1 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -3,6 +3,7 @@ import errno import fcntl import random +import re import select import signal import socket @@ -1156,9 +1157,27 @@ def _queue_periodic_tasks(self): ) def _refresh_queue_set(self): - self._queue_set = set( - self._filter_queues(self.connection.smembers(self._key(QUEUED))) - ) + self._queue_set = set(self._filter_queues(self._retrieve_queues())) + + def _retrieve_queues(self): + key = self._key(QUEUED) + + if len(self.only_queues) != 1: + return self.connection.smembers(key) + + # Escape special characters in the queue + match = re.sub(r"([?*\[\]])", r"\\\1", list(self.only_queues)[0]) + "*" + + result = set() + cursor = None + + while cursor != 0: + cursor, items = self.connection.sscan( + key, cursor=cursor or 0, match=match, count=100000 + ) + result.update(items) + + return result def run(self, once=False, force_once=False): """ diff --git a/tests/test_base.py b/tests/test_base.py index 134a6751..80547c79 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -739,19 +739,33 @@ def test_batch_lock_key(self): Worker(self.tiger).run(once=True) self._ensure_queues(queued={"batch": 0}) - def test_only_queues(self): + @pytest.mark.parametrize( + "only_queues,expected_unprocessed", + [ + (["a"], {"b": 1, "b.a": 1, "c": 2}), + (["a", "b"], {"c": 2}), + (["a", "c"], {"b": 1, "b.a": 1}), + (["a", "b", "c"], {}), # all queues selected, all queues processed + ([], {}), # no queue restriction, all queues processed + ], + ) + def test_only_queues(self, only_queues, expected_unprocessed): self.tiger.delay(simple_task, queue="a") self.tiger.delay(simple_task, queue="a.a") self.tiger.delay(simple_task, queue="b") self.tiger.delay(simple_task, queue="b.a") + self.tiger.delay(simple_task, queue="c") + self.tiger.delay(simple_task, queue="c") - self._ensure_queues(queued={"a": 1, "a.a": 1, "b": 1, "b.a": 1}) + self._ensure_queues( + queued={"a": 1, "a.a": 1, "b": 1, "b.a": 1, "c": 2} + ) - self.tiger.config["ONLY_QUEUES"] = ["a"] + self.tiger.config["ONLY_QUEUES"] = only_queues Worker(self.tiger).run(once=True) - self._ensure_queues(queued={"b": 1, "b.a": 1}) + self._ensure_queues(queued=expected_unprocessed) def test_exclude_queues(self): """