diff --git a/rxjava-core/src/main/java/rx/observers/TestObserver.java b/rxjava-core/src/main/java/rx/observers/TestObserver.java index f4e278118d..dd546359c8 100644 --- a/rxjava-core/src/main/java/rx/observers/TestObserver.java +++ b/rxjava-core/src/main/java/rx/observers/TestObserver.java @@ -27,7 +27,6 @@ */ public class TestObserver implements Observer { - private final Observer delegate; private final ArrayList onNextEvents = new ArrayList(); private final ArrayList onErrorEvents = new ArrayList(); @@ -91,7 +90,8 @@ public void assertReceivedOnNext(List items) { throw new AssertionError("Value at index: " + i + " expected to be [null] but was: [" + onNextEvents.get(i) + "]"); } } else if (!items.get(i).equals(onNextEvents.get(i))) { - throw new AssertionError("Value at index: " + i + " expected to be [" + items.get(i) + "] but was: [" + onNextEvents.get(i) + "]"); + throw new AssertionError("Value at index: " + i + " expected to be [" + items.get(i) + "] (" + items.get(i).getClass().getSimpleName() + ") but was: [" + onNextEvents.get(i) + "] (" + onNextEvents.get(i).getClass().getSimpleName() + ")"); + } } diff --git a/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java index 2bb9a11d40..1231c71fa4 100644 --- a/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java @@ -97,9 +97,13 @@ private void triggerActions(long targetTimeInNanos) { time = targetTimeInNanos; } + public Inner createInnerScheduler() { + return new InnerTestScheduler(); + } + @Override public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { - InnerTestScheduler inner = new InnerTestScheduler(); + Inner inner = createInnerScheduler(); final TimedAction timedAction = new TimedAction(inner, time + unit.toNanos(delayTime), action); queue.add(timedAction); return inner; @@ -107,7 +111,7 @@ public Subscription schedule(Action1 action, long delayTime, TimeUnit uni @Override public Subscription schedule(Action1 action) { - InnerTestScheduler inner = new InnerTestScheduler(); + Inner inner = createInnerScheduler(); final TimedAction timedAction = new TimedAction(inner, 0, action); queue.add(timedAction); return inner; diff --git a/rxjava-core/src/main/java/rx/subjects/TestSubject.java b/rxjava-core/src/main/java/rx/subjects/TestSubject.java new file mode 100644 index 0000000000..af3a02f3ee --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/TestSubject.java @@ -0,0 +1,182 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subjects; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Notification; +import rx.Observer; +import rx.Scheduler; +import rx.Scheduler.Inner; +import rx.functions.Action1; +import rx.schedulers.TestScheduler; +import rx.subjects.SubjectSubscriptionManager.SubjectObserver; + +/** + * Subject that, once and {@link Observer} has subscribed, publishes all subsequent events to the subscriber. + *

+ * + *

+ * Example usage: + *

+ *

 {@code
+
+ * PublishSubject subject = PublishSubject.create();
+  // observer1 will receive all onNext and onCompleted events
+  subject.subscribe(observer1);
+  subject.onNext("one");
+  subject.onNext("two");
+  // observer2 will only receive "three" and onCompleted
+  subject.subscribe(observer2);
+  subject.onNext("three");
+  subject.onCompleted();
+
+  } 
+ * 
+ * @param 
+ */
+public final class TestSubject extends Subject {
+
+    public static  TestSubject create(TestScheduler scheduler) {
+        final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager();
+        // set a default value so subscriptions will immediately receive this until a new notification is received
+        final AtomicReference> lastNotification = new AtomicReference>();
+
+        OnSubscribe onSubscribe = subscriptionManager.getOnSubscribeFunc(
+                /**
+                 * This function executes at beginning of subscription.
+                 * 
+                 * This will always run, even if Subject is in terminal state.
+                 */
+                new Action1>() {
+
+                    @Override
+                    public void call(SubjectObserver o) {
+                        // nothing onSubscribe unless in terminal state which is the next function
+                    }
+                },
+                /**
+                 * This function executes if the Subject is terminated before subscription occurs.
+                 */
+                new Action1>() {
+
+                    @Override
+                    public void call(SubjectObserver o) {
+                        /*
+                         * If we are already terminated, or termination happens while trying to subscribe
+                         * this will be invoked and we emit whatever the last terminal value was.
+                         */
+                        lastNotification.get().accept(o);
+                    }
+                });
+
+        return new TestSubject(onSubscribe, subscriptionManager, lastNotification, scheduler);
+    }
+
+    private final SubjectSubscriptionManager subscriptionManager;
+    private final AtomicReference> lastNotification;
+    private final Scheduler.Inner innerScheduler;
+
+    protected TestSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager subscriptionManager, AtomicReference> lastNotification, TestScheduler scheduler) {
+        super(onSubscribe);
+        this.subscriptionManager = subscriptionManager;
+        this.lastNotification = lastNotification;
+        this.innerScheduler = scheduler.createInnerScheduler();
+    }
+
+    @Override
+    public void onCompleted() {
+        onCompleted(innerScheduler.now());
+    }
+
+    private void _onCompleted() {
+        subscriptionManager.terminate(new Action1>>() {
+
+            @Override
+            public void call(Collection> observers) {
+                lastNotification.set(Notification. createOnCompleted());
+                for (Observer o : observers) {
+                    o.onCompleted();
+                }
+            }
+        });
+    }
+
+    public void onCompleted(long timeInMilliseconds) {
+        innerScheduler.schedule(new Action1() {
+
+            @Override
+            public void call(Inner t1) {
+                _onCompleted();
+            }
+
+        }, timeInMilliseconds, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void onError(final Throwable e) {
+        onError(e, innerScheduler.now());
+    }
+
+    private void _onError(final Throwable e) {
+        subscriptionManager.terminate(new Action1>>() {
+
+            @Override
+            public void call(Collection> observers) {
+                lastNotification.set(Notification. createOnError(e));
+                for (Observer o : observers) {
+                    o.onError(e);
+                }
+            }
+        });
+
+    }
+
+    public void onError(final Throwable e, long timeInMilliseconds) {
+        innerScheduler.schedule(new Action1() {
+
+            @Override
+            public void call(Inner t1) {
+                _onError(e);
+            }
+
+        }, timeInMilliseconds, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void onNext(T v) {
+        onNext(v, innerScheduler.now());
+    }
+
+    private void _onNext(T v) {
+        for (Observer o : subscriptionManager.rawSnapshot()) {
+            o.onNext(v);
+        }
+    }
+
+    public void onNext(final T v, long timeInMilliseconds) {
+        innerScheduler.schedule(new Action1() {
+
+            @Override
+            public void call(Inner t1) {
+                _onNext(v);
+            }
+
+        }, timeInMilliseconds, TimeUnit.MILLISECONDS);
+    }
+}
\ No newline at end of file
diff --git a/rxjava-core/src/test/java/rx/observers/TestObserverTest.java b/rxjava-core/src/test/java/rx/observers/TestObserverTest.java
index 70e22d8fb2..67883043bd 100644
--- a/rxjava-core/src/test/java/rx/observers/TestObserverTest.java
+++ b/rxjava-core/src/test/java/rx/observers/TestObserverTest.java
@@ -66,7 +66,7 @@ public void testAssertNotMatchValue() {
         oi.subscribe(o);
 
         thrown.expect(AssertionError.class);
-        thrown.expectMessage("Value at index: 1 expected to be [3] but was: [2]");
+        thrown.expectMessage("Value at index: 1 expected to be [3] (Integer) but was: [2] (Integer)");
 
         o.assertReceivedOnNext(Arrays.asList(1, 3));
         assertEquals(2, o.getOnNextEvents().size());