Skip to content

Commit

Permalink
Fix Bug with Concurrent Snapshot and Index Delete (#73456)
Browse files Browse the repository at this point in the history
Fixes the incorrect assumption in the snapshot state machine that a finished
snapshot delete could only start shard snapshots: in fact it can also move
snapshots to a completed state.
  • Loading branch information
original-brownbear authored May 27, 2021
1 parent 3d36df5 commit 14a31b9
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
Expand All @@ -19,7 +20,9 @@
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.SnapshotsInProgress;
Expand All @@ -33,6 +36,7 @@
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -72,6 +76,7 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -87,6 +92,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -1007,6 +1013,65 @@ public void testPartialSnapshotsDoNotRecordDeletedShardFailures() throws Excepti
assertThat(snapshotInfo.shardFailures(), empty());
}

public void testDeleteIndexDuringSnapshot() throws Exception {
    // Set up a single-shard, zero-replica index with some random documents.
    final String indexName = "test-idx";
    assertAcked(prepareCreate(indexName, 1, indexSettingsNoReplicas(1)));
    ensureGreen();
    indexRandomDocs(indexName, 100);

    final String repoName = "test-repo";
    createRepository(repoName, "fs");

    // Take a baseline snapshot that the test restores from at the very end.
    final String firstSnapshotName = "test-snap";
    createSnapshot(repoName, firstSnapshotName, List.of(indexName));

    // Kick off several concurrent create/delete-snapshot loops; each loop's
    // future completes once snapshot creation fails because the index is gone.
    final int concurrentLoops = randomIntBetween(2, 5);
    final List<Future<Void>> futures = new ArrayList<>(concurrentLoops);
    for (int loop = 0; loop < concurrentLoops; loop++) {
        final PlainActionFuture<Void> loopDone = PlainActionFuture.newFuture();
        futures.add(loopDone);
        startSnapshotDeleteLoop(repoName, indexName, "test-snap-" + loop, loopDone);
    }

    // Give the loops a moment to start running before the index is removed.
    Thread.sleep(200);

    logger.info("--> delete index");
    assertAcked(admin().indices().prepareDelete(indexName));

    // Wait for every loop to observe the index deletion and terminate.
    for (Future<Void> loopDone : futures) {
        loopDone.get();
    }

    logger.info("--> restore snapshot 1");
    clusterAdmin().prepareRestoreSnapshot(repoName, firstSnapshotName).get();
    ensureGreen(indexName);
}

// Repeatedly creates and then deletes a snapshot of the given single index until
// the index has been removed from the cluster, at which point doneListener is resolved.
private void startSnapshotDeleteLoop(String repoName, String indexName, String snapshotName, ActionListener<Void> doneListener) {
    final ActionListener<CreateSnapshotResponse> loopListener = new ActionListener<>() {
        @Override
        public void onResponse(CreateSnapshotResponse createSnapshotResponse) {
            // Snapshot creation succeeded: delete it and, once the delete is
            // acknowledged, start the next iteration of the loop.
            clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName)
                .execute(ActionTestUtils.assertNoFailureListener(acknowledgedResponse -> {
                    assertAcked(acknowledgedResponse);
                    startSnapshotDeleteLoop(repoName, indexName, snapshotName, doneListener);
                }));
        }

        @Override
        public void onFailure(Exception e) {
            // The only expected failure is the index having been deleted,
            // which is the loop's termination condition.
            assertThat(e, instanceOf(IndexNotFoundException.class));
            doneListener.onResponse(null);
        }
    };
    clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
        .setWaitForCompletion(true)
        .setPartial(true)
        .setIndices(indexName)
        .execute(loopListener);
}


public void testGetReposWithWildcard() {
internalCluster().startMasterOnlyNode();
List<RepositoryMetadata> repositoryMetadata = client().admin().cluster().prepareGetRepositories("*").get().repositories();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2210,8 +2210,12 @@ && alreadyReassigned(value.key.getIndexName(), value.key.getId(), reassignedShar
updatedAssignmentsBuilder.put(shardId, updated);
}
}
snapshotEntries.add(entry.withStartedShards(updatedAssignmentsBuilder.build()));
final SnapshotsInProgress.Entry updatedEntry = entry.withShardStates(updatedAssignmentsBuilder.build());
snapshotEntries.add(updatedEntry);
changed = true;
if (updatedEntry.state().completed()) {
newFinalizations.add(entry);
}
}
}
} else {
Expand Down

0 comments on commit 14a31b9

Please sign in to comment.