-
Notifications
You must be signed in to change notification settings - Fork 41
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
* Moved the code to utils.py: click_repl/__init__.py Splitted code into functions to test : click_repl/utils.py modified: tox.ini * Moved tests to its appropriate test files: tests/test_argument.py Added some more tests: tests/test_basic.py Added some more tests: tests/test_command_collection.py Added some tests: tests/test_complete.py Added some dev functions only tests: tests/test_dev/test_get_internal_cmds.py Added some dev functions only tests: tests/test_dev/test_internal_cmds.py Added some dev functions only tests: tests/test_dev/test_register_internal_cmds.py Added some dev functions only tests: tests/test_dev/test_sys_cmds.py Added some more tests: tests/test_repl.py * Formatted by black: click_repl/__init__.py Formatted by black: click_repl/utils.py Removed mypy config: setup.cfg Formatted by black:: tests/test_command_collection.py * Removed an unnecessary link from the readme * Added long description attribute * Updated README Replaced the link to my repo to the click_repl's main repo * Changed tests badge url to master repo
- Loading branch information
1 parent
19148c9
commit a57b650
Showing
13 changed files
with
830 additions
and
371 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,318 +1,11 @@ | ||
from collections import defaultdict | ||
from prompt_toolkit.completion import Completer, Completion | ||
from prompt_toolkit.history import InMemoryHistory | ||
from prompt_toolkit import PromptSession | ||
import click | ||
import click.parser | ||
import os | ||
import shlex | ||
import sys | ||
from .exceptions import InternalCommandException, ExitReplException # noqa | ||
|
||
# Handle backwards compatibility between Click 7.0 and 8.0 | ||
try: | ||
import click.shell_completion | ||
|
||
HAS_C8 = True | ||
except ImportError: | ||
import click._bashcomplete | ||
|
||
HAS_C8 = False | ||
|
||
# Handle click.exceptions.Exit introduced in Click 7.0 | ||
try: | ||
from click.exceptions import Exit as ClickExit | ||
except ImportError: | ||
|
||
class ClickExit(RuntimeError): | ||
pass | ||
|
||
|
||
PY2 = sys.version_info[0] == 2 | ||
|
||
if PY2: | ||
text_type = unicode # noqa | ||
else: | ||
text_type = str # noqa | ||
|
||
from .exceptions import ExitReplException as ExitReplException # noqa | ||
from .exceptions import InternalCommandException as InternalCommandException # noqa | ||
from .utils import ClickCompleter as ClickCompleter # noqa | ||
from .utils import bootstrap_prompt as bootstrap_prompt # noqa | ||
from .utils import dispatch_repl_commands as dispatch_repl_commands # noqa | ||
from .utils import exit as exit # noqa | ||
from .utils import handle_internal_commands as handle_internal_commands # noqa | ||
from .utils import register_repl as register_repl # noqa | ||
from .utils import repl as repl # noqa | ||
|
||
__version__ = "0.2.0" | ||
|
||
_internal_commands = dict() | ||
|
||
|
||
def _register_internal_command(names, target, description=None): | ||
if not hasattr(target, "__call__"): | ||
raise ValueError("Internal command must be a callable") | ||
|
||
if isinstance(names, str): | ||
names = [names] | ||
elif not isinstance(names, (list, tuple)): | ||
raise ValueError('"names" must be a string or a list / tuple') | ||
|
||
for name in names: | ||
_internal_commands[name] = (target, description) | ||
|
||
|
||
def _get_registered_target(name, default=None): | ||
target_info = _internal_commands.get(name) | ||
if target_info: | ||
return target_info[0] | ||
return default | ||
|
||
|
||
def _exit_internal(): | ||
raise ExitReplException() | ||
|
||
|
||
def _help_internal(): | ||
formatter = click.HelpFormatter() | ||
formatter.write_heading("REPL help") | ||
formatter.indent() | ||
with formatter.section("External Commands"): | ||
formatter.write_text('prefix external commands with "!"') | ||
with formatter.section("Internal Commands"): | ||
formatter.write_text('prefix internal commands with ":"') | ||
info_table = defaultdict(list) | ||
for mnemonic, target_info in _internal_commands.items(): | ||
info_table[target_info[1]].append(mnemonic) | ||
formatter.write_dl( | ||
( | ||
", ".join((":{0}".format(mnemonic) for mnemonic in sorted(mnemonics))), | ||
description, | ||
) | ||
for description, mnemonics in info_table.items() | ||
) | ||
return formatter.getvalue() | ||
|
||
|
||
_register_internal_command(["q", "quit", "exit"], _exit_internal, "exits the repl") | ||
_register_internal_command( | ||
["?", "h", "help"], _help_internal, "displays general help information" | ||
) | ||
|
||
|
||
class ClickCompleter(Completer): | ||
def __init__(self, cli): | ||
self.cli = cli | ||
|
||
def get_completions(self, document, complete_event=None): | ||
# Code analogous to click._bashcomplete.do_complete | ||
|
||
try: | ||
args = shlex.split(document.text_before_cursor) | ||
except ValueError: | ||
# Invalid command, perhaps caused by missing closing quotation. | ||
return | ||
|
||
cursor_within_command = ( | ||
document.text_before_cursor.rstrip() == document.text_before_cursor | ||
) | ||
|
||
if args and cursor_within_command: | ||
# We've entered some text and no space, give completions for the | ||
# current word. | ||
incomplete = args.pop() | ||
else: | ||
# We've not entered anything, either at all or for the current | ||
# command, so give all relevant completions for this context. | ||
incomplete = "" | ||
# Resolve context based on click version | ||
if HAS_C8: | ||
ctx = click.shell_completion._resolve_context(self.cli, {}, "", args) | ||
else: | ||
ctx = click._bashcomplete.resolve_ctx(self.cli, "", args) | ||
|
||
if ctx is None: | ||
return | ||
|
||
choices = [] | ||
for param in ctx.command.params: | ||
if isinstance(param, click.Option): | ||
if getattr(param, "hidden", False): | ||
continue | ||
|
||
for options in (param.opts, param.secondary_opts): | ||
for o in options: | ||
choices.append( | ||
Completion( | ||
text_type(o), -len(incomplete), display_meta=param.help | ||
) | ||
) | ||
elif isinstance(param, click.Argument): | ||
if isinstance(param.type, click.Choice): | ||
for choice in param.type.choices: | ||
choices.append(Completion(text_type(choice), -len(incomplete))) | ||
|
||
if isinstance(ctx.command, click.MultiCommand): | ||
for name in ctx.command.list_commands(ctx): | ||
command = ctx.command.get_command(ctx, name) | ||
if getattr(command, "hidden", False): | ||
continue | ||
|
||
choices.append( | ||
Completion( | ||
text_type(name), | ||
-len(incomplete), | ||
display_meta=getattr(command, "short_help"), | ||
) | ||
) | ||
|
||
for item in choices: | ||
if item.text.startswith(incomplete): | ||
yield item | ||
|
||
|
||
def bootstrap_prompt(prompt_kwargs, group): | ||
""" | ||
Bootstrap prompt_toolkit kwargs or use user defined values. | ||
:param prompt_kwargs: The user specified prompt kwargs. | ||
""" | ||
prompt_kwargs = prompt_kwargs or {} | ||
|
||
defaults = { | ||
"history": InMemoryHistory(), | ||
"completer": ClickCompleter(group), | ||
"message": "> ", | ||
} | ||
|
||
for key in defaults: | ||
default_value = defaults[key] | ||
if key not in prompt_kwargs: | ||
prompt_kwargs[key] = default_value | ||
|
||
return prompt_kwargs | ||
|
||
|
||
def repl( # noqa: C901 | ||
old_ctx, | ||
prompt_kwargs=None, | ||
allow_system_commands=True, | ||
allow_internal_commands=True, | ||
): | ||
""" | ||
Start an interactive shell. All subcommands are available in it. | ||
:param old_ctx: The current Click context. | ||
:param prompt_kwargs: Parameters passed to | ||
:py:func:`prompt_toolkit.PromptSession`. | ||
If stdin is not a TTY, no prompt will be printed, but only commands read | ||
from stdin. | ||
""" | ||
# parent should be available, but we're not going to bother if not | ||
group_ctx = old_ctx.parent or old_ctx | ||
group = group_ctx.command | ||
isatty = sys.stdin.isatty() | ||
|
||
# Delete the REPL command from those available, as we don't want to allow | ||
# nesting REPLs (note: pass `None` to `pop` as we don't want to error if | ||
# REPL command already not present for some reason). | ||
repl_command_name = old_ctx.command.name | ||
if isinstance(group_ctx.command, click.CommandCollection): | ||
available_commands = { | ||
cmd_name: cmd_obj | ||
for source in group_ctx.command.sources | ||
for cmd_name, cmd_obj in source.commands.items() | ||
} | ||
else: | ||
available_commands = group_ctx.command.commands | ||
available_commands.pop(repl_command_name, None) | ||
|
||
prompt_kwargs = bootstrap_prompt(prompt_kwargs, group) | ||
session = PromptSession(**prompt_kwargs) | ||
|
||
if isatty: | ||
|
||
def get_command(): | ||
return session.prompt() | ||
|
||
else: | ||
get_command = sys.stdin.readline | ||
|
||
while True: | ||
try: | ||
command = get_command() | ||
except KeyboardInterrupt: | ||
continue | ||
except EOFError: | ||
break | ||
|
||
if not command: | ||
if isatty: | ||
continue | ||
else: | ||
break | ||
|
||
if allow_system_commands and dispatch_repl_commands(command): | ||
continue | ||
|
||
if allow_internal_commands: | ||
try: | ||
result = handle_internal_commands(command) | ||
if isinstance(result, str): | ||
click.echo(result) | ||
continue | ||
except ExitReplException: | ||
break | ||
|
||
try: | ||
args = shlex.split(command) | ||
except ValueError as e: | ||
click.echo("{}: {}".format(type(e).__name__, e)) | ||
continue | ||
|
||
try: | ||
# default_map passes the top-level params to the new group to | ||
# support top-level required params that would reject the | ||
# invocation if missing. | ||
with group.make_context( | ||
None, args, parent=group_ctx, default_map=old_ctx.params | ||
) as ctx: | ||
group.invoke(ctx) | ||
ctx.exit() | ||
except click.ClickException as e: | ||
e.show() | ||
except ClickExit: | ||
pass | ||
except SystemExit: | ||
pass | ||
except ExitReplException: | ||
break | ||
|
||
|
||
def register_repl(group, name="repl"): | ||
"""Register :func:`repl()` as sub-command *name* of *group*.""" | ||
group.command(name=name)(click.pass_context(repl)) | ||
|
||
|
||
def exit(): | ||
"""Exit the repl""" | ||
_exit_internal() | ||
|
||
|
||
def dispatch_repl_commands(command): | ||
"""Execute system commands entered in the repl. | ||
System commands are all commands starting with "!". | ||
""" | ||
if command.startswith("!"): | ||
os.system(command[1:]) | ||
return True | ||
|
||
return False | ||
|
||
|
||
def handle_internal_commands(command): | ||
"""Run repl-internal commands. | ||
Repl-internal commands are all commands starting with ":". | ||
""" | ||
if command.startswith(":"): | ||
target = _get_registered_target(command[1:], default=None) | ||
if target: | ||
return target() |
Oops, something went wrong.