diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index bb636c4a9c..055a0e97be 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -2883,6 +2883,9 @@ public final Flux> buffer(int maxSize) { * or once this Flux completes. *

* + *

+ * Note that if buffers provided by the bufferSupplier return {@literal false} upon invocation + * of {@link Collection#add(Object)} for a given element, that element will be discarded. * *

Discard Support: This operator discards the currently open buffer upon cancellation or error triggered by a data signal, * as well as latest unbuffered element if the bufferSupplier fails. @@ -2947,6 +2950,10 @@ public final Flux> buffer(int maxSize, int skip) { *

* * + *

+ * Note for exact buffers: If buffers provided by the bufferSupplier return {@literal false} upon invocation + * of {@link Collection#add(Object)} for a given element, that element will be discarded. + * *

Discard Support: This operator discards elements in between buffers (in the case of * dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal. * Note however that overlapping buffer variant DOES NOT discard, as this might result in an element diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java index e3dd4695ca..a979f85d3e 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -153,11 +153,14 @@ public void onNext(T t) { buffer = b; } - b.add(t); - - if (b.size() == size) { - buffer = null; - actual.onNext(b); + if (b.add(t)) { + if (b.size() == size) { + buffer = null; + actual.onNext(b); + } + } else { + Operators.onDiscard(t, actual.currentContext()); + s.request(1); } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java index 54617c2673..0e02134af1 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,17 +19,24 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.provider.CsvSource; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Scannable; +import reactor.test.ParameterizedTestWithName; import reactor.test.StepVerifier; import reactor.test.publisher.FluxOperatorTest; import reactor.test.subscriber.AssertSubscriber; +import reactor.util.annotation.Nullable; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -661,4 +668,33 @@ public void discardOnErrorOverlap() { .verifyThenAssertThat() .hasDiscardedExactly(1, 2, 3, 3); //we already opened a 2nd buffer } + + @ParameterizedTestWithName + @CsvSource({ + "1|2, 1|2, ", + "1|1|1, 1, 1|1", + "1|1|2, 1|2, 1", + "1|2|1, 1|2;1, ", + "1|2|1|3, 1|2;1|3, ", + "1|1|2|3, 1|2;3, 1", + "2|1|1|3, 2|1;1|3, " + }) + public void bufferExactSupplierUsesSet(String input, String output, @Nullable String discard) { + List> outputs = Arrays.stream(output.split(";")) + .map(it -> Arrays.stream(it.split("\\|")).collect(Collectors.toSet())) + .collect(Collectors.toList()); + + StepVerifier.Assertions assertions = Flux.just(input.split("\\|")) + .>buffer(2, HashSet::new) + .as(it -> StepVerifier.create(it, outputs.size())) + .expectNextSequence(outputs) + .expectComplete() + .verifyThenAssertThat(Duration.ofSeconds(2)); + + if (discard == null) { + assertions.hasNotDiscardedElements(); + } else { + assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); + } + } }