Skip to content

Commit

Permalink
2.x: add Flowable.parallel() and parallel operators (#4974)
Browse files Browse the repository at this point in the history
* 2.x: add ParallelFlowable

* Fix groupBy benchmark
  • Loading branch information
akarnokd authored Jan 18, 2017
1 parent 3bc2823 commit 6c88036
Show file tree
Hide file tree
Showing 23 changed files with 5,078 additions and 2 deletions.
95 changes: 95 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.subscribers.*;
import io.reactivex.internal.util.*;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.*;
import io.reactivex.subscribers.*;
Expand Down Expand Up @@ -10363,6 +10364,100 @@ public final Flowable<T> onTerminateDetach() {
return RxJavaPlugins.onAssembly(new FlowableDetach<T>(this));
}

/**
* Parallelizes the flow by creating multiple 'rails' (equal to the number of CPUs)
* and dispatches the upstream items to them in a round-robin fashion.
* <p>
* Note that the rails don't execute in parallel on their own and one needs to
* apply {@link ParallelFlowable#runOn(Scheduler)} to specify the Scheduler where
* each rail will execute.
* <p>
* To merge the parallel 'rails' back into a single sequence, use {@link ParallelFlowable#sequential()}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flowable.parallel.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator requires the upstream to honor backpressure and each 'rail' honors backpressure
* as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code parallel} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new ParallelFlowable instance
* @since 2.0.5 - experimental
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
@Experimental
public final ParallelFlowable<T> parallel() {
return ParallelFlowable.from(this);
}

/**
* Parallelizes the flow by creating the specified number of 'rails'
* and dispatches the upstream items to them in a round-robin fashion.
* <p>
* Note that the rails don't execute in parallel on their own and one needs to
* apply {@link ParallelFlowable#runOn(Scheduler)} to specify the Scheduler where
* each rail will execute.
* <p>
* To merge the parallel 'rails' back into a single sequence, use {@link ParallelFlowable#sequential()}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flowable.parallel.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator requires the upstream to honor backpressure and each 'rail' honors backpressure
* as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code parallel} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param parallelism the number of 'rails' to use
* @return the new ParallelFlowable instance
* @since 2.0.5 - experimental
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
@Experimental
public final ParallelFlowable<T> parallel(int parallelism) {
ObjectHelper.verifyPositive(parallelism, "parallelism");
return ParallelFlowable.from(this, parallelism);
}

/**
* Parallelizes the flow by creating the specified number of 'rails'
* and dispatches the upstream items to them in a round-robin fashion and
* uses the defined per-'rail' prefetch amount.
* <p>
* Note that the rails don't execute in parallel on their own and one needs to
* apply {@link ParallelFlowable#runOn(Scheduler)} to specify the Scheduler where
* each rail will execute.
* <p>
* To merge the parallel 'rails' back into a single sequence, use {@link ParallelFlowable#sequential()}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flowable.parallel.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator requires the upstream to honor backpressure and each 'rail' honors backpressure
* as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code parallel} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param parallelism the number of 'rails' to use
* @param prefetch the number of items each 'rail' should prefetch
* @return the new ParallelFlowable instance
* @since 2.0.5 - experimental
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
@Experimental
public final ParallelFlowable<T> parallel(int parallelism, int prefetch) {
ObjectHelper.verifyPositive(parallelism, "parallelism");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return ParallelFlowable.from(this, parallelism, prefetch);
}

/**
* Returns a {@link ConnectableFlowable}, which is a variety of Publisher that waits until its
* {@link ConnectableFlowable#connect connect} method is called before it begins emitting items to those
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.parallel;

import java.util.concurrent.Callable;

import org.reactivestreams.*;

import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiConsumer;
import io.reactivex.internal.subscribers.DeferredScalarSubscriber;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Reduce the sequence of values in each 'rail' to a single value.
*
* @param <T> the input value type
* @param <C> the collection type
*/
public final class ParallelCollect<T, C> extends ParallelFlowable<C> {

final ParallelFlowable<? extends T> source;

final Callable<C> initialCollection;

final BiConsumer<C, T> collector;

public ParallelCollect(ParallelFlowable<? extends T> source,
Callable<C> initialCollection, BiConsumer<C, T> collector) {
this.source = source;
this.initialCollection = initialCollection;
this.collector = collector;
}

@Override
public void subscribe(Subscriber<? super C>[] subscribers) {
if (!validate(subscribers)) {
return;
}

int n = subscribers.length;
@SuppressWarnings("unchecked")
Subscriber<T>[] parents = new Subscriber[n];

for (int i = 0; i < n; i++) {

C initialValue;

try {
initialValue = initialCollection.call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
reportError(subscribers, ex);
return;
}

if (initialValue == null) {
reportError(subscribers, new NullPointerException("The initialSupplier returned a null value"));
return;
}

parents[i] = new ParallelCollectSubscriber<T, C>(subscribers[i], initialValue, collector);
}

source.subscribe(parents);
}

void reportError(Subscriber<?>[] subscribers, Throwable ex) {
for (Subscriber<?> s : subscribers) {
EmptySubscription.error(ex, s);
}
}

@Override
public int parallelism() {
return source.parallelism();
}

static final class ParallelCollectSubscriber<T, C> extends DeferredScalarSubscriber<T, C> {


private static final long serialVersionUID = -4767392946044436228L;

final BiConsumer<C, T> collector;

C collection;

boolean done;

ParallelCollectSubscriber(Subscriber<? super C> subscriber,
C initialValue, BiConsumer<C, T> collector) {
super(subscriber);
this.collection = initialValue;
this.collector = collector;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

actual.onSubscribe(this);

s.request(Long.MAX_VALUE);
}
}

@Override
public void onNext(T t) {
if (done) {
return;
}

try {
collector.accept(collection, t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancel();
onError(ex);
return;
}
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
collection = null;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
C c = collection;
collection = null;
complete(c);
}

@Override
public void cancel() {
super.cancel();
s.cancel();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.parallel;

import org.reactivestreams.*;

import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.operators.flowable.FlowableConcatMap;
import io.reactivex.internal.util.ErrorMode;
import io.reactivex.parallel.ParallelFlowable;

/**
* Concatenates the generated Publishers on each rail.
*
* @param <T> the input value type
* @param <R> the output value type
*/
public final class ParallelConcatMap<T, R> extends ParallelFlowable<R> {

final ParallelFlowable<T> source;

final Function<? super T, ? extends Publisher<? extends R>> mapper;

final int prefetch;

final ErrorMode errorMode;

public ParallelConcatMap(
ParallelFlowable<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper,
int prefetch, ErrorMode errorMode) {
this.source = source;
this.mapper = ObjectHelper.requireNonNull(mapper, "mapper");
this.prefetch = prefetch;
this.errorMode = ObjectHelper.requireNonNull(errorMode, "errorMode");
}

@Override
public int parallelism() {
return source.parallelism();
}

@Override
public void subscribe(Subscriber<? super R>[] subscribers) {
if (!validate(subscribers)) {
return;
}

int n = subscribers.length;

@SuppressWarnings("unchecked")
final Subscriber<T>[] parents = new Subscriber[n];

// FIXME cheat until we have support from RxJava2 internals
Publisher<T> p = new Publisher<T>() {
int i;

@SuppressWarnings("unchecked")
@Override
public void subscribe(Subscriber<? super T> s) {
parents[i++] = (Subscriber<T>)s;
}
};

FlowableConcatMap<T, R> op = new FlowableConcatMap<T, R>(p, mapper, prefetch, errorMode);

for (int i = 0; i < n; i++) {

op.subscribe(subscribers[i]);
// FIXME needs a FlatMap subscriber
// parents[i] = FlowableConcatMap.createSubscriber(s, mapper, prefetch, errorMode);
}

source.subscribe(parents);
}
}
Loading

0 comments on commit 6c88036

Please sign in to comment.