diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala index 049a02cd..42e3273a 100755 --- a/src/main/scala/rx/lang/scala/Observable.scala +++ b/src/main/scala/rx/lang/scala/Observable.scala @@ -18,6 +18,7 @@ package rx.lang.scala import rx.functions.FuncN import rx.lang.scala.observables.ConnectableObservable +import rx.lang.scala.schedulers.{ImmediateScheduler, TrampolineScheduler, ComputationScheduler} import scala.concurrent.duration import java.util import collection.JavaConversions._ @@ -353,7 +354,7 @@ trait Observable[+T] * @return an Observable that emits timestamped items from the source * Observable with timestamps provided by the given Scheduler */ - def timestamp(scheduler: Scheduler): Observable[(Long, T)] = { + def timestamp(scheduler: Scheduler = ImmediateScheduler()): Observable[(Long, T)] = { toScalaObservable[rx.schedulers.Timestamped[_ <: T]](asJavaObservable.timestamp(scheduler)) .map((t: rx.schedulers.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue)) } @@ -548,7 +549,7 @@ trait Observable[+T] * An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers which are emitted after * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). */ - def tumblingBuffer(timespan: Duration, count: Int, scheduler: Scheduler): Observable[Seq[T]] = { + def tumblingBuffer(timespan: Duration, count: Int, scheduler: Scheduler = ComputationScheduler()): Observable[Seq[T]] = { val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(timespan.length, timespan.unit, count, scheduler) Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]]) } @@ -569,7 +570,7 @@ trait Observable[+T] * An [[rx.lang.scala.Observable]] which produces new buffers periodically, and these are emitted after * a fixed timespan has elapsed. */ - def slidingBuffer(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Seq[T]] = { + def slidingBuffer(timespan: Duration, timeshift: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[Seq[T]] = { val span: Long = timespan.length val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit) val unit: TimeUnit = timespan.unit @@ -707,7 +708,7 @@ trait Observable[+T] * @return * An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows with a fixed duration. */ - def tumbling(timespan: Duration, scheduler: Scheduler): Observable[Observable[T]] = { + def tumbling(timespan: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[Observable[T]] = { Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(timespan.length, timespan.unit, scheduler)) : Observable[Observable[T]] // SI-7818 } @@ -770,7 +771,7 @@ trait Observable[+T] * An [[rx.lang.scala.Observable]] which produces new windows periodically, and these are emitted after * a fixed timespan has elapsed. */ - def sliding(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Observable[T]] = { + def sliding(timespan: Duration, timeshift: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[Observable[T]] = { val span: Long = timespan.length val shift: Long = timespan.unit.convert(timeshift.length, timeshift.unit) val unit: TimeUnit = timespan.unit @@ -1257,7 +1258,7 @@ trait Observable[+T] * replays no more than `bufferSize` items that were emitted within the window defined by `time` * @throws IllegalArgumentException if `bufferSize` is less than zero */ - def replay[R](selector: Observable[T] => Observable[R], bufferSize: Int, time: Duration, scheduler: Scheduler): Observable[R] = { + def replay[R](selector: Observable[T] => Observable[R], bufferSize: Int, time: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[R] = { val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]] val fJava: Func1[rx.Observable[T], rx.Observable[R]] = (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] @@ -1664,7 +1665,7 @@ trait Observable[+T] * @return an Observable that emits the results of sampling the items emitted by the source * Observable at the specified time interval */ - def sample(duration: Duration, scheduler: Scheduler): Observable[T] = { + def sample(duration: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable[T](asJavaObservable.sample(duration.length, duration.unit, scheduler)) } @@ -1775,7 +1776,7 @@ trait Observable[+T] * @return an Observable that drops values emitted by the source Observable before the time window defined * by `time` elapses and emits the remainder */ - def drop(time: Duration, scheduler: Scheduler): Observable[T] = { + def drop(time: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable(asJavaObservable.skip(time.length, time.unit, scheduler)) } @@ -1826,7 +1827,7 @@ trait Observable[+T] * @return an Observable that drops those items emitted by the source Observable in a time window before the * source completes defined by `time` and `scheduler` */ - def dropRight(time: Duration, scheduler: Scheduler): Observable[T] = { + def dropRight(time: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable(asJavaObservable.skipLast(time.length, time.unit, scheduler)) } @@ -1877,7 +1878,7 @@ trait Observable[+T] * @return an Observable that emits those items emitted by the source Observable before the time runs out, * according to the specified Scheduler */ - def take(time: Duration, scheduler: Scheduler) { + def take(time: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable[T](asJavaObservable.take(time.length, time.unit, scheduler.asJavaScheduler)) } @@ -1926,7 +1927,7 @@ trait Observable[+T] * time before the Observable completed specified by `time`, where the timing information is * provided by `scheduler` */ - def takeRight(time: Duration, scheduler: Scheduler): Observable[T] = { + def takeRight(time: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable[T](asJavaObservable.takeLast(time.length, time.unit, scheduler.asJavaScheduler)) } @@ -2345,7 +2346,7 @@ trait Observable[+T] * @return Observable which performs the throttle operation. * @see `Observable.throttleWithTimeout` */ - def debounce(timeout: Duration, scheduler: Scheduler): Observable[T] = { + def debounce(timeout: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable[T](asJavaObservable.debounce(timeout.length, timeout.unit, scheduler)) } @@ -2363,7 +2364,7 @@ trait Observable[+T] * @return Observable which performs the throttle operation. * @see `Observable.debounce` */ - def throttleWithTimeout(timeout: Duration, scheduler: Scheduler): Observable[T] = { + def throttleWithTimeout(timeout: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable[T](asJavaObservable.throttleWithTimeout(timeout.length, timeout.unit, scheduler)) } @@ -2380,7 +2381,7 @@ trait Observable[+T] * The [[rx.lang.scala.Scheduler]] to use internally to manage the timers which handle timeout for each event. * @return Observable which performs the throttle operation. */ - def throttleFirst(skipDuration: Duration, scheduler: Scheduler): Observable[T] = { + def throttleFirst(skipDuration: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable[T](asJavaObservable.throttleFirst(skipDuration.length, skipDuration.unit, scheduler)) } @@ -2410,7 +2411,7 @@ trait Observable[+T] * Duration of windows within with the last value will be chosen. * @return Observable which performs the throttle operation. */ - def throttleLast(intervalDuration: Duration, scheduler: Scheduler): Observable[T] = { + def throttleLast(intervalDuration: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler)) } @@ -2447,7 +2448,7 @@ trait Observable[+T] * @return the source Observable modified to notify observers of a * `TimeoutException` in case of a timeout */ - def timeout(timeout: Duration, scheduler: Scheduler): Observable[T] = { + def timeout(timeout: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit, scheduler.asJavaScheduler)) } @@ -2992,7 +2993,7 @@ trait Observable[+T] * @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example * @since 0.20 */ - def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any], scheduler: Scheduler): Observable[T] = { + def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any], scheduler: Scheduler = TrampolineScheduler()): Observable[T] = { val f: Func1[_ >: rx.Observable[_ <: Throwable], _ <: rx.Observable[_ <: Any]] = (jOt: rx.Observable[_ <: Throwable]) => { val ot = toScalaObservable[Throwable](jOt) @@ -3044,7 +3045,7 @@ trait Observable[+T] * @see RxJava Wiki: repeat() * @see MSDN: Observable.Repeat */ - def repeat(count: Long, scheduler: Scheduler): Observable[T] = { + def repeat(count: Long, scheduler: Scheduler = TrampolineScheduler()): Observable[T] = { toScalaObservable[T](asJavaObservable.repeat(count, scheduler)) } @@ -3098,7 +3099,7 @@ trait Observable[+T] * @see MSDN: Observable.Repeat * @since 0.20 */ - def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler): Observable[T] = { + def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler = TrampolineScheduler()): Observable[T] = { val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] = (jOv: rx.Observable[_ <: Void]) => { val ov = toScalaObservable[Void](jOv) @@ -3311,7 +3312,7 @@ trait Observable[+T] * @param scheduler the Scheduler to use for delaying * @return the source Observable shifted in time by the specified delay */ - def delay(delay: Duration, scheduler: Scheduler): Observable[T] = { + def delay(delay: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit, scheduler)) } @@ -3377,7 +3378,7 @@ trait Observable[+T] * @return an Observable that delays the subscription to the source Observable by a given * amount, waiting and subscribing on the given Scheduler */ - def delaySubscription(delay: Duration, scheduler: Scheduler): Observable[T] = { + def delaySubscription(delay: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[T] = { toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit, scheduler)) } @@ -3522,7 +3523,7 @@ trait Observable[+T] * @param scheduler the [[Scheduler]] used to compute time intervals * @return an Observable that emits time interval information items */ - def timeInterval(scheduler: Scheduler): Observable[(Duration, T)] = { + def timeInterval(scheduler: Scheduler = ImmediateScheduler()): Observable[(Duration, T)] = { toScalaObservable(asJavaObservable.timeInterval(scheduler.asJavaScheduler)) .map(inv => (Duration(inv.getIntervalInMilliseconds, MILLISECONDS), inv.getValue)) } @@ -4249,7 +4250,7 @@ object Observable { * the scheduler to use * @return An Observable that emits a number each time interval. */ - def interval(period: Duration, scheduler: Scheduler): Observable[Long] = { + def interval(period: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[Long] = { toScalaObservable[java.lang.Long](rx.Observable.interval(period.length, period.unit, scheduler)).map(_.longValue()) } @@ -4299,7 +4300,7 @@ object Observable { * @param scheduler the Scheduler to use for scheduling the item * @return Observable that emits `0L` after a specified delay, on a specified Scheduler, and then completes */ - def timer(delay: Duration, scheduler: Scheduler): Observable[Long] = { + def timer(delay: Duration, scheduler: Scheduler = ComputationScheduler()): Observable[Long] = { toScalaObservable[java.lang.Long](rx.Observable.timer(delay.length, delay.unit, scheduler)).map(_.longValue()) }