Skip to content

Commit

Permalink
Check for recovery details when finished (#1018)
Browse files Browse the repository at this point in the history
With this commit we defer reading a shard's recovery details until that
shard's recovery has finished. This avoids issues with properties that
the API does not yet return in earlier stages (e.g.
`stop_time_in_millis`).
  • Loading branch information
danielmitterdorfer authored Jun 15, 2020
1 parent 9f73d5d commit fadc065
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
14 changes: 9 additions & 5 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,8 @@ async def __call__(self, es, params):
total_end_millis = 0

# wait until recovery is done
# The nesting level is ok here given the structure of the API response
# pylint: disable=too-many-nested-blocks
while not all_shards_done:
response = await es.indices.recovery(index=index)
# This might happen if we call the API before the next recovery is scheduled.
Expand All @@ -1528,11 +1530,13 @@ async def __call__(self, es, params):
for _, idx_data in response.items():
for _, shard_data in idx_data.items():
for shard in shard_data:
all_shards_done = all_shards_done and (shard["stage"] == "DONE")
total_start_millis = min(total_start_millis, shard["start_time_in_millis"])
total_end_millis = max(total_end_millis, shard["stop_time_in_millis"])
idx_size = shard["index"]["size"]
total_recovered += idx_size["recovered_in_bytes"]
current_shard_done = shard["stage"] == "DONE"
all_shards_done = all_shards_done and current_shard_done
if current_shard_done:
total_start_millis = min(total_start_millis, shard["start_time_in_millis"])
total_end_millis = max(total_end_millis, shard["stop_time_in_millis"])
idx_size = shard["index"]["size"]
total_recovered += idx_size["recovered_in_bytes"]
self.logger.debug("All shards done for [%s]: [%s].", index, all_shards_done)

if not all_shards_done:
Expand Down
43 changes: 41 additions & 2 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2879,6 +2879,45 @@ async def test_waits_for_ongoing_indices_recovery(self, es):
es.indices.recovery.side_effect = [
# recovery did not yet start
as_future({}),
# recovery about to be started
as_future({
"index1": {
"shards": [
{
"id": 0,
"type": "SNAPSHOT",
"stage": "INIT",
"primary": True,
"start_time_in_millis": 1393244159716,
"index": {
"size": {
"total": "75.4mb",
"total_in_bytes": 79063092,
"recovered": "0mb",
"recovered_in_bytes": 0,
}
}
},
{
"id": 1,
"type": "SNAPSHOT",
"stage": "DONE",
"primary": True,
"start_time_in_millis": 1393244155000,
"stop_time_in_millis": 1393244158000,
"index": {
"size": {
"total": "175.4mb",
"total_in_bytes": 179063092,
"recovered": "165.7mb",
"recovered_in_bytes": 168891939,
}
}
}
]
}
}),

# active recovery - one shard is not yet finished
as_future({
"index1": {
Expand Down Expand Up @@ -2975,8 +3014,8 @@ async def test_waits_for_ongoing_indices_recovery(self, es):
self.assertEqual(5, result["time_period"])

es.indices.recovery.assert_called_with(index="index1")
# retries three times
self.assertEqual(3, es.indices.recovery.call_count)
# the recovery API is called four times in total
self.assertEqual(4, es.indices.recovery.call_count)


class ShrinkIndexTests(TestCase):
Expand Down

0 comments on commit fadc065

Please sign in to comment.