Skip to content

Commit

Permalink
Merge pull request #1336 from zsxwing/rxscala-bo
Browse files Browse the repository at this point in the history
RxScala: Add the rest missing methods to BlockingObservable
  • Loading branch information
benjchristensen committed Jun 16, 2014
2 parents 01c7406 + 91c8bea commit dcad052
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
Expand Down Expand Up @@ -226,6 +227,28 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(squares.toBlockingObservable.toList, List(4, 100, 400, 900))
}

@Test def nextExample() {
val o = Observable.interval(100 millis).take(20)
for(i <- o.toBlocking.next) {
println(i)
Thread.sleep(200)
}
}

@Test def latestExample() {
val o = Observable.interval(100 millis).take(20)
for(i <- o.toBlocking.latest) {
println(i)
Thread.sleep(200)
}
}

@Test def toFutureExample() {
val o = Observable.interval(500 millis).take(1)
val r = Await.result(o.toBlocking.toFuture, 2 seconds)
println(r)
}

@Test def testTwoSubscriptionsToOneInterval() {
val o = Observable.interval(100 millis).take(8)
o.subscribe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2985,13 +2985,14 @@ trait Observable[+T]

/**
* If the source Observable completes after emitting a single item, return an Observable that emits that
* item. If the source Observable emits more than one item or no items, throw an `NoSuchElementException`.
* item. If the source Observable emits more than one item or no items, notify of an `IllegalArgumentException`
* or `NoSuchElementException` respectively.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
*
* @return an Observable that emits the single item emitted by the source Observable
* @throws NoSuchElementException
* if the source emits more than one item or no items
* @throws IllegalArgumentException if the source emits more than one item
* @throws NoSuchElementException if the source emits no items
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-single-and-singleordefault">RxJava Wiki: single()</a>
* @see "MSDN: Observable.singleAsync()"
*/
Expand Down Expand Up @@ -3252,7 +3253,7 @@ trait Observable[+T]
*/
@deprecated("Use `toBlocking` instead", "0.19")
def toBlockingObservable: BlockingObservable[T] = {
new BlockingObservable[T](asJavaObservable.toBlocking)
new BlockingObservable[T](this)
}

/**
Expand All @@ -3264,7 +3265,7 @@ trait Observable[+T]
* @since 0.19
*/
def toBlocking: BlockingObservable[T] = {
new BlockingObservable[T](asJavaObservable.toBlocking)
new BlockingObservable[T](this)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@
package rx.lang.scala.observables

import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
import rx.lang.scala.ImplicitFunctionConversions._
import rx.lang.scala.Observable
import rx.observables.{BlockingObservable => JBlockingObservable}


/**
* An Observable that provides blocking operators.
*
* You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlockingObservable]]
* You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlocking]]
*/
// constructor is private because users should use Observable.toBlockingObservable
class BlockingObservable[+T] private[scala] (val asJava: rx.observables.BlockingObservable[_ <: T])
extends AnyVal
// constructor is private because users should use Observable.toBlocking
class BlockingObservable[+T] private[scala] (val o: Observable[T])
extends AnyVal
{

// This is def because "field definition is not allowed in value class"
private def asJava: JBlockingObservable[_ <: T] = o.asJavaObservable.toBlocking
/**
* Invoke a method on each item emitted by the {@link Observable}; block until the Observable
* completes.
Expand Down Expand Up @@ -69,6 +73,31 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
asJava.last : T
}

/**
* Returns an `Option` with the last item emitted by the source Observable,
* or `None` if the source Observable completes without emitting any items.
*
* @return an `Option` with the last item emitted by the source Observable,
* or `None` if the source Observable is empty
*/
def lastOption: Option[T] = {
o.lastOption.toBlocking.single
}

/**
* Returns the last item emitted by the source Observable, or a default item
* if the source Observable completes without emitting any items.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/lastOrDefault.png">
*
* @param default the default item to emit if the source Observable is empty.
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
* @return the last item emitted by the source Observable, or a default item if the source Observable is empty
*/
def lastOrElse[U >: T](default: => U): U = {
lastOption getOrElse default
}

/**
* Returns the first item emitted by a specified [[Observable]], or
* `NoSuchElementException` if source contains no elements.
Expand Down Expand Up @@ -96,12 +125,29 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
*/
def head : T = first

// last -> use toIterable.last
// lastOrDefault -> use toIterable.lastOption
// first -> use toIterable.head
// firstOrDefault -> use toIterable.headOption
// single(predicate) -> use filter and single
// singleOrDefault -> use singleOption
/**
* Returns an `Option` with the very first item emitted by the source Observable,
* or `None` if the source Observable is empty.
*
* @return an `Option` with the very first item from the source,
* or `None` if the source Observable completes without emitting any item.
*/
def headOption: Option[T] = {
o.headOption.toBlocking.single
}

/**
* Returns the very first item emitted by the source Observable, or a default value if the source Observable is empty.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstOrDefault.png">
*
* @param default The default value to emit if the source Observable doesn't emit anything.
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
* @return the very first item from the source, or a default value if the source Observable completes without emitting any item.
*/
def headOrElse[U >: T](default: => U): U = {
headOption getOrElse default
}

/**
* Returns an {@link Iterable} that always returns the item most recently emitted by an {@link Observable}.
Expand Down Expand Up @@ -130,32 +176,48 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
}

/**
* If this {@link Observable} completes after emitting a single item, return that item,
* otherwise throw an exception.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.single.png">
* If the source Observable completes after emitting a single item, return that item. If the source Observable
* emits more than one item or no items, notify of an `IllegalArgumentException` or `NoSuchElementException` respectively.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
*
* @return the single item emitted by the {@link Observable}
* @return an Observable that emits the single item emitted by the source Observable
* @throws IllegalArgumentException if the source emits more than one item
* @throws NoSuchElementException if the source emits no items
*/
def single: T = {
asJava.single(): T // useless ascription because of compiler bug
}

/**
* If this {@link Observable} completes after emitting a single item, return an Option containing
* this item, otherwise return {@code None}.
* If the source Observable completes after emitting a single item, return an `Option` with that item;
* if the source Observable is empty, return `None`. If the source Observable emits more than one item,
* throw an `IllegalArgumentException`.
*
* @return an `Option` with the single item emitted by the source Observable, or
* `None` if the source Observable is empty
* @throws IllegalArgumentException if the source Observable emits more than one item
*/
def singleOption: Option[T] = {
var size: Int = 0
var last: Option[T] = None
for (t <- toIterable) {
size += 1
last = Some(t)
}
if (size == 1) last else None
o.singleOption.toBlocking.single
}

// TODO toFuture()
/**
* If the source Observable completes after emitting a single item, return that item;
* if the source Observable is empty, return a default item. If the source Observable
* emits more than one item, throw an `IllegalArgumentException`.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/singleOrDefault.png">
*
* @param default a default value to emit if the source Observable emits no item.
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
* @return the single item emitted by the source Observable, or a default item if
* the source Observable is empty
* @throws IllegalArgumentException if the source Observable emits more than one item
*/
def singleOrElse[U >: T](default: => U): U = {
singleOption getOrElse default
}

/**
* Returns an {@link Iterator} that iterates over all items emitted by this {@link Observable}.
Expand All @@ -171,6 +233,38 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking
asJava.toIterable.asScala.toList: List[T] // useless ascription because of compiler bug
}

/**
* Returns an `Iterable` that returns the latest item emitted by this `BlockingObservable`,
* waiting if necessary for one to become available.
*
* If this `BlockingObservable` produces items faster than `Iterator.next` takes them,
* `onNext` events might be skipped, but `onError` or `onCompleted` events are not.
*
* Note also that an `onNext` directly followed by `onCompleted` might hide the `onNext` event.
*
* @return an `Iterable` that always returns the latest item emitted by this `BlockingObservable`
*/
def latest: Iterable[T] = {
asJava.latest.asScala: Iterable[T] // useless ascription because of compiler bug
}

/**
* Returns a `Future` representing the single value emitted by this `BlockingObservable`.
*
* The returned `Future` will be completed with an `IllegalArgumentException` if the `BlockingObservable`
* emits more than one item. And it will be completed with an `NoSuchElementException` if the `BlockingObservable`
* is empty. Use `Observable.toSeq.toBlocking.toFuture` if you are not sure about the size of `BlockingObservable`
* and do not want to handle these `Exception`s.
*
* <img width="640" height="395" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.toFuture.png">
*
* @return a `Future` that expects a single item to be emitted by this `BlockingObservable`.
*/
def toFuture: Future[T] = {
val p = Promise[T]()
o.single.subscribe(t => p.success(t), e => p.failure(e))
p.future
}
}

// Cannot yet have inner class because of this error message:
Expand Down
Loading

0 comments on commit dcad052

Please sign in to comment.