From fa58d36375ecc084b21bca299ca4780946b15dc5 Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 1 Apr 2017 21:11:23 +0200 Subject: [PATCH] More nullability annotations (#5251) * More nullability annotations * Refactored imports * Changes based on akarnokd's review * A few more annotations * Changes based on akarnokd's 2nd review --- src/main/java/io/reactivex/Notification.java | 10 ++++- src/main/java/io/reactivex/Observer.java | 7 ++-- src/main/java/io/reactivex/Single.java | 5 +-- .../disposables/ActionDisposable.java | 3 +- .../disposables/CompositeDisposable.java | 13 +++--- .../io/reactivex/disposables/Disposables.java | 21 ++++++---- .../disposables/ReferenceDisposable.java | 3 +- .../disposables/RunnableDisposable.java | 4 +- .../disposables/SerialDisposable.java | 10 +++-- .../disposables/SubscriptionDisposable.java | 3 +- .../exceptions/CompositeException.java | 9 ++++- .../io/reactivex/exceptions/Exceptions.java | 6 ++- .../OnErrorNotImplementedException.java | 6 +-- .../flowables/ConnectableFlowable.java | 9 ++++- .../reactivex/flowables/GroupedFlowable.java | 6 ++- .../disposables/DisposableHelper.java | 1 + .../observables/ConnectableObservable.java | 9 ++++- .../observables/GroupedObservable.java | 6 ++- .../reactivex/observers/DefaultObserver.java | 3 +- .../DisposableCompletableObserver.java | 3 +- .../observers/DisposableMaybeObserver.java | 4 +- .../observers/DisposableObserver.java | 6 ++- .../observers/DisposableSingleObserver.java | 4 +- .../ResourceCompletableObserver.java | 8 ++-- .../observers/ResourceMaybeObserver.java | 8 ++-- .../reactivex/observers/ResourceObserver.java | 3 +- .../observers/ResourceSingleObserver.java | 8 ++-- .../io/reactivex/observers/SafeObserver.java | 9 +++-- .../observers/SerializedObserver.java | 11 ++--- .../reactivex/parallel/ParallelFlowable.java | 40 ++++++++++++++++++- .../reactivex/processors/AsyncProcessor.java | 6 +-- .../processors/FlowableProcessor.java | 5 ++- .../java/io/reactivex/schedulers/Timed.java | 7 +++- .../io/reactivex/subjects/SingleSubject.java | 15 ++++--- .../java/io/reactivex/subjects/Subject.java | 3 ++ 35 files changed, 187 insertions(+), 87 deletions(-) diff --git a/src/main/java/io/reactivex/Notification.java b/src/main/java/io/reactivex/Notification.java index c0ad0b6f7b..84ceb38a88 100644 --- a/src/main/java/io/reactivex/Notification.java +++ b/src/main/java/io/reactivex/Notification.java @@ -13,6 +13,7 @@ package io.reactivex; +import io.reactivex.annotations.*; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.util.NotificationLite; @@ -66,6 +67,7 @@ public boolean isOnNext() { * @see #isOnNext() */ @SuppressWarnings("unchecked") + @Nullable public T getValue() { Object o = value; if (o != null && !NotificationLite.isError(o)) { @@ -80,6 +82,7 @@ public T getValue() { * @return the Throwable error contained or null * @see #isOnError() */ + @Nullable public Throwable getError() { Object o = value; if (NotificationLite.isError(o)) { @@ -122,7 +125,8 @@ public String toString() { * @return the new Notification instance * @throws NullPointerException if value is null */ - public static Notification createOnNext(T value) { + @NonNull + public static Notification createOnNext(@NonNull T value) { ObjectHelper.requireNonNull(value, "value is null"); return new Notification(value); } @@ -134,7 +138,8 @@ public static Notification createOnNext(T value) { * @return the new Notification instance * @throws NullPointerException if error is null */ - public static Notification createOnError(Throwable error) { + @NonNull + public static Notification createOnError(@NonNull Throwable error) { ObjectHelper.requireNonNull(error, "error is null"); return new Notification(NotificationLite.error(error)); } @@ -146,6 +151,7 @@ public static Notification createOnError(Throwable error) { * @return the shared Notification instance representing an onComplete signal */ @SuppressWarnings("unchecked") + @NonNull public static Notification createOnComplete() { return (Notification)COMPLETE; } diff --git a/src/main/java/io/reactivex/Observer.java b/src/main/java/io/reactivex/Observer.java index 55b38dc0d7..a383d04a27 100644 --- a/src/main/java/io/reactivex/Observer.java +++ b/src/main/java/io/reactivex/Observer.java @@ -13,6 +13,7 @@ package io.reactivex; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; /** @@ -40,7 +41,7 @@ public interface Observer { * be called anytime to cancel the connection * @since 2.0 */ - void onSubscribe(Disposable d); + void onSubscribe(@NonNull Disposable d); /** * Provides the Observer with a new item to observe. @@ -53,7 +54,7 @@ public interface Observer { * @param t * the item emitted by the Observable */ - void onNext(T t); + void onNext(@NonNull T t); /** * Notifies the Observer that the {@link Observable} has experienced an error condition. @@ -64,7 +65,7 @@ public interface Observer { * @param e * the exception encountered by the Observable */ - void onError(Throwable e); + void onError(@NonNull Throwable e); /** * Notifies the Observer that the {@link Observable} has finished sending push-based notifications. diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index f628e463b4..3cef7c9aa4 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -16,8 +16,6 @@ import java.util.NoSuchElementException; import java.util.concurrent.*; -import org.reactivestreams.Publisher; - import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; @@ -34,6 +32,7 @@ import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; +import org.reactivestreams.Publisher; /** * The Single class implements the Reactive Pattern for a single value response. @@ -2714,7 +2713,7 @@ public final void subscribe(SingleObserver subscriber) { * Override this method in subclasses to handle the incoming SingleObservers. * @param observer the SingleObserver to handle, not null */ - protected abstract void subscribeActual(SingleObserver observer); + protected abstract void subscribeActual(@NonNull SingleObserver observer); /** * Subscribes a given SingleObserver (subclass) to this Single and returns the given diff --git a/src/main/java/io/reactivex/disposables/ActionDisposable.java b/src/main/java/io/reactivex/disposables/ActionDisposable.java index d112a4e3a1..447dfe2e34 100644 --- a/src/main/java/io/reactivex/disposables/ActionDisposable.java +++ b/src/main/java/io/reactivex/disposables/ActionDisposable.java @@ -12,6 +12,7 @@ */ package io.reactivex.disposables; +import io.reactivex.annotations.NonNull; import io.reactivex.functions.Action; import io.reactivex.internal.util.ExceptionHelper; @@ -24,7 +25,7 @@ final class ActionDisposable extends ReferenceDisposable { } @Override - protected void onDisposed(Action value) { + protected void onDisposed(@NonNull Action value) { try { value.run(); } catch (Throwable ex) { diff --git a/src/main/java/io/reactivex/disposables/CompositeDisposable.java b/src/main/java/io/reactivex/disposables/CompositeDisposable.java index 3e02288f2e..f9bf2d9e84 100644 --- a/src/main/java/io/reactivex/disposables/CompositeDisposable.java +++ b/src/main/java/io/reactivex/disposables/CompositeDisposable.java @@ -14,6 +14,7 @@ import java.util.*; +import io.reactivex.annotations.NonNull; import io.reactivex.exceptions.*; import io.reactivex.internal.disposables.DisposableContainer; import io.reactivex.internal.functions.ObjectHelper; @@ -39,7 +40,7 @@ public CompositeDisposable() { * Creates a CompositeDisposables with the given array of initial elements. * @param resources the array of Disposables to start with */ - public CompositeDisposable(Disposable... resources) { + public CompositeDisposable(@NonNull Disposable... resources) { ObjectHelper.requireNonNull(resources, "resources is null"); this.resources = new OpenHashSet(resources.length + 1); for (Disposable d : resources) { @@ -52,7 +53,7 @@ public CompositeDisposable(Disposable... resources) { * Creates a CompositeDisposables with the given Iterable sequence of initial elements. * @param resources the Iterable sequence of Disposables to start with */ - public CompositeDisposable(Iterable resources) { + public CompositeDisposable(@NonNull Iterable resources) { ObjectHelper.requireNonNull(resources, "resources is null"); this.resources = new OpenHashSet(); for (Disposable d : resources) { @@ -85,7 +86,7 @@ public boolean isDisposed() { } @Override - public boolean add(Disposable d) { + public boolean add(@NonNull Disposable d) { ObjectHelper.requireNonNull(d, "d is null"); if (!disposed) { synchronized (this) { @@ -110,7 +111,7 @@ public boolean add(Disposable d) { * @param ds the array of Disposables * @return true if the operation was successful, false if the container has been disposed */ - public boolean addAll(Disposable... ds) { + public boolean addAll(@NonNull Disposable... ds) { ObjectHelper.requireNonNull(ds, "ds is null"); if (!disposed) { synchronized (this) { @@ -135,7 +136,7 @@ public boolean addAll(Disposable... ds) { } @Override - public boolean remove(Disposable d) { + public boolean remove(@NonNull Disposable d) { if (delete(d)) { d.dispose(); return true; @@ -144,7 +145,7 @@ public boolean remove(Disposable d) { } @Override - public boolean delete(Disposable d) { + public boolean delete(@NonNull Disposable d) { ObjectHelper.requireNonNull(d, "Disposable item is null"); if (disposed) { return false; diff --git a/src/main/java/io/reactivex/disposables/Disposables.java b/src/main/java/io/reactivex/disposables/Disposables.java index cac17cad7a..7fbac0f63e 100644 --- a/src/main/java/io/reactivex/disposables/Disposables.java +++ b/src/main/java/io/reactivex/disposables/Disposables.java @@ -15,11 +15,11 @@ import java.util.concurrent.Future; -import org.reactivestreams.Subscription; - +import io.reactivex.annotations.NonNull; import io.reactivex.functions.Action; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.functions.*; +import org.reactivestreams.Subscription; /** * Utility class to help create disposables by wrapping @@ -38,7 +38,8 @@ private Disposables() { * @param run the Runnable to wrap * @return the new Disposable instance */ - public static Disposable fromRunnable(Runnable run) { + @NonNull + public static Disposable fromRunnable(@NonNull Runnable run) { ObjectHelper.requireNonNull(run, "run is null"); return new RunnableDisposable(run); } @@ -49,7 +50,8 @@ public static Disposable fromRunnable(Runnable run) { * @param run the Action to wrap * @return the new Disposable instance */ - public static Disposable fromAction(Action run) { + @NonNull + public static Disposable fromAction(@NonNull Action run) { ObjectHelper.requireNonNull(run, "run is null"); return new ActionDisposable(run); } @@ -60,7 +62,8 @@ public static Disposable fromAction(Action run) { * @param future the Future to wrap * @return the new Disposable instance */ - public static Disposable fromFuture(Future future) { + @NonNull + public static Disposable fromFuture(@NonNull Future future) { ObjectHelper.requireNonNull(future, "future is null"); return fromFuture(future, true); } @@ -72,7 +75,8 @@ public static Disposable fromFuture(Future future) { * @param allowInterrupt if true, the future cancel happens via Future.cancel(true) * @return the new Disposable instance */ - public static Disposable fromFuture(Future future, boolean allowInterrupt) { + @NonNull + public static Disposable fromFuture(@NonNull Future future, boolean allowInterrupt) { ObjectHelper.requireNonNull(future, "future is null"); return new FutureDisposable(future, allowInterrupt); } @@ -83,7 +87,8 @@ public static Disposable fromFuture(Future future, boolean allowInterrupt) { * @param subscription the Runnable to wrap * @return the new Disposable instance */ - public static Disposable fromSubscription(Subscription subscription) { + @NonNull + public static Disposable fromSubscription(@NonNull Subscription subscription) { ObjectHelper.requireNonNull(subscription, "subscription is null"); return new SubscriptionDisposable(subscription); } @@ -92,6 +97,7 @@ public static Disposable fromSubscription(Subscription subscription) { * Returns a new, non-disposed Disposable instance. * @return a new, non-disposed Disposable instance */ + @NonNull public static Disposable empty() { return fromRunnable(Functions.EMPTY_RUNNABLE); } @@ -100,6 +106,7 @@ public static Disposable empty() { * Returns a disposed Disposable instance. * @return a disposed Disposable instance */ + @NonNull public static Disposable disposed() { return EmptyDisposable.INSTANCE; } diff --git a/src/main/java/io/reactivex/disposables/ReferenceDisposable.java b/src/main/java/io/reactivex/disposables/ReferenceDisposable.java index a21449a403..100b1848ed 100644 --- a/src/main/java/io/reactivex/disposables/ReferenceDisposable.java +++ b/src/main/java/io/reactivex/disposables/ReferenceDisposable.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.annotations.NonNull; import io.reactivex.internal.functions.ObjectHelper; /** @@ -31,7 +32,7 @@ abstract class ReferenceDisposable extends AtomicReference implements Disp super(ObjectHelper.requireNonNull(value, "value is null")); } - protected abstract void onDisposed(T value); + protected abstract void onDisposed(@NonNull T value); @Override public final void dispose() { diff --git a/src/main/java/io/reactivex/disposables/RunnableDisposable.java b/src/main/java/io/reactivex/disposables/RunnableDisposable.java index 417d55088c..a70df60317 100644 --- a/src/main/java/io/reactivex/disposables/RunnableDisposable.java +++ b/src/main/java/io/reactivex/disposables/RunnableDisposable.java @@ -12,6 +12,8 @@ */ package io.reactivex.disposables; +import io.reactivex.annotations.NonNull; + /** * A disposable container that manages a Runnable instance. */ @@ -24,7 +26,7 @@ final class RunnableDisposable extends ReferenceDisposable { } @Override - protected void onDisposed(Runnable value) { + protected void onDisposed(@NonNull Runnable value) { value.run(); } diff --git a/src/main/java/io/reactivex/disposables/SerialDisposable.java b/src/main/java/io/reactivex/disposables/SerialDisposable.java index 96beb94b55..d5063928a2 100644 --- a/src/main/java/io/reactivex/disposables/SerialDisposable.java +++ b/src/main/java/io/reactivex/disposables/SerialDisposable.java @@ -15,7 +15,8 @@ import java.util.concurrent.atomic.AtomicReference; -import io.reactivex.internal.disposables.*; +import io.reactivex.annotations.Nullable; +import io.reactivex.internal.disposables.DisposableHelper; /** * A Disposable container that allows atomically updating/replacing the contained @@ -36,7 +37,7 @@ public SerialDisposable() { * Constructs a SerialDisposable with the given initial Disposable instance. * @param initialDisposable the initial Disposable instance to use, null allowed */ - public SerialDisposable(Disposable initialDisposable) { + public SerialDisposable(@Nullable Disposable initialDisposable) { this.resource = new AtomicReference(initialDisposable); } @@ -47,7 +48,7 @@ public SerialDisposable(Disposable initialDisposable) { * @return true if the operation succeeded, false if the container has been disposed * @see #replace(Disposable) */ - public boolean set(Disposable next) { + public boolean set(@Nullable Disposable next) { return DisposableHelper.set(resource, next); } @@ -58,7 +59,7 @@ public boolean set(Disposable next) { * @return true if the operation succeeded, false if the container has been disposed * @see #set(Disposable) */ - public boolean replace(Disposable next) { + public boolean replace(@Nullable Disposable next) { return DisposableHelper.replace(resource, next); } @@ -66,6 +67,7 @@ public boolean replace(Disposable next) { * Returns the currently contained Disposable or null if this container is empty. * @return the current Disposable, may be null */ + @Nullable public Disposable get() { Disposable d = resource.get(); if (d == DisposableHelper.DISPOSED) { diff --git a/src/main/java/io/reactivex/disposables/SubscriptionDisposable.java b/src/main/java/io/reactivex/disposables/SubscriptionDisposable.java index 54b53b18c6..ebf8934a37 100644 --- a/src/main/java/io/reactivex/disposables/SubscriptionDisposable.java +++ b/src/main/java/io/reactivex/disposables/SubscriptionDisposable.java @@ -12,6 +12,7 @@ */ package io.reactivex.disposables; +import io.reactivex.annotations.NonNull; import org.reactivestreams.Subscription; /** @@ -26,7 +27,7 @@ final class SubscriptionDisposable extends ReferenceDisposable { } @Override - protected void onDisposed(Subscription value) { + protected void onDisposed(@NonNull Subscription value) { value.cancel(); } } diff --git a/src/main/java/io/reactivex/exceptions/CompositeException.java b/src/main/java/io/reactivex/exceptions/CompositeException.java index bcd5baffbb..0b18f8ce3f 100644 --- a/src/main/java/io/reactivex/exceptions/CompositeException.java +++ b/src/main/java/io/reactivex/exceptions/CompositeException.java @@ -18,6 +18,8 @@ import java.io.*; import java.util.*; +import io.reactivex.annotations.NonNull; + /** * Represents an exception that is a composite of one or more other exceptions. A {@code CompositeException} * does not modify the structure of any exception it wraps, but at print-time it iterates through the list of @@ -47,7 +49,7 @@ public final class CompositeException extends RuntimeException { * * @throws IllegalArgumentException if exceptions is empty. */ - public CompositeException(Throwable... exceptions) { + public CompositeException(@NonNull Throwable... exceptions) { this(exceptions == null ? Collections.singletonList(new NullPointerException("exceptions was null")) : Arrays.asList(exceptions)); } @@ -59,7 +61,7 @@ public CompositeException(Throwable... exceptions) { * * @throws IllegalArgumentException if errors is empty. */ - public CompositeException(Iterable errors) { + public CompositeException(@NonNull Iterable errors) { Set deDupedExceptions = new LinkedHashSet(); List localExceptions = new ArrayList(); if (errors != null) { @@ -89,16 +91,19 @@ public CompositeException(Iterable errors) { * * @return the exceptions that make up the {@code CompositeException}, as a {@link List} of {@link Throwable}s */ + @NonNull public List getExceptions() { return exceptions; } @Override + @NonNull public String getMessage() { return message; } @Override + @NonNull public synchronized Throwable getCause() { // NOPMD if (cause == null) { // we lazily generate this causal chain if this is called diff --git a/src/main/java/io/reactivex/exceptions/Exceptions.java b/src/main/java/io/reactivex/exceptions/Exceptions.java index cad41e6e51..42c2aa360d 100644 --- a/src/main/java/io/reactivex/exceptions/Exceptions.java +++ b/src/main/java/io/reactivex/exceptions/Exceptions.java @@ -13,6 +13,7 @@ package io.reactivex.exceptions; +import io.reactivex.annotations.*; import io.reactivex.internal.util.ExceptionHelper; /** @@ -32,7 +33,8 @@ private Exceptions() { * @return because {@code propagate} itself throws an exception or error, this is a sort of phantom return * value; {@code propagate} does not actually return anything */ - public static RuntimeException propagate(Throwable t) { + @NonNull + public static RuntimeException propagate(@NonNull Throwable t) { /* * The return type of RuntimeException is a trick for code to be like this: * @@ -61,7 +63,7 @@ public static RuntimeException propagate(Throwable t) { * the {@code Throwable} to test and perhaps throw * @see RxJava: StackOverflowError is swallowed (Issue #748) */ - public static void throwIfFatal(Throwable t) { + public static void throwIfFatal(@NonNull Throwable t) { // values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495 if (t instanceof VirtualMachineError) { throw (VirtualMachineError) t; diff --git a/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java b/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java index dc61f2eea8..13488f79b4 100644 --- a/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java +++ b/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java @@ -13,7 +13,7 @@ package io.reactivex.exceptions; -import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.*; /** * Represents an exception used to signal to the {@code RxJavaPlugins.onError()} that a @@ -35,7 +35,7 @@ public final class OnErrorNotImplementedException extends RuntimeException { * @param e * the {@code Throwable} to signal; if null, a NullPointerException is constructed */ - public OnErrorNotImplementedException(String message, Throwable e) { + public OnErrorNotImplementedException(String message, @NonNull Throwable e) { super(message, e != null ? e : new NullPointerException()); } @@ -47,7 +47,7 @@ public OnErrorNotImplementedException(String message, Throwable e) { * @param e * the {@code Throwable} to signal; if null, a NullPointerException is constructed */ - public OnErrorNotImplementedException(Throwable e) { + public OnErrorNotImplementedException(@NonNull Throwable e) { super(e != null ? e.getMessage() : null, e != null ? e : new NullPointerException()); } } \ No newline at end of file diff --git a/src/main/java/io/reactivex/flowables/ConnectableFlowable.java b/src/main/java/io/reactivex/flowables/ConnectableFlowable.java index fb74764935..41039682f8 100644 --- a/src/main/java/io/reactivex/flowables/ConnectableFlowable.java +++ b/src/main/java/io/reactivex/flowables/ConnectableFlowable.java @@ -13,6 +13,7 @@ package io.reactivex.flowables; +import io.reactivex.annotations.NonNull; import org.reactivestreams.Subscriber; import io.reactivex.Flowable; @@ -47,7 +48,7 @@ public abstract class ConnectableFlowable extends Flowable { * allowing the caller to synchronously disconnect a synchronous source * @see ReactiveX documentation: Connect */ - public abstract void connect(Consumer connection); + public abstract void connect(@NonNull Consumer connection); /** * Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying @@ -71,6 +72,7 @@ public final Disposable connect() { * @return a {@link Flowable} * @see ReactiveX documentation: RefCount */ + @NonNull public Flowable refCount() { return RxJavaPlugins.onAssembly(new FlowableRefCount(this)); } @@ -82,6 +84,7 @@ public Flowable refCount() { * @return an Observable that automatically connects to this ConnectableObservable * when the first Subscriber subscribes */ + @NonNull public Flowable autoConnect() { return autoConnect(1); } @@ -95,6 +98,7 @@ public Flowable autoConnect() { * @return an Observable that automatically connects to this ConnectableObservable * when the specified number of Subscribers subscribe to it */ + @NonNull public Flowable autoConnect(int numberOfSubscribers) { return autoConnect(numberOfSubscribers, Functions.emptyConsumer()); } @@ -113,7 +117,8 @@ public Flowable autoConnect(int numberOfSubscribers) { * when the specified number of Subscribers subscribe to it and calls the * specified callback with the Subscription associated with the established connection */ - public Flowable autoConnect(int numberOfSubscribers, Consumer connection) { + @NonNull + public Flowable autoConnect(int numberOfSubscribers, @NonNull Consumer connection) { if (numberOfSubscribers <= 0) { this.connect(connection); return RxJavaPlugins.onAssembly(this); diff --git a/src/main/java/io/reactivex/flowables/GroupedFlowable.java b/src/main/java/io/reactivex/flowables/GroupedFlowable.java index 4ffd71ad14..c7eb4dc121 100644 --- a/src/main/java/io/reactivex/flowables/GroupedFlowable.java +++ b/src/main/java/io/reactivex/flowables/GroupedFlowable.java @@ -13,6 +13,7 @@ package io.reactivex.flowables; import io.reactivex.Flowable; +import io.reactivex.annotations.Nullable; /** * A {@link Flowable} that has been grouped by key, the value of which can be obtained with {@link #getKey()}. @@ -30,14 +31,14 @@ * @see ReactiveX documentation: GroupBy */ public abstract class GroupedFlowable extends Flowable { - + final K key; /** * Constructs a GroupedFlowable with the given key. * @param key the key */ - protected GroupedFlowable(K key) { + protected GroupedFlowable(@Nullable K key) { this.key = key; } @@ -46,6 +47,7 @@ protected GroupedFlowable(K key) { * * @return the key that the items emitted by this {@code GroupedObservable} were grouped by */ + @Nullable public K getKey() { return key; } diff --git a/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java b/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java index 46f13bd743..f9ad177399 100644 --- a/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java +++ b/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java @@ -20,6 +20,7 @@ import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.plugins.RxJavaPlugins; + /** * Utility methods for working with Disposables atomically. */ diff --git a/src/main/java/io/reactivex/observables/ConnectableObservable.java b/src/main/java/io/reactivex/observables/ConnectableObservable.java index cb66875298..2fed7634eb 100644 --- a/src/main/java/io/reactivex/observables/ConnectableObservable.java +++ b/src/main/java/io/reactivex/observables/ConnectableObservable.java @@ -13,6 +13,7 @@ package io.reactivex.observables; +import io.reactivex.annotations.NonNull; import org.reactivestreams.Subscriber; import io.reactivex.*; @@ -47,7 +48,7 @@ public abstract class ConnectableObservable extends Observable { * allowing the caller to synchronously disconnect a synchronous source * @see ReactiveX documentation: Connect */ - public abstract void connect(Consumer connection); + public abstract void connect(@NonNull Consumer connection); /** * Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying @@ -71,6 +72,7 @@ public final Disposable connect() { * @return an {@link Observable} * @see ReactiveX documentation: RefCount */ + @NonNull public Observable refCount() { return RxJavaPlugins.onAssembly(new ObservableRefCount(this)); } @@ -82,6 +84,7 @@ public Observable refCount() { * @return an Observable that automatically connects to this ConnectableObservable * when the first Subscriber subscribes */ + @NonNull public Observable autoConnect() { return autoConnect(1); } @@ -95,6 +98,7 @@ public Observable autoConnect() { * @return an Observable that automatically connects to this ConnectableObservable * when the specified number of Subscribers subscribe to it */ + @NonNull public Observable autoConnect(int numberOfSubscribers) { return autoConnect(numberOfSubscribers, Functions.emptyConsumer()); } @@ -113,7 +117,8 @@ public Observable autoConnect(int numberOfSubscribers) { * when the specified number of Subscribers subscribe to it and calls the * specified callback with the Subscription associated with the established connection */ - public Observable autoConnect(int numberOfSubscribers, Consumer connection) { + @NonNull + public Observable autoConnect(int numberOfSubscribers, @NonNull Consumer connection) { if (numberOfSubscribers <= 0) { this.connect(connection); return RxJavaPlugins.onAssembly(this); diff --git a/src/main/java/io/reactivex/observables/GroupedObservable.java b/src/main/java/io/reactivex/observables/GroupedObservable.java index 2641ad43f7..a9673715dd 100644 --- a/src/main/java/io/reactivex/observables/GroupedObservable.java +++ b/src/main/java/io/reactivex/observables/GroupedObservable.java @@ -13,6 +13,7 @@ package io.reactivex.observables; import io.reactivex.Observable; +import io.reactivex.annotations.Nullable; /** * An {@link Observable} that has been grouped by key, the value of which can be obtained with {@link #getKey()}. @@ -30,14 +31,14 @@ * @see ReactiveX documentation: GroupBy */ public abstract class GroupedObservable extends Observable { - + final K key; /** * Constructs a GroupedObservable with the given key. * @param key the key */ - protected GroupedObservable(K key) { + protected GroupedObservable(@Nullable K key) { this.key = key; } @@ -46,6 +47,7 @@ protected GroupedObservable(K key) { * * @return the key that the items emitted by this {@code GroupedObservable} were grouped by */ + @Nullable public K getKey() { return key; } diff --git a/src/main/java/io/reactivex/observers/DefaultObserver.java b/src/main/java/io/reactivex/observers/DefaultObserver.java index 096388c2e9..c0ef8475c3 100644 --- a/src/main/java/io/reactivex/observers/DefaultObserver.java +++ b/src/main/java/io/reactivex/observers/DefaultObserver.java @@ -14,6 +14,7 @@ package io.reactivex.observers; import io.reactivex.Observer; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; @@ -65,7 +66,7 @@ public abstract class DefaultObserver implements Observer { private Disposable s; @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; onStart(); diff --git a/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java b/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java index b0a1c18914..e0d6902a76 100644 --- a/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.CompletableObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; @@ -53,7 +54,7 @@ public abstract class DisposableCompletableObserver implements CompletableObserv final AtomicReference s = new AtomicReference(); @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java index f4d980f060..09686adc3d 100644 --- a/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.MaybeObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; @@ -60,10 +61,11 @@ * @param the received value type */ public abstract class DisposableMaybeObserver implements MaybeObserver, Disposable { + final AtomicReference s = new AtomicReference(); @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/DisposableObserver.java b/src/main/java/io/reactivex/observers/DisposableObserver.java index da863b1fa5..7d22b8201b 100644 --- a/src/main/java/io/reactivex/observers/DisposableObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableObserver.java @@ -16,8 +16,9 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.Observer; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.*; +import io.reactivex.internal.disposables.DisposableHelper; /** * An abstract {@link Observer} that allows asynchronous cancellation by implementing Disposable. @@ -63,10 +64,11 @@ * @param the received value type */ public abstract class DisposableObserver implements Observer, Disposable { + final AtomicReference s = new AtomicReference(); @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/DisposableSingleObserver.java b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java index dd5f0aab09..0515f458ac 100644 --- a/src/main/java/io/reactivex/observers/DisposableSingleObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.SingleObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; @@ -52,10 +53,11 @@ * @param the received value type */ public abstract class DisposableSingleObserver implements SingleObserver, Disposable { + final AtomicReference s = new AtomicReference(); @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java index b892abbf62..58906dcb6c 100644 --- a/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java @@ -16,9 +16,9 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.CompletableObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.disposables.ListCompositeDisposable; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; /** @@ -85,13 +85,13 @@ public abstract class ResourceCompletableObserver implements CompletableObserver * * @throws NullPointerException if resource is null */ - public final void add(Disposable resource) { + public final void add(@NonNull Disposable resource) { ObjectHelper.requireNonNull(resource, "resource is null"); resources.add(resource); } @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java index c12e9e2417..4a0287e01f 100644 --- a/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java @@ -16,9 +16,9 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.MaybeObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.disposables.ListCompositeDisposable; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; /** @@ -95,13 +95,13 @@ public abstract class ResourceMaybeObserver implements MaybeObserver, Disp * * @throws NullPointerException if resource is null */ - public final void add(Disposable resource) { + public final void add(@NonNull Disposable resource) { ObjectHelper.requireNonNull(resource, "resource is null"); resources.add(resource); } @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/ResourceObserver.java b/src/main/java/io/reactivex/observers/ResourceObserver.java index ef7fd703fd..834b73674a 100644 --- a/src/main/java/io/reactivex/observers/ResourceObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceObserver.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.Observer; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; @@ -92,7 +93,7 @@ public abstract class ResourceObserver implements Observer, Disposable { * * @throws NullPointerException if resource is null */ - public final void add(Disposable resource) { + public final void add(@NonNull Disposable resource) { ObjectHelper.requireNonNull(resource, "resource is null"); resources.add(resource); } diff --git a/src/main/java/io/reactivex/observers/ResourceSingleObserver.java b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java index 88efae5c74..4c18a139c2 100644 --- a/src/main/java/io/reactivex/observers/ResourceSingleObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java @@ -16,9 +16,9 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.SingleObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.disposables.ListCompositeDisposable; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; /** @@ -88,13 +88,13 @@ public abstract class ResourceSingleObserver implements SingleObserver, Di * * @throws NullPointerException if resource is null */ - public final void add(Disposable resource) { + public final void add(@NonNull Disposable resource) { ObjectHelper.requireNonNull(resource, "resource is null"); resources.add(resource); } @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/SafeObserver.java b/src/main/java/io/reactivex/observers/SafeObserver.java index 76edd194d3..a370f3799b 100644 --- a/src/main/java/io/reactivex/observers/SafeObserver.java +++ b/src/main/java/io/reactivex/observers/SafeObserver.java @@ -13,6 +13,7 @@ package io.reactivex.observers; import io.reactivex.Observer; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.internal.disposables.*; @@ -36,12 +37,12 @@ public final class SafeObserver implements Observer, Disposable { * Constructs a SafeObserver by wrapping the given actual Observer. * @param actual the actual Observer to wrap, not null (not validated) */ - public SafeObserver(Observer actual) { + public SafeObserver(@NonNull Observer actual) { this.actual = actual; } @Override - public void onSubscribe(Disposable s) { + public void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; try { @@ -74,7 +75,7 @@ public boolean isDisposed() { } @Override - public void onNext(T t) { + public void onNext(@NonNull T t) { if (done) { return; } @@ -134,7 +135,7 @@ void onNextNoSubscription() { } @Override - public void onError(Throwable t) { + public void onError(@NonNull Throwable t) { if (done) { RxJavaPlugins.onError(t); return; diff --git a/src/main/java/io/reactivex/observers/SerializedObserver.java b/src/main/java/io/reactivex/observers/SerializedObserver.java index 7b429415af..ec2061ba97 100644 --- a/src/main/java/io/reactivex/observers/SerializedObserver.java +++ b/src/main/java/io/reactivex/observers/SerializedObserver.java @@ -13,6 +13,7 @@ package io.reactivex.observers; import io.reactivex.Observer; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.util.*; @@ -46,7 +47,7 @@ public final class SerializedObserver implements Observer, Disposable { * Construct a SerializedObserver by wrapping the given actual Observer. * @param actual the actual Observer, not null (not verified) */ - public SerializedObserver(Observer actual) { + public SerializedObserver(@NonNull Observer actual) { this(actual, false); } @@ -57,13 +58,13 @@ public SerializedObserver(Observer actual) { * @param actual the actual Observer, not null (not verified) * @param delayError if true, errors are emitted after regular values have been emitted */ - public SerializedObserver(Observer actual, boolean delayError) { + public SerializedObserver(@NonNull Observer actual, boolean delayError) { this.actual = actual; this.delayError = delayError; } @Override - public void onSubscribe(Disposable s) { + public void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; @@ -84,7 +85,7 @@ public boolean isDisposed() { @Override - public void onNext(T t) { + public void onNext(@NonNull T t) { if (done) { return; } @@ -115,7 +116,7 @@ public void onNext(T t) { } @Override - public void onError(Throwable t) { + public void onError(@NonNull Throwable t) { if (done) { RxJavaPlugins.onError(t); return; diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index b308545be4..e42e2c78ad 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -16,8 +16,6 @@ import java.util.*; import java.util.concurrent.Callable; -import org.reactivestreams.*; - import io.reactivex.*; import io.reactivex.annotations.*; import io.reactivex.exceptions.Exceptions; @@ -27,6 +25,7 @@ import io.reactivex.internal.subscriptions.EmptySubscription; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; +import org.reactivestreams.*; /** * Abstract base class for Parallel publishers that take an array of Subscribers. @@ -112,6 +111,7 @@ public static ParallelFlowable from(@NonNull Publisher sourc * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public static ParallelFlowable from(@NonNull Publisher source, int parallelism, int prefetch) { ObjectHelper.requireNonNull(source, "source"); @@ -130,6 +130,7 @@ public static ParallelFlowable from(@NonNull Publisher sourc * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable map(@NonNull Function mapper) { ObjectHelper.requireNonNull(mapper, "mapper"); return RxJavaPlugins.onAssembly(new ParallelMap(this, mapper)); @@ -149,6 +150,7 @@ public final ParallelFlowable map(@NonNull Function ParallelFlowable map(@NonNull Function mapper, @NonNull ParallelFailureHandling errorHandler) { ObjectHelper.requireNonNull(mapper, "mapper"); ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); @@ -170,6 +172,7 @@ public final ParallelFlowable map(@NonNull Function ParallelFlowable map(@NonNull Function mapper, @NonNull BiFunction errorHandler) { ObjectHelper.requireNonNull(mapper, "mapper"); ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); @@ -249,6 +252,7 @@ public final ParallelFlowable filter(@NonNull Predicate predicate, * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable runOn(@NonNull Scheduler scheduler) { return runOn(scheduler, Flowable.bufferSize()); } @@ -275,6 +279,7 @@ public final ParallelFlowable runOn(@NonNull Scheduler scheduler) { * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable runOn(@NonNull Scheduler scheduler, int prefetch) { ObjectHelper.requireNonNull(scheduler, "scheduler"); ObjectHelper.verifyPositive(prefetch, "prefetch"); @@ -290,6 +295,7 @@ public final ParallelFlowable runOn(@NonNull Scheduler scheduler, int prefetc * @return the new Flowable instance emitting the reduced value or empty if the ParallelFlowable was empty */ @CheckReturnValue + @NonNull public final Flowable reduce(@NonNull BiFunction reducer) { ObjectHelper.requireNonNull(reducer, "reducer"); return RxJavaPlugins.onAssembly(new ParallelReduceFull(this, reducer)); @@ -307,6 +313,7 @@ public final Flowable reduce(@NonNull BiFunction reducer) { * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable reduce(@NonNull Callable initialSupplier, @NonNull BiFunction reducer) { ObjectHelper.requireNonNull(initialSupplier, "initialSupplier"); ObjectHelper.requireNonNull(reducer, "reducer"); @@ -356,6 +363,7 @@ public final Flowable sequential() { @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @CheckReturnValue + @NonNull public final Flowable sequential(int prefetch) { ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new ParallelJoin(this, prefetch, false)); @@ -383,6 +391,7 @@ public final Flowable sequential(int prefetch) { @SchedulerSupport(SchedulerSupport.NONE) @CheckReturnValue @Experimental + @NonNull public final Flowable sequentialDelayError() { return sequentialDelayError(Flowable.bufferSize()); } @@ -407,6 +416,7 @@ public final Flowable sequentialDelayError() { @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @CheckReturnValue + @NonNull public final Flowable sequentialDelayError(int prefetch) { ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new ParallelJoin(this, prefetch, true)); @@ -422,6 +432,7 @@ public final Flowable sequentialDelayError(int prefetch) { * @return the new Flowable instance */ @CheckReturnValue + @NonNull public final Flowable sorted(@NonNull Comparator comparator) { return sorted(comparator, 16); } @@ -437,6 +448,7 @@ public final Flowable sorted(@NonNull Comparator comparator) { * @return the new Flowable instance */ @CheckReturnValue + @NonNull public final Flowable sorted(@NonNull Comparator comparator, int capacityHint) { ObjectHelper.requireNonNull(comparator, "comparator is null"); ObjectHelper.verifyPositive(capacityHint, "capacityHint"); @@ -456,6 +468,7 @@ public final Flowable sorted(@NonNull Comparator comparator, int c * @return the new Flowable instance */ @CheckReturnValue + @NonNull public final Flowable> toSortedList(@NonNull Comparator comparator) { return toSortedList(comparator, 16); } @@ -469,6 +482,7 @@ public final Flowable> toSortedList(@NonNull Comparator compa * @return the new Flowable instance */ @CheckReturnValue + @NonNull public final Flowable> toSortedList(@NonNull Comparator comparator, int capacityHint) { ObjectHelper.requireNonNull(comparator, "comparator is null"); ObjectHelper.verifyPositive(capacityHint, "capacityHint"); @@ -489,6 +503,7 @@ public final Flowable> toSortedList(@NonNull Comparator compa * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doOnNext(@NonNull Consumer onNext) { ObjectHelper.requireNonNull(onNext, "onNext is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -516,6 +531,7 @@ public final ParallelFlowable doOnNext(@NonNull Consumer onNext) { */ @CheckReturnValue @Experimental + @NonNull public final ParallelFlowable doOnNext(@NonNull Consumer onNext, @NonNull ParallelFailureHandling errorHandler) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); @@ -535,6 +551,7 @@ public final ParallelFlowable doOnNext(@NonNull Consumer onNext, @ */ @CheckReturnValue @Experimental + @NonNull public final ParallelFlowable doOnNext(@NonNull Consumer onNext, @NonNull BiFunction errorHandler) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); @@ -549,6 +566,7 @@ public final ParallelFlowable doOnNext(@NonNull Consumer onNext, @ * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doAfterNext(@NonNull Consumer onAfterNext) { ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -570,6 +588,7 @@ public final ParallelFlowable doAfterNext(@NonNull Consumer onAfte * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doOnError(@NonNull Consumer onError) { ObjectHelper.requireNonNull(onError, "onError is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -591,6 +610,7 @@ public final ParallelFlowable doOnError(@NonNull Consumer onError) * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doOnComplete(@NonNull Action onComplete) { ObjectHelper.requireNonNull(onComplete, "onComplete is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -612,6 +632,7 @@ public final ParallelFlowable doOnComplete(@NonNull Action onComplete) { * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doAfterTerminated(@NonNull Action onAfterTerminate) { ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -633,6 +654,7 @@ public final ParallelFlowable doAfterTerminated(@NonNull Action onAfterTermin * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doOnSubscribe(@NonNull Consumer onSubscribe) { ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -654,6 +676,7 @@ public final ParallelFlowable doOnSubscribe(@NonNull Consumer doOnRequest(@NonNull LongConsumer onRequest) { ObjectHelper.requireNonNull(onRequest, "onRequest is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -675,6 +698,7 @@ public final ParallelFlowable doOnRequest(@NonNull LongConsumer onRequest) { * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doOnCancel(@NonNull Action onCancel) { ObjectHelper.requireNonNull(onCancel, "onCancel is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -699,6 +723,7 @@ public final ParallelFlowable doOnCancel(@NonNull Action onCancel) { * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable collect(@NonNull Callable collectionSupplier, @NonNull BiConsumer collector) { ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null"); ObjectHelper.requireNonNull(collector, "collector is null"); @@ -714,6 +739,7 @@ public final ParallelFlowable collect(@NonNull Callable coll * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public static ParallelFlowable fromArray(@NonNull Publisher... publishers) { if (publishers.length == 0) { throw new IllegalArgumentException("Zero publishers not supported"); @@ -730,6 +756,7 @@ public static ParallelFlowable fromArray(@NonNull Publisher... publish * @return the value returned by the converter function */ @CheckReturnValue + @NonNull public final U to(@NonNull Function, U> converter) { try { return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); @@ -748,6 +775,7 @@ public final U to(@NonNull Function, U> converte * @return the ParallelFlowable returned by the function */ @CheckReturnValue + @NonNull public final ParallelFlowable compose(@NonNull ParallelTransformer composer) { return RxJavaPlugins.onAssembly(ObjectHelper.requireNonNull(composer, "composer is null").apply(this)); } @@ -762,6 +790,7 @@ public final ParallelFlowable compose(@NonNull ParallelTransformer * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable flatMap(@NonNull Function> mapper) { return flatMap(mapper, false, Integer.MAX_VALUE, Flowable.bufferSize()); } @@ -777,6 +806,7 @@ public final ParallelFlowable flatMap(@NonNull Function ParallelFlowable flatMap( @NonNull Function> mapper, boolean delayError) { return flatMap(mapper, delayError, Integer.MAX_VALUE, Flowable.bufferSize()); @@ -795,6 +825,7 @@ public final ParallelFlowable flatMap( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable flatMap( @NonNull Function> mapper, boolean delayError, int maxConcurrency) { return flatMap(mapper, delayError, maxConcurrency, Flowable.bufferSize()); @@ -813,6 +844,7 @@ public final ParallelFlowable flatMap( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable flatMap( @NonNull Function> mapper, boolean delayError, int maxConcurrency, int prefetch) { @@ -832,6 +864,7 @@ public final ParallelFlowable flatMap( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable concatMap( @NonNull Function> mapper) { return concatMap(mapper, 2); @@ -848,6 +881,7 @@ public final ParallelFlowable concatMap( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable concatMap( @NonNull Function> mapper, int prefetch) { @@ -868,6 +902,7 @@ public final ParallelFlowable concatMap( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable concatMapDelayError( @NonNull Function> mapper, boolean tillTheEnd) { @@ -886,6 +921,7 @@ public final ParallelFlowable concatMapDelayError( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable concatMapDelayError( @NonNull Function> mapper, int prefetch, boolean tillTheEnd) { diff --git a/src/main/java/io/reactivex/processors/AsyncProcessor.java b/src/main/java/io/reactivex/processors/AsyncProcessor.java index f24a1954c1..67e6b7f6a8 100644 --- a/src/main/java/io/reactivex/processors/AsyncProcessor.java +++ b/src/main/java/io/reactivex/processors/AsyncProcessor.java @@ -12,14 +12,13 @@ */ package io.reactivex.processors; -import io.reactivex.annotations.CheckReturnValue; import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; -import org.reactivestreams.*; - +import io.reactivex.annotations.*; import io.reactivex.internal.subscriptions.DeferredScalarSubscription; import io.reactivex.plugins.RxJavaPlugins; +import org.reactivestreams.*; /** * A Subject that emits the very last value followed by a completion event or the received error to Subscribers. @@ -51,6 +50,7 @@ public final class AsyncProcessor extends FlowableProcessor { * @return the new AsyncProcessor instance */ @CheckReturnValue + @NonNull public static AsyncProcessor create() { return new AsyncProcessor(); } diff --git a/src/main/java/io/reactivex/processors/FlowableProcessor.java b/src/main/java/io/reactivex/processors/FlowableProcessor.java index 80699926a3..e5fe4ad838 100644 --- a/src/main/java/io/reactivex/processors/FlowableProcessor.java +++ b/src/main/java/io/reactivex/processors/FlowableProcessor.java @@ -13,9 +13,9 @@ package io.reactivex.processors; -import org.reactivestreams.Processor; - import io.reactivex.*; +import io.reactivex.annotations.NonNull; +import org.reactivestreams.Processor; /** * Represents a Subscriber and a Flowable (Publisher) at the same time, allowing @@ -66,6 +66,7 @@ public abstract class FlowableProcessor extends Flowable implements Proces *

The method is thread-safe. * @return the wrapped and serialized subject */ + @NonNull public final FlowableProcessor toSerialized() { if (this instanceof SerializedProcessor) { return this; diff --git a/src/main/java/io/reactivex/schedulers/Timed.java b/src/main/java/io/reactivex/schedulers/Timed.java index fefcc17620..a1bd20e4be 100644 --- a/src/main/java/io/reactivex/schedulers/Timed.java +++ b/src/main/java/io/reactivex/schedulers/Timed.java @@ -15,6 +15,7 @@ import java.util.concurrent.TimeUnit; +import io.reactivex.annotations.NonNull; import io.reactivex.internal.functions.ObjectHelper; /** @@ -34,7 +35,7 @@ public final class Timed { * @param unit the time unit, not null * @throws NullPointerException if unit is null */ - public Timed(T value, long time, TimeUnit unit) { + public Timed(@NonNull T value, long time, @NonNull TimeUnit unit) { this.value = value; this.time = time; this.unit = ObjectHelper.requireNonNull(unit, "unit is null"); @@ -44,6 +45,7 @@ public Timed(T value, long time, TimeUnit unit) { * Returns the contained value. * @return the contained value */ + @NonNull public T value() { return value; } @@ -52,6 +54,7 @@ public T value() { * Returns the time unit of the contained time. * @return the time unit of the contained time */ + @NonNull public TimeUnit unit() { return unit; } @@ -69,7 +72,7 @@ public long time() { * @param unit the time unt * @return the converted time */ - public long time(TimeUnit unit) { + public long time(@NonNull TimeUnit unit) { return unit.convert(time, this.unit); } diff --git a/src/main/java/io/reactivex/subjects/SingleSubject.java b/src/main/java/io/reactivex/subjects/SingleSubject.java index 0c3eab5582..6e8b0e2006 100644 --- a/src/main/java/io/reactivex/subjects/SingleSubject.java +++ b/src/main/java/io/reactivex/subjects/SingleSubject.java @@ -53,6 +53,7 @@ public final class SingleSubject extends Single implements SingleObserver< * @return the new SingleSubject instance */ @CheckReturnValue + @NonNull public static SingleSubject create() { return new SingleSubject(); } @@ -64,7 +65,7 @@ public static SingleSubject create() { } @Override - public void onSubscribe(Disposable d) { + public void onSubscribe(@NonNull Disposable d) { if (observers.get() == TERMINATED) { d.dispose(); } @@ -72,7 +73,7 @@ public void onSubscribe(Disposable d) { @SuppressWarnings("unchecked") @Override - public void onSuccess(T value) { + public void onSuccess(@NonNull T value) { if (value == null) { onError(new NullPointerException("Null values are not allowed in 2.x")); return; @@ -87,7 +88,7 @@ public void onSuccess(T value) { @SuppressWarnings("unchecked") @Override - public void onError(Throwable e) { + public void onError(@NonNull Throwable e) { if (e == null) { e = new NullPointerException("Null errors are not allowed in 2.x"); } @@ -102,7 +103,7 @@ public void onError(Throwable e) { } @Override - protected void subscribeActual(SingleObserver observer) { + protected void subscribeActual(@NonNull SingleObserver observer) { SingleDisposable md = new SingleDisposable(observer, this); observer.onSubscribe(md); if (add(md)) { @@ -119,7 +120,7 @@ protected void subscribeActual(SingleObserver observer) { } } - boolean add(SingleDisposable inner) { + boolean add(@NonNull SingleDisposable inner) { for (;;) { SingleDisposable[] a = observers.get(); if (a == TERMINATED) { @@ -138,7 +139,7 @@ boolean add(SingleDisposable inner) { } @SuppressWarnings("unchecked") - void remove(SingleDisposable inner) { + void remove(@NonNull SingleDisposable inner) { for (;;) { SingleDisposable[] a = observers.get(); int n = a.length; @@ -177,6 +178,7 @@ void remove(SingleDisposable inner) { * Returns the success value if this SingleSubject was terminated with a success value. * @return the success value or null */ + @Nullable public T getValue() { if (observers.get() == TERMINATED) { return value; @@ -196,6 +198,7 @@ public boolean hasValue() { * Returns the terminal error if this SingleSubject has been terminated with an error, null otherwise. * @return the terminal error or null if not terminated or not with an error */ + @Nullable public Throwable getThrowable() { if (observers.get() == TERMINATED) { return error; diff --git a/src/main/java/io/reactivex/subjects/Subject.java b/src/main/java/io/reactivex/subjects/Subject.java index f18464bebd..79bcb3934b 100644 --- a/src/main/java/io/reactivex/subjects/Subject.java +++ b/src/main/java/io/reactivex/subjects/Subject.java @@ -14,6 +14,7 @@ package io.reactivex.subjects; import io.reactivex.*; +import io.reactivex.annotations.*; /** * Represents an Observer and an Observable at the same time, allowing @@ -55,6 +56,7 @@ public abstract class Subject extends Observable implements Observer { * @return the error that caused the Subject to terminate or null if the Subject * hasn't terminated yet */ + @Nullable public abstract Throwable getThrowable(); /** @@ -63,6 +65,7 @@ public abstract class Subject extends Observable implements Observer { *

The method is thread-safe. * @return the wrapped and serialized subject */ + @NonNull public final Subject toSerialized() { if (this instanceof SerializedSubject) { return this;