Skip to content

Commit

Permalink
2.x: Add concatArrayEagerDelayError operator (expose feature) (#6143)
Browse files Browse the repository at this point in the history
* 2.x: Add concatArrayEagerDelayError operator (expose feature)

* Change text to "Concatenates an array of"
  • Loading branch information
akarnokd authored Aug 7, 2018
1 parent 3562dfc commit 1ad606b
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 11 deletions.
77 changes: 73 additions & 4 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.reactivestreams.*;

import io.reactivex.Observable;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
Expand Down Expand Up @@ -1415,7 +1416,9 @@ public static <T> Flowable<T> concatArrayDelayError(Publisher<? extends T>... so
}

/**
* Concatenates a sequence of Publishers eagerly into a single stream of values.
* Concatenates an array of Publishers eagerly into a single stream of values.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatArrayEager.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source Publishers. The operator buffers the values emitted by these Publishers and then drains them
Expand All @@ -1430,7 +1433,7 @@ public static <T> Flowable<T> concatArrayDelayError(Publisher<? extends T>... so
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of Publishers that need to be eagerly concatenated
* @param sources an array of Publishers that need to be eagerly concatenated
* @return the new Publisher instance with the specified concatenation behavior
* @since 2.0
*/
Expand All @@ -1442,7 +1445,9 @@ public static <T> Flowable<T> concatArrayEager(Publisher<? extends T>... sources
}

/**
* Concatenates a sequence of Publishers eagerly into a single stream of values.
* Concatenates an array of Publishers eagerly into a single stream of values.
* <p>
* <img width="640" height="406" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatArrayEager.nn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source Publishers. The operator buffers the values emitted by these Publishers and then drains them
Expand All @@ -1457,7 +1462,7 @@ public static <T> Flowable<T> concatArrayEager(Publisher<? extends T>... sources
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of Publishers that need to be eagerly concatenated
* @param sources an array of Publishers that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE
* is interpreted as an indication to subscribe to all sources at once
* @param prefetch the number of elements to prefetch from each Publisher source
Expand All @@ -1475,6 +1480,70 @@ public static <T> Flowable<T> concatArrayEager(int maxConcurrency, int prefetch,
return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromArray(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE));
}

/**
* Concatenates an array of {@link Publisher}s eagerly into a single stream of values
* and delaying any errors until all sources terminate.
* <p>
* <img width="640" height="358" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatArrayEagerDelayError.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s
* and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The {@code Publisher}
* sources are expected to honor backpressure as well.
* If any of the source {@code Publisher}s violate this, the operator will signal a
* {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources an array of {@code Publisher}s that need to be eagerly concatenated
* @return the new Flowable instance with the specified concatenation behavior
* @since 2.2.1 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T> Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources) {
return concatArrayEagerDelayError(bufferSize(), bufferSize(), sources);
}

/**
* Concatenates an array of {@link Publisher}s eagerly into a single stream of values
* and delaying any errors until all sources terminate.
* <p>
* <img width="640" height="359" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatArrayEagerDelayError.nn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s
* and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The {@code Publisher}
* sources are expected to honor backpressure as well.
* If any of the source {@code Publisher}s violate this, the operator will signal a
* {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources an array of {@code Publisher}s that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE
* is interpreted as indication to subscribe to all sources at once
* @param prefetch the number of elements to prefetch from each {@code Publisher} source
* @return the new Flowable instance with the specified concatenation behavior
* @since 2.2.1 - experimental
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T> Flowable<T> concatArrayEagerDelayError(int maxConcurrency, int prefetch, Publisher<? extends T>... sources) {
return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, true);
}

/**
* Concatenates the Iterable sequence of Publishers into a single sequence by subscribing to each Publisher,
* one after the other, one at a time and delays any errors till the all inner Publishers terminate.
Expand Down
66 changes: 60 additions & 6 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1294,19 +1294,19 @@ public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends
}

/**
* Concatenates a sequence of ObservableSources eagerly into a single stream of values.
* Concatenates an array of ObservableSources eagerly into a single stream of values.
* <p>
* <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEager.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
* in order, each one after the previous one completes.
* <p>
* <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEager.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of ObservableSources that need to be eagerly concatenated
* @param sources an array of ObservableSources that need to be eagerly concatenated
* @return the new ObservableSource instance with the specified concatenation behavior
* @since 2.0
*/
Expand All @@ -1317,7 +1317,9 @@ public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>..
}

/**
* Concatenates a sequence of ObservableSources eagerly into a single stream of values.
* Concatenates an array of ObservableSources eagerly into a single stream of values.
* <p>
* <img width="640" height="495" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEager.nn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
Expand All @@ -1327,7 +1329,7 @@ public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>..
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of ObservableSources that need to be eagerly concatenated
* @param sources an array of ObservableSources that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE
* is interpreted as indication to subscribe to all sources at once
* @param prefetch the number of elements to prefetch from each ObservableSource source
Expand All @@ -1341,6 +1343,58 @@ public static <T> Observable<T> concatArrayEager(int maxConcurrency, int prefetc
return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, false);
}

/**
* Concatenates an array of {@link ObservableSource}s eagerly into a single stream of values
* and delaying any errors until all sources terminate.
* <p>
* <img width="640" height="354" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEagerDelayError.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code ObservableSource}s. The operator buffers the values emitted by these {@code ObservableSource}s
* and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources an array of {@code ObservableSource}s that need to be eagerly concatenated
* @return the new Observable instance with the specified concatenation behavior
* @since 2.2.1 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayEagerDelayError(ObservableSource<? extends T>... sources) {
return concatArrayEagerDelayError(bufferSize(), bufferSize(), sources);
}

/**
* Concatenates an array of {@link ObservableSource}s eagerly into a single stream of values
* and delaying any errors until all sources terminate.
* <p>
* <img width="640" height="460" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEagerDelayError.nn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code ObservableSource}s. The operator buffers the values emitted by these {@code ObservableSource}s
* and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources an array of {@code ObservableSource}s that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE
* is interpreted as indication to subscribe to all sources at once
* @param prefetch the number of elements to prefetch from each {@code ObservableSource} source
* @return the new Observable instance with the specified concatenation behavior
* @since 2.2.1 - experimental
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayEagerDelayError(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources) {
return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, true);
}

/**
* Concatenates the Iterable sequence of ObservableSources into a single sequence by subscribing to each ObservableSource,
* one after the other, one at a time and delays any errors till the all inner ObservableSources terminate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1228,4 +1228,110 @@ public void accept(List<Integer> v)
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(list);
}

@Test
public void arrayDelayErrorDefault() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
PublishProcessor<Integer> pp3 = PublishProcessor.create();

@SuppressWarnings("unchecked")
TestSubscriber<Integer> ts = Flowable.concatArrayEagerDelayError(pp1, pp2, pp3)
.test();

ts.assertEmpty();

assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
assertTrue(pp3.hasSubscribers());

pp2.onNext(2);
pp2.onComplete();

ts.assertEmpty();

pp1.onNext(1);

ts.assertValuesOnly(1);

pp1.onComplete();

ts.assertValuesOnly(1, 2);

pp3.onComplete();

ts.assertResult(1, 2);
}

@Test
public void arrayDelayErrorMaxConcurrency() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
PublishProcessor<Integer> pp3 = PublishProcessor.create();

@SuppressWarnings("unchecked")
TestSubscriber<Integer> ts = Flowable.concatArrayEagerDelayError(2, 2, pp1, pp2, pp3)
.test();

ts.assertEmpty();

assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
assertFalse(pp3.hasSubscribers());

pp2.onNext(2);
pp2.onComplete();

ts.assertEmpty();

pp1.onNext(1);

ts.assertValuesOnly(1);

pp1.onComplete();

assertTrue(pp3.hasSubscribers());

ts.assertValuesOnly(1, 2);

pp3.onComplete();

ts.assertResult(1, 2);
}

@Test
public void arrayDelayErrorMaxConcurrencyErrorDelayed() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
PublishProcessor<Integer> pp3 = PublishProcessor.create();

@SuppressWarnings("unchecked")
TestSubscriber<Integer> ts = Flowable.concatArrayEagerDelayError(2, 2, pp1, pp2, pp3)
.test();

ts.assertEmpty();

assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
assertFalse(pp3.hasSubscribers());

pp2.onNext(2);
pp2.onError(new TestException());

ts.assertEmpty();

pp1.onNext(1);

ts.assertValuesOnly(1);

pp1.onComplete();

assertTrue(pp3.hasSubscribers());

ts.assertValuesOnly(1, 2);

pp3.onComplete();

ts.assertFailure(TestException.class, 1, 2);
}
}
Loading

0 comments on commit 1ad606b

Please sign in to comment.