diff --git a/changelog.md b/changelog.md
index 6f23426c6..4ab66b800 100644
--- a/changelog.md
+++ b/changelog.md
@@ -2,8 +2,8 @@
See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
-
-## 0.26.1 (Unreleased)
+
+## 0.27.0 (Unreleased)
* Allow offset 0 in `Offset::to_raw`.
@@ -25,11 +25,26 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
`BorrowedMessage`, is still accessible as
`Consumer::store_offset_from_message`.
+* Support incremental cooperative rebalancing ([#364]). There are two changes
+ of note:
+
+ * The addition of `Consumer::rebalance_protocol` to determine the rebalance
+ protocol in use.
+
+ * The modification of the default rebalance callback
+ (`ConsumerContext::rebalance`) to perform incremental assignments and
+ unassignments when the rebalance protocol in use is
+ [`RebalanceProtocol::Cooperative`].
+
+ Thanks, [@SreeniIO].
+
[#89]: https://github.com/fede1024/rust-rdkafka/issues/89
[#95]: https://github.com/fede1024/rust-rdkafka/issues/95
[#360]: https://github.com/fede1024/rust-rdkafka/issues/360
+[#364]: https://github.com/fede1024/rust-rdkafka/issues/364
[#367]: https://github.com/fede1024/rust-rdkafka/issues/367
[@djKooks]: https://github.com/djKooks
+[@SreeniIO]: https://github.com/SreeniIO
## 0.26.0 (2021-03-16)
diff --git a/src/client.rs b/src/client.rs
index 937960bb4..eaf563089 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -26,6 +26,7 @@ use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel};
+use crate::consumer::RebalanceProtocol;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::metadata::Metadata;
@@ -137,6 +138,16 @@ impl NativeClient {
pub fn ptr(&self) -> *mut RDKafka {
self.ptr.ptr()
}
+
+ pub(crate) fn rebalance_protocol(&self) -> RebalanceProtocol {
+ let protocol = unsafe { CStr::from_ptr(rdsys::rd_kafka_rebalance_protocol(self.ptr())) };
+ match protocol.to_bytes() {
+ b"NONE" => RebalanceProtocol::None,
+ b"EAGER" => RebalanceProtocol::Eager,
+ b"COOPERATIVE" => RebalanceProtocol::Cooperative,
+ _ => unreachable!(),
+ }
+ }
}
/// A low-level rdkafka client.
diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs
index 5048ffa26..63e24c16d 100644
--- a/src/consumer/base_consumer.rs
+++ b/src/consumer/base_consumer.rs
@@ -18,6 +18,7 @@ use crate::config::{
};
use crate::consumer::{
CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
+ RebalanceProtocol,
};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::groups::GroupList;
@@ -566,6 +567,10 @@ where
};
Ok(())
}
+
+ fn rebalance_protocol(&self) -> RebalanceProtocol {
+ self.client.native_client().rebalance_protocol()
+ }
}
impl Drop for BaseConsumer
diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs
index 3fb417b6e..76343677d 100644
--- a/src/consumer/mod.rs
+++ b/src/consumer/mod.rs
@@ -74,12 +74,23 @@ pub trait ConsumerContext: ClientContext {
unsafe {
match err {
RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => {
- rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr());
- }
- _ => {
- // Also for RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
- rdsys::rd_kafka_assign(native_client.ptr(), ptr::null());
+ match native_client.rebalance_protocol() {
+ RebalanceProtocol::Cooperative => {
+ rdsys::rd_kafka_incremental_assign(native_client.ptr(), tpl.ptr());
+ }
+ _ => {
+ rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr());
+ }
+ }
}
+ _ => match native_client.rebalance_protocol() {
+ RebalanceProtocol::Cooperative => {
+ rdsys::rd_kafka_incremental_unassign(native_client.ptr(), tpl.ptr());
+ }
+ _ => {
+ rdsys::rd_kafka_assign(native_client.ptr(), ptr::null());
+ }
+ },
}
}
trace!("Running post-rebalance with {:?}", rebalance);
@@ -167,6 +178,16 @@ unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata {
unsafe impl Send for ConsumerGroupMetadata {}
unsafe impl Sync for ConsumerGroupMetadata {}
+/// The rebalance protocol for a consumer.
+pub enum RebalanceProtocol {
+ /// The consumer has not (yet) joined a group.
+ None,
+ /// Eager rebalance protocol.
+ Eager,
+ /// Cooperative rebalance protocol.
+ Cooperative,
+}
+
/// Common trait for all consumers.
///
/// # Note about object safety
@@ -325,4 +346,7 @@ where
/// Resumes consumption for the provided list of partitions.
fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;
+
+ /// Reports the rebalance protocol in use.
+ fn rebalance_protocol(&self) -> RebalanceProtocol;
}
diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs
index 0e7b8906e..391380e24 100644
--- a/src/consumer/stream_consumer.rs
+++ b/src/consumer/stream_consumer.rs
@@ -20,7 +20,8 @@ use crate::client::{Client, ClientContext, NativeClient};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel};
use crate::consumer::base_consumer::BaseConsumer;
use crate::consumer::{
- CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, Rebalance,
+ CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
+ Rebalance, RebalanceProtocol,
};
use crate::error::{KafkaError, KafkaResult};
use crate::groups::GroupList;
@@ -501,4 +502,8 @@ where
fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
self.base.resume(partitions)
}
+
+ fn rebalance_protocol(&self) -> RebalanceProtocol {
+ self.base.rebalance_protocol()
+ }
}