From 83a3788220b733ec567aeb2e9ec6393d83f5b0f1 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 17 Mar 2024 22:00:36 -0500 Subject: [PATCH 1/5] Add type hints --- pympipool/__init__.py | 32 +++++++++++++------------- pympipool/backend/serial.py | 2 +- pympipool/flux/executor.py | 32 +++++++++++++------------- pympipool/mpi/executor.py | 12 +++++----- pympipool/shared/backend.py | 8 +++---- pympipool/shared/communication.py | 16 ++++++------- pympipool/shared/executorbase.py | 31 +++++++++++++------------ pympipool/shared/interface.py | 38 +++++++++++++++---------------- pympipool/shell/executor.py | 5 ++-- pympipool/shell/interactive.py | 10 ++++---- pympipool/slurm/executor.py | 16 ++++++------- 11 files changed, 103 insertions(+), 99 deletions(-) diff --git a/pympipool/__init__.py b/pympipool/__init__.py index aa9bc881..1cb02f5f 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -69,30 +69,30 @@ class Executor: def __init__( self, - max_workers=1, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - oversubscribe=False, - init_function=None, - cwd=None, + max_workers: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + oversubscribe: bool = False, + init_function: callable = None, + cwd: str = None, executor=None, - hostname_localhost=False, + hostname_localhost: bool =False, ): # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. pass def __new__( cls, - max_workers=1, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - oversubscribe=False, - init_function=None, - cwd=None, + max_workers: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + oversubscribe: bool = False, + init_function: callable = None, + cwd: str = None, executor=None, - hostname_localhost=False, + hostname_localhost: bool = False, ): """ Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor, diff --git a/pympipool/backend/serial.py b/pympipool/backend/serial.py index effdec6b..2e9308b3 100644 --- a/pympipool/backend/serial.py +++ b/pympipool/backend/serial.py @@ -10,7 +10,7 @@ from pympipool.shared.backend import call_funct, parse_arguments -def main(argument_lst=None): +def main(argument_lst: list[str] = None): if argument_lst is None: argument_lst = sys.argv argument_dict = parse_arguments(argument_lst=argument_lst) diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index 232cd518..ed5389b2 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -56,14 +56,14 @@ class PyFluxExecutor(ExecutorBroker): def __init__( self, - max_workers=1, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - init_function=None, - cwd=None, - executor=None, - hostname_localhost=False, + max_workers: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + init_function: callable = None, + cwd: str = None, + executor: flux.job.FluxExecutor = None, + hostname_localhost: bool = False, ): super().__init__() self._set_process( @@ -92,12 +92,12 @@ def __init__( class FluxPythonInterface(BaseInterface): def __init__( self, - cwd=None, - cores=1, - threads_per_core=1, - gpus_per_core=0, - oversubscribe=False, - executor=None, + cwd: str = None, + cores: int = 1, + threads_per_core: int = 1, + gpus_per_core: int = 0, + oversubscribe: bool = False, + executor: flux.job.FluxExecutor = None, ): super().__init__( cwd=cwd, @@ -109,7 +109,7 @@ def __init__( self._executor = executor self._future = None - def bootup(self, command_lst): + def bootup(self, command_lst: list[str]): if self._oversubscribe: raise ValueError( "Oversubscribing is currently not supported for the Flux adapter." @@ -129,7 +129,7 @@ def bootup(self, command_lst): jobspec.cwd = self._cwd self._future = self._executor.submit(jobspec) - def shutdown(self, wait=True): + def shutdown(self, wait: bool = True): if self.poll(): self._future.cancel() # The flux future objects are not instantly updated, diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index b28ffd47..551911d7 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -51,12 +51,12 @@ class PyMPIExecutor(ExecutorBroker): def __init__( self, - max_workers=1, - cores_per_worker=1, - oversubscribe=False, - init_function=None, - cwd=None, - hostname_localhost=False, + max_workers: int = 1, + cores_per_worker: int = 1, + oversubscribe: bool = False, + init_function: callable = None, + cwd: str = None, + hostname_localhost: bool = False, ): super().__init__() self._set_process( diff --git a/pympipool/shared/backend.py b/pympipool/shared/backend.py index 5c917df9..e8d00b07 100644 --- a/pympipool/shared/backend.py +++ b/pympipool/shared/backend.py @@ -1,7 +1,7 @@ import inspect -def call_funct(input_dict, funct=None, memory=None): +def call_funct(input_dict: dict, funct: callable = None, memory: dict = None) -> callable: """ Call function from dictionary @@ -30,7 +30,7 @@ def funct(*args, **kwargs): return funct(input_dict["fn"], *input_dict["args"], **input_dict["kwargs"]) -def parse_arguments(argument_lst): +def parse_arguments(argument_lst: list[str]) -> dict: """ Simple function to parse command line arguments @@ -50,7 +50,7 @@ def parse_arguments(argument_lst): ) -def update_default_dict_from_arguments(argument_lst, argument_dict, default_dict): +def update_default_dict_from_arguments(argument_lst: list[str], argument_dict: dict, default_dict: dict) -> dict: default_dict.update( { k: argument_lst[argument_lst.index(v) + 1] @@ -61,7 +61,7 @@ def update_default_dict_from_arguments(argument_lst, argument_dict, default_dict return default_dict -def _update_dict_delta(dict_input, dict_output, keys_possible_lst): +def _update_dict_delta(dict_input: dict, dict_output: dict, keys_possible_lst: list) -> dict: return { k: v for k, v in dict_input.items() diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py index fad9450d..5a82fc49 100644 --- a/pympipool/shared/communication.py +++ b/pympipool/shared/communication.py @@ -18,7 +18,7 @@ def __init__(self, interface=None): self._process = None self._interface = interface - def send_dict(self, input_dict): + def send_dict(self, input_dict: dict): """ Send a dictionary with instructions to a connected client process. @@ -42,7 +42,7 @@ def receive_dict(self): error_type = output["error_type"].split("'")[1] raise eval(error_type)(output["error"]) - def send_and_receive_dict(self, input_dict): + def send_and_receive_dict(self, input_dict: dict) -> dict: """ Combine both the send_dict() and receive_dict() function in a single call. @@ -66,7 +66,7 @@ def bind_to_random_port(self): """ return self._socket.bind_to_random_port("tcp://*") - def bootup(self, command_lst): + def bootup(self, command_lst: list[str]): """ Boot up the client process to connect to the SocketInterface. @@ -75,7 +75,7 @@ def bootup(self, command_lst): """ self._interface.bootup(command_lst=command_lst) - def shutdown(self, wait=True): + def shutdown(self, wait: bool = True): result = None if self._interface.poll(): result = self.send_and_receive_dict( @@ -96,9 +96,9 @@ def __del__(self): def interface_bootup( - command_lst, + command_lst: list[str], connections, - hostname_localhost=False, + hostname_localhost: bool = False, ): """ Start interface for ZMQ communication @@ -132,7 +132,7 @@ def interface_bootup( return interface -def interface_connect(host, port): +def interface_connect(host: str, port: str): """ Connect to an existing SocketInterface instance by providing the hostname and the port as strings. @@ -146,7 +146,7 @@ def interface_connect(host, port): return context, socket -def interface_send(socket, result_dict): +def interface_send(socket, result_dict: dict): """ Send results to a SocketInterface instance. diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 9a718e30..582e6aa3 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -1,3 +1,4 @@ +import threading from concurrent.futures import ( Executor as FutureExecutor, Future, @@ -22,7 +23,7 @@ def __init__(self): def future_queue(self): return self._future_queue - def submit(self, fn, *args, **kwargs): + def submit(self, fn: callable, *args, **kwargs): """Submits a callable to be executed with the given arguments. Schedules the callable to be executed as fn(*args, **kwargs) and returns @@ -35,7 +36,7 @@ def submit(self, fn, *args, **kwargs): self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) return f - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): """Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other @@ -58,7 +59,7 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._process = None self._future_queue = None - def _set_process(self, process): + def _set_process(self, process: threading.Thread): self._process = process self._process.start() @@ -71,13 +72,13 @@ def __del__(self): except (AttributeError, RuntimeError): pass - def _set_process(self, process): + def _set_process(self, process: threading.Thread): self._process = process self._process.start() class ExecutorBroker(ExecutorBase): - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): """Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other @@ -103,13 +104,13 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._process = None self._future_queue = None - def _set_process(self, process): + def _set_process(self, process: threading.Thread): self._process = process for process in self._process: process.start() -def cancel_items_in_queue(que): +def cancel_items_in_queue(que: queue.Queue): """ Cancel items which are still waiting in the queue. If the executor is busy tasks remain in the queue, so the future objects have to be cancelled when the executor shuts down. @@ -127,7 +128,7 @@ def cancel_items_in_queue(que): break -def cloudpickle_register(ind=2): +def cloudpickle_register(ind: int = 2): """ Cloudpickle can either pickle by value or pickle by reference. The functions which are communicated have to be pickled by value rather than by reference, so the module which calls the map function is pickled by value. @@ -150,11 +151,11 @@ def cloudpickle_register(ind=2): def execute_parallel_tasks( - future_queue, - cores, + future_queue: queue.Queue, + cores: int, interface_class, - hostname_localhost=False, - init_function=None, + hostname_localhost: bool = False, + init_function: callable = None, **kwargs, ): """ @@ -183,7 +184,7 @@ def execute_parallel_tasks( ) -def execute_parallel_tasks_loop(interface, future_queue, init_function=None): +def execute_parallel_tasks_loop(interface, future_queue: queue.Queue, init_function: callable = None): if init_function is not None: interface.send_dict( input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} @@ -209,7 +210,7 @@ def execute_parallel_tasks_loop(interface, future_queue, init_function=None): future_queue.task_done() -def _get_backend_path(cores): +def _get_backend_path(cores: int): command_lst = [sys.executable] if cores > 1: command_lst += [_get_command_path(executable="mpiexec.py")] @@ -218,5 +219,5 @@ def _get_backend_path(cores): return command_lst -def _get_command_path(executable): +def _get_command_path(executable: str): return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable)) diff --git a/pympipool/shared/interface.py b/pympipool/shared/interface.py index 962c31fc..23ef5ddc 100644 --- a/pympipool/shared/interface.py +++ b/pympipool/shared/interface.py @@ -7,15 +7,15 @@ class BaseInterface(ABC): - def __init__(self, cwd, cores=1, oversubscribe=False): + def __init__(self, cwd: str, cores: int = 1, oversubscribe: bool = False): self._cwd = cwd self._cores = cores self._oversubscribe = oversubscribe - def bootup(self, command_lst): + def bootup(self, command_lst: list[str]): raise NotImplementedError - def shutdown(self, wait=True): + def shutdown(self, wait: bool = True): raise NotImplementedError def poll(self): @@ -25,9 +25,9 @@ def poll(self): class SubprocessInterface(BaseInterface): def __init__( self, - cwd=None, - cores=1, - oversubscribe=False, + cwd: str = None, + cores: int = 1, + oversubscribe: bool = False, ): super().__init__( cwd=cwd, @@ -36,17 +36,17 @@ def __init__( ) self._process = None - def bootup(self, command_lst): + def bootup(self, command_lst: list[str]): self._process = subprocess.Popen( args=self.generate_command(command_lst=command_lst), cwd=self._cwd, stdin=subprocess.DEVNULL, ) - def generate_command(self, command_lst): + def generate_command(self, command_lst: list[str]) -> list[str]: return command_lst - def shutdown(self, wait=True): + def shutdown(self, wait: bool = True): self._process.communicate() self._process.terminate() if wait: @@ -58,7 +58,7 @@ def poll(self): class MpiExecInterface(SubprocessInterface): - def generate_command(self, command_lst): + def generate_command(self, command_lst: list[str]): command_prepend_lst = generate_mpiexec_command( cores=self._cores, oversubscribe=self._oversubscribe, @@ -71,11 +71,11 @@ def generate_command(self, command_lst): class SrunInterface(SubprocessInterface): def __init__( self, - cwd=None, - cores=1, - threads_per_core=1, - gpus_per_core=0, - oversubscribe=False, + cwd: str = None, + cores: int = 1, + threads_per_core: int = 1, + gpus_per_core: int = 0, + oversubscribe: bool = False, ): super().__init__( cwd=cwd, @@ -85,7 +85,7 @@ def __init__( self._threads_per_core = threads_per_core self._gpus_per_core = gpus_per_core - def generate_command(self, command_lst): + def generate_command(self, command_lst: list[str]) -> list[str]: command_prepend_lst = generate_slurm_command( cores=self._cores, cwd=self._cwd, @@ -98,7 +98,7 @@ def generate_command(self, command_lst): ) -def generate_mpiexec_command(cores, oversubscribe=False): +def generate_mpiexec_command(cores: int, oversubscribe: bool=False) -> list[str]: if cores == 1: return [] else: @@ -109,8 +109,8 @@ def generate_mpiexec_command(cores, oversubscribe=False): def generate_slurm_command( - cores, cwd, threads_per_core=1, gpus_per_core=0, oversubscribe=False -): + cores: int, cwd: str, threads_per_core: int = 1, gpus_per_core: int = 0, oversubscribe: bool = False +) -> list[str]: command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)] if cwd is not None: command_prepend_lst += ["-D", cwd] diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py index aa1c8f95..92e4803a 100644 --- a/pympipool/shell/executor.py +++ b/pympipool/shell/executor.py @@ -1,3 +1,4 @@ +import queue from concurrent.futures import Future import subprocess @@ -5,7 +6,7 @@ from pympipool.shared.thread import RaisingThread -def execute_single_task(future_queue): +def execute_single_task(future_queue: queue.Queue): """ Process items received via the queue. @@ -59,7 +60,7 @@ class SubprocessExecutor(ExecutorBroker): def __init__( self, - max_workers=1, + max_workers: int = 1, ): super().__init__() self._set_process( diff --git a/pympipool/shell/interactive.py b/pympipool/shell/interactive.py index adef58b3..e02fe6b8 100644 --- a/pympipool/shell/interactive.py +++ b/pympipool/shell/interactive.py @@ -1,3 +1,5 @@ +import queue +import threading from concurrent.futures import Future import subprocess from time import sleep @@ -6,7 +8,7 @@ from pympipool.shared.thread import RaisingThread -def wait_for_process_to_stop(process, sleep_interval=10e-10): +def wait_for_process_to_stop(process: threading.Thread, sleep_interval: float = 10e-10): """ Wait for the subprocess.Popen() process to stop executing @@ -18,7 +20,7 @@ def wait_for_process_to_stop(process, sleep_interval=10e-10): sleep(sleep_interval) -def execute_single_task(future_queue): +def execute_single_task(future_queue: queue.Queue): """ Process items received via the queue. @@ -117,7 +119,7 @@ def __init__(self, *args, **kwargs): ) self._future_queue.put({"init": True, "args": args, "kwargs": kwargs}) - def submit(self, string_input, lines_to_read=None, stop_read_pattern=None): + def submit(self, string_input: str, lines_to_read: int = None, stop_read_pattern: str = None): """ Submit the input as a string to the executable. In addition to the input the ShellExecutor also needs a measure to identify the completion of the execution. This can either be provided based on the number of lines to read @@ -149,7 +151,7 @@ def submit(self, string_input, lines_to_read=None, stop_read_pattern=None): ) return f - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): """Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index deb13593..bff090de 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -52,14 +52,14 @@ class PySlurmExecutor(ExecutorBroker): def __init__( self, - max_workers=1, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - oversubscribe=False, - init_function=None, - cwd=None, - hostname_localhost=False, + max_workers: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + oversubscribe: bool = False, + init_function: callable = None, + cwd: str = None, + hostname_localhost: bool = False, ): super().__init__() self._set_process( From d96fde19916f7b7db7199f997beede9ae78722ff Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 17 Mar 2024 22:04:28 -0500 Subject: [PATCH 2/5] Add type hints --- pympipool/shared/interface.py | 2 +- pympipool/slurm/executor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pympipool/shared/interface.py b/pympipool/shared/interface.py index d195d019..c7cfe9d1 100644 --- a/pympipool/shared/interface.py +++ b/pympipool/shared/interface.py @@ -76,7 +76,7 @@ def __init__( threads_per_core: int = 1, gpus_per_core: int = 0, oversubscribe: bool = False, - command_line_argument_lst: list[str] = [], + command_line_argument_lst: list[str] = [], ): super().__init__( cwd=cwd, diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index c28ecf4a..e3ebf668 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -60,7 +60,7 @@ def __init__( init_function: callable = None, cwd: str = None, hostname_localhost: bool = False, - command_line_argument_lst: list[str] = [], + command_line_argument_lst: list[str] = [], ): super().__init__() self._set_process( From 84e2f9739181eacc5bdf2406e5036512e408754a Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Mon, 18 Mar 2024 03:06:00 +0000 Subject: [PATCH 3/5] Format black --- pympipool/__init__.py | 2 +- pympipool/shared/backend.py | 12 +++++++++--- pympipool/shared/executorbase.py | 4 +++- pympipool/shared/interface.py | 4 ++-- pympipool/shell/interactive.py | 7 ++++++- pympipool/slurm/executor.py | 2 +- 6 files changed, 22 insertions(+), 9 deletions(-) diff --git a/pympipool/__init__.py b/pympipool/__init__.py index 1cb02f5f..e6f9a995 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -77,7 +77,7 @@ def __init__( init_function: callable = None, cwd: str = None, executor=None, - hostname_localhost: bool =False, + hostname_localhost: bool = False, ): # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. pass diff --git a/pympipool/shared/backend.py b/pympipool/shared/backend.py index e8d00b07..c49e45eb 100644 --- a/pympipool/shared/backend.py +++ b/pympipool/shared/backend.py @@ -1,7 +1,9 @@ import inspect -def call_funct(input_dict: dict, funct: callable = None, memory: dict = None) -> callable: +def call_funct( + input_dict: dict, funct: callable = None, memory: dict = None +) -> callable: """ Call function from dictionary @@ -50,7 +52,9 @@ def parse_arguments(argument_lst: list[str]) -> dict: ) -def update_default_dict_from_arguments(argument_lst: list[str], argument_dict: dict, default_dict: dict) -> dict: +def update_default_dict_from_arguments( + argument_lst: list[str], argument_dict: dict, default_dict: dict +) -> dict: default_dict.update( { k: argument_lst[argument_lst.index(v) + 1] @@ -61,7 +65,9 @@ def update_default_dict_from_arguments(argument_lst: list[str], argument_dict: d return default_dict -def _update_dict_delta(dict_input: dict, dict_output: dict, keys_possible_lst: list) -> dict: +def _update_dict_delta( + dict_input: dict, dict_output: dict, keys_possible_lst: list +) -> dict: return { k: v for k, v in dict_input.items() diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index dda07152..d43a98e6 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -200,7 +200,9 @@ def execute_parallel_tasks( ) -def execute_parallel_tasks_loop(interface, future_queue: queue.Queue, init_function: callable = None): +def execute_parallel_tasks_loop( + interface, future_queue: queue.Queue, init_function: callable = None +): if init_function is not None: interface.send_dict( input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} diff --git a/pympipool/shared/interface.py b/pympipool/shared/interface.py index c7cfe9d1..a7365524 100644 --- a/pympipool/shared/interface.py +++ b/pympipool/shared/interface.py @@ -76,7 +76,7 @@ def __init__( threads_per_core: int = 1, gpus_per_core: int = 0, oversubscribe: bool = False, - command_line_argument_lst: list[str] = [], + command_line_argument_lst: list[str] = [], ): super().__init__( cwd=cwd, @@ -101,7 +101,7 @@ def generate_command(self, command_lst: list[str]) -> list[str]: ) -def generate_mpiexec_command(cores: int, oversubscribe: bool=False) -> list[str]: +def generate_mpiexec_command(cores: int, oversubscribe: bool = False) -> list[str]: if cores == 1: return [] else: diff --git a/pympipool/shell/interactive.py b/pympipool/shell/interactive.py index e02fe6b8..7615ac18 100644 --- a/pympipool/shell/interactive.py +++ b/pympipool/shell/interactive.py @@ -119,7 +119,12 @@ def __init__(self, *args, **kwargs): ) self._future_queue.put({"init": True, "args": args, "kwargs": kwargs}) - def submit(self, string_input: str, lines_to_read: int = None, stop_read_pattern: str = None): + def submit( + self, + string_input: str, + lines_to_read: int = None, + stop_read_pattern: str = None, + ): """ Submit the input as a string to the executable. In addition to the input the ShellExecutor also needs a measure to identify the completion of the execution. This can either be provided based on the number of lines to read diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index e3ebf668..c28ecf4a 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -60,7 +60,7 @@ def __init__( init_function: callable = None, cwd: str = None, hostname_localhost: bool = False, - command_line_argument_lst: list[str] = [], + command_line_argument_lst: list[str] = [], ): super().__init__() self._set_process( From 142dccd0b7df5b8b7ebe0bfdb52857e9e9869924 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 17 Mar 2024 22:10:49 -0500 Subject: [PATCH 4/5] Disable python 3.8 --- .github/workflows/unittest-mpich.yml | 5 ----- .github/workflows/unittest-openmpi.yml | 5 ----- 2 files changed, 10 deletions(-) diff --git a/.github/workflows/unittest-mpich.yml b/.github/workflows/unittest-mpich.yml index 2d32edc5..ac568ccc 100644 --- a/.github/workflows/unittest-mpich.yml +++ b/.github/workflows/unittest-mpich.yml @@ -40,11 +40,6 @@ jobs: label: linux-64-py-3-9-mpich prefix: /usr/share/miniconda3/envs/my-env - - operating-system: ubuntu-latest - python-version: 3.8 - label: linux-64-py-3-8-mpich - prefix: /usr/share/miniconda3/envs/my-env - steps: - uses: actions/checkout@v2 - uses: conda-incubator/setup-miniconda@v2.2.0 diff --git a/.github/workflows/unittest-openmpi.yml b/.github/workflows/unittest-openmpi.yml index 9080055f..e36b7966 100644 --- a/.github/workflows/unittest-openmpi.yml +++ b/.github/workflows/unittest-openmpi.yml @@ -40,11 +40,6 @@ jobs: label: linux-64-py-3-9-openmpi prefix: /usr/share/miniconda3/envs/my-env - - operating-system: ubuntu-latest - python-version: 3.8 - label: linux-64-py-3-8-openmpi - prefix: /usr/share/miniconda3/envs/my-env - steps: - uses: actions/checkout@v2 - uses: conda-incubator/setup-miniconda@v2.2.0 From 5f370a339fa909981c177ec389ca5bad9ba8087a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 18 Mar 2024 19:51:35 -0500 Subject: [PATCH 5/5] Add Optional typing --- pympipool/__init__.py | 9 +++++---- pympipool/backend/serial.py | 3 ++- pympipool/flux/executor.py | 13 +++++++------ pympipool/mpi/executor.py | 6 ++++-- pympipool/shared/backend.py | 3 ++- pympipool/shared/communication.py | 6 +++--- pympipool/shared/executorbase.py | 5 +++-- pympipool/shared/interface.py | 5 +++-- pympipool/shell/interactive.py | 5 +++-- pympipool/slurm/executor.py | 5 +++-- 10 files changed, 35 insertions(+), 25 deletions(-) diff --git a/pympipool/__init__.py b/pympipool/__init__.py index e6f9a995..cf8b25bb 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -1,5 +1,6 @@ import os import shutil +from typing import Optional from ._version import get_versions from pympipool.mpi.executor import PyMPIExecutor from pympipool.shared.interface import SLURM_COMMAND @@ -74,8 +75,8 @@ def __init__( threads_per_core: int = 1, gpus_per_worker: int = 0, oversubscribe: bool = False, - init_function: callable = None, - cwd: str = None, + init_function: Optional[callable] = None, + cwd: Optional[str] = None, executor=None, hostname_localhost: bool = False, ): @@ -89,8 +90,8 @@ def __new__( threads_per_core: int = 1, gpus_per_worker: int = 0, oversubscribe: bool = False, - init_function: callable = None, - cwd: str = None, + init_function: Optional[callable] = None, + cwd: Optional[str] = None, executor=None, hostname_localhost: bool = False, ): diff --git a/pympipool/backend/serial.py b/pympipool/backend/serial.py index 2e9308b3..e6398fdb 100644 --- a/pympipool/backend/serial.py +++ b/pympipool/backend/serial.py @@ -1,5 +1,6 @@ from os.path import abspath import sys +from typing import Optional from pympipool.shared.communication import ( interface_connect, @@ -10,7 +11,7 @@ from pympipool.shared.backend import call_funct, parse_arguments -def main(argument_lst: list[str] = None): +def main(argument_lst: Optional[list[str]] = None): if argument_lst is None: argument_lst = sys.argv argument_dict = parse_arguments(argument_lst=argument_lst) diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index ed5389b2..5856133a 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -1,4 +1,5 @@ import os +from typing import Optional import flux.job @@ -60,10 +61,10 @@ def __init__( cores_per_worker: int = 1, threads_per_core: int = 1, gpus_per_worker: int = 0, - init_function: callable = None, - cwd: str = None, - executor: flux.job.FluxExecutor = None, - hostname_localhost: bool = False, + init_function: Optional[callable] = None, + cwd: Optional[str] = None, + executor: Optional[flux.job.FluxExecutor] = None, + hostname_localhost: Optional[bool] = False, ): super().__init__() self._set_process( @@ -92,12 +93,12 @@ def __init__( class FluxPythonInterface(BaseInterface): def __init__( self, - cwd: str = None, + cwd: Optional[str] = None, cores: int = 1, threads_per_core: int = 1, gpus_per_core: int = 0, oversubscribe: bool = False, - executor: flux.job.FluxExecutor = None, + executor: Optional[flux.job.FluxExecutor] = None, ): super().__init__( cwd=cwd, diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index 551911d7..b1e39048 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -1,3 +1,5 @@ +from typing import Optional + from pympipool.shared.executorbase import ( execute_parallel_tasks, ExecutorBroker, @@ -54,8 +56,8 @@ def __init__( max_workers: int = 1, cores_per_worker: int = 1, oversubscribe: bool = False, - init_function: callable = None, - cwd: str = None, + init_function: Optional[callable] = None, + cwd: Optional[str] = None, hostname_localhost: bool = False, ): super().__init__() diff --git a/pympipool/shared/backend.py b/pympipool/shared/backend.py index c49e45eb..6f9f18ac 100644 --- a/pympipool/shared/backend.py +++ b/pympipool/shared/backend.py @@ -1,8 +1,9 @@ +from typing import Optional import inspect def call_funct( - input_dict: dict, funct: callable = None, memory: dict = None + input_dict: dict, funct: Optional[callable] = None, memory: Optional[dict] = None ) -> callable: """ Call function from dictionary diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py index 5a82fc49..6d15a325 100644 --- a/pympipool/shared/communication.py +++ b/pympipool/shared/communication.py @@ -146,7 +146,7 @@ def interface_connect(host: str, port: str): return context, socket -def interface_send(socket, result_dict: dict): +def interface_send(socket: zmq.Socket, result_dict: dict): """ Send results to a SocketInterface instance. @@ -157,7 +157,7 @@ def interface_send(socket, result_dict: dict): socket.send(cloudpickle.dumps(result_dict)) -def interface_receive(socket): +def interface_receive(socket: zmq.Socket): """ Receive instructions from a SocketInterface instance. @@ -167,7 +167,7 @@ def interface_receive(socket): return cloudpickle.loads(socket.recv()) -def interface_shutdown(socket, context): +def interface_shutdown(socket: zmq.Socket, context: zmq.Context): """ Close the connection to a SocketInterface instance. diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index d43a98e6..1075bddd 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -1,4 +1,5 @@ import threading +from typing import Optional from concurrent.futures import ( Executor as FutureExecutor, Future, @@ -171,7 +172,7 @@ def execute_parallel_tasks( cores: int, interface_class, hostname_localhost: bool = False, - init_function: callable = None, + init_function: Optional[callable] = None, **kwargs, ): """ @@ -201,7 +202,7 @@ def execute_parallel_tasks( def execute_parallel_tasks_loop( - interface, future_queue: queue.Queue, init_function: callable = None + interface, future_queue: queue.Queue, init_function: Optional[callable] = None ): if init_function is not None: interface.send_dict( diff --git a/pympipool/shared/interface.py b/pympipool/shared/interface.py index a7365524..8fcae945 100644 --- a/pympipool/shared/interface.py +++ b/pympipool/shared/interface.py @@ -1,5 +1,6 @@ from abc import ABC import subprocess +from typing import Optional MPI_COMMAND = "mpiexec" @@ -25,7 +26,7 @@ def poll(self): class SubprocessInterface(BaseInterface): def __init__( self, - cwd: str = None, + cwd: Optional[str] = None, cores: int = 1, oversubscribe: bool = False, ): @@ -71,7 +72,7 @@ def generate_command(self, command_lst: list[str]): class SrunInterface(SubprocessInterface): def __init__( self, - cwd: str = None, + cwd: Optional[str] = None, cores: int = 1, threads_per_core: int = 1, gpus_per_core: int = 0, diff --git a/pympipool/shell/interactive.py b/pympipool/shell/interactive.py index 7615ac18..1a3dc6c4 100644 --- a/pympipool/shell/interactive.py +++ b/pympipool/shell/interactive.py @@ -1,5 +1,6 @@ import queue import threading +from typing import Optional from concurrent.futures import Future import subprocess from time import sleep @@ -122,8 +123,8 @@ def __init__(self, *args, **kwargs): def submit( self, string_input: str, - lines_to_read: int = None, - stop_read_pattern: str = None, + lines_to_read: Optional[int] = None, + stop_read_pattern: Optional[str] = None, ): """ Submit the input as a string to the executable. In addition to the input the ShellExecutor also needs a measure diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index c28ecf4a..a587b73e 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -1,3 +1,4 @@ +from typing import Optional from pympipool.shared.executorbase import ( execute_parallel_tasks, ExecutorBroker, @@ -57,8 +58,8 @@ def __init__( threads_per_core: int = 1, gpus_per_worker: int = 0, oversubscribe: bool = False, - init_function: callable = None, - cwd: str = None, + init_function: Optional[callable] = None, + cwd: Optional[str] = None, hostname_localhost: bool = False, command_line_argument_lst: list[str] = [], ):