diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java index af2c36d59..097564802 100755 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java @@ -26,8 +26,6 @@ public final class MultiFlatMapOp extends AbstractMultiOperator { private final int maxConcurrency; private final int requests; - private final Supplier> mainQueueSupplier; - public MultiFlatMapOp(Multi upstream, Function> mapper, boolean postponeFailurePropagation, @@ -37,7 +35,6 @@ public MultiFlatMapOp(Multi upstream, this.mapper = ParameterValidation.nonNull(mapper, "mapper"); this.postponeFailurePropagation = postponeFailurePropagation; this.maxConcurrency = ParameterValidation.positive(maxConcurrency, "maxConcurrency"); - this.mainQueueSupplier = Queues.get(maxConcurrency); this.requests = ParameterValidation.positive(requests, "requests"); } @@ -50,7 +47,6 @@ public void subscribe(MultiSubscriber subscriber) { mapper, postponeFailurePropagation, maxConcurrency, - mainQueueSupplier, requests); upstream.subscribe(Infrastructure.onMultiSubscription(upstream, sub)); @@ -64,12 +60,9 @@ public static final class FlatMapMainSubscriber extends FlatMapManager> mapper; - final Supplier> mainQueueSupplier; final Supplier> innerQueueSupplier; final MultiSubscriber downstream; - volatile Queue queue; - final AtomicReference failures = new AtomicReference<>(); volatile boolean done; @@ -95,13 +88,11 @@ public FlatMapMainSubscriber(MultiSubscriber downstream, Function> mapper, boolean delayError, int concurrency, - Supplier> mainQueueSupplier, int requests) { this.downstream = downstream; this.mapper = mapper; this.delayError = delayError; this.maxConcurrency = concurrency; - this.mainQueueSupplier = mainQueueSupplier; this.requests = requests; this.innerQueueSupplier = requests == 0 ? Queues.getXsQueueSupplier() : Queues.get(requests); this.limit = Subscriptions.unboundedOrLimit(concurrency); @@ -151,7 +142,6 @@ public void cancel() { cancelled = true; if (wip.getAndIncrement() == 0) { - clearQueue(); UPSTREAM_UPDATER.getAndSet(this, Subscriptions.CANCELLED).cancel(); unsubscribe(); } @@ -281,8 +271,6 @@ void drainLoop() { int n = as.length; - Queue sq = queue; - boolean noSources = isEmpty(); if (ifDoneOrCancelled()) { @@ -295,37 +283,6 @@ void drainLoop() { long e = 0L; long replenishMain = 0L; - if (r != 0L && sq != null) { - - while (e != r) { - d = done; - - O v = sq.poll(); - - boolean empty = v == null; - - if (ifDoneOrCancelled()) { - return; - } - - if (empty) { - break; - } - - a.onItem(v); - - e++; - } - - if (e != 0L) { - replenishMain += e; - if (r != Long.MAX_VALUE) { - r = requested.addAndGet(-e); - } - e = 0L; - again = true; - } - } if (r != 0L && !noSources) { int j = lastIndex; @@ -464,7 +421,6 @@ void drainLoop() { } private void cancelUpstream(boolean fromOnError) { - clearQueue(); Subscription subscription = UPSTREAM_UPDATER.getAndSet(this, Subscriptions.CANCELLED); if (subscription != null) { subscription.cancel(); @@ -472,13 +428,6 @@ private void cancelUpstream(boolean fromOnError) { unsubscribe(fromOnError); } - private void clearQueue() { - if (queue != null) { - queue.clear(); - queue = null; - } - } - boolean ifDoneOrCancelled() { if (cancelled) { cancelUpstream(false); @@ -491,7 +440,7 @@ boolean ifDoneOrCancelled() { private boolean handleTerminationIfDone() { boolean wasDone = done; - boolean isEmpty = isEmpty() && (queue == null || queue.isEmpty()); + boolean isEmpty = isEmpty(); if (delayError) { if (wasDone && isEmpty) { Throwable e = failures.get(); @@ -508,7 +457,6 @@ private boolean handleTerminationIfDone() { Throwable e = failures.get(); if (e != null && e != Subscriptions.TERMINATED) { Throwable throwable = failures.getAndSet(Subscriptions.TERMINATED); - clearQueue(); unsubscribe(true); downstream.onFailure(throwable); return true; diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/FlatMapMainSubscriberTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/FlatMapMainSubscriberTest.java index 587a50ddd..2faf15c7d 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/FlatMapMainSubscriberTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/FlatMapMainSubscriberTest.java @@ -11,7 +11,6 @@ import io.reactivex.rxjava3.processors.PublishProcessor; import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.helpers.queues.Queues; import io.smallrye.mutiny.helpers.test.AssertSubscriber; import io.smallrye.mutiny.operators.AbstractMulti; import io.smallrye.mutiny.subscription.BackPressureFailure; @@ -28,7 +27,6 @@ public void testThatInvalidRequestAreRejectedByMainSubscriber() { i -> Multi.createFrom().item(2), false, 4, - Queues.get(4), 10); Multi.createFrom().item(1) @@ -47,7 +45,6 @@ public void testCancellationFromMainSubscriber() { i -> Multi.createFrom().item(2), false, 4, - Queues.get(4), 10); Multi.createFrom().item(1) @@ -69,7 +66,6 @@ public void testThatNoItemsAreDispatchedAfterCompletion() { i -> Multi.createFrom().item(2), false, 4, - Queues.get(4), 10); sub.onSubscribe(mock(Subscription.class)); @@ -93,7 +89,6 @@ public void testThatNoItemsAreDispatchedAfterFailure() { i -> Multi.createFrom().item(2), false, 4, - Queues.get(4), 10); sub.onSubscribe(mock(Subscription.class)); @@ -136,7 +131,6 @@ public void testInnerOverflow2() { i -> rogue, false, 1, - Queues.get(1), 1); sub.onSubscribe(mock(Subscription.class)); @@ -157,7 +151,6 @@ public void testInnerOverflowWithWip() { i -> rogue, false, 1, - Queues.get(1), 1); sub.onSubscribe(mock(Subscription.class));