Skip to content

Commit

Permalink
Set number of processes/threads through DRAMATIQ_NPROCS, DRAMATIQ_NTH…
Browse files Browse the repository at this point in the history
…READS. (#186)

* Add coverage report exclusions for __init__.py and setup.py.

* Allow setting number of processes/threads through env.
  • Loading branch information
m000 authored Feb 24, 2025
1 parent 4be89b3 commit 08be4f6
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 22 deletions.
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Seemlessly integrate [Dramatiq][dramatiq] with your Django project!

## Installation

To install, ensure both Django Dramtiq and Dramatiq are installed, along with RabbitMQ:
To install, ensure both Django Dramatiq and Dramatiq are installed, along with RabbitMQ:

pip install django-dramatiq 'dramatiq[rabbitmq]'

Expand Down Expand Up @@ -104,7 +104,31 @@ DRAMATIQ_AUTODISCOVER_MODULES = ["tasks", "services"]
Django Dramatiq comes with a management command you can use to
auto-discover task modules and run workers:

```sh
python manage.py rundramatiq
```

By default, `rundramatiq` will adjust the number of processes/threads used
by Dramatiq based on the number of detected CPUs: one process will be launched
per CPU, and each process will have 8 worker threads.

The default number of processes, threads per process can be overridden through
environment variables, which take precedence over the defaults:

```sh
export DRAMATIQ_NPROCS=2 DRAMATIQ_NTHREADS=2
python manage.py rundramatiq
```

Or alternatively through command line arguments, which take precedence over the
defaults and any environment variables:

```sh
python manage.py rundramatiq -p 2 -t 2
```

This is useful e.g. to facilitate faster Dramatiq restarts in your development
environment.

If your project for some reason has apps with modules named `tasks` that
are not intended for use with Dramatiq, you can ignore them:
Expand Down
15 changes: 10 additions & 5 deletions django_dramatiq/management/commands/rundramatiq.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@
from django.core.management.base import BaseCommand
from django.utils.module_loading import module_has_submodule

#: The number of available CPUs.
CPU_COUNT = multiprocessing.cpu_count()
THREAD_COUNT = 8
from django_dramatiq.utils import getenv_int


# Number of processes to use. Default: one per CPU.
NPROCS = getenv_int("DRAMATIQ_NPROCS", default=multiprocessing.cpu_count)

# Number of threads per process to use. Default: 8.
NTHREADS = getenv_int("DRAMATIQ_NTHREADS", 8)


class Command(BaseCommand):
Expand Down Expand Up @@ -49,13 +54,13 @@ def add_arguments(self, parser):
)
parser.add_argument(
"--processes", "-p",
default=CPU_COUNT,
default=NPROCS,
type=int,
help="The number of processes to run",
)
parser.add_argument(
"--threads", "-t",
default=THREAD_COUNT,
default=NTHREADS,
type=int,
help="The number of threads per process to use",
)
Expand Down
22 changes: 22 additions & 0 deletions django_dramatiq/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
import logging
import os

from django.utils.module_loading import import_string


def getenv_int(varname, default=None):
"""Retrieves an environment variable as an int."""
envstr = os.getenv(varname, None)

if envstr is not None:
try:
return int(envstr)
except ValueError:
if default is None:
raise
msgf = "Invalid value for %s: %r. Reverting to default."
logging.warning(msgf, varname, envstr)

if callable(default):
return default()
else:
return default


def load_middleware(path_or_obj, **kwargs):
if isinstance(path_or_obj, str):
return import_string(path_or_obj)(**kwargs)
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ order_by_type = true

[coverage:report]
omit =
__init__.py
django_dramatiq/setup.py
*/migrations/*
*/tests/*
32 changes: 16 additions & 16 deletions tests/test_rundramatiq_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ def test_rundramatiq_can_run_dramatiq(execvp_mock):
assert "Discovered tasks module: 'tests.testapp3.tasks.other_tasks'" in buff.getvalue()

# And execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -67,8 +67,8 @@ def test_rundramatiq_can_run_dramatiq_reload(execvp_mock):
call_command("rundramatiq", "--reload", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -99,8 +99,8 @@ def test_rundramatiq_can_run_dramatiq_with_polling(execvp_mock):
call_command("rundramatiq", "--reload", "--reload-use-polling", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -132,8 +132,8 @@ def test_rundramatiq_can_run_dramatiq_with_only_some_queues(execvp_mock):
call_command("rundramatiq", "--queues", "A B C", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -164,8 +164,8 @@ def test_rundramatiq_can_run_dramatiq_with_specified_pid_file(execvp_mock):
call_command("rundramatiq", "--pid-file", "drama.pid", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -196,8 +196,8 @@ def test_rundramatiq_can_run_dramatiq_with_specified_log_file(execvp_mock):
call_command("rundramatiq", "--log-file", "drama.log", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -244,8 +244,8 @@ def test_rundramatiq_can_ignore_modules(execvp_mock, settings):
assert "Ignored tasks module: 'tests.testapp3.tasks.utils.not_a_task'" in buff.getvalue()

# And execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand All @@ -272,8 +272,8 @@ def test_rundramatiq_can_fork(execvp_mock, settings):
call_command("rundramatiq", "--fork-function", "a", "--fork-function", "b", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down
29 changes: 29 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os

import pytest

from django_dramatiq.utils import getenv_int


@pytest.mark.parametrize("value, default, expected", (
("42", None, 42),
("invalid", 69, 69),
("invalid", None, ValueError),
("invalid", lambda: 96, 96),
(None, 19, 19),
(None, lambda: 78, 78),
(None, "hello", "hello"), # returned default is not checked to be an int
(None, lambda: "world", "world") # idem
))
def test_getenv_int(value, default, expected):
varname = "TEST_ENV_20250204"
if value is not None:
os.environ[varname] = value
else:
os.environ.pop(varname, None)

if isinstance(expected, type) and issubclass(expected, Exception):
with pytest.raises(expected):
getenv_int(varname, default)
else:
assert getenv_int(varname, default) == expected

0 comments on commit 08be4f6

Please sign in to comment.