From 64d0ea2c7faed266af4e0581b2e83af1d04ffe95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Fri, 7 Feb 2020 17:59:34 +0100 Subject: [PATCH] fix #2036 Add SwapSubscription variant of DeferredSubscription A common base, AbstractArbiterSubscription, has been extracted. --- .../reactor/core/publisher/Operators.java | 64 +++++++++++++++++-- 1 file changed, 59 insertions(+), 5 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/Operators.java b/reactor-core/src/main/java/reactor/core/publisher/Operators.java index af99a6e252..4d93cd948c 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Operators.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Operators.java @@ -1523,7 +1523,7 @@ public int size() { * Base class for Subscribers that will receive their Subscriptions at any time, yet * they might also need to be cancelled or requested at any time. */ - public static class DeferredSubscription + protected static abstract class AbstractArbiterSubscription implements Subscription, Scannable { volatile Subscription s; @@ -1575,6 +1575,19 @@ public void request(long n) { } } + static final AtomicReferenceFieldUpdater S = + AtomicReferenceFieldUpdater.newUpdater(AbstractArbiterSubscription.class, Subscription.class, "s"); + static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(AbstractArbiterSubscription.class, "requested"); + + } + + /** + * Base class for Subscribers that will accept AT MOST one Subscription at any time, yet + * they might also need to be cancelled or requested at any time. + */ + public static class DeferredSubscription extends AbstractArbiterSubscription { + /** * Atomically sets the single subscription and requests the missed amount from it. * @@ -1615,12 +1628,53 @@ public final boolean set(Subscription s) { return false; } + } + + /** + * Base class for Subscribers that will swap their Subscriptions at any time, yet + * they might also need to be cancelled or requested at any time. + * + * This {@link Subscription} maintains only one active wrapped subscription, but allows + * replacement of said subscription, and the old one may be cancelled if + * {@code cancelOnReplace} is set to true. + */ + public static class SwapSubscription extends AbstractArbiterSubscription { + + final boolean cancelOnReplace; + + SwapSubscription(boolean cancelOnReplace) { + this.cancelOnReplace = cancelOnReplace; + } + + /** + * Atomically replaces the single subscription and requests the missed amount from it. + * + * @param s the subscription to set + * @return false if this arbiter is cancelled + */ + boolean swap(Subscription s) { + Objects.requireNonNull(s, "s"); + Subscription a = this.s; + if (a == Operators.cancelledSubscription()) { + s.cancel(); + return false; + } + if (a != null && cancelOnReplace) { + a.cancel(); + } - static final AtomicReferenceFieldUpdater S = - AtomicReferenceFieldUpdater.newUpdater(DeferredSubscription.class, Subscription.class, "s"); - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(DeferredSubscription.class, "requested"); + if (S.compareAndSet(this, a, s)) { + long r = REQUESTED.getAndSet(this, 0L); + + if (r != 0L) { + s.request(r); + } + + return true; + } + return false; + } } /**