Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieve serverless build hash from nodes info API #1756

Merged
merged 7 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,17 @@ def retrieve_cluster_info(self, es):
self.logger.exception("Could not retrieve cluster info on benchmark start")
return None

def retrieve_build_hash_from_nodes_info(self, es):
try:
nodes_info = es["default"].nodes.info(filter_path="**.build_hash")
nodes = nodes_info["nodes"]
# assumption: build hash is the same across all the nodes
first_node_id = next(iter(nodes))
return nodes[first_node_id]["build_hash"]
except BaseException:
self.logger.exception("Could not retrieve build hash from nodes info")
return None

def create_api_key(self, es, client_id):
self.logger.debug("Creating ES API key for client [%s].", client_id)
try:
Expand Down Expand Up @@ -696,6 +707,12 @@ def prepare_benchmark(self, t):
else:
self.wait_for_rest_api(es_clients)
self.target.cluster_details = self.retrieve_cluster_info(es_clients)
serverless_mode = self.config.opts("driver", "serverless.mode", default_value=False, mandatory=False)
serverless_operator = self.config.opts("driver", "serverless.operator", default_value=False, mandatory=False)
if serverless_mode and serverless_operator:
build_hash = self.retrieve_build_hash_from_nodes_info(es_clients)
self.logger.info("Retrieved actual build hash [%s] from serverless cluster.", build_hash)
self.target.cluster_details["version"]["build_hash"] = build_hash
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the stats in the metric store?

There is some code in

rally/esrally/telemetry.py

Lines 1778 to 1781 in c3b04f4

revision = client_info["version"].get("build_hash", distribution_flavor)
# build version does not exist for serverless
distribution_version = client_info["version"].get("number", distribution_flavor)
self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "source_revision", revision)
and in my tests it results in

image

(correctly)

but

image

whereas I see in the logs Retrieved actual build hash [ad5c80e1b49f10b42d438253d7cc0b9753c156b9] from serverless cluster.

Additionally, shouldn't it also be collected by the node stats telemetry device?

Copy link
Contributor Author

@gbanasiak gbanasiak Aug 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Telemetry device modifications are scoped separately in ES-6459, but after discussion with @dliappis we concluded the necessary changes to override build hash (revision) in ClusterEnvironmentInfo are small, so I went ahead and added da34a7d. PTAL.

Edit: Additional changes to telemetry devices will go into separate PRs.


# Avoid issuing any requests to the target cluster when static responses are enabled. The results
# are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs.
Expand Down
4 changes: 4 additions & 0 deletions esrally/racecontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ def setup(self, sources=False):
raise exceptions.SystemSetupError(
f"Cluster version must be at least [{min_es_version}] but was [{distribution_version}]"
)
else:
self.cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True)
# operator privileges assumed for now
self.cfg.add(config.Scope.benchmark, "driver", "serverless.operator", True)

self.current_track = track.load_track(self.cfg, install_dependencies=True)
self.track_revision = self.cfg.opts("track", "repository.revision", mandatory=False)
Expand Down
70 changes: 69 additions & 1 deletion tests/driver/driver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,43 @@ def create(self):
def close(cls):
TestDriver.StaticClientFactory.PATCHER.stop()

class StaticServerlessClientFactory:
PATCHER = None

def __init__(self, *args, **kwargs):
TestDriver.StaticClientFactory.PATCHER = mock.patch("elasticsearch.Elasticsearch")
self.es = TestDriver.StaticClientFactory.PATCHER.start()
self.es.info.return_value = {
"name": "serverless",
"cluster_name": "serverless",
"cluster_uuid": "4bbPT0Z6SsuODSz_vG1umA",
"version": {
"number": "8.10.0",
"build_flavor": "serverless",
"build_type": "docker",
"build_hash": "00000000",
"build_date": "2023-10-31",
"build_snapshot": False,
"lucene_version": "9.7.0",
"minimum_wire_compatibility_version": "8.10.0",
"minimum_index_compatibility_version": "8.10.0",
},
"tagline": "You Know, for Search",
}
self.es.nodes.info.return_value = {
"nodes": {
"PNAuTQt3Seum5BpPleo0wA": {"build_hash": "5f626ea4014dc029b8ae3f0bca06944975bf2d80"},
"i9FafotsSSOrOZdDyhA2Ng": {"build_hash": "5f626ea4014dc029b8ae3f0bca06944975bf2d80"},
}
}

def create(self):
return self.es

@classmethod
def close(cls):
TestDriver.StaticClientFactory.PATCHER.stop()

def setup_method(self, method):
self.cfg = config.Config()
self.cfg.add(config.Scope.application, "system", "env.name", "unittest")
Expand All @@ -100,7 +137,12 @@ def setup_method(self, method):
self.cfg.add(config.Scope.application, "telemetry", "devices", [])
self.cfg.add(config.Scope.application, "telemetry", "params", {})
self.cfg.add(config.Scope.application, "mechanic", "car.names", ["default"])
self.cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", True)
if method == self.test_prepare_serverless_benchmark: # pylint: disable=comparison-with-callable
self.cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", False)
self.cfg.add(config.Scope.application, "driver", "serverless.mode", True)
self.cfg.add(config.Scope.application, "driver", "serverless.operator", True)
else:
self.cfg.add(config.Scope.application, "mechanic", "skip.rest.api.check", True)
self.cfg.add(config.Scope.application, "client", "hosts", self.Holder(all_hosts={"default": ["localhost:9200"]}))
self.cfg.add(config.Scope.application, "client", "options", self.Holder(all_client_options={"default": {}}))
self.cfg.add(config.Scope.application, "driver", "load_driver_hosts", ["localhost"])
Expand Down Expand Up @@ -147,6 +189,32 @@ def test_start_benchmark_and_prepare_track(self, resolve):
# Did we start all load generators? There is no specific mock assert for this...
assert target.start_worker.call_count == 4

# mocking DriverActor.prepare_track() only to complete Driver.prepare_benchmark()
@mock.patch.object(driver.DriverActor, "prepare_track")
def test_prepare_serverless_benchmark(self, mock_method):
driver_actor = driver.DriverActor
d = driver.Driver(driver_actor, self.cfg, es_client_factory_class=self.StaticServerlessClientFactory)
d.prepare_benchmark(t=self.track)

# was build hash determined correctly?
assert driver_actor.cluster_details == {
"name": "serverless",
"cluster_name": "serverless",
"cluster_uuid": "4bbPT0Z6SsuODSz_vG1umA",
"version": {
"number": "8.10.0",
"build_flavor": "serverless",
"build_type": "docker",
"build_hash": "5f626ea4014dc029b8ae3f0bca06944975bf2d80", # <--- THIS
"build_date": "2023-10-31",
"build_snapshot": False,
"lucene_version": "9.7.0",
"minimum_wire_compatibility_version": "8.10.0",
"minimum_index_compatibility_version": "8.10.0",
},
"tagline": "You Know, for Search",
}

def test_assign_drivers_round_robin(self):
target = self.create_test_driver_target()
d = driver.Driver(target, self.cfg, es_client_factory_class=self.StaticClientFactory)
Expand Down