From 9585c6e2a54d89f032ab9f2082395eada69b3512 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Tue, 19 May 2015 15:40:58 +1000 Subject: [PATCH] fix Amb backpressure problems --- .../rx/internal/operators/OnSubscribeAmb.java | 102 +++++++++++++----- .../operators/OnSubscribeAmbTest.java | 70 ++++++++++++ 2 files changed, 143 insertions(+), 29 deletions(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeAmb.java b/src/main/java/rx/internal/operators/OnSubscribeAmb.java index 534e308653..2ddd0dc820 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeAmb.java +++ b/src/main/java/rx/internal/operators/OnSubscribeAmb.java @@ -269,6 +269,7 @@ private static final class AmbSubscriber extends Subscriber { private final Subscriber subscriber; private final Selection selection; + private boolean chosen; private AmbSubscriber(long requested, Subscriber subscriber, Selection selection) { this.subscriber = subscriber; @@ -282,11 +283,11 @@ private final void requestMore(long n) { } @Override - public void onNext(T args) { + public void onNext(T t) { if (!isSelected()) { return; } - subscriber.onNext(args); + subscriber.onNext(t); } @Override @@ -306,12 +307,17 @@ public void onError(Throwable e) { } private boolean isSelected() { + if (chosen) { + return true; + } if (selection.choice.get() == this) { // fast-path + chosen = true; return true; } else { if (selection.choice.compareAndSet(null, this)) { selection.unsubscribeOthers(this); + chosen = true; return true; } else { // we lost so unsubscribe ... and force cleanup again due to possible race conditions @@ -343,59 +349,97 @@ public void unsubscribeOthers(AmbSubscriber notThis) { } } - - private final Iterable> sources; - private final Selection selection = new Selection(); - + + //give default access instead of private as a micro-optimization + //for access from anonymous classes below + final Iterable> sources; + final Selection selection = new Selection(); + final AtomicReference> choice = selection.choice; + private OnSubscribeAmb(Iterable> sources) { this.sources = sources; } @Override public void call(final Subscriber subscriber) { + + //setup unsubscription of all the subscribers to the sources subscriber.add(Subscriptions.create(new Action0() { @Override public void call() { - if (selection.choice.get() != null) { + AmbSubscriber c; + if ((c = choice.get()) != null) { // there is a single winner so we unsubscribe it - selection.choice.get().unsubscribe(); + c.unsubscribe(); } // if we are racing with others still existing, we'll also unsubscribe them - if(!selection.ambSubscribers.isEmpty()) { - for (AmbSubscriber other : selection.ambSubscribers) { - other.unsubscribe(); - } - selection.ambSubscribers.clear(); - } + // if subscriptions are occurring as this is happening then this call may not + // unsubscribe everything. We protect ourselves though by doing another unsubscribe check + // after the subscription loop below + unsubscribeAmbSubscribers(selection.ambSubscribers); } - + })); + + //need to subscribe to all the sources + for (Observable source : sources) { + if (subscriber.isUnsubscribed()) { + break; + } + AmbSubscriber ambSubscriber = new AmbSubscriber(0, subscriber, selection); + selection.ambSubscribers.add(ambSubscriber); + // check again if choice has been made so can stop subscribing + // if all sources were backpressure aware then this check + // would be pointless given that 0 was requested above from each ambSubscriber + AmbSubscriber c; + if ((c = choice.get()) != null) { + // Already chose one, the rest can be skipped and we can clean up + selection.unsubscribeOthers(c); + return; + } + source.unsafeSubscribe(ambSubscriber); + } + // while subscribing unsubscription may have occurred so we clean up after + if (subscriber.isUnsubscribed()) { + unsubscribeAmbSubscribers(selection.ambSubscribers); + } + subscriber.setProducer(new Producer() { @Override public void request(long n) { - if (selection.choice.get() != null) { + final AmbSubscriber c; + if ((c = choice.get()) != null) { // propagate the request to that single Subscriber that won - selection.choice.get().requestMore(n); + c.requestMore(n); } else { - for (Observable source : sources) { - if (subscriber.isUnsubscribed()) { - break; - } - AmbSubscriber ambSubscriber = new AmbSubscriber(n, subscriber, selection); - selection.ambSubscribers.add(ambSubscriber); - // possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing) - if (selection.choice.get() != null) { - // Already chose one, the rest can be skipped and we can clean up - selection.unsubscribeOthers(selection.choice.get()); - break; + //propagate the request to all the amb subscribers + for (AmbSubscriber ambSubscriber: selection.ambSubscribers) { + if (!ambSubscriber.isUnsubscribed()) { + // make a best endeavours check to not waste requests + // if first emission has already occurred + if (choice.get() == ambSubscriber) { + ambSubscriber.requestMore(n); + // don't need to request from other subscribers because choice has been made + // and request has gone to choice + return; + } else { + ambSubscriber.requestMore(n); + } } - source.unsafeSubscribe(ambSubscriber); } } } }); } + private static void unsubscribeAmbSubscribers(Collection> ambSubscribers) { + if(!ambSubscribers.isEmpty()) { + for (AmbSubscriber other : ambSubscribers) { + other.unsubscribe(); + } + ambSubscribers.clear(); + } + } } diff --git a/src/test/java/rx/internal/operators/OnSubscribeAmbTest.java b/src/test/java/rx/internal/operators/OnSubscribeAmbTest.java index cb4a307822..76cb40800e 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeAmbTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeAmbTest.java @@ -22,6 +22,7 @@ import static rx.internal.operators.OnSubscribeAmb.amb; import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -36,6 +37,7 @@ import rx.Scheduler; import rx.Subscriber; import rx.functions.Action0; +import rx.functions.Action1; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -219,4 +221,72 @@ public void testBackpressure() { ts.assertNoErrors(); assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size()); } + + + @Test + public void testSubscriptionOnlyHappensOnce() throws InterruptedException { + final AtomicLong count = new AtomicLong(); + Action0 incrementer = new Action0() { + @Override + public void call() { + count.incrementAndGet(); + } + }; + //this aync stream should emit first + Observable o1 = Observable.just(1).doOnSubscribe(incrementer) + .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation()); + //this stream emits second + Observable o2 = Observable.just(1).doOnSubscribe(incrementer) + .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation()); + TestSubscriber ts = new TestSubscriber(); + Observable.amb(o1, o2).subscribe(ts); + ts.requestMore(1); + ts.awaitTerminalEvent(5, TimeUnit.SECONDS); + ts.assertNoErrors(); + assertEquals(2, count.get()); + } + + @Test + public void testSecondaryRequestsPropagatedToChildren() throws InterruptedException { + //this aync stream should emit first + Observable o1 = Observable.from(Arrays.asList(1, 2, 3)) + .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation()); + //this stream emits second + Observable o2 = Observable.from(Arrays.asList(4, 5, 6)) + .delay(200, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation()); + TestSubscriber ts = new TestSubscriber() { + @Override + public void onStart() { + request(1); + }}; + Observable.amb(o1, o2).subscribe(ts); + // before first emission request 20 more + // this request should suffice to emit all + ts.requestMore(20); + //ensure stream does not hang + ts.awaitTerminalEvent(5, TimeUnit.SECONDS); + ts.assertNoErrors(); + } + + @Test + public void testSynchronousSources() { + // under async subscription the second observable would complete before + // the first but because this is a synchronous subscription to sources + // then second observable does not get subscribed to before first + // subscription completes hence first observable emits result through + // amb + int result = Observable.just(1).doOnNext(new Action1() { + + @Override + public void call(Object t) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // + } + } + }).ambWith(Observable.just(2)).toBlocking().single(); + assertEquals(1, result); + } + }