Skip to content

Commit

Permalink
Export sys.path on client connection
Browse files Browse the repository at this point in the history
  • Loading branch information
edyounis committed Apr 1, 2024
1 parent 66d38da commit 72c073d
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 4 deletions.
3 changes: 1 addition & 2 deletions bqskit/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import sys
import time
import uuid
import warnings
from multiprocessing.connection import Client
from multiprocessing.connection import Connection
from subprocess import Popen
Expand Down Expand Up @@ -149,7 +148,7 @@ def _connect_to_server(self, ip: str, port: int) -> None:
self.old_signal = signal.signal(signal.SIGINT, handle)
if self.conn is None:
raise RuntimeError('Connection unexpectedly none.')
self.conn.send((RuntimeMessage.CONNECT, None))
self.conn.send((RuntimeMessage.CONNECT, sys.path))
_logger.debug('Successfully connected to runtime server.')
return
raise RuntimeError('Client connection refused')
Expand Down
16 changes: 14 additions & 2 deletions bqskit/runtime/detached.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from bqskit.runtime.result import RuntimeResult
from bqskit.runtime.task import RuntimeTask


def listen(server: DetachedServer, port: int) -> None:
"""Listening thread listens for client connections."""
listener = Listener(('0.0.0.0', port))
Expand Down Expand Up @@ -131,7 +130,15 @@ def handle_message(
if direction == MessageDirection.CLIENT:

if msg == RuntimeMessage.CONNECT:
pass
# paths, serialized_defintions = cast(List[str], payload)
paths = cast(List[str], payload)
import sys
for path in paths:
if path not in sys.path:
sys.path.append(path)
for employee in self.employees:
employee.conn.send((RuntimeMessage.IMPORTPATH, path))


elif msg == RuntimeMessage.DISCONNECT:
self.handle_disconnect(conn)
Expand Down Expand Up @@ -370,6 +377,11 @@ def handle_error(self, error_payload: tuple[int, str]) -> None:
conn = self.tasks[self.mailbox_to_task_dict[tid]][1]
self.outgoing.put((conn, RuntimeMessage.ERROR, error_payload[1]))
# TODO: Broadcast cancel to all tasks with compilation task id tid
# But avoid double broadcasting it. If the client crashes due to
# this error, which it may not, then we will quickly process
# a handle_disconnect and call the cancel anyways. We should
# still cancel here incase the client catches the error and
# resubmits a job.

def handle_log(self, log_payload: tuple[int, LogRecord]) -> None:
"""Forward logs to appropriate client."""
Expand Down
7 changes: 7 additions & 0 deletions bqskit/runtime/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ def handle_message(
elif msg == RuntimeMessage.SHUTDOWN:
self.handle_shutdown()

elif msg == RuntimeMessage.IMPORTPATH:
import_path = cast(str, payload)
import sys
sys.path.append(import_path)
for employee in self.employees:
employee.conn.send((RuntimeMessage.IMPORTPATH, import_path))

else:
raise RuntimeError(f'Unexpected message type: {msg.name}')

Expand Down
1 change: 1 addition & 0 deletions bqskit/runtime/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ class RuntimeMessage(IntEnum):
CANCEL = 11
WAITING = 12
UPDATE = 13
IMPORTPATH = 14
4 changes: 4 additions & 0 deletions bqskit/runtime/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ def handle_incoming_comms(worker: Worker) -> None:
worker._handle_cancel(addr)
# TODO: preempt?

elif msg == RuntimeMessage.IMPORTPATH:
import_path = cast(str, payload)
sys.path.append(import_path)


class Worker:
"""
Expand Down

0 comments on commit 72c073d

Please sign in to comment.