-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix to remove fetch queue messages that blocked the destroy of rdkafka instances #4724
Conversation
How to reproduce: happening sporadically with test |
108046f
to
82080db
Compare
82080db
to
9b9d001
Compare
FWIW I've confirmed that this branch also fixes a non-constant but somewhat frequent issue I've been observing. I reproduced it by stressing the client node's swap while also stressing the broker node's cpu. It took a few restart cycles but within several hours the deadlocked destroy call occurred. |
Is there any expected date on this to be merged? |
🎉 All Contributor License Agreements have been signed. Ready to merge. |
One step closer! 🎉 |
src/rdkafka_broker.c
Outdated
@@ -3407,6 +3407,8 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) { | |||
: (topic_err | |||
? topic_err | |||
: RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)); | |||
|
|||
rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we do this only for the Consumer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, better to avoid it for the producer given rktp_fetchq
is allocated there too but the queue isn't forwarded we avoid looping through this empty queue.
…a instances Circular dependencies from a partition fetch queue message to the same partition blocked the destroy of an instance, that happened in case the partition was removed from the cluster while it was being consumed. Solved by purging internal partition queue, after being stopped and removed, to allow reference count to reach zero and trigger a destroy
on removing the partition only for the consumer
9b9d001
to
7ce0d72
Compare
Addressed comment, updated CHANGELOG and rebased |
Hi, is there an estimated time to have this fix in a release? I have got the same issue when closing kafka consumer #4885. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!.
Any plan to release the fix in a new version? |
@pranavrth @emasab Hi, is there any expected time to have this fix in a new release? |
…a instances (#4724) Circular dependencies from a partition fetch queue message to the same partition blocked the destroy of an instance, that happened in case the partition was removed from the cluster while it was being consumed. Solved by purging internal partition queue, after being stopped and removed, to allow reference count to reach zero and trigger a destroy. Purging internal fetch queue on removing the partition only for the consumer.
Circular dependencies from a partition fetch queue message to the same partition blocked the destroy of an instance, that happened in case the partition was removed from the cluster while it was being consumed. Solved by purging internal partition queue, after being stopped and removed, to allow reference count to reach zero and trigger a destroy