Skip to content

Commit

Permalink
Retrieve serverless build hash from nodes info API (#1756)
Browse files Browse the repository at this point in the history
This commit adds retrieval of build hash for serverless clusters from
nodes info API (`_nodes?filter_path=**.build_hash`).

The commit introduces 2 configuration settings under `driver` section:
* `serverless.mode` - equals true if Rally targets serverless cluster,
* `serverless.operator` - equals true if Elasticsearch user has operator
   privileges.
  • Loading branch information
gbanasiak authored Aug 7, 2023
1 parent 2479411 commit a7387ae
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 7 deletions.
23 changes: 21 additions & 2 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def create_es_clients(self):
).create()
return es

def prepare_telemetry(self, es, enable, index_names, data_stream_names):
def prepare_telemetry(self, es, enable, index_names, data_stream_names, build_hash):
enabled_devices = self.config.opts("telemetry", "devices")
telemetry_params = self.config.opts("telemetry", "params")
log_root = paths.race_root(self.config)
Expand All @@ -626,7 +626,7 @@ def prepare_telemetry(self, es, enable, index_names, data_stream_names):
devices = [
telemetry.NodeStats(telemetry_params, es, self.metrics_store),
telemetry.ExternalEnvironmentInfo(es_default, self.metrics_store),
telemetry.ClusterEnvironmentInfo(es_default, self.metrics_store),
telemetry.ClusterEnvironmentInfo(es_default, self.metrics_store, build_hash),
telemetry.JvmStatsSummary(es_default, self.metrics_store),
telemetry.IndexStats(es_default, self.metrics_store),
telemetry.MlBucketProcessingTime(es_default, self.metrics_store),
Expand Down Expand Up @@ -661,6 +661,17 @@ def retrieve_cluster_info(self, es):
self.logger.exception("Could not retrieve cluster info on benchmark start")
return None

def retrieve_build_hash_from_nodes_info(self, es):
try:
nodes_info = es["default"].nodes.info(filter_path="**.build_hash")
nodes = nodes_info["nodes"]
# assumption: build hash is the same across all the nodes
first_node_id = next(iter(nodes))
return nodes[first_node_id]["build_hash"]
except BaseException:
self.logger.exception("Could not retrieve build hash from nodes info")
return None

def create_api_key(self, es, client_id):
self.logger.debug("Creating ES API key for client [%s].", client_id)
try:
Expand Down Expand Up @@ -689,13 +700,20 @@ def prepare_benchmark(self, t):

skip_rest_api_check = self.config.opts("mechanic", "skip.rest.api.check")
uses_static_responses = self.config.opts("client", "options").uses_static_responses
build_hash = None
if skip_rest_api_check:
self.logger.info("Skipping REST API check as requested explicitly.")
elif uses_static_responses:
self.logger.info("Skipping REST API check as static responses are used.")
else:
self.wait_for_rest_api(es_clients)
self.target.cluster_details = self.retrieve_cluster_info(es_clients)
serverless_mode = self.config.opts("driver", "serverless.mode", default_value=False, mandatory=False)
serverless_operator = self.config.opts("driver", "serverless.operator", default_value=False, mandatory=False)
if serverless_mode and serverless_operator:
build_hash = self.retrieve_build_hash_from_nodes_info(es_clients)
self.logger.info("Retrieved actual build hash [%s] from serverless cluster.", build_hash)
self.target.cluster_details["version"]["build_hash"] = build_hash

# Avoid issuing any requests to the target cluster when static responses are enabled. The results
# are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs.
Expand All @@ -704,6 +722,7 @@ def prepare_benchmark(self, t):
enable=not uses_static_responses,
index_names=self.track.index_names(),
data_stream_names=self.track.data_stream_names(),
build_hash=build_hash,
)

for host in self.config.opts("driver", "load_driver_hosts"):
Expand Down
4 changes: 4 additions & 0 deletions esrally/racecontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ def setup(self, sources=False):
raise exceptions.SystemSetupError(
f"Cluster version must be at least [{min_es_version}] but was [{distribution_version}]"
)
else:
self.cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True)
# operator privileges assumed for now
self.cfg.add(config.Scope.benchmark, "driver", "serverless.operator", True)

self.current_track = track.load_track(self.cfg, install_dependencies=True)
self.track_revision = self.cfg.opts("track", "repository.revision", mandatory=False)
Expand Down
8 changes: 6 additions & 2 deletions esrally/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1761,10 +1761,11 @@ class ClusterEnvironmentInfo(InternalTelemetryDevice):
Gathers static environment information on a cluster level (e.g. version numbers).
"""

def __init__(self, client, metrics_store):
def __init__(self, client, metrics_store, revision_override):
super().__init__()
self.metrics_store = metrics_store
self.client = client
self.revision_override = revision_override

def on_benchmark_start(self):
# noinspection PyBroadException
Expand All @@ -1774,8 +1775,11 @@ def on_benchmark_start(self):
self.logger.exception("Could not retrieve cluster version info")
return
distribution_flavor = client_info["version"].get("build_flavor", "oss")
# build hash will only be available on serverless if the client has operator privs
# serverless returns dummy build hash which gets overridden when running with operator privileges
# TODO: refactor if config object gets included in telemetry base class (ES-6459)
revision = client_info["version"].get("build_hash", distribution_flavor)
if self.revision_override:
revision = self.revision_override
# build version does not exist for serverless
distribution_version = client_info["version"].get("number", distribution_flavor)
self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "source_revision", revision)
Expand Down
70 changes: 69 additions & 1 deletion tests/driver/driver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,43 @@ def create(self):
def close(cls):
TestDriver.StaticClientFactory.PATCHER.stop()

class StaticServerlessClientFactory:
PATCHER = None

def __init__(self, *args, **kwargs):
TestDriver.StaticClientFactory.PATCHER = mock.patch("elasticsearch.Elasticsearch")
self.es = TestDriver.StaticClientFactory.PATCHER.start()
self.es.info.return_value = {
"name": "serverless",
"cluster_name": "serverless",
"cluster_uuid": "4bbPT0Z6SsuODSz_vG1umA",
"version": {
"number": "8.10.0",
"build_flavor": "serverless",
"build_type": "docker",
"build_hash": "00000000",
"build_date": "2023-10-31",
"build_snapshot": False,
"lucene_version": "9.7.0",
"minimum_wire_compatibility_version": "8.10.0",
"minimum_index_compatibility_version": "8.10.0",
},
"tagline": "You Know, for Search",
}
self.es.nodes.info.return_value = {
"nodes": {
"PNAuTQt3Seum5BpPleo0wA": {"build_hash": "5f626ea4014dc029b8ae3f0bca06944975bf2d80"},
"i9FafotsSSOrOZdDyhA2Ng": {"build_hash": "5f626ea4014dc029b8ae3f0bca06944975bf2d80"},
}
}

def create(self):
return self.es

@classmethod
def close(cls):
TestDriver.StaticClientFactory.PATCHER.stop()

def setup_method(self, method):
self.cfg = config.Config()
self.cfg.add(config.Scope.application, "system", "env.name", "unittest")
Expand All @@ -100,7 +137,12 @@ def setup_method(self, method):
self.cfg.add(config.Scope.application, "telemetry", "devices", [])
self.cfg.add(config.Scope.application, "telemetry", "params", {})
self.cfg.add(config.Scope.application, "mechanic", "car.names", ["default"])
self.cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", True)
if method == self.test_prepare_serverless_benchmark: # pylint: disable=comparison-with-callable
self.cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False)
self.cfg.add(config.Scope.application, "driver", "serverless.mode", True)
self.cfg.add(config.Scope.application, "driver", "serverless.operator", True)
else:
self.cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", True)
self.cfg.add(config.Scope.application, "client", "hosts", self.Holder(all_hosts={"default": ["localhost:9200"]}))
self.cfg.add(config.Scope.application, "client", "options", self.Holder(all_client_options={"default": {}}))
self.cfg.add(config.Scope.application, "driver", "load_driver_hosts", ["localhost"])
Expand Down Expand Up @@ -147,6 +189,32 @@ def test_start_benchmark_and_prepare_track(self, resolve):
# Did we start all load generators? There is no specific mock assert for this...
assert target.start_worker.call_count == 4

# mocking DriverActor.prepare_track() only to complete Driver.prepare_benchmark()
@mock.patch.object(driver.DriverActor, "prepare_track")
def test_prepare_serverless_benchmark(self, mock_method):
driver_actor = driver.DriverActor
d = driver.Driver(driver_actor, self.cfg, es_client_factory_class=self.StaticServerlessClientFactory)
d.prepare_benchmark(t=self.track)

# was build hash determined correctly?
assert driver_actor.cluster_details == {
"name": "serverless",
"cluster_name": "serverless",
"cluster_uuid": "4bbPT0Z6SsuODSz_vG1umA",
"version": {
"number": "8.10.0",
"build_flavor": "serverless",
"build_type": "docker",
"build_hash": "5f626ea4014dc029b8ae3f0bca06944975bf2d80", # <--- THIS
"build_date": "2023-10-31",
"build_snapshot": False,
"lucene_version": "9.7.0",
"minimum_wire_compatibility_version": "8.10.0",
"minimum_index_compatibility_version": "8.10.0",
},
"tagline": "You Know, for Search",
}

def test_assign_drivers_round_robin(self):
target = self.create_test_driver_target()
d = driver.Driver(target, self.cfg, es_client_factory_class=self.StaticClientFactory)
Expand Down
4 changes: 2 additions & 2 deletions tests/telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3256,7 +3256,7 @@ def test_stores_cluster_level_metrics_on_attach(self, metrics_store_add_meta_inf
cfg = create_config()
client = Client(nodes=SubClient(info=nodes_info), info=cluster_info)
metrics_store = metrics.EsMetricsStore(cfg)
env_device = telemetry.ClusterEnvironmentInfo(client, metrics_store)
env_device = telemetry.ClusterEnvironmentInfo(client, metrics_store, None)
t = telemetry.Telemetry(cfg, devices=[env_device])
t.on_benchmark_start()
calls = [
Expand All @@ -3282,7 +3282,7 @@ def test_resilient_if_error_response(self, metrics_store_add_meta_info):
cfg = create_config()
client = Client(nodes=SubClient(stats=raiseTransportError, info=raiseTransportError), info=raiseTransportError)
metrics_store = metrics.EsMetricsStore(cfg)
env_device = telemetry.ClusterEnvironmentInfo(client, metrics_store)
env_device = telemetry.ClusterEnvironmentInfo(client, metrics_store, None)
t = telemetry.Telemetry(cfg, devices=[env_device])
t.on_benchmark_start()

Expand Down

0 comments on commit a7387ae

Please sign in to comment.