Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3821] Support Set for exact buffers in Flux.buffer #3822

Merged
merged 11 commits into from
Aug 22, 2024
32 changes: 30 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -2882,6 +2882,9 @@ public final Flux<List<T>> buffer(int maxSize) {
* will be emitted by the returned {@link Flux} each time the given max size is reached
* or once this Flux completes.
* <p>
* Note that if buffers provided by the bufferSupplier return {@literal false} upon invocation
chemicL marked this conversation as resolved.
Show resolved Hide resolved
* of {@link Collection#add(Object)} for a given element, that element will be discarded.
* <p>
* <img class="marble" src="doc-files/marbles/bufferWithMaxSize.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal,
Expand All @@ -2900,8 +2903,7 @@ public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Suppl
/**
* Collect incoming values into multiple {@link List} buffers that will be emitted
* by the returned {@link Flux} each time the given max size is reached or once this
* Flux completes. Buffers can be created with gaps, as a new buffer will be created
* every time {@code skip} values have been emitted by the source.
* Flux completes.
* <p>
* When maxSize < skip : dropping buffers
* <p>
Expand All @@ -2915,6 +2917,13 @@ public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Suppl
* <p>
* <img class="marble" src="doc-files/marbles/bufferWithMaxSizeEqualsSkipSize.svg" alt="">
*
* <p>There are nuances to consider if a supplied buffer may return {@code false} when elements
* are added. For "dropping" buffers, if an item is not added to an in-flight buffer, it is
* discarded, and therefore a buffer may not be created every {@code skip} elements. For
* "overlapping" buffers, an item will be discarded if it cannot be added to <strong>any</strong>
* in-flight buffer. For "exact" buffers, an item will be discarded if cannot be added to the
* in-flight buffer.
*
* <p><strong>Discard Support:</strong> This operator discards elements in between buffers (in the case of
* dropping buffers). It also discards the currently open buffer upon cancellation or error triggered by a data signal.
* Note however that overlapping buffer variant DOES NOT discard, as this might result in an element
Expand Down Expand Up @@ -2945,6 +2954,9 @@ public final Flux<List<T>> buffer(int maxSize, int skip) {
* <p>
* When maxSize == skip : exact buffers
* <p>
* Note that if buffers provided by the bufferSupplier return {@literal false} upon invocation
* of {@link Collection#add(Object)} for a given element, that element will be discarded.
* <p>
* <img class="marble" src="doc-files/marbles/bufferWithMaxSizeEqualsSkipSize.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards elements in between buffers (in the case of
Expand Down Expand Up @@ -3123,6 +3135,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime) {
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is false.

* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand Down Expand Up @@ -3163,6 +3179,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Schedule
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is false as well.

* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand Down Expand Up @@ -3230,6 +3250,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime,
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this one is also false. Discard support is not present in bufferTimeout for buffer.add returning false.

* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand All @@ -3254,6 +3278,10 @@ public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSiz
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}.
* <p>
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is
* less than the specified max size. The element will be discarded in such a case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also false here.

* <p>
* <img class="marble" src="doc-files/marbles/bufferTimeoutWithMaxSizeAndTimespan.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards the currently open buffer upon cancellation or error triggered by a data signal.
Expand Down
51 changes: 31 additions & 20 deletions reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -153,9 +153,10 @@ public void onNext(T t) {
buffer = b;
}

b.add(t);

if (b.size() == size) {
if (!b.add(t)) {
Sage-Pierce marked this conversation as resolved.
Show resolved Hide resolved
Sage-Pierce marked this conversation as resolved.
Show resolved Hide resolved
Operators.onDiscard(t, actual.currentContext());
s.request(1);
} else if (b.size() == size) {
buffer = null;
actual.onNext(b);
}
Expand Down Expand Up @@ -306,8 +307,11 @@ public void onNext(T t) {
}

if (b != null) {
b.add(t);
if (b.size() == size) {
if (!b.add(t)) {
Operators.onDiscard(t, this.ctx);
s.request(1);
return;
Sage-Pierce marked this conversation as resolved.
Show resolved Hide resolved
} else if (b.size() == size) {
buffer = null;
actual.onNext(b);
}
Expand Down Expand Up @@ -477,6 +481,17 @@ public void onNext(T t) {
return;
}

boolean added = isEmpty();
for (C b : this) {
added |= b.add(t);
}

if (!added) {
Operators.onDiscard(t, actual.currentContext());
s.request(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do have doubts here - the demand flow without Set is that every buffer that is initiated has a chance to be fulfilled. If the downstream requests 1 it means a full buffer should be completed. As the buffers are delivered, the downstream can request more. With potentially non-deterministic implementations of Collection which can unpredictably return false some assumptions might not hold. There are a few assumptions here that dictate the flow that follows. I'd probably prefer to back away from implementations for skip != maxSize as they become complicated to reason about and explain to users what are the risks (e.g. keeping a growing list of undelivered buffers). I think we could terminate early instead of stalling and deliver an error to the downstream and cancel the source in case the buffer is unable to accept the item.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the early termination, I clarified in the general comment - better to leave the undefined behaviour in such case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the demand flow without Set is that every buffer that is initiated has a chance to be fulfilled. If the downstream requests 1 it means a full buffer should be completed. As the buffers are delivered, the downstream can request more. With potentially non-deterministic implementations of Collection which can unpredictably return false some assumptions might not hold.

Indeed, this is one of the reasons I originally implemented this with "if the data can't be added to all in-flight buffers, then discard it", which might avoid some of this weirdness.

I'd probably prefer to back away from implementations for skip != maxSize as they become complicated to reason about and explain to users what are the risks ... better to leave the undefined behaviour in such case.

I agree 😄

return;
}

long i = index;

if (i % skip == 0L) {
Expand All @@ -493,23 +508,19 @@ public void onNext(T t) {
return;
}

offer(b);
}

C b = peek();

if (b != null && b.size() + 1 == size) {
poll();

b.add(t);

actual.onNext(b);

produced++;
offer(b);
}

for (C b0 : this) {
b0.add(t);
for (C b : this) {
if (b.size() == size) {
poll();
actual.onNext(b);
produced++;
} else {
// Safe to break as soon as we find a buffer that's not yet at size
break;
}
}

index = i + 1;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,17 +19,24 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.CsvSource;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.ParameterizedTestWithName;
import reactor.test.StepVerifier;
import reactor.test.publisher.FluxOperatorTest;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.annotation.Nullable;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -661,4 +668,96 @@ public void discardOnErrorOverlap() {
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3, 3); //we already opened a 2nd buffer
}

@ParameterizedTestWithName
@CsvSource({
"1|2, 1|2, ",
"1|1|1, 1, 1|1",
"1|1|2, 1|2, 1",
"1|2|1, 1|2;1, ",
"1|2|1|3, 1|2;1|3, ",
"1|1|2|3, 1|2;3, 1",
"2|1|1|3, 2|1;1|3, "
})
public void bufferExactSupplierUsesSet(String input, String output, @Nullable String discard) {
List<Set<Object>> outputs = Arrays.stream(output.split(";"))
.map(it -> Arrays.<Object>stream(it.split("\\|")).collect(Collectors.toSet()))
.collect(Collectors.toList());

StepVerifier.Assertions assertions = Flux.just(input.split("\\|"))
.<Collection<Object>>buffer(2, HashSet::new)
.as(it -> StepVerifier.create(it, outputs.size()))
.expectNextSequence(outputs)
.expectComplete()
.verifyThenAssertThat(Duration.ofSeconds(2));

if (discard == null) {
assertions.hasNotDiscardedElements();
} else {
assertions.hasDiscardedExactly((Object[]) discard.split("\\|"));
}
}

@ParameterizedTestWithName
@CsvSource({
"1|2, 1|2, ",
"1|1|1, 1, 1|1",
"1|1|2, 1|2, 1",
"1|2|1, 1|2, ",
"1|2|1|3, 1|2;3, 1",
"1|2|1|1|3, 1|2;1|3, 1",
"1|1|2|3, 1|2, 1",
Sage-Pierce marked this conversation as resolved.
Show resolved Hide resolved
"2|1|1|3, 2|1;3, 1"
})
public void bufferSkipWithMax2Skip3SupplierUsesSet(String input, String output, @Nullable String discard) {
List<Set<Object>> outputs = Arrays.stream(output.split(";"))
.map(it -> Arrays.<Object>stream(it.split("\\|")).collect(Collectors.toSet()))
.collect(Collectors.toList());

StepVerifier.Assertions assertions = Flux.just(input.split("\\|"))
.<Collection<Object>>buffer(2, 3, HashSet::new)
.as(it -> StepVerifier.create(it, outputs.size()))
.expectNextSequence(outputs)
.thenCancel()
.verifyThenAssertThat(Duration.ofSeconds(2));

if (discard == null) {
assertions.hasNotDiscardedElements();
} else {
assertions.hasDiscardedExactly((Object[]) discard.split("\\|"));
}
}

@ParameterizedTestWithName
@CsvSource({
"1|2, 1|2, ",
"1|1|1, 1, 1|1",
"1|1|2, 1|2, 1",
"1|2|1, 1|2, 1",
"1|2|1|3, 1|2|3;3, 1",
"1|2|1|3|3|4, 1|2|3|4;3|4, 1|3",
"1|2|3|1|3|4, 1|2|3|4;3|1|4;4, 3",
"3|2|1|2|3|2|3|4, 3|2|1|4;1|2|3|4;3|2|4;4, 3",
"1|1|2|3, 1|2|3;3, 1",
"2|1|1|3, 2|1|3;3, 1",
"1|2|1|2|3, 1|2|3;3, 1|2"
})
public void bufferOverlapWithMax4Skip2SupplierUsesSet(String input, String output, @Nullable String discard) {
List<Set<Object>> outputs = Arrays.stream(output.split(";"))
.map(it -> Arrays.<Object>stream(it.split("\\|")).collect(Collectors.toSet()))
.collect(Collectors.toList());

StepVerifier.Assertions assertions = Flux.just(input.split("\\|"))
.<Collection<Object>>buffer(4, 2, HashSet::new)
.as(it -> StepVerifier.create(it, outputs.size()))
.expectNextSequence(outputs)
.expectComplete()
.verifyThenAssertThat(Duration.ofSeconds(2));

if (discard == null) {
assertions.hasNotDiscardedElements();
} else {
assertions.hasDiscardedExactly((Object[]) discard.split("\\|"));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -421,4 +422,14 @@ public void discardOnError() {
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}

@Test
public void bufferSupplierUsesSet() {
Flux.just(1, 1, 1, 1, 1, 1, 1)
.<Set<Object>>bufferTimeout(3, Duration.ofSeconds(2), HashSet::new)
.as(it -> StepVerifier.create(it, 3))
.expectNext(Collections.singleton(1), Collections.singleton(1), Collections.singleton(1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it of little practical use in the end -> the result is that a buffer is emitted every time maxSize items are consumed from the source, not when the buffer size is at the maxSize limit.

.expectComplete()
.verify(Duration.ofSeconds(2));
}
}