BlobOutputStream using BufferedUpload #7067

Merged (10 commits) on Jan 3, 2020
sdk/storage/azure-storage-blob/CHANGELOG.md (5 changes: 4 additions & 1 deletion)
@@ -2,7 +2,10 @@

## 12.2.0-beta.2 (Unreleased)
- Added a field to ParallelTransferOptions that allows customers to configure the maximum size to upload in a single PUT. Data sizes larger than this value will be chunked and parallelized.
- Added overloads to downloadToFile to add the option to overwrite existing files. Default behavior is to not overwrite.
- Improved performance of BlockBlobOutputStream.
- Added overloads to BlockBlobClient.getBlobOutputStream to allow users to provide parallel transfer options, http headers, metadata, access tier, and request conditions.
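
As a quick illustration of the ParallelTransferOptions change, a hedged sketch follows; the four-argument constructor order and the sizes here are assumptions for this beta, so verify them against the release you build against:

    ParallelTransferOptions options = new ParallelTransferOptions(
        4 * 1024 * 1024,    // block size once the upload is chunked and parallelized
        8,                  // number of buffers uploaded in parallel
        null,               // no progress receiver
        64 * 1024 * 1024);  // data at or below this size is sent in a single PUT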


## 12.2.0-beta.1 (2019-12-17)
- Added SAS generation methods on clients to improve discoverability and convenience of sas. Deprecated setContainerName, setBlobName, setSnapshotId, generateSasQueryParameters methods on BlobServiceSasSignatureValues to direct users to using the methods added on clients.

@@ -3,24 +3,28 @@
package com.azure.storage.blob.specialized;

import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobClientBuilder;
import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.AppendBlobRequestConditions;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.CpkInfo;
import com.azure.storage.blob.models.CustomerProvidedKey;
import com.azure.storage.blob.models.PageBlobRequestConditions;
import com.azure.storage.blob.models.PageRange;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.common.StorageOutputStream;
import com.azure.storage.common.implementation.Constants;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.UUID;
import java.util.Map;

/**
* BlobOutputStream allows for the uploading of data to a blob using a stream-like approach.
@@ -37,8 +41,9 @@ static BlobOutputStream appendBlobOutputStream(final AppendBlobAsyncClient clien
}

static BlobOutputStream blockBlobOutputStream(final BlockBlobAsyncClient client,
final BlobRequestConditions requestConditions) {
return new BlockBlobOutputStream(client, requestConditions);
final ParallelTransferOptions parallelTransferOptions, final BlobHttpHeaders headers,
final Map<String, String> metadata, final AccessTier tier, final BlobRequestConditions requestConditions) {
return new BlockBlobOutputStream(client, parallelTransferOptions, headers, metadata, tier, requestConditions);
}

static BlobOutputStream pageBlobOutputStream(final PageBlobAsyncClient client, final PageRange pageRange,
@@ -70,6 +75,11 @@ public synchronized void close() throws IOException {
} catch (final BlobStorageException e) {
throw new IOException(e);
}
/* This check is needed because, for block blobs, a buffered-upload error only manifests after commit is
called. */
if (this.lastError != null) {
throw lastError;
}
} finally {
// if close() is called again, an exception will be thrown
this.lastError = new IOException(Constants.STREAM_CLOSED);
@@ -134,62 +144,74 @@ void commit() {
}

private static final class BlockBlobOutputStream extends BlobOutputStream {
private final BlobRequestConditions requestConditions;
private final String blockIdPrefix;
private final List<String> blockList;
private final BlockBlobAsyncClient client;

private FluxSink<ByteBuffer> sink;

volatile boolean complete; // volatile: set from the upload thread, polled in commit()

private BlockBlobOutputStream(final BlockBlobAsyncClient client,
final BlobRequestConditions requestConditions) {
final ParallelTransferOptions parallelTransferOptions, final BlobHttpHeaders headers,
final Map<String, String> metadata, final AccessTier tier, final BlobRequestConditions requestConditions) {
super(BlockBlobClient.MAX_STAGE_BLOCK_BYTES);
this.client = client;
this.requestConditions = (requestConditions == null) ? new BlobRequestConditions() : requestConditions;
this.blockIdPrefix = UUID.randomUUID().toString() + '-';
this.blockList = new ArrayList<>();
}

/**
* Generates a new block ID to be used for PutBlock.
*
* @return Base64 encoded block ID
*/
private String getCurrentBlockId() {
String blockIdSuffix = String.format("%06d", this.blockList.size());
return Base64.getEncoder().encodeToString((this.blockIdPrefix + blockIdSuffix)
.getBytes(StandardCharsets.UTF_8));
}
BlobAsyncClient blobClient = prepareBuilder(client).buildAsyncClient();

private Mono<Void> writeBlock(Flux<ByteBuffer> blockData, String blockId, long writeLength) {
return client.stageBlockWithResponse(blockId, blockData, writeLength, null,
this.requestConditions.getLeaseId())
.then()
Flux<ByteBuffer> fbb = Flux.create((FluxSink<ByteBuffer> sink) -> this.sink = sink);

/* Subscription by the upload call takes too long. We need to subscribe here so that the sink is actually
created. Since this subscriber doesn't do anything and no data has started flowing, there are no drawbacks
to this extra subscribe. (See the bridge sketch after this class.) */
fbb.subscribe();

blobClient.uploadWithResponse(fbb, parallelTransferOptions, headers, metadata, tier, requestConditions)
// This allows the operation to continue while maintaining the error that occurred.
.onErrorResume(BlobStorageException.class, e -> {
this.lastError = new IOException(e);
return Mono.empty();
});
})
.doOnTerminate(() -> complete = true)
.subscribe();
}

@Override
protected Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
if (writeLength == 0) {
return Mono.empty();
private BlobClientBuilder prepareBuilder(BlobAsyncClientBase client) {
BlobClientBuilder builder = new BlobClientBuilder()
.pipeline(client.getHttpPipeline())
.endpoint(client.getBlobUrl())
.snapshot(client.getSnapshotId())
.serviceVersion(client.getServiceVersion());

CpkInfo cpk = client.getCustomerProvidedKey();
if (cpk != null) {
builder.customerProvidedKey(new CustomerProvidedKey(cpk.getEncryptionKey()));
}

final String blockID = this.getCurrentBlockId();
this.blockList.add(blockID);
return builder;
}

Flux<ByteBuffer> fbb = Flux.range(0, 1)
.concatMap(pos -> Mono.fromCallable(() -> ByteBuffer.wrap(data, (int) offset, writeLength)));
@Override
void commit() {
sink.complete();

// Need to wait until the uploadTask completes
while (!complete) {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
// Does this need to be caught by logger?
e.printStackTrace();
}
}
}

return this.writeBlock(fbb.subscribeOn(Schedulers.elastic()), blockID, writeLength);
@Override
protected void writeInternal(final byte[] data, int offset, int length) {
sink.next(ByteBuffer.wrap(data, offset, length));
}

/**
* Commits the blob. For block blobs, this uploads the block list.
*/
// Never called
@Override
synchronized void commit() {
client.commitBlockListWithResponse(this.blockList, null, null, null, this.requestConditions).block();
protected Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
return Mono.empty();
}
}
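
The constructor above is the heart of the change: Flux.create() hands back a FluxSink that bridges the blocking OutputStream API into the reactive buffered upload, errors are latched into lastError so close() can rethrow them, and commit() waits for the terminal signal. The following self-contained sketch shows the same bridge pattern using only reactor-core; a byte-counting consumer stands in for uploadWithResponse, and a CountDownLatch replaces the PR's polling loop, so treat it as an illustration of the technique rather than the PR's exact code:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxSink;
    import reactor.core.publisher.Mono;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.CountDownLatch;

    class SinkBridgeOutputStream extends OutputStream {
        private FluxSink<ByteBuffer> sink;
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile IOException lastError;

        SinkBridgeOutputStream() {
            Flux<ByteBuffer> data = Flux.create(s -> this.sink = s);
            // Subscribe immediately so the sink exists before the first write();
            // the consumer chain here just counts bytes.
            data.reduce(0L, (total, buf) -> total + buf.remaining())
                .doOnNext(total -> System.out.println("consumed " + total + " bytes"))
                .onErrorResume(e -> {            // latch the error; surface it on close()
                    this.lastError = new IOException(e);
                    return Mono.empty();
                })
                .doOnTerminate(done::countDown)  // signal completion instead of polling
                .subscribe();
        }

        @Override
        public void write(byte[] b, int off, int len) {
            sink.next(ByteBuffer.wrap(b, off, len));
        }

        @Override
        public void write(int b) {
            write(new byte[] {(byte) b}, 0, 1);
        }

        @Override
        public void close() throws IOException {
            sink.complete();
            try {
                done.await();                    // wait for the consumer to terminate
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (lastError != null) {             // errors only surface after completion
                throw lastError;
            }
        }

        public static void main(String[] args) throws IOException {
            try (OutputStream out = new SinkBridgeOutputStream()) {
                out.write("hello, sink bridge".getBytes(StandardCharsets.UTF_8));
            }
        }
    }

Note that, as in the PR's writeInternal, sink.next wraps the caller's array without copying, so the caller must not reuse the array until the consumer has processed it.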

@@ -18,6 +18,7 @@
import com.azure.storage.blob.models.BlockBlobItem;
import com.azure.storage.blob.models.BlockList;
import com.azure.storage.blob.models.BlockListType;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.common.Utility;
import com.azure.storage.common.implementation.Constants;
import reactor.core.publisher.Flux;
@@ -115,7 +116,29 @@ public BlobOutputStream getBlobOutputStream(boolean overwrite) {
* @throws BlobStorageException If a storage service error occurred.
*/
public BlobOutputStream getBlobOutputStream(BlobRequestConditions requestConditions) {
return BlobOutputStream.blockBlobOutputStream(client, requestConditions);
return getBlobOutputStream(null, null, null, null, requestConditions);
}

/**
* Creates and opens an output stream to write data to the block blob. If the blob already exists on the service, it
* will be overwritten.
* <p>
* To avoid overwriting, pass "*" to {@link BlobRequestConditions#setIfNoneMatch(String)}.
*
* @param parallelTransferOptions {@link ParallelTransferOptions} used to configure buffered uploading.
* @param headers {@link BlobHttpHeaders}
* @param metadata Metadata to associate with the blob.
* @param tier {@link AccessTier} for the destination blob.
* @param requestConditions {@link BlobRequestConditions}
*
* @return A {@link BlobOutputStream} object used to write data to the blob.
* @throws BlobStorageException If a storage service error occurred.
*/
[Review comment from a Contributor on this line: Can you add a changelog entry for this?]

public BlobOutputStream getBlobOutputStream(ParallelTransferOptions parallelTransferOptions,
    BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
    BlobRequestConditions requestConditions) {
return BlobOutputStream.blockBlobOutputStream(client, parallelTransferOptions, headers, metadata, tier,
requestConditions);
}
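
A possible call site for this new overload; the builder wiring, connection string, and values below are illustrative assumptions rather than code from this PR:

    import com.azure.storage.blob.BlobClientBuilder;
    import com.azure.storage.blob.models.AccessTier;
    import com.azure.storage.blob.models.BlobHttpHeaders;
    import com.azure.storage.blob.models.BlobRequestConditions;
    import com.azure.storage.blob.specialized.BlobOutputStream;
    import com.azure.storage.blob.specialized.BlockBlobClient;

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.Map;

    public class OutputStreamDemo {
        public static void main(String[] args) throws Exception {
            BlockBlobClient blockBlobClient = new BlobClientBuilder()
                .connectionString("<storage-connection-string>") // placeholder
                .containerName("mycontainer")
                .blobName("myblob")
                .buildClient()
                .getBlockBlobClient();

            Map<String, String> metadata = Collections.singletonMap("source", "outputstream");
            // Per the javadoc above: "*" on ifNoneMatch fails the upload instead of overwriting.
            BlobRequestConditions conditions = new BlobRequestConditions().setIfNoneMatch("*");

            try (BlobOutputStream stream = blockBlobClient.getBlobOutputStream(
                    null,                                               // default ParallelTransferOptions
                    new BlobHttpHeaders().setContentType("text/plain"),
                    metadata, AccessTier.HOT, conditions)) {
                stream.write("hello blob".getBytes(StandardCharsets.UTF_8));
            } // close() rethrows any error latched during the buffered upload
        }
    }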

/**

@@ -39,7 +39,6 @@ class BlobOutputStreamTest extends APISpec {
and:
blockBlobClient.getBlobOutputStream()


then:
thrown(IllegalArgumentException)
}

@@ -42,7 +42,7 @@ protected StorageOutputStream(final int writeThreshold) {
* @param offset An <code>int</code> which represents the start offset in the data.
* @param length An <code>int</code> which represents the number of bytes to write.
*/
private void writeInternal(final byte[] data, int offset, int length) {
protected void writeInternal(final byte[] data, int offset, int length) {
int chunks = (int) (Math.ceil((double) length / (double) this.writeThreshold));
Flux.range(0, chunks).map(c -> offset + c * this.writeThreshold)
.concatMap(pos -> processChunk(data, pos, offset, length))
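
A quick worked example of the chunk arithmetic above (illustrative values):

    // length = 10 MiB of data, writeThreshold = 4 MiB per chunk
    int chunks = (int) Math.ceil((double) (10 * 1024 * 1024) / (double) (4 * 1024 * 1024)); // = 3
    // processChunk then runs at offsets: offset, offset + 4 MiB, offset + 8 MiB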
@@ -125,7 +125,7 @@ public void write(@NonNull final byte[] data, final int offset, final int length
* <p>
* <code>true</code> is acceptable for you.
*
* @param byteVal An <code>int</code> which represents the bye value to write.
* @param byteVal An <code>int</code> which represents the byte value to write.
*/
@Override
public void write(final int byteVal) {