From 02c72fdd1637979574dbc50956c254f330fa0b87 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 6 Jan 2017 23:47:46 +0100 Subject: [PATCH 1/2] 2.x: add subjects for Single, Maybe and Completable --- .../subjects/CompletableSubject.java | 228 ++++++++++++++ .../io/reactivex/subjects/MaybeSubject.java | 276 ++++++++++++++++ .../io/reactivex/subjects/SingleSubject.java | 253 +++++++++++++++ .../subjects/CompletableSubjectTest.java | 226 +++++++++++++ .../reactivex/subjects/MaybeSubjectTest.java | 297 ++++++++++++++++++ .../reactivex/subjects/SingleSubjectTest.java | 243 ++++++++++++++ 6 files changed, 1523 insertions(+) create mode 100644 src/main/java/io/reactivex/subjects/CompletableSubject.java create mode 100644 src/main/java/io/reactivex/subjects/MaybeSubject.java create mode 100644 src/main/java/io/reactivex/subjects/SingleSubject.java create mode 100644 src/test/java/io/reactivex/subjects/CompletableSubjectTest.java create mode 100644 src/test/java/io/reactivex/subjects/MaybeSubjectTest.java create mode 100644 src/test/java/io/reactivex/subjects/SingleSubjectTest.java diff --git a/src/main/java/io/reactivex/subjects/CompletableSubject.java b/src/main/java/io/reactivex/subjects/CompletableSubject.java new file mode 100644 index 0000000000..086fee647b --- /dev/null +++ b/src/main/java/io/reactivex/subjects/CompletableSubject.java @@ -0,0 +1,228 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.subjects; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Represents a hot Completable-like source and consumer of events similar to Subjects. + *

+ * All methods are thread safe. Calling onComplete multiple + * times has no effect. Calling onError multiple times relays the Throwable to + * the RxJavaPlugins' error handler. + *

+ * The CompletableSubject doesn't store the Disposables coming through onSubscribe but + * disposes them once the other onXXX methods were called (terminal state reached). + * @since 2.0.5 - experimental + */ +@Experimental +public final class CompletableSubject extends Completable implements CompletableObserver { + + final AtomicReference observers; + + static final CompletableDisposable[] EMPTY = new CompletableDisposable[0]; + + static final CompletableDisposable[] TERMINATED = new CompletableDisposable[0]; + + final AtomicBoolean once; + Throwable error; + + /** + * Creates a fresh CompletableSubject. + * @return the new CompletableSubject instance + */ + public static CompletableSubject create() { + return new CompletableSubject(); + } + + CompletableSubject() { + once = new AtomicBoolean(); + observers = new AtomicReference(EMPTY); + } + + @Override + public void onSubscribe(Disposable d) { + if (observers.get() == TERMINATED) { + d.dispose(); + } + } + + @Override + public void onError(Throwable e) { + if (e == null) { + e = new NullPointerException("Null errors are not allowed in 2.x"); + } + if (once.compareAndSet(false, true)) { + this.error = e; + for (CompletableDisposable md : observers.getAndSet(TERMINATED)) { + md.actual.onError(e); + } + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + if (once.compareAndSet(false, true)) { + for (CompletableDisposable md : observers.getAndSet(TERMINATED)) { + md.actual.onComplete(); + } + } + } + + @Override + protected void subscribeActual(CompletableObserver observer) { + CompletableDisposable md = new CompletableDisposable(observer, this); + observer.onSubscribe(md); + if (add(md)) { + if (md.isDisposed()) { + remove(md); + } + } else { + Throwable ex = error; + if (ex != null) { + observer.onError(ex); + } else { + observer.onComplete(); + } + } + } + + boolean add(CompletableDisposable inner) { + for (;;) { + CompletableDisposable[] a = observers.get(); + if (a == TERMINATED) { + return false; + } + + int n = a.length; + + CompletableDisposable[] b = new CompletableDisposable[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + if (observers.compareAndSet(a, b)) { + return true; + } + } + } + + void remove(CompletableDisposable inner) { + for (;;) { + CompletableDisposable[] a = observers.get(); + int n = a.length; + if (n == 0) { + return; + } + + int j = -1; + + for (int i = 0; i < n; i++) { + if (a[i] == inner) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + CompletableDisposable[] b; + if (n == 1) { + b = EMPTY; + } else { + b = new CompletableDisposable[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + + if (observers.compareAndSet(a, b)) { + return; + } + } + } + + /** + * Returns the terminal error if this CompletableSubject has been terminated with an error, null otherwise. + * @return the terminal error or null if not terminated or not with an error + */ + public Throwable getThrowable() { + if (observers.get() == TERMINATED) { + return error; + } + return null; + } + + /** + * Returns true if this CompletableSubject has been terminated with an error. + * @return true if this CompletableSubject has been terminated with an error + */ + public boolean hasThrowable() { + return observers.get() == TERMINATED && error != null; + } + + /** + * Returns true if this CompletableSubject has been completed. + * @return true if this CompletableSubject has been completed + */ + public boolean hasComplete() { + return observers.get() == TERMINATED && error == null; + } + + /** + * Returns true if this CompletableSubject has observers. + * @return true if this CompletableSubject has observers + */ + public boolean hasObservers() { + return observers.get().length != 0; + } + + /** + * Returns the number of current observers. + * @return the number of current observers + */ + /* test */ int observerCount() { + return observers.get().length; + } + + static final class CompletableDisposable + extends AtomicReference implements Disposable { + private static final long serialVersionUID = -7650903191002190468L; + + final CompletableObserver actual; + + CompletableDisposable(CompletableObserver actual, CompletableSubject parent) { + this.actual = actual; + lazySet(parent); + } + + @Override + public void dispose() { + CompletableSubject parent = getAndSet(null); + if (parent != null) { + parent.remove(this); + } + } + + @Override + public boolean isDisposed() { + return get() == null; + } + } +} diff --git a/src/main/java/io/reactivex/subjects/MaybeSubject.java b/src/main/java/io/reactivex/subjects/MaybeSubject.java new file mode 100644 index 0000000000..4faf6033ce --- /dev/null +++ b/src/main/java/io/reactivex/subjects/MaybeSubject.java @@ -0,0 +1,276 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.subjects; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Represents a hot Maybe-like source and consumer of events similar to Subjects. + *

+ * All methods are thread safe. Calling onSuccess or onComplete multiple + * times has no effect. Calling onError multiple times relays the Throwable to + * the RxJavaPlugins' error handler. + *

+ * The MaybeSubject doesn't store the Disposables coming through onSubscribe but + * disposes them once the other onXXX methods were called (terminal state reached). + * @param the value type received and emitted + * @since 2.0.5 - experimental + */ +@Experimental +public final class MaybeSubject extends Maybe implements MaybeObserver { + + final AtomicReference[]> observers; + + @SuppressWarnings("rawtypes") + static final MaybeDisposable[] EMPTY = new MaybeDisposable[0]; + + @SuppressWarnings("rawtypes") + static final MaybeDisposable[] TERMINATED = new MaybeDisposable[0]; + + final AtomicBoolean once; + T value; + Throwable error; + + /** + * Creates a fresh MaybeSubject. + * @param the value type received and emitted + * @return the new MaybeSubject instance + */ + public static MaybeSubject create() { + return new MaybeSubject(); + } + + @SuppressWarnings("unchecked") + MaybeSubject() { + once = new AtomicBoolean(); + observers = new AtomicReference[]>(EMPTY); + } + + @Override + public void onSubscribe(Disposable d) { + if (observers.get() == TERMINATED) { + d.dispose(); + } + } + + @SuppressWarnings("unchecked") + @Override + public void onSuccess(T value) { + if (value == null) { + onError(new NullPointerException("Null values are not allowed in 2.x")); + return; + } + if (once.compareAndSet(false, true)) { + this.value = value; + for (MaybeDisposable md : observers.getAndSet(TERMINATED)) { + md.actual.onSuccess(value); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public void onError(Throwable e) { + if (e == null) { + e = new NullPointerException("Null errors are not allowed in 2.x"); + } + if (once.compareAndSet(false, true)) { + this.error = e; + for (MaybeDisposable md : observers.getAndSet(TERMINATED)) { + md.actual.onError(e); + } + } else { + RxJavaPlugins.onError(e); + } + } + + @SuppressWarnings("unchecked") + @Override + public void onComplete() { + if (once.compareAndSet(false, true)) { + for (MaybeDisposable md : observers.getAndSet(TERMINATED)) { + md.actual.onComplete(); + } + } + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + MaybeDisposable md = new MaybeDisposable(observer, this); + observer.onSubscribe(md); + if (add(md)) { + if (md.isDisposed()) { + remove(md); + } + } else { + Throwable ex = error; + if (ex != null) { + observer.onError(ex); + } else { + T v = value; + if (v == null) { + observer.onComplete(); + } else { + observer.onSuccess(v); + } + } + } + } + + boolean add(MaybeDisposable inner) { + for (;;) { + MaybeDisposable[] a = observers.get(); + if (a == TERMINATED) { + return false; + } + + int n = a.length; + @SuppressWarnings("unchecked") + MaybeDisposable[] b = new MaybeDisposable[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + if (observers.compareAndSet(a, b)) { + return true; + } + } + } + + @SuppressWarnings("unchecked") + void remove(MaybeDisposable inner) { + for (;;) { + MaybeDisposable[] a = observers.get(); + int n = a.length; + if (n == 0) { + return; + } + + int j = -1; + + for (int i = 0; i < n; i++) { + if (a[i] == inner) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + MaybeDisposable[] b; + if (n == 1) { + b = EMPTY; + } else { + b = new MaybeDisposable[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + + if (observers.compareAndSet(a, b)) { + return; + } + } + } + + /** + * Returns the success value if this MaybeSubject was terminated with a success value. + * @return the success value or null + */ + public T getValue() { + if (observers.get() == TERMINATED) { + return value; + } + return null; + } + + /** + * Returns true if this MaybeSubject was terminated with a success value. + * @return true if this MaybeSubject was terminated with a success value + */ + public boolean hasValue() { + return observers.get() == TERMINATED && value != null; + } + + /** + * Returns the terminal error if this MaybeSubject has been terminated with an error, null otherwise. + * @return the terminal error or null if not terminated or not with an error + */ + public Throwable getThrowable() { + if (observers.get() == TERMINATED) { + return error; + } + return null; + } + + /** + * Returns true if this MaybeSubject has been terminated with an error. + * @return true if this MaybeSubject has been terminated with an error + */ + public boolean hasThrowable() { + return observers.get() == TERMINATED && error != null; + } + + /** + * Returns true if this MaybeSubject has been completed. + * @return true if this MaybeSubject has been completed + */ + public boolean hasComplete() { + return observers.get() == TERMINATED && value == null && error == null; + } + + /** + * Returns true if this MaybeSubject has observers. + * @return true if this MaybeSubject has observers + */ + public boolean hasObservers() { + return observers.get().length != 0; + } + + /** + * Returns the number of current observers. + * @return the number of current observers + */ + /* test */ int observerCount() { + return observers.get().length; + } + + static final class MaybeDisposable + extends AtomicReference> implements Disposable { + private static final long serialVersionUID = -7650903191002190468L; + + final MaybeObserver actual; + + MaybeDisposable(MaybeObserver actual, MaybeSubject parent) { + this.actual = actual; + lazySet(parent); + } + + @Override + public void dispose() { + MaybeSubject parent = getAndSet(null); + if (parent != null) { + parent.remove(this); + } + } + + @Override + public boolean isDisposed() { + return get() == null; + } + } +} diff --git a/src/main/java/io/reactivex/subjects/SingleSubject.java b/src/main/java/io/reactivex/subjects/SingleSubject.java new file mode 100644 index 0000000000..ab749c5097 --- /dev/null +++ b/src/main/java/io/reactivex/subjects/SingleSubject.java @@ -0,0 +1,253 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.subjects; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Represents a hot Single-like source and consumer of events similar to Subjects. + *

+ * All methods are thread safe. Calling onSuccess multiple + * times has no effect. Calling onError multiple times relays the Throwable to + * the RxJavaPlugins' error handler. + *

+ * The SingleSubject doesn't store the Disposables coming through onSubscribe but + * disposes them once the other onXXX methods were called (terminal state reached). + * @param the value type received and emitted + * @since 2.0.5 - experimental + */ +@Experimental +public final class SingleSubject extends Single implements SingleObserver { + + final AtomicReference[]> observers; + + @SuppressWarnings("rawtypes") + static final SingleDisposable[] EMPTY = new SingleDisposable[0]; + + @SuppressWarnings("rawtypes") + static final SingleDisposable[] TERMINATED = new SingleDisposable[0]; + + final AtomicBoolean once; + T value; + Throwable error; + + /** + * Creates a fresh SingleSubject. + * @param the value type received and emitted + * @return the new SingleSubject instance + */ + public static SingleSubject create() { + return new SingleSubject(); + } + + @SuppressWarnings("unchecked") + SingleSubject() { + once = new AtomicBoolean(); + observers = new AtomicReference[]>(EMPTY); + } + + @Override + public void onSubscribe(Disposable d) { + if (observers.get() == TERMINATED) { + d.dispose(); + } + } + + @SuppressWarnings("unchecked") + @Override + public void onSuccess(T value) { + if (value == null) { + onError(new NullPointerException("Null values are not allowed in 2.x")); + return; + } + if (once.compareAndSet(false, true)) { + this.value = value; + for (SingleDisposable md : observers.getAndSet(TERMINATED)) { + md.actual.onSuccess(value); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public void onError(Throwable e) { + if (e == null) { + e = new NullPointerException("Null errors are not allowed in 2.x"); + } + if (once.compareAndSet(false, true)) { + this.error = e; + for (SingleDisposable md : observers.getAndSet(TERMINATED)) { + md.actual.onError(e); + } + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + protected void subscribeActual(SingleObserver observer) { + SingleDisposable md = new SingleDisposable(observer, this); + observer.onSubscribe(md); + if (add(md)) { + if (md.isDisposed()) { + remove(md); + } + } else { + Throwable ex = error; + if (ex != null) { + observer.onError(ex); + } else { + observer.onSuccess(value); + } + } + } + + boolean add(SingleDisposable inner) { + for (;;) { + SingleDisposable[] a = observers.get(); + if (a == TERMINATED) { + return false; + } + + int n = a.length; + @SuppressWarnings("unchecked") + SingleDisposable[] b = new SingleDisposable[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + if (observers.compareAndSet(a, b)) { + return true; + } + } + } + + @SuppressWarnings("unchecked") + void remove(SingleDisposable inner) { + for (;;) { + SingleDisposable[] a = observers.get(); + int n = a.length; + if (n == 0) { + return; + } + + int j = -1; + + for (int i = 0; i < n; i++) { + if (a[i] == inner) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + SingleDisposable[] b; + if (n == 1) { + b = EMPTY; + } else { + b = new SingleDisposable[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + + if (observers.compareAndSet(a, b)) { + return; + } + } + } + + /** + * Returns the success value if this SingleSubject was terminated with a success value. + * @return the success value or null + */ + public T getValue() { + if (observers.get() == TERMINATED) { + return value; + } + return null; + } + + /** + * Returns true if this SingleSubject was terminated with a success value. + * @return true if this SingleSubject was terminated with a success value + */ + public boolean hasValue() { + return observers.get() == TERMINATED && value != null; + } + + /** + * Returns the terminal error if this SingleSubject has been terminated with an error, null otherwise. + * @return the terminal error or null if not terminated or not with an error + */ + public Throwable getThrowable() { + if (observers.get() == TERMINATED) { + return error; + } + return null; + } + + /** + * Returns true if this SingleSubject has been terminated with an error. + * @return true if this SingleSubject has been terminated with an error + */ + public boolean hasThrowable() { + return observers.get() == TERMINATED && error != null; + } + + /** + * Returns true if this SingleSubject has observers. + * @return true if this SingleSubject has observers + */ + public boolean hasObservers() { + return observers.get().length != 0; + } + + /** + * Returns the number of current observers. + * @return the number of current observers + */ + /* test */ int observerCount() { + return observers.get().length; + } + + static final class SingleDisposable + extends AtomicReference> implements Disposable { + private static final long serialVersionUID = -7650903191002190468L; + + final SingleObserver actual; + + SingleDisposable(SingleObserver actual, SingleSubject parent) { + this.actual = actual; + lazySet(parent); + } + + @Override + public void dispose() { + SingleSubject parent = getAndSet(null); + if (parent != null) { + parent.remove(this); + } + } + + @Override + public boolean isDisposed() { + return get() == null; + } + } +} diff --git a/src/test/java/io/reactivex/subjects/CompletableSubjectTest.java b/src/test/java/io/reactivex/subjects/CompletableSubjectTest.java new file mode 100644 index 0000000000..18da0c68a8 --- /dev/null +++ b/src/test/java/io/reactivex/subjects/CompletableSubjectTest.java @@ -0,0 +1,226 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.subjects; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; + +public class CompletableSubjectTest { + + @Test + public void once() { + CompletableSubject ms = CompletableSubject.create(); + + TestObserver to = ms.test(); + + ms.onComplete(); + + List errors = TestHelper.trackPluginErrors(); + try { + ms.onError(new IOException()); + + TestHelper.assertError(errors, 0, IOException.class); + } finally { + RxJavaPlugins.reset(); + } + ms.onComplete(); + + to.assertResult(); + } + + @Test + public void error() { + CompletableSubject ms = CompletableSubject.create(); + + assertFalse(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + TestObserver to = ms.test(); + + to.assertEmpty(); + + assertTrue(ms.hasObservers()); + assertEquals(1, ms.observerCount()); + + ms.onError(new IOException()); + + assertFalse(ms.hasComplete()); + assertTrue(ms.hasThrowable()); + assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + to.assertFailure(IOException.class); + + ms.test().assertFailure(IOException.class); + + assertFalse(ms.hasComplete()); + assertTrue(ms.hasThrowable()); + assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + } + + @Test + public void complete() { + CompletableSubject ms = CompletableSubject.create(); + + assertFalse(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + TestObserver to = ms.test(); + + to.assertEmpty(); + + assertTrue(ms.hasObservers()); + assertEquals(1, ms.observerCount()); + + ms.onComplete(); + + assertTrue(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + to.assertResult(); + + ms.test().assertResult(); + + assertTrue(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + } + + @Test + public void nullThrowable() { + CompletableSubject ms = CompletableSubject.create(); + + TestObserver to = ms.test(); + + ms.onError(null); + + to.assertFailure(NullPointerException.class); + } + + @Test + public void cancelOnArrival() { + CompletableSubject.create() + .test(true) + .assertEmpty(); + } + + @Test + public void cancelOnArrival2() { + CompletableSubject ms = CompletableSubject.create(); + + ms.test(); + + ms + .test(true) + .assertEmpty(); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(CompletableSubject.create()); + } + + @Test + public void disposeTwice() { + CompletableSubject.create() + .subscribe(new CompletableObserver() { + @Override + public void onSubscribe(Disposable d) { + assertFalse(d.isDisposed()); + + d.dispose(); + d.dispose(); + + assertTrue(d.isDisposed()); + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + + } + }); + } + + @Test + public void onSubscribeDispose() { + CompletableSubject ms = CompletableSubject.create(); + + Disposable d = Disposables.empty(); + + ms.onSubscribe(d); + + assertFalse(d.isDisposed()); + + ms.onComplete(); + + d = Disposables.empty(); + + ms.onSubscribe(d); + + assertTrue(d.isDisposed()); + } + + @Test + public void addRemoveRace() { + for (int i = 0; i < 500; i++) { + final CompletableSubject ms = CompletableSubject.create(); + + final TestObserver to = ms.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ms.test(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + TestHelper.race(r1, r2, Schedulers.single()); + } + } +} diff --git a/src/test/java/io/reactivex/subjects/MaybeSubjectTest.java b/src/test/java/io/reactivex/subjects/MaybeSubjectTest.java new file mode 100644 index 0000000000..4e04212757 --- /dev/null +++ b/src/test/java/io/reactivex/subjects/MaybeSubjectTest.java @@ -0,0 +1,297 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.subjects; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; + +public class MaybeSubjectTest { + + @Test + public void success() { + MaybeSubject ms = MaybeSubject.create(); + + assertFalse(ms.hasValue()); + assertNull(ms.getValue()); + assertFalse(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + TestObserver to = ms.test(); + + to.assertEmpty(); + + assertTrue(ms.hasObservers()); + assertEquals(1, ms.observerCount()); + + ms.onSuccess(1); + + assertTrue(ms.hasValue()); + assertEquals(1, ms.getValue().intValue()); + assertFalse(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + to.assertResult(1); + + ms.test().assertResult(1); + + assertTrue(ms.hasValue()); + assertEquals(1, ms.getValue().intValue()); + assertFalse(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + } + + @Test + public void once() { + MaybeSubject ms = MaybeSubject.create(); + + TestObserver to = ms.test(); + + ms.onSuccess(1); + ms.onSuccess(2); + + List errors = TestHelper.trackPluginErrors(); + try { + ms.onError(new IOException()); + + TestHelper.assertError(errors, 0, IOException.class); + } finally { + RxJavaPlugins.reset(); + } + ms.onComplete(); + + to.assertResult(1); + } + + @Test + public void error() { + MaybeSubject ms = MaybeSubject.create(); + + assertFalse(ms.hasValue()); + assertNull(ms.getValue()); + assertFalse(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + TestObserver to = ms.test(); + + to.assertEmpty(); + + assertTrue(ms.hasObservers()); + assertEquals(1, ms.observerCount()); + + ms.onError(new IOException()); + + assertFalse(ms.hasValue()); + assertNull(ms.getValue()); + assertFalse(ms.hasComplete()); + assertTrue(ms.hasThrowable()); + assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + to.assertFailure(IOException.class); + + ms.test().assertFailure(IOException.class); + + assertFalse(ms.hasValue()); + assertNull(ms.getValue()); + assertFalse(ms.hasComplete()); + assertTrue(ms.hasThrowable()); + assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + } + + @Test + public void complete() { + MaybeSubject ms = MaybeSubject.create(); + + assertFalse(ms.hasValue()); + assertNull(ms.getValue()); + assertFalse(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + TestObserver to = ms.test(); + + to.assertEmpty(); + + assertTrue(ms.hasObservers()); + assertEquals(1, ms.observerCount()); + + ms.onComplete(); + + assertFalse(ms.hasValue()); + assertNull(ms.getValue()); + assertTrue(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + to.assertResult(); + + ms.test().assertResult(); + + assertFalse(ms.hasValue()); + assertNull(ms.getValue()); + assertTrue(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + } + + @Test + public void nullValue() { + MaybeSubject ms = MaybeSubject.create(); + + TestObserver to = ms.test(); + + ms.onSuccess(null); + + to.assertFailure(NullPointerException.class); + } + + @Test + public void nullThrowable() { + MaybeSubject ms = MaybeSubject.create(); + + TestObserver to = ms.test(); + + ms.onError(null); + + to.assertFailure(NullPointerException.class); + } + + @Test + public void cancelOnArrival() { + MaybeSubject.create() + .test(true) + .assertEmpty(); + } + + @Test + public void cancelOnArrival2() { + MaybeSubject ms = MaybeSubject.create(); + + ms.test(); + + ms + .test(true) + .assertEmpty(); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(MaybeSubject.create()); + } + + @Test + public void disposeTwice() { + MaybeSubject.create() + .subscribe(new MaybeObserver() { + @Override + public void onSubscribe(Disposable d) { + assertFalse(d.isDisposed()); + + d.dispose(); + d.dispose(); + + assertTrue(d.isDisposed()); + } + + @Override + public void onSuccess(Object value) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + + } + }); + } + + @Test + public void onSubscribeDispose() { + MaybeSubject ms = MaybeSubject.create(); + + Disposable d = Disposables.empty(); + + ms.onSubscribe(d); + + assertFalse(d.isDisposed()); + + ms.onComplete(); + + d = Disposables.empty(); + + ms.onSubscribe(d); + + assertTrue(d.isDisposed()); + } + + @Test + public void addRemoveRace() { + for (int i = 0; i < 500; i++) { + final MaybeSubject ms = MaybeSubject.create(); + + final TestObserver to = ms.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ms.test(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + TestHelper.race(r1, r2, Schedulers.single()); + } + } +} diff --git a/src/test/java/io/reactivex/subjects/SingleSubjectTest.java b/src/test/java/io/reactivex/subjects/SingleSubjectTest.java new file mode 100644 index 0000000000..558a58c7df --- /dev/null +++ b/src/test/java/io/reactivex/subjects/SingleSubjectTest.java @@ -0,0 +1,243 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.subjects; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; + +public class SingleSubjectTest { + + @Test + public void success() { + SingleSubject ms = SingleSubject.create(); + + assertFalse(ms.hasValue()); + assertNull(ms.getValue()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + TestObserver to = ms.test(); + + to.assertEmpty(); + + assertTrue(ms.hasObservers()); + assertEquals(1, ms.observerCount()); + + ms.onSuccess(1); + + assertTrue(ms.hasValue()); + assertEquals(1, ms.getValue().intValue()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + to.assertResult(1); + + ms.test().assertResult(1); + + assertTrue(ms.hasValue()); + assertEquals(1, ms.getValue().intValue()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + } + + @Test + public void once() { + SingleSubject ms = SingleSubject.create(); + + TestObserver to = ms.test(); + + ms.onSuccess(1); + ms.onSuccess(2); + + List errors = TestHelper.trackPluginErrors(); + try { + ms.onError(new IOException()); + + TestHelper.assertError(errors, 0, IOException.class); + } finally { + RxJavaPlugins.reset(); + } + + to.assertResult(1); + } + + @Test + public void error() { + SingleSubject ms = SingleSubject.create(); + + assertFalse(ms.hasValue()); + assertNull(ms.getValue()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + TestObserver to = ms.test(); + + to.assertEmpty(); + + assertTrue(ms.hasObservers()); + assertEquals(1, ms.observerCount()); + + ms.onError(new IOException()); + + assertFalse(ms.hasValue()); + assertNull(ms.getValue()); + assertTrue(ms.hasThrowable()); + assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + + to.assertFailure(IOException.class); + + ms.test().assertFailure(IOException.class); + + assertFalse(ms.hasValue()); + assertNull(ms.getValue()); + assertTrue(ms.hasThrowable()); + assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); + assertFalse(ms.hasObservers()); + assertEquals(0, ms.observerCount()); + } + + @Test + public void nullValue() { + SingleSubject ms = SingleSubject.create(); + + TestObserver to = ms.test(); + + ms.onSuccess(null); + + to.assertFailure(NullPointerException.class); + } + + @Test + public void nullThrowable() { + SingleSubject ms = SingleSubject.create(); + + TestObserver to = ms.test(); + + ms.onError(null); + + to.assertFailure(NullPointerException.class); + } + + @Test + public void cancelOnArrival() { + SingleSubject.create() + .test(true) + .assertEmpty(); + } + + @Test + public void cancelOnArrival2() { + SingleSubject ms = SingleSubject.create(); + + ms.test(); + + ms + .test(true) + .assertEmpty(); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(SingleSubject.create()); + } + + @Test + public void disposeTwice() { + SingleSubject.create() + .subscribe(new SingleObserver() { + @Override + public void onSubscribe(Disposable d) { + assertFalse(d.isDisposed()); + + d.dispose(); + d.dispose(); + + assertTrue(d.isDisposed()); + } + + @Override + public void onSuccess(Object value) { + + } + + @Override + public void onError(Throwable e) { + + } + }); + } + + @Test + public void onSubscribeDispose() { + SingleSubject ms = SingleSubject.create(); + + Disposable d = Disposables.empty(); + + ms.onSubscribe(d); + + assertFalse(d.isDisposed()); + + ms.onSuccess(1); + + d = Disposables.empty(); + + ms.onSubscribe(d); + + assertTrue(d.isDisposed()); + } + + @Test + public void addRemoveRace() { + for (int i = 0; i < 500; i++) { + final SingleSubject ms = SingleSubject.create(); + + final TestObserver to = ms.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ms.test(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + TestHelper.race(r1, r2, Schedulers.single()); + } + } +} From d2725335252a38b3230662713c765e940e68cee7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 7 Jan 2017 16:59:07 +0100 Subject: [PATCH 2/2] Add CheckReturnValue, rename local test vars --- .../subjects/CompletableSubject.java | 3 +- .../io/reactivex/subjects/MaybeSubject.java | 3 +- .../io/reactivex/subjects/SingleSubject.java | 3 +- .../subjects/CompletableSubjectTest.java | 120 +++++++-------- .../reactivex/subjects/SingleSubjectTest.java | 138 +++++++++--------- 5 files changed, 135 insertions(+), 132 deletions(-) diff --git a/src/main/java/io/reactivex/subjects/CompletableSubject.java b/src/main/java/io/reactivex/subjects/CompletableSubject.java index 086fee647b..dd5394d899 100644 --- a/src/main/java/io/reactivex/subjects/CompletableSubject.java +++ b/src/main/java/io/reactivex/subjects/CompletableSubject.java @@ -16,7 +16,7 @@ import java.util.concurrent.atomic.*; import io.reactivex.*; -import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.plugins.RxJavaPlugins; @@ -47,6 +47,7 @@ public final class CompletableSubject extends Completable implements Completable * Creates a fresh CompletableSubject. * @return the new CompletableSubject instance */ + @CheckReturnValue public static CompletableSubject create() { return new CompletableSubject(); } diff --git a/src/main/java/io/reactivex/subjects/MaybeSubject.java b/src/main/java/io/reactivex/subjects/MaybeSubject.java index 4faf6033ce..3d03d5a637 100644 --- a/src/main/java/io/reactivex/subjects/MaybeSubject.java +++ b/src/main/java/io/reactivex/subjects/MaybeSubject.java @@ -16,7 +16,7 @@ import java.util.concurrent.atomic.*; import io.reactivex.*; -import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.plugins.RxJavaPlugins; @@ -52,6 +52,7 @@ public final class MaybeSubject extends Maybe implements MaybeObserver * @param the value type received and emitted * @return the new MaybeSubject instance */ + @CheckReturnValue public static MaybeSubject create() { return new MaybeSubject(); } diff --git a/src/main/java/io/reactivex/subjects/SingleSubject.java b/src/main/java/io/reactivex/subjects/SingleSubject.java index ab749c5097..2f8f076d5c 100644 --- a/src/main/java/io/reactivex/subjects/SingleSubject.java +++ b/src/main/java/io/reactivex/subjects/SingleSubject.java @@ -16,7 +16,7 @@ import java.util.concurrent.atomic.*; import io.reactivex.*; -import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.plugins.RxJavaPlugins; @@ -52,6 +52,7 @@ public final class SingleSubject extends Single implements SingleObserver< * @param the value type received and emitted * @return the new SingleSubject instance */ + @CheckReturnValue public static SingleSubject create() { return new SingleSubject(); } diff --git a/src/test/java/io/reactivex/subjects/CompletableSubjectTest.java b/src/test/java/io/reactivex/subjects/CompletableSubjectTest.java index 18da0c68a8..e6c953d1e0 100644 --- a/src/test/java/io/reactivex/subjects/CompletableSubjectTest.java +++ b/src/test/java/io/reactivex/subjects/CompletableSubjectTest.java @@ -30,104 +30,104 @@ public class CompletableSubjectTest { @Test public void once() { - CompletableSubject ms = CompletableSubject.create(); + CompletableSubject cs = CompletableSubject.create(); - TestObserver to = ms.test(); + TestObserver to = cs.test(); - ms.onComplete(); + cs.onComplete(); List errors = TestHelper.trackPluginErrors(); try { - ms.onError(new IOException()); + cs.onError(new IOException()); TestHelper.assertError(errors, 0, IOException.class); } finally { RxJavaPlugins.reset(); } - ms.onComplete(); + cs.onComplete(); to.assertResult(); } @Test public void error() { - CompletableSubject ms = CompletableSubject.create(); + CompletableSubject cs = CompletableSubject.create(); - assertFalse(ms.hasComplete()); - assertFalse(ms.hasThrowable()); - assertNull(ms.getThrowable()); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertFalse(cs.hasComplete()); + assertFalse(cs.hasThrowable()); + assertNull(cs.getThrowable()); + assertFalse(cs.hasObservers()); + assertEquals(0, cs.observerCount()); - TestObserver to = ms.test(); + TestObserver to = cs.test(); to.assertEmpty(); - assertTrue(ms.hasObservers()); - assertEquals(1, ms.observerCount()); + assertTrue(cs.hasObservers()); + assertEquals(1, cs.observerCount()); - ms.onError(new IOException()); + cs.onError(new IOException()); - assertFalse(ms.hasComplete()); - assertTrue(ms.hasThrowable()); - assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertFalse(cs.hasComplete()); + assertTrue(cs.hasThrowable()); + assertTrue(cs.getThrowable().toString(), cs.getThrowable() instanceof IOException); + assertFalse(cs.hasObservers()); + assertEquals(0, cs.observerCount()); to.assertFailure(IOException.class); - ms.test().assertFailure(IOException.class); + cs.test().assertFailure(IOException.class); - assertFalse(ms.hasComplete()); - assertTrue(ms.hasThrowable()); - assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertFalse(cs.hasComplete()); + assertTrue(cs.hasThrowable()); + assertTrue(cs.getThrowable().toString(), cs.getThrowable() instanceof IOException); + assertFalse(cs.hasObservers()); + assertEquals(0, cs.observerCount()); } @Test public void complete() { - CompletableSubject ms = CompletableSubject.create(); + CompletableSubject cs = CompletableSubject.create(); - assertFalse(ms.hasComplete()); - assertFalse(ms.hasThrowable()); - assertNull(ms.getThrowable()); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertFalse(cs.hasComplete()); + assertFalse(cs.hasThrowable()); + assertNull(cs.getThrowable()); + assertFalse(cs.hasObservers()); + assertEquals(0, cs.observerCount()); - TestObserver to = ms.test(); + TestObserver to = cs.test(); to.assertEmpty(); - assertTrue(ms.hasObservers()); - assertEquals(1, ms.observerCount()); + assertTrue(cs.hasObservers()); + assertEquals(1, cs.observerCount()); - ms.onComplete(); + cs.onComplete(); - assertTrue(ms.hasComplete()); - assertFalse(ms.hasThrowable()); - assertNull(ms.getThrowable()); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertTrue(cs.hasComplete()); + assertFalse(cs.hasThrowable()); + assertNull(cs.getThrowable()); + assertFalse(cs.hasObservers()); + assertEquals(0, cs.observerCount()); to.assertResult(); - ms.test().assertResult(); + cs.test().assertResult(); - assertTrue(ms.hasComplete()); - assertFalse(ms.hasThrowable()); - assertNull(ms.getThrowable()); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertTrue(cs.hasComplete()); + assertFalse(cs.hasThrowable()); + assertNull(cs.getThrowable()); + assertFalse(cs.hasObservers()); + assertEquals(0, cs.observerCount()); } @Test public void nullThrowable() { - CompletableSubject ms = CompletableSubject.create(); + CompletableSubject cs = CompletableSubject.create(); - TestObserver to = ms.test(); + TestObserver to = cs.test(); - ms.onError(null); + cs.onError(null); to.assertFailure(NullPointerException.class); } @@ -141,11 +141,11 @@ public void cancelOnArrival() { @Test public void cancelOnArrival2() { - CompletableSubject ms = CompletableSubject.create(); + CompletableSubject cs = CompletableSubject.create(); - ms.test(); + cs.test(); - ms + cs .test(true) .assertEmpty(); } @@ -183,19 +183,19 @@ public void onComplete() { @Test public void onSubscribeDispose() { - CompletableSubject ms = CompletableSubject.create(); + CompletableSubject cs = CompletableSubject.create(); Disposable d = Disposables.empty(); - ms.onSubscribe(d); + cs.onSubscribe(d); assertFalse(d.isDisposed()); - ms.onComplete(); + cs.onComplete(); d = Disposables.empty(); - ms.onSubscribe(d); + cs.onSubscribe(d); assertTrue(d.isDisposed()); } @@ -203,14 +203,14 @@ public void onSubscribeDispose() { @Test public void addRemoveRace() { for (int i = 0; i < 500; i++) { - final CompletableSubject ms = CompletableSubject.create(); + final CompletableSubject cs = CompletableSubject.create(); - final TestObserver to = ms.test(); + final TestObserver to = cs.test(); Runnable r1 = new Runnable() { @Override public void run() { - ms.test(); + cs.test(); } }; diff --git a/src/test/java/io/reactivex/subjects/SingleSubjectTest.java b/src/test/java/io/reactivex/subjects/SingleSubjectTest.java index 558a58c7df..92affc7cf3 100644 --- a/src/test/java/io/reactivex/subjects/SingleSubjectTest.java +++ b/src/test/java/io/reactivex/subjects/SingleSubjectTest.java @@ -30,55 +30,55 @@ public class SingleSubjectTest { @Test public void success() { - SingleSubject ms = SingleSubject.create(); + SingleSubject ss = SingleSubject.create(); - assertFalse(ms.hasValue()); - assertNull(ms.getValue()); - assertFalse(ms.hasThrowable()); - assertNull(ms.getThrowable()); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertFalse(ss.hasValue()); + assertNull(ss.getValue()); + assertFalse(ss.hasThrowable()); + assertNull(ss.getThrowable()); + assertFalse(ss.hasObservers()); + assertEquals(0, ss.observerCount()); - TestObserver to = ms.test(); + TestObserver to = ss.test(); to.assertEmpty(); - assertTrue(ms.hasObservers()); - assertEquals(1, ms.observerCount()); + assertTrue(ss.hasObservers()); + assertEquals(1, ss.observerCount()); - ms.onSuccess(1); + ss.onSuccess(1); - assertTrue(ms.hasValue()); - assertEquals(1, ms.getValue().intValue()); - assertFalse(ms.hasThrowable()); - assertNull(ms.getThrowable()); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertTrue(ss.hasValue()); + assertEquals(1, ss.getValue().intValue()); + assertFalse(ss.hasThrowable()); + assertNull(ss.getThrowable()); + assertFalse(ss.hasObservers()); + assertEquals(0, ss.observerCount()); to.assertResult(1); - ms.test().assertResult(1); + ss.test().assertResult(1); - assertTrue(ms.hasValue()); - assertEquals(1, ms.getValue().intValue()); - assertFalse(ms.hasThrowable()); - assertNull(ms.getThrowable()); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertTrue(ss.hasValue()); + assertEquals(1, ss.getValue().intValue()); + assertFalse(ss.hasThrowable()); + assertNull(ss.getThrowable()); + assertFalse(ss.hasObservers()); + assertEquals(0, ss.observerCount()); } @Test public void once() { - SingleSubject ms = SingleSubject.create(); + SingleSubject ss = SingleSubject.create(); - TestObserver to = ms.test(); + TestObserver to = ss.test(); - ms.onSuccess(1); - ms.onSuccess(2); + ss.onSuccess(1); + ss.onSuccess(2); List errors = TestHelper.trackPluginErrors(); try { - ms.onError(new IOException()); + ss.onError(new IOException()); TestHelper.assertError(errors, 0, IOException.class); } finally { @@ -90,61 +90,61 @@ public void once() { @Test public void error() { - SingleSubject ms = SingleSubject.create(); + SingleSubject ss = SingleSubject.create(); - assertFalse(ms.hasValue()); - assertNull(ms.getValue()); - assertFalse(ms.hasThrowable()); - assertNull(ms.getThrowable()); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertFalse(ss.hasValue()); + assertNull(ss.getValue()); + assertFalse(ss.hasThrowable()); + assertNull(ss.getThrowable()); + assertFalse(ss.hasObservers()); + assertEquals(0, ss.observerCount()); - TestObserver to = ms.test(); + TestObserver to = ss.test(); to.assertEmpty(); - assertTrue(ms.hasObservers()); - assertEquals(1, ms.observerCount()); + assertTrue(ss.hasObservers()); + assertEquals(1, ss.observerCount()); - ms.onError(new IOException()); + ss.onError(new IOException()); - assertFalse(ms.hasValue()); - assertNull(ms.getValue()); - assertTrue(ms.hasThrowable()); - assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertFalse(ss.hasValue()); + assertNull(ss.getValue()); + assertTrue(ss.hasThrowable()); + assertTrue(ss.getThrowable().toString(), ss.getThrowable() instanceof IOException); + assertFalse(ss.hasObservers()); + assertEquals(0, ss.observerCount()); to.assertFailure(IOException.class); - ms.test().assertFailure(IOException.class); + ss.test().assertFailure(IOException.class); - assertFalse(ms.hasValue()); - assertNull(ms.getValue()); - assertTrue(ms.hasThrowable()); - assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); - assertFalse(ms.hasObservers()); - assertEquals(0, ms.observerCount()); + assertFalse(ss.hasValue()); + assertNull(ss.getValue()); + assertTrue(ss.hasThrowable()); + assertTrue(ss.getThrowable().toString(), ss.getThrowable() instanceof IOException); + assertFalse(ss.hasObservers()); + assertEquals(0, ss.observerCount()); } @Test public void nullValue() { - SingleSubject ms = SingleSubject.create(); + SingleSubject ss = SingleSubject.create(); - TestObserver to = ms.test(); + TestObserver to = ss.test(); - ms.onSuccess(null); + ss.onSuccess(null); to.assertFailure(NullPointerException.class); } @Test public void nullThrowable() { - SingleSubject ms = SingleSubject.create(); + SingleSubject ss = SingleSubject.create(); - TestObserver to = ms.test(); + TestObserver to = ss.test(); - ms.onError(null); + ss.onError(null); to.assertFailure(NullPointerException.class); } @@ -158,11 +158,11 @@ public void cancelOnArrival() { @Test public void cancelOnArrival2() { - SingleSubject ms = SingleSubject.create(); + SingleSubject ss = SingleSubject.create(); - ms.test(); + ss.test(); - ms + ss .test(true) .assertEmpty(); } @@ -200,19 +200,19 @@ public void onError(Throwable e) { @Test public void onSubscribeDispose() { - SingleSubject ms = SingleSubject.create(); + SingleSubject ss = SingleSubject.create(); Disposable d = Disposables.empty(); - ms.onSubscribe(d); + ss.onSubscribe(d); assertFalse(d.isDisposed()); - ms.onSuccess(1); + ss.onSuccess(1); d = Disposables.empty(); - ms.onSubscribe(d); + ss.onSubscribe(d); assertTrue(d.isDisposed()); } @@ -220,14 +220,14 @@ public void onSubscribeDispose() { @Test public void addRemoveRace() { for (int i = 0; i < 500; i++) { - final SingleSubject ms = SingleSubject.create(); + final SingleSubject ss = SingleSubject.create(); - final TestObserver to = ms.test(); + final TestObserver to = ss.test(); Runnable r1 = new Runnable() { @Override public void run() { - ms.test(); + ss.test(); } };