From 68988505bb10cdfc4d1109166451ce09be12f026 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka <5380167+OlegDokuka@users.noreply.github.com> Date: Mon, 16 Jan 2023 19:33:14 +0200 Subject: [PATCH] ensures empty inner handled properly in case of sync fusion (#3329) closes https://github.com/reactor/reactor-core/issues/3327 (#3307 followup) --- .../core/publisher/FluxFlattenIterable.java | 4 ++-- .../publisher/FluxFlattenIterableTest.java | 20 ++++++++++++++++++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java index 1764b5ab8e..c4465f685d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -588,7 +588,7 @@ void drainSync() { sp = iterable.spliterator(); itFinite = FluxIterable.checkFinite(sp); - isEmpty = itFinite && sp.estimateSize() == 0; + isEmpty = itFinite ? sp.estimateSize() == 0 : !hasNext(sp); } catch (Throwable exc) { resetCurrent(); diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java index f288dcc39c..39a87feaf9 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -705,6 +705,24 @@ public void testFluxIterableEmptyCase(Function fn) { .verify(); } + @ParameterizedTestWithName + @MethodSource("reactor.core.publisher.FluxIterableTest#factory") + public void testFluxIterableSyncFusionEmptyCase(Function fn) { + Iterable iterable = mock(Iterable.class); + Mockito.when(iterable.spliterator()) + .thenReturn(mock(Spliterator.class)); + + StepVerifier.create( + Flux.just(1, 2) + .flatMapIterable(__ -> iterable) + .as(fn) + .next() + ) + .expectSubscription() + .expectComplete() + .verify(); + } + @ParameterizedTestWithName @MethodSource("reactor.core.publisher.FluxIterableTest#factory") public void testFluxIterableErrorHasNext(Function fn) {