Skip to content

Commit

Permalink
Merge pull request #3234 from akarnokd/OperatorSwitchMap2x
Browse files Browse the repository at this point in the history
Operator switchMap, switchOnNext, added missing merge and concat
  • Loading branch information
akarnokd committed Aug 29, 2015
2 parents 26ba5eb + 9e296f4 commit c27c903
Show file tree
Hide file tree
Showing 8 changed files with 436 additions and 55 deletions.
54 changes: 52 additions & 2 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,14 @@ public static <T> Observable<T> merge(int maxConcurrency, int bufferSize, Publis
public static <T> Observable<T> merge(int maxConcurrency, int bufferSize, Iterable<? extends Publisher<? extends T>> sources) {
return fromIterable(sources).flatMap(v -> v, false, maxConcurrency, bufferSize);
}

public static <T> Observable<T> merge(Publisher<? extends Publisher<? extends T>> sources) {
return merge(sources, bufferSize());
}

public static <T> Observable<T> merge(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency) {
return fromPublisher(sources).flatMap(v -> v, maxConcurrency);
}

@SafeVarargs
public static <T> Observable<T> mergeDelayError(Publisher<? extends T>... sources) {
Expand Down Expand Up @@ -302,6 +310,15 @@ public static <T> Observable<T> mergeDelayError(int maxConcurrency, int bufferSi
return fromIterable(sources).flatMap(v -> v, true, maxConcurrency, bufferSize);
}

public static <T> Observable<T> mergeDelayError(Publisher<? extends Publisher<? extends T>> sources) {
return mergeDelayError(sources, bufferSize());
}

public static <T> Observable<T> mergeDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency) {
return fromPublisher(sources).flatMap(v -> v, true, maxConcurrency);
}


public final Observable<T> take(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= required but it was " + n);
Expand Down Expand Up @@ -608,17 +625,30 @@ public final <R> Observable<R> concatMap(Function<? super T, ? extends Publisher
if (prefetch <= 0) {
throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
}
return lift(new ConcatMap<>(mapper, prefetch));
return lift(new OperatorConcatMap<>(mapper, prefetch));
}

public final Observable<T> concatWith(Publisher<? extends T> other) {
Objects.requireNonNull(other);
return concat(this, other);
}

public final Observable<T> concat(Publisher<? extends Publisher<? extends T>> sources) {
return concat(sources, bufferSize());
}

public final Observable<T> concat(Publisher<? extends Publisher<? extends T>> sources, int bufferSize) {
return fromPublisher(sources).concatMap(v -> v);
}

@SafeVarargs
public static <T> Observable<T> concat(Publisher<? extends T>... sources) {
Objects.requireNonNull(sources);
if (sources.length == 0) {
return empty();
} else
if (sources.length == 1) {
return fromPublisher(sources[0]);
}
return fromArray(sources).concatMap(v -> v);
}

Expand Down Expand Up @@ -1513,4 +1543,24 @@ public final Observable<T> last() {
public final Observable<T> last(T defaultValue) {
return takeLast(1).single(defaultValue);
}

public final <R> Observable<R> switchMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return switchMap(mapper, bufferSize());
}

public final <R> Observable<R> switchMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int bufferSize) {
Objects.requireNonNull(mapper);
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
return lift(new OperatorSwitchMap<>(mapper, bufferSize));
}

public static <T> Observable<T> switchOnNext(Publisher<? extends Publisher<? extends T>> sources) {
return fromPublisher(sources).switchMap(v -> v);
}

public static <T> Observable<T> switchOnNext(int bufferSize, Publisher<? extends Publisher<? extends T>> sources) {
return fromPublisher(sources).switchMap(v -> v, bufferSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.reactivestreams.*;

import io.reactivex.Observable.Operator;
import io.reactivex.internal.subscribers.CancelledSubscriber;
import io.reactivex.internal.subscriptions.*;

public final class OperatorCollect<T, U> implements Operator<U, T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subscribers.SerializedSubscriber;

public final class ConcatMap<T, U> implements Operator<U, T> {
public final class OperatorConcatMap<T, U> implements Operator<U, T> {
final Function<? super T, ? extends Publisher<? extends U>> mapper;
final int bufferSize;
public ConcatMap(Function<? super T, ? extends Publisher<? extends U>> mapper, int bufferSize) {
public OperatorConcatMap(Function<? super T, ? extends Publisher<? extends U>> mapper, int bufferSize) {
this.mapper = mapper;
this.bufferSize = bufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.reactivestreams.*;

import io.reactivex.Observable.Operator;
import io.reactivex.internal.subscribers.CancellingSubscriber;
import io.reactivex.internal.subscribers.CancelledSubscriber;
import io.reactivex.internal.subscriptions.*;

public final class OperatorDistinct<T> implements Operator<T, T> {
Expand Down Expand Up @@ -77,13 +77,13 @@ public Subscriber<? super T> apply(Subscriber<? super T> t) {
} catch (Throwable e) {
t.onSubscribe(EmptySubscription.INSTANCE);
t.onError(e);
return CancellingSubscriber.INSTANCE;
return CancelledSubscriber.INSTANCE;
}

if (coll == null) {
t.onSubscribe(EmptySubscription.INSTANCE);
t.onError(new NullPointerException("predicateSupplier returned null"));
return CancellingSubscriber.INSTANCE;
return CancelledSubscriber.INSTANCE;
}

return null;
Expand Down
Loading

0 comments on commit c27c903

Please sign in to comment.