
Performance drop for Kafka reactive messaging since Quarkus 2.0 #19098

Closed
loicmathieu opened this issue Jul 29, 2021 · 19 comments · Fixed by #19461

Comments

@loicmathieu
Contributor

loicmathieu commented Jul 29, 2021

Describe the bug

We ran a lot of tests and came to the conclusion that the throughput of SmallRye Reactive Messaging Kafka drops by approximately 4x from Quarkus 1.13 to Quarkus 2.

I also tried Quarkus 2.1, forcing the latest version of SmallRye Reactive Messaging Kafka (3.8), to see if it changes anything, but it doesn't.

I isolated one of our Kafka components that reads from one Avro topic (6 partitions), transforms the messages to JSON, adds some metadata and does some logging, to narrow down the test.
The component runs with 0.5 CPU and 512MB of RAM; we start two instances of the component.
The only thing that changed between the two runs is the Quarkus version.
We generate load by stopping the component and resetting the offsets of the topic.
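
For illustration, a minimal sketch of what such a component could look like (the channel names, the GenericRecord payload type and the logging are assumptions, not the actual production code):

import javax.enterprise.context.ApplicationScoped;

import org.apache.avro.generic.GenericRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

@ApplicationScoped
public class AvroToJsonProcessor {

    private static final Logger LOG = Logger.getLogger(AvroToJsonProcessor.class);

    @Incoming("avro-in")   // the 6-partition Avro topic
    @Outgoing("json-out")
    public Message<String> process(Message<GenericRecord> msg) {
        // GenericRecord#toString() renders the record as JSON
        String json = msg.getPayload().toString();
        LOG.debugf("Transformed record: %s", json);
        // metadata would be added here; keep the original ack
        return Message.of(json, msg::ack);
    }
}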

On Quarkus 1.13 we can handle more than 200k msg/min per instance.
See cpu and allocation profiles (60s): profile-1.13.zip
During the run, CPU usage goes up to 20%.

On Quarkus 2 we handle at most 50k msg/min per instance.
See cpu and allocation profiles (60s): profile-2.1.zip
During the run, CPU usage does not exceed 5%.

Expected behavior

Performance is on par with 1.13

Actual behavior

Big performance degradation between Quarkus 1.13 and Quarkus 2.0/2.1

How to Reproduce?

No response

Output of uname -a or ver

Linux 5.4.109+ #1 SMP Fri Apr 30 19:51:55 PDT 2021 x86_64 GNU/Linux

Output of java -version

16

GraalVM version (if different from Java)

No response

Quarkus version or git rev

2.0 or 2.1

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response


@loicmathieu
Contributor Author

After disabling pause-if-no-requests, I had to double the memory (the pod was OOMKilled) and disable the liveness/readiness probes.
Then I saw spurious blocked threads on random frames.
After looking at some profiles and GC logs, it appears that the new version needs a LOT more memory: doubling it is really not enough, as with a 2GB limit it uses a heap of 1.5GB (usually, on Quarkus 1.13, the heap is around 50MB), and the GC still uses almost all the CPU (more than 80% of CPU time is spent inside the GC).

See the attached profiles and GC logs:
profile-no-pause.zip
gc-no-pause.log
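
For reference, a sketch of how that attribute is disabled on a channel (placeholder channel name; pause-if-no-requests defaults to true):

# placeholder channel name
mp.messaging.incoming.my-channel.pause-if-no-requests=false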

@gsmet
Member

gsmet commented Jul 29, 2021 via email

@gsmet
Member

gsmet commented Jul 29, 2021

I see you moved to Micrometer. Can you drop it and see how it goes?

@cescoffier
Member

cescoffier commented Jul 29, 2021

If you disable the pause/request cycle, you must be prepared to store the records in memory. In the previous version, it stopped polling, so you didn't have this problem, but as you were not polling you were also considered dead by the broker.

My guess is that we are not writing fast enough on the other side. This changed recently to work around a critical bug in the Kafka client (which could block under certain circumstances and so needs to be handled by a worker thread). To validate this hypothesis, can you try to use a regular 'bare' client on the producer side and see how it behaves? Not great, but it would at least validate my guess.

That would also explain the difference in CPU usage.

@loicmathieu
Contributor Author

@gsmet

Can you take a heap dump for both and compare things?

I'll provide heap histograms and dumps if possible. As the heap usage difference is huge (50MB vs 1.5GB), I think a histogram may reveal the offender easily.

I see you moved to Micrometer. Can you drop it and see how it goes?

We moved to it multiple releases ago, and without it we would be blind. Are there structural changes to Micrometer between 1.13 and 2.0? This really seems to be related to Kafka: when we put less load on the component, the heap remains around 50MB and everything is OK. I would not expect Micrometer to have an impact linked to the number of messages handled by the component.

@cescoffier you're supposed to be on PTO, man! Enjoy your days off, it's not urgent, and @Ladicek is already helping me on this :)

can you try to use a regular 'bare' client on the producer side, and see how it behaves

I'll see if I can make the change to validate your hypothesis.

@loicmathieu
Contributor Author

loicmathieu commented Jul 30, 2021

So I can confirm that my high heap usage is due to too many Kafka messages kept in memory. When doing a heap histogram of live objects, I have 1000x more Kafka messages in memory with 2.1 (with pause-if-no-requests disabled) than with 1.13.

See the following heap histograms:
histo-1.13-1.txt
histo-2.1-1.txt

@loicmathieu
Contributor Author

I replaced the @Outgoing usage with a raw Kafka producer that I instantiate manually, re-enabled pause-if-no-requests, and used it to send the outgoing messages: it shows no performance difference.
So the issue is on the consumer side.
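
For reference, a minimal sketch of that kind of manually instantiated 'bare' producer (class name, topic and serializers are assumptions):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BareJsonProducer {

    private final KafkaProducer<String, String> producer;

    public BareJsonProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
    }

    // called from the consuming side instead of emitting through @Outgoing
    public void send(String topic, String jsonPayload) {
        producer.send(new ProducerRecord<>(topic, jsonPayload));
    }
}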

@loicmathieu
Contributor Author

I removed the Kafka producer to only consume messages, to be 100% sure, and it's still the same.
I also tried Quarkus 999-SNAPSHOT, which contains SmallRye Reactive Messaging 3.8, and there is no difference.

@cescoffier
Member

Thanks for the investigation. When we switched to the new client, we ran lots of tests and didn't see that performance drop. However, that was about 6 months ago. Off the top of my head, I would try disabling Cloud Events and tracing.

I will have a look when I'm back, but I will need a reproducer, as all the tests we ran (using production traffic) didn't show such a difference.

@cescoffier
Member

Depending on the code, the throttled commit strategy may see more contention, as accesses are made from two threads.

You can try to enable auto-commit (or switch the commit strategy to ignore) to see if it has an impact.
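
For reference, a sketch of those two options on a channel (placeholder channel name; the commit-strategy values supported by the smallrye-kafka connector are throttled, latest and ignore):

# switch the commit strategy to ignore...
mp.messaging.incoming.my-channel.commit-strategy=ignore
# ...or let the Kafka client auto-commit
mp.messaging.incoming.my-channel.enable.auto.commit=true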

@loicmathieu
Contributor Author

On the top of my head, I would try to disable cloud events and tracing.

We disabled them multiple releases ago, so it's not linked to this.

When we switched to the new client, we made lots of tests and didn't see that performance drop.

Yes, I know. I'll discuss it with Ladislav as he may have some ideas, and I think I know how to reproduce it: creating a lot of messages prior to launching the application should produce a high enough throughput to show the issue.

@Ladicek
Contributor

Ladicek commented Aug 4, 2021

I spent my Monday on this and it's relatively straightforward to reproduce.

I put a Kafka broker on my new machine, which is quite powerful, created a 10-partition topic there, and used a small Quarkus application to produce 1_000_000 messages to the topic, each message being a random 100-byte string.
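
For context, a sketch of how such a producer application could look (the channel name and the random-string generation are assumptions, not the actual code used):

import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class Producer {

    private final Random random = new Random();

    @Outgoing("msgs-producer")
    public Multi<String> produce() {
        // 1_000_000 messages, each a random 100-byte (ASCII) string
        return Multi.createFrom().range(0, 1_000_000)
                .map(i -> randomString(100));
    }

    private String randomString(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append((char) ('a' + random.nextInt(26)));
        }
        return sb.toString();
    }
}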

Then, I wrote another small Quarkus application that just consumes the messages from the broker and logs a summary every 10_000 messages. I ran that application on my work laptop.

The consumer is:

import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class Consumer {
    private final AtomicInteger counter = new AtomicInteger(0);

    private volatile long startTime;

    @PostConstruct
    public void init() {
        startTime = System.currentTimeMillis();
    }

    @Incoming("msgs-consumer")
    public CompletionStage<Void> consume(Message<String> msg) {
        int count = counter.incrementAndGet();

        if (count % 10_000 == 0) {
            long endTime = System.currentTimeMillis();
            long diffInMillis = endTime - startTime;

            System.out.println("Consumed " + count + " msgs in " + (diffInMillis / 1_000) + " seconds, that is " + count / diffInMillis + " msgs/ms");
        }

        return msg.ack();
    }
}

And the config is:

kafka.bootstrap.servers=hyperion:9092

mp.messaging.incoming.msgs-consumer.connector=smallrye-kafka
mp.messaging.incoming.msgs-consumer.topic=my-topic
mp.messaging.incoming.msgs-consumer.auto.offset.reset=earliest
mp.messaging.incoming.msgs-consumer.group.id=have-to-change-this-all-the-time
mp.messaging.incoming.msgs-consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

With Quarkus 1.13.7.Final, it takes roughly 22 seconds to consume all 1_000_000 messages, while with Quarkus 2.1.0.Final, it takes more than 10x longer. (I don't have a precise number, because I was too bored and stopped the app after a minute. The msgs/ms number was more than 40 with 1.13.7 and somewhere around 1 or 2 with 2.1.0.)

My suspicion was that the queue we add in between the Kafka consumer and the rest of Reactive Messaging is the bottleneck. Given that the downstream requests items 1 by 1, our pause/resume logic basically always allows asking for only 1 batch of records and then we pause. We don't use the Kafka consumer efficiently.

I tried various configurations of max.poll.records and with a batch size of 20_000, I get pretty close to the original performance. (With 10_000, it's about a half.) Also one thing that's clear from the experiments with batch size: the Kafka consumer can fill the queue very quickly (10s of millis, or low 100s of millis with batches of 10_000s of items).
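
For reference, that experiment amounts to passing the Kafka consumer's max.poll.records property through the channel configuration, e.g.:

mp.messaging.incoming.msgs-consumer.max.poll.records=20000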

I also tried changing the pause/resume logic to be driven strictly by queue size, trying to keep the queue always at least half-full (https://github.com/Ladicek/smallrye-reactive-messaging/commits/kafka-pause-resume), but with the default queue size (which I arbitrarily set to 1000), that didn't really change much. Again, it's only when I set the max desired queue size to 10_000s of items that I start getting back to the original performance. (With a queue of 50_000 items, I slightly surpass the original performance and hit some other bottleneck, because increasing the queue size further doesn't change anything.) This is rather crude and low-level, but it allows the user to make their own trade-off between consumption speed and memory occupancy.

@ozangunalp has been investigating a different approach, but I'll let him talk about it :-)

Overall, I think I can safely conclude that the problem is the queue. I don't really know, but ... can we do without it? :-)

@ozangunalp
Contributor

I tested a bunch of scenarios: with and without throttled commit, blocking/non-blocking vs no-op processing, and with and without a chained outgoing from the consumed message.

I concluded that the main cause of the performance drop is pause-if-no-requests; the commit strategy and chained outgoing didn't have that much impact.

As with the tests conducted by @Ladicek, my test scenarios involved consuming a large number of already-published messages from a topic as fast as possible.

I noticed that when pause-if-no-requests is enabled, the consuming thread keeps flapping between pause and resume. This surely limits the number of messages kept in the buffer, but it also starves the consuming thread and drops the message consumption throughput by 2x to up to 10x, depending on the size of the topic to consume.
Note of course that pause-if-no-requests=false results in unbounded buffering of messages, and thus the consumer thread is never starved.

I wanted to separate two types of back-pressure we need to apply in deciding when to pause/resume:

  1. The downstream consumer code can't keep up with the rate of incoming messages. This manifests itself by the downstream requesting messages slowly, resulting in the requested counter dropping to 0 inside the drain loop.
  2. Whether or not the downstream can keep up, if the poll loop keeps returning new messages to consume, we need to limit the number of messages in the buffer, simply to avoid an OOM.

For handling the first type of back-pressure, I tested moving the pause/resume logic inside the poll loop, instead of the drain loop, and conditioned it simply on the requested count dropping to zero. This does work with a blocking consumer (or one doing something with a Uni chain) and gives us a steady flow of message intake to the consumer. Because the processing takes time, the time overhead is comparatively low. To give a rough idea, consuming 20_000 messages, each taking 3ms of sleep to process, took 1m13s with pause-if-no-requests=false and 1m26s with pause-if-no-requests=true.
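
For illustration, a hedged sketch of that kind of "hard-working" blocking consumer (channel name and the 3 ms sleep are just test assumptions):

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.annotations.Blocking;

@ApplicationScoped
public class SlowConsumer {

    @Incoming("msgs-consumer")
    @Blocking // run on a worker thread so the sleep doesn't block the consuming thread
    public void consume(String msg) {
        try {
            Thread.sleep(3); // simulate 3 ms of work per message
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}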

However, with the no-op processing this doesn't work at all. The consumer keeps requesting more messages and pause/resume is never activated. Strangely, in my tests I couldn't find a difference between a no-op @Incoming method and a no-op @Incoming/@Outgoing with ack propagation. Both keep requesting more messages, and I didn't modify the producer's max.in.flight.requests.per.connection config.

This brings us to the second type of back-pressure: limiting the buffer size.
I find the custom drain loop inside KafkaRecordStream complex enough :)
So I wanted to apply this on a downstream processor with a buffer. This would, in theory, prefetch up to the max buffer size from the upstream and, when the buffer is full, skip the request. When the buffer is drained, it would request the skipped messages again. I think this should apply the same back-pressure to the KafkaRecordStream and result in the desired pause/resume behavior.

Except that I wasn't able to successfully implement the Reactive Streams operator I am describing. I can't reuse existing operators: as far as I understand, the Mutiny onOverflow().buffer() variations prefetch Long.MAX_VALUE items and therefore don't apply back-pressure to the upstream.

Maybe my hypothesis about the described operator is not correct, or it is unnecessarily complex. But I wanted to dig more in order to understand what was happening inside the drain loop.

As for the existence of the queue, I think that as long as the poll loop and the consumers are on different threads, we'll need some kind of queue. And because poll can bring us a variable number of records, the size of this queue will be an issue.

I agree that, as proposed before, a quick win is to keep the pause/resume logic inside KafkaRecordStream, with a check for requested dropping to zero and a buffer size limited to a configurable maximum.

If there are no objections, I'll continue with a PR adding my ad-hoc tests and the proposed quick win.

@loicmathieu
Contributor Author

@ozangunalp thanks for the detailed feedback on your findings and your tests.
If you have a PR with the quick win, I can test it quickly.

@ozangunalp
Contributor

I'll push the quick fix with the max buffer size config. I need more time for the tests, as I was side-tracked by this buffer implementation.

@Ladicek
Contributor

Ladicek commented Aug 16, 2021

I realized we didn't post an update here for a while. The TL;DR is that a fix has been merged to SmallRye Reactive Messaging (smallrye/smallrye-reactive-messaging#1339) and will ship in the 3.9.0 release, in time for Quarkus 2.2.

I'll share my personal take on this, but @ozangunalp did most of the work, so I'll let him share his view, if so desired :-)

So basically, as described in my previous comment, I concluded that the queue is the problem, and that the Kafka consumer can fill the queue very quickly, so I thought the Kafka consumer can't be the problem. Of course, that was partially correct and partially wrong.

By accident, I ran the following experiment: I took the Blackhole.consumeCPU method from JMH and modified the @Incoming method to call consumeCPU(10_000). With Quarkus 1.13.7, that caused throughput to drop to 1/2, while with Quarkus 2.1.0, the throughput wasn't affected at all. Then I remembered everything I knew about queueing theory, which is this single equation: response time = service time + wait time (queueing delay). If increasing service time decreases throughput, clearly the service time is a dominating factor (Quarkus 1.13.7). If increasing service time doesn't affect throughput, it's the queueing delay that dominates (Quarkus 2.1.0).
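
To spell out that reasoning (a restatement, not part of the original comment): with N messages in flight,

R = S + W, \qquad \text{throughput} \approx N / R \quad \text{(Little's law)}

If W is negligible (Quarkus 1.13.7), then R \approx S, so extra work per message directly reduces throughput. If W dominates (Quarkus 2.1.0), the same extra work barely changes R, and hence barely changes throughput.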

Now, clearly the queue is the problem -- that was correct -- but how come we wait on the queue for so long? There's nothing obviously wrong with the queue implementation, so I realized that even though the Kafka consumer can fill the queue very quickly, it must be the problem. Then I remembered that a Kafka consumer is supposed to be used like this:

while (true) {
    var batch = consumer.poll(someTimeout);
    for (var record : batch) {
        ... process record ...
    }
}

But we don't do this. We put all records from the batch into a queue (to be processed on another thread) and immediately poll again. That is, we poll too fast. That's why we have the pause/resume logic. So I thought, what if we don't pause immediately? Instead, we just stop polling for a short while (say 100 millis) and only after that time, if the queue is still more than half full, do we pause. If the @Incoming method is able to process messages fast enough, we wouldn't pause at all (and hence wouldn't resume); we would just start polling again when the queue becomes half empty.

I implemented this and it fixed the problem completely. It is however a bit complex, and we really shouldn't stop polling, because polling includes the heartbeat. At that point, by another accident, I realized that consumer.poll(someTimeout) may block (up to the given timeout). What does it do if all assigned partitions are paused? That's right -- it blocks. And since no message can come (assigned partitions are paused), it blocks for the entire given timeout. In SmallRye Reactive Messaging, the poll timeout is configurable and defaults to 1 second. This means that when the consumer is paused, a fast enough @Incoming method will finish processing the record queue and then has to wait [almost] 1 second for the consumer to return from poll, get resumed and fill the queue again.
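
For reference, that timeout corresponds to the connector's poll-timeout attribute (in milliseconds), e.g.:

mp.messaging.incoming.msgs-consumer.poll-timeout=1000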

One last experiment: what happens when we pass Duration.ZERO to the poll() method if the consumer is paused? Lo and behold, throughput is back to where it should be.
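
A minimal sketch of that idea (not the actual SmallRye Reactive Messaging code; names are illustrative):

import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class PollLoopSketch {

    // When every assigned partition is paused, poll() cannot return records anyway,
    // so use Duration.ZERO to avoid blocking for the full poll timeout.
    static <K, V> ConsumerRecords<K, V> pollOnce(Consumer<K, V> consumer, Duration pollTimeout) {
        boolean everythingPaused = !consumer.assignment().isEmpty()
                && consumer.paused().containsAll(consumer.assignment());
        return consumer.poll(everythingPaused ? Duration.ZERO : pollTimeout);
    }
}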

The rest is just fixing tests etc. :-)

@loicmathieu
Contributor Author

Many thanks @Ladicek for the detailed explanation.
And a big THANK YOU to you and @ozangunalp for working on this and making it perform so well.

I'll post the final performance test results here with new profiles, then close this issue, as the performance problem is now fixed.

@ozangunalp
Contributor

👍 I've written my summary of the fix in the PR.
We went through a lot of false assumptions, but I'm glad that we made the back-pressure work without a performance drop. I think the key was to test the issue with "hard-working" consumers (via consumeCPU etc.).
@loicmathieu let us know if your other tests are looking good; then we need to wire it into Quarkus and close this issue.
