Out-of-Memory errors when sinking large topics #270

Open
vpapanchev opened this issue Jan 16, 2024 · 5 comments

Scenario Overview

We have several topics, each already containing gigabytes of data (roughly 1-10 million records). We need to export this data to S3.

Issue:

Using the Aiven S3 Connector we run into Out-of-memory errors indicating that the Kafka Connect JVM process does not have enough heap space.

Consequences:

The S3 connector runs into errors.
The entire Kafka Connect cluster is lagging.
The Aiven CLI stops working and returns a 503 error.

Details:

Looking at the logs, it appears that the connector is continuously ingesting messages from the topic and storing them in memory.
(the log messages come from here)

It looks like the connector cannot write to S3 fast enough, so the memory is not freed in time.

We managed to get rid of the out-of-memory errors by scaling up the Kafka Connect cluster. However, this is not a suitable long-term solution, as we will need to set up multiple such connectors in parallel in the future.

We would like to have something that gives us some control over the memory consumption of the connector, e.g., a configuration for the maximum size of the input records buffer.

PS: The Confluent S3 connector provided by Aiven (version 5.0.0) does not run into out-of-memory errors and uses considerably less memory, but it is not an option for us.

@vpapanchev (Author)

Out-of-memory logs:
[kafka-development-connect-6]2024-01-16T13:41:47.138064[kafka-connect]Terminating due to java.lang.OutOfMemoryError: Java heap space

@vpapanchev (Author)

After some investigation we managed to confirm that the out-of-memory errors are due to the connector consuming from the Kafka topics faster than it writes to S3. This happens especially in the cases when the topics already contain a lot of data.

We found that the offset_flush_interval_ms configuration property lets us control how often the connector writes to S3. By default this happens once every 60 seconds. Setting it to a much lower value (5 seconds or even 1 second) triggers the writes to S3 much sooner, so memory is freed up in time.
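
For reference, on a self-managed Kafka Connect worker this corresponds (as far as we can tell) to the offset.flush.interval.ms worker property; a minimal sketch of the worker configuration, assuming a distributed worker:

# connect-distributed.properties (worker-level configuration)
# default is 60000 (60 s); a lower value triggers more frequent offset flushes,
# and with them the S3 writes described above
offset.flush.interval.ms=5000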

Nonetheless, the main problem remains: if the connector needs to copy large amounts of data and writing to S3 takes longer for some reason (possibly for reasons unrelated to the configured flush interval), the connector will pull the entire topic into memory and overload the Kafka Connect cluster.

We would like to request a new feature that allows us to configure a maximum size of the input records buffer. When this size is reached, the Kafka consumer should pause and wait until memory has been freed up.
In this way, we can easily configure the maximum memory consumption of each connector and avoid overloading the cluster.
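
To make the request concrete, a rough sketch of what such a setting could look like (the property name and value below are made up for illustration; the connector does not offer anything like this today):

# hypothetical property, for illustration only; not supported by the connector
input.buffer.max.bytes=268435456
# once the in-memory buffer reaches roughly 256 MiB, the sink task would pause
# its assigned partitions and resume after records have been flushed to S3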

@jeqo (Contributor) commented Jan 26, 2024

@vpapanchev thanks for reporting this issue!
Agree with your assessment. This is something we were aware of and the workaround described is the current alternative to deal with this issue. Nonetheless, your feature request is valid as there should be a better way to avoid OOM on this connector.

Let us know if you are planning to work on this; otherwise we will add it to our backlog.

@vpapanchev (Author)

Thank you @jeqo for the response.

I am not currently planning on working on this, so please add it to your backlog. I would appreciate it if you could give an update here once you start working on it.

If it's a known issue, then you might be able to help me with my current struggles :)
So far, we were only able to configure the offset_flush_interval_ms property on the Kafka Connect cluster itself. Do you know if it's possible to configure it per connector, i.e., whether a connector is able to override the value configured in the cluster?

We enabled the connector-client-config-override-policy parameter (https://docs.aiven.io/docs/products/kafka/kafka-connect/reference/advanced-params#connector-client-config-override-policy) for the Kafka Connect cluster by setting its value to “All”. I then tried to configure a particular offset_flush_interval_ms for an S3 connector using various properties, such as:

  • offset.flush.interval.ms
  • admin.override.offset.flush.interval.ms
  • consumer.override.offset.flush.interval.ms
  • kafka_connect.offset.flush.interval.ms
  • override.kafka_connect.offset.flush.interval.ms
  • override.offset.flush.interval.ms
  • ...
None of these seemed to work.
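
For context, my understanding is that per-connector overrides (where the override policy allows them) take the form consumer.override.<consumer property> in the connector configuration, for example:

# general form of a per-connector override; applies to consumer client properties
consumer.override.max.poll.records=500

offset_flush_interval_ms, however, looks like a worker-level setting rather than a consumer client property, which may be why none of the variants above had any effect.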

If you have any suggestions, that would be great!

Kind regards,
Vasil

@ahmedsobeh transferred this issue from Aiven-Open/s3-connector-for-apache-kafka on Aug 27, 2024
@mkeskells (Contributor) commented Oct 7, 2024

https://strimzi.io/docs/operators/0.33.1/full/configuring#configuring_kafka_connect_for_high_volume_messages

This seems to be the same problem, and they propose setting the properties below.
This is not my area of expertise (I am new to Kafka Connect), so I haven't tested these and can't vouch for them, but I would equally like to know whether they fix the issue!

So it looks like you can configure the amount of data that can be held. I understand that this isn't the same as the Java heap memory, but it's a crude approximation and may be good enough for the problem you are seeing:

# Kafka Connect worker-level consumer settings; the values shown are the Kafka
# defaults, so lower them to cap how much data each poll can pull into memory
consumer.fetch.max.bytes: 52428800
consumer.max.partition.fetch.bytes: 1048576
consumer.max.poll.records: 500
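
If the override policy on the Connect cluster is set to All, the per-connector equivalents would presumably use the consumer.override. prefix in the connector configuration; a sketch with illustrative, lower-than-default values:

# illustrative values only; tune to your record sizes and partition count
consumer.override.fetch.max.bytes=10485760
consumer.override.max.partition.fetch.bytes=262144
consumer.override.max.poll.records=100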
