diff --git a/src/main/java/rx/observers/SerializedObserver.java b/src/main/java/rx/observers/SerializedObserver.java index 86ca42f8cf..8125ce54e6 100644 --- a/src/main/java/rx/observers/SerializedObserver.java +++ b/src/main/java/rx/observers/SerializedObserver.java @@ -16,7 +16,8 @@ package rx.observers; import rx.Observer; -import rx.exceptions.Exceptions; +import rx.exceptions.*; +import rx.internal.operators.NotificationLite; /** * Enforces single-threaded, serialized, ordered execution of {@link #onNext}, {@link #onCompleted}, and @@ -35,13 +36,15 @@ public class SerializedObserver implements Observer { private final Observer actual; - private boolean emitting = false; - private boolean terminated = false; + private boolean emitting; + /** Set to true if a terminal event was received. */ + private volatile boolean terminated; + /** If not null, it indicates more work. */ private FastList queue; + private final NotificationLite nl = NotificationLite.instance(); - private static final int MAX_DRAIN_ITERATION = Integer.MAX_VALUE; - private static final Object NULL_SENTINEL = new Object(); - private static final Object COMPLETE_SENTINEL = new Object(); + /** Number of iterations without additional safepoint poll in the drain loop. */ + private static final int MAX_DRAIN_ITERATION = 1024; static final class FastList { Object[] array; @@ -64,150 +67,119 @@ public void add(Object o) { } } - private static final class ErrorSentinel { - final Throwable e; - - ErrorSentinel(Throwable e) { - this.e = e; - } - } - public SerializedObserver(Observer s) { this.actual = s; } @Override - public void onCompleted() { - FastList list; + public void onNext(T t) { + if (terminated) { + return; + } synchronized (this) { if (terminated) { return; } - terminated = true; if (emitting) { - if (queue == null) { - queue = new FastList(); + FastList list = queue; + if (list == null) { + list = new FastList(); + queue = list; } - queue.add(COMPLETE_SENTINEL); + list.add(nl.next(t)); return; } emitting = true; - list = queue; - queue = null; } - drainQueue(list); - actual.onCompleted(); + try { + actual.onNext(t); + } catch (Throwable e) { + terminated = true; + Exceptions.throwIfFatal(e); + actual.onError(OnErrorThrowable.addValueAsLastCause(e, t)); + return; + } + for (;;) { + for (int i = 0; i < MAX_DRAIN_ITERATION; i++) { + FastList list; + synchronized (this) { + list = queue; + if (list == null) { + emitting = false; + return; + } + queue = null; + } + for (Object o : list.array) { + if (o == null) { + break; + } + try { + if (nl.accept(actual, o)) { + terminated = true; + return; + } + } catch (Throwable e) { + terminated = true; + Exceptions.throwIfFatal(e); + actual.onError(OnErrorThrowable.addValueAsLastCause(e, t)); + return; + } + } + } + } } - + @Override public void onError(final Throwable e) { Exceptions.throwIfFatal(e); - FastList list; + if (terminated) { + return; + } synchronized (this) { if (terminated) { return; } + terminated = true; if (emitting) { - if (queue == null) { - queue = new FastList(); + /* + * FIXME: generally, errors jump the queue but this wasn't true + * for SerializedObserver and may break existing expectations. + */ + FastList list = queue; + if (list == null) { + list = new FastList(); + queue = list; } - queue.add(new ErrorSentinel(e)); + list.add(nl.error(e)); return; } emitting = true; - list = queue; - queue = null; } - drainQueue(list); actual.onError(e); - synchronized(this) { - emitting = false; - } } @Override - public void onNext(T t) { - FastList list; - + public void onCompleted() { + if (terminated) { + return; + } synchronized (this) { if (terminated) { return; } + terminated = true; if (emitting) { - if (queue == null) { - queue = new FastList(); + FastList list = queue; + if (list == null) { + list = new FastList(); + queue = list; } - queue.add(t != null ? t : NULL_SENTINEL); - // another thread is emitting so we add to the queue and return + list.add(nl.completed()); return; } - // we can emit emitting = true; - // reference to the list to drain before emitting our value - list = queue; - queue = null; - } - - // we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above - boolean skipFinal = false; - try { - int iter = MAX_DRAIN_ITERATION; - do { - drainQueue(list); - if (iter == MAX_DRAIN_ITERATION) { - // after the first draining we emit our own value - actual.onNext(t); - } - --iter; - if (iter > 0) { - synchronized (this) { - list = queue; - queue = null; - if (list == null) { - emitting = false; - skipFinal = true; - return; - } - } - } - } while (iter > 0); - } finally { - if (!skipFinal) { - synchronized (this) { - if (terminated) { - list = queue; - queue = null; - } else { - emitting = false; - list = null; - } - } - } - } - - // this will only drain if terminated (done here outside of synchronized block) - drainQueue(list); - } - - void drainQueue(FastList list) { - if (list == null || list.size == 0) { - return; - } - for (Object v : list.array) { - if (v == null) { - break; - } - if (v == NULL_SENTINEL) { - actual.onNext(null); - } else if (v == COMPLETE_SENTINEL) { - actual.onCompleted(); - } else if (v.getClass() == ErrorSentinel.class) { - actual.onError(((ErrorSentinel) v).e); - } else { - @SuppressWarnings("unchecked") - T t = (T)v; - actual.onNext(t); - } } + actual.onCompleted(); } } diff --git a/src/main/java/rx/observers/TestObserver.java b/src/main/java/rx/observers/TestObserver.java index e7f02131b6..c20784187a 100644 --- a/src/main/java/rx/observers/TestObserver.java +++ b/src/main/java/rx/observers/TestObserver.java @@ -117,13 +117,17 @@ public void assertReceivedOnNext(List items) { } for (int i = 0; i < items.size(); i++) { - if (items.get(i) == null) { + T expected = items.get(i); + T actual = onNextEvents.get(i); + if (expected == null) { // check for null equality - if (onNextEvents.get(i) != null) { - throw new AssertionError("Value at index: " + i + " expected to be [null] but was: [" + onNextEvents.get(i) + "]"); + if (actual != null) { + throw new AssertionError("Value at index: " + i + " expected to be [null] but was: [" + actual + "]"); } - } else if (!items.get(i).equals(onNextEvents.get(i))) { - throw new AssertionError("Value at index: " + i + " expected to be [" + items.get(i) + "] (" + items.get(i).getClass().getSimpleName() + ") but was: [" + onNextEvents.get(i) + "] (" + onNextEvents.get(i).getClass().getSimpleName() + ")"); + } else if (!expected.equals(actual)) { + throw new AssertionError("Value at index: " + i + + " expected to be [" + expected + "] (" + expected.getClass().getSimpleName() + + ") but was: [" + actual + "] (" + (actual != null ? actual.getClass().getSimpleName() : "null") + ")"); } } diff --git a/src/main/java/rx/observers/TestSubscriber.java b/src/main/java/rx/observers/TestSubscriber.java index a2255cf401..2d46a25179 100644 --- a/src/main/java/rx/observers/TestSubscriber.java +++ b/src/main/java/rx/observers/TestSubscriber.java @@ -258,10 +258,15 @@ public void assertUnsubscribed() { * if this {@code Subscriber} has received one or more {@code onError} notifications */ public void assertNoErrors() { - if (getOnErrorEvents().size() > 0) { - // can't use AssertionError because (message, cause) doesn't exist until Java 7 - throw new RuntimeException("Unexpected onError events: " + getOnErrorEvents().size(), getOnErrorEvents().get(0)); - // TODO possibly check for Java7+ and then use AssertionError at runtime (since we always compile with 7) + List onErrorEvents = getOnErrorEvents(); + if (onErrorEvents.size() > 0) { + AssertionError ae = new AssertionError("Unexpected onError events: " + getOnErrorEvents().size()); + if (onErrorEvents.size() == 1) { + ae.initCause(getOnErrorEvents().get(0)); + } else { + ae.initCause(new CompositeException(onErrorEvents)); + } + throw ae; } } diff --git a/src/test/java/rx/observers/ObserversTest.java b/src/test/java/rx/observers/ObserversTest.java new file mode 100644 index 0000000000..df8b3aae99 --- /dev/null +++ b/src/test/java/rx/observers/ObserversTest.java @@ -0,0 +1,189 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observers; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.lang.reflect.*; +import java.util.concurrent.atomic.*; + +import org.junit.Test; + +import rx.exceptions.*; +import rx.functions.*; + +public class ObserversTest { + @Test + public void testNotInstantiable() { + try { + Constructor c = Observers.class.getDeclaredConstructor(); + c.setAccessible(true); + Object instance = c.newInstance(); + fail("Could instantiate Actions! " + instance); + } catch (NoSuchMethodException ex) { + ex.printStackTrace(); + } catch (InvocationTargetException ex) { + ex.printStackTrace(); + } catch (InstantiationException ex) { + ex.printStackTrace(); + } catch (IllegalAccessException ex) { + ex.printStackTrace(); + } + } + + @Test + public void testEmptyOnErrorNotImplemented() { + try { + Observers.empty().onError(new TestException()); + fail("OnErrorNotImplementedException not thrown!"); + } catch (OnErrorNotImplementedException ex) { + if (!(ex.getCause() instanceof TestException)) { + fail("TestException not wrapped, instead: " + ex.getCause()); + } + } + } + @Test + public void testCreate1OnErrorNotImplemented() { + try { + Observers.create(Actions.empty()).onError(new TestException()); + fail("OnErrorNotImplementedException not thrown!"); + } catch (OnErrorNotImplementedException ex) { + if (!(ex.getCause() instanceof TestException)) { + fail("TestException not wrapped, instead: " + ex.getCause()); + } + } + } + @Test(expected = IllegalArgumentException.class) + public void testCreate1Null() { + Observers.create(null); + } + @Test(expected = IllegalArgumentException.class) + public void testCreate2Null() { + Action1 throwAction = Actions.empty(); + Observers.create(null, throwAction); + } + @Test(expected = IllegalArgumentException.class) + public void testCreate3Null() { + Observers.create(Actions.empty(), null); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreate4Null() { + Action1 throwAction = Actions.empty(); + Observers.create(null, throwAction, Actions.empty()); + } + @Test(expected = IllegalArgumentException.class) + public void testCreate5Null() { + Observers.create(Actions.empty(), null, Actions.empty()); + } + @Test(expected = IllegalArgumentException.class) + public void testCreate6Null() { + Action1 throwAction = Actions.empty(); + Observers.create(Actions.empty(), throwAction, null); + } + + @Test + public void testCreate1Value() { + final AtomicInteger value = new AtomicInteger(); + Action1 action = new Action1() { + @Override + public void call(Integer t) { + value.set(t); + } + }; + Observers.create(action).onNext(1); + + assertEquals(1, value.get()); + } + @Test + public void testCreate2Value() { + final AtomicInteger value = new AtomicInteger(); + Action1 action = new Action1() { + @Override + public void call(Integer t) { + value.set(t); + } + }; + Action1 throwAction = Actions.empty(); + Observers.create(action, throwAction).onNext(1); + + assertEquals(1, value.get()); + } + + @Test + public void testCreate3Value() { + final AtomicInteger value = new AtomicInteger(); + Action1 action = new Action1() { + @Override + public void call(Integer t) { + value.set(t); + } + }; + Action1 throwAction = Actions.empty(); + Observers.create(action, throwAction, Actions.empty()).onNext(1); + + assertEquals(1, value.get()); + } + + @Test + public void testError2() { + final AtomicReference value = new AtomicReference(); + Action1 action = new Action1() { + @Override + public void call(Throwable t) { + value.set(t); + } + }; + TestException exception = new TestException(); + Observers.create(Actions.empty(), action).onError(exception); + + assertEquals(exception, value.get()); + } + + @Test + public void testError3() { + final AtomicReference value = new AtomicReference(); + Action1 action = new Action1() { + @Override + public void call(Throwable t) { + value.set(t); + } + }; + TestException exception = new TestException(); + Observers.create(Actions.empty(), action, Actions.empty()).onError(exception); + + assertEquals(exception, value.get()); + } + + @Test + public void testCompleted() { + Action0 action = mock(Action0.class); + + Action1 throwAction = Actions.empty(); + Observers.create(Actions.empty(), throwAction, action).onCompleted(); + + verify(action).call(); + } + + @Test + public void testEmptyCompleted() { + Observers.create(Actions.empty()).onCompleted(); + + Action1 throwAction = Actions.empty(); + Observers.create(Actions.empty(), throwAction).onCompleted(); + } +} diff --git a/src/test/java/rx/observers/SafeObserverTest.java b/src/test/java/rx/observers/SafeObserverTest.java index 584c6ee117..1083e995c7 100644 --- a/src/test/java/rx/observers/SafeObserverTest.java +++ b/src/test/java/rx/observers/SafeObserverTest.java @@ -15,11 +15,7 @@ */ package rx.observers; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -27,9 +23,7 @@ import org.junit.Test; import rx.Subscriber; -import rx.exceptions.CompositeException; -import rx.exceptions.OnErrorFailedException; -import rx.exceptions.OnErrorNotImplementedException; +import rx.exceptions.*; import rx.functions.Action0; import rx.subscriptions.Subscriptions; @@ -462,4 +456,45 @@ public SafeObserverTestException(String message) { super(message); } } + + @Test + public void testOnCompletedThrows() { + final AtomicReference error = new AtomicReference(); + SafeSubscriber s = new SafeSubscriber(new Subscriber() { + @Override + public void onNext(Integer t) { + + } + @Override + public void onError(Throwable e) { + error.set(e); + } + @Override + public void onCompleted() { + throw new TestException(); + } + }); + + s.onCompleted(); + + assertTrue("Error not received", error.get() instanceof TestException); + } + + @Test + public void testActual() { + Subscriber actual = new Subscriber() { + @Override + public void onNext(Integer t) { + } + @Override + public void onError(Throwable e) { + } + @Override + public void onCompleted() { + } + }; + SafeSubscriber s = new SafeSubscriber(actual); + + assertSame(actual, s.getActual()); + } } diff --git a/src/test/java/rx/observers/SafeSubscriberTest.java b/src/test/java/rx/observers/SafeSubscriberTest.java new file mode 100644 index 0000000000..85c2d7b07f --- /dev/null +++ b/src/test/java/rx/observers/SafeSubscriberTest.java @@ -0,0 +1,230 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observers; + +import static org.junit.Assert.assertTrue; + +import java.lang.reflect.Method; + +import org.junit.*; + +import rx.exceptions.*; +import rx.functions.Action0; +import rx.plugins.*; +import rx.subscriptions.Subscriptions; + +public class SafeSubscriberTest { + + @Before + @After + public void resetBefore() { + RxJavaPlugins ps = RxJavaPlugins.getInstance(); + + try { + Method m = ps.getClass().getDeclaredMethod("reset"); + m.setAccessible(true); + m.invoke(ps); + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + + @Test + public void testOnCompletedThrows() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onCompleted() { + throw new TestException(); + } + }; + SafeSubscriber safe = new SafeSubscriber(ts); + + safe.onCompleted(); + + assertTrue(safe.isUnsubscribed()); + } + + @Test + public void testOnCompletedThrows2() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onCompleted() { + throw new OnErrorNotImplementedException(new TestException()); + } + }; + SafeSubscriber safe = new SafeSubscriber(ts); + + try { + safe.onCompleted(); + } catch (OnErrorNotImplementedException ex) { + // expected + } + + assertTrue(safe.isUnsubscribed()); + } + + @Test + public void testPluginException() { + RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { + @Override + public void handleError(Throwable e) { + throw new RuntimeException(); + } + }); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onCompleted() { + throw new TestException(); + } + }; + SafeSubscriber safe = new SafeSubscriber(ts); + + safe.onCompleted(); + } + + @Test(expected = OnErrorFailedException.class) + public void testPluginExceptionWhileOnErrorUnsubscribeThrows() { + RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { + int calls; + @Override + public void handleError(Throwable e) { + if (++calls == 2) { + throw new RuntimeException(); + } + } + }); + + TestSubscriber ts = new TestSubscriber(); + SafeSubscriber safe = new SafeSubscriber(ts); + safe.add(Subscriptions.create(new Action0() { + @Override + public void call() { + throw new RuntimeException(); + } + })); + + safe.onError(new TestException()); + } + + @Test(expected = RuntimeException.class) + public void testPluginExceptionWhileOnErrorThrowsNotImplAndUnsubscribeThrows() { + RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { + int calls; + @Override + public void handleError(Throwable e) { + if (++calls == 2) { + throw new RuntimeException(); + } + } + }); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onError(Throwable e) { + throw new OnErrorNotImplementedException(e); + } + }; + SafeSubscriber safe = new SafeSubscriber(ts); + safe.add(Subscriptions.create(new Action0() { + @Override + public void call() { + throw new RuntimeException(); + } + })); + + safe.onError(new TestException()); + } + + @Test(expected = OnErrorFailedException.class) + public void testPluginExceptionWhileOnErrorThrows() { + RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { + int calls; + @Override + public void handleError(Throwable e) { + if (++calls == 2) { + throw new RuntimeException(); + } + } + }); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onError(Throwable e) { + throw new RuntimeException(e); + } + }; + SafeSubscriber safe = new SafeSubscriber(ts); + + safe.onError(new TestException()); + } + @Test(expected = OnErrorFailedException.class) + public void testPluginExceptionWhileOnErrorThrowsAndUnsubscribeThrows() { + RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { + int calls; + @Override + public void handleError(Throwable e) { + if (++calls == 2) { + throw new RuntimeException(); + } + } + }); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onError(Throwable e) { + throw new RuntimeException(e); + } + }; + SafeSubscriber safe = new SafeSubscriber(ts); + safe.add(Subscriptions.create(new Action0() { + @Override + public void call() { + throw new RuntimeException(); + } + })); + + safe.onError(new TestException()); + } + @Test(expected = OnErrorFailedException.class) + public void testPluginExceptionWhenUnsubscribing2() { + RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { + int calls; + @Override + public void handleError(Throwable e) { + if (++calls == 3) { + throw new RuntimeException(); + } + } + }); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onError(Throwable e) { + throw new RuntimeException(e); + } + }; + SafeSubscriber safe = new SafeSubscriber(ts); + safe.add(Subscriptions.create(new Action0() { + @Override + public void call() { + throw new RuntimeException(); + } + })); + + safe.onError(new TestException()); + } +} diff --git a/src/test/java/rx/observers/SerializedObserverTest.java b/src/test/java/rx/observers/SerializedObserverTest.java index b469c131d4..a14f146e75 100644 --- a/src/test/java/rx/observers/SerializedObserverTest.java +++ b/src/test/java/rx/observers/SerializedObserverTest.java @@ -15,35 +15,20 @@ */ package rx.observers; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import rx.Observable; +import static org.mockito.Mockito.*; + +import java.util.Arrays; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import org.junit.*; +import org.mockito.*; + +import rx.*; import rx.Observable.OnSubscribe; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; +import rx.exceptions.TestException; import rx.schedulers.Schedulers; public class SerializedObserverTest { @@ -813,4 +798,164 @@ protected void captureMaxThreads() { } } + + @Test + public void testSerializeNull() { + final AtomicReference> serial = new AtomicReference>(); + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + if (t != null && t == 0) { + serial.get().onNext(null); + } + super.onNext(t); + } + }; + + SerializedObserver sobs = new SerializedObserver(to); + serial.set(sobs); + + sobs.onNext(0); + + to.assertReceivedOnNext(Arrays.asList(0, null)); + } + + @Test + public void testSerializeAllowsOnError() { + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + throw new TestException(); + } + }; + + SerializedObserver sobs = new SerializedObserver(to); + + try { + sobs.onNext(0); + } catch (TestException ex) { + sobs.onError(ex); + } + + assertEquals(1, to.getOnErrorEvents().size()); + assertTrue(to.getOnErrorEvents().get(0) instanceof TestException); + } + + @Test + public void testSerializeReentrantNullAndComplete() { + final AtomicReference> serial = new AtomicReference>(); + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + serial.get().onCompleted(); + throw new TestException(); + } + }; + + SerializedObserver sobs = new SerializedObserver(to); + serial.set(sobs); + + try { + sobs.onNext(0); + } catch (TestException ex) { + sobs.onError(ex); + } + + assertEquals(1, to.getOnErrorEvents().size()); + assertTrue(to.getOnErrorEvents().get(0) instanceof TestException); + assertTrue(to.getOnCompletedEvents().isEmpty()); + } + + @Test + public void testSerializeReentrantNullAndError() { + final AtomicReference> serial = new AtomicReference>(); + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + serial.get().onError(new RuntimeException()); + throw new TestException(); + } + }; + + SerializedObserver sobs = new SerializedObserver(to); + serial.set(sobs); + + try { + sobs.onNext(0); + } catch (TestException ex) { + sobs.onError(ex); + } + + assertEquals(1, to.getOnErrorEvents().size()); + assertTrue(to.getOnErrorEvents().get(0) instanceof TestException); + assertTrue(to.getOnCompletedEvents().isEmpty()); + } + + @Test + public void testSerializeDrainPhaseThrows() { + final AtomicReference> serial = new AtomicReference>(); + TestObserver to = new TestObserver() { + @Override + public void onNext(Integer t) { + if (t != null && t == 0) { + serial.get().onNext(null); + } else + if (t == null) { + throw new TestException(); + } + super.onNext(t); + } + }; + + SerializedObserver sobs = new SerializedObserver(to); + serial.set(sobs); + + sobs.onNext(0); + + to.assertReceivedOnNext(Arrays.asList(0)); + assertEquals(1, to.getOnErrorEvents().size()); + assertTrue(to.getOnErrorEvents().get(0) instanceof TestException); + } + + @Test + public void testErrorReentry() { + final AtomicReference> serial = new AtomicReference>(); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer v) { + serial.get().onError(new TestException()); + serial.get().onError(new TestException()); + super.onNext(v); + } + }; + SerializedObserver sobs = new SerializedObserver(ts); + serial.set(sobs); + + sobs.onNext(1); + + ts.assertValue(1); + ts.assertError(TestException.class); + } + @Test + public void testCompleteReentry() { + final AtomicReference> serial = new AtomicReference>(); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer v) { + serial.get().onCompleted(); + serial.get().onCompleted(); + super.onNext(v); + } + }; + SerializedObserver sobs = new SerializedObserver(ts); + serial.set(sobs); + + sobs.onNext(1); + + ts.assertValue(1); + ts.assertCompleted(); + ts.assertNoErrors(); + } } diff --git a/src/test/java/rx/observers/SubscribersTest.java b/src/test/java/rx/observers/SubscribersTest.java new file mode 100644 index 0000000000..241ecae9af --- /dev/null +++ b/src/test/java/rx/observers/SubscribersTest.java @@ -0,0 +1,188 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observers; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.lang.reflect.*; +import java.util.concurrent.atomic.*; + +import org.junit.Test; + +import rx.exceptions.*; +import rx.functions.*; + +public class SubscribersTest { + @Test + public void testNotInstantiable() { + try { + Constructor c = Subscribers.class.getDeclaredConstructor(); + c.setAccessible(true); + Object instance = c.newInstance(); + fail("Could instantiate Actions! " + instance); + } catch (NoSuchMethodException ex) { + ex.printStackTrace(); + } catch (InvocationTargetException ex) { + ex.printStackTrace(); + } catch (InstantiationException ex) { + ex.printStackTrace(); + } catch (IllegalAccessException ex) { + ex.printStackTrace(); + } + } + + @Test + public void testEmptyOnErrorNotImplemented() { + try { + Subscribers.empty().onError(new TestException()); + fail("OnErrorNotImplementedException not thrown!"); + } catch (OnErrorNotImplementedException ex) { + if (!(ex.getCause() instanceof TestException)) { + fail("TestException not wrapped, instead: " + ex.getCause()); + } + } + } + @Test + public void testCreate1OnErrorNotImplemented() { + try { + Subscribers.create(Actions.empty()).onError(new TestException()); + fail("OnErrorNotImplementedException not thrown!"); + } catch (OnErrorNotImplementedException ex) { + if (!(ex.getCause() instanceof TestException)) { + fail("TestException not wrapped, instead: " + ex.getCause()); + } + } + } + @Test(expected = IllegalArgumentException.class) + public void testCreate1Null() { + Subscribers.create(null); + } + @Test(expected = IllegalArgumentException.class) + public void testCreate2Null() { + Action1 throwAction = Actions.empty(); + Subscribers.create(null, throwAction); + } + @Test(expected = IllegalArgumentException.class) + public void testCreate3Null() { + Subscribers.create(Actions.empty(), null); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreate4Null() { + Action1 throwAction = Actions.empty(); + Subscribers.create(null, throwAction, Actions.empty()); + } + @Test(expected = IllegalArgumentException.class) + public void testCreate5Null() { + Subscribers.create(Actions.empty(), null, Actions.empty()); + } + @Test(expected = IllegalArgumentException.class) + public void testCreate6Null() { + Action1 throwAction = Actions.empty(); + Subscribers.create(Actions.empty(), throwAction, null); + } + + @Test + public void testCreate1Value() { + final AtomicInteger value = new AtomicInteger(); + Action1 action = new Action1() { + @Override + public void call(Integer t) { + value.set(t); + } + }; + Subscribers.create(action).onNext(1); + + assertEquals(1, value.get()); + } + @Test + public void testCreate2Value() { + final AtomicInteger value = new AtomicInteger(); + Action1 action = new Action1() { + @Override + public void call(Integer t) { + value.set(t); + } + }; + Action1 throwAction = Actions.empty(); + Subscribers.create(action, throwAction).onNext(1); + + assertEquals(1, value.get()); + } + + @Test + public void testCreate3Value() { + final AtomicInteger value = new AtomicInteger(); + Action1 action = new Action1() { + @Override + public void call(Integer t) { + value.set(t); + } + }; + Action1 throwAction = Actions.empty(); + Subscribers.create(action, throwAction, Actions.empty()).onNext(1); + + assertEquals(1, value.get()); + } + + @Test + public void testError2() { + final AtomicReference value = new AtomicReference(); + Action1 action = new Action1() { + @Override + public void call(Throwable t) { + value.set(t); + } + }; + TestException exception = new TestException(); + Subscribers.create(Actions.empty(), action).onError(exception); + + assertEquals(exception, value.get()); + } + + @Test + public void testError3() { + final AtomicReference value = new AtomicReference(); + Action1 action = new Action1() { + @Override + public void call(Throwable t) { + value.set(t); + } + }; + TestException exception = new TestException(); + Subscribers.create(Actions.empty(), action, Actions.empty()).onError(exception); + + assertEquals(exception, value.get()); + } + + @Test + public void testCompleted() { + Action0 action = mock(Action0.class); + + Action1 throwAction = Actions.empty(); + Subscribers.create(Actions.empty(), throwAction, action).onCompleted(); + + verify(action).call(); + } + @Test + public void testEmptyCompleted() { + Subscribers.create(Actions.empty()).onCompleted(); + + Action1 throwAction = Actions.empty(); + Subscribers.create(Actions.empty(), throwAction).onCompleted(); + } +} diff --git a/src/test/java/rx/observers/TestObserverTest.java b/src/test/java/rx/observers/TestObserverTest.java index aa253f2cd2..53f7a06746 100644 --- a/src/test/java/rx/observers/TestObserverTest.java +++ b/src/test/java/rx/observers/TestObserverTest.java @@ -15,20 +15,19 @@ */ package rx.observers; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; -import java.util.Arrays; +import java.util.*; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.junit.rules.ExpectedException; import org.mockito.InOrder; +import rx.Notification; import rx.Observable; import rx.Observer; +import rx.exceptions.TestException; import rx.subjects.PublishSubject; public class TestObserverTest { @@ -124,5 +123,109 @@ public void testWrappingMockWhenUnsubscribeInvolved() { public void testErrorSwallowed() { Observable.error(new RuntimeException()).subscribe(new TestObserver()); } + + @Test + public void testGetEvents() { + TestObserver to = new TestObserver(); + to.onNext(1); + to.onNext(2); + + assertEquals(Arrays.asList(Arrays.asList(1, 2), + Collections.emptyList(), + Collections.emptyList()), to.getEvents()); + + to.onCompleted(); + + assertEquals(Arrays.asList(Arrays.asList(1, 2), Collections.emptyList(), + Collections.singletonList(Notification.createOnCompleted())), to.getEvents()); + + TestException ex = new TestException(); + TestObserver to2 = new TestObserver(); + to2.onNext(1); + to2.onNext(2); + + assertEquals(Arrays.asList(Arrays.asList(1, 2), + Collections.emptyList(), + Collections.emptyList()), to2.getEvents()); + + to2.onError(ex); + + assertEquals(Arrays.asList( + Arrays.asList(1, 2), + Collections.singletonList(ex), + Collections.emptyList()), + to2.getEvents()); + } + @Test + public void testNullExpected() { + TestObserver to = new TestObserver(); + to.onNext(1); + + try { + to.assertReceivedOnNext(Arrays.asList((Integer)null)); + } catch (AssertionError ex) { + // this is expected + return; + } + fail("Null element check assertion didn't happen!"); + } + + @Test + public void testNullActual() { + TestObserver to = new TestObserver(); + to.onNext(null); + + try { + to.assertReceivedOnNext(Arrays.asList(1)); + } catch (AssertionError ex) { + // this is expected + return; + } + fail("Null element check assertion didn't happen!"); + } + + @Test + public void testTerminalErrorOnce() { + TestObserver to = new TestObserver(); + to.onError(new TestException()); + to.onError(new TestException()); + + try { + to.assertTerminalEvent(); + } catch (AssertionError ex) { + // this is expected + return; + } + fail("Failed to report multiple onError terminal events!"); + } + @Test + public void testTerminalCompletedOnce() { + TestObserver to = new TestObserver(); + to.onCompleted(); + to.onCompleted(); + + try { + to.assertTerminalEvent(); + } catch (AssertionError ex) { + // this is expected + return; + } + fail("Failed to report multiple onError terminal events!"); + } + + @Test + public void testTerminalOneKind() { + TestObserver to = new TestObserver(); + to.onError(new TestException()); + to.onCompleted(); + + try { + to.assertTerminalEvent(); + } catch (AssertionError ex) { + // this is expected + return; + } + fail("Failed to report multiple kinds of events!"); + } } diff --git a/src/test/java/rx/observers/TestSubscriberTest.java b/src/test/java/rx/observers/TestSubscriberTest.java index 75d59fc1f8..1076d2152f 100644 --- a/src/test/java/rx/observers/TestSubscriberTest.java +++ b/src/test/java/rx/observers/TestSubscriberTest.java @@ -15,25 +15,22 @@ */ package rx.observers; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; import java.util.Arrays; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.junit.rules.ExpectedException; import org.mockito.InOrder; -import rx.Observable; -import rx.Observer; +import rx.*; +import rx.Scheduler.Worker; +import rx.exceptions.*; import rx.functions.Action0; +import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; public class TestSubscriberTest { @@ -160,4 +157,442 @@ public void call() { assertTrue(unsub.get()); } + @Test(expected = NullPointerException.class) + public void testNullDelegate1() { + TestSubscriber ts = new TestSubscriber((Observer)null); + ts.onCompleted(); + } + + @Test(expected = NullPointerException.class) + public void testNullDelegate2() { + TestSubscriber ts = new TestSubscriber((Subscriber)null); + ts.onCompleted(); + } + + @Test(expected = NullPointerException.class) + public void testNullDelegate3() { + TestSubscriber ts = new TestSubscriber((Subscriber)null, 0); + ts.onCompleted(); + } + + @Test + public void testDelegate1() { + TestObserver to = new TestObserver(); + TestSubscriber ts = TestSubscriber.create(to); + ts.onCompleted(); + + to.assertTerminalEvent(); + } + + @Test + public void testDelegate2() { + TestSubscriber ts1 = TestSubscriber.create(); + TestSubscriber ts2 = TestSubscriber.create(ts1); + ts2.onCompleted(); + + ts1.assertCompleted(); + } + + @Test + public void testDelegate3() { + TestSubscriber ts1 = TestSubscriber.create(); + TestSubscriber ts2 = TestSubscriber.create(ts1, 0); + ts2.onCompleted(); + ts1.assertCompleted(); + } + + @Test + public void testUnsubscribed() { + TestSubscriber ts = new TestSubscriber(); + try { + ts.assertUnsubscribed(); + } catch (AssertionError ex) { + // expected + return; + } + fail("Not unsubscribed but not reported!"); + } + + @Test + public void testNoErrors() { + TestSubscriber ts = new TestSubscriber(); + ts.onError(new TestException()); + try { + ts.assertNoErrors(); + } catch (AssertionError ex) { + // expected + return; + } + fail("Error present but no assertion error!"); + } + + @Test + public void testNotCompleted() { + TestSubscriber ts = new TestSubscriber(); + try { + ts.assertCompleted(); + } catch (AssertionError ex) { + // expected + return; + } + fail("Not completed and no assertion error!"); + } + + @Test + public void testMultipleCompletions() { + TestSubscriber ts = new TestSubscriber(); + ts.onCompleted(); + ts.onCompleted(); + try { + ts.assertCompleted(); + } catch (AssertionError ex) { + // expected + return; + } + fail("Multiple completions and no assertion error!"); + } + + @Test + public void testCompleted() { + TestSubscriber ts = new TestSubscriber(); + ts.onCompleted(); + try { + ts.assertNotCompleted(); + } catch (AssertionError ex) { + // expected + return; + } + fail("Completed and no assertion error!"); + } + + @Test + public void testMultipleCompletions2() { + TestSubscriber ts = new TestSubscriber(); + ts.onCompleted(); + ts.onCompleted(); + try { + ts.assertNotCompleted(); + } catch (AssertionError ex) { + // expected + return; + } + fail("Multiple completions and no assertion error!"); + } + + @Test + public void testMultipleErrors() { + TestSubscriber ts = new TestSubscriber(); + ts.onError(new TestException()); + ts.onError(new TestException()); + try { + ts.assertNoErrors(); + } catch (AssertionError ex) { + if (!(ex.getCause() instanceof CompositeException)) { + fail("Multiple Error present but the reported error doesn't have a composite cause!"); + } + // expected + return; + } + fail("Multiple Error present but no assertion error!"); + } + + @Test + public void testMultipleErrors2() { + TestSubscriber ts = new TestSubscriber(); + ts.onError(new TestException()); + ts.onError(new TestException()); + try { + ts.assertError(TestException.class); + } catch (AssertionError ex) { + if (!(ex.getCause() instanceof CompositeException)) { + fail("Multiple Error present but the reported error doesn't have a composite cause!"); + } + // expected + return; + } + fail("Multiple Error present but no assertion error!"); + } + + @Test + public void testMultipleErrors3() { + TestSubscriber ts = new TestSubscriber(); + ts.onError(new TestException()); + ts.onError(new TestException()); + try { + ts.assertError(new TestException()); + } catch (AssertionError ex) { + if (!(ex.getCause() instanceof CompositeException)) { + fail("Multiple Error present but the reported error doesn't have a composite cause!"); + } + // expected + return; + } + fail("Multiple Error present but no assertion error!"); + } + + @Test + public void testDifferentError() { + TestSubscriber ts = new TestSubscriber(); + ts.onError(new TestException()); + try { + ts.assertError(new TestException()); + } catch (AssertionError ex) { + // expected + return; + } + fail("Different Error present but no assertion error!"); + } + + @Test + public void testDifferentError2() { + TestSubscriber ts = new TestSubscriber(); + ts.onError(new RuntimeException()); + try { + ts.assertError(new TestException()); + } catch (AssertionError ex) { + // expected + return; + } + fail("Different Error present but no assertion error!"); + } + + @Test + public void testDifferentError3() { + TestSubscriber ts = new TestSubscriber(); + ts.onError(new RuntimeException()); + try { + ts.assertError(TestException.class); + } catch (AssertionError ex) { + // expected + return; + } + fail("Different Error present but no assertion error!"); + } + + @Test + public void testNoError() { + TestSubscriber ts = new TestSubscriber(); + try { + ts.assertError(TestException.class); + } catch (AssertionError ex) { + // expected + return; + } + fail("No present but no assertion error!"); + } + + @Test + public void testNoError2() { + TestSubscriber ts = new TestSubscriber(); + try { + ts.assertError(new TestException()); + } catch (AssertionError ex) { + // expected + return; + } + fail("No present but no assertion error!"); + } + + @Test + public void testInterruptTerminalEventAwait() { + TestSubscriber ts = TestSubscriber.create(); + + final Thread t0 = Thread.currentThread(); + Worker w = Schedulers.computation().createWorker(); + try { + w.schedule(new Action0() { + @Override + public void call() { + t0.interrupt(); + } + }, 200, TimeUnit.MILLISECONDS); + + try { + ts.awaitTerminalEvent(); + fail("Did not interrupt wait!"); + } catch (RuntimeException ex) { + if (!(ex.getCause() instanceof InterruptedException)) { + fail("The cause is not InterruptedException! " + ex.getCause()); + } + } + } finally { + w.unsubscribe(); + } + } + + @Test + public void testInterruptTerminalEventAwaitTimed() { + TestSubscriber ts = TestSubscriber.create(); + + final Thread t0 = Thread.currentThread(); + Worker w = Schedulers.computation().createWorker(); + try { + w.schedule(new Action0() { + @Override + public void call() { + t0.interrupt(); + } + }, 200, TimeUnit.MILLISECONDS); + + try { + ts.awaitTerminalEvent(5, TimeUnit.SECONDS); + fail("Did not interrupt wait!"); + } catch (RuntimeException ex) { + if (!(ex.getCause() instanceof InterruptedException)) { + fail("The cause is not InterruptedException! " + ex.getCause()); + } + } + } finally { + w.unsubscribe(); + } + } + + @Test + public void testInterruptTerminalEventAwaitAndUnsubscribe() { + TestSubscriber ts = TestSubscriber.create(); + + final Thread t0 = Thread.currentThread(); + Worker w = Schedulers.computation().createWorker(); + try { + w.schedule(new Action0() { + @Override + public void call() { + t0.interrupt(); + } + }, 200, TimeUnit.MILLISECONDS); + + ts.awaitTerminalEventAndUnsubscribeOnTimeout(5, TimeUnit.SECONDS); + if (!ts.isUnsubscribed()) { + fail("Did not unsubscribe!"); + } + } finally { + w.unsubscribe(); + } + } + + @Test + public void testNoTerminalEventBut1Completed() { + TestSubscriber ts = TestSubscriber.create(); + + ts.onCompleted(); + + try { + ts.assertNoTerminalEvent(); + fail("Failed to report there were terminal event(s)!"); + } catch (AssertionError ex) { + // expected + } + } + + @Test + public void testNoTerminalEventBut1Error() { + TestSubscriber ts = TestSubscriber.create(); + + ts.onError(new TestException()); + + try { + ts.assertNoTerminalEvent(); + fail("Failed to report there were terminal event(s)!"); + } catch (AssertionError ex) { + // expected + } + } + + @Test + public void testNoTerminalEventBut1Error1Completed() { + TestSubscriber ts = TestSubscriber.create(); + + ts.onCompleted(); + ts.onError(new TestException()); + + try { + ts.assertNoTerminalEvent(); + fail("Failed to report there were terminal event(s)!"); + } catch (AssertionError ex) { + // expected + } + } + + @Test + public void testNoTerminalEventBut2Errors() { + TestSubscriber ts = TestSubscriber.create(); + + ts.onError(new TestException()); + ts.onError(new TestException()); + + try { + ts.assertNoTerminalEvent(); + fail("Failed to report there were terminal event(s)!"); + } catch (AssertionError ex) { + // expected + if (!(ex.getCause() instanceof CompositeException)) { + fail("Did not report a composite exception cause: " + ex.getCause()); + } + } + } + + @Test + public void testNoValues() { + TestSubscriber ts = TestSubscriber.create(); + ts.onNext(1); + + try { + ts.assertNoValues(); + fail("Failed to report there were values!"); + } catch (AssertionError ex) { + // expected + } + } + + @Test + public void testValueCount() { + TestSubscriber ts = TestSubscriber.create(); + ts.onNext(1); + ts.onNext(2); + + try { + ts.assertValueCount(3); + fail("Failed to report there were values!"); + } catch (AssertionError ex) { + // expected + } + } + + @Test(timeout = 1000) + public void testOnCompletedCrashCountsDownLatch() { + TestObserver to = new TestObserver() { + @Override + public void onCompleted() { + throw new TestException(); + } + }; + TestSubscriber ts = TestSubscriber.create(to); + + try { + ts.onCompleted(); + } catch (TestException ex) { + // expected + } + + ts.awaitTerminalEvent(); + } + + @Test(timeout = 1000) + public void testOnErrorCrashCountsDownLatch() { + TestObserver to = new TestObserver() { + @Override + public void onError(Throwable e) { + throw new TestException(); + } + }; + TestSubscriber ts = TestSubscriber.create(to); + + try { + ts.onError(new RuntimeException()); + } catch (TestException ex) { + // expected + } + + ts.awaitTerminalEvent(); + } }