Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3821] Support Set for exact buffers in Flux.buffer #3822

Merged
merged 11 commits into from
Aug 22, 2024
7 changes: 7 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -2883,6 +2883,9 @@ public final Flux<List<T>> buffer(int maxSize) {
* or once this Flux completes.
* <p>
* <img class="marble" src="doc-files/marbles/bufferWithMaxSize.svg" alt="">
* <p>
* Note that if buffers provided by the bufferSupplier return {@literal false} upon invocation
chemicL marked this conversation as resolved.
Show resolved Hide resolved
* of {@link Collection#add(Object)} for a given element, that element will be discarded.
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal,
* as well as latest unbuffered element if the bufferSupplier fails.
Expand Down Expand Up @@ -2947,6 +2950,10 @@ public final Flux<List<T>> buffer(int maxSize, int skip) {
* <p>
* <img class="marble" src="doc-files/marbles/bufferWithMaxSizeEqualsSkipSize.svg" alt="">
*
* <p>
* Note for exact buffers: If buffers provided by the bufferSupplier return {@literal false} upon invocation
* of {@link Collection#add(Object)} for a given element, that element will be discarded.
*
* <p><strong>Discard Support:</strong> This operator discards elements in between buffers (in the case of
* dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal.
* Note however that overlapping buffer variant DOES NOT discard, as this might result in an element
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -153,11 +153,14 @@ public void onNext(T t) {
buffer = b;
}

b.add(t);

if (b.size() == size) {
buffer = null;
actual.onNext(b);
if (b.add(t)) {
if (b.size() == size) {
buffer = null;
actual.onNext(b);
}
} else {
Operators.onDiscard(t, actual.currentContext());
s.request(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,17 +19,24 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.CsvSource;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.ParameterizedTestWithName;
import reactor.test.StepVerifier;
import reactor.test.publisher.FluxOperatorTest;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.annotation.Nullable;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -661,4 +668,33 @@ public void discardOnErrorOverlap() {
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3, 3); //we already opened a 2nd buffer
}

@ParameterizedTestWithName
@CsvSource({
"1|2, 1|2, ",
"1|1|1, 1, 1|1",
"1|1|2, 1|2, 1",
"1|2|1, 1|2;1, ",
"1|2|1|3, 1|2;1|3, ",
"1|1|2|3, 1|2;3, 1",
"2|1|1|3, 2|1;1|3, "
})
public void bufferExactSupplierUsesSet(String input, String output, @Nullable String discard) {
List<Set<Object>> outputs = Arrays.stream(output.split(";"))
.map(it -> Arrays.<Object>stream(it.split("\\|")).collect(Collectors.toSet()))
.collect(Collectors.toList());

StepVerifier.Assertions assertions = Flux.just(input.split("\\|"))
.<Collection<Object>>buffer(2, HashSet::new)
.as(it -> StepVerifier.create(it, outputs.size()))
.expectNextSequence(outputs)
.expectComplete()
.verifyThenAssertThat(Duration.ofSeconds(2));

if (discard == null) {
assertions.hasNotDiscardedElements();
} else {
assertions.hasDiscardedExactly((Object[]) discard.split("\\|"));
}
}
}
Loading