From 3122f4605c68c9466f061bcb0a535d98a73758e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Tue, 10 Oct 2023 10:50:21 +0200 Subject: [PATCH] feat: added flag `--(non-)interactive`, refactored main loop --- gptme/cli.py | 99 +++++++++++++++++++++++++++------------------ gptme/logmanager.py | 27 ++++++++++--- tests/test_cli.py | 13 ++---- 3 files changed, 84 insertions(+), 55 deletions(-) diff --git a/gptme/cli.py b/gptme/cli.py index 7eb41479..6127741d 100644 --- a/gptme/cli.py +++ b/gptme/cli.py @@ -225,6 +225,12 @@ def handle_cmd( @click.option( "-y", "--no-confirm", is_flag=True, help="Skips all confirmation prompts." ) +@click.option( + "--interactive/--non-interactive", + "-i/-n", + default=True, + help="Choose interactive mode, or not. Non-interactive implies --no-confirm, and is used in testing.", +) @click.option( "--show-hidden", is_flag=True, @@ -240,6 +246,7 @@ def main( verbose: bool, no_confirm: bool, show_hidden: bool, + interactive: bool, ): # log init logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) @@ -250,6 +257,12 @@ def main( _load_readline_history() init_llm(llm) # set up API_KEY and API_BASE + if "PYTEST_CURRENT_TEST" in os.environ: + interactive = False + + if not interactive: + no_confirm = True + if no_confirm: logger.warning("Skipping all confirmation prompts.") @@ -258,8 +271,8 @@ def main( else: promptmsgs = [Message("system", prompt_system)] - is_interactive = not prompts and sys.stdin.isatty() - if not is_interactive: + # if stdin is not a tty, we're getting piped input + if not sys.stdin.isatty(): # fetch prompt from stdin prompt_stdin = _read_stdin() if prompt_stdin: @@ -269,7 +282,7 @@ def main( sys.stdin.close() sys.stdin = open("/dev/tty") - logfile = get_logfile(name, interactive=is_interactive) + logfile = get_logfile(name, interactive=not prompts and interactive) print(f"Using logdir {logfile.parent}") logmanager = LogManager.load( logfile, initial_msgs=promptmsgs, show_hidden=show_hidden @@ -279,72 +292,78 @@ def main( logmanager.print() print("--- ^^^ past messages ^^^ ---") + # check if any prompt is a full path, if so, replace it with the contents of that file + prompts = [ + f"```{p}\n{Path(p).expanduser().read_text()}\n```" + if Path(p).expanduser().exists() + else p + for p in prompts + ] + cli_prompted = bool(prompts) + # main loop - for msg in loop(prompts, logmanager, no_confirm, model, llm): + ctx = loop(logmanager, no_confirm, model, llm) + while True: + # if prompts given on cli: + # - insert prompt into logmanager + # - if a prompt is `-`, wait for reply before sending next prompt + # - set cli_prompted + while prompts: + if prompts[0] == "-": + prompts.pop(0) + break + logmanager.append(Message("user", prompts.pop(0))) + + msg = next(ctx) logmanager.append(msg) + # if prompts have been ran and is non-interactive, exit + # this is used in testing + if cli_prompted and not prompts and not interactive: + logger.info("Command triggered and not in TTY, exiting") + exit(0) + def loop( - prompts: list[str], - logmanager: LogManager, + log: LogManager, no_confirm: bool, model: ModelChoice, llm: LLMChoice, stream: bool = True, ) -> Generator[Message, None, None]: - log = logmanager.log - # if last message was from assistant, try to run tools again if log[-1].role == "assistant": yield from execute_msg(log[-1], ask=not no_confirm) - command_triggered = False while True: - prompt = None - if prompts: - prompt = prompts[0] - prompts = prompts[1:] - - # if prompts have been ran and is non-interactive, exit - if command_triggered and not sys.stdin.isatty(): - logger.info("Command triggered and not in TTY, exiting") - break + # execute user command + if log[-1].role == "user": + inquiry = log[-1].content + # if message starts with ., treat as command + # when command has been run, + if inquiry.startswith(".") or inquiry.startswith("$"): + yield from handle_cmd(inquiry, log, no_confirm=no_confirm) + # we need to re-assign `log` here since it may be replaced by `handle_cmd` + # FIXME: this is pretty bad hack to get things working, needs to be refactored + if inquiry != ".continue": + continue # If last message was a response, ask for input. # If last message was from the user (such as from crash/edited log), # then skip asking for input and generate response last_msg = log[-1] if log else None - if not last_msg or ( - (last_msg.role in ["system", "assistant"]) - or (log[-1].role == "user" and log[-1].content.startswith(".")) - ): - inquiry = prompt_user(prompt) + if not last_msg or ((last_msg.role in ["system", "assistant"])): + inquiry = prompt_user() if not inquiry: # Empty command, ask for input again print() continue - # we will exit when last cli-provided prompt is done (if we're non-interactive, see above) - if prompt and len(prompts) == 0: - command_triggered = True - prompt = None yield Message("user", inquiry, quiet=True) - # execute user command - if log[-1].role == "user": - inquiry = log[-1].content - # if message starts with ., treat as command - # when command has been run, - if inquiry.startswith(".") or inquiry.startswith("$"): - yield from handle_cmd(inquiry, logmanager, no_confirm=no_confirm) - # we need to re-assign `log` here since it may be replaced by `handle_cmd` - log = logmanager.log - if inquiry != ".continue": - continue - # print response try: # performs reduction/context trimming, if necessary - msgs = logmanager.prepare_messages() + msgs = log.prepare_messages() # append temporary message with current context, right before user message # NOTE: in my experience, this confused the model more than it helped diff --git a/gptme/logmanager.py b/gptme/logmanager.py index d9c79a6a..51c790c5 100644 --- a/gptme/logmanager.py +++ b/gptme/logmanager.py @@ -1,6 +1,6 @@ import json -import textwrap import logging +import textwrap from pathlib import Path from typing import TypeAlias @@ -29,6 +29,18 @@ def __init__( self.show_hidden = show_hidden # TODO: Check if logfile has contents, then maybe load, or should it overwrite? + def __getitem__(self, key): + return self.log[key] + + def __len__(self): + return len(self.log) + + def __iter__(self): + return iter(self.log) + + def __bool__(self): + return bool(self.log) + def append(self, msg: Message) -> None: """Appends a message to the log, writes the log, prints the message.""" self.log.append(msg) @@ -36,6 +48,9 @@ def append(self, msg: Message) -> None: if not msg.quiet: print_msg(msg, oneline=False) + def pop(self, index: int = -1) -> Message: + return self.log.pop(index) + def write(self) -> None: """Writes the log to the logfile.""" write_log(self.log, self.logfile) @@ -45,13 +60,13 @@ def print(self, show_hidden: bool | None = None): def undo(self, n: int = 1, quiet=False) -> None: """Removes the last message from the log.""" - undid = self.log[-1] if self.log else None + undid = self[-1] if self.log else None if undid and undid.content.startswith(".undo"): - self.log.pop() + self.pop() # Doesn't work for multiple undos in a row, but useful in testing # assert undid.content == ".undo" # assert that the last message is an undo - peek = self.log[-1] if self.log else None + peek = self[-1] if self.log else None if not peek: print("[yellow]Nothing to undo.[/]") return @@ -59,12 +74,12 @@ def undo(self, n: int = 1, quiet=False) -> None: if not quiet: print("[yellow]Undoing messages:[/yellow]") for _ in range(n): - undid = self.log.pop() + undid = self.pop() if not quiet: print( f"[red] {undid.role}: {textwrap.shorten(undid.content.strip(), width=50, placeholder='...')}[/]", ) - peek = self.log[-1] if self.log else None + peek = self[-1] if self.log else None def prepare_messages(self) -> list[Message]: """Prepares the log into messages before sending it to the LLM.""" diff --git a/tests/test_cli.py b/tests/test_cli.py index 5735f832..fca5d764 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -25,9 +25,7 @@ def test_help(): def test_shell(name: str): runner = CliRunner() with runner.isolated_filesystem(): - result = runner.invoke( - gptme.cli.main, ["-y", "--name", name, '.shell echo "yes"'] - ) + result = runner.invoke(gptme.cli.main, ["--name", name, '.shell echo "yes"']) output = result.output.split("System")[-1] # check for two 'yes' in output (both command and stdout) assert output.count("yes") == 2 @@ -37,9 +35,7 @@ def test_shell(name: str): def test_python(name: str): runner = CliRunner() with runner.isolated_filesystem(): - result = runner.invoke( - gptme.cli.main, ["-y", "--name", name, '.python print("yes")'] - ) + result = runner.invoke(gptme.cli.main, ["--name", name, '.python print("yes")']) assert "yes\n" in result.output assert result.exit_code == 0 @@ -48,7 +44,8 @@ def test_python_error(name: str): runner = CliRunner() with runner.isolated_filesystem(): result = runner.invoke( - gptme.cli.main, ["-y", "--name", name, '.python raise Exception("yes")'] + gptme.cli.main, + ["--name", name, '.python raise Exception("yes")'], ) assert "Exception: yes" in result.output assert result.exit_code == 0 @@ -78,7 +75,6 @@ def test_block(name: str, lang: str): runner = CliRunner() with runner.isolated_filesystem(): args = [ - "-y", "--name", name, f".impersonate {code}", @@ -100,7 +96,6 @@ def test_generate_primes(name: str): result = runner.invoke( gptme.cli.main, [ - "-y", "--name", name, "print the first 10 prime numbers",