Skip to content

Commit

Permalink
[#3821] Support Set for exact buffers in Flux.buffer (#3822)
Browse files Browse the repository at this point in the history
When a `Set` was returned from the `Supplier` provided to `Flux.buffer`,
the stream would hang if/when there were duplicates in a given buffer.
This change allows using `Set` for exact buffers - buffers where
`maxSize` and `skip` arguments are equal. It also guarantees in this
case the duplicate items are discarded and the upstream demand is
replenished.

Resolves #3821

---------

Co-authored-by: spierce <[email protected]>
  • Loading branch information
Sage-Pierce and spierce authored Aug 22, 2024
1 parent 6b71ff0 commit e58658d
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 7 deletions.
7 changes: 7 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -2883,6 +2883,9 @@ public final Flux<List<T>> buffer(int maxSize) {
* or once this Flux completes.
* <p>
* <img class="marble" src="doc-files/marbles/bufferWithMaxSize.svg" alt="">
* <p>
* Note that if buffers provided by the bufferSupplier return {@literal false} upon invocation
* of {@link Collection#add(Object)} for a given element, that element will be discarded.
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal,
* as well as latest unbuffered element if the bufferSupplier fails.
Expand Down Expand Up @@ -2947,6 +2950,10 @@ public final Flux<List<T>> buffer(int maxSize, int skip) {
* <p>
* <img class="marble" src="doc-files/marbles/bufferWithMaxSizeEqualsSkipSize.svg" alt="">
*
* <p>
* Note for exact buffers: If buffers provided by the bufferSupplier return {@literal false} upon invocation
* of {@link Collection#add(Object)} for a given element, that element will be discarded.
*
* <p><strong>Discard Support:</strong> This operator discards elements in between buffers (in the case of
* dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal.
* Note however that overlapping buffer variant DOES NOT discard, as this might result in an element
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -153,11 +153,14 @@ public void onNext(T t) {
buffer = b;
}

b.add(t);

if (b.size() == size) {
buffer = null;
actual.onNext(b);
if (b.add(t)) {
if (b.size() == size) {
buffer = null;
actual.onNext(b);
}
} else {
Operators.onDiscard(t, actual.currentContext());
s.request(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,17 +19,24 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.CsvSource;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.ParameterizedTestWithName;
import reactor.test.StepVerifier;
import reactor.test.publisher.FluxOperatorTest;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.annotation.Nullable;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -661,4 +668,33 @@ public void discardOnErrorOverlap() {
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3, 3); //we already opened a 2nd buffer
}

@ParameterizedTestWithName
@CsvSource({
"1|2, 1|2, ",
"1|1|1, 1, 1|1",
"1|1|2, 1|2, 1",
"1|2|1, 1|2;1, ",
"1|2|1|3, 1|2;1|3, ",
"1|1|2|3, 1|2;3, 1",
"2|1|1|3, 2|1;1|3, "
})
public void bufferExactSupplierUsesSet(String input, String output, @Nullable String discard) {
List<Set<Object>> outputs = Arrays.stream(output.split(";"))
.map(it -> Arrays.<Object>stream(it.split("\\|")).collect(Collectors.toSet()))
.collect(Collectors.toList());

StepVerifier.Assertions assertions = Flux.just(input.split("\\|"))
.<Collection<Object>>buffer(2, HashSet::new)
.as(it -> StepVerifier.create(it, outputs.size()))
.expectNextSequence(outputs)
.expectComplete()
.verifyThenAssertThat(Duration.ofSeconds(2));

if (discard == null) {
assertions.hasNotDiscardedElements();
} else {
assertions.hasDiscardedExactly((Object[]) discard.split("\\|"));
}
}
}

0 comments on commit e58658d

Please sign in to comment.