diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index d0403064..3800a85a 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -343,14 +343,54 @@ class RxScalaDemo extends JUnitSuite { @Test def exampleWithReplay() { val numbers = Observable.interval(1000 millis).take(6) - val (startFunc, sharedNumbers) = numbers.replay + val sharedNumbers = numbers.replay sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) - startFunc() + sharedNumbers.connect // subscriber 2 subscribes later but still gets all numbers doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) }) waitFor(sharedNumbers) } + @Test def exampleWithReplay2() { + val numbers = Observable.interval(100 millis).take(10) + val sharedNumbers = numbers.replay(3) + sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) + sharedNumbers.connect + // subscriber 2 subscribes later but only gets the 3 buffered numbers and the following numbers + Thread.sleep(700) + sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) + waitFor(sharedNumbers) + } + + @Test def exampleWithReplay3() { + val numbers = Observable.interval(100 millis).take(10) + val sharedNumbers = numbers.replay(300 millis) + sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) + sharedNumbers.connect + // subscriber 2 subscribes later but only gets the buffered numbers and the following numbers + Thread.sleep(700) + sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) + waitFor(sharedNumbers) + } + + @Test def exampleWithReplay4() { + val numbers = Observable.interval(100 millis).take(10) + val sharedNumbers = numbers.replay(2, 300 millis) + sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) + sharedNumbers.connect + // subscriber 2 subscribes later but only gets the buffered numbers and the following numbers + Thread.sleep(700) + sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) + waitFor(sharedNumbers) + } + + @Test def exampleWithReplay5() { + val numbers = Observable.interval(100 millis).take(10) + val sharedNumbers = numbers.replay[Long, Long]((o: Observable[Long]) => o.map(_ * 2)) + sharedNumbers.subscribe(n => println(s"subscriber gets $n")) + waitFor(sharedNumbers) + } + @Test def testSingleOption() { assertEquals(None, List(1, 2).toObservable.toBlockingObservable.singleOption) assertEquals(Some(1), List(1).toObservable.toBlockingObservable.singleOption) @@ -720,7 +760,7 @@ class RxScalaDemo extends JUnitSuite { } @Test def repeatExample1(): Unit = { - val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat().take(6) + val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat.take(6) assertEquals(List("alice", "bob", "carol", "alice", "bob", "carol"), o.toBlockingObservable.toList) } @@ -802,6 +842,21 @@ class RxScalaDemo extends JUnitSuite { } } + @Test def multicastExample1(): Unit = { + val unshared = Observable.from(1 to 4) + val shared = unshared.multicast(Subject()) + shared.subscribe(n => println(s"subscriber 1 gets $n")) + shared.subscribe(n => println(s"subscriber 2 gets $n")) + shared.connect + } + + @Test def multicastExample2(): Unit = { + val unshared = Observable.from(1 to 4) + val shared = unshared.multicast[Int, String](() => Subject(), o => o.map("No. " + _)) + shared.subscribe(n => println(s"subscriber 1 gets $n")) + shared.subscribe(n => println(s"subscriber 2 gets $n")) + } + @Test def startWithExample(): Unit = { val o1 = List(3, 4).toObservable val o2 = 1 +: 2 +: o1 diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index beeca0d0..d144d188 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -271,10 +271,24 @@ trait Observable[+T] * @return a pair of a start function and an [[rx.lang.scala.Observable]] such that when the start function * is called, the Observable starts to push results into the specified Subject */ - def multicast[R >: T](subject: rx.lang.scala.Subject[R]): (() => Subscription, Observable[R]) = { + def multicast[R >: T](subject: rx.lang.scala.Subject[R]): ConnectableObservable[R] = { val s: rx.subjects.Subject[_ >: T, _<: R] = subject.asJavaSubject - val javaCO: rx.observables.ConnectableObservable[R] = asJavaObservable.multicast(s) - (() => javaCO.connect(), toScalaObservable(javaCO)) + new ConnectableObservable[R](asJavaObservable.multicast(s)) + } + + /** + * Returns an Observable that emits items produced by multicasting the source Observable within a selector function. + * + * @param subjectFactory the `Subject` factory + * @param selector the selector function, which can use the multicasted source Observable subject to the policies + * enforced by the created `Subject` + * @return an Observable that emits the items produced by multicasting the source Observable within a selector function + */ + def multicast[R >: T, U](subjectFactory: () => rx.lang.scala.Subject[R], selector: Observable[R] => Observable[U]): Observable[U] = { + val subjectFactoryJava: Func0[rx.subjects.Subject[_ >: T, _ <: R]] = () => subjectFactory().asJavaSubject + val selectorJava: Func1[rx.Observable[R], rx.Observable[U]] = + (jo: rx.Observable[R]) => selector(toScalaObservable[R](jo)).asJavaObservable.asInstanceOf[rx.Observable[U]] + toScalaObservable[U](asJavaObservable.multicast[R, U](subjectFactoryJava, selectorJava)) } /** @@ -1145,9 +1159,281 @@ trait Observable[+T] * @return a pair of a start function and an [[rx.lang.scala.Observable]] such that when the start function * is called, the Observable starts to emit items to its [[rx.lang.scala.Observer]]s */ - def replay: (() => Subscription, Observable[T]) = { - val javaCO = asJavaObservable.replay() - (() => javaCO.connect(), toScalaObservable[T](javaCO)) + def replay: ConnectableObservable[T] = { + new ConnectableObservable[T](asJavaObservable.replay()) + } + + /** + * Returns an Observable that emits items that are the results of invoking a specified selector on the items + * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable. + *
+ * + * + * @param selector the selector function, which can use the multicasted sequence as many times as needed, without + * causing multiple subscriptions to the Observable + * @return an Observable that emits items that are the results of invoking the selector on a `ConnectableObservable` + * that shares a single subscription to the source Observable + */ + def replay[U >: T, R](selector: Observable[U] => Observable[R]): Observable[R] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + val fJava: Func1[rx.Observable[U], rx.Observable[R]] = + (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable[R](thisJava.replay(fJava)) + } + + /** + * Returns an Observable that emits items that are the results of invoking a specified selector on items + * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable, + * replaying `bufferSize` notifications. + *
+ * + * + * @param selector the selector function, which can use the multicasted sequence as many times as needed, without + * causing multiple subscriptions to the Observable + * @param bufferSize the buffer size that limits the number of items the connectable observable can replay + * @return an Observable that emits items that are the results of invoking the selector on items emitted by + * a `ConnectableObservable` that shares a single subscription to the source Observable replaying + * no more than `bufferSize` items + */ + def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int): Observable[R] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + val fJava: Func1[rx.Observable[U], rx.Observable[R]] = + (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable[R](thisJava.replay(fJava, bufferSize)) + } + + /** + * Returns an Observable that emits items that are the results of invoking a specified selector on items + * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable, + * replaying no more than `bufferSize` items that were emitted within a specified time window. + *
+ * + * + * @param selector a selector function, which can use the multicasted sequence as many times as needed, without + * causing multiple subscriptions to the Observable + * @param bufferSize the buffer size that limits the number of items the connectable observable can replay + * @param time the duration of the window in which the replayed items must have been emitted + * @return an Observable that emits items that are the results of invoking the selector on items emitted by + * a `ConnectableObservable` that shares a single subscription to the source Observable, and + * replays no more than `bufferSize` items that were emitted within the window defined by `time` + */ + def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int, time: Duration): Observable[R] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + val fJava: Func1[rx.Observable[U], rx.Observable[R]] = + (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable[R](thisJava.replay(fJava, bufferSize, time.length, time.unit)) + } + + /** + * Returns an Observable that emits items that are the results of invoking a specified selector on items + * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable, + * replaying no more than `bufferSize` items that were emitted within a specified time window. + *
+ * + * + * @param selector a selector function, which can use the multicasted sequence as many times as needed, without + * causing multiple subscriptions to the Observable + * @param bufferSize the buffer size that limits the number of items the connectable observable can replay + * @param time the duration of the window in which the replayed items must have been emitted + * @param scheduler the Scheduler that is the time source for the window + * @return an Observable that emits items that are the results of invoking the selector on items emitted by + * a `ConnectableObservable` that shares a single subscription to the source Observable, and + * replays no more than `bufferSize` items that were emitted within the window defined by `time` + * @throws IllegalArgumentException if `bufferSize` is less than zero + */ + def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int, time: Duration, scheduler: Scheduler): Observable[R] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + val fJava: Func1[rx.Observable[U], rx.Observable[R]] = + (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable[R](thisJava.replay(fJava, bufferSize, time.length, time.unit, scheduler)) + } + + /** + * Returns an Observable that emits items that are the results of invoking a specified selector on items + * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable, + * replaying a maximum of `bufferSize` items. + *
+ * + * + * @param selector a selector function, which can use the multicasted sequence as many times as needed, without + * causing multiple subscriptions to the Observable + * @param bufferSize the buffer size that limits the number of items the connectable observable can replay + * @param scheduler the Scheduler on which the replay is observed + * @return an Observable that emits items that are the results of invoking the selector on items emitted by + * a `ConnectableObservable` that shares a single subscription to the source Observable, + * replaying no more than `bufferSize` notifications + */ + def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int, scheduler: Scheduler): Observable[R] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + val fJava: Func1[rx.Observable[U], rx.Observable[R]] = + (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable[R](thisJava.replay(fJava, bufferSize, scheduler)) + } + + /** + * Returns an Observable that emits items that are the results of invoking a specified selector on items + * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable, + * replaying all items that were emitted within a specified time window. + *
+ * + * + * @param selector a selector function, which can use the multicasted sequence as many times as needed, without + * causing multiple subscriptions to the Observable + * @param time the duration of the window in which the replayed items must have been emitted + * @return an Observable that emits items that are the results of invoking the selector on items emitted by + * a `ConnectableObservable` that shares a single subscription to the source Observable, + * replaying all items that were emitted within the window defined by `time` + */ + def replay[U >: T, R](selector: Observable[U] => Observable[R], time: Duration, scheduler: Scheduler): Observable[R] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + val fJava: Func1[rx.Observable[U], rx.Observable[R]] = + (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable[R](thisJava.replay(fJava, time.length, time.unit, scheduler)) + } + + /** + * Returns an Observable that emits items that are the results of invoking a specified selector on items + * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable. + *
+ * + * + * @param selector a selector function, which can use the multicasted sequence as many times as needed, without + * causing multiple subscriptions to the Observable + * @param scheduler the Scheduler where the replay is observed + * @return an Observable that emits items that are the results of invoking the selector on items emitted by + * a `ConnectableObservable` that shares a single subscription to the source Observable, + * replaying all items + */ + def replay[U >: T, R](selector: Observable[U] => Observable[R], scheduler: Scheduler): Observable[R] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + val fJava: Func1[rx.Observable[U], rx.Observable[R]] = + (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable[R](thisJava.replay(fJava, scheduler)) + } + + /** + * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and + * replays at most `bufferSize` items that were emitted during a specified time window. + *
+ * + * + * @param bufferSize the buffer size that limits the number of items that can be replayed + * @param time the duration of the window in which the replayed items must have been emitted + * @return a `ConnectableObservable` that shares a single subscription to the source Observable and + * replays at most `bufferSize` items that were emitted during the window defined by `time` + */ + def replay(bufferSize: Int, time: Duration): ConnectableObservable[T] = { + new ConnectableObservable[T](asJavaObservable.replay(bufferSize, time.length, time.unit)) + } + + /** + * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and + * that replays a maximum of `bufferSize` items that are emitted within a specified time window. + *
+ * + * + * @param bufferSize the buffer size that limits the number of items that can be replayed + * @param time the duration of the window in which the replayed items must have been emitted + * @param scheduler the scheduler that is used as a time source for the window + * @return a `ConnectableObservable` that shares a single subscription to the source Observable and + * replays at most `bufferSize` items that were emitted during the window defined by `time`` + * @throws IllegalArgumentException if `bufferSize` is less than zero + */ + def replay(bufferSize: Int, time: Duration, scheduler: Scheduler): ConnectableObservable[T] = { + new ConnectableObservable[T](asJavaObservable.replay(bufferSize, time.length, time.unit, scheduler)) + } + + /** + * Returns an Observable that emits items that are the results of invoking a specified selector on items + * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable, + * replaying all items that were emitted within a specified time window. + *
+ * + * + * @param selector a selector function, which can use the multicasted sequence as many times as needed, without + * causing multiple subscriptions to the Observable + * @param time the duration of the window in which the replayed items must have been emitted + * @return an Observable that emits items that are the results of invoking the selector on items emitted by + * a `ConnectableObservable` that shares a single subscription to the source Observable, + * replaying all items that were emitted within the window defined by `time`` + */ + def replay[U >: T, R](selector: Observable[U] => Observable[R], time: Duration): Observable[R] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + val fJava: Func1[rx.Observable[U], rx.Observable[R]] = + (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable[R](thisJava.replay(fJava, time.length, time.unit)) + } + + /** + * Returns a `ConnectableObservable` that shares a single subscription to the source Observable that + * replays at most `bufferSize` items emitted by that Observable. + *
+ * + * + * @param bufferSize the buffer size that limits the number of items that can be replayed + * @return a `ConnectableObservable` that shares a single subscription to the source Observable and + * replays at most `bufferSize` items emitted by that Observable + */ + def replay(bufferSize: Int): ConnectableObservable[T] = { + new ConnectableObservable[T](asJavaObservable.replay(bufferSize)) + } + + /** + * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and + * replays at most `bufferSize` items emitted by that Observable. + *
+ * + * + * @param bufferSize the buffer size that limits the number of items that can be replayed + * @param scheduler the scheduler on which the Observers will observe the emitted items + * @return a `ConnectableObservable` that shares a single subscription to the source Observable and + * replays at most `bufferSize` items that were emitted by the Observable + */ + def replay(bufferSize: Int, scheduler: Scheduler): ConnectableObservable[T] = { + new ConnectableObservable[T](asJavaObservable.replay(bufferSize, scheduler)) + } + + /** + * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and + * replays all items emitted by that Observable within a specified time window. + *
+ * + * + * @param time the duration of the window in which the replayed items must have been emitted + * @return a `ConnectableObservable` that shares a single subscription to the source Observable and + * replays the items that were emitted during the window defined by `time` + */ + def replay(time: Duration): ConnectableObservable[T] = { + new ConnectableObservable[T](asJavaObservable.replay(time.length, time.unit)) + } + + /** + * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and + * replays all items emitted by that Observable within a specified time window. + *
+ * + * + * @param time the duration of the window in which the replayed items must have been emitted + * @param scheduler the Scheduler that is the time source for the window + * @return a `ConnectableObservable` that shares a single subscription to the source Observable and + * replays the items that were emitted during the window defined by `time` + */ + def replay(time: Duration, scheduler: Scheduler): ConnectableObservable[T] = { + new ConnectableObservable[T](asJavaObservable.replay(time.length, time.unit, scheduler)) + } + + /** + * Returns a `ConnectableObservable` that shares a single subscription to the source Observable that + * will replay all of its items and notifications to any future `Observer` on the given `Scheduler`. + *
+ * + * + * @param scheduler the Scheduler on which the Observers will observe the emitted items + * @return a `ConnectableObservable` that shares a single subscription to the source Observable that + * will replay all of its items and notifications to any future `bserver` on the given `Scheduler` + */ + def replay(scheduler: Scheduler): ConnectableObservable[T] = { + new ConnectableObservable[T](asJavaObservable.replay(scheduler)) } /** @@ -1198,7 +1484,7 @@ trait Observable[+T] * * @return an [[rx.lang.scala.observables.ConnectableObservable]]. */ - def publish(): ConnectableObservable[T] = { + def publish: ConnectableObservable[T] = { new ConnectableObservable[T](asJavaObservable.publish()) } @@ -2509,7 +2795,7 @@ trait Observable[+T] * emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted]. * @return Observable with retry logic. */ - def retry(): Observable[T] = { + def retry: Observable[T] = { toScalaObservable[T](asJavaObservable.retry()) } @@ -2522,7 +2808,7 @@ trait Observable[+T] * @see RxJava Wiki: repeat() * @see MSDN: Observable.Repeat */ - def repeat(): Observable[T] = { + def repeat: Observable[T] = { toScalaObservable[T](asJavaObservable.repeat()) } @@ -2856,24 +3142,6 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit, scheduler)) } - /** - * Returns an Observable that emits the single item at a specified index in a sequence of emissions from a - * source Observbable. - * - * - * - * @param index - * the zero-based index of the item to retrieve - * @return an Observable that emits a single item: the item at the specified position in the sequence of - * those emitted by the source Observable - * @throws IndexOutOfBoundsException - * if index is greater than or equal to the number of items emitted by the source - * Observable, or index is less than 0 - * @see `Observable.elementAt` - */ - @deprecated("Use `elementAt`", "0.18.0") - def apply(index: Int): Observable[T] = elementAt(index) - /** * Returns an Observable that emits the single item at a specified index in a sequence of emissions from a * source Observbable. diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index ddd30e74..cd5c6a1d 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -89,20 +89,27 @@ class CompletenessTest extends JUnitSuite { "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])", "lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])", "mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]", + "multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])", + "multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])", "onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])", "onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])", "onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)", "onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])", "parallel(Func1[Observable[T], Observable[R]])" -> "parallel(Observable[T] => Observable[R])", "parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)", - "publish()" -> "publish()", "publish(T)" -> "publish(U)", "publish(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publish(Observable[U] => Observable[R])", "publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[U] => Observable[R], U)", "reduce(Func2[T, T, T])" -> "reduce((U, U) => U)", "reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)", - "repeat()" -> "repeat()", - "retry()" -> "retry()", + "replay(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "replay(Observable[U] => Observable[R])", + "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int)" -> "replay(Observable[U] => Observable[R], Int)", + "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int, Long, TimeUnit)" -> "replay(Observable[U] => Observable[R], Int, Duration)", + "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int, Long, TimeUnit, Scheduler)" -> "replay(Observable[U] => Observable[R], Int, Duration, Scheduler)", + "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int, Scheduler)" -> "replay(Observable[U] => Observable[R], Int, Scheduler)", + "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit)" -> "replay(Observable[U] => Observable[R], Duration)", + "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit, Scheduler)" -> "replay(Observable[U] => Observable[R], Duration, Scheduler)", + "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Scheduler)" -> "replay(Observable[U] => Observable[R], Scheduler)", "scan(Func2[T, T, T])" -> unnecessary, "scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)", "skip(Int)" -> "drop(Int)",