Skip to content

Commit

Permalink
Add timeout unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Feb 11, 2014
1 parent 0ef2737 commit 356a690
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 4 deletions.
52 changes: 50 additions & 2 deletions rxjava-core/src/test/java/rx/TimeoutTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@
*/
package rx;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -26,6 +32,7 @@
import org.mockito.InOrder;
import org.mockito.MockitoAnnotations;

import rx.Observable.OnSubscribe;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;

Expand Down Expand Up @@ -219,4 +226,45 @@ public void shouldSwitchToOtherAndCanBeUnsubscribedIfOnNextNotWithinTimeout() {
inOrder.verify(observer, times(1)).onNext("b");
inOrder.verifyNoMoreInteractions();
}

@Test
public void shouldTimeoutIfSynchronizedObservableEmitFirstOnNextNotWithinTimeout()
throws InterruptedException {
final CountDownLatch exit = new CountDownLatch(1);
final CountDownLatch timeoutSetuped = new CountDownLatch(1);

@SuppressWarnings("unchecked")
final Observer<String> observer = mock(Observer.class);
new Thread(new Runnable() {

@Override
public void run() {
Observable.create(new OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
try {
timeoutSetuped.countDown();
exit.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext("a");
subscriber.onCompleted();
}

}).timeout(1, TimeUnit.SECONDS, testScheduler)
.subscribe(observer);
}
}).start();

timeoutSetuped.await();
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onError(isA(TimeoutException.class));
inOrder.verifyNoMoreInteractions();

exit.countDown(); // exit the thread
}
}
184 changes: 182 additions & 2 deletions rxjava-core/src/test/java/rx/operators/OperationTimeoutTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,32 @@
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

Expand Down Expand Up @@ -239,4 +254,169 @@ public Observable<Integer> call() {
verify(o, never()).onCompleted();

}

@Test
public void testTimeoutSelectorWithFirstTimeoutFirstAndNoOtherObservable() {
PublishSubject<Integer> source = PublishSubject.create();
final PublishSubject<Integer> timeout = PublishSubject.create();

Func0<Observable<Integer>> firstTimeoutFunc = new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return timeout;
}
};

Func1<Integer, Observable<Integer>> timeoutFunc = new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t1) {
return PublishSubject.create();
}
};

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
source.timeout(firstTimeoutFunc, timeoutFunc).subscribe(o);

timeout.onNext(1);

InOrder inOrder = inOrder(o);
inOrder.verify(o).onError(isA(TimeoutException.class));
inOrder.verifyNoMoreInteractions();
}

@Test
public void testTimeoutSelectorWithTimeoutFirstAndNoOtherObservable() {
PublishSubject<Integer> source = PublishSubject.create();
final PublishSubject<Integer> timeout = PublishSubject.create();

Func0<Observable<Integer>> firstTimeoutFunc = new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return PublishSubject.create();
}
};

Func1<Integer, Observable<Integer>> timeoutFunc = new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t1) {
return timeout;
}
};

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
source.timeout(firstTimeoutFunc, timeoutFunc).subscribe(o);
source.onNext(1);

timeout.onNext(1);

InOrder inOrder = inOrder(o);
inOrder.verify(o).onNext(1);
inOrder.verify(o).onError(isA(TimeoutException.class));
inOrder.verifyNoMoreInteractions();
}

@Test
public void testTimeoutSelectorWithTimeoutAndOnNextRaceCondition() throws InterruptedException {
// Thread 1 Thread 2
//
// observer.onNext(1)
// start timeout
// unsubscribe timeout in thread 2 start to do some long-time work in "unsubscribe"
// observer.onNext(2)
// timeout.onNext(1)
// "unsubscribe" done
//
//
// In the above case, the timeout operator should ignore "timeout.onNext(1)"
// since "observer" has already seen 2.
final CountDownLatch observerReceivedTwo = new CountDownLatch(1);
final CountDownLatch timeoutEmittedOne = new CountDownLatch(1);
final CountDownLatch observerCompleted = new CountDownLatch(1);

final Func1<Integer, Observable<Integer>> timeoutFunc = new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer t1) {
if (t1 == 1) {
// Force "unsubscribe" run on another thread
return Observable.create(new OnSubscribe<Integer>(){
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.add(Subscriptions.create(new Action0(){
@Override
public void call() {
try {
// emulate "unsubscribe" is busy and finishes after timeout.onNext(1)
timeoutEmittedOne.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}}));
// force the timeout message be sent after observer.onNext(2)
try {
observerReceivedTwo.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
if(!subscriber.isUnsubscribed()) {
subscriber.onNext(1);
timeoutEmittedOne.countDown();
}
}
}).subscribeOn(Schedulers.newThread());
} else {
return PublishSubject.create();
}
}
};

@SuppressWarnings("unchecked")
final Observer<Integer> o = mock(Observer.class);
doAnswer(new Answer<Void>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
observerReceivedTwo.countDown();
return null;
}

}).when(o).onNext(2);
doAnswer(new Answer<Void>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
observerCompleted.countDown();
return null;
}

}).when(o).onCompleted();

new Thread(new Runnable() {

@Override
public void run() {
PublishSubject<Integer> source = PublishSubject.create();
source.timeout(timeoutFunc, Observable.from(3)).subscribe(o);
source.onNext(1); // start timeout
source.onNext(2); // disable timeout
try {
timeoutEmittedOne.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
source.onCompleted();
}

}).start();

observerCompleted.await();

InOrder inOrder = inOrder(o);
inOrder.verify(o).onNext(1);
inOrder.verify(o).onNext(2);
inOrder.verify(o, never()).onNext(3);
inOrder.verify(o).onCompleted();
inOrder.verifyNoMoreInteractions();
}
}

0 comments on commit 356a690

Please sign in to comment.