Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KIP-848] User documentation #4702

Merged
merged 5 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

librdkafka v2.4.0 is a feature release:

* [KIP-467](https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records) Augment ProduceResponse error messaging for specific culprit records (#4583).
* [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): The Next Generation of the Consumer Rebalance Protocol (#4610)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep this EA outside of the brackets and it should be spelled fully "EARLY ACCESS".

We should explain little more here itself instead of below. Something like "EARLY ACCESS: This should be used only for testing and not to be used in Production. Some of the feature/contract of this KIP might be changed in future."

* [KIP-467](https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records): Augment ProduceResponse error messaging for specific culprit records (#4583).
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Upgrade OpenSSL to v3.0.12 (while building from source) with various security fixes,
check the [release notes](https://www.openssl.org/news/cl30.txt).
* Integration tests can be started in KRaft mode and run against any
Expand All @@ -12,9 +16,6 @@ librdkafka v2.4.0 is a feature release:
max period of 1 ms (#4671).
* Fixed a bug causing duplicate message consumption from a stale
fetch start offset in some particular cases (#4636)
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Fix to metadata cache expiration on full metadata refresh (#4677).
* Fix for a wrong error returned on full metadata refresh before joining
a consumer group (#4678).
Expand All @@ -32,6 +33,9 @@ librdkafka v2.4.0 is a feature release:
error. Rest of records in the batch will fail with the new error code
_INVALID_DIFFERENT_RECORD (Java: KafkaException) and can be retried manually,
depending on the application logic (#4583).
* The new consumer group rebalance protocol, defined in KIP 848, is still in **Early Availability**: _not production-ready_, _not supported_.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we usually refer to it as "Early Access", and that't actually the term we see in the logs when running the broker with it...

The new 'consumer' rebalance protocol is enabled along with the new group coordinator. This is part of the early access of KIP-848 and MUST NOT be used in production

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can also add a warning log for the same.

It's possible to try it in a non-production enviroment.
A [guide](INTRODUCTION.md#next-generation-of-the-consumer-group-protocol-kip-848) is available (#4610).


## Fixes
Expand Down
94 changes: 93 additions & 1 deletion INTRODUCTION.md
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to adjust few sections and move some part to the changelog instead of here.

Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ librdkafka also provides a native C++ interface.
- [Auto offset reset](#auto-offset-reset)
- [Consumer groups](#consumer-groups)
- [Static consumer groups](#static-consumer-groups)
- [Next generation of the consumer group protocol](#next-generation-of-the-consumer-group-protocol-kip-848)
- [Topics](#topics)
- [Unknown or unauthorized topics](#unknown-or-unauthorized-topics)
- [Topic metadata propagation for newly created topics](#topic-metadata-propagation-for-newly-created-topics)
Expand Down Expand Up @@ -1540,6 +1541,96 @@ the original fatal error code and reason.
To read more about static group membership, see [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances).


### Next generation of the consumer group protocol: [KIP 848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol)

Starting from librdkafka 2.4.0 the next generation consumer group rebalance protocol
is in **Early Availability**. It means it's still _not production-ready_ and
_not supported_, given it's still under validation and lacking some needed features.

With this protocol the role of the Group Leader (a member) is removed and
the assignment is calculated by the Group Coordinator (a broker) and sent
to each member through heartbeats.

To test it, a Kafka cluster must be set up, in KRaft mode, and the new group
protocol enabled with the `group.coordinator.rebalance.protocols` property.
Broker version must be Apache Kafka 3.7.0 or newer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Client side, it can be enabled by setting the new property `group.protocol=consumer`.
A second property named `group.remote.assignor` is added to choose desired
remote assignor.

**Available features**

- Subscription to one or more topics
- Rebalance callbacks (see contract changes)
- Static group membership
- Configure remote assignor
- Max poll interval is enforced
- Offline upgrade from an empty consumer group with committed offsets

**Missing features**

- Regular expression support when subscribing
- AdminClient changes as described in the KIP
- Client side assignors as described in the KIP
- Online upgrade of a non-empty consumer group
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if we should call these two as missing features or not. Adding them to missing features shows that we are planning to implement these in future which we are not as of now.

Instead of "Missing Features" we should call them "Future features" and remove these two points from there.


**Contract changes**

Along with the new feature there are some needed contract changes,
so the protocol will be enabled by default only with a librdkafka major release.

- These client properties are deprecated: `partition.assignment.strategy`,
replaced by `group.remote.assignor`;
`session.timeout.ms` and `heartbeat.interval.ms`, replaced by broker configurations
`group.consumer.session.timeout.ms` and `group.consumer.heartbeat.interval.ms`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- These client properties are deprecated: `partition.assignment.strategy`,
replaced by `group.remote.assignor`;
`session.timeout.ms` and `heartbeat.interval.ms`, replaced by broker configurations
`group.consumer.session.timeout.ms` and `group.consumer.heartbeat.interval.ms`.
- Deprecated client configurations for the new protocol
- `partition.assignment.strategy` replaced by `group.remote.assignor`
- `session.timeout.ms` replaced by broker configuration `group.consumer.session.timeout.ms`
- `heartbeat.interval.ms`, replaced by broker configuration `group.consumer.heartbeat.interval.ms`
- `group.protocol.type` which is not used in the new protocol


- Protocol rebalance is fully incremental, so the only allowed functions to
use in a rebalance callback will be `rd_kafka_incremental_assign` and
`rd_kafka_incremental_unassign`. Currently you can still use existing code
and the expected function to call is determined based on the chosen
`partition.assignment.strategy` but this will be removed already in next
release.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Protocol rebalance is fully incremental, so the only allowed functions to
use in a rebalance callback will be `rd_kafka_incremental_assign` and
`rd_kafka_incremental_unassign`. Currently you can still use existing code
and the expected function to call is determined based on the chosen
`partition.assignment.strategy` but this will be removed already in next
release.
- Protocol rebalance is fully incremental, so the only allowed functions to
use in a rebalance callback will be `rd_kafka_incremental_assign` and
`rd_kafka_incremental_unassign`. Currently you can still use existing code
and the expected function to call is determined based on the chosen
`partition.assignment.strategy` but this will be removed in the next
release.


When setting the `group.remote.assignor` property, it's already
required to use the incremental assign and unassign functions.
Comment on lines +1599 to +1600
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also explain that even range assignor is sticky in the new protocol.


- With a static group membership, if two members are using the same
`group.instance.id`, the one that joins the consumer group last will be
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`group.instance.id`, the one that joins the consumer group last will be
`group.instance.id`, the one that joins the consumer group later will be

fenced, with the fatal `UNRELEASED_INSTANCE_ID` error. Before, it was the existing
member to be fenced. This was changed to avoid two members contending the
same id. It also means that any instance that crashes won't be automatically
replaced by a new instance until session times out and it's especially required
to check that consumers are being closed properly on shutdown. Ensuring that
no two instances with same `group.instance.id` are running at any time
is also important.

- Session timeout is remote only and, if the Coordinator isn't reachable
by a member, this will continue to fetch messages, even if it won't be able to
commit them. Otherwise, the member will be fenced as soon as it receives an
heartbeat response from the Coordinator.
With `classic` protocol, instead, member stops fetching when session timeout
expires on the client.

For the same reason, when closing or unsubscribing with auto-commit set,
the member will try to commit until a specific timeout has passed.
Currently the timeout is the same as the `classic` protocol and it corresponds
to the deprecated `session.timeout.ms`, but it will change before the feature
reaches a stable state.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should explain that we are still using deprecated property.


- An `UNKNOWN_TOPIC_OR_PART` error won't be received when a consumer is
subscribing to a topic that doesn't exist in local cache, as the consumer
is still subscribing to the topic and it could be created just after that.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should explain more on why we are writing this. What used to happen in old protocol.


- A consumer won't do a preliminary Metadata call that returns a
`TOPIC_AUTHORIZATION_FAILED`, topic partitions will be assigned to the member
by the Coordinator only if it's authorized to consume from the topic.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same


- Number of assign/revoke callbacks isn't fixed anymore, as it depends on
heartbeat timing.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean and does it affect the users?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed




### Note on Batch consume APIs

Using multiple instances of `rd_kafka_consume_batch()` and/or `rd_kafka_consume_batch_queue()`
Expand Down Expand Up @@ -1951,7 +2042,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported |
| KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported |
| KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported |
| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | supported |
| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | Supported |
| KIP-584 - Versioning scheme for features | WIP | Not supported |
| KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported |
| KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported |
Expand All @@ -1961,6 +2052,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-735 - Increase default consumer session timeout | 3.0.0 | Supported |
| KIP-768 - SASL/OAUTHBEARER OIDC support | 3.0 | Supported |
| KIP-881 - Rack-aware Partition Assignment for Kafka Consumers | 3.5.0 (WIP) | Supported |
| KIP-848 - The Next Generation of the Consumer Rebalance Protocol | 3.7.0 (EA) | Early availability |



Expand Down