Skip to content

Commit

Permalink
Only abort current benchmark
Browse files Browse the repository at this point in the history
Closes #383
  • Loading branch information
danielmitterdorfer committed Dec 15, 2017
1 parent 80468ad commit 8adb0f8
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 38 deletions.
14 changes: 7 additions & 7 deletions esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,13 @@ def print_help_on_errors():


def race(cfg):
other_rally_processes = process.find_all_other_rally_processes()
if other_rally_processes:
pids = [p.pid for p in other_rally_processes]
msg = "There are other Rally processes running on this machine (PIDs: %s) but only one Rally benchmark is allowed to run at " \
"the same time. Please check and terminate these processes and retry again." % pids
raise exceptions.RallyError(msg)

with_actor_system(lambda c: racecontrol.run(c), cfg)


Expand Down Expand Up @@ -722,13 +729,6 @@ def main():
else:
logger.info("Detected a working Internet connection.")

# Kill any lingering Rally processes before attempting to continue - the actor system needs to be a singleton on this machine
# noinspection PyBroadException
try:
process.kill_running_rally_instances()
except BaseException:
logger.exception("Could not terminate potentially running Rally instances correctly. Attempting to go on anyway.")

success = dispatch_sub_command(cfg, sub_command)

end = time.time()
Expand Down
47 changes: 29 additions & 18 deletions esrally/utils/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,42 +75,53 @@ def kill_running_es_instances(trait):
:param trait some trait of the process in the command line.
"""

def elasticsearch_process(p):
return p.name() == "java" and any("elasticsearch" in e for e in p.cmdline()) and any(trait in e for e in p.cmdline())

logger.info("Killing all processes which match [java], [elasticsearch] and [%s]" % trait)
kill_all(elasticsearch_process)


def kill_running_rally_instances():
def rally_process(p):
return p.name() == "esrally" or \
p.name() == "rally" or \
(p.name().lower().startswith("python")
and any("esrally" in e for e in p.cmdline())
and not any("esrallyd" in e for e in p.cmdline()))
def is_rally_process(p):
return p.name() == "esrally" or \
p.name() == "rally" or \
(p.name().lower().startswith("python")
and any("esrally" in e for e in p.cmdline())
and not any("esrallyd" in e for e in p.cmdline()))


kill_all(rally_process)
def find_all_other_rally_processes():
others = []
for_all_other_processes(is_rally_process, lambda p: others.append(p))
return others


def kill_all(predicate):
def kill(p):
logger.info("Killing lingering process with PID [%s] and command line [%s]." % (p.pid, p.cmdline()))
p.kill()
# wait until process has terminated, at most 3 seconds. Otherwise we might run into race conditions with actor system
# sockets that are still open.
for i in range(3):
try:
p.status()
time.sleep(1)
except psutil.NoSuchProcess:
break

for_all_other_processes(predicate, kill)


def for_all_other_processes(predicate, action):
# no harakiri please
my_pid = os.getpid()
for p in psutil.process_iter():
try:
if p.pid == my_pid:
logger.info("Skipping myself (PID [%s])." % p.pid)
elif predicate(p):
logger.info("Killing lingering process with PID [%s] and command line [%s]." % (p.pid, p.cmdline()))
p.kill()
# wait until process has terminated, at most 3 seconds. Otherwise we might run into race conditions with actor system
# sockets that are still open.
for i in range(3):
try:
p.status()
time.sleep(1)
except psutil.NoSuchProcess:
break
action(p)
else:
logger.debug("Skipping [%s]" % p.cmdline())
except (psutil.ZombieProcess, psutil.AccessDenied) as e:
Expand Down
26 changes: 13 additions & 13 deletions tests/utils/process_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_kills_only_rally_es_processes(self, process_iter):
self.assertFalse(rally_process_mac.killed)

@mock.patch("psutil.process_iter")
def test_kills_only_rally_processes(self, process_iter):
def test_find_other_rally_processes(self, process_iter):
rally_es_5_process = ProcessTests.Process(100, "java",
["/usr/lib/jvm/java-8-oracle/bin/java", "-Xms2g", "-Xmx2g", "-Enode.name=rally-node0",
"org.elasticsearch.bootstrap.Elasticsearch"])
Expand Down Expand Up @@ -108,16 +108,16 @@ def test_kills_only_rally_processes(self, process_iter):
night_rally_process,
]

process.kill_running_rally_instances()
self.assertEqual([rally_process_p, rally_process_r, rally_process_e, rally_process_mac],
process.find_all_other_rally_processes())

self.assertFalse(rally_es_5_process.killed)
self.assertFalse(rally_es_1_process.killed)
self.assertFalse(metrics_store_process.killed)
self.assertFalse(random_python.killed)
self.assertFalse(other_process.killed)
self.assertTrue(rally_process_p.killed)
self.assertTrue(rally_process_r.killed)
self.assertTrue(rally_process_e.killed)
self.assertTrue(rally_process_mac.killed)
self.assertFalse(own_rally_process.killed)
self.assertFalse(night_rally_process.killed)
@mock.patch("psutil.process_iter")
def test_find_no_other_rally_process_running(self, process_iter):
metrics_store_process = ProcessTests.Process(102, "java", ["/usr/lib/jvm/java-8-oracle/bin/java", "-Xms2g", "-Xmx2g",
"-Des.path.home=~/rally/metrics/",
"org.elasticsearch.bootstrap.Elasticsearch"])
random_python = ProcessTests.Process(103, "python3", ["/some/django/app"])

process_iter.return_value = [ metrics_store_process, random_python]

self.assertEqual(0, len(process.find_all_other_rally_processes()))

0 comments on commit 8adb0f8

Please sign in to comment.