BlobOutputStream using BufferedUpload #7067

Merged · 10 commits · Jan 3, 2020
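This PR reworks BlockBlobOutputStream to delegate to the buffered-upload path on BlobAsyncClient (uploadWithResponse) instead of manually staging blocks and committing a block list. The public stream API is unchanged. A minimal usage sketch, assuming an authenticated BlobContainerClient named containerClient (the blob name and the no-argument getBlobOutputStream overload are illustrative):

import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.specialized.BlockBlobClient;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

static void uploadViaStream(BlobContainerClient containerClient) throws IOException {
    BlockBlobClient blockClient = containerClient
        .getBlobClient("example.txt")
        .getBlockBlobClient();

    // Writes are buffered internally; close() flushes any remaining data
    // and completes the upload against the service.
    try (OutputStream stream = blockClient.getBlobOutputStream()) {
        stream.write("hello, world".getBytes(StandardCharsets.UTF_8));
    }
}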
7 changes: 7 additions & 0 deletions sdk/storage/azure-storage-blob/pom.xml
@@ -126,6 +126,13 @@
<version>3.2.7</version> <!-- {x-version-update;cglib:cglib-nodep;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<!-- Temporary test-only dependency so the tests can exercise StreamUtils.copy -->
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.2.2.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobOutputStream.java
@@ -3,11 +3,17 @@
package com.azure.storage.blob.specialized;

import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobClientBuilder;
import com.azure.storage.blob.implementation.util.ModelHelper;
import com.azure.storage.blob.models.AppendBlobRequestConditions;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.CpkInfo;
import com.azure.storage.blob.models.CustomerProvidedKey;
import com.azure.storage.blob.models.PageBlobRequestConditions;
import com.azure.storage.blob.models.PageRange;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.common.StorageOutputStream;
import com.azure.storage.common.implementation.Constants;
import reactor.core.publisher.Flux;
@@ -52,7 +58,8 @@ static BlobOutputStream pageBlobOutputStream(final PageBlobAsyncClient client, f
* Closes this output stream and releases any system resources associated with this stream. If any data remains in
* the buffer it is committed to the service.
*
* @throws IOException If an I/O error occurs.
* @throws IOException
* If an I/O error occurs.
*/
@Override
public synchronized void close() throws IOException {
@@ -134,39 +141,35 @@ void commit() {
}

private static final class BlockBlobOutputStream extends BlobOutputStream {

private final BlobRequestConditions requestConditions;
private final String blockIdPrefix;
private final List<String> blockList;
private final BlockBlobAsyncClient client;
private final BlobAsyncClient client;

private BlockBlobOutputStream(final BlockBlobAsyncClient client,
final BlobRequestConditions requestConditions) {
super(BlockBlobClient.MAX_STAGE_BLOCK_BYTES);
this.client = client;
super(BlockBlobClient.MAX_UPLOAD_BLOB_BYTES);
this.client = prepareBuilder(client).buildAsyncClient();
this.requestConditions = (requestConditions == null) ? new BlobRequestConditions() : requestConditions;
this.blockIdPrefix = UUID.randomUUID().toString() + '-';
this.blockList = new ArrayList<>();
}

/**
* Generates a new block ID to be used for PutBlock.
*
* @return Base64 encoded block ID
*/
private String getCurrentBlockId() {
String blockIdSuffix = String.format("%06d", this.blockList.size());
return Base64.getEncoder().encodeToString((this.blockIdPrefix + blockIdSuffix)
.getBytes(StandardCharsets.UTF_8));
@Override
synchronized void commit() {
// BlockBlob is now based on buffered upload, no need to commit.
}

private Mono<Void> writeBlock(Flux<ByteBuffer> blockData, String blockId, long writeLength) {
return client.stageBlockWithResponse(blockId, blockData, writeLength, null,
this.requestConditions.getLeaseId())
.then()
.onErrorResume(BlobStorageException.class, e -> {
this.lastError = new IOException(e);
return Mono.empty();
});
private BlobClientBuilder prepareBuilder(BlobAsyncClientBase client) {
BlobClientBuilder builder = new BlobClientBuilder()
.pipeline(client.getHttpPipeline())
.endpoint(client.getBlobUrl())
.snapshot(client.getSnapshotId())
.serviceVersion(client.getServiceVersion());

CpkInfo cpk = client.getCustomerProvidedKey();
if (cpk != null) {
builder.customerProvidedKey(new CustomerProvidedKey(cpk.getEncryptionKey()));
}

return builder;
}

@Override
@@ -175,21 +178,34 @@ protected Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
return Mono.empty();
}

final String blockID = this.getCurrentBlockId();
this.blockList.add(blockID);

Flux<ByteBuffer> fbb = Flux.range(0, 1)
.concatMap(pos -> Mono.fromCallable(() -> ByteBuffer.wrap(data, (int) offset, writeLength)));

return this.writeBlock(fbb.subscribeOn(Schedulers.elastic()), blockID, writeLength);
ParallelTransferOptions parallelTransferOptions = ModelHelper.populateAndApplyDefaults(null);

return this.writeBlock(fbb.subscribeOn(Schedulers.elastic()), parallelTransferOptions);
}

private Mono<Void> writeBlock(Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions) {
return client.uploadWithResponse(data, parallelTransferOptions, null, null, null, requestConditions)
.then()
.onErrorResume(BlobStorageException.class, e -> {
this.lastError = new IOException(e);
return Mono.empty();
});
}

/**
* Commits the blob, for block blob this uploads the block list.
*/
@Override
synchronized void commit() {
client.commitBlockListWithResponse(this.blockList, null, null, null, this.requestConditions).block();
protected void writeInternal(final byte[] data, int offset, int length) {
dispatchWrite(data, length, offset)
.doOnError(t -> {
if (t instanceof IOException) {
lastError = (IOException) t;
} else {
lastError = new IOException(t);
}
})
.block();
}
}
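
Worth noting in the new writeBlock above: a BlobStorageException is not rethrown from the reactive pipeline; it is wrapped in an IOException, stored in lastError, and surfaced by the base class on a subsequent write() or close(). A standalone sketch of that deferral pattern (class and method names here are illustrative, not the SDK's):

import reactor.core.publisher.Mono;
import java.io.IOException;

class DeferredErrorStream {
    private volatile IOException lastError;

    // Swallow the failure now so the reactive chain completes; the error is
    // reported from the next blocking entry point instead.
    Mono<Void> attempt(Mono<Void> operation) {
        return operation.onErrorResume(RuntimeException.class, e -> {
            this.lastError = new IOException(e);
            return Mono.empty();
        });
    }

    // Called at the top of write()/close() to rethrow a recorded failure.
    void checkStreamState() throws IOException {
        if (this.lastError != null) {
            throw this.lastError;
        }
    }
}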

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobOutputStreamTest.groovy
@@ -4,8 +4,13 @@ import com.azure.storage.blob.models.BlobErrorCode
import com.azure.storage.blob.models.BlobStorageException
import com.azure.storage.blob.models.PageRange
import com.azure.storage.common.implementation.Constants
import org.springframework.util.StreamUtils
import spock.lang.Requires

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths

class BlobOutputStreamTest extends APISpec {
private static int FOUR_MB = 4 * Constants.MB

@@ -133,4 +138,28 @@ class BlobOutputStreamTest extends APISpec {

return outputStream.toByteArray()
}

@Requires({ liveMode() })
def "BlockBlob large file"() {
setup:
Path filePath = Paths.get("C:/Users/gapra/temptestfile1g.txt")
InputStream is = Files.newInputStream(filePath)

when:
OutputStream bos = cc.getBlobClient(generateBlobName()).getBlockBlobClient().getBlobOutputStream(null)

def startTime = System.nanoTime()
StreamUtils.copy(is, bos)

// Alternative path kept for comparison:
// cc.getBlobClient(generateBlobName()).uploadFromFile(filePath.toString())

def endTime = System.nanoTime()

def duration = endTime - startTime

then:
System.out.println("Done. " + duration)

}
}
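
The live test above reads a file from a developer-local path, so it can only run on one machine. A portable variant could generate its input up front; a sketch of such a helper (the class name, method name, and sizes are illustrative):

import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

final class TestFiles {
    // Creates a temp file of the requested size, filled with a repeating byte,
    // so large-upload tests do not depend on a hardcoded local path.
    static Path createTempFileOfSize(long sizeInBytes) throws Exception {
        Path file = Files.createTempFile("blob-output-stream-test", ".dat");
        byte[] chunk = new byte[8192];
        Arrays.fill(chunk, (byte) 'a');
        try (OutputStream out = Files.newOutputStream(file)) {
            long written = 0;
            while (written < sizeInBytes) {
                int toWrite = (int) Math.min(chunk.length, sizeInBytes - written);
                out.write(chunk, 0, toWrite);
                written += toWrite;
            }
        }
        return file;
    }
}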
sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/StorageOutputStream.java
@@ -42,7 +42,7 @@ protected StorageOutputStream(final int writeThreshold) {
* @param offset An <code>int</code> which represents the start offset in the data.
* @param length An <code>int</code> which represents the number of bytes to write.
*/
private void writeInternal(final byte[] data, int offset, int length) {
protected void writeInternal(final byte[] data, int offset, int length) {
int chunks = (int) (Math.ceil((double) length / (double) this.writeThreshold));
Flux.range(0, chunks).map(c -> offset + c * this.writeThreshold)
.concatMap(pos -> processChunk(data, pos, offset, length))
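
For context on the hunk above: writeInternal splits each write into ceil(length / writeThreshold) chunks and dispatches them in order, offsetting each chunk from the caller's start position. A worked example of that math, with illustrative sizes and a zero start offset:

int length = 10 * 1024 * 1024;        // a 10 MB write
int writeThreshold = 4 * 1024 * 1024; // 4 MB per chunk
int chunks = (int) Math.ceil((double) length / (double) writeThreshold); // 3 chunks
for (int c = 0; c < chunks; c++) {
    int pos = c * writeThreshold;                             // 0, 4 MB, 8 MB
    int chunkLength = Math.min(writeThreshold, length - pos); // 4 MB, 4 MB, 2 MB
    System.out.printf("chunk %d: offset %d, length %d%n", c, pos, chunkLength);
}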
@@ -125,7 +125,7 @@ public void write(@NonNull final byte[] data, final int offset, final int length
* <p>
* <code>true</code> is acceptable for you.
*
* @param byteVal An <code>int</code> which represents the bye value to write.
* @param byteVal An <code>int</code> which represents the byte value to write.
*/
@Override
public void write(final int byteVal) {