From 2a4b18e743f0958551f4e31fdf81e618cc35f238 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Fri, 24 Feb 2017 15:40:05 +0100 Subject: [PATCH] 2.x: fix flatMap not cancelling the upstream eagerly (#5133) --- .../operators/flowable/FlowableFlatMap.java | 36 +++++++++++++------ .../operators/flowable/FlowableMergeTest.java | 19 ++++++++++ .../observable/ObservableMergeTest.java | 19 ++++++++++ .../operators/single/SingleMergeTest.java | 24 ++++++++++++- 4 files changed, 87 insertions(+), 11 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java index bd5ad2f916..fbfed551fb 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java @@ -551,19 +551,24 @@ void drainLoop() { boolean checkTerminate() { if (cancelled) { - SimpleQueue q = queue; - if (q != null) { - q.clear(); - } + clearScalarQueue(); return true; } if (!delayErrors && errs.get() != null) { + clearScalarQueue(); actual.onError(errs.terminate()); return true; } return false; } + void clearScalarQueue() { + SimpleQueue q = queue; + if (q != null) { + q.clear(); + } + } + void disposeAll() { InnerSubscriber[] a = subscribers.get(); if (a != CANCELLED) { @@ -579,6 +584,21 @@ void disposeAll() { } } } + + void innerError(InnerSubscriber inner, Throwable t) { + if (errs.addThrowable(t)) { + inner.done = true; + if (!delayErrors) { + s.cancel(); + for (InnerSubscriber a : subscribers.getAndSet(CANCELLED)) { + a.dispose(); + } + } + drain(); + } else { + RxJavaPlugins.onError(t); + } + } } static final class InnerSubscriber extends AtomicReference @@ -636,12 +656,8 @@ public void onNext(U t) { } @Override public void onError(Throwable t) { - if (parent.errs.addThrowable(t)) { - done = true; - parent.drain(); - } else { - RxJavaPlugins.onError(t); - } + lazySet(SubscriptionHelper.CANCELLED); + parent.innerError(this, t); } @Override public void onComplete() { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java index 607c461443..12cf51900a 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java @@ -28,10 +28,12 @@ import io.reactivex.*; import io.reactivex.Scheduler.Worker; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.*; import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.*; import io.reactivex.subscribers.*; @@ -1630,4 +1632,21 @@ public void mergeArray() { .test() .assertResult(1, 2); } + + @Test + public void mergeErrors() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable source1 = Flowable.error(new TestException("First")); + Flowable source2 = Flowable.error(new TestException("Second")); + + Flowable.merge(source1, source2) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeTest.java index a5479bf8cf..85907bcf23 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeTest.java @@ -27,8 +27,10 @@ import io.reactivex.Observer; import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; public class ObservableMergeTest { @@ -1125,4 +1127,21 @@ public void mergeArray() { .test() .assertResult(1, 2); } + + @Test + public void mergeErrors() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable source1 = Observable.error(new TestException("First")); + Observable source2 = Observable.error(new TestException("Second")); + + Observable.merge(source1, source2) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleMergeTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleMergeTest.java index 7e9f220192..8fdbff7ff1 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleMergeTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleMergeTest.java @@ -13,9 +13,15 @@ package io.reactivex.internal.operators.single; +import static org.junit.Assert.assertTrue; + +import java.util.List; + import org.junit.Test; -import io.reactivex.Single; +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.plugins.RxJavaPlugins; public class SingleMergeTest { @@ -48,4 +54,20 @@ public void merge4() { .assertResult(1, 2, 3, 4); } + @Test + public void mergeErrors() { + List errors = TestHelper.trackPluginErrors(); + try { + Single source1 = Single.error(new TestException("First")); + Single source2 = Single.error(new TestException("Second")); + + Single.merge(source1, source2) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } }