From e70c7c9e94847a882bf6a401fbe70177affefe2f Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 17 Dec 2019 09:42:02 +0100
Subject: [PATCH] Fix Index Deletion During Partial Snapshot Create (#50234)

* Fix Index Deletion During Partial Snapshot Create

We can simply filter out shard generation updates for indices that were
removed from the cluster state concurrently to fix index deletes during
partial snapshots, as that completely removes any reference to those
shards from the snapshot.

Follow up to #50202
Closes #50200
---
 .../repositories/ShardGenerations.java        |  7 +++
 .../snapshots/SnapshotsService.java           | 24 +++++++---
 .../snapshots/SnapshotResiliencyTests.java    | 46 +++++++++++++++++--
 3 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
index 6351d5e2f2bf0..8b7f799d0e7c2 100644
--- a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
+++ b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
@@ -54,6 +54,13 @@ private ShardGenerations(Map<IndexId, List<String>> shardGenerations) {
         this.shardGenerations = shardGenerations;
     }
 
+    /**
+     * Returns the total number of shards tracked by this instance.
+     */
+    public int totalShards() {
+        return shardGenerations.values().stream().mapToInt(List::size).sum();
+    }
+
     /**
      * Returns all indices for which shard generations are tracked.
      *
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 7c638bc332edc..cfb2112b4044d 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -568,16 +568,17 @@ public void onNoLongerMaster() {
         private void cleanupAfterError(Exception exception) {
             threadPool.generic().execute(() -> {
                 if (snapshotCreated) {
+                    final MetaData metaData = clusterService.state().metaData();
                     repositoriesService.repository(snapshot.snapshot().getRepository())
                         .finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
-                            buildGenerations(snapshot),
+                            buildGenerations(snapshot, metaData),
                             snapshot.startTime(),
                             ExceptionsHelper.stackTrace(exception),
                             0,
                             Collections.emptyList(),
                             snapshot.repositoryStateId(),
                             snapshot.includeGlobalState(),
-                            metaDataForSnapshot(snapshot, clusterService.state().metaData()),
+                            metaDataForSnapshot(snapshot, metaData),
                             snapshot.userMetadata(),
                             snapshot.useShardGenerations(),
                             ActionListener.runAfter(ActionListener.wrap(ignored -> {
@@ -593,11 +594,21 @@ private void cleanupAfterError(Exception exception) {
             }
         }
 
-    private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot) {
+    private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
         ShardGenerations.Builder builder = ShardGenerations.builder();
         final Map<String, IndexId> indexLookup = new HashMap<>();
         snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
-        snapshot.shards().forEach(c -> builder.put(indexLookup.get(c.key.getIndexName()), c.key.id(), c.value.generation()));
+        snapshot.shards().forEach(c -> {
+            if (metaData.index(c.key.getIndex()) == null) {
+                assert snapshot.partial() :
+                    "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
+                return;
+            }
+            final IndexId indexId = indexLookup.get(c.key.getIndexName());
+            if (indexId != null) {
+                builder.put(indexId, c.key.id(), c.value.generation());
+            }
+        });
         return builder.build();
     }
 
@@ -1032,12 +1043,13 @@ protected void doRun() {
                     shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
                 }
             }
+            final ShardGenerations shardGenerations = buildGenerations(entry, metaData);
             repository.finalizeSnapshot(
                 snapshot.getSnapshotId(),
-                buildGenerations(entry),
+                shardGenerations,
                 entry.startTime(),
                 failure,
-                entry.shards().size(),
+                entry.partial() ? shardGenerations.totalShards() : entry.shards().size(),
                 unmodifiableList(shardFailures),
                 entry.repositoryStateId(),
                 entry.includeGlobalState(),
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index a0e7e51098681..5fd6624d1d128 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -21,6 +21,7 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -143,6 +144,7 @@
 import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
 import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
@@ -211,6 +213,7 @@
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Mockito.mock;
 
@@ -503,7 +506,7 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
         }
     }
 
-    public void testConcurrentSnapshotDeleteAndDeleteIndex() {
+    public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException {
         setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
 
         String repoName = "repo";
@@ -514,11 +517,13 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() {
             testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
 
         final StepListener<Collection<CreateIndexResponse>> createIndicesListener = new StepListener<>();
+        final int indices = randomIntBetween(5, 20);
 
+        final SetOnce<Index> firstIndex = new SetOnce<>();
         continueOrDie(createRepoAndIndex(repoName, index, 1), createIndexResponse -> {
+            firstIndex.set(masterNode.clusterService.state().metaData().index(index).getIndex());
             // create a few more indices to make it more likely that the subsequent index delete operation happens before snapshot
             // finalization
-            final int indices = randomIntBetween(5, 20);
             final GroupedActionListener<CreateIndexResponse> listener = new GroupedActionListener<>(createIndicesListener, indices);
             for (int i = 0; i < indices; ++i) {
                 client().admin().indices().create(new CreateIndexRequest("index-" + i), listener);
@@ -527,23 +532,54 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() {
 
         final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
 
+        final boolean partialSnapshot = randomBoolean();
+
         continueOrDie(createIndicesListener, createIndexResponses -> client().admin().cluster()
             .prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(false)
-            .execute(createSnapshotResponseStepListener));
+            .setPartial(partialSnapshot).execute(createSnapshotResponseStepListener));
 
         continueOrDie(createSnapshotResponseStepListener,
-            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), noopListener()));
+            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), new ActionListener<>() {
+                @Override
+                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                    if (partialSnapshot) {
+                        // Recreate index by the same name to test that we don't snapshot conflicting metadata in this scenario
+                        client().admin().indices().create(new CreateIndexRequest(index), noopListener());
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    if (partialSnapshot) {
+                        throw new AssertionError("Delete index should always work during partial snapshots", e);
+                    }
+                }
+            }));
 
         deterministicTaskQueue.runAllRunnableTasks();
 
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
+        final RepositoryData repositoryData = getRepositoryData(repository);
+        Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
 
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
         assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
+        if (partialSnapshot) {
+            // Single shard for each index so we either get all indices or all except for the deleted index
+            assertThat(snapshotInfo.successfulShards(), either(is(indices + 1)).or(is(indices)));
+            if (snapshotInfo.successfulShards() == indices + 1) {
+                final IndexMetaData indexMetaData =
+                    repository.getSnapshotIndexMetaData(snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index));
+                // Make sure we snapshotted the metadata of this index and not the recreated version
+                assertEquals(indexMetaData.getIndex(), firstIndex.get());
+            }
+        } else {
+            // Index delete must be blocked for non-partial snapshots and we get a snapshot for every index
+            assertEquals(snapshotInfo.successfulShards(), indices + 1);
+        }
         assertEquals(0, snapshotInfo.failedShards());
     }
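
The core idea of the patch is that, for a partial snapshot, any shard whose index has vanished from
the cluster state is simply dropped from the recorded shard generations, and the snapshot's total
shard count is then derived from what survived rather than from the in-progress entry. Below is a
minimal, self-contained sketch of that filtering in plain Java; ShardKey, filterDeletedIndices, and
the sample index names are illustrative stand-ins, not Elasticsearch API:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    public class PartialSnapshotFilterSketch {

        // Hypothetical stand-in for a shard's identity within a snapshot.
        record ShardKey(String indexName, int shardId) {}

        /**
         * Keeps only the shard-generation entries whose index still exists in the
         * cluster state, mirroring what buildGenerations(snapshot, metaData) does
         * in the patch above.
         */
        static Map<ShardKey, String> filterDeletedIndices(Map<ShardKey, String> generations,
                                                          Set<String> indicesInClusterState,
                                                          boolean partial) {
            final Map<ShardKey, String> filtered = new HashMap<>();
            for (Map.Entry<ShardKey, String> entry : generations.entrySet()) {
                if (indicesInClusterState.contains(entry.getKey().indexName()) == false) {
                    // Only a partial snapshot may legally lose an index mid-flight;
                    // for non-partial snapshots the index delete is blocked instead.
                    assert partial : "index [" + entry.getKey().indexName() + "] deleted during non-partial snapshot";
                    continue;
                }
                filtered.put(entry.getKey(), entry.getValue());
            }
            return filtered;
        }

        public static void main(String[] args) {
            final Map<ShardKey, String> generations = Map.of(
                new ShardKey("index-kept", 0), "gen-1",
                new ShardKey("index-deleted", 0), "gen-2");
            // "index-deleted" was removed concurrently, so its entry is dropped; the
            // surviving map's size plays the role of ShardGenerations#totalShards().
            final Map<ShardKey, String> surviving = filterDeletedIndices(generations, Set.of("index-kept"), true);
            System.out.println(surviving.size() + " shard(s): " + surviving);
        }
    }

This is also why finalization switches to `entry.partial() ? shardGenerations.totalShards() :
entry.shards().size()`: once deleted indices have been filtered out, the shard count reported in
SnapshotInfo must come from the filtered generations, or the counts would disagree with the shards
actually referenced by the snapshot.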