diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java index 69552293b..680022a9f 100755 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java @@ -2,9 +2,7 @@ import java.util.Objects; import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import java.util.function.Function; import java.util.function.Supplier; @@ -566,7 +564,9 @@ static final class FlatMapInner implements Subscription, MultiSubscriber { final int limit; - final AtomicReference subscription = new AtomicReference<>(); + volatile Subscription subscription = null; + private static final AtomicReferenceFieldUpdater SUBSCRIPTION_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(FlatMapInner.class, Subscription.class, "subscription"); long produced; @@ -585,7 +585,7 @@ static final class FlatMapInner implements Subscription, MultiSubscriber { @Override public void onSubscribe(Subscription s) { Objects.requireNonNull(s); - if (subscription.compareAndSet(null, s)) { + if (SUBSCRIPTION_UPDATER.compareAndSet(this, null, s)) { s.request(Subscriptions.unboundedOrRequests(requests)); } } @@ -613,7 +613,7 @@ public void request(long n) { long p = produced + n; if (p >= limit) { produced = 0L; - subscription.get().request(p); + subscription.request(p); } else { produced = p; } @@ -626,7 +626,7 @@ public void cancel() { public void cancel(boolean doNotCancel) { if (!doNotCancel) { - Subscription last = subscription.getAndSet(Subscriptions.CANCELLED); + Subscription last = SUBSCRIPTION_UPDATER.getAndSet(this, Subscriptions.CANCELLED); if (last != null) { last.cancel(); }