Skip to content

Commit

Permalink
2.x: coverage, fixes, cleanup 10/15-1 (#4708)
Browse files Browse the repository at this point in the history
* 2.x: coverage, fixes, cleanup 10/15-1

* Fix error message.
  • Loading branch information
akarnokd authored Oct 15, 2016
1 parent d250ae7 commit 637978c
Show file tree
Hide file tree
Showing 26 changed files with 1,615 additions and 603 deletions.
54 changes: 2 additions & 52 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8305,7 +8305,7 @@ public final Observable<T> onTerminateDetach() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final ConnectableObservable<T> publish() {
return publish(bufferSize());
return ObservablePublish.create(this);
}

/**
Expand All @@ -8329,58 +8329,8 @@ public final ConnectableObservable<T> publish() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends ObservableSource<R>> selector) {
return publish(selector, bufferSize());
}

/**
* Returns an Observable that emits the results of invoking a specified selector on items emitted by a
* {@link ConnectableObservable} that shares a single subscription to the underlying sequence.
* <p>
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R>
* the type of items emitted by the resulting ObservableSource
* @param selector
* a function that can use the multicasted source sequence as many times as needed, without
* causing multiple subscriptions to the source sequence. Observers to the given source will
* receive all notifications of the source from the time of the subscription forward.
* @param bufferSize
* the number of elements to prefetch from the current Observable
* @return an Observable that emits the results of invoking the selector on the items emitted by a {@link ConnectableObservable} that shares a single subscription to the underlying sequence
* @see <a href="http://reactivex.io/documentation/operators/publish.html">ReactiveX operators documentation: Publish</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, int bufferSize) {
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
ObjectHelper.requireNonNull(selector, "selector is null");
return ObservablePublish.create(this, selector, bufferSize);
}

/**
* Returns a {@link ConnectableObservable}, which is a variety of ObservableSource that waits until its
* {@link ConnectableObservable#connect connect} method is called before it begins emitting items to those
* {@link Observer}s that have subscribed to it.
* <p>
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param bufferSize
* the number of elements to prefetch from the current Observable
* @return a {@link ConnectableObservable} that upon connection causes the source ObservableSource to emit items
* to its {@link Observer}s
* @see <a href="http://reactivex.io/documentation/operators/publish.html">ReactiveX operators documentation: Publish</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final ConnectableObservable<T> publish(int bufferSize) {
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return ObservablePublish.create(this, bufferSize);
return new ObservablePublishSelector<T, R>(this, selector);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,15 @@ public boolean isDisposed() {
@Override
public void onNext(T t) {
U b = buffer;
if (b == null) {
return;
}

b.add(t);
if (b != null) {
b.add(t);

if (++size >= count) {
actual.onNext(b);
if (++size >= count) {
actual.onNext(b);

size = 0;
createBuffer();
size = 0;
createBuffer();
}
}
}

Expand Down Expand Up @@ -185,21 +183,14 @@ public void onNext(T t) {
U b;

try {
b = bufferSupplier.call();
b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
} catch (Throwable e) {
buffers.clear();
s.dispose();
actual.onError(e);
return;
}

if (b == null) {
buffers.clear();
s.dispose();
actual.onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}

buffers.offer(b);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,8 @@ void disposeAll() {

for (;;) {

try {
inner = observers.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
inner = observers.poll();

if (inner == null) {
return;
}
Expand Down
Loading

0 comments on commit 637978c

Please sign in to comment.