Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix Flowable.publish(-|Function) subscriber swap possible data loss #5893

Merged
merged 1 commit into from
Mar 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ static final class PublishSubscriber<T>
final int bufferSize;

/** Tracks the subscribed InnerSubscribers. */
final AtomicReference<InnerSubscriber[]> subscribers;
final AtomicReference<InnerSubscriber<T>[]> subscribers;
/**
* Atomically changed from false to true by connect to make sure the
* connection is only performed by one thread.
Expand All @@ -165,8 +165,9 @@ static final class PublishSubscriber<T>
/** Holds notifications from upstream. */
volatile SimpleQueue<T> queue;

@SuppressWarnings("unchecked")
PublishSubscriber(AtomicReference<PublishSubscriber<T>> current, int bufferSize) {
this.subscribers = new AtomicReference<InnerSubscriber[]>(EMPTY);
this.subscribers = new AtomicReference<InnerSubscriber<T>[]>(EMPTY);
this.current = current;
this.shouldConnect = new AtomicBoolean();
this.bufferSize = bufferSize;
Expand All @@ -175,6 +176,7 @@ static final class PublishSubscriber<T>
@Override
public void dispose() {
if (subscribers.get() != TERMINATED) {
@SuppressWarnings("unchecked")
InnerSubscriber[] ps = subscribers.getAndSet(TERMINATED);
if (ps != TERMINATED) {
current.compareAndSet(PublishSubscriber.this, null);
Expand Down Expand Up @@ -263,15 +265,16 @@ boolean add(InnerSubscriber<T> producer) {
// the state can change so we do a CAS loop to achieve atomicity
for (;;) {
// get the current producer array
InnerSubscriber[] c = subscribers.get();
InnerSubscriber<T>[] c = subscribers.get();
// if this subscriber-to-source reached a terminal state by receiving
// an onError or onComplete, just refuse to add the new producer
if (c == TERMINATED) {
return false;
}
// we perform a copy-on-write logic
int len = c.length;
InnerSubscriber[] u = new InnerSubscriber[len + 1];
@SuppressWarnings("unchecked")
InnerSubscriber<T>[] u = new InnerSubscriber[len + 1];
System.arraycopy(c, 0, u, 0, len);
u[len] = producer;
// try setting the subscribers array
Expand All @@ -287,11 +290,12 @@ boolean add(InnerSubscriber<T> producer) {
* Atomically removes the given InnerSubscriber from the subscribers array.
* @param producer the producer to remove
*/
@SuppressWarnings("unchecked")
void remove(InnerSubscriber<T> producer) {
// the state can change so we do a CAS loop to achieve atomicity
for (;;) {
// let's read the current subscribers array
InnerSubscriber[] c = subscribers.get();
InnerSubscriber<T>[] c = subscribers.get();
int len = c.length;
// if it is either empty or terminated, there is nothing to remove so we quit
if (len == 0) {
Expand All @@ -311,7 +315,7 @@ void remove(InnerSubscriber<T> producer) {
return;
}
// we do copy-on-write logic here
InnerSubscriber[] u;
InnerSubscriber<T>[] u;
// we don't create a new empty array if producer was the single inhabitant
// but rather reuse an empty array
if (len == 1) {
Expand Down Expand Up @@ -340,6 +344,7 @@ void remove(InnerSubscriber<T> producer) {
* @param empty set to true if the queue is empty
* @return true if there is indeed a terminal condition
*/
@SuppressWarnings("unchecked")
boolean checkTerminated(Object term, boolean empty) {
// first of all, check if there is actually a terminal event
if (term != null) {
Expand Down Expand Up @@ -404,6 +409,17 @@ void dispatch() {
return;
}
int missed = 1;

// saving a local copy because this will be accessed after every item
// delivered to detect changes in the subscribers due to an onNext
// and thus not dropping items
AtomicReference<InnerSubscriber<T>[]> subscribers = this.subscribers;

// We take a snapshot of the current child subscribers.
// Concurrent subscribers may miss this iteration, but it is to be expected
InnerSubscriber<T>[] ps = subscribers.get();

outer:
for (;;) {
/*
* We need to read terminalEvent before checking the queue for emptiness because
Expand Down Expand Up @@ -434,10 +450,6 @@ void dispatch() {
// this loop is the only one which can turn a non-empty queue into an empty one
// and as such, no need to ask the queue itself again for that.
if (!empty) {
// We take a snapshot of the current child subscribers.
// Concurrent subscribers may miss this iteration, but it is to be expected
@SuppressWarnings("unchecked")
InnerSubscriber<T>[] ps = subscribers.get();

int len = ps.length;
// Let's assume everyone requested the maximum value.
Expand All @@ -452,14 +464,11 @@ void dispatch() {
long r = ip.get();
// if there is one child subscriber that hasn't requested yet
// we can't emit anything to anyone
if (r >= 0L) {
maxRequested = Math.min(maxRequested, r);
} else
// cancellation is indicated by a special value
if (r == CANCELLED) {
if (r != CANCELLED) {
maxRequested = Math.min(maxRequested, r - ip.emitted);
} else {
cancelled++;
}
// we ignore those with NOT_REQUESTED as if they aren't even there
}

// it may happen everyone has cancelled between here and subscribers.get()
Expand Down Expand Up @@ -518,20 +527,36 @@ void dispatch() {
}
// we need to unwrap potential nulls
T value = NotificationLite.getValue(v);

boolean subscribersChanged = false;

// let's emit this value to all child subscribers
for (InnerSubscriber<T> ip : ps) {
// if ip.get() is negative, the child has either cancelled in the
// meantime or hasn't requested anything yet
// this eager behavior will skip cancelled children in case
// multiple values are available in the queue
if (ip.get() > 0L) {
long ipr = ip.get();
if (ipr != CANCELLED) {
if (ipr != Long.MAX_VALUE) {
// indicate this child has received 1 element
ip.emitted++;
}
ip.child.onNext(value);
// indicate this child has received 1 element
ip.produced(1);
} else {
subscribersChanged = true;
}
}
// indicate we emitted one element
d++;

// see if the array of subscribers changed as a consequence
// of emission or concurrent activity
InnerSubscriber<T>[] freshArray = subscribers.get();
if (subscribersChanged || freshArray != ps) {
ps = freshArray;
continue outer;
}
}

// if we did emit at least one element, request more to replenish the queue
Expand All @@ -552,6 +577,9 @@ void dispatch() {
if (missed == 0) {
break;
}

// get a fresh copy of the current subscribers
ps = subscribers.get();
}
}
}
Expand All @@ -571,6 +599,9 @@ static final class InnerSubscriber<T> extends AtomicLong implements Subscription
*/
volatile PublishSubscriber<T> parent;

/** Track the number of emitted items (avoids decrementing the request counter). */
long emitted;

InnerSubscriber(Subscriber<? super T> child) {
this.child = child;
}
Expand All @@ -586,15 +617,6 @@ public void request(long n) {
}
}

/**
* Indicate that values have been emitted to this child subscriber by the dispatch() method.
* @param n the number of items emitted
* @return the updated request value (may indicate how much can be produced or a terminal state)
*/
public long produced(long n) {
return BackpressureHelper.producedCancel(this, n);
}

@Override
public void cancel() {
long r = get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,6 @@ static final class MulticastProcessor<T> extends Flowable<T> implements Flowable

int consumed;

long emitted;

@SuppressWarnings("unchecked")
MulticastProcessor(int prefetch, boolean delayError) {
this.prefetch = prefetch;
Expand Down Expand Up @@ -325,18 +323,20 @@ void drain() {
int upstreamConsumed = consumed;
int localLimit = limit;
boolean canRequest = sourceMode != QueueSubscription.SYNC;
long e = emitted;
AtomicReference<MulticastSubscription<T>[]> subs = subscribers;

MulticastSubscription<T>[] array = subs.get();

outer:
for (;;) {
MulticastSubscription<T>[] array = subscribers.get();

int n = array.length;

if (q != null && n != 0) {
long r = Long.MAX_VALUE;

for (MulticastSubscription<T> ms : array) {
long u = ms.get();
long u = ms.get() - ms.emitted;
if (u != Long.MIN_VALUE) {
if (r > u) {
r = u;
Expand All @@ -347,10 +347,10 @@ void drain() {
}

if (n == 0) {
r = e;
r = 0;
}

while (e != r) {
while (r != 0) {
if (isDisposed()) {
q.clear();
return;
Expand Down Expand Up @@ -393,21 +393,35 @@ void drain() {
break;
}

boolean subscribersChange = false;

for (MulticastSubscription<T> ms : array) {
if (ms.get() != Long.MIN_VALUE) {
long msr = ms.get();
if (msr != Long.MIN_VALUE) {
if (msr != Long.MAX_VALUE) {
ms.emitted++;
}
ms.actual.onNext(v);
} else {
subscribersChange = true;
}
}

e++;
r--;

if (canRequest && ++upstreamConsumed == localLimit) {
upstreamConsumed = 0;
s.get().request(localLimit);
}

MulticastSubscription<T>[] freshArray = subs.get();
if (subscribersChange || freshArray != array) {
array = freshArray;
continue outer;
}
}

if (e == r) {
if (r == 0) {
if (isDisposed()) {
q.clear();
return;
Expand Down Expand Up @@ -435,7 +449,6 @@ void drain() {
}
}

emitted = e;
consumed = upstreamConsumed;
missed = wip.addAndGet(-missed);
if (missed == 0) {
Expand All @@ -444,6 +457,7 @@ void drain() {
if (q == null) {
q = queue;
}
array = subs.get();
}
}

Expand Down Expand Up @@ -476,6 +490,8 @@ static final class MulticastSubscription<T>

final MulticastProcessor<T> parent;

long emitted;

MulticastSubscription(Subscriber<? super T> actual, MulticastProcessor<T> parent) {
this.actual = actual;
this.parent = parent;
Expand Down
Loading