
docs(tuning): consumer config: updates content on rebalances #10292

Merged 3 commits, Jul 4, 2024
102 changes: 77 additions & 25 deletions documentation/modules/managing/con-consumer-config-properties.adoc
The heartbeat interval must be lower than the session timeout, and is usually set to about a third of the timeout value.
Decreasing the heartbeat interval reduces the chance of accidental rebalancing, but more frequent heartbeats increase the overhead on broker resources.

[source,env]
----
# ...
max.poll.records=100 # <1>
session.timeout.ms=60000 # <2>
heartbeat.interval.ms=10000 # <3>
# ...
----
<1> Set the number of records returned to the consumer when calling the `poll()` method.
<2> Set the timeout for detecting client failure.
If the broker configuration has a `group.min.session.timeout.ms` and `group.max.session.timeout.ms`, the session timeout value must be within that range.
<3> Adjust the heartbeat interval lower according to anticipated rebalances.
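As a rough check of the relationship described above, the following Python sketch validates the timing values. The function is illustrative and not part of any Kafka client API; the default range values are assumptions that mirror the broker defaults for `group.min.session.timeout.ms` and `group.max.session.timeout.ms`.

[source,python]
----
def validate_consumer_timing(session_timeout_ms,
                             heartbeat_interval_ms,
                             group_min_session_ms=6000,
                             group_max_session_ms=1800000):
    """Return a list of warnings; an empty list means the values look sane."""
    warnings = []
    # The session timeout must fall within the broker's allowed range
    if not (group_min_session_ms <= session_timeout_ms <= group_max_session_ms):
        warnings.append("session.timeout.ms outside the broker's min/max session range")
    # The heartbeat interval should be at most a third of the session timeout
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.append("heartbeat.interval.ms should be at most a third of session.timeout.ms")
    return warnings
----

For example, the values shown above (`session.timeout.ms=60000`, `heartbeat.interval.ms=10000`) pass both checks.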

== Managing offset policy

Use the `auto.offset.reset` property to control how a consumer behaves when no offsets have been committed, or when a committed offset is no longer valid or has been deleted.
If a consumer group or standalone consumer is inactive and commits no offsets during the offsets retention period (`offsets.retention.minutes`), previously committed offsets are deleted.
[source,env]
----
# ...
auto.offset.reset=earliest # <1>
# ...
----
<1> Set to `earliest` to return to the start of a partition and avoid data loss if offsets were not committed.

If the amount of data returned in a single fetch request is large,
a timeout might occur before the consumer has processed it.
In this case, you can lower `max.partition.fetch.bytes` or increase `session.timeout.ms`.
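To reason about whether a fetch can be processed before the timeout, a back-of-the-envelope estimate helps. The following Python sketch is a hypothetical helper, not a client API; the byte and timing figures are assumptions you would measure for your own workload.

[source,python]
----
def fetch_fits_timeout(max_partition_fetch_bytes, assigned_partitions,
                       avg_bytes_per_record, avg_ms_per_record,
                       session_timeout_ms):
    """Estimate whether the worst-case fetch can be processed
    before the session timeout expires."""
    # Worst-case number of records in a single fetch across all assigned partitions
    max_records = (max_partition_fetch_bytes * assigned_partitions) // avg_bytes_per_record
    # Estimated time to process the whole fetch
    estimated_ms = max_records * avg_ms_per_record
    return estimated_ms < session_timeout_ms
----

If the estimate exceeds the timeout, lower `max.partition.fetch.bytes` or increase `session.timeout.ms` as described above.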

== Minimizing the impact of rebalances

Rebalances in Kafka consumer groups can introduce latency and reduce throughput, impacting overall service performance.
When partitions are rebalanced between active consumers in a group, the rebalance lasts as long as it takes for the following to take place:

* Consumers to commit their offsets
* The new consumer group to be formed
* The group leader to assign partitions to group members
* The consumers in the group to receive their assignments and start fetching

Rebalances are triggered by changes in consumer health, network issues, configuration updates, and scaling events.
This process can increase service downtime, especially if it occurs frequently, such as during rolling restarts of consumers in a group.

To minimize the impact of rebalances, consider the following strategies and configurations:

Assess throughput and parallelism:: Assess the expected throughput (bytes and records per second) and parallelism (number of partitions) of the input topics against the number of consumers.
+
If adjustments are needed, start by setting up static membership, adopting a partition assignment strategy, and setting a limit on the number of records returned using the `max.poll.records` property.
Add further configurations for timeouts and intervals, if required and with care, as these can introduce issues related to the handling of failures.

Use static membership:: Assign a unique identifier (`group.instance.id`) to each consumer instance.
Static membership introduces persistence so static consumers retain partition assignments across restarts, reducing unnecessary rebalances.

Adopt partition assignment strategies::
* Use appropriate partition assignment strategies to reduce the number of partitions that need to be reassigned during a rebalance, minimizing the impact on active consumers.
* The `org.apache.kafka.clients.consumer.CooperativeStickyAssignor` strategy is particularly effective, as it ensures minimal partition movement and better stability during rebalances.

Adjust record limits and poll intervals::
* Use the `max.poll.records` property to limit the number of records returned during each poll.
Processing fewer messages more efficiently can prevent delays.
* Use the `max.poll.interval.ms` property to prevent rebalances caused by prolonged processing tasks by setting the maximum interval between calls to the `poll()` method.
* Alternatively, consider pausing partitions to retrieve fewer records at a time.

Adjust session timeout and heartbeat intervals::
* Use the `session.timeout.ms` property to set a longer timeout to reduce rebalances caused by temporary network glitches or minor processing delays.
* Adjust the `heartbeat.interval.ms` property to balance failure detection checks with minimizing unnecessary rebalances.

Monitor consumer health:: Instability in consumer applications, such as frequent crashes, can trigger rebalances.
Use Kafka consumer metrics to monitor rebalance rates, session timeouts, and failed fetch requests.
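The interplay between `max.poll.records` and `max.poll.interval.ms` described above can be sketched as a simple budget calculation. This helper is hypothetical; `avg_ms_per_record` is a figure you would measure, and the safety factor is an assumption that leaves headroom for commit and fetch overhead.

[source,python]
----
def max_safe_poll_records(avg_ms_per_record, max_poll_interval_ms,
                          safety_factor=0.5):
    """Upper bound for max.poll.records so that one batch is
    processed well within max.poll.interval.ms."""
    # Spend at most half (by default) of the poll interval on processing
    budget_ms = max_poll_interval_ms * safety_factor
    return max(1, int(budget_ms // avg_ms_per_record))
----

With the example values below (`max.poll.interval.ms=300000`) and a measured 100 ms per record, this bound works out to 1500 records per poll, comfortably above the configured `max.poll.records=500`.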

.Example configuration to minimize the impact of rebalances
[source,shell]
----
# ...
group.instance.id=<unique_id>
max.poll.interval.ms=300000
max.poll.records=500
session.timeout.ms=45000
heartbeat.interval.ms=3000
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
# ...
----

.Scaling strategies
To minimize the impact of rebalances during scaling of consumer groups, consider the following approaches:

Set a rebalance delay:: Use the `group.initial.rebalance.delay.ms` property in the Kafka configuration to delay the time it takes for consumers to join a new consumer group before performing a rebalance.
Introducing a delay helps avoid triggering several rebalances when multiple consumers start at around the same time.
The appropriate delay depends on the orchestration used and might not be suitable in some circumstances.

Avoid frequent scaling::
* Keep the number of consumers stable, scaling only when necessary and in controlled increments.
* Monitor system performance and adjust your scaling strategy as needed.
** Lag per partition should be constant and low.
** Records processed per second by consumers should match the records per second in the input topics.
* Use the Kafka Exporter to check for consumer lag and determine if scaling is required.

Implement dynamic scaling policies::
* If using dynamic or event-driven tools for scaling of consumer applications, set lag thresholds based on the backlog of messages.
* Define maximum and minimum replica counts for consumer groups.
* Set periods between scaling events to prevent rapid scaling.
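A lag-threshold scaling policy like the one described can be reduced to a small decision function. This is a sketch under assumptions: the total lag comes from a metrics source such as Kafka Exporter, and the target lag per replica is a value you choose for your workload.

[source,python]
----
def desired_replicas(total_lag, target_lag_per_replica,
                     min_replicas, max_replicas):
    """Choose a replica count so that each consumer handles at most
    target_lag_per_replica records of backlog, clamped to the
    configured bounds."""
    needed = -(-total_lag // target_lag_per_replica)  # ceiling division
    return max(min_replicas, min(max_replicas, needed))
----

In practice, you would also cap the count at the number of partitions in the input topics, since additional consumers beyond that sit idle.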

NOTE: In cases where lengthy message processing is unavoidable, consider pausing and resuming partitions as needed.
If you pause all partitions, `poll()` returns no records, allowing you to keep calling it without overwhelming the consumers.
Alternatively, you can offload the processing tasks to a pool of worker threads.
This helps prevent delays and potential rebalances.
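The pause-and-offload pattern in the note above can be sketched as follows. The consumer interface here (`poll()`, `pause()`, `resume()`) is stubbed for illustration; real clients expose equivalent methods that take partition arguments, and a production loop would also commit offsets once a batch completes.

[source,python]
----
from concurrent.futures import ThreadPoolExecutor

class OffloadingProcessor:
    """Keeps calling poll() while long-running record processing is
    handed to a worker pool, pausing fetching until the batch is done."""

    def __init__(self, consumer, workers=4):
        self.consumer = consumer
        self.pool = ThreadPoolExecutor(max_workers=workers)
        self.pending = []

    def run_once(self, handler):
        # Keep polling: with all partitions paused, poll() returns no
        # records but the consumer stays alive in the group.
        records = self.consumer.poll(timeout_ms=100)
        if self.pending:
            # Resume fetching only once every worker task has finished
            if all(f.done() for f in self.pending):
                self.pending = []
                self.consumer.resume()
        elif records:
            # Pause fetching while the workers process the backlog
            self.consumer.pause()
            self.pending = [self.pool.submit(handler, r) for r in records]
----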