Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with CliRunner's echo_stdin #1820

Merged
merged 2 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ Unreleased
:issue:`1568`
- The ``MultiCommand.resultcallback`` decorator is renamed to
``result_callback``. The old name is deprecated. :issue:`1160`
- Fix issues with ``CliRunner`` output when using ``echo_stdin=True``.
:issue:`1101`


Version 7.1.2
Expand Down
34 changes: 31 additions & 3 deletions src/click/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@ class EchoingStdin:
def __init__(self, input, output):
self._input = input
self._output = output
self._paused = False

def __getattr__(self, x):
return getattr(self._input, x)

def _echo(self, rv):
self._output.write(rv)
if not self._paused:
self._output.write(rv)

return rv

def read(self, n=-1):
return self._echo(self._input.read(n))

def read1(self, n=-1):
return self._echo(self._input.read1(n))

def readline(self, n=-1):
return self._echo(self._input.readline(n))

Expand All @@ -40,6 +46,16 @@ def __repr__(self):
return repr(self._input)


@contextlib.contextmanager
def _pause_echo(stream):
if stream is None:
yield
else:
stream._paused = True
yield
stream._paused = False


class _NamedTextIOWrapper(io.TextIOWrapper):
def __init__(self, buffer, name=None, mode=None, **kwargs):
super().__init__(buffer, **kwargs)
Expand Down Expand Up @@ -191,6 +207,7 @@ def isolation(self, input=None, env=None, color=False):
Added the ``color`` parameter.
"""
input = make_input_stream(input, self.charset)
echo_input = None

old_stdin = sys.stdin
old_stdout = sys.stdout
Expand All @@ -203,11 +220,17 @@ def isolation(self, input=None, env=None, color=False):
bytes_output = io.BytesIO()

if self.echo_stdin:
input = EchoingStdin(input, bytes_output)
input = echo_input = EchoingStdin(input, bytes_output)

sys.stdin = input = _NamedTextIOWrapper(
input, encoding=self.charset, name="<stdin>", mode="r"
)

if self.echo_stdin:
# Force unbuffered reads, otherwise TextIOWrapper reads a
# large chunk which is echoed early.
input._CHUNK_SIZE = 1

sys.stdout = _NamedTextIOWrapper(
bytes_output, encoding=self.charset, name="<stdout>", mode="w"
)
Expand All @@ -225,23 +248,28 @@ def isolation(self, input=None, env=None, color=False):
errors="backslashreplace",
)

@_pause_echo(echo_input)
def visible_input(prompt=None):
sys.stdout.write(prompt or "")
val = input.readline().rstrip("\r\n")
sys.stdout.write(f"{val}\n")
sys.stdout.flush()
return val

@_pause_echo(echo_input)
def hidden_input(prompt=None):
sys.stdout.write(f"{prompt or ''}\n")
sys.stdout.flush()
return input.readline().rstrip("\r\n")

@_pause_echo(echo_input)
def _getchar(echo):
char = sys.stdin.read(1)

if echo:
sys.stdout.write(char)
sys.stdout.flush()

sys.stdout.flush()
return char

default_color = color
Expand Down
72 changes: 72 additions & 0 deletions tests/test_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,65 @@ def test():
assert not result.exception
assert result.output == "Hello World!\n"


def test_echo_stdin_stream():
@click.command()
def test():
i = click.get_binary_stream("stdin")
o = click.get_binary_stream("stdout")
while 1:
chunk = i.read(4096)
if not chunk:
break
o.write(chunk)
o.flush()

runner = CliRunner(echo_stdin=True)
result = runner.invoke(test, input="Hello World!\n")
assert not result.exception
assert result.output == "Hello World!\nHello World!\n"


def test_echo_stdin_prompts():
@click.command()
def test_python_input():
foo = input("Foo: ")
click.echo(f"foo={foo}")

runner = CliRunner(echo_stdin=True)
result = runner.invoke(test_python_input, input="bar bar\n")
assert not result.exception
assert result.output == "Foo: bar bar\nfoo=bar bar\n"

@click.command()
@click.option("--foo", prompt=True)
def test_prompt(foo):
click.echo(f"foo={foo}")

result = runner.invoke(test_prompt, input="bar bar\n")
assert not result.exception
assert result.output == "Foo: bar bar\nfoo=bar bar\n"

@click.command()
@click.option("--foo", prompt=True, hide_input=True)
def test_hidden_prompt(foo):
click.echo(f"foo={foo}")

result = runner.invoke(test_hidden_prompt, input="bar bar\n")
assert not result.exception
assert result.output == "Foo: \nfoo=bar bar\n"

@click.command()
@click.option("--foo", prompt=True)
@click.option("--bar", prompt=True)
def test_multiple_prompts(foo, bar):
click.echo(f"foo={foo}, bar={bar}")

result = runner.invoke(test_multiple_prompts, input="one\ntwo\n")
assert not result.exception
assert result.output == "Foo: one\nBar: two\nfoo=one, bar=two\n"


def test_runner_with_stream():
@click.command()
def test():
Expand Down Expand Up @@ -87,6 +140,25 @@ def continue_it():
assert not result.exception
assert result.output == "y\n"

runner = CliRunner(echo_stdin=True)
result = runner.invoke(continue_it, input="y")
assert not result.exception
assert result.output == "y\n"

@click.command()
def getchar_echo():
click.echo(click.getchar(echo=True))

runner = CliRunner()
result = runner.invoke(getchar_echo, input="y")
assert not result.exception
assert result.output == "yy\n"

runner = CliRunner(echo_stdin=True)
result = runner.invoke(getchar_echo, input="y")
assert not result.exception
assert result.output == "yy\n"


def test_catch_exceptions():
class CustomError(Exception):
Expand Down