Skip to content

Commit

Permalink
fix #2036 Add SwapSubscription variant of DeferredSubscription
Browse files Browse the repository at this point in the history
A common base, AbstractArbiterSubscription, has been extracted.
  • Loading branch information
simonbasle committed Feb 7, 2020
1 parent 29218c1 commit 64d0ea2
Showing 1 changed file with 59 additions and 5 deletions.
64 changes: 59 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,7 @@ public int size() {
* Base class for Subscribers that will receive their Subscriptions at any time, yet
* they might also need to be cancelled or requested at any time.
*/
public static class DeferredSubscription
protected static abstract class AbstractArbiterSubscription
implements Subscription, Scannable {

volatile Subscription s;
Expand Down Expand Up @@ -1575,6 +1575,19 @@ public void request(long n) {
}
}

static final AtomicReferenceFieldUpdater<AbstractArbiterSubscription, Subscription> S =
AtomicReferenceFieldUpdater.newUpdater(AbstractArbiterSubscription.class, Subscription.class, "s");
static final AtomicLongFieldUpdater<AbstractArbiterSubscription> REQUESTED =
AtomicLongFieldUpdater.newUpdater(AbstractArbiterSubscription.class, "requested");

}

/**
* Base class for Subscribers that will accept AT MOST one Subscription at any time, yet
* they might also need to be cancelled or requested at any time.
*/
public static class DeferredSubscription extends AbstractArbiterSubscription {

/**
* Atomically sets the single subscription and requests the missed amount from it.
*
Expand Down Expand Up @@ -1615,12 +1628,53 @@ public final boolean set(Subscription s) {

return false;
}
}

/**
* Base class for Subscribers that will swap their Subscriptions at any time, yet
* they might also need to be cancelled or requested at any time.
*
* This {@link Subscription} maintains only one active wrapped subscription, but allows
* replacement of said subscription, and the old one may be cancelled if
* {@code cancelOnReplace} is set to true.
*/
public static class SwapSubscription extends AbstractArbiterSubscription {

final boolean cancelOnReplace;

SwapSubscription(boolean cancelOnReplace) {
this.cancelOnReplace = cancelOnReplace;
}

/**
* Atomically replaces the single subscription and requests the missed amount from it.
*
* @param s the subscription to set
* @return false if this arbiter is cancelled
*/
boolean swap(Subscription s) {
Objects.requireNonNull(s, "s");
Subscription a = this.s;
if (a == Operators.cancelledSubscription()) {
s.cancel();
return false;
}
if (a != null && cancelOnReplace) {
a.cancel();
}

static final AtomicReferenceFieldUpdater<DeferredSubscription, Subscription> S =
AtomicReferenceFieldUpdater.newUpdater(DeferredSubscription.class, Subscription.class, "s");
static final AtomicLongFieldUpdater<DeferredSubscription> REQUESTED =
AtomicLongFieldUpdater.newUpdater(DeferredSubscription.class, "requested");
if (S.compareAndSet(this, a, s)) {

long r = REQUESTED.getAndSet(this, 0L);

if (r != 0L) {
s.request(r);
}

return true;
}
return false;
}
}

/**
Expand Down

0 comments on commit 64d0ea2

Please sign in to comment.