Modify topic_mirror to include questions, notes #90
Conversation
This commit adjusts the topic_mirror example to retry when enqueuing fails, and removes the TopicPartitionList in favor of performing offset commits directly with BorrowedMessage. It's possible I've not understood the purpose of TopicPartitionList. As it stands, I think I've saved one allocation per incoming message by removing TopicPartitionList, imposed additional allocations per failed enqueuing, and left open questions around retrying when production fails. Relevant issue: fede1024#89 Signed-off-by: Brian L. Troutwine <[email protected]>
examples/topic_mirror.rs
Outdated
//
// Either approach, potentially, interacts with the retry
// question below.
while let Err(_) = self.source_consumer.store_offset(&m) {
Here `m` is the message that was just sent to Kafka (the copy, not the original one), so you'd be committing the offset of the copy, which is not what you want. The TopicPartitionList stores the location (offset and partition) of the original message, which is the one you want to commit.
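To make the distinction concrete, here's a tiny sketch with hypothetical stand-in types (none of these names are rdkafka's): the position captured from the source message travels through to the delivery callback, and that is the offset to commit, not the copy's.

```rust
// Hypothetical stand-ins, for illustration only: the produced copy gets
// its own offset on the destination topic, so a mirror must carry the
// *source* position through to delivery and commit that one.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct SourcePosition {
    pub partition: i32,
    pub offset: i64,
}

// The source position rides along with the produce request as per-message
// context; on delivery we return the offset that is safe to commit.
pub fn offset_to_commit(source: SourcePosition, copy_offset: i64) -> i64 {
    // Committing copy_offset here would be the bug discussed above.
    let _ = copy_offset;
    source.offset
}
```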
Ah, d'oh. Of course.
examples/topic_mirror.rs
Outdated
// data loss. Should, then, the author cobble together a MinHeap
// (or similar) approach to commit only when all the suitable
// offsets have come through -- implying storage here -- or rely
// on rdkafka to take care of this?
librdkafka doesn't provide a way to commit in order, so yes, we should implement a different way to keep track of them and only periodically commit the lowest one per partition.
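A sketch of that bookkeeping in plain std types (the `OffsetTracker` name and shape are my own invention, not this crate's API): each (topic, partition) keeps the next offset still awaited plus a min-heap of deliveries that arrived out of order, and the commit point advances only through the contiguous run.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

// Records delivered offsets per (topic, partition) and reports the highest
// offset that is safe to commit: the end of the contiguous run starting at
// the lowest outstanding offset ("floor").
#[derive(Default)]
struct OffsetTracker {
    // (topic, partition) -> (next offset still awaited, out-of-order deliveries)
    partitions: HashMap<(String, i32), (i64, BinaryHeap<Reverse<i64>>)>,
}

impl OffsetTracker {
    fn delivered(&mut self, topic: &str, partition: i32, offset: i64) {
        let entry = self
            .partitions
            .entry((topic.to_string(), partition))
            // First delivery seen establishes the floor.
            .or_insert_with(|| (offset, BinaryHeap::new()));
        entry.1.push(Reverse(offset));
    }

    // Returns the offset to commit (one past the last contiguous delivery),
    // or None if the lowest outstanding offset has not been delivered yet.
    fn committable(&mut self, topic: &str, partition: i32) -> Option<i64> {
        let (floor, heap) = self.partitions.get_mut(&(topic.to_string(), partition))?;
        let mut advanced = false;
        // Pop while the heap's minimum matches the floor exactly.
        while heap.peek() == Some(&Reverse(*floor)) {
            heap.pop();
            *floor += 1;
            advanced = true;
        }
        if advanced { Some(*floor) } else { None }
    }
}
```

With this shape a gap (say offset 1 missing while 0 and 2 are delivered) holds the commit point at 1 until the missing delivery arrives.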
Here, by rdkafka, I meant this crate, not librdkafka.
Whether rdkafka would be the right place for this functionality or not, I'm still a touch confused about how to pull this off, mostly down to lifetime management.
In this example, the TopicPartitionList will always have one originating message associated with it. My initial idea was to use TopicPartitionList::elements to retrieve the single TopicPartitionListElem and store that in a BinaryHeap. There's no Ord impl for TopicPartitionListElem yet (resolvable), but the key difficulty is the lack of Sync on *mut rdkafka_sys::bindings::macos_64::rd_kafka_topic_partition_list_s. My latest commit has the full code changes to reproduce, but here's a sample of the compilation error:
error[E0277]: `*mut rdkafka_sys::bindings::macos_64::rd_kafka_topic_partition_list_s` cannot be shared between threads safely
--> examples/topic_mirror.rs:70:10
|
70 | impl<'a> ProducerContext for MirrorProducerContext<'a> {
| ^^^^^^^^^^^^^^^ `*mut rdkafka_sys::bindings::macos_64::rd_kafka_topic_partition_list_s` cannot be shared between threads safely
|
= help: within `rdkafka::topic_partition_list::TopicPartitionListElem<'a>`, the trait `std::marker::Sync` is not implemented for `*mut rdkafka_sys::bindings::macos_64::rd_kafka_topic_partition_list_s`
= note: required because it appears within the type `rdkafka::TopicPartitionList`
= note: required because it appears within the type `&'a rdkafka::TopicPartitionList`
= note: required because it appears within the type `rdkafka::topic_partition_list::TopicPartitionListElem<'a>`
= note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<rdkafka::topic_partition_list::TopicPartitionListElem<'a>>`
= note: required because it appears within the type `alloc::raw_vec::RawVec<rdkafka::topic_partition_list::TopicPartitionListElem<'a>>`
= note: required because it appears within the type `std::vec::Vec<rdkafka::topic_partition_list::TopicPartitionListElem<'a>>`
= note: required because it appears within the type `std::collections::BinaryHeap<rdkafka::topic_partition_list::TopicPartitionListElem<'a>>`
= note: required because it appears within the type `MirrorProducerContext<'a>`
As per @fede1024's review, I had indeed misunderstood the purpose of the TopicPartitionList. It is now re-introduced, allowing the program to correctly commit offsets. Signed-off-by: Brian L. Troutwine <[email protected]>
This current commit fails to compile, owing to lifetime issues with an underlying raw pointer in TopicPartitionListElem. The notion, however, is to have the producer store up a MinHeap of known-good productions and only commit them once enough have built up for consistency, or something. Signed-off-by: Brian L. Troutwine <[email protected]>
This commit introduces TopicOffsetMap, a structure which records the 'floor' of a topic/partition and a MinHeap of offsets, allowing the producer to record offsets on delivery and only commit those which are sequential from the floor up. This doesn't work, most obviously because `ProducerContext::delivery` takes a `&self` rather than `&mut self`. It's also possible that this behaviour should be included somewhere else. Signed-off-by: Brian L. Troutwine <[email protected]>
This commit includes changes that force the MirrorProducerContext to commit offsets in order, from least to greatest, regardless of the order in which messages are received in `delivery`. The implementation is a little grody but seems, overall, to work. The implementation of Ord/PartialOrd for Offset is suspicious, as is TopicOffsetMap in terms of CPU time. I kinda wonder if the TopicOffsetMap isn't something that would be best pushed into a Consumer variant. Signed-off-by: Brian L. Troutwine <[email protected]>
@fede1024 I've added in-order offset commits to my most recent commit. It's a little gross as-is and I'm not sure if the mutability modifications I've made to BaseProducer are acceptable. After doing this I have a vague notion that this might live best in a Consumer somewhere.
I've realized now that I've got this wired up backward: the consumer needs to be the one to keep the offset-run minheap, not the producer. Consider the use case where the user is not trying to establish a mirror but to do filtering in some fashion. There, the calling code will need to signal to the Consumer that an offset has been processed. I'll adjust my PR appropriately.
Er, no. Just moving the TopicOffsetMap into Consumer suffers the same problem of needing to adjust the mutability of self. So, I started down the road of wrapping … If the ConsumerContext functions were adjusted to be …
When I left off in my last commit I thought it would be possible to push offset ordering into some kind of consumer, or into a consumer context. That proved to be very difficult, owing to where the offsets are needed to make decisions. Now, I've made a mutex-protected OffsetStore that is available inside the producer context, the consumer context, etc. The idea here is that whenever a rebalance happens the internal map of OffsetStore is updated, the polling function can make its own decisions about what to commit without involving the producer -- as noted in comments -- and the producer does basically what it used to. This is a little involved but essentially works. Or should. I've run short of time today and haven't fully tested it out. I need to make further commits to undo the fiddling I did in rdkafka; all of that turned out to be a dead end. Signed-off-by: Brian L. Troutwine <[email protected]>
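A sketch of what such a shared store might look like, in plain std types; the names and methods here are illustrative guesses at the design just described, not code from this PR.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Records the highest delivered offset per (topic, partition). The producer
// context writes deliveries into it, the consumer context prunes it on
// rebalance, and the polling loop reads commit candidates from it without
// involving the producer.
#[derive(Default)]
pub struct OffsetStore {
    delivered: HashMap<(String, i32), i64>,
}

impl OffsetStore {
    // Called from the producer context's delivery callback.
    pub fn record_delivery(&mut self, topic: &str, partition: i32, offset: i64) {
        let e = self
            .delivered
            .entry((topic.to_string(), partition))
            .or_insert(offset);
        if offset > *e {
            *e = offset;
        }
    }

    // Called from the consumer context on rebalance: drop state for
    // partitions we no longer own.
    pub fn rebalance(&mut self, owned: &[(String, i32)]) {
        self.delivered.retain(|k, _| owned.contains(k));
    }

    // Called from the polling loop to decide what to commit.
    pub fn commit_candidates(&self) -> Vec<((String, i32), i64)> {
        self.delivered.iter().map(|(k, v)| (k.clone(), *v)).collect()
    }
}

// The handle both contexts would actually hold.
pub type SharedOffsetStore = Arc<Mutex<OffsetStore>>;
```

The `Arc<Mutex<_>>` wrapper sidesteps the `&self`-versus-`&mut self` problem in `ProducerContext::delivery` discussed earlier: the callback takes the lock instead of needing a mutable receiver.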
Apologies for some of the rustfmt reshuffling.