Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if saveAll(Flux<T>) can be reverted to user bufferTimeout(). #2607

Closed
sothawo opened this issue Jun 27, 2023 · 2 comments · Fixed by #2753
Closed

Check if saveAll(Flux<T>) can be reverted to user bufferTimeout(). #2607

sothawo opened this issue Jun 27, 2023 · 2 comments · Fixed by #2753
Labels
type: enhancement A general enhancement

Comments

@sothawo
Copy link
Collaborator

sothawo commented Jun 27, 2023

Referring to the discussion in #2576 (comment) we should check if the implementation of saveAll(Flux<T)) can be reverted to use the more performant bufferTimeout() implementation when reactor core is pulled in in version 3.5.7.

@sothawo sothawo added the type: enhancement A general enhancement label Jun 27, 2023
@sothawo
Copy link
Collaborator Author

sothawo commented Aug 21, 2023

Note 2023-08-21: I reverted to using bufferTimeout() again on a branch created from the current development, the test introduced/adapted with #2576 then fails again. The used version of reactore-core is 3.6.0-M2. So we're staying on the window().concatMap() solution.

@chemicL
Copy link
Member

chemicL commented Nov 3, 2023

Hi, I played a bit with the saveAll method and the test introduced in #2581. When you say:

reverted to using bufferTimeout() again

Do you mean using bufferTimeout(int, Duration)? If so, this has not been affected by reactor/reactor-core#3332, but a new variant was introduced, bufferTimeout(int, Duration, boolean), where the last argument can be set to true in order to achieve fair back pressure, that will prevent the overflow error.

Here's a diff of how this could work. I adjusted the time values to make it easily reproducible on my machine.

diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java
index 6c81dfbe..446b18c0 100644
--- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java
+++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java
@@ -233,8 +233,8 @@ abstract public class AbstractReactiveElasticsearchTemplate

                return Flux.defer(() -> {
                        Sinks.Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
-                       entities.window(bulkSize) //
-                                                       .concatMap(flux -> flux.collectList()) //
+                       entities.bufferTimeout(bulkSize, Duration.ofMillis(20), true)
+//                     entities.bufferTimeout(bulkSize, Duration.ofMillis(20)) // <- this fails with onError(reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests))
                                                        .subscribe(new Subscriber<List<T>>() {
                                private Subscription subscription;
                                private AtomicBoolean upstreamComplete = new AtomicBoolean(false);
diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java
index ac5dd642..de46be17 100644
--- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java
+++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java
@@ -1191,7 +1191,7 @@ public abstract class ReactiveElasticsearchIntegrationTests {

                // we add a random delay to make suure the underlying implementation handles irregular incoming data
                var entities = Flux.fromIterable(entityList).concatMap(
-                               entity -> Mono.just(entity).delay(Duration.ofMillis((long) (Math.random() * 10))).thenReturn(entity));
+                               entity -> Mono.just(entity).delay(Duration.ofMillis((long) (Math.random() * 5))).thenReturn(entity));

                operations.save(entities, SampleEntity.class).collectList() //
                                .as(StepVerifier::create) //

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: enhancement A general enhancement
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants