Separate partition queues no longer working in v1.6.0 #3231

Open
4 of 7 tasks
benesch opened this issue Jan 28, 2021 · 7 comments

Comments

@benesch
Contributor

benesch commented Jan 28, 2021

Description

We're seeing test failures in Materialize with librdkafka v1.6.0 that weren't present in v1.5.3 regarding split partition queues. Basically, we want n different queues to poll given a topic with n partitions. So we create a consumer, assign it to the partitions we want, then call something like:

rd_kafka_assign(rdk, assignment);
rd_kafka_queue_t *q = rd_kafka_queue_get_partition(rdk, ...);
rd_kafka_queue_forward(q, NULL);

The code is in Rust and goes through the rust-rdkafka wrapper, so the reality is more complicated.

The problem is that the queue forwarding does not seem to take effect, at least not immediately. The next call to rd_kafka_poll(rdk) on the main consumer, which we expect to serve only callbacks and not return any messages, will return a message from the queue we just forwarded!

How to reproduce

The failing dependency bump PR is here, if it's of interest: MaterializeInc/materialize#5478

I don't have a nice tight reproduction at the moment, but I did manage to bisect the problem down to e4c24e9. The title of that commit is "Rewrote assignment handling and move it out of the cgrp", so it definitely seems related.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

@edenhill
Contributor

Are you checking the error return of rd_kafka_assign()? Its error enforcement has been improved.

@benesch
Contributor Author

benesch commented Jan 28, 2021

Ah, sorry, I simplified a bit too much. The call to rd_kafka_assign goes through the Rust wrapper which does check errors:

https://github.com/fede1024/rust-rdkafka/blob/41ccf186c0e0bcd3894b6de1beb10ef7ad8557dd/src/consumer/base_consumer.rs#L311-L319

@benesch
Contributor Author

benesch commented Jan 28, 2021

I also assume that the assignment must be taking effect, right, because we do manage to get a message after the call to assign—just from the wrong queue!

@benesch
Contributor Author

benesch commented Jan 28, 2021

Ok, I think I see the issue. rd_kafka_cgrp_assign now works like this:

rd_kafka_assignment_clear();
rd_kafka_assignment_serve();
rd_kafka_assignment_add(new_assignment);

So it seems like now after every assignment, existing partition queues are destroyed and new ones are created in their place? And these new queues do not maintain the forwarding of the original queues.
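To make that hypothesis concrete, here is a small toy model in C. It does not use librdkafka at all: every `toy_*` name is invented for illustration, and the rule that a freshly created queue starts out forwarded to the main queue is an assumption taken from the description above, not something verified against the library source.

```c
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_PARTITIONS 8

/* Toy stand-in for one partition queue. NOT a librdkafka type. */
typedef struct {
    bool forwarded_to_main; /* true: messages surface on the main queue */
    int pending;            /* messages waiting in this partition queue */
} toy_partition_queue_t;

/* Toy stand-in for a consumer holding one queue per assigned partition. */
typedef struct {
    toy_partition_queue_t queues[TOY_MAX_PARTITIONS];
    size_t count;
} toy_consumer_t;

/* Models the hypothesised v1.6.0 behaviour: the old assignment is cleared
 * and fresh queues are created, which start out forwarded to the main
 * queue (assumption). Any earlier un-forwarding is therefore lost. */
void toy_assign(toy_consumer_t *c, size_t partition_count) {
    if (partition_count > TOY_MAX_PARTITIONS)
        partition_count = TOY_MAX_PARTITIONS;
    c->count = partition_count;
    for (size_t i = 0; i < c->count; i++) {
        c->queues[i].forwarded_to_main = true;
        c->queues[i].pending = 0;
    }
}

/* Models rd_kafka_queue_forward(q, NULL): stop forwarding to the main queue. */
void toy_unforward(toy_consumer_t *c, size_t partition) {
    if (partition < c->count)
        c->queues[partition].forwarded_to_main = false;
}

/* Models one message arriving on a partition. */
void toy_deliver(toy_consumer_t *c, size_t partition) {
    if (partition < c->count)
        c->queues[partition].pending++;
}

/* Models polling the main consumer queue. Returns the partition a message
 * came from, or -1 if no still-forwarded queue has a message. */
int toy_poll_main(toy_consumer_t *c) {
    for (size_t i = 0; i < c->count; i++) {
        toy_partition_queue_t *q = &c->queues[i];
        if (q->forwarded_to_main && q->pending > 0) {
            q->pending--;
            return (int)i;
        }
    }
    return -1;
}
```

In this model, calling `toy_assign` a second time silently re-enables forwarding, so the next `toy_poll_main` returns a message that was expected to stay on the split queue. That matches the symptom reported at the top of the issue.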

@benesch
Contributor Author

benesch commented Jan 28, 2021

Ok, so this turned out to be easy enough to work around. Now after every call to assign we are careful to destroy our old partition queue references, get new queue references, and unforward those queues. The downstream PR is here, if you're curious: https://github.com/MaterializeInc/materialize/pull/5478/files#diff-ad5d38bf37b32dbbd4546b0fcd7f9b6daf573be8d5392847e1ed39c09b6c9f5cR579-R587

I don't have enough context to say whether librdkafka's new behavior is right or wrong, but it is most definitely different! Would be great if the documentation could be updated accordingly if you plan to stick with the new behavior. Roughly speaking, I think every call to rd_kafka_assign invalidates any previous queue references and undoes any previous rd_kafka_queue_forward calls. I haven't thought through or tested whether this applies to assignments triggered by cgrp rebalancing too.

@edenhill
Contributor

edenhill commented Apr 8, 2022

Great analysis, @benesch! Would you like to submit a PR to update the docstring for the relevant rd_kafka_queue.. functions and rd_kafka_assign*() for this behaviour?

@benesch
Contributor Author

benesch commented May 17, 2022

Apologies, but I'm spinning down my involvement with rdkafka and I won't have the time!
