Skip to content

Commit

Permalink
Remove unused outer queue in flatmap subscriber
Browse files Browse the repository at this point in the history
Remove the mainqueuesupplier in flatmap subscriber
  • Loading branch information
ozangunalp committed Jul 4, 2022
1 parent 236582d commit ade72b8
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ public final class MultiFlatMapOp<I, O> extends AbstractMultiOperator<I, O> {
private final int maxConcurrency;
private final int requests;

private final Supplier<? extends Queue<O>> mainQueueSupplier;

public MultiFlatMapOp(Multi<? extends I> upstream,
Function<? super I, ? extends Publisher<? extends O>> mapper,
boolean postponeFailurePropagation,
Expand All @@ -37,7 +35,6 @@ public MultiFlatMapOp(Multi<? extends I> upstream,
this.mapper = ParameterValidation.nonNull(mapper, "mapper");
this.postponeFailurePropagation = postponeFailurePropagation;
this.maxConcurrency = ParameterValidation.positive(maxConcurrency, "maxConcurrency");
this.mainQueueSupplier = Queues.get(maxConcurrency);
this.requests = ParameterValidation.positive(requests, "requests");
}

Expand All @@ -50,7 +47,6 @@ public void subscribe(MultiSubscriber<? super O> subscriber) {
mapper,
postponeFailurePropagation,
maxConcurrency,
mainQueueSupplier,
requests);

upstream.subscribe(Infrastructure.onMultiSubscription(upstream, sub));
Expand All @@ -64,12 +60,9 @@ public static final class FlatMapMainSubscriber<I, O> extends FlatMapManager<Fla
final int requests;
final int limit;
final Function<? super I, ? extends Publisher<? extends O>> mapper;
final Supplier<? extends Queue<O>> mainQueueSupplier;
final Supplier<? extends Queue<O>> innerQueueSupplier;
final MultiSubscriber<? super O> downstream;

volatile Queue<O> queue;

final AtomicReference<Throwable> failures = new AtomicReference<>();

volatile boolean done;
Expand All @@ -95,13 +88,11 @@ public FlatMapMainSubscriber(MultiSubscriber<? super O> downstream,
Function<? super I, ? extends Publisher<? extends O>> mapper,
boolean delayError,
int concurrency,
Supplier<? extends Queue<O>> mainQueueSupplier,
int requests) {
this.downstream = downstream;
this.mapper = mapper;
this.delayError = delayError;
this.maxConcurrency = concurrency;
this.mainQueueSupplier = mainQueueSupplier;
this.requests = requests;
this.innerQueueSupplier = requests == 0 ? Queues.getXsQueueSupplier() : Queues.get(requests);
this.limit = Subscriptions.unboundedOrLimit(concurrency);
Expand Down Expand Up @@ -151,7 +142,6 @@ public void cancel() {
cancelled = true;

if (wip.getAndIncrement() == 0) {
clearQueue();
UPSTREAM_UPDATER.getAndSet(this, Subscriptions.CANCELLED).cancel();
unsubscribe();
}
Expand Down Expand Up @@ -281,8 +271,6 @@ void drainLoop() {

int n = as.length;

Queue<O> sq = queue;

boolean noSources = isEmpty();

if (ifDoneOrCancelled()) {
Expand All @@ -295,37 +283,6 @@ void drainLoop() {
long e = 0L;
long replenishMain = 0L;

if (r != 0L && sq != null) {

while (e != r) {
d = done;

O v = sq.poll();

boolean empty = v == null;

if (ifDoneOrCancelled()) {
return;
}

if (empty) {
break;
}

a.onItem(v);

e++;
}

if (e != 0L) {
replenishMain += e;
if (r != Long.MAX_VALUE) {
r = requested.addAndGet(-e);
}
e = 0L;
again = true;
}
}
if (r != 0L && !noSources) {

int j = lastIndex;
Expand Down Expand Up @@ -464,21 +421,13 @@ void drainLoop() {
}

private void cancelUpstream(boolean fromOnError) {
clearQueue();
Subscription subscription = UPSTREAM_UPDATER.getAndSet(this, Subscriptions.CANCELLED);
if (subscription != null) {
subscription.cancel();
}
unsubscribe(fromOnError);
}

private void clearQueue() {
if (queue != null) {
queue.clear();
queue = null;
}
}

boolean ifDoneOrCancelled() {
if (cancelled) {
cancelUpstream(false);
Expand All @@ -491,7 +440,7 @@ boolean ifDoneOrCancelled() {

private boolean handleTerminationIfDone() {
boolean wasDone = done;
boolean isEmpty = isEmpty() && (queue == null || queue.isEmpty());
boolean isEmpty = isEmpty();
if (delayError) {
if (wasDone && isEmpty) {
Throwable e = failures.get();
Expand All @@ -508,7 +457,6 @@ private boolean handleTerminationIfDone() {
Throwable e = failures.get();
if (e != null && e != Subscriptions.TERMINATED) {
Throwable throwable = failures.getAndSet(Subscriptions.TERMINATED);
clearQueue();
unsubscribe(true);
downstream.onFailure(throwable);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import io.reactivex.rxjava3.processors.PublishProcessor;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.BackPressureFailure;
Expand All @@ -28,7 +27,6 @@ public void testThatInvalidRequestAreRejectedByMainSubscriber() {
i -> Multi.createFrom().item(2),
false,
4,
Queues.get(4),
10);

Multi.createFrom().item(1)
Expand All @@ -47,7 +45,6 @@ public void testCancellationFromMainSubscriber() {
i -> Multi.createFrom().item(2),
false,
4,
Queues.get(4),
10);

Multi.createFrom().item(1)
Expand All @@ -69,7 +66,6 @@ public void testThatNoItemsAreDispatchedAfterCompletion() {
i -> Multi.createFrom().item(2),
false,
4,
Queues.get(4),
10);

sub.onSubscribe(mock(Subscription.class));
Expand All @@ -93,7 +89,6 @@ public void testThatNoItemsAreDispatchedAfterFailure() {
i -> Multi.createFrom().item(2),
false,
4,
Queues.get(4),
10);

sub.onSubscribe(mock(Subscription.class));
Expand Down Expand Up @@ -136,7 +131,6 @@ public void testInnerOverflow2() {
i -> rogue,
false,
1,
Queues.get(1),
1);

sub.onSubscribe(mock(Subscription.class));
Expand All @@ -157,7 +151,6 @@ public void testInnerOverflowWithWip() {
i -> rogue,
false,
1,
Queues.get(1),
1);

sub.onSubscribe(mock(Subscription.class));
Expand Down

0 comments on commit ade72b8

Please sign in to comment.