diff --git a/pympipool/__init__.py b/pympipool/__init__.py index cf8b25bb..2d0336c9 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -77,6 +77,8 @@ def __init__( oversubscribe: bool = False, init_function: Optional[callable] = None, cwd: Optional[str] = None, + conda_environment_name: Optional[str] = None, + conda_environment_path: Optional[str] = None, executor=None, hostname_localhost: bool = False, ): @@ -92,6 +94,8 @@ def __new__( oversubscribe: bool = False, init_function: Optional[callable] = None, cwd: Optional[str] = None, + conda_environment_name: Optional[str] = None, + conda_environment_path: Optional[str] = None, executor=None, hostname_localhost: bool = False, ): @@ -118,6 +122,8 @@ def __new__( points to the same address as localhost. Still MacOS >= 12 seems to disable this look up for security reasons. So on MacOS it is required to set this option to true + conda_environment_name (str): name of the conda environment to initialize + conda_environment_path (str): path of the conda environment to initialize """ if flux_installed: @@ -133,6 +139,8 @@ def __new__( gpus_per_worker=gpus_per_worker, init_function=init_function, cwd=cwd, + conda_environment_name=conda_environment_name, + conda_environment_path=conda_environment_path, hostname_localhost=hostname_localhost, ) elif slurm_installed: @@ -141,6 +149,8 @@ def __new__( cores_per_worker=cores_per_worker, init_function=init_function, cwd=cwd, + conda_environment_name=conda_environment_name, + conda_environment_path=conda_environment_path, hostname_localhost=hostname_localhost, ) else: @@ -163,5 +173,7 @@ def __new__( cores_per_worker=cores_per_worker, init_function=init_function, cwd=cwd, + conda_environment_name=conda_environment_name, + conda_environment_path=conda_environment_path, hostname_localhost=hostname_localhost, ) diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index 5856133a..615a48d1 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -33,6 +33,8 @@ class PyFluxExecutor(ExecutorBroker): points to the same address as localhost. Still MacOS >= 12 seems to disable this look up for security reasons. So on MacOS it is required to set this option to true + conda_environment_name (str): name of the conda environment to initialize + conda_environment_path (str): path of the conda environment to initialize Examples: @@ -63,6 +65,8 @@ def __init__( gpus_per_worker: int = 0, init_function: Optional[callable] = None, cwd: Optional[str] = None, + conda_environment_name: Optional[str] = None, + conda_environment_path: Optional[str] = None, executor: Optional[flux.job.FluxExecutor] = None, hostname_localhost: Optional[bool] = False, ): @@ -83,6 +87,8 @@ def __init__( "gpus_per_core": int(gpus_per_worker / cores_per_worker), "cwd": cwd, "executor": executor, + "prefix_name": conda_environment_name, + "prefix_path": conda_environment_path, }, ) for _ in range(max_workers) diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index b1e39048..ade9e0c3 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -29,6 +29,8 @@ class PyMPIExecutor(ExecutorBroker): points to the same address as localhost. Still MacOS >= 12 seems to disable this look up for security reasons. So on MacOS it is required to set this option to true + conda_environment_name (str): name of the conda environment to initialize + conda_environment_path (str): path of the conda environment to initialize Examples: @@ -58,6 +60,8 @@ def __init__( oversubscribe: bool = False, init_function: Optional[callable] = None, cwd: Optional[str] = None, + conda_environment_name: Optional[str] = None, + conda_environment_path: Optional[str] = None, hostname_localhost: bool = False, ): super().__init__() @@ -72,6 +76,8 @@ def __init__( "interface_class": MpiExecInterface, "hostname_localhost": hostname_localhost, "init_function": init_function, + "prefix_name": conda_environment_name, + "prefix_path": conda_environment_path, # Interface Arguments "cwd": cwd, "oversubscribe": oversubscribe, diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 1075bddd..b384a645 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -173,6 +173,8 @@ def execute_parallel_tasks( interface_class, hostname_localhost: bool = False, init_function: Optional[callable] = None, + prefix_name: Optional[str] = None, + prefix_path: Optional[str] = None, **kwargs, ): """ @@ -189,10 +191,14 @@ def execute_parallel_tasks( points to the same address as localhost. Still MacOS >= 12 seems to disable this look up for security reasons. So on MacOS it is required to set this option to true + prefix_name (str): name of the conda environment to initialize + prefix_path (str): path of the conda environment to initialize """ execute_parallel_tasks_loop( interface=interface_bootup( - command_lst=_get_backend_path(cores=cores), + command_lst=_get_backend_path( + cores=cores, prefix_name=prefix_name, prefix_path=prefix_path + ), connections=interface_class(cores=cores, **kwargs), hostname_localhost=hostname_localhost, ), @@ -229,8 +235,17 @@ def execute_parallel_tasks_loop( future_queue.task_done() -def _get_backend_path(cores: int): - command_lst = [sys.executable] +def _get_backend_path( + cores: int, + prefix_name: Optional[str] = None, + prefix_path: Optional[str] = None, +): + if prefix_name is not None: + command_lst = ["conda", "run", "-n", prefix_name, sys.executable] + elif prefix_path is not None: + command_lst = ["conda", "run", "-p", prefix_path, sys.executable] + else: + command_lst = [sys.executable] if cores > 1: command_lst += [_get_command_path(executable="mpiexec.py")] else: diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py index 92e4803a..0f56716a 100644 --- a/pympipool/shell/executor.py +++ b/pympipool/shell/executor.py @@ -1,17 +1,25 @@ import queue from concurrent.futures import Future import subprocess +from typing import Optional from pympipool.shared.executorbase import ExecutorBroker from pympipool.shared.thread import RaisingThread -def execute_single_task(future_queue: queue.Queue): +def execute_single_task( + future_queue: queue.Queue, + prefix_name: Optional[str] = None, + prefix_path: Optional[str] = None, +): """ Process items received via the queue. Args: future_queue (queue.Queue): + prefix_name (str): name of the conda environment to initialize + prefix_path (str): path of the conda environment to initialize + """ while True: task_dict = future_queue.get() @@ -23,11 +31,23 @@ def execute_single_task(future_queue: queue.Queue): f = task_dict.pop("future") if f.set_running_or_notify_cancel(): try: - f.set_result( - subprocess.check_output( - *task_dict["args"], **task_dict["kwargs"] + if prefix_name is None and prefix_path is None: + f.set_result( + subprocess.check_output( + *task_dict["args"], **task_dict["kwargs"] + ) + ) + else: + import conda_subprocess + + f.set_result( + conda_subprocess.check_output( + *task_dict["args"], + **task_dict["kwargs"], + prefix_name=prefix_name, + prefix_path=prefix_path, + ) ) - ) except Exception as thread_exception: future_queue.task_done() f.set_exception(exception=thread_exception) @@ -47,6 +67,8 @@ class SubprocessExecutor(ExecutorBroker): Args: max_workers (int): defines the number workers which can execute functions in parallel + conda_environment_name (str): name of the conda environment to initialize + conda_environment_path (str): path of the conda environment to initialize Examples: @@ -61,6 +83,8 @@ class SubprocessExecutor(ExecutorBroker): def __init__( self, max_workers: int = 1, + conda_environment_name: Optional[str] = None, + conda_environment_path: Optional[str] = None, ): super().__init__() self._set_process( @@ -70,6 +94,8 @@ def __init__( kwargs={ # Executor Arguments "future_queue": self._future_queue, + "prefix_name": conda_environment_name, + "prefix_path": conda_environment_path, }, ) for _ in range(max_workers) diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index a587b73e..831af85f 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -29,6 +29,8 @@ class PySlurmExecutor(ExecutorBroker): points to the same address as localhost. Still MacOS >= 12 seems to disable this look up for security reasons. So on MacOS it is required to set this option to true + conda_environment_name (str): name of the conda environment to initialize + conda_environment_path (str): path of the conda environment to initialize Examples: @@ -60,6 +62,8 @@ def __init__( oversubscribe: bool = False, init_function: Optional[callable] = None, cwd: Optional[str] = None, + conda_environment_name: Optional[str] = None, + conda_environment_path: Optional[str] = None, hostname_localhost: bool = False, command_line_argument_lst: list[str] = [], ): @@ -75,6 +79,8 @@ def __init__( "interface_class": SrunInterface, "hostname_localhost": hostname_localhost, "init_function": init_function, + "prefix_name": conda_environment_name, + "prefix_path": conda_environment_path, # Interface Arguments "threads_per_core": threads_per_core, "gpus_per_core": int(gpus_per_worker / cores_per_worker), diff --git a/pyproject.toml b/pyproject.toml index f6b3d458..f4514576 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,9 @@ Homepage = "https://github.com/pyiron/pympipool" Documentation = "https://pympipool.readthedocs.io" Repository = "https://github.com/pyiron/pympipool" +[project.optional-dependencies] +conda = ["conda_subprocess==0.0.1"] + [tool.setuptools.packages.find] include = ["pympipool*"]