
PR https://github.com/spring-projects/spring-data-elasticsearch/pull/2497 might be missing valuable unit test case #2576

Closed
patpatpat123 opened this issue May 29, 2023 · 6 comments · Fixed by #2581
Labels
type: bug A general bug

Comments

@patpatpat123

Hello team,

In PR #2497, a great improvement was made for saveAll on flux.

797dbb5#diff-07601eb84212c1000dfec4ee54028e05ccbdae2f81a9756601a3ed0175e66bc4R229

However, the unit tests in this PR only cover happy paths (maybe a bit too happy). They represent only a portion of what can really happen with a Flux.

I pulled the repo and added a generic test case, nothing fancy. Instead of a steady and simplistic flux, just add another unit test with this flux:

saveAll(Flux.interval(Duration.ofMillis(100)).map([...]))

Adding this unit test with Flux.interval (short interval), which is a common use case, can help improve the overall stability of this repo.

Question 1: Would it be possible to enhance the current test cases with a test where the flux is "more aggressive"?

Furthermore, adding this test will reveal a possible drawback with the current implementation.

The current implementation is vulnerable to the "Could not emit buffer due to lack of requests" error.

This is very easy to reproduce with a sample piece of code such as:

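import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

// Foo is the application's document class; ReactiveElasticSeachRepository is assumed
// to be the application's repository interface for Foo (neither is shown here).
@Service
public final class SomeService implements CommandLineRunner {

    @Autowired
    ReactiveElasticSeachRepository reactiveElasticSeachRepository;

    @Override
    public void run(final String... args) {
        // Infinite source: one new Foo every 100 ms.
        Flux<Foo> createCommandFlux = Flux.interval(Duration.ofMillis(100))
                .map(i -> new Foo(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC), String.valueOf(i)));
        // saveAll() batches the incoming entities internally (via bufferTimeout() at the time of this issue).
        Flux<Foo> savedFlux = reactiveElasticSeachRepository.saveAll(createCommandFlux);
        savedFlux.subscribe();
    }

}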

issue.zip

The root cause of the problem lies in Reactor Core, but the Reactor team has also acknowledged the instability of the .bufferTimeout() operator and suggests using .window() + concatMap() instead.

Question 2: Instead of using a reactive operator that is known to be unstable, could this repo use a more appropriate combination such as .window() + concatMap()?
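For reference, the general window() + concatMap() batching pattern might look like the sketch below. It is not the code merged in #2581; the class name and `saveBatch` are hypothetical, and `saveBatch` merely echoes the batch back where a real implementation would send one bulk request. `window(batchSize)` cuts the stream into sub-fluxes of at most `batchSize` elements, and `concatMap` processes those batches strictly one after another.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class WindowBatching {

    // Hypothetical stand-in for a bulk save: a real implementation would
    // index the whole batch in one request and return the saved entities.
    static <T> Mono<List<T>> saveBatch(List<T> batch) {
        return Mono.just(batch);
    }

    // Splits the source into windows of at most batchSize elements and
    // collects each window into a List, keeping the original order.
    static <T> Flux<List<T>> batches(Flux<T> entities, int batchSize) {
        return entities.window(batchSize).concatMap(Flux::collectList);
    }

    // Saves the batches sequentially, then flattens them back into single entities.
    static <T> Flux<T> saveAll(Flux<T> entities, int batchSize) {
        return batches(entities, batchSize)
                .concatMap(WindowBatching::saveBatch)
                .flatMapIterable(saved -> saved);
    }
}
```

Note that `window(int)` has no time component, so a slow source only emits a batch once it is full or the source completes.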

Thank you

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label May 29, 2023
sothawo added a commit that referenced this issue Jun 2, 2023
Original Pull Request #2581
Closes #2576
@sothawo
Collaborator

sothawo commented Jun 2, 2023

just add another unit test with this flux:
saveAll(Flux.interval(Duration.ofMillis(100)).map([...]))

This is not code that can be run in a unit or integration test. It creates an unbounded Flux of data. When would such a test be considered successful? It would never end.

I could reproduce the error with the existing test function by introducing random delays between the elements in the flux. I switched the implementation to use window(), and the tests pass with that.
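A terminating test source of the two kinds discussed here (a fast interval bounded with `take()`, and a flux with random delays between elements) could be built as in the sketch below. The class name, element counts and delay range are arbitrary assumptions; in an actual test the resulting flux would be passed to `saveAll(...)`.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class IrregularFluxes {

    // Fast but finite: emits 0..count-1, one tick per millisecond, then
    // completes, so a test consuming it can actually finish.
    static Flux<Long> boundedInterval(int count) {
        return Flux.interval(Duration.ofMillis(1)).take(count);
    }

    // Irregular: emits 0..count-1 in order, delaying each element by a
    // random 0..maxDelayMs milliseconds before it is emitted.
    static Flux<Integer> randomlyDelayed(int count, int maxDelayMs) {
        return Flux.range(0, count)
                .concatMap(i -> Mono.just(i)
                        .delayElement(Duration.ofMillis(ThreadLocalRandom.current().nextInt(maxDelayMs + 1))));
    }
}
```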

BTW, I tried different variants of window(); the ones with a timeout happily throw errors as well, so the timeout mechanism does not seem to work reliably in Reactor.

@sothawo sothawo added this to the 5.2 M1 (2023.1.0) milestone Jun 2, 2023
@sothawo sothawo added type: bug A general bug and removed status: waiting-for-triage An issue we've not yet triaged labels Jun 2, 2023
sothawo added a commit that referenced this issue Jun 2, 2023
Original Pull Request #2581
Closes #2576

(cherry picked from commit d6b5540)
@patpatpat123
Author

Many thanks for this @sothawo

On my side, I am not observing the issue anymore.
Good day!

@tongdaoqa

Hey @patpatpat123, not sure about what you said.

While this might have been a good temporary fix, I agree with what @sothawo said here: #2508

As you wrote, the issue you see is not in Spring Data Elasticsearch but in reactor, so there is nothing for us to do here. You could implement your own kind of buffering data from a flux before passing it on

@chemicL from the Reactor Core team fixed the issue via reactor/reactor-core#3332.

Using window() with concatMap() has a negative performance impact. bufferTimeout() is better suited, and it was the original proposal.

Would it be possible to use version 3.5.7 of reactor core, which contains reactor/reactor-core#3332, and revert to bufferTimeout, which offers better performance and a cleaner design?

@sothawo
Collaborator

sothawo commented Jun 27, 2023

Would it be possible to use the version 3.5.7 of reactor core

Spring Data Elasticsearch does not set the version of the webflux dependency (and therefore of Reactor); it uses the spring-data-common parent, where these versions are set. The current snapshot pulls in Spring 6.0.10, and webflux 6.0.10 pulls in reactor core 3.5.7.

So in the current main development branch for the next 5.2 version, it should be possible to switch back to the bufferTimeout() implementation.

@patpatpat123
Author

To explain myself, @tongdaoqa: before the fix from the Reactor team, bufferTimeout was not behaving correctly.

You can test it yourself, with the current version or standalone: with a very aggressive or irregular flux, the error should be reproducible.

Even with the new fix provided by the Reactor team, I would still like to test (I have test cases ready) and confirm that reverting to bufferTimeout does not introduce new bugs in Spring Data Elasticsearch.

I will include some performance tests as well.

@chemicL
Member

chemicL commented Nov 3, 2023

Please see #2607 (comment) for clarification about the bufferTimeout operator improvements (in short: a new variant was added that handles backpressure, the existing was not modified).
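Because the existing bufferTimeout operator was left unchanged, switching back would mean opting into the new variant explicitly. The sketch below assumes reactor-core 3.5.7 or later, where that variant is assumed to be exposed as the overload `bufferTimeout(int maxSize, Duration maxTime, boolean fairBackpressure)`; the class and method names are hypothetical, not Spring Data Elasticsearch code.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class FairBufferTimeout {

    // Emits a batch when maxSize elements have arrived or maxTime has elapsed,
    // whichever comes first. Passing true selects the backpressure-aware
    // implementation; the two-argument overload keeps the old behaviour.
    static <T> Flux<List<T>> batches(Flux<T> source, int maxSize, Duration maxTime) {
        return source.bufferTimeout(maxSize, maxTime, true);
    }
}
```

Batches produced this way would then be handed to a sequential bulk save, for example with `concatMap`, just as with the window()-based approach.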
