Skip to content

Commit

Permalink
Re-introduce TopicPartitionList
Browse files Browse the repository at this point in the history
As per @fede1024's review, I had indeed misunderstood the purpose of
the TopicPartitionList. It is not re-introduced, allowing the program
to correctly commit offsets.

Signed-off-by: Brian L. Troutwine <[email protected]>
  • Loading branch information
Brian L. Troutwine committed Jul 10, 2018
1 parent d538c4f commit 95decde
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions examples/topic_mirror.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,16 @@ struct MirrorProducerContext {

impl ClientContext for MirrorProducerContext {}
impl ProducerContext for MirrorProducerContext {
type DeliveryOpaque = ();
type DeliveryOpaque = TopicPartitionList;

fn delivery(&self, delivery_result: &Result<BorrowedMessage, (KafkaError, BorrowedMessage)>, _delivery_opaque: ()) {
fn delivery(
&self,
delivery_result: &Result<BorrowedMessage, (KafkaError, BorrowedMessage)>,
delivery_opaque: TopicPartitionList,
) {
match delivery_result {
Ok(m) => {
debug!("Delivered copy of {:?} to {}", m.payload().unwrap(), m.topic());
debug!("Delivered copy of {:?} to {}", delivery_opaque, m.topic());
// Question(blt) -- This call to `store_offset` will eventually
// cause offsets to be committed out of order. That will cause
// data loss. Should, then, the author cobble together a MinHeap
Expand All @@ -80,7 +84,7 @@ impl ProducerContext for MirrorProducerContext {
//
// Either approach, potentially, interacts with the retry
// question below.
while let Err(_) = self.source_consumer.store_offset(&m) {
while let Err(_) = self.source_consumer.store_offset_list(&delivery_opaque) {
// TODO(blt) -- backoff
}
}
Expand Down Expand Up @@ -126,7 +130,10 @@ fn mirroring(consumer: &LoggingConsumer, producer: &MirrorProducer, output_topic
};

loop {
let mut record = BaseRecord::with_opaque_to(output_topic, ());
let mut tpl = TopicPartitionList::new();
tpl.add_message_offset(&message);

let mut record = BaseRecord::with_opaque_to(output_topic, tpl);
if message.key().is_some() {
record = record.key(message.key().unwrap());
}
Expand Down

0 comments on commit 95decde

Please sign in to comment.