From 40cd86c12a647f4e7d3ffbf28f995dc10f7506ac Mon Sep 17 00:00:00 2001 From: Grzegorz Banasiak Date: Thu, 3 Aug 2023 17:05:39 +0200 Subject: [PATCH 1/7] Retrieve serverless build hash from nodes info API --- esrally/driver/driver.py | 18 +++++++++++++++++- esrally/racecontrol.py | 4 ++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 2a64f42c6..2ea7b5a46 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -47,7 +47,7 @@ from esrally.client import delete_api_keys from esrally.driver import runner, scheduler from esrally.track import TrackProcessorRegistry, load_track, load_track_plugins -from esrally.utils import console, convert, net +from esrally.utils import console, convert, net, versions ################################## @@ -661,6 +661,16 @@ def retrieve_cluster_info(self, es): self.logger.exception("Could not retrieve cluster info on benchmark start") return None + def retrieve_build_hash_from_nodes_info(self, es): + try: + nodes_info = es["default"].nodes.info(filter_path="**.build_hash") + nodes = list(nodes_info["nodes"].keys()) + # assumption: build hash is the same across all the nodes + return nodes_info["nodes"][nodes[0]]["build_hash"] + except BaseException: + self.logger.exception("Could not retrieve build hash from nodes info") + return None + def create_api_key(self, es, client_id): self.logger.debug("Creating ES API key for client [%s].", client_id) try: @@ -696,6 +706,12 @@ def prepare_benchmark(self, t): else: self.wait_for_rest_api(es_clients) self.target.cluster_details = self.retrieve_cluster_info(es_clients) + serverless_mode = self.config.opts("driver", "serverless.mode", default_value=False) + serverless_operator = self.config.opts("driver", "serverless.operator", default_value=False) + if serverless_mode and serverless_operator: + build_hash = self.retrieve_build_hash_from_nodes_info(es_clients) + self.logger.info(f"Retrieved actual build hash [{build_hash}] from serverless cluster.") + self.target.cluster_details["version"]["build_hash"] = build_hash # Avoid issuing any requests to the target cluster when static responses are enabled. The results # are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs. diff --git a/esrally/racecontrol.py b/esrally/racecontrol.py index 8f0b04889..fbbe70902 100644 --- a/esrally/racecontrol.py +++ b/esrally/racecontrol.py @@ -205,6 +205,10 @@ def setup(self, sources=False): raise exceptions.SystemSetupError( f"Cluster version must be at least [{min_es_version}] but was [{distribution_version}]" ) + else: + self.cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True) + # operator privileges assumed for now + self.cfg.add(config.Scope.benchmark, "driver", "serverless.operator", True) self.current_track = track.load_track(self.cfg, install_dependencies=True) self.track_revision = self.cfg.opts("track", "repository.revision", mandatory=False) From 81f5be515d0eb77ac3171b9dc558071bcf147589 Mon Sep 17 00:00:00 2001 From: Grzegorz Banasiak Date: Thu, 3 Aug 2023 17:12:19 +0200 Subject: [PATCH 2/7] Fix precommit --- esrally/driver/driver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 2ea7b5a46..ef27171ab 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -47,7 +47,7 @@ from esrally.client import delete_api_keys from esrally.driver import runner, scheduler from esrally.track import TrackProcessorRegistry, load_track, load_track_plugins -from esrally.utils import console, convert, net, versions +from esrally.utils import console, convert, net ################################## @@ -710,7 +710,7 @@ def prepare_benchmark(self, t): serverless_operator = self.config.opts("driver", "serverless.operator", default_value=False) if serverless_mode and serverless_operator: build_hash = self.retrieve_build_hash_from_nodes_info(es_clients) - self.logger.info(f"Retrieved actual build hash [{build_hash}] from serverless cluster.") + self.logger.info("Retrieved actual build hash [%s] from serverless cluster.", build_hash) self.target.cluster_details["version"]["build_hash"] = build_hash # Avoid issuing any requests to the target cluster when static responses are enabled. The results From 87f34600a9b0c660d4a83c868af1b54a2aadb10d Mon Sep 17 00:00:00 2001 From: Grzegorz Banasiak Date: Thu, 3 Aug 2023 17:48:20 +0200 Subject: [PATCH 3/7] Make serverless configuration settings non-mandatory --- esrally/driver/driver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index ef27171ab..76deb00d5 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -706,8 +706,8 @@ def prepare_benchmark(self, t): else: self.wait_for_rest_api(es_clients) self.target.cluster_details = self.retrieve_cluster_info(es_clients) - serverless_mode = self.config.opts("driver", "serverless.mode", default_value=False) - serverless_operator = self.config.opts("driver", "serverless.operator", default_value=False) + serverless_mode = self.config.opts("driver", "serverless.mode", default_value=False, mandatory=False) + serverless_operator = self.config.opts("driver", "serverless.operator", default_value=False, mandatory=False) if serverless_mode and serverless_operator: build_hash = self.retrieve_build_hash_from_nodes_info(es_clients) self.logger.info("Retrieved actual build hash [%s] from serverless cluster.", build_hash) From ed69d829128ba946093159e932184db5ba9dee95 Mon Sep 17 00:00:00 2001 From: Grzegorz Banasiak Date: Fri, 4 Aug 2023 08:36:55 +0200 Subject: [PATCH 4/7] Avoid list in build hash retrieval --- esrally/driver/driver.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 76deb00d5..5b771af06 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -664,9 +664,10 @@ def retrieve_cluster_info(self, es): def retrieve_build_hash_from_nodes_info(self, es): try: nodes_info = es["default"].nodes.info(filter_path="**.build_hash") - nodes = list(nodes_info["nodes"].keys()) + nodes = nodes_info["nodes"] # assumption: build hash is the same across all the nodes - return nodes_info["nodes"][nodes[0]]["build_hash"] + first_node_id = next(iter(nodes)) + return nodes[first_node_id]["build_hash"] except BaseException: self.logger.exception("Could not retrieve build hash from nodes info") return None From b9a2ab8752a458a2cd29003756a24bfec0a0bd54 Mon Sep 17 00:00:00 2001 From: Grzegorz Banasiak Date: Fri, 4 Aug 2023 11:10:27 +0200 Subject: [PATCH 5/7] Add unit test for build hash retrieval in serverless mode --- tests/driver/driver_test.py | 70 ++++++++++++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index fa72a96ad..cb6586eae 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -87,6 +87,43 @@ def create(self): def close(cls): TestDriver.StaticClientFactory.PATCHER.stop() + class StaticServerlessClientFactory: + PATCHER = None + + def __init__(self, *args, **kwargs): + TestDriver.StaticClientFactory.PATCHER = mock.patch("elasticsearch.Elasticsearch") + self.es = TestDriver.StaticClientFactory.PATCHER.start() + self.es.info.return_value = { + "name": "serverless", + "cluster_name": "serverless", + "cluster_uuid": "4bbPT0Z6SsuODSz_vG1umA", + "version": { + "number": "8.10.0", + "build_flavor": "serverless", + "build_type": "docker", + "build_hash": "00000000", + "build_date": "2023-10-31", + "build_snapshot": False, + "lucene_version": "9.7.0", + "minimum_wire_compatibility_version": "8.10.0", + "minimum_index_compatibility_version": "8.10.0", + }, + "tagline": "You Know, for Search", + } + self.es.nodes.info.return_value = { + "nodes": { + "PNAuTQt3Seum5BpPleo0wA": {"build_hash": "5f626ea4014dc029b8ae3f0bca06944975bf2d80"}, + "i9FafotsSSOrOZdDyhA2Ng": {"build_hash": "5f626ea4014dc029b8ae3f0bca06944975bf2d80"}, + } + } + + def create(self): + return self.es + + @classmethod + def close(cls): + TestDriver.StaticClientFactory.PATCHER.stop() + def setup_method(self, method): self.cfg = config.Config() self.cfg.add(config.Scope.application, "system", "env.name", "unittest") @@ -100,7 +137,12 @@ def setup_method(self, method): self.cfg.add(config.Scope.application, "telemetry", "devices", []) self.cfg.add(config.Scope.application, "telemetry", "params", {}) self.cfg.add(config.Scope.application, "mechanic", "car.names", ["default"]) - self.cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", True) + if method == self.test_prepare_serverless_benchmark: # pylint: disable=comparison-with-callable + self.cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False) + self.cfg.add(config.Scope.application, "driver", "serverless.mode", True) + self.cfg.add(config.Scope.application, "driver", "serverless.operator", True) + else: + self.cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", True) self.cfg.add(config.Scope.application, "client", "hosts", self.Holder(all_hosts={"default": ["localhost:9200"]})) self.cfg.add(config.Scope.application, "client", "options", self.Holder(all_client_options={"default": {}})) self.cfg.add(config.Scope.application, "driver", "load_driver_hosts", ["localhost"]) @@ -147,6 +189,32 @@ def test_start_benchmark_and_prepare_track(self, resolve): # Did we start all load generators? There is no specific mock assert for this... assert target.start_worker.call_count == 4 + # mocking DriverActor.prepare_track() only to complete Driver.prepare_benchmark() + @mock.patch.object(driver.DriverActor, "prepare_track") + def test_prepare_serverless_benchmark(self, mock_method): + driver_actor = driver.DriverActor + d = driver.Driver(driver_actor, self.cfg, es_client_factory_class=self.StaticServerlessClientFactory) + d.prepare_benchmark(t=self.track) + + # was build hash determined correctly? + assert driver_actor.cluster_details == { + "name": "serverless", + "cluster_name": "serverless", + "cluster_uuid": "4bbPT0Z6SsuODSz_vG1umA", + "version": { + "number": "8.10.0", + "build_flavor": "serverless", + "build_type": "docker", + "build_hash": "5f626ea4014dc029b8ae3f0bca06944975bf2d80", # <--- THIS + "build_date": "2023-10-31", + "build_snapshot": False, + "lucene_version": "9.7.0", + "minimum_wire_compatibility_version": "8.10.0", + "minimum_index_compatibility_version": "8.10.0", + }, + "tagline": "You Know, for Search", + } + def test_assign_drivers_round_robin(self): target = self.create_test_driver_target() d = driver.Driver(target, self.cfg, es_client_factory_class=self.StaticClientFactory) From da34a7d843237e1c387e081b3949f37e31afe9fb Mon Sep 17 00:00:00 2001 From: Grzegorz Banasiak Date: Fri, 4 Aug 2023 15:33:40 +0200 Subject: [PATCH 6/7] Override revision in ClusterEnvironmentInfo telemetry device --- esrally/driver/driver.py | 6 ++++-- esrally/telemetry.py | 7 +++++-- tests/telemetry_test.py | 4 ++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 5b771af06..3500e364f 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -615,7 +615,7 @@ def create_es_clients(self): ).create() return es - def prepare_telemetry(self, es, enable, index_names, data_stream_names): + def prepare_telemetry(self, es, enable, index_names, data_stream_names, build_hash): enabled_devices = self.config.opts("telemetry", "devices") telemetry_params = self.config.opts("telemetry", "params") log_root = paths.race_root(self.config) @@ -626,7 +626,7 @@ def prepare_telemetry(self, es, enable, index_names, data_stream_names): devices = [ telemetry.NodeStats(telemetry_params, es, self.metrics_store), telemetry.ExternalEnvironmentInfo(es_default, self.metrics_store), - telemetry.ClusterEnvironmentInfo(es_default, self.metrics_store), + telemetry.ClusterEnvironmentInfo(es_default, self.metrics_store, build_hash), telemetry.JvmStatsSummary(es_default, self.metrics_store), telemetry.IndexStats(es_default, self.metrics_store), telemetry.MlBucketProcessingTime(es_default, self.metrics_store), @@ -700,6 +700,7 @@ def prepare_benchmark(self, t): skip_rest_api_check = self.config.opts("mechanic", "skip.rest.api.check") uses_static_responses = self.config.opts("client", "options").uses_static_responses + build_hash = None if skip_rest_api_check: self.logger.info("Skipping REST API check as requested explicitly.") elif uses_static_responses: @@ -721,6 +722,7 @@ def prepare_benchmark(self, t): enable=not uses_static_responses, index_names=self.track.index_names(), data_stream_names=self.track.data_stream_names(), + build_hash=build_hash, ) for host in self.config.opts("driver", "load_driver_hosts"): diff --git a/esrally/telemetry.py b/esrally/telemetry.py index 86ba574d4..fce61016f 100644 --- a/esrally/telemetry.py +++ b/esrally/telemetry.py @@ -1761,10 +1761,11 @@ class ClusterEnvironmentInfo(InternalTelemetryDevice): Gathers static environment information on a cluster level (e.g. version numbers). """ - def __init__(self, client, metrics_store): + def __init__(self, client, metrics_store, revision_override): super().__init__() self.metrics_store = metrics_store self.client = client + self.revision_override = revision_override def on_benchmark_start(self): # noinspection PyBroadException @@ -1774,8 +1775,10 @@ def on_benchmark_start(self): self.logger.exception("Could not retrieve cluster version info") return distribution_flavor = client_info["version"].get("build_flavor", "oss") - # build hash will only be available on serverless if the client has operator privs + # serverless returns dummy build hash which gets overridden when running with operator privileges revision = client_info["version"].get("build_hash", distribution_flavor) + if self.revision_override: + revision = self.revision_override # build version does not exist for serverless distribution_version = client_info["version"].get("number", distribution_flavor) self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "source_revision", revision) diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py index eb9576588..75f1662ec 100644 --- a/tests/telemetry_test.py +++ b/tests/telemetry_test.py @@ -3256,7 +3256,7 @@ def test_stores_cluster_level_metrics_on_attach(self, metrics_store_add_meta_inf cfg = create_config() client = Client(nodes=SubClient(info=nodes_info), info=cluster_info) metrics_store = metrics.EsMetricsStore(cfg) - env_device = telemetry.ClusterEnvironmentInfo(client, metrics_store) + env_device = telemetry.ClusterEnvironmentInfo(client, metrics_store, None) t = telemetry.Telemetry(cfg, devices=[env_device]) t.on_benchmark_start() calls = [ @@ -3282,7 +3282,7 @@ def test_resilient_if_error_response(self, metrics_store_add_meta_info): cfg = create_config() client = Client(nodes=SubClient(stats=raiseTransportError, info=raiseTransportError), info=raiseTransportError) metrics_store = metrics.EsMetricsStore(cfg) - env_device = telemetry.ClusterEnvironmentInfo(client, metrics_store) + env_device = telemetry.ClusterEnvironmentInfo(client, metrics_store, None) t = telemetry.Telemetry(cfg, devices=[env_device]) t.on_benchmark_start() From bbd8cabe478798b71b6623526f4a22f356af8eb5 Mon Sep 17 00:00:00 2001 From: Grzegorz Banasiak Date: Fri, 4 Aug 2023 16:47:11 +0200 Subject: [PATCH 7/7] Add refactor note --- esrally/telemetry.py | 1 + 1 file changed, 1 insertion(+) diff --git a/esrally/telemetry.py b/esrally/telemetry.py index fce61016f..d87893380 100644 --- a/esrally/telemetry.py +++ b/esrally/telemetry.py @@ -1776,6 +1776,7 @@ def on_benchmark_start(self): return distribution_flavor = client_info["version"].get("build_flavor", "oss") # serverless returns dummy build hash which gets overridden when running with operator privileges + # TODO: refactor if config object gets included in telemetry base class (ES-6459) revision = client_info["version"].get("build_hash", distribution_flavor) if self.revision_override: revision = self.revision_override