diff --git a/CHANGELOG.md b/CHANGELOG.md
index e4fe8736e8588..3732a0d83877c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -96,6 +96,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834))
- Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577))
- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))
+- [Remote Store] Support to emit multiple streams for a file content each responsible for processing a specific part of the file ([#6977](https://github.com/opensearch-project/OpenSearch/pull/6977))
### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
@@ -133,4 +134,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security
[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
-[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
\ No newline at end of file
+[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
diff --git a/server/src/main/java/org/opensearch/common/Stream.java b/server/src/main/java/org/opensearch/common/Stream.java
new file mode 100644
index 0000000000000..ab5666f5819fc
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/Stream.java
@@ -0,0 +1,55 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common;
+
+import java.io.InputStream;
+
+/**
+ * Model composed of an input stream, the total content length and offset
+ */
+public class Stream {
+
+ private final InputStream inputStream;
+ private final long contentLength;
+ private final long offset;
+
+ /**
+ * Construct a new stream object
+ *
+ * @param inputStream The input stream that is to be encapsulated
+ * @param contentLength The total content length that is to be read from the stream
+ * @param offset The offset pointer that this stream reads from in the file
+ */
+ public Stream(InputStream inputStream, long contentLength, long offset) {
+ this.inputStream = inputStream;
+ this.contentLength = contentLength;
+ this.offset = offset;
+ }
+
+ /**
+ * @return The input stream this object is reading from
+ */
+ public InputStream getInputStream() {
+ return inputStream;
+ }
+
+ /**
+ * @return The total length of the content that has to be read from this stream
+ */
+ public long getContentLength() {
+ return contentLength;
+ }
+
+ /**
+ * @return The offset pointer in the file that this stream is reading from
+ */
+ public long getOffset() {
+ return offset;
+ }
+}
diff --git a/server/src/main/java/org/opensearch/common/StreamProvider.java b/server/src/main/java/org/opensearch/common/StreamProvider.java
new file mode 100644
index 0000000000000..b91d83582c295
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/StreamProvider.java
@@ -0,0 +1,47 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common;
+
+import java.io.IOException;
+
+/**
+ * StreamProvider is used to supply streams to vendor plugins using StreamProvider#provideStream
+ */
+public class StreamProvider {
+
+ private final TransferPartStreamSupplier streamSupplier;
+ private final long partSize;
+ private final long lastPartSize;
+ private final int numOfParts;
+
+ /**
+ * Construct a new StreamProvider object
+ *
+ * @param streamSupplier An implementation of TransferPartStreamSupplier which will be called when provideStreams is called
+ * @param partSize Size of all parts apart from the last one
+ * @param lastPartSize Size of the last part
+ * @param numOfParts Total number of parts
+ */
+ public StreamProvider(TransferPartStreamSupplier streamSupplier, long partSize, long lastPartSize, int numOfParts) {
+ this.streamSupplier = streamSupplier;
+ this.partSize = partSize;
+ this.lastPartSize = lastPartSize;
+ this.numOfParts = numOfParts;
+ }
+
+ /**
+ * @param partNumber The index of the part
+ * @return A stream reference to the part requested
+ */
+ public Stream provideStream(int partNumber) throws IOException {
+ long position = partSize * partNumber;
+ long size = (partNumber == numOfParts - 1) ? lastPartSize : partSize;
+ return streamSupplier.supply(partNumber, size, position);
+ }
+}
diff --git a/server/src/main/java/org/opensearch/common/TransferPartStreamSupplier.java b/server/src/main/java/org/opensearch/common/TransferPartStreamSupplier.java
new file mode 100644
index 0000000000000..7bf01f3ca174c
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/TransferPartStreamSupplier.java
@@ -0,0 +1,19 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common;
+
+import java.io.IOException;
+
+/**
+ * TransferPartStreamSupplier is used to supply streams to specific parts of a file based on
+ * the partNo, size and position (the offset in the file)
+ */
+public interface TransferPartStreamSupplier {
+ Stream supply(int partNo, long size, long position) throws IOException;
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/StreamContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/StreamContext.java
new file mode 100644
index 0000000000000..60696592ef640
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/stream/StreamContext.java
@@ -0,0 +1,45 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.stream;
+
+import org.opensearch.common.StreamProvider;
+
+/**
+ * StreamContext encapsulates all the data required for uploading multiple streams
+ */
+public class StreamContext {
+
+ private final StreamProvider streamProvider;
+ private final int numberOfParts;
+
+ /**
+ * Construct a new StreamContext object
+ *
+ * @param streamProvider A stream provider to provide a stream for a given part number.
+ * @param numberOfParts Number of parts of the content referenced by equivalent number of streams.
+ */
+ public StreamContext(StreamProvider streamProvider, int numberOfParts) {
+ this.streamProvider = streamProvider;
+ this.numberOfParts = numberOfParts;
+ }
+
+ /**
+ * @return The stream iterable for the current upload
+ */
+ public StreamProvider getStreamProvider() {
+ return streamProvider;
+ }
+
+ /**
+ * @return The number of parts in current upload
+ */
+ public int getNumberOfParts() {
+ return numberOfParts;
+ }
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/stream/package-info.java
new file mode 100644
index 0000000000000..88d6d433d837f
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/stream/package-info.java
@@ -0,0 +1,10 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/** Abstractions for stream based file transfers */
+package org.opensearch.common.blobstore.stream;
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/StreamContextSupplier.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/StreamContextSupplier.java
new file mode 100644
index 0000000000000..3a718d08b5a95
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/StreamContextSupplier.java
@@ -0,0 +1,23 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.stream.write;
+
+import org.opensearch.common.blobstore.stream.StreamContext;
+
+/**
+ * Will return the StreamContext
to the caller given the part size
+ */
+public interface StreamContextSupplier {
+
+ /**
+ * @param partSize The size of a single part to be uploaded
+ * @return The StreamContext
based on the part size provided
+ */
+ StreamContext supplyStreamContext(long partSize);
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java
new file mode 100644
index 0000000000000..5b04d4b617fec
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java
@@ -0,0 +1,90 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.stream.write;
+
+import org.opensearch.common.blobstore.stream.StreamContext;
+import org.opensearch.common.blobstore.transfer.UploadFinalizer;
+
+/**
+ * WriteContext is used to encapsulate all data needed by BlobContainer#writeStreams
+ */
+public class WriteContext {
+
+ private final String fileName;
+ private final StreamContextSupplier streamContextSupplier;
+ private final long fileSize;
+ private final boolean failIfAlreadyExists;
+ private final WritePriority writePriority;
+ private final UploadFinalizer uploadFinalizer;
+
+ /**
+ * Construct a new WriteContext object
+ *
+ * @param fileName The name of the file being uploaded
+ * @param streamContextSupplier A supplier that will provide StreamContext to the plugin
+ * @param fileSize The total size of the file being uploaded
+ * @param failIfAlreadyExists A boolean to fail the upload is the file exists
+ * @param writePriority The WritePriority
of this upload
+ */
+ public WriteContext(
+ String fileName,
+ StreamContextSupplier streamContextSupplier,
+ long fileSize,
+ boolean failIfAlreadyExists,
+ WritePriority writePriority,
+ UploadFinalizer uploadFinalizer
+ ) {
+ this.fileName = fileName;
+ this.streamContextSupplier = streamContextSupplier;
+ this.fileSize = fileSize;
+ this.failIfAlreadyExists = failIfAlreadyExists;
+ this.writePriority = writePriority;
+ this.uploadFinalizer = uploadFinalizer;
+ }
+
+ /**
+ * @return The file name
+ */
+ public String getFileName() {
+ return fileName;
+ }
+
+ /**
+ * @return The boolean representing whether to fail the file upload if it exists
+ */
+ public boolean isFailIfAlreadyExists() {
+ return failIfAlreadyExists;
+ }
+
+ /**
+ * @param partSize The size of a single part to be uploaded
+ * @return The stream context which will be used by the plugin to initialize streams from the file
+ */
+ public StreamContext getStreamContext(long partSize) {
+ return streamContextSupplier.supplyStreamContext(partSize);
+ }
+
+ /**
+ * @return The total size of the file
+ */
+ public long getFileSize() {
+ return fileSize;
+ }
+
+ /**
+ * @return The WritePriority
of the upload
+ */
+ public WritePriority getWritePriority() {
+ return writePriority;
+ }
+
+ public UploadFinalizer getUploadFinalizer() {
+ return uploadFinalizer;
+ }
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java
new file mode 100644
index 0000000000000..db51f8c4ff849
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java
@@ -0,0 +1,17 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.stream.write;
+
+/**
+ * WritePriority for upload
+ */
+public enum WritePriority {
+ NORMAL,
+ HIGH
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/package-info.java
new file mode 100644
index 0000000000000..a2f0e634f0a0a
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/package-info.java
@@ -0,0 +1,10 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/** Abstractions for stream based file writes */
+package org.opensearch.common.blobstore.stream.write;
diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java
new file mode 100644
index 0000000000000..13d47a1479c00
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java
@@ -0,0 +1,180 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.transfer;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.common.SetOnce;
+import org.opensearch.common.Stream;
+import org.opensearch.common.StreamProvider;
+import org.opensearch.common.TransferPartStreamSupplier;
+import org.opensearch.common.blobstore.stream.StreamContext;
+import org.opensearch.common.blobstore.stream.write.WriteContext;
+import org.opensearch.common.blobstore.stream.write.WritePriority;
+import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
+import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.Objects;
+
+/**
+ * RemoteTransferContainer is an encapsulation for managing transfers for translog and segment files.
+ */
+public class RemoteTransferContainer implements Closeable {
+
+ private int numberOfParts;
+ private long partSize;
+ private long lastPartSize;
+
+ private final long contentLength;
+ private final SetOnce inputStreams = new SetOnce<>();
+ private final String localFileName;
+ private final String remoteFileName;
+ private final boolean failTransferIfFileExists;
+ private final WritePriority writePriority;
+ private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
+
+ private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class);
+
+ /**
+ * Construct a new RemoteTransferContainer object using {@link Path} reference to the file.
+ *
+ * @param localFileName Name of the local file
+ * @param remoteFileName Name of the remote file
+ * @param contentLength Total content length of the file to be uploaded
+ * @param failTransferIfFileExists A boolean to determine if upload has to be failed if file exists
+ * @param writePriority The {@link WritePriority} of current upload
+ * @param offsetRangeInputStreamSupplier A supplier to create OffsetRangeInputStreams
+ */
+ public RemoteTransferContainer(
+ String localFileName,
+ String remoteFileName,
+ long contentLength,
+ boolean failTransferIfFileExists,
+ WritePriority writePriority,
+ OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier
+ ) {
+ this.localFileName = localFileName;
+ this.remoteFileName = remoteFileName;
+ this.contentLength = contentLength;
+ this.failTransferIfFileExists = failTransferIfFileExists;
+ this.writePriority = writePriority;
+ this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier;
+ }
+
+ /**
+ * @return The {@link WriteContext} for the current upload
+ */
+ public WriteContext createWriteContext() {
+ return new WriteContext(
+ remoteFileName,
+ this::supplyStreamContext,
+ contentLength,
+ failTransferIfFileExists,
+ writePriority,
+ this::finalizeUpload
+ );
+ }
+
+ /**
+ * @param partSize Part sizes of all parts apart from the last one, which is determined internally
+ * @return The {@link StreamContext} object that will be used by the vendor plugin to retrieve streams
+ */
+ public StreamContext supplyStreamContext(long partSize) {
+ try {
+ return this.openMultipartStreams(partSize);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private StreamContext openMultipartStreams(long partSize) throws IOException {
+ if (inputStreams.get() != null) {
+ throw new IOException("Multi-part streams are already created.");
+ }
+
+ this.partSize = partSize;
+ this.lastPartSize = (contentLength % partSize) != 0 ? contentLength % partSize : partSize;
+ this.numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
+ ResettableCheckedInputStream[] streams = new ResettableCheckedInputStream[numberOfParts];
+ inputStreams.set(streams);
+
+ return new StreamContext(new StreamProvider(getTransferPartStreamSupplier(), partSize, lastPartSize, numberOfParts), numberOfParts);
+ }
+
+ private TransferPartStreamSupplier getTransferPartStreamSupplier() {
+ return ((partNo, size, position) -> {
+ assert inputStreams.get() != null : "expected inputStreams to be initialised";
+ return getMultipartStreamSupplier(partNo, size, position).get();
+ });
+ }
+
+ /**
+ * OffsetRangeInputStreamSupplier is used to get the offset based input streams at runtime
+ */
+ public interface OffsetRangeInputStreamSupplier {
+ OffsetRangeInputStream get(long size, long position) throws IOException;
+ }
+
+ interface LocalStreamSupplier {
+ Stream get() throws IOException;
+ }
+
+ private LocalStreamSupplier getMultipartStreamSupplier(final int streamIdx, final long size, final long position) {
+ return () -> {
+ try {
+ OffsetRangeInputStream offsetRangeInputStream = offsetRangeInputStreamSupplier.get(size, position);
+ ResettableCheckedInputStream checkedInputStream = new ResettableCheckedInputStream(offsetRangeInputStream, localFileName);
+ Objects.requireNonNull(inputStreams.get())[streamIdx] = checkedInputStream;
+
+ return new Stream(checkedInputStream, size, position);
+ } catch (IOException e) {
+ log.error("Failed to create input stream", e);
+ throw e;
+ }
+ };
+ }
+
+ private void finalizeUpload(boolean uploadSuccessful) {}
+
+ /**
+ * @return The total content length of current upload
+ */
+ public long getContentLength() {
+ return contentLength;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (inputStreams.get() == null) {
+ log.warn("Input streams cannot be closed since they are not yet set for multi stream upload");
+ return;
+ }
+
+ boolean closeStreamException = false;
+ for (InputStream is : Objects.requireNonNull(inputStreams.get())) {
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (IOException ex) {
+ closeStreamException = true;
+ // Attempting to close all streams first before throwing exception.
+ log.error("Multipart stream failed to close ", ex);
+ }
+ }
+
+ if (closeStreamException) {
+ throw new IOException("Closure of some of the multi-part streams failed.");
+ }
+ }
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/UploadFinalizer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/UploadFinalizer.java
new file mode 100644
index 0000000000000..08fc9e69a0a30
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/UploadFinalizer.java
@@ -0,0 +1,16 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.transfer;
+
+/**
+ * UploadFinalizer is an interface with support for a method that will be called once upload is complete
+ */
+public interface UploadFinalizer {
+ void accept(boolean uploadSuccess);
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/package-info.java
new file mode 100644
index 0000000000000..779b6538401d0
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/package-info.java
@@ -0,0 +1,10 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/** Contains transfer related utilities for {@link org.opensearch.common.blobstore.BlobContainer} */
+package org.opensearch.common.blobstore.transfer;
diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStream.java
new file mode 100644
index 0000000000000..4c938bc230ee1
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStream.java
@@ -0,0 +1,120 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.transfer.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * OffsetRangeFileInputStream extends InputStream to read from a specified offset using FileChannel
+ */
+public class OffsetRangeFileInputStream extends OffsetRangeInputStream {
+ private final InputStream inputStream;
+ private final FileChannel fileChannel;
+
+ private final long actualSizeToRead;
+ // This is the maximum position till stream is to be read. If read methods exceed maxPos then bytes are read
+ // till maxPos. If no byte is left after maxPos, then -1 is returned from read methods.
+ private final long limit;
+ // Position in stream from which read will start.
+ private long counter = 0;
+
+ private long markPointer;
+ private long markCounter;
+
+ /**
+ * Construct a new OffsetRangeFileInputStream object
+ *
+ * @param path Path to the file
+ * @param size Number of bytes that need to be read from specified position
+ * @param position Position from where the read needs to start
+ * @throws IOException When FileChannel#position
operation fails
+ */
+ public OffsetRangeFileInputStream(Path path, long size, long position) throws IOException {
+ fileChannel = FileChannel.open(path, StandardOpenOption.READ);
+ fileChannel.position(position);
+ inputStream = Channels.newInputStream(fileChannel);
+ long totalLength = fileChannel.size();
+ this.counter = 0;
+ this.limit = size;
+ if ((totalLength - position) > limit) {
+ actualSizeToRead = limit;
+ } else {
+ actualSizeToRead = totalLength - position;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (fileChannel.position() >= fileChannel.size()) {
+ return -1;
+ }
+ if (fileChannel.position() + len > fileChannel.size()) {
+ len = (int) (fileChannel.size() - fileChannel.position());
+ }
+ if (counter + len > limit) {
+ len = (int) (limit - counter);
+ }
+ if (len <= 0) {
+ return -1;
+ }
+
+ inputStream.read(b, off, len);
+ counter += len;
+ return len;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (counter++ >= limit) {
+ return -1;
+ }
+ return (fileChannel.position() < fileChannel.size()) ? (inputStream.read() & 0xff) : -1;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ try {
+ markPointer = fileChannel.position();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ markCounter = counter;
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ fileChannel.position(markPointer);
+ counter = markCounter;
+ }
+
+ @Override
+ public long getFilePointer() throws IOException {
+ return fileChannel.position();
+ }
+
+ @Override
+ public void close() throws IOException {
+ inputStream.close();
+ }
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java
new file mode 100644
index 0000000000000..f800a826df22f
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java
@@ -0,0 +1,73 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.transfer.stream;
+
+import org.apache.lucene.store.IndexInput;
+import org.opensearch.common.lucene.store.InputStreamIndexInput;
+
+import java.io.IOException;
+
+/**
+ * OffsetRangeIndexInputStream extends InputStream to read from a specified offset using IndexInput
+ */
+public class OffsetRangeIndexInputStream extends OffsetRangeInputStream {
+
+ private final InputStreamIndexInput inputStreamIndexInput;
+ private final IndexInput indexInput;
+
+ /**
+ * Construct a new OffsetRangeIndexInputStream object
+ *
+ * @param indexInput IndexInput opened on the file to read from
+ * @param size The maximum length to read from specified position
+ * @param position Position from where read needs to start
+ * @throws IOException When IndexInput#seek
operation fails
+ */
+ public OffsetRangeIndexInputStream(IndexInput indexInput, long size, long position) throws IOException {
+ indexInput.seek(position);
+ this.indexInput = indexInput;
+ this.inputStreamIndexInput = new InputStreamIndexInput(indexInput, size);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return inputStreamIndexInput.read(b, off, len);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return inputStreamIndexInput.read();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return inputStreamIndexInput.markSupported();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ inputStreamIndexInput.mark(readlimit);
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ inputStreamIndexInput.reset();
+ }
+
+ @Override
+ public long getFilePointer() throws IOException {
+ return indexInput.getFilePointer();
+ }
+
+ @Override
+ public void close() throws IOException {
+ inputStreamIndexInput.close();
+ indexInput.close();
+ }
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeInputStream.java
new file mode 100644
index 0000000000000..9cc00594b2bb0
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeInputStream.java
@@ -0,0 +1,20 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.transfer.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * OffsetRangeInputStream is an abstract class that extends from {@link InputStream}
+ * and adds a method to get the file pointer to the specific part being read
+ */
+public abstract class OffsetRangeInputStream extends InputStream {
+ public abstract long getFilePointer() throws IOException;
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStream.java
new file mode 100644
index 0000000000000..867dcbef49eeb
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStream.java
@@ -0,0 +1,107 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.transfer.stream;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+
+/**
+ * ResettableCheckedInputStream is a modified implementation of {@link java.util.zip.CheckedInputStream} that supports
+ * mark and reset and modifies the file checksum during mark and reset calls.
+ */
+public class ResettableCheckedInputStream extends FilterInputStream {
+
+ private final long startPos;
+ private final String file;
+ private final OffsetRangeInputStream offsetRangeInputStream;
+
+ /**
+ * Creates a new ResettableCheckedInputStream object
+ *
+ * @param offsetRangeInputStream The underlying input stream
+ * @param file Name of the file
+ */
+ public ResettableCheckedInputStream(OffsetRangeInputStream offsetRangeInputStream, String file) throws IOException {
+ super(offsetRangeInputStream);
+ this.offsetRangeInputStream = offsetRangeInputStream;
+ this.startPos = offsetRangeInputStream.getFilePointer();
+ this.file = file;
+ }
+
+ /**
+ * Reads a byte. Will block if no input is available.
+ * @return the byte read, or -1 if the end of the stream is reached.
+ * @exception IOException if an I/O error has occurred
+ */
+ public int read() throws IOException {
+ byte[] buffer = new byte[1];
+ int len = read(buffer, 0, 1);
+ if (len == -1) {
+ return -1;
+ }
+ return buffer[0];
+ }
+
+ /**
+ * Reads into an array of bytes. If len
is not zero, the method
+ * blocks until some input is available; otherwise, no
+ * bytes are read and 0
is returned.
+ * @param buf the buffer into which the data is read
+ * @param off the start offset in the destination array b
+ * @param len the maximum number of bytes read
+ * @return the actual number of bytes read, or -1 if the end
+ * of the stream is reached.
+ * @exception NullPointerException If buf
is null
.
+ * @exception IndexOutOfBoundsException If off
is negative,
+ * len
is negative, or len
is greater than
+ * buf.length - off
+ * @exception IOException if an I/O error has occurred
+ */
+ public int read(byte[] buf, int off, int len) throws IOException {
+ return in.read(buf, off, len);
+ }
+
+ /**
+ * Skips specified number of bytes of input.
+ * @param n the number of bytes to skip
+ * @return the actual number of bytes skipped
+ * @exception IOException if an I/O error has occurred
+ */
+ public long skip(long n) throws IOException {
+ byte[] buf = new byte[512];
+ long total = 0;
+ while (total < n) {
+ long len = n - total;
+ len = read(buf, 0, len < buf.length ? (int) len : buf.length);
+ if (len == -1) {
+ return total;
+ }
+ total += len;
+ }
+ return total;
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ super.mark(readlimit);
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ if (startPos == offsetRangeInputStream.getFilePointer()) {
+ return;
+ }
+ super.reset();
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ }
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/package-info.java
new file mode 100644
index 0000000000000..c5e7e45b9f015
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/package-info.java
@@ -0,0 +1,10 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/** Contains encapsulations to create streams for part uploads */
+package org.opensearch.common.blobstore.transfer.stream;
diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java
new file mode 100644
index 0000000000000..3c0ad6fbf4389
--- /dev/null
+++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java
@@ -0,0 +1,132 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.transfer;
+
+import org.junit.Before;
+import org.opensearch.common.Stream;
+import org.opensearch.common.blobstore.stream.StreamContext;
+import org.opensearch.common.blobstore.stream.write.WritePriority;
+import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream;
+import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+public class RemoteTransferContainerTests extends OpenSearchTestCase {
+
+ private static final int TEST_FILE_SIZE_BYTES = 128;
+
+ private Path testFile;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ testFile = createTempFile();
+ Files.write(testFile, randomByteArrayOfLength(TEST_FILE_SIZE_BYTES), StandardOpenOption.APPEND);
+ }
+
+ public void testSupplyStreamContextForPathDivisibleParts() throws IOException, InterruptedException {
+ try (
+ RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
+ testFile.getFileName().toString(),
+ testFile.getFileName().toString(),
+ TEST_FILE_SIZE_BYTES,
+ true,
+ WritePriority.HIGH,
+ new RemoteTransferContainer.OffsetRangeInputStreamSupplier() {
+ @Override
+ public OffsetRangeInputStream get(long size, long position) throws IOException {
+ return new OffsetRangeFileInputStream(testFile, size, position);
+ }
+ }
+ )
+ ) {
+ testSupplyStreamContext(remoteTransferContainer, 16, 16, 8);
+ }
+ }
+
+ public void testSupplyStreamContextForPathNonDivisibleParts() throws IOException, InterruptedException {
+ try (
+ RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
+ testFile.getFileName().toString(),
+ testFile.getFileName().toString(),
+ TEST_FILE_SIZE_BYTES,
+ true,
+ WritePriority.HIGH,
+ new RemoteTransferContainer.OffsetRangeInputStreamSupplier() {
+ @Override
+ public OffsetRangeInputStream get(long size, long position) throws IOException {
+ return new OffsetRangeFileInputStream(testFile, size, position);
+ }
+ }
+ )
+ ) {
+ testSupplyStreamContext(remoteTransferContainer, 10, 8, 13);
+ }
+ }
+
+ private void testSupplyStreamContext(
+ RemoteTransferContainer remoteTransferContainer,
+ long partSize,
+ long lastPartSize,
+ int expectedPartCount
+ ) throws InterruptedException {
+ StreamContext streamContext = remoteTransferContainer.supplyStreamContext(partSize);
+ int partCount = streamContext.getNumberOfParts();
+ assertEquals(expectedPartCount, partCount);
+ Thread[] threads = new Thread[partCount];
+ long totalContentLength = remoteTransferContainer.getContentLength();
+ assert partSize * (partCount - 1) + lastPartSize == totalContentLength
+ : "part sizes and last part size don't add up to total content length";
+ logger.info("partSize: {}, lastPartSize: {}, partCount: {}", partSize, lastPartSize, streamContext.getNumberOfParts());
+ for (int partIdx = 0; partIdx < partCount; partIdx++) {
+ int finalPartIdx = partIdx;
+ long expectedPartSize = (partIdx == partCount - 1) ? lastPartSize : partSize;
+ long expectedOffset = partIdx * partSize;
+ threads[partIdx] = new Thread(() -> {
+ try {
+ Stream stream = streamContext.getStreamProvider().provideStream(finalPartIdx);
+ assertEquals(expectedPartSize, stream.getContentLength());
+ assertEquals(expectedOffset, stream.getOffset());
+ } catch (IOException e) {
+ fail("IOException during stream creation");
+ }
+ });
+ threads[partIdx].start();
+ }
+ for (int i = 0; i < partCount; i++) {
+ threads[i].join();
+ }
+ }
+
+ public void testSupplyStreamContextCalledTwice() throws IOException {
+ try (
+ RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
+ testFile.getFileName().toString(),
+ testFile.getFileName().toString(),
+ TEST_FILE_SIZE_BYTES,
+ true,
+ WritePriority.HIGH,
+ new RemoteTransferContainer.OffsetRangeInputStreamSupplier() {
+ @Override
+ public OffsetRangeInputStream get(long size, long position) throws IOException {
+ return new OffsetRangeFileInputStream(testFile, size, position);
+ }
+ }
+ )
+ ) {
+ remoteTransferContainer.supplyStreamContext(16);
+ assertThrows(RuntimeException.class, () -> remoteTransferContainer.supplyStreamContext(16));
+ }
+ }
+}
diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStreamTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStreamTests.java
new file mode 100644
index 0000000000000..f7e093f8357cc
--- /dev/null
+++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStreamTests.java
@@ -0,0 +1,19 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.transfer.stream;
+
+import java.io.IOException;
+
+public class OffsetRangeFileInputStreamTests extends ResettableCheckedInputStreamBaseTest {
+
+ @Override
+ protected OffsetRangeInputStream getOffsetRangeInputStream(long size, long position) throws IOException {
+ return new OffsetRangeFileInputStream(testFile, size, position);
+ }
+}
diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStreamTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStreamTests.java
new file mode 100644
index 0000000000000..7381d49bcdf99
--- /dev/null
+++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStreamTests.java
@@ -0,0 +1,41 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.transfer.stream;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+
+public class OffsetRangeIndexInputStreamTests extends ResettableCheckedInputStreamBaseTest {
+
+ private Directory directory;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ directory = new NIOFSDirectory(testFile.getParent());
+ }
+
+ @Override
+ protected OffsetRangeInputStream getOffsetRangeInputStream(long size, long position) throws IOException {
+ return new OffsetRangeIndexInputStream(directory.openInput(testFile.getFileName().toString(), IOContext.DEFAULT), size, position);
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ directory.close();
+ super.tearDown();
+ }
+}
diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStreamBaseTest.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStreamBaseTest.java
new file mode 100644
index 0000000000000..62b6a7d1ea972
--- /dev/null
+++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStreamBaseTest.java
@@ -0,0 +1,177 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore.transfer.stream;
+
+import org.junit.After;
+import org.junit.Before;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+public abstract class ResettableCheckedInputStreamBaseTest extends OpenSearchTestCase {
+
+ private static final int TEST_FILE_SIZE_BYTES = 10;
+
+ private final byte[] bytesToWrite = randomByteArrayOfLength(TEST_FILE_SIZE_BYTES);
+ protected Path testFile;
+ private ResettableCheckedInputStream[] resettableCheckedInputStreams;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ testFile = createTempFile();
+ Files.write(testFile, bytesToWrite, StandardOpenOption.TRUNCATE_EXISTING);
+ }
+
+ protected abstract OffsetRangeInputStream getOffsetRangeInputStream(long size, long position) throws IOException;
+
+ public void testReadSingleByte() throws IOException, InterruptedException {
+ final int nParallelReads = 10;
+ Thread[] threads = new Thread[nParallelReads];
+ resettableCheckedInputStreams = new ResettableCheckedInputStream[nParallelReads];
+ for (int readIdx = 0; readIdx < nParallelReads; readIdx++) {
+ int offset = randomInt(TEST_FILE_SIZE_BYTES - 1);
+ OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(1, offset);
+ ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream(
+ offsetRangeInputStream,
+ testFile.getFileName().toString()
+ );
+ resettableCheckedInputStreams[readIdx] = resettableCheckedInputStream;
+ threads[readIdx] = new Thread(() -> {
+ try {
+ assertEquals(bytesToWrite[offset], resettableCheckedInputStream.read());
+ } catch (IOException e) {
+ fail("Failure while reading single byte from offset stream");
+ }
+ });
+ threads[readIdx].start();
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ }
+
+ public void testReadMultipleBytes() throws IOException, InterruptedException {
+ final int nParallelReads = 10;
+ Thread[] threads = new Thread[nParallelReads];
+ resettableCheckedInputStreams = new ResettableCheckedInputStream[nParallelReads];
+ for (int readIdx = 0; readIdx < nParallelReads; readIdx++) {
+ int readByteCount = randomInt(TEST_FILE_SIZE_BYTES - 1) + 1;
+ int offset = randomInt(TEST_FILE_SIZE_BYTES - readByteCount);
+ OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(readByteCount, offset);
+ ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream(
+ offsetRangeInputStream,
+ testFile.getFileName().toString()
+ );
+ resettableCheckedInputStreams[readIdx] = resettableCheckedInputStream;
+ threads[readIdx] = new Thread(() -> {
+ try {
+ byte[] buffer = new byte[readByteCount];
+ int bytesRead = resettableCheckedInputStream.read(buffer, 0, readByteCount);
+ assertEquals(readByteCount, bytesRead);
+ for (int bufferIdx = 0; bufferIdx < readByteCount; bufferIdx++) {
+ assertEquals(bytesToWrite[offset + bufferIdx], buffer[bufferIdx]);
+ }
+ } catch (IOException e) {
+ fail("Failure while reading bytes from offset stream");
+ }
+ });
+ threads[readIdx].start();
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ }
+
+ public void testMarkAndReset() throws IOException, InterruptedException {
+ final int nParallelReads = 100;
+ Thread[] threads = new Thread[nParallelReads];
+ resettableCheckedInputStreams = new ResettableCheckedInputStream[nParallelReads];
+ for (int readIdx = 0; readIdx < nParallelReads; readIdx++) {
+ int readByteCount = randomInt(TEST_FILE_SIZE_BYTES - 1) + 1;
+ int offset = randomInt(TEST_FILE_SIZE_BYTES - readByteCount);
+ OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(readByteCount, offset);
+ ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream(
+ offsetRangeInputStream,
+ testFile.getFileName().toString()
+ );
+ resettableCheckedInputStreams[readIdx] = resettableCheckedInputStream;
+ int finalRunIdx = readIdx;
+ threads[readIdx] = new Thread(() -> {
+ try {
+ boolean streamMarked = false;
+ long streamMarkPosition = -1;
+ for (int byteIdx = 0; byteIdx < readByteCount - 1; byteIdx++) {
+ resettableCheckedInputStream.read();
+ if (!streamMarked && randomBoolean()) {
+ streamMarked = true;
+ streamMarkPosition = offsetRangeInputStream.getFilePointer();
+ resettableCheckedInputStream.mark(readByteCount);
+ }
+ }
+ if (!streamMarked) {
+ streamMarkPosition = offsetRangeInputStream.getFilePointer();
+ resettableCheckedInputStream.mark(readByteCount);
+ }
+ resettableCheckedInputStream.reset();
+ assertEquals(bytesToWrite[(int) streamMarkPosition], resettableCheckedInputStream.read());
+ } catch (IOException e) {
+ fail("Failure while reading bytes from offset stream");
+ }
+ });
+ threads[readIdx].start();
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ }
+
+ public void testReadAfterSkip() throws IOException {
+ OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(TEST_FILE_SIZE_BYTES, 0);
+ ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream(
+ offsetRangeInputStream,
+ testFile.getFileName().toString()
+ );
+ resettableCheckedInputStreams = new ResettableCheckedInputStream[] { resettableCheckedInputStream };
+
+ long skipBytes = randomLongBetween(1, TEST_FILE_SIZE_BYTES - 1);
+ long actualBytesSkipped = resettableCheckedInputStream.skip(skipBytes);
+ assertEquals(skipBytes, actualBytesSkipped);
+ assertEquals(bytesToWrite[(int) skipBytes], resettableCheckedInputStream.read());
+ }
+
+ public void testReadLastByte() throws IOException {
+ OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(TEST_FILE_SIZE_BYTES, 0);
+ ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream(
+ offsetRangeInputStream,
+ testFile.getFileName().toString()
+ );
+ resettableCheckedInputStreams = new ResettableCheckedInputStream[] { resettableCheckedInputStream };
+
+ long skipBytes = TEST_FILE_SIZE_BYTES;
+ long actualBytesSkipped = resettableCheckedInputStream.skip(skipBytes);
+ assertEquals(skipBytes, actualBytesSkipped);
+ assertEquals(-1, resettableCheckedInputStream.read());
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ for (ResettableCheckedInputStream resettableCheckedInputStream : resettableCheckedInputStreams) {
+ if (resettableCheckedInputStream != null) {
+ resettableCheckedInputStream.close();
+ }
+ }
+ super.tearDown();
+ }
+}