Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement the timeout operator and fix timeout bugs #851

Merged
merged 4 commits into from
Feb 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 16 additions & 24 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationTimeInterval;
import rx.operators.OperationTimeout;
import rx.operators.OperationTimer;
import rx.operators.OperationToMap;
import rx.operators.OperationToMultimap;
Expand All @@ -105,6 +104,8 @@
import rx.operators.OperatorRepeat;
import rx.operators.OperatorSubscribeOn;
import rx.operators.OperatorTake;
import rx.operators.OperatorTimeout;
import rx.operators.OperatorTimeoutWithSelector;
import rx.operators.OperatorTimestamp;
import rx.operators.OperatorToObservableList;
import rx.operators.OperatorToObservableSortedList;
Expand Down Expand Up @@ -7754,11 +7755,8 @@ public final Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) {
* @return an Observable that completes if either the first item or any subsequent item doesn't
* arrive within the time windows specified by the timeout selectors
*/
public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<U>> timeoutSelector) {
if (firstTimeoutSelector == null) {
throw new NullPointerException("firstTimeoutSelector");
}
return timeout(firstTimeoutSelector, timeoutSelector, Observable.<T> empty());
public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<V>> timeoutSelector) {
return timeout(firstTimeoutSelector, timeoutSelector, null);
}

/**
Expand All @@ -7784,14 +7782,11 @@ public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTi
* @return an Observable that mirrors the source Observable, but switches to the {@code other} Observable if either the first item emitted by the source Observable or any
* subsequent item don't arrive within time windows defined by the timeout selectors
*/
public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<U>> timeoutSelector, Observable<? extends T> other) {
if (firstTimeoutSelector == null) {
throw new NullPointerException("firstTimeoutSelector");
}
if (other == null) {
throw new NullPointerException("other");
public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<V>> timeoutSelector, Observable<? extends T> other) {
if(timeoutSelector == null) {
throw new NullPointerException("timeoutSelector is null");
}
return create(OperationTimeout.timeoutSelector(this, firstTimeoutSelector, timeoutSelector, other));
return lift(new OperatorTimeoutWithSelector<T, U, V>(firstTimeoutSelector, timeoutSelector, other));
}

/**
Expand All @@ -7813,8 +7808,8 @@ public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTi
* the source Observable takes longer to arrive than the time window defined by the
* selector for the previously emitted item
*/
public final <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>> timeoutSelector) {
return timeout(timeoutSelector, Observable.<T> empty());
public final <V> Observable<T> timeout(Func1<? super T, ? extends Observable<V>> timeoutSelector) {
return timeout(null, timeoutSelector, null);
}

/**
Expand All @@ -7838,11 +7833,8 @@ public final <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>>
* fallback Observable if a item emitted by the source Observable takes longer to arrive
* than the time window defined by the selector for the previously emitted item
*/
public final <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>> timeoutSelector, Observable<? extends T> other) {
if (other == null) {
throw new NullPointerException("other");
}
return create(OperationTimeout.timeoutSelector(this, null, timeoutSelector, other));
public final <V> Observable<T> timeout(Func1<? super T, ? extends Observable<V>> timeoutSelector, Observable<? extends T> other) {
return timeout(null, timeoutSelector, other);
}

/**
Expand All @@ -7863,7 +7855,7 @@ public final <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244283.aspx">MSDN: Observable.Timeout</a>
*/
public final Observable<T> timeout(long timeout, TimeUnit timeUnit) {
return create(OperationTimeout.timeout(this, timeout, timeUnit));
return timeout(timeout, timeUnit, null, Schedulers.computation());
}

/**
Expand All @@ -7886,7 +7878,7 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229512.aspx">MSDN: Observable.Timeout</a>
*/
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, other));
return timeout(timeout, timeUnit, other, Schedulers.computation());
}

/**
Expand All @@ -7911,7 +7903,7 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<?
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211676.aspx">MSDN: Observable.Timeout</a>
*/
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, other, scheduler));
return lift(new OperatorTimeout<T>(timeout, timeUnit, other, scheduler));
}

/**
Expand All @@ -7934,7 +7926,7 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<?
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228946.aspx">MSDN: Observable.Timeout</a>
*/
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler));
return timeout(timeout, timeUnit, null, scheduler);
}

/**
Expand Down
Loading