Skip to content

Commit

Permalink
Change => Unit callbacks to () => Unit (Fixes ReactiveX#225 rever…
Browse files Browse the repository at this point in the history
…ts RxJava/#1345)
  • Loading branch information
David Hoepelman committed May 12, 2017
1 parent a6ebaf3 commit de4524a
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object ExperimentalAPIExamples {
@Test def onBackpressureBufferWithCapacityExample2(): Unit = {
Observable[Int](subscriber => {
(1 to 200).foreach(subscriber.onNext)
}).onBackpressureBuffer(10, println("Overflow")).observeOn(IOScheduler()).subscribe(
}).onBackpressureBuffer(10, () => println("Overflow")).observeOn(IOScheduler()).subscribe(
v => {
Thread.sleep(10)
// A slow consumer
Expand Down
62 changes: 31 additions & 31 deletions examples/src/test/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def doOnCompletedExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doOnCompleted { println("onCompleted") }
val o = List("red", "green", "blue").toObservable.doOnCompleted(() => println("onCompleted"))
o.subscribe(v => println(v), e => e.printStackTrace)
// red
// green
Expand All @@ -862,7 +862,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def doOnSubscribeExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doOnSubscribe { println("subscribed") }
val o = List("red", "green", "blue").toObservable.doOnSubscribe(() => println("subscribed"))
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// subscribed
// red
Expand All @@ -872,7 +872,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def doOnTerminateExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doOnTerminate { println("terminate") }
val o = List("red", "green", "blue").toObservable.doOnTerminate(() => println("terminate"))
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// red
// green
Expand All @@ -882,7 +882,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def doOnUnsubscribeExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doOnUnsubscribe { println("unsubscribed") }
val o = List("red", "green", "blue").toObservable.doOnUnsubscribe(() => println("unsubscribed"))
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// red
// green
Expand All @@ -892,7 +892,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def doAfterTerminateExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doAfterTerminate { println("finally") }
val o = List("red", "green", "blue").toObservable.doAfterTerminate(() => println("finally"))
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// red
// green
Expand Down Expand Up @@ -1545,7 +1545,7 @@ class RxScalaDemo extends JUnitSuite {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
.doOnSubscribe(() => println(s"subscribe to o$n"))
}.switch.take(5).subscribe(println(_))
}

Expand All @@ -1558,7 +1558,7 @@ class RxScalaDemo extends JUnitSuite {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
.doOnSubscribe(() => println(s"subscribe to o$n"))
}
}.switch.subscribe(println(_), _.printStackTrace())

Expand All @@ -1572,7 +1572,7 @@ class RxScalaDemo extends JUnitSuite {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
.doOnSubscribe(() => println(s"subscribe to o$n"))
}
}.delayError.switch.subscribe(println(_), _.printStackTrace())
}
Expand All @@ -1593,7 +1593,7 @@ class RxScalaDemo extends JUnitSuite {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
.doOnSubscribe(() => println(s"subscribe to o$n"))
}
}.subscribe(println(_), _.printStackTrace())

Expand All @@ -1607,7 +1607,7 @@ class RxScalaDemo extends JUnitSuite {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
.doOnSubscribe(() => println(s"subscribe to o$n"))
}
}.subscribe(println(_), _.printStackTrace())
}
Expand Down Expand Up @@ -1778,56 +1778,56 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def concatEagerExample(): Unit = {
val o1 = Observable.interval(100 millis).take(3).map(l => s"o1 emit $l").doOnSubscribe(println("subscribe to o1"))
val o2 = Observable.interval(100 millis).take(3).map(l => s"o2 emit $l").doOnSubscribe(println("subscribe to o2"))
val o1 = Observable.interval(100 millis).take(3).map(l => s"o1 emit $l").doOnSubscribe(() => println("subscribe to o1"))
val o2 = Observable.interval(100 millis).take(3).map(l => s"o2 emit $l").doOnSubscribe(() => println("subscribe to o2"))
o1.concatEager(o2).subscribe(println(_))
}

@Test def concatEagerExample2(): Unit = {
(0 until 10).map { i =>
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i"))
}.toObservable.concatEager.subscribe(println(_))
}

@Test def concatEagerExample3(): Unit = {
(0 until 10).map { i =>
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i"))
}.toObservable.concatEager(capacityHint = 3).subscribe(println(_))
}

@Test def concatMapEagerExample(): Unit = {
(0 until 10).toObservable.concatMapEager { i =>
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i"))
}.subscribe(println(_))
}

@Test def concatMapEagerExample2(): Unit = {
(0 until 10).toObservable.concatMapEager(capacityHint = 10, i => {
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i"))
}).subscribe(println(_))
}

@Test def concatMapEagerExample3(): Unit = {
(0 until 10).toObservable.concatMapEager(capacityHint = 10, maxConcurrent = 3, i => {
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i"))
}).subscribe(println(_))
}

@Test def flattenDelayErrorExample() {
val o1 = Observable.just(1).delay(200 millis).
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(() => println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o4"))
Observable.just(o1, o2, o3, o4).delayError.flatten.subscribe(println(_), _.printStackTrace())
}

@Test def flattenDelayErrorExample2() {
val o1 = Observable.just(1).delay(200 millis).
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(() => println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o4"))
Observable.just(o1, o2, o3, o4).delayError.flatten(2).subscribe(println(_), _.printStackTrace())
}

Expand Down Expand Up @@ -1860,17 +1860,17 @@ class RxScalaDemo extends JUnitSuite {
}

def autoConnectExample(): Unit = {
val o = Observable.just(1, 2, 3).doOnSubscribe {
println("Start to emit items")
}.publish.autoConnect
val o = Observable.just(1, 2, 3)
.doOnSubscribe(() => println("Start to emit items"))
.publish.autoConnect
println("1st Observer is subscribing")
o.subscribe(println(_))
}

def autoConnectExample2(): Unit = {
val o = Observable.just(1, 2, 3).doOnSubscribe {
println("Start to emit items")
}.publish.autoConnect(3)
val o = Observable.just(1, 2, 3)
.doOnSubscribe(() => println("Start to emit items"))
.publish.autoConnect(3)
println("1st Observer is subscribing")
o.subscribe(i => println(s"s1: $i"))
println("2nd Observer is subscribing")
Expand Down
26 changes: 12 additions & 14 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1118,8 +1118,8 @@ trait Observable[+T]
* @return an [[Observable]] that emits the same items as the source [[Observable]], then invokes the `action`
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
def doAfterTerminate(action: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doAfterTerminate(() => action))
def doAfterTerminate(action: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doAfterTerminate(action))
}

/**
Expand Down Expand Up @@ -3888,8 +3888,8 @@ trait Observable[+T]
* `onCompleted`
* @return the source Observable with the side-effecting behavior applied
*/
def doOnCompleted(onCompleted: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnCompleted(() => onCompleted))
def doOnCompleted(onCompleted: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnCompleted(onCompleted))
}

/**
Expand Down Expand Up @@ -3949,8 +3949,8 @@ trait Observable[+T]
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#doonsubscribe">RxJava wiki: doOnSubscribe</a>
* @since 0.20
*/
def doOnSubscribe(onSubscribe: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnSubscribe(() => onSubscribe))
def doOnSubscribe(onSubscribe: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnSubscribe(onSubscribe))
}

/**
Expand All @@ -3965,8 +3965,8 @@ trait Observable[+T]
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#wiki-doonterminate">RxJava Wiki: doOnTerminate()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229804.aspx">MSDN: Observable.Do</a>
*/
def doOnTerminate(onTerminate: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnTerminate(() => onTerminate))
def doOnTerminate(onTerminate: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnTerminate(onTerminate))
}

/**
Expand Down Expand Up @@ -4003,8 +4003,8 @@ trait Observable[+T]
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#doonunsubscribe">RxJava wiki: doOnUnsubscribe</a>
* @since 0.20
*/
def doOnUnsubscribe(onUnsubscribe: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnUnsubscribe(() => onUnsubscribe))
def doOnUnsubscribe(onUnsubscribe: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnUnsubscribe(onUnsubscribe))
}

/**
Expand Down Expand Up @@ -4716,10 +4716,8 @@ trait Observable[+T]
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Beta
def onBackpressureBuffer(capacity: Long, onOverflow: => Unit): Observable[T] = {
asJavaObservable.onBackpressureBuffer(capacity, new Action0 {
override def call(): Unit = onOverflow
})
def onBackpressureBuffer(capacity: Long, onOverflow: () => Unit): Observable[T] = {
asJavaObservable.onBackpressureBuffer(capacity, onOverflow)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ class ObservableCompletenessKit extends CompletenessKit {
"delaySubscription(Func0[_ <: Observable[U]])" -> "delaySubscription(() => Observable[Any])",
"delaySubscription(Observable[U])" -> "[use `delaySubscription(() => Observable[Any])`]",
"dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])",
"doOnCompleted(Action0)" -> "doOnCompleted(=> Unit)",
"doOnCompleted(Action0)" -> "doOnCompleted(() => Unit)",
"doOnEach(Action1[Notification[_ >: T]])" -> "[use `doOnEach(T => Unit, Throwable => Unit, () => Unit)`]",
"doOnSubscribe(Action0)" -> "doOnSubscribe(=> Unit)",
"doOnUnsubscribe(Action0)" -> "doOnUnsubscribe(=> Unit)",
"doOnTerminate(Action0)" -> "doOnTerminate(=> Unit)",
"doOnSubscribe(Action0)" -> "doOnSubscribe(() => Unit)",
"doOnUnsubscribe(Action0)" -> "doOnUnsubscribe(() => Unit)",
"doOnTerminate(Action0)" -> "doOnTerminate(() => Unit)",
"elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)",
"to(Func1[_ >: Observable[T], R])" -> "[use Scala implicit feature to extend `Observable`]",
"doAfterTerminate(Action0)" -> "doAfterTerminate(=> Unit)",
"doAfterTerminate(Action0)" -> "doAfterTerminate(() => Unit)",
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
"firstOrDefault(T)" -> "firstOrElse(=> U)",
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
Expand All @@ -108,7 +108,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R], Func1[Action1[K], Map[K, Object]])" -> "[TODO]",
"mergeWith(Observable[_ <: T])" -> "merge(Observable[U])",
"ofType(Class[R])" -> "[use `filter(_.isInstanceOf[Class])`]",
"onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, => Unit)",
"onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, () => Unit)",
"onBackpressureBuffer(Long, Action0, Strategy)" -> "[TODO]",
"onErrorResumeNext(Func1[_ >: Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Throwable => Observable[U])",
Expand Down

0 comments on commit de4524a

Please sign in to comment.