Skip to content

Commit

Permalink
[project.entry-points."fluidimage.executors"]
Browse files Browse the repository at this point in the history
  • Loading branch information
paugier committed Feb 20, 2024
1 parent c52c472 commit c1379b7
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 28 deletions.
9 changes: 9 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ pims = ["pims"]
opencv = ["opencv-python"]
# qt = ["PySide6"]

[project.entry-points."fluidimage.executors"]
exec_sequential = "fluidimage.executors.exec_sequential"
exec_async = "fluidimage.executors.exec_async"
exec_async_sequential = "fluidimage.executors.exec_async_sequential"
multi_exec_async = "fluidimage.executors.multi_exec_async"
exec_async_multi = "fluidimage.executors.exec_async_multiproc"
exec_async_servers = "fluidimage.executors.exec_async_servers"
exec_async_servers_threading = "fluidimage.executors.exec_async_servers_threading"

[tool.meson-python.args]
setup = ['-Doptimization=3']

Expand Down
100 changes: 79 additions & 21 deletions src/fluidimage/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@
"""

import importlib
import os
import sys

if sys.version_info < (3, 10):
from importlib_metadata import entry_points, EntryPoint
else:
from importlib.metadata import entry_points, EntryPoint


import trio

Expand All @@ -38,24 +46,74 @@ def afterfork():
os.register_at_fork(after_in_child=afterfork)

from .base import ExecutorBase
from .exec_async import ExecutorAsync
from .exec_async_multiproc import ExecutorAsyncMultiproc
from .exec_async_sequential import ExecutorAsyncSequential
from .exec_async_servers import (
ExecutorAsyncServers,
ExecutorAsyncServersThreading,
)
from .exec_sequential import ExecutorSequential
from .multi_exec_async import MultiExecutorAsync

executors = {
"exec_sequential": ExecutorSequential,
"exec_async": ExecutorAsync,
"exec_async_sequential": ExecutorAsyncSequential,
"multi_exec_async": MultiExecutorAsync,
"exec_async_multi": ExecutorAsyncMultiproc,
"exec_async_servers": ExecutorAsyncServers,
"exec_async_servers_threading": ExecutorAsyncServersThreading,
}

__all__ = ["ExecutorBase", "executors"]


_entry_points = None


def get_entry_points(reload=False, ndim=None, sequential=None):
"""Discover the executors installed"""
global _entry_points
if _entry_points is None or reload:
_entry_points = entry_points(group="fluidimage.executors")

if not _entry_points:
raise RuntimeError("No executor were found.")

return _entry_points


def get_executor_names():
"""Get available executor names"""
return set(entry_point.name for entry_point in get_entry_points())


def _get_module_fullname_from_name(name):
"""Get the module name from an executor name
Parameters
----------
name : str
Name of an executor.
"""
entry_points = get_entry_points()
selected_entry_points = entry_points.select(name=name)
if len(selected_entry_points) == 0:
raise ValueError(f"Cannot find an executor for {name = }. {entry_points}")
elif len(selected_entry_points) > 1:
logging.warning(
f"{len(selected_entry_points)} plugins were found for {name = }"
)

return selected_entry_points[name].value


def import_executor_class(name):
"""Import an executor class.
Parameters
----------
name : str
Executor name.
Returns
-------
The corresponding executor class.
"""

if isinstance(name, EntryPoint):
module_fullname = name.value
name = name.name
else:
module_fullname = _get_module_fullname_from_name(name)

mod = importlib.import_module(module_fullname)
return mod.Executor


__all__ = ["ExecutorBase", "get_executor_names", "import_executor_class"]
3 changes: 3 additions & 0 deletions src/fluidimage/executors/exec_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,6 @@ async def update_has_to_stop(self):
)

await trio.sleep(self.sleep_time)


Executor = ExecutorAsync
3 changes: 3 additions & 0 deletions src/fluidimage/executors/exec_async_multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,6 @@ def start_process_and_check(index_attempt):
if work.output_queue is not None:
work.output_queue[key] = ret
self.nb_working_workers_cpu -= 1


Executor = ExecutorAsyncMultiproc
3 changes: 3 additions & 0 deletions src/fluidimage/executors/exec_async_sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,6 @@ async def async_run_work_cpu(self, work):
if work.output_queue is not None:
work.output_queue[key] = ret
self.nb_working_workers_cpu -= 1


Executor = ExecutorAsyncSequential
5 changes: 1 addition & 4 deletions src/fluidimage/executors/exec_async_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,4 @@ def get_available_worker(self):
return worker


class ExecutorAsyncServersThreading(ExecutorAsyncServers):
"""Just used to get a better coverage"""

_type_server = "threading"
Executor = ExecutorAsyncServers
10 changes: 10 additions & 0 deletions src/fluidimage/executors/exec_async_servers_threading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .exec_async_servers import ExecutorAsyncServers


class ExecutorAsyncServersThreading(ExecutorAsyncServers):
"""Just used to get a better coverage"""

_type_server = "threading"


Executor = ExecutorAsyncServersThreading
3 changes: 3 additions & 0 deletions src/fluidimage/executors/exec_sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,6 @@ def _run_works(self):

if work.output_queue is not None:
work.output_queue[key] = ret


Executor = ExecutorSequential
1 change: 1 addition & 0 deletions src/fluidimage/executors/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ python_sources = [
'exec_async_multiproc.py',
'exec_async_sequential.py',
'exec_async_servers.py',
'exec_async_servers_threading.py',
'exec_sequential.py',
'multi_exec_async.py',
'servers.py',
Expand Down
3 changes: 3 additions & 0 deletions src/fluidimage/executors/multi_exec_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,6 @@ def wait_for_all_processes(self):

for process in self.processes:
process.join()


Executor = MultiExecutorAsync
6 changes: 3 additions & 3 deletions src/fluidimage/topologies/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from fluidimage.util import cstring, logger

from ..executors import ExecutorBase, executors
from ..executors import ExecutorBase, import_executor_class, get_executor_names


class Work:
Expand Down Expand Up @@ -195,13 +195,13 @@ def compute(
executor = "multi_exec_async"

if not isinstance(executor, ExecutorBase):
if executor not in executors:
if executor not in get_executor_names():
raise NotImplementedError(f"executor {executor} does not exist")

if nb_max_workers is None:
nb_max_workers = self.nb_max_workers

exec_class = executors[executor]
exec_class = import_executor_class(executor)
self.executor = exec_class(
self,
path_dir_result=self.path_dir_result,
Expand Down

0 comments on commit c1379b7

Please sign in to comment.