
Provide the whole batch of messages in the Consumer's API #343

Closed
aptiko opened this issue Mar 27, 2018 · 4 comments

Comments

@aptiko

aptiko commented Mar 27, 2018

My understanding is that the librdkafka consumer reads data in batches into a buffer, and that in Python poll() hands out items from that buffer one by one. But the logic in my program requires data to be read in batches, so I've implemented a wrapper around poll() that calls it many times in order to assemble a batch. This seems silly when I could just get hold of the original batch; and besides being silly, it's hard to make it work well in all cases.

@edenhill
Contributor

The notion of Kafka batches has little meaning to the application and is thus abstracted away.
Your current approach of collecting a number of messages by repeatedly calling poll() (and adjusting the timeout) is the recommended solution.
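The approach described above can be sketched roughly as follows. This is a minimal, hypothetical `poll_many()` (the name comes from the discussion below, the implementation is an assumption): it shrinks the per-call timeout so the whole collection respects one overall deadline, and it raises on the first error-carrying message.

```python
import time

def poll_many(consumer, max_messages, timeout):
    """Collect up to max_messages by calling poll() repeatedly,
    shrinking the per-call timeout so one overall deadline holds."""
    deadline = time.monotonic() + timeout
    messages = []
    while len(messages) < max_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        msg = consumer.poll(timeout=remaining)
        if msg is None:        # poll() timed out: stop collecting for now
            break
        if msg.error():        # surface errors to the caller
            raise RuntimeError(msg.error())
        messages.append(msg)
    return messages
```

The error-handling choice in the last branch is exactly the point debated in the rest of this thread.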

@aptiko
Author

aptiko commented Mar 27, 2018

OK, but I have some trouble when collecting the messages by repeatedly calling poll().

I have a function called poll_many() which does exactly that.

Suppose it's collecting messages: it has called poll() five times and got five messages, and while polling a sixth time there's an error. What should it do?

If it raises an exception, there's a danger that those five messages will be committed even though they were never used. (We use autocommit=False, but poll_many()'s caller might decide to commit.) Is it possible to tell confluent_kafka/librdkafka "pretend you never read these messages", i.e. put them back into the incoming buffer, or reset the offset, but without committing anything?

If it ignores the error and returns the list of five messages, well, that's a bit ugly, isn't it? I don't like pretending everything succeeded in the presence of an error (and praying that the next time poll_many() is called the error occurs again on the first poll() so that it can be properly raised).

@edenhill
Contributor

It is not possible to put messages back on the consumer queue, but you can control which messages are to be committed by disabling enable.auto.offset.store and explicitly calling store_offsets() when you want to advance the high watermark of processed messages.
This ensures that the auto committer will only commit messages that you have processed.

More info:
#300 (comment)
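The suggestion above can be sketched like this (the broker address, group id, and the `consume_one()` helper are illustrative, not from the thread): with enable.auto.offset.store disabled, an offset only becomes eligible for auto-commit once store_offsets() has been called for that message.

```python
# The keys below are real confluent-kafka / librdkafka settings;
# the broker address and group id are placeholders.
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-group',
    'enable.auto.commit': True,           # the auto committer stays on...
    'enable.auto.offset.store': False,    # ...but stores nothing by itself
}

def consume_one(consumer, process, timeout=1.0):
    """Poll one message and store its offset only after processing
    succeeds, so the auto committer never commits unprocessed data."""
    msg = consumer.poll(timeout=timeout)
    if msg is None:
        return None
    if msg.error():
        raise RuntimeError(msg.error())
    process(msg)                          # may raise: offset stays unstored
    consumer.store_offsets(message=msg)   # now eligible for commit
    return msg
```

With this split, a failure between poll() and store_offsets() leaves the offset unstored, so the message will be redelivered after a restart rather than silently committed.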

@aptiko
Author

aptiko commented Apr 19, 2018

Getting messages in batches seems to have been fixed in #252.
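For context, the batch API referred to is the Consumer's consume() method, which returns up to a requested number of messages in one call. A small sketch (the `read_batch()` wrapper and its defaults are illustrative): because error events come back as messages inside the returned list, the caller can separate them out instead of losing an already-fetched partial batch to an exception.

```python
def read_batch(consumer, batch_size=100, timeout=1.0):
    """Fetch up to batch_size messages in one consume() call and
    separate error-carrying messages from normal ones."""
    msgs = consumer.consume(num_messages=batch_size, timeout=timeout)
    good = [m for m in msgs if m.error() is None]
    bad = [m for m in msgs if m.error() is not None]
    return good, bad
```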

@aptiko aptiko closed this as completed Apr 19, 2018