Skip to content

Commit

Permalink
use a quit event to reduce sleeping/spinning in threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Moffat committed Dec 1, 2016
1 parent c203859 commit a500762
Showing 1 changed file with 34 additions and 26 deletions.
60 changes: 34 additions & 26 deletions sh.py
Original file line number Diff line number Diff line change
Expand Up @@ -1933,10 +1933,13 @@ def fn(exit_code):
return self.command.handle_command_exit_code(exit_code)
handle_exit_code = fn

self._quit_threads = threading.Event()

thread_name = "background thread for pid %d" % self.pid
self._background_thread = _start_daemon_thread(background_thread,
thread_name, ca["timeout"], timeout_fn, self._timeout_event,
self._timeout_cancel_event, handle_exit_code, self.is_alive)
self._timeout_cancel_event, handle_exit_code, self.is_alive,
self._quit_threads)


# start the main io threads. stdin thread is not needed if we are
Expand All @@ -1947,7 +1950,7 @@ def fn(exit_code):
thread_name = "STDIN thread for pid %d" % self.pid
self._input_thread = _start_daemon_thread(input_thread,
thread_name, self.log, self._stdin_stream,
self.is_alive, close_before_term)
self.is_alive, self._quit_threads, close_before_term)


# this event is for cases where the subprocess that we launch
Expand All @@ -1961,7 +1964,7 @@ def fn(exit_code):
self._output_thread = _start_daemon_thread(output_thread,
thread_name, self.log, self._stdout_stream,
self._stderr_stream, self._timeout_event, self.is_alive,
self._force_done_event)
self._quit_threads, self._force_done_event)


def __repr__(self):
Expand Down Expand Up @@ -2101,6 +2104,7 @@ def wait(self):
self.log.debug("exit code already set (%d), no need to wait",
self.exit_code)

self._quit_threads.set()

# we may not have a thread for stdin, if the pipe has been connected
# via _piped="direct"
Expand All @@ -2127,40 +2131,48 @@ def wait(self):



def input_thread(log, stdin, is_alive, close_before_term):
def input_thread(log, stdin, is_alive, quit, close_before_term):
""" this is run in a separate thread. it writes into our process's
stdin (a streamwriter) and waits the process to end AND everything that
can be written to be written """

done = False
alive = True
closed = False
alive = True
writers = [stdin]

while alive:
alive, exit_code = is_alive()
log.debug("%r ready for more input", stdin)
while writers and alive:
_, to_write, _ = select.select([], writers, [], 1)

if done:
time.sleep(0.01)
else:
if to_write:
log.debug("%r ready for more input", stdin)
done = stdin.write()

if done and close_before_term:
stdin.close()
closed = True
if done:
writers = []
if close_before_term:
stdin.close()
closed = True

alive, exit_code = is_alive()

while alive:
quit.wait(1)
alive, _ = is_alive()

if not closed:
stdin.close()


def background_thread(timeout, timeout_fn, timeout_event, cancel_event,
handle_exit_code, is_alive):
handle_exit_code, is_alive, quit):
""" handles the timeout logic """

if timeout:
started = time.time()

while True:
time.sleep(0.01)
time.sleep(0.1)
elapsed = time.time() - started
if elapsed > timeout or cancel_event.is_set():
break
Expand All @@ -2175,13 +2187,13 @@ def background_thread(timeout, timeout_fn, timeout_event, cancel_event,
if handle_exit_code and not RUNNING_TESTS: # pragma: no cover
alive = True
while alive:
time.sleep(0.01)
quit.wait(1)
alive, exit_code = is_alive()

handle_exit_code(exit_code)


def output_thread(log, stdout, stderr, timeout_event, is_alive,
def output_thread(log, stdout, stderr, timeout_event, is_alive, quit,
force_done_event):
""" this function is run in a separate thread. it reads from the
process's stdout stream (a streamreader), and waits for it to claim that
Expand All @@ -2204,7 +2216,7 @@ def output_thread(log, stdout, stderr, timeout_event, is_alive,
# loop and clean up
while readers:
outputs, inputs, err = no_interrupt(select.select, readers, [], errors,
0.1)
1.0)

# stdout and stderr
for stream in outputs:
Expand All @@ -2225,14 +2237,10 @@ def output_thread(log, stdout, stderr, timeout_event, is_alive,
if force_done_event.is_set():
break

# here we spin until the process is dead, then handle the exit code. we do
# this because some processes, which are not waited on, may have exceptions,
# and we need some way to report them, even if they're uncatchable
# exceptions in threads.
alive = True
while alive:
alive, exit_code = is_alive()
time.sleep(0.01)
quit.wait(1)
alive, _ = is_alive()

if stdout:
stdout.close()
Expand Down Expand Up @@ -2300,7 +2308,7 @@ def determine_how_to_read_input(input_obj):
def get_queue_chunk_reader(stdin):
def fn():
try:
chunk = stdin.get(True, 0.01)
chunk = stdin.get(True, 0.1)
except Empty:
raise NotYetReadyToRead
if chunk is None:
Expand Down

0 comments on commit a500762

Please sign in to comment.