diff --git a/.github/workflows/mypy.yml b/.github/workflows/mypy.yml new file mode 100644 index 00000000..5e742142 --- /dev/null +++ b/.github/workflows/mypy.yml @@ -0,0 +1,22 @@ +name: MyPy + +on: + push: + branches: [ main ] + pull_request: + +jobs: + mypy: + runs-on: ubuntu-latest + steps: + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.13" + architecture: x64 + - name: Checkout + uses: actions/checkout@v4 + - name: Install mypy + run: pip install mypy + - name: Test + run: mypy --ignore-missing-imports ${{ github.event.repository.name }} \ No newline at end of file diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 70904fe6..cedd6c2d 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Callable, Optional from executorlib._version import get_versions as _get_versions from executorlib.interactive.executor import ( @@ -16,7 +16,7 @@ ) __version__ = _get_versions()["version"] -__all__ = [] +__all__: list = [] class Executor: @@ -100,7 +100,7 @@ def __init__( pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, - init_function: Optional[callable] = None, + init_function: Optional[Callable] = None, disable_dependencies: bool = False, refresh_rate: float = 0.01, plot_dependency_graph: bool = False, @@ -123,7 +123,7 @@ def __new__( pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, - init_function: Optional[callable] = None, + init_function: Optional[Callable] = None, disable_dependencies: bool = False, refresh_rate: float = 0.01, plot_dependency_graph: bool = False, @@ -177,7 +177,7 @@ def __new__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. """ - default_resource_dict = { + default_resource_dict: dict = { "cores": 1, "threads_per_core": 1, "gpus_per_core": 0, diff --git a/executorlib/backend/cache_parallel.py b/executorlib/backend/cache_parallel.py index 2979a3c6..a5903302 100644 --- a/executorlib/backend/cache_parallel.py +++ b/executorlib/backend/cache_parallel.py @@ -1,6 +1,7 @@ import pickle import sys import time +from typing import Any import cloudpickle @@ -24,7 +25,7 @@ def main() -> None: """ from mpi4py import MPI - MPI.pickle.__init__( + MPI.pickle.__init__( # type: ignore cloudpickle.dumps, cloudpickle.loads, pickle.HIGHEST_PROTOCOL, @@ -34,10 +35,9 @@ def main() -> None: file_name = sys.argv[1] time_start = time.time() + apply_dict = {} if mpi_rank_zero: apply_dict = backend_load_file(file_name=file_name) - else: - apply_dict = None apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0) output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"]) if mpi_size_larger_one: diff --git a/executorlib/backend/interactive_parallel.py b/executorlib/backend/interactive_parallel.py index 4f103685..62c93d65 100644 --- a/executorlib/backend/interactive_parallel.py +++ b/executorlib/backend/interactive_parallel.py @@ -1,8 +1,10 @@ import pickle import sys from os.path import abspath +from typing import Optional import cloudpickle +import zmq from executorlib.standalone.interactive.backend import call_funct, parse_arguments from executorlib.standalone.interactive.communication import ( @@ -24,7 +26,7 @@ def main() -> None: """ from mpi4py import MPI - MPI.pickle.__init__( + MPI.pickle.__init__( # type: ignore cloudpickle.dumps, cloudpickle.loads, pickle.HIGHEST_PROTOCOL, @@ -33,13 +35,12 @@ def main() -> None: mpi_size_larger_one = MPI.COMM_WORLD.Get_size() > 1 argument_dict = parse_arguments(argument_lst=sys.argv) + context: Optional[zmq.Context] = None + socket: Optional[zmq.Socket] = None if mpi_rank_zero: context, socket = interface_connect( host=argument_dict["host"], port=argument_dict["zmqport"] ) - else: - context = None - socket = None memory = None @@ -50,10 +51,9 @@ def main() -> None: while True: # Read from socket + input_dict: dict = {} if mpi_rank_zero: input_dict = interface_receive(socket=socket) - else: - input_dict = None input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0) # Parse input diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index a0a44fb3..b8908909 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -5,7 +5,7 @@ from concurrent.futures import ( Future, ) -from typing import Optional +from typing import Callable, List, Optional, Union from executorlib.standalone.inputcheck import check_resource_dict from executorlib.standalone.queue import cancel_items_in_queue @@ -27,8 +27,8 @@ def __init__(self, max_cores: Optional[int] = None): """ cloudpickle_register(ind=3) self._max_cores = max_cores - self._future_queue: queue.Queue = queue.Queue() - self._process: Optional[RaisingThread] = None + self._future_queue: Optional[queue.Queue] = queue.Queue() + self._process: Optional[Union[RaisingThread, List[RaisingThread]]] = None @property def info(self) -> Optional[dict]: @@ -39,13 +39,13 @@ def info(self) -> Optional[dict]: Optional[dict]: Information about the executor. """ if self._process is not None and isinstance(self._process, list): - meta_data_dict = self._process[0]._kwargs.copy() + meta_data_dict = self._process[0].get_kwargs().copy() if "future_queue" in meta_data_dict.keys(): del meta_data_dict["future_queue"] meta_data_dict["max_workers"] = len(self._process) return meta_data_dict elif self._process is not None: - meta_data_dict = self._process._kwargs.copy() + meta_data_dict = self._process.get_kwargs().copy() if "future_queue" in meta_data_dict.keys(): del meta_data_dict["future_queue"] return meta_data_dict @@ -53,7 +53,7 @@ def info(self) -> Optional[dict]: return None @property - def future_queue(self) -> queue.Queue: + def future_queue(self) -> Optional[queue.Queue]: """ Get the future queue. @@ -62,7 +62,7 @@ def future_queue(self) -> queue.Queue: """ return self._future_queue - def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future: + def submit(self, fn: Callable, *args, resource_dict: dict = {}, **kwargs) -> Future: # type: ignore """ Submits a callable to be executed with the given arguments. @@ -97,16 +97,17 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Fut "The specified number of cores is larger than the available number of cores." ) check_resource_dict(function=fn) - f = Future() - self._future_queue.put( - { - "fn": fn, - "args": args, - "kwargs": kwargs, - "future": f, - "resource_dict": resource_dict, - } - ) + f: Future = Future() + if self._future_queue is not None: + self._future_queue.put( + { + "fn": fn, + "args": args, + "kwargs": kwargs, + "future": f, + "resource_dict": resource_dict, + } + ) return f def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): @@ -124,11 +125,11 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): futures. Futures that are completed or running will not be cancelled. """ - if cancel_futures: + if cancel_futures and self._future_queue is not None: cancel_items_in_queue(que=self._future_queue) - if self._process is not None: + if self._process is not None and self._future_queue is not None: self._future_queue.put({"shutdown": True, "wait": wait}) - if wait: + if wait and isinstance(self._process, RaisingThread): self._process.join() self._future_queue.join() self._process = None @@ -151,7 +152,10 @@ def __len__(self) -> int: Returns: int: The length of the executor. """ - return self._future_queue.qsize() + queue_size = 0 + if self._future_queue is not None: + queue_size = self._future_queue.qsize() + return queue_size def __del__(self): """ diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 62a66f39..46938005 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -1,5 +1,5 @@ import os -from typing import Optional +from typing import Callable, Optional from executorlib.base.executor import ExecutorBase from executorlib.cache.shared import execute_tasks_h5 @@ -21,7 +21,7 @@ from executorlib.cache.queue_spawner import execute_with_pysqa except ImportError: # If pysqa is not available fall back to executing tasks in a subprocess - execute_with_pysqa = execute_in_subprocess + execute_with_pysqa = execute_in_subprocess # type: ignore class FileExecutor(ExecutorBase): @@ -29,8 +29,8 @@ def __init__( self, cache_directory: str = "cache", resource_dict: Optional[dict] = None, - execute_function: callable = execute_with_pysqa, - terminate_function: Optional[callable] = None, + execute_function: Callable = execute_with_pysqa, + terminate_function: Optional[Callable] = None, pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, disable_dependencies: bool = False, @@ -43,8 +43,8 @@ def __init__( resource_dict (dict): A dictionary of resources required by the task. With the following keys: - cores (int): number of MPI cores to be used for each function call - cwd (str/None): current working directory where the parallel python task is executed - execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess. - terminate_function (callable, optional): The function to terminate the tasks. + execute_function (Callable, optional): The function to execute tasks. Defaults to execute_in_subprocess. + terminate_function (Callable, optional): The function to terminate the tasks. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). backend (str, optional): name of the backend used to spawn tasks. disable_dependencies (boolean): Disable resolving future objects during the submission. @@ -81,9 +81,9 @@ def __init__( def create_file_executor( - max_workers: int = 1, + max_workers: Optional[int] = None, backend: str = "flux_submission", - max_cores: int = 1, + max_cores: Optional[int] = None, cache_directory: Optional[str] = None, resource_dict: Optional[dict] = None, flux_executor=None, @@ -93,7 +93,7 @@ def create_file_executor( pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, - init_function: Optional[callable] = None, + init_function: Optional[Callable] = None, disable_dependencies: bool = False, ): if cache_directory is None: diff --git a/executorlib/cache/queue_spawner.py b/executorlib/cache/queue_spawner.py index e30f44b6..dec1c0e0 100644 --- a/executorlib/cache/queue_spawner.py +++ b/executorlib/cache/queue_spawner.py @@ -16,7 +16,7 @@ def execute_with_pysqa( config_directory: Optional[str] = None, backend: Optional[str] = None, cache_directory: Optional[str] = None, -) -> Tuple[int, int]: +) -> Optional[int]: """ Execute a command by submitting it to the queuing system diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index e3ae504c..8efb72d1 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -3,7 +3,7 @@ import queue import sys from concurrent.futures import Future -from typing import Optional, Tuple +from typing import Any, Callable, Optional, Tuple from executorlib.standalone.command import get_command_path from executorlib.standalone.hdf import dump, get_output @@ -21,7 +21,7 @@ def __init__(self, file_name: str): """ self._file_name = file_name - def result(self) -> str: + def result(self) -> Any: """ Get the result of the future item. @@ -49,9 +49,9 @@ def done(self) -> bool: def execute_tasks_h5( future_queue: queue.Queue, cache_directory: str, - execute_function: callable, + execute_function: Callable, resource_dict: dict, - terminate_function: Optional[callable] = None, + terminate_function: Optional[Callable] = None, pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, disable_dependencies: bool = False, @@ -65,8 +65,8 @@ def execute_tasks_h5( resource_dict (dict): A dictionary of resources required by the task. With the following keys: - cores (int): number of MPI cores to be used for each function call - cwd (str/None): current working directory where the parallel python task is executed - execute_function (callable): The function to execute the tasks. - terminate_function (callable): The function to terminate the tasks. + execute_function (Callable): The function to execute the tasks. + terminate_function (Callable): The function to terminate the tasks. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). backend (str, optional): name of the backend used to spawn tasks. @@ -74,7 +74,9 @@ def execute_tasks_h5( None """ - memory_dict, process_dict, file_name_dict = {}, {}, {} + memory_dict: dict = {} + process_dict: dict = {} + file_name_dict: dict = {} while True: task_dict = None try: diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index 4cd39963..1870a4fa 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -81,15 +81,15 @@ def __init__( }, ) ) - self._future_hash_dict = {} - self._task_hash_dict = {} + self._future_hash_dict: dict = {} + self._task_hash_dict: dict = {} self._plot_dependency_graph_filename = plot_dependency_graph_filename if plot_dependency_graph_filename is None: self._generate_dependency_graph = plot_dependency_graph else: self._generate_dependency_graph = True - def submit( + def submit( # type: ignore self, fn: Callable[..., Any], *args: Any, @@ -100,7 +100,7 @@ def submit( Submits a task to the executor. Args: - fn (callable): The function to be executed. + fn (Callable): The function to be executed. *args: Variable length argument list. resource_dict (dict, optional): A dictionary of resources required by the task. Defaults to {}. **kwargs: Arbitrary keyword arguments. @@ -146,7 +146,7 @@ def __exit__( exc_tb: The traceback object. """ - super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb) + super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb) # type: ignore if self._generate_dependency_graph: node_lst, edge_lst = generate_nodes_and_edges( task_hash_dict=self._task_hash_dict, @@ -166,14 +166,14 @@ def create_executor( backend: str = "local", max_cores: Optional[int] = None, cache_directory: Optional[str] = None, - resource_dict: Optional[dict] = None, + resource_dict: dict = {}, flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, flux_log_files: bool = False, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, - init_function: Optional[callable] = None, + init_function: Optional[Callable] = None, ): """ Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, @@ -218,16 +218,20 @@ def create_executor( if flux_executor is not None and backend != "flux_allocation": backend = "flux_allocation" check_pmi(backend=backend, pmi=flux_executor_pmi_mode) - cores_per_worker = resource_dict["cores"] + cores_per_worker = resource_dict.get("cores", 1) resource_dict["cache_directory"] = cache_directory resource_dict["hostname_localhost"] = hostname_localhost if backend == "flux_allocation": - check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"]) + check_oversubscribe( + oversubscribe=resource_dict.get("openmpi_oversubscribe", False) + ) check_command_line_argument_lst( - command_line_argument_lst=resource_dict["slurm_cmd_args"] + command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) ) - del resource_dict["openmpi_oversubscribe"] - del resource_dict["slurm_cmd_args"] + if "openmpi_oversubscribe" in resource_dict.keys(): + del resource_dict["openmpi_oversubscribe"] + if "slurm_cmd_args" in resource_dict.keys(): + del resource_dict["slurm_cmd_args"] resource_dict["flux_executor"] = flux_executor resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode resource_dict["flux_executor_nesting"] = flux_executor_nesting @@ -243,7 +247,7 @@ def create_executor( validate_max_workers_flux( max_workers=max_workers, cores=cores_per_worker, - threads_per_core=resource_dict["threads_per_core"], + threads_per_core=resource_dict.get("threads_per_core", 1), ) return InteractiveExecutor( max_workers=max_workers, @@ -272,7 +276,7 @@ def create_executor( validate_max_workers_slurm( max_workers=max_workers, cores=cores_per_worker, - threads_per_core=resource_dict["threads_per_core"], + threads_per_core=resource_dict.get("threads_per_core", 1), ) return InteractiveExecutor( max_workers=max_workers, @@ -290,13 +294,16 @@ def create_executor( check_executor(executor=flux_executor) check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) check_flux_log_files(flux_log_files=flux_log_files) - check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"]) + check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0)) check_command_line_argument_lst( - command_line_argument_lst=resource_dict["slurm_cmd_args"] + command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) ) - del resource_dict["threads_per_core"] - del resource_dict["gpus_per_core"] - del resource_dict["slurm_cmd_args"] + if "threads_per_core" in resource_dict.keys(): + del resource_dict["threads_per_core"] + if "gpus_per_core" in resource_dict.keys(): + del resource_dict["gpus_per_core"] + if "slurm_cmd_args" in resource_dict.keys(): + del resource_dict["slurm_cmd_args"] if block_allocation: resource_dict["init_function"] = init_function return InteractiveExecutor( diff --git a/executorlib/interactive/flux.py b/executorlib/interactive/flux.py index 2c324828..f1166a70 100644 --- a/executorlib/interactive/flux.py +++ b/executorlib/interactive/flux.py @@ -117,12 +117,13 @@ def shutdown(self, wait: bool = True): Args: wait (bool, optional): Whether to wait for the execution to complete. Defaults to True. """ - if self.poll(): - self._future.cancel() - # The flux future objects are not instantly updated, - # still showing running after cancel was called, - # so we wait until the execution is completed. - self._future.result() + if self._future is not None: + if self.poll(): + self._future.cancel() + # The flux future objects are not instantly updated, + # still showing running after cancel was called, + # so we wait until the execution is completed. + self._future.result() def poll(self): """ diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 41b99311..4068b84d 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -5,7 +5,7 @@ import time from concurrent.futures import Future from time import sleep -from typing import Callable, List, Optional +from typing import Any, Callable, List, Optional, Tuple, Union from executorlib.base.executor import ExecutorBase, cancel_items_in_queue from executorlib.standalone.command import get_command_path @@ -23,7 +23,7 @@ class ExecutorBroker(ExecutorBase): - def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future: + def submit(self, fn: Callable, *args, resource_dict: dict = {}, **kwargs) -> Future: # type: ignore """ Submits a callable to be executed with the given arguments. @@ -31,7 +31,7 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Fut a Future instance representing the execution of the callable. Args: - fn (callable): function to submit for execution + fn (Callable): function to submit for execution args: arguments for the submitted function kwargs: keyword arguments for the submitted function resource_dict (dict): resource dictionary, which defines the resources used for the execution of the @@ -50,8 +50,11 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Fut """ check_resource_dict_is_empty(resource_dict=resource_dict) check_resource_dict(function=fn) - f = Future() - self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) + f: Future = Future() + if self._future_queue is not None: + self._future_queue.put( + {"fn": fn, "args": args, "kwargs": kwargs, "future": f} + ) return f def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): @@ -68,19 +71,20 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): futures. Futures that are completed or running will not be cancelled. """ - if cancel_futures: - cancel_items_in_queue(que=self._future_queue) - if self._process is not None: - for _ in range(len(self._process)): - self._future_queue.put({"shutdown": True, "wait": wait}) - if wait: - for process in self._process: - process.join() - self._future_queue.join() + if self._future_queue is not None: + if cancel_futures: + cancel_items_in_queue(que=self._future_queue) + if isinstance(self._process, list): + for _ in range(len(self._process)): + self._future_queue.put({"shutdown": True, "wait": wait}) + if wait: + for process in self._process: + process.join() + self._future_queue.join() self._process = None self._future_queue = None - def _set_process(self, process: List[RaisingThread]): + def _set_process(self, process: List[RaisingThread]): # type: ignore """ Set the process for the executor. @@ -88,8 +92,8 @@ def _set_process(self, process: List[RaisingThread]): process (List[RaisingThread]): The process for the executor. """ self._process = process - for process in self._process: - process.start() + for process_instance in self._process: + process_instance.start() class InteractiveExecutor(ExecutorBroker): @@ -130,7 +134,7 @@ def __init__( self, max_workers: int = 1, executor_kwargs: dict = {}, - spawner: BaseSpawner = MpiExecSpawner, + spawner: type[BaseSpawner] = MpiExecSpawner, ): super().__init__(max_cores=executor_kwargs.get("max_cores", None)) executor_kwargs["future_queue"] = self._future_queue @@ -184,7 +188,7 @@ def __init__( max_cores: Optional[int] = None, max_workers: Optional[int] = None, executor_kwargs: dict = {}, - spawner: BaseSpawner = MpiExecSpawner, + spawner: type[BaseSpawner] = MpiExecSpawner, ): super().__init__(max_cores=executor_kwargs.get("max_cores", None)) executor_kwargs["future_queue"] = self._future_queue @@ -202,7 +206,7 @@ def __init__( def execute_parallel_tasks( future_queue: queue.Queue, cores: int = 1, - spawner: BaseSpawner = MpiExecSpawner, + spawner: type[BaseSpawner] = MpiExecSpawner, hostname_localhost: Optional[bool] = None, init_function: Optional[Callable] = None, cache_directory: Optional[str] = None, @@ -223,7 +227,7 @@ def execute_parallel_tasks( points to the same address as localhost. Still MacOS >= 12 seems to disable this look up for security reasons. So on MacOS it is required to set this option to true - init_function (callable): optional function to preset arguments for functions which are submitted later + init_function (Callable): optional function to preset arguments for functions which are submitted later cache_directory (str, optional): The directory to store cache files. Defaults to "cache". queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True. """ @@ -262,7 +266,7 @@ def execute_parallel_tasks( def execute_separate_tasks( future_queue: queue.Queue, - spawner: BaseSpawner = MpiExecSpawner, + spawner: type[BaseSpawner] = MpiExecSpawner, max_cores: Optional[int] = None, max_workers: Optional[int] = None, hostname_localhost: Optional[bool] = None, @@ -286,8 +290,9 @@ def execute_separate_tasks( this look up for security reasons. So on MacOS it is required to set this option to true """ - active_task_dict = {} - process_lst, qtask_lst = [], [] + active_task_dict: dict = {} + process_lst: list = [] + qtask_lst: list = [] if "cores" not in kwargs.keys(): kwargs["cores"] = 1 while True: @@ -299,7 +304,7 @@ def execute_separate_tasks( future_queue.join() break elif "fn" in task_dict.keys() and "future" in task_dict.keys(): - qtask = queue.Queue() + qtask: queue.Queue = queue.Queue() process, active_task_dict = _submit_function_to_separate_process( task_dict=task_dict, qtask=qtask, @@ -457,7 +462,7 @@ def _submit_waiting_task(wait_lst: List[dict], executor_queue: queue.Queue) -> l return wait_tmp_lst -def _update_futures_in_input(args: tuple, kwargs: dict): +def _update_futures_in_input(args: tuple, kwargs: dict) -> Tuple[tuple, dict]: """ Evaluate future objects in the arguments and keyword arguments by calling future.result() @@ -469,7 +474,7 @@ def _update_futures_in_input(args: tuple, kwargs: dict): tuple, dict: arguments and keyword arguments with each future object in them being evaluated """ - def get_result(arg): + def get_result(arg: Union[List[Future], Future]) -> Any: if isinstance(arg, Future): return arg.result() elif isinstance(arg, list): @@ -477,7 +482,7 @@ def get_result(arg): else: return arg - args = [get_result(arg=arg) for arg in args] + args = tuple([get_result(arg=arg) for arg in args]) kwargs = {key: get_result(arg=value) for key, value in kwargs.items()} return args, kwargs @@ -488,7 +493,7 @@ def _get_future_objects_from_input(task_dict: dict): Args: task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys - {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}} + {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} Returns: list, boolean: list of future objects and boolean flag if all future objects are already done @@ -514,7 +519,7 @@ def _submit_function_to_separate_process( task_dict: dict, active_task_dict: dict, qtask: queue.Queue, - spawner: BaseSpawner, + spawner: type[BaseSpawner], executor_kwargs: dict, max_cores: Optional[int] = None, max_workers: Optional[int] = None, @@ -524,7 +529,7 @@ def _submit_function_to_separate_process( Submit function to be executed in separate Python process Args: task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys - {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}} + {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} active_task_dict (dict): Dictionary containing the future objects and the number of cores they require qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function spawner (BaseSpawner): Interface to start process on selected compute resources @@ -586,7 +591,7 @@ def _execute_task( Args: interface (SocketInterface): socket interface for zmq communication task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys - {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}} + {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} future_queue (Queue): Queue for receiving new tasks. """ f = task_dict.pop("future") @@ -614,7 +619,7 @@ def _execute_task_with_cache( Args: interface (SocketInterface): socket interface for zmq communication task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys - {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}} + {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} future_queue (Queue): Queue for receiving new tasks. cache_directory (str): The directory to store cache files. """ diff --git a/executorlib/interactive/slurm.py b/executorlib/interactive/slurm.py index 7ed465d1..8ae83547 100644 --- a/executorlib/interactive/slurm.py +++ b/executorlib/interactive/slurm.py @@ -75,7 +75,7 @@ def generate_command(self, command_lst: list[str]) -> list[str]: def generate_slurm_command( cores: int, - cwd: str, + cwd: Optional[str], threads_per_core: int = 1, gpus_per_core: int = 0, openmpi_oversubscribe: bool = False, diff --git a/executorlib/standalone/__init__.py b/executorlib/standalone/__init__.py index 69cdac02..c14857eb 100644 --- a/executorlib/standalone/__init__.py +++ b/executorlib/standalone/__init__.py @@ -10,12 +10,12 @@ from executorlib.standalone.thread import RaisingThread __all__ = [ - SocketInterface, - interface_bootup, - interface_connect, - interface_send, - interface_shutdown, - interface_receive, - RaisingThread, - MpiExecSpawner, + "SocketInterface", + "interface_bootup", + "interface_connect", + "interface_send", + "interface_shutdown", + "interface_receive", + "RaisingThread", + "MpiExecSpawner", ] diff --git a/executorlib/standalone/hdf.py b/executorlib/standalone/hdf.py index 61557a15..1a8b05ca 100644 --- a/executorlib/standalone/hdf.py +++ b/executorlib/standalone/hdf.py @@ -1,5 +1,5 @@ import os -from typing import List, Optional, Tuple +from typing import Any, List, Optional, Tuple import cloudpickle import h5py @@ -15,7 +15,7 @@ } -def dump(file_name: str, data_dict: dict) -> None: +def dump(file_name: Optional[str], data_dict: dict) -> None: """ Dump data dictionary into HDF5 file @@ -23,13 +23,14 @@ def dump(file_name: str, data_dict: dict) -> None: file_name (str): file name of the HDF5 file as absolute path data_dict (dict): dictionary containing the python function to be executed {"fn": ..., "args": (), "kwargs": {}} """ - with h5py.File(file_name, "a") as fname: - for data_key, data_value in data_dict.items(): - if data_key in group_dict.keys(): - fname.create_dataset( - name="/" + group_dict[data_key], - data=np.void(cloudpickle.dumps(data_value)), - ) + if file_name is not None: + with h5py.File(file_name, "a") as fname: + for data_key, data_value in data_dict.items(): + if data_key in group_dict.keys(): + fname.create_dataset( + name="/" + group_dict[data_key], + data=np.void(cloudpickle.dumps(data_value)), + ) def load(file_name: str) -> dict: @@ -59,7 +60,7 @@ def load(file_name: str) -> dict: return data_dict -def get_output(file_name: str) -> Tuple[bool, object]: +def get_output(file_name: str) -> Tuple[bool, Any]: """ Check if output is available in the HDF5 file @@ -93,12 +94,12 @@ def get_runtime(file_name: str) -> float: return 0.0 -def get_queue_id(file_name: str) -> Optional[int]: - with h5py.File(file_name, "r") as hdf: - if "queue_id" in hdf: - return cloudpickle.loads(np.void(hdf["/queue_id"])) - else: - return None +def get_queue_id(file_name: Optional[str]) -> Optional[int]: + if file_name is not None: + with h5py.File(file_name, "r") as hdf: + if "queue_id" in hdf: + return cloudpickle.loads(np.void(hdf["/queue_id"])) + return None def get_cache_data(cache_directory: str) -> List[dict]: diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index e56bc500..646d695c 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -100,19 +100,24 @@ def check_plot_dependency_graph(plot_dependency_graph: bool) -> None: ) -def check_pmi(backend: str, pmi: Optional[str]) -> None: +def check_pmi(backend: Optional[str], pmi: Optional[str]) -> None: """ Check if pmi is valid for the selected backend and raise a ValueError if it is not. """ - if backend != "flux_allocation" and pmi is not None: - raise ValueError("The pmi parameter is currently only implemented for flux.") - elif backend == "flux_allocation" and pmi not in ["pmix", "pmi1", "pmi2", None]: - raise ValueError( - "The pmi parameter supports [pmix, pmi1, pmi2], but not: " + pmi - ) + if backend is not None: + if backend != "flux_allocation" and pmi is not None: + raise ValueError( + "The pmi parameter is currently only implemented for flux." + ) + elif backend == "flux_allocation" and pmi not in ["pmix", "pmi1", "pmi2", None]: + raise ValueError( + "The pmi parameter supports [pmix, pmi1, pmi2], but not: " + str(pmi) + ) -def check_init_function(block_allocation: bool, init_function: Callable) -> None: +def check_init_function( + block_allocation: bool, init_function: Optional[Callable] +) -> None: """ Check if block_allocation is False and init_function is not None, and raise a ValueError if it is. """ @@ -170,25 +175,26 @@ def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None: def validate_number_of_cores( max_cores: Optional[int] = None, max_workers: Optional[int] = None, - cores_per_worker: Optional[int] = None, + cores_per_worker: Optional[int] = 1, set_local_cores: bool = False, ) -> int: """ Validate the number of cores and return the appropriate value. """ - if max_cores is None and max_workers is None: - if not set_local_cores: + if max_cores is not None and max_workers is None and cores_per_worker is not None: + return int(max_cores / cores_per_worker) + elif max_workers is not None: + return int(max_workers) + else: + if max_cores is None and max_workers is None and not set_local_cores: raise ValueError( "Block allocation requires a fixed set of computational resources. Neither max_cores nor max_workers are defined." ) else: - max_workers = multiprocessing.cpu_count() - elif max_cores is not None and max_workers is None: - max_workers = int(max_cores / cores_per_worker) - return max_workers + return multiprocessing.cpu_count() -def check_file_exists(file_name: str): +def check_file_exists(file_name: Optional[str]): if file_name is None: raise ValueError("file_name is not set.") if not os.path.exists(file_name): diff --git a/executorlib/standalone/interactive/backend.py b/executorlib/standalone/interactive/backend.py index 6c611bd2..f519a340 100644 --- a/executorlib/standalone/interactive/backend.py +++ b/executorlib/standalone/interactive/backend.py @@ -1,20 +1,20 @@ import inspect -from typing import Optional +from typing import Any, Callable, Optional def call_funct( - input_dict: dict, funct: Optional[callable] = None, memory: Optional[dict] = None -) -> callable: + input_dict: dict, funct: Optional[Callable] = None, memory: Optional[dict] = None +) -> Any: """ Call function from dictionary Args: input_dict (dict): dictionary containing the function 'fn', its arguments 'args' and keyword arguments 'kwargs' - funct (callable, optional): function to be evaluated if it is not included in the input dictionary + funct (Callable, optional): function to be evaluated if it is not included in the input dictionary memory (dict, optional): variables stored in memory which can be used as keyword arguments Returns: - callable: Result of the function + Any: Result of the function """ if funct is None: diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index 551fd112..be8f609a 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -175,7 +175,7 @@ def interface_connect(host: str, port: str) -> Tuple[zmq.Context, zmq.Socket]: return context, socket -def interface_send(socket: zmq.Socket, result_dict: dict): +def interface_send(socket: Optional[zmq.Socket], result_dict: dict): """ Send results to a SocketInterface instance. @@ -183,20 +183,24 @@ def interface_send(socket: zmq.Socket, result_dict: dict): socket (zmq.Socket): socket for the connection result_dict (dict): dictionary to be sent, supported keys are result, error and error_type. """ - socket.send(cloudpickle.dumps(result_dict)) + if socket is not None: + socket.send(cloudpickle.dumps(result_dict)) -def interface_receive(socket: zmq.Socket) -> dict: +def interface_receive(socket: Optional[zmq.Socket]) -> dict: """ Receive instructions from a SocketInterface instance. Args: socket (zmq.Socket): socket for the connection """ - return cloudpickle.loads(socket.recv()) + if socket is not None: + return cloudpickle.loads(socket.recv()) + else: + return {} -def interface_shutdown(socket: zmq.Socket, context: zmq.Context): +def interface_shutdown(socket: Optional[zmq.Socket], context: Optional[zmq.Context]): """ Close the connection to a SocketInterface instance. @@ -204,5 +208,6 @@ def interface_shutdown(socket: zmq.Socket, context: zmq.Context): socket (zmq.Socket): socket for the connection context (zmq.sugar.context.Context): context for the connection """ - socket.close() - context.term() + if socket is not None and context is not None: + socket.close() + context.term() diff --git a/executorlib/standalone/interactive/spawner.py b/executorlib/standalone/interactive/spawner.py index 9886eabb..0e6f6a8c 100644 --- a/executorlib/standalone/interactive/spawner.py +++ b/executorlib/standalone/interactive/spawner.py @@ -6,7 +6,12 @@ class BaseSpawner(ABC): - def __init__(self, cwd: str, cores: int = 1, openmpi_oversubscribe: bool = False): + def __init__( + self, + cwd: Optional[str] = None, + cores: int = 1, + openmpi_oversubscribe: bool = False, + ): """ Base class for interface implementations. @@ -72,7 +77,7 @@ def __init__( cores=cores, openmpi_oversubscribe=openmpi_oversubscribe, ) - self._process = None + self._process: Optional[subprocess.Popen] = None self._threads_per_core = threads_per_core def bootup( @@ -110,10 +115,11 @@ def shutdown(self, wait: bool = True): Args: wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True. """ - self._process.communicate() - self._process.terminate() - if wait: - self._process.wait() + if self._process is not None: + self._process.communicate() + self._process.terminate() + if wait: + self._process.wait() self._process = None def poll(self) -> bool: diff --git a/executorlib/standalone/plot.py b/executorlib/standalone/plot.py index 3c5cc252..1d1a6c39 100644 --- a/executorlib/standalone/plot.py +++ b/executorlib/standalone/plot.py @@ -18,8 +18,9 @@ def generate_nodes_and_edges( Returns: Tuple[list, list]: Tuple containing the list of nodes and the list of edges. """ - node_lst, edge_lst = [], [] - hash_id_dict = {} + node_lst: list = [] + edge_lst: list = [] + hash_id_dict: dict = {} def add_element(arg, link_to, label=""): """ diff --git a/executorlib/standalone/serialize.py b/executorlib/standalone/serialize.py index 74bb1729..0cb60723 100644 --- a/executorlib/standalone/serialize.py +++ b/executorlib/standalone/serialize.py @@ -1,7 +1,7 @@ import hashlib import inspect import re -from typing import Any, Tuple +from typing import Callable, Tuple import cloudpickle @@ -29,13 +29,13 @@ def cloudpickle_register(ind: int = 2): def serialize_funct_h5( - fn: callable, fn_args: list = [], fn_kwargs: dict = {}, resource_dict: dict = {} + fn: Callable, fn_args: list = [], fn_kwargs: dict = {}, resource_dict: dict = {} ) -> Tuple[str, dict]: """ Serialize a function and its arguments and keyword arguments into an HDF5 file. Args: - fn (callable): The function to be serialized. + fn (Callable): The function to be serialized. fn_args (list): The arguments of the function. fn_kwargs (dict): The keyword arguments of the function. resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function. diff --git a/executorlib/standalone/thread.py b/executorlib/standalone/thread.py index d622ef1c..f9cdaa2c 100644 --- a/executorlib/standalone/thread.py +++ b/executorlib/standalone/thread.py @@ -21,6 +21,9 @@ def __init__( ) self._exception = None + def get_kwargs(self): + return self._kwargs + def run(self) -> None: """ Run the thread's target function and catch any exceptions raised. diff --git a/tests/test_integration_pyiron_workflow.py b/tests/test_integration_pyiron_workflow.py index 64f0eed6..5eb4ba86 100644 --- a/tests/test_integration_pyiron_workflow.py +++ b/tests/test_integration_pyiron_workflow.py @@ -9,6 +9,7 @@ from concurrent.futures._base import TimeoutError as cfbTimeoutError from functools import partialmethod from time import sleep +from typing import Callable import unittest from executorlib import Executor @@ -21,7 +22,7 @@ class Foo: its paces. """ - def __init__(self, fnc: callable): + def __init__(self, fnc: Callable): self.fnc = fnc self.result = None self.running = False @@ -44,7 +45,7 @@ def dynamic_foo(): Overrides the `fnc` input of `Foo` with the decorated function. """ - def as_dynamic_foo(fnc: callable): + def as_dynamic_foo(fnc: Callable): return type( "DynamicFoo", (Foo,), # Define parentage diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py index 42b8bb55..28917c4e 100644 --- a/tests/test_shared_communication.py +++ b/tests/test_shared_communication.py @@ -86,6 +86,9 @@ def test_interface_serial(self): class TestZMQ(unittest.TestCase): + def test_interface_receive(self): + self.assertEqual(len(interface_receive(socket=None)), 0) + def test_initialize_zmq(self): message = "test" host = "localhost" diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py index 5f682a8b..d35a9611 100644 --- a/tests/test_shared_input_check.py +++ b/tests/test_shared_input_check.py @@ -109,10 +109,6 @@ def test_validate_number_of_cores(self): validate_number_of_cores( max_cores=None, max_workers=None, cores_per_worker=None ) - with self.assertRaises(TypeError): - validate_number_of_cores( - max_cores=1, max_workers=None, cores_per_worker=None - ) self.assertIsInstance( validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=1), int,