Skip to content

Commit

Permalink
SafeSubscriber: move the upstream subscription to a regular field
Browse files Browse the repository at this point in the history
This removes the AtomicReference / volatile semantics out of the way on the hot path,
at the cost of an additional AtomicBoolean to ensure correctness on multiple onSubscribe
signals.
  • Loading branch information
jponge committed Nov 29, 2020
1 parent 1da1b06 commit bcd9dcb
Showing 1 changed file with 22 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.subscription;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -18,14 +18,22 @@
*/
@SuppressWarnings({ "ReactiveStreamsSubscriberImplementation" })
public final class SafeSubscriber<T> implements Subscriber<T>, Subscription {

/**
* The actual Subscriber.
*/
private final Subscriber<? super T> downstream;

/**
* The subscription.
*/
private final AtomicReference<Subscription> upstream = new AtomicReference<>();
private Subscription upstream;

/**
* Flag to check if we have already subscribed.
*/
private final AtomicBoolean subscribed = new AtomicBoolean(false);

/**
* Indicates a terminal state.
*/
Expand All @@ -50,22 +58,23 @@ boolean isDone() {
}

@Override
public void onSubscribe(Subscription s) {
if (upstream.compareAndSet(null, s)) {
public void onSubscribe(Subscription subscription) {
if (subscribed.compareAndSet(false, true)) {
this.upstream = subscription;
try {
downstream.onSubscribe(this);
} catch (Throwable e) {
done = true;
// can't call onError because the actual's state may be corrupt at this point
try {
s.cancel();
subscription.cancel();
} catch (Throwable e1) {
// ignore it, nothing we can do.
Infrastructure.handleDroppedException(e1);
}
}
} else {
s.cancel();
subscription.cancel();
}
}

Expand All @@ -74,7 +83,7 @@ public void onNext(T t) {
if (done) {
return;
}
if (upstream.get() == null) {
if (upstream == null) {
onNextNoSubscription();
return;
}
Expand All @@ -94,7 +103,7 @@ public void onNext(T t) {

private void cancelAndDispatch(Throwable ex) {
try {
upstream.get().cancel();
upstream.cancel();
} catch (Throwable e1) {
onError(new CompositeException(ex, e1));
return;
Expand All @@ -115,7 +124,7 @@ public void onError(Throwable t) {
}
done = true;

if (upstream.get() == null) {
if (upstream == null) {
Throwable npe = new NullPointerException("Subscription not set!");

try {
Expand Down Expand Up @@ -149,7 +158,7 @@ public void onComplete() {
}
done = true;

if (upstream.get() == null) {
if (upstream == null) {
onCompleteNoSubscription();
return;
}
Expand Down Expand Up @@ -187,10 +196,10 @@ private void manageViolationProtocol() {
@Override
public void request(long n) {
try {
upstream.get().request(n);
upstream.request(n);
} catch (Throwable e) {
try {
upstream.get().cancel();
upstream.cancel();
} catch (Throwable ex) {
// nothing we can do.
Infrastructure.handleDroppedException(new CompositeException(e, ex));
Expand All @@ -201,7 +210,7 @@ public void request(long n) {
@Override
public void cancel() {
try {
upstream.get().cancel();
upstream.cancel();
} catch (Throwable e) {
// nothing we can do.
Infrastructure.handleDroppedException(e);
Expand Down

0 comments on commit bcd9dcb

Please sign in to comment.