Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chiptest] Wait for subprocess instead of polling #15968

Merged
merged 1 commit into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions scripts/tests/chiptest/accessories.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def init(self):
def uninit(self):
self.__stopXMLRPCServer()

@property
def accessories(self):
"""List of registered accessory applications."""
return self.__accessories.values()

def add(self, name, accessory):
self.__accessories[name] = accessory

Expand All @@ -44,13 +49,6 @@ def remove(self, name):
def removeAll(self):
self.__accessories = {}

def poll(self):
for accessory in self.__accessories.values():
status = accessory.poll()
if status is not None:
return status
return None

def kill(self, name):
accessory = self.__accessories[name]
if accessory:
Expand Down
50 changes: 38 additions & 12 deletions scripts/tests/chiptest/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
import os
import pty
import queue
import re
import subprocess
import sys
Expand Down Expand Up @@ -75,7 +76,26 @@ def close(self):
os.close(self.fd_write)


class RunnerWaitQueue:

def __init__(self):
self.queue = queue.Queue()

def __wait(self, process, userdata):
process.wait()
self.queue.put((process, userdata))

def add_process(self, process, userdata=None):
t = threading.Thread(target=self.__wait, args=(process, userdata))
t.daemon = True
t.start()

def get(self):
return self.queue.get()


class Runner:

def __init__(self, capture_delegate=None):
self.capture_delegate = capture_delegate

Expand All @@ -97,16 +117,22 @@ def RunSubprocess(self, cmd, name, wait=True, dependencies=[]):
if not wait:
return s, outpipe, errpipe

while s.poll() is None:
wait = RunnerWaitQueue()
wait.add_process(s)

for dependency in dependencies:
for accessory in dependency.accessories:
wait.add_process(accessory, dependency)

for process, userdata in iter(wait.queue.get, None):
if process == s:
break
# dependencies MUST NOT be done
for dependency in dependencies:
if dependency.poll() is not None:
s.kill()
raise Exception("Unexpected return %d for %r" %
(dependency.poll(), dependency))

code = s.wait()
if code != 0:
raise Exception('Command %r failed: %d' % (cmd, code))
else:
logging.debug('Command %r completed with error code 0', cmd)
s.kill()
raise Exception("Unexpected return %d for %r" %
(process.returncode, userdata))

if s.returncode != 0:
raise Exception('Command %r failed: %d' % (cmd, s.returncode))

logging.debug('Command %r completed with error code 0', cmd)
32 changes: 19 additions & 13 deletions scripts/tests/chiptest/test_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@


class App:

def __init__(self, runner, command):
self.process = None
self.outpipe = None
self.runner = runner
self.command = command
self.cv_stopped = threading.Condition()
self.stopped = False
self.lastLogIndex = 0

Expand All @@ -44,13 +46,17 @@ def start(self, discriminator):
self.__updateSetUpCode(outpipe)
self.process = process
self.outpipe = outpipe
self.stopped = False
with self.cv_stopped:
self.stopped = False
self.cv_stopped.notify()
return True
return False

def stop(self):
if self.process:
self.stopped = True
with self.cv_stopped:
self.stopped = True
self.cv_stopped.notify()
self.process.kill()
self.process.wait(10)
self.process = None
Expand Down Expand Up @@ -85,21 +91,21 @@ def waitForOperationalAdvertisement(self):
self.process, self.outpipe)
return True

def poll(self):
# When the server is manually stopped, process polling is overridden
# so the other processes that depends on the accessory beeing alive
# does not stop.
if self.stopped:
return None
return self.process.poll()

def kill(self):
if self.process:
self.process.kill()

def wait(self, duration):
if self.process:
self.process.wait(duration)
def wait(self, timeout=None):
while True:
code = self.process.wait(timeout)
with self.cv_stopped:
if not self.stopped:
return code
# When the server is manually stopped, process waiting is
# overridden so the other processes that depends on the
# accessory beeing alive does not stop.
while self.stopped:
self.cv_stopped.wait()

def __startServer(self, runner, command, discriminator):
logging.debug(
Expand Down