Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add concatArrayEagerDelayError operator (expose feature) #6143

Merged
merged 2 commits into from
Aug 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 73 additions & 4 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.reactivestreams.*;

import io.reactivex.Observable;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
Expand Down Expand Up @@ -1415,7 +1416,9 @@ public static <T> Flowable<T> concatArrayDelayError(Publisher<? extends T>... so
}

/**
* Concatenates a sequence of Publishers eagerly into a single stream of values.
* Concatenates an array of Publishers eagerly into a single stream of values.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatArrayEager.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source Publishers. The operator buffers the values emitted by these Publishers and then drains them
Expand All @@ -1430,7 +1433,7 @@ public static <T> Flowable<T> concatArrayDelayError(Publisher<? extends T>... so
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of Publishers that need to be eagerly concatenated
* @param sources an array of Publishers that need to be eagerly concatenated
* @return the new Publisher instance with the specified concatenation behavior
* @since 2.0
*/
Expand All @@ -1442,7 +1445,9 @@ public static <T> Flowable<T> concatArrayEager(Publisher<? extends T>... sources
}

/**
* Concatenates a sequence of Publishers eagerly into a single stream of values.
* Concatenates an array of Publishers eagerly into a single stream of values.
* <p>
* <img width="640" height="406" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatArrayEager.nn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source Publishers. The operator buffers the values emitted by these Publishers and then drains them
Expand All @@ -1457,7 +1462,7 @@ public static <T> Flowable<T> concatArrayEager(Publisher<? extends T>... sources
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of Publishers that need to be eagerly concatenated
* @param sources an array of Publishers that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE
* is interpreted as an indication to subscribe to all sources at once
* @param prefetch the number of elements to prefetch from each Publisher source
Expand All @@ -1475,6 +1480,70 @@ public static <T> Flowable<T> concatArrayEager(int maxConcurrency, int prefetch,
return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromArray(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE));
}

/**
* Concatenates an array of {@link Publisher}s eagerly into a single stream of values
* and delaying any errors until all sources terminate.
* <p>
* <img width="640" height="358" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatArrayEagerDelayError.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s
* and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The {@code Publisher}
* sources are expected to honor backpressure as well.
* If any of the source {@code Publisher}s violate this, the operator will signal a
* {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources an array of {@code Publisher}s that need to be eagerly concatenated
* @return the new Flowable instance with the specified concatenation behavior
* @since 2.2.1 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T> Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources) {
return concatArrayEagerDelayError(bufferSize(), bufferSize(), sources);
}

/**
* Concatenates an array of {@link Publisher}s eagerly into a single stream of values
* and delaying any errors until all sources terminate.
* <p>
* <img width="640" height="359" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatArrayEagerDelayError.nn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s
* and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. The {@code Publisher}
* sources are expected to honor backpressure as well.
* If any of the source {@code Publisher}s violate this, the operator will signal a
* {@code MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources an array of {@code Publisher}s that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE
* is interpreted as indication to subscribe to all sources at once
* @param prefetch the number of elements to prefetch from each {@code Publisher} source
* @return the new Flowable instance with the specified concatenation behavior
* @since 2.2.1 - experimental
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T> Flowable<T> concatArrayEagerDelayError(int maxConcurrency, int prefetch, Publisher<? extends T>... sources) {
return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, true);
}

/**
* Concatenates the Iterable sequence of Publishers into a single sequence by subscribing to each Publisher,
* one after the other, one at a time and delays any errors till the all inner Publishers terminate.
Expand Down
66 changes: 60 additions & 6 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1294,19 +1294,19 @@ public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends
}

/**
* Concatenates a sequence of ObservableSources eagerly into a single stream of values.
* Concatenates an array of ObservableSources eagerly into a single stream of values.
* <p>
* <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEager.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
* in order, each one after the previous one completes.
* <p>
* <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEager.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of ObservableSources that need to be eagerly concatenated
* @param sources an array of ObservableSources that need to be eagerly concatenated
* @return the new ObservableSource instance with the specified concatenation behavior
* @since 2.0
*/
Expand All @@ -1317,7 +1317,9 @@ public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>..
}

/**
* Concatenates a sequence of ObservableSources eagerly into a single stream of values.
* Concatenates an array of ObservableSources eagerly into a single stream of values.
* <p>
* <img width="640" height="495" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEager.nn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
Expand All @@ -1327,7 +1329,7 @@ public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>..
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of ObservableSources that need to be eagerly concatenated
* @param sources an array of ObservableSources that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE
* is interpreted as indication to subscribe to all sources at once
* @param prefetch the number of elements to prefetch from each ObservableSource source
Expand All @@ -1341,6 +1343,58 @@ public static <T> Observable<T> concatArrayEager(int maxConcurrency, int prefetc
return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, false);
}

/**
* Concatenates an array of {@link ObservableSource}s eagerly into a single stream of values
* and delaying any errors until all sources terminate.
* <p>
* <img width="640" height="354" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEagerDelayError.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code ObservableSource}s. The operator buffers the values emitted by these {@code ObservableSource}s
* and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources an array of {@code ObservableSource}s that need to be eagerly concatenated
* @return the new Observable instance with the specified concatenation behavior
* @since 2.2.1 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayEagerDelayError(ObservableSource<? extends T>... sources) {
return concatArrayEagerDelayError(bufferSize(), bufferSize(), sources);
}

/**
* Concatenates an array of {@link ObservableSource}s eagerly into a single stream of values
* and delaying any errors until all sources terminate.
* <p>
* <img width="640" height="460" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatArrayEagerDelayError.nn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code ObservableSource}s. The operator buffers the values emitted by these {@code ObservableSource}s
* and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources an array of {@code ObservableSource}s that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE
* is interpreted as indication to subscribe to all sources at once
* @param prefetch the number of elements to prefetch from each {@code ObservableSource} source
* @return the new Observable instance with the specified concatenation behavior
* @since 2.2.1 - experimental
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> concatArrayEagerDelayError(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources) {
return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, true);
}

/**
* Concatenates the Iterable sequence of ObservableSources into a single sequence by subscribing to each ObservableSource,
* one after the other, one at a time and delays any errors till the all inner ObservableSources terminate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1228,4 +1228,110 @@ public void accept(List<Integer> v)
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(list);
}

@Test
public void arrayDelayErrorDefault() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
PublishProcessor<Integer> pp3 = PublishProcessor.create();

@SuppressWarnings("unchecked")
TestSubscriber<Integer> ts = Flowable.concatArrayEagerDelayError(pp1, pp2, pp3)
.test();

ts.assertEmpty();

assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
assertTrue(pp3.hasSubscribers());

pp2.onNext(2);
pp2.onComplete();

ts.assertEmpty();

pp1.onNext(1);

ts.assertValuesOnly(1);

pp1.onComplete();

ts.assertValuesOnly(1, 2);

pp3.onComplete();

ts.assertResult(1, 2);
}

@Test
public void arrayDelayErrorMaxConcurrency() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
PublishProcessor<Integer> pp3 = PublishProcessor.create();

@SuppressWarnings("unchecked")
TestSubscriber<Integer> ts = Flowable.concatArrayEagerDelayError(2, 2, pp1, pp2, pp3)
.test();

ts.assertEmpty();

assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
assertFalse(pp3.hasSubscribers());

pp2.onNext(2);
pp2.onComplete();

ts.assertEmpty();

pp1.onNext(1);

ts.assertValuesOnly(1);

pp1.onComplete();

assertTrue(pp3.hasSubscribers());

ts.assertValuesOnly(1, 2);

pp3.onComplete();

ts.assertResult(1, 2);
}

@Test
public void arrayDelayErrorMaxConcurrencyErrorDelayed() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
PublishProcessor<Integer> pp3 = PublishProcessor.create();

@SuppressWarnings("unchecked")
TestSubscriber<Integer> ts = Flowable.concatArrayEagerDelayError(2, 2, pp1, pp2, pp3)
.test();

ts.assertEmpty();

assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
assertFalse(pp3.hasSubscribers());

pp2.onNext(2);
pp2.onError(new TestException());

ts.assertEmpty();

pp1.onNext(1);

ts.assertValuesOnly(1);

pp1.onComplete();

assertTrue(pp3.hasSubscribers());

ts.assertValuesOnly(1, 2);

pp3.onComplete();

ts.assertFailure(TestException.class, 1, 2);
}
}
Loading