Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the rest overloads of Timeout operator #463

Merged
merged 1 commit into from
Nov 12, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 67 additions & 12 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4526,31 +4526,86 @@ public Observable<T> ignoreElements() {
}

/**
* Returns either the observable sequence or an TimeoutException if timeout elapses.
*
* Applies a timeout policy for each element in the observable sequence,
* using the specified scheduler to run timeout timers. If the next element
* isn't received within the specified timeout duration starting from its
* predecessor, a TimeoutException is propagated to the observer.
*
* @param timeout
* The timeout duration
* Maximum duration between values before a timeout occurs.
* @param timeUnit
* The time unit of the timeout
* The unit of time which applies to the "timeout" argument.
*
* @return The source sequence with a TimeoutException in case of a timeout.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244283(v=vs.103).aspx">MSDN: Observable.Timeout</a>
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
return create(OperationTimeout.timeout(this, timeout, timeUnit));
}

/**
* Applies a timeout policy for each element in the observable sequence,
* using the specified scheduler to run timeout timers. If the next element
* isn't received within the specified timeout duration starting from its
* predecessor, the other observable sequence is used to produce future
* messages from that point on.
*
* @param timeout
* Maximum duration between values before a timeout occurs.
* @param timeUnit
* The unit of time which applies to the "timeout" argument.
* @param other
* Sequence to return in case of a timeout.
*
* @return The source sequence switching to the other sequence in case of a timeout.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229512(v=vs.103).aspx">MSDN: Observable.Timeout</a>
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, other));
}

/**
* Applies a timeout policy for each element in the observable sequence,
* using the specified scheduler to run timeout timers. If the next element
* isn't received within the specified timeout duration starting from its
* predecessor, a TimeoutException is propagated to the observer.
*
* @param timeout
* Maximum duration between values before a timeout occurs.
* @param timeUnit
* The unit of time which applies to the "timeout" argument.
* @param scheduler
* The scheduler to run the timeout timers on.
* Scheduler to run the timeout timers on.
*
* @return The source sequence with a TimeoutException in case of a timeout.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228946(v=vs.103).aspx">MSDN: Observable.Timeout</a>
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler));
}

/**
* Returns either the observable sequence or an TimeoutException if timeout elapses.
*
* Applies a timeout policy for each element in the observable sequence,
* using the specified scheduler to run timeout timers. If the next element
* isn't received within the specified timeout duration starting from its
* predecessor, the other observable sequence is used to produce future
* messages from that point on.
*
* @param timeout
* The timeout duration
* Maximum duration between values before a timeout occurs.
* @param timeUnit
* The time unit of the timeout
* @return The source sequence with a TimeoutException in case of a timeout.
* The unit of time which applies to the "timeout" argument.
* @param other
* Sequence to return in case of a timeout.
* @param scheduler
* Scheduler to run the timeout timers on.
*
* @return The source sequence switching to the other sequence in case of a
* timeout.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211676(v=vs.103).aspx">MSDN: Observable.Timeout</a>
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, Schedulers.threadPoolForComputation()));
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, other, scheduler));
}

/**
Expand Down
37 changes: 33 additions & 4 deletions rxjava-core/src/main/java/rx/operators/OperationTimeout.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,53 @@
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Func0;

/**
* Applies a timeout policy for each element in the observable sequence, using
* the specified scheduler to run timeout timers. If the next element isn't
* received within the specified timeout duration starting from its predecessor,
* the other observable sequence is used to produce future messages from that
* point on.
*/
public final class OperationTimeout {
public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return new Timeout<T>(source, timeout, timeUnit, scheduler);

public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit) {
return new Timeout<T>(source, timeout, timeUnit, null, Schedulers.threadPoolForComputation());
}

public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> sequence, long timeout, TimeUnit timeUnit, Observable<? extends T> other) {
return new Timeout<T>(sequence, timeout, timeUnit, other, Schedulers.threadPoolForComputation());
}

public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return new Timeout<T>(source, timeout, timeUnit, null, scheduler);
}

public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> sequence, long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
return new Timeout<T>(sequence, timeout, timeUnit, other, scheduler);
}

private static class Timeout<T> implements Observable.OnSubscribeFunc<T> {
private final Observable<? extends T> source;
private final long timeout;
private final TimeUnit timeUnit;
private final Scheduler scheduler;
private final Observable<? extends T> other;

private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
this.source = source;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.other = other;
this.scheduler = scheduler;
}

Expand All @@ -68,7 +92,12 @@ public void call() {
}
}
if (timeoutWins) {
observer.onError(new TimeoutException());
if (other == null) {
observer.onError(new TimeoutException());
}
else {
serial.setSubscription(other.subscribe(observer));
}
}

}
Expand Down
114 changes: 112 additions & 2 deletions rxjava-core/src/test/java/rx/TimeoutTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
*/
package rx;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.MockitoAnnotations;

import rx.concurrency.TestScheduler;
Expand All @@ -46,6 +51,7 @@ public void setUp() {

@Test
public void shouldNotTimeoutIfOnNextWithinTimeout() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
Expand All @@ -58,6 +64,7 @@ public void shouldNotTimeoutIfOnNextWithinTimeout() {

@Test
public void shouldNotTimeoutIfSecondOnNextWithinTimeout() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
Expand All @@ -72,6 +79,7 @@ public void shouldNotTimeoutIfSecondOnNextWithinTimeout() {

@Test
public void shouldTimeoutIfOnNextNotWithinTimeout() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
Expand All @@ -81,6 +89,7 @@ public void shouldTimeoutIfOnNextNotWithinTimeout() {

@Test
public void shouldTimeoutIfSecondOnNextNotWithinTimeout() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
Expand All @@ -93,6 +102,7 @@ public void shouldTimeoutIfSecondOnNextNotWithinTimeout() {

@Test
public void shouldCompleteIfUnderlyingComletes() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
Expand All @@ -105,6 +115,7 @@ public void shouldCompleteIfUnderlyingComletes() {

@Test
public void shouldErrorIfUnderlyingErrors() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
Expand All @@ -113,4 +124,103 @@ public void shouldErrorIfUnderlyingErrors() {
verify(observer).onError(any(UnsupportedOperationException.class));
subscription.unsubscribe();
}

@Test
public void shouldSwitchToOtherIfOnNextNotWithinTimeout() {
Observable<String> other = Observable.from("a", "b", "c");
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = source.subscribe(observer);

testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
underlyingSubject.onNext("Two");
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("One");
inOrder.verify(observer, times(1)).onNext("a");
inOrder.verify(observer, times(1)).onNext("b");
inOrder.verify(observer, times(1)).onNext("c");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
subscription.unsubscribe();
}

@Test
public void shouldSwitchToOtherIfOnErrorNotWithinTimeout() {
Observable<String> other = Observable.from("a", "b", "c");
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = source.subscribe(observer);

testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
underlyingSubject.onError(new UnsupportedOperationException());
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("One");
inOrder.verify(observer, times(1)).onNext("a");
inOrder.verify(observer, times(1)).onNext("b");
inOrder.verify(observer, times(1)).onNext("c");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
subscription.unsubscribe();
}

@Test
public void shouldSwitchToOtherIfOnCompletedNotWithinTimeout() {
Observable<String> other = Observable.from("a", "b", "c");
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = source.subscribe(observer);

testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
underlyingSubject.onCompleted();
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("One");
inOrder.verify(observer, times(1)).onNext("a");
inOrder.verify(observer, times(1)).onNext("b");
inOrder.verify(observer, times(1)).onNext("c");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
subscription.unsubscribe();
}

@Test
public void shouldSwitchToOtherAndCanBeUnsubscribedIfOnNextNotWithinTimeout() {
PublishSubject<String> other = PublishSubject.create();
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Subscription subscription = source.subscribe(observer);

testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
underlyingSubject.onNext("Two");

other.onNext("a");
other.onNext("b");
subscription.unsubscribe();

// The following messages should not be delivered.
other.onNext("c");
other.onNext("d");
other.onCompleted();

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("One");
inOrder.verify(observer, times(1)).onNext("a");
inOrder.verify(observer, times(1)).onNext("b");
inOrder.verifyNoMoreInteractions();
}
}