Skip to content

Commit

Permalink
Fixup ProcessLauncherTests issues (elastic#746)
Browse files Browse the repository at this point in the history
  • Loading branch information
drawlerr authored and novosibman committed Oct 2, 2019
1 parent 8a92462 commit 3b6a942
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 19 deletions.
2 changes: 1 addition & 1 deletion esrally/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def __init__(self, cfg, clock=time.Clock, meta_info=None):
self._trial_id = None
self._trial_timestamp = None
self._track = None
self._track_params = cfg.opts("track", "params")
self._track_params = cfg.opts("track", "params", default_value={}, mandatory=False)
self._challenge = None
self._car = None
self._car_name = None
Expand Down
90 changes: 72 additions & 18 deletions tests/mechanic/launcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import tempfile
from datetime import datetime
import io, os
import uuid
from unittest import TestCase, mock

import psutil

from esrally import config, exceptions, paths
from esrally.mechanic import launcher, telemetry, team
from esrally.mechanic.provisioner import NodeConfiguration
from esrally.mechanic.team import Car
from esrally.metrics import InMemoryMetricsStore
from esrally.utils import opts


class MockMetricsStore:
def add_meta_info(self, scope, scope_key, key, value):
pass


class MockClientFactory:
def __init__(self, hosts, client_options):
self.client_options = client_options
Expand Down Expand Up @@ -115,29 +118,62 @@ def wait(self):
return 0


class MockProcess:
def __init__(self, pid):
self.pid = pid
self.killed = False

def name(self):
return "p{pid}".format(pid=self.pid)

def wait(self, timeout=None):
raise psutil.TimeoutExpired(timeout)

def kill(self):
self.killed = True


def get_metrics_store(cfg):
ms = InMemoryMetricsStore(cfg)
ms.open(trial_id=str(uuid.uuid4()),
trial_timestamp=datetime.now(),
track_name="test",
challenge_name="test",
car_name="test")
return ms


MOCK_PID_VALUE = 1234


class ProcessLauncherTests(TestCase):
@mock.patch('os.path.join', return_value="/telemetry")
@mock.patch('os.kill')
@mock.patch('subprocess.Popen',new=MockPopen)
@mock.patch('subprocess.Popen', new=MockPopen)
@mock.patch('esrally.mechanic.java_resolver.java_home', return_value=(12, "/java_home/"))
@mock.patch('esrally.utils.jvm.supports_option', return_value=True)
@mock.patch('esrally.utils.io.get_size')
@mock.patch('os.chdir')
@mock.patch('esrally.config.Config')
@mock.patch('esrally.metrics.MetricsStore')
@mock.patch('esrally.mechanic.provisioner.NodeConfiguration')
@mock.patch('esrally.mechanic.launcher.wait_for_pidfile', return_value=MOCK_PID_VALUE)
@mock.patch('psutil.Process')
def test_daemon_start_stop(self, process, wait_for_pidfile, node_config, ms, cfg, chdir, supports, java_home, kill, path):
@mock.patch('psutil.Process', new=MockProcess)
def test_daemon_start_stop(self, wait_for_pidfile, chdir, get_size, supports, java_home, kill):
cfg = config.Config()
cfg.add(config.Scope.application, "node", "root.dir", "test")
cfg.add(config.Scope.application, "mechanic", "keep.running", False)
cfg.add(config.Scope.application, "mechanic", "telemetry.devices", [])
cfg.add(config.Scope.application, "mechanic", "telemetry.params", None)
cfg.add(config.Scope.application, "system", "env.name", "test")

ms = get_metrics_store(cfg)
proc_launcher = launcher.ProcessLauncher(cfg, ms, paths.races_root(cfg))

node_config = NodeConfiguration(car=Car("default", root_path=None, config_paths=[]), ip="127.0.0.1", node_name="testnode",
node_root_path="/tmp", binary_path="/tmp", log_path="/tmp", data_paths="/tmp")

nodes = proc_launcher.start([node_config])
self.assertEqual(len(nodes), 1)
self.assertEqual(nodes[0].pid, MOCK_PID_VALUE)

proc_launcher.keep_running = False
proc_launcher.stop(nodes)
self.assertTrue(kill.called)

Expand Down Expand Up @@ -170,8 +206,11 @@ def test_setup_external_cluster_single_node(self):
cfg.add(config.Scope.application, "mechanic", "telemetry.devices", [])
cfg.add(config.Scope.application, "client", "hosts", self.test_host)
cfg.add(config.Scope.application, "client", "options", self.client_options)
cfg.add(config.Scope.application, "system", "env.name", "test")

ms = get_metrics_store(cfg)

m = launcher.ExternalLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
m = launcher.ExternalLauncher(cfg, ms, client_factory_class=MockClientFactory)
m.start()

# automatically determined by launcher on attach
Expand All @@ -183,8 +222,11 @@ def test_setup_external_cluster_multiple_nodes(self):
cfg.add(config.Scope.application, "client", "hosts", self.test_host)
cfg.add(config.Scope.application, "client", "options", self.client_options)
cfg.add(config.Scope.application, "mechanic", "distribution.version", "2.3.3")
cfg.add(config.Scope.application, "system", "env.name", "test")

m = launcher.ExternalLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
ms = get_metrics_store(cfg)

m = launcher.ExternalLauncher(cfg, ms, client_factory_class=MockClientFactory)
m.start()
# did not change user defined value
self.assertEqual(cfg.opts("mechanic", "distribution.version"), "2.3.3")
Expand All @@ -196,8 +238,11 @@ def test_setup_external_cluster_cannot_determine_version(self):
cfg.add(config.Scope.application, "mechanic", "telemetry.devices", [])
cfg.add(config.Scope.application, "client", "hosts", self.test_host)
cfg.add(config.Scope.application, "client", "options", client_options)
cfg.add(config.Scope.application, "system", "env.name", "test")

ms = get_metrics_store(cfg)

m = launcher.ExternalLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
m = launcher.ExternalLauncher(cfg, ms, client_factory_class=MockClientFactory)
m.start()

# automatically determined by launcher on attach
Expand All @@ -216,8 +261,11 @@ def test_launches_cluster(self):
cfg.add(config.Scope.application, "mechanic", "telemetry.params", {})
cfg.add(config.Scope.application, "mechanic", "preserve.install", False)
cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False)
cfg.add(config.Scope.application, "system", "env.name", "test")

ms = get_metrics_store(cfg)

cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
cluster_launcher = launcher.ClusterLauncher(cfg, ms, client_factory_class=MockClientFactory)
cluster = cluster_launcher.start()

self.assertEqual([{"host": "10.0.0.10", "port": 9200}, {"host": "10.0.0.11", "port": 9200}], cluster.hosts)
Expand All @@ -231,8 +279,11 @@ def test_launches_cluster_with_telemetry_client_timeout_enabled(self):
cfg.add(config.Scope.application, "mechanic", "telemetry.params", {})
cfg.add(config.Scope.application, "mechanic", "preserve.install", False)
cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False)
cfg.add(config.Scope.application, "system", "env.name", "test")

cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
ms = get_metrics_store(cfg)

cluster_launcher = launcher.ClusterLauncher(cfg, ms, client_factory_class=MockClientFactory)
cluster = cluster_launcher.start()

for telemetry_device in cluster.telemetry.devices:
Expand All @@ -253,8 +304,11 @@ def test_error_on_cluster_launch(self, sleep):
cfg.add(config.Scope.application, "mechanic", "telemetry.params", {})
cfg.add(config.Scope.application, "mechanic", "preserve.install", False)
cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False)
cfg.add(config.Scope.application, "system", "env.name", "test")

ms = get_metrics_store(cfg)

cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
cluster_launcher = launcher.ClusterLauncher(cfg, ms, client_factory_class=MockClientFactory)
with self.assertRaisesRegex(exceptions.LaunchError,
"Elasticsearch REST API layer is not available. Forcefully terminated cluster."):
cluster_launcher.start()

0 comments on commit 3b6a942

Please sign in to comment.