Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting number of processes/threads through DRAMATIQ_NPROCS, DRAMATIQ_NTHREADS. #186

Merged
merged 2 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Seemlessly integrate [Dramatiq][dramatiq] with your Django project!

## Installation

To install, ensure both Django Dramtiq and Dramatiq are installed, along with RabbitMQ:
To install, ensure both Django Dramatiq and Dramatiq are installed, along with RabbitMQ:

pip install django-dramatiq 'dramatiq[rabbitmq]'

Expand Down Expand Up @@ -104,7 +104,31 @@ DRAMATIQ_AUTODISCOVER_MODULES = ["tasks", "services"]
Django Dramatiq comes with a management command you can use to
auto-discover task modules and run workers:

```sh
python manage.py rundramatiq
```

By default, `rundramatiq` will adjust the number of processes/threads used
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: We really should get a separate documentation page...

by Dramatiq based on the number of detected CPUs: one process will be launched
per CPU, and each process will have 8 worker threads.

The default number of processes, threads per process can be overridden through
environment variables, which take precedence over the defaults:

```sh
export DRAMATIQ_NPROCS=2 DRAMATIQ_NTHREADS=2
python manage.py rundramatiq
```

Or alternatively through command line arguments, which take precedence over the
defaults and any environment variables:

```sh
python manage.py rundramatiq -p 2 -t 2
```

This is useful e.g. to facilitate faster Dramatiq restarts in your development
environment.

If your project for some reason has apps with modules named `tasks` that
are not intended for use with Dramatiq, you can ignore them:
Expand Down
15 changes: 10 additions & 5 deletions django_dramatiq/management/commands/rundramatiq.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@
from django.core.management.base import BaseCommand
from django.utils.module_loading import module_has_submodule

#: The number of available CPUs.
CPU_COUNT = multiprocessing.cpu_count()
THREAD_COUNT = 8
from django_dramatiq.utils import getenv_int


# Number of processes to use. Default: one per CPU.
NPROCS = getenv_int("DRAMATIQ_NPROCS", default=multiprocessing.cpu_count)

# Number of threads per process to use. Default: 8.
NTHREADS = getenv_int("DRAMATIQ_NTHREADS", 8)
Comment on lines +17 to +21
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Another way to do this is to use a setting, and then allow the user to set the setting themselves, rather than django-dramatiq grabbing it from the environment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an intentional choice. I have considered adding a setting, but I concluded that an environment variable would be the appropriate solution here, since the value that should be used depends on the deployment environment.

Adding a setting would be less convenient because application setting files are typically versioned (in scm) and reused across different deployment environments. A setting which is dictated by the deployment environment would create complexities for the users.

E.g. I run the same developer setup as everyone else in my team, but I happen to have less cores on my computer. Putting e.g. DRAMATIQ_NPROCS in django settings, means that we will have to now add custom code to properly set the value for everyone. And that custom code would probably involve checking some environment variable. Having django-dramatiq handle this out of the box is

Also, in containerized environments (k8s, docker-compose etc.) the preferred way of configuring resource-related settings is through environment variables, as you don't want to update/fork your application settings file for every clone of your setup.

One could of course add settings as an additional link in the fallback chain: cli argument -> environment -> application setting -> auto-guess default value. But this can always be done at a later time, in a separate PR. At this point, IMHO, it would only add extra complexity to the codebase without offering any additional convenience.

I hope this explains this design choice.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

One could of course add settings as an additional link in the fallback chain: cli argument -> environment -> application setting -> auto-guess default value.

Lets get this in a see if the users are missing it.



class Command(BaseCommand):
Expand Down Expand Up @@ -49,13 +54,13 @@ def add_arguments(self, parser):
)
parser.add_argument(
"--processes", "-p",
default=CPU_COUNT,
default=NPROCS,
type=int,
help="The number of processes to run",
)
parser.add_argument(
"--threads", "-t",
default=THREAD_COUNT,
default=NTHREADS,
type=int,
help="The number of threads per process to use",
)
Expand Down
22 changes: 22 additions & 0 deletions django_dramatiq/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
import logging
import os

from django.utils.module_loading import import_string


def getenv_int(varname, default=None):
"""Retrieves an environment variable as an int."""
envstr = os.getenv(varname, None)

if envstr is not None:
try:
return int(envstr)
except ValueError:
if default is None:
raise
msgf = "Invalid value for %s: %r. Reverting to default."
logging.warning(msgf, varname, envstr)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: i wonder if we should reraise the ValueError here with the warning message. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I wouldn't object to (essentially) not handle ValueError, as a user I would expect a warning and some best-effort fallback instead of a hard failure.

This is common behaviour when it comes to values set through environment variables. E.g. VIM=/does/not/exist vim script.py will have vim complain about the non-existing directory, but the editor will open. Or with TERM=wtf ls, ls will complain but still run and assume a dumb terminal.


if callable(default):
return default()
else:
return default


def load_middleware(path_or_obj, **kwargs):
if isinstance(path_or_obj, str):
return import_string(path_or_obj)(**kwargs)
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ order_by_type = true

[coverage:report]
omit =
__init__.py
django_dramatiq/setup.py
*/migrations/*
*/tests/*
32 changes: 16 additions & 16 deletions tests/test_rundramatiq_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ def test_rundramatiq_can_run_dramatiq(execvp_mock):
assert "Discovered tasks module: 'tests.testapp3.tasks.other_tasks'" in buff.getvalue()

# And execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -67,8 +67,8 @@ def test_rundramatiq_can_run_dramatiq_reload(execvp_mock):
call_command("rundramatiq", "--reload", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -99,8 +99,8 @@ def test_rundramatiq_can_run_dramatiq_with_polling(execvp_mock):
call_command("rundramatiq", "--reload", "--reload-use-polling", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -132,8 +132,8 @@ def test_rundramatiq_can_run_dramatiq_with_only_some_queues(execvp_mock):
call_command("rundramatiq", "--queues", "A B C", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -164,8 +164,8 @@ def test_rundramatiq_can_run_dramatiq_with_specified_pid_file(execvp_mock):
call_command("rundramatiq", "--pid-file", "drama.pid", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -196,8 +196,8 @@ def test_rundramatiq_can_run_dramatiq_with_specified_log_file(execvp_mock):
call_command("rundramatiq", "--log-file", "drama.log", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down Expand Up @@ -244,8 +244,8 @@ def test_rundramatiq_can_ignore_modules(execvp_mock, settings):
assert "Ignored tasks module: 'tests.testapp3.tasks.utils.not_a_task'" in buff.getvalue()

# And execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand All @@ -272,8 +272,8 @@ def test_rundramatiq_can_fork(execvp_mock, settings):
call_command("rundramatiq", "--fork-function", "a", "--fork-function", "b", stdout=buff)

# Then execvp should be called with the appropriate arguments
cores = str(rundramatiq.CPU_COUNT)
threads = str(rundramatiq.THREAD_COUNT)
cores = str(rundramatiq.NPROCS)
threads = str(rundramatiq.NTHREADS)
expected_exec_name = "dramatiq"
expected_exec_path = os.path.join(
os.path.dirname(sys.executable),
Expand Down
29 changes: 29 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os

import pytest

from django_dramatiq.utils import getenv_int


@pytest.mark.parametrize("value, default, expected", (
("42", None, 42),
("invalid", 69, 69),
("invalid", None, ValueError),
("invalid", lambda: 96, 96),
(None, 19, 19),
(None, lambda: 78, 78),
(None, "hello", "hello"), # returned default is not checked to be an int
(None, lambda: "world", "world") # idem
))
def test_getenv_int(value, default, expected):
varname = "TEST_ENV_20250204"
if value is not None:
os.environ[varname] = value
else:
os.environ.pop(varname, None)

if isinstance(expected, type) and issubclass(expected, Exception):
with pytest.raises(expected):
getenv_int(varname, default)
else:
assert getenv_int(varname, default) == expected