Skip to content

Commit

Permalink
Merge pull request #463 from zsxwing/timeout-overload
Browse files Browse the repository at this point in the history
Added the rest overloads of Timeout operator
  • Loading branch information
benjchristensen committed Nov 12, 2013
2 parents 1b0deef + 0b8014c commit 0b160cb
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 18 deletions.
79 changes: 67 additions & 12 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4543,31 +4543,86 @@ public Observable<T> ignoreElements() {
}

/**
* Returns either the observable sequence or an TimeoutException if timeout elapses.
*
* Applies a timeout policy for each element in the observable sequence,
* using the specified scheduler to run timeout timers. If the next element
* isn't received within the specified timeout duration starting from its
* predecessor, a TimeoutException is propagated to the observer.
*
* @param timeout
* The timeout duration
* Maximum duration between values before a timeout occurs.
* @param timeUnit
* The time unit of the timeout
* The unit of time which applies to the "timeout" argument.
*
* @return The source sequence with a TimeoutException in case of a timeout.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244283(v=vs.103).aspx">MSDN: Observable.Timeout</a>
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
return create(OperationTimeout.timeout(this, timeout, timeUnit));
}

/**
* Applies a timeout policy for each element in the observable sequence,
* using the specified scheduler to run timeout timers. If the next element
* isn't received within the specified timeout duration starting from its
* predecessor, the other observable sequence is used to produce future
* messages from that point on.
*
* @param timeout
* Maximum duration between values before a timeout occurs.
* @param timeUnit
* The unit of time which applies to the "timeout" argument.
* @param other
* Sequence to return in case of a timeout.
*
* @return The source sequence switching to the other sequence in case of a timeout.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229512(v=vs.103).aspx">MSDN: Observable.Timeout</a>
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, other));
}

/**
* Applies a timeout policy for each element in the observable sequence,
* using the specified scheduler to run timeout timers. If the next element
* isn't received within the specified timeout duration starting from its
* predecessor, a TimeoutException is propagated to the observer.
*
* @param timeout
* Maximum duration between values before a timeout occurs.
* @param timeUnit
* The unit of time which applies to the "timeout" argument.
* @param scheduler
* The scheduler to run the timeout timers on.
* Scheduler to run the timeout timers on.
*
* @return The source sequence with a TimeoutException in case of a timeout.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228946(v=vs.103).aspx">MSDN: Observable.Timeout</a>
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler));
}

/**
* Returns either the observable sequence or an TimeoutException if timeout elapses.
*
* Applies a timeout policy for each element in the observable sequence,
* using the specified scheduler to run timeout timers. If the next element
* isn't received within the specified timeout duration starting from its
* predecessor, the other observable sequence is used to produce future
* messages from that point on.
*
* @param timeout
* The timeout duration
* Maximum duration between values before a timeout occurs.
* @param timeUnit
* The time unit of the timeout
* @return The source sequence with a TimeoutException in case of a timeout.
* The unit of time which applies to the "timeout" argument.
* @param other
* Sequence to return in case of a timeout.
* @param scheduler
* Scheduler to run the timeout timers on.
*
* @return The source sequence switching to the other sequence in case of a
* timeout.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211676(v=vs.103).aspx">MSDN: Observable.Timeout</a>
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, Schedulers.threadPoolForComputation()));
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, other, scheduler));
}

/**
Expand Down
37 changes: 33 additions & 4 deletions rxjava-core/src/main/java/rx/operators/OperationTimeout.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,53 @@
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Func0;

/**
* Applies a timeout policy for each element in the observable sequence, using
* the specified scheduler to run timeout timers. If the next element isn't
* received within the specified timeout duration starting from its predecessor,
* the other observable sequence is used to produce future messages from that
* point on.
*/
public final class OperationTimeout {
public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return new Timeout<T>(source, timeout, timeUnit, scheduler);

public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit) {
return new Timeout<T>(source, timeout, timeUnit, null, Schedulers.threadPoolForComputation());
}

public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> sequence, long timeout, TimeUnit timeUnit, Observable<? extends T> other) {
return new Timeout<T>(sequence, timeout, timeUnit, other, Schedulers.threadPoolForComputation());
}

public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return new Timeout<T>(source, timeout, timeUnit, null, scheduler);
}

public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> sequence, long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
return new Timeout<T>(sequence, timeout, timeUnit, other, scheduler);
}

private static class Timeout<T> implements Observable.OnSubscribeFunc<T> {
private final Observable<? extends T> source;
private final long timeout;
private final TimeUnit timeUnit;
private final Scheduler scheduler;
private final Observable<? extends T> other;

private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
this.source = source;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.other = other;
this.scheduler = scheduler;
}

Expand All @@ -68,7 +92,12 @@ public void call() {
}
}
if (timeoutWins) {
observer.onError(new TimeoutException());
if (other == null) {
observer.onError(new TimeoutException());
}
else {
serial.setSubscription(other.subscribe(observer));
}
}

}
Expand Down
114 changes: 112 additions & 2 deletions rxjava-core/src/test/java/rx/TimeoutTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
*/
package rx;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.MockitoAnnotations;

import rx.concurrency.TestScheduler;
Expand All @@ -46,6 +51,7 @@ public void setUp() {

@Test
public void shouldNotTimeoutIfOnNextWithinTimeout() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
Expand All @@ -58,6 +64,7 @@ public void shouldNotTimeoutIfOnNextWithinTimeout() {

@Test
public void shouldNotTimeoutIfSecondOnNextWithinTimeout() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
Expand All @@ -72,6 +79,7 @@ public void shouldNotTimeoutIfSecondOnNextWithinTimeout() {

@Test
public void shouldTimeoutIfOnNextNotWithinTimeout() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
Expand All @@ -81,6 +89,7 @@ public void shouldTimeoutIfOnNextNotWithinTimeout() {

@Test
public void shouldTimeoutIfSecondOnNextNotWithinTimeout() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
Expand All @@ -93,6 +102,7 @@ public void shouldTimeoutIfSecondOnNextNotWithinTimeout() {

@Test
public void shouldCompleteIfUnderlyingComletes() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
Expand All @@ -105,6 +115,7 @@ public void shouldCompleteIfUnderlyingComletes() {

@Test
public void shouldErrorIfUnderlyingErrors() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
Expand All @@ -113,4 +124,103 @@ public void shouldErrorIfUnderlyingErrors() {
verify(observer).onError(any(UnsupportedOperationException.class));
subscription.unsubscribe();
}

@Test
public void shouldSwitchToOtherIfOnNextNotWithinTimeout() {
Observable<String> other = Observable.from("a", "b", "c");
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = source.subscribe(observer);

testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
underlyingSubject.onNext("Two");
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("One");
inOrder.verify(observer, times(1)).onNext("a");
inOrder.verify(observer, times(1)).onNext("b");
inOrder.verify(observer, times(1)).onNext("c");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
subscription.unsubscribe();
}

@Test
public void shouldSwitchToOtherIfOnErrorNotWithinTimeout() {
Observable<String> other = Observable.from("a", "b", "c");
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = source.subscribe(observer);

testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
underlyingSubject.onError(new UnsupportedOperationException());
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("One");
inOrder.verify(observer, times(1)).onNext("a");
inOrder.verify(observer, times(1)).onNext("b");
inOrder.verify(observer, times(1)).onNext("c");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
subscription.unsubscribe();
}

@Test
public void shouldSwitchToOtherIfOnCompletedNotWithinTimeout() {
Observable<String> other = Observable.from("a", "b", "c");
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = source.subscribe(observer);

testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
underlyingSubject.onCompleted();
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("One");
inOrder.verify(observer, times(1)).onNext("a");
inOrder.verify(observer, times(1)).onNext("b");
inOrder.verify(observer, times(1)).onNext("c");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
subscription.unsubscribe();
}

@Test
public void shouldSwitchToOtherAndCanBeUnsubscribedIfOnNextNotWithinTimeout() {
PublishSubject<String> other = PublishSubject.create();
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = source.subscribe(observer);

testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
underlyingSubject.onNext("Two");

other.onNext("a");
other.onNext("b");
subscription.unsubscribe();

// The following messages should not be delivered.
other.onNext("c");
other.onNext("d");
other.onCompleted();

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("One");
inOrder.verify(observer, times(1)).onNext("a");
inOrder.verify(observer, times(1)).onNext("b");
inOrder.verifyNoMoreInteractions();
}
}

0 comments on commit 0b160cb

Please sign in to comment.