diff --git a/Cargo.lock b/Cargo.lock
index 75212b7c1..b93361ba3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1070,10 +1070,11 @@ dependencies = [
 
 [[package]]
 name = "rdkafka"
-version = "0.30.0"
+version = "0.31.0"
 dependencies = [
  "async-std",
  "backoff",
+ "byteorder",
  "chrono",
  "clap",
  "env_logger",
diff --git a/Cargo.toml b/Cargo.toml
index 36c2f0a77..d161f725d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ futures-util = { version = "0.3.0", default-features = false }
 libc = "0.2.0"
 log = "0.4.8"
 serde = { version = "1.0.0", features = ["derive"] }
+byteorder = "1.3.2"
 serde_derive = "1.0.0"
 serde_json = "1.0.0"
 slab = "0.4"
diff --git a/src/error.rs b/src/error.rs
index c3f046dad..9c0589d7d 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -159,6 +159,8 @@ pub enum KafkaError {
     MessageProduction(RDKafkaErrorCode),
     /// Metadata fetch error.
     MetadataFetch(RDKafkaErrorCode),
+    /// Member assignment parsing failed.
+    MemberAssignment(String),
     /// No message was received.
     NoMessageReceived,
     /// Unexpected null pointer
@@ -216,6 +218,9 @@ impl fmt::Debug for KafkaError {
             KafkaError::MetadataFetch(err) => {
                 write!(f, "KafkaError (Metadata fetch error: {})", err)
             }
+            KafkaError::MemberAssignment(ref err) => {
+                write!(f, "KafkaError (Member assignment parsing error: {})", err)
+            }
             KafkaError::NoMessageReceived => {
                 write!(f, "No message received within the given poll interval")
             }
@@ -258,6 +263,7 @@ impl fmt::Display for KafkaError {
             KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err),
             KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
             KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
+            KafkaError::MemberAssignment(ref err) => write!(f, "Member assignment parsing error: {}", err),
             KafkaError::NoMessageReceived => {
                 write!(f, "No message received within the given poll interval")
             }
@@ -283,6 +289,7 @@ impl Error for KafkaError {
             KafkaError::Canceled => None,
             KafkaError::ClientConfig(..) => None,
             KafkaError::ClientCreation(_) => None,
+            KafkaError::MemberAssignment(_) => None,
             KafkaError::ConsumerCommit(err) => Some(err),
             KafkaError::Flush(err) => Some(err),
             KafkaError::Global(err) => Some(err),
@@ -321,6 +328,7 @@ impl KafkaError {
             KafkaError::Canceled => None,
             KafkaError::ClientConfig(..) => None,
             KafkaError::ClientCreation(_) => None,
+            KafkaError::MemberAssignment(_) => None,
             KafkaError::ConsumerCommit(err) => Some(*err),
             KafkaError::Flush(err) => Some(*err),
             KafkaError::Global(err) => Some(*err),
diff --git a/src/groups.rs b/src/groups.rs
index 2c805dc79..7127b81ab 100644
--- a/src/groups.rs
+++ b/src/groups.rs
@@ -1,12 +1,26 @@
-//! Group membership API.
-
 use std::ffi::CStr;
 use std::fmt;
 use std::slice;
+use std::io::Cursor;
+
+use byteorder::{BigEndian, ReadBytesExt};
+use serde::{Serialize, Deserialize};
+
+/// Group member assignment.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct MemberAssignment {
+    /// Kafka topic name.
+    pub topic: String,
+    /// Assigned partitions.
+    pub partitions: Vec<i32>,
+}
 
 use rdkafka_sys as rdsys;
 use rdkafka_sys::types::*;
 
+use crate::error::KafkaError;
+use crate::error::KafkaResult;
+use crate::util::read_str;
 use crate::util::{KafkaDrop, NativePtr};
 
 /// Group member information container.
@@ -54,18 +68,38 @@ impl GroupMemberInfo {
         }
     }
 
-    /// Return the partition assignment of the member.
-    pub fn assignment(&self) -> Option<&[u8]> {
-        unsafe {
-            if self.0.member_assignment.is_null() {
-                None
-            } else {
-                Some(slice::from_raw_parts::<u8>(
-                    self.0.member_assignment as *const u8,
-                    self.0.member_assignment_size as usize,
-                ))
+    /// Return the assignment of the member.
+    pub fn assignment(&self) -> KafkaResult<Vec<MemberAssignment>> {
+        if self.0.member_assignment.is_null() {
+            return Ok(Vec::new());
+        }
+        let payload = unsafe {
+            slice::from_raw_parts::<u8>(
+                self.0.member_assignment as *const u8,
+                self.0.member_assignment_size as usize,
+            )
+        };
+        let mut cursor = Cursor::new(payload);
+        let _version = cursor.read_i16::<BigEndian>()
+            .map_err(|e| KafkaError::MemberAssignment(format!("{}", e)))?;
+        let assign_len = cursor.read_i32::<BigEndian>()
+            .map_err(|e| KafkaError::MemberAssignment(format!("{}", e)))?;
+        let mut assigns = Vec::with_capacity(assign_len as usize);
+        for _ in 0..assign_len {
+            let topic = read_str(&mut cursor)
+                .map_err(|e| KafkaError::MemberAssignment(format!("{}", e)))?
+                .to_string();
+            let partition_len = cursor.read_i32::<BigEndian>()
+                .map_err(|e| KafkaError::MemberAssignment(format!("{}", e)))?;
+            let mut partitions = Vec::with_capacity(partition_len as usize);
+            for _ in 0..partition_len {
+                let partition = cursor.read_i32::<BigEndian>()
+                    .map_err(|e| KafkaError::MemberAssignment(format!("{}", e)))?;
+                partitions.push(partition);
             }
+            assigns.push(MemberAssignment { topic, partitions })
         }
+        Ok(assigns)
     }
 }
 
diff --git a/src/util.rs b/src/util.rs
index 16b146f58..a6d6e1a25 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -1,5 +1,6 @@
 //! Utility functions and types.
 
+use std::io::{Cursor, BufRead};
 use std::ffi::CStr;
 use std::fmt;
 use std::future::Future;
@@ -9,10 +10,12 @@ use std::os::raw::c_void;
 use std::ptr;
 use std::ptr::NonNull;
 use std::slice;
+use std::str;
 use std::sync::Arc;
 #[cfg(feature = "naive-runtime")]
 use std::thread;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use byteorder::{BigEndian, ReadBytesExt};
 
 #[cfg(feature = "naive-runtime")]
 use futures_channel::oneshot;
@@ -256,6 +259,14 @@ impl<T: IntoOpaque> AsCArray<T> for Vec<T> {
     }
 }
 
+pub(crate) fn read_str<'a>(rdr: &'a mut Cursor<&[u8]>) -> Result<&'a str, Box<dyn std::error::Error>> {
+    let len = (rdr.read_i16::<BigEndian>())? as usize;
+    let pos = rdr.position() as usize;
+    let slice = str::from_utf8(&rdr.get_ref()[pos..(pos + len)])?;
+    rdr.consume(len);
+    Ok(slice)
+}
+
 pub(crate) struct NativePtr<T>
 where
     T: KafkaDrop,
diff --git a/tests/test_metadata.rs b/tests/test_metadata.rs
index 3b2667a9c..eca7194df 100644
--- a/tests/test_metadata.rs
+++ b/tests/test_metadata.rs
@@ -4,6 +4,7 @@ use std::time::Duration;
 
 use rdkafka::config::ClientConfig;
 use rdkafka::consumer::{Consumer, StreamConsumer};
+use rdkafka::groups::MemberAssignment;
 use rdkafka::error::KafkaError;
 use rdkafka::topic_partition_list::TopicPartitionList;
 
@@ -159,6 +160,17 @@ async fn test_group_membership() {
         consumer_member.client_id(),
         "rdkafka_integration_test_client"
     );
+
+    assert_eq!(consumer_member.assignment().unwrap().len(), 1);
+
+    let assignment = consumer_member.assignment().unwrap();
+    assert_eq!(
+        assignment[0],
+        MemberAssignment {
+            partitions: vec![0, 1, 2],
+            topic: topic_name
+        }
+    );
 }
 
 #[tokio::test]
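
Usage note (not part of the patch): a minimal sketch of how the reworked GroupMemberInfo::assignment() accessor might be consumed once this change lands. The bootstrap server address, group id, timeout, and function name below are illustrative placeholders, not values taken from this repository.

use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;

fn print_member_assignments() -> KafkaResult<()> {
    // Placeholder connection settings; adjust for the target cluster.
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "assignment_inspector")
        .create()?;

    // Ask the broker for metadata about every known consumer group.
    let group_list = consumer.fetch_group_list(None, Duration::from_secs(10))?;

    for group in group_list.groups() {
        for member in group.members() {
            // assignment() now yields parsed MemberAssignment values instead of
            // the raw member_assignment byte slice.
            for assignment in member.assignment()? {
                println!(
                    "group {}, member {}: topic {} -> partitions {:?}",
                    group.name(),
                    member.id(),
                    assignment.topic,
                    assignment.partitions
                );
            }
        }
    }
    Ok(())
}

The test added in tests/test_metadata.rs exercises the same accessor against a single subscribed topic with three partitions.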