Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add maxConcurrent parameter to flatMapIterable #3722

Merged
merged 1 commit into from
Mar 3, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5558,6 +5558,36 @@ public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterab
return merge(map(OperatorMapPair.convertSelector(collectionSelector)));
}

/**
* Returns an Observable that merges each item emitted by the source Observable with the values in an
* Iterable corresponding to that item that is generated by a selector, while limiting the number of concurrent
* subscriptions to these Observables.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMapIterable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R>
* the type of item emitted by the resulting Observable
* @param collectionSelector
* a function that returns an Iterable sequence of values for when given an item emitted by the
* source Observable
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits the results of merging the items emitted by the source Observable with
* the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
* @throws IllegalArgumentException
* if {@code maxConcurrent} is less than or equal to 0
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Beta
public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector, int maxConcurrent) {
return merge(map(OperatorMapPair.convertSelector(collectionSelector)), maxConcurrent);
}

/**
* Returns an Observable that emits the results of applying a function to the pair of values from the source
* Observable and an Iterable corresponding to that item that is generated by a selector.
Expand Down Expand Up @@ -5587,6 +5617,42 @@ public final <U, R> Observable<R> flatMapIterable(Func1<? super T, ? extends Ite
return flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector);
}

/**
* Returns an Observable that emits the results of applying a function to the pair of values from the source
* Observable and an Iterable corresponding to that item that is generated by a selector, while limiting the
* number of concurrent subscriptions to these Observables.
* <p>
* <img width="640" height="390" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMapIterable.r.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U>
* the collection element type
* @param <R>
* the type of item emited by the resulting Observable
* @param collectionSelector
* a function that returns an Iterable sequence of values for each item emitted by the source
* Observable
* @param resultSelector
* a function that returns an item based on the item emitted by the source Observable and the
* Iterable returned for that item by the {@code collectionSelector}
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits the items returned by {@code resultSelector} for each item in the source
* Observable
* @throws IllegalArgumentException
* if {@code maxConcurrent} is less than or equal to 0
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Beta
public final <U, R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
Func2<? super T, ? super U, ? extends R> resultSelector, int maxConcurrent) {
return flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector, maxConcurrent);
}

/**
* Subscribes to the {@link Observable} and receives notifications for each element.
* <p>
Expand Down