From 0d01686fe6be5c335f50098a06b7e847a2753452 Mon Sep 17 00:00:00 2001 From: Seb Aebischer <8686939+saebischer@users.noreply.github.com> Date: Sat, 6 Mar 2021 16:47:26 +0000 Subject: [PATCH 1/2] fix output when using echo_stdin echo calls to read1 pause echo when using hidden input prompt and getchar don't buffer reads to avoid echoing early --- CHANGES.rst | 2 ++ src/click/testing.py | 29 ++++++++++++++--- tests/test_testing.py | 75 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 4 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index bc5c9c1be..86942e555 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -208,6 +208,8 @@ Unreleased :issue:`1568` - The ``MultiCommand.resultcallback`` decorator is renamed to ``result_callback``. The old name is deprecated. :issue:`1160` +- Fix issues with ``CliRunner`` output when using ``echo_stdin=True``. + :issue:`1101` Version 7.1.2 diff --git a/src/click/testing.py b/src/click/testing.py index 3bbdbffe2..89e58da8e 100644 --- a/src/click/testing.py +++ b/src/click/testing.py @@ -16,17 +16,22 @@ class EchoingStdin: def __init__(self, input, output): self._input = input self._output = output + self.paused = False def __getattr__(self, x): return getattr(self._input, x) def _echo(self, rv): - self._output.write(rv) + if not self.paused: + self._output.write(rv) return rv def read(self, n=-1): return self._echo(self._input.read(n)) + def read1(self, n=-1): + return self._echo(self._input.read1(n)) + def readline(self, n=-1): return self._echo(self._input.readline(n)) @@ -204,10 +209,16 @@ def isolation(self, input=None, env=None, color=False): if self.echo_stdin: input = EchoingStdin(input, bytes_output) + echo_input = input sys.stdin = input = _NamedTextIOWrapper( input, encoding=self.charset, name="", mode="r" ) + if self.echo_stdin: + # Force unbuffered reads, otherwise the underlying EchoingStdin + # stream will echo a big chunk of input on the first read. + input._CHUNK_SIZE = 1 + sys.stdout = _NamedTextIOWrapper( bytes_output, encoding=self.charset, name="", mode="w" ) @@ -228,20 +239,30 @@ def isolation(self, input=None, env=None, color=False): def visible_input(prompt=None): sys.stdout.write(prompt or "") val = input.readline().rstrip("\r\n") - sys.stdout.write(f"{val}\n") + if not self.echo_stdin: + sys.stdout.write(f"{val}\n") sys.stdout.flush() return val def hidden_input(prompt=None): sys.stdout.write(f"{prompt or ''}\n") sys.stdout.flush() - return input.readline().rstrip("\r\n") + if self.echo_stdin: + echo_input.paused = True + val = input.readline().rstrip("\r\n") + if self.echo_stdin: + echo_input.paused = False + return val def _getchar(echo): + if not echo and self.echo_stdin: + echo_input.paused = True char = sys.stdin.read(1) - if echo: + if echo and not self.echo_stdin: sys.stdout.write(char) sys.stdout.flush() + elif not echo and self.echo_stdin: + echo_input.paused = False return char default_color = color diff --git a/tests/test_testing.py b/tests/test_testing.py index 1fe4caa04..5d31f2eb6 100644 --- a/tests/test_testing.py +++ b/tests/test_testing.py @@ -26,12 +26,68 @@ def test(): assert not result.exception assert result.output == "Hello World!\n" + +def test_echo_stdin_stream(): + @click.command() + def test(): + i = click.get_binary_stream("stdin") + o = click.get_binary_stream("stdout") + while 1: + chunk = i.read(4096) + if not chunk: + break + o.write(chunk) + o.flush() + runner = CliRunner(echo_stdin=True) result = runner.invoke(test, input="Hello World!\n") assert not result.exception assert result.output == "Hello World!\nHello World!\n" +def test_echo_stdin_prompts(): + @click.command() + def test_python_input(): + foo = input("Foo: ") + click.echo(f"foo={foo}") + + runner = CliRunner(echo_stdin=True) + result = runner.invoke(test_python_input, input="wau wau\n") + assert not result.exception + assert result.output == "Foo: wau wau\nfoo=wau wau\n" + + @click.command() + @click.option("--foo", prompt=True) + def test_prompt(foo): + click.echo(f"foo={foo}") + + runner = CliRunner(echo_stdin=True) + result = runner.invoke(test_prompt, input="wau wau\n") + assert not result.exception + assert result.output == "Foo: wau wau\nfoo=wau wau\n" + + @click.command() + @click.option("--foo", prompt=True, hide_input=True) + def test_hidden_prompt(foo): + click.echo(f"foo={foo}") + + runner = CliRunner(echo_stdin=True) + result = runner.invoke(test_hidden_prompt, input="wau wau\n") + assert not result.exception + assert result.output == "Foo: \nfoo=wau wau\n" + + @click.command() + @click.option("--foo", prompt=True) + @click.option("--bar", prompt=True) + def test_multiple_prompts(foo, bar): + click.echo(f"foo={foo}, bar={bar}") + + runner = CliRunner(echo_stdin=True) + result = runner.invoke(test_multiple_prompts, input="one\ntwo\n") + assert not result.exception + assert result.output == "Foo: one\nBar: two\nfoo=one, bar=two\n" + + def test_runner_with_stream(): @click.command() def test(): @@ -87,6 +143,25 @@ def continue_it(): assert not result.exception assert result.output == "y\n" + runner = CliRunner(echo_stdin=True) + result = runner.invoke(continue_it, input="y") + assert not result.exception + assert result.output == "y\n" + + @click.command() + def getchar_echo(): + click.echo(click.getchar(echo=True)) + + runner = CliRunner() + result = runner.invoke(getchar_echo, input="y") + assert not result.exception + assert result.output == "yy\n" + + runner = CliRunner(echo_stdin=True) + result = runner.invoke(getchar_echo, input="y") + assert not result.exception + assert result.output == "yy\n" + def test_catch_exceptions(): class CustomError(Exception): From f6f8976900d9c9d65e82d61477a626545c58f054 Mon Sep 17 00:00:00 2001 From: David Lord Date: Wed, 14 Apr 2021 07:49:32 -0700 Subject: [PATCH 2/2] use decorator to pause echo pause echo_stdin unconditionally, allowing functions to echo as normal this seems to work better with the readline "echo empty string" fix --- src/click/testing.py | 47 +++++++++++++++++++++++++------------------ tests/test_testing.py | 15 ++++++-------- 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/src/click/testing.py b/src/click/testing.py index 89e58da8e..637c46c67 100644 --- a/src/click/testing.py +++ b/src/click/testing.py @@ -16,14 +16,15 @@ class EchoingStdin: def __init__(self, input, output): self._input = input self._output = output - self.paused = False + self._paused = False def __getattr__(self, x): return getattr(self._input, x) def _echo(self, rv): - if not self.paused: + if not self._paused: self._output.write(rv) + return rv def read(self, n=-1): @@ -45,6 +46,16 @@ def __repr__(self): return repr(self._input) +@contextlib.contextmanager +def _pause_echo(stream): + if stream is None: + yield + else: + stream._paused = True + yield + stream._paused = False + + class _NamedTextIOWrapper(io.TextIOWrapper): def __init__(self, buffer, name=None, mode=None, **kwargs): super().__init__(buffer, **kwargs) @@ -196,6 +207,7 @@ def isolation(self, input=None, env=None, color=False): Added the ``color`` parameter. """ input = make_input_stream(input, self.charset) + echo_input = None old_stdin = sys.stdin old_stdout = sys.stdout @@ -208,15 +220,15 @@ def isolation(self, input=None, env=None, color=False): bytes_output = io.BytesIO() if self.echo_stdin: - input = EchoingStdin(input, bytes_output) - echo_input = input + input = echo_input = EchoingStdin(input, bytes_output) sys.stdin = input = _NamedTextIOWrapper( input, encoding=self.charset, name="", mode="r" ) + if self.echo_stdin: - # Force unbuffered reads, otherwise the underlying EchoingStdin - # stream will echo a big chunk of input on the first read. + # Force unbuffered reads, otherwise TextIOWrapper reads a + # large chunk which is echoed early. input._CHUNK_SIZE = 1 sys.stdout = _NamedTextIOWrapper( @@ -236,33 +248,28 @@ def isolation(self, input=None, env=None, color=False): errors="backslashreplace", ) + @_pause_echo(echo_input) def visible_input(prompt=None): sys.stdout.write(prompt or "") val = input.readline().rstrip("\r\n") - if not self.echo_stdin: - sys.stdout.write(f"{val}\n") + sys.stdout.write(f"{val}\n") sys.stdout.flush() return val + @_pause_echo(echo_input) def hidden_input(prompt=None): sys.stdout.write(f"{prompt or ''}\n") sys.stdout.flush() - if self.echo_stdin: - echo_input.paused = True - val = input.readline().rstrip("\r\n") - if self.echo_stdin: - echo_input.paused = False - return val + return input.readline().rstrip("\r\n") + @_pause_echo(echo_input) def _getchar(echo): - if not echo and self.echo_stdin: - echo_input.paused = True char = sys.stdin.read(1) - if echo and not self.echo_stdin: + + if echo: sys.stdout.write(char) - sys.stdout.flush() - elif not echo and self.echo_stdin: - echo_input.paused = False + + sys.stdout.flush() return char default_color = color diff --git a/tests/test_testing.py b/tests/test_testing.py index 5d31f2eb6..d23a4f231 100644 --- a/tests/test_testing.py +++ b/tests/test_testing.py @@ -52,29 +52,27 @@ def test_python_input(): click.echo(f"foo={foo}") runner = CliRunner(echo_stdin=True) - result = runner.invoke(test_python_input, input="wau wau\n") + result = runner.invoke(test_python_input, input="bar bar\n") assert not result.exception - assert result.output == "Foo: wau wau\nfoo=wau wau\n" + assert result.output == "Foo: bar bar\nfoo=bar bar\n" @click.command() @click.option("--foo", prompt=True) def test_prompt(foo): click.echo(f"foo={foo}") - runner = CliRunner(echo_stdin=True) - result = runner.invoke(test_prompt, input="wau wau\n") + result = runner.invoke(test_prompt, input="bar bar\n") assert not result.exception - assert result.output == "Foo: wau wau\nfoo=wau wau\n" + assert result.output == "Foo: bar bar\nfoo=bar bar\n" @click.command() @click.option("--foo", prompt=True, hide_input=True) def test_hidden_prompt(foo): click.echo(f"foo={foo}") - runner = CliRunner(echo_stdin=True) - result = runner.invoke(test_hidden_prompt, input="wau wau\n") + result = runner.invoke(test_hidden_prompt, input="bar bar\n") assert not result.exception - assert result.output == "Foo: \nfoo=wau wau\n" + assert result.output == "Foo: \nfoo=bar bar\n" @click.command() @click.option("--foo", prompt=True) @@ -82,7 +80,6 @@ def test_hidden_prompt(foo): def test_multiple_prompts(foo, bar): click.echo(f"foo={foo}, bar={bar}") - runner = CliRunner(echo_stdin=True) result = runner.invoke(test_multiple_prompts, input="one\ntwo\n") assert not result.exception assert result.output == "Foo: one\nBar: two\nfoo=one, bar=two\n"