diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java index ac195cd724..1764b5ab8e 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java @@ -382,7 +382,7 @@ void drainAsync() { sp = iterable.spliterator(); itFinite = FluxIterable.checkFinite(sp); - isEmpty = itFinite && sp.estimateSize() == 0; + isEmpty = itFinite ? sp.estimateSize() == 0 : !hasNext(sp); } catch (Throwable exc) { sp = null; diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxIterable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxIterable.java index 49e714f21d..30e8e704e0 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxIterable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxIterable.java @@ -61,13 +61,9 @@ static boolean checkFinite(Spliterator spliterator) { @Nullable private final Runnable onClose; - FluxIterable(Iterable iterable, @Nullable Runnable onClose) { - this.iterable = Objects.requireNonNull(iterable, "iterable"); - this.onClose = onClose; - } - FluxIterable(Iterable iterable) { - this(iterable, null); + this.iterable = Objects.requireNonNull(iterable, "iterable"); + this.onClose = null; } @Override @@ -158,11 +154,51 @@ static void subscribe(CoreSubscriber s, Spliterator } if (s instanceof ConditionalSubscriber) { - s.onSubscribe(new IterableSubscriptionConditional<>((ConditionalSubscriber) s, - sp, knownToBeFinite, onClose)); + IterableSubscriptionConditional isc = + new IterableSubscriptionConditional<>((ConditionalSubscriber) s, + sp, + knownToBeFinite, + onClose); + + boolean hasNext; + try { + hasNext = isc.hasNext(); + } + catch (Throwable ex) { + Operators.error(s, ex); + isc.onCloseWithDropError(); + return; + } + + if (!hasNext) { + Operators.complete(s); + isc.onCloseWithDropError(); + return; + } + + s.onSubscribe(isc); } else { - s.onSubscribe(new IterableSubscription<>(s, sp, knownToBeFinite, onClose)); + IterableSubscription is = + new IterableSubscription<>(s, sp, knownToBeFinite, onClose); + + boolean hasNext; + try { + hasNext = is.hasNext(); + } + catch (Throwable ex) { + Operators.error(s, ex); + is.onCloseWithDropError(); + return; + } + + if (!hasNext) { + Operators.complete(s); + is.onCloseWithDropError(); + return; + } + + s.onSubscribe(is); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxStream.java b/reactor-core/src/main/java/reactor/core/publisher/FluxStream.java index 2680bafcef..270ddd2045 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxStream.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxStream.java @@ -16,10 +16,8 @@ package reactor.core.publisher; -import java.util.Iterator; import java.util.Objects; import java.util.Spliterator; -import java.util.Spliterators; import java.util.function.Supplier; import java.util.stream.Stream; diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java index 6ddd231027..f288dcc39c 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java @@ -23,19 +23,24 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Spliterator; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Fuseable; import reactor.core.Scannable; import reactor.core.Scannable.Attr; import reactor.core.scheduler.Schedulers; +import reactor.test.ParameterizedTestWithName; import reactor.test.StepVerifier; import reactor.test.publisher.FluxOperatorTest; import reactor.test.subscriber.AssertSubscriber; @@ -43,6 +48,9 @@ import reactor.util.context.Context; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class FluxFlattenIterableTest extends FluxOperatorTest { @@ -678,6 +686,47 @@ public Context currentContext() { assertThat(test.currentKnownToBeFinite).as("knownFinite reset").isFalse(); } + @ParameterizedTestWithName + @MethodSource("reactor.core.publisher.FluxIterableTest#factory") + public void testFluxIterableEmptyCase(Function fn) { + Iterable iterable = mock(Iterable.class); + Mockito.when(iterable.spliterator()) + .thenReturn(mock(Spliterator.class)); + + StepVerifier.create( + Flux.just(1) + .hide() + .flatMapIterable(__ -> iterable) + .as(fn) + .next() + ) + .expectSubscription() + .expectComplete() + .verify(); + } + + @ParameterizedTestWithName + @MethodSource("reactor.core.publisher.FluxIterableTest#factory") + public void testFluxIterableErrorHasNext(Function fn) { + Iterable iterable = mock(Iterable.class); + Spliterator mock = mock(Spliterator.class); + Mockito.when(iterable.spliterator()) + .thenReturn(mock); + + when(mock.tryAdvance(any())).thenThrow(); + + StepVerifier.create( + Flux.just(1) + .hide() + .flatMapIterable(__ -> iterable) + .as(fn) + .next() + ) + .expectSubscription() + .expectError() + .verify(); + } + static class ReferenceCounted { int refCount = 1; diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxIterableTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxIterableTest.java index 412d0b5e87..5b910168c7 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxIterableTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxIterableTest.java @@ -22,11 +22,16 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -36,6 +41,7 @@ import reactor.core.Scannable; import reactor.core.scheduler.Schedulers; import reactor.test.MockUtils; +import reactor.test.ParameterizedTestWithName; import reactor.test.StepVerifier; import reactor.test.subscriber.AssertSubscriber; import reactor.util.annotation.NonNull; @@ -44,11 +50,119 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class FluxIterableTest { final Iterable source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + static Stream> factory() { + return Stream.of(new Function() { + @Override + public Flux apply(Flux flux) { + return flux; + } + + @Override + public String toString() { + return "normal fast-path"; + } + }, new Function() { + @Override + public Flux apply(Flux flux) { + return flux.filter(__ -> true); + } + + @Override + public String toString() { + return "conditional fast-path"; + } + }, new Function() { + @Override + public Flux apply(Flux flux) { + return flux.limitRate(1); + } + + @Override + public String toString() { + return "fused"; + } + }, new Function() { + @Override + public Flux apply(Flux flux) { + return flux.hide() + .limitRate(1); + } + + @Override + public String toString() { + return "normal slow-path"; + } + }, new Function() { + @Override + public Flux apply(Flux flux) { + return flux.filter(__ -> true) + .hide() + .limitRate(1); + } + + @Override + public String toString() { + return "conditional slow-path"; + } + }, new Function() { + @Override + public Flux apply(Flux flux) { + return flux.filter(__ -> true) + .limitRate(1); + } + + @Override + public String toString() { + return "conditional-fused"; + } + }); + } + + @ParameterizedTestWithName + @MethodSource("factory") + public void testFluxIterableEmptyCase(Function fn) { + Iterable iterable = mock(Iterable.class); + Mockito.when(iterable.spliterator()) + .thenReturn(mock(Spliterator.class)); + + StepVerifier.create( + Flux.fromIterable(iterable) + .as(fn) + .next() + ) + .expectSubscription() + .expectComplete() + .verify(); + } + + @ParameterizedTestWithName + @MethodSource("factory") + public void testFluxIterableErrorHasNext(Function fn) { + Iterable iterable = mock(Iterable.class); + Spliterator mock = mock(Spliterator.class); + Mockito.when(iterable.spliterator()) + .thenReturn(mock); + + when(mock.tryAdvance(any())).thenThrow(); + + StepVerifier.create( + Flux.fromIterable(iterable) + .as(fn) + .next() + ) + .expectSubscription() + .expectError() + .verify(); + } + @Test //https://github.com/reactor/reactor-core/issues/3295 public void useIterableOncePerSubscriber() { @@ -253,7 +367,7 @@ public void scanSubscription() { @Test public void scanConditionalSubscription() { @SuppressWarnings("unchecked") - Fuseable.ConditionalSubscriber actual = Mockito.mock(MockUtils.TestScannableConditionalSubscriber.class); + Fuseable.ConditionalSubscriber actual = mock(MockUtils.TestScannableConditionalSubscriber.class); Mockito.when(actual.currentContext()).thenReturn(Context.empty()); FluxIterable.IterableSubscriptionConditional test = new FluxIterable.IterableSubscriptionConditional<>(actual, Collections.singleton("test").spliterator(), true); @@ -328,7 +442,7 @@ void smokeTestIterableConditionalSubscriptionWithInfiniteIterable() { Context discardingContext = Operators.enableOnDiscard(Context.empty(), v -> { }); @SuppressWarnings("unchecked") - Fuseable.ConditionalSubscriber testSubscriber = Mockito.mock(Fuseable.ConditionalSubscriber.class); + Fuseable.ConditionalSubscriber testSubscriber = mock(Fuseable.ConditionalSubscriber.class); Mockito.when(testSubscriber.currentContext()).thenReturn(discardingContext); Spliterator iterator = Spliterators.spliteratorUnknownSize(new Iterator() {