From 5c89b89abdd482add032509811c09045423e0092 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 11 Oct 2017 14:13:12 +0200 Subject: [PATCH 1/3] 2.x: improve Flowable.timeout() --- .../operators/flowable/FlowableTimeout.java | 407 ++++++++++-------- .../flowable/FlowableTimeoutTimed.java | 298 +++++++------ .../reactivex/schedulers/TestScheduler.java | 6 +- .../reactivex/flowable/FlowableNullTests.java | 2 +- .../flowable/FlowableTimeoutTests.java | 90 +++- .../FlowableTimeoutWithSelectorTest.java | 218 ++++++++++ 6 files changed, 691 insertions(+), 330 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java index 7f9e7568eb..e5edb97783 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java @@ -14,7 +14,7 @@ package io.reactivex.internal.operators.flowable; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; @@ -22,12 +22,11 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; -import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.disposables.SequentialDisposable; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.subscribers.FullArbiterSubscriber; +import io.reactivex.internal.operators.flowable.FlowableTimeoutTimed.TimeoutSupport; import io.reactivex.internal.subscriptions.*; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.subscribers.*; public final class FlowableTimeout extends AbstractFlowableWithUpstream { final Publisher firstTimeoutIndicator; @@ -48,299 +47,343 @@ public FlowableTimeout( @Override protected void subscribeActual(Subscriber s) { if (other == null) { - source.subscribe(new TimeoutSubscriber( - new SerializedSubscriber(s), - firstTimeoutIndicator, itemTimeoutIndicator)); + TimeoutSubscriber parent = new TimeoutSubscriber(s, itemTimeoutIndicator); + s.onSubscribe(parent); + parent.startFirstTimeout(firstTimeoutIndicator); + source.subscribe(parent); } else { - source.subscribe(new TimeoutOtherSubscriber( - s, firstTimeoutIndicator, itemTimeoutIndicator, other)); + TimeoutFallbackSubscriber parent = new TimeoutFallbackSubscriber(s, itemTimeoutIndicator, other); + s.onSubscribe(parent); + parent.startFirstTimeout(firstTimeoutIndicator); + source.subscribe(parent); } } - static final class TimeoutSubscriber implements FlowableSubscriber, Subscription, OnTimeout { + interface TimeoutSelectorSupport extends TimeoutSupport { + void onTimeoutError(long idx, Throwable ex); + } + + static final class TimeoutSubscriber extends AtomicLong + implements FlowableSubscriber, Subscription, TimeoutSelectorSupport { + + private static final long serialVersionUID = 3764492702657003550L; + final Subscriber actual; - final Publisher firstTimeoutIndicator; - final Function> itemTimeoutIndicator; - Subscription s; + final Function> itemTimeoutIndicator; - volatile boolean cancelled; + final SequentialDisposable task; - volatile long index; + final AtomicReference upstream; - final AtomicReference timeout = new AtomicReference(); + final AtomicLong requested; - TimeoutSubscriber(Subscriber actual, - Publisher firstTimeoutIndicator, - Function> itemTimeoutIndicator) { + TimeoutSubscriber(Subscriber actual, Function> itemTimeoutIndicator) { this.actual = actual; - this.firstTimeoutIndicator = firstTimeoutIndicator; this.itemTimeoutIndicator = itemTimeoutIndicator; + this.task = new SequentialDisposable(); + this.upstream = new AtomicReference(); + this.requested = new AtomicLong(); } @Override public void onSubscribe(Subscription s) { - if (!SubscriptionHelper.validate(this.s, s)) { - return; - } - this.s = s; - - if (cancelled) { - return; - } - - Subscriber a = actual; - - Publisher p = firstTimeoutIndicator; - - if (p != null) { - TimeoutInnerSubscriber tis = new TimeoutInnerSubscriber(this, 0); - - if (timeout.compareAndSet(null, tis)) { - a.onSubscribe(this); - p.subscribe(tis); - } - } else { - a.onSubscribe(this); - } + SubscriptionHelper.deferredSetOnce(upstream, requested, s); } @Override public void onNext(T t) { - long idx = index + 1; - index = idx; - - actual.onNext(t); + long idx = get(); + if (idx == Long.MAX_VALUE || !compareAndSet(idx, idx + 1)) { + return; + } - Disposable d = timeout.get(); + Disposable d = task.get(); if (d != null) { d.dispose(); } - Publisher p; + actual.onNext(t); + + Publisher itemTimeoutPublisher; try { - p = ObjectHelper.requireNonNull(itemTimeoutIndicator.apply(t), "The publisher returned is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - cancel(); - actual.onError(e); + itemTimeoutPublisher = ObjectHelper.requireNonNull( + itemTimeoutIndicator.apply(t), + "The itemTimeoutIndicator returned a null Publisher."); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.get().cancel(); + getAndSet(Long.MAX_VALUE); + actual.onError(ex); return; } - TimeoutInnerSubscriber tis = new TimeoutInnerSubscriber(this, idx); + TimeoutConsumer consumer = new TimeoutConsumer(idx + 1, this); + if (task.replace(consumer)) { + itemTimeoutPublisher.subscribe(consumer); + } + } - if (timeout.compareAndSet(d, tis)) { - p.subscribe(tis); + void startFirstTimeout(Publisher firstTimeoutIndicator) { + if (firstTimeoutIndicator != null) { + TimeoutConsumer consumer = new TimeoutConsumer(0L, this); + if (task.replace(consumer)) { + firstTimeoutIndicator.subscribe(consumer); + } } } @Override public void onError(Throwable t) { - cancel(); - actual.onError(t); + if (getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onError(t); + } else { + RxJavaPlugins.onError(t); + } } @Override public void onComplete() { - cancel(); - actual.onComplete(); - } + if (getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); - @Override - public void request(long n) { - s.request(n); + actual.onComplete(); + } } @Override - public void cancel() { - cancelled = true; - s.cancel(); - DisposableHelper.dispose(timeout); - } + public void onTimeout(long idx) { + if (compareAndSet(idx, Long.MAX_VALUE)) { + SubscriptionHelper.cancel(upstream); - @Override - public void timeout(long idx) { - if (idx == index) { - cancel(); actual.onError(new TimeoutException()); } } - } - - interface OnTimeout { - void timeout(long index); - - void onError(Throwable e); - } - - static final class TimeoutInnerSubscriber extends DisposableSubscriber { - final OnTimeout parent; - final long index; - - boolean done; - - TimeoutInnerSubscriber(OnTimeout parent, final long index) { - this.parent = parent; - this.index = index; - } @Override - public void onNext(Object t) { - if (done) { - return; + public void onTimeoutError(long idx, Throwable ex) { + if (compareAndSet(idx, Long.MAX_VALUE)) { + SubscriptionHelper.cancel(upstream); + + actual.onError(ex); + } else { + RxJavaPlugins.onError(ex); } - done = true; - cancel(); - parent.timeout(index); } @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - done = true; - parent.onError(t); + public void request(long n) { + SubscriptionHelper.deferredRequest(upstream, requested, n); } @Override - public void onComplete() { - if (done) { - return; - } - done = true; - parent.timeout(index); + public void cancel() { + SubscriptionHelper.cancel(upstream); + task.dispose(); } } - static final class TimeoutOtherSubscriber implements FlowableSubscriber, Disposable, OnTimeout { + static final class TimeoutFallbackSubscriber extends SubscriptionArbiter + implements FlowableSubscriber, TimeoutSelectorSupport { + + private static final long serialVersionUID = 3764492702657003550L; + final Subscriber actual; - final Publisher firstTimeoutIndicator; - final Function> itemTimeoutIndicator; - final Publisher other; - final FullArbiter arbiter; - Subscription s; + final Function> itemTimeoutIndicator; - boolean done; + final SequentialDisposable task; - volatile boolean cancelled; + final AtomicReference upstream; - volatile long index; + final AtomicLong index; - final AtomicReference timeout = new AtomicReference(); + Publisher fallback; - TimeoutOtherSubscriber(Subscriber actual, - Publisher firstTimeoutIndicator, - Function> itemTimeoutIndicator, Publisher other) { + long consumed; + + TimeoutFallbackSubscriber(Subscriber actual, + Function> itemTimeoutIndicator, + Publisher fallback) { this.actual = actual; - this.firstTimeoutIndicator = firstTimeoutIndicator; this.itemTimeoutIndicator = itemTimeoutIndicator; - this.other = other; - this.arbiter = new FullArbiter(actual, this, 8); + this.task = new SequentialDisposable(); + this.upstream = new AtomicReference(); + this.fallback = fallback; + this.index = new AtomicLong(); } @Override public void onSubscribe(Subscription s) { - if (!SubscriptionHelper.validate(this.s, s)) { - return; + if (SubscriptionHelper.setOnce(this.upstream, s)) { + setSubscription(s); } - this.s = s; + } - if (!arbiter.setSubscription(s)) { + @Override + public void onNext(T t) { + long idx = index.get(); + if (idx == Long.MAX_VALUE || !index.compareAndSet(idx, idx + 1)) { return; } - Subscriber a = actual; - Publisher p = firstTimeoutIndicator; + Disposable d = task.get(); + if (d != null) { + d.dispose(); + } + + consumed++; + + actual.onNext(t); - if (p != null) { - TimeoutInnerSubscriber tis = new TimeoutInnerSubscriber(this, 0); + Publisher itemTimeoutPublisher; + + try { + itemTimeoutPublisher = ObjectHelper.requireNonNull( + itemTimeoutIndicator.apply(t), + "The itemTimeoutIndicator returned a null Publisher."); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.get().cancel(); + index.getAndSet(Long.MAX_VALUE); + actual.onError(ex); + return; + } - if (timeout.compareAndSet(null, tis)) { - a.onSubscribe(arbiter); - p.subscribe(tis); + TimeoutConsumer consumer = new TimeoutConsumer(idx + 1, this); + if (task.replace(consumer)) { + itemTimeoutPublisher.subscribe(consumer); + } + } + + void startFirstTimeout(Publisher firstTimeoutIndicator) { + if (firstTimeoutIndicator != null) { + TimeoutConsumer consumer = new TimeoutConsumer(0L, this); + if (task.replace(consumer)) { + firstTimeoutIndicator.subscribe(consumer); } - } else { - a.onSubscribe(arbiter); } } @Override - public void onNext(T t) { - if (done) { - return; + public void onError(Throwable t) { + if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onError(t); + + task.dispose(); + } else { + RxJavaPlugins.onError(t); } - long idx = index + 1; - index = idx; + } - if (!arbiter.onNext(t, s)) { - return; + @Override + public void onComplete() { + if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onComplete(); + + task.dispose(); } + } - Disposable d = timeout.get(); - if (d != null) { - d.dispose(); + @Override + public void onTimeout(long idx) { + if (index.compareAndSet(idx, Long.MAX_VALUE)) { + SubscriptionHelper.cancel(upstream); + + Publisher f = fallback; + fallback = null; + + long c = consumed; + if (c != 0L) { + produced(c); + } + + f.subscribe(new FlowableTimeoutTimed.FallbackSubscriber(actual, this)); } + } - Publisher p; + @Override + public void onTimeoutError(long idx, Throwable ex) { + if (index.compareAndSet(idx, Long.MAX_VALUE)) { + SubscriptionHelper.cancel(upstream); - try { - p = ObjectHelper.requireNonNull(itemTimeoutIndicator.apply(t), "The publisher returned is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - actual.onError(e); - return; + actual.onError(ex); + } else { + RxJavaPlugins.onError(ex); } + } - TimeoutInnerSubscriber tis = new TimeoutInnerSubscriber(this, idx); + @Override + public void cancel() { + super.cancel(); + task.dispose(); + } + } + + static final class TimeoutConsumer extends AtomicReference + implements FlowableSubscriber, Disposable { + + private static final long serialVersionUID = 8708641127342403073L; - if (timeout.compareAndSet(d, tis)) { - p.subscribe(tis); + final TimeoutSelectorSupport parent; + + final long idx; + + TimeoutConsumer(long idx, TimeoutSelectorSupport parent) { + this.idx = idx; + this.parent = parent; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(Object t) { + if (get() != SubscriptionHelper.CANCELLED) { + get().cancel(); + lazySet(SubscriptionHelper.CANCELLED); + parent.onTimeout(idx); } } @Override public void onError(Throwable t) { - if (done) { + if (get() != SubscriptionHelper.CANCELLED) { + lazySet(SubscriptionHelper.CANCELLED); + parent.onTimeoutError(idx, t); + } else { RxJavaPlugins.onError(t); - return; } - done = true; - dispose(); - arbiter.onError(t, s); } @Override public void onComplete() { - if (done) { - return; + if (get() != SubscriptionHelper.CANCELLED) { + lazySet(SubscriptionHelper.CANCELLED); + parent.onTimeout(idx); } - done = true; - dispose(); - arbiter.onComplete(s); } @Override public void dispose() { - cancelled = true; - s.cancel(); - DisposableHelper.dispose(timeout); + SubscriptionHelper.cancel(this); } @Override public boolean isDisposed() { - return cancelled; - } - - @Override - public void timeout(long idx) { - if (idx == index) { - dispose(); - other.subscribe(new FullArbiterSubscriber(arbiter)); - } + return SubscriptionHelper.isCancelled(this.get()); } } + } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java index 5a42761cd9..e16f894736 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java @@ -14,16 +14,14 @@ package io.reactivex.internal.operators.flowable; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.Scheduler.Worker; -import io.reactivex.disposables.Disposable; -import io.reactivex.internal.subscribers.FullArbiterSubscriber; +import io.reactivex.internal.disposables.SequentialDisposable; import io.reactivex.internal.subscriptions.*; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.subscribers.SerializedSubscriber; public final class FlowableTimeoutTimed extends AbstractFlowableWithUpstream { final long timeout; @@ -31,8 +29,6 @@ public final class FlowableTimeoutTimed extends AbstractFlowableWithUpstream< final Scheduler scheduler; final Publisher other; - static final Disposable NEW_TIMER = new EmptyDispose(); - public FlowableTimeoutTimed(Flowable source, long timeout, TimeUnit unit, Scheduler scheduler, Publisher other) { super(source); @@ -45,256 +41,282 @@ public FlowableTimeoutTimed(Flowable source, @Override protected void subscribeActual(Subscriber s) { if (other == null) { - source.subscribe(new TimeoutTimedSubscriber( - new SerializedSubscriber(s), // because errors can race - timeout, unit, scheduler.createWorker())); + TimeoutSubscriber parent = new TimeoutSubscriber(s, timeout, unit, scheduler.createWorker()); + s.onSubscribe(parent); + parent.startTimeout(0L); + source.subscribe(parent); } else { - source.subscribe(new TimeoutTimedOtherSubscriber( - s, // the FullArbiter serializes - timeout, unit, scheduler.createWorker(), other)); + TimeoutFallbackSubscriber parent = new TimeoutFallbackSubscriber(s, timeout, unit, scheduler.createWorker(), other); + s.onSubscribe(parent); + parent.startTimeout(0L); + source.subscribe(parent); } } - static final class TimeoutTimedOtherSubscriber implements FlowableSubscriber, Disposable { + static final class TimeoutSubscriber extends AtomicLong + implements FlowableSubscriber, Subscription, TimeoutSupport { + + private static final long serialVersionUID = 3764492702657003550L; + final Subscriber actual; + final long timeout; - final TimeUnit unit; - final Scheduler.Worker worker; - final Publisher other; - Subscription s; + final TimeUnit unit; - final FullArbiter arbiter; + final Scheduler.Worker worker; - Disposable timer; + final SequentialDisposable task; - volatile long index; + final AtomicReference upstream; - volatile boolean done; + final AtomicLong requested; - TimeoutTimedOtherSubscriber(Subscriber actual, long timeout, TimeUnit unit, Worker worker, - Publisher other) { + TimeoutSubscriber(Subscriber actual, long timeout, TimeUnit unit, Scheduler.Worker worker) { this.actual = actual; this.timeout = timeout; this.unit = unit; this.worker = worker; - this.other = other; - this.arbiter = new FullArbiter(actual, this, 8); + this.task = new SequentialDisposable(); + this.upstream = new AtomicReference(); + this.requested = new AtomicLong(); } @Override public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - if (arbiter.setSubscription(s)) { - actual.onSubscribe(arbiter); - - scheduleTimeout(0L); - } - } + SubscriptionHelper.deferredSetOnce(upstream, requested, s); } @Override public void onNext(T t) { - if (done) { + long idx = get(); + if (idx == Long.MAX_VALUE || !compareAndSet(idx, idx + 1)) { return; } - long idx = index + 1; - index = idx; - if (arbiter.onNext(t, s)) { - scheduleTimeout(idx); - } - } + task.get().dispose(); - void scheduleTimeout(final long idx) { - if (timer != null) { - timer.dispose(); - } + actual.onNext(t); - timer = worker.schedule(new TimeoutTask(idx), timeout, unit); + startTimeout(idx + 1); } - void subscribeNext() { - other.subscribe(new FullArbiterSubscriber(arbiter)); + void startTimeout(long nextIndex) { + task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit)); } @Override public void onError(Throwable t) { - if (done) { + if (getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onError(t); + + worker.dispose(); + } else { RxJavaPlugins.onError(t); - return; } - done = true; - arbiter.onError(t, s); - worker.dispose(); } @Override public void onComplete() { - if (done) { - return; + if (getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onComplete(); + + worker.dispose(); } - done = true; - arbiter.onComplete(s); - worker.dispose(); } @Override - public void dispose() { - s.cancel(); - worker.dispose(); + public void onTimeout(long idx) { + if (compareAndSet(idx, Long.MAX_VALUE)) { + SubscriptionHelper.cancel(upstream); + + actual.onError(new TimeoutException()); + + worker.dispose(); + } } @Override - public boolean isDisposed() { - return worker.isDisposed(); + public void request(long n) { + SubscriptionHelper.deferredRequest(upstream, requested, n); } - final class TimeoutTask implements Runnable { - private final long idx; + @Override + public void cancel() { + SubscriptionHelper.cancel(upstream); + worker.dispose(); + } + } - TimeoutTask(long idx) { - this.idx = idx; - } - @Override - public void run() { - if (idx == index) { - done = true; - s.cancel(); - worker.dispose(); + static final class TimeoutTask implements Runnable { - subscribeNext(); + final TimeoutSupport parent; - } - } + final long idx; + + TimeoutTask(long idx, TimeoutSupport parent) { + this.idx = idx; + this.parent = parent; + } + + @Override + public void run() { + parent.onTimeout(idx); } } - static final class TimeoutTimedSubscriber implements FlowableSubscriber, Disposable, Subscription { + static final class TimeoutFallbackSubscriber extends SubscriptionArbiter + implements FlowableSubscriber, TimeoutSupport { + + private static final long serialVersionUID = 3764492702657003550L; + final Subscriber actual; + final long timeout; + final TimeUnit unit; + final Scheduler.Worker worker; - Subscription s; + final SequentialDisposable task; - Disposable timer; + final AtomicReference upstream; - volatile long index; + final AtomicLong index; - volatile boolean done; + long consumed; - TimeoutTimedSubscriber(Subscriber actual, long timeout, TimeUnit unit, Worker worker) { + Publisher fallback; + + TimeoutFallbackSubscriber(Subscriber actual, long timeout, TimeUnit unit, + Scheduler.Worker worker, Publisher fallback) { this.actual = actual; this.timeout = timeout; this.unit = unit; this.worker = worker; + this.fallback = fallback; + this.task = new SequentialDisposable(); + this.upstream = new AtomicReference(); + this.index = new AtomicLong(); } @Override public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - actual.onSubscribe(this); - scheduleTimeout(0L); + if (SubscriptionHelper.setOnce(upstream, s)) { + setSubscription(s); } } @Override public void onNext(T t) { - if (done) { + long idx = index.get(); + if (idx == Long.MAX_VALUE || !index.compareAndSet(idx, idx + 1)) { return; } - long idx = index + 1; - index = idx; + + task.get().dispose(); + + consumed++; actual.onNext(t); - scheduleTimeout(idx); + startTimeout(idx + 1); } - void scheduleTimeout(final long idx) { - if (timer != null) { - timer.dispose(); - } - - timer = worker.schedule(new TimeoutTask(idx), timeout, unit); + void startTimeout(long nextIndex) { + task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit)); } @Override public void onError(Throwable t) { - if (done) { + if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onError(t); + + worker.dispose(); + } else { RxJavaPlugins.onError(t); - return; } - done = true; - - actual.onError(t); - worker.dispose(); } @Override public void onComplete() { - if (done) { - return; - } - done = true; + if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); - actual.onComplete(); - worker.dispose(); - } + actual.onComplete(); - @Override - public void dispose() { - s.cancel(); - worker.dispose(); + worker.dispose(); + } } @Override - public boolean isDisposed() { - return worker.isDisposed(); - } + public void onTimeout(long idx) { + if (index.compareAndSet(idx, Long.MAX_VALUE)) { + SubscriptionHelper.cancel(upstream); - @Override - public void request(long n) { - s.request(n); + long c = consumed; + if (c != 0L) { + produced(c); + } + + Publisher f = fallback; + fallback = null; + + f.subscribe(new FallbackSubscriber(actual, this)); + + worker.dispose(); + } } @Override public void cancel() { - dispose(); + super.cancel(); + worker.dispose(); } + } - final class TimeoutTask implements Runnable { - private final long idx; + static final class FallbackSubscriber implements FlowableSubscriber { - TimeoutTask(long idx) { - this.idx = idx; - } + final Subscriber actual; - @Override - public void run() { - if (idx == index) { - done = true; - dispose(); + final SubscriptionArbiter arbiter; - actual.onError(new TimeoutException()); - } - } + FallbackSubscriber(Subscriber actual, SubscriptionArbiter arbiter) { + this.actual = actual; + this.arbiter = arbiter; + } + + @Override + public void onSubscribe(Subscription s) { + arbiter.setSubscription(s); } - } - static final class EmptyDispose implements Disposable { @Override - public void dispose() { } + public void onNext(T t) { + actual.onNext(t); + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + } @Override - public boolean isDisposed() { - return true; + public void onComplete() { + actual.onComplete(); } } + interface TimeoutSupport { + + void onTimeout(long idx); + } } diff --git a/src/main/java/io/reactivex/schedulers/TestScheduler.java b/src/main/java/io/reactivex/schedulers/TestScheduler.java index 929fa22432..b2ca366978 100644 --- a/src/main/java/io/reactivex/schedulers/TestScheduler.java +++ b/src/main/java/io/reactivex/schedulers/TestScheduler.java @@ -102,14 +102,14 @@ public void triggerActions() { } private void triggerActions(long targetTimeInNanoseconds) { - while (!queue.isEmpty()) { + for (;;) { TimedRunnable current = queue.peek(); - if (current.time > targetTimeInNanoseconds) { + if (current == null || current.time > targetTimeInNanoseconds) { break; } // if scheduled time is 0 (immediate) use current virtual time time = current.time == 0 ? time : current.time; - queue.remove(); + queue.remove(current); // Only execute if not unsubscribed if (!current.scheduler.disposed) { diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java index 7ff326dfbf..72fc7289d6 100644 --- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java @@ -2328,7 +2328,7 @@ public void timeoutFirstItemNull() { @Test(expected = NullPointerException.class) public void timeoutFirstItemReturnsNull() { - just1.timeout(just1, new Function>() { + just1.timeout(Flowable.never(), new Function>() { @Override public Publisher apply(Integer v) { return null; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java index 0aa0318543..296627e2fe 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java @@ -425,12 +425,6 @@ public void timedEmpty() { .assertResult(); } - @Test - public void newTimer() { - FlowableTimeoutTimed.NEW_TIMER.dispose(); - assertTrue(FlowableTimeoutTimed.NEW_TIMER.isDisposed()); - } - @Test public void badSource() { List errors = TestHelper.trackPluginErrors(); @@ -518,4 +512,88 @@ public void timedFallbackTake() { to.assertResult(1); } + @Test + public void fallbackErrors() { + Flowable.never() + .timeout(1, TimeUnit.MILLISECONDS, Flowable.error(new TestException())) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void onNextOnTimeoutRace() { + for (int i = 0; i < 1000; i++) { + final TestScheduler sch = new TestScheduler(); + + final PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.timeout(1, TimeUnit.SECONDS, sch).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sch.advanceTimeBy(1, TimeUnit.SECONDS); + } + }; + + TestHelper.race(r1, r2); + + if (ts.valueCount() != 0) { + if (ts.errorCount() != 0) { + ts.assertFailure(TimeoutException.class, 1); + } else { + ts.assertValuesOnly(1); + } + } else { + ts.assertFailure(TimeoutException.class); + } + } + } + + @Test + public void onNextOnTimeoutRaceFallback() { + for (int i = 0; i < 1000; i++) { + final TestScheduler sch = new TestScheduler(); + + final PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.timeout(1, TimeUnit.SECONDS, sch, Flowable.just(2)).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sch.advanceTimeBy(1, TimeUnit.SECONDS); + } + }; + + TestHelper.race(r1, r2); + + if (ts.isTerminated()) { + int c = ts.valueCount(); + if (c == 1) { + int v = ts.values().get(0); + assertTrue("" + v, v == 1 || v == 2); + } else { + ts.assertResult(1, 2); + } + } else { + ts.assertValuesOnly(1); + } + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java index 77a5112305..0482ff5e68 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java @@ -28,6 +28,7 @@ import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; @@ -545,4 +546,221 @@ public void selectorFallbackTake() { to.assertResult(1); } + + @Test + public void lateOnTimeoutError() { + for (int i = 0; i < 1000; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + + final Subscriber[] sub = { null, null }; + + final Flowable pp2 = new Flowable() { + + int count; + + @Override + protected void subscribeActual( + Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + sub[count++] = s; + } + }; + + TestSubscriber ts = pp.timeout(Functions.justFunction(pp2)).test(); + + pp.onNext(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + final Throwable ex = new TestException(); + + Runnable r2 = new Runnable() { + @Override + public void run() { + sub[0].onError(ex); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValueAt(0, 0); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void lateOnTimeoutFallbackRace() { + for (int i = 0; i < 1000; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + + final Subscriber[] sub = { null, null }; + + final Flowable pp2 = new Flowable() { + + int count; + + @Override + protected void subscribeActual( + Subscriber s) { + assertFalse(((Disposable)s).isDisposed()); + s.onSubscribe(new BooleanSubscription()); + sub[count++] = s; + } + }; + + TestSubscriber ts = pp.timeout(Functions.justFunction(pp2), Flowable.never()).test(); + + pp.onNext(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + final Throwable ex = new TestException(); + + Runnable r2 = new Runnable() { + @Override + public void run() { + sub[0].onError(ex); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValueAt(0, 0); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void onErrorOnTimeoutRace() { + for (int i = 0; i < 1000; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + + final Subscriber[] sub = { null, null }; + + final Flowable pp2 = new Flowable() { + + int count; + + @Override + protected void subscribeActual( + Subscriber s) { + assertFalse(((Disposable)s).isDisposed()); + s.onSubscribe(new BooleanSubscription()); + sub[count++] = s; + } + }; + + TestSubscriber ts = pp.timeout(Functions.justFunction(pp2)).test(); + + pp.onNext(0); + + final Throwable ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sub[0].onComplete(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValueAt(0, 0); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void onECompleteOnTimeoutRace() { + for (int i = 0; i < 1000; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + + final Subscriber[] sub = { null, null }; + + final Flowable pp2 = new Flowable() { + + int count; + + @Override + protected void subscribeActual( + Subscriber s) { + assertFalse(((Disposable)s).isDisposed()); + s.onSubscribe(new BooleanSubscription()); + sub[count++] = s; + } + }; + + TestSubscriber ts = pp.timeout(Functions.justFunction(pp2)).test(); + + pp.onNext(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sub[0].onComplete(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValueAt(0, 0); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } } From 2db74c145b50100dd37cf3b1df06b3b9d5c27e5e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 11 Oct 2017 14:28:36 +0200 Subject: [PATCH 2/3] Remove the now unused FullArbiter(Subscriber) --- .../subscribers/FullArbiterSubscriber.java | 57 ----- .../internal/subscriptions/FullArbiter.java | 232 ------------------ .../subscriptions/FullArbiterTest.java | 129 ---------- 3 files changed, 418 deletions(-) delete mode 100644 src/main/java/io/reactivex/internal/subscribers/FullArbiterSubscriber.java delete mode 100644 src/main/java/io/reactivex/internal/subscriptions/FullArbiter.java delete mode 100644 src/test/java/io/reactivex/internal/subscriptions/FullArbiterTest.java diff --git a/src/main/java/io/reactivex/internal/subscribers/FullArbiterSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/FullArbiterSubscriber.java deleted file mode 100644 index e145489296..0000000000 --- a/src/main/java/io/reactivex/internal/subscribers/FullArbiterSubscriber.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.subscribers; - -import org.reactivestreams.Subscription; - -import io.reactivex.FlowableSubscriber; -import io.reactivex.internal.subscriptions.*; - -/** - * Subscriber that communicates with a FullArbiter. - * - * @param the value type - */ -public final class FullArbiterSubscriber implements FlowableSubscriber { - final FullArbiter arbiter; - - Subscription s; - - public FullArbiterSubscriber(FullArbiter arbiter) { - this.arbiter = arbiter; - } - - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - arbiter.setSubscription(s); - } - } - - @Override - public void onNext(T t) { - arbiter.onNext(t, s); - } - - @Override - public void onError(Throwable t) { - arbiter.onError(t, s); - } - - @Override - public void onComplete() { - arbiter.onComplete(s); - } -} diff --git a/src/main/java/io/reactivex/internal/subscriptions/FullArbiter.java b/src/main/java/io/reactivex/internal/subscriptions/FullArbiter.java deleted file mode 100644 index 283ad2d2a9..0000000000 --- a/src/main/java/io/reactivex/internal/subscriptions/FullArbiter.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.subscriptions; - -import java.util.concurrent.atomic.*; - -import org.reactivestreams.*; - -import io.reactivex.disposables.Disposable; -import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.queue.SpscLinkedArrayQueue; -import io.reactivex.internal.util.*; -import io.reactivex.plugins.RxJavaPlugins; - -/** - * Performs full arbitration of Subscriber events with strict drain (i.e., old emissions of another - * subscriber are dropped). - * - * @param the value type - */ -public final class FullArbiter extends FullArbiterPad2 implements Subscription { - final Subscriber actual; - final SpscLinkedArrayQueue queue; - - long requested; - - volatile Subscription s; - static final Subscription INITIAL = new InitialSubscription(); - - - Disposable resource; - - volatile boolean cancelled; - - static final Object REQUEST = new Object(); - - public FullArbiter(Subscriber actual, Disposable resource, int capacity) { - this.actual = actual; - this.resource = resource; - this.queue = new SpscLinkedArrayQueue(capacity); - this.s = INITIAL; - } - - @Override - public void request(long n) { - if (SubscriptionHelper.validate(n)) { - BackpressureHelper.add(missedRequested, n); - queue.offer(REQUEST, REQUEST); - drain(); - } - } - - @Override - public void cancel() { - if (!cancelled) { - cancelled = true; - dispose(); - } - } - - void dispose() { - Disposable d = resource; - resource = null; - if (d != null) { - d.dispose(); - } - } - - public boolean setSubscription(Subscription s) { - if (cancelled) { - if (s != null) { - s.cancel(); - } - return false; - } - - ObjectHelper.requireNonNull(s, "s is null"); - queue.offer(this.s, NotificationLite.subscription(s)); - drain(); - return true; - } - - public boolean onNext(T value, Subscription s) { - if (cancelled) { - return false; - } - - queue.offer(s, NotificationLite.next(value)); - drain(); - return true; - } - - public void onError(Throwable value, Subscription s) { - if (cancelled) { - RxJavaPlugins.onError(value); - return; - } - queue.offer(s, NotificationLite.error(value)); - drain(); - } - - public void onComplete(Subscription s) { - queue.offer(s, NotificationLite.complete()); - drain(); - } - - void drain() { - if (wip.getAndIncrement() != 0) { - return; - } - - int missed = 1; - - final SpscLinkedArrayQueue q = queue; - final Subscriber a = actual; - - for (;;) { - - for (;;) { - - Object o = q.poll(); - if (o == null) { - break; - } - Object v = q.poll(); - - if (o == REQUEST) { - long mr = missedRequested.getAndSet(0L); - if (mr != 0L) { - requested = BackpressureHelper.addCap(requested, mr); - s.request(mr); - } - } else - if (o == s) { - if (NotificationLite.isSubscription(v)) { - Subscription next = NotificationLite.getSubscription(v); - if (!cancelled) { - s = next; - long r = requested; - if (r != 0L) { - next.request(r); - } - } else { - next.cancel(); - } - } else if (NotificationLite.isError(v)) { - q.clear(); - dispose(); - - Throwable ex = NotificationLite.getError(v); - if (!cancelled) { - cancelled = true; - a.onError(ex); - } else { - RxJavaPlugins.onError(ex); - } - } else if (NotificationLite.isComplete(v)) { - q.clear(); - dispose(); - - if (!cancelled) { - cancelled = true; - a.onComplete(); - } - } else { - long r = requested; - if (r != 0) { - a.onNext(NotificationLite.getValue(v)); - requested = r - 1; - } - } - } - } - - missed = wip.addAndGet(-missed); - if (missed == 0) { - break; - } - } - } - - static final class InitialSubscription implements Subscription { - @Override - public void request(long n) { - // deliberately no op - } - - @Override - public void cancel() { - // deliberately no op - } - } -} - -/** Pads the object header away. */ -class FullArbiterPad0 { - volatile long p1a, p2a, p3a, p4a, p5a, p6a, p7a; - volatile long p8a, p9a, p10a, p11a, p12a, p13a, p14a, p15a; -} - -/** The work-in-progress counter. */ -class FullArbiterWip extends FullArbiterPad0 { - final AtomicInteger wip = new AtomicInteger(); -} - -/** Pads the wip counter away. */ -class FullArbiterPad1 extends FullArbiterWip { - volatile long p1b, p2b, p3b, p4b, p5b, p6b, p7b; - volatile long p8b, p9b, p10b, p11b, p12b, p13b, p14b, p15b; -} - -/** The missed request counter. */ -class FullArbiterMissed extends FullArbiterPad1 { - final AtomicLong missedRequested = new AtomicLong(); -} - -/** Pads the missed request counter away. */ -class FullArbiterPad2 extends FullArbiterMissed { - volatile long p1c, p2c, p3c, p4c, p5c, p6c, p7c; - volatile long p8c, p9c, p10c, p11c, p12c, p13c, p14c, p15c; -} diff --git a/src/test/java/io/reactivex/internal/subscriptions/FullArbiterTest.java b/src/test/java/io/reactivex/internal/subscriptions/FullArbiterTest.java deleted file mode 100644 index e5dd5d20ce..0000000000 --- a/src/test/java/io/reactivex/internal/subscriptions/FullArbiterTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.subscriptions; - -import static org.junit.Assert.*; - -import java.util.List; - -import org.junit.Test; - -import io.reactivex.TestHelper; -import io.reactivex.exceptions.TestException; -import io.reactivex.internal.util.NotificationLite; -import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.subscribers.TestSubscriber; - -public class FullArbiterTest { - - @Test - public void initialRequested() { - FullArbiter.INITIAL.request(-99); - } - - @Test - public void initialCancel() { - FullArbiter.INITIAL.cancel(); - } - - @Test - public void invalidDeferredRequest() { - List errors = TestHelper.trackPluginErrors(); - try { - new FullArbiter(new TestSubscriber(), null, 128).request(-99); - - TestHelper.assertError(errors, 0, IllegalArgumentException.class, "n > 0 required but it was -99"); - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void setSubscriptionAfterCancel() { - FullArbiter fa = new FullArbiter(new TestSubscriber(), null, 128); - - fa.cancel(); - - BooleanSubscription bs = new BooleanSubscription(); - - assertFalse(fa.setSubscription(bs)); - - assertFalse(fa.setSubscription(null)); - } - - @Test - public void cancelAfterPoll() { - FullArbiter fa = new FullArbiter(new TestSubscriber(), null, 128); - - BooleanSubscription bs = new BooleanSubscription(); - - fa.queue.offer(fa.s, NotificationLite.subscription(bs)); - - fa.cancel(); - - fa.drain(); - - assertTrue(bs.isCancelled()); - } - - @Test - public void errorAfterCancel() { - FullArbiter fa = new FullArbiter(new TestSubscriber(), null, 128); - - BooleanSubscription bs = new BooleanSubscription(); - - fa.cancel(); - - List errors = TestHelper.trackPluginErrors(); - try { - fa.onError(new TestException(), bs); - - TestHelper.assertUndeliverable(errors, 0, TestException.class); - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void cancelAfterError() { - FullArbiter fa = new FullArbiter(new TestSubscriber(), null, 128); - - List errors = TestHelper.trackPluginErrors(); - try { - fa.queue.offer(fa.s, NotificationLite.error(new TestException())); - - fa.cancel(); - - fa.drain(); - TestHelper.assertUndeliverable(errors, 0, TestException.class); - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void offerDifferentSubscription() { - TestSubscriber ts = new TestSubscriber(); - - FullArbiter fa = new FullArbiter(ts, null, 128); - - BooleanSubscription bs = new BooleanSubscription(); - - fa.queue.offer(bs, NotificationLite.next(1)); - - fa.drain(); - - ts.assertNoValues(); - } -} From 46c39f2c6bdef240b53ae9a6e97d178935613da7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 13 Oct 2017 22:01:16 +0200 Subject: [PATCH 3/3] Don't read the volatile twice --- .../internal/operators/flowable/FlowableTimeout.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java index e5edb97783..e01fb5b525 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java @@ -350,8 +350,9 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Object t) { - if (get() != SubscriptionHelper.CANCELLED) { - get().cancel(); + Subscription upstream = get(); + if (upstream != SubscriptionHelper.CANCELLED) { + upstream.cancel(); lazySet(SubscriptionHelper.CANCELLED); parent.onTimeout(idx); }