Skip to content

Commit

Permalink
Support to emit multiple streams for a file content each responsible …
Browse files Browse the repository at this point in the history
…for processing a specific part of the file

Signed-off-by: Raghuvansh Raj <[email protected]>
  • Loading branch information
raghuvanshraj committed Apr 20, 2023
1 parent 53b128f commit 29b6dca
Show file tree
Hide file tree
Showing 22 changed files with 1,223 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834))
- Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577))
- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))
- [Remote Store] Support to emit multiple streams for a file content each responsible for processing a specific part of the file ([#6977](https://github.com/opensearch-project/OpenSearch/pull/6977))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down Expand Up @@ -133,4 +134,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
55 changes: 55 additions & 0 deletions server/src/main/java/org/opensearch/common/Stream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common;

import java.io.InputStream;

/**
* Model composed of an input stream, the total content length and offset
*/
public class Stream {

private final InputStream inputStream;
private final long contentLength;
private final long offset;

/**
* Construct a new stream object
*
* @param inputStream The input stream that is to be encapsulated
* @param contentLength The total content length that is to be read from the stream
* @param offset The offset pointer that this stream reads from in the file
*/
public Stream(InputStream inputStream, long contentLength, long offset) {
this.inputStream = inputStream;
this.contentLength = contentLength;
this.offset = offset;
}

/**
* @return The input stream this object is reading from
*/
public InputStream getInputStream() {
return inputStream;
}

/**
* @return The total length of the content that has to be read from this stream
*/
public long getContentLength() {
return contentLength;
}

/**
* @return The offset pointer in the file that this stream is reading from
*/
public long getOffset() {
return offset;
}
}
47 changes: 47 additions & 0 deletions server/src/main/java/org/opensearch/common/StreamProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common;

import java.io.IOException;

/**
* StreamProvider is used to supply streams to vendor plugins using <code>StreamProvider#provideStream</code>
*/
public class StreamProvider {

private final TransferPartStreamSupplier streamSupplier;
private final long partSize;
private final long lastPartSize;
private final int numOfParts;

/**
* Construct a new StreamProvider object
*
* @param streamSupplier An implementation of TransferPartStreamSupplier which will be called when provideStreams is called
* @param partSize Size of all parts apart from the last one
* @param lastPartSize Size of the last part
* @param numOfParts Total number of parts
*/
public StreamProvider(TransferPartStreamSupplier streamSupplier, long partSize, long lastPartSize, int numOfParts) {
this.streamSupplier = streamSupplier;
this.partSize = partSize;
this.lastPartSize = lastPartSize;
this.numOfParts = numOfParts;
}

/**
* @param partNumber The index of the part
* @return A stream reference to the part requested
*/
public Stream provideStream(int partNumber) throws IOException {
long position = partSize * partNumber;
long size = (partNumber == numOfParts - 1) ? lastPartSize : partSize;
return streamSupplier.supply(partNumber, size, position);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common;

import java.io.IOException;

/**
* TransferPartStreamSupplier is used to supply streams to specific parts of a file based on
* the partNo, size and position (the offset in the file)
*/
public interface TransferPartStreamSupplier {
Stream supply(int partNo, long size, long position) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream;

import org.opensearch.common.StreamProvider;

/**
* StreamContext encapsulates all the data required for uploading multiple streams
*/
public class StreamContext {

private final StreamProvider streamProvider;
private final int numberOfParts;

/**
* Construct a new StreamContext object
*
* @param streamProvider A stream provider to provide a stream for a given part number.
* @param numberOfParts Number of parts of the content referenced by equivalent number of streams.
*/
public StreamContext(StreamProvider streamProvider, int numberOfParts) {
this.streamProvider = streamProvider;
this.numberOfParts = numberOfParts;
}

/**
* @return The stream iterable for the current upload
*/
public StreamProvider getStreamProvider() {
return streamProvider;
}

/**
* @return The number of parts in current upload
*/
public int getNumberOfParts() {
return numberOfParts;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Abstractions for stream based file transfers */
package org.opensearch.common.blobstore.stream;
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.write;

import org.opensearch.common.blobstore.stream.StreamContext;

/**
* Will return the <code>StreamContext</code> to the caller given the part size
*/
public interface StreamContextSupplier {

/**
* @param partSize The size of a single part to be uploaded
* @return The <code>StreamContext</code> based on the part size provided
*/
StreamContext supplyStreamContext(long partSize);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.write;

import org.opensearch.common.blobstore.stream.StreamContext;
import org.opensearch.common.blobstore.transfer.UploadFinalizer;

/**
* WriteContext is used to encapsulate all data needed by <code>BlobContainer#writeStreams</code>
*/
public class WriteContext {

private final String fileName;
private final StreamContextSupplier streamContextSupplier;
private final long fileSize;
private final boolean failIfAlreadyExists;
private final WritePriority writePriority;
private final UploadFinalizer uploadFinalizer;

/**
* Construct a new WriteContext object
*
* @param fileName The name of the file being uploaded
* @param streamContextSupplier A supplier that will provide StreamContext to the plugin
* @param fileSize The total size of the file being uploaded
* @param failIfAlreadyExists A boolean to fail the upload is the file exists
* @param writePriority The <code>WritePriority</code> of this upload
*/
public WriteContext(
String fileName,
StreamContextSupplier streamContextSupplier,
long fileSize,
boolean failIfAlreadyExists,
WritePriority writePriority,
UploadFinalizer uploadFinalizer
) {
this.fileName = fileName;
this.streamContextSupplier = streamContextSupplier;
this.fileSize = fileSize;
this.failIfAlreadyExists = failIfAlreadyExists;
this.writePriority = writePriority;
this.uploadFinalizer = uploadFinalizer;
}

/**
* @return The file name
*/
public String getFileName() {
return fileName;
}

/**
* @return The boolean representing whether to fail the file upload if it exists
*/
public boolean isFailIfAlreadyExists() {
return failIfAlreadyExists;
}

/**
* @param partSize The size of a single part to be uploaded
* @return The stream context which will be used by the plugin to initialize streams from the file
*/
public StreamContext getStreamContext(long partSize) {
return streamContextSupplier.supplyStreamContext(partSize);
}

/**
* @return The total size of the file
*/
public long getFileSize() {
return fileSize;
}

/**
* @return The <code>WritePriority</code> of the upload
*/
public WritePriority getWritePriority() {
return writePriority;
}

public UploadFinalizer getUploadFinalizer() {
return uploadFinalizer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.write;

/**
* WritePriority for upload
*/
public enum WritePriority {
NORMAL,
HIGH
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Abstractions for stream based file writes */
package org.opensearch.common.blobstore.stream.write;
Loading

0 comments on commit 29b6dca

Please sign in to comment.