Skip to content

Commit

Permalink
fix readline method of wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
unaidedelf8777 committed Sep 27, 2023
1 parent c4a3344 commit 040fd6a
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 209 deletions.
196 changes: 61 additions & 135 deletions interpreter/code_interpreters/container_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def build_docker_images(

# Update the stored current hash
stored_hashes["current_hash"] = current_hash
with open(hash_file_path, "w", encoding="utf-8") as f:
with open(hash_file_path, "w") as f:
json.dump(stored_hashes, f)

except subprocess.CalledProcessError:
Expand All @@ -118,13 +118,6 @@ def build_docker_images(


class DockerStreamWrapper:
"""
A wrapper class for Docker container streams.
This class provides a way to interact with the input/output streams of a Docker container.
It creates pipes for stdin, stdout, and stderr, and starts a thread to listen for data on the socket.
"""

def __init__(self, exec_id, sock):
self.exec_id = exec_id
self._sock = sock
Expand All @@ -133,32 +126,25 @@ def __init__(self, exec_id, sock):
self.stdout = self.Stream(self, self._stdout_r)
self.stderr = self.Stream(self, self._stderr_r)

## stdin pipe and fd. A pipe isn't strictly needed, but it is easier, thread-safe, and less memory-intensive than a queue.Queue().
self._stdin_r, self._stdin_w = os.pipe() # Pipe for stdin
self.stdin = os.fdopen(self._stdin_w, "w")
self.stdin = os.fdopen(self._stdin_w, 'w')
self._stdin_buffer = b"" # Buffer for stdin data. more complex = better fr

## Start the receiving thread, which watches the socket and forwards data from the stdin pipe.
self._stop_event = threading.Event()
self._thread = threading.Thread(target=self._listen, daemon=True)
self._thread.start()

class Stream:
"""
A class representing a stream of data.
Attributes:
parent (object): The parent object that created the stream.
_read_fd (int): The file descriptor for the read end of the stream.
_buffer (str): The buffer for the stream data.
"""
def __init__(self, parent, read_fd):
self.parent = parent
self._read_fd = read_fd
self._buffer = ""

### CAUTION: Auto-formatting this document deletes the readline method for an unknown reason, so do not run a formatter on this file.
def readline(self, timeout=3):

def readline(self):
while '\n' not in self._buffer:
ready_to_read, _, _ = select.select([self._read_fd], [], [], timeout)
ready_to_read, _, _ = select.select([self._read_fd], [], [], None)
if not ready_to_read:
return ''
chunk = os.read(self._read_fd, 1024).decode('utf-8')
Expand All @@ -171,8 +157,8 @@ def readline(self, timeout=3):

def _listen(self):
while not self._stop_event.is_set():
ready_to_read, _, _ = select.select([self._sock, self._stdin_r], [], [], 1)

ready_to_read, _, _ = select.select([self._sock, self._stdin_r], [], [], None)
for s in ready_to_read:
if s == self._sock:
raw_data = self._sock.recv(2048)
Expand All @@ -181,58 +167,48 @@ def _listen(self):
os.write(self._stderr_w, stderr.encode())
elif s == self._stdin_r:
# Read from the read end of the stdin pipe and add to the buffer
data_to_write = os.read(self._stdin_r, 2048).decode("utf-8")

data_to_write = os.read(self._stdin_r, 2048).decode('utf-8')
# Remove escape characters for quotes but leave other backslashes untouched
data_to_write = re.sub(r'\\([\'"])', r"\1", data_to_write)
data_to_write = re.sub(r'\\([\'"])', r'\1', data_to_write)

data_to_write = data_to_write.replace("\\n", "\n")
data_to_write = data_to_write.replace('\\n', '\n')

self._stdin_buffer += data_to_write.encode()

# Check for newline and send line by line
while b"\n" in self._stdin_buffer:
newline_pos = self._stdin_buffer.find(b"\n")
line = self._stdin_buffer[: newline_pos + 1] # Include the newline
self._stdin_buffer = self._stdin_buffer[newline_pos + 1 :]
while b'\n' in self._stdin_buffer:
newline_pos = self._stdin_buffer.find(b'\n')
line = self._stdin_buffer[:newline_pos + 1] # Include the newline
self._stdin_buffer = self._stdin_buffer[newline_pos + 1:]


# Send the line to the Docker container
self._sock.sendall(line)

def demux_docker_stream(self, data):
    """
    Demultiplexes a raw Docker stream into stdout and stderr text.

    Each frame starts with an 8-byte header: one stream-type byte, three
    padding bytes, and a big-endian unsigned 32-bit payload length. The
    payload follows the header. Frames of type 1 are collected as stdout,
    frames of type 2 as stderr; any other type is skipped.

    Args:
        data (bytes): The raw, multiplexed bytes read from the socket.

    Returns:
        Tuple[str, str]: The decoded stdout and stderr text, in that order.
    """
    stdout_parts = []
    stderr_parts = []
    offset = 0
    while offset + 8 <= len(data):
        header = data[offset:offset + 8]
        stream_type, length = struct.unpack(">BxxxL", header)
        offset += 8
        # A socket read can end in the middle of a payload, which may split a
        # multi-byte character. Decoding leniently keeps one truncated frame
        # from raising and taking down the caller.
        chunk = data[offset:offset + length].decode("utf-8", errors="replace")
        offset += length
        if stream_type == 1:
            stdout_parts.append(chunk)
        elif stream_type == 2:
            stderr_parts.append(chunk)
    return "".join(stdout_parts), "".join(stderr_parts)


def flush(self):
    """
    Does nothing; data is sent to the socket with .sendall, so nothing is buffered here.

    It exists only so the wrapper mirrors the subprocess.Popen interface.
    """
    pass

def close(self):
print("#### CLOSE METHOD HIT")
self._stop_event.set()
self._thread.join()
os.close(self._stdout_r)
Expand All @@ -241,21 +217,8 @@ def close(self):
os.close(self._stderr_w)


class DockerProcWrapper:
"""
Initializes a DockerProcWrapper instance.
Args:
command (str): The command to be executed in the Docker container.
session_path (str): The path to the session directory.

Returns:
None
Raises:
TimeoutError: Raised when the container fails to start within the specified timeout.
"""
class DockerProcWrapper:
def __init__(self, command, session_path):
self.client = docker.APIClient()
self.image_name = "openinterpreter-runtime-container:latest"
Expand All @@ -265,81 +228,59 @@ def __init__(self, command, session_path):
self.exec_socket = None
atexit.register(atexit_destroy, self)

if not os.path.exists(session_path):
os.makedirs(session_path)
os.makedirs(self.session_path, exist_ok=True)


# Initialize container
self.init_container()

self.init_exec_instance()
self.init_exec_instance(command)


self.wrapper = DockerStreamWrapper(self.exec_id, self.exec_socket)
self.stdout = self.wrapper.stdout
self.stderr = self.wrapper.stderr
self.stdin = self.wrapper.stdin

self.stdin.write(command + "\n")

def init_container(self):
    """
    Initializes a Docker container for the interpreter session.

    If a container labelled with this session ID already exists, it is reused
    and started again when it is not running. Otherwise a new container is
    created from self.image_name with self.session_path bind-mounted at
    /mnt/data, and then started.

    Docker API errors are printed rather than raised. A TimeoutError from
    wait_for_container_start is not caught here.
    """
    self.container = None
    try:
        containers = self.client.containers(
            filters={"label": f"session_id={self.id}"}, all=True
        )
        if containers:
            self.container = containers[0]
            container_id = self.container.get("Id")
            container_info = self.client.inspect_container(container_id)
            # Only restart when Docker explicitly reports the container as stopped.
            if container_info.get("State", {}).get("Running") is False:
                print(container_info.get("State", {}))
                self.client.start(container=container_id)
                self.wait_for_container_start(container_id)
        else:
            host_config = self.client.create_host_config(
                binds={self.session_path: {"bind": "/mnt/data", "mode": "rw"}}
            )

            self.container = self.client.create_container(
                image=self.image_name,
                detach=True,
                command="/bin/bash -i",
                labels={"session_id": self.id},
                host_config=host_config,
                user="nobody",
                stdin_open=True,
                tty=False,
            )

            container_id = self.container.get("Id")
            self.client.start(container=container_id)
            self.wait_for_container_start(container_id)

    except docker.errors.APIError as api_error:
        print(f"An error occurred: {api_error}")

def init_exec_instance(self):
"""
Initializes the execution instance for the container.
except Exception as e:
print(f"An error occurred: {e}")

If a container exists, this method creates an execution instance for the container using the Docker API.
The execution instance is created with the following parameters:
- cmd: "/bin/bash"
- stdin: True
- stdout: True
- stderr: True
- workdir: "/mnt/data"
- user: "nobody"
- tty: False
Returns:
None
"""
def init_exec_instance(self, command):
if self.container:
self.exec_id = self.client.exec_create(
self.container.get("Id"),
Expand All @@ -349,44 +290,29 @@ def init_exec_instance(self):
stderr=True,
workdir="/mnt/data",
user="docker",
tty=False,
)["Id"]
self.exec_socket = self.client.exec_start(
self.exec_id, socket=True, tty=False, demux=False
)._sock

def wait_for_container_start(self, container_id, timeout=30):
"""
Waits for a container to start running.
tty=False

Args:
container_id (str): The ID of the container to wait for.
timeout (int, optional): The maximum amount of time to wait for the container to start, in seconds. Defaults to 30.
)['Id']
self.exec_socket = self.client.exec_start(
self.exec_id, socket=True, tty=False, demux=False)._sock

Raises:
TimeoutError: If the container does not start running within the specified timeout.

Returns:
bool: True if the container starts running within the specified timeout, False otherwise.
"""
def wait_for_container_start(self, container_id, timeout=30, poll_interval=1):
    """
    Waits for a container to start running.

    Args:
        container_id (str): The ID of the container to wait for.
        timeout (int, optional): The maximum time to wait, in seconds. Defaults to 30.
        poll_interval (int, optional): Seconds to sleep between state checks. Defaults to 1.

    Raises:
        TimeoutError: If the container is not running once the timeout has elapsed.

    Returns:
        bool: True as soon as the container reports that it is running.
    """
    # A monotonic clock keeps the deadline correct if the system clock is adjusted.
    deadline = time.monotonic() + timeout
    while True:
        container_info = self.client.inspect_container(container_id)
        if container_info.get("State", {}).get("Running") is True:
            return True
        if time.monotonic() > deadline:
            raise TimeoutError("Container did not start within the specified timeout.")
        time.sleep(poll_interval)


def atexit_destroy(self):
    """
    Deletes the session directory and stops/removes the session's container.

    DockerProcWrapper registers this function with atexit, passing itself as
    the argument.

    Args:
        self: The DockerProcWrapper instance whose resources are being released.
    """
    # The directory may already be gone; a failed delete must not stop the
    # container cleanup below from running.
    shutil.rmtree(self.session_path, ignore_errors=True)

    # init_container leaves self.container as None when container setup fails,
    # so there may be nothing to stop.
    container = getattr(self, "container", None)
    if not container:
        return

    container_id = container.get("Id")
    self.client.stop(container_id)
    self.client.remove_container(container_id)



3 changes: 2 additions & 1 deletion interpreter/code_interpreters/create_code_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,5 @@ def create_code_interpreter(language, use_containers=False):
os.makedirs(session_path)
return CodeInterpreter(session_id=session_id, use_docker=use_containers)
except KeyError as exc:
raise ValueError(f"Unknown or unsupported language: {language}. \n ") from exc
raise ValueError(f"Unknown or unsupported language: {language}. \n ") from exc

4 changes: 2 additions & 2 deletions interpreter/code_interpreters/dockerfiles/hash.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"original_hash": "8c30b83e642147e3ce8a4965006c060fe81a15abc38222e65506ae602a26e831",
"last_hash": "8c30b83e642147e3ce8a4965006c060fe81a15abc38222e65506ae602a26e831"
"original_hash": "bf472f80a3acd23a1d9b689da1f3c5834a1a728bf51265e034f0b1bbe44a8435",
"last_hash": "bf472f80a3acd23a1d9b689da1f3c5834a1a728bf51265e034f0b1bbe44a8435"
}
Loading

0 comments on commit 040fd6a

Please sign in to comment.