diff --git a/src/main/java/reactor/core/publisher/FluxFlattenIterable.java b/src/main/java/reactor/core/publisher/FluxFlattenIterable.java index 21f0845594..ce7537a043 100644 --- a/src/main/java/reactor/core/publisher/FluxFlattenIterable.java +++ b/src/main/java/reactor/core/publisher/FluxFlattenIterable.java @@ -627,7 +627,7 @@ public void clear() { public boolean isEmpty() { Iterator it = current; if (it != null) { - return it.hasNext(); + return !it.hasNext(); } return queue.isEmpty(); // estimate } diff --git a/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java b/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java index 455b80b42b..ce689963d3 100644 --- a/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java +++ b/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java @@ -313,4 +313,18 @@ public void testDrainAsyncCompletesSeveralBatches() { .verifyComplete(); } + /** + * See https://github.com/reactor/reactor-core/issues/508 + */ + @Test + public void testPublishingTwice() { + StepVerifier.create(Flux.just(Flux.range(0, 300).toIterable(), Flux.range(0, 300).toIterable()) + .flatMapIterable(x -> x) + .share() + .share() + .count()) + .expectNext(600L) + .verifyComplete(); + } + }