Skip to content

Commit

Permalink
Merge pull request #268 from pyiron/set_process
Browse files Browse the repository at this point in the history
Implement a Executor._set_process() function to set the process
  • Loading branch information
jan-janssen authored Feb 18, 2024
2 parents 36090f6 + f801199 commit 1a1ad7e
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 116 deletions.
68 changes: 35 additions & 33 deletions pympipool/flux/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,25 @@ def __init__(
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"hostname_localhost": hostname_localhost,
"executor_class": PyFluxSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_task": int(gpus_per_worker / cores_per_worker),
"init_function": init_function,
"cwd": cwd,
"executor": executor,
},
self._set_process(
process=RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"hostname_localhost": hostname_localhost,
"executor_class": PyFluxSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_task": int(gpus_per_worker / cores_per_worker),
"init_function": init_function,
"cwd": cwd,
"executor": executor,
},
)
)
self._process.start()


class PyFluxSingleTaskExecutor(ExecutorBase):
Expand Down Expand Up @@ -121,23 +122,24 @@ def __init__(
):
super().__init__()
cloudpickle_register(ind=3)
self._process = RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": FluxPythonInterface,
"hostname_localhost": hostname_localhost,
"init_function": init_function,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": gpus_per_task,
"cwd": cwd,
"executor": executor,
},
self._set_process(
process=RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": FluxPythonInterface,
"hostname_localhost": hostname_localhost,
"init_function": init_function,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": gpus_per_task,
"cwd": cwd,
"executor": executor,
},
)
)
self._process.start()


class FluxPythonInterface(BaseInterface):
Expand Down
60 changes: 31 additions & 29 deletions pympipool/mpi/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,23 @@ def __init__(
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"executor_class": PyMPISingleTaskExecutor,
"hostname_localhost": hostname_localhost,
# Executor Arguments
"cores": cores_per_worker,
"oversubscribe": oversubscribe,
"init_function": init_function,
"cwd": cwd,
},
self._set_process(
process=RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"executor_class": PyMPISingleTaskExecutor,
"hostname_localhost": hostname_localhost,
# Executor Arguments
"cores": cores_per_worker,
"oversubscribe": oversubscribe,
"init_function": init_function,
"cwd": cwd,
},
),
)
self._process.start()


class PyMPISingleTaskExecutor(ExecutorBase):
Expand Down Expand Up @@ -108,18 +109,19 @@ def __init__(
):
super().__init__()
cloudpickle_register(ind=3)
self._process = RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": MpiExecInterface,
"init_function": init_function,
# Interface Arguments
"cwd": cwd,
"oversubscribe": oversubscribe,
"hostname_localhost": hostname_localhost,
},
self._set_process(
process=RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": MpiExecInterface,
"init_function": init_function,
# Interface Arguments
"cwd": cwd,
"oversubscribe": oversubscribe,
"hostname_localhost": hostname_localhost,
},
)
)
self._process.start()
4 changes: 4 additions & 0 deletions pympipool/shared/executorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ def __del__(self):
except (AttributeError, RuntimeError):
pass

def _set_process(self, process):
self._process = process
self._process.start()


def cancel_items_in_queue(que):
"""
Expand Down
32 changes: 17 additions & 15 deletions pympipool/shell/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ class SubprocessSingleExecutor(ExecutorBase):

def __init__(self):
super().__init__()
self._process = RaisingThread(
target=execute_single_task,
kwargs={
"future_queue": self._future_queue,
},
self._set_process(
process=RaisingThread(
target=execute_single_task,
kwargs={
"future_queue": self._future_queue,
},
),
)
self._process.start()

def submit(self, *args, **kwargs):
f = Future()
Expand Down Expand Up @@ -83,16 +84,17 @@ def __init__(
max_workers=1,
):
super().__init__()
self._process = RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"executor_class": SubprocessSingleExecutor,
},
self._set_process(
process=RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"executor_class": SubprocessSingleExecutor,
},
),
)
self._process.start()

def submit(self, *args, **kwargs):
"""
Expand Down
13 changes: 7 additions & 6 deletions pympipool/shell/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,14 @@ class ShellExecutor(ExecutorBase):

def __init__(self, *args, **kwargs):
super().__init__()
self._process = RaisingThread(
target=execute_single_task,
kwargs={
"future_queue": self._future_queue,
},
self._set_process(
process=RaisingThread(
target=execute_single_task,
kwargs={
"future_queue": self._future_queue,
},
),
)
self._process.start()
self._future_queue.put({"init": True, "args": args, "kwargs": kwargs})

def submit(self, string_input, lines_to_read=None, stop_read_pattern=None):
Expand Down
68 changes: 35 additions & 33 deletions pympipool/slurm/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,25 @@ def __init__(
hostname_localhost=False,
):
super().__init__()
self._process = RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"hostname_localhost": hostname_localhost,
"executor_class": PySlurmSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_task": int(gpus_per_worker / cores_per_worker),
"oversubscribe": oversubscribe,
"init_function": init_function,
"cwd": cwd,
},
self._set_process(
process=RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"hostname_localhost": hostname_localhost,
"executor_class": PySlurmSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_task": int(gpus_per_worker / cores_per_worker),
"oversubscribe": oversubscribe,
"init_function": init_function,
"cwd": cwd,
},
),
)
self._process.start()


class PySlurmSingleTaskExecutor(ExecutorBase):
Expand Down Expand Up @@ -117,20 +118,21 @@ def __init__(
):
super().__init__()
cloudpickle_register(ind=3)
self._process = RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": SrunInterface,
"init_function": init_function,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": gpus_per_task,
"cwd": cwd,
"oversubscribe": oversubscribe,
"hostname_localhost": hostname_localhost,
},
self._set_process(
process=RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": SrunInterface,
"init_function": init_function,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": gpus_per_task,
"cwd": cwd,
"oversubscribe": oversubscribe,
"hostname_localhost": hostname_localhost,
},
),
)
self._process.start()

0 comments on commit 1a1ad7e

Please sign in to comment.