Skip to content

Commit

Permalink
Fix Temporarily Leaking Shard Level Metadata Blobs in some Cases (ela…
Browse files Browse the repository at this point in the history
…stic#76562) (elastic#77177)

When doing out of order finalizations we would leak shard level metadata blobs at times.
This commit enhances the cleanup logic after finalization to catch these leaked blobs
and adds a test that would without this fix trip the leaked blobs assertion in the test
infrastructure.
  • Loading branch information
original-brownbear authored Sep 2, 2021
1 parent 71902d8 commit 3fa00ea
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,43 @@ public void testQueuedAfterFailedShardSnapshot() throws Exception {
assertEquals(snapshotsStatusResponse1, snapshotsStatusResponse3);
}

public void testOutOfOrderFinalizationManySnapshots() throws Exception {
internalCluster().startMasterOnlyNode();
final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
final String index1 = "index-1";
final String index2 = "index-2";
createIndexWithContent(index1, dataNodes.get(0), dataNodes.get(1));
createIndexWithContent(index2, dataNodes.get(1), dataNodes.get(0));

final String repository = "test-repo";
createRepository(repository, "mock");

blockNodeWithIndex(repository, index2);

final ActionFuture<CreateSnapshotResponse> snapshot1 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-1")
.setIndices(index1, index2)
.setWaitForCompletion(true)
.execute();
final ActionFuture<CreateSnapshotResponse> snapshot2 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-2")
.setIndices(index1, index2)
.setWaitForCompletion(true)
.execute();
awaitNumberOfSnapshotsInProgress(2);
final ActionFuture<CreateSnapshotResponse> snapshot3 = clusterAdmin().prepareCreateSnapshot(repository, "snapshot-3")
.setIndices(index1)
.setWaitForCompletion(true)
.execute();
assertSuccessful(snapshot3);
unblockAllDataNodes(repository);
assertSuccessful(snapshot1);
assertSuccessful(snapshot2);

assertThat(
clusterAdmin().prepareSnapshotStatus().setSnapshots("snapshot-2").setRepository(repository).get().getSnapshots(),
hasSize(1)
);
}

private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) {
final SnapshotsStatusResponse snapshotsStatusResponse = client().admin()
.cluster()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,38 @@ public Entry snapshot(final Snapshot snapshot) {
return null;
}

/**
* Computes a map of repository shard id to set of generations, containing all shard generations that became obsolete and may be
* deleted from the repository as the cluster state moved from the given {@code old} value of {@link SnapshotsInProgress} to this
* instance.
*/
public Map<RepositoryShardId, Set<ShardGeneration>> obsoleteGenerations(SnapshotsInProgress old) {
final Map<RepositoryShardId, Set<ShardGeneration>> obsoleteGenerations = new HashMap<>();
for (Entry entry : old.entries()) {
final Entry updatedEntry = snapshot(entry.snapshot());
if (updatedEntry == null) {
continue;
}
for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> oldShardAssignment : entry.shardsByRepoShardId()) {
final RepositoryShardId repositoryShardId = oldShardAssignment.key;
final ShardSnapshotStatus oldStatus = oldShardAssignment.value;
final ShardSnapshotStatus newStatus = updatedEntry.shardsByRepoShardId().get(repositoryShardId);
if (oldStatus.state == ShardState.SUCCESS
&& oldStatus.generation() != null
&& newStatus != null
&& newStatus.state() == ShardState.SUCCESS
&& newStatus.generation() != null
&& oldStatus.generation().equals(newStatus.generation()) == false
) {
// We moved from a non-null generation successful generation to a different non-null successful generation
// so the original generation is clearly obsolete because it was in-flight before and is now unreferenced everywhere.
obsoleteGenerations.computeIfAbsent(repositoryShardId, ignored -> new HashSet<>()).add(oldStatus.generation());
}
}
}
return org.elasticsearch.core.Map.copyOf(obsoleteGenerations);
}

@Override
public String getWriteableName() {
return TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@

package org.elasticsearch.repositories;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;

import java.util.Map;
import java.util.Set;

/**
* Context for finalizing a snapshot.
*/
Expand All @@ -25,6 +30,12 @@ public final class FinalizeSnapshotContext extends ActionListener.Delegating<

private final ShardGenerations updatedShardGenerations;

/**
* Obsolete shard generations map computed from the cluster state update that this finalization executed in
* {@link #updatedClusterState}.
*/
private final SetOnce<Map<RepositoryShardId, Set<ShardGeneration>>> obsoleteGenerations = new SetOnce<>();

private final long repositoryStateId;

private final Metadata clusterMetadata;
Expand Down Expand Up @@ -78,8 +89,18 @@ public Metadata clusterMetadata() {
return clusterMetadata;
}

public Map<RepositoryShardId, Set<ShardGeneration>> obsoleteShardGenerations() {
assert obsoleteGenerations.get() != null : "must only be called after #updatedClusterState";
return obsoleteGenerations.get();
}

public ClusterState updatedClusterState(ClusterState state) {
return SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot());
final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot());
obsoleteGenerations.set(
updatedState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.obsoleteGenerations(state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY))
);
return updatedState;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1423,7 +1423,7 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
finalizeSnapshotContext::updatedClusterState,
ActionListener.wrap(newRepoData -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, newRepoData);
cleanupOldShardGens(existingRepositoryData, newRepoData, finalizeSnapshotContext);
}
finalizeSnapshotContext.onResponse(Tuple.tuple(newRepoData, snapshotInfo));
}, onUpdateFailure)
Expand Down Expand Up @@ -1478,8 +1478,12 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
}

// Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data
private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) {
final List<String> toDelete = new ArrayList<>();
private void cleanupOldShardGens(
RepositoryData existingRepositoryData,
RepositoryData updatedRepositoryData,
FinalizeSnapshotContext finalizeSnapshotContext
) {
final Set<String> toDelete = new HashSet<>();
final int prefixPathLen = basePath().buildAsString().length();
updatedRepositoryData.shardGenerations()
.obsoleteShardGenerations(existingRepositoryData.shardGenerations())
Expand All @@ -1490,6 +1494,15 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito
)
)
);
for (Map.Entry<RepositoryShardId, Set<ShardGeneration>> obsoleteEntry : finalizeSnapshotContext.obsoleteShardGenerations()
.entrySet()) {
final String containerPath = shardContainer(obsoleteEntry.getKey().index(), obsoleteEntry.getKey().shardId()).path()
.buildAsString()
.substring(prefixPathLen) + INDEX_FILE_PREFIX;
for (ShardGeneration shardGeneration : obsoleteEntry.getValue()) {
toDelete.add(containerPath + shardGeneration);
}
}
try {
deleteFromContainer(blobContainer(), toDelete.iterator());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2241,8 +2241,6 @@ private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> read
* @return updated cluster state
*/
public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
// TODO: updating snapshots here leaks their outdated generation files, we should add logic to clean those up and enhance
// BlobStoreTestUtil to catch this leak
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
ClusterState result = state;
int indexOfEntry = -1;
Expand Down

0 comments on commit 3fa00ea

Please sign in to comment.