From 356a6902a6e96e4d1a131d71521b62d26596fd6c Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 11 Feb 2014 13:17:26 +0800 Subject: [PATCH] Add timeout unit tests --- .../src/test/java/rx/TimeoutTests.java | 52 ++++- .../rx/operators/OperationTimeoutTest.java | 184 +++++++++++++++++- 2 files changed, 232 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/test/java/rx/TimeoutTests.java b/rxjava-core/src/test/java/rx/TimeoutTests.java index 2e1bf6412e..0934d197b2 100644 --- a/rxjava-core/src/test/java/rx/TimeoutTests.java +++ b/rxjava-core/src/test/java/rx/TimeoutTests.java @@ -15,9 +15,15 @@ */ package rx; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -26,6 +32,7 @@ import org.mockito.InOrder; import org.mockito.MockitoAnnotations; +import rx.Observable.OnSubscribe; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; @@ -219,4 +226,45 @@ public void shouldSwitchToOtherAndCanBeUnsubscribedIfOnNextNotWithinTimeout() { inOrder.verify(observer, times(1)).onNext("b"); inOrder.verifyNoMoreInteractions(); } + + @Test + public void shouldTimeoutIfSynchronizedObservableEmitFirstOnNextNotWithinTimeout() + throws InterruptedException { + final CountDownLatch exit = new CountDownLatch(1); + final CountDownLatch timeoutSetuped = new CountDownLatch(1); + + @SuppressWarnings("unchecked") + final Observer observer = mock(Observer.class); + new Thread(new Runnable() { + + @Override + public void run() { + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber subscriber) { + try { + timeoutSetuped.countDown(); + exit.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + subscriber.onNext("a"); + subscriber.onCompleted(); + } + + }).timeout(1, TimeUnit.SECONDS, testScheduler) + .subscribe(observer); + } + }).start(); + + timeoutSetuped.await(); + testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError(isA(TimeoutException.class)); + inOrder.verifyNoMoreInteractions(); + + exit.countDown(); // exit the thread + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationTimeoutTest.java b/rxjava-core/src/test/java/rx/operators/OperationTimeoutTest.java index 8644127110..ab22e20993 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationTimeoutTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationTimeoutTest.java @@ -15,17 +15,32 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; import org.junit.Test; import org.mockito.InOrder; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Observer; +import rx.Subscriber; +import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -239,4 +254,169 @@ public Observable call() { verify(o, never()).onCompleted(); } + + @Test + public void testTimeoutSelectorWithFirstTimeoutFirstAndNoOtherObservable() { + PublishSubject source = PublishSubject.create(); + final PublishSubject timeout = PublishSubject.create(); + + Func0> firstTimeoutFunc = new Func0>() { + @Override + public Observable call() { + return timeout; + } + }; + + Func1> timeoutFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + return PublishSubject.create(); + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + source.timeout(firstTimeoutFunc, timeoutFunc).subscribe(o); + + timeout.onNext(1); + + InOrder inOrder = inOrder(o); + inOrder.verify(o).onError(isA(TimeoutException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testTimeoutSelectorWithTimeoutFirstAndNoOtherObservable() { + PublishSubject source = PublishSubject.create(); + final PublishSubject timeout = PublishSubject.create(); + + Func0> firstTimeoutFunc = new Func0>() { + @Override + public Observable call() { + return PublishSubject.create(); + } + }; + + Func1> timeoutFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + return timeout; + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + source.timeout(firstTimeoutFunc, timeoutFunc).subscribe(o); + source.onNext(1); + + timeout.onNext(1); + + InOrder inOrder = inOrder(o); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onError(isA(TimeoutException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testTimeoutSelectorWithTimeoutAndOnNextRaceCondition() throws InterruptedException { + // Thread 1 Thread 2 + // + // observer.onNext(1) + // start timeout + // unsubscribe timeout in thread 2 start to do some long-time work in "unsubscribe" + // observer.onNext(2) + // timeout.onNext(1) + // "unsubscribe" done + // + // + // In the above case, the timeout operator should ignore "timeout.onNext(1)" + // since "observer" has already seen 2. + final CountDownLatch observerReceivedTwo = new CountDownLatch(1); + final CountDownLatch timeoutEmittedOne = new CountDownLatch(1); + final CountDownLatch observerCompleted = new CountDownLatch(1); + + final Func1> timeoutFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + if (t1 == 1) { + // Force "unsubscribe" run on another thread + return Observable.create(new OnSubscribe(){ + @Override + public void call(Subscriber subscriber) { + subscriber.add(Subscriptions.create(new Action0(){ + @Override + public void call() { + try { + // emulate "unsubscribe" is busy and finishes after timeout.onNext(1) + timeoutEmittedOne.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }})); + // force the timeout message be sent after observer.onNext(2) + try { + observerReceivedTwo.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if(!subscriber.isUnsubscribed()) { + subscriber.onNext(1); + timeoutEmittedOne.countDown(); + } + } + }).subscribeOn(Schedulers.newThread()); + } else { + return PublishSubject.create(); + } + } + }; + + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + observerReceivedTwo.countDown(); + return null; + } + + }).when(o).onNext(2); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + observerCompleted.countDown(); + return null; + } + + }).when(o).onCompleted(); + + new Thread(new Runnable() { + + @Override + public void run() { + PublishSubject source = PublishSubject.create(); + source.timeout(timeoutFunc, Observable.from(3)).subscribe(o); + source.onNext(1); // start timeout + source.onNext(2); // disable timeout + try { + timeoutEmittedOne.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + source.onCompleted(); + } + + }).start(); + + observerCompleted.await(); + + InOrder inOrder = inOrder(o); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(2); + inOrder.verify(o, never()).onNext(3); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } }