Skip to content

Commit

Permalink
Merge pull request ReactiveX#379 from jmhofer/interval-multiple-subsc…
Browse files Browse the repository at this point in the history
…ribers

Make `interval` work with multiple subscribers
  • Loading branch information
benjchristensen committed Sep 13, 2013
2 parents 4f13178 + 87b0ce9 commit a73c6d9
Showing 1 changed file with 132 additions and 3 deletions.
135 changes: 132 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationInterval.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.concurrency.TestScheduler;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

Expand All @@ -45,14 +46,20 @@ public final class OperationInterval {
* Creates an event each time interval.
*/
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit) {
return new Interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
return interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
}

/**
* Creates an event each time interval.
*/
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit, Scheduler scheduler) {
return new Interval(interval, unit, scheduler);
public static OnSubscribeFunc<Long> interval(final long interval, final TimeUnit unit, final Scheduler scheduler) {
// wrapped in order to work with multiple subscribers
return new OnSubscribeFunc<Long>() {
@Override
public Subscription onSubscribe(Observer<? super Long> observer) {
return new Interval(interval, unit, scheduler).onSubscribe(observer);
}
};
}

private static class Interval implements OnSubscribeFunc<Long> {
Expand Down Expand Up @@ -91,12 +98,14 @@ public void call() {
public static class UnitTest {
private TestScheduler scheduler;
private Observer<Long> observer;
private Observer<Long> observer2;

@Before
@SuppressWarnings("unchecked") // due to mocking
public void before() {
scheduler = new TestScheduler();
observer = mock(Observer.class);
observer2 = mock(Observer.class);
}

@Test
Expand All @@ -123,5 +132,125 @@ public void testInterval() {
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}

@Test
public void testWithMultipleSubscribersStartingAtSameTime() {
Observable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler));
Subscription sub1 = w.subscribe(observer);
Subscription sub2 = w.subscribe(observer2);

verify(observer, never()).onNext(anyLong());
verify(observer2, never()).onNext(anyLong());

scheduler.advanceTimeTo(2, TimeUnit.SECONDS);

InOrder inOrder1 = inOrder(observer);
InOrder inOrder2 = inOrder(observer2);

inOrder1.verify(observer, times(1)).onNext(0L);
inOrder1.verify(observer, times(1)).onNext(1L);
inOrder1.verify(observer, never()).onNext(2L);
verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));

inOrder2.verify(observer2, times(1)).onNext(0L);
inOrder2.verify(observer2, times(1)).onNext(1L);
inOrder2.verify(observer2, never()).onNext(2L);
verify(observer2, never()).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));

sub1.unsubscribe();
sub2.unsubscribe();
scheduler.advanceTimeTo(4, TimeUnit.SECONDS);

verify(observer, never()).onNext(2L);
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));

verify(observer2, never()).onNext(2L);
verify(observer2, times(1)).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
}

@Test
public void testWithMultipleStaggeredSubscribers() {
Observable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler));
Subscription sub1 = w.subscribe(observer);

verify(observer, never()).onNext(anyLong());

scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
Subscription sub2 = w.subscribe(observer2);

InOrder inOrder1 = inOrder(observer);
inOrder1.verify(observer, times(1)).onNext(0L);
inOrder1.verify(observer, times(1)).onNext(1L);
inOrder1.verify(observer, never()).onNext(2L);

verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
verify(observer2, never()).onNext(anyLong());

scheduler.advanceTimeTo(4, TimeUnit.SECONDS);

inOrder1.verify(observer, times(1)).onNext(2L);
inOrder1.verify(observer, times(1)).onNext(3L);

InOrder inOrder2 = inOrder(observer2);
inOrder2.verify(observer2, times(1)).onNext(0L);
inOrder2.verify(observer2, times(1)).onNext(1L);

sub1.unsubscribe();
sub2.unsubscribe();

inOrder1.verify(observer, never()).onNext(anyLong());
inOrder1.verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));

inOrder2.verify(observer2, never()).onNext(anyLong());
inOrder2.verify(observer2, times(1)).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
}

@Test
public void testWithMultipleStaggeredSubscribersAndPublish() {
ConnectableObservable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)).publish();
Subscription sub1 = w.subscribe(observer);
w.connect();

verify(observer, never()).onNext(anyLong());

scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
Subscription sub2 = w.subscribe(observer2);

InOrder inOrder1 = inOrder(observer);
inOrder1.verify(observer, times(1)).onNext(0L);
inOrder1.verify(observer, times(1)).onNext(1L);
inOrder1.verify(observer, never()).onNext(2L);

verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
verify(observer2, never()).onNext(anyLong());

scheduler.advanceTimeTo(4, TimeUnit.SECONDS);

inOrder1.verify(observer, times(1)).onNext(2L);
inOrder1.verify(observer, times(1)).onNext(3L);

InOrder inOrder2 = inOrder(observer2);
inOrder2.verify(observer2, times(1)).onNext(2L);
inOrder2.verify(observer2, times(1)).onNext(3L);

sub1.unsubscribe();
sub2.unsubscribe();

inOrder1.verify(observer, never()).onNext(anyLong());
inOrder1.verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));

inOrder2.verify(observer2, never()).onNext(anyLong());
inOrder2.verify(observer2, never()).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
}
}
}

0 comments on commit a73c6d9

Please sign in to comment.