From e4f355b9f1fcf73bce98e48707b591c6545ad4eb Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 12 Jun 2024 04:57:34 +0530 Subject: [PATCH] Read write ephemeral objects for remote publication of cluster state (#14089) * Read and write ephemeral objects for remote publication Co-authored-by: Sooraj Sinha Co-authored-by: Arpit Bandejiya Signed-off-by: Shivansh Arora * Add serde logic for hashes of consistent settings Signed-off-by: Sooraj Sinha * Add formattedName to readAsync for IndexMetadata Signed-off-by: Shivansh Arora --- .../cluster/RepositoryCleanupInProgress.java | 36 + .../opensearch/cluster/metadata/Metadata.java | 4 + .../cluster/routing/RoutingTable.java | 2 +- .../InternalRemoteRoutingTableService.java | 71 +- .../remote/NoopRemoteRoutingTableService.java | 21 + .../remote/RemoteRoutingTableService.java | 16 +- .../RemoteRoutingTableServiceFactory.java | 7 +- .../common/settings/ClusterSettings.java | 1 + .../RemoteClusterStateAttributesManager.java | 76 +- .../RemoteClusterStateCleanupManager.java | 44 ++ .../remote/RemoteClusterStateService.java | 694 +++++++++++++++++- .../remote/RemoteGlobalMetadataManager.java | 46 +- .../remote/RemoteIndexMetadataManager.java | 22 +- .../gateway/remote/RemoteManifestManager.java | 10 +- .../remote/model/RemoteClusterBlocks.java | 22 +- .../model/RemoteClusterStateCustoms.java | 43 +- .../remote/model/RemoteCustomMetadata.java | 41 +- .../remote/model/RemoteDiscoveryNodes.java | 22 +- .../RemoteHashesOfConsistentSettings.java | 22 +- .../recovery/RemoteStoreRestoreService.java | 6 +- .../main/java/org/opensearch/node/Node.java | 3 +- ...RemoteRoutingTableServiceFactoryTests.java | 17 +- .../RemoteRoutingTableServiceTests.java | 226 +++++- .../GatewayMetaStatePersistedStateTests.java | 3 +- .../remote/ClusterMetadataManifestTests.java | 4 +- .../RemoteClusterStateServiceTests.java | 66 +- .../RemoteGlobalMetadataManagerTests.java | 1 + .../model/RemoteCustomMetadataTests.java | 42 +- .../model/RemoteDiscoveryNodesTests.java | 2 - 29 files changed, 1347 insertions(+), 223 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java b/server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java index 72a3519aca6f8..4c76858107ed8 100644 --- a/server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java @@ -45,6 +45,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; /** * Information passed during repository cleanup @@ -118,6 +119,24 @@ public Version getMinimalSupportedVersion() { return LegacyESVersion.fromId(7040099); } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RepositoryCleanupInProgress that = (RepositoryCleanupInProgress) o; + return entries.equals(that.entries); + } + + @Override + public int hashCode() { + return 31 + entries.hashCode(); + } + /** * Entry in the collection. * @@ -155,6 +174,23 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(repositoryStateId); } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RepositoryCleanupInProgress.Entry that = (RepositoryCleanupInProgress.Entry) o; + return repository.equals(that.repository) && repositoryStateId == that.repositoryStateId; + } + + @Override + public int hashCode() { + return Objects.hash(repository, repositoryStateId); + } + @Override public String toString() { return "{" + repository + '}' + '{' + repositoryStateId + '}'; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index 232f900f25375..a0ef8de07fbf2 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -981,6 +981,10 @@ public static boolean isTemplatesMetadataEqual(Metadata metadata1, Metadata meta return metadata1.templates.equals(metadata2.templates); } + public static boolean isHashesOfConsistentSettingsEqual(Metadata metadata1, Metadata metadata2) { + return metadata1.hashesOfConsistentSettings.equals(metadata2.hashesOfConsistentSettings); + } + public static boolean isCustomMetadataEqual(Metadata metadata1, Metadata metadata2) { int customCount1 = 0; for (Map.Entry cursor : metadata1.customs.entrySet()) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index e4095a84be081..6c7b94f316da2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -79,7 +79,7 @@ public class RoutingTable implements Iterable, Diffable indicesRouting; - private RoutingTable(long version, final Map indicesRouting) { + public RoutingTable(long version, final Map indicesRouting) { this.version = version; this.indicesRouting = Collections.unmodifiableMap(indicesRouting); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 82facc903866f..cc1b0713393f3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -33,6 +33,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteStateTransferException; import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; @@ -44,11 +45,14 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -97,11 +101,13 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen private BlobStoreRepository blobStoreRepository; private RemoteStoreEnums.PathType pathType; private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo; + private ThreadPool threadPool; public InternalRemoteRoutingTableService( Supplier repositoriesService, Settings settings, - ClusterSettings clusterSettings + ClusterSettings clusterSettings, + ThreadPool threadpool ) { assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; this.repositoriesService = repositoriesService; @@ -110,6 +116,7 @@ public InternalRemoteRoutingTableService( this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING); clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting); clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting); + this.threadPool = threadpool; } private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) { @@ -266,6 +273,68 @@ private void uploadIndex( } } + @Override + public CheckedRunnable getAsyncIndexRoutingReadAction( + String uploadedFilename, + Index index, + LatchedActionListener latchedActionListener + ) { + int idx = uploadedFilename.lastIndexOf("/"); + String blobFileName = uploadedFilename.substring(idx + 1); + BlobContainer blobContainer = blobStoreRepository.blobStore() + .blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx))); + + return () -> readAsync( + blobContainer, + blobFileName, + index, + threadPool.executor(ThreadPool.Names.REMOTE_STATE_READ), + ActionListener.wrap( + response -> latchedActionListener.onResponse(response.getIndexRoutingTable()), + latchedActionListener::onFailure + ) + ); + } + + private void readAsync( + BlobContainer blobContainer, + String name, + Index index, + ExecutorService executorService, + ActionListener listener + ) { + executorService.execute(() -> { + try { + listener.onResponse(read(blobContainer, name, index)); + } catch (Exception e) { + listener.onFailure(e); + } + }); + } + + private RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) { + try { + return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index); + } catch (IOException | AssertionError e) { + logger.error(() -> new ParameterizedMessage("RoutingTable read failed for path {}", path), e); + throw new RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e); + } + } + + @Override + public List getUpdatedIndexRoutingTableMetadata( + List updatedIndicesRouting, + List allIndicesRouting + ) { + return updatedIndicesRouting.stream().map(idx -> { + Optional uploadedIndexMetadataOptional = allIndicesRouting.stream() + .filter(idx2 -> idx2.getIndexName().equals(idx)) + .findFirst(); + assert uploadedIndexMetadataOptional.isPresent() == true; + return uploadedIndexMetadataOptional.get(); + }).collect(Collectors.toList()); + } + private String getIndexRoutingFileName(long term, long version) { return String.join( DELIMITER, diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index 95688c55c6519..6236d107d0220 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -16,6 +16,7 @@ import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.IOException; @@ -57,6 +58,26 @@ public List getAllUploadedIndices List indicesRoutingUploaded, List indicesRoutingToDelete ) { + // noop + return List.of(); + } + + @Override + public CheckedRunnable getAsyncIndexRoutingReadAction( + String uploadedFilename, + Index index, + LatchedActionListener latchedActionListener + ) { + // noop + return () -> {}; + } + + @Override + public List getUpdatedIndexRoutingTableMetadata( + List updatedIndicesRouting, + List allIndicesRouting + ) { + // noop return List.of(); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 55b7dab07da90..d455dfb58eabc 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -18,6 +18,7 @@ import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.IOException; @@ -25,7 +26,9 @@ import java.util.Map; /** - * Interface for RemoteRoutingTableService. Exposes methods to orchestrate upload and download of routing table from remote store. + * A Service which provides APIs to upload and download routing table from remote store. + * + * @opensearch.internal */ public interface RemoteRoutingTableService extends LifecycleComponent { public static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = @@ -43,6 +46,17 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException { List getIndicesRouting(RoutingTable routingTable); + CheckedRunnable getAsyncIndexRoutingReadAction( + String uploadedFilename, + Index index, + LatchedActionListener latchedActionListener + ); + + List getUpdatedIndexRoutingTableMetadata( + List updatedIndicesRouting, + List allIndicesRouting + ); + DiffableUtils.MapDiff> getIndicesRoutingMapDiff( RoutingTable before, RoutingTable after diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java index 49f90fa261f27..82837191a30b7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java @@ -11,6 +11,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.threadpool.ThreadPool; import java.util.function.Supplier; @@ -26,15 +27,17 @@ public class RemoteRoutingTableServiceFactory { * @param repositoriesService repositoriesService * @param settings settings * @param clusterSettings clusterSettings + * @param threadPool threadPool * @return RemoteRoutingTableService */ public static RemoteRoutingTableService getService( Supplier repositoriesService, Settings settings, - ClusterSettings clusterSettings + ClusterSettings clusterSettings, + ThreadPool threadPool ) { if (isRemoteRoutingTableEnabled(settings)) { - return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings); + return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool); } return new NoopRemoteRoutingTableService(); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index f55d570f902a2..e4cd3c729389b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -724,6 +724,7 @@ public void apply(Settings value, Settings current, Settings previous) { INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, + RemoteClusterStateService.REMOTE_STATE_READ_TIMEOUT_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 7e83a7bf7da44..4098993246073 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -10,21 +10,20 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.ClusterState.Custom; -import org.opensearch.cluster.block.ClusterBlocks; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; -import org.opensearch.core.compress.Compressor; -import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore; import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms; import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes; import org.opensearch.gateway.remote.model.RemoteReadResult; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.HashMap; @@ -42,27 +41,48 @@ public class RemoteClusterStateAttributesManager { public static final String DISCOVERY_NODES = "nodes"; public static final String CLUSTER_BLOCKS = "blocks"; public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1; - private final RemoteClusterStateBlobStore clusterBlocksBlobStore; - private final RemoteClusterStateBlobStore discoveryNodesBlobStore; - private final RemoteClusterStateBlobStore customsBlobStore; - private final Compressor compressor; - private final NamedXContentRegistry namedXContentRegistry; + private final Map remoteWritableEntityStores; private final NamedWriteableRegistry namedWriteableRegistry; RemoteClusterStateAttributesManager( - RemoteClusterStateBlobStore clusterBlocksBlobStore, - RemoteClusterStateBlobStore discoveryNodesBlobStore, - RemoteClusterStateBlobStore customsBlobStore, - Compressor compressor, - NamedXContentRegistry namedXContentRegistry, - NamedWriteableRegistry namedWriteableRegistry + String clusterName, + BlobStoreRepository blobStoreRepository, + BlobStoreTransferService blobStoreTransferService, + NamedWriteableRegistry namedWriteableRegistry, + ThreadPool threadpool ) { - this.clusterBlocksBlobStore = clusterBlocksBlobStore; - this.discoveryNodesBlobStore = discoveryNodesBlobStore; - this.customsBlobStore = customsBlobStore; - this.compressor = compressor; - this.namedXContentRegistry = namedXContentRegistry; this.namedWriteableRegistry = namedWriteableRegistry; + this.remoteWritableEntityStores = new HashMap<>(); + this.remoteWritableEntityStores.put( + RemoteDiscoveryNodes.DISCOVERY_NODES, + new RemoteClusterStateBlobStore<>( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadpool, + ThreadPool.Names.REMOTE_STATE_READ + ) + ); + this.remoteWritableEntityStores.put( + RemoteClusterBlocks.CLUSTER_BLOCKS, + new RemoteClusterStateBlobStore<>( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadpool, + ThreadPool.Names.REMOTE_STATE_READ + ) + ); + this.remoteWritableEntityStores.put( + RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM, + new RemoteClusterStateBlobStore<>( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadpool, + ThreadPool.Names.REMOTE_STATE_READ + ) + ); } /** @@ -71,10 +91,9 @@ public class RemoteClusterStateAttributesManager { CheckedRunnable getAsyncMetadataWriteAction( String component, AbstractRemoteWritableBlobEntity blobEntity, - RemoteClusterStateBlobStore remoteEntityStore, LatchedActionListener latchedActionListener ) { - return () -> remoteEntityStore.writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener)); + return () -> getStore(blobEntity).writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener)); } private ActionListener getActionListener( @@ -88,17 +107,24 @@ private ActionListener getActionListener( ); } + private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) { + RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType()); + if (remoteStore == null) { + throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]"); + } + return remoteStore; + } + public CheckedRunnable getAsyncMetadataReadAction( String component, AbstractRemoteWritableBlobEntity blobEntity, - RemoteClusterStateBlobStore remoteEntityStore, LatchedActionListener listener ) { final ActionListener actionListener = ActionListener.wrap( response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), listener::onFailure ); - return () -> remoteEntityStore.readAsync(blobEntity, actionListener); + return () -> getStore(blobEntity).readAsync(blobEntity, actionListener); } public Map getUpdatedCustoms(ClusterState clusterState, ClusterState previousClusterState) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java index 92e1d40047623..99235bc96bfe3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java @@ -177,6 +177,7 @@ void deleteClusterMetadata( Set staleManifestPaths = new HashSet<>(); Set staleIndexMetadataPaths = new HashSet<>(); Set staleGlobalMetadataPaths = new HashSet<>(); + Set staleEphemeralAttributePaths = new HashSet<>(); Set staleIndexRoutingPaths = new HashSet<>(); activeManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest( @@ -200,6 +201,23 @@ void deleteClusterMetadata( .values() .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename())); } + if (clusterMetadataManifest.getTransientSettingsMetadata() != null) { + filesToKeep.add(clusterMetadataManifest.getTransientSettingsMetadata().getUploadedFilename()); + } + if (clusterMetadataManifest.getHashesOfConsistentSettings() != null) { + filesToKeep.add(clusterMetadataManifest.getHashesOfConsistentSettings().getUploadedFilename()); + } + if (clusterMetadataManifest.getDiscoveryNodesMetadata() != null) { + filesToKeep.add(clusterMetadataManifest.getDiscoveryNodesMetadata().getUploadedFilename()); + } + if (clusterMetadataManifest.getClusterBlocksMetadata() != null) { + filesToKeep.add(clusterMetadataManifest.getClusterBlocksMetadata().getUploadedFilename()); + } + if (clusterMetadataManifest.getClusterStateCustomMap() != null) { + clusterMetadataManifest.getClusterStateCustomMap() + .values() + .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename())); + } if (clusterMetadataManifest.getIndicesRouting() != null) { clusterMetadataManifest.getIndicesRouting() .forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename())); @@ -253,6 +271,31 @@ void deleteClusterMetadata( staleIndexMetadataPaths.add(fileName); } }); + + if (clusterMetadataManifest.getClusterBlocksMetadata() != null + && !filesToKeep.contains(clusterMetadataManifest.getClusterBlocksMetadata().getUploadedFilename())) { + staleEphemeralAttributePaths.add(clusterMetadataManifest.getClusterBlocksMetadata().getUploadedFilename()); + } + if (clusterMetadataManifest.getDiscoveryNodesMetadata() != null + && !filesToKeep.contains(clusterMetadataManifest.getDiscoveryNodesMetadata().getUploadedFilename())) { + staleEphemeralAttributePaths.add(clusterMetadataManifest.getDiscoveryNodesMetadata().getUploadedFilename()); + } + if (clusterMetadataManifest.getTransientSettingsMetadata() != null + && !filesToKeep.contains(clusterMetadataManifest.getTransientSettingsMetadata().getUploadedFilename())) { + staleEphemeralAttributePaths.add(clusterMetadataManifest.getTransientSettingsMetadata().getUploadedFilename()); + } + if (clusterMetadataManifest.getHashesOfConsistentSettings() != null + && !filesToKeep.contains(clusterMetadataManifest.getHashesOfConsistentSettings().getUploadedFilename())) { + staleEphemeralAttributePaths.add(clusterMetadataManifest.getHashesOfConsistentSettings().getUploadedFilename()); + } + if (clusterMetadataManifest.getClusterStateCustomMap() != null) { + clusterMetadataManifest.getCustomMetadataMap() + .values() + .stream() + .filter(u -> !filesToKeep.contains(u.getUploadedFilename())) + .forEach(attribute -> staleEphemeralAttributePaths.add(attribute.getUploadedFilename())); + } + }); if (staleManifestPaths.isEmpty()) { @@ -262,6 +305,7 @@ void deleteClusterMetadata( deleteStalePaths(new ArrayList<>(staleGlobalMetadataPaths)); deleteStalePaths(new ArrayList<>(staleIndexMetadataPaths)); + deleteStalePaths(new ArrayList<>(staleEphemeralAttributePaths)); deleteStalePaths(new ArrayList<>(staleManifestPaths)); try { remoteRoutingTableService.deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths)); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 9a0c3a35974ad..bd371ae671cf4 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -15,9 +15,17 @@ import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.DiffableStringMap; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.Metadata.XContentContext; +import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.node.DiscoveryNodes.Builder; import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; @@ -33,14 +41,23 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.model.RemoteClusterBlocks; +import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms; import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; import org.opensearch.gateway.remote.model.RemoteCoordinationMetadata; import org.opensearch.gateway.remote.model.RemoteCustomMetadata; +import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes; +import org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings; +import org.opensearch.gateway.remote.model.RemoteIndexMetadata; import org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata; +import org.opensearch.gateway.remote.model.RemoteReadResult; import org.opensearch.gateway.remote.model.RemoteTemplatesMetadata; +import org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; @@ -63,21 +80,31 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_BLOCKS; +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTE; +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.UploadedMetadataResults; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.clusterUUIDContainer; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getClusterMetadataBasePath; +import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; +import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS; import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; +import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -99,6 +126,16 @@ public class RemoteClusterStateService implements Closeable { Property.Final ); + public static final TimeValue REMOTE_STATE_READ_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + + public static final Setting REMOTE_STATE_READ_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.state.read_timeout", + REMOTE_STATE_READ_TIMEOUT_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private TimeValue remoteStateReadTimeout; private final String nodeId; private final Supplier repositoriesService; private final Settings settings; @@ -114,8 +151,10 @@ public class RemoteClusterStateService implements Closeable { private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; private RemoteIndexMetadataManager remoteIndexMetadataManager; private RemoteGlobalMetadataManager remoteGlobalMetadataManager; + private RemoteClusterStateAttributesManager remoteClusterStateAttributesManager; private RemoteManifestManager remoteManifestManager; private ClusterSettings clusterSettings; + private final NamedWriteableRegistry namedWriteableRegistry; private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]"; private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " @@ -138,7 +177,8 @@ public RemoteClusterStateService( ClusterService clusterService, LongSupplier relativeTimeNanosSupplier, ThreadPool threadPool, - List indexMetadataUploadListeners + List indexMetadataUploadListeners, + NamedWriteableRegistry namedWriteableRegistry ) { assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled"; this.nodeId = nodeId; @@ -149,9 +189,17 @@ public RemoteClusterStateService( clusterSettings = clusterService.getClusterSettings(); this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); + this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING); + clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteStateReadTimeout); this.remoteStateStats = new RemotePersistenceStats(); + this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; - this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(repositoriesService, settings, clusterSettings); + this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( + repositoriesService, + settings, + clusterSettings, + threadPool + ); this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService); } @@ -172,19 +220,26 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat UploadedMetadataResults uploadedMetadataResults = writeMetadataInParallel( clusterState, new ArrayList<>(clusterState.metadata().indices().values()), - Collections.emptyMap(), + emptyMap(), clusterState.metadata().customs(), true, true, true, + true, + true, + true, + clusterState.customs(), + true, remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()) ); final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest( clusterState, uploadedMetadataResults, previousClusterUUID, + new ClusterStateDiffManifest(clusterState, ClusterState.EMPTY_STATE), false ); + final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateSucceeded(); remoteStateStats.stateTook(durationMillis); @@ -221,6 +276,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( ClusterState clusterState, ClusterMetadataManifest previousManifest ) throws IOException { + logger.info("WRITING INCREMENTAL STATE"); + final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); @@ -233,12 +290,23 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( clusterState, previousClusterState ); + final Map clusterStateCustomsToBeDeleted = new HashMap<>( + previousManifest.getClusterStateCustomMap() + ); + final Map clusterStateCustomsToUpload = remoteClusterStateAttributesManager.getUpdatedCustoms( + clusterState, + previousClusterState + ); final Map allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap()); for (final String custom : clusterState.metadata().customs().keySet()) { // remove all the customs which are present currently customsToBeDeletedFromRemote.remove(custom); } final Map indicesToBeDeletedFromRemote = new HashMap<>(previousClusterState.metadata().indices()); + for (final String custom : clusterState.customs().keySet()) { + // remove all the custom which are present currently + clusterStateCustomsToBeDeleted.remove(custom); + } int numIndicesUpdated = 0; int numIndicesUnchanged = 0; final Map allUploadedIndexMetadata = previousManifest.getIndices() @@ -283,8 +351,15 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( ; boolean updateSettingsMetadata = firstUploadForSplitGlobalMetadata || Metadata.isSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; + boolean updateTransientSettingsMetadata = firstUploadForSplitGlobalMetadata + || Metadata.isTransientSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata || Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; + // ToDo: check if these needs to be updated or not + final boolean updateDiscoveryNodes = clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges(); + final boolean updateClusterBlocks = !clusterState.blocks().equals(previousClusterState.blocks()); + final boolean updateHashesOfConsistentSettings = firstUploadForSplitGlobalMetadata + || Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false; uploadedMetadataResults = writeMetadataInParallel( clusterState, @@ -294,6 +369,11 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( updateCoordinationMetadata, updateSettingsMetadata, updateTemplatesMetadata, + updateDiscoveryNodes, + updateClusterBlocks, + updateTransientSettingsMetadata, + clusterStateCustomsToUpload, + updateHashesOfConsistentSettings, indicesRoutingToUpload ); @@ -305,6 +385,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( // remove the data for removed custom/indices customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove); indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove); + clusterStateCustomsToBeDeleted.keySet().forEach(allUploadedCustomMap::remove); if (!updateCoordinationMetadata) { uploadedMetadataResults.uploadedCoordinationMetadata = previousManifest.getCoordinationMetadata(); @@ -312,9 +393,27 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( if (!updateSettingsMetadata) { uploadedMetadataResults.uploadedSettingsMetadata = previousManifest.getSettingsMetadata(); } + if (!updateTransientSettingsMetadata) { + uploadedMetadataResults.uploadedTransientSettingsMetadata = previousManifest.getTransientSettingsMetadata(); + } if (!updateTemplatesMetadata) { uploadedMetadataResults.uploadedTemplatesMetadata = previousManifest.getTemplatesMetadata(); } + if (!updateDiscoveryNodes && !firstUploadForSplitGlobalMetadata) { + uploadedMetadataResults.uploadedDiscoveryNodes = previousManifest.getDiscoveryNodesMetadata(); + } + if (!updateClusterBlocks && !firstUploadForSplitGlobalMetadata) { + uploadedMetadataResults.uploadedClusterBlocks = previousManifest.getClusterBlocksMetadata(); + } + if (!updateHashesOfConsistentSettings && !firstUploadForSplitGlobalMetadata) { + uploadedMetadataResults.uploadedHashesOfConsistentSettings = previousManifest.getHashesOfConsistentSettings(); + } + if (!firstUploadForSplitGlobalMetadata && customsToUpload.isEmpty()) { + uploadedMetadataResults.uploadedCustomMetadataMap = previousManifest.getCustomMetadataMap(); + } + if (!firstUploadForSplitGlobalMetadata && clusterStateCustomsToUpload.isEmpty()) { + uploadedMetadataResults.uploadedClusterStateCustomMetadataMap = previousManifest.getClusterStateCustomMap(); + } uploadedMetadataResults.uploadedCustomMetadataMap = allUploadedCustomMap; uploadedMetadataResults.uploadedIndexMetadata = new ArrayList<>(allUploadedIndexMetadata.values()); @@ -330,6 +429,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( clusterState, uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), + new ClusterStateDiffManifest(clusterState, previousClusterState), false ); @@ -352,14 +452,35 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( indicesRoutingToUpload.size() ); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { + // TODO update logs to add more details about objects uploaded logger.warn( - "{} which is above the warn threshold of [{}]; {}", - clusterStateUploadTimeMessage, + "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + + "wrote metadata for [{}] indices and skipped [{}] unchanged indices, coordination metadata updated : [{}], " + + "settings metadata updated : [{}], templates metadata updated : [{}], custom metadata updated : [{}]", + durationMillis, slowWriteLoggingThreshold, - metadataUpdateMessage + numIndicesUpdated, + numIndicesUnchanged, + updateCoordinationMetadata, + updateSettingsMetadata, + updateTemplatesMetadata, + customsToUpload.size() ); } else { logger.info("{}; {}", clusterStateUploadTimeMessage, metadataUpdateMessage); + logger.info( + "writing cluster state for version [{}] took [{}ms]; " + + "wrote metadata for [{}] indices and skipped [{}] unchanged indices, coordination metadata updated : [{}], " + + "settings metadata updated : [{}], templates metadata updated : [{}], custom metadata updated : [{}]", + manifestDetails.getClusterMetadataManifest().getStateVersion(), + durationMillis, + numIndicesUpdated, + numIndicesUnchanged, + updateCoordinationMetadata, + updateSettingsMetadata, + updateTemplatesMetadata, + customsToUpload.size() + ); } return manifestDetails; } @@ -372,12 +493,18 @@ private UploadedMetadataResults writeMetadataInParallel( boolean uploadCoordinationMetadata, boolean uploadSettingsMetadata, boolean uploadTemplateMetadata, + boolean uploadDiscoveryNodes, + boolean uploadClusterBlock, + boolean uploadTransientSettingMetadata, + Map clusterStateCustomToUpload, + boolean uploadHashesOfConsistentSettings, List indicesRoutingToUpload ) throws IOException { assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null"; int totalUploadTasks = indexToUpload.size() + indexMetadataUploadListeners.size() + customToUpload.size() + (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0) - + indicesRoutingToUpload.size(); + + (uploadDiscoveryNodes ? 1 : 0) + (uploadClusterBlock ? 1 : 0) + (uploadTransientSettingMetadata ? 1 : 0) + + clusterStateCustomToUpload.size() + (uploadHashesOfConsistentSettings ? 1 : 0) + indicesRoutingToUpload.size(); CountDownLatch latch = new CountDownLatch(totalUploadTasks); Map> uploadTasks = new ConcurrentHashMap<>(totalUploadTasks); Map results = new ConcurrentHashMap<>(totalUploadTasks); @@ -412,6 +539,21 @@ private UploadedMetadataResults writeMetadataInParallel( ) ); } + if (uploadTransientSettingMetadata) { + uploadTasks.put( + TRANSIENT_SETTING_METADATA, + remoteGlobalMetadataManager.getAsyncMetadataWriteAction( + new RemoteTransientSettingsMetadata( + clusterState.metadata().transientSettings(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener + ) + ); + } if (uploadCoordinationMetadata) { uploadTasks.put( COORDINATION_METADATA, @@ -442,6 +584,50 @@ private UploadedMetadataResults writeMetadataInParallel( ) ); } + if (uploadDiscoveryNodes) { + uploadTasks.put( + DISCOVERY_NODES, + remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + RemoteDiscoveryNodes.DISCOVERY_NODES, + new RemoteDiscoveryNodes( + clusterState.nodes(), + clusterState.version(), + clusterState.stateUUID(), + blobStoreRepository.getCompressor() + ), + listener + ) + ); + } + if (uploadClusterBlock) { + uploadTasks.put( + CLUSTER_BLOCKS, + remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + RemoteClusterBlocks.CLUSTER_BLOCKS, + new RemoteClusterBlocks( + clusterState.blocks(), + clusterState.version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor() + ), + listener + ) + ); + } + if (uploadHashesOfConsistentSettings) { + uploadTasks.put( + HASHES_OF_CONSISTENT_SETTINGS, + remoteGlobalMetadataManager.getAsyncMetadataWriteAction( + new RemoteHashesOfConsistentSettings( + (DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor() + ), + listener + ) + ); + } customToUpload.forEach((key, value) -> { String customComponent = String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, key); uploadTasks.put( @@ -453,7 +639,7 @@ private UploadedMetadataResults writeMetadataInParallel( clusterState.metadata().version(), clusterState.metadata().clusterUUID(), blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() + namedWriteableRegistry ), listener ) @@ -466,6 +652,23 @@ private UploadedMetadataResults writeMetadataInParallel( ); }); + clusterStateCustomToUpload.forEach((key, value) -> { + uploadTasks.put( + key, + remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + CLUSTER_STATE_CUSTOM, + new RemoteClusterStateCustoms( + value, + key, + clusterState.version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener + ) + ); + }); indicesRoutingToUpload.forEach(indexRoutingTable -> { uploadTasks.put( InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), @@ -537,6 +740,12 @@ private UploadedMetadataResults writeMetadataInParallel( custom, new UploadedMetadataAttribute(custom, uploadedMetadata.getUploadedFilename()) ); + } else if (name.startsWith(CLUSTER_STATE_CUSTOM)) { + String custom = name.split(DELIMITER)[0].split(CUSTOM_DELIMITER)[1]; + response.uploadedClusterStateCustomMetadataMap.put( + custom, + new UploadedMetadataAttribute(custom, uploadedMetadata.getUploadedFilename()) + ); } else if (COORDINATION_METADATA.equals(name)) { response.uploadedCoordinationMetadata = (UploadedMetadataAttribute) uploadedMetadata; } else if (RemotePersistentSettingsMetadata.SETTING_METADATA.equals(name)) { @@ -545,10 +754,19 @@ private UploadedMetadataResults writeMetadataInParallel( response.uploadedTemplatesMetadata = (UploadedMetadataAttribute) uploadedMetadata; } else if (name.contains(UploadedIndexMetadata.COMPONENT_PREFIX)) { response.uploadedIndexMetadata.add((UploadedIndexMetadata) uploadedMetadata); + } else if (RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA.equals(name)) { + response.uploadedTransientSettingsMetadata = (UploadedMetadataAttribute) uploadedMetadata; + } else if (RemoteDiscoveryNodes.DISCOVERY_NODES.equals(uploadedMetadata.getComponent())) { + response.uploadedDiscoveryNodes = (UploadedMetadataAttribute) uploadedMetadata; + } else if (RemoteClusterBlocks.CLUSTER_BLOCKS.equals(uploadedMetadata.getComponent())) { + response.uploadedClusterBlocks = (UploadedMetadataAttribute) uploadedMetadata; + } else if (RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS.equals(uploadedMetadata.getComponent())) { + response.uploadedHashesOfConsistentSettings = (UploadedMetadataAttribute) uploadedMetadata; } else { throw new IllegalStateException("Unknown metadata component name " + name); } }); + logger.info("response {}", response.uploadedIndicesRoutingMetadata.toString()); return response; } @@ -644,6 +862,7 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(ClusterState clus clusterState, uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), + previousManifest.getDiffManifest(), true ); if (!previousManifest.isClusterUUIDCommitted() && committedManifestDetails.getClusterMetadataManifest().isClusterUUIDCommitted()) { @@ -664,6 +883,10 @@ public Optional getLatestClusterMetadataManifest(String return remoteManifestManager.getLatestClusterMetadataManifest(clusterName, clusterUUID); } + public ClusterMetadataManifest getClusterMetadataManifestByFileName(String clusterUUID, String fileName) { + return remoteManifestManager.getRemoteClusterMetadataManifestByFileName(clusterUUID, fileName); + } + @Override public void close() throws IOException { remoteClusterStateCleanupManager.close(); @@ -691,6 +914,7 @@ public void start() { clusterName, blobStoreRepository, blobStoreTransferService, + namedWriteableRegistry, threadpool ); remoteIndexMetadataManager = new RemoteIndexMetadataManager( @@ -708,6 +932,13 @@ public void start() { blobStoreTransferService, threadpool ); + remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager( + clusterName, + blobStoreRepository, + blobStoreTransferService, + namedWriteableRegistry, + threadpool + ); remoteClusterStateCleanupManager.start(); } @@ -739,7 +970,7 @@ BlobStore getBlobStore() { * @param clusterName name of the cluster * @return {@link IndexMetadata} */ - public ClusterState getLatestClusterState(String clusterName, String clusterUUID) { + public ClusterState getLatestClusterState(String clusterName, String clusterUUID, boolean includeEphemeral) throws IOException { Optional clusterMetadataManifest = remoteManifestManager.getLatestClusterMetadataManifest( clusterName, clusterUUID @@ -750,19 +981,299 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID ); } - // Fetch Global Metadata - Metadata globalMetadata = remoteGlobalMetadataManager.getGlobalMetadata(clusterUUID, clusterMetadataManifest.get()); + return getClusterStateForManifest(clusterName, clusterMetadataManifest.get(), nodeId, includeEphemeral); + } + + private ClusterState readClusterStateInParallel( + ClusterState previousState, + ClusterMetadataManifest manifest, + String clusterUUID, + String localNodeId, + List indicesToRead, + Map customToRead, + boolean readCoordinationMetadata, + boolean readSettingsMetadata, + boolean readTransientSettingsMetadata, + boolean readTemplatesMetadata, + boolean readDiscoveryNodes, + boolean readClusterBlocks, + List indicesRoutingToRead, + boolean readHashesOfConsistentSettings, + Map clusterStateCustomToRead, + boolean includeEphemeral + ) throws IOException { + int totalReadTasks = indicesToRead.size() + customToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata + ? 1 + : 0) + (readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0) + + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0) + clusterStateCustomToRead.size() + + indicesRoutingToRead.size(); + CountDownLatch latch = new CountDownLatch(totalReadTasks); + List> asyncMetadataReadActions = new ArrayList<>(); + List readResults = Collections.synchronizedList(new ArrayList<>()); + List readIndexRoutingTableResults = Collections.synchronizedList(new ArrayList<>()); + List exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks)); + + LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(response -> { + logger.debug("Successfully read cluster state component from remote"); + readResults.add(response); + }, ex -> { + logger.error("Failed to read cluster state from remote", ex); + exceptionList.add(ex); + }), latch); + + for (UploadedIndexMetadata indexMetadata : indicesToRead) { + asyncMetadataReadActions.add( + remoteIndexMetadataManager.getAsyncIndexMetadataReadAction(clusterUUID, indexMetadata.getUploadedFilename(), listener) + ); + } + + LatchedActionListener routingTableLatchedActionListener = new LatchedActionListener<>( + ActionListener.wrap(response -> { + logger.debug("Successfully read cluster state component from remote"); + readIndexRoutingTableResults.add(response); + }, ex -> { + logger.error("Failed to read cluster state from remote", ex); + exceptionList.add(ex); + }), + latch + ); + + for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) { + asyncMetadataReadActions.add( + remoteRoutingTableService.getAsyncIndexRoutingReadAction( + indexRouting.getUploadedFilename(), + new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), + routingTableLatchedActionListener + ) + ); + } - // Fetch Index Metadata - Map indices = remoteIndexMetadataManager.getIndexMetadataMap(clusterUUID, clusterMetadataManifest.get()); + for (Map.Entry entry : customToRead.entrySet()) { + asyncMetadataReadActions.add( + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + new RemoteCustomMetadata( + entry.getValue().getUploadedFilename(), + entry.getKey(), + clusterUUID, + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + entry.getValue().getAttributeName(), + listener + ) + ); + } + if (readCoordinationMetadata) { + asyncMetadataReadActions.add( + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + new RemoteCoordinationMetadata( + manifest.getCoordinationMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + COORDINATION_METADATA, + listener + ) + ); + } + + if (readSettingsMetadata) { + asyncMetadataReadActions.add( + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + new RemotePersistentSettingsMetadata( + manifest.getSettingsMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + SETTING_METADATA, + listener + ) + ); + } + + if (readTransientSettingsMetadata) { + asyncMetadataReadActions.add( + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + new RemoteTransientSettingsMetadata( + manifest.getTransientSettingsMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + TRANSIENT_SETTING_METADATA, + listener + ) + ); + } + + if (readTemplatesMetadata) { + asyncMetadataReadActions.add( + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + new RemoteTemplatesMetadata( + manifest.getTemplatesMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + TEMPLATES_METADATA, + listener + ) + ); + } + + if (readDiscoveryNodes) { + asyncMetadataReadActions.add( + remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + DISCOVERY_NODES, + new RemoteDiscoveryNodes( + manifest.getDiscoveryNodesMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + listener + ) + ); + } + + if (readClusterBlocks) { + asyncMetadataReadActions.add( + remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + CLUSTER_BLOCKS, + new RemoteClusterBlocks( + manifest.getClusterBlocksMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + listener + ) + ); + } + + if (readHashesOfConsistentSettings) { + asyncMetadataReadActions.add( + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + new RemoteHashesOfConsistentSettings( + manifest.getHashesOfConsistentSettings().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + HASHES_OF_CONSISTENT_SETTINGS, + listener + ) + ); + } + + for (Map.Entry entry : clusterStateCustomToRead.entrySet()) { + asyncMetadataReadActions.add( + remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + CLUSTER_STATE_CUSTOM, + new RemoteClusterStateCustoms( + entry.getValue().getUploadedFilename(), + entry.getValue().getAttributeName(), + clusterUUID, + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener + ) + ); + } + + for (CheckedRunnable asyncMetadataReadAction : asyncMetadataReadActions) { + asyncMetadataReadAction.run(); + } + + try { + if (latch.await(this.remoteStateReadTimeout.getMillis(), TimeUnit.MILLISECONDS) == false) { + RemoteStateTransferException exception = new RemoteStateTransferException( + "Timed out waiting to read cluster state from remote within timeout " + this.remoteStateReadTimeout + ); + exceptionList.forEach(exception::addSuppressed); + throw exception; + } + } catch (InterruptedException e) { + exceptionList.forEach(e::addSuppressed); + RemoteStateTransferException ex = new RemoteStateTransferException( + "Interrupted while waiting to read cluster state from metadata" + ); + Thread.currentThread().interrupt(); + throw ex; + } + + if (!exceptionList.isEmpty()) { + RemoteStateTransferException exception = new RemoteStateTransferException("Exception during reading cluster state from remote"); + exceptionList.forEach(exception::addSuppressed); + throw exception; + } + + final ClusterState.Builder clusterStateBuilder = ClusterState.builder(previousState); + AtomicReference discoveryNodesBuilder = new AtomicReference<>(DiscoveryNodes.builder()); + Metadata.Builder metadataBuilder = Metadata.builder(previousState.metadata()); + metadataBuilder.version(manifest.getMetadataVersion()); + metadataBuilder.clusterUUID(manifest.getClusterUUID()); + metadataBuilder.clusterUUIDCommitted(manifest.isClusterUUIDCommitted()); Map indexMetadataMap = new HashMap<>(); - indices.values().forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), indexMetadata); }); + Map indicesRouting = new HashMap<>(previousState.routingTable().getIndicesRouting()); + + readResults.forEach(remoteReadResult -> { + switch (remoteReadResult.getComponent()) { + case RemoteIndexMetadata.INDEX: + IndexMetadata indexMetadata = (IndexMetadata) remoteReadResult.getObj(); + indexMetadataMap.put(indexMetadata.getIndex().getName(), indexMetadata); + break; + case CUSTOM_METADATA: + Metadata.Custom metadataCustom = (Metadata.Custom) remoteReadResult.getObj(); + if (includeEphemeral || (!includeEphemeral && metadataCustom.context().contains(XContentContext.GATEWAY))) { + metadataBuilder.putCustom(remoteReadResult.getComponentName(), (Metadata.Custom) remoteReadResult.getObj()); + } + break; + case COORDINATION_METADATA: + metadataBuilder.coordinationMetadata((CoordinationMetadata) remoteReadResult.getObj()); + break; + case SETTING_METADATA: + metadataBuilder.persistentSettings((Settings) remoteReadResult.getObj()); + break; + case TRANSIENT_SETTING_METADATA: + metadataBuilder.transientSettings((Settings) remoteReadResult.getObj()); + break; + case TEMPLATES_METADATA: + metadataBuilder.templates((TemplatesMetadata) remoteReadResult.getObj()); + break; + case HASHES_OF_CONSISTENT_SETTINGS: + metadataBuilder.hashesOfConsistentSettings((DiffableStringMap) remoteReadResult.getObj()); + break; + case CLUSTER_STATE_ATTRIBUTE: + if (remoteReadResult.getComponentName().equals(DISCOVERY_NODES)) { + discoveryNodesBuilder.set(DiscoveryNodes.builder((DiscoveryNodes) remoteReadResult.getObj())); + } else if (remoteReadResult.getComponentName().equals(CLUSTER_BLOCKS)) { + clusterStateBuilder.blocks((ClusterBlocks) remoteReadResult.getObj()); + } else if (remoteReadResult.getComponentName().startsWith(CLUSTER_STATE_CUSTOM)) { + // component name for mat is "cluster-state-custom--custom_name" + String custom = remoteReadResult.getComponentName().split(CUSTOM_DELIMITER)[1]; + clusterStateBuilder.putCustom(custom, (ClusterState.Custom) remoteReadResult.getObj()); + } + break; + default: + throw new IllegalStateException("Unknown component: " + remoteReadResult.getComponent()); + } + }); - return ClusterState.builder(ClusterState.EMPTY_STATE) - .version(clusterMetadataManifest.get().getStateVersion()) - .metadata(Metadata.builder(globalMetadata).indices(indexMetadataMap).build()) - .build(); + metadataBuilder.indices(indexMetadataMap); + if (readDiscoveryNodes) { + clusterStateBuilder.nodes(discoveryNodesBuilder.get().localNodeId(localNodeId)); + } + + clusterStateBuilder.metadata(metadataBuilder).version(manifest.getStateVersion()).stateUUID(manifest.getStateUUID()); + + readIndexRoutingTableResults.forEach( + indexRoutingTable -> indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable) + ); + clusterStateBuilder.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indicesRouting)); + + return clusterStateBuilder.build(); } public ClusterState getClusterStateForManifest( @@ -770,23 +1281,137 @@ public ClusterState getClusterStateForManifest( ClusterMetadataManifest manifest, String localNodeId, boolean includeEphemeral - ) { - // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 - return null; + ) throws IOException { + if (manifest.onOrAfterCodecVersion(CODEC_V2)) { + return readClusterStateInParallel( + ClusterState.builder(new ClusterName(clusterName)).build(), + manifest, + manifest.getClusterUUID(), + localNodeId, + manifest.getIndices(), + manifest.getCustomMetadataMap(), + manifest.getCoordinationMetadata() != null, + manifest.getSettingsMetadata() != null, + manifest.getTransientSettingsMetadata() != null, + manifest.getTemplatesMetadata() != null, + includeEphemeral && manifest.getDiscoveryNodesMetadata() != null, + includeEphemeral && manifest.getClusterBlocksMetadata() != null, + includeEphemeral ? manifest.getIndicesRouting() : emptyList(), + includeEphemeral && manifest.getHashesOfConsistentSettings() != null, + includeEphemeral ? manifest.getClusterStateCustomMap() : emptyMap(), + includeEphemeral + ); + } else { + ClusterState clusterState = readClusterStateInParallel( + ClusterState.builder(new ClusterName(clusterName)).build(), + manifest, + manifest.getClusterUUID(), + localNodeId, + manifest.getIndices(), + // for manifest codec V1, we don't have the following objects to read, so not passing anything + emptyMap(), + false, + false, + false, + false, + false, + false, + emptyList(), + false, + emptyMap(), + false + ); + Metadata.Builder mb = Metadata.builder(remoteGlobalMetadataManager.getGlobalMetadata(manifest.getClusterUUID(), manifest)); + mb.indices(clusterState.metadata().indices()); + return ClusterState.builder(clusterState).metadata(mb).build(); + } + } public ClusterState getClusterStateUsingDiff( String clusterName, ClusterMetadataManifest manifest, - ClusterState previousClusterState, + ClusterState previousState, String localNodeId - ) { - // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 - return null; - } + ) throws IOException { + assert manifest.getDiffManifest() != null; + ClusterStateDiffManifest diff = manifest.getDiffManifest(); + List updatedIndices = diff.getIndicesUpdated().stream().map(idx -> { + Optional uploadedIndexMetadataOptional = manifest.getIndices() + .stream() + .filter(idx2 -> idx2.getIndexName().equals(idx)) + .findFirst(); + assert uploadedIndexMetadataOptional.isPresent() == true; + return uploadedIndexMetadataOptional.get(); + }).collect(Collectors.toList()); + + Map updatedCustomMetadata = new HashMap<>(); + if (diff.getCustomMetadataUpdated() != null) { + for (String customType : diff.getCustomMetadataUpdated()) { + updatedCustomMetadata.put(customType, manifest.getCustomMetadataMap().get(customType)); + } + } + Map updatedClusterStateCustom = new HashMap<>(); + if (diff.getClusterStateCustomUpdated() != null) { + for (String customType : diff.getClusterStateCustomUpdated()) { + updatedClusterStateCustom.put(customType, manifest.getClusterStateCustomMap().get(customType)); + } + } + + List updatedIndexRouting = new ArrayList<>(); + updatedIndexRouting.addAll( + remoteRoutingTableService.getUpdatedIndexRoutingTableMetadata(diff.getIndicesRoutingUpdated(), manifest.getIndicesRouting()) + ); + + ClusterState updatedClusterState = readClusterStateInParallel( + previousState, + manifest, + manifest.getClusterUUID(), + localNodeId, + updatedIndices, + updatedCustomMetadata, + diff.isCoordinationMetadataUpdated(), + diff.isSettingsMetadataUpdated(), + diff.isTransientSettingsMetadataUpdated(), + diff.isTemplatesMetadataUpdated(), + diff.isDiscoveryNodesUpdated(), + diff.isClusterBlocksUpdated(), + updatedIndexRouting, + diff.isHashesOfConsistentSettingsUpdated(), + updatedClusterStateCustom, + true + ); + ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState); + Metadata.Builder metadataBuilder = Metadata.builder(updatedClusterState.metadata()); + // remove the deleted indices from the metadata + for (String index : diff.getIndicesDeleted()) { + metadataBuilder.remove(index); + } + // remove the deleted metadata customs from the metadata + if (diff.getCustomMetadataDeleted() != null) { + for (String customType : diff.getCustomMetadataDeleted()) { + metadataBuilder.removeCustom(customType); + } + } + + // remove the deleted cluster state customs from the metadata + if (diff.getClusterStateCustomDeleted() != null) { + for (String customType : diff.getClusterStateCustomDeleted()) { + clusterStateBuilder.removeCustom(customType); + } + } + + HashMap indexRoutingTables = new HashMap<>(updatedClusterState.getRoutingTable().getIndicesRouting()); + + for (String indexName : diff.getIndicesRoutingDeleted()) { + indexRoutingTables.remove(indexName); + } - public ClusterMetadataManifest getClusterMetadataManifestByFileName(String clusterUUID, String manifestFileName) { - return remoteManifestManager.getRemoteClusterMetadataManifestByFileName(clusterUUID, manifestFileName); + return clusterStateBuilder.stateUUID(manifest.getStateUUID()) + .version(manifest.getStateVersion()) + .metadata(metadataBuilder) + .routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables)) + .build(); } /** @@ -815,6 +1440,17 @@ public String getLastKnownUUIDFromRemote(String clusterName) { } } + public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { + this.remoteStateReadTimeout = remoteStateReadTimeout; + } + + private BlobStoreTransferService getBlobStoreTransferService() { + if (blobStoreTransferService == null) { + blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); + } + return blobStoreTransferService; + } + Set getAllClusterUUIDs(String clusterName) throws IOException { Map clusterUUIDMetadata = clusterUUIDContainer(blobStoreRepository, clusterName).children(); if (clusterUUIDMetadata == null) { @@ -861,7 +1497,7 @@ private List createClusterChain(final Map 1) { logger.info("Top level cluster UUIDs: {}", topLevelClusterUUIDs); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index 8a4598e52708e..cd29114e05684 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -22,14 +22,19 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore; import org.opensearch.gateway.remote.model.RemoteCoordinationMetadata; import org.opensearch.gateway.remote.model.RemoteCustomMetadata; import org.opensearch.gateway.remote.model.RemoteGlobalMetadata; +import org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings; import org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata; +import org.opensearch.gateway.remote.model.RemoteReadResult; import org.opensearch.gateway.remote.model.RemoteTemplatesMetadata; +import org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; @@ -65,17 +70,20 @@ public class RemoteGlobalMetadataManager { private Map remoteWritableEntityStores; private final Compressor compressor; private final NamedXContentRegistry namedXContentRegistry; + private final NamedWriteableRegistry namedWriteableRegistry; RemoteGlobalMetadataManager( ClusterSettings clusterSettings, String clusterName, BlobStoreRepository blobStoreRepository, BlobStoreTransferService blobStoreTransferService, + NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadpool ) { this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING); this.compressor = blobStoreRepository.getCompressor(); this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry(); + this.namedWriteableRegistry = namedWriteableRegistry; this.remoteWritableEntityStores = new HashMap<>(); this.remoteWritableEntityStores.put( RemoteGlobalMetadata.GLOBAL_METADATA, @@ -107,6 +115,26 @@ public class RemoteGlobalMetadataManager { ThreadPool.Names.REMOTE_STATE_READ ) ); + this.remoteWritableEntityStores.put( + RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA, + new RemoteClusterStateBlobStore<>( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadpool, + ThreadPool.Names.REMOTE_STATE_READ + ) + ); + this.remoteWritableEntityStores.put( + RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS, + new RemoteClusterStateBlobStore<>( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadpool, + ThreadPool.Names.REMOTE_STATE_READ + ) + ); this.remoteWritableEntityStores.put( RemoteTemplatesMetadata.TEMPLATES_METADATA, new RemoteClusterStateBlobStore<>( @@ -154,8 +182,22 @@ private ActionListener getActionListener( ) { return ActionListener.wrap( resp -> latchedActionListener.onResponse(remoteBlobStoreObject.getUploadedMetadata()), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed", ex)) + ex -> latchedActionListener.onFailure( + new RemoteStateTransferException("Upload failed for " + remoteBlobStoreObject.getType(), ex) + ) + ); + } + + CheckedRunnable getAsyncMetadataReadAction( + AbstractRemoteWritableBlobEntity readEntity, + String componentName, + LatchedActionListener listener + ) { + ActionListener actionListener = ActionListener.wrap( + response -> listener.onResponse(new RemoteReadResult((ToXContent) response, readEntity.getType(), componentName)), + listener::onFailure ); + return () -> getStore(readEntity).readAsync(readEntity, actionListener); } Metadata getGlobalMetadata(String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { @@ -213,7 +255,7 @@ Metadata getGlobalMetadata(String clusterUUID, ClusterMetadataManifest clusterMe key, clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); builder.putCustom(key, (Custom) getStore(remoteCustomMetadata).read(remoteCustomMetadata)); } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java index 80ae33e2bb00c..a84161b202a22 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java @@ -20,6 +20,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore; import org.opensearch.gateway.remote.model.RemoteIndexMetadata; +import org.opensearch.gateway.remote.model.RemoteReadResult; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; @@ -67,7 +68,6 @@ public RemoteIndexMetadataManager( threadpool, ThreadPool.Names.REMOTE_STATE_READ ); - ; this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry(); this.compressor = blobStoreRepository.getCompressor(); this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); @@ -93,6 +93,26 @@ CheckedRunnable getAsyncIndexMetadataWriteAction( return () -> indexMetadataBlobStore.writeAsync(remoteIndexMetadata, completionListener); } + CheckedRunnable getAsyncIndexMetadataReadAction( + String clusterUUID, + String uploadedFilename, + LatchedActionListener latchedActionListener + ) { + RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata( + RemoteClusterStateUtils.getFormattedIndexFileName(uploadedFilename), + clusterUUID, + compressor, + namedXContentRegistry + ); + ActionListener actionListener = ActionListener.wrap( + response -> latchedActionListener.onResponse( + new RemoteReadResult(response, RemoteIndexMetadata.INDEX, response.getIndex().getName()) + ), + latchedActionListener::onFailure + ); + return () -> indexMetadataBlobStore.readAsync(remoteIndexMetadata, actionListener); + } + /** * Fetch index metadata from remote cluster state * diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java index 86d5a4d1aafb5..cb09de1a6ec44 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java @@ -96,6 +96,7 @@ RemoteClusterStateManifestInfo uploadManifest( ClusterState clusterState, RemoteClusterStateUtils.UploadedMetadataResults uploadedMetadataResult, String previousClusterUUID, + ClusterStateDiffManifest clusterDiffManifest, boolean committed ) { synchronized (this) { @@ -116,7 +117,14 @@ RemoteClusterStateManifestInfo uploadManifest( .templatesMetadata(uploadedMetadataResult.uploadedTemplatesMetadata) .customMetadataMap(uploadedMetadataResult.uploadedCustomMetadataMap) .routingTableVersion(clusterState.getRoutingTable().version()) - .indicesRouting(uploadedMetadataResult.uploadedIndicesRoutingMetadata); + .indicesRouting(uploadedMetadataResult.uploadedIndicesRoutingMetadata) + .discoveryNodesMetadata(uploadedMetadataResult.uploadedDiscoveryNodes) + .clusterBlocksMetadata(uploadedMetadataResult.uploadedClusterBlocks) + .diffManifest(clusterDiffManifest) + .metadataVersion(clusterState.metadata().version()) + .transientSettingsMetadata(uploadedMetadataResult.uploadedTransientSettingsMetadata) + .clusterStateCustomMetadataMap(uploadedMetadataResult.uploadedClusterStateCustomMetadataMap) + .hashesOfConsistentSettings(uploadedMetadataResult.uploadedHashesOfConsistentSettings); final ClusterMetadataManifest manifest = manifestBuilder.build(); String manifestFileName = writeMetadataManifest(clusterState.metadata().clusterUUID(), manifest); return new RemoteClusterStateManifestInfo(manifest, manifestFileName); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java index 01704df700e20..9c5fbd5941640 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java @@ -10,21 +10,18 @@ import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.common.io.Streams; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; -import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.Compressor; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; import java.io.IOException; import java.io.InputStream; import java.util.List; -import static org.opensearch.core.common.bytes.BytesReference.toBytes; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; @@ -35,6 +32,10 @@ public class RemoteClusterBlocks extends AbstractRemoteWritableBlobEntity { public static final String CLUSTER_BLOCKS = "blocks"; + public static final ChecksumWritableBlobStoreFormat CLUSTER_BLOCKS_FORMAT = new ChecksumWritableBlobStoreFormat<>( + "blocks", + ClusterBlocks::readFrom + ); private ClusterBlocks clusterBlocks; private long stateVersion; @@ -82,20 +83,11 @@ public UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { - clusterBlocks.writeTo(bytesStreamOutput); - return bytesStreamOutput.bytes().streamInput(); - } catch (IOException e) { - throw new IOException("Failed to serialize remote cluster blocks", e); - } + return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput(); } @Override public ClusterBlocks deserialize(final InputStream inputStream) throws IOException { - try (StreamInput in = new BytesStreamInput(toBytes(Streams.readFully(inputStream)))) { - return ClusterBlocks.readFrom(in); - } catch (IOException e) { - throw new IOException("Failed to deserialize remote cluster blocks", e); - } + return CLUSTER_BLOCKS_FORMAT.deserialize(blobName, Streams.readFully(inputStream)); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java index 25d6ce5848f9f..f384908bc6b65 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java @@ -11,22 +11,19 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterState.Custom; import org.opensearch.common.io.Streams; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; -import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.Compressor; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; import java.io.IOException; import java.io.InputStream; import java.util.List; -import static org.opensearch.cluster.ClusterState.FeatureAware.shouldSerialize; -import static org.opensearch.core.common.bytes.BytesReference.toBytes; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; @@ -39,9 +36,10 @@ public class RemoteClusterStateCustoms extends AbstractRemoteWritableBlobEntity< public static final String CLUSTER_STATE_CUSTOM = "cluster-state-custom"; private long stateVersion; - private String customType; + private final String customType; private ClusterState.Custom custom; private final NamedWriteableRegistry namedWriteableRegistry; + private final ChecksumWritableBlobStoreFormat clusterStateCustomsFormat; public RemoteClusterStateCustoms( final ClusterState.Custom custom, @@ -56,6 +54,10 @@ public RemoteClusterStateCustoms( this.customType = customType; this.custom = custom; this.namedWriteableRegistry = namedWriteableRegistry; + this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>( + "cluster-state-custom", + is -> readFrom(is, namedWriteableRegistry, customType) + ); } public RemoteClusterStateCustoms( @@ -69,6 +71,10 @@ public RemoteClusterStateCustoms( this.blobName = blobName; this.customType = customType; this.namedWriteableRegistry = namedWriteableRegistry; + this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>( + "cluster-state-custom", + is -> readFrom(is, namedWriteableRegistry, customType) + ); } @Override @@ -107,27 +113,16 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - try (BytesStreamOutput outputStream = new BytesStreamOutput()) { - if (shouldSerialize(outputStream, custom)) { - outputStream.writeNamedWriteable(custom); - } - return outputStream.bytes().streamInput(); - } catch (IOException e) { - throw new IOException("Failed to serialize cluster state custom of type " + customType, e); - } + return clusterStateCustomsFormat.serialize(custom, generateBlobFileName(), getCompressor()).streamInput(); } @Override public ClusterState.Custom deserialize(final InputStream inputStream) throws IOException { - try ( - NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput( - new BytesStreamInput(toBytes(Streams.readFully(inputStream))), - this.namedWriteableRegistry - ) - ) { - return in.readNamedWriteable(Custom.class); - } catch (IOException e) { - throw new IOException("Failed to deserialize cluster state custom of type " + customType, e); - } + return clusterStateCustomsFormat.deserialize(blobName, Streams.readFully(inputStream)); + } + + public static ClusterState.Custom readFrom(StreamInput streamInput, NamedWriteableRegistry namedWriteableRegistry, String customType) + throws IOException { + return namedWriteableRegistry.getReader(ClusterState.Custom.class, customType).read(streamInput); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java index 1947fd9e0bb88..4c7069ee8be9e 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java @@ -8,18 +8,17 @@ package org.opensearch.gateway.remote.model; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.Metadata.Custom; import org.opensearch.common.io.Streams; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.Compressor; -import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; -import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; import java.io.IOException; import java.io.InputStream; @@ -28,7 +27,6 @@ import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_PLAIN_FORMAT; /** * Wrapper class for uploading/downloading {@link Custom} to/from remote blob store @@ -37,11 +35,12 @@ public class RemoteCustomMetadata extends AbstractRemoteWritableBlobEntity customBlobStoreFormat; + public final ChecksumWritableBlobStoreFormat customBlobStoreFormat; private Custom custom; private final String customType; private long metadataVersion; + private final NamedWriteableRegistry namedWriteableRegistry; public RemoteCustomMetadata( final Custom custom, @@ -49,16 +48,16 @@ public RemoteCustomMetadata( final long metadataVersion, final String clusterUUID, Compressor compressor, - NamedXContentRegistry namedXContentRegistry + NamedWriteableRegistry namedWriteableRegistry ) { - super(clusterUUID, compressor, namedXContentRegistry); + super(clusterUUID, compressor, null); this.custom = custom; this.customType = customType; this.metadataVersion = metadataVersion; - this.customBlobStoreFormat = new ChecksumBlobStoreFormat<>( + this.namedWriteableRegistry = namedWriteableRegistry; + this.customBlobStoreFormat = new ChecksumWritableBlobStoreFormat<>( "custom", - METADATA_NAME_PLAIN_FORMAT, - (parser -> Metadata.Custom.fromXContent(parser, customType)) + is -> readFrom(is, namedWriteableRegistry, customType) ); } @@ -67,15 +66,15 @@ public RemoteCustomMetadata( final String customType, final String clusterUUID, final Compressor compressor, - final NamedXContentRegistry namedXContentRegistry + final NamedWriteableRegistry namedWriteableRegistry ) { - super(clusterUUID, compressor, namedXContentRegistry); + super(clusterUUID, compressor, null); this.blobName = blobName; this.customType = customType; - this.customBlobStoreFormat = new ChecksumBlobStoreFormat<>( + this.namedWriteableRegistry = namedWriteableRegistry; + this.customBlobStoreFormat = new ChecksumWritableBlobStoreFormat<>( "custom", - METADATA_NAME_PLAIN_FORMAT, - (parser -> Metadata.Custom.fromXContent(parser, customType)) + is -> readFrom(is, namedWriteableRegistry, customType) ); } @@ -107,13 +106,12 @@ public String generateBlobFileName() { @Override public InputStream serialize() throws IOException { - return customBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS) - .streamInput(); + return customBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor()).streamInput(); } @Override public Custom deserialize(final InputStream inputStream) throws IOException { - return customBlobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + return customBlobStoreFormat.deserialize(blobName, Streams.readFully(inputStream)); } @Override @@ -121,4 +119,9 @@ public UploadedMetadata getUploadedMetadata() { assert blobName != null; return new UploadedMetadataAttribute(String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, customType), blobName); } + + public static Custom readFrom(StreamInput streamInput, NamedWriteableRegistry namedWriteableRegistry, String customType) + throws IOException { + return namedWriteableRegistry.getReader(Custom.class, customType).read(streamInput); + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java index 970f485abdc49..fb399e2899cdd 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java @@ -10,21 +10,18 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.io.Streams; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; -import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.Compressor; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; import java.io.IOException; import java.io.InputStream; import java.util.List; -import static org.opensearch.core.common.bytes.BytesReference.toBytes; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; @@ -35,6 +32,10 @@ public class RemoteDiscoveryNodes extends AbstractRemoteWritableBlobEntity { public static final String DISCOVERY_NODES = "nodes"; + public static final ChecksumWritableBlobStoreFormat DISCOVERY_NODES_FORMAT = new ChecksumWritableBlobStoreFormat<>( + "nodes", + is -> DiscoveryNodes.readFrom(is, null) + ); private DiscoveryNodes discoveryNodes; private long stateVersion; @@ -87,20 +88,11 @@ public UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - try (BytesStreamOutput outputStream = new BytesStreamOutput()) { - discoveryNodes.writeTo(outputStream); - return outputStream.bytes().streamInput(); - } catch (IOException e) { - throw new IOException("Failed to serialize remote discovery nodes", e); - } + return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor()).streamInput(); } @Override public DiscoveryNodes deserialize(final InputStream inputStream) throws IOException { - try (StreamInput streamInput = new BytesStreamInput(toBytes(Streams.readFully(inputStream)))) { - return DiscoveryNodes.readFrom(streamInput, null); - } catch (IOException e) { - throw new IOException("Failed to deserialize remote discovery nodes", e); - } + return DISCOVERY_NODES_FORMAT.deserialize(blobName, Streams.readFully(inputStream)); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java index d343f8633131e..1debf75cdfec9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java @@ -10,20 +10,17 @@ import org.opensearch.cluster.metadata.DiffableStringMap; import org.opensearch.common.io.Streams; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; -import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.Compressor; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; import java.io.IOException; import java.io.InputStream; import java.util.List; -import static org.opensearch.core.common.bytes.BytesReference.toBytes; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; @@ -33,6 +30,8 @@ */ public class RemoteHashesOfConsistentSettings extends AbstractRemoteWritableBlobEntity { public static final String HASHES_OF_CONSISTENT_SETTINGS = "hashes-of-consistent-settings"; + public static final ChecksumWritableBlobStoreFormat HASHES_OF_CONSISTENT_SETTINGS_FORMAT = + new ChecksumWritableBlobStoreFormat<>("hashes-of-consistent-settings", DiffableStringMap::readFrom); private DiffableStringMap hashesOfConsistentSettings; private long metadataVersion; @@ -84,21 +83,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { - hashesOfConsistentSettings.writeTo(bytesStreamOutput); - return bytesStreamOutput.bytes().streamInput(); - } catch (IOException e) { - throw new IOException("Failed to serialize hashes of consistent settings", e); - } - + return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor()) + .streamInput(); } @Override public DiffableStringMap deserialize(final InputStream inputStream) throws IOException { - try (StreamInput in = new BytesStreamInput(toBytes(Streams.readFully(inputStream)))) { - return DiffableStringMap.readFrom(in); - } catch (IOException e) { - throw new IOException("Failed to deserialize hashes of consistent settings", e); - } + return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.deserialize(blobName, Streams.readFully(inputStream)); } } diff --git a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java index fe90f24b0f544..d3c6fc9d1f3bf 100644 --- a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java +++ b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java @@ -152,7 +152,11 @@ public RemoteRestoreResult restore( throw new IllegalArgumentException("clusterUUID to restore from should be different from current cluster UUID"); } logger.info("Restoring cluster state from remote store from cluster UUID : [{}]", restoreClusterUUID); - remoteState = remoteClusterStateService.getLatestClusterState(currentState.getClusterName().value(), restoreClusterUUID); + remoteState = remoteClusterStateService.getLatestClusterState( + currentState.getClusterName().value(), + restoreClusterUUID, + false + ); remoteState.getMetadata().getIndices().values().forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), new Tuple<>(true, indexMetadata)); }); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index afefb2f390636..397949525a3ec 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -769,7 +769,8 @@ protected Node( clusterService, threadPool::preciseRelativeTimeInNanos, threadPool, - List.of(remoteIndexPathUploader) + List.of(remoteIndexPathUploader), + namedWriteableRegistry ); remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager(); } else { diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java index d0c2cca4b46f0..39294ee8da41e 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java @@ -14,6 +14,9 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; import java.util.function.Supplier; @@ -23,13 +26,21 @@ public class RemoteRoutingTableServiceFactoryTests extends OpenSearchTestCase { Supplier repositoriesService; + private ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @After + public void teardown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } public void testGetServiceWhenRemoteRoutingDisabled() { Settings settings = Settings.builder().build(); RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool ); assertTrue(service instanceof NoopRemoteRoutingTableService); } @@ -44,8 +55,10 @@ public void testGetServiceWhenRemoteRoutingEnabled() { RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool ); assertTrue(service instanceof InternalRemoteRoutingTableService); } + } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 9604f0bdaf8c4..cc31c1a6e8fd1 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -18,6 +18,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; @@ -26,6 +27,7 @@ import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; @@ -34,6 +36,7 @@ import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteStateTransferException; +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; @@ -41,14 +44,17 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; @@ -59,13 +65,14 @@ import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX; import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; +import static org.opensearch.gateway.remote.ClusterMetadataManifestTests.randomUploadedIndexMetadataList; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.startsWith; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; @@ -83,6 +90,9 @@ public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { private BlobStore blobStore; private BlobContainer blobContainer; private BlobPath basePath; + private ClusterSettings clusterSettings; + private ClusterService clusterService; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before public void setup() { @@ -92,8 +102,10 @@ public void setup() { Settings settings = Settings.builder() .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") - .put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false) .build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); blobStoreRepository = mock(BlobStoreRepository.class); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); blobStore = mock(BlobStore.class); @@ -108,14 +120,17 @@ public void setup() { remoteRoutingTableService = new InternalRemoteRoutingTableService( repositoriesServiceSupplier, settings, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool ); + } @After public void teardown() throws Exception { super.tearDown(); - remoteRoutingTableService.close(); + remoteRoutingTableService.doClose(); + threadPool.shutdown(); } public void testFailInitializationWhenRemoteRoutingDisabled() { @@ -125,7 +140,8 @@ public void testFailInitializationWhenRemoteRoutingDisabled() { () -> new InternalRemoteRoutingTableService( repositoriesServiceSupplier, settings, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool ) ); } @@ -171,6 +187,39 @@ public void testGetIndicesRoutingMapDiff() { assertEquals(0, diff.getDeletes().size()); } + public void testGetChangedIndicesRouting() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final Index index = new Index(indexName, "uuid"); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(1).build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build(); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).routingTable(routingTable).build(); + + assertEquals( + 0, + remoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), state.getRoutingTable()).getUpserts().size() + ); + + // Reversing order to check for equality without order. + IndexRoutingTable indexRouting = routingTable.getIndicesRouting().get(indexName); + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(index) + .addShard(indexRouting.getShards().get(0).replicaShards().get(0)) + .addShard(indexRouting.getShards().get(0).primaryShard()) + .build(); + ClusterState newState = ClusterState.builder(ClusterName.DEFAULT) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .build(); + assertEquals( + 0, + remoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), newState.getRoutingTable()).getUpserts().size() + ); + } + public void testGetIndicesRoutingMapDiffIndexAdded() { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( @@ -531,6 +580,171 @@ public void testGetAllUploadedIndicesRoutingNoChange() { assertEquals(uploadedIndexMetadata2, allIndiceRoutingMetadata.get(1)); } + public void testIndicesRoutingDiffWhenIndexDeleted() { + + ClusterState state = createIndices(randomIntBetween(1, 100)); + RoutingTable routingTable = state.routingTable(); + + List allIndices = new ArrayList<>(); + routingTable.getIndicesRouting().forEach((k, v) -> allIndices.add(k)); + + String indexNameToDelete = allIndices.get(randomIntBetween(0, allIndices.size() - 1)); + RoutingTable updatedRoutingTable = RoutingTable.builder(routingTable).remove(indexNameToDelete).build(); + + assertEquals( + 1, + remoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().size() + ); + assertEquals( + indexNameToDelete, + remoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().get(0) + ); + } + + public void testIndicesRoutingDiffWhenIndexDeletedAndAdded() { + + ClusterState state = createIndices(randomIntBetween(1, 100)); + RoutingTable routingTable = state.routingTable(); + + List allIndices = new ArrayList<>(); + routingTable.getIndicesRouting().forEach((k, v) -> allIndices.add(k)); + + String indexNameToDelete = allIndices.get(randomIntBetween(0, allIndices.size() - 1)); + RoutingTable.Builder updatedRoutingTableBuilder = RoutingTable.builder(routingTable).remove(indexNameToDelete); + + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(1).build(); + + RoutingTable updatedRoutingTable = updatedRoutingTableBuilder.addAsNew(indexMetadata).build(); + + assertEquals( + 1, + remoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().size() + ); + assertEquals( + indexNameToDelete, + remoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().get(0) + ); + + assertEquals( + 1, + remoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getUpserts().size() + ); + assertTrue( + remoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable) + .getUpserts() + .containsKey(indexName) + ); + } + + public void testGetAsyncIndexMetadataReadAction() throws Exception { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + ClusterState clusterState = createClusterState(indexName); + String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); + Index index = new Index(indexName, "uuid-01"); + + LatchedActionListener listener = mock(LatchedActionListener.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + BytesStreamOutput streamOutput = new BytesStreamOutput(); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( + clusterState.routingTable().getIndicesRouting().get(indexName) + ); + remoteIndexRoutingTable.writeTo(streamOutput); + when(blobContainer.readBlob(indexName)).thenReturn(streamOutput.bytes().streamInput()); + remoteRoutingTableService.start(); + + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); + assertNotNull(runnable); + runnable.run(); + + assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); + assertBusy(() -> verify(listener, times(1)).onResponse(any(IndexRoutingTable.class))); + } + + public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws Exception { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + ClusterState clusterState = createClusterState(indexName); + String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); + Index index = new Index("incorrect-index", "uuid-01"); + + LatchedActionListener listener = mock(LatchedActionListener.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + BytesStreamOutput streamOutput = new BytesStreamOutput(); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( + clusterState.routingTable().getIndicesRouting().get(indexName) + ); + remoteIndexRoutingTable.writeTo(streamOutput); + when(blobContainer.readBlob(anyString())).thenReturn(streamOutput.bytes().streamInput()); + remoteRoutingTableService.doStart(); + + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); + assertNotNull(runnable); + runnable.run(); + + assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); + assertBusy(() -> verify(listener, times(1)).onFailure(any(Exception.class))); + } + + public void testGetAsyncIndexMetadataReadActionFailureInBlobRepo() throws Exception { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); + Index index = new Index(indexName, "uuid-01"); + + LatchedActionListener listener = mock(LatchedActionListener.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + doThrow(new IOException("testing failure")).when(blobContainer).readBlob(indexName); + remoteRoutingTableService.doStart(); + + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); + assertNotNull(runnable); + runnable.run(); + + assertBusy(() -> verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class))); + } + + public void testGetUpdatedIndexRoutingTableMetadataWhenNoChange() { + List updatedIndicesRouting = new ArrayList<>(); + List indicesRouting = randomUploadedIndexMetadataList(); + List updatedIndexMetadata = remoteRoutingTableService + .getUpdatedIndexRoutingTableMetadata(updatedIndicesRouting, indicesRouting); + assertEquals(0, updatedIndexMetadata.size()); + } + + public void testGetUpdatedIndexRoutingTableMetadataWhenIndexIsUpdated() { + List updatedIndicesRouting = new ArrayList<>(); + List indicesRouting = randomUploadedIndexMetadataList(); + ClusterMetadataManifest.UploadedIndexMetadata expectedIndexRouting = indicesRouting.get( + randomIntBetween(0, indicesRouting.size() - 1) + ); + updatedIndicesRouting.add(expectedIndexRouting.getIndexName()); + List updatedIndexMetadata = remoteRoutingTableService + .getUpdatedIndexRoutingTableMetadata(updatedIndicesRouting, indicesRouting); + assertEquals(1, updatedIndexMetadata.size()); + assertEquals(expectedIndexRouting, updatedIndexMetadata.get(0)); + } + + private ClusterState createIndices(int numberOfIndices) { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + for (int i = 0; i < numberOfIndices; i++) { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final Index index = new Index(indexName, "uuid"); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(1).build(); + + routingTableBuilder.addAsNew(indexMetadata); + } + return ClusterState.builder(ClusterName.DEFAULT).routingTable(routingTableBuilder.build()).build(); + } + private ClusterState createClusterState(String indexName) { final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( Settings.builder() diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 7b113961fc2c7..8e8d80c870ddf 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -489,7 +489,8 @@ public void testDataOnlyNodePersistence() throws Exception { clusterService, () -> 0L, threadPool, - List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) + List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)), + writableRegistry() ); } else { return null; diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 6ae92542c7923..02471c9cdbbbe 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -593,7 +593,7 @@ public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOExc } } - private List randomUploadedIndexMetadataList() { + public static List randomUploadedIndexMetadataList() { final int size = randomIntBetween(1, 10); final List uploadedIndexMetadataList = new ArrayList<>(size); while (uploadedIndexMetadataList.size() < size) { @@ -602,7 +602,7 @@ private List randomUploadedIndexMetadataList() { return uploadedIndexMetadataList; } - private UploadedIndexMetadata randomUploadedIndexMetadata() { + private static UploadedIndexMetadata randomUploadedIndexMetadata() { return new UploadedIndexMetadata(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10)); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index c22b325990a41..feae97bae48e9 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -55,7 +55,7 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.TestCustomMetadata; @@ -95,7 +95,6 @@ import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_PLAIN_FORMAT; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getFormattedIndexFileName; import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; @@ -182,7 +181,8 @@ public void setup() { clusterService, () -> 0L, threadPool, - List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) + List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)), + writableRegistry() ); } @@ -214,7 +214,8 @@ public void testFailInitializationWhenRemoteStateDisabled() { clusterService, () -> 0L, threadPool, - List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) + List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)), + writableRegistry() ) ); } @@ -309,8 +310,8 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); - assertEquals(7, actionListenerArgumentCaptor.getAllValues().size()); - assertEquals(7, writeContextArgumentCaptor.getAllValues().size()); + assertEquals(11, actionListenerArgumentCaptor.getAllValues().size()); + assertEquals(11, writeContextArgumentCaptor.getAllValues().size()); byte[] writtenBytes = capturedWriteContext.get("metadata") .getStreamProvider(Integer.MAX_VALUE) @@ -597,6 +598,7 @@ public void testCoordinationMetadataOnlyUpdated() throws IOException { Function updater = (initialClusterState) -> ClusterState.builder(initialClusterState) .metadata( Metadata.builder(initialClusterState.metadata()) + .version(initialClusterState.metadata().version() + 1) .coordinationMetadata( CoordinationMetadata.builder(initialClusterState.coordinationMetadata()) .addVotingConfigExclusion(new CoordinationMetadata.VotingConfigExclusion("excludedNodeId", "excludedNodeName")) @@ -989,10 +991,11 @@ public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOE remoteClusterStateService.start(); assertEquals( - remoteClusterStateService.getLatestClusterState(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID()) - .getMetadata() - .getIndices() - .size(), + remoteClusterStateService.getLatestClusterState( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID(), + false + ).getMetadata().getIndices().size(), 0 ); } @@ -1018,13 +1021,14 @@ public void testReadLatestMetadataManifestSuccessButIndexMetadataFetchIOExceptio remoteClusterStateService.start(); Exception e = assertThrows( - IllegalStateException.class, + RemoteStateTransferException.class, () -> remoteClusterStateService.getLatestClusterState( clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() + clusterState.metadata().clusterUUID(), + false ).getMetadata().getIndices() ); - assertEquals(e.getMessage(), "Error while downloading IndexMetadata - " + uploadedIndexMetadata.getUploadedFilename()); + assertEquals("Exception during reading cluster state from remote", e.getMessage()); } public void testReadLatestMetadataManifestSuccess() throws IOException { @@ -1091,7 +1095,8 @@ public void testReadGlobalMetadata() throws IOException { ClusterState newClusterState = remoteClusterStateService.getLatestClusterState( clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() + clusterState.metadata().clusterUUID(), + false ); assertTrue(Metadata.isGlobalStateEquals(newClusterState.getMetadata(), expectedMetadata)); @@ -1134,7 +1139,8 @@ public void testReadGlobalMetadataIOException() throws IOException { IllegalStateException.class, () -> remoteClusterStateService.getLatestClusterState( clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() + clusterState.metadata().clusterUUID(), + false ) ); assertEquals(e.getMessage(), "Error while downloading Global Metadata - " + globalIndexMetadataName); @@ -1173,7 +1179,8 @@ public void testReadLatestIndexMetadataSuccess() throws IOException { Map indexMetadataMap = remoteClusterStateService.getLatestClusterState( clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() + clusterState.metadata().clusterUUID(), + false ).getMetadata().getIndices(); assertEquals(indexMetadataMap.size(), 1); @@ -1327,7 +1334,8 @@ public void testRemoteRoutingTableInitializedWhenEnabled() { clusterService, () -> 0L, threadPool, - List.of(new RemoteIndexPathUploader(threadPool, newSettings, repositoriesServiceSupplier, clusterSettings)) + List.of(new RemoteIndexPathUploader(threadPool, newSettings, repositoriesServiceSupplier, clusterSettings)), + writableRegistry() ); assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof InternalRemoteRoutingTableService); } @@ -1425,8 +1433,8 @@ public void testWriteFullMetadataInParallelSuccessWithRoutingTable() throws IOEx assertThat(manifest.getIndicesRouting().get(0).getIndexUUID(), is(uploadedIndiceRoutingMetadata.getIndexUUID())); assertThat(manifest.getIndicesRouting().get(0).getUploadedFilename(), notNullValue()); - assertEquals(8, actionListenerArgumentCaptor.getAllValues().size()); - assertEquals(8, writeContextArgumentCaptor.getAllValues().size()); + assertEquals(12, actionListenerArgumentCaptor.getAllValues().size()); + assertEquals(12, writeContextArgumentCaptor.getAllValues().size()); } public void testWriteIncrementalMetadataSuccessWithRoutingTable() throws IOException { @@ -1493,7 +1501,8 @@ private void initializeRoutingTable() { clusterService, () -> 0L, threadPool, - List.of(new RemoteIndexPathUploader(threadPool, newSettings, repositoriesServiceSupplier, clusterSettings)) + List.of(new RemoteIndexPathUploader(threadPool, newSettings, repositoriesServiceSupplier, clusterSettings)), + writableRegistry() ); } @@ -1852,20 +1861,21 @@ private void mockBlobContainerForGlobalMetadata( .stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> getFileNameFromPath(entry.getValue().getUploadedFilename()))); - ChecksumBlobStoreFormat customMetadataFormat = new ChecksumBlobStoreFormat<>( - "custom", - METADATA_NAME_PLAIN_FORMAT, - null - ); + // ChecksumBlobStoreFormat customMetadataFormat = new ChecksumBlobStoreFormat<>( + // "custom", + // METADATA_NAME_PLAIN_FORMAT, + // null + // ); + + ChecksumWritableBlobStoreFormat customMetadataFormat = new ChecksumWritableBlobStoreFormat<>("custom", null); for (Map.Entry entry : customFileMap.entrySet()) { String custom = entry.getKey(); String fileName = entry.getValue(); - when(blobContainer.readBlob(customMetadataFormat.blobName(fileName))).thenAnswer((invocation) -> { + when(blobContainer.readBlob(fileName)).thenAnswer((invocation) -> { BytesReference bytesReference = customMetadataFormat.serialize( metadata.custom(custom), fileName, - blobStoreRepository.getCompressor(), - FORMAT_PARAMS + blobStoreRepository.getCompressor() ); return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); }); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index 07d2c9a40185b..f24f8ddeb1959 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -57,6 +57,7 @@ public void setup() { "test-cluster", blobStoreRepository, blobStoreTransferService, + writableRegistry(), threadPool ); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java index a0c60ee2088b8..1bce176273270 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java @@ -8,24 +8,21 @@ package org.opensearch.gateway.remote.model; -import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.metadata.IndexGraveyard; import org.opensearch.cluster.metadata.Metadata.Custom; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.compress.DeflateCompressor; -import org.opensearch.common.network.NetworkModule; import org.opensearch.common.remote.BlobPathParameters; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.compress.Compressor; import org.opensearch.core.compress.NoneCompressor; import org.opensearch.core.index.Index; -import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; -import org.opensearch.indices.IndicesModule; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -36,10 +33,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; -import java.util.function.Function; -import java.util.stream.Stream; -import static java.util.stream.Collectors.toList; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; @@ -62,7 +56,7 @@ public class RemoteCustomMetadataTests extends OpenSearchTestCase { private String clusterName; private ClusterSettings clusterSettings; private Compressor compressor; - private NamedXContentRegistry namedXContentRegistry; + private NamedWriteableRegistry namedWriteableRegistry; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -75,15 +69,7 @@ public void setup() { when(blobStoreRepository.basePath()).thenReturn(blobPath); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); compressor = new NoneCompressor(); - namedXContentRegistry = new NamedXContentRegistry( - Stream.of( - NetworkModule.getNamedXContents().stream(), - IndicesModule.getNamedXContents().stream(), - ClusterModule.getNamedXWriteables().stream() - ).flatMap(Function.identity()).collect(toList()) - ); - // namedXContentRegistry = new NamedXContentRegistry(List.of(new Entry(Metadata.Custom.class, new ParseField(CUSTOM_TYPE), - // p->TestCustomMetadata.fromXContent(CustomMetadata1::new, p)))); + namedWriteableRegistry = writableRegistry(); this.clusterName = "test-cluster-name"; } @@ -101,7 +87,7 @@ public void testClusterUUID() { METADATA_VERSION, clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); assertThat(remoteObjectForUpload.clusterUUID(), is(clusterUUID)); @@ -110,7 +96,7 @@ public void testClusterUUID() { "test-custom", clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID)); } @@ -123,7 +109,7 @@ public void testFullBlobName() { METADATA_VERSION, clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); @@ -132,7 +118,7 @@ public void testFullBlobName() { "test-custom", clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); } @@ -145,7 +131,7 @@ public void testBlobFileName() { METADATA_VERSION, clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); @@ -154,7 +140,7 @@ public void testBlobFileName() { "test-custom", clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); } @@ -166,7 +152,7 @@ public void testBlobPathTokens() { "test-custom", clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "customMetadata" })); } @@ -179,7 +165,7 @@ public void testBlobPathParameters() { METADATA_VERSION, clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); assertThat(params.getPathTokens(), is(List.of(RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN))); @@ -195,7 +181,7 @@ public void testGenerateBlobFileName() { METADATA_VERSION, clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); String blobFileName = remoteObjectForUpload.generateBlobFileName(); String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); @@ -215,7 +201,7 @@ public void testGetUploadedMetadata() throws IOException { METADATA_VERSION, clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); @@ -236,7 +222,7 @@ public void testSerDe() throws IOException { METADATA_VERSION, clusterUUID, compressor, - namedXContentRegistry + namedWriteableRegistry ); try (InputStream inputStream = remoteObjectForUpload.serialize()) { remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java index 0c46960938798..b9b6e02e8274f 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java @@ -145,7 +145,6 @@ public void testExceptionDuringSerialization() throws IOException { RemoteDiscoveryNodes remoteObjectForUpload = new RemoteDiscoveryNodes(nodes, METADATA_VERSION, clusterUUID, compressor); doThrow(new IOException("mock-exception")).when(nodes).writeTo(any()); IOException iea = assertThrows(IOException.class, remoteObjectForUpload::serialize); - assertEquals("Failed to serialize remote discovery nodes", iea.getMessage()); } public void testExceptionDuringDeserialize() throws IOException { @@ -155,7 +154,6 @@ public void testExceptionDuringDeserialize() throws IOException { String uploadedFile = "user/local/opensearch/discovery-nodes"; RemoteDiscoveryNodes remoteObjectForDownload = new RemoteDiscoveryNodes(uploadedFile, clusterUUID, compressor); IOException ioe = assertThrows(IOException.class, () -> remoteObjectForDownload.deserialize(in)); - assertEquals("Failed to deserialize remote discovery nodes", ioe.getMessage()); } private DiscoveryNodes getDiscoveryNodes() {