Skip to content

Commit

Permalink
Add new wait-for-current-snapshots-create operation (#1542)
Browse files Browse the repository at this point in the history
This commit adds a new runner `WaitForCurrentSnapshotsCreate`, with the
corresponding operation name `wait-for-current-snapshots-create` that
waits until all current snapshots for a given repository have completed.

Closes #1537
  • Loading branch information
dliappis authored Aug 9, 2022
1 parent b0027e7 commit 6edfa43
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 0 deletions.
21 changes: 21 additions & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2410,6 +2410,27 @@ Meta-data
* ``duration``: The time it took (in milliseconds) to create the snapshot.
* ``file_count``: The total number of files in the snapshot.

wait-for-current-snapshots-create
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

With the operation ``wait-for-current-snapshots-create`` you can wait until all currently running snapshots within a defined repository have completed.
This operation is useful after issuing many ``create-snapshot``, to benchmark snapshotting performance and related limitations in Elasticsearch.

It exits when ``total`` in the response body of `GET _snapshot <https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-api.html>`_ equals ``0``.

Properties
""""""""""

* ``repository`` (mandatory): The name of the snapshot repository to use.
* ``completion-recheck-wait-period`` (optional, defaults to 1 second): Time in seconds to wait in between consecutive attempts.

This operation is :ref:`retryable <track_operations>`.

Meta-data
"""""""""

This operation returns no meta-data.

restore-snapshot
~~~~~~~~~~~~~~~~

Expand Down
55 changes: 55 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import ijson

from esrally import exceptions, track
from esrally.utils.versions import Version

# Mapping from operation type to specific runner

Expand Down Expand Up @@ -91,6 +92,7 @@ def register_default_runners():
register_runner(track.OperationType.DeleteSnapshotRepository, Retry(DeleteSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.CreateSnapshotRepository, Retry(CreateSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.WaitForSnapshotCreate, Retry(WaitForSnapshotCreate()), async_runner=True)
register_runner(track.OperationType.WaitForCurrentSnapshotsCreate, Retry(WaitForCurrentSnapshotsCreate()), async_runner=True)
register_runner(track.OperationType.WaitForRecovery, Retry(IndicesRecovery()), async_runner=True)
register_runner(track.OperationType.PutSettings, Retry(PutSettings()), async_runner=True)
register_runner(track.OperationType.CreateTransform, Retry(CreateTransform()), async_runner=True)
Expand Down Expand Up @@ -1982,6 +1984,10 @@ def __repr__(self, *args, **kwargs):


class WaitForSnapshotCreate(Runner):
"""
Waits until a currently running <snapshot> on a given repository has finished successfully and returns detailed metrics.
"""

async def __call__(self, es, params):
repository = mandatory(params, "repository", repr(self))
snapshot = mandatory(params, "snapshot", repr(self))
Expand Down Expand Up @@ -2032,6 +2038,55 @@ def __repr__(self, *args, **kwargs):
return "wait-for-snapshot-create"


class WaitForCurrentSnapshotsCreate(Runner):
"""
Waits until all currently running snapshots on a given repository have completed
"""

async def __call__(self, es, params):
repository = mandatory(params, "repository", repr(self))
wait_period = params.get("completion-recheck-wait-period", 1)
es_info = await es.info()
es_version = Version.from_string(es_info["version"]["number"])
api = es.snapshot.get
request_args = {"repository": repository, "snapshot": "_current", "verbose": False}

# significantly reduce response size when lots of snapshots have been taken
# only available since ES 8.3.0 (https://github.com/elastic/elasticsearch/pull/86269)
if (es_version.major, es_version.minor) >= (8, 3):
request_params, headers = self._transport_request_params(params)
headers["Content-Type"] = "application/json"

request_params["index_names"] = "false"
request_params["verbose"] = "false"

request_args = {
"method": "GET",
"path": f"_snapshot/{repository}/_current",
"headers": headers,
"params": request_params,
}

# TODO: Switch to native es.snapshot.get once `index_names` becomes supported in
# `es.snapshot.get` of the elasticsearch-py client and we've upgraded the client in Rally, see:
# https://elasticsearch-py.readthedocs.io/en/latest/api.html#elasticsearch.client.SnapshotClient.get
api = es.perform_request

while True:
response = await api(**request_args)

if int(response.get("total")) == 0:
break

await asyncio.sleep(wait_period)

# getting detailed stats per snapshot using the snapshot status api can be very expensive.
# return nothing and rely on Rally's own service_time measurement for the duration.

def __repr__(self, *args, **kwargs):
return "wait-for-current-snapshots-create"


class RestoreSnapshot(Runner):
"""
Restores a snapshot from an already registered repository
Expand Down
3 changes: 3 additions & 0 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ class OperationType(Enum):
Sql = 16
FieldCaps = 17
CompositeAgg = 18
WaitForCurrentSnapshotsCreate = 19

# administrative actions
ForceMerge = 1001
Expand Down Expand Up @@ -809,6 +810,8 @@ def from_hyphenated_string(cls, v):
return OperationType.CreateSnapshot
elif v == "wait-for-snapshot-create":
return OperationType.WaitForSnapshotCreate
elif v == "wait-for-current-snapshots-create":
return OperationType.WaitForCurrentSnapshotsCreate
elif v == "restore-snapshot":
return OperationType.RestoreSnapshot
elif v == "wait-for-recovery":
Expand Down
166 changes: 166 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3965,6 +3965,172 @@ async def test_wait_for_snapshot_create_failure(self, es):
)


class TestWaitForCurrentSnapshotsCreate:
@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_wait_for_current_snapshots_create_before_8_3_0(self, es):
es.info = mock.AsyncMock(
return_value={
"name": "es01",
"cluster_name": "escluster",
"cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A",
"version": {
"number": "7.17.3",
"build_flavor": "default",
"build_type": "tar",
"build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff",
"build_date": "2022-04-19T08:11:19.070913226Z",
"build_snapshot": False,
"lucene_version": "8.11.1",
"minimum_wire_compatibility_version": "6.8.0",
"minimum_index_compatibility_version": "6.0.0-beta1",
},
"tagline": "You Know, for Search",
},
)

es.snapshot.get = mock.AsyncMock(
side_effect=[
{
"snapshots": [
{
"snapshot": "logging-test-0",
"uuid": "xVb4cPSKTfyIz-HcN9mQcg",
"repository": "many-shards",
"data_streams": [],
"state": "IN_PROGRESS",
"indices": ["a", "b", "c"],
},
{
"snapshot": "logging-test-1",
"uuid": "LEHRHopiTrqemFkGXQijHw",
"repository": "many-shards",
"data_streams": [],
"state": "IN_PROGRESS",
"indices": ["d", "e", "f"],
},
{
"snapshot": "logging-test-2",
"uuid": "9DJcMb5JQruddQbO3qzxSA",
"repository": "many-shards",
"data_streams": [],
"state": "IN_PROGRESS",
"indices": ["g", "h", "i"],
},
{
"snapshot": "logging-test-3",
"uuid": "5YmhlUBxRv6pQbswf4nsfw",
"repository": "many-shards",
"data_streams": [],
"state": "IN_PROGRESS",
"indices": ["j", "k", "l"],
},
],
"total": 4,
"remaining": 0,
},
{"snapshots": [], "total": 0, "remaining": 0},
],
)

repository = "many-shards"
task_params = {
"repository": repository,
"completion-recheck-wait-period": 0,
}

r = runner.WaitForCurrentSnapshotsCreate()
result = await r(es, task_params)

es.snapshot.get.assert_awaited_with(repository=repository, snapshot="_current", verbose=False)

assert es.snapshot.get.await_count == 2

assert result is None

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_wait_for_current_snapshots_create_after_8_3_0(self, es):
es.info = mock.AsyncMock(
return_value={
"name": "elasticsearch-0",
"cluster_name": "rally-benchmark",
"cluster_uuid": "rs5Gdzm6TISd-L4KZWVW4w",
"version": {
"number": "8.4.0-SNAPSHOT",
"build_type": "tar",
"build_hash": "4e18993f55431f6d3890049c9fea4c6c8218f070",
"build_date": "2022-07-05T00:19:44.887353304Z",
"build_snapshot": True,
"lucene_version": "9.3.0",
"minimum_wire_compatibility_version": "7.17.0",
"minimum_index_compatibility_version": "7.0.0",
},
"tagline": "You Know, for Search",
},
)

repository = "many-shards"
task_params = {
"repository": repository,
"completion-recheck-wait-period": 0,
}

es.perform_request = mock.AsyncMock(
side_effect=[
{
"snapshots": [
{
"snapshot": "logging-test-0",
"uuid": "xVb4cPSKTfyIz-HcN9mQcg",
"repository": "many-shards",
"data_streams": [],
"state": "IN_PROGRESS",
},
{
"snapshot": "logging-test-1",
"uuid": "LEHRHopiTrqemFkGXQijHw",
"repository": "many-shards",
"data_streams": [],
"state": "IN_PROGRESS",
},
{
"snapshot": "logging-test-2",
"uuid": "9DJcMb5JQruddQbO3qzxSA",
"repository": "many-shards",
"data_streams": [],
"state": "IN_PROGRESS",
},
{
"snapshot": "logging-test-3",
"uuid": "5YmhlUBxRv6pQbswf4nsfw",
"repository": "many-shards",
"data_streams": [],
"state": "IN_PROGRESS",
},
],
"total": 4,
"remaining": 0,
},
{"snapshots": [], "total": 0, "remaining": 0},
],
)

r = runner.WaitForCurrentSnapshotsCreate()
result = await r(es, task_params)

es.perform_request.assert_awaited_with(
method="GET",
path=f"_snapshot/{repository}/_current",
headers={"Content-Type": "application/json"},
params={"index_names": "false", "verbose": "false"},
)

assert es.perform_request.await_count == 2

assert result is None


class TestRestoreSnapshot:
@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
Expand Down

0 comments on commit 6edfa43

Please sign in to comment.