diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/PayloadSizeGate.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/PayloadSizeGate.java index ba8be907702cc..806fa97e54967 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/PayloadSizeGate.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/PayloadSizeGate.java @@ -66,7 +66,8 @@ Flux write(ByteBuffer buf) { */ Flux flush() { if (byteBuffers != null) { - Flux result = dequeuingFlux(byteBuffers); + // We return Flux from iterable in this case to support retries on single upload. + Flux result = Flux.fromIterable(byteBuffers); byteBuffers = null; return result; } else { diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/UploadUtils.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/UploadUtils.java index 6c0611d3363b9..6c97ce64d2f0c 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/UploadUtils.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/UploadUtils.java @@ -44,14 +44,18 @@ public static Mono> uploadFullOrChunked(final Flux d .filter(ByteBuffer::hasRemaining) // The gate buffers data until threshold is breached. .concatMap(gate::write) - .concatWith(Flux.defer(gate::flush)) // First buffer is emitted after threshold is breached or there's no more data. // Therefore we can make a decision how to upload data on first element. .switchOnFirst((signal, flux) -> { if (gate.isThresholdBreached()) { - return uploadInChunks.apply(flux); + // In this case we can pass a flux that can have just one subscriber because + // the chunked upload is going to cache the data downstream before sending chunks over the wire. + return uploadInChunks.apply(flux.concatWith(Flux.defer(gate::flush))); } else { - return uploadFull.apply(flux, gate.size()); + // In this case gate contains all the data cached. + // The flux passed to this lambda allows only one subscriber. Therefore we substitute it + // with flux coming from gate which is based of iterable and can be subscribed again. + return uploadFull.apply(gate.flush(), gate.size()); } }) .next()