diff --git a/src/main/java/io/reactivex/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/processors/BehaviorProcessor.java index e161727bb3..b5a3e20ad8 100644 --- a/src/main/java/io/reactivex/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/processors/BehaviorProcessor.java @@ -33,6 +33,87 @@ *
*
*
+ * This processor does not have a public constructor by design; a new empty instance of this + * {@code BehaviorSubject} can be created via the {@link #create()} method and + * a new non-empty instance can be created via {@link #createDefault(Object)} (named as such to avoid + * overload resolution conflict with {@code Flowable.create} that creates a Flowable, not a {@code BehaviorProcessor}). + *
+ * In accordance with the Reactive Streams specification (Rule 2.13) + * {@code null}s are not allowed as default initial values in {@link #createDefault(Object)} or as parameters to {@link #onNext(Object)} and + * {@link #onError(Throwable)}. + *
+ * When this {@code BehaviorProcessor} is terminated via {@link #onError(Throwable)} or {@link #onComplete()}, the + * last observed item (if any) is cleared and late {@link org.reactivestreams.Subscriber}s only receive + * the respective terminal event. + *
+ * The {@code BehaviorProcessor} does not support clearing its cached value (to appear empty again), however, the + * effect can be achieved by using a special item and making sure {@code Subscriber}s subscribe through a + * filter whose predicate filters out this special item: + *
+ * BehaviorProcessor<Integer> processor = BehaviorProcessor.create();
+ *
+ * final Integer EMPTY = Integer.MIN_VALUE;
+ *
+ * Flowable<Integer> flowable = processor.filter(v -> v != EMPTY);
+ *
+ * TestSubscriber<Integer> ts1 = flowable.test();
+ *
+ * processor.onNext(1);
+ * // this will "clear" the cache
+ * processor.onNext(EMPTY);
+ *
+ * TestSubscriber<Integer> ts2 = flowable.test();
+ *
+ * processor.onNext(2);
+ * processor.onComplete();
+ *
+ * // ts1 received both non-empty items
+ * ts1.assertResult(1, 2);
+ *
+ * // ts2 received only 2 even though the current item was EMPTY
+ * // when it got subscribed
+ * ts2.assertResult(2);
+ *
+ * // Subscribers coming after the processor was terminated receive
+ * // no items and only the onComplete event in this case.
+ * flowable.test().assertResult();
+ *
+ * + * Even though {@code BehaviorProcessor} implements the {@code Subscriber} interface, calling + * {@code onSubscribe} is not required (Rule 2.12) + * if the processor is used as a standalone source. However, calling {@code onSubscribe} is + * called after the {@code BehaviorProcessor} reached its terminal state will result in the + * given {@code Subscription} being cancelled immediately. + *
+ * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} + * is still required to be serialized (called from the same thread or called non-overlappingly from different threads + * through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s + * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber} + * consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively. + *
+ * This {@code BehaviorProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the latest observed value + * in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()}, + * {@link #getValues()} or {@link #getValues(Object[])}. + *
+ * Note that this processor signals {@code MissingBackpressureException} if a particular {@code Subscriber} is not + * ready to receive {@code onNext} events. To avoid this exception being signaled, use {@link #offer(Object)} to only + * try to emit an item when all {@code Subscriber}s have requested item(s). + *
* Example usage: *
{@code @@ -94,7 +175,7 @@ public final class BehaviorProcessorextends FlowableProcessor { * Creates a {@link BehaviorProcessor} without a default item. * * @param - * the type of item the Subject will emit + * the type of item the BehaviorProcessor will emit * @return the constructed {@link BehaviorProcessor} */ @CheckReturnValue @@ -107,7 +188,7 @@ public static BehaviorProcessor create() { * {@link Subscriber} that subscribes to it. * * @param - * the type of item the Subject will emit + * the type of item the BehaviorProcessor will emit * @param defaultValue * the item that will be emitted first to any {@link Subscriber} as long as the * {@link BehaviorProcessor} has not yet observed any items from its source {@code Observable} @@ -266,9 +347,9 @@ public Throwable getThrowable() { } /** - * Returns a single value the Subject currently has or null if no such value exists. + * Returns a single value the BehaviorProcessor currently has or null if no such value exists. * The method is thread-safe. - * @return a single value the Subject currently has or null if no such value exists + * @return a single value the BehaviorProcessor currently has or null if no such value exists */ public T getValue() { Object o = value.get(); @@ -279,9 +360,9 @@ public T getValue() { } /** - * Returns an Object array containing snapshot all values of the Subject. + * Returns an Object array containing snapshot all values of the BehaviorProcessor. *
The method is thread-safe. - * @return the array containing the snapshot of all values of the Subject + * @return the array containing the snapshot of all values of the BehaviorProcessor */ public Object[] getValues() { @SuppressWarnings("unchecked") @@ -295,7 +376,7 @@ public Object[] getValues() { } /** - * Returns a typed array containing a snapshot of all values of the Subject. + * Returns a typed array containing a snapshot of all values of the BehaviorProcessor. *
The method follows the conventions of Collection.toArray by setting the array element * after the last value to null (if the capacity permits). *
The method is thread-safe. @@ -337,9 +418,9 @@ public boolean hasThrowable() { } /** - * Returns true if the subject has any value. + * Returns true if the BehaviorProcessor has any value. *
The method is thread-safe. - * @return true if the subject has any value + * @return true if the BehaviorProcessor has any value */ public boolean hasValue() { Object o = value.get();