Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: add sample() overload that can emit the very last buffered item #4955

Merged
merged 1 commit into from
Jan 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 114 additions & 2 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -11530,6 +11530,41 @@ public final Flowable<T> sample(long period, TimeUnit unit) {
return sample(period, unit, Schedulers.computation());
}

/**
* Returns a Flowable that emits the most recently emitted item (if any) emitted by the source Publisher
* within periodic time intervals and optionally emit the very last upstream item when the upstream completes.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code sample} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param period
* the sampling rate
* @param unit
* the {@link TimeUnit} in which {@code period} is defined
* @param emitLast
* if true and the upstream completes while there is still an unsampled item available,
* that item is emitted to downstream before completion
* if false, an unsampled last item is ignored.
* @return a Flowable that emits the results of sampling the items emitted by the source Publisher at
* the specified time interval
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #throttleLast(long, TimeUnit)
* @since 2.0.5 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@Experimental
public final Flowable<T> sample(long period, TimeUnit unit, boolean emitLast) {
return sample(period, unit, Schedulers.computation(), emitLast);
}

/**
* Returns a Flowable that emits the most recently emitted item (if any) emitted by the source Publisher
* within periodic time intervals, where the intervals are defined on a particular Scheduler.
Expand Down Expand Up @@ -11560,7 +11595,47 @@ public final Flowable<T> sample(long period, TimeUnit unit) {
public final Flowable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<T>(this, period, unit, scheduler));
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<T>(this, period, unit, scheduler, false));
}

/**
* Returns a Flowable that emits the most recently emitted item (if any) emitted by the source Publisher
* within periodic time intervals, where the intervals are defined on a particular Scheduler
* and optionally emit the very last upstream item when the upstream completes.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param period
* the sampling rate
* @param unit
* the {@link TimeUnit} in which {@code period} is defined
* @param scheduler
* the {@link Scheduler} to use when sampling
* @param emitLast
* if true and the upstream completes while there is still an unsampled item available,
* that item is emitted to downstream before completion
* if false, an unsampled last item is ignored.
* @return a Flowable that emits the results of sampling the items emitted by the source Publisher at
* the specified time interval
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #throttleLast(long, TimeUnit, Scheduler)
* @since 2.0.5 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@Experimental
public final Flowable<T> sample(long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<T>(this, period, unit, scheduler, emitLast));
}

/**
Expand Down Expand Up @@ -11590,7 +11665,44 @@ public final Flowable<T> sample(long period, TimeUnit unit, Scheduler scheduler)
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Flowable<T> sample(Publisher<U> sampler) {
ObjectHelper.requireNonNull(sampler, "sampler is null");
return RxJavaPlugins.onAssembly(new FlowableSamplePublisher<T>(this, sampler));
return RxJavaPlugins.onAssembly(new FlowableSamplePublisher<T>(this, sampler, false));
}

/**
* Returns a Flowable that, when the specified {@code sampler} Publisher emits an item or completes,
* emits the most recently emitted item (if any) emitted by the source Publisher since the previous
* emission from the {@code sampler} Publisher
* and optionally emit the very last upstream item when the upstream or other Publisher complete.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.o.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses the emissions of the {@code sampler}
* Publisher to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code sample} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U> the element type of the sampler Publisher
* @param sampler
* the Publisher to use for sampling the source Publisher
* @param emitLast
* if true and the upstream completes while there is still an unsampled item available,
* that item is emitted to downstream before completion
* if false, an unsampled last item is ignored.
* @return a Flowable that emits the results of sampling the items emitted by this Publisher whenever
* the {@code sampler} Publisher emits an item or completes
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @since 2.0.5 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final <U> Flowable<T> sample(Publisher<U> sampler, boolean emitLast) {
ObjectHelper.requireNonNull(sampler, "sampler is null");
return RxJavaPlugins.onAssembly(new FlowableSamplePublisher<T>(this, sampler, emitLast));
}

/**
Expand Down
104 changes: 102 additions & 2 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -9610,6 +9610,37 @@ public final Observable<T> sample(long period, TimeUnit unit) {
return sample(period, unit, Schedulers.computation());
}

/**
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source ObservableSource
* within periodic time intervals and optionally emit the very last upstream item when the upstream completes.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code sample} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param period
* the sampling rate
* @param unit
* the {@link TimeUnit} in which {@code period} is defined
* @return an Observable that emits the results of sampling the items emitted by the source ObservableSource at
* the specified time interval
* @param emitLast
* if true and the upstream completes while there is still an unsampled item available,
* that item is emitted to downstream before completion
* if false, an unsampled last item is ignored.
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see #throttleLast(long, TimeUnit)
* @since 2.0.5 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@Experimental
public final Observable<T> sample(long period, TimeUnit unit, boolean emitLast) {
return sample(period, unit, Schedulers.computation(), emitLast);
}

/**
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source ObservableSource
* within periodic time intervals, where the intervals are defined on a particular Scheduler.
Expand All @@ -9636,7 +9667,43 @@ public final Observable<T> sample(long period, TimeUnit unit) {
public final Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<T>(this, period, unit, scheduler));
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<T>(this, period, unit, scheduler, false));
}

/**
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source ObservableSource
* within periodic time intervals, where the intervals are defined on a particular Scheduler
* and optionally emit the very last upstream item when the upstream completes.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param period
* the sampling rate
* @param unit
* the {@link TimeUnit} in which {@code period} is defined
* @param scheduler
* the {@link Scheduler} to use when sampling
* @param emitLast
* if true and the upstream completes while there is still an unsampled item available,
* that item is emitted to downstream before completion
* if false, an unsampled last item is ignored.
* @return an Observable that emits the results of sampling the items emitted by the source ObservableSource at
* the specified time interval
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @see #throttleLast(long, TimeUnit, Scheduler)
* @since 2.0.5 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@Experimental
public final Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<T>(this, period, unit, scheduler, emitLast));
}

/**
Expand All @@ -9662,7 +9729,40 @@ public final Observable<T> sample(long period, TimeUnit unit, Scheduler schedule
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<T> sample(ObservableSource<U> sampler) {
ObjectHelper.requireNonNull(sampler, "sampler is null");
return RxJavaPlugins.onAssembly(new ObservableSampleWithObservable<T>(this, sampler));
return RxJavaPlugins.onAssembly(new ObservableSampleWithObservable<T>(this, sampler, false));
}

/**
* Returns an Observable that, when the specified {@code sampler} ObservableSource emits an item or completes,
* emits the most recently emitted item (if any) emitted by the source ObservableSource since the previous
* emission from the {@code sampler} ObservableSource
* and optionally emit the very last upstream item when the upstream or other ObservableSource complete.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.o.png" alt="">
* <dl>
* ObservableSource to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code sample} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U> the element type of the sampler ObservableSource
* @param sampler
* the ObservableSource to use for sampling the source ObservableSource
* @param emitLast
* if true and the upstream completes while there is still an unsampled item available,
* that item is emitted to downstream before completion
* if false, an unsampled last item is ignored.
* @return an Observable that emits the results of sampling the items emitted by this ObservableSource whenever
* the {@code sampler} ObservableSource emits an item or completes
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
* @since 2.0.5 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final <U> Observable<T> sample(ObservableSource<U> sampler, boolean emitLast) {
ObjectHelper.requireNonNull(sampler, "sampler is null");
return RxJavaPlugins.onAssembly(new ObservableSampleWithObservable<T>(this, sampler, emitLast));
}

/**
Expand Down
Loading