From 25af1508eea9c355fdddd53557d0d20061a80b92 Mon Sep 17 00:00:00 2001
From: Hamilton Turner
Date: Tue, 22 Jul 2014 13:17:56 -0400
Subject: [PATCH 1/6] Allow use of logging inside ShellUtils

---
 toolset/benchmark/utils.py | 29 ++++++++++++++++++++++-------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/toolset/benchmark/utils.py b/toolset/benchmark/utils.py
index 974c08a92ce..98b53d4b91a 100644
--- a/toolset/benchmark/utils.py
+++ b/toolset/benchmark/utils.py
@@ -1,13 +1,16 @@
 import subprocess
 import time
+import logging
 
 class ShellUtils():
-    def __init__(self, directory, outfile, errfile):
+    def __init__(self, directory, outfile, errfile, logger=None):
         '''
         outfile: A file-like object to write command output to. Must have
                  write(string) method. Common choices are files, sys.stdout,
                  or WrapLogger objects
         errfile: See outfile
+        logger : If provided, used instead of outfile/errfile for
+                 finer-grained logging
         '''
         # Advanced notes: outfile and errfile do *not* have to be
         # thread-safe objects. They are only ever written to from one
@@ -16,22 +19,34 @@ def __init__(self, directory, outfile, errfile):
         self.directory = directory
         self.outfile = outfile
         self.errfile = errfile
+        self.logger = logger
+
+    def __write_out(self, message, level=logging.INFO, stream=None):
+        if self.logger:
+            self.logger.log(level, message)
+        elif stream == None:
+            self.outfile.write(message)
+        else:
+            stream.write(message)
+
+    def __write_err(self, message, level=logging.ERROR):
+        self.__write_out(message, level, stream=self.errfile)
 
     def sh(self, command, **kwargs):
         '''Run a shell command, sending output to outfile and errfile.
         Blocks until command exits'''
         kwargs.setdefault('cwd', self.directory)
         kwargs.setdefault('executable', '/bin/bash')
-        self.outfile.write("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
+        self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
         try:
             output = subprocess.check_output(command, shell=True, stderr=self.errfile, **kwargs)
             if output and output.strip():
-                self.outfile.write("Output:")
-                self.outfile.write(output.rstrip('\n'))
+                self.__write_out("Output:")
+                self.__write_out(output.rstrip('\n'))
             else:
-                self.outfile.write("No Output")
+                self.__write_out("No Output")
         except subprocess.CalledProcessError:
-            self.errfile.write("Command returned non-zero exit code: %s" % command)
+            self.__write_err("Command returned non-zero exit code: %s" % command)
 
     # TODO modify this to start the subcommand as a new process group, so that
     # we can automatically kill the entire group!
@@ -40,9 +55,9 @@ def sh_async(self, command, initial_logs=True, **kwargs):
         '''Run a shell command, sending output to outfile and errfile.
         If intial_logs, prints out logs for a few seconds before returning. '''
         # TODO add this
         '''Continues to send output until command completes'''
         kwargs.setdefault('cwd', self.directory)
-        self.outfile.write("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
         # Open in line-buffered mode (bufsize=1) because NonBlockingStreamReader uses readline anyway
+        self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
         process = subprocess.Popen(command, bufsize=1, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
         nbsr = NonBlockingStreamReader(process.stdout)
         nbsr_err = NonBlockingStreamReader(process.stderr)
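A minimal usage sketch of the two output modes this patch supports. The import path and log format are illustrative assumptions, not part of the patch:

    import logging
    import sys
    from toolset.benchmark.utils import ShellUtils  # assumed import path

    logging.basicConfig(format="%(levelname)s %(message)s", level=logging.INFO)

    # Original style: raw writes to file-like objects
    plain = ShellUtils("/tmp", sys.stdout, sys.stderr)
    plain.sh("echo hello")

    # New style: messages are routed through a logger at INFO/ERROR levels
    logged = ShellUtils("/tmp", None, None, logger=logging.getLogger("shell"))
    logged.sh("echo hello")
    logged.sh("false")  # non-zero exit is reported through __write_err at ERROR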
From 8824dfa9dabf79e4a8f7656ec2c1bb9778ad9265 Mon Sep 17 00:00:00 2001
From: Hamilton Turner
Date: Tue, 22 Jul 2014 13:20:04 -0400
Subject: [PATCH 2/6] Create NonBlockingStreamLogger class

Automatically handles directing a subprocess' stdout and stderr to a logger
---
 toolset/benchmark/utils.py | 92 ++++++++++++++++++++++++++++----------
 1 file changed, 69 insertions(+), 23 deletions(-)

diff --git a/toolset/benchmark/utils.py b/toolset/benchmark/utils.py
index 98b53d4b91a..6255bbd10e2 100644
--- a/toolset/benchmark/utils.py
+++ b/toolset/benchmark/utils.py
@@ -2,6 +2,8 @@ import subprocess
 import time
 import logging
+import shlex
+
 
 class ShellUtils():
     def __init__(self, directory, outfile, errfile, logger=None):
         '''
@@ -51,33 +53,34 @@ def sh(self, command, **kwargs):
     # TODO modify this to start the subcommand as a new process group, so that
     # we can automatically kill the entire group!
     def sh_async(self, command, initial_logs=True, **kwargs):
-        '''Run a shell command, sending output to outfile and errfile.
-        If intial_logs, prints out logs for a few seconds before returning. '''
-        # TODO add this
-        '''Continues to send output until command completes'''
+        '''
+        Run a shell command, continually sending output to outfile and errfile until
+        shell process completes
+        '''
         kwargs.setdefault('cwd', self.directory)
-        # Open in line-buffered mode (bufsize=1) because NonBlockingStreamReader uses readline anyway
         self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
+        # Open in line-buffered mode (bufsize=1) because NonBlocking* uses readline
         process = subprocess.Popen(command, bufsize=1, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
-        nbsr = NonBlockingStreamReader(process.stdout)
-        nbsr_err = NonBlockingStreamReader(process.stderr)
-        if initial_logs:
-            time.sleep(8)
-            # TODO put this read into a tight loop to prevent deadlock due to
-            # filling up OS buffers
-            out = nbsr.read()
-            if len(out) == 0:
-                self.outfile.write("No output")
-            else:
-                self.outfile.write("Initial Output:")
-                for line in out:
-                    self.outfile.write(line.rstrip('\n'))
-
-            err = nbsr_err.read()
-            if len(err) != 0:
-                self.errfile.write("Initial Error Logs:")
-                for line in err:
-                    self.errfile.write(line.rstrip('\n'))
+        NonBlockingStreamLogger(process, self.logger, name=shlex.split(command)[0])
+        return process.pid
+
+    def sh_pkill(self, group_id, name=None, usesudo=False):
+        '''
+        Kill processes that match all the passed arguments
+        Set group_id if you used sh_async
+        Set usesudo only if you started these processes with sudo
+        # TODO - consider os.pgkill?
+        '''
+        command = "pkill "
+        command = "%s -g %s" % (command, group_id)
+
+        if name:
+            command = "%s %s" % (command, name)
+        if usesudo:
+            command = "sudo %s" % command
+        self.sh(command)
+
 
 from threading import Thread
 from Queue import Queue, Empty
@@ -119,6 +122,49 @@ def read(self):
                 return lines
             lines.append(line)
 
+import logging
+from threading import Thread
+import threading
+# NonBlockingStreamLogger(p, logging.getLogger())
+# p = subprocess.Popen("asdfasdf", shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
+class NonBlockingStreamLogger:
+    '''Reads from a subprocess' streams and writes them to the
+    provided logger as long as the subprocess is alive'''
+    def __init__(self, process, logger, logout=logging.INFO, logerr=logging.ERROR, name=None):
+        self.process = process
+        self.logger = logger
+        self.out_level = logout
+        self.err_level = logerr
+        if name:
+            self.prefix = "Process '%s':" % name
+        else:
+            self.logger.warning("No name provided for process %s", process.pid)
+            self.prefix = "Process '%s':" % process.pid
+            name = process.pid
+        outThread = Thread(target = self._readStream,
+                           args = (process.stdout, self.out_level),
+                           name = "%s - stdout" % name)
+        outThread.daemon = True
+        outThread.start()
+        errThread = Thread(target = self._readStream,
+                           args = (process.stderr, self.err_level),
+                           name = "%s - stderr" % name)
+        errThread.daemon = True
+        errThread.start()
+    def _readStream(self, stream, level):
+        self.logger.debug("%s Waiting for output (%s)", self.prefix, threading.current_thread().name)
+        for line in iter(stream.readline, b''):
+            self.logger.log(level, "%s %s", self.prefix, line.rstrip('\n'))
+
+            # Has process died?
+            if self.process.poll() == 0:
+                self.logger.debug("%s Death. Reading remainder of stream", self.prefix)
+                remainder = stream.read()
+                for line2 in remainder.split(b'\n'):
+                    self.logger.log(level, "%s %s", self.prefix, line2)
+                break
+        self.logger.debug("%s Complete (%s)", self.prefix, threading.current_thread().name)
+        return 0
 
 import tempfile
 class WrapLogger():
     """
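Roughly how the new class is meant to be driven, mirroring the usage hinted at in the comments above. This is an illustrative sketch, not code from the patch; the command, logger, and name are made up:

    import logging
    import subprocess
    import time
    from toolset.benchmark.utils import NonBlockingStreamLogger  # assumed import path

    logging.basicConfig(level=logging.DEBUG)
    log = logging.getLogger("demo")

    # bufsize=1 keeps the pipes line-buffered, matching the readline() calls
    # made by the reader threads
    p = subprocess.Popen("echo out; echo err >&2; sleep 1", shell=True, bufsize=1,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    NonBlockingStreamLogger(p, log, name="demo")

    p.wait()          # the daemon threads log each line as it appears
    time.sleep(1)     # give the reader threads a moment to drain before exiting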
From 84d391e64850aa278bd19a0d61663c2693153b36 Mon Sep 17 00:00:00 2001
From: Hamilton Turner
Date: Tue, 22 Jul 2014 13:27:40 -0400
Subject: [PATCH 3/6] Use linux process groups when running background tasks

The previous commit defines sh_pkill; I probably should have put it in this commit
---
 toolset/benchmark/utils.py | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/toolset/benchmark/utils.py b/toolset/benchmark/utils.py
index 6255bbd10e2..1d99ee260f7 100644
--- a/toolset/benchmark/utils.py
+++ b/toolset/benchmark/utils.py
@@ -50,14 +50,23 @@ def sh(self, command, **kwargs):
         except subprocess.CalledProcessError:
             self.__write_err("Command returned non-zero exit code: %s" % command)
 
-    # TODO modify this to start the subcommand as a new process group, so that
-    # we can automatically kill the entire group!
-    def sh_async(self, command, initial_logs=True, **kwargs):
+    def sh_async(self, command, group=True, **kwargs):
         '''
         Run a shell command, continually sending output to outfile and errfile until
         shell process completes
+
+        - If group is set, create a process group that can later be used to kill all subprocesses
+        Returns the pid of the newly created process (or process group)
         '''
+
+        # Setup extra args
         kwargs.setdefault('cwd', self.directory)
+        if group:
+            if self.os != 'nt':
+                kwargs.setdefault('preexec_fn', os.setpgrp)
+            else:
+                # TODO if someone could make this work that would be great
+                self.__write_err("Unable to use process group flag on Windows")
         self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
         # Open in line-buffered mode (bufsize=1) because NonBlocking* uses readline
         process = subprocess.Popen(command, bufsize=1, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
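For context, a standalone sketch (POSIX only, not part of the patch) of why preexec_fn=os.setpgrp pairs with pkill -g: the child becomes the leader of a fresh process group, so the pid that sh_async returns doubles as the group id that sh_pkill passes to pkill -g, and killing that group also reaps any grandchildren the shell spawned:

    import os
    import signal
    import subprocess
    import time

    # The child shell becomes its own process-group leader, so pgid == p.pid
    p = subprocess.Popen("sleep 30 & sleep 30 & wait", shell=True,
                         preexec_fn=os.setpgrp)
    time.sleep(1)

    # Signal the whole group: the shell and both background sleeps go away
    os.killpg(p.pid, signal.SIGTERM)
    # Equivalent via the CLI, which is what sh_pkill builds:
    # subprocess.call("pkill -g %s" % p.pid, shell=True)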
From c5e489b15604e53326930b52e59a04637853756b Mon Sep 17 00:00:00 2001
From: Hamilton Turner
Date: Tue, 22 Jul 2014 13:58:17 -0400
Subject: [PATCH 4/6] Update TestRunner to use new ShellUtils

---
 toolset/benchmark/test_runner.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/toolset/benchmark/test_runner.py b/toolset/benchmark/test_runner.py
index 117d6b5d813..f7744f81294 100644
--- a/toolset/benchmark/test_runner.py
+++ b/toolset/benchmark/test_runner.py
@@ -18,7 +18,7 @@ def __init__(self, test, target, logger):
         self.dir = test.directory
 
         (out, err) = WrapLogger(logger, logging.INFO), WrapLogger(logger, logging.ERROR)
-        self.utils = ShellUtils(self.dir, out, err)
+        self.utils = ShellUtils(self.dir, None, None, logger)
 
     def start(self):
         raise NotImplementedError()
@@ -26,11 +26,15 @@ def start(self):
     def stop(self):
         raise NotImplementedError()
 
-    def sh(self, command, **kwargs):
-        self.utils.sh(command, **kwargs)
+    def sh(self, *args, **kwargs):
+        self.utils.sh(*args, **kwargs)
 
-    def sh_async(self, command, **kwargs):
-        self.utils.sh_async(command, **kwargs)
+    def sh_async(self, *args, **kwargs):
+        return self.utils.sh_async(*args, **kwargs)
+
+    def sh_pkill(self, *args, **kwargs):
+        self.logger.debug("Got %s and %s" % (args, kwargs))
+        self.utils.sh_pkill(*args, **kwargs)
 
     @staticmethod
     def is_parent_of(target_class):

From 02af18423d75ccfc051c31a25b803b13f6985c62 Mon Sep 17 00:00:00 2001
From: Hamilton Turner
Date: Tue, 22 Jul 2014 13:59:32 -0400
Subject: [PATCH 5/6] Update go/setup.py to use new ShellUtils

---
 go/setup.py                | 20 ++++++--------------
 toolset/benchmark/utils.py |  2 ++
 2 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/go/setup.py b/go/setup.py
index 69fee7549ff..b42d1169dc7 100644
--- a/go/setup.py
+++ b/go/setup.py
@@ -4,7 +4,7 @@
 import setup_util
 from test_runner import TestRunner
 
-class Bar(TestRunner):
+class Go(TestRunner):
 
     def start(self):
         setup_util.replace_text("go/src/hello/hello.go", "tcp\(.*:3306\)", "tcp(" + self.database_host + ":3306)")
         if os.name == 'nt':
             #subprocess.call("rmdir /s /q pkg\\windows_amd64", shell=True, cwd="go")
             #subprocess.call("rmdir /s /q src\\github.com", shell=True, cwd="go")
             #subprocess.call("del /s /q /f bin\\hello.exe", shell=True, cwd="go")
-            subprocess.call("set GOPATH=C:\\FrameworkBenchmarks\\go&& go get ./...", shell=True, cwd="go", stderr=errfile, stdout=logfile)
-            subprocess.Popen("setup.bat", shell=True, cwd="go", stderr=errfile, stdout=logfile)
+            self.sh("go get ./...")
+            self.sh("setup.bat")
             return 0
 
-        self.sh("which go")
-        self.sh("rm -rf src/github.com")
-        self.sh("ls src/github.com/go-sql-driver/mysql")
         self.sh("go get ./...")
-        self.sh("ls src/github.com/go-sql-driver/mysql")
-        self.sh_async("go run -x -v src/hello/hello.go")
+        self.pid = self.sh_async("go run -x -v src/hello/hello.go")
         return 0
 
     def stop(self):
         if os.name == 'nt':
             subprocess.call("taskkill /f /im go.exe > NUL", shell=True, stderr=errfile, stdout=logfile)
             subprocess.call("taskkill /f /im hello.exe > NUL", shell=True, stderr=errfile, stdout=logfile)
             return 0
-        p = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE)
-        out, err = p.communicate()
-        for line in out.splitlines():
-            if 'hello' in line:
-                pid = int(line.split(None, 2)[1])
-                os.kill(pid, 15)
+        # Kill off the entire go process group
+        self.sh_pkill(self.pid)
         return 0
diff --git a/toolset/benchmark/utils.py b/toolset/benchmark/utils.py
index 1d99ee260f7..7ab3f20e599 100644
--- a/toolset/benchmark/utils.py
+++ b/toolset/benchmark/utils.py
@@ -1,6 +1,7 @@
 import subprocess
 import time
+import os
 import logging
 import shlex
 
@@ -22,6 +23,7 @@ def __init__(self, directory, outfile, errfile, logger=None):
         self.outfile = outfile
         self.errfile = errfile
         self.logger = logger
+        self.os = os.name
 
     def __write_out(self, message, level=logging.INFO, stream=None):
From 5892f92caf7745a926e24f8e0f11f1ac0dbdb152 Mon Sep 17 00:00:00 2001
From: Hamilton Turner
Date: Tue, 22 Jul 2014 14:01:01 -0400
Subject: [PATCH 6/6] Return non-zero if any test failed

---
 toolset/benchmark/benchmarker.py | 16 ++++++++++++----
 toolset/run-tests.py             |  2 +-
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/toolset/benchmark/benchmarker.py b/toolset/benchmark/benchmarker.py
index 5333de8c412..60c9b03463d 100644
--- a/toolset/benchmark/benchmarker.py
+++ b/toolset/benchmark/benchmarker.py
@@ -488,10 +488,12 @@ def __run_tests(self, tests):
         log.info("__run_tests")
         log.debug("__run_tests with __name__ = %s",__name__)
 
+        error_happened = False
         if self.os.lower() == 'windows':
             log.info("Executing __run_tests on Windows")
             for test in tests:
-                self.__run_test(test)
+                if self.__run_test(test) != 0:
+                    error_happened = True
         else:
             log.info("Executing __run_tests on Linux")
             for test in tests:
@@ -534,7 +536,12 @@ def __run_tests(self, tests):
                 pass
             subprocess.call("sudo pkill -SIGKILL -s %s" % test_process.pid, shell=True)
             log.handlers = []  # Clean up handlers left by __run_test
+            if test_process.exitcode != 0:
+                error_happened = True
         log.info("End __run_tests")
+        if error_happened:
+            return 1
+        return 0
 
     ############################################################
     # End __run_tests
     ############################################################
@@ -645,7 +652,7 @@ def signal_term_handler(signal, frame):
         if self.__is_port_bound(test.port):
             self.__write_intermediate_results(test.name, "port %s is not available before start" % test.port)
             log.error(Header("Error: Port %s is not available, cannot start %s" % (test.port, test.name)))
-            return
+            return 1
 
         result = test.start(log)
         if result != 0:
@@ -654,7 +661,7 @@ def signal_term_handler(signal, frame):
             log.error("ERROR: Problem starting %s", test.name)
             log.error(Header("Stopped %s" % test.name))
             self.__write_intermediate_results(test.name,"#start() returned non-zero")
-            return
+            return 1
 
         log.info("Sleeping for %s", self.sleep)
         time.sleep(self.sleep)
@@ -676,7 +683,7 @@ def signal_term_handler(signal, frame):
         if self.__is_port_bound(test.port):
             self.__write_intermediate_results(test.name, "port %s was not released by stop" % test.port)
             log.error(Header("Error: Port %s was not released by stop %s" % (test.port, test.name)))
-            return
+            return 1
 
         log.info(Header("Stopped %s" % test.name))
         time.sleep(5)
@@ -701,6 +708,7 @@ def signal_term_handler(signal, frame):
             log.error("%s", e)
             log.error("%s", sys.exc_info()[:2])
             log.debug("Subprocess Error Details", exc_info=True)
+            return 1
 
     ############################################################
     # End __run_test
     ############################################################
diff --git a/toolset/run-tests.py b/toolset/run-tests.py
index 9b748ada774..fe9c095d99a 100755
--- a/toolset/run-tests.py
+++ b/toolset/run-tests.py
@@ -172,7 +172,7 @@ def main(argv=None):
         elif benchmarker.parse != None:
             benchmarker.parse_timestamp()
         else:
-            benchmarker.run()
+            return benchmarker.run()
 
     # Integrate uncaught exceptions into our logging system
     # Note: This doesn't work if the exception happens in a
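The run-tests.py hunk above only returns the benchmark status from main(); for a calling shell or CI job to see the failure, that value still has to become the interpreter's exit status. A hedged sketch of the usual pattern (the entry-point code below is illustrative; the real one sits outside this diff):

    import sys

    def main(argv=None):
        # Stand-in for benchmarker.run(): 0 when every test passed, 1 otherwise
        error_happened = False
        return 1 if error_happened else 0

    if __name__ == "__main__":
        sys.exit(main())  # the returned int becomes the process exit code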