Skip to content

Commit

Permalink
Remove max workers config setting
Browse files Browse the repository at this point in the history
  • Loading branch information
jkemp101 committed Jan 9, 2019
1 parent e61738f commit f55d764
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 20 deletions.
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,10 @@ Workers support the following options:

Redis server database number (if different from ``0``).

- ``-M``, ``--max-workers-per-queue``

Maximum number of workers that are allowed to process a queue.

In some cases it is convenient to have a custom TaskTiger launch script. For
example, your application may have a ``manage.py`` command that sets up the
environment and you may want to launch TaskTiger workers using that script. To
Expand Down
5 changes: 0 additions & 5 deletions tasktiger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,6 @@ def __init__(self, connection=None, config=None, setup_structlog=False):
# locking techniques.
'SINGLE_WORKER_QUEUES': [],

# The maximum number of workers that will be allowed to actively
# process tasks for any queue. Similar to Single Worker Queues
# but allows specifying any number 1 or greater.
'MAX_WORKERS_PER_QUEUE': None,

# The following settings are only considered if no explicit queues
# are passed in the command line (or to the queues argument in the
# run_worker() method).
Expand Down
2 changes: 0 additions & 2 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ def __init__(self, tiger, queues=None, exclude_queues=None,

if max_workers_per_queue:
self.max_workers_per_queue = max_workers_per_queue
elif self.config['MAX_WORKERS_PER_QUEUE']:
self.max_workers_per_queue = self.config['MAX_WORKERS_PER_QUEUE']
else:
self.max_workers_per_queue = None
assert (self.max_workers_per_queue is None or
Expand Down
26 changes: 14 additions & 12 deletions tests/test_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ class TestMaxWorkers(BaseTestCase):
def test_max_workers(self):
"""Test Single Worker Queue."""

config = {'MAX_WORKERS_PER_QUEUE': 2}
self.tiger.config.update(config)

# Queue three tasks
for i in range(0, 3):
task = Task(self.tiger, long_task_killed, queue='a')
Expand All @@ -31,15 +28,17 @@ def test_max_workers(self):

# Start two workers and wait until they start processing.
worker1 = Process(target=external_worker,
kwargs={'patch_config': config})
kwargs={'max_workers_per_queue': 2})
worker2 = Process(target=external_worker,
kwargs={'patch_config': config})
kwargs={'max_workers_per_queue': 2})
worker1.start()
worker2.start()
time.sleep(DELAY)

# This worker should fail to get the queue lock and exit immediately
Worker(self.tiger).run(once=True, force_once=True)
worker = Worker(self.tiger)
worker.max_workers_per_queue = 2
worker.run(once=True, force_once=True)
self._ensure_queues(active={'a': 2}, queued={'a': 1})
# Wait for external workers
worker1.join()
Expand Down Expand Up @@ -98,9 +97,6 @@ def test_single_worker_queue(self):
def test_queue_system_lock(self):
"""Test queue system lock."""

config = {'MAX_WORKERS_PER_QUEUE': 2}
self.tiger.config.update(config)

with FreezeTime(datetime.datetime(2014, 1, 1)):
# Queue three tasks
for i in range(0, 3):
Expand All @@ -109,7 +105,9 @@ def test_queue_system_lock(self):
self._ensure_queues(queued={'a': 3})

# Ensure we can process one
Worker(self.tiger).run(once=True, force_once=True)
worker = Worker(self.tiger)
worker.max_workers_per_queue = 2
worker.run(once=True, force_once=True)
self._ensure_queues(queued={'a': 2})

# Set system lock so no processing should occur for 10 seconds
Expand All @@ -119,10 +117,14 @@ def test_queue_system_lock(self):
assert lock_timeout == time.time() + 10

with FreezeTime(datetime.datetime(2014, 1, 1, 0, 0, 10)):
Worker(self.tiger).run(once=True, force_once=True)
worker = Worker(self.tiger)
worker.max_workers_per_queue = 2
worker.run(once=True, force_once=True)
self._ensure_queues(queued={'a': 2})

# 11 seconds in the future the lock should have expired
with FreezeTime(datetime.datetime(2014, 1, 1, 0, 0, 11)):
Worker(self.tiger).run(once=True, force_once=True)
worker = Worker(self.tiger)
worker.max_workers_per_queue = 2
worker.run(once=True, force_once=True)
self._ensure_queues(queued={'a': 1})
8 changes: 7 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,18 @@ def get_tiger():
return tiger


def external_worker(n=None, patch_config=None):
def external_worker(n=None, patch_config=None, max_workers_per_queue=None):
"""
Runs a worker. To be used with multiprocessing.Pool.map.
"""
tiger = get_tiger()

if patch_config:
tiger.config.update(patch_config)

worker = Worker(tiger)

if max_workers_per_queue is not None:
worker.max_workers_per_queue = max_workers_per_queue

worker.run(once=True)

0 comments on commit f55d764

Please sign in to comment.