Skip to content

Commit

Permalink
Merge pull request #364 from SreeniIO/sticky-rebalance
Browse files Browse the repository at this point in the history
Cooperative Incremental sticky rebalance
  • Loading branch information
benesch authored Oct 16, 2021
2 parents 108043c + 7321941 commit 3aa7f92
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 8 deletions.
19 changes: 17 additions & 2 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).

<a name="0.26.1"></a>
## 0.26.1 (Unreleased)
<a name="0.27.0"></a>
## 0.27.0 (Unreleased)

* Allow offset 0 in `Offset::to_raw`.

Expand All @@ -25,11 +25,26 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
`BorrowedMessage`, is still accessible as
`Consumer::store_offset_from_message`.

* Support incremental cooperative rebalancing ([#364]). There are two changes
of note:

* The addition of `Consumer::rebalance_protocol` to determine the rebalance
protocol in use.

* The modification of the default rebalance callback
(`ConsumerContext::rebalance`) to perform incremental assignments and
unassignments when the rebalance protocol in use is
[`RebalanceProtocol::Cooperative`].

Thanks, [@SreeniIO].

[#89]: https://github.com/fede1024/rust-rdkafka/issues/89
[#95]: https://github.com/fede1024/rust-rdkafka/issues/95
[#360]: https://github.com/fede1024/rust-rdkafka/issues/360
[#364]: https://github.com/fede1024/rust-rdkafka/issues/364
[#367]: https://github.com/fede1024/rust-rdkafka/issues/367
[@djKooks]: https://github.com/djKooks
[@SreeniIO]: https://github.com/SreeniIO

<a name="0.26.0"></a>
## 0.26.0 (2021-03-16)
Expand Down
11 changes: 11 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel};
use crate::consumer::RebalanceProtocol;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::metadata::Metadata;
Expand Down Expand Up @@ -137,6 +138,16 @@ impl NativeClient {
pub fn ptr(&self) -> *mut RDKafka {
self.ptr.ptr()
}

pub(crate) fn rebalance_protocol(&self) -> RebalanceProtocol {
let protocol = unsafe { CStr::from_ptr(rdsys::rd_kafka_rebalance_protocol(self.ptr())) };
match protocol.to_bytes() {
b"NONE" => RebalanceProtocol::None,
b"EAGER" => RebalanceProtocol::Eager,
b"COOPERATIVE" => RebalanceProtocol::Cooperative,
_ => unreachable!(),
}
}
}

/// A low-level rdkafka client.
Expand Down
5 changes: 5 additions & 0 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::config::{
};
use crate::consumer::{
CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
RebalanceProtocol,
};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::groups::GroupList;
Expand Down Expand Up @@ -566,6 +567,10 @@ where
};
Ok(())
}

fn rebalance_protocol(&self) -> RebalanceProtocol {
self.client.native_client().rebalance_protocol()
}
}

impl<C> Drop for BaseConsumer<C>
Expand Down
34 changes: 29 additions & 5 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,23 @@ pub trait ConsumerContext: ClientContext {
unsafe {
match err {
RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => {
rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr());
}
_ => {
// Also for RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
rdsys::rd_kafka_assign(native_client.ptr(), ptr::null());
match native_client.rebalance_protocol() {
RebalanceProtocol::Cooperative => {
rdsys::rd_kafka_incremental_assign(native_client.ptr(), tpl.ptr());
}
_ => {
rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr());
}
}
}
_ => match native_client.rebalance_protocol() {
RebalanceProtocol::Cooperative => {
rdsys::rd_kafka_incremental_unassign(native_client.ptr(), tpl.ptr());
}
_ => {
rdsys::rd_kafka_assign(native_client.ptr(), ptr::null());
}
},
}
}
trace!("Running post-rebalance with {:?}", rebalance);
Expand Down Expand Up @@ -167,6 +178,16 @@ unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata {
unsafe impl Send for ConsumerGroupMetadata {}
unsafe impl Sync for ConsumerGroupMetadata {}

/// The rebalance protocol for a consumer.
pub enum RebalanceProtocol {
/// The consumer has not (yet) joined a group.
None,
/// Eager rebalance protocol.
Eager,
/// Cooperative rebalance protocol.
Cooperative,
}

/// Common trait for all consumers.
///
/// # Note about object safety
Expand Down Expand Up @@ -325,4 +346,7 @@ where

/// Resumes consumption for the provided list of partitions.
fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;

/// Reports the rebalance protocol in use.
fn rebalance_protocol(&self) -> RebalanceProtocol;
}
7 changes: 6 additions & 1 deletion src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use crate::client::{Client, ClientContext, NativeClient};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel};
use crate::consumer::base_consumer::BaseConsumer;
use crate::consumer::{
CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, Rebalance,
CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
Rebalance, RebalanceProtocol,
};
use crate::error::{KafkaError, KafkaResult};
use crate::groups::GroupList;
Expand Down Expand Up @@ -501,4 +502,8 @@ where
fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
self.base.resume(partitions)
}

fn rebalance_protocol(&self) -> RebalanceProtocol {
self.base.rebalance_protocol()
}
}

0 comments on commit 3aa7f92

Please sign in to comment.