Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add Subject and Processor marbles #5816

Merged
merged 1 commit into from
Jan 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/main/java/io/reactivex/processors/AsyncProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
/**
* Processor that emits the very last value followed by a completion event or the received error
* to {@link Subscriber}s.
*
* <p>The implementation of onXXX methods are technically thread-safe but non-serialized calls
* <p>
* <img width="640" height="239" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/AsyncProcessor.png" alt="">
* <p>
* The implementation of onXXX methods are technically thread-safe but non-serialized calls
* to them may lead to undefined state in the currently subscribed Subscribers.
*
* @param <T> the value type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* Processor that multicasts all subsequently observed items to its current {@link Subscriber}s.
*
* <p>
* <img width="640" height="405" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.PublishSubject.png" alt="">
* <img width="640" height="278" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/PublishProcessor.png" alt="">
*
* <p>The processor does not coordinate backpressure for its subscribers and implements a weaker onSubscribe which
* calls requests Long.MAX_VALUE from the incoming Subscriptions. This makes it possible to subscribe the PublishProcessor
Expand Down
27 changes: 25 additions & 2 deletions src/main/java/io/reactivex/processors/ReplayProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,31 @@
/**
* Replays events to Subscribers.
* <p>
* <img width="640" height="405" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.ReplaySubject.png" alt="">
*
* The {@code ReplayProcessor} supports the following item retainment strategies:
* <ul>
* <li>{@link #create()} and {@link #create(int)}: retains and replays all events to current and
* future {@code Subscriber}s.
* <p>
* <img width="640" height="269" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.u.png" alt="">
* <p>
* <img width="640" height="345" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.ue.png" alt="">
* </li>
* <li>{@link #createWithSize(int)}: retains at most the given number of items and replays only these
* latest items to new {@code Subscriber}s.
* <p>
* <img width="640" height="332" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.n.png" alt="">
* </li>
* <li>{@link #createWithTime(long, TimeUnit, Scheduler)}: retains items no older than the specified time
* and replays them to new {@code Subscriber}s (which could mean all items age out).
* <p>
* <img width="640" height="415" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.t.png" alt="">
* </li>
* <li>{@link #createWithTimeAndSize(long, TimeUnit, Scheduler, int)}: retaims no more than the given number of items
* which are also no older than the specified time and replays them to new {@code Subscriber}s (which could mean all items age out).
* <p>
* <img width="640" height="404" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.nt.png" alt="">
* </li>
* </ul>
* <p>
* The ReplayProcessor can be created in bounded and unbounded mode. It can be bounded by
* size (maximum number of elements retained at most) and/or time (maximum age of elements replayed).
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
/**
* A Subject that emits the very last value followed by a completion event or the received error to Observers.
* <p>
* <img width="640" height="239" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/AsyncSubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new empty instance of this
* {@code AsyncSubject} can be created via the {@link #create()} method.
* <p>
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/CompletableSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/**
* Represents a hot Completable-like source and consumer of events similar to Subjects.
* <p>
* <img width="640" height="243" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/CompletableSubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new non-terminated instance of this
* {@code CompletableSubject} can be created via the {@link #create()} method.
* <p>
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/MaybeSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/**
* Represents a hot Maybe-like source and consumer of events similar to Subjects.
* <p>
* <img width="640" height="164" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/MaybeSubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new non-terminated instance of this
* {@code MaybeSubject} can be created via the {@link #create()} method.
* <p>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* A Subject that emits (multicasts) items to currently subscribed {@link Observer}s and terminal events to current
* or late {@code Observer}s.
* <p>
* <img width="640" height="405" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.PublishSubject.png" alt="">
* <img width="640" height="281" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/PublishSubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new empty instance of this
* {@code PublishSubject} can be created via the {@link #create()} method.
Expand Down
27 changes: 20 additions & 7 deletions src/main/java/io/reactivex/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,36 @@
/**
* Replays events (in a configurable bounded or unbounded manner) to current and late {@link Observer}s.
* <p>
* <img width="640" height="405" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.ReplaySubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new empty instance of this
* {@code ReplaySubject} can be created via the following {@code create} methods that
* allow specifying the retention policy for items:
* <ul>
* <li>{@link #create()} - creates an empty, unbounded {@code ReplaySubject} that
* caches all items and the terminal event it receives.</li>
* caches all items and the terminal event it receives.
* <p>
* <img width="640" height="299" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplaySubject.u.png" alt="">
* <p>
* <img width="640" height="398" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplaySubject.ue.png" alt="">
* </li>
* <li>{@link #create(int)} - creates an empty, unbounded {@code ReplaySubject}
* with a hint about how many <b>total</b> items one expects to retain.</li>
* with a hint about how many <b>total</b> items one expects to retain.
* </li>
* <li>{@link #createWithSize(int)} - creates an empty, size-bound {@code ReplaySubject}
* that retains at most the given number of the latest item it receives.</li>
* that retains at most the given number of the latest item it receives.
* <p>
* <img width="640" height="420" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplaySubject.n.png" alt="">
* </li>
* <li>{@link #createWithTime(long, TimeUnit, Scheduler)} - creates an empty, time-bound
* {@code ReplaySubject} that retains items no older than the specified time amount.</li>
* {@code ReplaySubject} that retains items no older than the specified time amount.
* <p>
* <img width="640" height="415" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplaySubject.t.png" alt="">
* </li>
* <li>{@link #createWithTimeAndSize(long, TimeUnit, Scheduler, int)} - creates an empty,
* time- and size-bound {@code ReplaySubject} that retains at most the given number
* items that are also not older than the specified time amount.</li>
* items that are also not older than the specified time amount.
* <p>
* <img width="640" height="404" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplaySubject.nt.png" alt="">
* </li>
* </ul>
* <p>
* Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/SingleSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/**
* Represents a hot Single-like source and consumer of events similar to Subjects.
* <p>
* <img width="640" height="236" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/SingleSubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new non-terminated instance of this
* {@code SingleSubject} can be created via the {@link #create()} method.
* <p>
Expand Down