From 5af3d0a2b9c20628e511352e8f51814a2c6f5c5b Mon Sep 17 00:00:00 2001 From: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com> Date: Thu, 23 Jul 2020 15:51:15 -0700 Subject: [PATCH] Fix DownloadResponseTests that began to fail after upgrading Reactor version (#13463) --- .../specialized/DownloadResponseMockFlux.java | 97 ++++++++----------- .../specialized/DownloadResponseTest.groovy | 4 +- 2 files changed, 43 insertions(+), 58 deletions(-) diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseMockFlux.java b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseMockFlux.java index d0382bb0d9d8c..031ca4b9dc7a5 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseMockFlux.java +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseMockFlux.java @@ -11,17 +11,15 @@ import com.azure.storage.blob.implementation.models.BlobsDownloadResponse; import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.blob.models.DownloadRetryOptions; -import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Operators; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.concurrent.TimeoutException; -class DownloadResponseMockFlux extends Flux { +class DownloadResponseMockFlux { static final int DR_TEST_SCENARIO_SUCCESSFUL_ONE_CHUNK = 0; // Data emitted in one chunk static final int DR_TEST_SCENARIO_SUCCESSFUL_MULTI_CHUNK = 1; // Data emitted in multiple chunks static final int DR_TEST_SCENARIO_SUCCESSFUL_STREAM_FAILURES = 2; // Stream failures successfully handled @@ -33,10 +31,11 @@ class DownloadResponseMockFlux extends Flux { static final int DR_TEST_SCENARIO_TIMEOUT = 10; // ReliableDownload with timeout after not receiving items for 60s static final int DR_TEST_SCENARIO_ERROR_AFTER_ALL_DATA = 11; // Don't actually issue another retry if we've read all the data and the source failed at the end - private int scenario; + private final int scenario; + private final ByteBuffer scenarioData; + private int tryNumber; private HttpGetterInfo info; - private ByteBuffer scenarioData; private DownloadRetryOptions options; private boolean subscribed = false; // Only used for multiple subscription test. @@ -89,27 +88,23 @@ DownloadResponseMockFlux setOptions(DownloadRetryOptions options) { return this; } - @Override - public void subscribe(CoreSubscriber subscriber) { + private Flux getDownloadStream() { switch (this.scenario) { case DR_TEST_SCENARIO_SUCCESSFUL_ONE_CHUNK: - subscriber.onNext(this.scenarioData.duplicate()); - Operators.complete(subscriber); - break; + return Flux.just(scenarioData.duplicate()); case DR_TEST_SCENARIO_SUCCESSFUL_MULTI_CHUNK: - for (int i = 0; i < 4; i++) { + return Flux.range(0, 4).map(i -> { ByteBuffer toSend = this.scenarioData.duplicate(); toSend.position(i * 256); toSend.limit((i + 1) * 256); - subscriber.onNext(toSend); - } - Operators.complete(subscriber); - break; + + return toSend; + }); case DR_TEST_SCENARIO_NO_MULTIPLE_SUBSCRIPTION: if (this.subscribed) { - throw new IllegalStateException("Cannot subscribe to the same flux twice"); + return Flux.error(new IllegalStateException("Cannot subscribe to the same flux twice")); } this.subscribed = true; // fall through to test data @@ -119,69 +114,59 @@ public void subscribe(CoreSubscriber subscriber) { // tryNumber is 1 indexed, so we have to sub 1. if (this.info.getOffset() != (this.tryNumber - 1) * 256 || this.info.getCount() != this.scenarioData.remaining() - (this.tryNumber - 1) * 256) { - Operators.error(subscriber, new IllegalArgumentException("Info values are incorrect.")); - return; + return Flux.error(new IllegalArgumentException("Info values are incorrect.")); } + ByteBuffer toSend = this.scenarioData.duplicate(); toSend.position((this.tryNumber - 1) * 256); toSend.limit(this.tryNumber * 256); - subscriber.onNext(toSend); + + Flux dataStream = Flux.just(toSend); + // A slightly odd but sufficient means of exercising the different retriable exceptions. Exception e = tryNumber % 2 == 0 ? new IOException() : new TimeoutException(); - Operators.error(subscriber, e); - break; + + return dataStream.concatWith(Flux.error(e)); } if (this.info.getOffset() != (this.tryNumber - 1) * 256 || this.info.getCount() != this.scenarioData.remaining() - (this.tryNumber - 1) * 256) { - Operators.error(subscriber, new IllegalArgumentException("Info values are incorrect.")); - return; + return Flux.error(new IllegalArgumentException("Info values are incorrect.")); } ByteBuffer toSend = this.scenarioData.duplicate(); toSend.position((this.tryNumber - 1) * 256); toSend.limit(this.tryNumber * 256); - subscriber.onNext(toSend); - Operators.complete(subscriber); - break; + + return Flux.just(toSend); case DR_TEST_SCENARIO_ERROR_AFTER_ALL_DATA: - subscriber.onNext(this.scenarioData.duplicate()); - Operators.error(subscriber, new IOException("Exception at end")); - break; + return Flux.just(scenarioData.duplicate()).concatWith(Flux.error(new IOException("Exception at end"))); case DR_TEST_SCENARIO_MAX_RETRIES_EXCEEDED: - Operators.error(subscriber, new IOException()); - break; + return Flux.error(new IOException()); case DR_TEST_SCENARIO_NON_RETRYABLE_ERROR: - Operators.error(subscriber, new Exception()); - break; + return Flux.error(new Exception()); case DR_TEST_SCENARIO_ERROR_GETTER_MIDDLE: - if (this.tryNumber == 1) { - /* - We return a retryable error here so we have to invoke the getter, which will throw an error in - this case. - */ - Operators.error(subscriber, new IOException()); - } else { - Operators.error(subscriber, new IllegalArgumentException("Retried after getter error.")); - } - break; + /* + * We return a retryable error here so we have to invoke the getter, which will throw an error in + * this case. + */ + return (this.tryNumber == 1) + ? Flux.error(new IOException()) + : Flux.error(new IllegalArgumentException("Retried after getter error.")); case DR_TEST_SCENARIO_INFO_TEST: switch (this.tryNumber) { case 1: // Test the value of info when getting the initial response. case 2: // Test the value of info when getting an intermediate response. - Operators.error(subscriber, new IOException()); - break; + return Flux.error(new IOException()); case 3: // All calls to getter checked. Exit. This test does not check for data. - Operators.complete(subscriber); - break; + return Flux.empty(); default: - throw new IllegalArgumentException("Invalid try number."); + return Flux.error(new IllegalArgumentException("Invalid try number.")); } - break; case DR_TEST_SCENARIO_TIMEOUT: try { @@ -189,11 +174,11 @@ public void subscribe(CoreSubscriber subscriber) { } catch (InterruptedException e) { e.printStackTrace(); } - Operators.complete(subscriber); - break; + + return Flux.empty(); default: - Operators.error(subscriber, new IllegalArgumentException("Invalid test case")); + return Flux.error(new IllegalArgumentException("Invalid test case")); } } @@ -202,9 +187,9 @@ Mono getter(HttpGetterInfo info) { this.info = info; long contentUpperBound = info.getCount() == null ? this.scenarioData.remaining() - 1 : info.getOffset() + info.getCount() - 1; - BlobsDownloadResponse rawResponse = new BlobsDownloadResponse(null, 200, new HttpHeaders(), this, - new BlobDownloadHeaders().setContentRange(String.format("%d-%d/%d", - info.getOffset(), contentUpperBound, this.scenarioData.remaining()))); + BlobsDownloadResponse rawResponse = new BlobsDownloadResponse(null, 200, new HttpHeaders(), + this.getDownloadStream(), new BlobDownloadHeaders().setContentRange(String.format("%d-%d/%d", + info.getOffset(), contentUpperBound, this.scenarioData.remaining()))); ReliableDownload response = new ReliableDownload(rawResponse, options, info, this::getter); switch (this.scenario) { @@ -266,7 +251,7 @@ public Mono getBodyAsString(Charset charset) { // Construct a new flux each time to mimic getting a new download stream. DownloadResponseMockFlux nextFlux = new DownloadResponseMockFlux(this.scenario, this.tryNumber, this.scenarioData, this.info, this.options); - rawResponse = new BlobsDownloadResponse(null, 200, new HttpHeaders(), nextFlux, + rawResponse = new BlobsDownloadResponse(null, 200, new HttpHeaders(), nextFlux.getDownloadStream(), new BlobDownloadHeaders()); response = new ReliableDownload(rawResponse, options, info, this::getter); return Mono.just(response); diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseTest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseTest.groovy index 83b3dd9a6f134..c7ac2f87a5811 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseTest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/DownloadResponseTest.groovy @@ -166,8 +166,8 @@ class DownloadResponseTest extends APISpec { response.getValue().subscribeOn(Schedulers.elastic()).then().block(Duration.ofSeconds((retryCount + 1) * 62)) then: - def e = thrown(Exceptions.ReactiveException) - e.getCause() instanceof TimeoutException + def e = thrown(Throwable) + Exceptions.unwrap(e) instanceof TimeoutException where: // We test retry count elsewhere. Just using small numbers to speed up the test.