Skip to content

Commit

Permalink
Fix single upload retry. (#12834)
Browse files Browse the repository at this point in the history
  • Loading branch information
kasobol-msft authored Jul 6, 2020
1 parent 714c26b commit b730543
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ Flux<ByteBuffer> write(ByteBuffer buf) {
*/
Flux<ByteBuffer> flush() {
if (byteBuffers != null) {
Flux<ByteBuffer> result = dequeuingFlux(byteBuffers);
// We return Flux from iterable in this case to support retries on single upload.
Flux<ByteBuffer> result = Flux.fromIterable(byteBuffers);
byteBuffers = null;
return result;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ public static <T> Mono<Response<T>> uploadFullOrChunked(final Flux<ByteBuffer> d
.filter(ByteBuffer::hasRemaining)
// The gate buffers data until threshold is breached.
.concatMap(gate::write)
.concatWith(Flux.defer(gate::flush))
// First buffer is emitted after threshold is breached or there's no more data.
// Therefore we can make a decision how to upload data on first element.
.switchOnFirst((signal, flux) -> {
if (gate.isThresholdBreached()) {
return uploadInChunks.apply(flux);
// In this case we can pass a flux that can have just one subscriber because
// the chunked upload is going to cache the data downstream before sending chunks over the wire.
return uploadInChunks.apply(flux.concatWith(Flux.defer(gate::flush)));
} else {
return uploadFull.apply(flux, gate.size());
// In this case gate contains all the data cached.
// The flux passed to this lambda allows only one subscriber. Therefore we substitute it
// with flux coming from gate which is based of iterable and can be subscribed again.
return uploadFull.apply(gate.flush(), gate.size());
}
})
.next()
Expand Down

0 comments on commit b730543

Please sign in to comment.