Skip to content

Commit

Permalink
Merge pull request #1210 from zsxwing/rxscala-more
Browse files Browse the repository at this point in the history
Add more operators to RxScala
  • Loading branch information
benjchristensen committed May 20, 2014
2 parents 62266af + c965815 commit 27d1194
Show file tree
Hide file tree
Showing 3 changed files with 349 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ class RxScalaDemo extends JUnitSuite {
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
}

@Test def bufferExample() {
val o = Observable.from(1 to 18).zip(Observable.interval(100 millis)).map(_._1)
val boundary = Observable.interval(500 millis)
o.buffer(boundary).toBlockingObservable.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
}

@Test def windowExample() {
(for ((o, i) <- Observable.from(1 to 18).window(5).zipWithIndex; n <- o)
yield s"Observable#$i emits $n"
Expand Down Expand Up @@ -644,6 +650,21 @@ class RxScalaDemo extends JUnitSuite {
println(result)
}

@Test def delayExample3(): Unit = {
val o = List(100, 500, 200).toObservable.delay(
(i: Int) => Observable.items(i).delay(i millis)
)
o.toBlockingObservable.foreach(println(_))
}

@Test def delayExample4(): Unit = {
val o = List(100, 500, 200).toObservable.delay(
() => Observable.interval(500 millis).take(1),
(i: Int) => Observable.items(i).delay(i millis)
)
o.toBlockingObservable.foreach(println(_))
}

@Test def delaySubscriptionExample(): Unit = {
val o = List(100L, 200L, 300L).toObservable.delaySubscription(2 seconds)
val result = o.toBlockingObservable.toList
Expand Down Expand Up @@ -792,6 +813,53 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(List(1, 2, 3, 4), o.toBlockingObservable.toList)
}

@Test def sequenceEqualExampe(): Unit = {
val o1 = List(1, 2, 3).toObservable
val o2 = List(1, 2, 3).toObservable
val o3 = List(1, 2).toObservable
val o4 = List(1.0, 2.0, 3.0).toObservable
assertTrue(o1.sequenceEqual(o2).toBlockingObservable.single)
assertFalse(o1.sequenceEqual(o3).toBlockingObservable.single)
assertTrue(o1.sequenceEqual(o4).toBlockingObservable.single)
}

@Test def takeExample(): Unit = {
val o = (1 to 20).toObservable
.zip(Observable.interval(300 millis))
.map(_._1)
.take(2 seconds)
println(o.toBlockingObservable.toList)
}

@Test def takeRightExample(): Unit = {
val o = (1 to 6).toObservable.takeRight(3)
assertEquals(List(4, 5, 6), o.toBlockingObservable.toList)
}

@Test def takeRightExample2(): Unit = {
val o = (1 to 10).toObservable
.zip(Observable.interval(100 millis))
.map(_._1)
.takeRight(300 millis)
println(o.toBlockingObservable.toList)
}

@Test def takeRightExample3(): Unit = {
val o = (1 to 10).toObservable
.zip(Observable.interval(100 millis))
.map(_._1)
.takeRight(2, 300 millis)
println(o.toBlockingObservable.toList)
}

@Test def timeIntervalExample(): Unit = {
val o = (1 to 10).toObservable
.zip(Observable.interval(100 millis))
.map(_._1)
.timeInterval
println(o.toBlockingObservable.toList)
}

@Test def schedulerExample1(): Unit = {
val latch = new CountDownLatch(1)
val worker = IOScheduler().createWorker
Expand Down
Loading

0 comments on commit 27d1194

Please sign in to comment.