Parallel batch documentation and progress reporting #396

Merged · 10 commits · May 13, 2024
Changes from all commits
4 changes: 4 additions & 0 deletions .vscode/ltex.dictionary.en-US.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ YYYY
CalVer
backported
semver-stable
invokers
lenskit
invoker
CUDA
1 change: 1 addition & 0 deletions .vscode/ltex.hiddenFalsePositives.en-US.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
{"rule":"COMMA_COMPOUND_SENTENCE","sentence":"^\\QThis combination does mean that we may sometimes release a minor revision with the previous\nyear's major version number, if there are breaking changes in progress but not yet ready for\nrelease and we need to release new features or fixes for the current major version.\\E$"}
{"rule":"MISSING_GENITIVE","sentence":"^\\QPyTorch tensors, including those on CUDA devices, are shared.\\E$"}
4 changes: 4 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from seedbank import initialize, numpy_rng

from pytest import fixture
from hypothesis import settings

logging.getLogger("numba").setLevel(logging.INFO)

Expand Down Expand Up @@ -52,3 +53,6 @@ def pytest_collection_modifyitems(items):
if evm is not None and slm is None:
_log.debug("adding slow mark to %s", item)
item.add_marker("slow")


settings.register_profile("default", deadline=1000)
2 changes: 2 additions & 0 deletions docs/batch.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.. _batch:

Batch-Running Recommenders
==========================

Expand Down
3 changes: 3 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@
"binpickle": ("https://binpickle.lenskit.org/en/stable/", None),
"csr": ("https://csr.lenskit.org/en/latest/", None),
"seedbank": ("https://seedbank.lenskit.org/en/latest/", None),
"progress_api": ("https://progress-api.readthedocs.io/en/latest/", None),
"manylog": ("https://manylog.readthedocs.io/en/latest/", None),
"torch": ("https://pytorch.org/docs/stable/", None),
}

autodoc_default_options = {"members": True, "member-order": "bysource", "show-inheritance": True}
Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Resources
batch
evaluation/index
documenting
parallel

.. toctree::
:maxdepth: 1
Expand Down
3 changes: 0 additions & 3 deletions docs/internals.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,3 @@ LensKit Internals
These modules are primarily for internal infrastructural support in Lenskit.
Neither LensKit users nor algorithm developers are likely to need to use this
code directly.

.. toctree::
parallel
78 changes: 73 additions & 5 deletions docs/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,70 @@ Parallel Execution

.. py:module:: lenskit.parallel

LensKit supports several forms of parallel execution, each with an environment
variable controlling its behavior:

- :doc:`Batch operations <batch>` using :doc:`multi-process execution <parallel-model-ops>`.
- Parallel model training. For most models provided by LensKit, this is usually
implemented using PyTorch JIT parallelism (:func:`torch.jit.fork`).
- Parallel computation in the various backends (BLAS, MKL, Torch, etc.).

Other models compatible with LensKit may use their own parallel processing logic.

Configuring Parallelism
~~~~~~~~~~~~~~~~~~~~~~~

LensKit provides four knobs for configuring parallelism, each of which has a
corresponding environment variable and a parameter to :py:func:`initialize`. The
environment variables are:

.. envvar:: LK_NUM_PROCS

The number of processes to use for batch operations. Defaults to the number
    of CPUs or 4, whichever is smaller.

.. envvar:: LK_NUM_THREADS

The number of threads to use for parallel model building. Defaults to the
number of CPUs or 8, whichever is smaller.

This number is passed to :func:`torch.set_num_interop_threads` to set up the
Torch JIT thread count.

.. envvar:: LK_NUM_BACKEND_THREADS

The number of threads to be used by backend compute engines. Defaults to up
to 4 backend threads per training thread, depending on the capacity of the
machine::

max(min(NCPUS // LK_NUM_THREADS, 4), 1)

This is passed to :func:`torch.set_num_threads` (to control PyTorch internal
parallelism), and to the underlying BLAS layer (via `threadpoolctl`_).

.. envvar:: LK_NUM_CHILD_THREADS

The number of backend threads to be used in worker processes spawned by
batch evaluation. Defaults to 4 per process, capped by the number of CPUs
available::

max(min(NCPUS // LK_NUM_PROCS, 4), 1)

Workers have both the process and thread counts set to 1.

.. _threadpoolctl: https://github.com/joblib/threadpoolctl
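The two derived defaults above can be written out directly. This is a sketch of
the formulas exactly as given (``ncpus`` here is whatever CPU count the machine
reports; the function names are illustrative, not part of the LensKit API):

```python
def default_backend_threads(ncpus: int, num_threads: int) -> int:
    # LK_NUM_BACKEND_THREADS default: up to 4 backend threads per
    # training thread, but always at least 1
    return max(min(ncpus // num_threads, 4), 1)


def default_child_threads(ncpus: int, num_procs: int) -> int:
    # LK_NUM_CHILD_THREADS default: up to 4 backend threads per worker
    # process, but always at least 1
    return max(min(ncpus // num_procs, 4), 1)


# 16 CPUs, 8 training threads -> 2 backend threads each
print(default_backend_threads(16, 8))  # 2
# 16 CPUs, 4 worker processes -> capped at 4 threads per worker
print(default_child_threads(16, 4))    # 4
```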

.. autofunction:: initialize

.. autofunction:: ensure_parallel_init
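For example, all of this can be configured from the environment before
launching an evaluation (the specific values here are illustrative, not
recommendations):

```shell
# four worker processes for batch operations, two backend threads each
export LK_NUM_PROCS=4
export LK_NUM_CHILD_THREADS=2
# eight threads for parallel model training in the driver process
export LK_NUM_THREADS=8
```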

.. _parallel-model-ops:

Parallel Model Ops
~~~~~~~~~~~~~~~~~~

LensKit uses a custom API wrapping :py:class:`multiprocessing.pool.Pool` to
paralellize batch operations (see :py:mod:`lenskit.batch`).
parallelize batch operations (see :py:mod:`lenskit.batch`).

The basic idea of this API is to create an *invoker* that binds a model to a function,
and then pass lists of argument sets to the function::
Expand All @@ -15,14 +77,20 @@ and then passing lists of argument sets to the function::
The model is persisted into shared memory to be used by the worker processes.
PyTorch tensors, including those on CUDA devices, are shared.
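The shape of the API can be illustrated with a thread-based stand-in. This is a
sketch only: the real LensKit invoker uses worker processes and shared memory,
and ``SimpleInvoker`` and ``_score`` here are hypothetical names, not LensKit
code:

```python
from multiprocessing.dummy import Pool  # thread pool standing in for processes


class SimpleInvoker:
    """Minimal sketch of the invoker idea: bind a model to a function,
    then map that function over many argument sets."""

    def __init__(self, model, func, n_jobs=2):
        self.model = model
        self.func = func
        self.pool = Pool(n_jobs)

    def map(self, args):
        # apply func(model, arg) to each argument set, preserving order
        return self.pool.map(lambda a: self.func(self.model, a), args)

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.pool.close()


def _score(model, req):
    user, item = req
    return (user, item, model.get(item, 0.0))


with SimpleInvoker({"a": 3.5}, _score) as worker:
    results = worker.map([(1, "a"), (2, "b")])
print(results)  # [(1, 'a', 3.5), (2, 'b', 0.0)]
```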

Parallel Model Ops
~~~~~~~~~~~~~~~~~~
LensKit users will generally not need to use parallel op invokers directly, but
they are useful if you are implementing new batch operations with parallelism.
They may also be useful for other kinds of analysis.

.. autofunction:: invoker

.. autofunction:: proc_count

.. autoclass:: ModelOpInvoker
:members:

Logging and Progress
~~~~~~~~~~~~~~~~~~~~

Multi-process op invokers automatically set up logging and progress reporting to
work across processes using the :py:mod:`manylog` package. Op invokers can also
report the progress of queued jobs to a :py:class:`progress_api.Progress`.

.. autofunction:: invoke_progress
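Multi-process invokers wire this up automatically, but the reporting pattern
itself is simple. A rough stand-in is sketched below; the ``CountingProgress``
class and its ``update``/``finish`` interface are assumptions for illustration,
not the actual :py:class:`progress_api.Progress` API:

```python
import logging


class CountingProgress:
    """Rough stand-in for a progress_api.Progress-style object."""

    def __init__(self, logger: logging.Logger, label: str, total: int):
        self.logger = logger
        self.label = label
        self.total = total
        self.done = 0

    def update(self, n: int = 1):
        # advance the counter and emit a debug-level progress record
        self.done += n
        self.logger.debug("%s: %d/%d", self.label, self.done, self.total)

    def finish(self):
        self.logger.info("%s: finished %d of %d items",
                         self.label, self.done, self.total)


logging.basicConfig(level=logging.INFO)
progress = CountingProgress(logging.getLogger("demo"), "predictions", 3)
for _ in range(3):
    progress.update()
progress.finish()
```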
4 changes: 2 additions & 2 deletions envs/lenskit-py3.10-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ dependencies:
- tqdm>=4
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- manylog>=0.1.0a5
- progress-api>=0.1.0a9
- seedbank>=0.2.0a1
- unbeheader~=1.3
4 changes: 2 additions & 2 deletions envs/lenskit-py3.10-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ dependencies:
- tqdm>=4
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- manylog>=0.1.0a5
- progress-api>=0.1.0a9
- seedbank>=0.2.0a1
- unbeheader~=1.3
4 changes: 2 additions & 2 deletions envs/lenskit-py3.10-doc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ dependencies:
- threadpoolctl>=3.0
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- manylog>=0.1.0a5
- progress-api>=0.1.0a9
- seedbank>=0.2.0a1
4 changes: 2 additions & 2 deletions envs/lenskit-py3.10-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ dependencies:
- threadpoolctl>=3.0
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- manylog>=0.1.0a5
- progress-api>=0.1.0a9
- seedbank>=0.2.0a1
4 changes: 2 additions & 2 deletions envs/lenskit-py3.11-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ dependencies:
- tqdm>=4
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- manylog>=0.1.0a5
- progress-api>=0.1.0a9
- seedbank>=0.2.0a1
- unbeheader~=1.3
4 changes: 2 additions & 2 deletions envs/lenskit-py3.11-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ dependencies:
- tqdm>=4
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- manylog>=0.1.0a5
- progress-api>=0.1.0a9
- seedbank>=0.2.0a1
- unbeheader~=1.3
4 changes: 2 additions & 2 deletions envs/lenskit-py3.11-doc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ dependencies:
- threadpoolctl>=3.0
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- manylog>=0.1.0a5
- progress-api>=0.1.0a9
- seedbank>=0.2.0a1
4 changes: 2 additions & 2 deletions envs/lenskit-py3.11-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ dependencies:
- threadpoolctl>=3.0
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- manylog>=0.1.0a5
- progress-api>=0.1.0a9
- seedbank>=0.2.0a1
9 changes: 6 additions & 3 deletions lenskit/batch/_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import pandas as pd

from .. import util
from ..parallel import invoker
from ..parallel import invoke_progress, invoker

_logger = logging.getLogger(__name__)
_rec_context = None


def _predict_user(model, req):
Expand Down Expand Up @@ -82,7 +81,11 @@ def predict(algo, pairs, *, n_jobs=None, **kwargs):
nusers = pairs["user"].nunique()

timer = util.Stopwatch()
with invoker(algo, _predict_user, n_jobs=n_jobs) as worker:
nusers = pairs["user"].nunique()
with (
invoke_progress(_logger, "predictions", nusers, unit="user") as progress,
invoker(algo, _predict_user, n_jobs=n_jobs, progress=progress) as worker,
):
del algo # maybe free some memory

_logger.info(
Expand Down
10 changes: 7 additions & 3 deletions lenskit/batch/_recommend.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@

import numpy as np
import pandas as pd
from progress_api import make_progress

Check failure on line 12 in lenskit/batch/_recommend.py — GitHub Actions / Check Source Code — Ruff (F401): `progress_api.make_progress` imported but unused

from .. import util
from ..algorithms import Recommender
from ..parallel import invoker
from ..parallel import invoke_progress, invoker

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -83,8 +84,11 @@

candidates = __standard_cand_fun(candidates)

with invoker(algo, _recommend_user, n_jobs=n_jobs) as worker:
_logger.info("recommending with %s for %d users (n_jobs=%s)", str(algo), len(users), n_jobs)
_logger.info("recommending with %s for %d users (n_jobs=%s)", str(algo), len(users), n_jobs)
with (
invoke_progress(_logger, "recommending", len(users), unit="user") as progress,
invoker(algo, _recommend_user, n_jobs=n_jobs, progress=progress) as worker,
):
del algo
timer = util.Stopwatch()
results = worker.map((user, n, candidates(user)) for user in users)
Expand Down
3 changes: 2 additions & 1 deletion lenskit/parallel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
from __future__ import annotations

from .config import ensure_parallel_init, get_parallel_config, initialize
from .invoker import ModelOpInvoker, invoker
from .invoker import ModelOpInvoker, invoke_progress, invoker

__all__ = [
"initialize",
"get_parallel_config",
"ensure_parallel_init",
"invoker",
"invoke_progress",
"ModelOpInvoker",
]
26 changes: 8 additions & 18 deletions lenskit/parallel/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,18 @@ def initialize(

Args:
processes:
The number of processes to use for multiprocessing evaluations.
Configured from ``LK_NUM_PROCS``. Defaults to the number of CPUs or
4, whichever is smaller.
The number of processes to use for multiprocessing evaluations (see
:envvar:`LK_NUM_PROCS`)
threads:
The number of threads to use for parallel model training and similar
operations. This is passed to
:func:`torch.set_num_interop_threads`. Environment variable is
``LK_NUM_THREADS``. Defaults to the number of CPUs or 8, whichever
is smaller, to avoid runaway thread coordination overhead on large
machines.
operations (see :envvar:`LK_NUM_THREADS`).
backend_threads:
The number of threads underlying computational engines should use.
This is passed to :func:`torch.set_num_threads` and to the BLAS
threading layer. Configured from ``LK_NUM_BACKEND_THREADS``.
The number of threads underlying computational engines should use
(see :envvar:`LK_NUM_BACKEND_THREADS`).
child_threads:
The number of threads backends are allowed to use in the worker
processes in multiprocessing operations. This is like
``backend_threads``, except it is passed to the underlying libraries
in worker processes. Environment variable is
``LK_NUM_CHILD_THREADS``. Defaults is computed from the number of
CPUs with a max of 4 threads per worker. Child processes set both
``processes`` and ``threads`` to 1.
processes in multiprocessing operations (see
:envvar:`LK_NUM_CHILD_THREADS`).
"""
global _config
if _config:
Expand Down Expand Up @@ -137,6 +127,6 @@ def _resolve_parallel_config(
backend_threads = max(min(ncpus // threads, 4), 1)

if child_threads is None:
child_threads = min(ncpus // processes, 4)
child_threads = max(min(ncpus // processes, 4), 1)

return ParallelConfig(processes, threads, backend_threads, child_threads)
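The guard added on the last changed line matters when there are more worker
processes than CPUs. A quick standalone check of the before/after formulas
(function names are illustrative):

```python
def child_threads_old(ncpus: int, processes: int) -> int:
    # previous default: integer division can reach 0
    return min(ncpus // processes, 4)


def child_threads_new(ncpus: int, processes: int) -> int:
    # fixed default: always leave each worker at least one thread
    return max(min(ncpus // processes, 4), 1)


# 4 CPUs, 8 worker processes: the old formula gave workers zero threads
print(child_threads_old(4, 8))  # 0
print(child_threads_new(4, 8))  # 1
```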