Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ShellExecutor: Add the option to execute shell tasks in a separate conda environment #251

Closed
wants to merge 10 commits into from
12 changes: 12 additions & 0 deletions pympipool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ def __init__(
oversubscribe: bool = False,
init_function: Optional[callable] = None,
cwd: Optional[str] = None,
conda_environment_name: Optional[str] = None,
conda_environment_path: Optional[str] = None,
executor=None,
hostname_localhost: bool = False,
):
Expand All @@ -92,6 +94,8 @@ def __new__(
oversubscribe: bool = False,
init_function: Optional[callable] = None,
cwd: Optional[str] = None,
conda_environment_name: Optional[str] = None,
conda_environment_path: Optional[str] = None,
executor=None,
hostname_localhost: bool = False,
):
Expand All @@ -118,6 +122,8 @@ def __new__(
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true
conda_environment_name (str): name of the conda environment to initialize
conda_environment_path (str): path of the conda environment to initialize

"""
if flux_installed:
Expand All @@ -133,6 +139,8 @@ def __new__(
gpus_per_worker=gpus_per_worker,
init_function=init_function,
cwd=cwd,
conda_environment_name=conda_environment_name,
conda_environment_path=conda_environment_path,
hostname_localhost=hostname_localhost,
)
elif slurm_installed:
Expand All @@ -141,6 +149,8 @@ def __new__(
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
conda_environment_name=conda_environment_name,
conda_environment_path=conda_environment_path,
hostname_localhost=hostname_localhost,
)
else:
Expand All @@ -163,5 +173,7 @@ def __new__(
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
conda_environment_name=conda_environment_name,
conda_environment_path=conda_environment_path,
hostname_localhost=hostname_localhost,
)
6 changes: 6 additions & 0 deletions pympipool/flux/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class PyFluxExecutor(ExecutorBroker):
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true
conda_environment_name (str): name of the conda environment to initialize
conda_environment_path (str): path of the conda environment to initialize

Examples:

Expand Down Expand Up @@ -63,6 +65,8 @@ def __init__(
gpus_per_worker: int = 0,
init_function: Optional[callable] = None,
cwd: Optional[str] = None,
conda_environment_name: Optional[str] = None,
conda_environment_path: Optional[str] = None,
executor: Optional[flux.job.FluxExecutor] = None,
hostname_localhost: Optional[bool] = False,
):
Expand All @@ -83,6 +87,8 @@ def __init__(
"gpus_per_core": int(gpus_per_worker / cores_per_worker),
"cwd": cwd,
"executor": executor,
"prefix_name": conda_environment_name,
"prefix_path": conda_environment_path,
},
)
for _ in range(max_workers)
Expand Down
6 changes: 6 additions & 0 deletions pympipool/mpi/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class PyMPIExecutor(ExecutorBroker):
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true
conda_environment_name (str): name of the conda environment to initialize
conda_environment_path (str): path of the conda environment to initialize

Examples:

Expand Down Expand Up @@ -58,6 +60,8 @@ def __init__(
oversubscribe: bool = False,
init_function: Optional[callable] = None,
cwd: Optional[str] = None,
conda_environment_name: Optional[str] = None,
conda_environment_path: Optional[str] = None,
hostname_localhost: bool = False,
):
super().__init__()
Expand All @@ -72,6 +76,8 @@ def __init__(
"interface_class": MpiExecInterface,
"hostname_localhost": hostname_localhost,
"init_function": init_function,
"prefix_name": conda_environment_name,
"prefix_path": conda_environment_path,
# Interface Arguments
"cwd": cwd,
"oversubscribe": oversubscribe,
Expand Down
21 changes: 18 additions & 3 deletions pympipool/shared/executorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ def execute_parallel_tasks(
interface_class,
hostname_localhost: bool = False,
init_function: Optional[callable] = None,
prefix_name: Optional[str] = None,
prefix_path: Optional[str] = None,
**kwargs,
):
"""
Expand All @@ -189,10 +191,14 @@ def execute_parallel_tasks(
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true
prefix_name (str): name of the conda environment to initialize
prefix_path (str): path of the conda environment to initialize
"""
execute_parallel_tasks_loop(
interface=interface_bootup(
command_lst=_get_backend_path(cores=cores),
command_lst=_get_backend_path(
cores=cores, prefix_name=prefix_name, prefix_path=prefix_path
),
connections=interface_class(cores=cores, **kwargs),
hostname_localhost=hostname_localhost,
),
Expand Down Expand Up @@ -229,8 +235,17 @@ def execute_parallel_tasks_loop(
future_queue.task_done()


def _get_backend_path(cores: int):
command_lst = [sys.executable]
def _get_backend_path(
cores: int,
prefix_name: Optional[str] = None,
prefix_path: Optional[str] = None,
):
if prefix_name is not None:
command_lst = ["conda", "run", "-n", prefix_name, sys.executable]
elif prefix_path is not None:
command_lst = ["conda", "run", "-p", prefix_path, sys.executable]
else:
command_lst = [sys.executable]
if cores > 1:
command_lst += [_get_command_path(executable="mpiexec.py")]
else:
Expand Down
36 changes: 31 additions & 5 deletions pympipool/shell/executor.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
import queue
from concurrent.futures import Future
import subprocess
from typing import Optional

from pympipool.shared.executorbase import ExecutorBroker
from pympipool.shared.thread import RaisingThread


def execute_single_task(future_queue: queue.Queue):
def execute_single_task(
future_queue: queue.Queue,
prefix_name: Optional[str] = None,
prefix_path: Optional[str] = None,
):
"""
Process items received via the queue.

Args:
future_queue (queue.Queue):
prefix_name (str): name of the conda environment to initialize
prefix_path (str): path of the conda environment to initialize

"""
while True:
task_dict = future_queue.get()
Expand All @@ -23,11 +31,23 @@ def execute_single_task(future_queue: queue.Queue):
f = task_dict.pop("future")
if f.set_running_or_notify_cancel():
try:
f.set_result(
subprocess.check_output(
*task_dict["args"], **task_dict["kwargs"]
if prefix_name is None and prefix_path is None:
f.set_result(
subprocess.check_output(
*task_dict["args"], **task_dict["kwargs"]
)
)
else:
import conda_subprocess

f.set_result(
conda_subprocess.check_output(
*task_dict["args"],
**task_dict["kwargs"],
prefix_name=prefix_name,
prefix_path=prefix_path,
)
)
)
except Exception as thread_exception:
future_queue.task_done()
f.set_exception(exception=thread_exception)
Expand All @@ -47,6 +67,8 @@ class SubprocessExecutor(ExecutorBroker):

Args:
max_workers (int): defines the number workers which can execute functions in parallel
conda_environment_name (str): name of the conda environment to initialize
conda_environment_path (str): path of the conda environment to initialize

Examples:

Expand All @@ -61,6 +83,8 @@ class SubprocessExecutor(ExecutorBroker):
def __init__(
self,
max_workers: int = 1,
conda_environment_name: Optional[str] = None,
conda_environment_path: Optional[str] = None,
):
super().__init__()
self._set_process(
Expand All @@ -70,6 +94,8 @@ def __init__(
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"prefix_name": conda_environment_name,
"prefix_path": conda_environment_path,
},
)
for _ in range(max_workers)
Expand Down
6 changes: 6 additions & 0 deletions pympipool/slurm/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class PySlurmExecutor(ExecutorBroker):
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true
conda_environment_name (str): name of the conda environment to initialize
conda_environment_path (str): path of the conda environment to initialize

Examples:

Expand Down Expand Up @@ -60,6 +62,8 @@ def __init__(
oversubscribe: bool = False,
init_function: Optional[callable] = None,
cwd: Optional[str] = None,
conda_environment_name: Optional[str] = None,
conda_environment_path: Optional[str] = None,
hostname_localhost: bool = False,
command_line_argument_lst: list[str] = [],
):
Expand All @@ -75,6 +79,8 @@ def __init__(
"interface_class": SrunInterface,
"hostname_localhost": hostname_localhost,
"init_function": init_function,
"prefix_name": conda_environment_name,
"prefix_path": conda_environment_path,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": int(gpus_per_worker / cores_per_worker),
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ Homepage = "https://github.com/pyiron/pympipool"
Documentation = "https://pympipool.readthedocs.io"
Repository = "https://github.com/pyiron/pympipool"

[project.optional-dependencies]
conda = ["conda_subprocess==0.0.1"]

[tool.setuptools.packages.find]
include = ["pympipool*"]

Expand Down
Loading