Skip to content

Commit

Permalink
Handle null partSize in OnDemandBlockSnapshotIndexInput
Browse files Browse the repository at this point in the history
The `partSize()` value can be null if the underlying repository
implementation does not implement file chunking.

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross committed Aug 23, 2023
1 parent 0c839c3 commit 1cdba2b
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix flaky ResourceAwareTasksTests.testBasicTaskResourceTracking test ([#8993](https://github.com/opensearch-project/OpenSearch/pull/8993))
- Fix memory leak when using Zstd Dictionary ([#9403](https://github.com/opensearch-project/OpenSearch/pull/9403))
- Fix range reads in repository-s3 ([#9512](https://github.com/opensearch-project/OpenSearch/issues/9512))
- Handle null partSize in OnDemandBlockSnapshotIndexInput ([#9291](https://github.com/opensearch-project/OpenSearch/issues/9291))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.index.store.remote.file;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IndexInput;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
Expand All @@ -26,8 +24,6 @@
* @opensearch.internal
*/
public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
private static final Logger logger = LogManager.getLogger(OnDemandBlockSnapshotIndexInput.class);

/**
* Where this class fetches IndexInput parts from
*/
Expand All @@ -48,7 +44,7 @@ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
protected final String fileName;

/**
* part size in bytes
* Maximum size in bytes of snapshot file parts.
*/
protected final long partSize;

Expand Down Expand Up @@ -104,7 +100,15 @@ public OnDemandBlockSnapshotIndexInput(
super(builder);
this.transferManager = transferManager;
this.fileInfo = fileInfo;
this.partSize = fileInfo.partSize().getBytes();
if (fileInfo.partSize() != null) {
this.partSize = fileInfo.partSize().getBytes();
} else {
// Repository implementations can define a size at which to split files
// into multiple objects in the repository. If partSize() is null, then
// no splitting happens, so default to Long.MAX_VALUE here to have the
// same effect. See {@code BlobStoreRepository#chunkSize()}.
this.partSize = Long.MAX_VALUE;
}
this.fileName = fileInfo.physicalName();
this.directory = directory;
this.originalFileSize = fileInfo.length();
Expand All @@ -131,6 +135,10 @@ protected IndexInput fetchBlock(int blockId) throws IOException {

final long blockStart = getBlockStart(blockId);
final long blockEnd = blockStart + getActualBlockSize(blockId);

// If the snapshot file is chunked, we must account for this by
// choosing the appropriate file part and updating the position
// accordingly.
final int part = (int) (blockStart / partSize);
final long partStart = part * partSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.Version;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.StoreFileMetadata;
Expand All @@ -31,9 +33,12 @@
import java.io.IOException;
import java.nio.file.Path;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class OnDemandBlockSnapshotIndexInputTests extends OpenSearchTestCase {
Expand All @@ -43,7 +48,6 @@ public class OnDemandBlockSnapshotIndexInputTests extends OpenSearchTestCase {
private static final String FILE_NAME = "File_Name";
private static final String BLOCK_FILE_PREFIX = FILE_NAME;
private static final boolean IS_CLONE = false;
private static final ByteSizeValue BYTE_SIZE_VALUE = new ByteSizeValue(1L);
private static final int FILE_SIZE = 29360128;
private TransferManager transferManager;
private LockFactory lockFactory;
Expand Down Expand Up @@ -74,7 +78,38 @@ public void test4MBBlock() throws Exception {
runAllTestsFor(22);
}

public void runAllTestsFor(int blockSizeShift) throws Exception {
/**
 * Verifies that a read positioned past the first repository chunk requests the
 * second file part from the transfer manager.
 *
 * <p>The test uses a 1 KB block size, a 2 KB repository chunk size and a 3 KB
 * snapshot file, seeks to the chunk boundary, and then checks that the blob
 * named {@code File_Name.part1} was fetched.
 *
 * @throws IOException if creating the directory or seeking the input fails
 */
public void testChunkedRepository() throws IOException {
    final long blockSize = new ByteSizeValue(1, ByteSizeUnit.KB).getBytes();
    final long repositoryChunkSize = new ByteSizeValue(2, ByteSizeUnit.KB).getBytes();
    final long fileSize = new ByteSizeValue(3, ByteSizeUnit.KB).getBytes();

    // The block size is a power of two, so its shift equals the number of trailing
    // zero bits. This is exact integer arithmetic, unlike truncating a floating-point
    // log2, which can round down by one for some powers of two.
    final int blockSizeShift = Long.numberOfTrailingZeros(blockSize);

    // Every fetch returns one zero-filled block; only the requested blob name matters here.
    when(transferManager.fetchBlob(any())).thenReturn(new ByteArrayIndexInput("test", new byte[(int) blockSize]));
    try (
        FSDirectory directory = new MMapDirectory(path, lockFactory);
        IndexInput indexInput = new OnDemandBlockSnapshotIndexInput(
            OnDemandBlockIndexInput.builder()
                .resourceDescription(RESOURCE_DESCRIPTION)
                .offset(BLOCK_SNAPSHOT_FILE_OFFSET)
                // NOTE(review): this uses the class-level FILE_SIZE rather than the local
                // fileSize passed to StoreFileMetadata below -- confirm this is intended.
                .length(FILE_SIZE)
                .blockSizeShift(blockSizeShift)
                .isClone(IS_CLONE),
            new BlobStoreIndexShardSnapshot.FileInfo(
                FILE_NAME,
                new StoreFileMetadata(FILE_NAME, fileSize, "", Version.LATEST),
                new ByteSizeValue(repositoryChunkSize)
            ),
            directory,
            transferManager
        )
    ) {
        // Seek to the position past the first repository chunk
        indexInput.seek(repositoryChunkSize);
    }
    // Verify the second chunk is requested (i.e. ".part1")
    verify(transferManager).fetchBlob(argThat(request -> request.getBlobName().equals("File_Name.part1")));
}

private void runAllTestsFor(int blockSizeShift) throws Exception {
final OnDemandBlockSnapshotIndexInput blockedSnapshotFile = createOnDemandBlockSnapshotIndexInput(blockSizeShift);
final int blockSize = 1 << blockSizeShift;
TestGroup.testGetBlock(blockedSnapshotFile, blockSize, FILE_SIZE);
Expand Down Expand Up @@ -106,7 +141,7 @@ private OnDemandBlockSnapshotIndexInput createOnDemandBlockSnapshotIndexInput(in
fileInfo = new BlobStoreIndexShardSnapshot.FileInfo(
FILE_NAME,
new StoreFileMetadata(FILE_NAME, FILE_SIZE, "", Version.LATEST),
BYTE_SIZE_VALUE
null
);

int blockSize = 1 << blockSizeShift;
Expand Down Expand Up @@ -182,7 +217,7 @@ private void initBlockFiles(int blockSize, FSDirectory fsDirectory) {

}

public static class TestGroup {
private static class TestGroup {

public static void testGetBlock(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize, int fileSize) {
// block 0
Expand Down

0 comments on commit 1cdba2b

Please sign in to comment.