From e4fea4168ffeee8dd4555c26cda548b121e85b77 Mon Sep 17 00:00:00 2001 From: Jake Lishman Date: Wed, 15 May 2024 16:52:17 +0100 Subject: [PATCH] Refactor parallelism utilities for public API `should_run_in_parallel` was added in a stable manner to enable backport to the 1.1 series, but from 1.3 onwards, we want this to be part of the public interface so that others can rely on it too. As part of this, the parallelisation configuration was made more robust and controllable with context managers. This is convenient beyond just for users - it makes it far easier to control the parallelism during the test suite runs. Several instances where different parts of Qiskit and its test suite reached into deep internals of the parallelism utilities and made significant assumptions about the internal logic are refactored to use public interfaces to achieve what they wanted to. The multiprocessing detection is changed from making OS-based assumptions about what Python does to simply querying the module for its configuration. This makes it more robust to changes in Python's handling (especially important since 3.14 will change the default start method on Unix). In the future, we may want to change to making these assumptions only if the user hasn't configured the `multiprocessing` start method themselves. --- .azure/test-linux.yml | 1 + .azure/test-macos.yml | 1 + .azure/test-windows.yml | 1 + .github/workflows/coverage.yml | 1 + .github/workflows/qpy.yml | 2 + .github/workflows/randomized_tests.yml | 1 + .github/workflows/slow.yml | 1 + .github/workflows/tests.yml | 3 + qiskit/circuit/quantumcircuit.py | 10 +- qiskit/pulse/schedule.py | 5 +- .../transpiler/passes/layout/sabre_layout.py | 6 +- .../transpiler/passes/routing/sabre_swap.py | 4 +- .../preset_passmanagers/builtin_plugins.py | 4 +- qiskit/user_config.py | 12 +- qiskit/utils/__init__.py | 19 +- qiskit/utils/multiprocessing.py | 56 ---- qiskit/utils/parallel.py | 284 ++++++++++++------ ...arallel-check-public-7faed5f3e20e1d03.yaml | 17 ++ test/python/compiler/test_scheduler.py | 4 +- test/python/compiler/test_transpiler.py | 12 +- test/python/test_util.py | 67 ----- test/python/utils/test_parallel.py | 143 +++++++-- test/utils/base.py | 19 -- tox.ini | 2 + 24 files changed, 381 insertions(+), 294 deletions(-) delete mode 100644 qiskit/utils/multiprocessing.py create mode 100644 releasenotes/notes/parallel-check-public-7faed5f3e20e1d03.yaml delete mode 100644 test/python/test_util.py diff --git a/.azure/test-linux.yml b/.azure/test-linux.yml index a641ae215efd..9a57ab6cc30b 100644 --- a/.azure/test-linux.yml +++ b/.azure/test-linux.yml @@ -122,6 +122,7 @@ jobs: popd env: QISKIT_PARALLEL: FALSE + QISKIT_IGNORE_USER_SETTINGS: TRUE RUST_BACKTRACE: 1 displayName: 'Run Python tests' diff --git a/.azure/test-macos.yml b/.azure/test-macos.yml index 9e9620e8ecbb..08dbf229a70b 100644 --- a/.azure/test-macos.yml +++ b/.azure/test-macos.yml @@ -65,6 +65,7 @@ jobs: stestr run env: QISKIT_PARALLEL: FALSE + QISKIT_IGNORE_USER_SETTINGS: TRUE RUST_BACKTRACE: 1 displayName: "Run tests" diff --git a/.azure/test-windows.yml b/.azure/test-windows.yml index 8d86456bd72f..40840735bce8 100644 --- a/.azure/test-windows.yml +++ b/.azure/test-windows.yml @@ -68,6 +68,7 @@ jobs: LANG: 'C.UTF-8' PYTHONIOENCODING: 'utf-8:backslashreplace' QISKIT_PARALLEL: FALSE + QISKIT_IGNORE_USER_SETTINGS: TRUE RUST_BACKTRACE: 1 displayName: 'Run tests' diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index a9c1839487a3..8f7893e1732c 100644 --- 
a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -61,6 +61,7 @@ jobs: env: QISKIT_TEST_CAPTURE_STREAMS: 1 QISKIT_PARALLEL: FALSE + QISKIT_IGNORE_USER_SETTINGS: TRUE PYTHON: "coverage run --source qiskit --parallel-mode" - name: Convert to lcov and combine data diff --git a/.github/workflows/qpy.yml b/.github/workflows/qpy.yml index 2c127add907a..da90ca8d2587 100644 --- a/.github/workflows/qpy.yml +++ b/.github/workflows/qpy.yml @@ -37,3 +37,5 @@ jobs: - name: Run QPY backwards compatibility tests working-directory: test/qpy_compat run: ./run_tests.sh + env: + QISKIT_IGNORE_USER_SETTINGS: TRUE diff --git a/.github/workflows/randomized_tests.yml b/.github/workflows/randomized_tests.yml index 1cc39893c5ec..fe3fd25f6f17 100644 --- a/.github/workflows/randomized_tests.yml +++ b/.github/workflows/randomized_tests.yml @@ -28,6 +28,7 @@ jobs: run: make test_randomized env: RUST_BACKTRACE: 1 + QISKIT_IGNORE_USER_SETTINGS: TRUE - name: Create comment on failed test run if: ${{ failure() }} uses: peter-evans/create-or-update-comment@v4 diff --git a/.github/workflows/slow.yml b/.github/workflows/slow.yml index 0207b1ec51f7..51f7690c3d50 100644 --- a/.github/workflows/slow.yml +++ b/.github/workflows/slow.yml @@ -26,6 +26,7 @@ jobs: env: RUST_BACKTRACE: 1 QISKIT_TESTS: "run_slow" + QISKIT_IGNORE_USER_SETTINGS: TRUE - name: Create comment on failed test run if: ${{ failure() }} uses: peter-evans/create-or-update-comment@v4 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 39439f7dd059..679ff99287bc 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -52,3 +52,6 @@ jobs: if: matrix.python-version == '3.10' - name: 'Run tests' run: stestr run + env: + QISKIT_PARALLEL: FALSE + QISKIT_IGNORE_USER_SETTINGS: TRUE diff --git a/qiskit/circuit/quantumcircuit.py b/qiskit/circuit/quantumcircuit.py index 8d81aa09b187..69e7c85cfa4c 100644 --- a/qiskit/circuit/quantumcircuit.py +++ b/qiskit/circuit/quantumcircuit.py @@ -19,7 +19,7 @@ import collections.abc import copy as _copy import itertools -import multiprocessing as mp +import multiprocessing import typing from collections import OrderedDict, defaultdict, namedtuple from typing import ( @@ -41,7 +41,6 @@ from qiskit._accelerate.circuit import CircuitData from qiskit._accelerate.circuit import StandardGate from qiskit.exceptions import QiskitError -from qiskit.utils.multiprocessing import is_main_process from qiskit.circuit.instruction import Instruction from qiskit.circuit.gate import Gate from qiskit.circuit.parameter import Parameter @@ -1498,11 +1497,10 @@ def _cls_prefix(cls) -> str: def _name_update(self) -> None: """update name of instance using instance number""" - if not is_main_process(): - pid_name = f"-{mp.current_process().pid}" - else: + if multiprocessing.parent_process() is None: pid_name = "" - + else: + pid_name = f"-{multiprocessing.current_process().pid}" self.name = f"{self._base_name}-{self._cls_instances()}{pid_name}" def has_register(self, register: Register) -> bool: diff --git a/qiskit/pulse/schedule.py b/qiskit/pulse/schedule.py index d4753847b5d2..aab024f5b7ac 100644 --- a/qiskit/pulse/schedule.py +++ b/qiskit/pulse/schedule.py @@ -52,7 +52,6 @@ from qiskit.pulse.instructions import Instruction, Reference from qiskit.pulse.utils import instruction_duration_validation from qiskit.pulse.reference_manager import ReferenceManager -from qiskit.utils.multiprocessing import is_main_process from qiskit.utils import deprecate_arg from qiskit.utils.deprecate_pulse import 
deprecate_pulse_func @@ -146,7 +145,7 @@ def __init__( if name is None: name = self.prefix + str(next(self.instances_counter)) - if sys.platform != "win32" and not is_main_process(): + if sys.platform != "win32" and mp.parent_process() is not None: name += f"-{mp.current_process().pid}" self._name = name @@ -1007,7 +1006,7 @@ def __init__( if name is None: name = self.prefix + str(next(self.instances_counter)) - if sys.platform != "win32" and not is_main_process(): + if sys.platform != "win32" and mp.parent_process() is not None: name += f"-{mp.current_process().pid}" # This points to the parent schedule object in the current scope. diff --git a/qiskit/transpiler/passes/layout/sabre_layout.py b/qiskit/transpiler/passes/layout/sabre_layout.py index 78af67ad9118..61d13722633b 100644 --- a/qiskit/transpiler/passes/layout/sabre_layout.py +++ b/qiskit/transpiler/passes/layout/sabre_layout.py @@ -39,7 +39,7 @@ from qiskit.transpiler.passes.routing.sabre_swap import _build_sabre_dag, _apply_sabre_result from qiskit.transpiler.target import Target from qiskit.transpiler.coupling import CouplingMap -from qiskit.utils.parallel import CPU_COUNT +from qiskit.utils import default_num_processes logger = logging.getLogger(__name__) @@ -174,11 +174,11 @@ def __init__( self.max_iterations = max_iterations self.trials = swap_trials if swap_trials is None: - self.swap_trials = CPU_COUNT + self.swap_trials = default_num_processes() else: self.swap_trials = swap_trials if layout_trials is None: - self.layout_trials = CPU_COUNT + self.layout_trials = default_num_processes() else: self.layout_trials = layout_trials self.skip_routing = skip_routing diff --git a/qiskit/transpiler/passes/routing/sabre_swap.py b/qiskit/transpiler/passes/routing/sabre_swap.py index 238444067168..58be2ea09df7 100644 --- a/qiskit/transpiler/passes/routing/sabre_swap.py +++ b/qiskit/transpiler/passes/routing/sabre_swap.py @@ -29,7 +29,7 @@ from qiskit.transpiler.target import Target from qiskit.transpiler.passes.layout import disjoint_utils from qiskit.dagcircuit import DAGCircuit, DAGOpNode -from qiskit.utils.parallel import CPU_COUNT +from qiskit.utils import default_num_processes from qiskit._accelerate.sabre import sabre_routing, Heuristic, SetScaling, NeighborTable, SabreDAG from qiskit._accelerate.nlayout import NLayout @@ -167,7 +167,7 @@ def __init__(self, coupling_map, heuristic="basic", seed=None, fake_run=False, t self.heuristic = heuristic self.seed = seed if trials is None: - self.trials = CPU_COUNT + self.trials = default_num_processes() else: self.trials = trials diff --git a/qiskit/transpiler/preset_passmanagers/builtin_plugins.py b/qiskit/transpiler/preset_passmanagers/builtin_plugins.py index 9301588c0744..3c9698226cd9 100644 --- a/qiskit/transpiler/preset_passmanagers/builtin_plugins.py +++ b/qiskit/transpiler/preset_passmanagers/builtin_plugins.py @@ -67,7 +67,7 @@ SXGate, SXdgGate, ) -from qiskit.utils.parallel import CPU_COUNT +from qiskit.utils import default_num_processes from qiskit import user_config CONFIG = user_config.get_config() @@ -1029,5 +1029,5 @@ def _swap_mapped(property_set): def _get_trial_count(default_trials=5): if CONFIG.get("sabre_all_threads", None) or os.getenv("QISKIT_SABRE_ALL_THREADS"): - return max(CPU_COUNT, default_trials) + return max(default_num_processes(), default_trials) return default_trials diff --git a/qiskit/user_config.py b/qiskit/user_config.py index 22d12406b348..3361911e5370 100644 --- a/qiskit/user_config.py +++ b/qiskit/user_config.py @@ -245,15 +245,19 @@ def 
set_config(key, value, section=None, file_path=None): def get_config(): - """Read the config file from the default location or env var + """Read the config file from the default location or env var. - It will read a config file at either the default location - ~/.qiskit/settings.conf or if set the value of the QISKIT_SETTINGS env var. + It will read a config file at the location specified by the ``QISKIT_SETTINGS`` environment + variable if set, or ``$HOME/.qiskit/settings.conf`` if not. + + If the environment variable ``QISKIT_IGNORE_USER_SETTINGS`` is set to the string ``TRUE``, this + will return an empty configuration, regardless of all other variables. - It will return the parsed settings dict from the parsed config file. Returns: dict: The settings dict from the parsed config file. """ + if os.getenv("QISKIT_IGNORE_USER_SETTINGS", "false").lower() == "true": + return {} filename = os.getenv("QISKIT_SETTINGS", DEFAULT_FILENAME) if not os.path.isfile(filename): return {} diff --git a/qiskit/utils/__init__.py b/qiskit/utils/__init__.py index 30935437ebf2..9a7abe3c37f5 100644 --- a/qiskit/utils/__init__.py +++ b/qiskit/utils/__init__.py @@ -41,8 +41,10 @@ Multiprocessing =============== -.. autofunction:: local_hardware_info +.. autofunction:: default_num_processes .. autofunction:: is_main_process +.. autofunction:: local_hardware_info +.. autofunction:: should_run_in_parallel A helper function for calling a custom function with Python :class:`~concurrent.futures.ProcessPoolExecutor`. Tasks can be executed in parallel using this function. @@ -62,28 +64,33 @@ deprecate_func, deprecate_function, ) -from .multiprocessing import local_hardware_info -from .multiprocessing import is_main_process from .units import apply_prefix, detach_prefix from .classtools import wrap_method from .lazy_tester import LazyDependencyManager, LazyImportTester, LazySubprocessTester from . import optionals -from .parallel import parallel_map, should_run_in_parallel +from .parallel import ( + parallel_map, + should_run_in_parallel, + local_hardware_info, + is_main_process, + default_num_processes, +) __all__ = [ "LazyDependencyManager", "LazyImportTester", "LazySubprocessTester", "add_deprecation_to_docstring", + "apply_prefix", + "default_num_processes", "deprecate_arg", "deprecate_arguments", "deprecate_func", "deprecate_function", - "local_hardware_info", "is_main_process", - "apply_prefix", + "local_hardware_info", "parallel_map", "should_run_in_parallel", ] diff --git a/qiskit/utils/multiprocessing.py b/qiskit/utils/multiprocessing.py deleted file mode 100644 index e063a86245e3..000000000000 --- a/qiskit/utils/multiprocessing.py +++ /dev/null @@ -1,56 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2017. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Multiprocessing utilities""" - -import multiprocessing as mp -import platform -import os - - -def local_hardware_info(): - """Basic hardware information about the local machine. - - Gives actual number of CPU's in the machine, even when hyperthreading is - turned on. CPU count defaults to 1 when true count can't be determined. 
- - Returns: - dict: The hardware information. - """ - - if hasattr(os, "sched_getaffinity"): - num_cpus = len(os.sched_getaffinity(0)) - else: - num_cpus = os.cpu_count() - if num_cpus is None: - num_cpus = 1 - else: - num_cpus = int(num_cpus / 2) or 1 - - results = { - "python_compiler": platform.python_compiler(), - "python_build": ", ".join(platform.python_build()), - "python_version": platform.python_version(), - "os": platform.system(), - "cpus": num_cpus, - } - return results - - -def is_main_process(): - """Checks whether the current process is the main one""" - if platform.system() == "Windows": - return not isinstance(mp.current_process(), mp.context.SpawnProcess) - else: - return not isinstance( - mp.current_process(), (mp.context.ForkProcess, mp.context.SpawnProcess) - ) diff --git a/qiskit/utils/parallel.py b/qiskit/utils/parallel.py index f87eeb815967..2935cf4ce2a5 100644 --- a/qiskit/utils/parallel.py +++ b/qiskit/utils/parallel.py @@ -10,7 +10,11 @@ # copyright notice, and modified files need to carry a notice indicating # that they have been altered from the originals. -# This file is part of QuTiP: Quantum Toolbox in Python. +# The original implementation of Qiskit's `parallel_map` in our commit c9c4ed52 was substantially +# derived from QuTiP's (https://github.com/qutip/qutip) in `qutip/parallel.py` at their commit +# f22d3cb7. It has subsequently been significantly rewritten. +# +# The original implementation was used under these licence terms: # # Copyright (c) 2011 and later, Paul D. Nation and Robert J. Johansson. # All rights reserved. @@ -50,85 +54,213 @@ from __future__ import annotations +import contextlib +import functools +import multiprocessing import os -from concurrent.futures import ProcessPoolExecutor +import platform import sys +from concurrent.futures import ProcessPoolExecutor -from qiskit.exceptions import QiskitError -from qiskit.utils.multiprocessing import local_hardware_info from qiskit import user_config -def get_platform_parallel_default(): - """ - Returns the default parallelism flag value for the current platform. +CONFIG = user_config.get_config() - Returns: - parallel_default: The default parallelism flag value for the - current platform. - """ - # Default False on Windows - if sys.platform == "win32": - parallel_default = False - # On macOS default false on Python >=3.8 - elif sys.platform == "darwin": - parallel_default = False - # On linux (and other OSes) default to True +def _task_wrapper(param): + (task, value, task_args, task_kwargs) = param + return task(value, *task_args, **task_kwargs) + + +def _physical_cpus_assuming_twofold_smt(): + if (sched_getaffinity := getattr(os, "sched_getaffinity", None)) is not None: + # It is callable, just pylint doesn't recognise it as `os.sched_getaffinity` because of the + # `getattr`. + # pylint: disable=not-callable + num_cpus = len(sched_getaffinity(0)) else: - parallel_default = True + num_cpus = os.cpu_count() or 1 + return (num_cpus // 2) or 1 - return parallel_default +def _parallel_default(): + # We default to False on `spawn`-based multiprocessing implementations, True on everything else. + if (set_start_method := multiprocessing.get_start_method(allow_none=True)) is None: + # The method hasn't been explicitly set, but it would be badly behaved of us to set it for + # the user, so handle platform defaults. 
+ return sys.platform not in ("darwin", "win32") + return set_start_method in ("fork", "forkserver") -CONFIG = user_config.get_config() -if os.getenv("QISKIT_PARALLEL", None) is not None: - PARALLEL_DEFAULT = os.getenv("QISKIT_PARALLEL", None).lower() == "true" -else: - PARALLEL_DEFAULT = get_platform_parallel_default() +@functools.cache +def default_num_processes() -> int: + """Get the number of processes that a multiprocessing parallel call will use by default. -# Set parallel flag -if os.getenv("QISKIT_IN_PARALLEL") is None: - os.environ["QISKIT_IN_PARALLEL"] = "FALSE" + Such functions typically also accept a ``num_processes`` keyword argument that will supersede + the value returned from this function. -if os.getenv("QISKIT_NUM_PROCS") is not None: - CPU_COUNT = int(os.getenv("QISKIT_NUM_PROCS")) -else: - CPU_COUNT = CONFIG.get("num_process", local_hardware_info()["cpus"]) + In order of priority (highest to lowest), the return value will be: + 1. The ``QISKIT_NUM_PROCS`` environment variable, if set. + 2. The ``num_processes`` key of the Qiskit user configuration file, if set. + 3. Half of the logical CPUs available to this process, if this can be determined. This is a + proxy for the number of physical CPUs, assuming two-fold simultaneous multithreading (SMT); + empirically, multiprocessing performance of Qiskit seems to be worse when attempting to use + SMT cores. + 4. 1, if all else fails. + """ + if (env_num_processes := os.getenv("QISKIT_NUM_PROCS")) is not None: + return int(env_num_processes) or 1 + if (user_num_processes := CONFIG.get("num_processes", None)) is not None: + return user_num_processes + return _physical_cpus_assuming_twofold_smt() -def _task_wrapper(param): - (task, value, task_args, task_kwargs) = param - return task(value, *task_args, **task_kwargs) +def local_hardware_info(): + """Basic hardware information about the local machine. + Attempts to estimate the number of physical CPUs in the machine, even when hyperthreading is + turned on. CPU count defaults to 1 when true count can't be determined. + + Returns: + dict: The hardware information. + """ + return { + "python_compiler": platform.python_compiler(), + "python_build": ", ".join(platform.python_build()), + "python_version": platform.python_version(), + "os": platform.system(), + "cpus": _physical_cpus_assuming_twofold_smt(), + } + + +def is_main_process() -> bool: + """Checks whether the current process is the main one. + + Since Python 3.8, this is identical to the standard Python way of calculating this:: + + >>> import multiprocessing + >>> multiprocessing.parent_process() is None + + This function is left for backwards compatibility, but there is little reason not to use the + built-in tooling of Python. + """ + return multiprocessing.parent_process() is None + + +_PARALLEL_OVERRIDE = None +_PARALLEL_IGNORE_USER_SETTINGS = False +_IN_PARALLEL_ALLOW_PARALLELISM = "FALSE" +_IN_PARALLEL_FORBID_PARALLELISM = "TRUE" + + +@functools.cache def should_run_in_parallel(num_processes: int | None = None) -> bool: - """Return whether the current parallelisation configuration suggests that we should run things - like :func:`parallel_map` in parallel (``True``) or degrade to serial (``False``). + """Decide whether a multiprocessing function should spawn subprocesses for parallelization. + + In particular, this is how :func:`parallel_map` decides whether to use multiprocessing or not.
+ The ``num_processes`` argument alone does not enforce parallelism; by default, Qiskit will only + use process-based parallelism when a ``fork``-like process spawning start method is in effect. + You can override this decision either by setting the :mod:`multiprocessing` start method you + use, setting the ``QISKIT_PARALLEL`` environment variable to ``"TRUE"``, or setting + ``parallel = true`` in your user settings file. + + This function includes two context managers that can be used to temporarily modify the return + value of this function: + + .. autofunction:: qiskit.utils::should_run_in_parallel.override + .. autofunction:: qiskit.utils::should_run_in_parallel.ignore_user_settings Args: - num_processes: the number of processes requested for use (if given). - """ - num_processes = CPU_COUNT if num_processes is None else num_processes - return ( - num_processes > 1 - and os.getenv("QISKIT_IN_PARALLEL", "FALSE") == "FALSE" - and CONFIG.get("parallel_enabled", PARALLEL_DEFAULT) - ) + num_processes: the maximum number of processes requested for use (``None`` implies the + default). + Examples: + Temporarily override the configured settings to disable parallelism:: -def parallel_map( # pylint: disable=dangerous-default-value - task, values, task_args=(), task_kwargs={}, num_processes=CPU_COUNT -): + >>> with should_run_in_parallel.override(True): + ... assert should_run_in_parallel(8) + >>> with should_run_in_parallel.override(False): + ... assert not should_run_in_parallel(8) + """ + # It's a configuration function with many simple choices - it'd be less clean to return late. + # pylint: disable=too-many-return-statements + num_processes = default_num_processes() if num_processes is None else num_processes + if num_processes < 2: + # There's no resources to parallelise over. + return False + if ( + os.getenv("QISKIT_IN_PARALLEL", _IN_PARALLEL_ALLOW_PARALLELISM) + != _IN_PARALLEL_ALLOW_PARALLELISM + ): + # This isn't a user-set variable; we set this to talk to our own child processes. + return False + if _PARALLEL_OVERRIDE is not None: + return _PARALLEL_OVERRIDE + if _PARALLEL_IGNORE_USER_SETTINGS: + return _parallel_default() + if (env_qiskit_parallel := os.getenv("QISKIT_PARALLEL")) is not None: + return env_qiskit_parallel.lower() == "true" + if (user_qiskit_parallel := CONFIG.get("parallel_enabled", None)) is not None: + return user_qiskit_parallel + # Otherwise, fallback to the default. + return _parallel_default() + + +@contextlib.contextmanager +def _parallel_ignore_user_settings(): + """A context manager within which :func:`should_run_in_parallel` will ignore environmental + configuration variables. + + In particular, the ``QISKIT_PARALLEL`` environment variable and the user-configuration file are + ignored within this context.""" + # The way around this would be to encapsulate `should_run_in_parallel` into a class, but since + # it's a singleton, it ends up being functionally no different to a global anyway. + global _PARALLEL_IGNORE_USER_SETTINGS # pylint: disable=global-statement + + should_run_in_parallel.cache_clear() + previous, _PARALLEL_IGNORE_USER_SETTINGS = _PARALLEL_IGNORE_USER_SETTINGS, True + try: + yield + finally: + _PARALLEL_IGNORE_USER_SETTINGS = previous + should_run_in_parallel.cache_clear() + + +@contextlib.contextmanager +def _parallel_override(value: bool): + """A context manager within which :func:`should_run_in_parallel` will return the given + ``value``. 
+ + This is not a *complete* override; Qiskit will never attempt to parallelize if only a single + process is available, and will not allow process-based parallelism at a depth greater than 1.""" + # The way around this would be to encapsulate `should_run_in_parallel` into a class, but since + # it's a singleton, it ends up being functionally no different to a global anyway. + global _PARALLEL_OVERRIDE # pylint: disable=global-statement + + should_run_in_parallel.cache_clear() + previous, _PARALLEL_OVERRIDE = _PARALLEL_OVERRIDE, value + try: + yield + finally: + _PARALLEL_OVERRIDE = previous + should_run_in_parallel.cache_clear() + + +should_run_in_parallel.ignore_user_settings = _parallel_ignore_user_settings +should_run_in_parallel.override = _parallel_override + + +def parallel_map(task, values, task_args=(), task_kwargs=None, num_processes=None): """ Parallel execution of a mapping of `values` to the function `task`. This is functionally equivalent to:: result = [task(value, *task_args, **task_kwargs) for value in values] - This will parallelise the results if the number of ``values`` is greater than one, and the - current system configuration permits parallelization. + This will parallelise the results if the number of ``values`` is greater than one and + :func:`should_run_in_parallel` returns ``True``. If not, it will run in serial. Args: task (func): Function that is to be called for each value in ``values``. @@ -136,15 +268,13 @@ def parallel_map( # pylint: disable=dangerous-default-value evaluated. task_args (list): Optional additional arguments to the ``task`` function. task_kwargs (dict): Optional additional keyword argument to the ``task`` function. - num_processes (int): Number of processes to spawn. + num_processes (int): Number of processes to spawn. If not given, the return value of + :func:`default_num_processes` is used. Returns: result: The result list contains the value of ``task(value, *task_args, **task_kwargs)`` for each value in ``values``. - Raises: - QiskitError: If user interrupts via keyboard. - Examples: .. 
code-block:: python @@ -156,36 +286,18 @@ def func(_): return 0 parallel_map(func, list(range(10))); """ + task_kwargs = {} if task_kwargs is None else task_kwargs if num_processes is None: - num_processes = CPU_COUNT - if len(values) == 0: - return [] - if len(values) == 1: - return [task(values[0], *task_args, **task_kwargs)] - - if should_run_in_parallel(num_processes): - os.environ["QISKIT_IN_PARALLEL"] = "TRUE" - try: - results = [] - with ProcessPoolExecutor(max_workers=num_processes) as executor: - param = ((task, value, task_args, task_kwargs) for value in values) - future = executor.map(_task_wrapper, param) - - results = list(future) - - except (KeyboardInterrupt, Exception) as error: - if isinstance(error, KeyboardInterrupt): - os.environ["QISKIT_IN_PARALLEL"] = "FALSE" - raise QiskitError("Keyboard interrupt in parallel_map.") from error - # Otherwise just reset parallel flag and error - os.environ["QISKIT_IN_PARALLEL"] = "FALSE" - raise error - - os.environ["QISKIT_IN_PARALLEL"] = "FALSE" - return results - - results = [] - for _, value in enumerate(values): - result = task(value, *task_args, **task_kwargs) - results.append(result) - return results + num_processes = default_num_processes() + if len(values) < 2 or not should_run_in_parallel(num_processes): + return [task(value, *task_args, **task_kwargs) for value in values] + work_items = ((task, value, task_args, task_kwargs) for value in values) + + # This isn't a user-set variable; we set this to talk to our own child processes. + previous_in_parallel = os.getenv("QISKIT_IN_PARALLEL", _IN_PARALLEL_ALLOW_PARALLELISM) + os.environ["QISKIT_IN_PARALLEL"] = _IN_PARALLEL_FORBID_PARALLELISM + try: + with ProcessPoolExecutor(max_workers=num_processes) as executor: + return list(executor.map(_task_wrapper, work_items)) + finally: + os.environ["QISKIT_IN_PARALLEL"] = previous_in_parallel diff --git a/releasenotes/notes/parallel-check-public-7faed5f3e20e1d03.yaml b/releasenotes/notes/parallel-check-public-7faed5f3e20e1d03.yaml new file mode 100644 index 000000000000..78aef41f76c1 --- /dev/null +++ b/releasenotes/notes/parallel-check-public-7faed5f3e20e1d03.yaml @@ -0,0 +1,17 @@ +--- +features_misc: + - | + :mod:`qiskit.utils` now contains utilities to provide better control and inspection of Qiskit's + :mod:`multiprocessing` parallelization settings. In particular, one can now use + :func:`.should_run_in_parallel` to query whether :func:`.parallel_map` (and pass managers) will + launch subprocesses for suitable inputs, and use the context manager + :func:`.should_run_in_parallel.override` to temporarily override most system and user + configuration around this decision. + + An additional function, :func:`.default_num_processes` reads the default maximum number of + subprocesses that Qiskit will use for process-based parallelism. + - | + A new environment variable, ``QISKIT_IGNORE_USER_SETTINGS``, now controls whether to read the + user settings file on ``import qiskit``. If set to the string ``true``, the settings file + will not be read. This is useful for isolating certain instances of Qiskit from the system + environment, such as for testing. 
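The release note above summarises the new public surface. As a quick orientation (an illustrative sketch, not part of the patch itself; it assumes only the `qiskit.utils` exports added in this change), the pieces fit together like this::

    from qiskit.utils import default_num_processes, should_run_in_parallel, parallel_map

    def square(x):
        # Defined at module level so it could be pickled by ProcessPoolExecutor workers.
        return x * x

    # How many worker processes Qiskit would use by default, and whether a call asking for
    # eight processes would actually be parallelised under the current configuration.
    print(default_num_processes())
    print(should_run_in_parallel(8))

    # Temporarily pin the decision, regardless of QISKIT_PARALLEL or the user settings file.
    with should_run_in_parallel.override(False):
        results = parallel_map(square, list(range(10)))  # runs serially inside this block
    print(results)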
diff --git a/test/python/compiler/test_scheduler.py b/test/python/compiler/test_scheduler.py index c349bf054c3b..27ba28dfaeb7 100644 --- a/test/python/compiler/test_scheduler.py +++ b/test/python/compiler/test_scheduler.py @@ -17,6 +17,7 @@ from qiskit.pulse import InstructionScheduleMap, Schedule from qiskit.providers.fake_provider import FakeOpenPulse3Q, GenericBackendV2 from qiskit.compiler.scheduler import schedule +from qiskit.utils import should_run_in_parallel from test import QiskitTestCase # pylint: disable=wrong-import-order @@ -81,10 +82,9 @@ def test_schedules_single_circuit(self): def test_schedules_multiple_circuits(self): """Test scheduling of multiple circuits.""" - self.enable_parallel_processing() circuits = [self.circ, self.circ2] - with self.assertWarns(DeprecationWarning): + with self.assertWarns(DeprecationWarning), should_run_in_parallel.ignore_user_settings(): circuit_schedules = schedule(circuits, self.backend, method="asap") self.assertEqual(len(circuit_schedules), len(circuits)) diff --git a/test/python/compiler/test_transpiler.py b/test/python/compiler/test_transpiler.py index 5241304a0db4..388b4be1e4bd 100644 --- a/test/python/compiler/test_transpiler.py +++ b/test/python/compiler/test_transpiler.py @@ -81,7 +81,7 @@ from qiskit.providers.options import Options from qiskit.pulse import InstructionScheduleMap, Schedule, Play, Gaussian, DriveChannel from qiskit.quantum_info import Operator, random_unitary -from qiskit.utils import parallel +from qiskit.utils import should_run_in_parallel from qiskit.transpiler import CouplingMap, Layout, PassManager, TransformationPass from qiskit.transpiler.exceptions import TranspilerError, CircuitTooWideForTarget from qiskit.transpiler.passes import BarrierBeforeFinalMeasurements, GateDirection, VF2PostLayout @@ -2763,13 +2763,9 @@ def setUp(self): super().setUp() # Force parallel execution to True to test multiprocessing for this class - original_val = parallel.PARALLEL_DEFAULT - - def restore_default(): - parallel.PARALLEL_DEFAULT = original_val - - self.addCleanup(restore_default) - parallel.PARALLEL_DEFAULT = True + cm = should_run_in_parallel.override(True) + cm.__enter__() + self.addCleanup(cm.__exit__, None, None, None) @data(0, 1, 2, 3) def test_parallel_multiprocessing(self, opt_level): diff --git a/test/python/test_util.py b/test/python/test_util.py deleted file mode 100644 index d403ed004bc3..000000000000 --- a/test/python/test_util.py +++ /dev/null @@ -1,67 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2017, 2023. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. 
- -"""Tests for qiskit/utils""" - -from unittest import mock - -from qiskit.utils import multiprocessing -from test import QiskitTestCase # pylint: disable=wrong-import-order - - -class TestUtil(QiskitTestCase): - """Tests for qiskit/_util.py""" - - def test_local_hardware_five_cpu_count(self): - """Test cpu count is half when sched affinity is 5""" - with mock.patch.object(multiprocessing, "os"): - multiprocessing.os.sched_getaffinity = mock.MagicMock(return_value=set(range(5))) - result = multiprocessing.local_hardware_info() - self.assertEqual(2, result["cpus"]) - - def test_local_hardware_sixty_four_cpu_count(self): - """Test cpu count is 32 when sched affinity is 64""" - with mock.patch.object(multiprocessing, "os"): - multiprocessing.os.sched_getaffinity = mock.MagicMock(return_value=set(range(64))) - result = multiprocessing.local_hardware_info() - self.assertEqual(32, result["cpus"]) - - def test_local_hardware_no_cpu_count(self): - """Test cpu count fallback to 1 when true value can't be determined""" - with mock.patch.object(multiprocessing, "os"): - multiprocessing.os.sched_getaffinity = mock.MagicMock(return_value=set()) - result = multiprocessing.local_hardware_info() - self.assertEqual(1, result["cpus"]) - - def test_local_hardware_no_sched_five_count(self): - """Test cpu could if sched affinity method is missing and cpu count is 5.""" - with mock.patch.object(multiprocessing, "os", spec=[]): - multiprocessing.os.cpu_count = mock.MagicMock(return_value=5) - del multiprocessing.os.sched_getaffinity - result = multiprocessing.local_hardware_info() - self.assertEqual(2, result["cpus"]) - - def test_local_hardware_no_sched_sixty_four_count(self): - """Test cpu could if sched affinity method is missing and cpu count is 64.""" - with mock.patch.object(multiprocessing, "os", spec=[]): - multiprocessing.os.cpu_count = mock.MagicMock(return_value=64) - del multiprocessing.os.sched_getaffinity - result = multiprocessing.local_hardware_info() - self.assertEqual(32, result["cpus"]) - - def test_local_hardware_no_sched_no_count(self): - """Test cpu count fallback to 1 when no sched getaffinity available.""" - with mock.patch.object(multiprocessing, "os", spec=[]): - multiprocessing.os.cpu_count = mock.MagicMock(return_value=None) - del multiprocessing.os.sched_getaffinity - result = multiprocessing.local_hardware_info() - self.assertEqual(1, result["cpus"]) diff --git a/test/python/utils/test_parallel.py b/test/python/utils/test_parallel.py index f6de443b7b03..c805c994b373 100644 --- a/test/python/utils/test_parallel.py +++ b/test/python/utils/test_parallel.py @@ -12,12 +12,14 @@ """Tests for qiskit/tools/parallel""" import os +import subprocess +import sys +import tempfile import time import warnings +from unittest import mock -from unittest.mock import patch - -from qiskit.utils.parallel import get_platform_parallel_default, parallel_map +from qiskit.utils import local_hardware_info, should_run_in_parallel, parallel_map from qiskit import QuantumRegister, ClassicalRegister, QuantumCircuit from qiskit.pulse import Schedule from test import QiskitTestCase # pylint: disable=wrong-import-order @@ -43,36 +45,9 @@ def _build_simple_schedule(_): return Schedule() -class TestGetPlatformParallelDefault(QiskitTestCase): - """Tests get_parallel_default_for_platform.""" - - def test_windows_parallel_default(self): - """Verifies the parallel default for Windows.""" - with patch("sys.platform", "win32"): - parallel_default = get_platform_parallel_default() - self.assertEqual(parallel_default, False) 
- - def test_mac_os_unsupported_version_parallel_default(self): - """Verifies the parallel default for macOS.""" - with patch("sys.platform", "darwin"): - with patch("sys.version_info", (3, 8, 0, "final", 0)): - parallel_default = get_platform_parallel_default() - self.assertEqual(parallel_default, False) - - def test_other_os_parallel_default(self): - """Verifies the parallel default for Linux and other OSes.""" - with patch("sys.platform", "linux"): - parallel_default = get_platform_parallel_default() - self.assertEqual(parallel_default, True) - - class TestParallel(QiskitTestCase): """A class for testing parallel_map functionality.""" - def test_parallel_env_flag(self): - """Verify parallel env flag is set""" - self.assertEqual(os.getenv("QISKIT_IN_PARALLEL", None), "FALSE") - def test_parallel(self): """Test parallel_map""" ans = parallel_map(_parfunc, list(range(10))) @@ -89,3 +64,111 @@ def test_parallel_schedule_names(self): out_schedules = parallel_map(_build_simple_schedule, list(range(10))) names = [schedule.name for schedule in out_schedules] self.assertEqual(len(names), len(set(names))) + + +class TestUtilities(QiskitTestCase): + """Tests for parallel utilities.""" + + def test_local_hardware_five_cpu_count(self): + """Test cpu count is half when sched affinity is 5""" + with mock.patch.object(os, "sched_getaffinity", return_value=set(range(5)), create=True): + self.assertEqual(2, local_hardware_info()["cpus"]) + + def test_local_hardware_sixty_four_cpu_count(self): + """Test cpu count is 32 when sched affinity is 64""" + with mock.patch.object(os, "sched_getaffinity", return_value=set(range(64)), create=True): + self.assertEqual(32, local_hardware_info()["cpus"]) + + def test_local_hardware_no_cpu_count(self): + """Test cpu count fallback to 1 when true value can't be determined""" + with mock.patch.object(os, "sched_getaffinity", return_value=set(), create=True): + self.assertEqual(1, local_hardware_info()["cpus"]) + + def test_local_hardware_no_sched_five_count(self): + """Test cpu could if sched affinity method is missing and cpu count is 5.""" + with ( + mock.patch.object(os, "sched_getaffinity", None, create=True), + mock.patch.object(os, "cpu_count", return_value=5), + ): + self.assertEqual(2, local_hardware_info()["cpus"]) + + def test_local_hardware_no_sched_sixty_four_count(self): + """Test cpu could if sched affinity method is missing and cpu count is 64.""" + with ( + mock.patch.object(os, "sched_getaffinity", None, create=True), + mock.patch.object(os, "cpu_count", return_value=64), + ): + self.assertEqual(32, local_hardware_info()["cpus"]) + + def test_local_hardware_no_sched_no_count(self): + """Test cpu count fallback to 1 when no sched getaffinity available.""" + with ( + mock.patch.object(os, "sched_getaffinity", None, create=True), + mock.patch.object(os, "cpu_count", return_value=None), + ): + self.assertEqual(1, local_hardware_info()["cpus"]) + + def test_should_run_in_parallel_override(self): + """Test that the context managers allow overriding the default value.""" + natural = should_run_in_parallel(8) + with should_run_in_parallel.override(True): + self.assertTrue(should_run_in_parallel(8)) + self.assertEqual(should_run_in_parallel(8), natural) + with should_run_in_parallel.override(False): + self.assertFalse(should_run_in_parallel(8)) + self.assertEqual(should_run_in_parallel(8), natural) + + def test_should_run_in_parallel_ignore_user_settings(self): + """Test that the context managers allow overriding the user settings.""" + # This is a nasty one, 
because much of the user settings are read statically at `import + # qiskit`, which we're obviously already past. We want to override that, so we need a + # subprocess whose environment we control completely. + + # Windows is picky about opening files that are already opened for writing. Ideally we'd + # use a context manager with `delete_on_close=False` so we close the file, launch our + # subprocess and let the CM clean up on exit, but that argument only arrived in Python 3.12. + # pylint: disable=consider-using-with + settings_file = tempfile.NamedTemporaryFile(mode="w", encoding="utf8", delete=False) + print("[DEFAULT]", file=settings_file) + print("parallel = true", file=settings_file) + settings_file.close() + self.addCleanup(os.remove, settings_file.name) + + # Pass on all our environment, except for our own configuration, which we override with our + # custom settings file, + env = {key: value for key, value in os.environ.items() if not key.startswith("QISKIT")} + env["QISKIT_SETTINGS"] = settings_file.name + env["QISKIT_IN_PARALLEL"] = "FALSE" + env["QISKIT_PARALLEL"] = "TRUE" + env["QISKIT_IGNORE_USER_SETTINGS"] = "FALSE" + + script = """\ +import multiprocessing +from unittest.mock import patch +from qiskit.utils import should_run_in_parallel + +print(should_run_in_parallel()) +with ( + patch.object(multiprocessing, "get_start_method", return_value="forkserver"), + should_run_in_parallel.ignore_user_settings(), +): + print(should_run_in_parallel()) +with ( + patch.object(multiprocessing, "get_start_method", return_value="spawn"), + should_run_in_parallel.ignore_user_settings(), +): + print(should_run_in_parallel()) +""" + result = subprocess.run( + sys.executable, + input=script, + encoding="utf8", + text=True, + env=env, + check=True, + capture_output=True, + ) + user_settings, forkserver_default, spawn_default = result.stdout.splitlines() + self.assertEqual( + (user_settings, forkserver_default, spawn_default), ("True", "True", "False") + ) diff --git a/test/utils/base.py b/test/utils/base.py index 133666cfc7ad..21583141a464 100644 --- a/test/utils/base.py +++ b/test/utils/base.py @@ -28,7 +28,6 @@ import unittest from unittest.util import safe_repr -from qiskit.utils.parallel import get_platform_parallel_default from qiskit.exceptions import QiskitWarning from qiskit.utils import optionals as _optionals from qiskit.circuit import QuantumCircuit @@ -285,24 +284,6 @@ def assertDictAlmostEqual( msg = self._formatMessage(msg, error_msg) raise self.failureException(msg) - def enable_parallel_processing(self): - """ - Enables parallel processing, for the duration of a test, on platforms - that support it. This is done by temporarily overriding the value of - the QISKIT_PARALLEL environment variable with the platform specific default. - """ - parallel_default = str(get_platform_parallel_default()).upper() - - def set_parallel_env(name, value): - os.environ[name] = value - - self.addCleanup( - lambda value: set_parallel_env("QISKIT_PARALLEL", value), - os.getenv("QISKIT_PARALLEL", parallel_default), - ) - - os.environ["QISKIT_PARALLEL"] = parallel_default - class FullQiskitTestCase(QiskitTestCase): """further additions for Qiskit test cases, if ``testtools`` is available. 
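With ``enable_parallel_processing`` removed from the test base class, tests that need a specific parallelism mode use the public context managers directly, as the updated `test_scheduler.py` and `test_transpiler.py` hunks above do. A condensed sketch of the same pattern (illustrative only; it assumes nothing beyond the API added in this patch)::

    from qiskit.utils import should_run_in_parallel

    # Fall back to the start-method-based default, ignoring QISKIT_PARALLEL and the user
    # settings file for the duration of the block.
    with should_run_in_parallel.ignore_user_settings():
        print(should_run_in_parallel(8))

    # Or pin the decision explicitly; entering the context manager in setUp and registering
    # `addCleanup(cm.__exit__, None, None, None)` gives the same effect for a whole test case,
    # as in the `test_transpiler.py` setUp above.
    with should_run_in_parallel.override(True):
        assert should_run_in_parallel(8)  # True unless already inside a parallel_map child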
diff --git a/tox.ini b/tox.ini index d4d822766675..b355a9b83126 100644 --- a/tox.ini +++ b/tox.ini @@ -15,11 +15,13 @@ setenv = QISKIT_SUPRESS_PACKAGING_WARNINGS=Y QISKIT_TEST_CAPTURE_STREAMS=1 QISKIT_PARALLEL=FALSE + QISKIT_IGNORE_USER_SETTINGS=TRUE passenv = RUSTUP_TOOLCHAIN RAYON_NUM_THREADS OMP_NUM_THREADS QISKIT_PARALLEL + QISKIT_IGNORE_USER_SETTINGS RUST_BACKTRACE SETUPTOOLS_ENABLE_FEATURES QISKIT_TESTS
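As with the CI configuration above, ``QISKIT_IGNORE_USER_SETTINGS`` can be used to sanity-check the isolation locally. A small sketch (not part of the patch; it assumes only the ``get_config`` behaviour added in ``qiskit/user_config.py``)::

    import os
    import subprocess
    import sys

    # Run `get_config` in a fresh interpreter with the settings file ignored; the patched
    # function short-circuits and returns an empty dict before touching the filesystem.
    env = dict(os.environ, QISKIT_IGNORE_USER_SETTINGS="TRUE")
    result = subprocess.run(
        [sys.executable, "-c", "from qiskit import user_config; print(user_config.get_config())"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    print(result.stdout.strip())  # expected to print an empty dict: {}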