Skip to content

Commit

Permalink
Merge pull request #2961 from davidmoten/amb-bug
Browse files Browse the repository at this point in the history
fix Amb backpressure bug
  • Loading branch information
akarnokd committed May 26, 2015
2 parents eccc8c4 + 9585c6e commit 4329fed
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 29 deletions.
102 changes: 73 additions & 29 deletions src/main/java/rx/internal/operators/OnSubscribeAmb.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ private static final class AmbSubscriber<T> extends Subscriber<T> {

private final Subscriber<? super T> subscriber;
private final Selection<T> selection;
private boolean chosen;

private AmbSubscriber(long requested, Subscriber<? super T> subscriber, Selection<T> selection) {
this.subscriber = subscriber;
Expand All @@ -282,11 +283,11 @@ private final void requestMore(long n) {
}

@Override
public void onNext(T args) {
public void onNext(T t) {
if (!isSelected()) {
return;
}
subscriber.onNext(args);
subscriber.onNext(t);
}

@Override
Expand All @@ -306,12 +307,17 @@ public void onError(Throwable e) {
}

private boolean isSelected() {
if (chosen) {
return true;
}
if (selection.choice.get() == this) {
// fast-path
chosen = true;
return true;
} else {
if (selection.choice.compareAndSet(null, this)) {
selection.unsubscribeOthers(this);
chosen = true;
return true;
} else {
// we lost so unsubscribe ... and force cleanup again due to possible race conditions
Expand Down Expand Up @@ -343,59 +349,97 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
}

}

private final Iterable<? extends Observable<? extends T>> sources;
private final Selection<T> selection = new Selection<T>();


//give default access instead of private as a micro-optimization
//for access from anonymous classes below
final Iterable<? extends Observable<? extends T>> sources;
final Selection<T> selection = new Selection<T>();
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;

private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
this.sources = sources;
}

@Override
public void call(final Subscriber<? super T> subscriber) {

//setup unsubscription of all the subscribers to the sources
subscriber.add(Subscriptions.create(new Action0() {

@Override
public void call() {
if (selection.choice.get() != null) {
AmbSubscriber<T> c;
if ((c = choice.get()) != null) {
// there is a single winner so we unsubscribe it
selection.choice.get().unsubscribe();
c.unsubscribe();
}
// if we are racing with others still existing, we'll also unsubscribe them
if(!selection.ambSubscribers.isEmpty()) {
for (AmbSubscriber<T> other : selection.ambSubscribers) {
other.unsubscribe();
}
selection.ambSubscribers.clear();
}
// if subscriptions are occurring as this is happening then this call may not
// unsubscribe everything. We protect ourselves though by doing another unsubscribe check
// after the subscription loop below
unsubscribeAmbSubscribers(selection.ambSubscribers);
}

}));

//need to subscribe to all the sources
for (Observable<? extends T> source : sources) {
if (subscriber.isUnsubscribed()) {
break;
}
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
selection.ambSubscribers.add(ambSubscriber);
// check again if choice has been made so can stop subscribing
// if all sources were backpressure aware then this check
// would be pointless given that 0 was requested above from each ambSubscriber
AmbSubscriber<T> c;
if ((c = choice.get()) != null) {
// Already chose one, the rest can be skipped and we can clean up
selection.unsubscribeOthers(c);
return;
}
source.unsafeSubscribe(ambSubscriber);
}
// while subscribing unsubscription may have occurred so we clean up after
if (subscriber.isUnsubscribed()) {
unsubscribeAmbSubscribers(selection.ambSubscribers);
}

subscriber.setProducer(new Producer() {

@Override
public void request(long n) {
if (selection.choice.get() != null) {
final AmbSubscriber<T> c;
if ((c = choice.get()) != null) {
// propagate the request to that single Subscriber that won
selection.choice.get().requestMore(n);
c.requestMore(n);
} else {
for (Observable<? extends T> source : sources) {
if (subscriber.isUnsubscribed()) {
break;
}
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(n, subscriber, selection);
selection.ambSubscribers.add(ambSubscriber);
// possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
if (selection.choice.get() != null) {
// Already chose one, the rest can be skipped and we can clean up
selection.unsubscribeOthers(selection.choice.get());
break;
//propagate the request to all the amb subscribers
for (AmbSubscriber<T> ambSubscriber: selection.ambSubscribers) {
if (!ambSubscriber.isUnsubscribed()) {
// make a best endeavours check to not waste requests
// if first emission has already occurred
if (choice.get() == ambSubscriber) {
ambSubscriber.requestMore(n);
// don't need to request from other subscribers because choice has been made
// and request has gone to choice
return;
} else {
ambSubscriber.requestMore(n);
}
}
source.unsafeSubscribe(ambSubscriber);
}
}
}
});
}

private static <T> void unsubscribeAmbSubscribers(Collection<AmbSubscriber<T>> ambSubscribers) {
if(!ambSubscribers.isEmpty()) {
for (AmbSubscriber<T> other : ambSubscribers) {
other.unsubscribe();
}
ambSubscribers.clear();
}
}
}
70 changes: 70 additions & 0 deletions src/test/java/rx/internal/operators/OnSubscribeAmbTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static rx.internal.operators.OnSubscribeAmb.amb;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -36,6 +37,7 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.internal.util.RxRingBuffer;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -219,4 +221,72 @@ public void testBackpressure() {
ts.assertNoErrors();
assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size());
}


@Test
public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
final AtomicLong count = new AtomicLong();
Action0 incrementer = new Action0() {
@Override
public void call() {
count.incrementAndGet();
}
};
//this aync stream should emit first
Observable<Integer> o1 = Observable.just(1).doOnSubscribe(incrementer)
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
//this stream emits second
Observable<Integer> o2 = Observable.just(1).doOnSubscribe(incrementer)
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.amb(o1, o2).subscribe(ts);
ts.requestMore(1);
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
ts.assertNoErrors();
assertEquals(2, count.get());
}

@Test
public void testSecondaryRequestsPropagatedToChildren() throws InterruptedException {
//this aync stream should emit first
Observable<Integer> o1 = Observable.from(Arrays.asList(1, 2, 3))
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
//this stream emits second
Observable<Integer> o2 = Observable.from(Arrays.asList(4, 5, 6))
.delay(200, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
@Override
public void onStart() {
request(1);
}};
Observable.amb(o1, o2).subscribe(ts);
// before first emission request 20 more
// this request should suffice to emit all
ts.requestMore(20);
//ensure stream does not hang
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
ts.assertNoErrors();
}

@Test
public void testSynchronousSources() {
// under async subscription the second observable would complete before
// the first but because this is a synchronous subscription to sources
// then second observable does not get subscribed to before first
// subscription completes hence first observable emits result through
// amb
int result = Observable.just(1).doOnNext(new Action1<Object>() {

@Override
public void call(Object t) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
//
}
}
}).ambWith(Observable.just(2)).toBlocking().single();
assertEquals(1, result);
}

}

0 comments on commit 4329fed

Please sign in to comment.