From dc8c2982d7ada51d9b4de96017946636cbff89b1 Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Thu, 7 Jul 2022 18:44:58 +0300 Subject: [PATCH 1/9] Add new wait-for-current-snapshots-create operation This commit adds a new runner `WaitForCurrentSnapshotsCreate`, with the corresponding operation name `wait-for-current-snapshots-create` that waits until all current snapshots for a given repository have completed. Closes https://github.com/elastic/rally/issues/1537 --- esrally/driver/runner.py | 54 +++++++++++ esrally/track/track.py | 1 + tests/driver/runner_test.py | 174 ++++++++++++++++++++++++++++++++++++ 3 files changed, 229 insertions(+) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index ed64095c5..678cee0bb 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -91,6 +91,7 @@ def register_default_runners(): register_runner(track.OperationType.DeleteSnapshotRepository, Retry(DeleteSnapshotRepository()), async_runner=True) register_runner(track.OperationType.CreateSnapshotRepository, Retry(CreateSnapshotRepository()), async_runner=True) register_runner(track.OperationType.WaitForSnapshotCreate, Retry(WaitForSnapshotCreate()), async_runner=True) + register_runner(track.OperationType.WaitForCurrentSnapshotsCreate, Retry(WaitForCurrentSnapshotsCreate()), async_runner=True) register_runner(track.OperationType.WaitForRecovery, Retry(IndicesRecovery()), async_runner=True) register_runner(track.OperationType.PutSettings, Retry(PutSettings()), async_runner=True) register_runner(track.OperationType.CreateTransform, Retry(CreateTransform()), async_runner=True) @@ -1982,6 +1983,10 @@ def __repr__(self, *args, **kwargs): class WaitForSnapshotCreate(Runner): + """ + Waits until a currently running on a given repository has finished successfully and returns detailed metrics. + """ + async def __call__(self, es, params): repository = mandatory(params, "repository", repr(self)) snapshot = mandatory(params, "snapshot", repr(self)) @@ -2032,6 +2037,55 @@ def __repr__(self, *args, **kwargs): return "wait-for-snapshot-create" +class WaitForCurrentSnapshotsCreate(Runner): + """ + Waits until all currently running snapshots on a given repository have completed + """ + + async def __call__(self, es, params): + repository = mandatory(params, "repository", repr(self)) + wait_period = params.get("completion-recheck-wait-period", 5) + es_info = await es.info() + es_version = tuple([int(part) for part in es_info["version"]["number"].split(".")[:2]]) + api = es.snapshot.get + request_args = {"repository": repository, "snapshot": "_current", "verbose": False} + + if es_version >= (8, 3): + request_params, headers = self._transport_request_params(params) + headers["Content-Type"] = "application/json" + + # significantly reduce response size when lots of snapshots have been taken + # https://github.com/elastic/elasticsearch/pull/86269 + request_params["index_names"] = False + request_params["verbose"] = False + + request_args = { + "method": "GET", + "path": f"_snapshot/{repository}/_current", + "headers": headers, + "params": request_params, + } + + # TODO: Switch to native es.snapshot.get once `index_names` becomes supported in + # `es.snapshot.get` of the elasticsearch-py client and we've upgraded the client in Rally, see: + # https://elasticsearch-py.readthedocs.io/en/latest/api.html#elasticsearch.client.SnapshotClient.get + api = es.perform_request + + while True: + response = await api(**request_args) + + if int(response.get("total")) > 0: + await asyncio.sleep(wait_period) + continue + break + + # getting details stats per snapshot can be very expensive. + # return nothing and rely on Rally's own service_time measurement for the duration + + def __repr__(self, *args, **kwargs): + return "wait-for-current-snapshots-create" + + class RestoreSnapshot(Runner): """ Restores a snapshot from an already registered repository diff --git a/esrally/track/track.py b/esrally/track/track.py index 4b3dc799c..c26f61862 100644 --- a/esrally/track/track.py +++ b/esrally/track/track.py @@ -686,6 +686,7 @@ class OperationType(Enum): Sql = 16 FieldCaps = 17 CompositeAgg = 18 + WaitForCurrentSnapshotsCreate = 19 # administrative actions ForceMerge = 1001 diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 87db65446..4f99ec39b 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -3965,6 +3965,180 @@ async def test_wait_for_snapshot_create_failure(self, es): ) +class TestWaitForCurrentSnapshotsCreate: + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_wait_for_current_snapshots_create_before_8_3_0(self, es): + es.info = mock.AsyncMock( + return_value={ + "name": "es01", + "cluster_name": "escluster", + "cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A", + "version": { + "number": "7.17.3", + "build_flavor": "default", + "build_type": "tar", + "build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff", + "build_date": "2022-04-19T08:11:19.070913226Z", + "build_snapshot": False, + "lucene_version": "8.11.1", + "minimum_wire_compatibility_version": "6.8.0", + "minimum_index_compatibility_version": "6.0.0-beta1" + }, + "tagline": "You Know, for Search", + }, + ) + + es.snapshot.get = mock.AsyncMock( + side_effect=[ + { + "snapshots": [ + { + "snapshot": "logging-test-0", + "uuid": "xVb4cPSKTfyIz-HcN9mQcg", + "repository": "many-shards", + "data_streams": [], + "state": "IN_PROGRESS", + "indices": ["a", "b", "c"], + }, + { + "snapshot": "logging-test-1", + "uuid": "LEHRHopiTrqemFkGXQijHw", + "repository": "many-shards", + "data_streams": [], + "state": "IN_PROGRESS", + "indices": ["d", "e", "f"], + }, + { + "snapshot": "logging-test-2", + "uuid": "9DJcMb5JQruddQbO3qzxSA", + "repository": "many-shards", + "data_streams": [], + "state": "IN_PROGRESS", + "indices": ["g", "h", "i"], + }, + { + "snapshot": "logging-test-3", + "uuid": "5YmhlUBxRv6pQbswf4nsfw", + "repository": "many-shards", + "data_streams": [], + "state": "IN_PROGRESS", + "indices": ["j", "k", "l"], + } + ], + "total": 4, + "remaining": 0 + }, + { + "snapshots": [], + "total": 0, + "remaining": 0 + }, + ], + ) + + repository = "many-shards" + task_params = { + "repository": repository, + "completion-recheck-wait-period": 0, + } + + r = runner.WaitForCurrentSnapshotsCreate() + result = await r(es, task_params) + + es.snapshot.get.assert_awaited_with(repository=repository, snapshot="_current", verbose=False) + + assert es.snapshot.get.await_count == 2 + + assert result is None + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_wait_for_current_snapshots_create_after_8_3_0(self, es): + es.info = mock.AsyncMock( + return_value={ + "name": "elasticsearch-0", + "cluster_name": "rally-benchmark", + "cluster_uuid": "rs5Gdzm6TISd-L4KZWVW4w", + "version": { + "number": "8.4.0-SNAPSHOT", + "build_type": "tar", + "build_hash": "4e18993f55431f6d3890049c9fea4c6c8218f070", + "build_date": "2022-07-05T00:19:44.887353304Z", + "build_snapshot": True, + "lucene_version": "9.3.0", + "minimum_wire_compatibility_version": "7.17.0", + "minimum_index_compatibility_version": "7.0.0" + }, + "tagline": "You Know, for Search", + }, + ) + + repository = "many-shards" + task_params = { + "repository": repository, + "completion-recheck-wait-period": 0, + } + + es.perform_request = mock.AsyncMock( + side_effect=[ + { + "snapshots": [ + { + "snapshot": "logging-test-0", + "uuid": "xVb4cPSKTfyIz-HcN9mQcg", + "repository": "many-shards", + "data_streams": [], + "state": "IN_PROGRESS", + }, + { + "snapshot": "logging-test-1", + "uuid": "LEHRHopiTrqemFkGXQijHw", + "repository": "many-shards", + "data_streams": [], + "state": "IN_PROGRESS", + }, + { + "snapshot": "logging-test-2", + "uuid": "9DJcMb5JQruddQbO3qzxSA", + "repository": "many-shards", + "data_streams": [], + "state": "IN_PROGRESS", + }, + { + "snapshot": "logging-test-3", + "uuid": "5YmhlUBxRv6pQbswf4nsfw", + "repository": "many-shards", + "data_streams": [], + "state": "IN_PROGRESS", + } + ], + "total": 4, + "remaining": 0 + }, + { + "snapshots": [], + "total": 0, + "remaining": 0 + }, + ], + ) + + r = runner.WaitForCurrentSnapshotsCreate() + result = await r(es, task_params) + + es.perform_request.assert_awaited_with( + method="GET", + path=f"_snapshot/{repository}/_current", + headers={"Content-Type": "application/json"}, + params={"index_names": False, "verbose": False}, + ) + + assert es.perform_request.await_count == 2 + + assert result is None + + class TestRestoreSnapshot: @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio From 3ac7c2b49021e157d4f475fee60013617a6880a4 Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Fri, 8 Jul 2022 15:20:07 +0300 Subject: [PATCH 2/9] Bug fixes and docs --- docs/track.rst | 21 +++++++++++++++++++++ esrally/driver/runner.py | 6 +++--- tests/driver/runner_test.py | 2 +- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/docs/track.rst b/docs/track.rst index df4b217d3..fbc52363f 100644 --- a/docs/track.rst +++ b/docs/track.rst @@ -2412,6 +2412,27 @@ Meta-data * ``duration``: The time it took (in milliseconds) to create the snapshot. * ``file_count``: The total number of files in the snapshot. +wait-for-current-snapshots-create +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +With the operation ``wait-for-current-snapshots-create`` you can wait until all currently running snapshots within a defined repository have completed. +This operation is useful after issuing many ``create-snapshots``, to benchmark snapshotting performance and related limitations in Elasticsearch. + +It exits when ``total`` in the response body of `GET _snapshot `_ equals ``0``. + +Properties +"""""""""" + +* ``repository`` (mandatory): The name of the snapshot repository to use. +* ``completion-recheck-wait-period`` (optional, defaults to 1 second): Time in seconds to wait in between consecutive attempts. + +This operation is :ref:`retryable `. + +Meta-data +""""""""" + +This operation returns no meta-data. + restore-snapshot ~~~~~~~~~~~~~~~~ diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 678cee0bb..83b11e97d 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2044,7 +2044,7 @@ class WaitForCurrentSnapshotsCreate(Runner): async def __call__(self, es, params): repository = mandatory(params, "repository", repr(self)) - wait_period = params.get("completion-recheck-wait-period", 5) + wait_period = params.get("completion-recheck-wait-period", 1) es_info = await es.info() es_version = tuple([int(part) for part in es_info["version"]["number"].split(".")[:2]]) api = es.snapshot.get @@ -2056,8 +2056,8 @@ async def __call__(self, es, params): # significantly reduce response size when lots of snapshots have been taken # https://github.com/elastic/elasticsearch/pull/86269 - request_params["index_names"] = False - request_params["verbose"] = False + request_params["index_names"] = "false" + request_params["verbose"] = "false" request_args = { "method": "GET", diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 4f99ec39b..60700d587 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -4131,7 +4131,7 @@ async def test_wait_for_current_snapshots_create_after_8_3_0(self, es): method="GET", path=f"_snapshot/{repository}/_current", headers={"Content-Type": "application/json"}, - params={"index_names": False, "verbose": False}, + params={"index_names": "false", "verbose": "false"}, ) assert es.perform_request.await_count == 2 From a78a5bd2dbdc9d3bff059dacfd5e199e869c33e1 Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Fri, 8 Jul 2022 16:15:02 +0300 Subject: [PATCH 3/9] Clarification --- esrally/driver/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 83b11e97d..a8198d61e 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2079,8 +2079,8 @@ async def __call__(self, es, params): continue break - # getting details stats per snapshot can be very expensive. - # return nothing and rely on Rally's own service_time measurement for the duration + # getting detailed stats per snapshot using the snapshot status api can be very expensive. + # return nothing and rely on Rally's own service_time measurement for the duration. def __repr__(self, *args, **kwargs): return "wait-for-current-snapshots-create" From ac80c61ed99e1a8c99f9b73ae9b8216eba34781d Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Fri, 8 Jul 2022 16:16:00 +0300 Subject: [PATCH 4/9] Formatting --- tests/driver/runner_test.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 60700d587..31ef71632 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -3983,7 +3983,7 @@ async def test_wait_for_current_snapshots_create_before_8_3_0(self, es): "build_snapshot": False, "lucene_version": "8.11.1", "minimum_wire_compatibility_version": "6.8.0", - "minimum_index_compatibility_version": "6.0.0-beta1" + "minimum_index_compatibility_version": "6.0.0-beta1", }, "tagline": "You Know, for Search", }, @@ -4024,16 +4024,12 @@ async def test_wait_for_current_snapshots_create_before_8_3_0(self, es): "data_streams": [], "state": "IN_PROGRESS", "indices": ["j", "k", "l"], - } + }, ], "total": 4, - "remaining": 0 - }, - { - "snapshots": [], - "total": 0, - "remaining": 0 + "remaining": 0, }, + {"snapshots": [], "total": 0, "remaining": 0}, ], ) @@ -4068,7 +4064,7 @@ async def test_wait_for_current_snapshots_create_after_8_3_0(self, es): "build_snapshot": True, "lucene_version": "9.3.0", "minimum_wire_compatibility_version": "7.17.0", - "minimum_index_compatibility_version": "7.0.0" + "minimum_index_compatibility_version": "7.0.0", }, "tagline": "You Know, for Search", }, @@ -4111,16 +4107,12 @@ async def test_wait_for_current_snapshots_create_after_8_3_0(self, es): "repository": "many-shards", "data_streams": [], "state": "IN_PROGRESS", - } + }, ], "total": 4, - "remaining": 0 - }, - { - "snapshots": [], - "total": 0, - "remaining": 0 + "remaining": 0, }, + {"snapshots": [], "total": 0, "remaining": 0}, ], ) From 0a3833472e0a5390d7be3d38b4ae22cf57d62092 Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Fri, 8 Jul 2022 16:18:30 +0300 Subject: [PATCH 5/9] Missing conditional --- esrally/track/track.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/esrally/track/track.py b/esrally/track/track.py index c26f61862..1f44c85d3 100644 --- a/esrally/track/track.py +++ b/esrally/track/track.py @@ -810,6 +810,8 @@ def from_hyphenated_string(cls, v): return OperationType.CreateSnapshot elif v == "wait-for-snapshot-create": return OperationType.WaitForSnapshotCreate + elif v == "wait-for-current-snapshots-create": + return OperationType.WaitForCurrentSnapshotsCreate elif v == "restore-snapshot": return OperationType.RestoreSnapshot elif v == "wait-for-recovery": From 68951ec8ab0a845b73dff3aabc419713e317c10c Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Mon, 8 Aug 2022 10:19:00 +0300 Subject: [PATCH 6/9] PR comment, re use utils.versions.Version --- esrally/driver/runner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index a8198d61e..1e5e5b75d 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -35,6 +35,7 @@ import ijson from esrally import exceptions, track +from esrally.utils.versions import Version # Mapping from operation type to specific runner @@ -2046,11 +2047,11 @@ async def __call__(self, es, params): repository = mandatory(params, "repository", repr(self)) wait_period = params.get("completion-recheck-wait-period", 1) es_info = await es.info() - es_version = tuple([int(part) for part in es_info["version"]["number"].split(".")[:2]]) + es_version = Version.from_string(es_info["version"]["number"]) api = es.snapshot.get request_args = {"repository": repository, "snapshot": "_current", "verbose": False} - if es_version >= (8, 3): + if (es_version.major, es_version.minor) >= (8, 3): request_params, headers = self._transport_request_params(params) headers["Content-Type"] = "application/json" From 45f598b6b413333ff54ff76ff5840f0e74cb56b9 Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Mon, 8 Aug 2022 10:20:53 +0300 Subject: [PATCH 7/9] Fix docs --- docs/track.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/track.rst b/docs/track.rst index fbc52363f..927349a64 100644 --- a/docs/track.rst +++ b/docs/track.rst @@ -2416,7 +2416,7 @@ wait-for-current-snapshots-create ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ With the operation ``wait-for-current-snapshots-create`` you can wait until all currently running snapshots within a defined repository have completed. -This operation is useful after issuing many ``create-snapshots``, to benchmark snapshotting performance and related limitations in Elasticsearch. +This operation is useful after issuing many ``create-snapshot``, to benchmark snapshotting performance and related limitations in Elasticsearch. It exits when ``total`` in the response body of `GET _snapshot `_ equals ``0``. From e482d72bacc74920de11b608bb2f70175299be3c Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Mon, 8 Aug 2022 10:27:53 +0300 Subject: [PATCH 8/9] Improve comment --- esrally/driver/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 1e5e5b75d..4ee838a7e 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2051,12 +2051,12 @@ async def __call__(self, es, params): api = es.snapshot.get request_args = {"repository": repository, "snapshot": "_current", "verbose": False} + # significantly reduce response size when lots of snapshots have been taken + # only available since ES 8.3.0 (https://github.com/elastic/elasticsearch/pull/86269) if (es_version.major, es_version.minor) >= (8, 3): request_params, headers = self._transport_request_params(params) headers["Content-Type"] = "application/json" - # significantly reduce response size when lots of snapshots have been taken - # https://github.com/elastic/elasticsearch/pull/86269 request_params["index_names"] = "false" request_params["verbose"] = "false" From 87b7040cf816bdbbd53272c356cfba8a27abf504 Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Mon, 8 Aug 2022 10:37:08 +0300 Subject: [PATCH 9/9] PR comment, simplify conditional --- esrally/driver/runner.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 4ee838a7e..20b798083 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2075,10 +2075,10 @@ async def __call__(self, es, params): while True: response = await api(**request_args) - if int(response.get("total")) > 0: - await asyncio.sleep(wait_period) - continue - break + if int(response.get("total")) == 0: + break + + await asyncio.sleep(wait_period) # getting detailed stats per snapshot using the snapshot status api can be very expensive. # return nothing and rely on Rally's own service_time measurement for the duration.