diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java new file mode 100644 index 0000000000000..6fcc89cfe9e9a --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -0,0 +1,104 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.Settings; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Map; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase { + + private static String INDEX_NAME = "test-index"; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); + } + + private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { + internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes); + internalCluster().startDataOnlyNodes(numDataOnlyNodes); + for (String index : indices.split(",")) { + createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); + ensureYellowAndNoInitializingShards(index); + ensureGreen(index); + } + } + + private Map initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) { + prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); + Map indexStats = indexData(1, false, INDEX_NAME); + assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards); + ensureGreen(INDEX_NAME); + return indexStats; + } + + public void testFullClusterRestoreStaleDelete() throws Exception { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + + initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); + setReplicaCount(0); + setReplicaCount(2); + setReplicaCount(0); + setReplicaCount(1); + setReplicaCount(0); + setReplicaCount(1); + setReplicaCount(0); + setReplicaCount(2); + setReplicaCount(0); + + RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance( + RemoteClusterStateService.class + ); + + RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); + BlobPath baseMetadataPath = repository.basePath() + .add( + Base64.getUrlEncoder() + .withoutPadding() + .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) + ) + .add("cluster-state") + .add(getClusterState().metadata().clusterUUID()); + + assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size()); + + Map indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata( + cluster().getClusterName(), + getClusterState().metadata().clusterUUID() + ); + assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas()); + assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards()); + } + + private void setReplicaCount(int replicaCount) { + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)) + .get(); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 0ebbdc81661ad..040c0663efbd9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -423,10 +423,15 @@ public UploadedIndexMetadata(StreamInput in) throws IOException { this.uploadedFilename = in.readString(); } - public String getUploadedFilename() { + public String getUploadedFilePath() { return uploadedFilename; } + public String getUploadedFilename() { + String[] splitPath = uploadedFilename.split("/"); + return splitPath[splitPath.length - 1]; + } + public String getIndexName() { return indexName; } @@ -440,7 +445,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder.startObject() .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) - .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename()) + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()) .endObject(); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 2aa3384b0f33a..cf750bb11f3f8 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -18,6 +18,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -28,12 +29,14 @@ import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; +import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; @@ -42,6 +45,7 @@ import java.util.Base64; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -50,6 +54,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -69,6 +74,12 @@ public class RemoteClusterStateService implements Closeable { public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; + public static final int RETAINED_MANIFESTS = 10; + + public static final String DELIMITER = "__"; + + private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); + public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000; public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( @@ -92,9 +103,6 @@ public class RemoteClusterStateService implements Closeable { Property.Final ); - private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); - - public static final String DELIMITER = "__"; private static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state"; private static final String INDEX_PATH_TOKEN = "index"; private static final String MANIFEST_PATH_TOKEN = "manifest"; @@ -105,25 +113,38 @@ public class RemoteClusterStateService implements Closeable { private final Supplier repositoriesService; private final Settings settings; private final LongSupplier relativeTimeNanosSupplier; + private final ThreadPool threadpool; private BlobStoreRepository blobStoreRepository; + private BlobStoreTransferService blobStoreTransferService; private volatile TimeValue slowWriteLoggingThreshold; + private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); + public RemoteClusterStateService( String nodeId, Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, - LongSupplier relativeTimeNanosSupplier + LongSupplier relativeTimeNanosSupplier, + ThreadPool threadPool ) { assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled"; this.nodeId = nodeId; this.repositoriesService = repositoriesService; this.settings = settings; this.relativeTimeNanosSupplier = relativeTimeNanosSupplier; + this.threadpool = threadPool; this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); } + private BlobStoreTransferService getBlobStoreTransferService() { + if (blobStoreTransferService == null) { + blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool); + } + return blobStoreTransferService; + } + /** * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be * invoked by the elected cluster manager when the remote cluster state is enabled. @@ -233,6 +254,8 @@ public ClusterMetadataManifest writeIncrementalMetadata( previousManifest.getPreviousClusterUUID(), false ); + deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); + final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -439,26 +462,16 @@ private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX return blobStoreRepository.blobStore() - .blobContainer( - blobStoreRepository.basePath() - .add(encodeString(clusterName)) - .add(CLUSTER_STATE_PATH_TOKEN) - .add(clusterUUID) - .add(INDEX_PATH_TOKEN) - .add(indexUUID) - ); + .blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add(INDEX_PATH_TOKEN).add(indexUUID)); } private BlobContainer manifestContainer(String clusterName, String clusterUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest - return blobStoreRepository.blobStore() - .blobContainer( - blobStoreRepository.basePath() - .add(encodeString(clusterName)) - .add(CLUSTER_STATE_PATH_TOKEN) - .add(clusterUUID) - .add(MANIFEST_PATH_TOKEN) - ); + return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID)); + } + + private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { + return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID); } private BlobContainer clusterUUIDContainer(String clusterName) { @@ -476,13 +489,12 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { private static String getManifestFileName(long term, long version) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637_456536447 - return String.join( - DELIMITER, - MANIFEST_FILE_PREFIX, - RemoteStoreUtils.invertLong(term), - RemoteStoreUtils.invertLong(version), - RemoteStoreUtils.invertLong(System.currentTimeMillis()) - ); + return String.join(DELIMITER, getManifestFileNamePrefix(term, version), RemoteStoreUtils.invertLong(System.currentTimeMillis())); + } + + private static String getManifestFileNamePrefix(long term, long version) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637 + return String.join(DELIMITER, MANIFEST_PATH_TOKEN, RemoteStoreUtils.invertLong(term), RemoteStoreUtils.invertLong(version)); } private static String indexMetadataFileName(IndexMetadata indexMetadata) { @@ -494,6 +506,10 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) { ); } + private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { + return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN); + } + /** * Fetch latest index metadata from remote cluster state * @@ -591,7 +607,7 @@ private Map getLatestManifestForAllClusterUUIDs for (String clusterUUID : clusterUUIDs) { try { Optional manifest = getLatestClusterMetadataManifest(clusterName, clusterUUID); - manifestsByClusterUUID.put(clusterUUID, manifest.get()); + manifest.ifPresent(clusterMetadataManifest -> manifestsByClusterUUID.put(clusterUUID, clusterMetadataManifest)); } catch (Exception e) { throw new IllegalStateException( String.format(Locale.ROOT, "Exception in fetching manifest for clusterUUID: %s", clusterUUID) @@ -710,4 +726,141 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) { super(errorDesc, cause); } } + + /** + * Purges all remote cluster state against provided cluster UUIDs + * @param clusterName name of the cluster + * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged + */ + public void deleteStaleClusterMetadata(String clusterName, List clusterUUIDs) { + clusterUUIDs.forEach(clusterUUID -> { + getBlobStoreTransferService().deleteAsync( + ThreadPool.Names.REMOTE_PURGE, + getCusterMetadataBasePath(clusterName, clusterUUID), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting all remote cluster metadata for cluster UUID {}", + clusterUUID + ), + e + ); + } + } + ); + }); + } + + /** + * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests + * @param clusterName name of the cluster + * @param clusterUUID uuid of cluster state to refer to in remote + * @param manifestsToRetain no of latest manifest files to keep in remote + */ + private void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { + if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { + logger.info("Delete stale cluster metadata task is already in progress."); + return; + } + try { + getBlobStoreTransferService().listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + getManifestFolderPath(clusterName, clusterUUID), + "manifest", + Integer.MAX_VALUE, + new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + if (blobMetadata.size() > manifestsToRetain) { + deleteClusterMetadata( + clusterName, + clusterUUID, + blobMetadata.subList(0, manifestsToRetain - 1), + blobMetadata.subList(manifestsToRetain - 1, blobMetadata.size()) + ); + } + deleteStaleMetadataRunning.set(false); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUID + ) + ); + deleteStaleMetadataRunning.set(false); + } + } + ); + } finally { + deleteStaleMetadataRunning.set(false); + } + } + + private void deleteClusterMetadata( + String clusterName, + String clusterUUID, + List activeManifestBlobMetadata, + List staleManifestBlobMetadata + ) { + try { + Set filesToKeep = new HashSet<>(); + Set staleManifestPaths = new HashSet<>(); + Set staleIndexMetadataPaths = new HashSet<>(); + activeManifestBlobMetadata.forEach(blobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + blobMetadata.name() + ); + clusterMetadataManifest.getIndices() + .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); + }); + staleManifestBlobMetadata.forEach(blobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + blobMetadata.name() + ); + staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); + clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { + if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { + staleIndexMetadataPaths.add( + new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString() + + uploadedIndexMetadata.getUploadedFilename() + + ".dat" + ); + } + }); + }); + + if (staleManifestPaths.isEmpty()) { + logger.info("No stale Remote Cluster Metadata files found"); + return; + } + + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); + } catch (IllegalStateException e) { + logger.error("Error while fetching Remote Cluster Metadata manifests", e); + } catch (IOException e) { + logger.error("Error while deleting stale Remote Cluster Metadata files", e); + } catch (Exception e) { + logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e); + } + } + + private void deleteStalePaths(String clusterName, String clusterUUID, List stalePaths) throws IOException { + logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); + getBlobStoreTransferService().deleteBlobs(getCusterMetadataBasePath(clusterName, clusterUUID), stalePaths); + } } diff --git a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java index 5cdff14cae360..d05242a3aeaf7 100644 --- a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java +++ b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java @@ -36,7 +36,6 @@ import org.opensearch.snapshots.RestoreInfo; import org.opensearch.snapshots.RestoreService; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -147,7 +146,7 @@ public RemoteRestoreResult restore( .forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), new Tuple<>(true, indexMetadata)); }); - } catch (IOException e) { + } catch (Exception e) { throw new IllegalStateException("Unable to restore remote index metadata", e); } } else { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e06207a0c7bff..2c987432199a0 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -688,7 +688,8 @@ protected Node( repositoriesServiceReference::get, settings, clusterService.getClusterSettings(), - threadPool::preciseRelativeTimeInNanos + threadPool::preciseRelativeTimeInNanos, + threadPool ); } else { remoteClusterStateService = null; diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 6a2f4cd0ab300..486717faaf864 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -118,6 +118,8 @@ public class GatewayMetaStatePersistedStateTests extends OpenSearchTestCase { private DiscoveryNode localNode; private BigArrays bigArrays; + private MockGatewayMetaState gateway; + @Override public void setUp() throws Exception { bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); @@ -137,11 +139,13 @@ public void setUp() throws Exception { @Override public void tearDown() throws Exception { nodeEnvironment.close(); + IOUtils.close(gateway); super.tearDown(); } - private CoordinationState.PersistedState newGatewayPersistedState() { - final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays); + private CoordinationState.PersistedState newGatewayPersistedState() throws IOException { + IOUtils.close(gateway); + gateway = new MockGatewayMetaState(localNode, bigArrays); final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); gateway.start(settings, nodeEnvironment, xContentRegistry(), persistedStateRegistry); final CoordinationState.PersistedState persistedState = gateway.getPersistedState(); @@ -447,7 +451,10 @@ public void testDataOnlyNodePersistence() throws Exception { cleanup.add(gateway); final TransportService transportService = mock(TransportService.class); TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode"); - cleanup.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); + cleanup.add(() -> { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + threadPool.shutdown(); + }); when(transportService.getThreadPool()).thenReturn(threadPool); ClusterService clusterService = mock(ClusterService.class); when(clusterService.getClusterSettings()).thenReturn( @@ -474,7 +481,8 @@ public void testDataOnlyNodePersistence() throws Exception { ), settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - () -> 0L + () -> 0L, + threadPool ); } else { return null; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 8c6ccea940816..fe8dc0b564cda 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -41,6 +41,9 @@ import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -82,6 +85,7 @@ public class RemoteClusterStateServiceTests extends OpenSearchTestCase { private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; private BlobStore blobStore; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before public void setup() { @@ -117,17 +121,25 @@ public void setup() { repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - () -> 0L + () -> 0L, + threadPool ); } + @After + public void teardown() throws Exception { + super.tearDown(); + remoteClusterStateService.close(); + threadPool.shutdown(); + } + public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().build(); final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState); Assert.assertThat(manifest, nullValue()); } - public void testFailInitializationWhenRemoteStateDisabled() throws IOException { + public void testFailInitializationWhenRemoteStateDisabled() { final Settings settings = Settings.builder().build(); assertThrows( AssertionError.class, @@ -136,7 +148,8 @@ public void testFailInitializationWhenRemoteStateDisabled() throws IOException { repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - () -> 0L + () -> 0L, + threadPool ) ); } diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index d49d3d290b8a8..1f56615959618 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -844,14 +844,16 @@ class MockPersistedState implements CoordinationState.PersistedState { private final CoordinationState.PersistedState delegate; private final NodeEnvironment nodeEnvironment; + private MockGatewayMetaState mockGatewayMetaState; + MockPersistedState(DiscoveryNode localNode) { try { if (rarely()) { nodeEnvironment = newNodeEnvironment(); nodeEnvironments.add(nodeEnvironment); - final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode, bigArrays); - gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry(), persistedStateRegistry()); - delegate = gatewayMetaState.getPersistedState(); + mockGatewayMetaState = new MockGatewayMetaState(localNode, bigArrays); + mockGatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry(), persistedStateRegistry()); + delegate = mockGatewayMetaState.getPersistedState(); } else { nodeEnvironment = null; delegate = new InMemoryPersistedState( diff --git a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java index dea205619ce95..d77596cf5cdd1 100644 --- a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java @@ -103,6 +103,11 @@ ClusterState prepareInitialClusterState(TransportService transportService, Clust return ClusterStateUpdaters.setLocalNode(clusterState, localNode); } + @Override + public void close() throws IOException { + super.close(); + } + public void start( Settings settings, NodeEnvironment nodeEnvironment,