diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index f56a349e..7c8918cd 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -190,7 +190,7 @@ def executor_broker( def execute_task_dict(task_dict, meta_future_lst): - if "fn" in task_dict.keys(): + if "fn" in task_dict.keys() or "future" in task_dict.keys(): meta_future = next(as_completed(meta_future_lst.keys())) executor = meta_future_lst.pop(meta_future) executor.future_queue.put(task_dict) diff --git a/pympipool/shell/__init__.py b/pympipool/shell/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py new file mode 100644 index 00000000..fe37ac09 --- /dev/null +++ b/pympipool/shell/executor.py @@ -0,0 +1,75 @@ +from concurrent.futures import Future +import subprocess + +from pympipool.shared.executorbase import ( + executor_broker, ExecutorBase +) +from pympipool.shared.thread import RaisingThread + + +def execute_single_task(future_queue): + while True: + task_dict = future_queue.get() + if "shutdown" in task_dict.keys() and task_dict["shutdown"]: + future_queue.task_done() + future_queue.join() + break + elif "future" in task_dict.keys(): + f = task_dict.pop("future") + if f.set_running_or_notify_cancel(): + try: + f.set_result( + subprocess.check_output( + *task_dict["args"], **task_dict["kwargs"] + ) + ) + except Exception as thread_exception: + future_queue.task_done() + f.set_exception(exception=thread_exception) + raise thread_exception + else: + future_queue.task_done() + else: + raise KeyError(task_dict) + + +class ShellStaticExecutor(ExecutorBase): + def __init__(self): + super().__init__() + self._process = RaisingThread( + target=execute_single_task, + kwargs={ + "future_queue": self._future_queue, + }, + ) + self._process.start() + + def submit(self, *args, **kwargs): + f = Future() + self._future_queue.put({"future": f, "args": args, "kwargs": kwargs}) + return f + + +class ShellExecutor(ExecutorBase): + def __init__( + self, + max_workers=1, + sleep_interval=0.1, + ): + super().__init__() + self._process = RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "sleep_interval": sleep_interval, + "executor_class": ShellStaticExecutor, + }, + ) + self._process.start() + + def submit(self, *args, **kwargs): + f = Future() + self._future_queue.put({"future": f, "args": args, "kwargs": kwargs}) + return f diff --git a/tests/test_shell.py b/tests/test_shell.py new file mode 100644 index 00000000..76113b7e --- /dev/null +++ b/tests/test_shell.py @@ -0,0 +1,35 @@ +from unittest import TestCase + +from pympipool.shell.executor import ShellStaticExecutor, ShellExecutor + + +class StaticExecutorTest(TestCase): + def test_shell_static(self): + with ShellStaticExecutor() as exe: + future = exe.submit(["echo", "test"], universal_newlines=True) + self.assertFalse(future.done()) + self.assertEqual("test\n", future.result()) + self.assertTrue(future.done()) + + def test_shell(self): + with ShellExecutor(max_workers=2) as exe: + f_1 = exe.submit(["echo", "test_1"], universal_newlines=True) + f_2 = exe.submit(["echo", "test_2"], universal_newlines=True) + f_3 = exe.submit(["echo", "test_3"], universal_newlines=True) + f_4 = exe.submit(["echo", "test_4"], universal_newlines=True) + self.assertFalse(f_1.done()) + self.assertFalse(f_2.done()) + self.assertFalse(f_3.done()) + self.assertFalse(f_4.done()) + self.assertEqual("test_1\n", f_1.result()) + self.assertEqual("test_2\n", f_2.result()) + self.assertTrue(f_1.done()) + self.assertTrue(f_2.done()) + self.assertFalse(f_3.done()) + self.assertFalse(f_4.done()) + self.assertEqual("test_3\n", f_3.result()) + self.assertEqual("test_4\n", f_4.result()) + self.assertTrue(f_1.done()) + self.assertTrue(f_2.done()) + self.assertTrue(f_3.done()) + self.assertTrue(f_4.done())