Slow processors can slow down overall throughput when num.stream.threads < Number of Stream Tasks (#215)
The problem is also documented here: https://medium.com/@andy.bryant/kafka-streams-work-allocation-4f31c24753cc
Using SmallRye Reactive Messaging would ease this pain, but introduce other problems. With SRM, consumer threads can be scaled independently. Major downsides are that we'd lose the convenience of Kafka-backed state stores (stateful processing / "checkpointing" would require Redis or database access), and that the batching functionality provided by SmallRye is sub-optimal, as it returns too few records most of the time.
Wondering how much this actually matters, considering that a "completed" analysis requires results from all scanners anyway. It's questionable whether OSS Index being significantly faster than Snyk makes a noticeable difference in the end-to-end use case. If multiple analyzers are enabled, the slowest of them will always be the bottleneck, no matter how fast the others are. The effect will also be less noticeable once caching kicks in.
SRM behaves in very unintuitive ways.
Because it does not need any of the features provided by Kafka Streams, it doesn't necessarily make sense to use KS. I looked into migrating to SmallRye Messaging (the "native" Quarkus solution), but it still has unpleasant drawbacks that make it a poor fit (#215 (comment)). Most notably, ordered, blocking processing will process all events on a single thread, no matter how many partitions the input topics have. This is prone to becoming a huge bottleneck under high load. With Parallel Consumer, we get ordering guarantees by key and *still* almost unlimited concurrency (https://github.com/confluentinc/parallel-consumer#ordered-by-key). Furthermore, processing is decoupled from the actual number of partitions. A nice side effect is that it also supports retries (https://github.com/confluentinc/parallel-consumer#retries). Signed-off-by: nscuro <[email protected]>
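To make the ordered-by-key idea concrete, here is a toy model of it in plain Java. This is not Parallel Consumer's actual API — just a minimal sketch of how records sharing a key can be processed sequentially while records with different keys run concurrently, which is the property the comment above relies on:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model of key-ordered parallel processing (NOT Parallel Consumer's API):
// work items sharing a key run sequentially, different keys run concurrently.
public class KeyOrderedExecutor {
    private final ExecutorService pool = Executors.newFixedThreadPool(8);
    // Tail of the per-key task chain; appending to the tail preserves per-key order.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    public CompletableFuture<Void> submit(String key, Runnable work) {
        // compute() is atomic per key on ConcurrentHashMap, so chaining is race-free.
        return tails.compute(key, (k, tail) ->
                (tail == null ? CompletableFuture.<Void>completedFuture(null) : tail)
                        .thenRunAsync(work, pool));
    }

    public void shutdown() { pool.shutdown(); }

    public static void main(String[] args) {
        KeyOrderedExecutor ex = new KeyOrderedExecutor();
        StringBuffer log = new StringBuffer(); // thread-safe appends
        for (int i = 0; i < 3; i++) {
            int n = i;
            ex.submit("component-a", () -> log.append("a" + n));
        }
        // Wait for the chain of this key to drain before reading the log.
        ex.submit("component-a", () -> {}).join();
        ex.shutdown();
        System.out.println(log); // per-key order is preserved: a0a1a2
    }
}
```

Concurrency here is bounded only by the thread pool, not by partition count — which mirrors why Parallel Consumer decouples processing parallelism from the number of partitions.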
As per Kafka Streams' threading model, a topology is broken down into stream tasks. The number of tasks created depends on how many sub-topologies there are, and on how many partitions those sub-topologies consume from. One or more tasks are assigned to a single stream thread. The number of stream threads in an application instance is defined by `kafka-streams.num.stream.threads`. If `num.stream.threads` is lower than the number of stream tasks generated for the topology, some threads will be working on multiple tasks. In Kafka Streams, there is no way to influence how tasks are assigned. This can lead to situations where one thread is assigned tasks from both a fast sub-topology (e.g. the one consuming from `dtrack.vuln-analysis.component`) and a slow one. When that happens, the upstream task (capable of processing multiple hundreds of records per second) will be significantly delayed, which also has a negative impact on other tasks depending on its results.
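The relationship above can be sketched with back-of-the-envelope math. The sub-topology and partition counts below are illustrative, not measured; only the "18 tasks" figure appears later in the issue:

```java
// Back-of-the-envelope task math for Kafka Streams:
// each sub-topology spawns one task per input partition, and
// tasks are spread across num.stream.threads threads.
public class StreamTaskMath {
    // Total stream tasks: one per (sub-topology, partition) pair,
    // assuming every sub-topology reads topics with the same partition count.
    static int taskCount(int subTopologies, int partitionsPerTopic) {
        return subTopologies * partitionsPerTopic;
    }

    // Worst-case number of tasks co-located on a single thread (ceiling division).
    static int maxTasksPerThread(int tasks, int numStreamThreads) {
        return (tasks + numStreamThreads - 1) / numStreamThreads;
    }

    public static void main(String[] args) {
        int tasks = taskCount(3, 6); // hypothetical: 3 sub-topologies x 6 partitions
        System.out.println(tasks);                        // 18
        System.out.println(maxTasksPerThread(tasks, 18)); // 1: one task per thread, no co-location
        System.out.println(maxTasksPerThread(tasks, 4));  // 5: one slow task can delay up to 4 fast ones
    }
}
```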
In the screenshot below, consumers of the `dtrack.vuln-analysis.component` and `dtrack.vuln-analysis.component.purl` topics have a significant lag, despite the respective sub-topologies being capable of processing hundreds of records per second. This should not be the case; instead, consumers of those topics should be able to keep up with new records in near real time.

For the vulnerability analyzer, a slow processor like Snyk can additionally lead to sub-optimal batching in other processors. Because fewer records arrive in a given time window, OSS Index will not be able to use its maximum batch size of 128 efficiently:
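The batching effect can be illustrated with a rough calculation. The arrival rates and poll interval below are hypothetical; only the maximum batch size of 128 comes from the issue:

```java
// Illustrates why a slow upstream task starves downstream batching:
// the effective batch size is bounded by what arrives within the poll
// interval, not by the configured maximum (128 for OSS Index here).
public class BatchSizing {
    static int effectiveBatchSize(double recordsPerSecond, double pollIntervalSeconds, int maxBatchSize) {
        return (int) Math.min(maxBatchSize, recordsPerSecond * pollIntervalSeconds);
    }

    public static void main(String[] args) {
        // Hypothetical: upstream healthy vs. throttled by a co-located slow task.
        System.out.println(effectiveBatchSize(500, 1, 128)); // 128: maximum batch size is reached
        System.out.println(effectiveBatchSize(10, 1, 128));  // 10: batches far below the maximum
    }
}
```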
This is not an issue when there is one stream thread per stream task. For example, when both OSS Index and Snyk are enabled, but the internal analyzer is disabled, the application consumes from 18 partitions, leading to 18 stream tasks. Spawning one application instance with `num.stream.threads=18`, or three instances with `num.stream.threads=6`, leads to optimal processing conditions.

Effectively, scaling up is unproblematic, but scaling down always comes with the danger of significantly slowing down the entire system.
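As a sketch, the single-instance variant of that sizing in Quarkus configuration (assuming the usual `application.properties` location; the property name is the one used in this issue):

```properties
# application.properties — one instance owning all 18 stream tasks
kafka-streams.num.stream.threads=18

# Alternatively, deploy three instances, each configured with:
# kafka-streams.num.stream.threads=6
```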
Possible solutions:

- Run multiple `KafkaStreams` instances instead of just one (Quarkus only supports one `KafkaStreams` per application; we'd need more code to make this work, and we'd have to give up the dev mode integration)