Achieving committing offsets only after successful productions #89
Comments
I spent a few days with rdkafka-sys and I wonder if an API adjustment for this crate wouldn't do the trick.
Storing an offset is done with only the information the |
Sorry for the late reply. Yes we could add such a method in the Consumer trait. I'm working on an example solution for your problem based on the |
I apologize for the delayed reply; I was on vacation. Into the Consumer trait? That's different from what I was thinking, but I look forward to your example. Please let me know if I can help out in any way. I'll be back at work on Monday and would be pleased to pitch in.
Unfortunately I didn't have as much time as I thought to dedicate to this. However, this is what I've got so far. I've changed the API to make this use case a bit easier to implement. This approach is similar to the first one you mentioned, so it will likely commit out of order and potentially cause data loss. rust-rdkafka at the moment doesn't provide a way to commit only in order, but it shouldn't be too hard to implement some additional logic to do that.
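For illustration, the "additional logic" for in-order commits could look roughly like the sketch below. It is not part of rust-rdkafka: `InOrderTracker`, `consumed` and `delivered` are hypothetical names. It assumes one tracker per input partition, offsets consumed in increasing order, and the Kafka convention that the committed offset is the next offset to read.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper (not an rdkafka type): tracks the in-flight offsets
/// of ONE partition and reports how far it is safe to commit.
#[derive(Debug, Default)]
pub struct InOrderTracker {
    /// Offsets consumed and handed to the producer, not yet acknowledged.
    in_flight: BTreeSet<i64>,
    /// Highest offset consumed so far.
    highest_seen: Option<i64>,
    /// Last position reported as safe to commit (next offset to read).
    committable: Option<i64>,
}

impl InOrderTracker {
    pub fn new() -> Self {
        Self::default()
    }

    /// Record that `offset` was consumed and passed on for production.
    pub fn consumed(&mut self, offset: i64) {
        self.in_flight.insert(offset);
        self.highest_seen = Some(self.highest_seen.map_or(offset, |h| h.max(offset)));
        // The first offset seen is the baseline: nothing before it is ours.
        if self.committable.is_none() {
            self.committable = Some(offset);
        }
    }

    /// Record a successful delivery for `offset`. Returns `Some(position)`
    /// only when the safe commit position moved forward.
    pub fn delivered(&mut self, offset: i64) -> Option<i64> {
        if !self.in_flight.remove(&offset) {
            return None; // unknown or duplicate acknowledgement
        }
        // Everything below the oldest in-flight offset has been delivered.
        // With nothing in flight, everything up to `highest_seen` has.
        let candidate = match self.in_flight.iter().next() {
            Some(&oldest) => oldest,
            None => self.highest_seen? + 1,
        };
        if self.committable.map_or(true, |c| candidate > c) {
            self.committable = Some(candidate);
            Some(candidate)
        } else {
            None
        }
    }
}
```

An acknowledgement that arrives ahead of an older, still-pending record returns `None`, so the commit position only advances once the gap below it has closed. Retries and rebalances are left out of this sketch.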
This commit adjusts the topic_mirror example to include retry in the case of enqueuing failure, removes the TopicPartitionList in favor of performing offset commits directly with BorrowedMessage. It's possible I've not understood the purpose of TopicPartitionList. As it is now, I think I've saved on one allocation per incoming message by removing TopicPartitionList, have imposed additional allocations per failed enqueuing and have open questions around retrying when production fails. Relevant issue: [fede1024#89](fede1024#89) Signed-off-by: Brian L. Troutwine <[email protected]>
I've got myself worked into a corner and I'm not sure how to work back out with the API as-is. I could use a little guidance. My basic ambition is laid out in an issue on the Python client. That is, I'd like to write a process that reads from some `input` topic, writes to an `output` topic, and commits offsets to `input` only if the record has been successfully produced to `output`.

As of 0.17, rdkafka's `ProducerContext::delivery` is called with a `DeliveryResult` that contains a `BorrowedMessage` for the record published to `output`, not the `BorrowedMessage` which came in from `input`. But `Consumer::store_offset` requires that a `&BorrowedMessage` be passed, the same one that came in from the `Consumer`.

Okay, so I have to get the original `&BorrowedMessage` through to `ProducerContext::delivery` along with a reference to the `Consumer` to accomplish my aim, I think. It occurred to me that I might pass a `(&BorrowedMessage, &Consumer)` through as a `ProducerContext::DeliveryOpaque`, but that seems excessively complicated. Not to mention, there will need to be a rectification layer in there somewhere so that offsets are only committed in order. I'm really not sure how to fit that in.

I noticed that `examples/at_least_once.rs` accomplishes this by auto-committing explicitly stored offsets, but it waits synchronously for produced records to signal back with success. This is close to what I'd like to accomplish, but the throughput is a little disappointing, what with the serialization on every production.

I guess my question is: is there a more throughput-oriented way to commit consumed offsets only after the records that result from them have been successfully produced, other than serializing on writes? It feels like there's something close with `ProducerContext::delivery`, but I'm not quite finding it.
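One way to picture a non-serializing variant of the idea above is to carry an owned `(partition, offset)` pair as the per-record opaque, rather than a `(&BorrowedMessage, &Consumer)`, and have the delivery callback forward it over a channel to the consuming side. The sketch below uses only the standard library and deliberately models none of rdkafka's API: `InputPosition`, `on_delivery` and `drain_acks` are hypothetical names, and `on_delivery` merely stands in for what a `ProducerContext::delivery` implementation might do. It also assumes the consumer side has some way to store an offset from a plain position, which is the API adjustment under discussion.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical opaque carried with each produced record: just enough to
/// identify the input record, with no borrow of the message or consumer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InputPosition {
    pub partition: i32,
    pub offset: i64,
}

/// Stand-in for a delivery callback. On success the input position is
/// forwarded to the consuming side; on failure it is handed back so the
/// caller can decide how to retry.
pub fn on_delivery(
    result: Result<(), String>,
    opaque: InputPosition,
    acks: &Sender<InputPosition>,
) -> Option<InputPosition> {
    match result {
        Ok(()) => {
            // A send error means the receiver is gone; nothing left to do.
            let _ = acks.send(opaque);
            None
        }
        Err(_) => Some(opaque),
    }
}

/// Called from the consume loop between polls: collects every
/// acknowledgement that has arrived so far, without blocking.
pub fn drain_acks(acks: &Receiver<InputPosition>) -> Vec<InputPosition> {
    acks.try_iter().collect()
}
```

The consume loop never waits on an individual production. It keeps polling and producing, and periodically drains the acknowledged positions, which can then go through whatever in-order logic is in place before offsets are stored.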