From 40e851b963da58ababfe864286ae0dcb612e3148 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 10 Jan 2014 13:22:47 +0100 Subject: [PATCH 1/2] Delay with subscription and item delaying observables. --- rxjava-core/src/main/java/rx/Observable.java | 44 +++ .../java/rx/operators/OperationDelay.java | 195 ++++++++++++ .../java/rx/operators/OperationDelayTest.java | 286 ++++++++++++++++++ 3 files changed, 525 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 2ac2fb7b5d..87bce6ba27 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2167,6 +2167,50 @@ public static Observable timer(long initialDelay, long period, TimeUnit un return create(new OperationTimer.TimerPeriodically(initialDelay, period, unit, scheduler)); } + /** + * Create an Observable which delays the events via another Observable on a per item-basis. + *

+ * Note: onError or onCompleted events are immediately propagated. + *

+ * Note: if the Observable returned by the {@code itemDelay} just completes, that + * particular source item is not emitted. + * + * @param the item delay value type (ignored) + * @param itemDelay function that returns an Observable for each source item which is + * then used for delaying that particular item until the Observable + * fires its first onNext event. + * @return an Observable which delays the events via another Observable on a per item-basis. + */ + public Observable delay(Func1> itemDelay) { + return create(OperationDelay.delay(this, itemDelay)); + } + /** + * Create an Observable which delays the subscription and events via another Observables on a per item-basis. + *

+ * Note: onError or onCompleted events are immediately propagated. + *

+ * Note: if the Observable returned by the {@code itemDelay} just completes, that + * particular source item is not emitted. + *

+ * Note: if the {@code subscriptionDelay}'s Observable just completes, the created + * observable will just complete as well. + * + * @param the subscription delay value type (ignored) + * @param the item delay value type (ignored) + * @param subscriptionDelay function that returns an Observable which will trigger + * the subscription to the source observable once it fires an + * onNext event. + * @param itemDelay function that returns an Observable for each source item which is + * then used for delaying that particular item until the Observable + * fires its first onNext event. + * @return an Observable which delays the events via another Observable on a per item-basis. + */ + public Observable delay( + Func0> subscriptionDelay, + Func1> itemDelay) { + return create(OperationDelay.delay(this, subscriptionDelay, itemDelay)); + } + /** * Returns an Observable that emits the items emitted by the source * Observable shifted forward in time by a specified delay. Error diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java index 4594adda76..56d9de1cb3 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDelay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -23,8 +23,11 @@ import rx.Scheduler; import rx.Subscription; import rx.observables.ConnectableObservable; +import rx.subscriptions.CompositeSubscription; import rx.subscriptions.SerialSubscription; +import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; +import rx.util.functions.Func0; import rx.util.functions.Func1; public final class OperationDelay { @@ -82,4 +85,196 @@ public void call() { return ssub; } } + /** + * Delay the emission of the source items by a per-item observable that fires its first element. + */ + public static OnSubscribeFunc delay(Observable source, + Func1> itemDelay) { + return new DelayViaObservable(source, null, itemDelay); + } + /** + * Delay the subscription and emission of the source items by a per-item observable that fires its first element. + */ + public static OnSubscribeFunc delay(Observable source, + Func0> subscriptionDelay, + Func1> itemDelay) { + return new DelayViaObservable(source, subscriptionDelay, itemDelay); + } + /** + * Delay the emission of the source items by a per-item observable that fires its first element. + */ + private static final class DelayViaObservable implements OnSubscribeFunc { + final Observable source; + final Func0> subscriptionDelay; + final Func1> itemDelay; + + public DelayViaObservable(Observable source, + Func0> subscriptionDelay, + Func1> itemDelay) { + this.source = source; + this.subscriptionDelay = subscriptionDelay; + this.itemDelay = itemDelay; + } + + @Override + public Subscription onSubscribe(Observer t1) { + CompositeSubscription csub = new CompositeSubscription(); + + if (subscriptionDelay == null) { + csub.add(source.subscribe(new SourceObserver(t1, itemDelay, csub))); + } else { + Observable subscriptionSource; + try { + subscriptionSource = subscriptionDelay.call(); + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + SerialSubscription ssub = new SerialSubscription(); + csub.add(ssub); + ssub.set(subscriptionSource.subscribe(new SubscribeDelay(source, t1, itemDelay, csub, ssub))); + } + + return csub; + } + private static final class SubscribeDelay implements Observer { + final Observable source; + final Observer observer; + final Func1> itemDelay; + final CompositeSubscription csub; + final Subscription self; + /** Prevent any onError and onCompleted once the first item was delivered. */ + boolean subscribed; + + public SubscribeDelay(Observable source, Observer observer, Func1> itemDelay, + CompositeSubscription csub, Subscription self) { + this.source = source; + this.observer = observer; + this.itemDelay = itemDelay; + this.csub = csub; + this.self = self; + } + + @Override + public void onNext(U args) { + subscribed = true; + csub.remove(self); + csub.add(source.subscribe(new SourceObserver(observer, itemDelay, csub))); + } + + @Override + public void onError(Throwable e) { + if (!subscribed) { + observer.onError(e); + csub.unsubscribe(); + } + } + + @Override + public void onCompleted() { + if (!subscribed) { + observer.onCompleted(); + csub.unsubscribe(); + } + } + } + /** The source observer. */ + private static final class SourceObserver implements Observer { + final Observer observer; + final Func1> itemDelay; + final CompositeSubscription csub; + /** Guard to avoid overlapping events from the various sources. */ + final Object guard; + boolean done; + + public SourceObserver(Observer observer, Func1> itemDelay, CompositeSubscription csub) { + this.observer = observer; + this.itemDelay = itemDelay; + this.csub = csub; + this.guard = new Object(); + } + + @Override + public void onNext(T args) { + Observable delayer; + try { + delayer = itemDelay.call(args); + } catch (Throwable t) { + onError(t); + return; + } + SerialSubscription ssub = new SerialSubscription(); + csub.add(ssub); + + ssub.set(delayer.subscribe(new DelayObserver(args, this, ssub))); + } + + @Override + public void onError(Throwable e) { + synchronized (guard) { + if (done) { + return; + } + done = true; + observer.onError(e); + } + csub.unsubscribe(); + } + + @Override + public void onCompleted() { + synchronized (guard) { + if (done) { + return; + } + done = true; + observer.onCompleted(); + } + csub.unsubscribe(); + } + + public void emit(T value, Subscription token) { + synchronized (guard) { + if (done) { + return; + } + observer.onNext(value); + } + remove(token); + } + public void remove(Subscription token) { + csub.remove(token); + } + } + /** + * Delay observer. + */ + private static final class DelayObserver implements Observer { + final T value; + final SourceObserver parent; + final Subscription token; + + public DelayObserver(T value, SourceObserver parent, Subscription token) { + this.value = value; + this.parent = parent; + this.token = token; + } + + @Override + public void onNext(U args) { + parent.emit(value, token); + } + + @Override + public void onError(Throwable e) { + parent.onError(e); + } + + @Override + public void onCompleted() { + parent.remove(token); + } + + } + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java index 10fdd459bd..f040a012c2 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java @@ -15,6 +15,8 @@ */ package rx.operators; +import java.util.ArrayList; +import java.util.List; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.inOrder; @@ -36,6 +38,8 @@ import rx.Observer; import rx.Subscription; import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; +import rx.util.functions.Func0; import rx.util.functions.Func1; public class OperationDelayTest { @@ -235,4 +239,286 @@ public void testDelaySubscriptionCancelBeforeTime() { verify(o, never()).onCompleted(); verify(o, never()).onError(any(Throwable.class)); } + + @Test + public void testDelayWithObservableNormal1() { + PublishSubject source = PublishSubject.create(); + final List> delays = new ArrayList>(); + final int n = 10; + for (int i = 0; i < n; i++) { + PublishSubject delay = PublishSubject.create(); + delays.add(delay); + } + + Func1> delayFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + return delays.get(t1); + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + + + for (int i = 0; i < n; i++) { + source.onNext(i); + delays.get(i).onNext(i); + inOrder.verify(o).onNext(i); + } + source.onCompleted(); + + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void testDelayWithObservableSkipper1() { + PublishSubject source = PublishSubject.create(); + final List> delays = new ArrayList>(); + final int n = 10; + for (int i = 0; i < n; i++) { + PublishSubject delay = PublishSubject.create(); + delays.add(delay); + } + + Func1> delayFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + return delays.get(t1); + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + + + for (int i = 0; i < n; i++) { + source.onNext(i); + if (i % 2 == 0) { + delays.get(i).onNext(i); + inOrder.verify(o).onNext(i); + } else { + delays.get(i).onCompleted(); + inOrder.verify(o, never()).onNext(i); + } + } + source.onCompleted(); + + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void testDelayWithObservableSingleSend1() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + + source.onNext(1); + delay.onNext(1); + delay.onNext(2); + + inOrder.verify(o).onNext(1); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void testDelayWithObservableSourceThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + source.onNext(1); + source.onError(new OperationReduceTest.CustomException()); + delay.onNext(1); + + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + @Test + public void testDelayWithObservableDelayFunctionThrows() { + PublishSubject source = PublishSubject.create(); + + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + throw new OperationReduceTest.CustomException(); + } + }; + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + source.onNext(1); + + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + + @Test + public void testDelayWithObservableDelayThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + source.onNext(1); + delay.onError(new OperationReduceTest.CustomException()); + + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + @Test + public void testDelayWithObservableSubscriptionNormal() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + Func0> subFunc = new Func0>() { + @Override + public Observable call() { + return delay; + } + }; + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(subFunc, delayFunc).subscribe(o); + + source.onNext(1); + delay.onNext(1); + + source.onNext(2); + delay.onNext(2); + + inOrder.verify(o).onNext(2); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onError(any(Throwable.class)); + verify(o, never()).onCompleted(); + } + @Test + public void testDelayWithObservableSubscriptionFunctionThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + Func0> subFunc = new Func0>() { + @Override + public Observable call() { + throw new OperationReduceTest.CustomException(); + } + }; + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(subFunc, delayFunc).subscribe(o); + + source.onNext(1); + delay.onNext(1); + + source.onNext(2); + + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + @Test + public void testDelayWithObservableSubscriptionThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + Func0> subFunc = new Func0>() { + @Override + public Observable call() { + return delay; + } + }; + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(subFunc, delayFunc).subscribe(o); + + source.onNext(1); + delay.onError(new OperationReduceTest.CustomException()); + + source.onNext(2); + + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } } From 337efeb42f1ec27679632718fd022d98f61efa1a Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 13 Jan 2014 09:06:43 +0100 Subject: [PATCH 2/2] Modified to conform Rx.NET --- rxjava-core/src/main/java/rx/Observable.java | 13 +-- .../java/rx/operators/OperationDelay.java | 87 +++++++++------ .../java/rx/operators/OperationDelayTest.java | 104 +++++++++++------- 3 files changed, 117 insertions(+), 87 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 87bce6ba27..ba777759cb 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2170,10 +2170,7 @@ public static Observable timer(long initialDelay, long period, TimeUnit un /** * Create an Observable which delays the events via another Observable on a per item-basis. *

- * Note: onError or onCompleted events are immediately propagated. - *

- * Note: if the Observable returned by the {@code itemDelay} just completes, that - * particular source item is not emitted. + * Note: onError event is immediately propagated. * * @param the item delay value type (ignored) * @param itemDelay function that returns an Observable for each source item which is @@ -2187,13 +2184,7 @@ public Observable delay(Func1> itemDel /** * Create an Observable which delays the subscription and events via another Observables on a per item-basis. *

- * Note: onError or onCompleted events are immediately propagated. - *

- * Note: if the Observable returned by the {@code itemDelay} just completes, that - * particular source item is not emitted. - *

- * Note: if the {@code subscriptionDelay}'s Observable just completes, the created - * observable will just complete as well. + * Note: onError event is immediately propagated. * * @param the subscription delay value type (ignored) * @param the item delay value type (ignored) diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java index 56d9de1cb3..e553d09b60 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDelay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -120,8 +120,11 @@ public DelayViaObservable(Observable source, public Subscription onSubscribe(Observer t1) { CompositeSubscription csub = new CompositeSubscription(); + SerialSubscription sosub = new SerialSubscription(); + csub.add(sosub); + SourceObserver so = new SourceObserver(t1, itemDelay, csub, sosub); if (subscriptionDelay == null) { - csub.add(source.subscribe(new SourceObserver(t1, itemDelay, csub))); + sosub.set(source.subscribe(so)); } else { Observable subscriptionSource; try { @@ -132,50 +135,48 @@ public Subscription onSubscribe(Observer t1) { } SerialSubscription ssub = new SerialSubscription(); csub.add(ssub); - ssub.set(subscriptionSource.subscribe(new SubscribeDelay(source, t1, itemDelay, csub, ssub))); + ssub.set(subscriptionSource.subscribe(new SubscribeDelay(source, so, csub, ssub))); } return csub; } + /** Subscribe delay observer. */ private static final class SubscribeDelay implements Observer { final Observable source; - final Observer observer; - final Func1> itemDelay; + final SourceObserver so; final CompositeSubscription csub; final Subscription self; - /** Prevent any onError and onCompleted once the first item was delivered. */ + /** Prevent any onError once the first item was delivered. */ boolean subscribed; - public SubscribeDelay(Observable source, Observer observer, Func1> itemDelay, + public SubscribeDelay( + Observable source, + SourceObserver so, CompositeSubscription csub, Subscription self) { this.source = source; - this.observer = observer; - this.itemDelay = itemDelay; + this.so = so; this.csub = csub; this.self = self; } @Override public void onNext(U args) { - subscribed = true; - csub.remove(self); - csub.add(source.subscribe(new SourceObserver(observer, itemDelay, csub))); + onCompleted(); } @Override public void onError(Throwable e) { if (!subscribed) { - observer.onError(e); + so.observer.onError(e); csub.unsubscribe(); } } @Override public void onCompleted() { - if (!subscribed) { - observer.onCompleted(); - csub.unsubscribe(); - } + subscribed = true; + csub.remove(self); + so.self.set(source.subscribe(so)); } } /** The source observer. */ @@ -183,15 +184,21 @@ private static final class SourceObserver implements Observer { final Observer observer; final Func1> itemDelay; final CompositeSubscription csub; + final SerialSubscription self; /** Guard to avoid overlapping events from the various sources. */ final Object guard; boolean done; + int wip; - public SourceObserver(Observer observer, Func1> itemDelay, CompositeSubscription csub) { + public SourceObserver(Observer observer, + Func1> itemDelay, + CompositeSubscription csub, + SerialSubscription self) { this.observer = observer; this.itemDelay = itemDelay; this.csub = csub; this.guard = new Object(); + this.self = self; } @Override @@ -203,19 +210,19 @@ public void onNext(T args) { onError(t); return; } + + synchronized (guard) { + wip++; + } + SerialSubscription ssub = new SerialSubscription(); csub.add(ssub); - ssub.set(delayer.subscribe(new DelayObserver(args, this, ssub))); } @Override public void onError(Throwable e) { synchronized (guard) { - if (done) { - return; - } - done = true; observer.onError(e); } csub.unsubscribe(); @@ -223,27 +230,37 @@ public void onError(Throwable e) { @Override public void onCompleted() { + boolean b; synchronized (guard) { - if (done) { - return; - } done = true; - observer.onCompleted(); + b = checkDone(); + } + if (b) { + csub.unsubscribe(); + } else { + self.unsubscribe(); } - csub.unsubscribe(); } - public void emit(T value, Subscription token) { + void emit(T value, Subscription token) { + boolean b; synchronized (guard) { - if (done) { - return; - } observer.onNext(value); + wip--; + b = checkDone(); + } + if (b) { + csub.unsubscribe(); + } else { + csub.remove(token); } - remove(token); } - public void remove(Subscription token) { - csub.remove(token); + boolean checkDone() { + if (done && wip == 0) { + observer.onCompleted(); + return true; + } + return false; } } /** @@ -272,7 +289,7 @@ public void onError(Throwable e) { @Override public void onCompleted() { - parent.remove(token); + parent.emit(value, token); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java index f040a012c2..81728753c6 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java @@ -276,47 +276,7 @@ public Observable call(Integer t1) { verify(o, never()).onError(any(Throwable.class)); } - @Test - public void testDelayWithObservableSkipper1() { - PublishSubject source = PublishSubject.create(); - final List> delays = new ArrayList>(); - final int n = 10; - for (int i = 0; i < n; i++) { - PublishSubject delay = PublishSubject.create(); - delays.add(delay); - } - - Func1> delayFunc = new Func1>() { - @Override - public Observable call(Integer t1) { - return delays.get(t1); - } - }; - - @SuppressWarnings("unchecked") - Observer o = mock(Observer.class); - InOrder inOrder = inOrder(o); - - source.delay(delayFunc).subscribe(o); - - - for (int i = 0; i < n; i++) { - source.onNext(i); - if (i % 2 == 0) { - delays.get(i).onNext(i); - inOrder.verify(o).onNext(i); - } else { - delays.get(i).onCompleted(); - inOrder.verify(o, never()).onNext(i); - } - } - source.onCompleted(); - - inOrder.verify(o).onCompleted(); - inOrder.verifyNoMoreInteractions(); - - verify(o, never()).onError(any(Throwable.class)); - } + @Test public void testDelayWithObservableSingleSend1() { PublishSubject source = PublishSubject.create(); @@ -521,4 +481,66 @@ public Observable call(Integer t1) { verify(o, never()).onNext(any()); verify(o, never()).onCompleted(); } + @Test + public void testDelayWithObservableEmptyDelayer() { + PublishSubject source = PublishSubject.create(); + + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return Observable.empty(); + } + }; + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + + source.onNext(1); + source.onCompleted(); + + inOrder.verify(o).onNext(1); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testDelayWithObservableSubscriptionRunCompletion() { + PublishSubject source = PublishSubject.create(); + final PublishSubject sdelay = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + Func0> subFunc = new Func0>() { + @Override + public Observable call() { + return sdelay; + } + }; + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(subFunc, delayFunc).subscribe(o); + + source.onNext(1); + sdelay.onCompleted(); + + source.onNext(2); + delay.onNext(2); + + inOrder.verify(o).onNext(2); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onError(any(Throwable.class)); + verify(o, never()).onCompleted(); + } }