diff --git a/pympipool/shell/__init__.py b/pympipool/shell/__init__.py
deleted file mode 100644
index 3086c26f..00000000
--- a/pympipool/shell/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-from pympipool.shell.executor import SubprocessExecutor
-from pympipool.shell.interactive import ShellExecutor
diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py
deleted file mode 100644
index 92e4803a..00000000
--- a/pympipool/shell/executor.py
+++ /dev/null
@@ -1,89 +0,0 @@
-import queue
-from concurrent.futures import Future
-import subprocess
-
-from pympipool.shared.executorbase import ExecutorBroker
-from pympipool.shared.thread import RaisingThread
-
-
-def execute_single_task(future_queue: queue.Queue):
-    """
-    Process items received via the queue.
-
-    Args:
-        future_queue (queue.Queue):
-    """
-    while True:
-        task_dict = future_queue.get()
-        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
-            future_queue.task_done()
-            future_queue.join()
-            break
-        elif "future" in task_dict.keys():
-            f = task_dict.pop("future")
-            if f.set_running_or_notify_cancel():
-                try:
-                    f.set_result(
-                        subprocess.check_output(
-                            *task_dict["args"], **task_dict["kwargs"]
-                        )
-                    )
-                except Exception as thread_exception:
-                    future_queue.task_done()
-                    f.set_exception(exception=thread_exception)
-                    raise thread_exception
-                else:
-                    future_queue.task_done()
-        else:
-            raise KeyError(task_dict)
-
-
-class SubprocessExecutor(ExecutorBroker):
-    """
-    The pympipool.shell.SubprocessExecutor enables the submission of command line calls via the subprocess.check_output()
-    interface of the python standard library. It is based on the concurrent.futures.Executor class and returns a
-    concurrent.futures.Future object for every submitted command line call. Still it does not provide any option to
-    interact with the external executable during the execution.
-
-    Args:
-        max_workers (int): defines the number workers which can execute functions in parallel
-
-    Examples:
-
-        >>> from pympipool import SubprocessExecutor
-        >>> with SubprocessExecutor(max_workers=2) as exe:
-        >>>     future = exe.submit(["echo", "test"], universal_newlines=True)
-        >>>     print(future.done(), future.result(), future.done())
-        (False, "test", True)
-
-    """
-
-    def __init__(
-        self,
-        max_workers: int = 1,
-    ):
-        super().__init__()
-        self._set_process(
-            process=[
-                RaisingThread(
-                    target=execute_single_task,
-                    kwargs={
-                        # Executor Arguments
-                        "future_queue": self._future_queue,
-                    },
-                )
-                for _ in range(max_workers)
-            ],
-        )
-
-    def submit(self, *args, **kwargs):
-        """
-        Submit a command line call to be executed. The given arguments are provided to subprocess.Popen() as additional
-        inputs to control the execution.
-
-        Returns:
-            A Future representing the given call.
-        """
-        f = Future()
-        self._future_queue.put({"future": f, "args": args, "kwargs": kwargs})
-        return f
diff --git a/pympipool/shell/interactive.py b/pympipool/shell/interactive.py
deleted file mode 100644
index 1a3dc6c4..00000000
--- a/pympipool/shell/interactive.py
+++ /dev/null
@@ -1,181 +0,0 @@
-import queue
-import threading
-from typing import Optional
-from concurrent.futures import Future
-import subprocess
-from time import sleep
-
-from pympipool.shared.executorbase import cancel_items_in_queue, ExecutorBase
-from pympipool.shared.thread import RaisingThread
-
-
-def wait_for_process_to_stop(process: threading.Thread, sleep_interval: float = 10e-10):
-    """
-    Wait for the subprocess.Popen() process to stop executing
-
-    Args:
-        process (subprocess.Popen): process object
-        sleep_interval (float): interval to sleep during poll() calls
-    """
-    while process.poll() is None:
-        sleep(sleep_interval)
-
-
-def execute_single_task(future_queue: queue.Queue):
-    """
-    Process items received via the queue.
-
-    Args:
-        future_queue (queue.Queue):
-    """
-    process = None
-    while True:
-        task_dict = future_queue.get()
-        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
-            if process is not None and process.poll() is None:
-                process.stdin.flush()
-                process.stdin.close()
-                process.stdout.close()
-                process.stderr.close()
-                process.terminate()
-                wait_for_process_to_stop(process=process)
-            future_queue.task_done()
-            # future_queue.join()
-            break
-        elif "init" in task_dict.keys() and task_dict["init"]:
-            process = subprocess.Popen(
-                *task_dict["args"],
-                stdin=subprocess.PIPE,
-                stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE,
-                **task_dict["kwargs"],
-            )
-        elif "future" in task_dict.keys():
-            if process is None:
-                raise ValueError("process not initialized")
-            elif process.poll() is None:
-                f = task_dict.pop("future")
-                if f.set_running_or_notify_cancel():
-                    try:
-                        process.stdin.write(task_dict["input"])
-                        process.stdin.flush()
-                        lines_count = 0
-                        output = ""
-                        while True:
-                            output_current = process.stdout.readline()
-                            output += output_current
-                            lines_count += 1
-                            if (
-                                task_dict["stop_read_pattern"] is not None
-                                and task_dict["stop_read_pattern"] in output_current
-                            ):
-                                break
-                            elif (
-                                task_dict["lines_to_read"] is not None
-                                and task_dict["lines_to_read"] == lines_count
-                            ):
-                                break
-                        f.set_result(output)
-                    except Exception as thread_exception:
-                        future_queue.task_done()
-                        f.set_exception(exception=thread_exception)
-                        raise thread_exception
-                    else:
-                        future_queue.task_done()
-            else:
-                raise ValueError("process exited")
-
-
-class ShellExecutor(ExecutorBase):
-    """
-    In contrast to the other pympipool.shell.SubprocessExecutor and the pympipool.Executor the pympipool.shell.ShellExecutor
-    can only execute a single process at a given time. Still it adds the capability to interact with this process during
-    its execution. The initialization of the pympipool.shell.ShellExecutor takes the same input arguments as the
-    subprocess.Popen() call for the standard library to start a subprocess.
-
-    Examples
-
-        >>> from pympipool import ShellExecutor
-        >>> with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe:
-        >>>     future_lines = exe.submit(string_input="4", lines_to_read=5)
-        >>>     print(future_lines.done(), future_lines.result(), future_lines.done())
-        (False, "0\n1\n2\n3\ndone\n", True)
-
-        >>> from pympipool import ShellExecutor
-        >>> with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe:
-        >>>     future_pattern = exe.submit(string_input="4", stop_read_pattern="done")
-        >>>     print(future_pattern.done(), future_pattern.result(), future_pattern.done())
-        (False, "0\n1\n2\n3\ndone\n", True)
-    """
-
-    def __init__(self, *args, **kwargs):
-        super().__init__()
-        self._set_process(
-            process=RaisingThread(
-                target=execute_single_task,
-                kwargs={
-                    "future_queue": self._future_queue,
-                },
-            ),
-        )
-        self._future_queue.put({"init": True, "args": args, "kwargs": kwargs})
-
-    def submit(
-        self,
-        string_input: str,
-        lines_to_read: Optional[int] = None,
-        stop_read_pattern: Optional[str] = None,
-    ):
-        """
-        Submit the input as a string to the executable. In addition to the input the ShellExecutor also needs a measure
-        to identify the completion of the execution. This can either be provided based on the number of lines to read
-        using the `lines_to_read` parameter or by providing a string pattern using the `stop_read_pattern` to stop
-        reading new lines. One of these two stopping criteria has to be defined.
-
-        Args:
-            string_input (str): Input to be communicated to the underlying executable
-            lines_to_read (None/int): integer number of lines to read from the command line (optional)
-            stop_read_pattern (None/str): string pattern to indicate the command line output is completed (optional)
-
-        Returns:
-            A Future representing the given call.
-        """
-        if lines_to_read is None and stop_read_pattern is None:
-            raise ValueError(
-                "Either the number of lines_to_read (int) or the stop_read_pattern (str) has to be defined."
-            )
-        if string_input[-1:] != "\n":
-            string_input += "\n"
-        f = Future()
-        self._future_queue.put(
-            {
-                "future": f,
-                "input": string_input,
-                "lines_to_read": lines_to_read,
-                "stop_read_pattern": stop_read_pattern,
-            }
-        )
-        return f
-
-    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
-        """Clean-up the resources associated with the Executor.
-
-        It is safe to call this method several times. Otherwise, no other
-        methods can be called after this one.
-
-        Args:
-            wait: If True then shutdown will not return until all running
-                futures have finished executing and the resources used by the
-                parallel_executors have been reclaimed.
-            cancel_futures: If True then shutdown will cancel all pending
-                futures. Futures that are completed or running will not be
-                cancelled.
-        """
-        if cancel_futures:
-            cancel_items_in_queue(que=self._future_queue)
-        self._future_queue.put({"shutdown": True, "wait": wait})
-        if wait:
-            self._process.join()
-            # self._future_queue.join()
-        self._process = None
-        self._future_queue = None
diff --git a/tests/test_shell_executor.py b/tests/test_shell_executor.py
deleted file mode 100644
index d2b8c255..00000000
--- a/tests/test_shell_executor.py
+++ /dev/null
@@ -1,86 +0,0 @@
-from concurrent.futures import Future
-import queue
-import unittest
-
-from pympipool.shell.executor import SubprocessExecutor, execute_single_task
-
-
-class SubprocessExecutorTest(unittest.TestCase):
-    def test_execute_single_task(self):
-        test_queue = queue.Queue()
-        f = Future()
-        test_queue.put(
-            {
-                "future": f,
-                "args": [["echo", "test"]],
-                "kwargs": {"universal_newlines": True},
-            }
-        )
-        test_queue.put({"shutdown": True})
-        self.assertFalse(f.done())
-        execute_single_task(future_queue=test_queue)
-        self.assertTrue(f.done())
-        self.assertEqual("test\n", f.result())
-
-    def test_wrong_error(self):
-        test_queue = queue.Queue()
-        test_queue.put({"wrong_key": True})
-        with self.assertRaises(KeyError):
-            execute_single_task(future_queue=test_queue)
-
-    def test_broken_executable(self):
-        test_queue = queue.Queue()
-        f = Future()
-        test_queue.put(
-            {
-                "future": f,
-                "args": [["/executable/does/not/exist"]],
-                "kwargs": {"universal_newlines": True},
-            }
-        )
-        with self.assertRaises(FileNotFoundError):
-            execute_single_task(future_queue=test_queue)
-
-    def test_shell_static_executor_args(self):
-        with SubprocessExecutor(max_workers=1) as exe:
-            future = exe.submit(["echo", "test"], universal_newlines=True, shell=False)
-            self.assertFalse(future.done())
-            self.assertEqual("test\n", future.result())
-            self.assertTrue(future.done())
-
-    def test_shell_static_executor_binary(self):
-        with SubprocessExecutor(max_workers=1) as exe:
-            future = exe.submit(["echo", "test"], universal_newlines=False, shell=False)
-            self.assertFalse(future.done())
-            self.assertEqual(b"test\n", future.result())
-            self.assertTrue(future.done())
-
-    def test_shell_static_executor_shell(self):
-        with SubprocessExecutor(max_workers=1) as exe:
-            future = exe.submit("echo test", universal_newlines=True, shell=True)
-            self.assertFalse(future.done())
-            self.assertEqual("test\n", future.result())
-            self.assertTrue(future.done())
-
-    def test_shell_executor(self):
-        with SubprocessExecutor(max_workers=2) as exe:
-            f_1 = exe.submit(["echo", "test_1"], universal_newlines=True)
-            f_2 = exe.submit(["echo", "test_2"], universal_newlines=True)
-            f_3 = exe.submit(["echo", "test_3"], universal_newlines=True)
-            f_4 = exe.submit(["echo", "test_4"], universal_newlines=True)
-            self.assertFalse(f_1.done())
-            self.assertFalse(f_2.done())
-            self.assertFalse(f_3.done())
-            self.assertFalse(f_4.done())
-            self.assertEqual("test_1\n", f_1.result())
-            self.assertEqual("test_2\n", f_2.result())
-            self.assertTrue(f_1.done())
-            self.assertTrue(f_2.done())
-            self.assertFalse(f_3.done())
-            self.assertFalse(f_4.done())
-            self.assertEqual("test_3\n", f_3.result())
-            self.assertEqual("test_4\n", f_4.result())
-            self.assertTrue(f_1.done())
-            self.assertTrue(f_2.done())
-            self.assertTrue(f_3.done())
-            self.assertTrue(f_4.done())
diff --git a/tests/test_shell_interactive.py b/tests/test_shell_interactive.py
deleted file mode 100644
index 7eee1ba0..00000000
--- a/tests/test_shell_interactive.py
+++ /dev/null
@@ -1,70 +0,0 @@
-from concurrent.futures import Future
-import os
-import queue
-import unittest
-
-from pympipool.shell.interactive import ShellExecutor, execute_single_task
-
-
-class ShellInteractiveExecutorTest(unittest.TestCase):
-    def setUp(self):
-        self.executable_path = os.path.join(
-            os.path.dirname(__file__), "executables", "count.py"
-        )
-
-    def test_execute_single_task(self):
-        test_queue = queue.Queue()
-        future_lines = Future()
-        future_pattern = Future()
-        test_queue.put(
-            {
-                "init": True,
-                "args": [["python", self.executable_path]],
-                "kwargs": {"universal_newlines": True},
-            }
-        )
-        test_queue.put(
-            {
-                "future": future_lines,
-                "input": "4\n",
-                "lines_to_read": 5,
-                "stop_read_pattern": None,
-            }
-        )
-        test_queue.put(
-            {
-                "future": future_pattern,
-                "input": "4\n",
-                "lines_to_read": None,
-                "stop_read_pattern": "done",
-            }
-        )
-        test_queue.put({"shutdown": True})
-        self.assertFalse(future_lines.done())
-        self.assertFalse(future_pattern.done())
-        execute_single_task(future_queue=test_queue)
-        self.assertTrue(future_lines.done())
-        self.assertTrue(future_pattern.done())
-        self.assertEqual("0\n1\n2\n3\ndone\n", future_lines.result())
-        self.assertEqual("0\n1\n2\n3\ndone\n", future_pattern.result())
-
-    def test_shell_interactive_executor(self):
-        with ShellExecutor(
-            ["python", self.executable_path], universal_newlines=True
-        ) as exe:
-            future_lines = exe.submit(
-                string_input="4", lines_to_read=5, stop_read_pattern=None
-            )
-            future_pattern = exe.submit(
-                string_input="4", lines_to_read=None, stop_read_pattern="done"
-            )
-            self.assertFalse(future_lines.done())
-            self.assertFalse(future_pattern.done())
-            self.assertEqual("0\n1\n2\n3\ndone\n", future_lines.result())
-            self.assertEqual("0\n1\n2\n3\ndone\n", future_pattern.result())
-            self.assertTrue(future_lines.done())
-            self.assertTrue(future_pattern.done())
-
-    def test_meta(self):
-        with ShellExecutor(["sleep"]) as exe:
-            self.assertEqual(exe.info, {})
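
For reference, a minimal sketch of how the two removed executors were used, assembled from the docstrings of the deleted modules above; the "count.py" helper is the test executable from tests/executables and is assumed to be on the current path:

from pympipool import ShellExecutor, SubprocessExecutor

# Fire-and-forget command line calls routed through subprocess.check_output();
# every submit() returns a concurrent.futures.Future with the captured output.
with SubprocessExecutor(max_workers=2) as exe:
    future = exe.submit(["echo", "test"], universal_newlines=True)
    print(future.result())  # "test\n"

# Interactive communication with a single long-running process; reading stops
# either after a fixed number of lines or once a stop pattern appears.
with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe:
    future_lines = exe.submit(string_input="4", lines_to_read=5)
    future_pattern = exe.submit(string_input="4", stop_read_pattern="done")
    print(future_lines.result())    # "0\n1\n2\n3\ndone\n"
    print(future_pattern.result())  # "0\n1\n2\n3\ndone\n"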