diff --git a/rdkafka-sys/src/helpers.rs b/rdkafka-sys/src/helpers.rs index d8f739291..b4b3122e9 100644 --- a/rdkafka-sys/src/helpers.rs +++ b/rdkafka-sys/src/helpers.rs @@ -1,5 +1,7 @@ //! Utility functions. +use libc::c_char; + use crate::types::RDKafkaErrorCode; use crate::types::RDKafkaErrorCode::*; use crate::types::RDKafkaRespErr; @@ -175,3 +177,7 @@ pub fn rd_kafka_resp_err_t_to_rdkafka_error(err: RDKafkaRespErr) -> RDKafkaError RD_KAFKA_RESP_ERR_END_ALL => EndAll, } } + +extern "C" { + pub fn rd_strndup(_: *const c_char, _: isize) -> *mut c_char; +} diff --git a/src/topic_partition_list.rs b/src/topic_partition_list.rs index 53796df9b..e5154d36c 100644 --- a/src/topic_partition_list.rs +++ b/src/topic_partition_list.rs @@ -141,6 +141,18 @@ impl<'a> TopicPartitionListElem<'a> { let slice: &[u8] = unsafe { ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) }; from_utf8(slice).expect("metadata is not UTF-8") } + + /// Set the metadata. + pub fn set_metadata(&mut self, metadata: impl AsRef) { + let metadata = metadata.as_ref(); + let c_metadata = CString::new(metadata).expect("metadat is not UTF-8"); + + unsafe { + self.ptr.metadata = + rdsys::helpers::rd_strndup(c_metadata.as_ptr(), metadata.len() as isize) as *mut _; + self.ptr.metadata_size = libc::strlen(self.ptr.metadata as *const _); + } + } } impl<'a> PartialEq for TopicPartitionListElem<'a> {