Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change => Unit callbacks to () => Unit #229

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object ExperimentalAPIExamples {
@Test def onBackpressureBufferWithCapacityExample2(): Unit = {
Observable[Int](subscriber => {
(1 to 200).foreach(subscriber.onNext)
}).onBackpressureBuffer(10, println("Overflow")).observeOn(IOScheduler()).subscribe(
}).onBackpressureBuffer(10, () => println("Overflow")).observeOn(IOScheduler()).subscribe(
v => {
Thread.sleep(10)
// A slow consumer
Expand Down
62 changes: 31 additions & 31 deletions examples/src/test/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def doOnCompletedExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doOnCompleted { println("onCompleted") }
val o = List("red", "green", "blue").toObservable.doOnCompleted(() => println("onCompleted"))
o.subscribe(v => println(v), e => e.printStackTrace)
// red
// green
Expand All @@ -862,7 +862,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def doOnSubscribeExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doOnSubscribe { println("subscribed") }
val o = List("red", "green", "blue").toObservable.doOnSubscribe(() => println("subscribed"))
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// subscribed
// red
Expand All @@ -872,7 +872,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def doOnTerminateExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doOnTerminate { println("terminate") }
val o = List("red", "green", "blue").toObservable.doOnTerminate(() => println("terminate"))
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// red
// green
Expand All @@ -882,7 +882,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def doOnUnsubscribeExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doOnUnsubscribe { println("unsubscribed") }
val o = List("red", "green", "blue").toObservable.doOnUnsubscribe(() => println("unsubscribed"))
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// red
// green
Expand All @@ -892,7 +892,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def doAfterTerminateExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doAfterTerminate { println("finally") }
val o = List("red", "green", "blue").toObservable.doAfterTerminate(() => println("finally"))
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// red
// green
Expand Down Expand Up @@ -1545,7 +1545,7 @@ class RxScalaDemo extends JUnitSuite {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
.doOnSubscribe(() => println(s"subscribe to o$n"))
}.switch.take(5).subscribe(println(_))
}

Expand All @@ -1558,7 +1558,7 @@ class RxScalaDemo extends JUnitSuite {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
.doOnSubscribe(() => println(s"subscribe to o$n"))
}
}.switch.subscribe(println(_), _.printStackTrace())

Expand All @@ -1572,7 +1572,7 @@ class RxScalaDemo extends JUnitSuite {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
.doOnSubscribe(() => println(s"subscribe to o$n"))
}
}.delayError.switch.subscribe(println(_), _.printStackTrace())
}
Expand All @@ -1593,7 +1593,7 @@ class RxScalaDemo extends JUnitSuite {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
.doOnSubscribe(() => println(s"subscribe to o$n"))
}
}.subscribe(println(_), _.printStackTrace())

Expand All @@ -1607,7 +1607,7 @@ class RxScalaDemo extends JUnitSuite {
Observable.interval(100 millis)
.map(l => s"o$n emit $l")
.take(3)
.doOnSubscribe(println(s"subscribe to o$n"))
.doOnSubscribe(() => println(s"subscribe to o$n"))
}
}.subscribe(println(_), _.printStackTrace())
}
Expand Down Expand Up @@ -1778,56 +1778,56 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def concatEagerExample(): Unit = {
val o1 = Observable.interval(100 millis).take(3).map(l => s"o1 emit $l").doOnSubscribe(println("subscribe to o1"))
val o2 = Observable.interval(100 millis).take(3).map(l => s"o2 emit $l").doOnSubscribe(println("subscribe to o2"))
val o1 = Observable.interval(100 millis).take(3).map(l => s"o1 emit $l").doOnSubscribe(() => println("subscribe to o1"))
val o2 = Observable.interval(100 millis).take(3).map(l => s"o2 emit $l").doOnSubscribe(() => println("subscribe to o2"))
o1.concatEager(o2).subscribe(println(_))
}

@Test def concatEagerExample2(): Unit = {
(0 until 10).map { i =>
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i"))
}.toObservable.concatEager.subscribe(println(_))
}

@Test def concatEagerExample3(): Unit = {
(0 until 10).map { i =>
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i"))
}.toObservable.concatEager(capacityHint = 3).subscribe(println(_))
}

@Test def concatMapEagerExample(): Unit = {
(0 until 10).toObservable.concatMapEager { i =>
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i"))
}.subscribe(println(_))
}

@Test def concatMapEagerExample2(): Unit = {
(0 until 10).toObservable.concatMapEager(capacityHint = 10, i => {
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i"))
}).subscribe(println(_))
}

@Test def concatMapEagerExample3(): Unit = {
(0 until 10).toObservable.concatMapEager(capacityHint = 10, maxConcurrent = 3, i => {
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(() => println(s"subscribe to o$i"))
}).subscribe(println(_))
}

@Test def flattenDelayErrorExample() {
val o1 = Observable.just(1).delay(200 millis).
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(() => println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o4"))
Observable.just(o1, o2, o3, o4).delayError.flatten.subscribe(println(_), _.printStackTrace())
}

@Test def flattenDelayErrorExample2() {
val o1 = Observable.just(1).delay(200 millis).
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(() => println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(() => println(s"subscribe to o4"))
Observable.just(o1, o2, o3, o4).delayError.flatten(2).subscribe(println(_), _.printStackTrace())
}

Expand Down Expand Up @@ -1860,17 +1860,17 @@ class RxScalaDemo extends JUnitSuite {
}

def autoConnectExample(): Unit = {
val o = Observable.just(1, 2, 3).doOnSubscribe {
println("Start to emit items")
}.publish.autoConnect
val o = Observable.just(1, 2, 3)
.doOnSubscribe(() => println("Start to emit items"))
.publish.autoConnect
println("1st Observer is subscribing")
o.subscribe(println(_))
}

def autoConnectExample2(): Unit = {
val o = Observable.just(1, 2, 3).doOnSubscribe {
println("Start to emit items")
}.publish.autoConnect(3)
val o = Observable.just(1, 2, 3)
.doOnSubscribe(() => println("Start to emit items"))
.publish.autoConnect(3)
println("1st Observer is subscribing")
o.subscribe(i => println(s"s1: $i"))
println("2nd Observer is subscribing")
Expand Down
26 changes: 12 additions & 14 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1118,8 +1118,8 @@ trait Observable[+T]
* @return an [[Observable]] that emits the same items as the source [[Observable]], then invokes the `action`
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
def doAfterTerminate(action: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doAfterTerminate(() => action))
def doAfterTerminate(action: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doAfterTerminate(action))
}

/**
Expand Down Expand Up @@ -3888,8 +3888,8 @@ trait Observable[+T]
* `onCompleted`
* @return the source Observable with the side-effecting behavior applied
*/
def doOnCompleted(onCompleted: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnCompleted(() => onCompleted))
def doOnCompleted(onCompleted: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnCompleted(onCompleted))
}

/**
Expand Down Expand Up @@ -3949,8 +3949,8 @@ trait Observable[+T]
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#doonsubscribe">RxJava wiki: doOnSubscribe</a>
* @since 0.20
*/
def doOnSubscribe(onSubscribe: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnSubscribe(() => onSubscribe))
def doOnSubscribe(onSubscribe: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnSubscribe(onSubscribe))
}

/**
Expand All @@ -3965,8 +3965,8 @@ trait Observable[+T]
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#wiki-doonterminate">RxJava Wiki: doOnTerminate()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229804.aspx">MSDN: Observable.Do</a>
*/
def doOnTerminate(onTerminate: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnTerminate(() => onTerminate))
def doOnTerminate(onTerminate: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnTerminate(onTerminate))
}

/**
Expand Down Expand Up @@ -4003,8 +4003,8 @@ trait Observable[+T]
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#doonunsubscribe">RxJava wiki: doOnUnsubscribe</a>
* @since 0.20
*/
def doOnUnsubscribe(onUnsubscribe: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnUnsubscribe(() => onUnsubscribe))
def doOnUnsubscribe(onUnsubscribe: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnUnsubscribe(onUnsubscribe))
}

/**
Expand Down Expand Up @@ -4716,10 +4716,8 @@ trait Observable[+T]
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Beta
def onBackpressureBuffer(capacity: Long, onOverflow: => Unit): Observable[T] = {
asJavaObservable.onBackpressureBuffer(capacity, new Action0 {
override def call(): Unit = onOverflow
})
def onBackpressureBuffer(capacity: Long, onOverflow: () => Unit): Observable[T] = {
asJavaObservable.onBackpressureBuffer(capacity, onOverflow)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ class ObservableCompletenessKit extends CompletenessKit {
"delaySubscription(Func0[_ <: Observable[U]])" -> "delaySubscription(() => Observable[Any])",
"delaySubscription(Observable[U])" -> "[use `delaySubscription(() => Observable[Any])`]",
"dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])",
"doOnCompleted(Action0)" -> "doOnCompleted(=> Unit)",
"doOnCompleted(Action0)" -> "doOnCompleted(() => Unit)",
"doOnEach(Action1[Notification[_ >: T]])" -> "[use `doOnEach(T => Unit, Throwable => Unit, () => Unit)`]",
"doOnSubscribe(Action0)" -> "doOnSubscribe(=> Unit)",
"doOnUnsubscribe(Action0)" -> "doOnUnsubscribe(=> Unit)",
"doOnTerminate(Action0)" -> "doOnTerminate(=> Unit)",
"doOnSubscribe(Action0)" -> "doOnSubscribe(() => Unit)",
"doOnUnsubscribe(Action0)" -> "doOnUnsubscribe(() => Unit)",
"doOnTerminate(Action0)" -> "doOnTerminate(() => Unit)",
"elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)",
"to(Func1[_ >: Observable[T], R])" -> "[use Scala implicit feature to extend `Observable`]",
"doAfterTerminate(Action0)" -> "doAfterTerminate(=> Unit)",
"doAfterTerminate(Action0)" -> "doAfterTerminate(() => Unit)",
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
"firstOrDefault(T)" -> "firstOrElse(=> U)",
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
Expand All @@ -108,7 +108,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R], Func1[Action1[K], Map[K, Object]])" -> "[TODO]",
"mergeWith(Observable[_ <: T])" -> "merge(Observable[U])",
"ofType(Class[R])" -> "[use `filter(_.isInstanceOf[Class])`]",
"onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, => Unit)",
"onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, () => Unit)",
"onBackpressureBuffer(Long, Action0, Strategy)" -> "[TODO]",
"onErrorResumeNext(Func1[_ >: Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Throwable => Observable[U])",
Expand Down