Skip to content

Commit

Permalink
Implement range reads in HDFS repository
Browse files Browse the repository at this point in the history
Resolves opensearch-project#9513

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross committed Aug 23, 2023
1 parent 0c839c3 commit 73229f6
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.repositories.hdfs;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -46,6 +47,7 @@
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.common.io.Streams;
import org.opensearch.repositories.hdfs.HdfsBlobStore.Operation;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -125,8 +127,23 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public InputStream readBlob(String blobName, long position, long length) {
throw new UnsupportedOperationException();
public InputStream readBlob(String blobName, long position, long length) throws IOException {
return store.execute(fileContext -> {
final FSDataInputStream stream;
try {
stream = fileContext.open(new Path(path, blobName), bufferSize);
} catch (FileNotFoundException fnfe) {
throw new NoSuchFileException("[" + blobName + "] blob not found");
}
// Seek to the desired start position, closing the stream if any error occurs
try {
stream.seek(position);
} catch (Exception e) {
stream.close();
throw e;
}
return Streams.limitStream(new HDFSPrivilegedInputSteam(stream, securityContext), length);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,4 @@ protected Settings repositorySettings() {
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(HdfsPlugin.class);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9513")
@Override
public void testReadRange() {}
}

0 comments on commit 73229f6

Please sign in to comment.