diff --git a/src/topic_partition_list.rs b/src/topic_partition_list.rs index 0e3a15c83..0de2ab549 100644 --- a/src/topic_partition_list.rs +++ b/src/topic_partition_list.rs @@ -7,11 +7,13 @@ use std::ffi::{CStr, CString}; use std::fmt; use std::slice; +use libc::c_void; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; use crate::error::{IsError, KafkaError, KafkaResult}; -use crate::util::{KafkaDrop, NativePtr}; +use crate::message::ToBytes; +use crate::util::{self, KafkaDrop, NativePtr}; const PARTITION_UNASSIGNED: i32 = -1; @@ -134,6 +136,23 @@ impl<'a> TopicPartitionListElem<'a> { )), } } + + /// Returns the optional metadata associated with the entry. + pub fn metadata(&self) -> &[u8] { + unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) } + } + + /// Sets the optional metadata associated with the entry. + pub fn set_metadata(&mut self, metadata: &B) + where + B: ToBytes + ?Sized, + { + let metadata = metadata.to_bytes(); + let buf = unsafe { libc::malloc(metadata.len()) }; + unsafe { libc::memcpy(buf, metadata.as_ptr() as *const c_void, metadata.len()) }; + self.ptr.metadata = buf; + self.ptr.metadata_size = metadata.len(); + } } impl<'a> PartialEq for TopicPartitionListElem<'a> { diff --git a/tests/test_high_consumers.rs b/tests/test_high_consumers.rs index a276ef7ca..3e0f8c29e 100644 --- a/tests/test_high_consumers.rs +++ b/tests/test_high_consumers.rs @@ -1,6 +1,7 @@ //! Test data consumption using high level consumers. use std::collections::HashMap; +use std::error::Error; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Duration; @@ -372,3 +373,58 @@ async fn test_consumer_store_offset_commit() { .unwrap(); assert_eq!(position, consumer.position().unwrap()); } + +#[tokio::test(flavor = "multi_thread")] +async fn test_consumer_commit_metadata() -> Result<(), Box> { + let _ = env_logger::try_init(); + + let topic_name = rand_test_topic(); + let group_name = rand_test_group(); + populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await; + + let create_consumer = || async { + // Disable auto-commit so we can manually drive the commits. + let mut config = HashMap::new(); + config.insert("enable.auto.commit", "false"); + let consumer = create_stream_consumer(&group_name, Some(config)); + + // Subscribe to the topic and wait for at least one message, which + // ensures that the consumer group has been joined and such. + consumer.subscribe(&[topic_name.as_str()])?; + let _ = consumer.stream().next().await; + + Ok::<_, Box>(consumer) + }; + + // Create a topic partition list where each element has some associated + // metadata. + let tpl = { + let mut tpl = TopicPartitionList::new(); + let mut tpl1 = tpl.add_partition(&topic_name, 0); + tpl1.set_offset(Offset::Offset(1))?; + tpl1.set_metadata(b"one"); + let mut tpl2 = tpl.add_partition(&topic_name, 1); + tpl2.set_offset(Offset::Offset(1))?; + tpl2.set_metadata("t\x00wo"); + let mut tpl3 = tpl.add_partition(&topic_name, 2); + tpl3.set_offset(Offset::Offset(1))?; + tpl3.set_metadata(b"badutf8\x80"); + tpl + }; + + // Ensure that the commit state immediately includes the metadata. + { + let consumer = create_consumer().await?; + consumer.commit(&tpl, CommitMode::Sync)?; + assert_eq!(consumer.committed(None)?, tpl); + } + + // Ensure that the commit state on a new consumer in the same group + // can see the same metadata. + { + let consumer = create_consumer().await?; + assert_eq!(consumer.committed(None)?, tpl); + } + + Ok(()) +}