Skip to content

Commit

Permalink
feat: added flag --(non-)interactive, refactored main loop
Browse files Browse the repository at this point in the history
  • Loading branch information
ErikBjare committed Oct 10, 2023
1 parent c670d35 commit 3122f46
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 55 deletions.
99 changes: 59 additions & 40 deletions gptme/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ def handle_cmd(
@click.option(
"-y", "--no-confirm", is_flag=True, help="Skips all confirmation prompts."
)
@click.option(
"--interactive/--non-interactive",
"-i/-n",
default=True,
help="Choose interactive mode, or not. Non-interactive implies --no-confirm, and is used in testing.",
)
@click.option(
"--show-hidden",
is_flag=True,
Expand All @@ -240,6 +246,7 @@ def main(
verbose: bool,
no_confirm: bool,
show_hidden: bool,
interactive: bool,
):
# log init
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
Expand All @@ -250,6 +257,12 @@ def main(
_load_readline_history()
init_llm(llm) # set up API_KEY and API_BASE

if "PYTEST_CURRENT_TEST" in os.environ:
interactive = False

if not interactive:
no_confirm = True

if no_confirm:
logger.warning("Skipping all confirmation prompts.")

Expand All @@ -258,8 +271,8 @@ def main(
else:
promptmsgs = [Message("system", prompt_system)]

is_interactive = not prompts and sys.stdin.isatty()
if not is_interactive:
# if stdin is not a tty, we're getting piped input
if not sys.stdin.isatty():
# fetch prompt from stdin
prompt_stdin = _read_stdin()
if prompt_stdin:
Expand All @@ -269,7 +282,7 @@ def main(
sys.stdin.close()
sys.stdin = open("/dev/tty")

logfile = get_logfile(name, interactive=is_interactive)
logfile = get_logfile(name, interactive=not prompts and interactive)
print(f"Using logdir {logfile.parent}")
logmanager = LogManager.load(
logfile, initial_msgs=promptmsgs, show_hidden=show_hidden
Expand All @@ -279,72 +292,78 @@ def main(
logmanager.print()
print("--- ^^^ past messages ^^^ ---")

# check if any prompt is a full path, if so, replace it with the contents of that file
prompts = [
f"```{p}\n{Path(p).expanduser().read_text()}\n```"
if Path(p).expanduser().exists()
else p
for p in prompts
]
cli_prompted = bool(prompts)

# main loop
for msg in loop(prompts, logmanager, no_confirm, model, llm):
ctx = loop(logmanager, no_confirm, model, llm)
while True:
# if prompts given on cli:
# - insert prompt into logmanager
# - if a prompt is `-`, wait for reply before sending next prompt
# - set cli_prompted
while prompts:
if prompts[0] == "-":
prompts.pop(0)
break
logmanager.append(Message("user", prompts.pop(0)))

msg = next(ctx)
logmanager.append(msg)

# if prompts have been ran and is non-interactive, exit
# this is used in testing
if cli_prompted and not prompts and not interactive:
logger.info("Command triggered and not in TTY, exiting")
exit(0)


def loop(
prompts: list[str],
logmanager: LogManager,
log: LogManager,
no_confirm: bool,
model: ModelChoice,
llm: LLMChoice,
stream: bool = True,
) -> Generator[Message, None, None]:
log = logmanager.log

# if last message was from assistant, try to run tools again
if log[-1].role == "assistant":
yield from execute_msg(log[-1], ask=not no_confirm)

command_triggered = False
while True:
prompt = None
if prompts:
prompt = prompts[0]
prompts = prompts[1:]

# if prompts have been ran and is non-interactive, exit
if command_triggered and not sys.stdin.isatty():
logger.info("Command triggered and not in TTY, exiting")
break
# execute user command
if log[-1].role == "user":
inquiry = log[-1].content
# if message starts with ., treat as command
# when command has been run,
if inquiry.startswith(".") or inquiry.startswith("$"):
yield from handle_cmd(inquiry, log, no_confirm=no_confirm)
# we need to re-assign `log` here since it may be replaced by `handle_cmd`
# FIXME: this is pretty bad hack to get things working, needs to be refactored
if inquiry != ".continue":
continue

# If last message was a response, ask for input.
# If last message was from the user (such as from crash/edited log),
# then skip asking for input and generate response
last_msg = log[-1] if log else None
if not last_msg or (
(last_msg.role in ["system", "assistant"])
or (log[-1].role == "user" and log[-1].content.startswith("."))
):
inquiry = prompt_user(prompt)
if not last_msg or ((last_msg.role in ["system", "assistant"])):
inquiry = prompt_user()
if not inquiry:
# Empty command, ask for input again
print()
continue
# we will exit when last cli-provided prompt is done (if we're non-interactive, see above)
if prompt and len(prompts) == 0:
command_triggered = True
prompt = None
yield Message("user", inquiry, quiet=True)

# execute user command
if log[-1].role == "user":
inquiry = log[-1].content
# if message starts with ., treat as command
# when command has been run,
if inquiry.startswith(".") or inquiry.startswith("$"):
yield from handle_cmd(inquiry, logmanager, no_confirm=no_confirm)
# we need to re-assign `log` here since it may be replaced by `handle_cmd`
log = logmanager.log
if inquiry != ".continue":
continue

# print response
try:
# performs reduction/context trimming, if necessary
msgs = logmanager.prepare_messages()
msgs = log.prepare_messages()

# append temporary message with current context, right before user message
# NOTE: in my experience, this confused the model more than it helped
Expand Down
27 changes: 21 additions & 6 deletions gptme/logmanager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import textwrap
import logging
import textwrap
from pathlib import Path
from typing import TypeAlias

Expand Down Expand Up @@ -29,13 +29,28 @@ def __init__(
self.show_hidden = show_hidden
# TODO: Check if logfile has contents, then maybe load, or should it overwrite?

def __getitem__(self, key):
return self.log[key]

def __len__(self):
return len(self.log)

def __iter__(self):
return iter(self.log)

def __bool__(self):
return bool(self.log)

def append(self, msg: Message) -> None:
"""Appends a message to the log, writes the log, prints the message."""
self.log.append(msg)
self.write()
if not msg.quiet:
print_msg(msg, oneline=False)

def pop(self, index: int = -1) -> Message:
return self.log.pop(index)

def write(self) -> None:
"""Writes the log to the logfile."""
write_log(self.log, self.logfile)
Expand All @@ -45,26 +60,26 @@ def print(self, show_hidden: bool | None = None):

def undo(self, n: int = 1, quiet=False) -> None:
"""Removes the last message from the log."""
undid = self.log[-1] if self.log else None
undid = self[-1] if self.log else None
if undid and undid.content.startswith(".undo"):
self.log.pop()
self.pop()

# Doesn't work for multiple undos in a row, but useful in testing
# assert undid.content == ".undo" # assert that the last message is an undo
peek = self.log[-1] if self.log else None
peek = self[-1] if self.log else None
if not peek:
print("[yellow]Nothing to undo.[/]")
return

if not quiet:
print("[yellow]Undoing messages:[/yellow]")
for _ in range(n):
undid = self.log.pop()
undid = self.pop()
if not quiet:
print(
f"[red] {undid.role}: {textwrap.shorten(undid.content.strip(), width=50, placeholder='...')}[/]",
)
peek = self.log[-1] if self.log else None
peek = self[-1] if self.log else None

def prepare_messages(self) -> list[Message]:
"""Prepares the log into messages before sending it to the LLM."""
Expand Down
13 changes: 4 additions & 9 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ def test_help():
def test_shell(name: str):
runner = CliRunner()
with runner.isolated_filesystem():
result = runner.invoke(
gptme.cli.main, ["-y", "--name", name, '.shell echo "yes"']
)
result = runner.invoke(gptme.cli.main, ["--name", name, '.shell echo "yes"'])
output = result.output.split("System")[-1]
# check for two 'yes' in output (both command and stdout)
assert output.count("yes") == 2
Expand All @@ -37,9 +35,7 @@ def test_shell(name: str):
def test_python(name: str):
runner = CliRunner()
with runner.isolated_filesystem():
result = runner.invoke(
gptme.cli.main, ["-y", "--name", name, '.python print("yes")']
)
result = runner.invoke(gptme.cli.main, ["--name", name, '.python print("yes")'])
assert "yes\n" in result.output
assert result.exit_code == 0

Expand All @@ -48,7 +44,8 @@ def test_python_error(name: str):
runner = CliRunner()
with runner.isolated_filesystem():
result = runner.invoke(
gptme.cli.main, ["-y", "--name", name, '.python raise Exception("yes")']
gptme.cli.main,
["--name", name, '.python raise Exception("yes")'],
)
assert "Exception: yes" in result.output
assert result.exit_code == 0
Expand Down Expand Up @@ -78,7 +75,6 @@ def test_block(name: str, lang: str):
runner = CliRunner()
with runner.isolated_filesystem():
args = [
"-y",
"--name",
name,
f".impersonate {code}",
Expand All @@ -100,7 +96,6 @@ def test_generate_primes(name: str):
result = runner.invoke(
gptme.cli.main,
[
"-y",
"--name",
name,
"print the first 10 prime numbers",
Expand Down

0 comments on commit 3122f46

Please sign in to comment.