diff --git a/changelog.md b/changelog.md index 71c30270a..4bccd59fe 100644 --- a/changelog.md +++ b/changelog.md @@ -12,6 +12,12 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md). * Fix a segfault when calling `Consumer::position` on a consumer that was improperly configured ([#360]). +* **Breaking change.** Change `Consumer::store_offset` to accept the topic, + partition, and offset directly ([#89], [#368]). The old API, which took a + `BorrowedMessage`, is still accessible as + `Consumer::store_offset_from_message`. + +[#89]: https://github.com/fede1024/rust-rdkafka/issues/89 [#360]: https://github.com/fede1024/rust-rdkafka/issues/360 diff --git a/examples/at_least_once.rs b/examples/at_least_once.rs index c0b876bc4..afe413e91 100644 --- a/examples/at_least_once.rs +++ b/examples/at_least_once.rs @@ -163,7 +163,7 @@ async fn main() { .expect("Message delivery failed for some topic"); // Now that the message is completely processed, add it's position to the offset // store. The actual offset will be committed every 5 seconds. - if let Err(e) = consumer.store_offset(&m) { + if let Err(e) = consumer.store_offset_from_message(&m) { warn!("Error while storing offset: {}", e); } } diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 6fbc0ee52..5048ffa26 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -384,7 +384,17 @@ where } } - fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> { + fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> { + let topic = self.client.native_topic(topic)?; + let error = unsafe { rdsys::rd_kafka_offset_store(topic.ptr(), partition, offset) }; + if error.is_error() { + Err(KafkaError::StoreOffset(error.into())) + } else { + Ok(()) + } + } + + fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> { let error = unsafe { rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset()) }; diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index b2d75c98c..3fb417b6e 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -238,10 +238,14 @@ where /// commit every message with lower offset within the same partition. fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()>; - /// Stores offset for this message to be used on the next (auto)commit. When + /// Stores offset to be used on the next (auto)commit. When /// using this `enable.auto.offset.store` should be set to `false` in the /// config. - fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>; + fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()>; + + /// Like [`Consumer::store_offset`], but the offset to store is derived from + /// the provided message. + fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>; /// Store offsets to be used on the next (auto)commit. When using this /// `enable.auto.offset.store` should be set to `false` in the config. diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs index 888550c6e..0e7b8906e 100644 --- a/src/consumer/stream_consumer.rs +++ b/src/consumer/stream_consumer.rs @@ -398,8 +398,12 @@ where self.base.commit_message(message, mode) } - fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> { - self.base.store_offset(message) + fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> { + self.base.store_offset(topic, partition, offset) + } + + fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> { + self.base.store_offset_from_message(message) } fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> { diff --git a/tests/test_high_consumers.rs b/tests/test_high_consumers.rs index 138ac6e1b..a276ef7ca 100644 --- a/tests/test_high_consumers.rs +++ b/tests/test_high_consumers.rs @@ -309,7 +309,7 @@ async fn test_consumer_store_offset_commit() { match message { Ok(m) => { if m.partition() == 1 { - consumer.store_offset(&m).unwrap(); + consumer.store_offset_from_message(&m).unwrap(); } } Err(KafkaError::PartitionEOF(_)) => {}