Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add takeUntil support in Single #3712

Merged
merged 2 commits into from
Feb 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 227 additions & 3 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
*/
package rx;

import java.util.Collection;
import java.util.concurrent.*;

import rx.Observable.Operator;
import rx.annotations.Beta;
import rx.annotations.Experimental;
Expand All @@ -23,15 +26,13 @@
import rx.internal.util.ScalarSynchronousSingle;
import rx.internal.util.UtilityFunctions;
import rx.observers.SafeSubscriber;
import rx.observers.SerializedSubscriber;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.schedulers.Schedulers;
import rx.singles.BlockingSingle;
import rx.subscriptions.Subscriptions;

import java.util.Collection;
import java.util.concurrent.*;

/**
* The Single class implements the Reactive Pattern for a single value response. See {@link Observable} for the
* implementation of the Reactive Pattern for a stream or vector of values.
Expand Down Expand Up @@ -1800,6 +1801,229 @@ public void onError(Throwable error) {
}
});
}

/**
* Returns a Single that emits the item emitted by the source Single until a Completable terminates. Upon
* termination of {@code other}, this will emit a {@link CancellationException} rather than go to
* {@link SingleSubscriber#onSuccess(Object)}.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other
* the Completable whose termination will cause {@code takeUntil} to emit the item from the source
* Single
* @return a Single that emits the item emitted by the source Single until such time as {@code other} terminates.
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
*/
public final Single<T> takeUntil(final Completable other) {
return lift(new Operator<T, T>() {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);

final Subscriber<T> main = new Subscriber<T>(serial, false) {
@Override
public void onNext(T t) {
serial.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
serial.onError(e);
} finally {
serial.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
serial.onCompleted();
} finally {
serial.unsubscribe();
}
}
};

final Completable.CompletableSubscriber so = new Completable.CompletableSubscriber() {
@Override
public void onCompleted() {
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
}

@Override
public void onError(Throwable e) {
main.onError(e);
}

@Override
public void onSubscribe(Subscription d) {
serial.add(d);
}
};

serial.add(main);
child.add(serial);

other.subscribe(so);

return main;
}
});
}

/**
* Returns a Single that emits the item emitted by the source Single until an Observable emits an item. Upon
* emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to
* {@link SingleSubscriber#onSuccess(Object)}.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other
* the Observable whose first emitted item will cause {@code takeUntil} to emit the item from the source
* Single
* @param <E>
* the type of items emitted by {@code other}
* @return a Single that emits the item emitted by the source Single until such time as {@code other} emits
* its first item
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
*/
public final <E> Single<T> takeUntil(final Observable<? extends E> other) {
return lift(new Operator<T, T>() {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);

final Subscriber<T> main = new Subscriber<T>(serial, false) {
@Override
public void onNext(T t) {
serial.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
serial.onError(e);
} finally {
serial.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
serial.onCompleted();
} finally {
serial.unsubscribe();
}
}
};

final Subscriber<E> so = new Subscriber<E>() {

@Override
public void onCompleted() {
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
}

@Override
public void onError(Throwable e) {
main.onError(e);
}

@Override
public void onNext(E e) {
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
}
};

serial.add(main);
serial.add(so);

child.add(serial);

other.unsafeSubscribe(so);

return main;
}
});
}

/**
* Returns a Single that emits the item emitted by the source Single until a second Single emits an item. Upon
* emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to
* {@link SingleSubscriber#onSuccess(Object)}.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other
* the Single whose emitted item will cause {@code takeUntil} to emit the item from the source Single
* @param <E>
* the type of item emitted by {@code other}
* @return a Single that emits the item emitted by the source Single until such time as {@code other} emits its item
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
*/
public final <E> Single<T> takeUntil(final Single<? extends E> other) {
return lift(new Operator<T, T>() {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);

final Subscriber<T> main = new Subscriber<T>(serial, false) {
@Override
public void onNext(T t) {
serial.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
serial.onError(e);
} finally {
serial.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
serial.onCompleted();
} finally {
serial.unsubscribe();
}
}
};

final SingleSubscriber<E> so = new SingleSubscriber<E>() {
@Override
public void onSuccess(E value) {
onError(new CancellationException("Stream was canceled before emitting a terminal event."));
}

@Override
public void onError(Throwable e) {
main.onError(e);
}
};

serial.add(main);
serial.add(so);

child.add(serial);

other.subscribe(so);

return main;
}
});
}

/**
* Converts this Single into an {@link Observable}.
Expand Down
Loading