Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Download functionality of global metadata from remote store #10733

Merged
merged 1 commit into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Admission control] Add enhancements to FS stats to include read/write time, queue size and IO time ([#10696](https://github.com/opensearch-project/OpenSearch/pull/10696))
- [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557))
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))

### Dependencies
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ public void testFullClusterRestoreStaleDelete() throws Exception {

assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata(
Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestMetadata(
cluster().getClusterName(),
getClusterState().metadata().clusterUUID()
);
).getIndices();
assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,18 +662,18 @@ private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
*
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @param clusterMetadataManifest manifest file of cluster
* @return {@code Map<String, IndexMetadata>} latest IndexUUID to IndexMetadata map
*/
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException {
start();
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!clusterMetadataManifest.isPresent()) {
throw new IllegalStateException("Latest index metadata is not present for the provided clusterUUID");
}
assert Objects.equals(clusterUUID, clusterMetadataManifest.get().getClusterUUID())
private Map<String, IndexMetadata> getIndexMetadataMap(
String clusterName,
String clusterUUID,
ClusterMetadataManifest clusterMetadataManifest
) {
assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID())
: "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch.";
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.get().getIndices()) {
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) {
IndexMetadata indexMetadata = getIndexMetadata(clusterName, clusterUUID, uploadedIndexMetadata);
remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata);
}
Expand Down Expand Up @@ -704,6 +704,52 @@ private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, U
}
}

/**
* Fetch latest metadata from remote cluster state including global metadata and index metadata
*
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return {@link IndexMetadata}
*/
public Metadata getLatestMetadata(String clusterName, String clusterUUID) throws IOException {
start();
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!clusterMetadataManifest.isPresent()) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Latest cluster metadata manifest is not present for the provided clusterUUID: %s", clusterUUID)
);
}
// Fetch Global Metadata
Metadata globalMetadata = getGlobalMetadata(clusterName, clusterUUID, clusterMetadataManifest.get());

// Fetch Index Metadata
Map<String, IndexMetadata> indices = getIndexMetadataMap(clusterName, clusterUUID, clusterMetadataManifest.get());

return Metadata.builder(globalMetadata).indices(indices).build();
}

private Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) {
String globalMetadataFileName = clusterMetadataManifest.getGlobalMetadataFileName();
try {
// Fetch Global metadata
if (globalMetadataFileName != null) {
String[] splitPath = globalMetadataFileName.split("/");
return GLOBAL_METADATA_FORMAT.read(
globalMetadataContainer(clusterName, clusterUUID),
splitPath[splitPath.length - 1],
blobStoreRepository.getNamedXContentRegistry()
);
} else {
return Metadata.EMPTY_METADATA;
}
} catch (IOException e) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Error while downloading Global Metadata - %s", globalMetadataFileName),
e
);
}
}

/**
* Fetch latest ClusterMetadataManifest from remote state store
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ public RemoteRestoreResult restore(
|| restoreClusterUUID.isBlank()) == false;
if (metadataFromRemoteStore) {
try {
remoteClusterStateService.getLatestIndexMetadata(currentState.getClusterName().value(), restoreClusterUUID)
remoteClusterStateService.getLatestMetadata(currentState.getClusterName().value(), restoreClusterUUID)
.getIndices()
.values()
.forEach(indexMetadata -> {
indexMetadataMap.put(indexMetadata.getIndex().getName(), new Tuple<>(true, indexMetadata));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.IndexGraveyard;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand All @@ -28,6 +29,7 @@
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.ParseField;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
Expand Down Expand Up @@ -634,7 +636,8 @@ public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOE

remoteClusterStateService.start();
assertEquals(
remoteClusterStateService.getLatestIndexMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
remoteClusterStateService.getLatestMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
.getIndices()
.size(),
0
);
Expand Down Expand Up @@ -662,10 +665,8 @@ public void testReadLatestMetadataManifestSuccessButIndexMetadataFetchIOExceptio
remoteClusterStateService.start();
Exception e = assertThrows(
IllegalStateException.class,
() -> remoteClusterStateService.getLatestIndexMetadata(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
)
() -> remoteClusterStateService.getLatestMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
.getIndices()
);
assertEquals(e.getMessage(), "Error while downloading IndexMetadata - " + uploadedIndexMetadata.getUploadedFilename());
}
Expand Down Expand Up @@ -704,6 +705,70 @@ public void testReadLatestMetadataManifestSuccess() throws IOException {
assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
}

public void testReadGlobalMetadata() throws IOException {
when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(new NamedXContentRegistry(
List.of(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(IndexGraveyard.TYPE), IndexGraveyard::fromXContent))));
final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();

final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
.indices(List.of())
.clusterTerm(1L)
.stateVersion(1L)
.stateUUID("state-uuid")
.clusterUUID("cluster-uuid")
.codecVersion(MANIFEST_CURRENT_CODEC_VERSION)
.globalMetadataFileName("global-metadata-file")
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID("prev-cluster-uuid")
.build();

Metadata expactedMetadata = Metadata.builder().persistentSettings(Settings.builder().put("readonly", true).build()).build();
mockBlobContainerForGlobalMetadata(mockBlobStoreObjects(), expectedManifest, expactedMetadata);

Metadata metadata = remoteClusterStateService.getLatestMetadata(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);

assertTrue(Metadata.isGlobalStateEquals(metadata, expactedMetadata));
}

public void testReadGlobalMetadataIOException() throws IOException {
final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();
String globalIndexMetadataName = "global-metadata-file";
final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
.indices(List.of())
.clusterTerm(1L)
.stateVersion(1L)
.stateUUID("state-uuid")
.clusterUUID("cluster-uuid")
.codecVersion(MANIFEST_CURRENT_CODEC_VERSION)
.globalMetadataFileName(globalIndexMetadataName)
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID("prev-cluster-uuid")
.build();

Metadata expactedMetadata = Metadata.builder().persistentSettings(Settings.builder().put("readonly", true).build()).build();

BlobContainer blobContainer = mockBlobStoreObjects();
mockBlobContainerForGlobalMetadata(blobContainer, expectedManifest, expactedMetadata);

when(blobContainer.readBlob(RemoteClusterStateService.GLOBAL_METADATA_FORMAT.blobName(globalIndexMetadataName))).thenThrow(
FileNotFoundException.class
);

remoteClusterStateService.start();
Exception e = assertThrows(
IllegalStateException.class,
() -> remoteClusterStateService.getLatestMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
);
assertEquals(e.getMessage(), "Error while downloading Global Metadata - " + globalIndexMetadataName);
}

public void testReadLatestIndexMetadataSuccess() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();
Expand All @@ -730,15 +795,16 @@ public void testReadLatestIndexMetadataSuccess() throws IOException {
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID("prev-cluster-uuid")
.globalMetadataFileName("global-metadata-file")
.codecVersion(ClusterMetadataManifest.CODEC_V0)
.build();

mockBlobContainer(mockBlobStoreObjects(), expectedManifest, Map.of(index.getUUID(), indexMetadata));

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata(
Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestMetadata(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);
).getIndices();

assertEquals(indexMetadataMap.size(), 1);
assertEquals(indexMetadataMap.get(index.getUUID()).getIndex().getName(), index.getName());
Expand Down Expand Up @@ -1114,6 +1180,41 @@ private void mockBlobContainer(
});
}

private void mockBlobContainerForGlobalMetadata(
BlobContainer blobContainer,
ClusterMetadataManifest clusterMetadataManifest,
Metadata metadata
) throws IOException {
String mockManifestFileName = "manifest__1__2__C__456__1";
BlobMetadata blobMetadata = new PlainBlobMetadata(mockManifestFileName, 1);
when(
blobContainer.listBlobsByPrefixInSortedOrder(
"manifest" + RemoteClusterStateService.DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenReturn(Arrays.asList(blobMetadata));

BytesReference bytes = RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.serialize(
clusterMetadataManifest,
mockManifestFileName,
blobStoreRepository.getCompressor(),
FORMAT_PARAMS
);
when(blobContainer.readBlob(mockManifestFileName)).thenReturn(new ByteArrayInputStream(bytes.streamInput().readAllBytes()));

BytesReference bytesGlobalMetadata = RemoteClusterStateService.GLOBAL_METADATA_FORMAT.serialize(
metadata,
"global-metadata-file",
blobStoreRepository.getCompressor(),
FORMAT_PARAMS
);
String[] splitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/");
when(blobContainer.readBlob(RemoteClusterStateService.GLOBAL_METADATA_FORMAT.blobName(splitPath[splitPath.length - 1]))).thenReturn(
new ByteArrayInputStream(bytesGlobalMetadata.streamInput().readAllBytes())
);
}

private static ClusterState.Builder generateClusterStateWithGlobalMetadata() {
final Settings clusterSettings = Settings.builder().put("cluster.blocks.read_only", true).build();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
Expand Down
Loading