Skip to content

Commit

Permalink
[Remote Store] Add header(codec) and footer(checksum) to remote store…
Browse files Browse the repository at this point in the history
… segment metadata (#5917) (#6406)

(cherry picked from commit 31434a3)

Signed-off-by: Varun Bansal <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] authored Feb 21, 2023
1 parent 8a16596 commit 8b3409e
Show file tree
Hide file tree
Showing 9 changed files with 717 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.io;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;

import java.io.IOException;

/**
* Interface for reading/writing content streams to/from - {@link T}
* @param <T> The type of content to be read/written to stream
*
* @opensearch.internal
*/
public interface IndexIOStreamHandler<T> {
/**
* Implements logic to read content from file input stream {@code indexInput} and parse into {@link T}
* @param indexInput file input stream
* @return content parsed to {@link T}
*/
T readContent(IndexInput indexInput) throws IOException;

/**
* Implements logic to write content from {@code content} to file output stream {@code indexOutput}
* @param indexOutput file input stream
*/
void writeContent(IndexOutput indexOutput, T content) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.io;

import java.io.IOException;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;

/**
* Manages versioning and checksum for a stream of content.
* @param <T> Type of content to be read/written
*
* @opensearch.internal
*/
public class VersionedCodecStreamWrapper<T> {
// TODO This can be updated to hold a streamReadWriteHandlerFactory and get relevant handler based on the stream versions
private final IndexIOStreamHandler<T> indexIOStreamHandler;
private final int currentVersion;
private final String codec;

/**
* @param indexIOStreamHandler handler to read/write stream from T
* @param currentVersion latest supported version of the stream
* @param codec: stream codec
*/
public VersionedCodecStreamWrapper(IndexIOStreamHandler<T> indexIOStreamHandler, int currentVersion, String codec) {
this.indexIOStreamHandler = indexIOStreamHandler;
this.currentVersion = currentVersion;
this.codec = codec;
}

/**
* Reads stream content from {@code indexInput} and parses the read content to {@link T}.
* Before reading actual content, verifies the header with relevant codec and version.
* After reading the actual content, verifies the checksum as well
* @param indexInput file input stream
* @return stream content parsed into {@link T}
*/
public T readStream(IndexInput indexInput) throws IOException {
ChecksumIndexInput checksumIndexInput = new BufferedChecksumIndexInput(indexInput);
int readStreamVersion = checkHeader(checksumIndexInput);
T content = getHandlerForVersion(readStreamVersion).readContent(checksumIndexInput);
checkFooter(checksumIndexInput);
return content;
}

/**
* Writes to file output stream {@code indexOutput}
* @param indexOutput file output stream which will store stream content
* @param content stream content.
*/
public void writeStream(IndexOutput indexOutput, T content) throws IOException {
this.writeHeader(indexOutput);
getHandlerForVersion(this.currentVersion).writeContent(indexOutput, content);
this.writeFooter(indexOutput);
}

/**
* Reads header from file input stream containing {@code this.codec} and {@code this.currentVersion}.
* @param indexInput file input stream
* @return header version found in the input stream
*/
private int checkHeader(IndexInput indexInput) throws IOException {
// TODO Once versioning strategy is decided we'll add support for min/max supported versions
return CodecUtil.checkHeader(indexInput, this.codec, this.currentVersion, this.currentVersion);
}

/**
* Reads footer from file input stream containing checksum.
* The {@link IndexInput#getFilePointer()} should be at the footer start position.
* @param indexInput file input stream
*/
private void checkFooter(ChecksumIndexInput indexInput) throws IOException {
CodecUtil.checkFooter(indexInput);
}

/**
* Writes header with {@code this.codec} and {@code this.currentVersion} to the file output stream
* @param indexOutput file output stream
*/
private void writeHeader(IndexOutput indexOutput) throws IOException {
CodecUtil.writeHeader(indexOutput, this.codec, this.currentVersion);
}

/**
* Writes footer with checksum of contents of file output stream
* @param indexOutput file output stream
*/
private void writeFooter(IndexOutput indexOutput) throws IOException {
CodecUtil.writeFooter(indexOutput);
}

/**
* Returns relevant handler for the version
* @param version stream content version
*/
private IndexIOStreamHandler<T> getHandlerForVersion(int version) {
// TODO implement factory and pick relevant handler based on version.
// It should also take into account min and max supported versions
return this.indexIOStreamHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.UUIDs;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
Expand Down Expand Up @@ -75,6 +78,12 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory {
*/
private Map<String, UploadedSegmentMetadata> segmentsUploadedToRemoteStore;

private static final VersionedCodecStreamWrapper<RemoteSegmentMetadata> metadataStreamWrapper = new VersionedCodecStreamWrapper<>(
new RemoteSegmentMetadataHandler(),
RemoteSegmentMetadata.CURRENT_VERSION,
RemoteSegmentMetadata.METADATA_CODEC
);

private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class);

public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory) throws IOException {
Expand Down Expand Up @@ -126,10 +135,8 @@ private Map<String, UploadedSegmentMetadata> readLatestMetadataFile() throws IOE

private Map<String, UploadedSegmentMetadata> readMetadataFile(String metadataFilename) throws IOException {
try (IndexInput indexInput = remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)) {
Map<String, String> segmentMetadata = indexInput.readMapOfStrings();
return segmentMetadata.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> UploadedSegmentMetadata.fromString(entry.getValue())));
RemoteSegmentMetadata metadata = metadataStreamWrapper.readStream(indexInput);
return metadata.getMetadata();
}
}

Expand All @@ -139,6 +146,7 @@ private Map<String, UploadedSegmentMetadata> readMetadataFile(String metadataFil
public static class UploadedSegmentMetadata {
// Visible for testing
static final String SEPARATOR = "::";

private final String originalFilename;
private final String uploadedFilename;
private final String checksum;
Expand Down Expand Up @@ -353,7 +361,7 @@ public void uploadMetadata(Collection<String> segmentFiles, Directory storeDirec
throw new NoSuchFileException(file);
}
}
indexOutput.writeMapOfStrings(uploadedSegments);
metadataStreamWrapper.writeStream(indexOutput, RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments));
indexOutput.close();
storeDirectory.sync(Collections.singleton(metadataFilename));
remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.metadata;

import java.util.Map;
import java.util.stream.Collectors;

import org.opensearch.index.store.RemoteSegmentStoreDirectory;

/**
* Metadata object for Remote Segment
*
* @opensearch.internal
*/
public class RemoteSegmentMetadata {
/**
* Latest supported version of metadata
*/
public static final int CURRENT_VERSION = 1;
/**
* Metadata codec
*/
public static final String METADATA_CODEC = "segment_md";

/**
* Data structure holding metadata content
*/
private final Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> metadata;

public RemoteSegmentMetadata(Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> metadata) {
this.metadata = metadata;
}

/**
* Exposes underlying metadata content data structure.
* @return {@code metadata}
*/
public Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> getMetadata() {
return this.metadata;
}

/**
* Generate {@code Map<String, String>} from {@link RemoteSegmentMetadata}
* @return {@code Map<String, String>}
*/
public Map<String, String> toMapOfStrings() {
return this.metadata.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString()));
}

/**
* Generate {@link RemoteSegmentMetadata} from {@code segmentMetadata}
* @param segmentMetadata metadata content in the form of {@code Map<String, String>}
* @return {@link RemoteSegmentMetadata}
*/
public static RemoteSegmentMetadata fromMapOfStrings(Map<String, String> segmentMetadata) {
return new RemoteSegmentMetadata(
segmentMetadata.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> RemoteSegmentStoreDirectory.UploadedSegmentMetadata.fromString(entry.getValue())
)
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.metadata;

import java.io.IOException;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.io.IndexIOStreamHandler;

/**
* Handler for {@link RemoteSegmentMetadata}
*
* @opensearch.internal
*/
public class RemoteSegmentMetadataHandler implements IndexIOStreamHandler<RemoteSegmentMetadata> {
/**
* Reads metadata content from metadata file input stream and parsed into {@link RemoteSegmentMetadata}
* @param indexInput metadata file input stream with {@link IndexInput#getFilePointer()} pointing to metadata content
* @return {@link RemoteSegmentMetadata}
*/
@Override
public RemoteSegmentMetadata readContent(IndexInput indexInput) throws IOException {
return RemoteSegmentMetadata.fromMapOfStrings(indexInput.readMapOfStrings());
}

/**
* Writes metadata to file output stream
* @param indexOutput metadata file input stream
* @param content {@link RemoteSegmentMetadata} from which metadata content would be generated
*/
@Override
public void writeContent(IndexOutput indexOutput, RemoteSegmentMetadata content) throws IOException {
indexOutput.writeMapOfStrings(content.toMapOfStrings());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Package containing metadata constructs for Remote Segment store
*/
package org.opensearch.index.store.remote.metadata;
Loading

0 comments on commit 8b3409e

Please sign in to comment.