From 87b0ce9fe46f00000a88c079c605b59dbe0f90fc Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Fri, 13 Sep 2013 10:00:02 +0200 Subject: [PATCH] wrapped subscription so that interval works for multiple subscribers and added a test for staggered subscriptions with publish/connect, too --- .../java/rx/operators/OperationInterval.java | 57 +++++++++++++++++-- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java index 5c74709b2f..0f53c884ba 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -32,6 +32,7 @@ import rx.Subscription; import rx.concurrency.Schedulers; import rx.concurrency.TestScheduler; +import rx.observables.ConnectableObservable; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; @@ -45,14 +46,20 @@ public final class OperationInterval { * Creates an event each time interval. */ public static OnSubscribeFunc interval(long interval, TimeUnit unit) { - return new Interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + return interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); } /** * Creates an event each time interval. */ - public static OnSubscribeFunc interval(long interval, TimeUnit unit, Scheduler scheduler) { - return new Interval(interval, unit, scheduler); + public static OnSubscribeFunc interval(final long interval, final TimeUnit unit, final Scheduler scheduler) { + // wrapped in order to work with multiple subscribers + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + return new Interval(interval, unit, scheduler).onSubscribe(observer); + } + }; } private static class Interval implements OnSubscribeFunc { @@ -188,7 +195,6 @@ public void testWithMultipleStaggeredSubscribers() { inOrder1.verify(observer, times(1)).onNext(2L); inOrder1.verify(observer, times(1)).onNext(3L); - inOrder1.verify(observer, never()).onNext(4L); InOrder inOrder2 = inOrder(observer2); inOrder2.verify(observer2, times(1)).onNext(0L); @@ -201,9 +207,50 @@ public void testWithMultipleStaggeredSubscribers() { inOrder1.verify(observer, times(1)).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); - inOrder2.verify(observer2, never()).onNext(2L); + inOrder2.verify(observer2, never()).onNext(anyLong()); inOrder2.verify(observer2, times(1)).onCompleted(); verify(observer2, never()).onError(any(Throwable.class)); } + + @Test + public void testWithMultipleStaggeredSubscribersAndPublish() { + ConnectableObservable w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)).publish(); + Subscription sub1 = w.subscribe(observer); + w.connect(); + + verify(observer, never()).onNext(anyLong()); + + scheduler.advanceTimeTo(2, TimeUnit.SECONDS); + Subscription sub2 = w.subscribe(observer2); + + InOrder inOrder1 = inOrder(observer); + inOrder1.verify(observer, times(1)).onNext(0L); + inOrder1.verify(observer, times(1)).onNext(1L); + inOrder1.verify(observer, never()).onNext(2L); + + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer2, never()).onNext(anyLong()); + + scheduler.advanceTimeTo(4, TimeUnit.SECONDS); + + inOrder1.verify(observer, times(1)).onNext(2L); + inOrder1.verify(observer, times(1)).onNext(3L); + + InOrder inOrder2 = inOrder(observer2); + inOrder2.verify(observer2, times(1)).onNext(2L); + inOrder2.verify(observer2, times(1)).onNext(3L); + + sub1.unsubscribe(); + sub2.unsubscribe(); + + inOrder1.verify(observer, never()).onNext(anyLong()); + inOrder1.verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + inOrder2.verify(observer2, never()).onNext(anyLong()); + inOrder2.verify(observer2, never()).onCompleted(); + verify(observer2, never()).onError(any(Throwable.class)); + } } }