Skip to content

Commit

Permalink
wrapped subscription so that interval works for multiple subscribers …
Browse files Browse the repository at this point in the history
…and added a test for staggered subscriptions with publish/connect, too
  • Loading branch information
Joachim Hofer committed Sep 13, 2013
1 parent 43a0f36 commit 87b0ce9
Showing 1 changed file with 52 additions and 5 deletions.
57 changes: 52 additions & 5 deletions rxjava-core/src/main/java/rx/operators/OperationInterval.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.concurrency.TestScheduler;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

Expand All @@ -45,14 +46,20 @@ public final class OperationInterval {
* Creates an event each time interval.
*/
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit) {
return new Interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
return interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
}

/**
* Creates an event each time interval.
*/
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit, Scheduler scheduler) {
return new Interval(interval, unit, scheduler);
public static OnSubscribeFunc<Long> interval(final long interval, final TimeUnit unit, final Scheduler scheduler) {
// wrapped in order to work with multiple subscribers
return new OnSubscribeFunc<Long>() {
@Override
public Subscription onSubscribe(Observer<? super Long> observer) {
return new Interval(interval, unit, scheduler).onSubscribe(observer);
}
};
}

private static class Interval implements OnSubscribeFunc<Long> {
Expand Down Expand Up @@ -188,7 +195,6 @@ public void testWithMultipleStaggeredSubscribers() {

inOrder1.verify(observer, times(1)).onNext(2L);
inOrder1.verify(observer, times(1)).onNext(3L);
inOrder1.verify(observer, never()).onNext(4L);

InOrder inOrder2 = inOrder(observer2);
inOrder2.verify(observer2, times(1)).onNext(0L);
Expand All @@ -201,9 +207,50 @@ public void testWithMultipleStaggeredSubscribers() {
inOrder1.verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));

inOrder2.verify(observer2, never()).onNext(2L);
inOrder2.verify(observer2, never()).onNext(anyLong());
inOrder2.verify(observer2, times(1)).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
}

@Test
public void testWithMultipleStaggeredSubscribersAndPublish() {
ConnectableObservable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)).publish();
Subscription sub1 = w.subscribe(observer);
w.connect();

verify(observer, never()).onNext(anyLong());

scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
Subscription sub2 = w.subscribe(observer2);

InOrder inOrder1 = inOrder(observer);
inOrder1.verify(observer, times(1)).onNext(0L);
inOrder1.verify(observer, times(1)).onNext(1L);
inOrder1.verify(observer, never()).onNext(2L);

verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
verify(observer2, never()).onNext(anyLong());

scheduler.advanceTimeTo(4, TimeUnit.SECONDS);

inOrder1.verify(observer, times(1)).onNext(2L);
inOrder1.verify(observer, times(1)).onNext(3L);

InOrder inOrder2 = inOrder(observer2);
inOrder2.verify(observer2, times(1)).onNext(2L);
inOrder2.verify(observer2, times(1)).onNext(3L);

sub1.unsubscribe();
sub2.unsubscribe();

inOrder1.verify(observer, never()).onNext(anyLong());
inOrder1.verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));

inOrder2.verify(observer2, never()).onNext(anyLong());
inOrder2.verify(observer2, never()).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
}
}
}

0 comments on commit 87b0ce9

Please sign in to comment.