Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove PublishLast/InitialValue #1788

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 0 additions & 99 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5347,105 +5347,6 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
return OperatorPublish.create(this, selector);
}

/**
* Returns an Observable that emits {@code initialValue} followed by the results of invoking a specified
* selector on items emitted by a {@link ConnectableObservable} that shares a single subscription to the
* source Observable.
* <p>
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.if.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R>
* the type of items emitted by the resulting Observable
* @param selector
* a function that can use the multicasted source sequence as many times as needed, without
* causing multiple subscriptions to the source Observable. Subscribers to the source will
* receive all notifications of the source from the time of the subscription forward
* @param initialValue
* the initial value of the underlying {@link BehaviorSubject}
* @return an Observable that emits {@code initialValue} followed by the results of invoking the selector
* on a {@link ConnectableObservable} that shares a single subscription to the underlying Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: publish</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
*/
public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector, final T initialValue) {
return concatWith(just(initialValue)).publish(selector);
}

/**
* Returns a {@link ConnectableObservable} that emits {@code initialValue} followed by the items emitted by
* the source Observable. A Connectable Observable resembles an ordinary Observable, except that it does not
* begin emitting items when it is subscribed to, but only when its {@code connect} method is called.
* <p>
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.i.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param initialValue
* the initial value to be emitted by the resulting Observable
* @return a {@link ConnectableObservable} that shares a single subscription to the underlying Observable
* and starts with {@code initialValue}
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: publish</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
*/
public final ConnectableObservable<T> publish(final T initialValue) {
return concatWith(just(initialValue)).publish();
}

/**
* Returns a {@link ConnectableObservable} that emits only the last item emitted by the source Observable.
* A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items
* when it is subscribed to, but only when its {@code connect} method is called.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishLast.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because by intent it is skipping all values except the
* last.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publishLast} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a {@link ConnectableObservable} that emits only the last item emitted by the source Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublishlast">RxJava wiki: publishLast</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
*/
public final ConnectableObservable<T> publishLast() {
return takeLast(1).publish();
}

/**
* Returns an Observable that emits an item that results from invoking a specified selector on the last item
* emitted by a {@link ConnectableObservable} that shares a single subscription to the source Observable.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishLast.f.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because by intent it is skipping all values except the
* last.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publishLast} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R>
* the type of items emitted by the resulting Observable
* @param selector
* a function that can use the multicasted source sequence as many times as needed, without
* causing multiple subscriptions to the source Observable. Subscribers to the source will only
* receive the last item emitted by the source.
* @return an Observable that emits an item that is the result of invoking the selector on a {@link ConnectableObservable} that shares a single subscription to the source Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublishlast">RxJava wiki: publishLast</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
*/
public final <R> Observable<R> publishLast(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return takeLast(1).publish(selector);
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a source
* Observable, then feeds the result of that function along with the second item emitted by the source
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ public void run() {
}
}).start();
}
}).publishLast();
}).takeLast(1).publish();

// subscribe once
final CountDownLatch latch = new CountDownLatch(1);
Expand Down