diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index ed3bafe518..20dc599ab1 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -274,6 +274,14 @@ public static Observable merge(int maxConcurrency, int bufferSize, Publis public static Observable merge(int maxConcurrency, int bufferSize, Iterable> sources) { return fromIterable(sources).flatMap(v -> v, false, maxConcurrency, bufferSize); } + + public static Observable merge(Publisher> sources) { + return merge(sources, bufferSize()); + } + + public static Observable merge(Publisher> sources, int maxConcurrency) { + return fromPublisher(sources).flatMap(v -> v, maxConcurrency); + } @SafeVarargs public static Observable mergeDelayError(Publisher... sources) { @@ -302,6 +310,15 @@ public static Observable mergeDelayError(int maxConcurrency, int bufferSi return fromIterable(sources).flatMap(v -> v, true, maxConcurrency, bufferSize); } + public static Observable mergeDelayError(Publisher> sources) { + return mergeDelayError(sources, bufferSize()); + } + + public static Observable mergeDelayError(Publisher> sources, int maxConcurrency) { + return fromPublisher(sources).flatMap(v -> v, true, maxConcurrency); + } + + public final Observable take(long n) { if (n < 0) { throw new IllegalArgumentException("n >= required but it was " + n); @@ -608,7 +625,7 @@ public final Observable concatMap(Function 0 required but it was " + prefetch); } - return lift(new ConcatMap<>(mapper, prefetch)); + return lift(new OperatorConcatMap<>(mapper, prefetch)); } public final Observable concatWith(Publisher other) { @@ -616,9 +633,22 @@ public final Observable concatWith(Publisher other) { return concat(this, other); } + public final Observable concat(Publisher> sources) { + return concat(sources, bufferSize()); + } + + public final Observable concat(Publisher> sources, int bufferSize) { + return fromPublisher(sources).concatMap(v -> v); + } + @SafeVarargs public static Observable concat(Publisher... sources) { - Objects.requireNonNull(sources); + if (sources.length == 0) { + return empty(); + } else + if (sources.length == 1) { + return fromPublisher(sources[0]); + } return fromArray(sources).concatMap(v -> v); } @@ -1513,4 +1543,24 @@ public final Observable last() { public final Observable last(T defaultValue) { return takeLast(1).single(defaultValue); } + + public final Observable switchMap(Function> mapper) { + return switchMap(mapper, bufferSize()); + } + + public final Observable switchMap(Function> mapper, int bufferSize) { + Objects.requireNonNull(mapper); + if (bufferSize <= 0) { + throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize); + } + return lift(new OperatorSwitchMap<>(mapper, bufferSize)); + } + + public static Observable switchOnNext(Publisher> sources) { + return fromPublisher(sources).switchMap(v -> v); + } + + public static Observable switchOnNext(int bufferSize, Publisher> sources) { + return fromPublisher(sources).switchMap(v -> v, bufferSize); + } } diff --git a/src/main/java/io/reactivex/internal/operators/OperatorCollect.java b/src/main/java/io/reactivex/internal/operators/OperatorCollect.java index d042a989bd..eca4a50e08 100644 --- a/src/main/java/io/reactivex/internal/operators/OperatorCollect.java +++ b/src/main/java/io/reactivex/internal/operators/OperatorCollect.java @@ -17,6 +17,7 @@ import org.reactivestreams.*; import io.reactivex.Observable.Operator; +import io.reactivex.internal.subscribers.CancelledSubscriber; import io.reactivex.internal.subscriptions.*; public final class OperatorCollect implements Operator { diff --git a/src/main/java/io/reactivex/internal/operators/ConcatMap.java b/src/main/java/io/reactivex/internal/operators/OperatorConcatMap.java similarity index 96% rename from src/main/java/io/reactivex/internal/operators/ConcatMap.java rename to src/main/java/io/reactivex/internal/operators/OperatorConcatMap.java index ad453b4b6b..bfd1c322c4 100644 --- a/src/main/java/io/reactivex/internal/operators/ConcatMap.java +++ b/src/main/java/io/reactivex/internal/operators/OperatorConcatMap.java @@ -25,10 +25,10 @@ import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subscribers.SerializedSubscriber; -public final class ConcatMap implements Operator { +public final class OperatorConcatMap implements Operator { final Function> mapper; final int bufferSize; - public ConcatMap(Function> mapper, int bufferSize) { + public OperatorConcatMap(Function> mapper, int bufferSize) { this.mapper = mapper; this.bufferSize = bufferSize; } diff --git a/src/main/java/io/reactivex/internal/operators/OperatorDistinct.java b/src/main/java/io/reactivex/internal/operators/OperatorDistinct.java index ec898169e3..09971ceb0e 100644 --- a/src/main/java/io/reactivex/internal/operators/OperatorDistinct.java +++ b/src/main/java/io/reactivex/internal/operators/OperatorDistinct.java @@ -19,7 +19,7 @@ import org.reactivestreams.*; import io.reactivex.Observable.Operator; -import io.reactivex.internal.subscribers.CancellingSubscriber; +import io.reactivex.internal.subscribers.CancelledSubscriber; import io.reactivex.internal.subscriptions.*; public final class OperatorDistinct implements Operator { @@ -77,13 +77,13 @@ public Subscriber apply(Subscriber t) { } catch (Throwable e) { t.onSubscribe(EmptySubscription.INSTANCE); t.onError(e); - return CancellingSubscriber.INSTANCE; + return CancelledSubscriber.INSTANCE; } if (coll == null) { t.onSubscribe(EmptySubscription.INSTANCE); t.onError(new NullPointerException("predicateSupplier returned null")); - return CancellingSubscriber.INSTANCE; + return CancelledSubscriber.INSTANCE; } return null; diff --git a/src/main/java/io/reactivex/internal/operators/OperatorSwitchMap.java b/src/main/java/io/reactivex/internal/operators/OperatorSwitchMap.java new file mode 100644 index 0000000000..e5785f09d1 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/OperatorSwitchMap.java @@ -0,0 +1,372 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators; + +import java.util.Queue; +import java.util.concurrent.atomic.*; +import java.util.function.Function; + +import org.reactivestreams.*; + +import io.reactivex.Observable; +import io.reactivex.Observable.Operator; +import io.reactivex.internal.queue.*; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +public final class OperatorSwitchMap implements Operator { + final Function> mapper; + final int bufferSize; + + public OperatorSwitchMap(Function> mapper, int bufferSize) { + this.mapper = mapper; + this.bufferSize = bufferSize; + } + + @Override + public Subscriber apply(Subscriber t) { + return new SwitchMapSubscriber<>(t, mapper, bufferSize); + } + + static final class SwitchMapSubscriber extends AtomicInteger implements Subscriber, Subscription { + /** */ + private static final long serialVersionUID = -3491074160481096299L; + final Subscriber actual; + final Function> mapper; + final int bufferSize; + + long requested; + + volatile long missedRequested; + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater MISSED_REQUESTED = + AtomicLongFieldUpdater.newUpdater(SwitchMapSubscriber.class, "missedRequested"); + + + volatile boolean done; + Throwable error; + + volatile boolean cancelled; + + Subscription s; + + volatile Publisher publisher; + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater PUBLISHER = + AtomicReferenceFieldUpdater.newUpdater(SwitchMapSubscriber.class, Publisher.class, "publisher"); + + SwitchMapInnerSubscriber active; + + volatile long unique; + + public SwitchMapSubscriber(Subscriber actual, Function> mapper, int bufferSize) { + this.actual = actual; + this.mapper = mapper; + this.bufferSize = bufferSize; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validateSubscription(this.s, s)) { + return; + } + this.s = s; + actual.onSubscribe(this); + } + + @Override + public void onNext(T t) { + Publisher p; + try { + p = mapper.apply(t); + } catch (Throwable e) { + s.cancel(); + onError(e); + return; + } + + PUBLISHER.lazySet(this, p); + drain(); + } + + @Override + public void onError(Throwable t) { + error = t; + done = true; + drain(); + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validateRequest(n)) { + return; + } + BackpressureHelper.add(MISSED_REQUESTED, this, n); + if (unique == 0L) { + s.request(Long.MAX_VALUE); + } else { + drain(); + } + } + + @Override + public void cancel() { + if (!cancelled) { + cancelled = true; + + if (getAndIncrement() == 0) { + clear(active); + } + } + } + + void clear(SwitchMapInnerSubscriber inner) { + if (inner != null) { + inner.cancel(); + } + publisher = null; + s.cancel(); + } + + @SuppressWarnings("unchecked") + void drain() { + if (getAndIncrement() != 0) { + return; + } + + final Subscriber a = actual; + + int missing = 1; + + for (;;) { + boolean d = done; + Publisher p = publisher; + SwitchMapInnerSubscriber inner = active; + boolean empty = p == null && inner == null; + + if (checkTerminated(d, empty, a, inner)) { + return; + } + + // get the latest publisher + if (p != null) { + p = PUBLISHER.getAndSet(this, null); + } + + long c = unique; + if (p != null) { + if (inner != null) { + inner.cancel(); + inner = null; + } + unique = ++c; + inner = new SwitchMapInnerSubscriber<>(this, c, bufferSize); + active = inner; + + p.subscribe(inner); + } + + if (inner != null) { + long r = requested; + + long mr = MISSED_REQUESTED.getAndSet(this, 0L); + if (mr != 0L) { + r = BackpressureHelper.addCap(r, mr); + requested = r; + } + + boolean unbounded = r == Long.MAX_VALUE; + + Queue q = inner.queue; + long e = 0; + + while (r != 0L) { + d = inner.done; + R v = q.poll(); + empty = v == null; + + if (cancelled) { + clear(active); + return; + } + if (d) { + Throwable err = inner.error; + if (err != null) { + clear(active); + a.onError(err); + return; + } else + if (empty) { + active = null; + break; + } + } + + if (publisher != null) { + break; + } + + if (empty) { + break; + } + + a.onNext(v); + + r--; + e++; + } + + if (e != 0L) { + if (!unbounded) { + requested -= e; + inner.get().request(e); + } + + } + } + + missing = addAndGet(-missing); + if (missing == 0) { + break; + } + } + } + + boolean checkTerminated(boolean d, boolean empty, Subscriber a, SwitchMapInnerSubscriber inner) { + if (cancelled) { + clear(inner); + return true; + } + if (d) { + Throwable e = error; + if (e != null) { + cancelled = true; + clear(inner); + a.onError(e); + return true; + } else + if (empty) { + a.onComplete(); + return true; + } + } + + return false; + } + } + + static final class SwitchMapInnerSubscriber extends AtomicReference implements Subscriber { + /** */ + private static final long serialVersionUID = 3837284832786408377L; + final SwitchMapSubscriber parent; + final long index; + final int bufferSize; + final Queue queue; + + volatile boolean done; + Throwable error; + + static final Subscription CANCELLED = new Subscription() { + @Override + public void request(long n) { + + } + + @Override + public void cancel() { + + } + }; + + public SwitchMapInnerSubscriber(SwitchMapSubscriber parent, long index, int bufferSize) { + this.parent = parent; + this.index = index; + this.bufferSize = bufferSize; + Queue q; + if (Pow2.isPowerOfTwo(bufferSize)) { + q = new SpscArrayQueue<>(bufferSize); + } else { + q = new SpscExactArrayQueue<>(bufferSize); + } + this.queue = q; + } + + @Override + public void onSubscribe(Subscription s) { + if (index == parent.unique) { + if (!compareAndSet(null, s)) { + s.cancel(); + if (get() != CANCELLED) { + SubscriptionHelper.reportSubscriptionSet(); + } + return; + } + s.request(bufferSize); + } else { + s.cancel(); + } + } + + @Override + public void onNext(R t) { + if (index == parent.unique) { + if (!queue.offer(t)) { + onError(new IllegalStateException("Queue full?!")); + return; + } + parent.drain(); + } + } + + @Override + public void onError(Throwable t) { + if (index == parent.unique) { + error = t; + done = true; + parent.drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + if (index == parent.unique) { + done = true; + parent.drain(); + } + } + + public void cancel() { + Subscription s = get(); + if (s != CANCELLED) { + s = getAndSet(CANCELLED); + if (s != CANCELLED && s != null) { + s.cancel(); + } + } + } + } + + public static void main(String[] args) { + Observable.range(1, 10).switchMap(Observable::just).subscribe(System.out::println); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/OperatorToList.java b/src/main/java/io/reactivex/internal/operators/OperatorToList.java index fd118e3e21..3e904c4e26 100644 --- a/src/main/java/io/reactivex/internal/operators/OperatorToList.java +++ b/src/main/java/io/reactivex/internal/operators/OperatorToList.java @@ -19,6 +19,7 @@ import org.reactivestreams.*; import io.reactivex.Observable.Operator; +import io.reactivex.internal.subscribers.CancelledSubscriber; import io.reactivex.internal.subscriptions.EmptySubscription; import io.reactivex.plugins.RxJavaPlugins; diff --git a/src/main/java/io/reactivex/internal/operators/CancelledSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/CancelledSubscriber.java similarity index 85% rename from src/main/java/io/reactivex/internal/operators/CancelledSubscriber.java rename to src/main/java/io/reactivex/internal/subscribers/CancelledSubscriber.java index caf48fd261..c00df6f9ac 100644 --- a/src/main/java/io/reactivex/internal/operators/CancelledSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/CancelledSubscriber.java @@ -10,17 +10,20 @@ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */ -package io.reactivex.internal.operators; + +package io.reactivex.internal.subscribers; import org.reactivestreams.*; import io.reactivex.plugins.RxJavaPlugins; /** - * Subscriber that cancels all subscriptions sent to it. + * A subscriber cancels the subscription sent to it + * and ignores all events (onError is forwarded to RxJavaPlugins though). */ public enum CancelledSubscriber implements Subscriber { INSTANCE; + @Override public void onSubscribe(Subscription s) { s.cancel(); diff --git a/src/main/java/io/reactivex/internal/subscribers/CancellingSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/CancellingSubscriber.java deleted file mode 100644 index b5c0e90611..0000000000 --- a/src/main/java/io/reactivex/internal/subscribers/CancellingSubscriber.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.subscribers; - -import org.reactivestreams.*; - -import io.reactivex.plugins.RxJavaPlugins; - -/** - * A subscriber cancels the subscription sent to it - * and ignores all events (onError is forwarded to RxJavaPlugins though). - */ -public enum CancellingSubscriber implements Subscriber { - INSTANCE; - - @Override - public void onSubscribe(Subscription s) { - s.cancel(); - } - - @Override - public void onNext(Object t) { - - } - - @Override - public void onError(Throwable t) { - RxJavaPlugins.onError(t); - } - - @Override - public void onComplete() { - - } -}