Skip to content

Commit

Permalink
Merge pull request #1160 from zsxwing/replay-multicast
Browse files Browse the repository at this point in the history
Add `replay` and `multicast` variants to RxScala
  • Loading branch information
benjchristensen committed May 20, 2014
2 parents c8596a4 + a9ddbc6 commit c54e688
Show file tree
Hide file tree
Showing 3 changed files with 363 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,54 @@ class RxScalaDemo extends JUnitSuite {

@Test def exampleWithReplay() {
val numbers = Observable.interval(1000 millis).take(6)
val (startFunc, sharedNumbers) = numbers.replay
val sharedNumbers = numbers.replay
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
startFunc()
sharedNumbers.connect
// subscriber 2 subscribes later but still gets all numbers
doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) })
waitFor(sharedNumbers)
}

@Test def exampleWithReplay2() {
val numbers = Observable.interval(100 millis).take(10)
val sharedNumbers = numbers.replay(3)
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
sharedNumbers.connect
// subscriber 2 subscribes later but only gets the 3 buffered numbers and the following numbers
Thread.sleep(700)
sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n"))
waitFor(sharedNumbers)
}

@Test def exampleWithReplay3() {
val numbers = Observable.interval(100 millis).take(10)
val sharedNumbers = numbers.replay(300 millis)
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
sharedNumbers.connect
// subscriber 2 subscribes later but only gets the buffered numbers and the following numbers
Thread.sleep(700)
sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n"))
waitFor(sharedNumbers)
}

@Test def exampleWithReplay4() {
val numbers = Observable.interval(100 millis).take(10)
val sharedNumbers = numbers.replay(2, 300 millis)
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
sharedNumbers.connect
// subscriber 2 subscribes later but only gets the buffered numbers and the following numbers
Thread.sleep(700)
sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n"))
waitFor(sharedNumbers)
}

@Test def exampleWithReplay5() {
val numbers = Observable.interval(100 millis).take(10)
val sharedNumbers = numbers.replay[Long, Long]((o: Observable[Long]) => o.map(_ * 2))
sharedNumbers.subscribe(n => println(s"subscriber gets $n"))
waitFor(sharedNumbers)
}

@Test def testSingleOption() {
assertEquals(None, List(1, 2).toObservable.toBlockingObservable.singleOption)
assertEquals(Some(1), List(1).toObservable.toBlockingObservable.singleOption)
Expand Down Expand Up @@ -720,7 +760,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def repeatExample1(): Unit = {
val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat().take(6)
val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat.take(6)
assertEquals(List("alice", "bob", "carol", "alice", "bob", "carol"), o.toBlockingObservable.toList)
}

Expand Down Expand Up @@ -802,6 +842,21 @@ class RxScalaDemo extends JUnitSuite {
}
}

@Test def multicastExample1(): Unit = {
val unshared = Observable.from(1 to 4)
val shared = unshared.multicast(Subject())
shared.subscribe(n => println(s"subscriber 1 gets $n"))
shared.subscribe(n => println(s"subscriber 2 gets $n"))
shared.connect
}

@Test def multicastExample2(): Unit = {
val unshared = Observable.from(1 to 4)
val shared = unshared.multicast[Int, String](() => Subject(), o => o.map("No. " + _))
shared.subscribe(n => println(s"subscriber 1 gets $n"))
shared.subscribe(n => println(s"subscriber 2 gets $n"))
}

@Test def startWithExample(): Unit = {
val o1 = List(3, 4).toObservable
val o2 = 1 +: 2 +: o1
Expand Down
Loading

0 comments on commit c54e688

Please sign in to comment.