diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index c67c90cb2..118e6f4d1 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -656,6 +656,27 @@ where } } + fn get_watermark_offsets(&self, topic: &str, partition: i32) -> KafkaResult<(i64, i64)> { + let mut low = -1; + let mut high = -1; + let topic_c = CString::new(topic.to_string())?; + let result = unsafe { + rdsys::rd_kafka_get_watermark_offsets( + self.client.native_ptr(), + topic_c.as_ptr(), + partition, + &mut low as *mut i64, + &mut high as *mut i64, + ) + }; + + if result.is_error() { + Err(KafkaError::MetadataFetch(result.into())) + } else { + Ok((low, high)) + } + } + fn position(&self) -> KafkaResult { let tpl = self.assignment()?; let error = unsafe { rdsys::rd_kafka_position(self.client.native_ptr(), tpl.ptr()) }; diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 5ce8b05b1..93f8955a7 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -369,6 +369,15 @@ where T: Into, Self: Sized; + /// Get last known low (oldest/beginning) and high (newest/end) offsets for partition. + /// + /// The low offset is updated periodically (if statistics.interval.ms is set) while the + /// high offset is updated on each fetched message set from the broker. + /// + /// If there is no cached offset (either low or high, or both) then OFFSET_INVALID will + /// be returned for the respective offset. + fn get_watermark_offsets(&self, topic: &str, partition: i32) -> KafkaResult<(i64, i64)>; + /// Retrieve current positions (offsets) for topics and partitions. fn position(&self) -> KafkaResult; diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 5a7f60552..3ce05377d 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -493,6 +493,10 @@ where self.base.offsets_for_times(timestamps, timeout) } + fn get_watermark_offsets(&self, topic: &str, partition: i32) -> KafkaResult<(i64, i64)> { + self.base.get_watermark_offsets(topic, partition) + } + fn position(&self) -> KafkaResult { self.base.position() } diff --git a/tests/test_high_consumers.rs b/tests/test_high_consumers.rs index 4aec62595..9546260f2 100644 --- a/tests/test_high_consumers.rs +++ b/tests/test_high_consumers.rs @@ -314,6 +314,19 @@ async fn test_consumer_commit_message() { (0, 12) ); + assert_eq!( + consumer.get_watermark_offsets(&topic_name, 0).unwrap(), + (0, 10) + ); + assert_eq!( + consumer.get_watermark_offsets(&topic_name, 1).unwrap(), + (0, 11) + ); + assert_eq!( + consumer.get_watermark_offsets(&topic_name, 2).unwrap(), + (0, 12) + ); + let mut assignment = TopicPartitionList::new(); assignment .add_partition_offset(&topic_name, 0, Offset::Stored) @@ -399,6 +412,19 @@ async fn test_consumer_store_offset_commit() { (0, 12) ); + assert_eq!( + consumer.get_watermark_offsets(&topic_name, 0).unwrap(), + (0, 10) + ); + assert_eq!( + consumer.get_watermark_offsets(&topic_name, 1).unwrap(), + (0, 11) + ); + assert_eq!( + consumer.get_watermark_offsets(&topic_name, 2).unwrap(), + (0, 12) + ); + let mut assignment = TopicPartitionList::new(); assignment .add_partition_offset(&topic_name, 0, Offset::Stored)