diff --git a/examples/src/test/scala/examples/ExperimentalAPIExamples.scala b/examples/src/test/scala/examples/ExperimentalAPIExamples.scala index 5523462f..db8b69f1 100644 --- a/examples/src/test/scala/examples/ExperimentalAPIExamples.scala +++ b/examples/src/test/scala/examples/ExperimentalAPIExamples.scala @@ -64,7 +64,7 @@ object ExperimentalAPIExamples { @Test def onBackpressureBufferWithCapacityExample2(): Unit = { Observable[Int](subscriber => { (1 to 200).foreach(subscriber.onNext) - }).onBackpressureBuffer(10, println("Overflow")).observeOn(IOScheduler()).subscribe( + }).onBackpressureBuffer(10, () => println("Overflow")).observeOn(IOScheduler()).subscribe( v => { Thread.sleep(10) // A slow consumer diff --git a/examples/src/test/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/examples/RxScalaDemo.scala index aaf5681c..0efe1eb8 100644 --- a/examples/src/test/scala/examples/RxScalaDemo.scala +++ b/examples/src/test/scala/examples/RxScalaDemo.scala @@ -853,7 +853,7 @@ class RxScalaDemo extends JUnitSuite { } @Test def doOnCompletedExample(): Unit = { - val o = List("red", "green", "blue").toObservable.doOnCompleted { println("onCompleted") } + val o = List("red", "green", "blue").toObservable.doOnCompleted(() => println("onCompleted")) o.subscribe(v => println(v), e => e.printStackTrace) // red // green @@ -862,7 +862,7 @@ class RxScalaDemo extends JUnitSuite { } @Test def doOnSubscribeExample(): Unit = { - val o = List("red", "green", "blue").toObservable.doOnSubscribe { println("subscribed") } + val o = List("red", "green", "blue").toObservable.doOnSubscribe(() => println("subscribed")) o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted")) // subscribed // red @@ -872,7 +872,7 @@ class RxScalaDemo extends JUnitSuite { } @Test def doOnTerminateExample(): Unit = { - val o = List("red", "green", "blue").toObservable.doOnTerminate { println("terminate") } + val o = List("red", "green", "blue").toObservable.doOnTerminate(() => println("terminate")) o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted")) // red // green @@ -882,7 +882,7 @@ class RxScalaDemo extends JUnitSuite { } @Test def doOnUnsubscribeExample(): Unit = { - val o = List("red", "green", "blue").toObservable.doOnUnsubscribe { println("unsubscribed") } + val o = List("red", "green", "blue").toObservable.doOnUnsubscribe(() => println("unsubscribed")) o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted")) // red // green @@ -892,7 +892,7 @@ class RxScalaDemo extends JUnitSuite { } @Test def doAfterTerminateExample(): Unit = { - val o = List("red", "green", "blue").toObservable.doAfterTerminate { println("finally") } + val o = List("red", "green", "blue").toObservable.doAfterTerminate(() => println("finally")) o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted")) // red // green @@ -1545,7 +1545,7 @@ class RxScalaDemo extends JUnitSuite { Observable.interval(100 millis) .map(l => s"o$n emit $l") .take(3) - .doOnSubscribe(println(s"subscribe to o$n")) + .doOnSubscribe(() => println(s"subscribe to o$n")) }.switch.take(5).subscribe(println(_)) } @@ -1558,7 +1558,7 @@ class RxScalaDemo extends JUnitSuite { Observable.interval(100 millis) .map(l => s"o$n emit $l") .take(3) - .doOnSubscribe(println(s"subscribe to o$n")) + .doOnSubscribe(() => println(s"subscribe to o$n")) } }.switch.subscribe(println(_), _.printStackTrace()) @@ -1572,7 +1572,7 @@ class RxScalaDemo extends JUnitSuite { Observable.interval(100 millis) .map(l => s"o$n emit $l") .take(3) - .doOnSubscribe(println(s"subscribe to o$n")) + .doOnSubscribe(() => println(s"subscribe to o$n")) } }.delayError.switch.subscribe(println(_), _.printStackTrace()) } @@ -1593,7 +1593,7 @@ class RxScalaDemo extends JUnitSuite { Observable.interval(100 millis) .map(l => s"o$n emit $l") .take(3) - .doOnSubscribe(println(s"subscribe to o$n")) + .doOnSubscribe(() => println(s"subscribe to o$n")) } }.subscribe(println(_), _.printStackTrace()) @@ -1607,7 +1607,7 @@ class RxScalaDemo extends JUnitSuite { Observable.interval(100 millis) .map(l => s"o$n emit $l") .take(3) - .doOnSubscribe(println(s"subscribe to o$n")) + .doOnSubscribe(() => println(s"subscribe to o$n")) } }.subscribe(println(_), _.printStackTrace()) } @@ -1778,56 +1778,56 @@ class RxScalaDemo extends JUnitSuite { } @Test def concatEagerExample(): Unit = { - val o1 = Observable.interval(100 millis).take(3).map(l => s"o1 emit $l").doOnSubscribe(println("subscribe to o1")) - val o2 = Observable.interval(100 millis).take(3).map(l => s"o2 emit $l").doOnSubscribe(println("subscribe to o2")) + val o1 = Observable.interval(100 millis).take(3).map(l => s"o1 emit $l").doOnSubscribe(() => println("subscribe to o1")) + val o2 = Observable.interval(100 millis).take(3).map(l => s"o2 emit $l").doOnSubscribe(() => println("subscribe to o2")) o1.concatEager(o2).subscribe(println(_)) } @Test def concatEagerExample2(): Unit = { (0 until 10).map { i => - Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i")) + Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i")) }.toObservable.concatEager.subscribe(println(_)) } @Test def concatEagerExample3(): Unit = { (0 until 10).map { i => - Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i")) + Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i")) }.toObservable.concatEager(capacityHint = 3).subscribe(println(_)) } @Test def concatMapEagerExample(): Unit = { (0 until 10).toObservable.concatMapEager { i => - Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i")) + Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i")) }.subscribe(println(_)) } @Test def concatMapEagerExample2(): Unit = { (0 until 10).toObservable.concatMapEager(capacityHint = 10, i => { - Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i")) + Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i")) }).subscribe(println(_)) } @Test def concatMapEagerExample3(): Unit = { (0 until 10).toObservable.concatMapEager(capacityHint = 10, maxConcurrent = 3, i => { - Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i")) + Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i")) }).subscribe(println(_)) } @Test def flattenDelayErrorExample() { val o1 = Observable.just(1).delay(200 millis). - flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1")) - val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2")) - val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3")) - val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4")) + flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(() => println(s"subscribe to o1")) + val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o2")) + val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o3")) + val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o4")) Observable.just(o1, o2, o3, o4).delayError.flatten.subscribe(println(_), _.printStackTrace()) } @Test def flattenDelayErrorExample2() { val o1 = Observable.just(1).delay(200 millis). - flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1")) - val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2")) - val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3")) - val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4")) + flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(() => println(s"subscribe to o1")) + val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o2")) + val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o3")) + val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o4")) Observable.just(o1, o2, o3, o4).delayError.flatten(2).subscribe(println(_), _.printStackTrace()) } @@ -1860,17 +1860,17 @@ class RxScalaDemo extends JUnitSuite { } def autoConnectExample(): Unit = { - val o = Observable.just(1, 2, 3).doOnSubscribe { - println("Start to emit items") - }.publish.autoConnect + val o = Observable.just(1, 2, 3) + .doOnSubscribe(() => println("Start to emit items")) + .publish.autoConnect println("1st Observer is subscribing") o.subscribe(println(_)) } def autoConnectExample2(): Unit = { - val o = Observable.just(1, 2, 3).doOnSubscribe { - println("Start to emit items") - }.publish.autoConnect(3) + val o = Observable.just(1, 2, 3) + .doOnSubscribe(() => println("Start to emit items")) + .publish.autoConnect(3) println("1st Observer is subscribing") o.subscribe(i => println(s"s1: $i")) println("2nd Observer is subscribing") diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala index 82862cea..72f81660 100644 --- a/src/main/scala/rx/lang/scala/Observable.scala +++ b/src/main/scala/rx/lang/scala/Observable.scala @@ -1118,8 +1118,8 @@ trait Observable[+T] * @return an [[Observable]] that emits the same items as the source [[Observable]], then invokes the `action` * @see ReactiveX operators documentation: Do */ - def doAfterTerminate(action: => Unit): Observable[T] = { - toScalaObservable[T](asJavaObservable.doAfterTerminate(() => action)) + def doAfterTerminate(action: () => Unit): Observable[T] = { + toScalaObservable[T](asJavaObservable.doAfterTerminate(action)) } /** @@ -3888,8 +3888,8 @@ trait Observable[+T] * `onCompleted` * @return the source Observable with the side-effecting behavior applied */ - def doOnCompleted(onCompleted: => Unit): Observable[T] = { - toScalaObservable[T](asJavaObservable.doOnCompleted(() => onCompleted)) + def doOnCompleted(onCompleted: () => Unit): Observable[T] = { + toScalaObservable[T](asJavaObservable.doOnCompleted(onCompleted)) } /** @@ -3949,8 +3949,8 @@ trait Observable[+T] * @see RxJava wiki: doOnSubscribe * @since 0.20 */ - def doOnSubscribe(onSubscribe: => Unit): Observable[T] = { - toScalaObservable[T](asJavaObservable.doOnSubscribe(() => onSubscribe)) + def doOnSubscribe(onSubscribe: () => Unit): Observable[T] = { + toScalaObservable[T](asJavaObservable.doOnSubscribe(onSubscribe)) } /** @@ -3965,8 +3965,8 @@ trait Observable[+T] * @see RxJava Wiki: doOnTerminate() * @see MSDN: Observable.Do */ - def doOnTerminate(onTerminate: => Unit): Observable[T] = { - toScalaObservable[T](asJavaObservable.doOnTerminate(() => onTerminate)) + def doOnTerminate(onTerminate: () => Unit): Observable[T] = { + toScalaObservable[T](asJavaObservable.doOnTerminate(onTerminate)) } /** @@ -4003,8 +4003,8 @@ trait Observable[+T] * @see RxJava wiki: doOnUnsubscribe * @since 0.20 */ - def doOnUnsubscribe(onUnsubscribe: => Unit): Observable[T] = { - toScalaObservable[T](asJavaObservable.doOnUnsubscribe(() => onUnsubscribe)) + def doOnUnsubscribe(onUnsubscribe: () => Unit): Observable[T] = { + toScalaObservable[T](asJavaObservable.doOnUnsubscribe(onUnsubscribe)) } /** @@ -4716,10 +4716,8 @@ trait Observable[+T] * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) */ @Beta - def onBackpressureBuffer(capacity: Long, onOverflow: => Unit): Observable[T] = { - asJavaObservable.onBackpressureBuffer(capacity, new Action0 { - override def call(): Unit = onOverflow - }) + def onBackpressureBuffer(capacity: Long, onOverflow: () => Unit): Observable[T] = { + asJavaObservable.onBackpressureBuffer(capacity, onOverflow) } /** diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala index 178d3c25..10bd85a2 100644 --- a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala +++ b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala @@ -75,14 +75,14 @@ class ObservableCompletenessKit extends CompletenessKit { "delaySubscription(Func0[_ <: Observable[U]])" -> "delaySubscription(() => Observable[Any])", "delaySubscription(Observable[U])" -> "[use `delaySubscription(() => Observable[Any])`]", "dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])", - "doOnCompleted(Action0)" -> "doOnCompleted(=> Unit)", + "doOnCompleted(Action0)" -> "doOnCompleted(() => Unit)", "doOnEach(Action1[Notification[_ >: T]])" -> "[use `doOnEach(T => Unit, Throwable => Unit, () => Unit)`]", - "doOnSubscribe(Action0)" -> "doOnSubscribe(=> Unit)", - "doOnUnsubscribe(Action0)" -> "doOnUnsubscribe(=> Unit)", - "doOnTerminate(Action0)" -> "doOnTerminate(=> Unit)", + "doOnSubscribe(Action0)" -> "doOnSubscribe(() => Unit)", + "doOnUnsubscribe(Action0)" -> "doOnUnsubscribe(() => Unit)", + "doOnTerminate(Action0)" -> "doOnTerminate(() => Unit)", "elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)", "to(Func1[_ >: Observable[T], R])" -> "[use Scala implicit feature to extend `Observable`]", - "doAfterTerminate(Action0)" -> "doAfterTerminate(=> Unit)", + "doAfterTerminate(Action0)" -> "doAfterTerminate(() => Unit)", "first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate, "firstOrDefault(T)" -> "firstOrElse(=> U)", "firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]", @@ -108,7 +108,7 @@ class ObservableCompletenessKit extends CompletenessKit { "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R], Func1[Action1[K], Map[K, Object]])" -> "[TODO]", "mergeWith(Observable[_ <: T])" -> "merge(Observable[U])", "ofType(Class[R])" -> "[use `filter(_.isInstanceOf[Class])`]", - "onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, => Unit)", + "onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, () => Unit)", "onBackpressureBuffer(Long, Action0, Strategy)" -> "[TODO]", "onErrorResumeNext(Func1[_ >: Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])", "onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Throwable => Observable[U])",