Added Datalake Buffered Upload (#8302)
gapra-msft authored Feb 21, 2020
1 parent 314478f commit 6376fb7
Showing 65 changed files with 11,441 additions and 152 deletions.
@@ -18,6 +18,7 @@
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.common.implementation.Constants;
+import com.azure.storage.common.implementation.UploadUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
@@ -31,7 +32,6 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousFileChannel;
import java.security.GeneralSecurityException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
@@ -318,7 +318,7 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
try {
final Map<String, String> metadataFinal = metadata == null ? new HashMap<>() : metadata;

-return Mono.using(() -> super.uploadFileResourceSupplier(filePath),
+return Mono.using(() -> UploadUtils.uploadFileResourceSupplier(filePath, logger),
channel -> this.uploadWithResponse(FluxUtil.readFile(channel), parallelTransferOptions, headers,
metadataFinal, tier, requestConditions)
.then()
@@ -328,20 +328,12 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
} catch (IOException e) {
throw logger.logExceptionAsError(new UncheckedIOException(e));
}
-}), this::uploadFileCleanup);
+}), channel -> UploadUtils.uploadFileCleanup(channel, logger));
} catch (RuntimeException ex) {
return monoError(logger, ex);
}
}

-private void uploadFileCleanup(AsynchronousFileChannel channel) {
-try {
-channel.close();
-} catch (IOException e) {
-throw logger.logExceptionAsError(new UncheckedIOException(e));
-}
-}
-
/**
* Encrypts the given Flux ByteBuffer.
*
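This client ties the channel's lifetime to the subscription with Reactor's Mono.using(resource supplier, source supplier, cleanup), so the file is closed on completion, error, or cancellation. A minimal, runnable sketch of that pattern (the file name and printed output are illustrative only):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import reactor.core.publisher.Mono;

public class MonoUsingSketch {
    public static void main(String[] args) {
        Mono<Long> fileSize = Mono.using(
            // Resource supplier: open the channel when the Mono is subscribed.
            () -> {
                try {
                    return AsynchronousFileChannel.open(Paths.get("data.bin"), StandardOpenOption.READ);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            },
            // Source supplier: build the pipeline that consumes the resource.
            channel -> {
                try {
                    return Mono.just(channel.size());
                } catch (IOException e) {
                    return Mono.error(e);
                }
            },
            // Cleanup: runs once the pipeline terminates, however it terminates.
            channel -> {
                try {
                    channel.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        fileSize.subscribe(size -> System.out.println("size = " + size));
    }
}
```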
@@ -23,6 +23,7 @@
import com.azure.storage.blob.specialized.PageBlobAsyncClient;
import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
import com.azure.storage.common.implementation.Constants;
+import com.azure.storage.common.implementation.UploadUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@@ -31,8 +32,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Base64;
import java.util.LinkedList;
@@ -527,7 +526,7 @@ public Mono<Void> uploadFromFile(String filePath, boolean overwrite) {

// Note that if the file will be uploaded using a putBlob, we also can skip the exists check.
if (!overwrite) {
-if (uploadInBlocks(filePath, BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES)) {
+if (UploadUtils.shouldUploadInChunks(filePath, BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES, logger)) {
overwriteCheck = exists().flatMap(exists -> exists
? monoError(logger, new IllegalArgumentException(Constants.BLOB_ALREADY_EXISTS))
: Mono.empty());
@@ -571,15 +570,16 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
final ParallelTransferOptions finalParallelTransferOptions =
ModelHelper.populateAndApplyDefaults(parallelTransferOptions);
try {
-return Mono.using(() -> uploadFileResourceSupplier(filePath),
+return Mono.using(() -> UploadUtils.uploadFileResourceSupplier(filePath, logger),
channel -> {
try {
BlockBlobAsyncClient blockBlobAsyncClient = getBlockBlobAsyncClient();
long fileSize = channel.size();

// If the file is larger than 256MB chunk it and stage it as blocks.
-if (uploadInBlocks(filePath, finalParallelTransferOptions.getMaxSingleUploadSize())) {
-return uploadBlocks(fileSize, finalParallelTransferOptions, originalBlockSize, headers,
+if (UploadUtils.shouldUploadInChunks(filePath,
+finalParallelTransferOptions.getMaxSingleUploadSize(), logger)) {
+return uploadFileChunks(fileSize, finalParallelTransferOptions, originalBlockSize, headers,
metadata, tier, requestConditions, channel, blockBlobAsyncClient);
} else {
// Otherwise we know it can be sent in a single request reducing network overhead.
@@ -590,27 +590,15 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
} catch (IOException ex) {
return Mono.error(ex);
}
-}, this::uploadFileCleanup);
+},
+channel ->
+UploadUtils.uploadFileCleanup(channel, logger));
} catch (RuntimeException ex) {
return monoError(logger, ex);
}
}

-boolean uploadInBlocks(String filePath, Integer maxSingleUploadSize) {
-AsynchronousFileChannel channel = uploadFileResourceSupplier(filePath);
-boolean retVal;
-try {
-retVal = channel.size() > maxSingleUploadSize;
-} catch (IOException e) {
-throw logger.logExceptionAsError(new UncheckedIOException(e));
-} finally {
-uploadFileCleanup(channel);
-}
-
-return retVal;
-}
-
-private Mono<Void> uploadBlocks(long fileSize, ParallelTransferOptions parallelTransferOptions,
+private Mono<Void> uploadFileChunks(long fileSize, ParallelTransferOptions parallelTransferOptions,
Integer originalBlockSize, BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
BlobRequestConditions requestConditions, AsynchronousFileChannel channel, BlockBlobAsyncClient client) {
final BlobRequestConditions finalRequestConditions = (requestConditions == null)
@@ -649,19 +637,7 @@ private Mono<Void> uploadBlocks(long fileSize, ParallelTransferOptions parallelT
* @throws UncheckedIOException an input output exception.
*/
protected AsynchronousFileChannel uploadFileResourceSupplier(String filePath) {
-try {
-return AsynchronousFileChannel.open(Paths.get(filePath), StandardOpenOption.READ);
-} catch (IOException e) {
-throw logger.logExceptionAsError(new UncheckedIOException(e));
-}
-}
-
-private void uploadFileCleanup(AsynchronousFileChannel channel) {
-try {
-channel.close();
-} catch (IOException e) {
-throw logger.logExceptionAsError(new UncheckedIOException(e));
-}
+return UploadUtils.uploadFileResourceSupplier(filePath, logger);
}

private String getBlockID() {
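The private helpers deleted above were promoted to the shared UploadUtils class in azure-storage-common. Its source is not shown in this diff, but the call sites pin down the three signatures; a plausible reconstruction from the deleted method bodies (the class shape itself is an assumption):

```java
package com.azure.storage.common.implementation;

import com.azure.core.util.logging.ClientLogger;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: only the three signatures used by the call sites
// in this commit are grounded in the diff.
public final class UploadUtils {

    // Opens a read-only channel for the file, surfacing failures through the client's logger.
    public static AsynchronousFileChannel uploadFileResourceSupplier(String filePath, ClientLogger logger) {
        try {
            return AsynchronousFileChannel.open(Paths.get(filePath), StandardOpenOption.READ);
        } catch (IOException e) {
            throw logger.logExceptionAsError(new UncheckedIOException(e));
        }
    }

    // Closes the channel, wrapping the checked IOException as the old private helpers did.
    public static void uploadFileCleanup(AsynchronousFileChannel channel, ClientLogger logger) {
        try {
            channel.close();
        } catch (IOException e) {
            throw logger.logExceptionAsError(new UncheckedIOException(e));
        }
    }

    // Mirrors the deleted uploadInBlocks: chunk the upload only when the file
    // exceeds the single-shot threshold.
    public static boolean shouldUploadInChunks(String filePath, Integer maxSingleUploadSize, ClientLogger logger) {
        AsynchronousFileChannel channel = uploadFileResourceSupplier(filePath, logger);
        try {
            return channel.size() > maxSingleUploadSize;
        } catch (IOException e) {
            throw logger.logExceptionAsError(new UncheckedIOException(e));
        } finally {
            uploadFileCleanup(channel, logger);
        }
    }
}
```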
@@ -19,6 +19,7 @@
import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.StorageImplUtils;
+import com.azure.storage.common.implementation.UploadUtils;
import reactor.core.publisher.Mono;

import java.io.IOException;
@@ -214,7 +215,7 @@ public void uploadFromFile(String filePath, boolean overwrite) {

if (!overwrite) {
// Note we only want to make the exists call if we will be uploading in stages. Otherwise it is superfluous.
-if (client.uploadInBlocks(filePath, BlockBlobClient.MAX_UPLOAD_BLOB_BYTES) && exists()) {
+if (UploadUtils.shouldUploadInChunks(filePath, BlockBlobClient.MAX_UPLOAD_BLOB_BYTES, logger) && exists()) {
throw logger.logExceptionAsError(new IllegalArgumentException(Constants.BLOB_ALREADY_EXISTS));
}
requestConditions = new BlobRequestConditions().setIfNoneMatch(Constants.HeaderConstants.ETAG_WILDCARD);
4 changes: 2 additions & 2 deletions sdk/storage/azure-storage-common/pom.xml
@@ -33,8 +33,8 @@
</scm>

<properties>
-<jacoco.min.linecoverage>0.20</jacoco.min.linecoverage>
-<jacoco.min.branchcoverage>0.18</jacoco.min.branchcoverage>
+<jacoco.min.linecoverage>0.10</jacoco.min.linecoverage>
+<jacoco.min.branchcoverage>0.10</jacoco.min.branchcoverage>
</properties>

<dependencies>
@@ -0,0 +1,85 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.common;

import com.azure.core.annotation.Fluent;
import com.azure.storage.common.implementation.StorageImplUtils;

/**
* This class contains configuration used to parallelize data transfer operations. Note that not all values are used
* by every method which accepts this type. Please refer to the javadoc on specific methods for these cases.
*/
@Fluent
public final class ParallelTransferOptions {

private final Integer blockSize;
private final Integer numBuffers;
private final ProgressReceiver progressReceiver;
private final Integer maxSingleUploadSize;

/**
* Creates a new {@link ParallelTransferOptions} with default parameters applied.
*
* @param blockSize The block size.
* For upload, the block size is the size of each block that will be staged. This value also determines the number
* of requests that need to be made. This parameter also determines the size that each buffer uses when buffering
* is required, and consequently the amount of memory consumed by such methods may be up to blockSize * numBuffers.
* For download to file, the block size is the size of each data chunk returned from the service.
* For both applications, if the block size is large, the upload will make fewer network calls, but each
* individual call will send more data and will therefore take longer.
* @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method
* should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the
* number of buffers, the more parallel, and thus faster, the upload portion of this operation will be.
* The amount of memory consumed by methods using this value may be up to blockSize * numBuffers.
* @param progressReceiver {@link ProgressReceiver}
* @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a
* single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be
* ignored. Some constraints to consider are that more requests cost more, but several small or mid-sized requests
* may sometimes perform better. In the case of buffered upload, up to this amount of data may be buffered before
* any data is sent. Must be greater than 0. May be null to accept the default behavior, which is the maximum value
* the service accepts for uploading in a single request; this varies depending on the service.
*/
public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver,
Integer maxSingleUploadSize) {
this.blockSize = blockSize;
if (numBuffers != null) {
StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE);
}
this.numBuffers = numBuffers;
this.progressReceiver = progressReceiver;
this.maxSingleUploadSize = maxSingleUploadSize;
}

/**
* Gets the block size (chunk size) to transfer at a time.
* @return The block size.
*/
public Integer getBlockSize() {
return this.blockSize;
}

/**
* Gets the number of buffers being used for a transfer operation.
* @return The number of buffers.
*/
public Integer getNumBuffers() {
return this.numBuffers;
}

/**
* Gets the progress receiver for parallel reporting.
* @return The progress receiver.
*/
public ProgressReceiver getProgressReceiver() {
return this.progressReceiver;
}

/**
* Gets the value above which the upload will be broken into blocks and parallelized.
* @return The threshold value.
*/
public Integer getMaxSingleUploadSize() {
return this.maxSingleUploadSize;
}
}
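A minimal usage sketch for these options (the sizes and values below are illustrative choices, not defaults from this commit):

```java
import com.azure.storage.common.ParallelTransferOptions;
import com.azure.storage.common.ProgressReceiver;

public class ParallelTransferOptionsExample {
    public static void main(String[] args) {
        // Report cumulative bytes as the transfer progresses.
        ProgressReceiver progress = bytes -> System.out.println("Transferred " + bytes + " bytes");

        ParallelTransferOptions options = new ParallelTransferOptions(
            4 * 1024 * 1024,   // blockSize: stage 4 MB blocks (illustrative value)
            8,                 // numBuffers: must be >= 2; memory may reach blockSize * numBuffers (~32 MB here)
            progress,
            16 * 1024 * 1024); // maxSingleUploadSize: single put for data up to 16 MB

        // Any null argument defers to the library's defaults.
        ParallelTransferOptions defaults = new ParallelTransferOptions(null, null, null, null);
        System.out.println(options.getBlockSize() + ", " + defaults.getBlockSize());
    }
}
```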
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.common;

import reactor.core.publisher.Flux;

/**
* A {@code ProgressReceiver} is an object that can be used to report progress on network transfers. When specified on
* transfer operations, the {@code reportProgress} method will be called periodically with the total number of bytes
* transferred. The user may configure this method to report progress in whatever format desired. It is recommended
* that this type be used in conjunction with {@link ProgressReporter#addProgressReporting(Flux, ProgressReceiver)} to
* enable reporting on sequential transfers. Note that any method accepting a {@link ParallelTransferOptions} will use
* the {@code ProgressReceiver} specified there and will handle the logic to coordinate the reporting between parallel
* operations.
*/
public interface ProgressReceiver {

/**
* The callback function invoked as progress is reported.
*
* @param bytesTransferred The total number of bytes transferred during this transaction.
*/
void reportProgress(long bytesTransferred);
}
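Since ProgressReceiver declares a single abstract method, it can be supplied as a lambda. A small sketch of sequential reporting, assuming ProgressReporter lives alongside it in com.azure.storage.common and addProgressReporting returns the wrapped Flux (the buffers and printed totals are illustrative):

```java
import com.azure.storage.common.ProgressReceiver;
import com.azure.storage.common.ProgressReporter;
import reactor.core.publisher.Flux;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ProgressReceiverExample {
    public static void main(String[] args) {
        // Invoked periodically with the cumulative number of bytes transferred.
        ProgressReceiver receiver = total -> System.out.println(total + " bytes so far");

        Flux<ByteBuffer> data = Flux.just(
            ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)),
            ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8)));

        // Wrap the stream so the receiver observes the running total as data flows.
        ProgressReporter.addProgressReporting(data, receiver)
            .blockLast(); // expected to print 5, then 10 (cumulative totals)
    }
}
```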