keep pushback intact on Multi.groupItems().intoLists().of(size,duration) #493
Conversation
In the previous implementation the logic was partly handled by calling intoMultis().every(Duration), which caused an unlimited number of items to be requested from upstream. When processing a Kafka stream of many small messages that fit in memory, the throttled policy would eventually (after 60 seconds) see 'stale', not-yet-processed messages and raise an exception that shut processing down completely. This problem is solved by using the MultiBufferWithTimeoutOp directly.
Codecov Report
@@             Coverage Diff              @@
##             master      #493     +/-   ##
============================================
+ Coverage      89.95%    90.16%   +0.21%
- Complexity      2931      2937       +6
============================================
  Files            382       382
  Lines          11340     11340
  Branches        1420      1422       +2
============================================
+ Hits           10201     10225      +24
+ Misses           578       562      -16
+ Partials         561       553       -8
Would you have a complete use case to understand the context of this pull request and what exactly it fixes?
When reading a high-volume Kafka topic we first need to do some processing, and then we store the resulting events in an Elasticsearch index. We insert them in batches (of configurable size) into Elasticsearch to gain a massive performance increase. Using the throttled setting on the mutiny-kafka-connector we ensure asynchronous, at-least-once processing. The problem was that the upstream would read all data from Kafka, resulting in events that were not processed (acked) in time. The line .groupItems().intoLists().of(100, Duration.ofMillis(1000)) was requesting Integer.MAX_VALUE; after the change it requests the list size. Example simulation code:
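A minimal sketch of such a simulation, assuming a synthetic range as the upstream in place of the real Kafka source, and assuming onRequest().invoke(...) is available to log the demand the grouping operator sends upstream:

```java
import io.smallrye.mutiny.Multi;

import java.time.Duration;

public class GroupingDemandSimulation {

    public static void main(String[] args) throws InterruptedException {
        Multi.createFrom().range(0, 100_000)
                // logs what the grouping operator requests from its upstream;
                // before the fix the request was effectively unbounded (Integer.MAX_VALUE),
                // after it the demand is tied to the list size
                .onRequest().invoke(n -> System.out.println("upstream request: " + n))
                .groupItems().intoLists().of(100, Duration.ofMillis(1000))
                .subscribe().with(batch -> System.out.println("batch of " + batch.size()));

        // keep the JVM alive long enough for the time-based grouping to flush
        Thread.sleep(2_000);
    }
}
```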
Update: this change fixed the pushback problem. Testing it in a more realistic setting (kube, dev environment) produced an exception.
Very good catch!
Thanks!
Damned, I didn't see your last comment. The back-pressure failure you see is due to a consumer not consuming fast enough.
Actually, I'm pretty sure it comes from io.smallrye.mutiny.subscription.MultiSubscriber#onFailure line 161. Basically, at every time window we try to emit the accumulated items. If we don't have requests, we fail, as we can't continue accumulating (that would be an OOM).
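A simplified, illustrative sketch of that decision; it is not the actual MultiBufferWithTimeoutOp source, and the class, field and method names below are made up for the illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Models the "emit on each time window, fail when there is no demand" behaviour
// described above; the real operator signals Mutiny's back-pressure failure rather
// than a plain IllegalStateException.
class TimedBufferSketch<T> {

    private final AtomicLong downstreamRequested = new AtomicLong();
    private final Consumer<List<T>> emitBatch;
    private final Consumer<Throwable> emitFailure;
    private List<T> current = new ArrayList<>();

    TimedBufferSketch(Consumer<List<T>> emitBatch, Consumer<Throwable> emitFailure) {
        this.emitBatch = emitBatch;
        this.emitFailure = emitFailure;
    }

    void request(long n) {                    // downstream demand arrives here
        downstreamRequested.addAndGet(n);
    }

    void add(T item) {                        // upstream items accumulate here
        current.add(item);
    }

    // Invoked when the duration passed to of(size, duration) elapses.
    void onTimeout() {
        if (current.isEmpty()) {
            return;                           // nothing accumulated in this window
        }
        if (downstreamRequested.getAndUpdate(r -> r > 0 ? r - 1 : r) > 0) {
            List<T> batch = current;
            current = new ArrayList<>();
            emitBatch.accept(batch);          // emit the accumulated list downstream
        } else {
            // no outstanding requests: fail rather than keep buffering (OOM risk)
            emitFailure.accept(new IllegalStateException(
                    "Cannot emit the buffered list: no downstream requests"));
        }
    }
}
```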
That is the only place with that exact exception text. What I see is that my downstream requested 16. The MultiBufferWithTimeoutOp then produces these 16 lists, counting down at line:144; most of the time it jumps from 1 back to 16, but sometimes it ends at 0. I'm currently trying to get a better understanding of why.
The exception only occurs when there is no .emitOn(Infrastructure.getDefaultWorkerPool()) before the grouping into lists with size and duration. Maybe one of you can explain this difference in behaviour?
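For reference, a rough, self-contained sketch of the pipeline shape being discussed; the range upstream and the storeBatch stand-in for the Elasticsearch bulk insert are placeholders, not the reporter's actual code:

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;

import java.time.Duration;
import java.util.List;

public class EmitOnPlacement {

    // hypothetical stand-in for the real Elasticsearch bulk insert
    static Uni<Integer> storeBatch(List<Integer> batch) {
        return Uni.createFrom().item(batch.size());
    }

    public static void main(String[] args) throws InterruptedException {
        Multi.createFrom().range(0, 10_000)
                // the reported exception did not occur when this hop to a worker
                // thread was present before the grouping operator
                .emitOn(Infrastructure.getDefaultWorkerPool())
                .groupItems().intoLists().of(100, Duration.ofMillis(1000))
                .onItem().transformToUniAndConcatenate(EmitOnPlacement::storeBatch)
                .subscribe().with(stored -> System.out.println("stored batch of " + stored));

        // give the time-based grouping a chance to flush before the JVM exits
        Thread.sleep(2_000);
    }
}
```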