From edd325a1824c8706371852de5d1f83e9b553859f Mon Sep 17 00:00:00 2001 From: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com> Date: Tue, 27 Feb 2024 13:03:59 -0800 Subject: [PATCH] [Searchable Snapshot] Fix bug of Searchable Snapshot Dependency on repository chunk_size (#12277) * implement logic of fetching blocks from multiple chunks of snapshot file. Signed-off-by: Rishikesh1159 * Refactor and address comments. Signed-off-by: Rishikesh1159 * apply spotless check Signed-off-by: Rishikesh1159 * Address comments of using a different data structure to fetch blob parts. Signed-off-by: Rishikesh1159 * remove unnecessary code. Signed-off-by: Rishikesh1159 * Refactor outputstream usage. Signed-off-by: Rishikesh1159 * refactor blobpart logic into a separate method and add unit tests. Signed-off-by: Rishikesh1159 * Add new unit tests. Signed-off-by: Rishikesh1159 --------- Signed-off-by: Rishikesh1159 Signed-off-by: Aman Khare --- .../snapshots/SearchableSnapshotIT.java | 36 ++++++- .../file/OnDemandBlockSnapshotIndexInput.java | 46 ++++++--- .../store/remote/utils/BlobFetchRequest.java | 95 ++++++++++--------- .../store/remote/utils/TransferManager.java | 22 +++-- .../OnDemandBlockSnapshotIndexInputTests.java | 63 ++++++++++-- .../remote/utils/TransferManagerTests.java | 29 ++---- 6 files changed, 196 insertions(+), 95 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index c89fef20aafb1..90bb2b501764e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -85,10 +85,10 @@ protected Settings.Builder randomRepositorySettings() { return settings; } - private Settings.Builder chunkedRepositorySettings() { + private Settings.Builder chunkedRepositorySettings(long chunkSize) { final Settings.Builder settings = Settings.builder(); settings.put("location", randomRepoPath()).put("compress", randomBoolean()); - settings.put("chunk_size", 2 << 23, ByteSizeUnit.BYTES); + settings.put("chunk_size", chunkSize, ByteSizeUnit.BYTES); return settings; } @@ -194,10 +194,10 @@ public void testSnapshottingSearchableSnapshots() throws Exception { } /** - * Tests a chunked repository scenario for searchable snapshots by creating an index, + * Tests a default 8mib chunked repository scenario for searchable snapshots by creating an index, * taking a snapshot, restoring it as a searchable snapshot index. */ - public void testCreateSearchableSnapshotWithChunks() throws Exception { + public void testCreateSearchableSnapshotWithDefaultChunks() throws Exception { final int numReplicasIndex = randomIntBetween(1, 4); final String indexName = "test-idx"; final String restoredIndexName = indexName + "-copy"; @@ -205,7 +205,33 @@ public void testCreateSearchableSnapshotWithChunks() throws Exception { final String snapshotName = "test-snap"; final Client client = client(); - Settings.Builder repositorySettings = chunkedRepositorySettings(); + Settings.Builder repositorySettings = chunkedRepositorySettings(2 << 23); + + internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1); + createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName); + createRepositoryWithSettings(repositorySettings, repoName); + takeSnapshot(client, snapshotName, repoName, indexName); + + deleteIndicesAndEnsureGreen(client, indexName); + restoreSnapshotAndEnsureGreen(client, snapshotName, repoName); + assertRemoteSnapshotIndexSettings(client, restoredIndexName); + + assertDocCount(restoredIndexName, 1000L); + } + + /** + * Tests a small 1000 bytes chunked repository scenario for searchable snapshots by creating an index, + * taking a snapshot, restoring it as a searchable snapshot index. + */ + public void testCreateSearchableSnapshotWithSmallChunks() throws Exception { + final int numReplicasIndex = randomIntBetween(1, 4); + final String indexName = "test-idx"; + final String restoredIndexName = indexName + "-copy"; + final String repoName = "test-repo"; + final String snapshotName = "test-snap"; + final Client client = client(); + + Settings.Builder repositorySettings = chunkedRepositorySettings(1000); internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1); createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName); diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java index 7166e9aa482e3..8097fd08da50a 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java @@ -15,6 +15,8 @@ import org.opensearch.index.store.remote.utils.TransferManager; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot files. @@ -136,25 +138,45 @@ protected IndexInput fetchBlock(int blockId) throws IOException { final long blockStart = getBlockStart(blockId); final long blockEnd = blockStart + getActualBlockSize(blockId); - // If the snapshot file is chunked, we must account for this by - // choosing the appropriate file part and updating the position - // accordingly. - final int part = (int) (blockStart / partSize); - final long partStart = part * partSize; - - final long position = blockStart - partStart; - final long length = blockEnd - blockStart; - + // Block may be present on multiple chunks of a file, so we need + // to fetch each chunk/blob part separately to fetch an entire block. BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder() - .position(position) - .length(length) - .blobName(fileInfo.partName(part)) + .blobParts(getBlobParts(blockStart, blockEnd)) .directory(directory) .fileName(blockFileName) .build(); return transferManager.fetchBlob(blobFetchRequest); } + /** + * Returns list of blob parts/chunks in a file for a given block. + */ + protected List getBlobParts(long blockStart, long blockEnd) { + // If the snapshot file is chunked, we must account for this by + // choosing the appropriate file part and updating the position + // accordingly. + int partNum = (int) (blockStart / partSize); + long pos = blockStart; + long diff = (blockEnd - blockStart); + + List blobParts = new ArrayList<>(); + while (diff > 0) { + long partStart = pos % partSize; + long partEnd; + if ((partStart + diff) > partSize) { + partEnd = partSize; + } else { + partEnd = (partStart + diff); + } + long fetchBytes = partEnd - partStart; + blobParts.add(new BlobFetchRequest.BlobPart(fileInfo.partName(partNum), partStart, fetchBytes)); + partNum++; + pos = pos + fetchBytes; + diff = (blockEnd - pos); + } + return blobParts; + } + @Override public OnDemandBlockSnapshotIndexInput clone() { OnDemandBlockSnapshotIndexInput clone = buildSlice("clone", 0L, this.length); diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java b/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java index d0508e9c6f4c7..f7e6545b5010e 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java @@ -12,6 +12,7 @@ import org.apache.lucene.store.FSDirectory; import java.nio.file.Path; +import java.util.List; /** * The specification to fetch specific block from blob store @@ -20,37 +21,22 @@ */ public class BlobFetchRequest { - private final long position; - - private final long length; - - private final String blobName; - private final Path filePath; private final Directory directory; private final String fileName; + private final List blobParts; + + private final long blobLength; + private BlobFetchRequest(Builder builder) { - this.position = builder.position; - this.length = builder.length; - this.blobName = builder.blobName; this.fileName = builder.fileName; this.filePath = builder.directory.getDirectory().resolve(fileName); this.directory = builder.directory; - } - - public long getPosition() { - return position; - } - - public long getLength() { - return length; - } - - public String getBlobName() { - return blobName; + this.blobParts = builder.blobParts; + this.blobLength = builder.blobParts.stream().mapToLong(o -> o.getLength()).sum(); } public Path getFilePath() { @@ -65,6 +51,14 @@ public String getFileName() { return fileName; } + public List blobParts() { + return blobParts; + } + + public long getBlobLength() { + return blobLength; + } + public static Builder builder() { return new Builder(); } @@ -72,12 +66,8 @@ public static Builder builder() { @Override public String toString() { return "BlobFetchRequest{" - + "position=" - + position - + ", length=" - + length - + ", blobName='" - + blobName + + "blobParts=" + + blobParts + '\'' + ", filePath=" + filePath @@ -90,35 +80,45 @@ public String toString() { } /** - * Builder for BlobFetchRequest + * BlobPart represents a single chunk of a file */ - public static final class Builder { + public static class BlobPart { + private String blobName; private long position; private long length; - private String blobName; - private FSDirectory directory; - private String fileName; - - private Builder() {} - public Builder position(long position) { - this.position = position; - return this; - } - - public Builder length(long length) { + public BlobPart(String blobName, long position, long length) { + this.blobName = blobName; if (length <= 0) { - throw new IllegalArgumentException("Length for blob fetch request needs to be non-negative"); + throw new IllegalArgumentException("Length for blob part fetch request needs to be non-negative"); } this.length = length; - return this; + this.position = position; } - public Builder blobName(String blobName) { - this.blobName = blobName; - return this; + public String getBlobName() { + return blobName; + } + + public long getPosition() { + return position; } + public long getLength() { + return length; + } + } + + /** + * Builder for BlobFetchRequest + */ + public static final class Builder { + private List blobParts; + private FSDirectory directory; + private String fileName; + + private Builder() {} + public Builder directory(FSDirectory directory) { this.directory = directory; return this; @@ -129,6 +129,11 @@ public Builder fileName(String fileName) { return this; } + public Builder blobParts(List blobParts) { + this.blobParts = blobParts; + return this; + } + public BlobFetchRequest build() { return new BlobFetchRequest(this); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index 9250e73e08509..98cad7bfadb09 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -48,11 +48,12 @@ public TransferManager(final BlobContainer blobContainer, final FileCache fileCa } /** - * Given a blobFetchRequest, return it's corresponding IndexInput. + * Given a blobFetchRequestList, return it's corresponding IndexInput. * @param blobFetchRequest to fetch * @return future of IndexInput augmented with internal caching maintenance tasks */ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException { + final Path key = blobFetchRequest.getFilePath(); final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { @@ -85,15 +86,20 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo try { if (Files.exists(request.getFilePath()) == false) { try ( - InputStream snapshotFileInputStream = blobContainer.readBlob( - request.getBlobName(), - request.getPosition(), - request.getLength() - ); OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath()); OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream) ) { - snapshotFileInputStream.transferTo(localFileOutputStream); + for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) { + try ( + InputStream snapshotFileInputStream = blobContainer.readBlob( + blobPart.getBlobName(), + blobPart.getPosition(), + blobPart.getLength() + ); + ) { + snapshotFileInputStream.transferTo(localFileOutputStream); + } + } } } final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ); @@ -153,7 +159,7 @@ public IndexInput getIndexInput() throws IOException { @Override public long length() { - return request.getLength(); + return request.getBlobLength(); } @Override diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java index 2204124f1de4f..a135802c5f49c 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java @@ -78,11 +78,31 @@ public void test4MBBlock() throws Exception { runAllTestsFor(22); } - public void testChunkedRepository() throws IOException { - final long blockSize = new ByteSizeValue(1, ByteSizeUnit.KB).getBytes(); - final long repositoryChunkSize = new ByteSizeValue(2, ByteSizeUnit.KB).getBytes(); - final long fileSize = new ByteSizeValue(3, ByteSizeUnit.KB).getBytes(); + public void testChunkedRepositoryWithBlockSizeGreaterThanChunkSize() throws IOException { + verifyChunkedRepository( + new ByteSizeValue(8, ByteSizeUnit.KB).getBytes(), // block Size + new ByteSizeValue(2, ByteSizeUnit.KB).getBytes(), // repository chunk size + new ByteSizeValue(15, ByteSizeUnit.KB).getBytes() // file size + ); + } + + public void testChunkedRepositoryWithBlockSizeLessThanChunkSize() throws IOException { + verifyChunkedRepository( + new ByteSizeValue(1, ByteSizeUnit.KB).getBytes(), // block Size + new ByteSizeValue(2, ByteSizeUnit.KB).getBytes(), // repository chunk size + new ByteSizeValue(3, ByteSizeUnit.KB).getBytes() // file size + ); + } + + public void testChunkedRepositoryWithBlockSizeEqualToChunkSize() throws IOException { + verifyChunkedRepository( + new ByteSizeValue(2, ByteSizeUnit.KB).getBytes(), // block Size + new ByteSizeValue(2, ByteSizeUnit.KB).getBytes(), // repository chunk size + new ByteSizeValue(15, ByteSizeUnit.KB).getBytes() // file size + ); + } + private void verifyChunkedRepository(long blockSize, long repositoryChunkSize, long fileSize) throws IOException { when(transferManager.fetchBlob(any())).thenReturn(new ByteArrayIndexInput("test", new byte[(int) blockSize])); try ( FSDirectory directory = new MMapDirectory(path, lockFactory); @@ -105,8 +125,9 @@ public void testChunkedRepository() throws IOException { // Seek to the position past the first repository chunk indexInput.seek(repositoryChunkSize); } - // Verify the second chunk is requested (i.e. ".part1") - verify(transferManager).fetchBlob(argThat(request -> request.getBlobName().equals("File_Name.part1"))); + + // Verify all the chunks related to block are added to the fetchBlob request + verify(transferManager).fetchBlob(argThat(request -> request.getBlobLength() == blockSize)); } private void runAllTestsFor(int blockSizeShift) throws Exception { @@ -115,6 +136,7 @@ private void runAllTestsFor(int blockSizeShift) throws Exception { TestGroup.testGetBlock(blockedSnapshotFile, blockSize, FILE_SIZE); TestGroup.testGetBlockOffset(blockedSnapshotFile, blockSize, FILE_SIZE); TestGroup.testGetBlockStart(blockedSnapshotFile, blockSize); + TestGroup.testGetBlobParts(blockedSnapshotFile); TestGroup.testCurrentBlockStart(blockedSnapshotFile, blockSize); TestGroup.testCurrentBlockPosition(blockedSnapshotFile, blockSize); TestGroup.testClone(blockedSnapshotFile, blockSize); @@ -252,6 +274,35 @@ public static void testGetBlockStart(OnDemandBlockSnapshotIndexInput blockedSnap assertEquals(blockSize * 2, blockedSnapshotFile.getBlockStart(2)); } + public static void testGetBlobParts(OnDemandBlockSnapshotIndexInput blockedSnapshotFile) { + // block id 0 + int blockId = 0; + long blockStart = blockedSnapshotFile.getBlockStart(blockId); + long blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId); + assertEquals( + (blockEnd - blockStart), + blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum() + ); + + // block 1 + blockId = 1; + blockStart = blockedSnapshotFile.getBlockStart(blockId); + blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId); + assertEquals( + (blockEnd - blockStart), + blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum() + ); + + // block 2 + blockId = 2; + blockStart = blockedSnapshotFile.getBlockStart(blockId); + blockEnd = blockStart + blockedSnapshotFile.getActualBlockSize(blockId); + assertEquals( + (blockEnd - blockStart), + blockedSnapshotFile.getBlobParts(blockStart, blockEnd).stream().mapToLong(o -> o.getLength()).sum() + ); + } + public static void testCurrentBlockStart(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize) throws IOException { // block 0 blockedSnapshotFile.seek(blockSize - 1); diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java index d42e614302658..7ae3944eb6944 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java @@ -163,17 +163,11 @@ public void testUsageExceedsCapacity() throws Exception { public void testDownloadFails() throws Exception { doThrow(new IOException("Expected test exception")).when(blobContainer).readBlob(eq("failure-blob"), anyLong(), anyLong()); + List blobParts = new ArrayList<>(); + blobParts.add(new BlobFetchRequest.BlobPart("failure-blob", 0, EIGHT_MB)); expectThrows( IOException.class, - () -> transferManager.fetchBlob( - BlobFetchRequest.builder() - .blobName("failure-blob") - .position(0) - .fileName("file") - .directory(directory) - .length(EIGHT_MB) - .build() - ) + () -> transferManager.fetchBlob(BlobFetchRequest.builder().fileName("file").directory(directory).blobParts(blobParts).build()) ); MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L)); MatcherAssert.assertThat(fileCache.usage().usage(), equalTo(0L)); @@ -187,16 +181,13 @@ public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception latch.await(); return new ByteArrayInputStream(createData()); }).when(blobContainer).readBlob(eq("blocking-blob"), anyLong(), anyLong()); + List blobParts = new ArrayList<>(); + blobParts.add(new BlobFetchRequest.BlobPart("blocking-blob", 0, EIGHT_MB)); + final Thread blockingThread = new Thread(() -> { try { transferManager.fetchBlob( - BlobFetchRequest.builder() - .blobName("blocking-blob") - .position(0) - .fileName("blocking-file") - .directory(directory) - .length(EIGHT_MB) - .build() + BlobFetchRequest.builder().fileName("blocking-file").directory(directory).blobParts(blobParts).build() ); } catch (IOException e) { throw new RuntimeException(e); @@ -216,9 +207,9 @@ public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception } private IndexInput fetchBlobWithName(String blobname) throws IOException { - return transferManager.fetchBlob( - BlobFetchRequest.builder().blobName("blob").position(0).fileName(blobname).directory(directory).length(EIGHT_MB).build() - ); + List blobParts = new ArrayList<>(); + blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB)); + return transferManager.fetchBlob(BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build()); } private static void assertIndexInputIsFunctional(IndexInput indexInput) throws IOException {