From 29b6dcac28a4b165414bd18488ce6ecefb991e90 Mon Sep 17 00:00:00 2001 From: Raghuvansh Raj Date: Tue, 4 Apr 2023 19:40:40 +0530 Subject: [PATCH] Support to emit multiple streams for a file content each responsible for processing a specific part of the file Signed-off-by: Raghuvansh Raj --- CHANGELOG.md | 3 +- .../java/org/opensearch/common/Stream.java | 55 ++++++ .../org/opensearch/common/StreamProvider.java | 47 +++++ .../common/TransferPartStreamSupplier.java | 19 ++ .../blobstore/stream/StreamContext.java | 45 +++++ .../common/blobstore/stream/package-info.java | 10 + .../stream/write/StreamContextSupplier.java | 23 +++ .../blobstore/stream/write/WriteContext.java | 90 +++++++++ .../blobstore/stream/write/WritePriority.java | 17 ++ .../blobstore/stream/write/package-info.java | 10 + .../transfer/RemoteTransferContainer.java | 180 ++++++++++++++++++ .../blobstore/transfer/UploadFinalizer.java | 16 ++ .../blobstore/transfer/package-info.java | 10 + .../stream/OffsetRangeFileInputStream.java | 120 ++++++++++++ .../stream/OffsetRangeIndexInputStream.java | 73 +++++++ .../stream/OffsetRangeInputStream.java | 20 ++ .../stream/ResettableCheckedInputStream.java | 107 +++++++++++ .../transfer/stream/package-info.java | 10 + .../RemoteTransferContainerTests.java | 132 +++++++++++++ .../OffsetRangeFileInputStreamTests.java | 19 ++ .../OffsetRangeIndexInputStreamTests.java | 41 ++++ .../ResettableCheckedInputStreamBaseTest.java | 177 +++++++++++++++++ 22 files changed, 1223 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/common/Stream.java create mode 100644 server/src/main/java/org/opensearch/common/StreamProvider.java create mode 100644 server/src/main/java/org/opensearch/common/TransferPartStreamSupplier.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/stream/StreamContext.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/stream/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/stream/write/StreamContextSupplier.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/stream/write/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/transfer/UploadFinalizer.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/transfer/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStream.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeInputStream.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStream.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/transfer/stream/package-info.java create mode 100644 server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java create mode 100644 server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStreamTests.java create mode 100644 server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStreamTests.java create mode 100644 server/src/test/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStreamBaseTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e4fe8736e8588..3732a0d83877c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -96,6 +96,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834)) - Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577)) - [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644)) +- [Remote Store] Support to emit multiple streams for a file content each responsible for processing a specific part of the file ([#6977](https://github.com/opensearch-project/OpenSearch/pull/6977)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490)) @@ -133,4 +134,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x diff --git a/server/src/main/java/org/opensearch/common/Stream.java b/server/src/main/java/org/opensearch/common/Stream.java new file mode 100644 index 0000000000000..ab5666f5819fc --- /dev/null +++ b/server/src/main/java/org/opensearch/common/Stream.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common; + +import java.io.InputStream; + +/** + * Model composed of an input stream, the total content length and offset + */ +public class Stream { + + private final InputStream inputStream; + private final long contentLength; + private final long offset; + + /** + * Construct a new stream object + * + * @param inputStream The input stream that is to be encapsulated + * @param contentLength The total content length that is to be read from the stream + * @param offset The offset pointer that this stream reads from in the file + */ + public Stream(InputStream inputStream, long contentLength, long offset) { + this.inputStream = inputStream; + this.contentLength = contentLength; + this.offset = offset; + } + + /** + * @return The input stream this object is reading from + */ + public InputStream getInputStream() { + return inputStream; + } + + /** + * @return The total length of the content that has to be read from this stream + */ + public long getContentLength() { + return contentLength; + } + + /** + * @return The offset pointer in the file that this stream is reading from + */ + public long getOffset() { + return offset; + } +} diff --git a/server/src/main/java/org/opensearch/common/StreamProvider.java b/server/src/main/java/org/opensearch/common/StreamProvider.java new file mode 100644 index 0000000000000..b91d83582c295 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/StreamProvider.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common; + +import java.io.IOException; + +/** + * StreamProvider is used to supply streams to vendor plugins using StreamProvider#provideStream + */ +public class StreamProvider { + + private final TransferPartStreamSupplier streamSupplier; + private final long partSize; + private final long lastPartSize; + private final int numOfParts; + + /** + * Construct a new StreamProvider object + * + * @param streamSupplier An implementation of TransferPartStreamSupplier which will be called when provideStreams is called + * @param partSize Size of all parts apart from the last one + * @param lastPartSize Size of the last part + * @param numOfParts Total number of parts + */ + public StreamProvider(TransferPartStreamSupplier streamSupplier, long partSize, long lastPartSize, int numOfParts) { + this.streamSupplier = streamSupplier; + this.partSize = partSize; + this.lastPartSize = lastPartSize; + this.numOfParts = numOfParts; + } + + /** + * @param partNumber The index of the part + * @return A stream reference to the part requested + */ + public Stream provideStream(int partNumber) throws IOException { + long position = partSize * partNumber; + long size = (partNumber == numOfParts - 1) ? lastPartSize : partSize; + return streamSupplier.supply(partNumber, size, position); + } +} diff --git a/server/src/main/java/org/opensearch/common/TransferPartStreamSupplier.java b/server/src/main/java/org/opensearch/common/TransferPartStreamSupplier.java new file mode 100644 index 0000000000000..7bf01f3ca174c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/TransferPartStreamSupplier.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common; + +import java.io.IOException; + +/** + * TransferPartStreamSupplier is used to supply streams to specific parts of a file based on + * the partNo, size and position (the offset in the file) + */ +public interface TransferPartStreamSupplier { + Stream supply(int partNo, long size, long position) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/StreamContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/StreamContext.java new file mode 100644 index 0000000000000..60696592ef640 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/StreamContext.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.stream; + +import org.opensearch.common.StreamProvider; + +/** + * StreamContext encapsulates all the data required for uploading multiple streams + */ +public class StreamContext { + + private final StreamProvider streamProvider; + private final int numberOfParts; + + /** + * Construct a new StreamContext object + * + * @param streamProvider A stream provider to provide a stream for a given part number. + * @param numberOfParts Number of parts of the content referenced by equivalent number of streams. + */ + public StreamContext(StreamProvider streamProvider, int numberOfParts) { + this.streamProvider = streamProvider; + this.numberOfParts = numberOfParts; + } + + /** + * @return The stream iterable for the current upload + */ + public StreamProvider getStreamProvider() { + return streamProvider; + } + + /** + * @return The number of parts in current upload + */ + public int getNumberOfParts() { + return numberOfParts; + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/stream/package-info.java new file mode 100644 index 0000000000000..88d6d433d837f --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Abstractions for stream based file transfers */ +package org.opensearch.common.blobstore.stream; diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/StreamContextSupplier.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/StreamContextSupplier.java new file mode 100644 index 0000000000000..3a718d08b5a95 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/StreamContextSupplier.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.stream.write; + +import org.opensearch.common.blobstore.stream.StreamContext; + +/** + * Will return the StreamContext to the caller given the part size + */ +public interface StreamContextSupplier { + + /** + * @param partSize The size of a single part to be uploaded + * @return The StreamContext based on the part size provided + */ + StreamContext supplyStreamContext(long partSize); +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java new file mode 100644 index 0000000000000..5b04d4b617fec --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.stream.write; + +import org.opensearch.common.blobstore.stream.StreamContext; +import org.opensearch.common.blobstore.transfer.UploadFinalizer; + +/** + * WriteContext is used to encapsulate all data needed by BlobContainer#writeStreams + */ +public class WriteContext { + + private final String fileName; + private final StreamContextSupplier streamContextSupplier; + private final long fileSize; + private final boolean failIfAlreadyExists; + private final WritePriority writePriority; + private final UploadFinalizer uploadFinalizer; + + /** + * Construct a new WriteContext object + * + * @param fileName The name of the file being uploaded + * @param streamContextSupplier A supplier that will provide StreamContext to the plugin + * @param fileSize The total size of the file being uploaded + * @param failIfAlreadyExists A boolean to fail the upload is the file exists + * @param writePriority The WritePriority of this upload + */ + public WriteContext( + String fileName, + StreamContextSupplier streamContextSupplier, + long fileSize, + boolean failIfAlreadyExists, + WritePriority writePriority, + UploadFinalizer uploadFinalizer + ) { + this.fileName = fileName; + this.streamContextSupplier = streamContextSupplier; + this.fileSize = fileSize; + this.failIfAlreadyExists = failIfAlreadyExists; + this.writePriority = writePriority; + this.uploadFinalizer = uploadFinalizer; + } + + /** + * @return The file name + */ + public String getFileName() { + return fileName; + } + + /** + * @return The boolean representing whether to fail the file upload if it exists + */ + public boolean isFailIfAlreadyExists() { + return failIfAlreadyExists; + } + + /** + * @param partSize The size of a single part to be uploaded + * @return The stream context which will be used by the plugin to initialize streams from the file + */ + public StreamContext getStreamContext(long partSize) { + return streamContextSupplier.supplyStreamContext(partSize); + } + + /** + * @return The total size of the file + */ + public long getFileSize() { + return fileSize; + } + + /** + * @return The WritePriority of the upload + */ + public WritePriority getWritePriority() { + return writePriority; + } + + public UploadFinalizer getUploadFinalizer() { + return uploadFinalizer; + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java new file mode 100644 index 0000000000000..db51f8c4ff849 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java @@ -0,0 +1,17 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.stream.write; + +/** + * WritePriority for upload + */ +public enum WritePriority { + NORMAL, + HIGH +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/package-info.java new file mode 100644 index 0000000000000..a2f0e634f0a0a --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Abstractions for stream based file writes */ +package org.opensearch.common.blobstore.stream.write; diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java new file mode 100644 index 0000000000000..13d47a1479c00 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.SetOnce; +import org.opensearch.common.Stream; +import org.opensearch.common.StreamProvider; +import org.opensearch.common.TransferPartStreamSupplier; +import org.opensearch.common.blobstore.stream.StreamContext; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.Objects; + +/** + * RemoteTransferContainer is an encapsulation for managing transfers for translog and segment files. + */ +public class RemoteTransferContainer implements Closeable { + + private int numberOfParts; + private long partSize; + private long lastPartSize; + + private final long contentLength; + private final SetOnce inputStreams = new SetOnce<>(); + private final String localFileName; + private final String remoteFileName; + private final boolean failTransferIfFileExists; + private final WritePriority writePriority; + private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; + + private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class); + + /** + * Construct a new RemoteTransferContainer object using {@link Path} reference to the file. + * + * @param localFileName Name of the local file + * @param remoteFileName Name of the remote file + * @param contentLength Total content length of the file to be uploaded + * @param failTransferIfFileExists A boolean to determine if upload has to be failed if file exists + * @param writePriority The {@link WritePriority} of current upload + * @param offsetRangeInputStreamSupplier A supplier to create OffsetRangeInputStreams + */ + public RemoteTransferContainer( + String localFileName, + String remoteFileName, + long contentLength, + boolean failTransferIfFileExists, + WritePriority writePriority, + OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier + ) { + this.localFileName = localFileName; + this.remoteFileName = remoteFileName; + this.contentLength = contentLength; + this.failTransferIfFileExists = failTransferIfFileExists; + this.writePriority = writePriority; + this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier; + } + + /** + * @return The {@link WriteContext} for the current upload + */ + public WriteContext createWriteContext() { + return new WriteContext( + remoteFileName, + this::supplyStreamContext, + contentLength, + failTransferIfFileExists, + writePriority, + this::finalizeUpload + ); + } + + /** + * @param partSize Part sizes of all parts apart from the last one, which is determined internally + * @return The {@link StreamContext} object that will be used by the vendor plugin to retrieve streams + */ + public StreamContext supplyStreamContext(long partSize) { + try { + return this.openMultipartStreams(partSize); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private StreamContext openMultipartStreams(long partSize) throws IOException { + if (inputStreams.get() != null) { + throw new IOException("Multi-part streams are already created."); + } + + this.partSize = partSize; + this.lastPartSize = (contentLength % partSize) != 0 ? contentLength % partSize : partSize; + this.numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1); + ResettableCheckedInputStream[] streams = new ResettableCheckedInputStream[numberOfParts]; + inputStreams.set(streams); + + return new StreamContext(new StreamProvider(getTransferPartStreamSupplier(), partSize, lastPartSize, numberOfParts), numberOfParts); + } + + private TransferPartStreamSupplier getTransferPartStreamSupplier() { + return ((partNo, size, position) -> { + assert inputStreams.get() != null : "expected inputStreams to be initialised"; + return getMultipartStreamSupplier(partNo, size, position).get(); + }); + } + + /** + * OffsetRangeInputStreamSupplier is used to get the offset based input streams at runtime + */ + public interface OffsetRangeInputStreamSupplier { + OffsetRangeInputStream get(long size, long position) throws IOException; + } + + interface LocalStreamSupplier { + Stream get() throws IOException; + } + + private LocalStreamSupplier getMultipartStreamSupplier(final int streamIdx, final long size, final long position) { + return () -> { + try { + OffsetRangeInputStream offsetRangeInputStream = offsetRangeInputStreamSupplier.get(size, position); + ResettableCheckedInputStream checkedInputStream = new ResettableCheckedInputStream(offsetRangeInputStream, localFileName); + Objects.requireNonNull(inputStreams.get())[streamIdx] = checkedInputStream; + + return new Stream(checkedInputStream, size, position); + } catch (IOException e) { + log.error("Failed to create input stream", e); + throw e; + } + }; + } + + private void finalizeUpload(boolean uploadSuccessful) {} + + /** + * @return The total content length of current upload + */ + public long getContentLength() { + return contentLength; + } + + @Override + public void close() throws IOException { + if (inputStreams.get() == null) { + log.warn("Input streams cannot be closed since they are not yet set for multi stream upload"); + return; + } + + boolean closeStreamException = false; + for (InputStream is : Objects.requireNonNull(inputStreams.get())) { + try { + if (is != null) { + is.close(); + } + } catch (IOException ex) { + closeStreamException = true; + // Attempting to close all streams first before throwing exception. + log.error("Multipart stream failed to close ", ex); + } + } + + if (closeStreamException) { + throw new IOException("Closure of some of the multi-part streams failed."); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/UploadFinalizer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/UploadFinalizer.java new file mode 100644 index 0000000000000..08fc9e69a0a30 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/UploadFinalizer.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer; + +/** + * UploadFinalizer is an interface with support for a method that will be called once upload is complete + */ +public interface UploadFinalizer { + void accept(boolean uploadSuccess); +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/package-info.java new file mode 100644 index 0000000000000..779b6538401d0 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Contains transfer related utilities for {@link org.opensearch.common.blobstore.BlobContainer} */ +package org.opensearch.common.blobstore.transfer; diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStream.java new file mode 100644 index 0000000000000..4c938bc230ee1 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStream.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +/** + * OffsetRangeFileInputStream extends InputStream to read from a specified offset using FileChannel + */ +public class OffsetRangeFileInputStream extends OffsetRangeInputStream { + private final InputStream inputStream; + private final FileChannel fileChannel; + + private final long actualSizeToRead; + // This is the maximum position till stream is to be read. If read methods exceed maxPos then bytes are read + // till maxPos. If no byte is left after maxPos, then -1 is returned from read methods. + private final long limit; + // Position in stream from which read will start. + private long counter = 0; + + private long markPointer; + private long markCounter; + + /** + * Construct a new OffsetRangeFileInputStream object + * + * @param path Path to the file + * @param size Number of bytes that need to be read from specified position + * @param position Position from where the read needs to start + * @throws IOException When FileChannel#position operation fails + */ + public OffsetRangeFileInputStream(Path path, long size, long position) throws IOException { + fileChannel = FileChannel.open(path, StandardOpenOption.READ); + fileChannel.position(position); + inputStream = Channels.newInputStream(fileChannel); + long totalLength = fileChannel.size(); + this.counter = 0; + this.limit = size; + if ((totalLength - position) > limit) { + actualSizeToRead = limit; + } else { + actualSizeToRead = totalLength - position; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (fileChannel.position() >= fileChannel.size()) { + return -1; + } + if (fileChannel.position() + len > fileChannel.size()) { + len = (int) (fileChannel.size() - fileChannel.position()); + } + if (counter + len > limit) { + len = (int) (limit - counter); + } + if (len <= 0) { + return -1; + } + + inputStream.read(b, off, len); + counter += len; + return len; + } + + @Override + public int read() throws IOException { + if (counter++ >= limit) { + return -1; + } + return (fileChannel.position() < fileChannel.size()) ? (inputStream.read() & 0xff) : -1; + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public synchronized void mark(int readlimit) { + try { + markPointer = fileChannel.position(); + } catch (IOException e) { + throw new RuntimeException(e); + } + markCounter = counter; + } + + @Override + public synchronized void reset() throws IOException { + fileChannel.position(markPointer); + counter = markCounter; + } + + @Override + public long getFilePointer() throws IOException { + return fileChannel.position(); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java new file mode 100644 index 0000000000000..f800a826df22f --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import org.apache.lucene.store.IndexInput; +import org.opensearch.common.lucene.store.InputStreamIndexInput; + +import java.io.IOException; + +/** + * OffsetRangeIndexInputStream extends InputStream to read from a specified offset using IndexInput + */ +public class OffsetRangeIndexInputStream extends OffsetRangeInputStream { + + private final InputStreamIndexInput inputStreamIndexInput; + private final IndexInput indexInput; + + /** + * Construct a new OffsetRangeIndexInputStream object + * + * @param indexInput IndexInput opened on the file to read from + * @param size The maximum length to read from specified position + * @param position Position from where read needs to start + * @throws IOException When IndexInput#seek operation fails + */ + public OffsetRangeIndexInputStream(IndexInput indexInput, long size, long position) throws IOException { + indexInput.seek(position); + this.indexInput = indexInput; + this.inputStreamIndexInput = new InputStreamIndexInput(indexInput, size); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return inputStreamIndexInput.read(b, off, len); + } + + @Override + public int read() throws IOException { + return inputStreamIndexInput.read(); + } + + @Override + public boolean markSupported() { + return inputStreamIndexInput.markSupported(); + } + + @Override + public synchronized void mark(int readlimit) { + inputStreamIndexInput.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + inputStreamIndexInput.reset(); + } + + @Override + public long getFilePointer() throws IOException { + return indexInput.getFilePointer(); + } + + @Override + public void close() throws IOException { + inputStreamIndexInput.close(); + indexInput.close(); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeInputStream.java new file mode 100644 index 0000000000000..9cc00594b2bb0 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeInputStream.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import java.io.IOException; +import java.io.InputStream; + +/** + * OffsetRangeInputStream is an abstract class that extends from {@link InputStream} + * and adds a method to get the file pointer to the specific part being read + */ +public abstract class OffsetRangeInputStream extends InputStream { + public abstract long getFilePointer() throws IOException; +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStream.java new file mode 100644 index 0000000000000..867dcbef49eeb --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStream.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import java.io.FilterInputStream; +import java.io.IOException; + +/** + * ResettableCheckedInputStream is a modified implementation of {@link java.util.zip.CheckedInputStream} that supports + * mark and reset and modifies the file checksum during mark and reset calls. + */ +public class ResettableCheckedInputStream extends FilterInputStream { + + private final long startPos; + private final String file; + private final OffsetRangeInputStream offsetRangeInputStream; + + /** + * Creates a new ResettableCheckedInputStream object + * + * @param offsetRangeInputStream The underlying input stream + * @param file Name of the file + */ + public ResettableCheckedInputStream(OffsetRangeInputStream offsetRangeInputStream, String file) throws IOException { + super(offsetRangeInputStream); + this.offsetRangeInputStream = offsetRangeInputStream; + this.startPos = offsetRangeInputStream.getFilePointer(); + this.file = file; + } + + /** + * Reads a byte. Will block if no input is available. + * @return the byte read, or -1 if the end of the stream is reached. + * @exception IOException if an I/O error has occurred + */ + public int read() throws IOException { + byte[] buffer = new byte[1]; + int len = read(buffer, 0, 1); + if (len == -1) { + return -1; + } + return buffer[0]; + } + + /** + * Reads into an array of bytes. If len is not zero, the method + * blocks until some input is available; otherwise, no + * bytes are read and 0 is returned. + * @param buf the buffer into which the data is read + * @param off the start offset in the destination array b + * @param len the maximum number of bytes read + * @return the actual number of bytes read, or -1 if the end + * of the stream is reached. + * @exception NullPointerException If buf is null. + * @exception IndexOutOfBoundsException If off is negative, + * len is negative, or len is greater than + * buf.length - off + * @exception IOException if an I/O error has occurred + */ + public int read(byte[] buf, int off, int len) throws IOException { + return in.read(buf, off, len); + } + + /** + * Skips specified number of bytes of input. + * @param n the number of bytes to skip + * @return the actual number of bytes skipped + * @exception IOException if an I/O error has occurred + */ + public long skip(long n) throws IOException { + byte[] buf = new byte[512]; + long total = 0; + while (total < n) { + long len = n - total; + len = read(buf, 0, len < buf.length ? (int) len : buf.length); + if (len == -1) { + return total; + } + total += len; + } + return total; + } + + @Override + public synchronized void mark(int readlimit) { + super.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + if (startPos == offsetRangeInputStream.getFilePointer()) { + return; + } + super.reset(); + } + + @Override + public void close() throws IOException { + super.close(); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/package-info.java new file mode 100644 index 0000000000000..c5e7e45b9f015 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Contains encapsulations to create streams for part uploads */ +package org.opensearch.common.blobstore.transfer.stream; diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java new file mode 100644 index 0000000000000..3c0ad6fbf4389 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java @@ -0,0 +1,132 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer; + +import org.junit.Before; +import org.opensearch.common.Stream; +import org.opensearch.common.blobstore.stream.StreamContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public class RemoteTransferContainerTests extends OpenSearchTestCase { + + private static final int TEST_FILE_SIZE_BYTES = 128; + + private Path testFile; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(TEST_FILE_SIZE_BYTES), StandardOpenOption.APPEND); + } + + public void testSupplyStreamContextForPathDivisibleParts() throws IOException, InterruptedException { + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + testFile.getFileName().toString(), + testFile.getFileName().toString(), + TEST_FILE_SIZE_BYTES, + true, + WritePriority.HIGH, + new RemoteTransferContainer.OffsetRangeInputStreamSupplier() { + @Override + public OffsetRangeInputStream get(long size, long position) throws IOException { + return new OffsetRangeFileInputStream(testFile, size, position); + } + } + ) + ) { + testSupplyStreamContext(remoteTransferContainer, 16, 16, 8); + } + } + + public void testSupplyStreamContextForPathNonDivisibleParts() throws IOException, InterruptedException { + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + testFile.getFileName().toString(), + testFile.getFileName().toString(), + TEST_FILE_SIZE_BYTES, + true, + WritePriority.HIGH, + new RemoteTransferContainer.OffsetRangeInputStreamSupplier() { + @Override + public OffsetRangeInputStream get(long size, long position) throws IOException { + return new OffsetRangeFileInputStream(testFile, size, position); + } + } + ) + ) { + testSupplyStreamContext(remoteTransferContainer, 10, 8, 13); + } + } + + private void testSupplyStreamContext( + RemoteTransferContainer remoteTransferContainer, + long partSize, + long lastPartSize, + int expectedPartCount + ) throws InterruptedException { + StreamContext streamContext = remoteTransferContainer.supplyStreamContext(partSize); + int partCount = streamContext.getNumberOfParts(); + assertEquals(expectedPartCount, partCount); + Thread[] threads = new Thread[partCount]; + long totalContentLength = remoteTransferContainer.getContentLength(); + assert partSize * (partCount - 1) + lastPartSize == totalContentLength + : "part sizes and last part size don't add up to total content length"; + logger.info("partSize: {}, lastPartSize: {}, partCount: {}", partSize, lastPartSize, streamContext.getNumberOfParts()); + for (int partIdx = 0; partIdx < partCount; partIdx++) { + int finalPartIdx = partIdx; + long expectedPartSize = (partIdx == partCount - 1) ? lastPartSize : partSize; + long expectedOffset = partIdx * partSize; + threads[partIdx] = new Thread(() -> { + try { + Stream stream = streamContext.getStreamProvider().provideStream(finalPartIdx); + assertEquals(expectedPartSize, stream.getContentLength()); + assertEquals(expectedOffset, stream.getOffset()); + } catch (IOException e) { + fail("IOException during stream creation"); + } + }); + threads[partIdx].start(); + } + for (int i = 0; i < partCount; i++) { + threads[i].join(); + } + } + + public void testSupplyStreamContextCalledTwice() throws IOException { + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + testFile.getFileName().toString(), + testFile.getFileName().toString(), + TEST_FILE_SIZE_BYTES, + true, + WritePriority.HIGH, + new RemoteTransferContainer.OffsetRangeInputStreamSupplier() { + @Override + public OffsetRangeInputStream get(long size, long position) throws IOException { + return new OffsetRangeFileInputStream(testFile, size, position); + } + } + ) + ) { + remoteTransferContainer.supplyStreamContext(16); + assertThrows(RuntimeException.class, () -> remoteTransferContainer.supplyStreamContext(16)); + } + } +} diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStreamTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStreamTests.java new file mode 100644 index 0000000000000..f7e093f8357cc --- /dev/null +++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStreamTests.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import java.io.IOException; + +public class OffsetRangeFileInputStreamTests extends ResettableCheckedInputStreamBaseTest { + + @Override + protected OffsetRangeInputStream getOffsetRangeInputStream(long size, long position) throws IOException { + return new OffsetRangeFileInputStream(testFile, size, position); + } +} diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStreamTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStreamTests.java new file mode 100644 index 0000000000000..7381d49bcdf99 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStreamTests.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.NIOFSDirectory; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; + +public class OffsetRangeIndexInputStreamTests extends ResettableCheckedInputStreamBaseTest { + + private Directory directory; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + directory = new NIOFSDirectory(testFile.getParent()); + } + + @Override + protected OffsetRangeInputStream getOffsetRangeInputStream(long size, long position) throws IOException { + return new OffsetRangeIndexInputStream(directory.openInput(testFile.getFileName().toString(), IOContext.DEFAULT), size, position); + } + + @Override + @After + public void tearDown() throws Exception { + directory.close(); + super.tearDown(); + } +} diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStreamBaseTest.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStreamBaseTest.java new file mode 100644 index 0000000000000..62b6a7d1ea972 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStreamBaseTest.java @@ -0,0 +1,177 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public abstract class ResettableCheckedInputStreamBaseTest extends OpenSearchTestCase { + + private static final int TEST_FILE_SIZE_BYTES = 10; + + private final byte[] bytesToWrite = randomByteArrayOfLength(TEST_FILE_SIZE_BYTES); + protected Path testFile; + private ResettableCheckedInputStream[] resettableCheckedInputStreams; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + testFile = createTempFile(); + Files.write(testFile, bytesToWrite, StandardOpenOption.TRUNCATE_EXISTING); + } + + protected abstract OffsetRangeInputStream getOffsetRangeInputStream(long size, long position) throws IOException; + + public void testReadSingleByte() throws IOException, InterruptedException { + final int nParallelReads = 10; + Thread[] threads = new Thread[nParallelReads]; + resettableCheckedInputStreams = new ResettableCheckedInputStream[nParallelReads]; + for (int readIdx = 0; readIdx < nParallelReads; readIdx++) { + int offset = randomInt(TEST_FILE_SIZE_BYTES - 1); + OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(1, offset); + ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream( + offsetRangeInputStream, + testFile.getFileName().toString() + ); + resettableCheckedInputStreams[readIdx] = resettableCheckedInputStream; + threads[readIdx] = new Thread(() -> { + try { + assertEquals(bytesToWrite[offset], resettableCheckedInputStream.read()); + } catch (IOException e) { + fail("Failure while reading single byte from offset stream"); + } + }); + threads[readIdx].start(); + } + for (Thread thread : threads) { + thread.join(); + } + } + + public void testReadMultipleBytes() throws IOException, InterruptedException { + final int nParallelReads = 10; + Thread[] threads = new Thread[nParallelReads]; + resettableCheckedInputStreams = new ResettableCheckedInputStream[nParallelReads]; + for (int readIdx = 0; readIdx < nParallelReads; readIdx++) { + int readByteCount = randomInt(TEST_FILE_SIZE_BYTES - 1) + 1; + int offset = randomInt(TEST_FILE_SIZE_BYTES - readByteCount); + OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(readByteCount, offset); + ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream( + offsetRangeInputStream, + testFile.getFileName().toString() + ); + resettableCheckedInputStreams[readIdx] = resettableCheckedInputStream; + threads[readIdx] = new Thread(() -> { + try { + byte[] buffer = new byte[readByteCount]; + int bytesRead = resettableCheckedInputStream.read(buffer, 0, readByteCount); + assertEquals(readByteCount, bytesRead); + for (int bufferIdx = 0; bufferIdx < readByteCount; bufferIdx++) { + assertEquals(bytesToWrite[offset + bufferIdx], buffer[bufferIdx]); + } + } catch (IOException e) { + fail("Failure while reading bytes from offset stream"); + } + }); + threads[readIdx].start(); + } + for (Thread thread : threads) { + thread.join(); + } + } + + public void testMarkAndReset() throws IOException, InterruptedException { + final int nParallelReads = 100; + Thread[] threads = new Thread[nParallelReads]; + resettableCheckedInputStreams = new ResettableCheckedInputStream[nParallelReads]; + for (int readIdx = 0; readIdx < nParallelReads; readIdx++) { + int readByteCount = randomInt(TEST_FILE_SIZE_BYTES - 1) + 1; + int offset = randomInt(TEST_FILE_SIZE_BYTES - readByteCount); + OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(readByteCount, offset); + ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream( + offsetRangeInputStream, + testFile.getFileName().toString() + ); + resettableCheckedInputStreams[readIdx] = resettableCheckedInputStream; + int finalRunIdx = readIdx; + threads[readIdx] = new Thread(() -> { + try { + boolean streamMarked = false; + long streamMarkPosition = -1; + for (int byteIdx = 0; byteIdx < readByteCount - 1; byteIdx++) { + resettableCheckedInputStream.read(); + if (!streamMarked && randomBoolean()) { + streamMarked = true; + streamMarkPosition = offsetRangeInputStream.getFilePointer(); + resettableCheckedInputStream.mark(readByteCount); + } + } + if (!streamMarked) { + streamMarkPosition = offsetRangeInputStream.getFilePointer(); + resettableCheckedInputStream.mark(readByteCount); + } + resettableCheckedInputStream.reset(); + assertEquals(bytesToWrite[(int) streamMarkPosition], resettableCheckedInputStream.read()); + } catch (IOException e) { + fail("Failure while reading bytes from offset stream"); + } + }); + threads[readIdx].start(); + } + for (Thread thread : threads) { + thread.join(); + } + } + + public void testReadAfterSkip() throws IOException { + OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(TEST_FILE_SIZE_BYTES, 0); + ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream( + offsetRangeInputStream, + testFile.getFileName().toString() + ); + resettableCheckedInputStreams = new ResettableCheckedInputStream[] { resettableCheckedInputStream }; + + long skipBytes = randomLongBetween(1, TEST_FILE_SIZE_BYTES - 1); + long actualBytesSkipped = resettableCheckedInputStream.skip(skipBytes); + assertEquals(skipBytes, actualBytesSkipped); + assertEquals(bytesToWrite[(int) skipBytes], resettableCheckedInputStream.read()); + } + + public void testReadLastByte() throws IOException { + OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(TEST_FILE_SIZE_BYTES, 0); + ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream( + offsetRangeInputStream, + testFile.getFileName().toString() + ); + resettableCheckedInputStreams = new ResettableCheckedInputStream[] { resettableCheckedInputStream }; + + long skipBytes = TEST_FILE_SIZE_BYTES; + long actualBytesSkipped = resettableCheckedInputStream.skip(skipBytes); + assertEquals(skipBytes, actualBytesSkipped); + assertEquals(-1, resettableCheckedInputStream.read()); + } + + @Override + @After + public void tearDown() throws Exception { + for (ResettableCheckedInputStream resettableCheckedInputStream : resettableCheckedInputStreams) { + if (resettableCheckedInputStream != null) { + resettableCheckedInputStream.close(); + } + } + super.tearDown(); + } +}