Skip to content

Commit

Permalink
Merge branch 'refactor_setup' into travis
Browse files Browse the repository at this point in the history
  • Loading branch information
hamiltont committed Jul 22, 2014
2 parents 7b078b9 + 5892f92 commit 07fcec5
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 57 deletions.
20 changes: 6 additions & 14 deletions go/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,27 @@
import setup_util
from test_runner import TestRunner

class Bar(TestRunner):
class Go(TestRunner):

def start(self):
setup_util.replace_text("go/src/hello/hello.go", "tcp\(.*:3306\)", "tcp(" + self.database_host + ":3306)")
if os.name == 'nt':
#subprocess.call("rmdir /s /q pkg\\windows_amd64", shell=True, cwd="go")
#subprocess.call("rmdir /s /q src\\github.com", shell=True, cwd="go")
#subprocess.call("del /s /q /f bin\\hello.exe", shell=True, cwd="go")
subprocess.call("set GOPATH=C:\\FrameworkBenchmarks\\go&& go get ./...", shell=True, cwd="go", stderr=errfile, stdout=logfile)
subprocess.Popen("setup.bat", shell=True, cwd="go", stderr=errfile, stdout=logfile)
self.sh("go get ./...")
self.sh("setup.bat")
return 0

self.sh("which go")
self.sh("rm -rf src/github.com")
self.sh("ls src/github.com/go-sql-driver/mysql")
self.sh("go get ./...")
self.sh("ls src/github.com/go-sql-driver/mysql")
self.sh_async("go run -x -v src/hello/hello.go")
self.pid = self.sh_async("go run -x -v src/hello/hello.go")
return 0

def stop(self):
if os.name == 'nt':
subprocess.call("taskkill /f /im go.exe > NUL", shell=True, stderr=errfile, stdout=logfile)
subprocess.call("taskkill /f /im hello.exe > NUL", shell=True, stderr=errfile, stdout=logfile)
return 0
p = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE)
out, err = p.communicate()
for line in out.splitlines():
if 'hello' in line:
pid = int(line.split(None, 2)[1])
os.kill(pid, 15)
# Kill off the entire go process group
self.sh_pkill(self.pid)
return 0
16 changes: 12 additions & 4 deletions toolset/benchmark/benchmarker.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,10 +488,12 @@ def __run_tests(self, tests):
log.info("__run_tests")
log.debug("__run_tests with __name__ = %s",__name__)

error_happened = False
if self.os.lower() == 'windows':
log.info("Executing __run_tests on Windows")
for test in tests:
self.__run_test(test)
if self.__run_test(test) != 0:
error_happened = True
else:
log.info("Executing __run_tests on Linux")
for test in tests:
Expand Down Expand Up @@ -534,7 +536,12 @@ def __run_tests(self, tests):
pass
subprocess.call("sudo pkill -SIGKILL -s %s" % test_process.pid, shell=True)
log.handlers = [] # Clean up handlers left by __run_test
if test_process.exitcode != 0:
error_happened = True
log.info("End __run_tests")
if error_happened:
return 1
return 0

############################################################
# End __run_tests
Expand Down Expand Up @@ -645,7 +652,7 @@ def signal_term_handler(signal, frame):
if self.__is_port_bound(test.port):
self.__write_intermediate_results(test.name, "port %s is not available before start" % test.port)
log.error(Header("Error: Port %s is not available, cannot start %s" % (test.port, test.name)))
return
return 1

result = test.start(log)
if result != 0:
Expand All @@ -654,7 +661,7 @@ def signal_term_handler(signal, frame):
log.error("ERROR: Problem starting %s", test.name)
log.error(Header("Stopped %s" % test.name))
self.__write_intermediate_results(test.name,"<setup.py>#start() returned non-zero")
return
return 1

log.info("Sleeping for %s", self.sleep)
time.sleep(self.sleep)
Expand All @@ -676,7 +683,7 @@ def signal_term_handler(signal, frame):
if self.__is_port_bound(test.port):
self.__write_intermediate_results(test.name, "port %s was not released by stop" % test.port)
log.error(Header("Error: Port %s was not released by stop %s" % (test.port, test.name)))
return
return 1

log.info(Header("Stopped %s" % test.name))
time.sleep(5)
Expand All @@ -701,6 +708,7 @@ def signal_term_handler(signal, frame):
log.error("%s", e)
log.error("%s", sys.exc_info()[:2])
log.debug("Subprocess Error Details", exc_info=True)
return 1
############################################################
# End __run_test
############################################################
Expand Down
14 changes: 9 additions & 5 deletions toolset/benchmark/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,23 @@ def __init__(self, test, target, logger):
self.dir = test.directory

(out, err) = WrapLogger(logger, logging.INFO), WrapLogger(logger, logging.ERROR)
self.utils = ShellUtils(self.dir, out, err)
self.utils = ShellUtils(self.dir, None, None, logger)

def start(self):
raise NotImplementedError()

def stop(self):
raise NotImplementedError()

def sh(self, command, **kwargs):
self.utils.sh(command, **kwargs)
def sh(self, *args, **kwargs):
    '''Run a blocking shell command through this test's ShellUtils.

    All positional and keyword arguments are forwarded verbatim to
    ShellUtils.sh. Returns whatever ShellUtils.sh returns, mirroring the
    sibling sh_async wrapper (the original silently dropped the result).
    '''
    return self.utils.sh(*args, **kwargs)

def sh_async(self, command, **kwargs):
self.utils.sh_async(command, **kwargs)
def sh_async(self, *args, **kwargs):
    '''Start a shell command in the background via this test's ShellUtils.

    Arguments are forwarded verbatim to ShellUtils.sh_async; the returned
    value (the pid of the spawned process/group) is passed back to the caller.
    '''
    utils = self.utils
    return utils.sh_async(*args, **kwargs)

def sh_pkill(self, *args, **kwargs):
    # Kill background processes previously started with sh_async; arguments
    # (group id, optional process name, sudo flag) are forwarded verbatim to
    # ShellUtils.sh_pkill.
    # NOTE(review): this reads self.logger, but the visible __init__ only
    # passes the logger into ShellUtils without assigning self.logger —
    # confirm the attribute exists, otherwise this raises AttributeError.
    self.logger.debug("Got %s and %s" % (args, kwargs))
    self.utils.sh_pkill(*args, **kwargs)

@staticmethod
def is_parent_of(target_class):
Expand Down
138 changes: 105 additions & 33 deletions toolset/benchmark/utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@

import subprocess
import time
import os
import logging
import shlex

class ShellUtils():
def __init__(self, directory, outfile, errfile):
def __init__(self, directory, outfile, errfile, logger=None):
'''
outfile: A file-like object to write command output to.
Must have write(string) method. Common choices are
files, sys.stdout, or WrapLogger objects
errfile: See outfile
logger : If provided, used instead of outfile/errfile for
finer-grained logging
'''
# Advanced notes: outfile and errfile do *not* have to be
# thread-safe objects. They are only ever written to from one
Expand All @@ -16,53 +22,76 @@ def __init__(self, directory, outfile, errfile):
self.directory = directory
self.outfile = outfile
self.errfile = errfile
self.logger = logger
self.os = os.name

def __write_out(self, message, level=logging.INFO, stream=None):
    '''Route an informational message to the best available sink.

    Preference order: self.logger (at the given level) if one was supplied,
    else the explicit stream argument, else self.outfile.
    '''
    if self.logger:
        self.logger.log(level, message)
    elif stream is None:  # fixed: identity test, not equality, against None
        self.outfile.write(message)
    else:
        stream.write(message)

def __write_err(self, message, level=logging.ERROR):
    '''Route an error message to errfile (or the logger at ERROR level).'''
    self.__write_out(message, level, stream=self.errfile)

def sh(self, command, **kwargs):
    '''Run a shell command, blocking until it exits.

    command -- shell string executed via /bin/bash (shell=True)
    kwargs  -- forwarded to subprocess.check_output; cwd defaults to
               self.directory
    Returns the captured stdout so callers can inspect it (the original
    discarded it). On a non-zero exit, logs the exit code and returns None.
    '''
    kwargs.setdefault('cwd', self.directory)
    kwargs.setdefault('executable', '/bin/bash')
    self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
    try:
        output = subprocess.check_output(command, shell=True, stderr=self.errfile, **kwargs)
        if output and output.strip():
            self.__write_out("Output:")
            # b'\n' works for both Python 2 str and Python 3 bytes output
            self.__write_out(output.rstrip(b'\n'))
        else:
            self.__write_out("No Output")
        return output
    except subprocess.CalledProcessError as e:
        # Fixed: previously the exit code was discarded entirely
        self.__write_err("Command returned non-zero exit code %s: %s" % (e.returncode, command))

# TODO modify this to start the subcommand as a new process group, so that
# we can automatically kill the entire group!
def sh_async(self, command, initial_logs=True, **kwargs):
'''Run a shell command, sending output to outfile and errfile.
If intial_logs, prints out logs for a few seconds before returning. '''
# TODO add this - '''Continues to send output until command completes'''
def sh_async(self, command, group=True, **kwargs):
'''
Run a shell command, continually sending output to outfile and errfile until
shell process completes
- If group is set, create a process group that can later be used to kill all subprocesses
Returns the pid of the newly created process (or process group)
'''

# Setup extra args
kwargs.setdefault('cwd', self.directory)
self.outfile.write("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
if group:
if self.os != 'nt':
kwargs.setdefault('preexec_fn', os.setpgrp)
else:
# TODO if someone could make this work that would be great
self.__write_err("Unable to group flag on Windows")

# Open in line-buffered mode (bufsize=1) because NonBlockingStreamReader uses readline anyway
self.__write_out("Running %s (cwd=%s)" % (command, kwargs.get('cwd')))
# Open in line-buffered mode (bufsize=1) because NonBlocking* uses readline
process = subprocess.Popen(command, bufsize=1, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
nbsr = NonBlockingStreamReader(process.stdout)
nbsr_err = NonBlockingStreamReader(process.stderr)
if initial_logs:
time.sleep(8)
# TODO put this read into a tight loop to prevent deadlock due to
# filling up OS buffers
out = nbsr.read()
if len(out) == 0:
self.outfile.write("No output")
else:
self.outfile.write("Initial Output:")
for line in out:
self.outfile.write(line.rstrip('\n'))
err = nbsr_err.read()
if len(err) != 0:
self.errfile.write("Initial Error Logs:")
for line in err:
self.errfile.write(line.rstrip('\n'))
NonBlockingStreamLogger(process, self.logger, name=shlex.split(command)[0])
return process.pid

def sh_pkill(self, group_id, name=None, usesudo=False):
    '''Terminate processes matching every supplied filter.

    group_id -- process group to target; pass the value sh_async returned
    name     -- optionally also restrict matches to this process name
    usesudo  -- prefix with sudo, only for processes started under sudo
    TODO: consider os.killpg as an alternative to shelling out to pkill.
    '''
    cmd = "pkill  -g %s" % group_id
    if name:
        cmd += " %s" % name
    if usesudo:
        cmd = "sudo " + cmd
    self.sh(cmd)


from threading import Thread
from Queue import Queue, Empty
Expand Down Expand Up @@ -104,6 +133,49 @@ def read(self):
return lines
lines.append(line)

import logging
from threading import Thread
import threading
# NonBlockingStreamLogger(p, logging.getLogger())
# p = subprocess.Popen("asdfasdf", shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
class NonBlockingStreamLogger:
'''Reads from a subprocess' streams and writes them to the
provided logger as long as the subprocess is alive'''
def __init__(self, process, logger, logout=logging.INFO, logerr=logging.ERROR, name=None):
self.process = process
self.logger = logger
self.out_level = logout
self.err_level = logerr
if name:
self.prefix = "Process '%s':" % name
else:
self.logger.warning("No name provided for process %s", process.pid)
self.prefix = "Process '%s':" % process.pid
name = process.pid
outThread = Thread(target = self._readStream,
args = (process.stdout, self.out_level),
name = "%s - stdout" % name)
outThread.daemon = True
outThread.start()
errThread = Thread(target = self._readStream,
args = (process.stderr, self.err_level),
name = "%s - stderr" % name)
errThread.daemon = True
errThread.start()
def _readStream(self, stream, level):
self.logger.debug("%s Waiting for output (%s)", self.prefix, threading.current_thread().name)
for line in iter(stream.readline, b''):
self.logger.log(level, "%s %s", self.prefix, line.rstrip('\n'))

# Has process died?
if self.process.poll() == 0:
self.logger.debug("%s Death. Reading remainder of stream", self.prefix)
remainder = stream.read()
for line2 in remainder.split(b'\n'):
self.logger.log(level, "%s %s", self.prefix, line2)
break
self.logger.debug("%s Complete (%s)", self.prefix, threading.current_thread().name)
return 0
import tempfile
class WrapLogger():
"""
Expand Down
2 changes: 1 addition & 1 deletion toolset/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def main(argv=None):
elif benchmarker.parse != None:
benchmarker.parse_timestamp()
else:
benchmarker.run()
return benchmarker.run()

# Integrate uncaught exceptions into our logging system
# Note: This doesn't work if the exception happens in a
Expand Down

0 comments on commit 07fcec5

Please sign in to comment.