From b02e572bdb781485ce87181f70b426454ab8abcb Mon Sep 17 00:00:00 2001 From: David Gross Date: Tue, 9 Dec 2014 10:59:29 -0800 Subject: [PATCH] tidying up AbstractOnSubscribe javadocs --- .../rx/observables/AbstractOnSubscribe.java | 245 +++++++++++------- 1 file changed, 148 insertions(+), 97 deletions(-) diff --git a/src/main/java/rx/observables/AbstractOnSubscribe.java b/src/main/java/rx/observables/AbstractOnSubscribe.java index 7cbb46b8a1..6dbe7ad44a 100644 --- a/src/main/java/rx/observables/AbstractOnSubscribe.java +++ b/src/main/java/rx/observables/AbstractOnSubscribe.java @@ -26,58 +26,65 @@ import rx.functions.*; /** - * Abstract base class for the OnSubscribe interface that helps building - * observable sources one onNext at a time and automatically supports - * unsubscription and backpressure. + * Abstract base class for the {@link OnSubscribe} interface that helps you build Observable sources one + * {@code onNext} at a time, and automatically supports unsubscription and backpressure. *

*

Usage rules

- * Implementors of the {@code next()} method + * When you implement the {@code next()} method, you * * - * The {@code SubscriptionState} object features counters that may help implement a state machine: + * The {@link SubscriptionState} object features counters that may help implement a state machine: * *

- * The implementors of the {@code AbstractOnSubscribe} may override the {@code onSubscribe} to perform - * special actions (such as registering {@code Subscription}s with {@code Subscriber.add()}) and return additional state for each subscriber subscribing. This custom state is - * accessible through the {@code state.state()} method. If the custom state requires some form of cleanup, - * the {@code onTerminated} method can be overridden. + * When you implement {@code AbstractOnSubscribe}, you may override {@link AbstractOnSubscribe#onSubscribe} to + * perform special actions (such as registering {@code Subscription}s with {@code Subscriber.add()}) and return + * additional state for each subscriber subscribing. You can access this custom state with the + * {@link SubscriptionState#state state.state()} method. If you need to do some cleanup, you can override the + * {@link #onTerminated} method. *

- * For convenience, lambda-accepting static factory methods, named {@code create()}, are available. Another - * convenience is the {@code toObservable} which turns an {@code AbstractOnSubscribe} instance into an {@code Observable} fluently. + * For convenience, a lambda-accepting static factory method, {@link #create}, is available. + * Another convenience is {@link #toObservable} which turns an {@code AbstractOnSubscribe} + * instance into an {@code Observable} fluently. * *

Examples

- * Note: the examples use the lambda-helper factories to avoid boilerplane. + * Note: these examples use the lambda-helper factories to avoid boilerplane. * *

Implement: just

*

@@ -100,7 +107,7 @@
  *   }
  * }, u -> iterable.iterator()).subscribe(System.out::println);
  * 
- + * *

Implement source that fails a number of times before succeeding

*

  * AtomicInteger fails = new AtomicInteger();
@@ -136,37 +143,43 @@
  * .timeout(1, TimeUnit.SECONDS)
  * .subscribe(System.out::println, Throwable::printStacktrace, () -> System.out.println("Done"));
  * 
- + * * @param the value type * @param the per-subscriber user-defined state type + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + * @Experimental */ @Experimental public abstract class AbstractOnSubscribe implements OnSubscribe { /** - * Called when a Subscriber subscribes and let's the implementor - * create a per-subscriber custom state. + * Called when a Subscriber subscribes and lets the implementor create a per-subscriber custom state. *

- * Override this method to have custom state per-subscriber. - * The default implementation returns {@code null}. + * Override this method to have custom state per-subscriber. The default implementation returns + * {@code null}. + * * @param subscriber the subscriber who is subscribing * @return the custom state */ protected S onSubscribe(Subscriber subscriber) { return null; } + /** * Called after the terminal emission or when the downstream unsubscribes. *

- * This is called only once and it is made sure no onNext call runs concurrently with it. - * The default implementation does nothing. + * This is called only once and no {@code onNext} call will run concurrently with it. The default + * implementation does nothing. + * * @param state the user-provided state */ protected void onTerminated(S state) { } + /** - * Override this method and create an emission state-machine. - * @param state the per-subscriber subscription state. + * Override this method to create an emission state-machine. + * + * @param state the per-subscriber subscription state */ protected abstract void next(SubscriptionState state); @@ -179,7 +192,8 @@ public final void call(final Subscriber subscriber) { } /** - * Convenience method to create an observable from the implemented instance + * Convenience method to create an Observable from this implemented instance. + * * @return the created observable */ public final Observable toObservable() { @@ -195,14 +209,15 @@ public Object call(Object t1) { }; /** - * Creates an AbstractOnSubscribe instance which calls the provided {@code next} action. + * Creates an {@code AbstractOnSubscribe} instance which calls the provided {@code next} action. *

- * This is a convenience method to help create AbstractOnSubscribe instances with the - * help of lambdas. + * This is a convenience method to help create {@code AbstractOnSubscribe} instances with the help of + * lambdas. + * * @param the value type * @param the per-subscriber user-defined state type * @param next the next action to call - * @return an AbstractOnSubscribe instance + * @return an {@code AbstractOnSubscribe} instance */ public static AbstractOnSubscribe create(Action1> next) { @SuppressWarnings("unchecked") @@ -210,42 +225,49 @@ public static AbstractOnSubscribe create(Action1, ? extends S>)NULL_FUNC1; return create(next, nullFunc, Actions.empty()); } + /** - * Creates an AbstractOnSubscribe instance which creates a custom state with the - * {@code onSubscribe} function and calls the provided {@code next} action. + * Creates an {@code AbstractOnSubscribe} instance which creates a custom state with the {@code onSubscribe} + * function and calls the provided {@code next} action. *

- * This is a convenience method to help create AbstractOnSubscribe instances with the - * help of lambdas. + * This is a convenience method to help create {@code AbstractOnSubscribe} instances with the help of + * lambdas. + * * @param the value type * @param the per-subscriber user-defined state type * @param next the next action to call - * @param onSubscribe the function that returns a per-subscriber state to be used by next - * @return an AbstractOnSubscribe instance + * @param onSubscribe the function that returns a per-subscriber state to be used by {@code next} + * @return an {@code AbstractOnSubscribe} instance */ public static AbstractOnSubscribe create(Action1> next, Func1, ? extends S> onSubscribe) { return create(next, onSubscribe, Actions.empty()); } + /** - * Creates an AbstractOnSubscribe instance which creates a custom state with the - * {@code onSubscribe} function, calls the provided {@code next} action and - * calls the {@code onTerminated} action to release the state when its no longer needed. + * Creates an {@code AbstractOnSubscribe} instance which creates a custom state with the {@code onSubscribe} + * function, calls the provided {@code next} action and calls the {@code onTerminated} action to release the + * state when its no longer needed. *

- * This is a convenience method to help create AbstractOnSubscribe instances with the - * help of lambdas. + * This is a convenience method to help create {@code AbstractOnSubscribe} instances with the help of + * lambdas. + * * @param the value type * @param the per-subscriber user-defined state type * @param next the next action to call - * @param onSubscribe the function that returns a per-subscriber state to be used by next - * @param onTerminated the action to call to release the state created by the onSubscribe function - * @return an AbstractOnSubscribe instance + * @param onSubscribe the function that returns a per-subscriber state to be used by {@code next} + * @param onTerminated the action to call to release the state created by the {@code onSubscribe} function + * @return an {@code AbstractOnSubscribe} instance */ public static AbstractOnSubscribe create(Action1> next, Func1, ? extends S> onSubscribe, Action1 onTerminated) { return new LambdaOnSubscribe(next, onSubscribe, onTerminated); } + /** - * An implementation that forwards the 3 main methods to functional callbacks. + * An implementation that forwards the three main methods ({@code next}, {@code onSubscribe}, and + * {@code onTermianted}) to functional callbacks. + * * @param the value type * @param the per-subscriber user-defined state type */ @@ -272,13 +294,14 @@ protected void next(SubscriptionState state) { next.call(state); } } + /** * Manages unsubscription of the state. + * * @param the value type * @param the per-subscriber user-defined state type */ private static final class SubscriptionCompleter extends AtomicBoolean implements Subscription { - /** */ private static final long serialVersionUID = 7993888274897325004L; private final SubscriptionState state; private SubscriptionCompleter(SubscriptionState state) { @@ -298,6 +321,7 @@ public void unsubscribe() { } /** * Contains the producer loop that reacts to downstream requests of work. + * * @param the value type * @param the per-subscriber user-defined state type */ @@ -325,9 +349,11 @@ public void request(long n) { } } } + /** * Executes the user-overridden next() method and performs state bookkeeping and * verification. + * * @return true if the outer loop may continue */ protected boolean doNext() { @@ -355,12 +381,15 @@ protected boolean doNext() { return false; } } + /** - * Represents a per-subscription state for the AbstractOnSubscribe operation. - * It supports phasing and counts the number of times a value was requested - * by the downstream. + * Represents a per-subscription state for the {@code AbstractOnSubscribe} operation. It supports phasing + * and counts the number of times a value was requested by the downstream. + * * @param the value type * @param the per-subscriber user-defined state type + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + * @Experimental */ public static final class SubscriptionState { private final AbstractOnSubscribe parent; @@ -382,51 +411,61 @@ private SubscriptionState(AbstractOnSubscribe parent, Subscriber - * Throws IllegalStateException if there is a value already offered but not taken or - * a terminal state is reached. - * @param value the value to onNext + * Call this method to offer the next {@code onNext} value for the subscriber. + * + * @param value the value to {@code onNext} + * @throws IllegalStateException if there is a value already offered but not taken or a terminal state + * is reached */ public void onNext(T value) { if (hasOnNext) { @@ -438,13 +477,14 @@ public void onNext(T value) { theValue = value; hasOnNext = true; } + /** - * Call this method to send an onError to the subscriber and terminate - * all further activities. If there is an onNext even not taken, that - * value is emitted to the subscriber followed by this exception. - *

- * Throws IllegalStateException if the terminal state has been reached already. + * Call this method to send an {@code onError} to the subscriber and terminate all further activities. + * If there is a pending {@code onNext}, that value is emitted to the subscriber followed by this + * exception. + * * @param e the exception to deliver to the client + * @throws IllegalStateException if the terminal state has been reached already */ public void onError(Throwable e) { if (e == null) { @@ -456,13 +496,13 @@ public void onError(Throwable e) { theException = e; hasCompleted = true; } + /** - * Call this method to send an onCompleted to the subscriber and terminate - * all further activities. If there is an onNext even not taken, that - * value is emitted to the subscriber followed by this exception. - *

- * Throws IllegalStateException if the terminal state has been reached already. - * @param e the exception to deliver to the client + * Call this method to send an {@code onCompleted} to the subscriber and terminate all further + * activities. If there is a pending {@code onNext}, that value is emitted to the subscriber followed by + * this exception. + * + * @throws IllegalStateException if the terminal state has been reached already */ public void onCompleted() { if (hasCompleted) { @@ -470,15 +510,18 @@ public void onCompleted() { } hasCompleted = true; } + /** * Signals that there won't be any further events. */ public void stop() { stopRequested = true; } + /** - * Emits the onNextValue and/or the terminal value to the actual subscriber. - * @return true if the event was a terminal event + * Emits the {@code onNext} and/or the terminal value to the actual subscriber. + * + * @return {@code true} if the event was a terminal event */ protected boolean accept() { if (hasOnNext) { @@ -513,21 +556,28 @@ protected boolean accept() { } return false; } + /** - * Verify if the next() generated an event or requested a stop. + * Verify if the {@code next()} generated an event or requested a stop. + * * @return true if either event was generated or stop was requested */ protected boolean verify() { return hasOnNext || hasCompleted || stopRequested; } - /** @returns true if the next() requested a stop. */ + + /** @return true if the {@code next()} requested a stop */ protected boolean stopRequested() { return stopRequested; } + /** - * Request the state to be used by onNext or returns false if - * the downstream has unsubscribed. - * @return true if the state can be used exclusively + * Request the state to be used by {@code onNext} or returns {@code false} if the downstream has + * unsubscribed. + * + * @return {@code true} if the state can be used exclusively + * @throws IllegalStateEception + * @warn "throws" section incomplete */ protected boolean use() { int i = inUse.get(); @@ -539,9 +589,9 @@ protected boolean use() { } throw new IllegalStateException("This is not reentrant nor threadsafe!"); } + /** - * Release the state if there are no more interest in it and - * is not in use. + * Release the state if there are no more interest in it and it is not in use. */ protected void free() { int i = inUse.get(); @@ -552,9 +602,10 @@ protected void free() { parent.onTerminated(state); } } + /** - * Terminates the state immediately and calls - * onTerminated with the custom state. + * Terminates the state immediately and calls {@link AbstractOnSubscribe#onTerminated} with the custom + * state. */ protected void terminate() { for (;;) {