Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: Add Completable.andThen(Single) #3799

Merged
merged 1 commit into from
Mar 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,24 @@ public final <T> Observable<T> andThen(Observable<T> next) {
requireNonNull(next);
return next.delaySubscription(toObservable());
}

/**
* Returns a Single which will subscribe to this Completable and once that is completed then
* will subscribe to the {@code next} Single. An error event from this Completable will be
* propagated to the downstream subscriber and will result in skipping the subscription of the
* Single.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code andThen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also note about scheduling required

* @param next the Single to subscribe after this Completable is completed, not null
* @return Single that composes this Completable and next
*/
public final <T> Single<T> andThen(Single<T> next) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akarnokd @Experimental?

requireNonNull(next);
return next.delaySubscription(toObservable());
}

/**
* Concatenates this Completable with another Completable.
Expand Down
25 changes: 24 additions & 1 deletion src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import rx.internal.util.UtilityFunctions;
import rx.observers.SafeSubscriber;
import rx.observers.SerializedSubscriber;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSingleExecutionHook;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -2671,4 +2670,28 @@ public static <T, Resource> Single<T> using(
return create(new SingleOnSubscribeUsing<T, Resource>(resourceFactory, singleFactory, disposeAction, disposeEagerly));
}

/**
* Returns a Single that delays the subscription to this Single
* until the Observable completes. In case the {@code onError} of the supplied observer throws,
* the exception will be propagated to the downstream subscriber
* and will result in skipping the subscription of this Single.
*
* <p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add info about the case when Observable throws onError

* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other the Observable that should trigger the subscription
* to this Single.
* @return a Single that delays the subscription to this Single
* until the Observable emits an element or completes normally.
*/
@Experimental
public final Single<T> delaySubscription(Observable<?> other) {
if (other == null) {
throw new NullPointerException();
}
return create(new SingleOnSubscribeDelaySubscriptionOther<T>(this, other));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package rx.internal.operators;

import rx.Observable;
import rx.Single;
import rx.SingleSubscriber;
import rx.Subscriber;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.SerialSubscription;

/**
* Delays the subscription to the Single until the Observable
* fires an event or completes.
*
* @param <T> the Single value type
*/
public final class SingleOnSubscribeDelaySubscriptionOther<T> implements Single.OnSubscribe<T> {
final Single<? extends T> main;
final Observable<?> other;

public SingleOnSubscribeDelaySubscriptionOther(Single<? extends T> main, Observable<?> other) {
this.main = main;
this.other = other;
}

@Override
public void call(final SingleSubscriber<? super T> subscriber) {
final SingleSubscriber<T> child = new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
subscriber.onSuccess(value);
}

@Override
public void onError(Throwable error) {
subscriber.onError(error);
}
};

final SerialSubscription serial = new SerialSubscription();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have to add serial to t .

subscriber.add(serial);

Subscriber<Object> otherSubscriber = new Subscriber<Object>() {
boolean done;
@Override
public void onNext(Object t) {
onCompleted();
}

@Override
public void onError(Throwable e) {
if (done) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
return;
}
done = true;
child.onError(e);
}

@Override
public void onCompleted() {
if (done) {
return;
}
done = true;
serial.set(child);

main.subscribe(child);
}
};

serial.set(otherSubscriber);

other.subscribe(otherSubscriber);
}
}
55 changes: 55 additions & 0 deletions src/test/java/rx/CompletableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,61 @@ public void andThenSubscribeOn() {
ts.assertCompleted();
ts.assertNoErrors();
}

@Test
public void andThenSingle() {
TestSubscriber<String> ts = new TestSubscriber<String>(0);
Completable.complete().andThen(Single.just("foo")).subscribe(ts);
ts.requestMore(1);
ts.assertValue("foo");
ts.assertCompleted();
ts.assertNoErrors();
ts.assertUnsubscribed();
}

@Test
public void andThenSingleNever() {
TestSubscriber<String> ts = new TestSubscriber<String>(0);
Completable.never().andThen(Single.just("foo")).subscribe(ts);
ts.requestMore(1);
ts.assertNoValues();
ts.assertNoTerminalEvent();
}

@Test
public void andThenSingleError() {
TestSubscriber<String> ts = new TestSubscriber<String>(0);
final AtomicBoolean hasRun = new AtomicBoolean(false);
final Exception e = new Exception();
Completable.error(e)
.andThen(Single.<String>create(new Single.OnSubscribe<String>() {
@Override
public void call(SingleSubscriber<? super String> s) {
hasRun.set(true);
s.onSuccess("foo");
}
}))
.subscribe(ts);
ts.assertNoValues();
ts.assertError(e);
ts.assertUnsubscribed();
Assert.assertFalse("Should not have subscribed to single when completable errors", hasRun.get());
}

@Test
public void andThenSingleSubscribeOn() {
TestSubscriber<String> ts = new TestSubscriber<String>(0);
TestScheduler scheduler = new TestScheduler();
Completable.complete().andThen(Single.just("foo").delay(1, TimeUnit.SECONDS, scheduler)).subscribe(ts);
ts.requestMore(1);
ts.assertNoValues();
ts.assertNoTerminalEvent();
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
ts.assertValue("foo");
ts.assertCompleted();
ts.assertNoErrors();
ts.assertUnsubscribed();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check that unsubscribe works in each phase of a running subscription.


@Test(expected = NullPointerException.class)
public void createNull() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package rx.internal.operators;

import org.junit.Assert;
import org.junit.Test;
import rx.Single;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

import java.util.concurrent.atomic.AtomicInteger;

public class SingleOnSubscribeDelaySubscriptionOtherTest {
@Test
public void noPrematureSubscription() {
PublishSubject<Object> other = PublishSubject.create();

TestSubscriber<Integer> ts = TestSubscriber.create();

final AtomicInteger subscribed = new AtomicInteger();

Single.just(1)
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);

ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();

Assert.assertEquals("Premature subscription", 0, subscribed.get());

other.onNext(1);

Assert.assertEquals("No subscription", 1, subscribed.get());

ts.assertValue(1);
ts.assertNoErrors();
ts.assertCompleted();
}

@Test
public void noPrematureSubscriptionToError() {
PublishSubject<Object> other = PublishSubject.create();

TestSubscriber<Integer> ts = TestSubscriber.create();

final AtomicInteger subscribed = new AtomicInteger();

Single.<Integer>error(new TestException())
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);

ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();

Assert.assertEquals("Premature subscription", 0, subscribed.get());

other.onNext(1);

Assert.assertEquals("No subscription", 1, subscribed.get());

ts.assertNoValues();
ts.assertNotCompleted();
ts.assertError(TestException.class);
}

@Test
public void noSubscriptionIfOtherErrors() {
PublishSubject<Object> other = PublishSubject.create();

TestSubscriber<Integer> ts = TestSubscriber.create();

final AtomicInteger subscribed = new AtomicInteger();

Single.<Integer>error(new TestException())
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.getAndIncrement();
}
})
.delaySubscription(other)
.subscribe(ts);

ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertNoValues();

Assert.assertEquals("Premature subscription", 0, subscribed.get());

other.onError(new TestException());

Assert.assertEquals("Premature subscription", 0, subscribed.get());

ts.assertNoValues();
ts.assertNotCompleted();
ts.assertError(TestException.class);
}
}