diff --git a/pyproject.toml b/pyproject.toml index e7f1e2ea..956b82b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,6 +60,15 @@ pims = ["pims"] opencv = ["opencv-python"] # qt = ["PySide6"] +[project.entry-points."fluidimage.executors"] +exec_sequential = "fluidimage.executors.exec_sequential" +exec_async = "fluidimage.executors.exec_async" +exec_async_sequential = "fluidimage.executors.exec_async_sequential" +multi_exec_async = "fluidimage.executors.multi_exec_async" +exec_async_multi = "fluidimage.executors.exec_async_multiproc" +exec_async_servers = "fluidimage.executors.exec_async_servers" +exec_async_servers_threading = "fluidimage.executors.exec_async_servers_threading" + [tool.meson-python.args] setup = ['-Doptimization=3'] diff --git a/src/fluidimage/executors/__init__.py b/src/fluidimage/executors/__init__.py index ec4cdda1..b552e8ca 100644 --- a/src/fluidimage/executors/__init__.py +++ b/src/fluidimage/executors/__init__.py @@ -25,7 +25,15 @@ """ +import importlib import os +import sys + +if sys.version_info < (3, 10): + from importlib_metadata import entry_points, EntryPoint +else: + from importlib.metadata import entry_points, EntryPoint + import trio @@ -38,24 +46,74 @@ def afterfork(): os.register_at_fork(after_in_child=afterfork) from .base import ExecutorBase -from .exec_async import ExecutorAsync -from .exec_async_multiproc import ExecutorAsyncMultiproc -from .exec_async_sequential import ExecutorAsyncSequential -from .exec_async_servers import ( - ExecutorAsyncServers, - ExecutorAsyncServersThreading, -) -from .exec_sequential import ExecutorSequential -from .multi_exec_async import MultiExecutorAsync - -executors = { - "exec_sequential": ExecutorSequential, - "exec_async": ExecutorAsync, - "exec_async_sequential": ExecutorAsyncSequential, - "multi_exec_async": MultiExecutorAsync, - "exec_async_multi": ExecutorAsyncMultiproc, - "exec_async_servers": ExecutorAsyncServers, - "exec_async_servers_threading": ExecutorAsyncServersThreading, -} - -__all__ = ["ExecutorBase", "executors"] + + +_entry_points = None + + +def get_entry_points(reload=False, ndim=None, sequential=None): + """Discover the executors installed""" + global _entry_points + if _entry_points is None or reload: + _entry_points = entry_points(group="fluidimage.executors") + + if not _entry_points: + raise RuntimeError("No executor were found.") + + return _entry_points + + +def get_executor_names(): + """Get available executor names""" + return set(entry_point.name for entry_point in get_entry_points()) + + +def _get_module_fullname_from_name(name): + """Get the module name from an executor name + + Parameters + ---------- + + name : str + Name of an executor. + + """ + entry_points = get_entry_points() + selected_entry_points = entry_points.select(name=name) + if len(selected_entry_points) == 0: + raise ValueError(f"Cannot find an executor for {name = }. {entry_points}") + elif len(selected_entry_points) > 1: + logging.warning( + f"{len(selected_entry_points)} plugins were found for {name = }" + ) + + return selected_entry_points[name].value + + +def import_executor_class(name): + """Import an executor class. + + Parameters + ---------- + + name : str + Executor name. + + Returns + ------- + + The corresponding executor class. + + """ + + if isinstance(name, EntryPoint): + module_fullname = name.value + name = name.name + else: + module_fullname = _get_module_fullname_from_name(name) + + mod = importlib.import_module(module_fullname) + return mod.Executor + + +__all__ = ["ExecutorBase", "get_executor_names", "import_executor_class"] diff --git a/src/fluidimage/executors/exec_async.py b/src/fluidimage/executors/exec_async.py index 6144c0b1..a8e3d970 100644 --- a/src/fluidimage/executors/exec_async.py +++ b/src/fluidimage/executors/exec_async.py @@ -327,3 +327,6 @@ async def update_has_to_stop(self): ) await trio.sleep(self.sleep_time) + + +Executor = ExecutorAsync diff --git a/src/fluidimage/executors/exec_async_multiproc.py b/src/fluidimage/executors/exec_async_multiproc.py index f0b347c8..ccc18fcd 100644 --- a/src/fluidimage/executors/exec_async_multiproc.py +++ b/src/fluidimage/executors/exec_async_multiproc.py @@ -132,3 +132,6 @@ def start_process_and_check(index_attempt): if work.output_queue is not None: work.output_queue[key] = ret self.nb_working_workers_cpu -= 1 + + +Executor = ExecutorAsyncMultiproc diff --git a/src/fluidimage/executors/exec_async_sequential.py b/src/fluidimage/executors/exec_async_sequential.py index 02acb4c0..0808ef0d 100644 --- a/src/fluidimage/executors/exec_async_sequential.py +++ b/src/fluidimage/executors/exec_async_sequential.py @@ -76,3 +76,6 @@ async def async_run_work_cpu(self, work): if work.output_queue is not None: work.output_queue[key] = ret self.nb_working_workers_cpu -= 1 + + +Executor = ExecutorAsyncSequential diff --git a/src/fluidimage/executors/exec_async_servers.py b/src/fluidimage/executors/exec_async_servers.py index d005f79d..05237e6f 100644 --- a/src/fluidimage/executors/exec_async_servers.py +++ b/src/fluidimage/executors/exec_async_servers.py @@ -267,7 +267,4 @@ def get_available_worker(self): return worker -class ExecutorAsyncServersThreading(ExecutorAsyncServers): - """Just used to get a better coverage""" - - _type_server = "threading" +Executor = ExecutorAsyncServers diff --git a/src/fluidimage/executors/exec_async_servers_threading.py b/src/fluidimage/executors/exec_async_servers_threading.py new file mode 100644 index 00000000..78d3dbb7 --- /dev/null +++ b/src/fluidimage/executors/exec_async_servers_threading.py @@ -0,0 +1,10 @@ +from .exec_async_servers import ExecutorAsyncServers + + +class ExecutorAsyncServersThreading(ExecutorAsyncServers): + """Just used to get a better coverage""" + + _type_server = "threading" + + +Executor = ExecutorAsyncServersThreading diff --git a/src/fluidimage/executors/exec_sequential.py b/src/fluidimage/executors/exec_sequential.py index ba4c2fb6..fccc63a7 100644 --- a/src/fluidimage/executors/exec_sequential.py +++ b/src/fluidimage/executors/exec_sequential.py @@ -65,3 +65,6 @@ def _run_works(self): if work.output_queue is not None: work.output_queue[key] = ret + + +Executor = ExecutorSequential diff --git a/src/fluidimage/executors/meson.build b/src/fluidimage/executors/meson.build index ae4a51bb..abc471be 100644 --- a/src/fluidimage/executors/meson.build +++ b/src/fluidimage/executors/meson.build @@ -6,6 +6,7 @@ python_sources = [ 'exec_async_multiproc.py', 'exec_async_sequential.py', 'exec_async_servers.py', + 'exec_async_servers_threading.py', 'exec_sequential.py', 'multi_exec_async.py', 'servers.py', diff --git a/src/fluidimage/executors/multi_exec_async.py b/src/fluidimage/executors/multi_exec_async.py index 6fc2d3d0..bafc6fdf 100644 --- a/src/fluidimage/executors/multi_exec_async.py +++ b/src/fluidimage/executors/multi_exec_async.py @@ -335,3 +335,6 @@ def wait_for_all_processes(self): for process in self.processes: process.join() + + +Executor = MultiExecutorAsync diff --git a/src/fluidimage/topologies/base.py b/src/fluidimage/topologies/base.py index 4bacb903..3a65b3fa 100644 --- a/src/fluidimage/topologies/base.py +++ b/src/fluidimage/topologies/base.py @@ -20,7 +20,7 @@ from fluidimage.util import cstring, logger -from ..executors import ExecutorBase, executors +from ..executors import ExecutorBase, import_executor_class, get_executor_names class Work: @@ -195,13 +195,13 @@ def compute( executor = "multi_exec_async" if not isinstance(executor, ExecutorBase): - if executor not in executors: + if executor not in get_executor_names(): raise NotImplementedError(f"executor {executor} does not exist") if nb_max_workers is None: nb_max_workers = self.nb_max_workers - exec_class = executors[executor] + exec_class = import_executor_class(executor) self.executor = exec_class( self, path_dir_result=self.path_dir_result,