diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBody.java index 9b19907ce36b..d713f9ccfd55 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBody.java @@ -23,10 +23,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.core.exception.NonRetryableException; +import software.amazon.awssdk.core.internal.async.SplittingPublisher; import software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream; import software.amazon.awssdk.core.internal.util.NoopSubscription; +import software.amazon.awssdk.utils.async.DelegatingSubscriber; import software.amazon.awssdk.utils.async.InputStreamConsumingPublisher; /** @@ -104,6 +107,11 @@ public void subscribe(Subscriber s) { } } + @Override + public SdkPublisher split(AsyncRequestBodySplitConfiguration splitConfiguration) { + return new BlockingSplittingPublisher(this, splitConfiguration); + } + private void waitForSubscriptionIfNeeded() throws InterruptedException { long timeoutSeconds = subscribeTimeout.getSeconds(); if (!subscribedLatch.await(timeoutSeconds, TimeUnit.SECONDS)) { @@ -112,4 +120,44 @@ private void waitForSubscriptionIfNeeded() throws InterruptedException { + "BEFORE invoking doBlockingWrite if your caller is single-threaded."); } } + + private class BlockingSplittingPublisher extends SplittingPublisher { + + public BlockingSplittingPublisher(AsyncRequestBody asyncRequestBody, + AsyncRequestBodySplitConfiguration splitConfiguration) { + super(asyncRequestBody, splitConfiguration); + } + + @Override + public void subscribe(Subscriber downstreamSubscriber) { + Subscriber delegatingSubscriber = new DelegatingSubscriber( + downstreamSubscriber) { + @Override + public void onSubscribe(Subscription subscription) { + Subscription delegatingSubscription = new Subscription() { + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + subscription.cancel(); + + //Cancel origin body to prevent stuck calling thread + BlockingInputStreamAsyncRequestBody.this.cancel(); + } + }; + super.onSubscribe(delegatingSubscription); + } + + @Override + public void onNext(AsyncRequestBody body) { + subscriber.onNext(body); + } + }; + + super.subscribe(delegatingSubscriber); + } + } }