Fix Index Deletion During Partial Snapshot Create (#50234)
* Fix Index Deletion During Partial Snapshot Create

We can fix index deletion during partial snapshots by simply filtering out
shard generation updates for indices that were concurrently removed from
the cluster state, since that removes every remaining reference to those
shards from the snapshot.

Follow up to #50202
Closes #50200
original-brownbear authored Dec 17, 2019
1 parent 08214eb commit e70c7c9
Showing 3 changed files with 66 additions and 11 deletions.
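For orientation, a minimal sketch of the idea behind the fix (not code from this commit): when building the shard generations used to finalize a partial snapshot, entries for indices that no longer exist in the cluster state metadata are simply dropped. The Map-based model, class name, and method names below are hypothetical simplifications of ShardGenerations, MetaData, and SnapshotsService#buildGenerations.

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for the real filtering in
// SnapshotsService#buildGenerations: shard generations keyed by index name.
final class PartialSnapshotFilterSketch {

    // Drops generation entries for indices that no longer exist in the cluster
    // state, so a partial snapshot keeps no reference to deleted shards.
    static Map<String, List<String>> filterDeletedIndices(Map<String, List<String>> generations,
                                                          Set<String> liveIndices) {
        return generations.entrySet().stream()
            .filter(entry -> liveIndices.contains(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}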
ShardGenerations.java
@@ -54,6 +54,13 @@ private ShardGenerations(Map<IndexId, List<String>> shardGenerations) {
this.shardGenerations = shardGenerations;
}

/**
* Returns the total number of shards tracked by this instance.
*/
public int totalShards() {
return shardGenerations.values().stream().mapToInt(List::size).sum();
}

/**
* Returns all indices for which shard generations are tracked.
*
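A hedged usage sketch of the new totalShards() method added above. The builder calls mirror those visible in the SnapshotsService diff below; the IndexId(name, uuid) constructor and the generation strings are assumptions made for illustration.

import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.ShardGenerations;

class TotalShardsUsageSketch {
    static int example() {
        // Assumed for illustration: IndexId(name, uuid) constructor and opaque generation strings.
        ShardGenerations.Builder builder = ShardGenerations.builder();
        builder.put(new IndexId("index-a", "uuid-a"), 0, "generation-a0");
        builder.put(new IndexId("index-b", "uuid-b"), 0, "generation-b0");
        // One tracked shard per index above, so totalShards() reports 2.
        return builder.build().totalShards();
    }
}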
SnapshotsService.java
@@ -568,16 +568,17 @@ public void onNoLongerMaster() {
private void cleanupAfterError(Exception exception) {
threadPool.generic().execute(() -> {
if (snapshotCreated) {
final MetaData metaData = clusterService.state().metaData();
repositoriesService.repository(snapshot.snapshot().getRepository())
.finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
buildGenerations(snapshot),
buildGenerations(snapshot, metaData),
snapshot.startTime(),
ExceptionsHelper.stackTrace(exception),
0,
Collections.emptyList(),
snapshot.repositoryStateId(),
snapshot.includeGlobalState(),
metaDataForSnapshot(snapshot, clusterService.state().metaData()),
metaDataForSnapshot(snapshot, metaData),
snapshot.userMetadata(),
snapshot.useShardGenerations(),
ActionListener.runAfter(ActionListener.wrap(ignored -> {
@@ -593,11 +594,21 @@ private void cleanupAfterError(Exception exception) {
}
}

private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot) {
private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
ShardGenerations.Builder builder = ShardGenerations.builder();
final Map<String, IndexId> indexLookup = new HashMap<>();
snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
snapshot.shards().forEach(c -> builder.put(indexLookup.get(c.key.getIndexName()), c.key.id(), c.value.generation()));
snapshot.shards().forEach(c -> {
if (metaData.index(c.key.getIndex()) == null) {
assert snapshot.partial() :
"Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
return;
}
final IndexId indexId = indexLookup.get(c.key.getIndexName());
if (indexId != null) {
builder.put(indexId, c.key.id(), c.value.generation());
}
});
return builder.build();
}

@@ -1032,12 +1043,13 @@ protected void doRun() {
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
}
}
final ShardGenerations shardGenerations = buildGenerations(entry, metaData);
repository.finalizeSnapshot(
snapshot.getSnapshotId(),
buildGenerations(entry),
shardGenerations,
entry.startTime(),
failure,
entry.shards().size(),
entry.partial() ? shardGenerations.totalShards() : entry.shards().size(),
unmodifiableList(shardFailures),
entry.repositoryStateId(),
entry.includeGlobalState(),
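To make the change to the reported shard total concrete, a hypothetical continuation of the first sketch: for a partial snapshot, the total passed to finalizeSnapshot is taken from the filtered generations (shardGenerations.totalShards()) rather than from entry.shards().size(), so it stays consistent with the shards the snapshot still references.

import java.util.List;
import java.util.Map;
import java.util.Set;

class PartialSnapshotCountSketch {
    public static void main(String[] args) {
        // Two single-shard indices were in the snapshot; one was deleted while it ran.
        Map<String, List<String>> generations = Map.of(
            "index-kept", List.of("generation-0"),
            "index-deleted", List.of("generation-0"));
        Set<String> liveIndices = Set.of("index-kept");

        Map<String, List<String>> filtered =
            PartialSnapshotFilterSketch.filterDeletedIndices(generations, liveIndices);

        // The partial snapshot reports 1 total shard, matching what it still references,
        // instead of the 2 shards originally in the snapshot entry.
        int reportedTotal = filtered.values().stream().mapToInt(List::size).sum();
        System.out.println(reportedTotal); // prints 1
    }
}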
SnapshotResiliencyTests.java
@@ -21,6 +21,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@@ -143,6 +144,7 @@
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
@@ -211,6 +213,7 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.mock;

@@ -503,7 +506,7 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
}
}

public void testConcurrentSnapshotDeleteAndDeleteIndex() {
public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

String repoName = "repo";
@@ -514,11 +517,13 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() {
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());

final StepListener<Collection<CreateIndexResponse>> createIndicesListener = new StepListener<>();
final int indices = randomIntBetween(5, 20);

final SetOnce<Index> firstIndex = new SetOnce<>();
continueOrDie(createRepoAndIndex(repoName, index, 1), createIndexResponse -> {
firstIndex.set(masterNode.clusterService.state().metaData().index(index).getIndex());
// create a few more indices to make it more likely that the subsequent index delete operation happens before snapshot
// finalization
final int indices = randomIntBetween(5, 20);
final GroupedActionListener<CreateIndexResponse> listener = new GroupedActionListener<>(createIndicesListener, indices);
for (int i = 0; i < indices; ++i) {
client().admin().indices().create(new CreateIndexRequest("index-" + i), listener);
@@ -527,23 +532,54 @@

final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();

final boolean partialSnapshot = randomBoolean();

continueOrDie(createIndicesListener, createIndexResponses ->
client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(false)
.execute(createSnapshotResponseStepListener));
.setPartial(partialSnapshot).execute(createSnapshotResponseStepListener));

continueOrDie(createSnapshotResponseStepListener,
createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), noopListener()));
createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (partialSnapshot) {
// Recreate index by the same name to test that we don't snapshot conflicting metadata in this scenario
client().admin().indices().create(new CreateIndexRequest(index), noopListener());
}
}

@Override
public void onFailure(Exception e) {
if (partialSnapshot) {
throw new AssertionError("Delete index should always work during partial snapshots", e);
}
}
}));

deterministicTaskQueue.runAllRunnableTasks();

SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
final Repository repository = masterNode.repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
final RepositoryData repositoryData = getRepositoryData(repository);
Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
assertThat(snapshotIds, hasSize(1));

final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
if (partialSnapshot) {
// Single shard for each index so we either get all indices or all except for the deleted index
assertThat(snapshotInfo.successfulShards(), either(is(indices + 1)).or(is(indices)));
if (snapshotInfo.successfulShards() == indices + 1) {
final IndexMetaData indexMetaData =
repository.getSnapshotIndexMetaData(snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index));
// Make sure we snapshotted the metadata of this index and not the recreated version
assertEquals(indexMetaData.getIndex(), firstIndex.get());
}
} else {
// Index delete must be blocked for non-partial snapshots and we get a snapshot for every index
assertEquals(snapshotInfo.successfulShards(), indices + 1);
}
assertEquals(0, snapshotInfo.failedShards());
}
