diff --git a/README.rst b/README.rst index 9d5c1f85..fc0b743e 100644 --- a/README.rst +++ b/README.rst @@ -500,6 +500,10 @@ Workers support the following options: Redis server database number (if different from ``0``). +- ``-M``, ``--max-workers-per-queue`` + + Maximum number of workers that are allowed to process a queue. + In some cases it is convenient to have a custom TaskTiger launch script. For example, your application may have a ``manage.py`` command that sets up the environment and you may want to launch TaskTiger workers using that script. To diff --git a/tasktiger/__init__.py b/tasktiger/__init__.py index 05a540f5..687be025 100644 --- a/tasktiger/__init__.py +++ b/tasktiger/__init__.py @@ -157,11 +157,6 @@ def __init__(self, connection=None, config=None, setup_structlog=False): # locking techniques. 'SINGLE_WORKER_QUEUES': [], - # The maximum number of workers that will be allowed to actively - # process tasks for any queue. Similar to Single Worker Queues - # but allows specifying any number 1 or greater. - 'MAX_WORKERS_PER_QUEUE': None, - # The following settings are only considered if no explicit queues # are passed in the command line (or to the queues argument in the # run_worker() method). diff --git a/tasktiger/worker.py b/tasktiger/worker.py index d108d59f..453ad100 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -73,8 +73,6 @@ def __init__(self, tiger, queues=None, exclude_queues=None, if max_workers_per_queue: self.max_workers_per_queue = max_workers_per_queue - elif self.config['MAX_WORKERS_PER_QUEUE']: - self.max_workers_per_queue = self.config['MAX_WORKERS_PER_QUEUE'] else: self.max_workers_per_queue = None assert (self.max_workers_per_queue is None or diff --git a/tests/test_workers.py b/tests/test_workers.py index d76375eb..c61a896a 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -20,9 +20,6 @@ class TestMaxWorkers(BaseTestCase): def test_max_workers(self): """Test Single Worker Queue.""" - config = {'MAX_WORKERS_PER_QUEUE': 2} - self.tiger.config.update(config) - # Queue three tasks for i in range(0, 3): task = Task(self.tiger, long_task_killed, queue='a') @@ -31,15 +28,17 @@ def test_max_workers(self): # Start two workers and wait until they start processing. worker1 = Process(target=external_worker, - kwargs={'patch_config': config}) + kwargs={'max_workers_per_queue': 2}) worker2 = Process(target=external_worker, - kwargs={'patch_config': config}) + kwargs={'max_workers_per_queue': 2}) worker1.start() worker2.start() time.sleep(DELAY) # This worker should fail to get the queue lock and exit immediately - Worker(self.tiger).run(once=True, force_once=True) + worker = Worker(self.tiger) + worker.max_workers_per_queue = 2 + worker.run(once=True, force_once=True) self._ensure_queues(active={'a': 2}, queued={'a': 1}) # Wait for external workers worker1.join() @@ -98,9 +97,6 @@ def test_single_worker_queue(self): def test_queue_system_lock(self): """Test queue system lock.""" - config = {'MAX_WORKERS_PER_QUEUE': 2} - self.tiger.config.update(config) - with FreezeTime(datetime.datetime(2014, 1, 1)): # Queue three tasks for i in range(0, 3): @@ -109,7 +105,9 @@ def test_queue_system_lock(self): self._ensure_queues(queued={'a': 3}) # Ensure we can process one - Worker(self.tiger).run(once=True, force_once=True) + worker = Worker(self.tiger) + worker.max_workers_per_queue = 2 + worker.run(once=True, force_once=True) self._ensure_queues(queued={'a': 2}) # Set system lock so no processing should occur for 10 seconds @@ -119,10 +117,14 @@ def test_queue_system_lock(self): assert lock_timeout == time.time() + 10 with FreezeTime(datetime.datetime(2014, 1, 1, 0, 0, 10)): - Worker(self.tiger).run(once=True, force_once=True) + worker = Worker(self.tiger) + worker.max_workers_per_queue = 2 + worker.run(once=True, force_once=True) self._ensure_queues(queued={'a': 2}) # 11 seconds in the future the lock should have expired with FreezeTime(datetime.datetime(2014, 1, 1, 0, 0, 11)): - Worker(self.tiger).run(once=True, force_once=True) + worker = Worker(self.tiger) + worker.max_workers_per_queue = 2 + worker.run(once=True, force_once=True) self._ensure_queues(queued={'a': 1}) diff --git a/tests/utils.py b/tests/utils.py index 6b5dfe31..759310f6 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -61,12 +61,18 @@ def get_tiger(): return tiger -def external_worker(n=None, patch_config=None): +def external_worker(n=None, patch_config=None, max_workers_per_queue=None): """ Runs a worker. To be used with multiprocessing.Pool.map. """ tiger = get_tiger() + if patch_config: tiger.config.update(patch_config) + worker = Worker(tiger) + + if max_workers_per_queue is not None: + worker.max_workers_per_queue = max_workers_per_queue + worker.run(once=True)