From 0165f52e2bcbfa45a3c558b12154b76a85b76c59 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 12 Dec 2013 21:03:37 +0100 Subject: [PATCH] Operation: Replay additional overloads --- rxjava-core/src/main/java/rx/Observable.java | 336 +++++++++++++++- .../java/rx/operators/OperationMulticast.java | 58 +++ .../java/rx/operators/OperationReplay.java | 50 ++- .../rx/operators/OperationReplayTest.java | 370 ++++++++++++++++++ 4 files changed, 790 insertions(+), 24 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 74a50c075f..f3dba504b2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -73,6 +73,7 @@ import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallel; import rx.operators.OperationParallelMerge; +import rx.operators.OperationReplay; import rx.operators.OperationRetry; import rx.operators.OperationSample; import rx.operators.OperationScan; @@ -526,6 +527,24 @@ public ConnectableObservable multicast(Subject su return OperationMulticast.multicast(this, subject); } + /** + * Returns an observable sequence that contains the elements of a sequence + * produced by multicasting the source sequence within a selector function. + * + * @param subjectFactory the subject factory + * @param selector The selector function which can use the multicasted + * source sequence subject to the policies enforced by the + * created subject. + * @return the Observable sequence that contains the elements of a sequence + * produced by multicasting the source sequence within a selector function. + * + * @see MSDN: Observable.Multicast + */ + public Observable multicast( + final Func0> subjectFactory, + final Func1, ? extends Observable> selector) { + return OperationMulticast.multicast(this, subjectFactory, selector); + } /** * Allow the {@link RxJavaErrorHandler} to receive the exception from * onError. @@ -4277,20 +4296,327 @@ public Observable> maxBy(Func1 selector, Comparator public ConnectableObservable replay() { return OperationMulticast.multicast(this, ReplaySubject. create()); } - + /** * Returns a {@link ConnectableObservable} that shares a single subscription * to the underlying Observable that will replay all of its items and - * notifications to any future {@link Observer} on the given scheduler + * notifications to any future {@link Observer} on the given scheduler. * - * @param scheduler - * @return + * @param scheduler the scheduler where the Observers will receive the events + * @return a {@link ConnectableObservable} that shares a single subscription + * to the underlying Observable that will replay all of its items and + * notifications to any future {@link Observer} on the given scheduler * * @see MSDN: Observable.Replay */ public ConnectableObservable replay(Scheduler scheduler) { - return OperationMulticast.multicast(this, ReplaySubject. create()); + return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject(ReplaySubject.create(), scheduler)); + } + + /** + * Returns a connectable observable sequence that shares a single subscription + * to the underlying sequence replaying bufferSize notifications. + * + * @param bufferSize the buffer size + * @return a connectable observable sequence that shares a single subscription + * to the underlying sequence replaying bufferSize notifications + * + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(int bufferSize) { + return OperationMulticast.multicast(this, OperationReplay.replayBuffered(bufferSize)); + } + + /** + * Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications. + * + * @param bufferSize the buffer size + * @param scheduler the scheduler where the Observers will receive the events + * @return a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(int bufferSize, Scheduler scheduler) { + return OperationMulticast.multicast(this, + OperationReplay.createScheduledSubject( + OperationReplay.replayBuffered(bufferSize), scheduler)); + } + + /** + * Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window. + * + * @param time the window length + * @param unit the window length time unit + * @return a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(long time, TimeUnit unit) { + return replay(time, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window. + * + * @param time the window length + * @param unit the window length time unit + * @param scheduler the scheduler which is used as a time source for the window + * @return a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(long time, TimeUnit unit, Scheduler scheduler) { + return OperationMulticast.multicast(this, OperationReplay.replayWindowed(time, unit, -1, scheduler)); + } + + /** + * Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications within window. + * + * @param bufferSize the buffer size + * @param time the window length + * @param unit the window length time unit + * @return Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications within window + * + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(int bufferSize, long time, TimeUnit unit) { + return replay(bufferSize, time, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Returns a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications within window. + * + * @param bufferSize the buffer size + * @param time the window length + * @param unit the window length time unit + * @param scheduler the scheduler which is used as a time source for the window + * @return a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications within window + * + * @see MSDN: Observable.Replay + */ + public ConnectableObservable replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) { + if (bufferSize < 0) { + throw new IllegalArgumentException("bufferSize < 0"); + } + return OperationMulticast.multicast(this, OperationReplay.replayWindowed(time, unit, bufferSize, scheduler)); + } + + /** + * Returns an observable sequence that is the result of invoking the selector + * on a connectable observable sequence that shares a single subscription to + * the underlying sequence and starts with initial value. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @return an observable sequence that is the result of invoking the selector + * on a connectable observable sequence that shares a single subscription to + * the underlying sequence and starts with initial value + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector) { + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return ReplaySubject.create(); + } + }, selector); + } + + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param scheduler the scheduler where the replay is observed + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, final Scheduler scheduler) { + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return OperationReplay.createScheduledSubject(ReplaySubject.create(), scheduler); + } + }, selector); + } + + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param bufferSize the buffer size + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, final int bufferSize) { + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return OperationReplay.replayBuffered(bufferSize); + } + }, selector); + } + + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param bufferSize the buffer size + * @param scheduler the scheduler where the replay is observed + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) { + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return OperationReplay.createScheduledSubject(OperationReplay.replayBuffered(bufferSize), scheduler); + } + }, selector); + } + + /** + * Returns an observable sequence that is the result of invoking + * the selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param time the window length + * @param unit the window length time unit + * @return an observable sequence that is the result of invoking + * the selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, long time, TimeUnit unit) { + return replay(selector, time, unit, Schedulers.threadPoolForComputation()); } + + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param time the window length + * @param unit the window length time unit + * @param scheduler the scheduler which is used as a time source for the window + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying all notifications within window + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, final long time, final TimeUnit unit, final Scheduler scheduler) { + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return OperationReplay.replayWindowed(time, unit, -1, scheduler); + } + }, selector); + } + + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * within window. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param bufferSize the buffer size + * @param time the window length + * @param unit the window length time unit + * + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * within window + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, int bufferSize, long time, TimeUnit unit) { + return replay(selector, bufferSize, time, unit, Schedulers.threadPoolForComputation()); + } + + + /** + * Returns an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * within window. + * + * @param the return element type + * @param selector The selector function which can use the multicasted + * this sequence as many times as needed, without causing + * multiple subscriptions to this sequence. + * @param bufferSize the buffer size + * @param time the window length + * @param unit the window length time unit + * @param scheduler the scheduler which is used as a time source for the window + * + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence replaying bufferSize notifications + * within window + * + * @see MSDN: Observable.Replay + */ + public Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { + if (bufferSize < 0) { + throw new IllegalArgumentException("bufferSize < 0"); + } + return OperationMulticast.multicast(this, new Func0>() { + @Override + public Subject call() { + return OperationReplay.replayWindowed(time, unit, bufferSize, scheduler); + } + }, selector); + } + /** * Retry subscription to the source Observable when it calls * onError up to a certain number of retries. diff --git a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java index b634b2dbac..ef310986f5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java @@ -16,10 +16,15 @@ package rx.operators; import rx.Observable; +import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; import rx.observables.ConnectableObservable; import rx.subjects.Subject; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func0; +import rx.util.functions.Func1; public class OperationMulticast { public static ConnectableObservable multicast(Observable source, final Subject subject) { @@ -81,4 +86,57 @@ public void unsubscribe() { } } + /** + * Returns an observable sequence that contains the elements of a sequence + * produced by multicasting the source sequence within a selector function. + * + * @param source + * @param subjectFactory + * @param selector + * @return + * + * @see MSDN: Observable.Multicast + */ + public static Observable multicast( + final Observable source, + final Func0> subjectFactory, + final Func1, ? extends Observable> selector) { + return Observable.create(new MulticastSubscribeFunc(source, subjectFactory, selector)); + } + /** The multicast subscription function. */ + private static final class MulticastSubscribeFunc implements OnSubscribeFunc { + final Observable source; + final Func0> subjectFactory; + final Func1, ? extends Observable> resultSelector; + public MulticastSubscribeFunc(Observable source, + Func0> subjectFactory, + Func1, ? extends Observable> resultSelector) { + this.source = source; + this.subjectFactory = subjectFactory; + this.resultSelector = resultSelector; + } + @Override + public Subscription onSubscribe(Observer t1) { + Observable observable; + ConnectableObservable connectable; + try { + Subject subject = subjectFactory.call(); + + connectable = new MulticastConnectableObservable(source, subject); + + observable = resultSelector.call(connectable); + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + + CompositeSubscription csub = new CompositeSubscription(); + + csub.add(observable.subscribe(new SafeObserver( + new SafeObservableSubscription(csub), t1))); + csub.add(connectable.connect()); + + return csub; + } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationReplay.java b/rxjava-core/src/main/java/rx/operators/OperationReplay.java index 8f9e332c16..1a89b3d2ac 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationReplay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationReplay.java @@ -49,7 +49,7 @@ public final class OperationReplay { /** * Create a BoundedReplaySubject with the given buffer size. */ - public static Subject replayWithBufferSize(int bufferSize) { + public static Subject replayBuffered(int bufferSize) { return CustomReplaySubject.create(bufferSize); } /** @@ -61,22 +61,12 @@ public static Subject createScheduledSubject(Subject subject, Sc SubjectWrapper s = new SubjectWrapper(subscriberOf(observedOn), subject); return s; } - /** - * Return an OnSubscribeFunc which delegates the subscription to the given observable. - */ - public static OnSubscribeFunc subscriberOf(final Observable target) { - return new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer t1) { - return target.subscribe(t1); - } - }; - } /** * Create a CustomReplaySubject with the given time window length * and optional buffer size. * + * @param the source and return type * @param time the length of the time window * @param unit the unit of the time window length * @param bufferSize the buffer size if >= 0, otherwise, the buffer will be unlimited @@ -84,7 +74,7 @@ public Subscription onSubscribe(Observer t1) { * observers will not observe on this scheduler. * @return a Subject with the required replay behavior */ - public static Subject replayWithTimeWindowOrBufferSize(long time, TimeUnit unit, int bufferSize, final Scheduler scheduler) { + public static Subject replayWindowed(long time, TimeUnit unit, int bufferSize, final Scheduler scheduler) { final long ms = unit.toMillis(time); if (ms <= 0) { throw new IllegalArgumentException("The time window is less than 1 millisecond!"); @@ -134,6 +124,19 @@ public void call() { return brs; } + + /** + * Return an OnSubscribeFunc which delegates the subscription to the given observable. + */ + public static OnSubscribeFunc subscriberOf(final Observable target) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer t1) { + return target.subscribe(t1); + } + }; + } + /** * Subject that wraps another subject and uses a mapping function * to transform the received values. @@ -208,12 +211,15 @@ public void unlock() { } /** * Base interface for logically indexing a list. - * @param + * @param the value type */ public interface VirtualList { /** @return the number of elements in this list */ int size(); - /** Add an element to the list. */ + /** + * Add an element to the list. + * @param value the value to add + */ void add(T value); /** * Retrieve an element at the specified logical index. @@ -249,6 +255,11 @@ public interface VirtualList { * Clears and resets the indexes of the list. */ void reset(); + /** + * Returns the current content as a list. + * @return + */ + List toList(); } /** * Behaves like a normal, unbounded ArrayList but with virtual index. @@ -302,6 +313,10 @@ public void reset() { list.clear(); startIndex = 0; } + @Override + public List toList() { + return new ArrayList(list); + } } /** @@ -396,10 +411,7 @@ public void removeBefore(int index) { startIndex = index; head = head2 % maxSize; } - /** - * Returns a list with the current elements in this bounded list. - * @return - */ + @Override public List toList() { List r = new ArrayList(list.size() + 1); for (int i = head; i < head + count; i++) { diff --git a/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java b/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java index 6db879582c..56f540d466 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java @@ -16,9 +16,18 @@ package rx.operators; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Mockito.*; +import rx.Observable; +import rx.Observer; +import rx.observables.ConnectableObservable; import rx.operators.OperationReplay.VirtualBoundedList; +import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; +import rx.util.functions.Func1; public class OperationReplayTest { @Test @@ -70,4 +79,365 @@ public void testReadAfter() { list.get(4); } + @Test + public void testBufferedReplay() { + PublishSubject source = PublishSubject.create(); + + ConnectableObservable co = source.replay(3); + co.connect(); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + inOrder.verify(observer1, times(1)).onNext(1); + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + + source.onNext(4); + source.onCompleted(); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + } + } + @Test + public void testWindowedReplay() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + ConnectableObservable co = source.replay(100, TimeUnit.MILLISECONDS, scheduler); + co.connect(); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(2); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(3); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onCompleted(); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + + inOrder.verify(observer1, times(1)).onNext(1); + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + inOrder.verify(observer1, times(1)).onNext(3); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + } + } + @Test + public void testReplaySelector() { + final Func1 dbl = new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * 2; + } + + }; + + Func1, Observable> selector = new Func1, Observable>() { + + @Override + public Observable call(Observable t1) { + return t1.map(dbl); + } + + }; + + PublishSubject source = PublishSubject.create(); + + Observable co = source.replay(selector); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onNext(6); + + source.onNext(4); + source.onCompleted(); + inOrder.verify(observer1, times(1)).onNext(8); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + + } + + @Test + public void testBufferedReplaySelector() { + + final Func1 dbl = new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * 2; + } + + }; + + Func1, Observable> selector = new Func1, Observable>() { + + @Override + public Observable call(Observable t1) { + return t1.map(dbl); + } + + }; + + PublishSubject source = PublishSubject.create(); + + Observable co = source.replay(selector, 3); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onNext(6); + + source.onNext(4); + source.onCompleted(); + inOrder.verify(observer1, times(1)).onNext(8); + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + } + } + @Test + public void testWindowedReplaySelector() { + + final Func1 dbl = new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * 2; + } + + }; + + Func1, Observable> selector = new Func1, Observable>() { + + @Override + public Observable call(Observable t1) { + return t1.map(dbl); + } + + }; + + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable co = source.replay(selector, 100, TimeUnit.MILLISECONDS, scheduler); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(2); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(3); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onCompleted(); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onNext(6); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + inOrder.verify(observer1, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + } + } + @Test + public void testBufferedReplayError() { + PublishSubject source = PublishSubject.create(); + + ConnectableObservable co = source.replay(3); + co.connect(); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + inOrder.verify(observer1, times(1)).onNext(1); + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + + source.onNext(4); + source.onError(new RuntimeException("Forced failure")); + + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class)); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onCompleted(); + + } + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class)); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onCompleted(); + } + } + @Test + public void testWindowedReplayError() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + ConnectableObservable co = source.replay(100, TimeUnit.MILLISECONDS, scheduler); + co.connect(); + + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(2); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onNext(3); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + source.onError(new RuntimeException("Forced failure")); + scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); + + inOrder.verify(observer1, times(1)).onNext(1); + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + + inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class)); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onCompleted(); + + } + { + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + inOrder.verify(observer1, times(1)).onNext(3); + + inOrder.verify(observer1, times(1)).onError(any(RuntimeException.class)); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onCompleted(); + } + } }