From 5d5c15ffb571001c0da284861977141d00b4f6db Mon Sep 17 00:00:00 2001 From: Sreeni Date: Tue, 13 Apr 2021 00:31:00 -0700 Subject: [PATCH 1/2] Cooperative Incremental sticky rebalance support --- src/consumer/mod.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 3fb417b6e..9d0e95c6c 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -74,11 +74,19 @@ pub trait ConsumerContext: ClientContext { unsafe { match err { RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => { - rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr()); + if self.is_incremental_assign() { + rdsys::rd_kafka_incremental_assign(native_client.ptr(), tpl.ptr()); + } else { + rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr()); + } } _ => { // Also for RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS - rdsys::rd_kafka_assign(native_client.ptr(), ptr::null()); + if self.is_incremental_assign() { + rdsys::rd_kafka_incremental_assign(native_client.ptr(), ptr::null()); + } else { + rdsys::rd_kafka_assign(native_client.ptr(), ptr::null()); + } } } } @@ -91,6 +99,12 @@ pub trait ConsumerContext: ClientContext { #[allow(unused_variables)] fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {} + /// Override this to return true when using cooperative-sticky option for + /// partition.assignment.strategy + fn is_incremental_assign(&self) -> bool { + false + } + /// Post-rebalance callback. This method will run after the rebalance and /// should terminate its execution quickly. #[allow(unused_variables)] From 7321941063dc6c3167997399a13e8234a807e368 Mon Sep 17 00:00:00 2001 From: Sreeni Date: Tue, 13 Apr 2021 01:25:12 -0700 Subject: [PATCH 2/2] Support cooperative incremental sticky rebalance --- changelog.md | 19 +++++++++++++-- src/client.rs | 11 +++++++++ src/consumer/base_consumer.rs | 5 ++++ src/consumer/mod.rs | 42 ++++++++++++++++++++------------- src/consumer/stream_consumer.rs | 7 +++++- 5 files changed, 65 insertions(+), 19 deletions(-) diff --git a/changelog.md b/changelog.md index 6f23426c6..4ab66b800 100644 --- a/changelog.md +++ b/changelog.md @@ -2,8 +2,8 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md). - -## 0.26.1 (Unreleased) + +## 0.27.0 (Unreleased) * Allow offset 0 in `Offset::to_raw`. @@ -25,11 +25,26 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md). `BorrowedMessage`, is still accessible as `Consumer::store_offset_from_message`. +* Support incremental cooperative rebalancing ([#364]). There are two changes + of note: + + * The addition of `Consumer::rebalance_protocol` to determine the rebalance + protocol in use. + + * The modification of the default rebalance callback + (`ConsumerContext::rebalance`) to perform incremental assignments and + unassignments when the rebalance protocol in use is + [`RebalanceProtocol::Cooperative`]. + + Thanks, [@SreeniIO]. + [#89]: https://github.com/fede1024/rust-rdkafka/issues/89 [#95]: https://github.com/fede1024/rust-rdkafka/issues/95 [#360]: https://github.com/fede1024/rust-rdkafka/issues/360 +[#364]: https://github.com/fede1024/rust-rdkafka/issues/364 [#367]: https://github.com/fede1024/rust-rdkafka/issues/367 [@djKooks]: https://github.com/djKooks +[@SreeniIO]: https://github.com/SreeniIO ## 0.26.0 (2021-03-16) diff --git a/src/client.rs b/src/client.rs index 937960bb4..eaf563089 100644 --- a/src/client.rs +++ b/src/client.rs @@ -26,6 +26,7 @@ use rdkafka_sys as rdsys; use rdkafka_sys::types::*; use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel}; +use crate::consumer::RebalanceProtocol; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::groups::GroupList; use crate::metadata::Metadata; @@ -137,6 +138,16 @@ impl NativeClient { pub fn ptr(&self) -> *mut RDKafka { self.ptr.ptr() } + + pub(crate) fn rebalance_protocol(&self) -> RebalanceProtocol { + let protocol = unsafe { CStr::from_ptr(rdsys::rd_kafka_rebalance_protocol(self.ptr())) }; + match protocol.to_bytes() { + b"NONE" => RebalanceProtocol::None, + b"EAGER" => RebalanceProtocol::Eager, + b"COOPERATIVE" => RebalanceProtocol::Cooperative, + _ => unreachable!(), + } + } } /// A low-level rdkafka client. diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 5048ffa26..63e24c16d 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -18,6 +18,7 @@ use crate::config::{ }; use crate::consumer::{ CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, + RebalanceProtocol, }; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::groups::GroupList; @@ -566,6 +567,10 @@ where }; Ok(()) } + + fn rebalance_protocol(&self) -> RebalanceProtocol { + self.client.native_client().rebalance_protocol() + } } impl Drop for BaseConsumer diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 9d0e95c6c..76343677d 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -74,20 +74,23 @@ pub trait ConsumerContext: ClientContext { unsafe { match err { RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => { - if self.is_incremental_assign() { - rdsys::rd_kafka_incremental_assign(native_client.ptr(), tpl.ptr()); - } else { - rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr()); + match native_client.rebalance_protocol() { + RebalanceProtocol::Cooperative => { + rdsys::rd_kafka_incremental_assign(native_client.ptr(), tpl.ptr()); + } + _ => { + rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr()); + } } } - _ => { - // Also for RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS - if self.is_incremental_assign() { - rdsys::rd_kafka_incremental_assign(native_client.ptr(), ptr::null()); - } else { + _ => match native_client.rebalance_protocol() { + RebalanceProtocol::Cooperative => { + rdsys::rd_kafka_incremental_unassign(native_client.ptr(), tpl.ptr()); + } + _ => { rdsys::rd_kafka_assign(native_client.ptr(), ptr::null()); } - } + }, } } trace!("Running post-rebalance with {:?}", rebalance); @@ -99,12 +102,6 @@ pub trait ConsumerContext: ClientContext { #[allow(unused_variables)] fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {} - /// Override this to return true when using cooperative-sticky option for - /// partition.assignment.strategy - fn is_incremental_assign(&self) -> bool { - false - } - /// Post-rebalance callback. This method will run after the rebalance and /// should terminate its execution quickly. #[allow(unused_variables)] @@ -181,6 +178,16 @@ unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata { unsafe impl Send for ConsumerGroupMetadata {} unsafe impl Sync for ConsumerGroupMetadata {} +/// The rebalance protocol for a consumer. +pub enum RebalanceProtocol { + /// The consumer has not (yet) joined a group. + None, + /// Eager rebalance protocol. + Eager, + /// Cooperative rebalance protocol. + Cooperative, +} + /// Common trait for all consumers. /// /// # Note about object safety @@ -339,4 +346,7 @@ where /// Resumes consumption for the provided list of partitions. fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>; + + /// Reports the rebalance protocol in use. + fn rebalance_protocol(&self) -> RebalanceProtocol; } diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 0e7b8906e..391380e24 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -20,7 +20,8 @@ use crate::client::{Client, ClientContext, NativeClient}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel}; use crate::consumer::base_consumer::BaseConsumer; use crate::consumer::{ - CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, Rebalance, + CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, + Rebalance, RebalanceProtocol, }; use crate::error::{KafkaError, KafkaResult}; use crate::groups::GroupList; @@ -501,4 +502,8 @@ where fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> { self.base.resume(partitions) } + + fn rebalance_protocol(&self) -> RebalanceProtocol { + self.base.rebalance_protocol() + } }