From bf8da158a9b9e73b618bd5dd079dc2814c46a2fe Mon Sep 17 00:00:00 2001 From: Des Petrov Date: Mon, 9 Jan 2023 18:48:44 +0000 Subject: [PATCH] Add onDropped callback for throttleWithTimeout - #7458 (#7510) --- .../io/reactivex/rxjava3/core/Flowable.java | 93 +++++++++++++++- .../io/reactivex/rxjava3/core/Observable.java | 85 ++++++++++++++- .../flowable/FlowableDebounceTimed.java | 35 ++++-- .../observable/ObservableDebounceTimed.java | 34 ++++-- .../flowable/FlowableDebounceTest.java | 101 +++++++++++++++--- .../observable/ObservableDebounceTest.java | 73 ++++++++++++- .../ParamValidationCheckerTest.java | 4 + 7 files changed, 387 insertions(+), 38 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 4873837033..b7781c57f8 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -8930,7 +8930,57 @@ public final Flowable debounce(long timeout, @NonNull TimeUnit unit) { public final Flowable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler)); + return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, null)); + } + + /** + * Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the + * current {@code Flowable} that are followed by newer items before a timeout value expires on a specified + * {@link Scheduler}. The timer resets on each emission. + *

+ * Note: If items keep being emitted by the current {@code Flowable} faster than the timeout then no items + * will be emitted by the resulting {@code Flowable}. + *

+ * + *

+ * Delivery of the item after the grace period happens on the given {@code Scheduler}'s + * {@code Worker} which if takes too long, a newer item may arrive from the upstream, causing the + * {@code Worker}'s task to get disposed, which may also interrupt any downstream blocking operation + * (yielding an {@code InterruptedException}). It is recommended processing items + * that may take long time to be moved to another thread via {@link #observeOn} applied after + * {@code debounce} itself. + *

+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow.
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ * + * @param timeout + * the time each item has to be "the most recent" of those emitted by the current {@code Flowable} to + * ensure that it's not dropped + * @param unit + * the unit of time for the specified {@code timeout} + * @param scheduler + * the {@code Scheduler} to use internally to manage the timers that handle the timeout for each + * item + * @param onDropped + * called with the current entry when it has been replaced by a new one + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null} + * @see ReactiveX operators documentation: Debounce + * @see RxJava wiki: Backpressure + * @see #throttleWithTimeout(long, TimeUnit, Scheduler) + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.ERROR) + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Flowable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + Objects.requireNonNull(onDropped, "onDropped is null"); + return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, onDropped)); } /** @@ -17587,6 +17637,47 @@ public final Flowable throttleWithTimeout(long timeout, @NonNull TimeUnit uni return debounce(timeout, unit, scheduler); } + /** + * Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the + * current {@code Flowable} that are followed by newer items before a timeout value expires on a specified + * {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler)}). + *

+ * Note: If items keep being emitted by the current {@code Flowable} faster than the timeout then no items + * will be emitted by the resulting {@code Flowable}. + *

+ * + *

+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow.
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ * + * @param timeout + * the length of the window of time that must pass after the emission of an item from the current + * {@code Flowable} in which it emits no items in order for the item to be emitted by the + * resulting {@code Flowable} + * @param unit + * the unit of time for the specified {@code timeout} + * @param scheduler + * the {@code Scheduler} to use internally to manage the timers that handle the timeout for each + * item + * @param onDropped + * called with the current entry when it has been replaced by a new one + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null} + * @see ReactiveX operators documentation: Debounce + * @see RxJava wiki: Backpressure + * @see #debounce(long, TimeUnit, Scheduler) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.ERROR) + @SchedulerSupport(SchedulerSupport.CUSTOM) + @NonNull + public final Flowable throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + return debounce(timeout, unit, scheduler, onDropped); + } + /** * Returns a {@code Flowable} that emits records of the time interval between consecutive items emitted by the * current {@code Flowable}. diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index b6fdb534ec..5bb6508994 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -7896,7 +7896,53 @@ public final Observable debounce(long timeout, @NonNull TimeUnit unit) { public final Observable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler)); + return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler, null)); + } + + /** + * Returns an {@code Observable} that mirrors the current {@code Observable}, except that it drops items emitted by the + * current {@code Observable} that are followed by newer items before a timeout value expires on a specified + * {@link Scheduler}. The timer resets on each emission. + *

+ * Note: If items keep being emitted by the current {@code Observable} faster than the timeout then no items + * will be emitted by the resulting {@code Observable}. + *

+ * + *

+ * Delivery of the item after the grace period happens on the given {@code Scheduler}'s + * {@code Worker} which if takes too long, a newer item may arrive from the upstream, causing the + * {@code Worker}'s task to get disposed, which may also interrupt any downstream blocking operation + * (yielding an {@code InterruptedException}). It is recommended processing items + * that may take long time to be moved to another thread via {@link #observeOn} applied after + * {@code debounce} itself. + *

+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ * + * @param timeout + * the time each item has to be "the most recent" of those emitted by the current {@code Observable} to + * ensure that it's not dropped + * @param unit + * the unit of time for the specified {@code timeout} + * @param scheduler + * the {@code Scheduler} to use internally to manage the timers that handle the timeout for each + * item + * @param onDropped + * called with the current entry when it has been replaced by a new one + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} } or {@code onDropped} is {@code null} + * @see ReactiveX operators documentation: Debounce + * @see #throttleWithTimeout(long, TimeUnit, Scheduler) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.CUSTOM) + @NonNull + public final Observable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + Objects.requireNonNull(onDropped, "onDropped is null"); + return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler, onDropped)); } /** @@ -14597,6 +14643,43 @@ public final Observable throttleWithTimeout(long timeout, @NonNull TimeUnit u return debounce(timeout, unit, scheduler); } + /** + * Returns an {@code Observable} that mirrors the current {@code Observable}, except that it drops items emitted by the + * current {@code Observable} that are followed by newer items before a timeout value expires on a specified + * {@link Scheduler}. The timer resets on each emission (Alias to {@link #debounce(long, TimeUnit, Scheduler)}). + *

+ * Note: If items keep being emitted by the current {@code Observable} faster than the timeout then no items + * will be emitted by the resulting {@code Observable}. + *

+ * + *

+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ * + * @param timeout + * the length of the window of time that must pass after the emission of an item from the current + * {@code Observable}, in which the current {@code Observable} emits no items, in order for the item to be emitted by the + * resulting {@code Observable} + * @param unit + * the unit of time for the specified {@code timeout} + * @param scheduler + * the {@code Scheduler} to use internally to manage the timers that handle the timeout for each + * item + * @param onDropped + * called with the current entry when it has been replaced by a new one + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null} + * @see ReactiveX operators documentation: Debounce + * @see #debounce(long, TimeUnit, Scheduler) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.CUSTOM) + @NonNull + public final Observable throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + return debounce(timeout, unit, scheduler, onDropped); + } + /** * Returns an {@code Observable} that emits records of the time interval between consecutive items emitted by the * current {@code Observable}. diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java index 30b398ab7b..086dd6c1fd 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java @@ -16,6 +16,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.*; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Consumer; import org.reactivestreams.*; import io.reactivex.rxjava3.core.*; @@ -32,19 +34,20 @@ public final class FlowableDebounceTimed extends AbstractFlowableWithUpstream final long timeout; final TimeUnit unit; final Scheduler scheduler; + final Consumer onDropped; - public FlowableDebounceTimed(Flowable source, long timeout, TimeUnit unit, Scheduler scheduler) { + public FlowableDebounceTimed(Flowable source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { super(source); this.timeout = timeout; this.unit = unit; this.scheduler = scheduler; + this.onDropped = onDropped; } @Override protected void subscribeActual(Subscriber s) { source.subscribe(new DebounceTimedSubscriber<>( - new SerializedSubscriber<>(s), - timeout, unit, scheduler.createWorker())); + new SerializedSubscriber<>(s), timeout, unit, scheduler.createWorker(), onDropped)); } static final class DebounceTimedSubscriber extends AtomicLong @@ -55,20 +58,22 @@ static final class DebounceTimedSubscriber extends AtomicLong final long timeout; final TimeUnit unit; final Scheduler.Worker worker; + final Consumer onDropped; Subscription upstream; - Disposable timer; + DebounceEmitter timer; volatile long index; boolean done; - DebounceTimedSubscriber(Subscriber actual, long timeout, TimeUnit unit, Worker worker) { + DebounceTimedSubscriber(Subscriber actual, long timeout, TimeUnit unit, Worker worker, Consumer onDropped) { this.downstream = actual; this.timeout = timeout; this.unit = unit; this.worker = worker; + this.onDropped = onDropped; } @Override @@ -93,6 +98,18 @@ public void onNext(T t) { d.dispose(); } + if (onDropped != null && timer != null) { + try { + onDropped.accept(timer.value); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + done = true; + downstream.onError(ex); + worker.dispose(); + } + } + DebounceEmitter de = new DebounceEmitter<>(t, idx, this); timer = de; d = worker.schedule(de, timeout, unit); @@ -121,15 +138,13 @@ public void onComplete() { } done = true; - Disposable d = timer; + DebounceEmitter d = timer; if (d != null) { d.dispose(); } - @SuppressWarnings("unchecked") - DebounceEmitter de = (DebounceEmitter)d; - if (de != null) { - de.emit(); + if (d != null) { + d.emit(); } downstream.onComplete(); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java index 248d00ea23..48861a778e 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java @@ -19,6 +19,8 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.core.Scheduler.Worker; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; import io.reactivex.rxjava3.observers.SerializedObserver; import io.reactivex.rxjava3.plugins.RxJavaPlugins; @@ -27,19 +29,20 @@ public final class ObservableDebounceTimed extends AbstractObservableWithUpst final long timeout; final TimeUnit unit; final Scheduler scheduler; + final Consumer onDropped; - public ObservableDebounceTimed(ObservableSource source, long timeout, TimeUnit unit, Scheduler scheduler) { + public ObservableDebounceTimed(ObservableSource source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer onDropped) { super(source); this.timeout = timeout; this.unit = unit; this.scheduler = scheduler; + this.onDropped = onDropped; } @Override public void subscribeActual(Observer t) { source.subscribe(new DebounceTimedObserver<>( - new SerializedObserver<>(t), - timeout, unit, scheduler.createWorker())); + new SerializedObserver<>(t), timeout, unit, scheduler.createWorker(), onDropped)); } static final class DebounceTimedObserver @@ -48,20 +51,22 @@ static final class DebounceTimedObserver final long timeout; final TimeUnit unit; final Scheduler.Worker worker; + final Consumer onDropped; Disposable upstream; - Disposable timer; + DebounceEmitter timer; volatile long index; boolean done; - DebounceTimedObserver(Observer actual, long timeout, TimeUnit unit, Worker worker) { + DebounceTimedObserver(Observer actual, long timeout, TimeUnit unit, Worker worker, Consumer onDropped) { this.downstream = actual; this.timeout = timeout; this.unit = unit; this.worker = worker; + this.onDropped = onDropped; } @Override @@ -85,6 +90,17 @@ public void onNext(T t) { d.dispose(); } + if (onDropped != null && timer != null) { + try { + onDropped.accept(timer.value); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.dispose(); + downstream.onError(ex); + done = true; + } + } + DebounceEmitter de = new DebounceEmitter<>(t, idx, this); timer = de; d = worker.schedule(de, timeout, unit); @@ -113,15 +129,13 @@ public void onComplete() { } done = true; - Disposable d = timer; + DebounceEmitter d = timer; if (d != null) { d.dispose(); } - @SuppressWarnings("unchecked") - DebounceEmitter de = (DebounceEmitter)d; - if (de != null) { - de.run(); + if (d != null) { + d.run(); } downstream.onComplete(); worker.dispose(); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTest.java index 69027d5bd7..01122106c8 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.rxjava3.functions.Action; import org.junit.*; import org.mockito.InOrder; import org.reactivestreams.*; @@ -41,16 +42,86 @@ public class FlowableDebounceTest extends RxJavaTest { private TestScheduler scheduler; - private Subscriber Subscriber; + private Subscriber subscriber; private Scheduler.Worker innerScheduler; @Before public void before() { scheduler = new TestScheduler(); - Subscriber = TestHelper.mockSubscriber(); + subscriber = TestHelper.mockSubscriber(); innerScheduler = scheduler.createWorker(); } + @Test + public void debounceWithOnDroppedCallbackWithEx() throws Throwable { + Flowable source = Flowable.unsafeCreate(new Publisher() { + @Override + public void subscribe(Subscriber subscriber) { + subscriber.onSubscribe(new BooleanSubscription()); + publishNext(subscriber, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. + publishNext(subscriber, 400, "two"); // Should be published since "three" will arrive after the timeout expires. + publishNext(subscriber, 900, "three"); // Should be skipped since "four" will arrive before the timout expires. + publishNext(subscriber, 999, "four"); // Should be skipped since onComplete will arrive before the timeout expires. + publishCompleted(subscriber, 1000); // Should be published as soon as the timeout expires. + } + }); + + Action whenDisposed = mock(Action.class); + + Flowable sampled = source + .doOnCancel(whenDisposed) + .debounce(400, TimeUnit.MILLISECONDS, scheduler, e -> { + if ("three".equals(e)) { + throw new TestException("forced"); + } + }); + sampled.subscribe(subscriber); + + InOrder inOrder = inOrder(subscriber); + + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(subscriber, times(1)).onNext("two"); + inOrder.verify(subscriber, times(1)).onError(any(TestException.class)); + inOrder.verify(subscriber, times(0)).onNext("three"); + inOrder.verify(subscriber, times(0)).onNext("four"); + inOrder.verify(subscriber, times(0)).onComplete(); + inOrder.verifyNoMoreInteractions(); + verify(whenDisposed).run(); + } + + @Test + public void debounceWithOnDroppedCallback() { + Flowable source = Flowable.unsafeCreate(new Publisher() { + @Override + public void subscribe(Subscriber subscriber) { + subscriber.onSubscribe(new BooleanSubscription()); + publishNext(subscriber, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. + publishNext(subscriber, 400, "two"); // Should be published since "three" will arrive after the timeout expires. + publishNext(subscriber, 900, "three"); // Should be skipped since "four" will arrive before the timout expires. + publishNext(subscriber, 999, "four"); // Should be skipped since onComplete will arrive before the timeout expires. + publishCompleted(subscriber, 1000); // Should be published as soon as the timeout expires. + } + }); + + Observer dropCallbackObserver = TestHelper.mockObserver(); + Flowable sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler, dropCallbackObserver::onNext); + sampled.subscribe(subscriber); + + scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); + InOrder inOrder = inOrder(subscriber); + InOrder dropCallbackOrder = inOrder(dropCallbackObserver); + + // must go to 800 since it must be 400 after when two is sent, which is at 400 + scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS); + inOrder.verify(subscriber, times(1)).onNext("two"); + dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("one"); + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("three"); + inOrder.verify(subscriber, times(1)).onComplete(); + inOrder.verifyNoMoreInteractions(); + dropCallbackOrder.verifyNoMoreInteractions(); + } + @Test public void debounceWithCompleted() { Flowable source = Flowable.unsafeCreate(new Publisher() { @@ -65,15 +136,15 @@ public void subscribe(Subscriber subscriber) { }); Flowable sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler); - sampled.subscribe(Subscriber); + sampled.subscribe(subscriber); scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); - InOrder inOrder = inOrder(Subscriber); + InOrder inOrder = inOrder(subscriber); // must go to 800 since it must be 400 after when two is sent, which is at 400 scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS); - inOrder.verify(Subscriber, times(1)).onNext("two"); + inOrder.verify(subscriber, times(1)).onNext("two"); scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); - inOrder.verify(Subscriber, times(1)).onComplete(); + inOrder.verify(subscriber, times(1)).onComplete(); inOrder.verifyNoMoreInteractions(); } @@ -97,13 +168,13 @@ public void subscribe(Subscriber subscriber) { }); Flowable sampled = source.debounce(200, TimeUnit.MILLISECONDS, scheduler); - sampled.subscribe(Subscriber); + sampled.subscribe(subscriber); scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); - InOrder inOrder = inOrder(Subscriber); - inOrder.verify(Subscriber, times(0)).onNext(anyString()); + InOrder inOrder = inOrder(subscriber); + inOrder.verify(subscriber, times(0)).onNext(anyString()); scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); - inOrder.verify(Subscriber, times(1)).onComplete(); + inOrder.verify(subscriber, times(1)).onComplete(); inOrder.verifyNoMoreInteractions(); } @@ -121,15 +192,15 @@ public void subscribe(Subscriber subscriber) { }); Flowable sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler); - sampled.subscribe(Subscriber); + sampled.subscribe(subscriber); scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); - InOrder inOrder = inOrder(Subscriber); + InOrder inOrder = inOrder(subscriber); // 100 + 400 means it triggers at 500 scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); - inOrder.verify(Subscriber).onNext("one"); + inOrder.verify(subscriber).onNext("one"); scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS); - inOrder.verify(Subscriber).onError(any(TestException.class)); + inOrder.verify(subscriber).onError(any(TestException.class)); inOrder.verifyNoMoreInteractions(); } @@ -530,7 +601,7 @@ public void timedBadRequest() { public void timedLateEmit() { TestSubscriber ts = new TestSubscriber<>(); DebounceTimedSubscriber sub = new DebounceTimedSubscriber<>( - ts, 1, TimeUnit.SECONDS, new TestScheduler().createWorker()); + ts, 1, TimeUnit.SECONDS, new TestScheduler().createWorker(), null); sub.onSubscribe(new BooleanSubscription()); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTest.java index b43b1d2d27..fc4e7d8478 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.rxjava3.functions.Action; import org.junit.*; import org.mockito.InOrder; import org.reactivestreams.Publisher; @@ -50,6 +51,76 @@ public void before() { innerScheduler = scheduler.createWorker(); } + @Test + public void debounceWithOnDroppedCallbackWithEx() throws Throwable { + Observable source = Observable.unsafeCreate(new ObservableSource() { + @Override + public void subscribe(Observer observer) { + observer.onSubscribe(Disposable.empty()); + publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. + publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires. + publishNext(observer, 900, "three"); // Should be skipped since onComplete will arrive before the timeout expires. + publishNext(observer, 999, "four"); // Should be skipped since onComplete will arrive before the timeout expires. + publishCompleted(observer, 1000); // Should be published as soon as the timeout expires. + } + }); + + Action whenDisposed = mock(Action.class); + Observable sampled = source + .doOnDispose(whenDisposed) + .debounce(400, TimeUnit.MILLISECONDS, scheduler, e -> { + if ("three".equals(e)) { + throw new TestException("forced"); + } + }); + sampled.subscribe(observer); + + scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); + InOrder inOrder = inOrder(observer); + // must go to 800 since it must be 400 after when two is sent, which is at 400 + scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("two"); + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onError(any(TestException.class)); + inOrder.verify(observer, never()).onNext("three"); + inOrder.verify(observer, never()).onNext("four"); + inOrder.verify(observer, never()).onComplete(); + inOrder.verifyNoMoreInteractions(); + verify(whenDisposed).run(); + } + + @Test + public void debounceWithOnDroppedCallback() { + Observable source = Observable.unsafeCreate(new ObservableSource() { + @Override + public void subscribe(Observer observer) { + observer.onSubscribe(Disposable.empty()); + publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. + publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires. + publishNext(observer, 900, "three"); // Should be skipped since onComplete will arrive before the timeout expires. + publishNext(observer, 999, "four"); // Should be skipped since onComplete will arrive before the timeout expires. + publishCompleted(observer, 1000); // Should be published as soon as the timeout expires. + } + }); + + Observer drops = TestHelper.mockObserver(); + InOrder inOrderDrops = inOrder(drops); + Observable sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler, drops::onNext); + sampled.subscribe(observer); + + scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); + InOrder inOrder = inOrder(observer); + // must go to 800 since it must be 400 after when two is sent, which is at 400 + scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS); + inOrderDrops.verify(drops, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onNext("two"); + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrderDrops.verify(drops, times(1)).onNext("three"); + inOrder.verify(observer, times(1)).onComplete(); + inOrder.verifyNoMoreInteractions(); + inOrderDrops.verifyNoMoreInteractions(); + } + @Test public void debounceWithCompleted() { Observable source = Observable.unsafeCreate(new ObservableSource() { @@ -489,7 +560,7 @@ protected void subscribeActual( public void timedLateEmit() { TestObserver to = new TestObserver<>(); DebounceTimedObserver sub = new DebounceTimedObserver<>( - to, 1, TimeUnit.SECONDS, new TestScheduler().createWorker()); + to, 1, TimeUnit.SECONDS, new TestScheduler().createWorker(), null); sub.onSubscribe(Disposable.empty()); diff --git a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java index ba377dbf63..8dd240119b 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java @@ -149,6 +149,7 @@ public void checkParallelFlowable() { // negative time is considered as zero time addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class)); // null Action allowed addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "onBackpressureBuffer", Long.TYPE, Action.class, BackpressureOverflowStrategy.class)); @@ -177,6 +178,7 @@ public void checkParallelFlowable() { // negative time is considered as zero time addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class)); // negative time is considered as zero time addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class)); @@ -400,6 +402,7 @@ public void checkParallelFlowable() { // negative time is considered as zero time addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class)); // zero repeat is allowed addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "repeat", Long.TYPE)); @@ -425,6 +428,7 @@ public void checkParallelFlowable() { // negative time is considered as zero time addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class)); // negative time is considered as zero time addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class));