From 8b3409e7c8da21700dca803ab926c20d333512c3 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Tue, 21 Feb 2023 20:18:46 +0530 Subject: [PATCH] [Remote Store] Add header(codec) and footer(checksum) to remote store segment metadata (#5917) (#6406) (cherry picked from commit 31434a37f366f20afafb3aeba204908a586f1761) Signed-off-by: Varun Bansal --- .../common/io/IndexIOStreamHandler.java | 35 +++ .../io/VersionedCodecStreamWrapper.java | 112 +++++++++ .../store/RemoteSegmentStoreDirectory.java | 18 +- .../metadata/RemoteSegmentMetadata.java | 73 ++++++ .../RemoteSegmentMetadataHandler.java | 42 ++++ .../store/remote/metadata/package-info.java | 12 + .../io/VersionedCodecStreamWrapperTests.java | 141 +++++++++++ .../RemoteSegmentStoreDirectoryTests.java | 237 ++++++++++++++++-- .../RemoteSegmentMetadataHandlerTests.java | 70 ++++++ 9 files changed, 717 insertions(+), 23 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/io/IndexIOStreamHandler.java create mode 100644 server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java create mode 100644 server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java create mode 100644 server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandler.java create mode 100644 server/src/main/java/org/opensearch/index/store/remote/metadata/package-info.java create mode 100644 server/src/test/java/org/opensearch/common/io/VersionedCodecStreamWrapperTests.java create mode 100644 server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java diff --git a/server/src/main/java/org/opensearch/common/io/IndexIOStreamHandler.java b/server/src/main/java/org/opensearch/common/io/IndexIOStreamHandler.java new file mode 100644 index 0000000000000..34e0c0fea2093 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/io/IndexIOStreamHandler.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.io; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; + +import java.io.IOException; + +/** + * Interface for reading/writing content streams to/from - {@link T} + * @param The type of content to be read/written to stream + * + * @opensearch.internal + */ +public interface IndexIOStreamHandler { + /** + * Implements logic to read content from file input stream {@code indexInput} and parse into {@link T} + * @param indexInput file input stream + * @return content parsed to {@link T} + */ + T readContent(IndexInput indexInput) throws IOException; + + /** + * Implements logic to write content from {@code content} to file output stream {@code indexOutput} + * @param indexOutput file input stream + */ + void writeContent(IndexOutput indexOutput, T content) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java b/server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java new file mode 100644 index 0000000000000..9a1a951b0796e --- /dev/null +++ b/server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.io; + +import java.io.IOException; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; + +/** + * Manages versioning and checksum for a stream of content. + * @param Type of content to be read/written + * + * @opensearch.internal + */ +public class VersionedCodecStreamWrapper { + // TODO This can be updated to hold a streamReadWriteHandlerFactory and get relevant handler based on the stream versions + private final IndexIOStreamHandler indexIOStreamHandler; + private final int currentVersion; + private final String codec; + + /** + * @param indexIOStreamHandler handler to read/write stream from T + * @param currentVersion latest supported version of the stream + * @param codec: stream codec + */ + public VersionedCodecStreamWrapper(IndexIOStreamHandler indexIOStreamHandler, int currentVersion, String codec) { + this.indexIOStreamHandler = indexIOStreamHandler; + this.currentVersion = currentVersion; + this.codec = codec; + } + + /** + * Reads stream content from {@code indexInput} and parses the read content to {@link T}. + * Before reading actual content, verifies the header with relevant codec and version. + * After reading the actual content, verifies the checksum as well + * @param indexInput file input stream + * @return stream content parsed into {@link T} + */ + public T readStream(IndexInput indexInput) throws IOException { + ChecksumIndexInput checksumIndexInput = new BufferedChecksumIndexInput(indexInput); + int readStreamVersion = checkHeader(checksumIndexInput); + T content = getHandlerForVersion(readStreamVersion).readContent(checksumIndexInput); + checkFooter(checksumIndexInput); + return content; + } + + /** + * Writes to file output stream {@code indexOutput} + * @param indexOutput file output stream which will store stream content + * @param content stream content. + */ + public void writeStream(IndexOutput indexOutput, T content) throws IOException { + this.writeHeader(indexOutput); + getHandlerForVersion(this.currentVersion).writeContent(indexOutput, content); + this.writeFooter(indexOutput); + } + + /** + * Reads header from file input stream containing {@code this.codec} and {@code this.currentVersion}. + * @param indexInput file input stream + * @return header version found in the input stream + */ + private int checkHeader(IndexInput indexInput) throws IOException { + // TODO Once versioning strategy is decided we'll add support for min/max supported versions + return CodecUtil.checkHeader(indexInput, this.codec, this.currentVersion, this.currentVersion); + } + + /** + * Reads footer from file input stream containing checksum. + * The {@link IndexInput#getFilePointer()} should be at the footer start position. + * @param indexInput file input stream + */ + private void checkFooter(ChecksumIndexInput indexInput) throws IOException { + CodecUtil.checkFooter(indexInput); + } + + /** + * Writes header with {@code this.codec} and {@code this.currentVersion} to the file output stream + * @param indexOutput file output stream + */ + private void writeHeader(IndexOutput indexOutput) throws IOException { + CodecUtil.writeHeader(indexOutput, this.codec, this.currentVersion); + } + + /** + * Writes footer with checksum of contents of file output stream + * @param indexOutput file output stream + */ + private void writeFooter(IndexOutput indexOutput) throws IOException { + CodecUtil.writeFooter(indexOutput); + } + + /** + * Returns relevant handler for the version + * @param version stream content version + */ + private IndexIOStreamHandler getHandlerForVersion(int version) { + // TODO implement factory and pick relevant handler based on version. + // It should also take into account min and max supported versions + return this.indexIOStreamHandler; + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 3b36d3777a712..a0bd32403bd39 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -17,6 +17,9 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.opensearch.common.UUIDs; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; +import org.opensearch.common.io.VersionedCodecStreamWrapper; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; import java.io.IOException; import java.nio.file.NoSuchFileException; @@ -75,6 +78,12 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory { */ private Map segmentsUploadedToRemoteStore; + private static final VersionedCodecStreamWrapper metadataStreamWrapper = new VersionedCodecStreamWrapper<>( + new RemoteSegmentMetadataHandler(), + RemoteSegmentMetadata.CURRENT_VERSION, + RemoteSegmentMetadata.METADATA_CODEC + ); + private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class); public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory) throws IOException { @@ -126,10 +135,8 @@ private Map readLatestMetadataFile() throws IOE private Map readMetadataFile(String metadataFilename) throws IOException { try (IndexInput indexInput = remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)) { - Map segmentMetadata = indexInput.readMapOfStrings(); - return segmentMetadata.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> UploadedSegmentMetadata.fromString(entry.getValue()))); + RemoteSegmentMetadata metadata = metadataStreamWrapper.readStream(indexInput); + return metadata.getMetadata(); } } @@ -139,6 +146,7 @@ private Map readMetadataFile(String metadataFil public static class UploadedSegmentMetadata { // Visible for testing static final String SEPARATOR = "::"; + private final String originalFilename; private final String uploadedFilename; private final String checksum; @@ -353,7 +361,7 @@ public void uploadMetadata(Collection segmentFiles, Directory storeDirec throw new NoSuchFileException(file); } } - indexOutput.writeMapOfStrings(uploadedSegments); + metadataStreamWrapper.writeStream(indexOutput, RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments)); indexOutput.close(); storeDirectory.sync(Collections.singleton(metadataFilename)); remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT); diff --git a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java new file mode 100644 index 0000000000000..a2d12b0bd0e67 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.metadata; + +import java.util.Map; +import java.util.stream.Collectors; + +import org.opensearch.index.store.RemoteSegmentStoreDirectory; + +/** + * Metadata object for Remote Segment + * + * @opensearch.internal + */ +public class RemoteSegmentMetadata { + /** + * Latest supported version of metadata + */ + public static final int CURRENT_VERSION = 1; + /** + * Metadata codec + */ + public static final String METADATA_CODEC = "segment_md"; + + /** + * Data structure holding metadata content + */ + private final Map metadata; + + public RemoteSegmentMetadata(Map metadata) { + this.metadata = metadata; + } + + /** + * Exposes underlying metadata content data structure. + * @return {@code metadata} + */ + public Map getMetadata() { + return this.metadata; + } + + /** + * Generate {@code Map} from {@link RemoteSegmentMetadata} + * @return {@code Map} + */ + public Map toMapOfStrings() { + return this.metadata.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString())); + } + + /** + * Generate {@link RemoteSegmentMetadata} from {@code segmentMetadata} + * @param segmentMetadata metadata content in the form of {@code Map} + * @return {@link RemoteSegmentMetadata} + */ + public static RemoteSegmentMetadata fromMapOfStrings(Map segmentMetadata) { + return new RemoteSegmentMetadata( + segmentMetadata.entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> RemoteSegmentStoreDirectory.UploadedSegmentMetadata.fromString(entry.getValue()) + ) + ) + ); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandler.java b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandler.java new file mode 100644 index 0000000000000..655a39c6248ab --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandler.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.metadata; + +import java.io.IOException; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.io.IndexIOStreamHandler; + +/** + * Handler for {@link RemoteSegmentMetadata} + * + * @opensearch.internal + */ +public class RemoteSegmentMetadataHandler implements IndexIOStreamHandler { + /** + * Reads metadata content from metadata file input stream and parsed into {@link RemoteSegmentMetadata} + * @param indexInput metadata file input stream with {@link IndexInput#getFilePointer()} pointing to metadata content + * @return {@link RemoteSegmentMetadata} + */ + @Override + public RemoteSegmentMetadata readContent(IndexInput indexInput) throws IOException { + return RemoteSegmentMetadata.fromMapOfStrings(indexInput.readMapOfStrings()); + } + + /** + * Writes metadata to file output stream + * @param indexOutput metadata file input stream + * @param content {@link RemoteSegmentMetadata} from which metadata content would be generated + */ + @Override + public void writeContent(IndexOutput indexOutput, RemoteSegmentMetadata content) throws IOException { + indexOutput.writeMapOfStrings(content.toMapOfStrings()); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/metadata/package-info.java b/server/src/main/java/org/opensearch/index/store/remote/metadata/package-info.java new file mode 100644 index 0000000000000..da0444af4c246 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/metadata/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package containing metadata constructs for Remote Segment store + */ +package org.opensearch.index.store.remote.metadata; diff --git a/server/src/test/java/org/opensearch/common/io/VersionedCodecStreamWrapperTests.java b/server/src/test/java/org/opensearch/common/io/VersionedCodecStreamWrapperTests.java new file mode 100644 index 0000000000000..a92b4823466e3 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/io/VersionedCodecStreamWrapperTests.java @@ -0,0 +1,141 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.io; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.OutputStreamIndexOutput; +import org.junit.Before; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link org.opensearch.common.io.VersionedCodecStreamWrapper}. + */ +public class VersionedCodecStreamWrapperTests extends OpenSearchTestCase { + + private static final String CODEC = "dummycodec"; + private static final int VERSION = 1; + + IndexIOStreamHandler ioStreamHandler; + VersionedCodecStreamWrapper versionedCodecStreamWrapper; + + @Before + public void setup() throws IOException { + ioStreamHandler = mock(IndexIOStreamHandler.class); + versionedCodecStreamWrapper = new VersionedCodecStreamWrapper(ioStreamHandler, VERSION, CODEC); + } + + public void testReadStream() throws IOException { + DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); + DummyObject readData = versionedCodecStreamWrapper.readStream(createHeaderFooterBytes(CODEC, VERSION, true, true)); + assertEquals(readData, expectedObject); + } + + public void testReadWithOldVersionThrowsException() throws IOException { + DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); + assertThrows( + IndexFormatTooOldException.class, + () -> versionedCodecStreamWrapper.readStream(createHeaderFooterBytes(CODEC, 0, true, true)) + ); + } + + public void testReadWithNewVersionThrowsException() throws IOException { + DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); + assertThrows( + IndexFormatTooNewException.class, + () -> versionedCodecStreamWrapper.readStream(createHeaderFooterBytes(CODEC, 2, true, true)) + ); + } + + public void testReadWithUnexpectedCodecThrowsException() throws IOException { + DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); + assertThrows( + CorruptIndexException.class, + () -> versionedCodecStreamWrapper.readStream(createHeaderFooterBytes("wrong codec", VERSION, true, true)) + ); + } + + public void testReadWithNoHeaderThrowsException() throws IOException { + DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); + assertThrows( + CorruptIndexException.class, + () -> versionedCodecStreamWrapper.readStream(createHeaderFooterBytes("wrong codec", VERSION, false, true)) + ); + } + + public void testReadWithNoFooterThrowsException() throws IOException { + DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); + assertThrows( + CorruptIndexException.class, + () -> versionedCodecStreamWrapper.readStream(createHeaderFooterBytes("wrong codec", VERSION, true, false)) + ); + } + + public void testWriteStream() throws IOException { + DummyObject expectedObject = new DummyObject("test read"); + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); + doAnswer(invocation -> { + IndexOutput io = invocation.getArgument(0); + io.writeString("test write"); + return null; + }).when(ioStreamHandler).writeContent(indexOutput, expectedObject); + versionedCodecStreamWrapper.writeStream(indexOutput, expectedObject); + indexOutput.close(); + IndexInput indexInput = new ByteArrayIndexInput("dummy bytes", BytesReference.toBytes(output.bytes())); + BufferedChecksumIndexInput bii = new BufferedChecksumIndexInput(indexInput); + + CodecUtil.checkHeader(bii, CODEC, VERSION, VERSION); + assertEquals(bii.readString(), "test write"); + CodecUtil.checkFooter(bii); + } + + private ByteArrayIndexInput createHeaderFooterBytes(String codec, int version, boolean writeHeader, boolean writeFooter) + throws IOException { + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); + if (writeHeader) { + CodecUtil.writeHeader(indexOutput, codec, version); + } + if (writeFooter) { + CodecUtil.writeFooter(indexOutput); + } + indexOutput.close(); + return new ByteArrayIndexInput("dummy bytes", BytesReference.toBytes(output.bytes())); + } + + private static class DummyObject { + private static String dummyString; + + public DummyObject(String dummy) { + dummyString = dummy; + } + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 3f1c20f0d88a5..8b7a2e8e06207 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -9,14 +9,22 @@ package org.opensearch.index.store; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.tests.util.LuceneTestCase; import org.junit.Before; import org.opensearch.common.UUIDs; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.collect.Set; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -28,9 +36,11 @@ import java.util.Map; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.startsWith; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -148,6 +158,22 @@ private Map getDummyMetadata(String prefix, int commitGeneration return metadata; } + /** + * Prepares metadata file bytes with header and footer + * @param segmentFilesMap: actual metadata content + * @return ByteArrayIndexInput: metadata file bytes with header and footer + * @throws IOException + */ + private ByteArrayIndexInput createMetadataFileBytes(Map segmentFilesMap) throws IOException { + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); + CodecUtil.writeHeader(indexOutput, RemoteSegmentMetadata.METADATA_CODEC, RemoteSegmentMetadata.CURRENT_VERSION); + indexOutput.writeMapOfStrings(segmentFilesMap); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + return new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes())); + } + private Map> populateMetadata() throws IOException { List metadataFiles = List.of("metadata__1__5__abc", "metadata__1__6__pqr", "metadata__2__1__zxv"); when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( @@ -163,17 +189,16 @@ private Map> populateMetadata() throws IOException { getDummyMetadata("_0", 1) ); - IndexInput indexInput1 = mock(IndexInput.class); - when(indexInput1.readMapOfStrings()).thenReturn(metadataFilenameContentMapping.get("metadata__1__5__abc")); - when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(indexInput1); - - IndexInput indexInput2 = mock(IndexInput.class); - when(indexInput2.readMapOfStrings()).thenReturn(metadataFilenameContentMapping.get("metadata__1__6__pqr")); - when(remoteMetadataDirectory.openInput("metadata__1__6__pqr", IOContext.DEFAULT)).thenReturn(indexInput2); - - IndexInput indexInput3 = mock(IndexInput.class); - when(indexInput3.readMapOfStrings()).thenReturn(metadataFilenameContentMapping.get("metadata__2__1__zxv")); - when(remoteMetadataDirectory.openInput("metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn(indexInput3); + when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn( + createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__5__abc")) + ); + when(remoteMetadataDirectory.openInput("metadata__1__6__pqr", IOContext.DEFAULT)).thenReturn( + createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__6__pqr")) + ); + when(remoteMetadataDirectory.openInput("metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn( + createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__2__1__zxv")), + createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__2__1__zxv")) + ); return metadataFilenameContentMapping; } @@ -354,11 +379,7 @@ public void testContainsFile() throws IOException { metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); - Map> metadataFilenameContentMapping = Map.of("metadata__1__5__abc", metadata); - - IndexInput indexInput1 = mock(IndexInput.class); - when(indexInput1.readMapOfStrings()).thenReturn(metadataFilenameContentMapping.get("metadata__1__5__abc")); - when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(indexInput1); + when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(createMetadataFileBytes(metadata)); remoteSegmentStoreDirectory.init(); @@ -394,7 +415,8 @@ public void testUploadMetadataNonEmpty() throws IOException { remoteSegmentStoreDirectory.init(); Directory storeDirectory = mock(Directory.class); - IndexOutput indexOutput = mock(IndexOutput.class); + BytesStreamOutput output = new BytesStreamOutput(); + IndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); when(storeDirectory.createOutput(startsWith("metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput); Collection segmentFiles = List.of("_0.si"); @@ -407,7 +429,125 @@ public void testUploadMetadataNonEmpty() throws IOException { eq(IOContext.DEFAULT) ); String metadataString = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().get("_0.si").toString(); - verify(indexOutput).writeMapOfStrings(Map.of("_0.si", metadataString)); + + ByteArrayIndexInput expectedMetadataFileContent = createMetadataFileBytes(Map.of("_0.si", metadataString)); + int expectedBytesLength = (int) expectedMetadataFileContent.length(); + byte[] expectedBytes = new byte[expectedBytesLength]; + expectedMetadataFileContent.readBytes(expectedBytes, 0, expectedBytesLength); + + assertArrayEquals(expectedBytes, BytesReference.toBytes(output.bytes())); + } + + public void testNoMetadataHeaderCorruptIndexException() throws IOException { + List metadataFiles = List.of("metadata__1__5__abc"); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + metadataFiles + ); + + Map metadata = new HashMap<>(); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); + indexOutput.writeMapOfStrings(metadata); + indexOutput.close(); + ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes())); + when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); + + assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init()); + } + + public void testInvalidCodecHeaderCorruptIndexException() throws IOException { + List metadataFiles = List.of("metadata__1__5__abc"); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + metadataFiles + ); + + Map metadata = new HashMap<>(); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); + CodecUtil.writeHeader(indexOutput, "invalidCodec", RemoteSegmentMetadata.CURRENT_VERSION); + indexOutput.writeMapOfStrings(metadata); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes())); + when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); + + assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init()); + } + + public void testHeaderMinVersionCorruptIndexException() throws IOException { + List metadataFiles = List.of("metadata__1__5__abc"); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + metadataFiles + ); + + Map metadata = new HashMap<>(); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); + CodecUtil.writeHeader(indexOutput, RemoteSegmentMetadata.METADATA_CODEC, -1); + indexOutput.writeMapOfStrings(metadata); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes())); + when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); + + assertThrows(IndexFormatTooOldException.class, () -> remoteSegmentStoreDirectory.init()); + } + + public void testHeaderMaxVersionCorruptIndexException() throws IOException { + List metadataFiles = List.of("metadata__1__5__abc"); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + metadataFiles + ); + + Map metadata = new HashMap<>(); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); + CodecUtil.writeHeader(indexOutput, RemoteSegmentMetadata.METADATA_CODEC, 2); + indexOutput.writeMapOfStrings(metadata); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes())); + when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); + + assertThrows(IndexFormatTooNewException.class, () -> remoteSegmentStoreDirectory.init()); + } + + public void testIncorrectChecksumCorruptIndexException() throws IOException { + List metadataFiles = List.of("metadata__1__5__abc"); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + metadataFiles + ); + + Map metadata = new HashMap<>(); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + + BytesStreamOutput output = new BytesStreamOutput(); + IndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); + IndexOutput wrappedIndexOutput = new WrapperIndexOutput(indexOutput); + IndexOutput indexOutputSpy = spy(wrappedIndexOutput); + CodecUtil.writeHeader(indexOutputSpy, RemoteSegmentMetadata.METADATA_CODEC, RemoteSegmentMetadata.CURRENT_VERSION); + indexOutputSpy.writeMapOfStrings(metadata); + doReturn(12345L).when(indexOutputSpy).getChecksum(); + CodecUtil.writeFooter(indexOutputSpy); + indexOutputSpy.close(); + + ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes())); + when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); + + assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init()); } public void testDeleteStaleCommitsException() throws IOException { @@ -489,4 +629,65 @@ public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOExc ; verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); } + + public void testSegmentMetadataCurrentVersion() { + /* + This is a fake test which will fail whenever the CURRENT_VERSION is incremented. + This is to bring attention of the author towards backward compatibility of metadata files. + If there is any breaking change the author needs to specify how old metadata file will be supported after + this change + If author doesn't want to support old metadata files. Then this can be ignored. + After taking appropriate action, fix this test by setting the correct version here + */ + assertEquals(RemoteSegmentMetadata.CURRENT_VERSION, 1); + } + + private static class WrapperIndexOutput extends IndexOutput { + public IndexOutput indexOutput; + + public WrapperIndexOutput(IndexOutput indexOutput) { + super(indexOutput.toString(), indexOutput.getName()); + this.indexOutput = indexOutput; + } + + @Override + public final void writeByte(byte b) throws IOException { + this.indexOutput.writeByte(b); + } + + @Override + public final void writeBytes(byte[] b, int offset, int length) throws IOException { + this.indexOutput.writeBytes(b, offset, length); + } + + @Override + public void writeShort(short i) throws IOException { + this.indexOutput.writeShort(i); + } + + @Override + public void writeInt(int i) throws IOException { + this.indexOutput.writeInt(i); + } + + @Override + public void writeLong(long i) throws IOException { + this.indexOutput.writeLong(i); + } + + @Override + public void close() throws IOException { + this.indexOutput.close(); + } + + @Override + public final long getFilePointer() { + return this.indexOutput.getFilePointer(); + } + + @Override + public long getChecksum() throws IOException { + return this.indexOutput.getChecksum(); + } + } } diff --git a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java new file mode 100644 index 0000000000000..2a30e58b8802c --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.metadata; + +import org.apache.lucene.store.OutputStreamIndexOutput; +import org.junit.Before; +import org.opensearch.common.UUIDs; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Unit Tests for {@link RemoteSegmentMetadataHandler} + */ +public class RemoteSegmentMetadataHandlerTests extends OpenSearchTestCase { + private RemoteSegmentMetadataHandler remoteSegmentMetadataHandler; + + @Before + public void setup() throws IOException { + remoteSegmentMetadataHandler = new RemoteSegmentMetadataHandler(); + } + + public void testReadContent() throws IOException { + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); + Map expectedOutput = getDummyData(); + indexOutput.writeMapOfStrings(expectedOutput); + indexOutput.close(); + RemoteSegmentMetadata metadata = remoteSegmentMetadataHandler.readContent( + new ByteArrayIndexInput("dummy bytes", BytesReference.toBytes(output.bytes())) + ); + assertEquals(expectedOutput, metadata.toMapOfStrings()); + } + + public void testWriteContent() throws IOException { + BytesStreamOutput output = new BytesStreamOutput(); + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); + Map expectedOutput = getDummyData(); + remoteSegmentMetadataHandler.writeContent(indexOutput, RemoteSegmentMetadata.fromMapOfStrings(expectedOutput)); + indexOutput.close(); + Map actualOutput = new ByteArrayIndexInput("dummy bytes", BytesReference.toBytes(output.bytes())) + .readMapOfStrings(); + assertEquals(expectedOutput, actualOutput); + } + + private Map getDummyData() { + Map expectedOutput = new HashMap<>(); + String prefix = "_0"; + expectedOutput.put( + prefix + ".cfe", + prefix + ".cfe::" + prefix + ".cfe__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000) + ); + expectedOutput.put( + prefix + ".cfs", + prefix + ".cfs::" + prefix + ".cfs__" + UUIDs.base64UUID() + "::" + randomIntBetween(1000, 5000) + ); + return expectedOutput; + } +}