Skip to content

Commit

Permalink
rewrite pipe handling on windows to use a thread
Browse files Browse the repository at this point in the history
  • Loading branch information
mmerickel committed Mar 10, 2017
1 parent 48017ac commit e46d451
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 34 deletions.
80 changes: 72 additions & 8 deletions src/hupper/ipc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
import sys
import threading
import time

from .compat import WIN

Expand Down Expand Up @@ -36,7 +38,45 @@ def add_child(self, pid):
else:
raise

def send_fd(pipe, fd, pid):
class StdinPipe(object):
_bufsize = 256

def __init__(self):
self._thread = None
self._local_fd = sys.stdin.fileno()
self.fd, self._write_fd = os.pipe()
set_inheritable(self.fd)

def start(self):
self._running = True
self._thread = threading.Thread(target=self._run)
self._thread.start()

def _run(self):
while self._running:
if msvcrt.kbhit():
ch = os.read(self._local_fd, self._bufsize)
os.write(self._write_fd, ch)
time.sleep(0.05)

def stop(self):
self._running = False
self._thread.join()
self._thread = None

close_fd(self._write_fd, raises=False)
self._write_fd = None

close_fd(self.fd, raises=False)
self.fd = None

def snapshot_termios(self):
pass

def restore_termios(self):
pass

def send_fd(fd, pipe, pid):
hf = msvcrt.get_osfhandle(fd)
hp = winapi.OpenProcess(winapi.PROCESS_ALL_ACCESS, False, pid)
tp = winapi.DuplicateHandle(
Expand Down Expand Up @@ -74,12 +114,31 @@ def add_child(self, pid):
# nothing to do on *nix
pass

def send_fd(pipe, fd, pid):
class StdinPipe(object):
def __init__(self):
self._local_fd = sys.stdin.fileno()
self._orig_termios = None
self.fd = os.dup(self._local_fd)
set_inheritable(self.fd)

def start(self):
pass

def stop(self):
close_fd(self.fd, raises=False)
self.fd = None

def snapshot_termios(self):
self._orig_termios = snapshot_termios(self._local_fd)

def restore_termios(self):
restore_termios(self._local_fd, self._orig_termios)

def send_fd(fd, pipe, pid):
pipe.send(fd)

def recv_fd(pipe, mode):
fd = pipe.recv()
return fd
return pipe.recv()

def patch_stdin(fd):
# Python's input() function used by pdb and other things only uses
Expand All @@ -100,13 +159,18 @@ def restore_termios(fd, state):
termios.tcsetattr(fd, termios.TCSANOW, state)


def dup_fd(fd):
fd = os.dup(fd)

def set_inheritable(fd):
# py34 and above sets CLOEXEC automatically on file descriptors
# NOTE: this isn't usually an issue because multiprocessing doesn't
# actually exec on linux/macos, but we're depending on the behavior
if hasattr(os, 'get_inheritable') and not os.get_inheritable(fd):
os.set_inheritable(fd, True)

return fd

def close_fd(fd, raises=True):
if fd is not None:
try:
os.close(fd)
except Exception: # pragma: nocover
if raises:
raise
38 changes: 12 additions & 26 deletions src/hupper/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,7 @@
get_py_path,
)
from .interfaces import IReloaderProxy
from .ipc import (
dup_fd,
patch_stdin,
recv_fd,
send_fd,
snapshot_termios,
restore_termios,
)
from . import ipc


class WatchSysModules(threading.Thread):
Expand Down Expand Up @@ -119,16 +112,15 @@ def __init__(self, spec, args=None, kwargs=None):
self.terminated = False
self.pid = None
self.exitcode = None
self.stdin_fd = None
self.stdin_termios = None
self.stdin = None

def start(self):
# prepare to close our stdin by making a new copy that is
# not attached to sys.stdin - we will pass this to the worker while
# it's running and then restore it when the worker is done
# we dup it early such that it's inherited by the child
self.stdin_fd = dup_fd(sys.stdin.fileno())
self.stdin_termios = snapshot_termios(self.stdin_fd)
self.stdin = ipc.StdinPipe()
self.stdin.snapshot_termios()

kw = dict(
spec=self.worker_spec,
Expand All @@ -148,7 +140,8 @@ def start(self):
del self._c2p

# send the stdin handle to the worker
send_fd(self.pipe, self.stdin_fd, self.pid)
ipc.send_fd(self.stdin.fd, self.pipe, self.pid)
self.stdin.start()

def is_alive(self):
if self.process:
Expand All @@ -163,13 +156,10 @@ def join(self):
self.process.join()
self.exitcode = self.process.exitcode

if self.stdin_fd is not None:
try:
os.close(self.stdin_fd)
except: # pragma: nocover
pass
finally:
self.stdin_fd = None
if self.stdin:
self.stdin.stop()
self.stdin.restore_termios()
self.stdin = None

if self.pipe is not None:
try:
Expand All @@ -179,10 +169,6 @@ def join(self):
finally:
self.pipe = None

if self.stdin_termios:
restore_termios(sys.stdin.fileno(), self.stdin_termios)
self.stdin_termios = None


# set when the current process is being monitored
_reloader_proxy = None
Expand Down Expand Up @@ -238,8 +224,8 @@ def worker_main(spec, files_queue, pipe, parent_pipe, spec_args=None,
signal.signal(signal.SIGHUP, signal.SIG_IGN)

# use the stdin fd passed in from the reloader process
stdin_fd = recv_fd(pipe, 'r')
patch_stdin(stdin_fd)
stdin_fd = ipc.recv_fd(pipe, 'r')
ipc.patch_stdin(stdin_fd)

# disable pyc files for project code because it can cause timestamp
# issues in which files are reloaded twice
Expand Down

0 comments on commit e46d451

Please sign in to comment.