diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java index 591933a813a93..2d702236229b9 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; @@ -140,7 +139,7 @@ public void testSimpleWorkflow() { assertThat(clusterState.getMetadata().hasIndex("test-idx-2"), equalTo(false)); final BlobStoreRepository repo = (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo"); - BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC)); + BlobStoreTestUtil.assertConsistency(repo); } public void testMissingUri() { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java index aa0d0ce843137..eb9f60cebce47 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java @@ -167,6 +167,6 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti logger.info("--> cleanup repository"); client().admin().cluster().prepareCleanupRepository(repoName).get(); - BlobStoreTestUtil.assertConsistency(repository, repository.threadPool().generic()); + BlobStoreTestUtil.assertConsistency(repository); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 4614d2da27b58..ba00b9b8bdc94 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -260,10 +260,11 @@ public void verifyReposThenStopServices() { runUntil(cleanedUp::get, TimeUnit.MINUTES.toMillis(1L)); - BlobStoreTestUtil.assertConsistency( - (BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"), - Runnable::run + final PlainActionFuture future = BlobStoreTestUtil.assertConsistencyAsync( + (BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo") ); + deterministicTaskQueue.runAllRunnableTasks(); + assertNull(future.actionGet(0)); } finally { testClusterNodes.nodes.values().forEach(TestClusterNodes.TestClusterNode::stop); } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index 473b8956c8ded..d6ee81edd09a7 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -198,7 +198,7 @@ public void testCleanup() throws Exception { logger.info("--> deleting a snapshot to trigger repository cleanup"); client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet(); - BlobStoreTestUtil.assertConsistency(repo, genericExec); + BlobStoreTestUtil.assertConsistency(repo); logger.info("--> Create dangling index"); createDanglingIndex(repo, genericExec); diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index b2df3f9d6478e..98fd2e5449d04 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -35,12 +35,10 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; -import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; @@ -58,7 +56,6 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -82,23 +79,25 @@ public final class BlobStoreTestUtil { - public static void assertRepoConsistency(InternalTestCluster testCluster, String repoName) { - final BlobStoreRepository repo = - (BlobStoreRepository) testCluster.getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); - BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC)); - } - /** * Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository. * TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata * @param repository BlobStoreRepository to check - * @param executor Executor to run all repository calls on. This is needed since the production {@link BlobStoreRepository} - * implementations assert that all IO inducing calls happen on the generic or snapshot thread-pools and hence callers - * of this assertion must pass an executor on those when using such an implementation. */ - public static void assertConsistency(BlobStoreRepository repository, Executor executor) { - final PlainActionFuture listener = PlainActionFuture.newFuture(); - executor.execute(ActionRunnable.supply(listener, () -> { + public static void assertConsistency(BlobStoreRepository repository) { + final PlainActionFuture listener = assertConsistencyAsync(repository); + final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L)); + if (err != null) { + throw new AssertionError(err); + } + } + + /** + * Same as {@link #assertConsistency(BlobStoreRepository)} but async so it can be used in tests that don't allow blocking. + */ + public static PlainActionFuture assertConsistencyAsync(BlobStoreRepository repository) { + final PlainActionFuture future = PlainActionFuture.newFuture(); + repository.threadPool().generic().execute(ActionRunnable.wrap(future, listener -> { try { final BlobContainer blobContainer = repository.blobContainer(); final long latestGen; @@ -117,15 +116,12 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex assertIndexUUIDs(repository, repositoryData); assertSnapshotUUIDs(repository, repositoryData); assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations()); - return null; + listener.onResponse(null); } catch (AssertionError e) { - return e; + listener.onResponse(e); } })); - final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L)); - if (err != null) { - throw new AssertionError(err); - } + return future; } private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 8c64c7eaa8c9d..2632615861277 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -134,7 +134,7 @@ public void assertRepoConsistency() { clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get(); clusterAdmin().prepareCleanupRepository(name).get(); } - BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name); + BlobStoreTestUtil.assertConsistency(getRepositoryOnMaster(name)); }); } else { logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason); diff --git a/x-pack/snapshot-tool/qa/s3/src/test/java/org/elasticsearch/snapshots/S3CleanupTests.java b/x-pack/snapshot-tool/qa/s3/src/test/java/org/elasticsearch/snapshots/S3CleanupTests.java index 23ba70f563d65..6471291eb6ac1 100644 --- a/x-pack/snapshot-tool/qa/s3/src/test/java/org/elasticsearch/snapshots/S3CleanupTests.java +++ b/x-pack/snapshot-tool/qa/s3/src/test/java/org/elasticsearch/snapshots/S3CleanupTests.java @@ -10,43 +10,21 @@ import joptsimple.OptionSet; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cli.MockTerminal; -import org.elasticsearch.common.blobstore.BlobMetadata; -import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.SecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.s3.S3RepositoryPlugin; import java.util.HashMap; import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; public class S3CleanupTests extends AbstractCleanupTests { - @Override - protected void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath path, String prefix, Map blobs) - throws Exception { - assertBusy(() -> super.assertBlobsByPrefix(repository, path, prefix, blobs), 10, TimeUnit.MINUTES); - } - - @Override - protected void assertCorruptionVisible(BlobStoreRepository repo, Map> indexToFiles) throws Exception { - assertBusy(() -> super.assertCorruptionVisible(repo, indexToFiles), 10, TimeUnit.MINUTES); - } - - @Override - protected void assertConsistency(BlobStoreRepository repo, Executor executor) throws Exception { - assertBusy(() -> super.assertConsistency(repo, executor), 10, TimeUnit.MINUTES); - } - @Override protected Collection> getPlugins() { return pluginList(S3RepositoryPlugin.class); diff --git a/x-pack/snapshot-tool/src/test/java/org/elasticsearch/snapshots/AbstractCleanupTests.java b/x-pack/snapshot-tool/src/test/java/org/elasticsearch/snapshots/AbstractCleanupTests.java index 759ceb8bb2933..d61ff9202fa77 100644 --- a/x-pack/snapshot-tool/src/test/java/org/elasticsearch/snapshots/AbstractCleanupTests.java +++ b/x-pack/snapshot-tool/src/test/java/org/elasticsearch/snapshots/AbstractCleanupTests.java @@ -12,22 +12,18 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cli.MockTerminal; import org.elasticsearch.cli.Terminal; -import org.elasticsearch.common.blobstore.BlobMetadata; -import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.settings.SecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.Executor; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -38,19 +34,6 @@ public abstract class AbstractCleanupTests extends ESSingleNodeTestCase { protected BlobStoreRepository repository; - protected void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath path, String prefix, Map blobs) - throws Exception { - BlobStoreTestUtil.assertBlobsByPrefix(repository, path, prefix, blobs); - } - - protected void assertConsistency(BlobStoreRepository repo, Executor executor) throws Exception { - BlobStoreTestUtil.assertConsistency(repo, executor); - } - - protected void assertCorruptionVisible(BlobStoreRepository repo, Map> indexToFiles) throws Exception { - BlobStoreTestUtil.assertCorruptionVisible(repo, indexToFiles); - } - @Override public void setUp() throws Exception { super.setUp(); @@ -64,7 +47,7 @@ private void cleanupRepository(BlobStoreRepository repository) throws Exception repository.threadPool().generic().execute(ActionRunnable.run(future, () -> repository.blobStore().blobContainer(repository.basePath()).delete())); future.actionGet(); - assertBlobsByPrefix(repository, repository.basePath(), "", Collections.emptyMap()); + BlobStoreTestUtil.assertBlobsByPrefix(repository, repository.basePath(), "", Collections.emptyMap()); } @Override @@ -191,7 +174,7 @@ public void testCleanup() throws Throwable { assertThat(terminal.getOutput(), containsString("Set of deletion candidates is empty. Exiting")); logger.info("--> check that there is no inconsistencies after running the tool"); - assertConsistency(repository, repository.threadPool().executor(ThreadPool.Names.GENERIC)); + BlobStoreTestUtil.assertConsistency(repository); logger.info("--> create several dangling indices"); int numOfFiles = 0; @@ -211,7 +194,7 @@ public void testCleanup() throws Throwable { Set danglingIndices = indexToFiles.keySet(); logger.info("--> ensure dangling index folders are visible"); - assertCorruptionVisible(repository, indexToFiles); + BlobStoreTestUtil.assertCorruptionVisible(repository, indexToFiles); logger.info("--> execute cleanup tool, corruption is created latter than snapshot, there is nothing to cleanup"); terminal = executeCommand(false); @@ -258,7 +241,7 @@ public void testCleanup() throws Throwable { containsString("Total bytes freed: " + size)); logger.info("--> verify that there is no inconsistencies"); - assertConsistency(repository, repository.threadPool().executor(ThreadPool.Names.GENERIC)); + BlobStoreTestUtil.assertConsistency(repository); logger.info("--> perform cleanup by removing snapshots"); assertTrue(client().admin() .cluster()