Skip to content

Commit

Permalink
Merge pull request #1265 from zsxwing/rxscala-more
Browse files Browse the repository at this point in the history
Add more operators to RxScala
  • Loading branch information
benjchristensen committed May 29, 2014
2 parents 95ab2a1 + 49cc022 commit 28a9a99
Show file tree
Hide file tree
Showing 4 changed files with 453 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ class RxScalaDemo extends JUnitSuite {
).subscribe(output(_))
}

@Test def windowExample2() {
val windowObservable = Observable.interval(500 millis)
val o = Observable.from(1 to 20).zip(Observable.interval(100 millis)).map(_._1)
(for ((o, i) <- o.window(windowObservable).zipWithIndex; n <- o)
yield s"Observable#$i emits $n"
).toBlocking.foreach(println)
}

@Test def testReduce() {
assertEquals(10, List(1, 2, 3, 4).toObservable.reduce(_ + _).toBlockingObservable.single)
}
Expand Down Expand Up @@ -731,6 +739,23 @@ class RxScalaDemo extends JUnitSuite {
println(result)
}

@Test def ambWithVarargsExample(): Unit = {
val o1 = List(100L, 200L, 300L).toObservable.delay(4 seconds)
val o2 = List(1000L, 2000L, 3000L).toObservable.delay(2 seconds)
val o3 = List(10000L, 20000L, 30000L).toObservable.delay(4 seconds)
val result = Observable.amb(o1, o2, o3).toBlocking.toList
println(result)
}

@Test def ambWithSeqExample(): Unit = {
val o1 = List(100L, 200L, 300L).toObservable.delay(4 seconds)
val o2 = List(1000L, 2000L, 3000L).toObservable.delay(2 seconds)
val o3 = List(10000L, 20000L, 30000L).toObservable.delay(4 seconds)
val o = Seq(o1, o2, o3)
val result = Observable.amb(o: _*).toBlocking.toList
println(result)
}

@Test def delayExample(): Unit = {
val o = List(100L, 200L, 300L).toObservable.delay(2 seconds)
val result = o.toBlockingObservable.toList
Expand Down Expand Up @@ -1012,4 +1037,75 @@ class RxScalaDemo extends JUnitSuite {
subscription.unsubscribe()
}

def createAHotObservable: Observable[String] = {
var first = true
Observable[String] {
subscriber =>
if (first) {
subscriber.onNext("1st: First")
subscriber.onNext("1st: Last")
first = false
}
else {
subscriber.onNext("2nd: First")
subscriber.onNext("2nd: Last")
}
subscriber.onCompleted()
}
}

@Test def withoutPublishLastExample() {
val hot = createAHotObservable
hot.takeRight(1).subscribe(n => println(s"subscriber 1 gets $n"))
hot.takeRight(1).subscribe(n => println(s"subscriber 2 gets $n"))
}

@Test def publishLastExample() {
val hot = createAHotObservable
val o = hot.publishLast
o.subscribe(n => println(s"subscriber 1 gets $n"))
o.subscribe(n => println(s"subscriber 2 gets $n"))
o.connect
}

@Test def publishLastExample2() {
val hot = createAHotObservable
val o = hot.publishLast(co => co ++ co) // "++" subscribes "co" twice
o.subscribe(n => println(s"subscriber gets $n"))
}

@Test def unsubscribeOnExample() {
val o = Observable[String] {
subscriber =>
subscriber.add(Subscription {
println("unsubscribe on " + Thread.currentThread().getName())
})
subscriber.onNext("RxScala")
subscriber.onCompleted()
}
o.unsubscribeOn(NewThreadScheduler()).subscribe(println(_))
}

@Test def parallelMergeExample() {
val o: Observable[Observable[Int]] = (1 to 100).toObservable.map(_ => (1 to 10).toObservable)
assertEquals(100, o.size.toBlockingObservable.single)
assertEquals(1000, o.flatten.size.toBlockingObservable.single)

val o2: Observable[Observable[Int]] = o.parallelMerge(10, ComputationScheduler())
assertEquals(10, o2.size.toBlockingObservable.single)
assertEquals(1000, o2.flatten.size.toBlockingObservable.single)
}

@Test def debounceExample() {
val o = Observable.interval(100 millis).take(20).debounce {
n =>
if (n % 2 == 0) {
Observable.interval(50 millis)
}
else {
Observable.interval(150 millis)
}
}
o.toBlockingObservable.foreach(println(_))
}
}
Loading

0 comments on commit 28a9a99

Please sign in to comment.