From 3a295d019826436738372326a9dccaff2c175778 Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sun, 12 May 2024 14:12:21 -0400 Subject: [PATCH 01/10] remove unused import --- lenskit/parallel/pool.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lenskit/parallel/pool.py b/lenskit/parallel/pool.py index d65ce28d0..f932f3a3e 100644 --- a/lenskit/parallel/pool.py +++ b/lenskit/parallel/pool.py @@ -9,7 +9,6 @@ import logging import multiprocessing as mp -import pickle from concurrent.futures import ProcessPoolExecutor from multiprocessing.managers import SharedMemoryManager from typing import Generic, Iterable, Iterator From 4bcc18d385bf7fcc3a363d137f6b038944a224cd Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sun, 12 May 2024 15:16:04 -0400 Subject: [PATCH 02/10] add routing for progress bars to workers --- lenskit/parallel/__init__.py | 3 ++- lenskit/parallel/invoker.py | 36 +++++++++++++++++++++++++++++----- lenskit/parallel/pool.py | 5 ++++- lenskit/parallel/sequential.py | 4 +++- 4 files changed, 40 insertions(+), 8 deletions(-) diff --git a/lenskit/parallel/__init__.py b/lenskit/parallel/__init__.py index 00bada6b3..7944933f4 100644 --- a/lenskit/parallel/__init__.py +++ b/lenskit/parallel/__init__.py @@ -11,12 +11,13 @@ from __future__ import annotations from .config import ensure_parallel_init, get_parallel_config, initialize -from .invoker import ModelOpInvoker, invoker +from .invoker import ModelOpInvoker, invoke_progress, invoker __all__ = [ "initialize", "get_parallel_config", "ensure_parallel_init", "invoker", + "invoke_progress", "ModelOpInvoker", ] diff --git a/lenskit/parallel/invoker.py b/lenskit/parallel/invoker.py index e8655dab9..529166a82 100644 --- a/lenskit/parallel/invoker.py +++ b/lenskit/parallel/invoker.py @@ -8,8 +8,11 @@ from __future__ import annotations from abc import ABC, abstractmethod +from logging import Logger from typing import Any, Callable, Generic, Iterable, Iterator, Optional, TypeAlias, TypeVar +from progress_api import Progress, make_progress + from lenskit.parallel.config import ensure_parallel_init, get_parallel_config M = TypeVar("M") @@ -18,20 +21,43 @@ InvokeOp: TypeAlias = Callable[[M, A], R] +def invoke_progress( + logger: str | Logger | None = None, label: str | None = None, total: int | None = None +) -> Progress: + """ + Create a progress bar for parallel tasks. It is populated with the + correct state of tasks for :func:`invoker`. + + See :func:`make_progress` for details on parameter meanings. + """ + return make_progress( + logger, label, total, outcomes="finished", states=["in-progress", "dispatched"] + ) + + def invoker( model: M, func: InvokeOp[M, A, R], n_jobs: Optional[int] = None, + progress: Progress | None = None, ) -> ModelOpInvoker[A, R]: """ Get an appropriate invoker for performing operations on ``model``. Args: - model(obj): The model object on which to perform operations. - func(function): The function to call. The function must be pickleable. - n_jobs(int or None): + model: The model object on which to perform operations. + func: The function to call. The function must be pickleable. + n_jobs: The number of processes to use for parallel operations. If ``None``, will call :func:`proc_count` with a maximum default process count of 4. + progress: + A progress bar to use to report status. 
It should have the following states: + + * dispatched + * in-progress + * finished + + One can be created with :func:`invoke_progress` Returns: ModelOpInvoker: @@ -44,11 +70,11 @@ def invoker( if n_jobs == 1: from .sequential import InProcessOpInvoker - return InProcessOpInvoker(model, func) + return InProcessOpInvoker(model, func, progress) else: from .pool import ProcessPoolOpInvoker - return ProcessPoolOpInvoker(model, func, n_jobs) + return ProcessPoolOpInvoker(model, func, n_jobs, progress) class ModelOpInvoker(ABC, Generic[A, R]): diff --git a/lenskit/parallel/pool.py b/lenskit/parallel/pool.py index f932f3a3e..066f8b6ee 100644 --- a/lenskit/parallel/pool.py +++ b/lenskit/parallel/pool.py @@ -15,6 +15,7 @@ import manylog import seedbank +from progress_api import Progress from . import worker from .config import get_parallel_config @@ -29,7 +30,9 @@ class ProcessPoolOpInvoker(ModelOpInvoker[A, R], Generic[M, A, R]): manager: SharedMemoryManager pool: ProcessPoolExecutor - def __init__(self, model: M, func: InvokeOp[M, A, R], n_jobs: int): + def __init__( + self, model: M, func: InvokeOp[M, A, R], n_jobs: int, progress: Progress | None = None + ): _log.debug("persisting function") ctx = mp.get_context("spawn") _log.info("setting up process pool w/ %d workers", n_jobs) diff --git a/lenskit/parallel/sequential.py b/lenskit/parallel/sequential.py index eb3ed7a2c..9199084ee 100644 --- a/lenskit/parallel/sequential.py +++ b/lenskit/parallel/sequential.py @@ -9,6 +9,8 @@ import logging from typing import Generic, Iterable, Iterator +from progress_api import Progress + from .invoker import A, InvokeOp, M, ModelOpInvoker, R _log = logging.getLogger(__name__) @@ -18,7 +20,7 @@ class InProcessOpInvoker(ModelOpInvoker[A, R], Generic[M, A, R]): model: M function: InvokeOp[M, A, R] - def __init__(self, model: M, func: InvokeOp[M, A, R]): + def __init__(self, model: M, func: InvokeOp[M, A, R], progress: Progress | None = None): _log.info("setting up in-process worker") self.model = model self.function = func From 3c3234bd6750ae2721611da8106100567fe1aede Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sun, 12 May 2024 15:16:52 -0400 Subject: [PATCH 03/10] add documentation for parallel cofnig --- .vscode/ltex.dictionary.en-US.txt | 4 ++ .vscode/ltex.hiddenFalsePositives.en-US.txt | 1 + docs/batch.rst | 2 + docs/conf.py | 3 + docs/index.rst | 1 + docs/internals.rst | 3 - docs/parallel.rst | 78 +++++++++++++++++++-- lenskit/parallel/config.py | 26 +++---- 8 files changed, 92 insertions(+), 26 deletions(-) diff --git a/.vscode/ltex.dictionary.en-US.txt b/.vscode/ltex.dictionary.en-US.txt index eba16ce28..9fc80886b 100644 --- a/.vscode/ltex.dictionary.en-US.txt +++ b/.vscode/ltex.dictionary.en-US.txt @@ -4,3 +4,7 @@ YYYY CalVer backported semver-stable +invokers +lenskit +invoker +CUDA diff --git a/.vscode/ltex.hiddenFalsePositives.en-US.txt b/.vscode/ltex.hiddenFalsePositives.en-US.txt index 9bcc82e76..394fdfe8d 100644 --- a/.vscode/ltex.hiddenFalsePositives.en-US.txt +++ b/.vscode/ltex.hiddenFalsePositives.en-US.txt @@ -1 +1,2 @@ {"rule":"COMMA_COMPOUND_SENTENCE","sentence":"^\\QThis combination does mean that we may sometimes release a minor revision with the previous\nyear's major version number, if there are breaking changes in progress but not yet ready for\nrelease and we need to release new features or fixes for the current major version.\\E$"} +{"rule":"MISSING_GENITIVE","sentence":"^\\QPyTorch tensors, including those on CUDA devices, are shared.\\E$"} diff --git a/docs/batch.rst 
b/docs/batch.rst index 782028fa4..c7c97e664 100644 --- a/docs/batch.rst +++ b/docs/batch.rst @@ -1,3 +1,5 @@ +.. _batch:: + Batch-Running Recommenders ========================== diff --git a/docs/conf.py b/docs/conf.py index e90e0e32a..ad3c2514f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -121,6 +121,9 @@ "binpickle": ("https://binpickle.lenskit.org/en/stable/", None), "csr": ("https://csr.lenskit.org/en/latest/", None), "seedbank": ("https://seedbank.lenskit.org/en/latest/", None), + "progress_api": ("https://progress-api.readthedocs.io/en/latest/", None), + "manylog": ("https://manylog.readthedocs.io/en/latest/", None), + "torch": ("https://pytorch.org/docs/stable/", None), } autodoc_default_options = {"members": True, "member-order": "bysource", "show-inheritance": True} diff --git a/docs/index.rst b/docs/index.rst index 4ed6bcf5a..463fb2fb9 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -43,6 +43,7 @@ Resources batch evaluation/index documenting + parallel .. toctree:: :maxdepth: 1 diff --git a/docs/internals.rst b/docs/internals.rst index 4912867c5..2934e5fcf 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -4,6 +4,3 @@ LensKit Internals These modules are primarily for internal infrastructural support in Lenskit. Neither LensKit users nor algorithm developers are likely to need to use this code directly. - -.. toctree:: - parallel diff --git a/docs/parallel.rst b/docs/parallel.rst index 13eab4360..b69abe67c 100644 --- a/docs/parallel.rst +++ b/docs/parallel.rst @@ -3,8 +3,70 @@ Parallel Execution .. py:module:: lenskit.parallel +LensKit supports various forms of parallel execution, each with an environment +variable controlling its : + +- :doc:`Batch operations ` using :doc:`multi-process execution `. +- Parallel model training. For most models provided by LensKit, this is usually + implemented using PyTorch JIT parallelism (:func:`torch.jit.fork`). +- Parallel computation in the various backends (BLAS, MKL, Torch, etc.). + +Other models compatible with LensKit may use their own parallel processing logic. + +Configuring Parallelism +~~~~~~~~~~~~~~~~~~~~~~~ + +LensKit provides 4 knobs for configuring parallelism, each of which has a +corresponding environment variable and parameter to :py:func:`initialize`. The +environment variables are: + +.. envvar:: LK_NUM_PROCS + + The number of processes to use for batch operations. Defaults to the number + of CPUs or 4, whichever is lower. + +.. envvar:: LK_NUM_THREADS + + The number of threads to use for parallel model building. Defaults to the + number of CPUs or 8, whichever is smaller. + + This number is passed to :func:`torch.set_num_interop_threads` to set up the + Torch JIT thread count. + +.. envvar:: LK_NUM_BACKEND_THREADS + + The number of threads to be used by backend compute engines. Defaults to up + to 4 backend threads per training thread, depending on the capacity of the + machine:: + + max(min(NCPUS // LK_NUM_THREADS, 4), 1) + + This is passed to :func:`torch.set_num_threads` (to control PyTorch internal + parallelism), and to the underlying BLAS layer (via `threadpoolctl`_). + +.. envvar:: LK_NUM_CHILD_THREADS + + The number of backend threads to be used in worker processes spawned by + batch evaluation. Defaults to 4 per process, capped by the number of CPUs + available:: + + max(min(NCPUS // LK_NUM_PROCS, 4), 1) + + Workers have both the process and thread counts set to 1. + +.. _threadpoolctl: https://github.com/joblib/threadpoolctl + +.. autofunction:: initialize + +.. 
autofunction:: ensure_parallel_init + +.. _parallel-model-ops:: + +Parallel Model Ops +~~~~~~~~~~~~~~~~~~ + LensKit uses a custom API wrapping :py:class:`multiprocessing.pool.Pool` to -paralellize batch operations (see :py:mod:`lenskit.batch`). +parallelize batch operations (see :py:mod:`lenskit.batch`). The basic idea of this API is to create an *invoker* that has a model and a function, and then passing lists of argument sets to the function:: @@ -15,14 +77,20 @@ and then passing lists of argument sets to the function:: The model is persisted into shared memory to be used by the worker processes. PyTorch tensors, including those on CUDA devices, are shared. -Parallel Model Ops -~~~~~~~~~~~~~~~~~~ +LensKit users will generally not need to directly use parallel op invokers, but +if you are implementing new batch operations with parallelism they are useful. +They may also be useful for other kinds of analysis. .. autofunction:: invoker -.. autofunction:: proc_count - .. autoclass:: ModelOpInvoker :members: +Logging and Progress +~~~~~~~~~~~~~~~~~~~~ + +Multi-process op invokers automatically set up logging and progress reporting to +work across processes using the :py:mod:`manylog` package. Op invokers can also +report the progress of queued jobs to a :py:class:`progress_api.Progress`. +.. autofunction:: invoke_progress diff --git a/lenskit/parallel/config.py b/lenskit/parallel/config.py index 85692e826..e03982c12 100644 --- a/lenskit/parallel/config.py +++ b/lenskit/parallel/config.py @@ -43,28 +43,18 @@ def initialize( Args: processes: - The number of processes to use for multiprocessing evaluations. - Configured from ``LK_NUM_PROCS``. Defaults to the number of CPUs or - 4, whichever is smaller. + The number of processes to use for multiprocessing evaluations (see + :envvar:`LK_NUM_PROCS`) threads: The number of threads to use for parallel model training and similar - operations. This is passed to - :func:`torch.set_num_interop_threads`. Environment variable is - ``LK_NUM_THREADS``. Defaults to the number of CPUs or 8, whichever - is smaller, to avoid runaway thread coordination overhead on large - machines. + operations (see :envvar:`LK_NUM_THREADS`). backend_threads: - The number of threads underlying computational engines should use. - This is passed to :func:`torch.set_num_threads` and to the BLAS - threading layer. Configured from ``LK_NUM_BACKEND_THREADS``. + The number of threads underlying computational engines should use + (see :envvar:`LK_NUM_BACKEND_THREADS`). child_threads: The number of threads backends are allowed to use in the worker - processes in multiprocessing operations. This is like - ``backend_threads``, except it is passed to the underlying libraries - in worker processes. Environment variable is - ``LK_NUM_CHILD_THREADS``. Defaults is computed from the number of - CPUs with a max of 4 threads per worker. Child processes set both - ``processes`` and ``threads`` to 1. + processes in multiprocessing operations (see + :envvar:`LK_NUM_CHILD_THREADS`). 
""" global _config if _config: @@ -137,6 +127,6 @@ def _resolve_parallel_config( backend_threads = max(min(ncpus // threads, 4), 1) if child_threads is None: - child_threads = min(ncpus // processes, 4) + child_threads = max(min(ncpus // processes, 4), 1) return ParallelConfig(processes, threads, backend_threads, child_threads) From f06ebea456ac85b4b04b19f177562f57487c679f Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sun, 12 May 2024 16:09:09 -0400 Subject: [PATCH 04/10] bump minimum versions --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 24a1c16fa..ed07a65c0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,8 +31,8 @@ dependencies = [ "threadpoolctl >=3.0", "binpickle >= 0.3.2", "seedbank >= 0.2.0a1", # p2c: -p - "progress-api >=0.1.0a6", # p2c: -p - "manylog >=0.1.0a3", # p2c: -p + "progress-api >=0.1.0a8", # p2c: -p + "manylog >=0.1.0a4", # p2c: -p "csr >= 0.5", ] From a7addfc815622af7b48218edd019ac3b8a233912 Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sun, 12 May 2024 16:09:22 -0400 Subject: [PATCH 05/10] route progress to sequential workers --- lenskit/parallel/pool.py | 21 ++++++++++++++++----- lenskit/parallel/sequential.py | 11 ++++++++--- lenskit/parallel/worker.py | 13 ++++++++++--- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/lenskit/parallel/pool.py b/lenskit/parallel/pool.py index 066f8b6ee..7e73e81a2 100644 --- a/lenskit/parallel/pool.py +++ b/lenskit/parallel/pool.py @@ -15,7 +15,7 @@ import manylog import seedbank -from progress_api import Progress +from progress_api import Progress, null_progress from . import worker from .config import get_parallel_config @@ -27,6 +27,7 @@ class ProcessPoolOpInvoker(ModelOpInvoker[A, R], Generic[M, A, R]): + progress: Progress manager: SharedMemoryManager pool: ProcessPoolExecutor @@ -38,14 +39,16 @@ def __init__( _log.info("setting up process pool w/ %d workers", n_jobs) kid_tc = get_parallel_config().child_threads seed = seedbank.root_seed() - log_addr = ensure_log_listener() + manylog.initialize() + self.progress = progress or null_progress() self.manager = SharedMemoryManager() self.manager.start() + prog_uuid = manylog.share_progress(self.progress) try: - cfg = worker.WorkerConfig(kid_tc, seed, log_addr) - job = worker.WorkerContext(func, model) + cfg = worker.WorkerConfig(kid_tc, seed) + job = worker.WorkerContext(func, model, prog_uuid) job = shm_serialize(job, self.manager) self.pool = ProcessPoolExecutor(n_jobs, ctx, worker.initalize, (cfg, job)) except Exception as e: @@ -53,7 +56,15 @@ def __init__( raise e def map(self, tasks: Iterable[A]) -> Iterator[R]: - return self.pool.map(worker.worker, tasks) + return self.pool.map(worker.worker, self._task_iter(tasks)) + + def _task_iter(self, tasks: Iterable[A]): + """ + Yield the tasks, recording each as dispatched before it is yielded. 
+ """ + for task in tasks: + self.progress.update(1, "dispatched") + yield task def shutdown(self): self.pool.shutdown() diff --git a/lenskit/parallel/sequential.py b/lenskit/parallel/sequential.py index 9199084ee..5fe95cc49 100644 --- a/lenskit/parallel/sequential.py +++ b/lenskit/parallel/sequential.py @@ -9,7 +9,7 @@ import logging from typing import Generic, Iterable, Iterator -from progress_api import Progress +from progress_api import Progress, null_progress from .invoker import A, InvokeOp, M, ModelOpInvoker, R @@ -19,15 +19,20 @@ class InProcessOpInvoker(ModelOpInvoker[A, R], Generic[M, A, R]): model: M function: InvokeOp[M, A, R] + progress: Progress | None = None def __init__(self, model: M, func: InvokeOp[M, A, R], progress: Progress | None = None): _log.info("setting up in-process worker") self.model = model self.function = func + self.progress = progress or null_progress() def map(self, tasks: Iterable[A]) -> Iterator[R]: - proc = ft.partial(self.function, self.model) - return map(proc, tasks) + for task in tasks: + self.progress.update(1, "in-progress") + res = self.function(self.model, task) + self.progress.update(1, "finished", "in-progress") + yield res def shutdown(self): del self.model diff --git a/lenskit/parallel/worker.py b/lenskit/parallel/worker.py index 3e12c1a61..69c916204 100644 --- a/lenskit/parallel/worker.py +++ b/lenskit/parallel/worker.py @@ -12,10 +12,12 @@ import pickle import warnings from typing import Any +from uuid import UUID import manylog import seedbank from numpy.random import SeedSequence +from progress_api import Progress from typing_extensions import Generic, NamedTuple from .config import initialize as init_parallel @@ -26,22 +28,23 @@ __work_context: WorkerContext +__progress: Progress class WorkerConfig(NamedTuple): threads: int seed: SeedSequence - log_addr: str class WorkerContext(NamedTuple, Generic[M, A, R]): func: InvokeOp[M, A, R] model: M + progress: UUID def initalize(cfg: WorkerConfig, ctx: ModelData) -> None: - global __work_context - manylog.init_worker_logging(cfg.log_addr) + global __work_context, __progress + manylog.initialize() init_parallel(processes=1, threads=1, backend_threads=cfg.threads, child_threads=1) seed = seedbank.derive_seed(mp.current_process().name, base=cfg.seed) @@ -54,9 +57,13 @@ def initalize(cfg: WorkerConfig, ctx: ModelData) -> None: _log.error("deserialization failed: %s", e) raise e + __progress = manylog.connect_progress(__work_context.progress) + _log.debug("worker %d ready (process %s)", os.getpid(), mp.current_process()) def worker(arg: Any) -> Any: + __progress.update(1, "in-progress", "dispatched") res = __work_context.func(__work_context.model, arg) + __progress.update(1, "finished", "in-progress") return res From eaacb7925821278da098bfd282fd401820db5eb2 Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sun, 12 May 2024 16:11:03 -0400 Subject: [PATCH 06/10] add progress to recommend and predict --- lenskit/batch/_predict.py | 8 ++++++-- lenskit/batch/_recommend.py | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/lenskit/batch/_predict.py b/lenskit/batch/_predict.py index 5c571fca1..c7cf39c7b 100644 --- a/lenskit/batch/_predict.py +++ b/lenskit/batch/_predict.py @@ -8,12 +8,12 @@ import warnings import pandas as pd +from progress_api import make_progress from .. 
import util from ..parallel import invoker _logger = logging.getLogger(__name__) -_rec_context = None def _predict_user(model, req): @@ -82,7 +82,11 @@ def predict(algo, pairs, *, n_jobs=None, **kwargs): nusers = pairs["user"].nunique() timer = util.Stopwatch() - with invoker(algo, _predict_user, n_jobs=n_jobs) as worker: + nusers = pairs["user"].nunique() + with ( + make_progress(_logger, "predictions", nusers, unit="user") as progress, + invoker(algo, _predict_user, n_jobs=n_jobs, progress=progress) as worker, + ): del algo # maybe free some memory _logger.info( diff --git a/lenskit/batch/_recommend.py b/lenskit/batch/_recommend.py index 0ae0049e2..955e9f83a 100644 --- a/lenskit/batch/_recommend.py +++ b/lenskit/batch/_recommend.py @@ -9,6 +9,7 @@ import numpy as np import pandas as pd +from progress_api import make_progress from .. import util from ..algorithms import Recommender @@ -83,8 +84,11 @@ def recommend(algo, users, n, candidates=None, *, n_jobs=None, **kwargs): candidates = __standard_cand_fun(candidates) - with invoker(algo, _recommend_user, n_jobs=n_jobs) as worker: - _logger.info("recommending with %s for %d users (n_jobs=%s)", str(algo), len(users), n_jobs) + _logger.info("recommending with %s for %d users (n_jobs=%s)", str(algo), len(users), n_jobs) + with ( + make_progress(_logger, "recommending", len(users), unit="user") as progress, + invoker(algo, _recommend_user, n_jobs=n_jobs, progress=progress) as worker, + ): del algo timer = util.Stopwatch() results = worker.map((user, n, candidates(user)) for user in users) From 3a9dfec658658e193224803ede94a2cbaffae588 Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sun, 12 May 2024 16:23:49 -0400 Subject: [PATCH 07/10] use invoke-progress --- lenskit/batch/_predict.py | 5 ++--- lenskit/batch/_recommend.py | 4 ++-- lenskit/parallel/invoker.py | 7 +++++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/lenskit/batch/_predict.py b/lenskit/batch/_predict.py index c7cf39c7b..fc76dd2ec 100644 --- a/lenskit/batch/_predict.py +++ b/lenskit/batch/_predict.py @@ -8,10 +8,9 @@ import warnings import pandas as pd -from progress_api import make_progress from .. import util -from ..parallel import invoker +from ..parallel import invoke_progress, invoker _logger = logging.getLogger(__name__) @@ -84,7 +83,7 @@ def predict(algo, pairs, *, n_jobs=None, **kwargs): timer = util.Stopwatch() nusers = pairs["user"].nunique() with ( - make_progress(_logger, "predictions", nusers, unit="user") as progress, + invoke_progress(_logger, "predictions", nusers, unit="user") as progress, invoker(algo, _predict_user, n_jobs=n_jobs, progress=progress) as worker, ): del algo # maybe free some memory diff --git a/lenskit/batch/_recommend.py b/lenskit/batch/_recommend.py index 955e9f83a..c3eca1a44 100644 --- a/lenskit/batch/_recommend.py +++ b/lenskit/batch/_recommend.py @@ -13,7 +13,7 @@ from .. 
import util from ..algorithms import Recommender -from ..parallel import invoker +from ..parallel import invoke_progress, invoker _logger = logging.getLogger(__name__) @@ -86,7 +86,7 @@ def recommend(algo, users, n, candidates=None, *, n_jobs=None, **kwargs): _logger.info("recommending with %s for %d users (n_jobs=%s)", str(algo), len(users), n_jobs) with ( - make_progress(_logger, "recommending", len(users), unit="user") as progress, + invoke_progress(_logger, "recommending", len(users), unit="user") as progress, invoker(algo, _recommend_user, n_jobs=n_jobs, progress=progress) as worker, ): del algo diff --git a/lenskit/parallel/invoker.py b/lenskit/parallel/invoker.py index 529166a82..c2fb4f266 100644 --- a/lenskit/parallel/invoker.py +++ b/lenskit/parallel/invoker.py @@ -22,7 +22,10 @@ def invoke_progress( - logger: str | Logger | None = None, label: str | None = None, total: int | None = None + logger: str | Logger | None = None, + label: str | None = None, + total: int | None = None, + unit: str | None = None, ) -> Progress: """ Create a progress bar for parallel tasks. It is populated with the @@ -31,7 +34,7 @@ def invoke_progress( See :func:`make_progress` for details on parameter meanings. """ return make_progress( - logger, label, total, outcomes="finished", states=["in-progress", "dispatched"] + logger, label, total, outcomes="finished", states=["in-progress", "dispatched"], unit=unit ) From 9ae8a81e2b372c9bc0afa666d07edc9b021f2f12 Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sun, 12 May 2024 17:37:22 -0400 Subject: [PATCH 08/10] bump deps --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ed07a65c0..95cd78940 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,8 +31,8 @@ dependencies = [ "threadpoolctl >=3.0", "binpickle >= 0.3.2", "seedbank >= 0.2.0a1", # p2c: -p - "progress-api >=0.1.0a8", # p2c: -p - "manylog >=0.1.0a4", # p2c: -p + "progress-api >=0.1.0a9", # p2c: -p + "manylog >=0.1.0a5", # p2c: -p "csr >= 0.5", ] From c01d0042ff7e01e5410fbdef1d87831664794076 Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sun, 12 May 2024 17:37:30 -0400 Subject: [PATCH 09/10] update dependencies --- envs/lenskit-py3.10-ci.yaml | 4 ++-- envs/lenskit-py3.10-dev.yaml | 4 ++-- envs/lenskit-py3.10-doc.yaml | 4 ++-- envs/lenskit-py3.10-test.yaml | 4 ++-- envs/lenskit-py3.11-ci.yaml | 4 ++-- envs/lenskit-py3.11-dev.yaml | 4 ++-- envs/lenskit-py3.11-doc.yaml | 4 ++-- envs/lenskit-py3.11-test.yaml | 4 ++-- 8 files changed, 16 insertions(+), 16 deletions(-) diff --git a/envs/lenskit-py3.10-ci.yaml b/envs/lenskit-py3.10-ci.yaml index 52a151bbd..76dfb8ce0 100644 --- a/envs/lenskit-py3.10-ci.yaml +++ b/envs/lenskit-py3.10-ci.yaml @@ -43,7 +43,7 @@ dependencies: - tqdm>=4 - pip - pip: - - manylog>=0.1.0a3 - - progress-api>=0.1.0a6 + - manylog>=0.1.0a5 + - progress-api>=0.1.0a9 - seedbank>=0.2.0a1 - unbeheader~=1.3 diff --git a/envs/lenskit-py3.10-dev.yaml b/envs/lenskit-py3.10-dev.yaml index 50b1b9e47..d6d552c92 100644 --- a/envs/lenskit-py3.10-dev.yaml +++ b/envs/lenskit-py3.10-dev.yaml @@ -52,7 +52,7 @@ dependencies: - tqdm>=4 - pip - pip: - - manylog>=0.1.0a3 - - progress-api>=0.1.0a6 + - manylog>=0.1.0a5 + - progress-api>=0.1.0a9 - seedbank>=0.2.0a1 - unbeheader~=1.3 diff --git a/envs/lenskit-py3.10-doc.yaml b/envs/lenskit-py3.10-doc.yaml index 695df08bb..b17c55bda 100644 --- a/envs/lenskit-py3.10-doc.yaml +++ b/envs/lenskit-py3.10-doc.yaml @@ -31,6 +31,6 @@ dependencies: - threadpoolctl>=3.0 - pip - pip: 
- - manylog>=0.1.0a3 - - progress-api>=0.1.0a6 + - manylog>=0.1.0a5 + - progress-api>=0.1.0a9 - seedbank>=0.2.0a1 diff --git a/envs/lenskit-py3.10-test.yaml b/envs/lenskit-py3.10-test.yaml index ad2b0dd50..2001c153d 100644 --- a/envs/lenskit-py3.10-test.yaml +++ b/envs/lenskit-py3.10-test.yaml @@ -31,6 +31,6 @@ dependencies: - threadpoolctl>=3.0 - pip - pip: - - manylog>=0.1.0a3 - - progress-api>=0.1.0a6 + - manylog>=0.1.0a5 + - progress-api>=0.1.0a9 - seedbank>=0.2.0a1 diff --git a/envs/lenskit-py3.11-ci.yaml b/envs/lenskit-py3.11-ci.yaml index e50554765..affbcba54 100644 --- a/envs/lenskit-py3.11-ci.yaml +++ b/envs/lenskit-py3.11-ci.yaml @@ -43,7 +43,7 @@ dependencies: - tqdm>=4 - pip - pip: - - manylog>=0.1.0a3 - - progress-api>=0.1.0a6 + - manylog>=0.1.0a5 + - progress-api>=0.1.0a9 - seedbank>=0.2.0a1 - unbeheader~=1.3 diff --git a/envs/lenskit-py3.11-dev.yaml b/envs/lenskit-py3.11-dev.yaml index f6823bfdf..69392a96d 100644 --- a/envs/lenskit-py3.11-dev.yaml +++ b/envs/lenskit-py3.11-dev.yaml @@ -52,7 +52,7 @@ dependencies: - tqdm>=4 - pip - pip: - - manylog>=0.1.0a3 - - progress-api>=0.1.0a6 + - manylog>=0.1.0a5 + - progress-api>=0.1.0a9 - seedbank>=0.2.0a1 - unbeheader~=1.3 diff --git a/envs/lenskit-py3.11-doc.yaml b/envs/lenskit-py3.11-doc.yaml index d7efce6f3..819a54c0e 100644 --- a/envs/lenskit-py3.11-doc.yaml +++ b/envs/lenskit-py3.11-doc.yaml @@ -31,6 +31,6 @@ dependencies: - threadpoolctl>=3.0 - pip - pip: - - manylog>=0.1.0a3 - - progress-api>=0.1.0a6 + - manylog>=0.1.0a5 + - progress-api>=0.1.0a9 - seedbank>=0.2.0a1 diff --git a/envs/lenskit-py3.11-test.yaml b/envs/lenskit-py3.11-test.yaml index a6de0af20..7e9c3d6b1 100644 --- a/envs/lenskit-py3.11-test.yaml +++ b/envs/lenskit-py3.11-test.yaml @@ -31,6 +31,6 @@ dependencies: - threadpoolctl>=3.0 - pip - pip: - - manylog>=0.1.0a3 - - progress-api>=0.1.0a6 + - manylog>=0.1.0a5 + - progress-api>=0.1.0a9 - seedbank>=0.2.0a1 From 732e5a2d07e4ec7aeb03670198ce1c94fcc6b9e4 Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Mon, 13 May 2024 10:20:12 -0400 Subject: [PATCH 10/10] increase deadline --- conftest.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/conftest.py b/conftest.py index b2b43dadd..3d455e7c5 100644 --- a/conftest.py +++ b/conftest.py @@ -11,6 +11,7 @@ from seedbank import initialize, numpy_rng from pytest import fixture +from hypothesis import settings logging.getLogger("numba").setLevel(logging.INFO) @@ -52,3 +53,6 @@ def pytest_collection_modifyitems(items): if evm is not None and slm is None: _log.debug("adding slow mark to %s", item) item.add_marker("slow") + + +settings.register_profile("default", deadline=1000)
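
A minimal usage sketch of the progress-aware invoker API introduced in this
series (patches 02, 06, and 07). ``score_user`` and ``model`` are hypothetical
stand-ins; ``invoke_progress`` and ``invoker`` are the ``lenskit.parallel``
helpers as modified above, and the task states (``dispatched``,
``in-progress``, ``finished``) are the ones the pool and worker code report
into::

    import logging

    from lenskit.parallel import invoke_progress, invoker

    _log = logging.getLogger(__name__)


    def score_user(model, user):
        # hypothetical per-user operation; it must be picklable so the
        # process pool can ship it to the worker processes
        return model.predict_for_user(user, [1, 2, 3])


    def score_all(model, users, n_jobs=None):
        # invoke_progress creates a bar with the dispatched/in-progress/
        # finished states that the invoker machinery expects
        with (
            invoke_progress(_log, "scoring", len(users), unit="user") as progress,
            invoker(model, score_user, n_jobs=n_jobs, progress=progress) as worker,
        ):
            # the pool invoker marks each task "dispatched" as it is queued;
            # the workers (or the in-process invoker when n_jobs=1) move it
            # to "in-progress" and then "finished"
            return list(worker.map(users))

The batch drivers updated in patches 06 and 07 (``lenskit.batch.predict`` and
``lenskit.batch.recommend``) follow this same pattern with ``_predict_user``
and ``_recommend_user`` as the invoked functions.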