Skip to content

Commit

Permalink
Improve subscription ceremony in the case of a Strict subscriber or a…
Browse files Browse the repository at this point in the history
… built-in operator
  • Loading branch information
cescoffier committed Jan 4, 2021
1 parent b5ac99d commit 9f16bb4
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.smallrye.mutiny.operators;

import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNullNpe;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.function.Predicate;

Expand All @@ -25,11 +25,16 @@ public void subscribe(MultiSubscriber<? super T> subscriber) {
this.subscribe(Infrastructure.onMultiSubscription(this, subscriber));
}

@SuppressWarnings("unchecked")
@Override
public void subscribe(Subscriber<? super T> subscriber) {
// NOTE The Reactive Streams TCK mandates throwing an NPE.
Objects.requireNonNull(subscriber, "Subscriber is `null`");
this.subscribe(new StrictMultiSubscriber<>(subscriber));
if (subscriber instanceof MultiOperator || subscriber instanceof StrictMultiSubscriber) {
this.subscribe((MultiSubscriber<? super T>) subscriber);
} else {
// NOTE The Reactive Streams TCK mandates throwing an NPE.
nonNullNpe(subscriber, "subscriber");
this.subscribe(new StrictMultiSubscriber<>(subscriber));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,4 @@ public abstract class AbstractMultiOperator<I, O> extends AbstractMulti<O> imple
public AbstractMultiOperator(Multi<? extends I> upstream) {
this.upstream = ParameterValidation.nonNull(upstream, "upstream");
}

public Multi<? extends I> upstream() {
return upstream;
}
}

0 comments on commit 9f16bb4

Please sign in to comment.