Skip to content

Commit

Permalink
fix Amb backpressure problems
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed May 20, 2015
1 parent 1a85656 commit 975df61
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 23 deletions.
72 changes: 49 additions & 23 deletions src/main/java/rx/internal/operators/OnSubscribeAmb.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
Expand Down Expand Up @@ -343,10 +344,13 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
}

}

private final Iterable<? extends Observable<? extends T>> sources;
private final Selection<T> selection = new Selection<T>();


//give default access instead of private as a micro-optimization
//for access from anonymous classes below
final Iterable<? extends Observable<? extends T>> sources;
final Selection<T> selection = new Selection<T>();
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;

private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
this.sources = sources;
}
Expand All @@ -357,41 +361,63 @@ public void call(final Subscriber<? super T> subscriber) {

@Override
public void call() {
if (selection.choice.get() != null) {
AmbSubscriber<T> c;
if ((c = choice.get()) != null) {
// there is a single winner so we unsubscribe it
selection.choice.get().unsubscribe();
c.unsubscribe();
}
// if we are racing with others still existing, we'll also unsubscribe them
if(!selection.ambSubscribers.isEmpty()) {
for (AmbSubscriber<T> other : selection.ambSubscribers) {
Collection<AmbSubscriber<T>> ambSubs = selection.ambSubscribers;
if(!ambSubs.isEmpty()) {
for (AmbSubscriber<T> other : ambSubs) {
other.unsubscribe();
}
selection.ambSubscribers.clear();
ambSubs.clear();
}
}

}));
//need to subscribe to all the sources
for (Observable<? extends T> source : sources) {
if (subscriber.isUnsubscribed()) {
return;
}
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
selection.ambSubscribers.add(ambSubscriber);
// check again if choice has been made so can stop subscribing
// if all sources were backpressure aware then this check
// would be pointless given that 0 was requested above from each ambSubscriber
AmbSubscriber<T> c;
if ((c = choice.get()) != null) {
// Already chose one, the rest can be skipped and we can clean up
selection.unsubscribeOthers(c);
return;
}
source.unsafeSubscribe(ambSubscriber);
}
subscriber.setProducer(new Producer() {

@Override
public void request(long n) {
if (selection.choice.get() != null) {
final AmbSubscriber<T> c;
if ((c = choice.get()) != null) {
// propagate the request to that single Subscriber that won
selection.choice.get().requestMore(n);
c.requestMore(n);
} else {
for (Observable<? extends T> source : sources) {
if (subscriber.isUnsubscribed()) {
break;
}
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(n, subscriber, selection);
selection.ambSubscribers.add(ambSubscriber);
// possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
if (selection.choice.get() != null) {
// Already chose one, the rest can be skipped and we can clean up
selection.unsubscribeOthers(selection.choice.get());
break;
//propagate the request to all the amb subscribers
for (AmbSubscriber<T> ambSubscriber: selection.ambSubscribers) {
if (!ambSubscriber.isUnsubscribed()) {
// make a best endeavours check to not waste requests
// if first emission has already occurred
if (choice.get() == ambSubscriber) {
ambSubscriber.requestMore(n);
// don't need to request from other subscribers because choice has been made
// and request has gone to choice
return;
} else {
ambSubscriber.requestMore(n);
}
}
source.unsafeSubscribe(ambSubscriber);
}
}
}
Expand Down
48 changes: 48 additions & 0 deletions src/test/java/rx/internal/operators/OnSubscribeAmbTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static rx.internal.operators.OnSubscribeAmb.amb;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -219,4 +220,51 @@ public void testBackpressure() {
ts.assertNoErrors();
assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size());
}


@Test
public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
final AtomicLong count = new AtomicLong();
Action0 incrementer = new Action0() {
@Override
public void call() {
count.incrementAndGet();
}
};
//this aync stream should emit first
Observable<Integer> o1 = Observable.just(1).doOnSubscribe(incrementer)
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
//this stream emits second
Observable<Integer> o2 = Observable.just(1).doOnSubscribe(incrementer)
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.amb(o1, o2).subscribe(ts);
ts.requestMore(1);
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
ts.assertNoErrors();
assertEquals(2, count.get());
}

@Test
public void testSecondaryRequestsPropagatedToChildren() throws InterruptedException {
//this aync stream should emit first
Observable<Integer> o1 = Observable.from(Arrays.asList(1, 2, 3))
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
//this stream emits second
Observable<Integer> o2 = Observable.from(Arrays.asList(4, 5, 6))
.delay(200, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
@Override
public void onStart() {
request(1);
}};
Observable.amb(o1, o2).subscribe(ts);
// before first emission request 20 more
// this request should suffice to emit all
ts.requestMore(20);
//ensure stream does not hang
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
ts.assertNoErrors();
}

}

0 comments on commit 975df61

Please sign in to comment.