Skip to content

Commit

Permalink
[chiptest] Wait for subprocess instead of polling (#15968)
Browse files Browse the repository at this point in the history
Polling for subprocess return-code hogs CPU (chiptest Python process
uses 100% CPU). Instead of polling it's better to use wait() call, which
effectively puts Python process into sleep until subprocess terminates.
  • Loading branch information
arkq authored and pull[bot] committed Jun 6, 2022
1 parent 226d646 commit bf66ef9
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 32 deletions.
12 changes: 5 additions & 7 deletions scripts/tests/chiptest/accessories.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def init(self):
def uninit(self):
self.__stopXMLRPCServer()

@property
def accessories(self):
"""List of registered accessory applications."""
return self.__accessories.values()

def add(self, name, accessory):
self.__accessories[name] = accessory

Expand All @@ -44,13 +49,6 @@ def remove(self, name):
def removeAll(self):
self.__accessories = {}

def poll(self):
for accessory in self.__accessories.values():
status = accessory.poll()
if status is not None:
return status
return None

def kill(self, name):
accessory = self.__accessories[name]
if accessory:
Expand Down
50 changes: 38 additions & 12 deletions scripts/tests/chiptest/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
import os
import pty
import queue
import re
import subprocess
import sys
Expand Down Expand Up @@ -75,7 +76,26 @@ def close(self):
os.close(self.fd_write)


class RunnerWaitQueue:

def __init__(self):
self.queue = queue.Queue()

def __wait(self, process, userdata):
process.wait()
self.queue.put((process, userdata))

def add_process(self, process, userdata=None):
t = threading.Thread(target=self.__wait, args=(process, userdata))
t.daemon = True
t.start()

def get(self):
return self.queue.get()


class Runner:

def __init__(self, capture_delegate=None):
self.capture_delegate = capture_delegate

Expand All @@ -97,16 +117,22 @@ def RunSubprocess(self, cmd, name, wait=True, dependencies=[]):
if not wait:
return s, outpipe, errpipe

while s.poll() is None:
wait = RunnerWaitQueue()
wait.add_process(s)

for dependency in dependencies:
for accessory in dependency.accessories:
wait.add_process(accessory, dependency)

for process, userdata in iter(wait.queue.get, None):
if process == s:
break
# dependencies MUST NOT be done
for dependency in dependencies:
if dependency.poll() is not None:
s.kill()
raise Exception("Unexpected return %d for %r" %
(dependency.poll(), dependency))

code = s.wait()
if code != 0:
raise Exception('Command %r failed: %d' % (cmd, code))
else:
logging.debug('Command %r completed with error code 0', cmd)
s.kill()
raise Exception("Unexpected return %d for %r" %
(process.returncode, userdata))

if s.returncode != 0:
raise Exception('Command %r failed: %d' % (cmd, s.returncode))

logging.debug('Command %r completed with error code 0', cmd)
32 changes: 19 additions & 13 deletions scripts/tests/chiptest/test_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@


class App:

def __init__(self, runner, command):
self.process = None
self.outpipe = None
self.runner = runner
self.command = command
self.cv_stopped = threading.Condition()
self.stopped = False
self.lastLogIndex = 0

Expand All @@ -44,13 +46,17 @@ def start(self, discriminator):
self.__updateSetUpCode(outpipe)
self.process = process
self.outpipe = outpipe
self.stopped = False
with self.cv_stopped:
self.stopped = False
self.cv_stopped.notify()
return True
return False

def stop(self):
if self.process:
self.stopped = True
with self.cv_stopped:
self.stopped = True
self.cv_stopped.notify()
self.process.kill()
self.process.wait(10)
self.process = None
Expand Down Expand Up @@ -85,21 +91,21 @@ def waitForOperationalAdvertisement(self):
self.process, self.outpipe)
return True

def poll(self):
# When the server is manually stopped, process polling is overridden
# so the other processes that depends on the accessory beeing alive
# does not stop.
if self.stopped:
return None
return self.process.poll()

def kill(self):
if self.process:
self.process.kill()

def wait(self, duration):
if self.process:
self.process.wait(duration)
def wait(self, timeout=None):
while True:
code = self.process.wait(timeout)
with self.cv_stopped:
if not self.stopped:
return code
# When the server is manually stopped, process waiting is
# overridden so the other processes that depends on the
# accessory beeing alive does not stop.
while self.stopped:
self.cv_stopped.wait()

def __startServer(self, runner, command, discriminator):
logging.debug(
Expand Down

0 comments on commit bf66ef9

Please sign in to comment.