From 9e28d8704af1a88e146e61fd62c0adafad26fa07 Mon Sep 17 00:00:00 2001 From: Andrzej Klajnert Date: Sun, 23 Jan 2022 15:12:29 +0100 Subject: [PATCH] Support stdin with asyncio (#71) --- changelog.d/feature.0f8582b8.entry.yaml | 5 ++++ pytest_subprocess/fake_popen.py | 31 ++++++++++++++++++--- tests/test_asyncio.py | 36 +++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 3 deletions(-) create mode 100644 changelog.d/feature.0f8582b8.entry.yaml diff --git a/changelog.d/feature.0f8582b8.entry.yaml b/changelog.d/feature.0f8582b8.entry.yaml new file mode 100644 index 0000000..a00c1b0 --- /dev/null +++ b/changelog.d/feature.0f8582b8.entry.yaml @@ -0,0 +1,5 @@ +message: Add support for stdin with asyncio. +pr_ids: +- '71' +timestamp: 1642946529 +type: feature diff --git a/pytest_subprocess/fake_popen.py b/pytest_subprocess/fake_popen.py index 64f2e89..c46141b 100644 --- a/pytest_subprocess/fake_popen.py +++ b/pytest_subprocess/fake_popen.py @@ -284,6 +284,9 @@ def run_thread(self) -> None: def _finish_process(self) -> None: self.returncode = self.__returncode + self._finalize_streams() + + def _finalize_streams(self) -> None: self._finalize_stream(self.stdout) self._finalize_stream(self.stderr) @@ -301,13 +304,21 @@ def received_signals(self) -> Tuple[int, ...]: class AsyncFakePopen(FakePopen): """Class to handle async processes""" - stdout: asyncio.StreamReader - stderr: asyncio.StreamReader + stdout: Optional[asyncio.StreamReader] + stderr: Optional[asyncio.StreamReader] async def communicate( # type: ignore self, input: OPTIONAL_TEXT = None, timeout: Optional[float] = None ) -> Tuple[AnyType, AnyType]: - self._handle_stdin(input) + if input: + # streams were fed with eof, need to be reopened + await self._reopen_streams() + + self._handle_stdin(input) + + # feed eof one more time as streams were opened + self._finalize_streams() + self._finalize_thread(timeout) return ( @@ -320,3 +331,17 @@ async def wait(self, timeout: Optional[float] = None) -> int: # type: ignore def _get_empty_buffer(self, _: bool) -> asyncio.StreamReader: return asyncio.StreamReader() + + async def _reopen_streams(self) -> None: + self.stdout = await self._reopen_stream(self.stdout) + self.stderr = await self._reopen_stream(self.stderr) + + async def _reopen_stream( + self, stream: Optional[asyncio.StreamReader] + ) -> Optional[asyncio.StreamReader]: + if stream: + data = await stream.read() + fresh_stream = self._get_empty_buffer(False) + fresh_stream.feed_data(data) + return fresh_stream + return None diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index b2ab88e..8ff9e62 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -275,6 +275,42 @@ async def _read_stream(stream: asyncio.StreamReader, output_list): output_list.append(line.decode()) +@pytest.mark.asyncio +@pytest.mark.parametrize("fake", [False, True]) +async def test_input(fake_process, fake): + fake_process.allow_unregistered(not fake) + if fake: + + def stdin_callable(input): + return { + "stdout": "Provide an input: Provided: {data}".format( + data=input.decode() + ) + } + + fake_process.register_subprocess( + ["python", "example_script.py", "input"], + stdout=[b"Stdout line 1", b"Stdout line 2"], + stdin_callable=stdin_callable, + ) + + process = await asyncio.create_subprocess_exec( + "python", + "example_script.py", + "input", + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + ) + out, err = await process.communicate(input=b"test") + + assert out.splitlines() == [ + b"Stdout line 1", + b"Stdout line 2", + b"Provide an input: Provided: test", + ] + assert err is None + + @pytest.fixture(autouse=True) def skip_on_pypy(): """Async test for some reason crash on pypy 3.6 on Windows"""