diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index caeac835ff..79f238e38f 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -19,6 +19,7 @@ import java.io.IOException import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import scala.concurrent.Await import scala.collection.mutable import scala.concurrent.duration.Duration import scala.concurrent.duration.DurationInt @@ -226,6 +227,28 @@ class RxScalaDemo extends JUnitSuite { assertEquals(squares.toBlockingObservable.toList, List(4, 100, 400, 900)) } + @Test def nextExample() { + val o = Observable.interval(100 millis).take(20) + for(i <- o.toBlocking.next) { + println(i) + Thread.sleep(200) + } + } + + @Test def latestExample() { + val o = Observable.interval(100 millis).take(20) + for(i <- o.toBlocking.latest) { + println(i) + Thread.sleep(200) + } + } + + @Test def toFutureExample() { + val o = Observable.interval(500 millis).take(1) + val r = Await.result(o.toBlocking.toFuture, 2 seconds) + println(r) + } + @Test def testTwoSubscriptionsToOneInterval() { val o = Observable.interval(100 millis).take(8) o.subscribe( diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 47cfc762aa..64b98dd101 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -2985,13 +2985,14 @@ trait Observable[+T] /** * If the source Observable completes after emitting a single item, return an Observable that emits that - * item. If the source Observable emits more than one item or no items, throw an `NoSuchElementException`. + * item. If the source Observable emits more than one item or no items, notify of an `IllegalArgumentException` + * or `NoSuchElementException` respectively. * * * * @return an Observable that emits the single item emitted by the source Observable - * @throws NoSuchElementException - * if the source emits more than one item or no items + * @throws IllegalArgumentException if the source emits more than one item + * @throws NoSuchElementException if the source emits no items * @see RxJava Wiki: single() * @see "MSDN: Observable.singleAsync()" */ @@ -3252,7 +3253,7 @@ trait Observable[+T] */ @deprecated("Use `toBlocking` instead", "0.19") def toBlockingObservable: BlockingObservable[T] = { - new BlockingObservable[T](asJavaObservable.toBlocking) + new BlockingObservable[T](this) } /** @@ -3264,7 +3265,7 @@ trait Observable[+T] * @since 0.19 */ def toBlocking: BlockingObservable[T] = { - new BlockingObservable[T](asJavaObservable.toBlocking) + new BlockingObservable[T](this) } /** diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala index 4ae6a54ce7..6dd2ab83ce 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala @@ -16,19 +16,23 @@ package rx.lang.scala.observables import scala.collection.JavaConverters._ +import scala.concurrent.{Future, Promise} import rx.lang.scala.ImplicitFunctionConversions._ +import rx.lang.scala.Observable +import rx.observables.{BlockingObservable => JBlockingObservable} /** * An Observable that provides blocking operators. * - * You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlockingObservable]] + * You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlocking]] */ -// constructor is private because users should use Observable.toBlockingObservable -class BlockingObservable[+T] private[scala] (val asJava: rx.observables.BlockingObservable[_ <: T]) - extends AnyVal +// constructor is private because users should use Observable.toBlocking +class BlockingObservable[+T] private[scala] (val o: Observable[T]) + extends AnyVal { - + // This is def because "field definition is not allowed in value class" + private def asJava: JBlockingObservable[_ <: T] = o.asJavaObservable.toBlocking /** * Invoke a method on each item emitted by the {@link Observable}; block until the Observable * completes. @@ -69,6 +73,31 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking asJava.last : T } + /** + * Returns an `Option` with the last item emitted by the source Observable, + * or `None` if the source Observable completes without emitting any items. + * + * @return an `Option` with the last item emitted by the source Observable, + * or `None` if the source Observable is empty + */ + def lastOption: Option[T] = { + o.lastOption.toBlocking.single + } + + /** + * Returns the last item emitted by the source Observable, or a default item + * if the source Observable completes without emitting any items. + * + * + * + * @param default the default item to emit if the source Observable is empty. + * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything. + * @return the last item emitted by the source Observable, or a default item if the source Observable is empty + */ + def lastOrElse[U >: T](default: => U): U = { + lastOption getOrElse default + } + /** * Returns the first item emitted by a specified [[Observable]], or * `NoSuchElementException` if source contains no elements. @@ -96,12 +125,29 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking */ def head : T = first - // last -> use toIterable.last - // lastOrDefault -> use toIterable.lastOption - // first -> use toIterable.head - // firstOrDefault -> use toIterable.headOption - // single(predicate) -> use filter and single - // singleOrDefault -> use singleOption + /** + * Returns an `Option` with the very first item emitted by the source Observable, + * or `None` if the source Observable is empty. + * + * @return an `Option` with the very first item from the source, + * or `None` if the source Observable completes without emitting any item. + */ + def headOption: Option[T] = { + o.headOption.toBlocking.single + } + + /** + * Returns the very first item emitted by the source Observable, or a default value if the source Observable is empty. + * + * + * + * @param default The default value to emit if the source Observable doesn't emit anything. + * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything. + * @return the very first item from the source, or a default value if the source Observable completes without emitting any item. + */ + def headOrElse[U >: T](default: => U): U = { + headOption getOrElse default + } /** * Returns an {@link Iterable} that always returns the item most recently emitted by an {@link Observable}. @@ -130,32 +176,48 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking } /** - * If this {@link Observable} completes after emitting a single item, return that item, - * otherwise throw an exception. - *

- * + * If the source Observable completes after emitting a single item, return that item. If the source Observable + * emits more than one item or no items, notify of an `IllegalArgumentException` or `NoSuchElementException` respectively. + * + * * - * @return the single item emitted by the {@link Observable} + * @return an Observable that emits the single item emitted by the source Observable + * @throws IllegalArgumentException if the source emits more than one item + * @throws NoSuchElementException if the source emits no items */ def single: T = { asJava.single(): T // useless ascription because of compiler bug } /** - * If this {@link Observable} completes after emitting a single item, return an Option containing - * this item, otherwise return {@code None}. + * If the source Observable completes after emitting a single item, return an `Option` with that item; + * if the source Observable is empty, return `None`. If the source Observable emits more than one item, + * throw an `IllegalArgumentException`. + * + * @return an `Option` with the single item emitted by the source Observable, or + * `None` if the source Observable is empty + * @throws IllegalArgumentException if the source Observable emits more than one item */ def singleOption: Option[T] = { - var size: Int = 0 - var last: Option[T] = None - for (t <- toIterable) { - size += 1 - last = Some(t) - } - if (size == 1) last else None + o.singleOption.toBlocking.single } - // TODO toFuture() + /** + * If the source Observable completes after emitting a single item, return that item; + * if the source Observable is empty, return a default item. If the source Observable + * emits more than one item, throw an `IllegalArgumentException`. + * + * + * + * @param default a default value to emit if the source Observable emits no item. + * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything. + * @return the single item emitted by the source Observable, or a default item if + * the source Observable is empty + * @throws IllegalArgumentException if the source Observable emits more than one item + */ + def singleOrElse[U >: T](default: => U): U = { + singleOption getOrElse default + } /** * Returns an {@link Iterator} that iterates over all items emitted by this {@link Observable}. @@ -171,6 +233,38 @@ class BlockingObservable[+T] private[scala] (val asJava: rx.observables.Blocking asJava.toIterable.asScala.toList: List[T] // useless ascription because of compiler bug } + /** + * Returns an `Iterable` that returns the latest item emitted by this `BlockingObservable`, + * waiting if necessary for one to become available. + * + * If this `BlockingObservable` produces items faster than `Iterator.next` takes them, + * `onNext` events might be skipped, but `onError` or `onCompleted` events are not. + * + * Note also that an `onNext` directly followed by `onCompleted` might hide the `onNext` event. + * + * @return an `Iterable` that always returns the latest item emitted by this `BlockingObservable` + */ + def latest: Iterable[T] = { + asJava.latest.asScala: Iterable[T] // useless ascription because of compiler bug + } + + /** + * Returns a `Future` representing the single value emitted by this `BlockingObservable`. + * + * The returned `Future` will be completed with an `IllegalArgumentException` if the `BlockingObservable` + * emits more than one item. And it will be completed with an `NoSuchElementException` if the `BlockingObservable` + * is empty. Use `Observable.toSeq.toBlocking.toFuture` if you are not sure about the size of `BlockingObservable` + * and do not want to handle these `Exception`s. + * + * + * + * @return a `Future` that expects a single item to be emitted by this `BlockingObservable`. + */ + def toFuture: Future[T] = { + val p = Promise[T]() + o.single.subscribe(t => p.success(t), e => p.failure(e)) + p.future + } } // Cannot yet have inner class because of this error message: diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala new file mode 100644 index 0000000000..d488150f46 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/observables/BlockingObservableTest.scala @@ -0,0 +1,152 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.observables + +import scala.concurrent.Await +import scala.concurrent.duration._ +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnitSuite +import scala.language.postfixOps +import rx.lang.scala.Observable + +class BlockingObservableTest extends JUnitSuite { + + @Test + def testSingleOption() { + val o = Observable.items(1) + assertEquals(Some(1), o.toBlocking.singleOption) + } + + @Test + def testSingleOptionWithEmpty() { + val o = Observable.empty + assertEquals(None, o.toBlocking.singleOption) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testSingleOptionWithMultipleItems() { + Observable.items(1, 2).toBlocking.singleOption + } + + @Test + def testSingleOrElse() { + val o = Observable.items(1) + assertEquals(1, o.toBlocking.singleOrElse(2)) + } + + @Test + def testSingleOrElseWithEmpty() { + val o = Observable.empty + assertEquals(2, o.toBlocking.singleOrElse(2)) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testSingleOrElseWithMultipleItems() { + Observable.items(1, 2).toBlocking.singleOrElse(2) + } + + @Test + def testHeadOption() { + val o = Observable.items(1) + assertEquals(Some(1), o.toBlocking.headOption) + } + + @Test + def testHeadOptionWithEmpty() { + val o = Observable.empty + assertEquals(None, o.toBlocking.headOption) + } + + @Test + def testHeadOptionWithMultipleItems() { + val o = Observable.items(1, 2) + assertEquals(Some(1), o.toBlocking.headOption) + } + + @Test + def testHeadOrElse() { + val o = Observable.items(1) + assertEquals(1, o.toBlocking.headOrElse(2)) + } + + @Test + def testHeadOrElseWithEmpty() { + val o = Observable.empty + assertEquals(2, o.toBlocking.headOrElse(2)) + } + + @Test + def testHeadOrElseWithMultipleItems() { + val o = Observable.items(1, 2) + assertEquals(1, o.toBlocking.headOrElse(2)) + } + + @Test + def testLastOption() { + val o = Observable.items(1) + assertEquals(Some(1), o.toBlocking.lastOption) + } + + @Test + def testLastOptionWithEmpty() { + val o = Observable.empty + assertEquals(None, o.toBlocking.lastOption) + } + + @Test + def testLastOptionWithMultipleItems() { + val o = Observable.items(1, 2) + assertEquals(Some(2), o.toBlocking.lastOption) + } + + @Test + def testLastOrElse() { + val o = Observable.items(1) + assertEquals(1, o.toBlocking.lastOrElse(2)) + } + + @Test + def testLastOrElseWithEmpty() { + val o = Observable.empty + assertEquals(2, o.toBlocking.lastOrElse(2)) + } + + @Test + def testLastOrElseWithMultipleItems() { + val o = Observable.items(1, 2) + assertEquals(2, o.toBlocking.lastOrElse(3)) + } + + @Test + def testToFuture() { + val o = Observable.items(1) + val r = Await.result(o.toBlocking.toFuture, 10 seconds) + assertEquals(1, r) + } + + @Test(expected = classOf[NoSuchElementException]) + def testToFutureWithEmpty() { + val o = Observable.empty + Await.result(o.toBlocking.toFuture, 10 seconds) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testToFutureWithMultipleItems() { + val o = Observable.items(1, 2) + Await.result(o.toBlocking.toFuture, 10 seconds) + } +}