From 83a3788220b733ec567aeb2e9ec6393d83f5b0f1 Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Sun, 17 Mar 2024 22:00:36 -0500
Subject: [PATCH] Add type hints

---
 pympipool/__init__.py             | 32 +++++++++++++-------------
 pympipool/backend/serial.py       |  2 +-
 pympipool/flux/executor.py        | 32 +++++++++++++-------------
 pympipool/mpi/executor.py         | 12 +++++-----
 pympipool/shared/backend.py       |  8 +++----
 pympipool/shared/communication.py | 16 ++++++-------
 pympipool/shared/executorbase.py  | 31 +++++++++++++------------
 pympipool/shared/interface.py     | 38 +++++++++++++++----------------
 pympipool/shell/executor.py       |  5 ++--
 pympipool/shell/interactive.py    | 10 ++++----
 pympipool/slurm/executor.py       | 16 ++++++-------
 11 files changed, 103 insertions(+), 99 deletions(-)

diff --git a/pympipool/__init__.py b/pympipool/__init__.py
index aa9bc881..1cb02f5f 100644
--- a/pympipool/__init__.py
+++ b/pympipool/__init__.py
@@ -69,30 +69,30 @@ class Executor:
 
     def __init__(
         self,
-        max_workers=1,
-        cores_per_worker=1,
-        threads_per_core=1,
-        gpus_per_worker=0,
-        oversubscribe=False,
-        init_function=None,
-        cwd=None,
+        max_workers: int = 1,
+        cores_per_worker: int = 1,
+        threads_per_core: int = 1,
+        gpus_per_worker: int = 0,
+        oversubscribe: bool = False,
+        init_function: callable = None,
+        cwd: str = None,
         executor=None,
-        hostname_localhost=False,
+        hostname_localhost: bool =False,
     ):
         # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
         pass
 
     def __new__(
         cls,
-        max_workers=1,
-        cores_per_worker=1,
-        threads_per_core=1,
-        gpus_per_worker=0,
-        oversubscribe=False,
-        init_function=None,
-        cwd=None,
+        max_workers: int = 1,
+        cores_per_worker: int = 1,
+        threads_per_core: int = 1,
+        gpus_per_worker: int = 0,
+        oversubscribe: bool = False,
+        init_function: callable = None,
+        cwd: str = None,
         executor=None,
-        hostname_localhost=False,
+        hostname_localhost: bool = False,
     ):
         """
         Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor,
diff --git a/pympipool/backend/serial.py b/pympipool/backend/serial.py
index effdec6b..2e9308b3 100644
--- a/pympipool/backend/serial.py
+++ b/pympipool/backend/serial.py
@@ -10,7 +10,7 @@
 from pympipool.shared.backend import call_funct, parse_arguments
 
 
-def main(argument_lst=None):
+def main(argument_lst: list[str] = None):
     if argument_lst is None:
         argument_lst = sys.argv
     argument_dict = parse_arguments(argument_lst=argument_lst)
diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py
index 232cd518..ed5389b2 100644
--- a/pympipool/flux/executor.py
+++ b/pympipool/flux/executor.py
@@ -56,14 +56,14 @@ class PyFluxExecutor(ExecutorBroker):
 
     def __init__(
         self,
-        max_workers=1,
-        cores_per_worker=1,
-        threads_per_core=1,
-        gpus_per_worker=0,
-        init_function=None,
-        cwd=None,
-        executor=None,
-        hostname_localhost=False,
+        max_workers: int = 1,
+        cores_per_worker: int = 1,
+        threads_per_core: int = 1,
+        gpus_per_worker: int = 0,
+        init_function: callable = None,
+        cwd: str = None,
+        executor: flux.job.FluxExecutor = None,
+        hostname_localhost: bool = False,
     ):
         super().__init__()
         self._set_process(
@@ -92,12 +92,12 @@ def __init__(
 class FluxPythonInterface(BaseInterface):
     def __init__(
         self,
-        cwd=None,
-        cores=1,
-        threads_per_core=1,
-        gpus_per_core=0,
-        oversubscribe=False,
-        executor=None,
+        cwd: str = None,
+        cores: int = 1,
+        threads_per_core: int = 1,
+        gpus_per_core: int = 0,
+        oversubscribe: bool = False,
+        executor: flux.job.FluxExecutor = None,
     ):
         super().__init__(
             cwd=cwd,
@@ -109,7 +109,7 @@ def __init__(
         self._executor = executor
         self._future = None
 
-    def bootup(self, command_lst):
+    def bootup(self, command_lst: list[str]):
         if self._oversubscribe:
             raise ValueError(
                 "Oversubscribing is currently not supported for the Flux adapter."
             )
@@ -129,7 +129,7 @@ def bootup(self, command_lst):
         jobspec.cwd = self._cwd
         self._future = self._executor.submit(jobspec)
 
-    def shutdown(self, wait=True):
+    def shutdown(self, wait: bool = True):
         if self.poll():
             self._future.cancel()
         # The flux future objects are not instantly updated,
diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py
index b28ffd47..551911d7 100644
--- a/pympipool/mpi/executor.py
+++ b/pympipool/mpi/executor.py
@@ -51,12 +51,12 @@ class PyMPIExecutor(ExecutorBroker):
 
     def __init__(
         self,
-        max_workers=1,
-        cores_per_worker=1,
-        oversubscribe=False,
-        init_function=None,
-        cwd=None,
-        hostname_localhost=False,
+        max_workers: int = 1,
+        cores_per_worker: int = 1,
+        oversubscribe: bool = False,
+        init_function: callable = None,
+        cwd: str = None,
+        hostname_localhost: bool = False,
     ):
         super().__init__()
         self._set_process(
diff --git a/pympipool/shared/backend.py b/pympipool/shared/backend.py
index 5c917df9..e8d00b07 100644
--- a/pympipool/shared/backend.py
+++ b/pympipool/shared/backend.py
@@ -1,7 +1,7 @@
 import inspect
 
 
-def call_funct(input_dict, funct=None, memory=None):
+def call_funct(input_dict: dict, funct: callable = None, memory: dict = None) -> callable:
     """
     Call function from dictionary
 
@@ -30,7 +30,7 @@ def funct(*args, **kwargs):
     return funct(input_dict["fn"], *input_dict["args"], **input_dict["kwargs"])
 
 
-def parse_arguments(argument_lst):
+def parse_arguments(argument_lst: list[str]) -> dict:
     """
     Simple function to parse command line arguments
 
@@ -50,7 +50,7 @@
     )
 
 
-def update_default_dict_from_arguments(argument_lst, argument_dict, default_dict):
+def update_default_dict_from_arguments(argument_lst: list[str], argument_dict: dict, default_dict: dict) -> dict:
     default_dict.update(
         {
             k: argument_lst[argument_lst.index(v) + 1]
@@ -61,7 +61,7 @@ def update_default_dict_from_arguments(argument_lst, argument_dict, default_dict
     return default_dict
 
 
-def _update_dict_delta(dict_input, dict_output, keys_possible_lst):
+def _update_dict_delta(dict_input: dict, dict_output: dict, keys_possible_lst: list) -> dict:
     return {
         k: v
         for k, v in dict_input.items()
diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py
index fad9450d..5a82fc49 100644
--- a/pympipool/shared/communication.py
+++ b/pympipool/shared/communication.py
@@ -18,7 +18,7 @@ def __init__(self, interface=None):
         self._process = None
         self._interface = interface
 
-    def send_dict(self, input_dict):
+    def send_dict(self, input_dict: dict):
         """
         Send a dictionary with instructions to a connected client process.
 
@@ -42,7 +42,7 @@ def receive_dict(self):
             error_type = output["error_type"].split("'")[1]
             raise eval(error_type)(output["error"])
 
-    def send_and_receive_dict(self, input_dict):
+    def send_and_receive_dict(self, input_dict: dict) -> dict:
         """
         Combine both the send_dict() and receive_dict() function in a single call.
 
@@ -66,7 +66,7 @@ def bind_to_random_port(self):
         """
         return self._socket.bind_to_random_port("tcp://*")
 
-    def bootup(self, command_lst):
+    def bootup(self, command_lst: list[str]):
         """
         Boot up the client process to connect to the SocketInterface.
@@ -75,7 +75,7 @@ def bootup(self, command_lst):
         """
         self._interface.bootup(command_lst=command_lst)
 
-    def shutdown(self, wait=True):
+    def shutdown(self, wait: bool = True):
         result = None
         if self._interface.poll():
             result = self.send_and_receive_dict(
@@ -96,9 +96,9 @@ def __del__(self):
 
 
 def interface_bootup(
-    command_lst,
+    command_lst: list[str],
     connections,
-    hostname_localhost=False,
+    hostname_localhost: bool = False,
 ):
     """
     Start interface for ZMQ communication
@@ -132,7 +132,7 @@ def interface_bootup(
     return interface
 
 
-def interface_connect(host, port):
+def interface_connect(host: str, port: str):
     """
     Connect to an existing SocketInterface instance by providing the hostname and the port as strings.
 
@@ -146,7 +146,7 @@ def interface_connect(host, port):
     return context, socket
 
 
-def interface_send(socket, result_dict):
+def interface_send(socket, result_dict: dict):
     """
     Send results to a SocketInterface instance.
 
diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py
index 9a718e30..582e6aa3 100644
--- a/pympipool/shared/executorbase.py
+++ b/pympipool/shared/executorbase.py
@@ -1,3 +1,4 @@
+import threading
 from concurrent.futures import (
     Executor as FutureExecutor,
     Future,
@@ -22,7 +23,7 @@ def __init__(self):
     def future_queue(self):
         return self._future_queue
 
-    def submit(self, fn, *args, **kwargs):
+    def submit(self, fn: callable, *args, **kwargs):
         """Submits a callable to be executed with the given arguments.
 
         Schedules the callable to be executed as fn(*args, **kwargs) and returns
@@ -35,7 +36,7 @@ def submit(self, fn, *args, **kwargs):
         self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
         return f
 
-    def shutdown(self, wait=True, *, cancel_futures=False):
+    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
         """Clean-up the resources associated with the Executor.
 
         It is safe to call this method several times. Otherwise, no other
@@ -58,7 +59,7 @@ def shutdown(self, wait=True, *, cancel_futures=False):
         self._process = None
         self._future_queue = None
 
-    def _set_process(self, process):
+    def _set_process(self, process: threading.Thread):
         self._process = process
         self._process.start()
 
@@ -71,13 +72,13 @@ def __del__(self):
         except (AttributeError, RuntimeError):
             pass
 
-    def _set_process(self, process):
+    def _set_process(self, process: threading.Thread):
         self._process = process
         self._process.start()
 
 
 class ExecutorBroker(ExecutorBase):
-    def shutdown(self, wait=True, *, cancel_futures=False):
+    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
         """Clean-up the resources associated with the Executor.
 
         It is safe to call this method several times. Otherwise, no other
@@ -103,13 +104,13 @@ def shutdown(self, wait=True, *, cancel_futures=False):
         self._process = None
         self._future_queue = None
 
-    def _set_process(self, process):
+    def _set_process(self, process: threading.Thread):
         self._process = process
         for process in self._process:
             process.start()
 
 
-def cancel_items_in_queue(que):
+def cancel_items_in_queue(que: queue.Queue):
     """
     Cancel items which are still waiting in the queue. If the executor is busy tasks remain in the queue, so the
     future objects have to be cancelled when the executor shuts down.
@@ -127,7 +128,7 @@ def cancel_items_in_queue(que):
             break
 
 
-def cloudpickle_register(ind=2):
+def cloudpickle_register(ind: int = 2):
     """
     Cloudpickle can either pickle by value or pickle by reference. The functions which are communicated have to
     be pickled by value rather than by reference, so the module which calls the map function is pickled by value.
@@ -150,11 +151,11 @@ def cloudpickle_register(ind=2):
 
 
 def execute_parallel_tasks(
-    future_queue,
-    cores,
+    future_queue: queue.Queue,
+    cores: int,
     interface_class,
-    hostname_localhost=False,
-    init_function=None,
+    hostname_localhost: bool = False,
+    init_function: callable = None,
     **kwargs,
 ):
     """
@@ -183,7 +184,7 @@ def execute_parallel_tasks(
     )
 
 
-def execute_parallel_tasks_loop(interface, future_queue, init_function=None):
+def execute_parallel_tasks_loop(interface, future_queue: queue.Queue, init_function: callable = None):
     if init_function is not None:
         interface.send_dict(
             input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
         )
@@ -209,7 +210,7 @@ def execute_parallel_tasks_loop(interface, future_queue, init_function=None):
             future_queue.task_done()
 
 
-def _get_backend_path(cores):
+def _get_backend_path(cores: int):
     command_lst = [sys.executable]
     if cores > 1:
         command_lst += [_get_command_path(executable="mpiexec.py")]
@@ -218,5 +219,5 @@ def _get_backend_path(cores):
     return command_lst
 
 
-def _get_command_path(executable):
+def _get_command_path(executable: str):
     return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))
diff --git a/pympipool/shared/interface.py b/pympipool/shared/interface.py
index 962c31fc..23ef5ddc 100644
--- a/pympipool/shared/interface.py
+++ b/pympipool/shared/interface.py
@@ -7,15 +7,15 @@
 
 
 class BaseInterface(ABC):
-    def __init__(self, cwd, cores=1, oversubscribe=False):
+    def __init__(self, cwd: str, cores: int = 1, oversubscribe: bool = False):
         self._cwd = cwd
         self._cores = cores
         self._oversubscribe = oversubscribe
 
-    def bootup(self, command_lst):
+    def bootup(self, command_lst: list[str]):
         raise NotImplementedError
 
-    def shutdown(self, wait=True):
+    def shutdown(self, wait: bool = True):
         raise NotImplementedError
 
     def poll(self):
@@ -25,9 +25,9 @@ def poll(self):
 class SubprocessInterface(BaseInterface):
     def __init__(
         self,
-        cwd=None,
-        cores=1,
-        oversubscribe=False,
+        cwd: str = None,
+        cores: int = 1,
+        oversubscribe: bool = False,
     ):
         super().__init__(
             cwd=cwd,
@@ -36,17 +36,17 @@ def __init__(
         )
         self._process = None
 
-    def bootup(self, command_lst):
+    def bootup(self, command_lst: list[str]):
         self._process = subprocess.Popen(
             args=self.generate_command(command_lst=command_lst),
             cwd=self._cwd,
             stdin=subprocess.DEVNULL,
         )
 
-    def generate_command(self, command_lst):
+    def generate_command(self, command_lst: list[str]) -> list[str]:
         return command_lst
 
-    def shutdown(self, wait=True):
+    def shutdown(self, wait: bool = True):
         self._process.communicate()
         self._process.terminate()
         if wait:
@@ -58,7 +58,7 @@ def poll(self):
 
 
 class MpiExecInterface(SubprocessInterface):
-    def generate_command(self, command_lst):
+    def generate_command(self, command_lst: list[str]):
         command_prepend_lst = generate_mpiexec_command(
             cores=self._cores,
             oversubscribe=self._oversubscribe,
         )
@@ -71,11 +71,11 @@ def generate_command(self, command_lst):
 class SrunInterface(SubprocessInterface):
     def __init__(
         self,
-        cwd=None,
-        cores=1,
-        threads_per_core=1,
-        gpus_per_core=0,
-        oversubscribe=False,
+        cwd: str = None,
+        cores: int = 1,
+        threads_per_core: int = 1,
+        gpus_per_core: int = 0,
+        oversubscribe: bool = False,
     ):
         super().__init__(
             cwd=cwd,
@@ -85,7 +85,7 @@ def __init__(
         self._threads_per_core = threads_per_core
         self._gpus_per_core = gpus_per_core
 
-    def generate_command(self, command_lst):
+    def generate_command(self, command_lst: list[str]) -> list[str]:
         command_prepend_lst = generate_slurm_command(
             cores=self._cores,
             cwd=self._cwd,
@@ -98,7 +98,7 @@ def generate_command(self, command_lst):
         )
 
 
-def generate_mpiexec_command(cores, oversubscribe=False):
+def generate_mpiexec_command(cores: int, oversubscribe: bool=False) -> list[str]:
     if cores == 1:
         return []
     else:
@@ -109,8 +109,8 @@ def generate_mpiexec_command(cores, oversubscribe=False):
 
 
 def generate_slurm_command(
-    cores, cwd, threads_per_core=1, gpus_per_core=0, oversubscribe=False
-):
+    cores: int, cwd: str, threads_per_core: int = 1, gpus_per_core: int = 0, oversubscribe: bool = False
+) -> list[str]:
     command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
     if cwd is not None:
         command_prepend_lst += ["-D", cwd]
diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py
index aa1c8f95..92e4803a 100644
--- a/pympipool/shell/executor.py
+++ b/pympipool/shell/executor.py
@@ -1,3 +1,4 @@
+import queue
 from concurrent.futures import Future
 import subprocess
 
@@ -5,7 +6,7 @@
 from pympipool.shared.thread import RaisingThread
 
 
-def execute_single_task(future_queue):
+def execute_single_task(future_queue: queue.Queue):
     """
     Process items received via the queue.
 
@@ -59,7 +60,7 @@ class SubprocessExecutor(ExecutorBroker):
 
     def __init__(
         self,
-        max_workers=1,
+        max_workers: int = 1,
     ):
         super().__init__()
         self._set_process(
diff --git a/pympipool/shell/interactive.py b/pympipool/shell/interactive.py
index adef58b3..e02fe6b8 100644
--- a/pympipool/shell/interactive.py
+++ b/pympipool/shell/interactive.py
@@ -1,3 +1,5 @@
+import queue
+import threading
 from concurrent.futures import Future
 import subprocess
 from time import sleep
@@ -6,7 +8,7 @@
 from pympipool.shared.thread import RaisingThread
 
 
-def wait_for_process_to_stop(process, sleep_interval=10e-10):
+def wait_for_process_to_stop(process: threading.Thread, sleep_interval: float = 10e-10):
     """
     Wait for the subprocess.Popen() process to stop executing
 
@@ -18,7 +20,7 @@ def wait_for_process_to_stop(process, sleep_interval=10e-10):
         sleep(sleep_interval)
 
 
-def execute_single_task(future_queue):
+def execute_single_task(future_queue: queue.Queue):
     """
     Process items received via the queue.
 
@@ -117,7 +119,7 @@ def __init__(self, *args, **kwargs):
         )
         self._future_queue.put({"init": True, "args": args, "kwargs": kwargs})
 
-    def submit(self, string_input, lines_to_read=None, stop_read_pattern=None):
+    def submit(self, string_input: str, lines_to_read: int = None, stop_read_pattern: str = None):
         """
         Submit the input as a string to the executable. In addition to the input the ShellExecutor also needs a
         measure to identify the completion of the execution. This can either be provided based on the number of lines to read
@@ -149,7 +151,7 @@ def submit(self, string_input, lines_to_read=None, stop_read_pattern=None):
         )
         return f
 
-    def shutdown(self, wait=True, *, cancel_futures=False):
+    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
         """Clean-up the resources associated with the Executor.
 
         It is safe to call this method several times. Otherwise, no other
diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py
index deb13593..bff090de 100644
--- a/pympipool/slurm/executor.py
+++ b/pympipool/slurm/executor.py
@@ -52,14 +52,14 @@ class PySlurmExecutor(ExecutorBroker):
 
     def __init__(
         self,
-        max_workers=1,
-        cores_per_worker=1,
-        threads_per_core=1,
-        gpus_per_worker=0,
-        oversubscribe=False,
-        init_function=None,
-        cwd=None,
-        hostname_localhost=False,
+        max_workers: int = 1,
+        cores_per_worker: int = 1,
+        threads_per_core: int = 1,
+        gpus_per_worker: int = 0,
+        oversubscribe: bool = False,
+        init_function: callable = None,
+        cwd: str = None,
+        hostname_localhost: bool = False,
     ):
         super().__init__()
         self._set_process(