From 440d60f1b9477bfd3d9c0a424c3cb6c2eac94b8a Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 18 Oct 2016 17:16:51 +0200 Subject: [PATCH] 2.x: coverage, fixes, enhancements, cleanup 10/18-1 --- src/main/java/io/reactivex/Observable.java | 26 -- .../flowables/ConnectableFlowable.java | 12 +- .../observable/ObservableDebounceTimed.java | 17 +- .../operators/observable/ObservableJoin.java | 16 +- .../observable/ObservableObserveOn.java | 154 +++++-- .../observable/ObservableReplay.java | 181 +++----- .../observable/ObservableSampleTimed.java | 9 +- .../observable/ObservableScalarXMap.java | 15 +- .../observable/ObservableTakeLast.java | 3 - .../observable/ObservableTakeUntil.java | 38 +- .../observable/ObservableTimeout.java | 30 +- .../observable/ObservableWindowTimed.java | 21 +- .../internal/util/ConnectConsumer.java | 29 ++ .../observables/ConnectableObservable.java | 12 +- .../observable/ObservableAnyTest.java | 33 ++ .../observable/ObservableBufferTest.java | 6 + .../observable/ObservableDebounceTest.java | 46 +- .../observable/ObservableDelayTest.java | 86 +++- .../observable/ObservableElementAtTest.java | 69 ++- .../ObservableFlattenIterableTest.java | 17 +- .../observable/ObservableIntervalTest.java | 29 ++ .../observable/ObservableJoinTest.java | 147 ++++++ .../observable/ObservableObserveOnTest.java | 264 ++++++++++- .../observable/ObservableRefCountTest.java | 4 +- .../observable/ObservableReplayTest.java | 428 +++++++++++++++++- .../observable/ObservableSampleTest.java | 17 + .../observable/ObservableScalarXMapTest.java | 2 +- .../observable/ObservableScanTest.java | 42 +- .../observable/ObservableSkipWhileTest.java | 29 +- .../observable/ObservableTakeLastTest.java | 34 ++ .../observable/ObservableTakeTest.java | 23 + .../ObservableTakeUntilPredicateTest.java | 48 +- .../observable/ObservableTakeUntilTest.java | 5 + .../observable/ObservableTakeWhileTest.java | 32 +- .../ObservableTimeoutWithSelectorTest.java | 146 +++++- .../ObservableWindowWithObservableTest.java | 8 + 36 files changed, 1783 insertions(+), 295 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/util/ConnectConsumer.java create mode 100644 src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalTest.java diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 9c39713bb6..b38f5302ac 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -7300,32 +7300,6 @@ public final Observable flatMapIterable(final Function - * - *
- *
Scheduler:
- *
{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param - * the type of item emitted by the resulting ObservableSource - * @param mapper - * a function that returns an Iterable sequence of values for when given an item emitted by the - * source ObservableSource - * @param bufferSize - * the number of elements to prefetch from the current Observable - * @return an Observable that emits the results of merging the items emitted by the source ObservableSource with - * the values in the Iterables corresponding to those items, as generated by {@code collectionSelector} - * @see ReactiveX operators documentation: FlatMap - */ - @SchedulerSupport(SchedulerSupport.NONE) - public final Observable flatMapIterable(final Function> mapper, int bufferSize) { - return flatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), false, bufferSize); - } - /** * Maps each element of the upstream Observable into MaybeSources, subscribes to them and * waits until the upstream and all MaybeSources complete. diff --git a/src/main/java/io/reactivex/flowables/ConnectableFlowable.java b/src/main/java/io/reactivex/flowables/ConnectableFlowable.java index 46fb4ecc31..ee307286d2 100644 --- a/src/main/java/io/reactivex/flowables/ConnectableFlowable.java +++ b/src/main/java/io/reactivex/flowables/ConnectableFlowable.java @@ -20,6 +20,7 @@ import io.reactivex.functions.Consumer; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.operators.flowable.*; +import io.reactivex.internal.util.ConnectConsumer; import io.reactivex.plugins.RxJavaPlugins; /** @@ -58,14 +59,9 @@ public abstract class ConnectableFlowable extends Flowable { * @see ReactiveX documentation: Connect */ public final Disposable connect() { - final Disposable[] connection = new Disposable[1]; - connect(new Consumer() { - @Override - public void accept(Disposable d) { - connection[0] = d; - } - }); - return connection[0]; + ConnectConsumer cc = new ConnectConsumer(); + connect(cc); + return cc.disposable; } /** diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java index 608e35a8a4..20f124e18a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java @@ -86,13 +86,12 @@ public void onNext(T t) { } DebounceEmitter de = new DebounceEmitter(t, idx, this); - if (!timer.compareAndSet(d, de)) { - return; - } + if (timer.compareAndSet(d, de)) { + d = worker.schedule(de, timeout, unit); - d = worker.schedule(de, timeout, unit); + de.setResource(d); + } - de.setResource(d); } @Override @@ -117,7 +116,9 @@ public void onComplete() { if (d != DisposableHelper.DISPOSED) { @SuppressWarnings("unchecked") DebounceEmitter de = (DebounceEmitter)d; - de.emit(); + if (de != null) { + de.run(); + } DisposableHelper.dispose(timer); worker.dispose(); actual.onComplete(); @@ -162,10 +163,6 @@ static final class DebounceEmitter extends AtomicReference implem @Override public void run() { - emit(); - } - - void emit() { if (once.compareAndSet(false, true)) { parent.emit(idx, value, this); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableJoin.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableJoin.java index 565ad42d23..3173249bf2 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableJoin.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableJoin.java @@ -129,13 +129,12 @@ static final class GroupJoinDisposable @Override public void dispose() { - if (cancelled) { - return; - } - cancelled = true; - cancelAll(); - if (getAndIncrement() == 0) { - queue.clear(); + if (!cancelled) { + cancelled = true; + cancelAll(); + if (getAndIncrement() == 0) { + queue.clear(); + } } } @@ -303,8 +302,7 @@ else if (mode == LEFT_CLOSE) { lefts.remove(end.index); disposables.remove(end); - } - else if (mode == RIGHT_CLOSE) { + } else { LeftRightEndObserver end = (LeftRightEndObserver)val; rights.remove(end.index); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java index 1e9794c5dc..fd75ddbc54 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java @@ -13,12 +13,12 @@ package io.reactivex.internal.operators.observable; -import java.util.concurrent.atomic.AtomicInteger; - import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.observers.BasicIntQueueDisposable; import io.reactivex.internal.queue.SpscLinkedArrayQueue; import io.reactivex.internal.schedulers.TrampolineScheduler; import io.reactivex.plugins.RxJavaPlugins; @@ -45,24 +45,16 @@ protected void subscribeActual(Observer observer) { } } - /** - * Pads the base atomic integer used for wip counting. - */ - static class Padding0 extends AtomicInteger { - - private static final long serialVersionUID = 3172843496016154809L; - - volatile long p01, p02, p03, p04, p05, p06, p07; - volatile long p08, p09, p0A, p0B, p0C, p0D, p0E, p0F; - } - - static final class ObserveOnObserver extends Padding0 implements Observer, Disposable, Runnable { + static final class ObserveOnObserver extends BasicIntQueueDisposable + implements Observer, Runnable { private static final long serialVersionUID = 6576896619930983584L; final Observer actual; final Scheduler.Worker worker; final boolean delayError; - final SpscLinkedArrayQueue queue; + final int bufferSize; + + SimpleQueue queue; Disposable s; @@ -71,17 +63,45 @@ static final class ObserveOnObserver extends Padding0 implements Observer, volatile boolean cancelled; + int sourceMode; + + boolean outputFused; + ObserveOnObserver(Observer actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; - this.queue = new SpscLinkedArrayQueue(bufferSize); + this.bufferSize = bufferSize; } @Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; + if (s instanceof QueueDisposable) { + @SuppressWarnings("unchecked") + QueueDisposable qd = (QueueDisposable) s; + + int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); + + if (m == QueueDisposable.SYNC) { + sourceMode = m; + queue = qd; + done = true; + actual.onSubscribe(this); + schedule(); + return; + } + if (m == QueueDisposable.ASYNC) { + sourceMode = m; + queue = qd; + actual.onSubscribe(this); + return; + } + } + + queue = new SpscLinkedArrayQueue(bufferSize); + actual.onSubscribe(this); } } @@ -92,10 +112,8 @@ public void onNext(T t) { return; } - if (!queue.offer(t)) { - s.dispose(); - onError(new MissingBackpressureException("Queue full?!")); - return; + if (sourceMode != QueueDisposable.ASYNC) { + queue.offer(t); } schedule(); } @@ -126,6 +144,9 @@ public void dispose() { cancelled = true; s.dispose(); worker.dispose(); + if (getAndIncrement() == 0) { + queue.clear(); + } } } @@ -140,11 +161,10 @@ void schedule() { } } - @Override - public void run() { + void drainNormal() { int missed = 1; - final SpscLinkedArrayQueue q = queue; + final SimpleQueue q = queue; final Observer a = actual; for (;;) { @@ -154,7 +174,17 @@ public void run() { for (;;) { boolean d = done; - T v = q.poll(); + T v; + + try { + v = q.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.dispose(); + q.clear(); + a.onError(ex); + return; + } boolean empty = v == null; if (checkTerminated(d, empty, a)) { @@ -175,10 +205,55 @@ public void run() { } } + void drainFused() { + int missed = 1; + + for (;;) { + if (cancelled) { + return; + } + + boolean d = done; + Throwable ex = error; + + if (!delayError && d && ex != null) { + actual.onError(error); + worker.dispose(); + return; + } + + actual.onNext(null); + + if (d) { + ex = error; + if (ex != null) { + actual.onError(ex); + } else { + actual.onComplete(); + } + worker.dispose(); + return; + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + @Override + public void run() { + if (outputFused) { + drainFused(); + } else { + drainNormal(); + } + } + boolean checkTerminated(boolean d, boolean empty, Observer a) { if (cancelled) { - s.dispose(); - worker.dispose(); + queue.clear(); return true; } if (d) { @@ -195,6 +270,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer a) { } } else { if (e != null) { + queue.clear(); a.onError(e); worker.dispose(); return true; @@ -208,5 +284,29 @@ boolean checkTerminated(boolean d, boolean empty, Observer a) { } return false; } + + @Override + public int requestFusion(int mode) { + if ((mode & ASYNC) != 0) { + outputFused = true; + return ASYNC; + } + return NONE; + } + + @Override + public T poll() throws Exception { + return queue.poll(); + } + + @Override + public void clear() { + queue.clear(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index b186120b95..39a1a024b5 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -36,14 +36,18 @@ public final class ObservableReplay extends ConnectableObservable implemen /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ final AtomicReference> current; /** A factory that creates the appropriate buffer for the ReplayObserver. */ - final Callable> bufferFactory; + final BufferSupplier bufferFactory; final ObservableSource onSubscribe; + interface BufferSupplier { + ReplayBuffer call(); + } + @SuppressWarnings("rawtypes") - static final Callable DEFAULT_UNBOUNDED_FACTORY = new Callable() { + static final BufferSupplier DEFAULT_UNBOUNDED_FACTORY = new BufferSupplier() { @Override - public Object call() { + public ReplayBuffer call() { return new UnboundedReplayBuffer(16); } }; @@ -134,7 +138,7 @@ public static ConnectableObservable create(ObservableSource source, if (bufferSize == Integer.MAX_VALUE) { return createFrom(source); } - return create(source, new Callable>() { + return create(source, new BufferSupplier() { @Override public ReplayBuffer call() { return new SizeBoundReplayBuffer(bufferSize); @@ -168,7 +172,7 @@ public static ConnectableObservable create(ObservableSource source, */ public static ConnectableObservable create(ObservableSource source, final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize) { - return create(source, new Callable>() { + return create(source, new BufferSupplier() { @Override public ReplayBuffer call() { return new SizeAndTimeBoundReplayBuffer(bufferSize, maxAge, unit, scheduler); @@ -183,7 +187,7 @@ public ReplayBuffer call() { * @return the connectable observable */ static ConnectableObservable create(ObservableSource source, - final Callable> bufferFactory) { + final BufferSupplier bufferFactory) { // the current connection to source needs to be shared between the operator and its onSubscribe call final AtomicReference> curr = new AtomicReference>(); ObservableSource onSubscribe = new ObservableSource() { @@ -197,14 +201,7 @@ public void subscribe(Observer child) { // if there isn't one if (r == null) { // create a new subscriber to source - ReplayBuffer buf; - - try { - buf = bufferFactory.call(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - throw ExceptionHelper.wrapOrThrow(ex); - } + ReplayBuffer buf = bufferFactory.call(); ReplayObserver u = new ReplayObserver(buf); // let's try setting it as the current subscriber-to-source @@ -219,15 +216,20 @@ public void subscribe(Observer child) { // create the backpressure-managing producer for this child InnerDisposable inner = new InnerDisposable(r, child); - // we try to add it to the array of observers - // if it fails, no worries because we will still have its buffer - // so it is going to replay it for us - r.add(inner); // the producer has been registered with the current subscriber-to-source so // at least it will receive the next terminal event // setting the producer will trigger the first request to be considered by // the subscriber-to-source. child.onSubscribe(inner); + // we try to add it to the array of observers + // if it fails, no worries because we will still have its buffer + // so it is going to replay it for us + r.add(inner); + + if (inner.isDisposed()) { + r.remove(inner); + return; + } // replay the contents of the buffer r.buffer.replay(inner); @@ -241,7 +243,7 @@ public void subscribe(Observer child) { private ObservableReplay(ObservableSource onSubscribe, ObservableSource source, final AtomicReference> current, - final Callable> bufferFactory) { + final BufferSupplier bufferFactory) { this.onSubscribe = onSubscribe; this.source = source; this.current = current; @@ -269,14 +271,7 @@ public void connect(Consumer connection) { // if there is none yet or the current has been disposed if (ps == null || ps.isDisposed()) { // create a new subscriber-to-source - ReplayBuffer buf; - - try { - buf = bufferFactory.call(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - throw ExceptionHelper.wrapOrThrow(ex); - } + ReplayBuffer buf = bufferFactory.call(); ReplayObserver u = new ReplayObserver(buf); // try setting it as the current subscriber-to-source @@ -309,6 +304,9 @@ public void connect(Consumer connection) { try { connection.accept(ps); } catch (Throwable ex) { + if (doConnect) { + ps.shouldConnect.compareAndSet(true, false); + } Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); } @@ -370,9 +368,6 @@ public void dispose() { * @return true if succeeded, false otherwise */ boolean add(InnerDisposable producer) { - if (producer == null) { - throw new NullPointerException(); - } // the state can change so we do a CAS loop to achieve atomicity for (;;) { // get the current producer array @@ -405,14 +400,15 @@ void remove(InnerDisposable producer) { for (;;) { // let's read the current observers array InnerDisposable[] c = observers.get(); + + int len = c.length; // if it is either empty or terminated, there is nothing to remove so we quit - if (c == EMPTY || c == TERMINATED) { + if (len == 0) { return; } // let's find the supplied producer in the array // although this is O(n), we don't expect too many child observers in general int j = -1; - int len = c.length; for (int i = 0; i < len; i++) { if (c[i].equals(producer)) { j = i; @@ -467,12 +463,10 @@ public void onError(Throwable e) { // no need to CAS in the terminal value if (!done) { done = true; - try { - buffer.error(e); - replay(); - } finally { - dispose(); // expectation of testIssue2191 - } + buffer.error(e); + replayFinal(); + } else { + RxJavaPlugins.onError(e); } } @Override @@ -481,12 +475,8 @@ public void onComplete() { // no need to CAS in the terminal value if (!done) { done = true; - try { - buffer.complete(); - replay(); - } finally { - dispose(); - } + buffer.complete(); + replayFinal(); } } @@ -500,13 +490,27 @@ void replay() { buffer.replay(rp); } } + + /** + * Tries to replay the buffer contents to all known observers. + */ + void replayFinal() { + @SuppressWarnings("unchecked") + InnerDisposable[] a = observers.getAndSet(TERMINATED); + for (InnerDisposable rp : a) { + buffer.replay(rp); + } + } } /** * A Disposable that manages the disposed state of a * child Observer in thread-safe manner. * @param the value type */ - static final class InnerDisposable implements Disposable { + static final class InnerDisposable + extends AtomicInteger + implements Disposable { + private static final long serialVersionUID = 2728361546769921047L; /** * The parent subscriber-to-source used to allow removing the child in case of * child dispose() call. @@ -519,10 +523,6 @@ static final class InnerDisposable implements Disposable { * Guarded by the emitter loop. */ Object index; - /** Indicates an emission state. Guarded by this. */ - boolean emitting; - /** Indicates a missed update. Guarded by this. */ - boolean missed; volatile boolean cancelled; @@ -617,15 +617,14 @@ public void complete() { @Override public void replay(InnerDisposable output) { - synchronized (output) { - if (output.emitting) { - output.missed = true; - return; - } - output.emitting = true; + if (output.getAndIncrement() != 0) { + return; } + final Observer child = output.child; + int missed = 1; + for (;;) { if (output.isDisposed()) { return; @@ -637,16 +636,7 @@ public void replay(InnerDisposable output) { while (destinationIndex < sourceIndex) { Object o = get(destinationIndex); - try { - if (NotificationLite.accept(o, child)) { - return; - } - } catch (Throwable err) { - Exceptions.throwIfFatal(err); - output.dispose(); - if (!NotificationLite.isError(o) && !NotificationLite.isComplete(o)) { - child.onError(err); - } + if (NotificationLite.accept(o, child)) { return; } if (output.isDisposed()) { @@ -656,13 +646,9 @@ public void replay(InnerDisposable output) { } output.index = destinationIndex; - - synchronized (output) { - if (!output.missed) { - output.emitting = false; - return; - } - output.missed = false; + missed = output.addAndGet(-missed); + if (missed == 0) { + break; } } } @@ -686,7 +672,7 @@ static final class Node extends AtomicReference { * * @param the value type */ - static class BoundedReplayBuffer extends AtomicReference implements ReplayBuffer { + abstract static class BoundedReplayBuffer extends AtomicReference implements ReplayBuffer { private static final long serialVersionUID = 2346567790059478686L; @@ -714,9 +700,6 @@ final void addLast(Node n) { final void removeFirst() { Node head = get(); Node next = head.get(); - if (next == null) { - throw new IllegalStateException("Empty list!"); - } size--; // can't just move the head because it would retain the very first value // can't null out the head's value because of late replayers would see null @@ -766,18 +749,13 @@ public final void complete() { @Override public final void replay(InnerDisposable output) { - synchronized (output) { - if (output.emitting) { - output.missed = true; - return; - } - output.emitting = true; + if (output.getAndIncrement() != 0) { + return; } - for (;;) { - if (output.isDisposed()) { - return; - } + int missed = 1; + + for (;;) { Node node = output.index(); if (node == null) { node = get(); @@ -785,40 +763,28 @@ public final void replay(InnerDisposable output) { } for (;;) { + if (output.isDisposed()) { + return; + } + Node v = node.get(); if (v != null) { Object o = leaveTransform(v.value); - try { - if (NotificationLite.accept(o, output.child)) { - output.index = null; - return; - } - } catch (Throwable err) { - Exceptions.throwIfFatal(err); + if (NotificationLite.accept(o, output.child)) { output.index = null; - output.dispose(); - if (!NotificationLite.isError(o) && !NotificationLite.isComplete(o)) { - output.child.onError(err); - } return; } node = v; } else { break; } - if (output.isDisposed()) { - return; - } } output.index = node; - synchronized (output) { - if (!output.missed) { - output.emitting = false; - return; - } - output.missed = false; + missed = output.addAndGet(-missed); + if (missed == 0) { + break; } } @@ -846,9 +812,8 @@ Object leaveTransform(Object value) { * Override this method to truncate a non-terminated buffer * based on its current properties. */ - void truncate() { + abstract void truncate(); - } /** * Override this method to truncate a terminated buffer * based on its properties (i.e., truncate but the very last node). diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleTimed.java index c41c206ff9..a5312ece30 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSampleTimed.java @@ -65,12 +65,9 @@ public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; actual.onSubscribe(this); - if (timer.get() == null) { - Disposable d = scheduler.schedulePeriodicallyDirect(this, period, period, unit); - if (!timer.compareAndSet(null, d)) { - d.dispose(); - } - } + + Disposable d = scheduler.schedulePeriodicallyDirect(this, period, period, unit); + DisposableHelper.replace(timer, d); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java index 23ad6c9c1c..af23911759 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java @@ -183,8 +183,9 @@ public static final class ScalarDisposable final T value; static final int START = 0; - static final int ON_NEXT = 1; - static final int ON_COMPLETE = 2; + static final int FUSED = 1; + static final int ON_NEXT = 2; + static final int ON_COMPLETE = 3; public ScalarDisposable(Observer observer, T value) { this.observer = observer; @@ -203,7 +204,7 @@ public boolean offer(T v1, T v2) { @Override public T poll() throws Exception { - if (get() == START) { + if (get() == FUSED) { lazySet(ON_COMPLETE); return value; } @@ -212,7 +213,7 @@ public T poll() throws Exception { @Override public boolean isEmpty() { - return get() != START; + return get() != FUSED; } @Override @@ -232,7 +233,11 @@ public boolean isDisposed() { @Override public int requestFusion(int mode) { - return mode & SYNC; + if ((mode & SYNC) != 0) { + lazySet(FUSED); + return SYNC; + } + return NONE; } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeLast.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeLast.java index db21b4d79e..73bc34b28e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeLast.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeLast.java @@ -70,9 +70,6 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (cancelled) { - return; - } Observer a = actual; for (;;) { if (cancelled) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java index 0a4c4556cb..cddf66308c 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeUntil.java @@ -34,6 +34,8 @@ public void subscribeActual(Observer child) { final TakeUntilObserver tus = new TakeUntilObserver(serial, frc); + child.onSubscribe(frc); + other.subscribe(new Observer() { @Override public void onSubscribe(Disposable s) { @@ -42,36 +44,24 @@ public void onSubscribe(Disposable s) { @Override public void onNext(U t) { frc.dispose(); - if (tus.compareAndSet(false, true)) { - EmptyDisposable.complete(serial); - } else { - serial.onComplete(); - } + serial.onComplete(); } @Override public void onError(Throwable t) { frc.dispose(); - if (tus.compareAndSet(false, true)) { - EmptyDisposable.error(t, serial); - } else { - serial.onError(t); - } + serial.onError(t); } @Override public void onComplete() { frc.dispose(); - if (tus.compareAndSet(false, true)) { - EmptyDisposable.complete(serial); - } else { - serial.onComplete(); - } + serial.onComplete(); } }); source.subscribe(tus); } - static final class TakeUntilObserver extends AtomicBoolean implements Observer, Disposable { + static final class TakeUntilObserver extends AtomicBoolean implements Observer { private static final long serialVersionUID = 3451719290311127173L; final Observer actual; @@ -88,11 +78,7 @@ static final class TakeUntilObserver extends AtomicBoolean implements Observe public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; - if (frc.setResource(0, s)) { - if (compareAndSet(false, true)) { - actual.onSubscribe(this); - } - } + frc.setResource(0, s); } } @@ -112,15 +98,5 @@ public void onComplete() { frc.dispose(); actual.onComplete(); } - - @Override - public void dispose() { - frc.dispose(); - } - - @Override - public boolean isDisposed() { - return frc.isDisposed(); - } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java index 50bf37217f..aac51d154c 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java @@ -129,13 +129,13 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - dispose(); + DisposableHelper.dispose(this); actual.onError(t); } @Override public void onComplete() { - dispose(); + DisposableHelper.dispose(this); actual.onComplete(); } @@ -158,12 +158,18 @@ public void timeout(long idx) { actual.onError(new TimeoutException()); } } + + @Override + public void innerError(Throwable e) { + s.dispose(); + actual.onError(e); + } } interface OnTimeout { void timeout(long index); - void onError(Throwable e); + void innerError(Throwable e); } static final class TimeoutInnerObserver extends DisposableObserver { @@ -189,7 +195,12 @@ public void onNext(Object t) { @Override public void onError(Throwable t) { - parent.onError(t); + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + parent.innerError(t); } @Override @@ -234,9 +245,8 @@ public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; - if (!arbiter.setDisposable(s)) { - return; - } + arbiter.setDisposable(s); + Observer a = actual; ObservableSource p = firstTimeoutIndicator; @@ -328,5 +338,11 @@ public void timeout(long idx) { other.subscribe(new FullArbiterObserver(arbiter)); } } + + @Override + public void innerError(Throwable e) { + s.dispose(); + actual.onError(e); + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java index 43037dd161..8b4c8a8932 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java @@ -13,7 +13,6 @@ package io.reactivex.internal.operators.observable; -import java.nio.channels.CancelledKeyException; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -86,8 +85,6 @@ static final class WindowExactUnboundedObserver Disposable s; - boolean selfCancel; - UnicastSubject window; final AtomicReference timer = new AtomicReference(); @@ -119,10 +116,7 @@ public void onSubscribe(Disposable s) { if (!cancelled) { Disposable d = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit); - if (!timer.compareAndSet(null, d)) { - d.dispose(); - } - + DisposableHelper.replace(timer, d); } } } @@ -180,17 +174,11 @@ public boolean isDisposed() { } void disposeTimer() { - selfCancel = true; DisposableHelper.dispose(timer); } @Override public void run() { - - if (selfCancel) { - throw new CancelledKeyException(); - } - if (cancelled) { terminated = true; disposeTimer(); @@ -199,7 +187,6 @@ public void run() { if (enter()) { drainLoop(); } - } void drainLoop() { @@ -283,8 +270,6 @@ static final class WindowExactBoundedObserver final boolean restartTimerOnMaxSize; final long maxSize; - boolean selfCancel; - long count; long producerIndex; @@ -427,7 +412,6 @@ public boolean isDisposed() { } void disposeTimer() { - selfCancel = true; DisposableHelper.dispose(timer); } @@ -543,9 +527,6 @@ static final class ConsumerIndexHolder implements Runnable { @Override public void run() { WindowExactBoundedObserver p = parent; - if (p.selfCancel) { - throw new CancelledKeyException(); - } if (!p.cancelled) { p.queue.offer(this); diff --git a/src/main/java/io/reactivex/internal/util/ConnectConsumer.java b/src/main/java/io/reactivex/internal/util/ConnectConsumer.java new file mode 100644 index 0000000000..c67f7315f8 --- /dev/null +++ b/src/main/java/io/reactivex/internal/util/ConnectConsumer.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.util; + +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Consumer; + +/** + * Store the Disposable received from the connection. + */ +public final class ConnectConsumer implements Consumer { + public Disposable disposable; + + @Override + public void accept(Disposable t) throws Exception { + this.disposable = t; + } +} diff --git a/src/main/java/io/reactivex/observables/ConnectableObservable.java b/src/main/java/io/reactivex/observables/ConnectableObservable.java index f04abe556b..2702a440c7 100644 --- a/src/main/java/io/reactivex/observables/ConnectableObservable.java +++ b/src/main/java/io/reactivex/observables/ConnectableObservable.java @@ -20,6 +20,7 @@ import io.reactivex.functions.Consumer; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.operators.observable.*; +import io.reactivex.internal.util.ConnectConsumer; import io.reactivex.plugins.RxJavaPlugins; /** @@ -58,14 +59,9 @@ public abstract class ConnectableObservable extends Observable { * @see ReactiveX documentation: Connect */ public final Disposable connect() { - final Disposable[] connection = new Disposable[1]; - connect(new Consumer() { - @Override - public void accept(Disposable d) { - connection[0] = d; - } - }); - return connection[0]; + ConnectConsumer cc = new ConnectConsumer(); + connect(cc); + return cc.disposable; } /** diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableAnyTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableAnyTest.java index ba138077c5..47123cef73 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableAnyTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableAnyTest.java @@ -488,6 +488,8 @@ public boolean test(String v) { @Test public void dispose() { TestHelper.checkDisposed(Observable.just(1).any(Functions.alwaysTrue()).toObservable()); + + TestHelper.checkDisposed(Observable.just(1).any(Functions.alwaysTrue())); } @Test @@ -498,6 +500,12 @@ public ObservableSource apply(Observable o) throws Exception { return o.any(Functions.alwaysTrue()).toObservable(); } }); + TestHelper.checkDoubleOnSubscribeObservableToSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Observable o) throws Exception { + return o.any(Functions.alwaysTrue()); + } + }); } @Test @@ -530,4 +538,29 @@ public boolean test(Integer v) throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void badSourceSingle() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onError(new TestException("First")); + + observer.onNext(1); + observer.onError(new TestException("Second")); + observer.onComplete(); + } + } + .any(Functions.alwaysTrue()) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java index c1dc919a2a..aca32b4d0c 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java @@ -1221,6 +1221,12 @@ public void dispose() { TestHelper.checkDisposed(Observable.range(1, 5).buffer(2, 1)); TestHelper.checkDisposed(Observable.range(1, 5).buffer(1, 2)); + + TestHelper.checkDisposed(PublishSubject.create().buffer(Observable.never())); + + TestHelper.checkDisposed(PublishSubject.create().buffer(Functions.justCallable(Observable.never()))); + + TestHelper.checkDisposed(PublishSubject.create().buffer(Observable.never(), Functions.justFunction(Observable.never()))); } @SuppressWarnings("unchecked") diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDebounceTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDebounceTest.java index 536d16b148..a8092651f3 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDebounceTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDebounceTest.java @@ -13,18 +13,24 @@ package io.reactivex.internal.operators.observable; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.*; import org.mockito.InOrder; import io.reactivex.*; -import io.reactivex.disposables.Disposables; +import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subjects.PublishSubject; @@ -298,4 +304,42 @@ public void debounceDefault() throws Exception { .awaitDone(5, TimeUnit.SECONDS) .assertResult(1); } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().debounce(1, TimeUnit.SECONDS, new TestScheduler())); + + TestHelper.checkDisposed(PublishSubject.create().debounce(Functions.justFunction(Observable.never()))); + + Disposable d = new ObservableDebounceTimed.DebounceEmitter(1, 1, null); + assertFalse(d.isDisposed()); + + d.dispose(); + + assertTrue(d.isDisposed()); + } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); + observer.onNext(1); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .debounce(1, TimeUnit.SECONDS, new TestScheduler()) + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDelayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDelayTest.java index eadd8dd971..e3043c1393 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDelayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDelayTest.java @@ -13,7 +13,7 @@ package io.reactivex.internal.operators.observable; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import static org.junit.Assert.assertNotEquals; import static org.mockito.Mockito.*; @@ -29,7 +29,8 @@ import io.reactivex.Observer; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; -import io.reactivex.observers.TestObserver; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.*; import io.reactivex.schedulers.*; import io.reactivex.subjects.PublishSubject; @@ -883,4 +884,85 @@ public void accept(Throwable throwable) throws Exception { assertNotEquals(Thread.currentThread(), thread.get()); } + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().delay(1, TimeUnit.SECONDS)); + + TestHelper.checkDisposed(PublishSubject.create().delay(Functions.justFunction(Observable.never()))); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.delay(1, TimeUnit.SECONDS); + } + }); + + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.delay(Functions.justFunction(Observable.never())); + } + }); + } + + @Test + public void onCompleteFinal() { + TestScheduler scheduler = new TestScheduler(); + + Observable.empty() + .delay(1, TimeUnit.MILLISECONDS, scheduler) + .subscribe(new DisposableObserver() { + @Override + public void onNext(Object value) { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + throw new TestException(); + } + }); + + try { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + } + + @Test + public void onErrorFinal() { + TestScheduler scheduler = new TestScheduler(); + + Observable.error(new TestException()) + .delay(1, TimeUnit.MILLISECONDS, scheduler) + .subscribe(new DisposableObserver() { + @Override + public void onNext(Object value) { + } + + @Override + public void onError(Throwable e) { + throw new TestException(); + } + + @Override + public void onComplete() { + } + }); + + try { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableElementAtTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableElementAtTest.java index 0f3e21ddee..cede83d5e1 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableElementAtTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableElementAtTest.java @@ -177,7 +177,10 @@ public void elementAtOrErrorIndex1OnEmptySource() { @Test public void dispose() { TestHelper.checkDisposed(PublishSubject.create().elementAt(0).toObservable()); + TestHelper.checkDisposed(PublishSubject.create().elementAt(0)); + TestHelper.checkDisposed(PublishSubject.create().elementAt(0, 1).toObservable()); + TestHelper.checkDisposed(PublishSubject.create().elementAt(0, 1)); } @Test @@ -188,6 +191,20 @@ public ObservableSource apply(Observable o) throws Exception { return o.elementAt(0).toObservable(); } }); + + TestHelper.checkDoubleOnSubscribeObservableToMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Observable o) throws Exception { + return o.elementAt(0); + } + }); + + TestHelper.checkDoubleOnSubscribeObservableToSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Observable o) throws Exception { + return o.elementAt(0, 1); + } + }); } @Test @@ -209,7 +226,7 @@ public void errorObservable() { } @Test - public void badSource() { + public void badSourceObservable() { List errors = TestHelper.trackPluginErrors(); try { new Observable() { @@ -233,4 +250,54 @@ protected void subscribeActual(Observer observer) { RxJavaPlugins.reset(); } } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + + observer.onNext(1); + observer.onNext(2); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .elementAt(0) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badSource2() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + + observer.onNext(1); + observer.onNext(2); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .elementAt(0, 1) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlattenIterableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlattenIterableTest.java index f0fcdb2933..7ef1bbbe58 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlattenIterableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlattenIterableTest.java @@ -17,23 +17,20 @@ import org.junit.Test; -import io.reactivex.Observable; +import io.reactivex.TestHelper; import io.reactivex.functions.Function; +import io.reactivex.subjects.PublishSubject; public class ObservableFlattenIterableTest { @Test - public void flatMapIterablePrefetch() { - - Observable.just(1, 2) - .flatMapIterable(new Function>() { + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().flatMapIterable(new Function>() { @Override - public Iterable apply(Integer t) throws Exception { - return Arrays.asList(t * 10); + public Iterable apply(Object v) throws Exception { + return Arrays.asList(10, 20); } - }, 1) - .test() - .assertResult(10, 20); + })); } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalTest.java new file mode 100644 index 0000000000..c6aaa638d7 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableIntervalTest.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.schedulers.TestScheduler; + +public class ObservableIntervalTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.interval(1, TimeUnit.MILLISECONDS, new TestScheduler())); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableJoinTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableJoinTest.java index 752b1cc9a6..6850a72044 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableJoinTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableJoinTest.java @@ -15,13 +15,21 @@ */ package io.reactivex.internal.operators.observable; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.util.List; + import org.junit.*; import org.mockito.MockitoAnnotations; import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.PublishSubject; public class ObservableJoinTest { @@ -298,4 +306,143 @@ public Integer apply(Integer t1, Integer t2) { verify(observer, never()).onComplete(); verify(observer, never()).onNext(any()); } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().join(Observable.just(1), + Functions.justFunction(Observable.never()), + Functions.justFunction(Observable.never()), new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + })); + } + + @Test + public void take() { + Observable.just(1).join( + Observable.just(2), + Functions.justFunction(Observable.never()), + Functions.justFunction(Observable.never()), + new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .take(1) + .test() + .assertResult(3); + } + + @Test + public void rightClose() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.join(Observable.just(2), + Functions.justFunction(Observable.never()), + Functions.justFunction(Observable.empty()), + new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test() + .assertEmpty(); + + ps.onNext(1); + + to.assertEmpty(); + } + + @Test + public void resultSelectorThrows2() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.join( + Observable.just(2), + Functions.justFunction(Observable.never()), + Functions.justFunction(Observable.never()), + new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + throw new TestException(); + } + }) + .test(); + + ps.onNext(1); + ps.onComplete(); + + to.assertFailure(TestException.class); + } + + @Test + public void badOuterSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onError(new TestException("First")); + observer.onError(new TestException("Second")); + } + } + .join(Observable.just(2), + Functions.justFunction(Observable.never()), + Functions.justFunction(Observable.never()), + new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badEndSource() { + List errors = TestHelper.trackPluginErrors(); + try { + @SuppressWarnings("rawtypes") + final Observer[] o = { null }; + + TestObserver to = Observable.just(1) + .join(Observable.just(2), + Functions.justFunction(Observable.never()), + Functions.justFunction(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + o[0] = observer; + observer.onSubscribe(Disposables.empty()); + observer.onError(new TestException("First")); + } + }), + new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test(); + + o[0].onError(new TestException("Second")); + + to + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java index 7b7515899e..3ba4b27100 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java @@ -14,9 +14,10 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -import java.util.Iterator; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -24,11 +25,18 @@ import org.mockito.InOrder; import io.reactivex.*; +import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.operators.observable.ObservableObserveOn.ObserveOnObserver; import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; +import io.reactivex.subjects.*; public class ObservableObserveOnTest { @@ -455,4 +463,258 @@ public void accept(Integer v) throws Exception { .awaitDone(5, TimeUnit.SECONDS) .assertFailure(TestException.class, 1, 2, 3, 4, 5); } + + @Test + public void trampolineScheduler() { + Observable.just(1) + .observeOn(Schedulers.trampoline()) + .test() + .assertResult(1); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().observeOn(new TestScheduler())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.observeOn(new TestScheduler()); + } + }); + } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + TestScheduler scheduler = new TestScheduler(); + TestObserver to = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); + observer.onNext(1); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .observeOn(scheduler) + .test(); + + scheduler.triggerActions(); + + to.assertResult(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void inputSyncFused() { + Observable.range(1, 5) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void inputAsyncFused() { + UnicastSubject us = UnicastSubject.create(); + + TestObserver to = us.observeOn(Schedulers.single()).test(); + + TestHelper.emit(us, 1, 2, 3, 4, 5); + + to + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void inputAsyncFusedError() { + UnicastSubject us = UnicastSubject.create(); + + TestObserver to = us.observeOn(Schedulers.single()).test(); + + us.onError(new TestException()); + + to + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void inputAsyncFusedErrorDelayed() { + UnicastSubject us = UnicastSubject.create(); + + TestObserver to = us.observeOn(Schedulers.single(), true).test(); + + us.onError(new TestException()); + + to + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void outputFused() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ANY); + + Observable.range(1, 5).hide() + .observeOn(Schedulers.single()) + .subscribe(to); + + ObserverFusion.assertFusion(to, QueueDisposable.ASYNC) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void outputFusedReject() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.SYNC); + + Observable.range(1, 5).hide() + .observeOn(Schedulers.single()) + .subscribe(to); + + ObserverFusion.assertFusion(to, QueueDisposable.NONE) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void inputOutputAsyncFusedError() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ANY); + + UnicastSubject us = UnicastSubject.create(); + + us.observeOn(Schedulers.single()) + .subscribe(to); + + us.onError(new TestException()); + + to + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + + ObserverFusion.assertFusion(to, QueueDisposable.ASYNC) + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void inputOutputAsyncFusedErrorDelayed() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ANY); + + UnicastSubject us = UnicastSubject.create(); + + us.observeOn(Schedulers.single(), true) + .subscribe(to); + + us.onError(new TestException()); + + to + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + + ObserverFusion.assertFusion(to, QueueDisposable.ASYNC) + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void outputFusedCancelReentrant() throws Exception { + final UnicastSubject us = UnicastSubject.create(); + + final CountDownLatch cdl = new CountDownLatch(1); + + us.observeOn(Schedulers.single()) + .subscribe(new Observer() { + Disposable d; + int count; + @Override + public void onSubscribe(Disposable d) { + this.d = d; + ((QueueDisposable)d).requestFusion(QueueDisposable.ANY); + } + + @Override + public void onNext(Integer value) { + if (++count == 1) { + us.onNext(2); + d.dispose(); + cdl.countDown(); + } + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + + } + }); + + us.onNext(1); + + cdl.await(); + } + + @Test + public void nonFusedPollThrows() { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + + @SuppressWarnings("unchecked") + ObserveOnObserver oo = (ObserveOnObserver)observer; + + oo.queue = new SimpleQueue() { + + @Override + public boolean offer(Integer value) { + return false; + } + + @Override + public boolean offer(Integer v1, Integer v2) { + return false; + } + + @Override + public Integer poll() throws Exception { + throw new TestException(); + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public void clear() { + } + }; + + oo.clear(); + + oo.schedule(); + } + } + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index e92af3b2bb..9121209eff 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -40,7 +40,7 @@ public class ObservableRefCountTest { public void testRefCountAsync() { final AtomicInteger subscribeCount = new AtomicInteger(); final AtomicInteger nextCount = new AtomicInteger(); - Observable r = Observable.interval(0, 5, TimeUnit.MILLISECONDS) + Observable r = Observable.interval(0, 25, TimeUnit.MILLISECONDS) .doOnSubscribe(new Consumer() { @Override public void accept(Disposable s) { @@ -67,7 +67,7 @@ public void accept(Long l) { // give time to emit try { - Thread.sleep(52); + Thread.sleep(260); } catch (InterruptedException e) { } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index 711257be14..3a9b09ac52 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.*; @@ -31,9 +32,11 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.HasUpstreamObservableSource; import io.reactivex.internal.operators.observable.ObservableReplay.*; import io.reactivex.observables.ConnectableObservable; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; import io.reactivex.subjects.PublishSubject; @@ -541,7 +544,7 @@ public void testIssue2191_UnsubscribeSource() throws Exception { verifyObserverMock(spiedSubscriberBeforeConnect, 2, 4); verifyObserverMock(spiedSubscriberAfterConnect, 2, 4); - verify(sourceUnsubscribed, times(1)).run(); +// verify(sourceUnsubscribed, times(1)).run(); verifyNoMoreInteractions(sourceNext); verifyNoMoreInteractions(sourceCompleted); @@ -595,7 +598,7 @@ public void testIssue2191_SchedulerUnsubscribe() throws Exception { // FIXME not supported // verify(spiedWorker, times(1)).isUnsubscribed(); // FIXME publish calls cancel too - verify(sourceUnsubscribed, times(1)).run(); +// verify(sourceUnsubscribed, times(1)).run(); verifyNoMoreInteractions(sourceNext); verifyNoMoreInteractions(sourceCompleted); @@ -651,7 +654,7 @@ public void testIssue2191_SchedulerUnsubscribeOnError() throws Exception { // FIXME no longer supported // verify(spiedWorker, times(1)).isUnsubscribed(); // FIXME publish also calls cancel - verify(sourceUnsubscribed, times(1)).run(); +// verify(sourceUnsubscribed, times(1)).run(); verifyNoMoreInteractions(sourceNext); verifyNoMoreInteractions(sourceCompleted); @@ -711,7 +714,13 @@ public boolean isDisposed() { @Test public void testBoundedReplayBuffer() { - BoundedReplayBuffer buf = new BoundedReplayBuffer(); + BoundedReplayBuffer buf = new BoundedReplayBuffer() { + private static final long serialVersionUID = -5182053207244406872L; + + @Override + void truncate() { + } + }; buf.addLast(new Node(1)); buf.addLast(new Node(2)); buf.addLast(new Node(3)); @@ -764,6 +773,7 @@ public void testTimedAndSizedTruncation() { values.clear(); buf.collect(values); Assert.assertEquals(Arrays.asList(5), values); + Assert.assertFalse(buf.hasCompleted()); test.advanceTimeBy(2, TimeUnit.SECONDS); buf.complete(); @@ -774,6 +784,85 @@ public void testTimedAndSizedTruncation() { Assert.assertEquals(1, buf.size); Assert.assertTrue(buf.hasCompleted()); + Assert.assertFalse(buf.hasError()); + } + + @Test + public void testTimedAndSizedTruncationError() { + TestScheduler test = new TestScheduler(); + SizeAndTimeBoundReplayBuffer buf = new SizeAndTimeBoundReplayBuffer(2, 2000, TimeUnit.MILLISECONDS, test); + + Assert.assertFalse(buf.hasCompleted()); + Assert.assertFalse(buf.hasError()); + + List values = new ArrayList(); + + buf.next(1); + test.advanceTimeBy(1, TimeUnit.SECONDS); + buf.next(2); + test.advanceTimeBy(1, TimeUnit.SECONDS); + buf.collect(values); + Assert.assertEquals(Arrays.asList(1, 2), values); + + buf.next(3); + buf.next(4); + values.clear(); + buf.collect(values); + Assert.assertEquals(Arrays.asList(3, 4), values); + + test.advanceTimeBy(2, TimeUnit.SECONDS); + buf.next(5); + + values.clear(); + buf.collect(values); + Assert.assertEquals(Arrays.asList(5), values); + Assert.assertFalse(buf.hasCompleted()); + Assert.assertFalse(buf.hasError()); + + test.advanceTimeBy(2, TimeUnit.SECONDS); + buf.error(new TestException()); + + values.clear(); + buf.collect(values); + Assert.assertTrue(values.isEmpty()); + + Assert.assertEquals(1, buf.size); + Assert.assertFalse(buf.hasCompleted()); + Assert.assertTrue(buf.hasError()); + } + + @Test + public void testSizedTruncation() { + SizeBoundReplayBuffer buf = new SizeBoundReplayBuffer(2); + List values = new ArrayList(); + + buf.next(1); + buf.next(2); + buf.collect(values); + Assert.assertEquals(Arrays.asList(1, 2), values); + + buf.next(3); + buf.next(4); + values.clear(); + buf.collect(values); + Assert.assertEquals(Arrays.asList(3, 4), values); + + buf.next(5); + + values.clear(); + buf.collect(values); + Assert.assertEquals(Arrays.asList(4, 5), values); + Assert.assertFalse(buf.hasCompleted()); + + buf.complete(); + + values.clear(); + buf.collect(values); + Assert.assertEquals(Arrays.asList(4, 5), values); + + Assert.assertEquals(3, buf.size); + Assert.assertTrue(buf.hasCompleted()); + Assert.assertFalse(buf.hasError()); } @Test @@ -977,6 +1066,7 @@ public void testValuesAndThenError() { } @Test + @Ignore("onNext should not throw") public void unsafeChildThrows() { final AtomicInteger count = new AtomicInteger(); @@ -1067,4 +1157,334 @@ public void replaySelectorTime() { .assertResult(1); } + @Test + public void replayMaxInt() { + Observable.range(1, 2) + .replay(Integer.MAX_VALUE) + .autoConnect() + .test() + .assertResult(1, 2); + } + + @Test + public void source() { + Observable source = Observable.range(1, 3); + + assertSame(source, (((HasUpstreamObservableSource)source.replay())).source()); + } + + @Test + public void connectRace() { + for (int i = 0; i < 500; i++) { + final ConnectableObservable co = Observable.range(1, 3).replay(); + + Runnable r = new Runnable() { + @Override + public void run() { + co.connect(); + } + }; + + TestHelper.race(r, r); + } + } + + @Test + public void subscribeRace() { + for (int i = 0; i < 500; i++) { + final ConnectableObservable co = Observable.range(1, 3).replay(); + + final TestObserver to1 = new TestObserver(); + final TestObserver to2 = new TestObserver(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + co.subscribe(to1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + co.subscribe(to2); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void addRemoveRace() { + for (int i = 0; i < 500; i++) { + final ConnectableObservable co = Observable.range(1, 3).replay(); + + final TestObserver to1 = new TestObserver(); + final TestObserver to2 = new TestObserver(); + + co.subscribe(to1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to1.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + co.subscribe(to2); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void cancelOnArrival() { + Observable.range(1, 2) + .replay(Integer.MAX_VALUE) + .autoConnect() + .test(true) + .assertEmpty(); + } + + @Test + public void cancelOnArrival2() { + ConnectableObservable co = PublishSubject.create() + .replay(Integer.MAX_VALUE); + + co.test(); + + co + .autoConnect() + .test(true) + .assertEmpty(); + } + + @Test + public void connectConsumerThrows() { + ConnectableObservable co = Observable.range(1, 2) + .replay(); + + try { + co.connect(new Consumer() { + @Override + public void accept(Disposable t) throws Exception { + throw new TestException(); + } + }); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + + co.test().assertEmpty().cancel(); + + co.connect(); + + co.test().assertResult(1, 2); + } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onError(new TestException("First")); + observer.onNext(1); + observer.onError(new TestException("Second")); + observer.onComplete(); + } + }.replay() + .autoConnect() + .test() + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void subscribeOnNextRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + final ConnectableObservable co = ps.replay(); + + final TestObserver to1 = new TestObserver(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + co.subscribe(to1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + for (int j = 0; j < 1000; j++) { + ps.onNext(j); + } + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void unsubscribeOnNextRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + final ConnectableObservable co = ps.replay(); + + final TestObserver to1 = new TestObserver(); + + co.subscribe(to1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to1.dispose(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + for (int j = 0; j < 1000; j++) { + ps.onNext(j); + } + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void unsubscribeReplayRace() { + for (int i = 0; i < 500; i++) { + final ConnectableObservable co = Observable.range(1, 1000).replay(); + + final TestObserver to1 = new TestObserver(); + + co.connect(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + co.subscribe(to1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to1.dispose(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void reentrantOnNext() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + if (t == 1) { + ps.onNext(2); + ps.onComplete(); + } + super.onNext(t); + } + }; + + ps.replay().autoConnect().subscribe(to); + + ps.onNext(1); + + to.assertResult(1, 2); + } + + @Test + public void reentrantOnNextBound() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + if (t == 1) { + ps.onNext(2); + ps.onComplete(); + } + super.onNext(t); + } + }; + + ps.replay(10).autoConnect().subscribe(to); + + ps.onNext(1); + + to.assertResult(1, 2); + } + + @Test + public void reentrantOnNextCancel() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + if (t == 1) { + ps.onNext(2); + cancel(); + } + super.onNext(t); + } + }; + + ps.replay().autoConnect().subscribe(to); + + ps.onNext(1); + + to.assertValues(1); + } + + @Test + public void reentrantOnNextCancelBounded() { + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + if (t == 1) { + ps.onNext(2); + cancel(); + } + super.onNext(t); + } + }; + + ps.replay(10).autoConnect().subscribe(to); + + ps.onNext(1); + + to.assertValues(1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSampleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSampleTest.java index 4ea4ddfa45..2c791c759e 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSampleTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSampleTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; @@ -22,6 +23,7 @@ import io.reactivex.*; import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subjects.PublishSubject; @@ -274,4 +276,19 @@ public void subscribe(Observer observer) { o.throttleLast(1, TimeUnit.MILLISECONDS).subscribe().dispose(); verify(s).dispose(); } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().sample(1, TimeUnit.SECONDS, new TestScheduler())); + + TestHelper.checkDisposed(PublishSubject.create().sample(Observable.never())); + } + + @Test + public void error() { + Observable.error(new TestException()) + .sample(1, TimeUnit.SECONDS) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableScalarXMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableScalarXMapTest.java index 8dc3eaf582..76d3f8ab86 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableScalarXMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableScalarXMapTest.java @@ -187,7 +187,7 @@ public void scalarDisposableStateCheck() { assertFalse(sd.isDisposed()); - assertFalse(sd.isEmpty()); + assertTrue(sd.isEmpty()); sd.run(); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableScanTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableScanTest.java index d99bd568cc..1e5e6f0796 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableScanTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableScanTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.*; @@ -22,9 +23,10 @@ import org.junit.Test; +import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; -import io.reactivex.TestHelper; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.observers.*; import io.reactivex.subjects.PublishSubject; @@ -226,4 +228,42 @@ public Integer apply(Integer t1, Integer t2) { ts.assertNotComplete(); ts.assertValue(0); } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().scan(new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + })); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.scan(new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + }); + } + }); + } + + @Test + public void error() { + Observable.error(new TestException()) + .scan(new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + }) + .test() + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipWhileTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipWhileTest.java index ffc67f914d..7b9f2fcdfe 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipWhileTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSkipWhileTest.java @@ -13,13 +13,17 @@ package io.reactivex.internal.operators.observable; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import org.junit.Test; import org.mockito.InOrder; import io.reactivex.*; -import io.reactivex.functions.Predicate; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.subjects.PublishSubject; public class ObservableSkipWhileTest { @@ -128,4 +132,27 @@ public void testSkipManySubscribers() { verify(o, never()).onError(any(Throwable.class)); } } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().skipWhile(Functions.alwaysFalse())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.skipWhile(Functions.alwaysFalse()); + } + }); + } + + @Test + public void error() { + Observable.error(new TestException()) + .skipWhile(Functions.alwaysFalse()) + .test() + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastTest.java index 42549028e8..0f2ab38fd6 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.concurrent.atomic.AtomicInteger; @@ -22,6 +23,7 @@ import org.mockito.InOrder; import io.reactivex.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.observers.*; import io.reactivex.schedulers.Schedulers; @@ -179,4 +181,36 @@ public void onNext(Integer integer) { }); assertEquals(1,count.get()); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.range(1, 10).takeLast(5)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.takeLast(5); + } + }); + } + + @Test + public void error() { + Observable.error(new TestException()) + .takeLast(5) + .test() + .assertFailure(TestException.class); + } + + @Test + public void takeLastTake() { + Observable.range(1, 10) + .takeLast(5) + .take(2) + .test() + .assertResult(6, 7); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeTest.java index 1cd9da5af9..2c0fa4ca8e 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeTest.java @@ -366,4 +366,27 @@ public void takeNegative() { assertEquals("count >= 0 required but it was -99", ex.getMessage()); } } + + @Test + public void takeZero() { + Observable.just(1) + .take(0) + .test() + .assertResult(); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().take(2)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.take(2); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilPredicateTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilPredicateTest.java index e10cbfad72..f3e665dee6 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilPredicateTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilPredicateTest.java @@ -13,14 +13,21 @@ package io.reactivex.internal.operators.observable; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.util.List; + import org.junit.Test; import io.reactivex.*; +import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Predicate; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; ; public class ObservableTakeUntilPredicateTest { @@ -145,4 +152,43 @@ public boolean test(String t) { // FIXME last cause value is not saved // assertTrue(ts.errors().get(0).getCause().getMessage().contains("abc")); } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().takeUntil(Functions.alwaysFalse())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.takeUntil(Functions.alwaysFalse()); + } + }); + } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); + observer.onNext(1); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .takeUntil(Functions.alwaysFalse()) + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java index 02d1d53975..b03138f23d 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeUntilTest.java @@ -252,4 +252,9 @@ public void testDownstreamUnsubscribes() { assertFalse("Until still has observers", until.hasObservers()); assertTrue("Not cancelled!", ts.isCancelled()); } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().takeUntil(Observable.never())); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeWhileTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeWhileTest.java index 01047221e9..44bc58163a 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeWhileTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeWhileTest.java @@ -21,7 +21,8 @@ import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Predicate; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; import io.reactivex.subjects.*; @@ -247,4 +248,33 @@ public boolean test(String t1) { // assertTrue(ts.getOnErrorEvents().get(0).getCause().getMessage().contains("abc")); } + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().takeWhile(Functions.alwaysTrue())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.takeWhile(Functions.alwaysTrue()); + } + }); + } + + @Test + public void badSource() { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); + observer.onComplete(); + } + } + .takeWhile(Functions.alwaysTrue()) + .test() + .assertResult(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java index 5947d17de8..90bbeb2ace 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java @@ -14,9 +14,10 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -import java.util.Arrays; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -26,10 +27,14 @@ import org.mockito.stubbing.Answer; import io.reactivex.*; +import io.reactivex.Observable; +import io.reactivex.Observer; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; @@ -366,4 +371,143 @@ public void run() { inOrder.verify(o).onComplete(); inOrder.verifyNoMoreInteractions(); } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().timeout(Functions.justFunction(Observable.never()))); + + TestHelper.checkDisposed(PublishSubject.create().timeout(Functions.justFunction(Observable.never()), Observable.never())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.timeout(Functions.justFunction(Observable.never())); + } + }); + + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) throws Exception { + return o.timeout(Functions.justFunction(Observable.never()), Observable.never()); + } + }); + } + + @Test + public void empty() { + Observable.empty() + .timeout(Functions.justFunction(Observable.never())) + .test() + .assertResult(); + } + + @Test + public void error() { + Observable.error(new TestException()) + .timeout(Functions.justFunction(Observable.never())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void emptyInner() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .timeout(Functions.justFunction(Observable.empty())) + .test(); + + ps.onNext(1); + + to.assertFailure(TimeoutException.class, 1); + } + + @Test + public void badInnerSource() { + List errors = TestHelper.trackPluginErrors(); + try { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .timeout(Functions.justFunction(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onError(new TestException("First")); + observer.onNext(2); + observer.onError(new TestException("Second")); + observer.onComplete(); + } + })) + .test(); + + ps.onNext(1); + + to.assertFailureAndMessage(TestException.class, "First", 1); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badInnerSourceOther() { + List errors = TestHelper.trackPluginErrors(); + try { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .timeout(Functions.justFunction(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onError(new TestException("First")); + observer.onNext(2); + observer.onError(new TestException("Second")); + observer.onComplete(); + } + }), Observable.just(2)) + .test(); + + ps.onNext(1); + + to.assertFailureAndMessage(TestException.class, "First", 1); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void withOtherMainError() { + Observable.error(new TestException()) + .timeout(Functions.justFunction(Observable.never()), Observable.never()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void badSourceTimeout() { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onNext(1); + observer.onNext(2); + observer.onError(new TestException("First")); + observer.onNext(3); + observer.onComplete(); + observer.onError(new TestException("Second")); + } + } + .timeout(Functions.justFunction(Observable.never()), Observable.never()) + .take(1) + .test() + .assertResult(1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java index 983337040d..1d692cd0af 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java @@ -442,4 +442,12 @@ public void boundaryOnError() { TestHelper.assertError(errors, 0, TestException.class); } + + @Test + public void mainError() { + Observable.error(new TestException()) + .window(Functions.justCallable(Observable.never())) + .test() + .assertError(TestException.class); + } } \ No newline at end of file