Skip to content

Commit

Permalink
Merge pull request #720 from hura/master
Browse files Browse the repository at this point in the history
Added `Observable.timeout` wrappers to scala adapter
  • Loading branch information
benjchristensen committed Jan 9, 2014
2 parents 0b1b6e7 + c030321 commit ab689bf
Showing 1 changed file with 79 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,81 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler))
}

/**
* Applies a timeout policy for each item emitted by the Observable, using
* the specified scheduler to run timeout timers. If the next item isn't
* observed within the specified timeout duration starting from its
* predecessor, observers are notified of a `TimeoutException`.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.1.png">
*
* @param timeout maximum duration between items before a timeout occurs
* @return the source Observable modified to notify observers of a
* `TimeoutException` in case of a timeout
*/
def timeout(timeout: Duration): Observable[T] = {
toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit))
}

/**
* Applies a timeout policy for each item emitted by the Observable, using
* the specified scheduler to run timeout timers. If the next item isn't
* observed within the specified timeout duration starting from its
* predecessor, a specified fallback Observable produces future items and
* notifications from that point on.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.2.png">
*
* @param timeout maximum duration between items before a timeout occurs
* @param other fallback Observable to use in case of a timeout
* @return the source Observable modified to switch to the fallback
* Observable in case of a timeout
*/
def timeout[U >: T](timeout: Duration, other: Observable[U]): Observable[U] = {
val otherJava: rx.Observable[_ <: U] = other.asJavaObservable
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
toScalaObservable[U](thisJava.timeout(timeout.length, timeout.unit, otherJava))
}

/**
* Applies a timeout policy for each item emitted by the Observable, using
* the specified scheduler to run timeout timers. If the next item isn't
* observed within the specified timeout duration starting from its
* predecessor, the observer is notified of a `TimeoutException`.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.1s.png">
*
* @param timeout maximum duration between items before a timeout occurs
* @param scheduler Scheduler to run the timeout timers on
* @return the source Observable modified to notify observers of a
* `TimeoutException` in case of a timeout
*/
def timeout(timeout: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit, scheduler.asJavaScheduler))
}

/**
* Applies a timeout policy for each item emitted by the Observable, using
* the specified scheduler to run timeout timers. If the next item isn't
* observed within the specified timeout duration starting from its
* predecessor, a specified fallback Observable sequence produces future
* items and notifications from that point on.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.2s.png">
*
* @param timeout maximum duration between items before a timeout occurs
* @param other Observable to use as the fallback in case of a timeout
* @param scheduler Scheduler to run the timeout timers on
* @return the source Observable modified so that it will switch to the
* fallback Observable in case of a timeout
*/
def timeout[U >: T](timeout: Duration, other: Observable[U], scheduler: Scheduler): Observable[U] = {
val otherJava: rx.Observable[_ <: U] = other.asJavaObservable
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
toScalaObservable[U](thisJava.timeout(timeout.length, timeout.unit, otherJava, scheduler.asJavaScheduler))
}


/**
* Returns an Observable that sums up the elements of this Observable.
*
Expand Down Expand Up @@ -1894,21 +1969,21 @@ trait Observable[+T]
}

/**
* Invokes an action if the source Observable calls <code>onError</code>.
* Invokes an action if the source Observable calls `onError`.
*
* @param onError the action to invoke if the source Observable calls
* <code>onError</code>
* `onError`
* @return the source Observable with the side-effecting behavior applied
*/
def doOnError(onError: Throwable => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnError(onError))
}

/**
* Invokes an action when the source Observable calls <code>onCompleted</code>.
* Invokes an action when the source Observable calls `onCompleted`.
*
* @param onCompleted the action to invoke when the source Observable calls
* <code>onCompleted</code>
* `onCompleted`
* @return the source Observable with the side-effecting behavior applied
*/
def doOnCompleted(onCompleted: () => Unit): Observable[T] = {
Expand Down

0 comments on commit ab689bf

Please sign in to comment.