From 2efca17d1a7a57e5467b7c51e41cb04578e7b62c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 27 May 2021 14:26:26 +0200 Subject: [PATCH] Fix Bug with Concurrent Snapshot and Index Delete (#73456) Fixes the incorrect assumption in the snapshot state machine that a finished snapshot delete could only start shard snapshots: in fact it can also move snapshots to a completed state. --- .../DedicatedClusterSnapshotRestoreIT.java | 64 +++++++++++++++++++ .../snapshots/SnapshotsService.java | 6 +- 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 4f6d5927425b8..a8f07bbdda2b4 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -11,6 +11,7 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; @@ -20,6 +21,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -35,6 +37,7 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.ShardId; @@ -75,6 +78,7 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -90,6 +94,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; @@ -1045,6 +1050,65 @@ public void testPartialSnapshotsDoNotRecordDeletedShardFailures() throws Excepti assertThat(snapshotInfo.shardFailures(), empty()); } + public void testDeleteIndexDuringSnapshot() throws Exception { + final String indexName = "test-idx"; + assertAcked(prepareCreate(indexName, 1, indexSettingsNoReplicas(1))); + ensureGreen(); + indexRandomDocs(indexName, 100); + + final String repoName = "test-repo"; + createRepository(repoName, "fs"); + + final String firstSnapshotName = "test-snap"; + createSnapshot(repoName, firstSnapshotName, Collections.singletonList(indexName)); + final int concurrentLoops = randomIntBetween(2, 5); + final List> futures = new ArrayList<>(concurrentLoops); + for (int i = 0; i < concurrentLoops; i++) { + final PlainActionFuture future = PlainActionFuture.newFuture(); + futures.add(future); + startSnapshotDeleteLoop(repoName, indexName, "test-snap-" + i, future); + } + + Thread.sleep(200); + + logger.info("--> delete index"); + assertAcked(admin().indices().prepareDelete(indexName)); + + for (Future future : futures) { + future.get(); + } + + logger.info("--> restore snapshot 1"); + clusterAdmin().prepareRestoreSnapshot(repoName, firstSnapshotName).get(); + ensureGreen(indexName); + } + + // create and delete a snapshot of the given name and for the given single index in a loop until the index is removed from the cluster + // at which point doneListener is resolved + private void startSnapshotDeleteLoop(String repoName, String indexName, String snapshotName, ActionListener doneListener) { + clusterAdmin().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true) + .setPartial(true) + .setIndices(indexName) + .execute(new ActionListener() { + @Override + public void onResponse(CreateSnapshotResponse createSnapshotResponse) { + clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName).execute( + ActionTestUtils.assertNoFailureListener(acknowledgedResponse -> { + assertAcked(acknowledgedResponse); + startSnapshotDeleteLoop(repoName, indexName, snapshotName, doneListener); + })); + } + + @Override + public void onFailure(Exception e) { + assertThat(e, instanceOf(IndexNotFoundException.class)); + doneListener.onResponse(null); + } + }); + } + + public void testGetReposWithWildcard() { internalCluster().startMasterOnlyNode(); List repositoryMetadata = client().admin().cluster().prepareGetRepositories("*").get().repositories(); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 77debf7a8ff01..37caa7d6d2844 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -2777,8 +2777,12 @@ && alreadyReassigned(value.key.getIndexName(), value.key.getId(), reassignedShar updatedAssignmentsBuilder.put(shardId, updated); } } - snapshotEntries.add(entry.withStartedShards(updatedAssignmentsBuilder.build())); + final SnapshotsInProgress.Entry updatedEntry = entry.withShardStates(updatedAssignmentsBuilder.build()); + snapshotEntries.add(updatedEntry); changed = true; + if (updatedEntry.state().completed()) { + newFinalizations.add(entry); + } } } } else {