forked from danmohad/khod-kaar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSystem.py
176 lines (141 loc) · 7.1 KB
/
System.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
File: System.py
Author: Danyal Mohaddes
Description: This file contains the System class.
"""
import os
import subprocess
import tempfile
from colorama import Back
from colorama import Style
class System:
    """Run LLM-generated shell code and report the result back to the LLM.

    Each command is written to a temporary script, executed with ``bash`` in
    the tracked working directory, and followed by a small marker-delimited
    ``pwd`` report so that directory changes made by the command persist
    across calls.

    Attributes:
        cwd: current working directory used for the next command
        _autopilot: boolean, whether autopilot is active
        split_kwd: keyword for separating LLM code output from cwd check
    """

    # Language tags that may follow the opening code fence and are not code.
    _SHELL_TAGS = ("bash", "sh", "shell", "zsh")

    def __init__(self, autopilot_: bool) -> None:
        """Initialization for the System class.

        Args:
            autopilot_: boolean, whether autopilot is active
        """
        # Keep track of the current working directory
        self.cwd = os.getcwd()
        self._autopilot = autopilot_
        self.split_kwd = "**PWD**"

    def _prepare_command(self, llm_code_: str) -> str:
        """Write `llm_code_` plus a cwd report to a temporary script.

        Args:
            llm_code_: shell code to execute

        Returns:
            Path of the temporary script. The caller is responsible for
            deleting it (the file is created with ``delete=False``).
        """
        # Append cwd check to code to keep track of it
        llm_code_ += f"\necho '{self.split_kwd}'\npwd\necho '{self.split_kwd}'"
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
            temp_file.write(llm_code_)
            temp_file_path = temp_file.name
        return temp_file_path

    def execute(self, llm_output_: str) -> str:
        """Execute code contained in `llm_output_` as a shell command.

        Args:
            llm_output_: raw LLM response, expected to contain at most one
                fenced code block

        Returns:
            ``"**STOP**"`` when the human or the LLM asks to stop, the human's
            feedback when they decline to run the code, a fixed message when
            the response holds no code block or several, and otherwise the
            command's output as formatted by `_process_result`.
        """
        # Ask human for approval to continue
        # If LLM wants to stop and human wants to stop, both **STOP** and **CONTINUE** have the same effect
        # If LLM wants to stop but human wants to continue, human can give feedback like "you're not done yet"
        input_ = self._human_in_the_loop()
        if input_ == "**STOP**":
            return "**STOP**"
        if input_ != "**CONTINUE**":
            return "No code executed. Human in the loop says:\n" + input_

        # Check if LLM wants to stop execution
        if "**STOP**" in llm_output_:
            return "**STOP**"

        # Parse the LLM output text for code
        llm_code_ = System._parse_output_for_code(llm_output_)

        # TODO change this to _parse_output_for_code raising errors and excepting them here
        if llm_code_ == "**NO CODE BLOCK**":
            # If in autopilot mode, then this will just ask the LLM to do what it thinks is best.
            # If not in autopilot mode, the user should notice if no code block AND there should be,
            # and should then give direct feedback to LLM
            return "I agree with your approach. Continue."
        if llm_code_ == "**MULTIPLE CODE BLOCKS**":
            return "Multiple code blocks received. Only one code block can be provided at a time."

        temp_file_path_ = self._prepare_command(llm_code_)
        try:
            # check=False: a non-zero exit status is reported to the LLM by
            # _process_result instead of being raised as an exception.
            # errors="replace": undecodable bytes in the output must not crash the loop.
            sp = subprocess.run(['bash', temp_file_path_],
                                check=False, capture_output=True, text=True,
                                errors="replace", cwd=self.cwd)
        finally:
            os.remove(temp_file_path_)

        return self._process_result(sp.returncode, sp.stdout, sp.stderr)

    def _extract_cwd(self, stdout_: str) -> tuple:
        """Split the appended cwd report off `stdout_`.

        The report is searched from the end of the output, so marker text
        printed by the command itself does not confuse the parser.

        Returns:
            ``(visible_stdout, cwd)``. `cwd` is None, and `visible_stdout` is
            `stdout_` unchanged, when no well-formed report is present (for
            example because the script exited before reaching it).
        """
        before_last_, last_found_, trailer_ = stdout_.rpartition(self.split_kwd)
        visible_, first_found_, reported_ = before_last_.rpartition(self.split_kwd)
        # Only the newlines emitted by `echo`/`pwd` are stripped so that
        # directory names containing spaces survive intact.
        cwd_ = reported_.strip("\n")
        well_formed_ = (bool(last_found_) and bool(first_found_)
                        and not trailer_.strip() and os.path.isabs(cwd_))
        if not well_formed_:
            return stdout_, None
        # Remove the report so the LLM doesn't see commands it didn't issue
        return visible_ + trailer_, cwd_

    def _process_result(self, returncode_: int, stdout_: str, stderr_: str) -> str:
        """Update `self.cwd` from the cwd report and build the message for the LLM.

        Args:
            returncode_: exit status of the script
            stdout_: captured standard output, possibly ending in the cwd report
            stderr_: captured standard error

        Returns:
            The text to hand back to the LLM.
        """
        visible_stdout_, new_cwd_ = self._extract_cwd(stdout_)
        if new_cwd_ is not None:
            self.cwd = new_cwd_

        if returncode_ != 0:
            out = f"Shell command exited with status {returncode_}"
            out += f"\nstdout was: {visible_stdout_}"
            if stderr_:
                out += f"\nstderr was: {stderr_}"
            return out

        # When a command fails, sometimes the exit code is still 0 because the
        # final cwd report succeeds. In these cases, stderr will be non-empty.
        if stderr_:
            out = "Shell command wrote to stderr"
            out += f"\nstdout was: {visible_stdout_}"
            out += f"\nstderr was: {stderr_}"
            return out

        # Add a message to let the LLM know when a shell command that generates no stdout output is successful
        if not visible_stdout_.strip():
            return "Shell command executed successfully. No output was generated."
        return visible_stdout_

    @staticmethod
    def _parse_output_for_code(output_) -> str:
        """Parse the LLM's output text for code delimited by triple backticks.

        Returns:
            ``"**NO CODE BLOCK**"`` when there is no fence,
            ``"**MULTIPLE CODE BLOCKS**"`` when there are more than two
            fences, otherwise the text between the first fence and the second
            one (or the end of the text when the block is never closed), with
            a leading shell language tag line removed.
        """
        code_start_stop_substr_ = "```"

        # Check if LLM failed to include code in its output
        count_ = System._count_substr(code_start_stop_substr_, output_)
        if count_ == 0:
            return "**NO CODE BLOCK**"
        elif count_ > 2:
            return "**MULTIPLE CODE BLOCKS**"

        # Find the first and second instances of `code_start_stop_substr_`
        code_start_ = System._find_nth_substr(code_start_stop_substr_, output_, 0) + \
            len(code_start_stop_substr_)
        code_stop_ = System._find_nth_substr(code_start_stop_substr_, output_, 1)
        if code_stop_ < 0:
            # Unterminated block: take everything up to the end of the text
            # rather than silently dropping the final character.
            code_stop_ = len(output_)
        code_ = output_[code_start_:code_stop_]

        # Deal with the LLM sometimes starting code with "```bash": drop the
        # tag only when it is alone on the opening fence line, so that later
        # lines ending in "bash" (e.g. `... | bash`) are left untouched.
        first_line_, newline_, rest_ = code_.partition("\n")
        if newline_ and first_line_.strip() in System._SHELL_TAGS:
            code_ = rest_
        return code_

    @staticmethod
    def _count_substr(needle_: str, haystack_: str) -> int:
        """Count non-overlapping occurrences of `needle_` in `haystack_`.

        An empty `needle_` counts as zero occurrences.
        """
        if not needle_:
            return 0
        return haystack_.count(needle_)

    @staticmethod
    def _find_nth_substr(needle_, haystack_, n) -> int:
        """Find the starting position of the `n`-th (0-based) non-overlapping
        substring `needle_` in the string `haystack_`, or -1 if there is none."""
        start = haystack_.find(needle_)
        while start >= 0 and n > 0:
            start = haystack_.find(needle_, start + len(needle_))
            n -= 1
        return start

    def _human_in_the_loop(self) -> str:
        """Get human approval to continue program execution. This is the only human intervention in the program.

        Returns:
            ``"**CONTINUE**"`` on autopilot or when the human enters ``e``,
            ``"**STOP**"`` when the human enters ``q``, otherwise the text the
            human typed.
        """
        # TODO change these returns to enumerated types
        print("")
        if self._autopilot:
            print(f"{Back.GREEN}Approved on autopilot{Style.RESET_ALL}")
            return "**CONTINUE**"

        human_input_ = input(f"{Back.GREEN}e=Send agreement message if no code, else execute code{Style.RESET_ALL}, {Back.RED}q=Quit{Style.RESET_ALL}, {Back.BLUE}Anything else=Feedback to LLM{Style.RESET_ALL}\n")
        if human_input_ == "e":
            return "**CONTINUE**"
        elif human_input_ == "q":
            return "**STOP**"
        else:
            return human_input_

    @staticmethod
    def _output(output_: str, role_: str) -> None:
        """Write `role_` and associated `output_` to console."""
        print(f'{role_}: {output_}', flush=True)