diff --git a/.azure/test-linux.yml b/.azure/test-linux.yml index a641ae215efd..9a57ab6cc30b 100644 --- a/.azure/test-linux.yml +++ b/.azure/test-linux.yml @@ -122,6 +122,7 @@ jobs: popd env: QISKIT_PARALLEL: FALSE + QISKIT_IGNORE_USER_SETTINGS: TRUE RUST_BACKTRACE: 1 displayName: 'Run Python tests' diff --git a/.azure/test-macos.yml b/.azure/test-macos.yml index 9e9620e8ecbb..08dbf229a70b 100644 --- a/.azure/test-macos.yml +++ b/.azure/test-macos.yml @@ -65,6 +65,7 @@ jobs: stestr run env: QISKIT_PARALLEL: FALSE + QISKIT_IGNORE_USER_SETTINGS: TRUE RUST_BACKTRACE: 1 displayName: "Run tests" diff --git a/.azure/test-windows.yml b/.azure/test-windows.yml index 8d86456bd72f..40840735bce8 100644 --- a/.azure/test-windows.yml +++ b/.azure/test-windows.yml @@ -68,6 +68,7 @@ jobs: LANG: 'C.UTF-8' PYTHONIOENCODING: 'utf-8:backslashreplace' QISKIT_PARALLEL: FALSE + QISKIT_IGNORE_USER_SETTINGS: TRUE RUST_BACKTRACE: 1 displayName: 'Run tests' diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index a9c1839487a3..8f7893e1732c 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -61,6 +61,7 @@ jobs: env: QISKIT_TEST_CAPTURE_STREAMS: 1 QISKIT_PARALLEL: FALSE + QISKIT_IGNORE_USER_SETTINGS: TRUE PYTHON: "coverage run --source qiskit --parallel-mode" - name: Convert to lcov and combine data diff --git a/.github/workflows/qpy.yml b/.github/workflows/qpy.yml index 2c127add907a..da90ca8d2587 100644 --- a/.github/workflows/qpy.yml +++ b/.github/workflows/qpy.yml @@ -37,3 +37,5 @@ jobs: - name: Run QPY backwards compatibility tests working-directory: test/qpy_compat run: ./run_tests.sh + env: + QISKIT_IGNORE_USER_SETTINGS: TRUE diff --git a/.github/workflows/randomized_tests.yml b/.github/workflows/randomized_tests.yml index 1cc39893c5ec..fe3fd25f6f17 100644 --- a/.github/workflows/randomized_tests.yml +++ b/.github/workflows/randomized_tests.yml @@ -28,6 +28,7 @@ jobs: run: make test_randomized env: RUST_BACKTRACE: 1 + QISKIT_IGNORE_USER_SETTINGS: TRUE - name: Create comment on failed test run if: ${{ failure() }} uses: peter-evans/create-or-update-comment@v4 diff --git a/.github/workflows/slow.yml b/.github/workflows/slow.yml index 0207b1ec51f7..51f7690c3d50 100644 --- a/.github/workflows/slow.yml +++ b/.github/workflows/slow.yml @@ -26,6 +26,7 @@ jobs: env: RUST_BACKTRACE: 1 QISKIT_TESTS: "run_slow" + QISKIT_IGNORE_USER_SETTINGS: TRUE - name: Create comment on failed test run if: ${{ failure() }} uses: peter-evans/create-or-update-comment@v4 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 39439f7dd059..679ff99287bc 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -52,3 +52,6 @@ jobs: if: matrix.python-version == '3.10' - name: 'Run tests' run: stestr run + env: + QISKIT_PARALLEL: FALSE + QISKIT_IGNORE_USER_SETTINGS: TRUE diff --git a/qiskit/circuit/quantumcircuit.py b/qiskit/circuit/quantumcircuit.py index 8d81aa09b187..69e7c85cfa4c 100644 --- a/qiskit/circuit/quantumcircuit.py +++ b/qiskit/circuit/quantumcircuit.py @@ -19,7 +19,7 @@ import collections.abc import copy as _copy import itertools -import multiprocessing as mp +import multiprocessing import typing from collections import OrderedDict, defaultdict, namedtuple from typing import ( @@ -41,7 +41,6 @@ from qiskit._accelerate.circuit import CircuitData from qiskit._accelerate.circuit import StandardGate from qiskit.exceptions import QiskitError -from qiskit.utils.multiprocessing import is_main_process from qiskit.circuit.instruction import Instruction from qiskit.circuit.gate import Gate from qiskit.circuit.parameter import Parameter @@ -1498,11 +1497,10 @@ def _cls_prefix(cls) -> str: def _name_update(self) -> None: """update name of instance using instance number""" - if not is_main_process(): - pid_name = f"-{mp.current_process().pid}" - else: + if multiprocessing.parent_process() is None: pid_name = "" - + else: + pid_name = f"-{multiprocessing.current_process().pid}" self.name = f"{self._base_name}-{self._cls_instances()}{pid_name}" def has_register(self, register: Register) -> bool: diff --git a/qiskit/pulse/schedule.py b/qiskit/pulse/schedule.py index d4753847b5d2..aab024f5b7ac 100644 --- a/qiskit/pulse/schedule.py +++ b/qiskit/pulse/schedule.py @@ -52,7 +52,6 @@ from qiskit.pulse.instructions import Instruction, Reference from qiskit.pulse.utils import instruction_duration_validation from qiskit.pulse.reference_manager import ReferenceManager -from qiskit.utils.multiprocessing import is_main_process from qiskit.utils import deprecate_arg from qiskit.utils.deprecate_pulse import deprecate_pulse_func @@ -146,7 +145,7 @@ def __init__( if name is None: name = self.prefix + str(next(self.instances_counter)) - if sys.platform != "win32" and not is_main_process(): + if sys.platform != "win32" and mp.parent_process() is not None: name += f"-{mp.current_process().pid}" self._name = name @@ -1007,7 +1006,7 @@ def __init__( if name is None: name = self.prefix + str(next(self.instances_counter)) - if sys.platform != "win32" and not is_main_process(): + if sys.platform != "win32" and mp.parent_process() is not None: name += f"-{mp.current_process().pid}" # This points to the parent schedule object in the current scope. diff --git a/qiskit/transpiler/passes/layout/sabre_layout.py b/qiskit/transpiler/passes/layout/sabre_layout.py index 78af67ad9118..61d13722633b 100644 --- a/qiskit/transpiler/passes/layout/sabre_layout.py +++ b/qiskit/transpiler/passes/layout/sabre_layout.py @@ -39,7 +39,7 @@ from qiskit.transpiler.passes.routing.sabre_swap import _build_sabre_dag, _apply_sabre_result from qiskit.transpiler.target import Target from qiskit.transpiler.coupling import CouplingMap -from qiskit.utils.parallel import CPU_COUNT +from qiskit.utils import default_num_processes logger = logging.getLogger(__name__) @@ -174,11 +174,11 @@ def __init__( self.max_iterations = max_iterations self.trials = swap_trials if swap_trials is None: - self.swap_trials = CPU_COUNT + self.swap_trials = default_num_processes() else: self.swap_trials = swap_trials if layout_trials is None: - self.layout_trials = CPU_COUNT + self.layout_trials = default_num_processes() else: self.layout_trials = layout_trials self.skip_routing = skip_routing diff --git a/qiskit/transpiler/passes/routing/sabre_swap.py b/qiskit/transpiler/passes/routing/sabre_swap.py index 238444067168..58be2ea09df7 100644 --- a/qiskit/transpiler/passes/routing/sabre_swap.py +++ b/qiskit/transpiler/passes/routing/sabre_swap.py @@ -29,7 +29,7 @@ from qiskit.transpiler.target import Target from qiskit.transpiler.passes.layout import disjoint_utils from qiskit.dagcircuit import DAGCircuit, DAGOpNode -from qiskit.utils.parallel import CPU_COUNT +from qiskit.utils import default_num_processes from qiskit._accelerate.sabre import sabre_routing, Heuristic, SetScaling, NeighborTable, SabreDAG from qiskit._accelerate.nlayout import NLayout @@ -167,7 +167,7 @@ def __init__(self, coupling_map, heuristic="basic", seed=None, fake_run=False, t self.heuristic = heuristic self.seed = seed if trials is None: - self.trials = CPU_COUNT + self.trials = default_num_processes() else: self.trials = trials diff --git a/qiskit/transpiler/preset_passmanagers/builtin_plugins.py b/qiskit/transpiler/preset_passmanagers/builtin_plugins.py index 9301588c0744..3c9698226cd9 100644 --- a/qiskit/transpiler/preset_passmanagers/builtin_plugins.py +++ b/qiskit/transpiler/preset_passmanagers/builtin_plugins.py @@ -67,7 +67,7 @@ SXGate, SXdgGate, ) -from qiskit.utils.parallel import CPU_COUNT +from qiskit.utils import default_num_processes from qiskit import user_config CONFIG = user_config.get_config() @@ -1029,5 +1029,5 @@ def _swap_mapped(property_set): def _get_trial_count(default_trials=5): if CONFIG.get("sabre_all_threads", None) or os.getenv("QISKIT_SABRE_ALL_THREADS"): - return max(CPU_COUNT, default_trials) + return max(default_num_processes(), default_trials) return default_trials diff --git a/qiskit/user_config.py b/qiskit/user_config.py index 22d12406b348..3361911e5370 100644 --- a/qiskit/user_config.py +++ b/qiskit/user_config.py @@ -245,15 +245,19 @@ def set_config(key, value, section=None, file_path=None): def get_config(): - """Read the config file from the default location or env var + """Read the config file from the default location or env var. - It will read a config file at either the default location - ~/.qiskit/settings.conf or if set the value of the QISKIT_SETTINGS env var. + It will read a config file at the location specified by the ``QISKIT_SETTINGS`` environment + variable if set, or ``$HOME/.qiskit/settings.conf`` if not. + + If the environment variable ``QISKIT_IGNORE_USER_SETTINGS`` is set to the string ``TRUE``, this + will return an empty configuration, regardless of all other variables. - It will return the parsed settings dict from the parsed config file. Returns: dict: The settings dict from the parsed config file. """ + if os.getenv("QISKIT_IGNORE_USER_SETTINGS", "false").lower() == "true": + return {} filename = os.getenv("QISKIT_SETTINGS", DEFAULT_FILENAME) if not os.path.isfile(filename): return {} diff --git a/qiskit/utils/__init__.py b/qiskit/utils/__init__.py index 30935437ebf2..9a7abe3c37f5 100644 --- a/qiskit/utils/__init__.py +++ b/qiskit/utils/__init__.py @@ -41,8 +41,10 @@ Multiprocessing =============== -.. autofunction:: local_hardware_info +.. autofunction:: default_num_processes .. autofunction:: is_main_process +.. autofunction:: local_hardware_info +.. autofunction:: should_run_in_parallel A helper function for calling a custom function with Python :class:`~concurrent.futures.ProcessPoolExecutor`. Tasks can be executed in parallel using this function. @@ -62,28 +64,33 @@ deprecate_func, deprecate_function, ) -from .multiprocessing import local_hardware_info -from .multiprocessing import is_main_process from .units import apply_prefix, detach_prefix from .classtools import wrap_method from .lazy_tester import LazyDependencyManager, LazyImportTester, LazySubprocessTester from . import optionals -from .parallel import parallel_map, should_run_in_parallel +from .parallel import ( + parallel_map, + should_run_in_parallel, + local_hardware_info, + is_main_process, + default_num_processes, +) __all__ = [ "LazyDependencyManager", "LazyImportTester", "LazySubprocessTester", "add_deprecation_to_docstring", + "apply_prefix", + "default_num_processes", "deprecate_arg", "deprecate_arguments", "deprecate_func", "deprecate_function", - "local_hardware_info", "is_main_process", - "apply_prefix", + "local_hardware_info", "parallel_map", "should_run_in_parallel", ] diff --git a/qiskit/utils/multiprocessing.py b/qiskit/utils/multiprocessing.py deleted file mode 100644 index e063a86245e3..000000000000 --- a/qiskit/utils/multiprocessing.py +++ /dev/null @@ -1,56 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2017. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Multiprocessing utilities""" - -import multiprocessing as mp -import platform -import os - - -def local_hardware_info(): - """Basic hardware information about the local machine. - - Gives actual number of CPU's in the machine, even when hyperthreading is - turned on. CPU count defaults to 1 when true count can't be determined. - - Returns: - dict: The hardware information. - """ - - if hasattr(os, "sched_getaffinity"): - num_cpus = len(os.sched_getaffinity(0)) - else: - num_cpus = os.cpu_count() - if num_cpus is None: - num_cpus = 1 - else: - num_cpus = int(num_cpus / 2) or 1 - - results = { - "python_compiler": platform.python_compiler(), - "python_build": ", ".join(platform.python_build()), - "python_version": platform.python_version(), - "os": platform.system(), - "cpus": num_cpus, - } - return results - - -def is_main_process(): - """Checks whether the current process is the main one""" - if platform.system() == "Windows": - return not isinstance(mp.current_process(), mp.context.SpawnProcess) - else: - return not isinstance( - mp.current_process(), (mp.context.ForkProcess, mp.context.SpawnProcess) - ) diff --git a/qiskit/utils/parallel.py b/qiskit/utils/parallel.py index f87eeb815967..2935cf4ce2a5 100644 --- a/qiskit/utils/parallel.py +++ b/qiskit/utils/parallel.py @@ -10,7 +10,11 @@ # copyright notice, and modified files need to carry a notice indicating # that they have been altered from the originals. -# This file is part of QuTiP: Quantum Toolbox in Python. +# The original implementation of Qiskit's `parallel_map` in our commit c9c4ed52 was substantially +# derived from QuTiP's (https://github.com/qutip/qutip) in `qutip/parallel.py` at their commit +# f22d3cb7. It has subsequently been significantly rewritten. +# +# The original implementation was used under these licence terms: # # Copyright (c) 2011 and later, Paul D. Nation and Robert J. Johansson. # All rights reserved. @@ -50,85 +54,213 @@ from __future__ import annotations +import contextlib +import functools +import multiprocessing import os -from concurrent.futures import ProcessPoolExecutor +import platform import sys +from concurrent.futures import ProcessPoolExecutor -from qiskit.exceptions import QiskitError -from qiskit.utils.multiprocessing import local_hardware_info from qiskit import user_config -def get_platform_parallel_default(): - """ - Returns the default parallelism flag value for the current platform. +CONFIG = user_config.get_config() - Returns: - parallel_default: The default parallelism flag value for the - current platform. - """ - # Default False on Windows - if sys.platform == "win32": - parallel_default = False - # On macOS default false on Python >=3.8 - elif sys.platform == "darwin": - parallel_default = False - # On linux (and other OSes) default to True +def _task_wrapper(param): + (task, value, task_args, task_kwargs) = param + return task(value, *task_args, **task_kwargs) + + +def _physical_cpus_assuming_twofold_smt(): + if (sched_getaffinity := getattr(os, "sched_getaffinity", None)) is not None: + # It is callable, just pylint doesn't recognise it as `os.sched_getaffinity` because of the + # `getattr`. + # pylint: disable=not-callable + num_cpus = len(sched_getaffinity(0)) else: - parallel_default = True + num_cpus = os.cpu_count() or 1 + return (num_cpus // 2) or 1 - return parallel_default +def _parallel_default(): + # We default to False on `spawn`-based multiprocessing implementations, True on everything else. + if (set_start_method := multiprocessing.get_start_method(allow_none=True)) is None: + # The method hasn't been explicitly set, but it would be badly behaved of us to set it for + # the user, so handle platform defaults. + return sys.platform not in ("darwin", "win32") + return set_start_method in ("fork", "forkserver") -CONFIG = user_config.get_config() -if os.getenv("QISKIT_PARALLEL", None) is not None: - PARALLEL_DEFAULT = os.getenv("QISKIT_PARALLEL", None).lower() == "true" -else: - PARALLEL_DEFAULT = get_platform_parallel_default() +@functools.cache +def default_num_processes() -> int: + """Get the number of processes that a multiprocessing parallel call will use by default. -# Set parallel flag -if os.getenv("QISKIT_IN_PARALLEL") is None: - os.environ["QISKIT_IN_PARALLEL"] = "FALSE" + Such functions typically also accept a ``num_processes`` keyword argument that will supersede + the value returned from this function. -if os.getenv("QISKIT_NUM_PROCS") is not None: - CPU_COUNT = int(os.getenv("QISKIT_NUM_PROCS")) -else: - CPU_COUNT = CONFIG.get("num_process", local_hardware_info()["cpus"]) + In order of priority (highest to lowest), the return value will be: + 1. The ``QISKIT_NUM_PROCS`` environment variable, if set. + 2. The ``num_processes`` key of the Qiskit user configuration file, if set. + 3. Half of the logical CPUs available to this process, if this can be determined. This is a + proxy for the number of physical CPUs, assuming two-fold simultaneous multithreading (SMT); + emperically, multiprocessing performance of Qiskit seems to be worse when attempting to use + SMT cores. + 4. 1, if all else fails. + """ + if (env_num_processes := os.getenv("QISKIT_NUM_PROCS")) is not None: + return int(env_num_processes) or 1 + if (user_num_processes := CONFIG.get("num_processes", None)) is not None: + return user_num_processes + return _physical_cpus_assuming_twofold_smt() -def _task_wrapper(param): - (task, value, task_args, task_kwargs) = param - return task(value, *task_args, **task_kwargs) +def local_hardware_info(): + """Basic hardware information about the local machine. + Attempts to estimate the number of physical CPUs in the machine, even when hyperthreading is + turned on. CPU count defaults to 1 when true count can't be determined. + + Returns: + dict: The hardware information. + """ + return { + "python_compiler": platform.python_compiler(), + "python_build": ", ".join(platform.python_build()), + "python_version": platform.python_version(), + "os": platform.system(), + "cpus": _physical_cpus_assuming_twofold_smt(), + } + + +def is_main_process() -> bool: + """Checks whether the current process is the main one. + + Since Python 3.8, this is identical to the standard Python way of calculating this:: + + >>> import multiprocessing + >>> multiprocessing.parent_process() is None + + This function is left for backwards compatibility, but there is little reason not to use the + built-in tooling of Python. + """ + return multiprocessing.parent_process() is None + + +_PARALLEL_OVERRIDE = None +_PARALLEL_IGNORE_USER_SETTINGS = False +_IN_PARALLEL_ALLOW_PARALLELISM = "FALSE" +_IN_PARALLEL_FORBID_PARALLELISM = "TRUE" + + +@functools.cache def should_run_in_parallel(num_processes: int | None = None) -> bool: - """Return whether the current parallelisation configuration suggests that we should run things - like :func:`parallel_map` in parallel (``True``) or degrade to serial (``False``). + """Decide whether a multiprocessing function should spawn subprocesses for parallelization. + + In particular, this is how :func:`parallel_map` decides whether to use multiprocessing or not. + The ``num_processes`` argument alone does not enforce parallelism; by default, Qiskit will only + use process-based parallelism when a ``fork``-like process spawning start method is in effect. + You can override this decision either by setting the :mod:`multiprocessing` start method you + use, setting the ``QISKIT_PARALLEL`` environment variable to ``"TRUE"``, or setting + ``parallel = true`` in your user settings file. + + This function includes two context managers that can be used to temporarily modify the return + value of this function: + + .. autofunction:: qiskit.utils::should_run_in_parallel.override + .. autofunction:: qiskit.utils::should_run_in_parallel.ignore_user_settings Args: - num_processes: the number of processes requested for use (if given). - """ - num_processes = CPU_COUNT if num_processes is None else num_processes - return ( - num_processes > 1 - and os.getenv("QISKIT_IN_PARALLEL", "FALSE") == "FALSE" - and CONFIG.get("parallel_enabled", PARALLEL_DEFAULT) - ) + num_processes: the maximum number of processes requested for use (``None`` implies the + default). + Examples: + Temporarily override the configured settings to disable parallelism:: -def parallel_map( # pylint: disable=dangerous-default-value - task, values, task_args=(), task_kwargs={}, num_processes=CPU_COUNT -): + >>> with should_run_in_parallel.override(True): + ... assert should_run_in_parallel(8) + >>> with should_run_in_parallel.override(False): + ... assert not should_run_in_parallel(8) + """ + # It's a configuration function with many simple choices - it'd be less clean to return late. + # pylint: disable=too-many-return-statements + num_processes = default_num_processes() if num_processes is None else num_processes + if num_processes < 2: + # There's no resources to parallelise over. + return False + if ( + os.getenv("QISKIT_IN_PARALLEL", _IN_PARALLEL_ALLOW_PARALLELISM) + != _IN_PARALLEL_ALLOW_PARALLELISM + ): + # This isn't a user-set variable; we set this to talk to our own child processes. + return False + if _PARALLEL_OVERRIDE is not None: + return _PARALLEL_OVERRIDE + if _PARALLEL_IGNORE_USER_SETTINGS: + return _parallel_default() + if (env_qiskit_parallel := os.getenv("QISKIT_PARALLEL")) is not None: + return env_qiskit_parallel.lower() == "true" + if (user_qiskit_parallel := CONFIG.get("parallel_enabled", None)) is not None: + return user_qiskit_parallel + # Otherwise, fallback to the default. + return _parallel_default() + + +@contextlib.contextmanager +def _parallel_ignore_user_settings(): + """A context manager within which :func:`should_run_in_parallel` will ignore environmental + configuration variables. + + In particular, the ``QISKIT_PARALLEL`` environment variable and the user-configuration file are + ignored within this context.""" + # The way around this would be to encapsulate `should_run_in_parallel` into a class, but since + # it's a singleton, it ends up being functionally no different to a global anyway. + global _PARALLEL_IGNORE_USER_SETTINGS # pylint: disable=global-statement + + should_run_in_parallel.cache_clear() + previous, _PARALLEL_IGNORE_USER_SETTINGS = _PARALLEL_IGNORE_USER_SETTINGS, True + try: + yield + finally: + _PARALLEL_IGNORE_USER_SETTINGS = previous + should_run_in_parallel.cache_clear() + + +@contextlib.contextmanager +def _parallel_override(value: bool): + """A context manager within which :func:`should_run_in_parallel` will return the given + ``value``. + + This is not a *complete* override; Qiskit will never attempt to parallelize if only a single + process is available, and will not allow process-based parallelism at a depth greater than 1.""" + # The way around this would be to encapsulate `should_run_in_parallel` into a class, but since + # it's a singleton, it ends up being functionally no different to a global anyway. + global _PARALLEL_OVERRIDE # pylint: disable=global-statement + + should_run_in_parallel.cache_clear() + previous, _PARALLEL_OVERRIDE = _PARALLEL_OVERRIDE, value + try: + yield + finally: + _PARALLEL_OVERRIDE = previous + should_run_in_parallel.cache_clear() + + +should_run_in_parallel.ignore_user_settings = _parallel_ignore_user_settings +should_run_in_parallel.override = _parallel_override + + +def parallel_map(task, values, task_args=(), task_kwargs=None, num_processes=None): """ Parallel execution of a mapping of `values` to the function `task`. This is functionally equivalent to:: result = [task(value, *task_args, **task_kwargs) for value in values] - This will parallelise the results if the number of ``values`` is greater than one, and the - current system configuration permits parallelization. + This will parallelise the results if the number of ``values`` is greater than one and + :func:`should_run_in_parallel` returns ``True``. If not, it will run in serial. Args: task (func): Function that is to be called for each value in ``values``. @@ -136,15 +268,13 @@ def parallel_map( # pylint: disable=dangerous-default-value evaluated. task_args (list): Optional additional arguments to the ``task`` function. task_kwargs (dict): Optional additional keyword argument to the ``task`` function. - num_processes (int): Number of processes to spawn. + num_processes (int): Number of processes to spawn. If not given, the return value of + :func:`default_num_processes` is used. Returns: result: The result list contains the value of ``task(value, *task_args, **task_kwargs)`` for each value in ``values``. - Raises: - QiskitError: If user interrupts via keyboard. - Examples: .. code-block:: python @@ -156,36 +286,18 @@ def func(_): return 0 parallel_map(func, list(range(10))); """ + task_kwargs = {} if task_kwargs is None else task_kwargs if num_processes is None: - num_processes = CPU_COUNT - if len(values) == 0: - return [] - if len(values) == 1: - return [task(values[0], *task_args, **task_kwargs)] - - if should_run_in_parallel(num_processes): - os.environ["QISKIT_IN_PARALLEL"] = "TRUE" - try: - results = [] - with ProcessPoolExecutor(max_workers=num_processes) as executor: - param = ((task, value, task_args, task_kwargs) for value in values) - future = executor.map(_task_wrapper, param) - - results = list(future) - - except (KeyboardInterrupt, Exception) as error: - if isinstance(error, KeyboardInterrupt): - os.environ["QISKIT_IN_PARALLEL"] = "FALSE" - raise QiskitError("Keyboard interrupt in parallel_map.") from error - # Otherwise just reset parallel flag and error - os.environ["QISKIT_IN_PARALLEL"] = "FALSE" - raise error - - os.environ["QISKIT_IN_PARALLEL"] = "FALSE" - return results - - results = [] - for _, value in enumerate(values): - result = task(value, *task_args, **task_kwargs) - results.append(result) - return results + num_processes = default_num_processes() + if len(values) < 2 or not should_run_in_parallel(num_processes): + return [task(value, *task_args, **task_kwargs) for value in values] + work_items = ((task, value, task_args, task_kwargs) for value in values) + + # This isn't a user-set variable; we set this to talk to our own child processes. + previous_in_parallel = os.getenv("QISKIT_IN_PARALLEL", _IN_PARALLEL_ALLOW_PARALLELISM) + os.environ["QISKIT_IN_PARALLEL"] = _IN_PARALLEL_FORBID_PARALLELISM + try: + with ProcessPoolExecutor(max_workers=num_processes) as executor: + return list(executor.map(_task_wrapper, work_items)) + finally: + os.environ["QISKIT_IN_PARALLEL"] = previous_in_parallel diff --git a/releasenotes/notes/parallel-check-public-7faed5f3e20e1d03.yaml b/releasenotes/notes/parallel-check-public-7faed5f3e20e1d03.yaml new file mode 100644 index 000000000000..78aef41f76c1 --- /dev/null +++ b/releasenotes/notes/parallel-check-public-7faed5f3e20e1d03.yaml @@ -0,0 +1,17 @@ +--- +features_misc: + - | + :mod:`qiskit.utils` now contains utilities to provide better control and inspection of Qiskit's + :mod:`multiprocessing` parallelization settings. In particular, one can now use + :func:`.should_run_in_parallel` to query whether :func:`.parallel_map` (and pass managers) will + launch subprocesses for suitable inputs, and use the context manager + :func:`.should_run_in_parallel.override` to temporarily override most system and user + configuration around this decision. + + An additional function, :func:`.default_num_processes` reads the default maximum number of + subprocesses that Qiskit will use for process-based parallelism. + - | + A new environment variable, ``QISKIT_IGNORE_USER_SETTINGS``, now controls whether to read the + user settings file on ``import qiskit``. If set to the string ``true``, the settings file + will not be read. This is useful for isolating certain instances of Qiskit from the system + environment, such as for testing. diff --git a/test/python/compiler/test_scheduler.py b/test/python/compiler/test_scheduler.py index c349bf054c3b..27ba28dfaeb7 100644 --- a/test/python/compiler/test_scheduler.py +++ b/test/python/compiler/test_scheduler.py @@ -17,6 +17,7 @@ from qiskit.pulse import InstructionScheduleMap, Schedule from qiskit.providers.fake_provider import FakeOpenPulse3Q, GenericBackendV2 from qiskit.compiler.scheduler import schedule +from qiskit.utils import should_run_in_parallel from test import QiskitTestCase # pylint: disable=wrong-import-order @@ -81,10 +82,9 @@ def test_schedules_single_circuit(self): def test_schedules_multiple_circuits(self): """Test scheduling of multiple circuits.""" - self.enable_parallel_processing() circuits = [self.circ, self.circ2] - with self.assertWarns(DeprecationWarning): + with self.assertWarns(DeprecationWarning), should_run_in_parallel.ignore_user_settings(): circuit_schedules = schedule(circuits, self.backend, method="asap") self.assertEqual(len(circuit_schedules), len(circuits)) diff --git a/test/python/compiler/test_transpiler.py b/test/python/compiler/test_transpiler.py index 5241304a0db4..388b4be1e4bd 100644 --- a/test/python/compiler/test_transpiler.py +++ b/test/python/compiler/test_transpiler.py @@ -81,7 +81,7 @@ from qiskit.providers.options import Options from qiskit.pulse import InstructionScheduleMap, Schedule, Play, Gaussian, DriveChannel from qiskit.quantum_info import Operator, random_unitary -from qiskit.utils import parallel +from qiskit.utils import should_run_in_parallel from qiskit.transpiler import CouplingMap, Layout, PassManager, TransformationPass from qiskit.transpiler.exceptions import TranspilerError, CircuitTooWideForTarget from qiskit.transpiler.passes import BarrierBeforeFinalMeasurements, GateDirection, VF2PostLayout @@ -2763,13 +2763,9 @@ def setUp(self): super().setUp() # Force parallel execution to True to test multiprocessing for this class - original_val = parallel.PARALLEL_DEFAULT - - def restore_default(): - parallel.PARALLEL_DEFAULT = original_val - - self.addCleanup(restore_default) - parallel.PARALLEL_DEFAULT = True + cm = should_run_in_parallel.override(True) + cm.__enter__() + self.addCleanup(cm.__exit__, None, None, None) @data(0, 1, 2, 3) def test_parallel_multiprocessing(self, opt_level): diff --git a/test/python/test_util.py b/test/python/test_util.py deleted file mode 100644 index d403ed004bc3..000000000000 --- a/test/python/test_util.py +++ /dev/null @@ -1,67 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2017, 2023. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Tests for qiskit/utils""" - -from unittest import mock - -from qiskit.utils import multiprocessing -from test import QiskitTestCase # pylint: disable=wrong-import-order - - -class TestUtil(QiskitTestCase): - """Tests for qiskit/_util.py""" - - def test_local_hardware_five_cpu_count(self): - """Test cpu count is half when sched affinity is 5""" - with mock.patch.object(multiprocessing, "os"): - multiprocessing.os.sched_getaffinity = mock.MagicMock(return_value=set(range(5))) - result = multiprocessing.local_hardware_info() - self.assertEqual(2, result["cpus"]) - - def test_local_hardware_sixty_four_cpu_count(self): - """Test cpu count is 32 when sched affinity is 64""" - with mock.patch.object(multiprocessing, "os"): - multiprocessing.os.sched_getaffinity = mock.MagicMock(return_value=set(range(64))) - result = multiprocessing.local_hardware_info() - self.assertEqual(32, result["cpus"]) - - def test_local_hardware_no_cpu_count(self): - """Test cpu count fallback to 1 when true value can't be determined""" - with mock.patch.object(multiprocessing, "os"): - multiprocessing.os.sched_getaffinity = mock.MagicMock(return_value=set()) - result = multiprocessing.local_hardware_info() - self.assertEqual(1, result["cpus"]) - - def test_local_hardware_no_sched_five_count(self): - """Test cpu could if sched affinity method is missing and cpu count is 5.""" - with mock.patch.object(multiprocessing, "os", spec=[]): - multiprocessing.os.cpu_count = mock.MagicMock(return_value=5) - del multiprocessing.os.sched_getaffinity - result = multiprocessing.local_hardware_info() - self.assertEqual(2, result["cpus"]) - - def test_local_hardware_no_sched_sixty_four_count(self): - """Test cpu could if sched affinity method is missing and cpu count is 64.""" - with mock.patch.object(multiprocessing, "os", spec=[]): - multiprocessing.os.cpu_count = mock.MagicMock(return_value=64) - del multiprocessing.os.sched_getaffinity - result = multiprocessing.local_hardware_info() - self.assertEqual(32, result["cpus"]) - - def test_local_hardware_no_sched_no_count(self): - """Test cpu count fallback to 1 when no sched getaffinity available.""" - with mock.patch.object(multiprocessing, "os", spec=[]): - multiprocessing.os.cpu_count = mock.MagicMock(return_value=None) - del multiprocessing.os.sched_getaffinity - result = multiprocessing.local_hardware_info() - self.assertEqual(1, result["cpus"]) diff --git a/test/python/utils/test_parallel.py b/test/python/utils/test_parallel.py index f6de443b7b03..944bf4083c4b 100644 --- a/test/python/utils/test_parallel.py +++ b/test/python/utils/test_parallel.py @@ -12,12 +12,14 @@ """Tests for qiskit/tools/parallel""" import os +import subprocess +import sys +import tempfile import time import warnings +from unittest import mock -from unittest.mock import patch - -from qiskit.utils.parallel import get_platform_parallel_default, parallel_map +from qiskit.utils import local_hardware_info, should_run_in_parallel, parallel_map from qiskit import QuantumRegister, ClassicalRegister, QuantumCircuit from qiskit.pulse import Schedule from test import QiskitTestCase # pylint: disable=wrong-import-order @@ -43,36 +45,9 @@ def _build_simple_schedule(_): return Schedule() -class TestGetPlatformParallelDefault(QiskitTestCase): - """Tests get_parallel_default_for_platform.""" - - def test_windows_parallel_default(self): - """Verifies the parallel default for Windows.""" - with patch("sys.platform", "win32"): - parallel_default = get_platform_parallel_default() - self.assertEqual(parallel_default, False) - - def test_mac_os_unsupported_version_parallel_default(self): - """Verifies the parallel default for macOS.""" - with patch("sys.platform", "darwin"): - with patch("sys.version_info", (3, 8, 0, "final", 0)): - parallel_default = get_platform_parallel_default() - self.assertEqual(parallel_default, False) - - def test_other_os_parallel_default(self): - """Verifies the parallel default for Linux and other OSes.""" - with patch("sys.platform", "linux"): - parallel_default = get_platform_parallel_default() - self.assertEqual(parallel_default, True) - - class TestParallel(QiskitTestCase): """A class for testing parallel_map functionality.""" - def test_parallel_env_flag(self): - """Verify parallel env flag is set""" - self.assertEqual(os.getenv("QISKIT_IN_PARALLEL", None), "FALSE") - def test_parallel(self): """Test parallel_map""" ans = parallel_map(_parfunc, list(range(10))) @@ -89,3 +64,111 @@ def test_parallel_schedule_names(self): out_schedules = parallel_map(_build_simple_schedule, list(range(10))) names = [schedule.name for schedule in out_schedules] self.assertEqual(len(names), len(set(names))) + + +class TestUtilities(QiskitTestCase): + """Tests for parallel utilities.""" + + def test_local_hardware_five_cpu_count(self): + """Test cpu count is half when sched affinity is 5""" + with mock.patch.object(os, "sched_getaffinity", return_value=set(range(5)), create=True): + self.assertEqual(2, local_hardware_info()["cpus"]) + + def test_local_hardware_sixty_four_cpu_count(self): + """Test cpu count is 32 when sched affinity is 64""" + with mock.patch.object(os, "sched_getaffinity", return_value=set(range(64)), create=True): + self.assertEqual(32, local_hardware_info()["cpus"]) + + def test_local_hardware_no_cpu_count(self): + """Test cpu count fallback to 1 when true value can't be determined""" + with mock.patch.object(os, "sched_getaffinity", return_value=set(), create=True): + self.assertEqual(1, local_hardware_info()["cpus"]) + + def test_local_hardware_no_sched_five_count(self): + """Test cpu could if sched affinity method is missing and cpu count is 5.""" + with ( + mock.patch.object(os, "sched_getaffinity", None, create=True), + mock.patch.object(os, "cpu_count", return_value=5), + ): + self.assertEqual(2, local_hardware_info()["cpus"]) + + def test_local_hardware_no_sched_sixty_four_count(self): + """Test cpu could if sched affinity method is missing and cpu count is 64.""" + with ( + mock.patch.object(os, "sched_getaffinity", None, create=True), + mock.patch.object(os, "cpu_count", return_value=64), + ): + self.assertEqual(32, local_hardware_info()["cpus"]) + + def test_local_hardware_no_sched_no_count(self): + """Test cpu count fallback to 1 when no sched getaffinity available.""" + with ( + mock.patch.object(os, "sched_getaffinity", None, create=True), + mock.patch.object(os, "cpu_count", return_value=None), + ): + self.assertEqual(1, local_hardware_info()["cpus"]) + + def test_should_run_in_parallel_override(self): + """Test that the context managers allow overriding the default value.""" + natural = should_run_in_parallel(8) + with should_run_in_parallel.override(True): + self.assertTrue(should_run_in_parallel(8)) + self.assertEqual(should_run_in_parallel(8), natural) + with should_run_in_parallel.override(False): + self.assertFalse(should_run_in_parallel(8)) + self.assertEqual(should_run_in_parallel(8), natural) + + def test_should_run_in_parallel_ignore_user_settings(self): + """Test that the context managers allow overriding the user settings.""" + # This is a nasty one, because much of the user settings are read statically at `import + # qiskit`, which we're obviously already past. We want to override that, so we need a + # subprocess whose environment we control completely. + + # Windows is picky about opening files that are already opened for writing. Ideally we'd + # use a context manager with `delete_on_close=False` so we close the file, launch our + # subprocess and let the CM clean up on exit, but that argument only arrived in Python 3.12. + # pylint: disable=consider-using-with + settings_file = tempfile.NamedTemporaryFile(mode="w", encoding="utf8", delete=False) + print("[DEFAULT]", file=settings_file) + print("parallel = true", file=settings_file) + settings_file.close() + self.addCleanup(os.remove, settings_file.name) + + # Pass on all our environment, except for our own configuration, which we override with our + # custom settings file, + env = {key: value for key, value in os.environ.items() if not key.startswith("QISKIT")} + env["QISKIT_SETTINGS"] = settings_file.name + env["QISKIT_IN_PARALLEL"] = "FALSE" + env["QISKIT_PARALLEL"] = "TRUE" + env["QISKIT_IGNORE_USER_SETTINGS"] = "FALSE" + + script = """\ +import multiprocessing +from unittest.mock import patch +from qiskit.utils import should_run_in_parallel + +print(should_run_in_parallel(8)) +with ( + patch.object(multiprocessing, "get_start_method", return_value="forkserver"), + should_run_in_parallel.ignore_user_settings(), +): + print(should_run_in_parallel(8)) +with ( + patch.object(multiprocessing, "get_start_method", return_value="spawn"), + should_run_in_parallel.ignore_user_settings(), +): + print(should_run_in_parallel(8)) +""" + result = subprocess.run( + sys.executable, + input=script, + encoding="utf8", + text=True, + env=env, + check=True, + capture_output=True, + ) + user_settings, forkserver_default, spawn_default = result.stdout.splitlines() + self.assertEqual( + (user_settings, forkserver_default, spawn_default), ("True", "True", "False") + ) diff --git a/test/python/visualization/test_circuit_text_drawer.py b/test/python/visualization/test_circuit_text_drawer.py index 666255c44f23..5033b26c41ab 100644 --- a/test/python/visualization/test_circuit_text_drawer.py +++ b/test/python/visualization/test_circuit_text_drawer.py @@ -493,7 +493,10 @@ def test_text_reverse_bits_read_from_config(self): file_path = pathlib.Path(dir_path) / "qiskit.conf" with open(file_path, "w") as fptr: fptr.write(config_content) - with unittest.mock.patch.dict(os.environ, {"QISKIT_SETTINGS": str(file_path)}): + with unittest.mock.patch.dict( + os.environ, + {"QISKIT_SETTINGS": str(file_path), "QISKIT_IGNORE_USER_SETTINGS": "false"}, + ): test_reverse = str(circuit_drawer(circuit, output="text")) self.assertEqual(test_reverse, expected_reverse) @@ -545,7 +548,10 @@ def test_text_idle_wires_read_from_config(self): file_path = pathlib.Path(dir_path) / "qiskit.conf" with open(file_path, "w") as fptr: fptr.write(config_content) - with unittest.mock.patch.dict(os.environ, {"QISKIT_SETTINGS": str(file_path)}): + with unittest.mock.patch.dict( + os.environ, + {"QISKIT_SETTINGS": str(file_path), "QISKIT_IGNORE_USER_SETTINGS": "false"}, + ): test_without = str(circuit_drawer(circuit, output="text")) self.assertEqual(test_without, expected_without) diff --git a/test/utils/base.py b/test/utils/base.py index 133666cfc7ad..21583141a464 100644 --- a/test/utils/base.py +++ b/test/utils/base.py @@ -28,7 +28,6 @@ import unittest from unittest.util import safe_repr -from qiskit.utils.parallel import get_platform_parallel_default from qiskit.exceptions import QiskitWarning from qiskit.utils import optionals as _optionals from qiskit.circuit import QuantumCircuit @@ -285,24 +284,6 @@ def assertDictAlmostEqual( msg = self._formatMessage(msg, error_msg) raise self.failureException(msg) - def enable_parallel_processing(self): - """ - Enables parallel processing, for the duration of a test, on platforms - that support it. This is done by temporarily overriding the value of - the QISKIT_PARALLEL environment variable with the platform specific default. - """ - parallel_default = str(get_platform_parallel_default()).upper() - - def set_parallel_env(name, value): - os.environ[name] = value - - self.addCleanup( - lambda value: set_parallel_env("QISKIT_PARALLEL", value), - os.getenv("QISKIT_PARALLEL", parallel_default), - ) - - os.environ["QISKIT_PARALLEL"] = parallel_default - class FullQiskitTestCase(QiskitTestCase): """further additions for Qiskit test cases, if ``testtools`` is available. diff --git a/tox.ini b/tox.ini index d4d822766675..b355a9b83126 100644 --- a/tox.ini +++ b/tox.ini @@ -15,11 +15,13 @@ setenv = QISKIT_SUPRESS_PACKAGING_WARNINGS=Y QISKIT_TEST_CAPTURE_STREAMS=1 QISKIT_PARALLEL=FALSE + QISKIT_IGNORE_USER_SETTINGS=TRUE passenv = RUSTUP_TOOLCHAIN RAYON_NUM_THREADS OMP_NUM_THREADS QISKIT_PARALLEL + QISKIT_IGNORE_USER_SETTINGS RUST_BACKTRACE SETUPTOOLS_ENABLE_FEATURES QISKIT_TESTS