Skip to content

Commit

Permalink
Simplify Blobstore Consistency Check in Tests (#73992) (#74045)
Browse files Browse the repository at this point in the history
With work to make repo APIs more async incoming in #73570
we need a non-blocking way to run this check. This adds that async
check and removes the need to manually pass executors around as well.
  • Loading branch information
original-brownbear authored Jun 13, 2021
1 parent 094eb11 commit b2c17c6
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;

Expand Down Expand Up @@ -140,7 +139,7 @@ public void testSimpleWorkflow() {
assertThat(clusterState.getMetadata().hasIndex("test-idx-2"), equalTo(false));
final BlobStoreRepository repo =
(BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
BlobStoreTestUtil.assertConsistency(repo);
}

public void testMissingUri() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,6 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti
logger.info("--> cleanup repository");
client().admin().cluster().prepareCleanupRepository(repoName).get();

BlobStoreTestUtil.assertConsistency(repository, repository.threadPool().generic());
BlobStoreTestUtil.assertConsistency(repository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,11 @@ public void verifyReposThenStopServices() {

runUntil(cleanedUp::get, TimeUnit.MINUTES.toMillis(1L));

BlobStoreTestUtil.assertConsistency(
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"),
Runnable::run
final PlainActionFuture<AssertionError> future = BlobStoreTestUtil.assertConsistencyAsync(
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo")
);
deterministicTaskQueue.runAllRunnableTasks();
assertNull(future.actionGet(0));
} finally {
testClusterNodes.nodes.values().forEach(TestClusterNodes.TestClusterNode::stop);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void testCleanup() throws Exception {
logger.info("--> deleting a snapshot to trigger repository cleanup");
client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet();

BlobStoreTestUtil.assertConsistency(repo, genericExec);
BlobStoreTestUtil.assertConsistency(repo);

logger.info("--> Create dangling index");
createDanglingIndex(repo, genericExec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.ByteArrayInputStream;
Expand All @@ -58,7 +56,6 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -82,23 +79,25 @@

public final class BlobStoreTestUtil {

public static void assertRepoConsistency(InternalTestCluster testCluster, String repoName) {
final BlobStoreRepository repo =
(BlobStoreRepository) testCluster.getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
}

/**
* Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository.
* TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata
* @param repository BlobStoreRepository to check
* @param executor Executor to run all repository calls on. This is needed since the production {@link BlobStoreRepository}
* implementations assert that all IO inducing calls happen on the generic or snapshot thread-pools and hence callers
* of this assertion must pass an executor on those when using such an implementation.
*/
public static void assertConsistency(BlobStoreRepository repository, Executor executor) {
final PlainActionFuture<AssertionError> listener = PlainActionFuture.newFuture();
executor.execute(ActionRunnable.supply(listener, () -> {
public static void assertConsistency(BlobStoreRepository repository) {
final PlainActionFuture<AssertionError> listener = assertConsistencyAsync(repository);
final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
if (err != null) {
throw new AssertionError(err);
}
}

/**
* Same as {@link #assertConsistency(BlobStoreRepository)} but async so it can be used in tests that don't allow blocking.
*/
public static PlainActionFuture<AssertionError> assertConsistencyAsync(BlobStoreRepository repository) {
final PlainActionFuture<AssertionError> future = PlainActionFuture.newFuture();
repository.threadPool().generic().execute(ActionRunnable.wrap(future, listener -> {
try {
final BlobContainer blobContainer = repository.blobContainer();
final long latestGen;
Expand All @@ -117,15 +116,12 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex
assertIndexUUIDs(repository, repositoryData);
assertSnapshotUUIDs(repository, repositoryData);
assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
return null;
listener.onResponse(null);
} catch (AssertionError e) {
return e;
listener.onResponse(e);
}
}));
final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
if (err != null) {
throw new AssertionError(err);
}
return future;
}

private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void assertRepoConsistency() {
clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get();
clusterAdmin().prepareCleanupRepository(name).get();
}
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
BlobStoreTestUtil.assertConsistency(getRepositoryOnMaster(name));
});
} else {
logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,21 @@
import joptsimple.OptionSet;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cli.MockTerminal;
import org.elasticsearch.common.blobstore.BlobMetadata;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.s3.S3RepositoryPlugin;

import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;

public class S3CleanupTests extends AbstractCleanupTests {

@Override
protected void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath path, String prefix, Map<String, BlobMetadata> blobs)
throws Exception {
assertBusy(() -> super.assertBlobsByPrefix(repository, path, prefix, blobs), 10, TimeUnit.MINUTES);
}

@Override
protected void assertCorruptionVisible(BlobStoreRepository repo, Map<String, Set<String>> indexToFiles) throws Exception {
assertBusy(() -> super.assertCorruptionVisible(repo, indexToFiles), 10, TimeUnit.MINUTES);
}

@Override
protected void assertConsistency(BlobStoreRepository repo, Executor executor) throws Exception {
assertBusy(() -> super.assertConsistency(repo, executor), 10, TimeUnit.MINUTES);
}

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(S3RepositoryPlugin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,18 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cli.MockTerminal;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.common.blobstore.BlobMetadata;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Executor;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -38,19 +34,6 @@ public abstract class AbstractCleanupTests extends ESSingleNodeTestCase {

protected BlobStoreRepository repository;

protected void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath path, String prefix, Map<String, BlobMetadata> blobs)
throws Exception {
BlobStoreTestUtil.assertBlobsByPrefix(repository, path, prefix, blobs);
}

protected void assertConsistency(BlobStoreRepository repo, Executor executor) throws Exception {
BlobStoreTestUtil.assertConsistency(repo, executor);
}

protected void assertCorruptionVisible(BlobStoreRepository repo, Map<String, Set<String>> indexToFiles) throws Exception {
BlobStoreTestUtil.assertCorruptionVisible(repo, indexToFiles);
}

@Override
public void setUp() throws Exception {
super.setUp();
Expand All @@ -64,7 +47,7 @@ private void cleanupRepository(BlobStoreRepository repository) throws Exception
repository.threadPool().generic().execute(ActionRunnable.run(future,
() -> repository.blobStore().blobContainer(repository.basePath()).delete()));
future.actionGet();
assertBlobsByPrefix(repository, repository.basePath(), "", Collections.emptyMap());
BlobStoreTestUtil.assertBlobsByPrefix(repository, repository.basePath(), "", Collections.emptyMap());
}

@Override
Expand Down Expand Up @@ -191,7 +174,7 @@ public void testCleanup() throws Throwable {
assertThat(terminal.getOutput(), containsString("Set of deletion candidates is empty. Exiting"));

logger.info("--> check that there is no inconsistencies after running the tool");
assertConsistency(repository, repository.threadPool().executor(ThreadPool.Names.GENERIC));
BlobStoreTestUtil.assertConsistency(repository);

logger.info("--> create several dangling indices");
int numOfFiles = 0;
Expand All @@ -211,7 +194,7 @@ public void testCleanup() throws Throwable {
Set<String> danglingIndices = indexToFiles.keySet();

logger.info("--> ensure dangling index folders are visible");
assertCorruptionVisible(repository, indexToFiles);
BlobStoreTestUtil.assertCorruptionVisible(repository, indexToFiles);

logger.info("--> execute cleanup tool, corruption is created latter than snapshot, there is nothing to cleanup");
terminal = executeCommand(false);
Expand Down Expand Up @@ -258,7 +241,7 @@ public void testCleanup() throws Throwable {
containsString("Total bytes freed: " + size));

logger.info("--> verify that there is no inconsistencies");
assertConsistency(repository, repository.threadPool().executor(ThreadPool.Names.GENERIC));
BlobStoreTestUtil.assertConsistency(repository);
logger.info("--> perform cleanup by removing snapshots");
assertTrue(client().admin()
.cluster()
Expand Down

0 comments on commit b2c17c6

Please sign in to comment.