Skip to content

Commit

Permalink
Implement a Shell based Executor
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Nov 21, 2023
1 parent 1daf09a commit 2c7e8c3
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pympipool/shared/executorbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def executor_broker(


def execute_task_dict(task_dict, meta_future_lst):
if "fn" in task_dict.keys():
if "fn" in task_dict.keys() or "future" in task_dict.keys():
meta_future = next(as_completed(meta_future_lst.keys()))
executor = meta_future_lst.pop(meta_future)
executor.future_queue.put(task_dict)
Expand Down
Empty file added pympipool/shell/__init__.py
Empty file.
75 changes: 75 additions & 0 deletions pympipool/shell/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from concurrent.futures import Future
import subprocess

from pympipool.shared.executorbase import (
executor_broker, ExecutorBase
)
from pympipool.shared.thread import RaisingThread


def execute_single_task(future_queue):
while True:
task_dict = future_queue.get()
if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
future_queue.task_done()
future_queue.join()
break
elif "future" in task_dict.keys():
f = task_dict.pop("future")
if f.set_running_or_notify_cancel():
try:
f.set_result(
subprocess.check_output(
*task_dict["args"], **task_dict["kwargs"]
)
)
except Exception as thread_exception:
future_queue.task_done()
f.set_exception(exception=thread_exception)
raise thread_exception
else:
future_queue.task_done()
else:
raise KeyError(task_dict)


class ShellStaticExecutor(ExecutorBase):
def __init__(self):
super().__init__()
self._process = RaisingThread(
target=execute_single_task,
kwargs={
"future_queue": self._future_queue,
},
)
self._process.start()

def submit(self, *args, **kwargs):
f = Future()
self._future_queue.put({"future": f, "args": args, "kwargs": kwargs})
return f


class ShellExecutor(ExecutorBase):
def __init__(
self,
max_workers=1,
sleep_interval=0.1,
):
super().__init__()
self._process = RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"executor_class": ShellStaticExecutor,
},
)
self._process.start()

def submit(self, *args, **kwargs):
f = Future()
self._future_queue.put({"future": f, "args": args, "kwargs": kwargs})
return f
35 changes: 35 additions & 0 deletions tests/test_shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from unittest import TestCase

from pympipool.shell.executor import ShellStaticExecutor, ShellExecutor


class StaticExecutorTest(TestCase):
def test_shell_static(self):
with ShellStaticExecutor() as exe:
future = exe.submit(["echo", "test"], universal_newlines=True)
self.assertFalse(future.done())
self.assertEqual("test\n", future.result())
self.assertTrue(future.done())

def test_shell(self):
with ShellExecutor(max_workers=2) as exe:
f_1 = exe.submit(["echo", "test_1"], universal_newlines=True)
f_2 = exe.submit(["echo", "test_2"], universal_newlines=True)
f_3 = exe.submit(["echo", "test_3"], universal_newlines=True)
f_4 = exe.submit(["echo", "test_4"], universal_newlines=True)
self.assertFalse(f_1.done())
self.assertFalse(f_2.done())
self.assertFalse(f_3.done())
self.assertFalse(f_4.done())
self.assertEqual("test_1\n", f_1.result())
self.assertEqual("test_2\n", f_2.result())
self.assertTrue(f_1.done())
self.assertTrue(f_2.done())
self.assertFalse(f_3.done())
self.assertFalse(f_4.done())
self.assertEqual("test_3\n", f_3.result())
self.assertEqual("test_4\n", f_4.result())
self.assertTrue(f_1.done())
self.assertTrue(f_2.done())
self.assertTrue(f_3.done())
self.assertTrue(f_4.done())

0 comments on commit 2c7e8c3

Please sign in to comment.