From d4402419b5050dd46e36cf9ea8b06256ff32b1da Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 29 Apr 2024 17:56:55 +0200 Subject: [PATCH 1/5] [KIP-848] User documentation --- CHANGELOG.md | 12 ++++--- INTRODUCTION.md | 94 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 101 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c724f5e7b6..e184960efb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,11 @@ librdkafka v2.4.0 is a feature release: - * [KIP-467](https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records) Augment ProduceResponse error messaging for specific culprit records (#4583). + * [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): The Next Generation of the Consumer Rebalance Protocol (#4610) + * [KIP-467](https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records): Augment ProduceResponse error messaging for specific culprit records (#4583). + * [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers) + Continue partial implementation by adding a metadata cache by topic id + and updating the topic id corresponding to the partition name (#4676) * Upgrade OpenSSL to v3.0.12 (while building from source) with various security fixes, check the [release notes](https://www.openssl.org/news/cl30.txt). * Integration tests can be started in KRaft mode and run against any @@ -12,9 +16,6 @@ librdkafka v2.4.0 is a feature release: max period of 1 ms (#4671). 
* Fixed a bug causing duplicate message consumption from a stale fetch start offset in some particular cases (#4636) - * [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers) - Continue partial implementation by adding a metadata cache by topic id - and updating the topic id corresponding to the partition name (#4676) * Fix to metadata cache expiration on full metadata refresh (#4677). * Fix for a wrong error returned on full metadata refresh before joining a consumer group (#4678). @@ -32,6 +33,9 @@ librdkafka v2.4.0 is a feature release: error. Rest of records in the batch will fail with the new error code _INVALID_DIFFERENT_RECORD (Java: KafkaException) and can be retried manually, depending on the application logic (#4583). + * The new consumer group rebalance protocol, defined in KIP 848, is still in **Early Availability**: _not production-ready_, _not supported_. + It's possible to try it in a non-production enviroment. + A [guide](INTRODUCTION.md#next-generation-of-the-consumer-group-protocol-kip-848) is available (#4610). ## Fixes diff --git a/INTRODUCTION.md b/INTRODUCTION.md index 87058449e5..03527a1fdc 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -72,6 +72,7 @@ librdkafka also provides a native C++ interface. - [Auto offset reset](#auto-offset-reset) - [Consumer groups](#consumer-groups) - [Static consumer groups](#static-consumer-groups) + - [Next generation of the consumer group protocol](#next-generation-of-the-consumer-group-protocol-kip-848) - [Topics](#topics) - [Unknown or unauthorized topics](#unknown-or-unauthorized-topics) - [Topic metadata propagation for newly created topics](#topic-metadata-propagation-for-newly-created-topics) @@ -1540,6 +1541,96 @@ the original fatal error code and reason. To read more about static group membership, see [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances). 
+### Next generation of the consumer group protocol: [KIP 848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) + +Starting from librdkafka 2.4.0 the next generation consumer group rebalance protocol +is in **Early Availability**. It means it's still _not production-ready_ and +_not supported_, given it's still under validation and lacking some needed features. + +With this protocol the role of the Group Leader (a member) is removed and +the assignment is calculated by the Group Coordinator (a broker) and sent +to each member through heartbeats. + +To test it, a Kafka cluster must be set up, in KRaft mode, and the new group +protocol enabled with the `group.coordinator.rebalance.protocols` property. +Broker version must be Apache Kafka 3.7.0 or newer. + +Client side, it can be enabled by setting the new property `group.protocol=consumer`. +A second property named `group.remote.assignor` is added to choose desired +remote assignor. + +**Available features** + +- Subscription to one or more topics +- Rebalance callbacks (see contract changes) +- Static group membership +- Configure remote assignor +- Max poll interval is enforced +- Offline upgrade from an empty consumer group with committed offsets + +**Missing features** + +- Regular expression support when subscribing +- AdminClient changes as described in the KIP +- Client side assignors as described in the KIP +- Online upgrade of a non-empty consumer group + +**Contract changes** + +Along with the new feature there are some needed contract changes, +so the protocol will be enabled by default only with a librdkafka major release. + + - These client properties are deprecated: `partition.assignment.strategy`, + replaced by `group.remote.assignor`; +`session.timeout.ms` and `heartbeat.interval.ms`, replaced by broker configurations +`group.consumer.session.timeout.ms` and `group.consumer.heartbeat.interval.ms`. 
+ + - Protocol rebalance is fully incremental, so the only allowed functions to + use in a rebalance callback will be `rd_kafka_incremental_assign` and + `rd_kafka_incremental_unassign`. Currently you can still use existing code + and the expected function to call is determined based on the chosen + `partition.assignment.strategy` but this will be removed already in next + release. + + When setting the `group.remote.assignor` property, it's already + required to use the incremental assign and unassign functions. + + - With a static group membership, if two members are using the same + `group.instance.id`, the one that joins the consumer group last will be + fenced, with the fatal `UNRELEASED_INSTANCE_ID` error. Before, it was the existing + member to be fenced. This was changed to avoid two members contending the + same id. It also means that any instance that crashes won't be automatically + replaced by a new instance until the session times out, so it's especially important + to check that consumers are being closed properly on shutdown. Ensuring that + no two instances with the same `group.instance.id` are running at any time + is also important. + + - Session timeout is remote only and, if the Coordinator isn't reachable + by a member, that member will continue to fetch messages, even if it won't be able to + commit them. Otherwise, the member will be fenced as soon as it receives a + heartbeat response from the Coordinator. + With the `classic` protocol, instead, a member stops fetching when the session timeout + expires on the client. + + For the same reason, when closing or unsubscribing with auto-commit set, + the member will try to commit until a specific timeout has passed. + Currently the timeout is the same as the `classic` protocol and it corresponds + to the deprecated `session.timeout.ms`, but it will change before the feature + reaches a stable state. 
+ + - An `UNKNOWN_TOPIC_OR_PART` error won't be received when a consumer is + subscribing to a topic that doesn't exist in local cache, as the consumer + is still subscribing to the topic and it could be created just after that. + + - A consumer won't do a preliminary Metadata call that returns a + `TOPIC_AUTHORIZATION_FAILED`, topic partitions will be assigned to the member + by the Coordinator only if it's authorized to consume from the topic. + + - Number of assign/revoke callbacks isn't fixed anymore, as it depends on + heartbeat timing. + + + ### Note on Batch consume APIs Using multiple instances of `rd_kafka_consume_batch()` and/or `rd_kafka_consume_batch_queue()` @@ -1951,7 +2042,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported | | KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported | | KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported | -| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | supported | +| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | Supported | | KIP-584 - Versioning scheme for features | WIP | Not supported | | KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported | | KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported | @@ -1961,6 +2052,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-735 - Increase default consumer session timeout | 3.0.0 | Supported | | KIP-768 - SASL/OAUTHBEARER OIDC support | 3.0 | Supported | | KIP-881 - Rack-aware Partition Assignment for Kafka Consumers | 3.5.0 (WIP) | Supported | +| KIP-848 - The Next Generation of the Consumer Rebalance Protocol | 3.7.0 (EA) | Early availability | From 3403d2005adb07b7baf345078a3de3a1426840bd Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 1 May 2024 15:44:37 
+0200 Subject: [PATCH 2/5] Address comments --- CHANGELOG.md | 2 +- INTRODUCTION.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e184960efb..e9f8ea2ae2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,7 +33,7 @@ librdkafka v2.4.0 is a feature release: error. Rest of records in the batch will fail with the new error code _INVALID_DIFFERENT_RECORD (Java: KafkaException) and can be retried manually, depending on the application logic (#4583). - * The new consumer group rebalance protocol, defined in KIP 848, is still in **Early Availability**: _not production-ready_, _not supported_. + * The new consumer group rebalance protocol, defined in KIP 848, is still in **Early Access**: _not production-ready_, _not supported_. It's possible to try it in a non-production enviroment. A [guide](INTRODUCTION.md#next-generation-of-the-consumer-group-protocol-kip-848) is available (#4610). diff --git a/INTRODUCTION.md b/INTRODUCTION.md index 03527a1fdc..424a06234d 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1544,7 +1544,7 @@ To read more about static group membership, see [KIP-345](https://cwiki.apache.o ### Next generation of the consumer group protocol: [KIP 848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) Starting from librdkafka 2.4.0 the next generation consumer group rebalance protocol -is in **Early Availability**. It means it's still _not production-ready_ and +is in **Early Access**. It means it's still _not production-ready_ and _not supported_, given it's still under validation and lacking some needed features. 
With this protocol the role of the Group Leader (a member) is removed and @@ -2052,7 +2052,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-735 - Increase default consumer session timeout | 3.0.0 | Supported | | KIP-768 - SASL/OAUTHBEARER OIDC support | 3.0 | Supported | | KIP-881 - Rack-aware Partition Assignment for Kafka Consumers | 3.5.0 (WIP) | Supported | -| KIP-848 - The Next Generation of the Consumer Rebalance Protocol | 3.7.0 (EA) | Early availability | +| KIP-848 - The Next Generation of the Consumer Rebalance Protocol | 3.7.0 (EA) | Early Access | From dc886dd87f3c8b3e1deb18335c1798d8f15d345f Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 6 May 2024 18:41:04 +0200 Subject: [PATCH 3/5] Address comments --- CHANGELOG.md | 16 +++++++++++++--- INTRODUCTION.md | 38 ++++++++++++++++++++------------------ 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e9f8ea2ae2..610e4c2620 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,8 @@ librdkafka v2.4.0 is a feature release: - * [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): The Next Generation of the Consumer Rebalance Protocol (#4610) + * [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): The Next Generation of the Consumer Rebalance Protocol. + **Early Access**: This should be used only for evaluation and must not be used in production. Features and contract of this KIP might change in future (#4610). * [KIP-467](https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records): Augment ProduceResponse error messaging for specific culprit records (#4583). 
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers) Continue partial implementation by adding a metadata cache by topic id @@ -33,9 +34,18 @@ librdkafka v2.4.0 is a feature release: error. Rest of records in the batch will fail with the new error code _INVALID_DIFFERENT_RECORD (Java: KafkaException) and can be retried manually, depending on the application logic (#4583). - * The new consumer group rebalance protocol, defined in KIP 848, is still in **Early Access**: _not production-ready_, _not supported_. + + +## Early Access + * The new consumer group rebalance protocol, defined in KIP 848, is still _not production-ready_ and _not supported_. It's possible to try it in a non-production enviroment. - A [guide](INTRODUCTION.md#next-generation-of-the-consumer-group-protocol-kip-848) is available (#4610). + + With this protocol the role of the Group Leader (a member) is removed and + the assignment is calculated by the Group Coordinator (a broker) and sent + to each member through heartbeats. + + A [guide](INTRODUCTION.md#next-generation-of-the-consumer-group-protocol-kip-848) is available + with considerations and steps to follow to test it (#4610). ## Fixes diff --git a/INTRODUCTION.md b/INTRODUCTION.md index 424a06234d..56cf5f1c8b 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1544,8 +1544,12 @@ To read more about static group membership, see [KIP-345](https://cwiki.apache.o ### Next generation of the consumer group protocol: [KIP 848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) Starting from librdkafka 2.4.0 the next generation consumer group rebalance protocol -is in **Early Access**. It means it's still _not production-ready_ and +defined in KIP 848 is introduced. + +**Warning** +It's still in **Early Access** which means it's _not production-ready_ and _not supported_, given it's still under validation and lacking some needed features. 
+Features and their contract might change in future. With this protocol the role of the Group Leader (a member) is removed and the assignment is calculated by the Group Coordinator (a broker) and sent @@ -1553,7 +1557,8 @@ to each member through heartbeats. To test it, a Kafka cluster must be set up, in KRaft mode, and the new group protocol enabled with the `group.coordinator.rebalance.protocols` property. -Broker version must be Apache Kafka 3.7.0 or newer. +Broker version must be Apache Kafka 3.7.0 or newer. See Apache Kafka +[Release Notes](https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes). Client side, it can be enabled by setting the new property `group.protocol=consumer`. A second property named `group.remote.assignor` is added to choose desired @@ -1568,35 +1573,35 @@ remote assignor. - Max poll interval is enforced - Offline upgrade from an empty consumer group with committed offsets -**Missing features** +**Future features** - Regular expression support when subscribing - AdminClient changes as described in the KIP -- Client side assignors as described in the KIP -- Online upgrade of a non-empty consumer group **Contract changes** Along with the new feature there are some needed contract changes, so the protocol will be enabled by default only with a librdkafka major release. - - These client properties are deprecated: `partition.assignment.strategy`, - replaced by `group.remote.assignor`; -`session.timeout.ms` and `heartbeat.interval.ms`, replaced by broker configurations -`group.consumer.session.timeout.ms` and `group.consumer.heartbeat.interval.ms`. 
+ - Deprecated client configurations with the new protocol: + - `partition.assignment.strategy` replaced by `group.remote.assignor` + - `session.timeout.ms` replaced by broker configuration `group.consumer.session.timeout.ms` + - `heartbeat.interval.ms` replaced by broker configuration `group.consumer.heartbeat.interval.ms` + - `group.protocol.type` which is not used in the new protocol - Protocol rebalance is fully incremental, so the only allowed functions to use in a rebalance callback will be `rd_kafka_incremental_assign` and `rd_kafka_incremental_unassign`. Currently you can still use existing code and the expected function to call is determined based on the chosen - `partition.assignment.strategy` but this will be removed already in next + `partition.assignment.strategy` but this will be removed in the next release. When setting the `group.remote.assignor` property, it's already required to use the incremental assign and unassign functions. + All assignors are sticky with the new protocol, including the _range_ one, which previously wasn't. - With a static group membership, if two members are using the same - `group.instance.id`, the one that joins the consumer group last will be + `group.instance.id`, the one that joins the consumer group later will be fenced, with the fatal `UNRELEASED_INSTANCE_ID` error. Before, it was the existing member to be fenced. This was changed to avoid two members contending the same id. It also means that any instance that crashes won't be automatically @@ -1615,21 +1620,18 @@ so the protocol will be enabled by default only with a librdkafka major release. For the same reason, when closing or unsubscribing with auto-commit set, the member will try to commit until a specific timeout has passed. Currently the timeout is the same as the `classic` protocol and it corresponds - to the deprecated `session.timeout.ms`, but it will change before the feature + to `session.timeout.ms`, but it will change before the feature reaches a stable state. 
- - An `UNKNOWN_TOPIC_OR_PART` error won't be received when a consumer is + - An `UNKNOWN_TOPIC_OR_PART` error isn't received anymore when a consumer is subscribing to a topic that doesn't exist in local cache, as the consumer is still subscribing to the topic and it could be created just after that. - A consumer won't do a preliminary Metadata call that returns a - `TOPIC_AUTHORIZATION_FAILED`, topic partitions will be assigned to the member + `TOPIC_AUTHORIZATION_FAILED`, as it's happening with group protocol `classic`. + Topic partitions will still be assigned to the member by the Coordinator only if it's authorized to consume from the topic. - - Number of assign/revoke callbacks isn't fixed anymore, as it depends on - heartbeat timing. - - ### Note on Batch consume APIs From 4bc0cdf4e33e5fd402bb8473ef58ea3ff0215953 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 7 May 2024 00:02:25 +0200 Subject: [PATCH 4/5] Address comments --- CHANGELOG.md | 8 +++++--- src/rdkafka_cgrp.c | 7 +++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 610e4c2620..cad57c1ff4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,13 +37,15 @@ librdkafka v2.4.0 is a feature release: ## Early Access - * The new consumer group rebalance protocol, defined in KIP 848, is still _not production-ready_ and _not supported_. - It's possible to try it in a non-production enviroment. - With this protocol the role of the Group Leader (a member) is removed and +### [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): The Next Generation of the Consumer Rebalance Protocol + * With this new protocol the role of the Group Leader (a member) is removed and the assignment is calculated by the Group Coordinator (a broker) and sent to each member through heartbeats. + The feature is still _not production-ready_ and _not supported_. 
+ It's possible to try it in a non-production environment. + A [guide](INTRODUCTION.md#next-generation-of-the-consumer-group-protocol-kip-848) is available with considerations and steps to follow to test it (#4610). diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index f10be22f29..1917991ddd 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -486,6 +486,13 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk, rk->rk_conf.auto_commit_interval_ms * 1000ll, rd_kafka_cgrp_offset_commit_tmr_cb, rkcg); + if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER) { + rd_kafka_log( + rk, LOG_WARNING, "CGRP", + "KIP-848 Consumer Group Protocol is in Early Access " + "and MUST NOT be used in production"); + } + return rkcg; } From 8ea2cee442db8442ba0df4979f79bb959a095bf1 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 7 May 2024 10:13:20 +0200 Subject: [PATCH 5/5] Remove the not supported statement --- CHANGELOG.md | 2 +- INTRODUCTION.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cad57c1ff4..cbdf0fbc9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,7 +43,7 @@ librdkafka v2.4.0 is a feature release: the assignment is calculated by the Group Coordinator (a broker) and sent to each member through heartbeats. - The feature is still _not production-ready_ and _not supported_. + The feature is still _not production-ready_. It's possible to try it in a non-production environment. A [guide](INTRODUCTION.md#next-generation-of-the-consumer-group-protocol-kip-848) is available diff --git a/INTRODUCTION.md b/INTRODUCTION.md index 56cf5f1c8b..1cefbc5aaa 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1547,8 +1547,8 @@ Starting from librdkafka 2.4.0 the next generation consumer group rebalance prot defined in KIP 848 is introduced. 
**Warning** -It's still in **Early Access** which means it's _not production-ready_ and -_not supported_, given it's still under validation and lacking some needed features. +It's still in **Early Access** which means it's _not production-ready_, +given it's still under validation and lacking some needed features. Features and their contract might change in future. With this protocol the role of the Group Leader (a member) is removed and