From ea6621e84e1b7ec33f424c6fc9e64336c6076afc Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 21 May 2024 10:24:31 +0200 Subject: [PATCH 1/2] Fix to remove fetch queue messages that blocked the destroy of rdkafka instances Circular dependencies from a partition fetch queue message to the same partition blocked the destroy of an instance, that happened in case the partition was removed from the cluster while it was being consumed. Solved by purging internal partition queue, after being stopped and removed, to allow reference count to reach zero and trigger a destroy --- CHANGELOG.md | 14 +++++++-- src/rdkafka_broker.c | 2 ++ src/rdkafka_cgrp.c | 53 +--------------------------------- src/rdkafka_partition.c | 64 +++++++++++++++++++++++++++++++++++++++++ src/rdkafka_partition.h | 2 ++ 5 files changed, 81 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b7dc1b04f..56495ca393 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,10 @@ librdkafka v2.6.1 is a maintenance release: under some particular conditions (#4800). * Fix for retrieving offset commit metadata when it contains zeros and configured with `strndup` (#4876) - * Fix for a loop of ListOffset requests, happening in a Fetch From Follower - scenario, if such request is made to the follower (#4616, #4754, @kphelps). +* Fix for a loop of ListOffset requests, happening in a Fetch From Follower + scenario, if such request is made to the follower (#4616, #4754, @kphelps). +* Fix to remove fetch queue messages that blocked the destroy of rdkafka + instances (#4724) ## Fixes @@ -40,6 +42,14 @@ librdkafka v2.6.1 is a maintenance release: "Not leader for partition" error. Fixed by sending the request always to the leader. Happening since 1.5.0 (tested version) or previous ones (#4616, #4754, @kphelps). +* Issues: + Fix to remove fetch queue messages that blocked the destroy of rdkafka + instances. 
Circular dependencies from a partition fetch queue message to + the same partition blocked the destroy of an instance, that happened + in case the partition was removed from the cluster while it was being + consumed. Solved by purging internal partition queue, after being stopped + and removed, to allow reference count to reach zero and trigger a destroy. + Happening since 2.0.2 (#4724). diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 25b8c14d1f..48b36ce698 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -3496,6 +3496,8 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) { : (topic_err ? topic_err : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)); + + rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp); } rd_kafka_toppar_unlock(rktp); diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 547ec1eb98..a56ff9d4c0 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -3474,10 +3474,6 @@ static void rd_kafka_cgrp_partition_add(rd_kafka_cgrp_t *rkcg, */ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp) { - int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0; - rd_kafka_op_t *rko; - rd_kafka_q_t *rkq; - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", "Group \"%s\": delete %s [%" PRId32 "]", rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str, @@ -3487,54 +3483,7 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP); rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP; - if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) { - /* Partition is being removed from the cluster and it's stopped, - * so rktp->rktp_fetchq->rkq_fwdq is NULL. 
- * Purge remaining operations in rktp->rktp_fetchq->rkq_q, - * while holding lock, to avoid circular references */ - rkq = rktp->rktp_fetchq; - mtx_lock(&rkq->rkq_lock); - rd_assert(!rkq->rkq_fwdq); - - rko = TAILQ_FIRST(&rkq->rkq_q); - while (rko) { - if (rko->rko_type != RD_KAFKA_OP_BARRIER && - rko->rko_type != RD_KAFKA_OP_FETCH) { - rd_kafka_log( - rkcg->rkcg_rk, LOG_WARNING, "PARTDEL", - "Purging toppar fetch queue buffer op" - "with unexpected type: %s", - rd_kafka_op2str(rko->rko_type)); - } - - if (rko->rko_type == RD_KAFKA_OP_BARRIER) - barrier_cnt++; - else if (rko->rko_type == RD_KAFKA_OP_FETCH) - message_cnt++; - else - other_cnt++; - - rko = TAILQ_NEXT(rko, rko_link); - cnt++; - } - - mtx_unlock(&rkq->rkq_lock); - - if (cnt) { - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", - "Purge toppar fetch queue buffer " - "containing %d op(s) " - "(%d barrier(s), %d message(s), %d other)" - " to avoid " - "circular references", - cnt, barrier_cnt, message_cnt, other_cnt); - rd_kafka_q_purge(rktp->rktp_fetchq); - } else { - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", - "Not purging toppar fetch queue buffer." - " No ops present in the buffer."); - } - } + rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp); rd_kafka_toppar_unlock(rktp); diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 361dd7077a..78ac9e8d14 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -1009,7 +1009,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp, rd_kafka_toppar_unlock(rktp); } +/** + * @brief Purge internal fetch queue if toppar is stopped + * (RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the cluster + * (RD_KAFKA_TOPPAR_F_REMOVE). Will be called from different places as it's + * removed starting from a metadata response and stopped from a rebalance or a + * consumer close. + * + * @remark Avoids circular dependencies from `rktp_fetchq` ops to the same + * toppar that prevent a consumer from being destroyed. 
+ *
+ * @locks rd_kafka_toppar_lock() MUST be held
+ */
+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp) {
+        rd_kafka_q_t *rkq = rktp->rktp_fetchq;
+
+        mtx_lock(&rkq->rkq_lock);
+        if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) && !rkq->rkq_fwdq) {
+                rd_kafka_op_t *rko;
+                int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
+
+                /* Partition is being removed from the cluster and it's stopped,
+                 * so rktp->rktp_fetchq->rkq_fwdq is NULL.
+                 * Purge remaining operations in rktp->rktp_fetchq->rkq_q,
+                 * while holding lock, to avoid circular references */
+                rko = TAILQ_FIRST(&rkq->rkq_q);
+                while (rko) {
+                        if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
+                            rko->rko_type != RD_KAFKA_OP_FETCH) {
+                                rd_kafka_log(
+                                    rktp->rktp_rkt->rkt_rk, LOG_WARNING,
+                                    "PARTDEL",
+                                    "Purging toppar fetch queue buffer op "
+                                    "with unexpected type: %s",
+                                    rd_kafka_op2str(rko->rko_type));
+                        }
+
+                        if (rko->rko_type == RD_KAFKA_OP_BARRIER)
+                                barrier_cnt++;
+                        else if (rko->rko_type == RD_KAFKA_OP_FETCH)
+                                message_cnt++;
+                        else
+                                other_cnt++;
+
+                        rko = TAILQ_NEXT(rko, rko_link);
+                        cnt++;
+                }
+                if (cnt) {
+                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
+                                     "Purge toppar fetch queue buffer "
+                                     "containing %d op(s) "
+                                     "(%d barrier(s), %d message(s), %d other)"
+                                     " to avoid "
+                                     "circular references",
+                                     cnt, barrier_cnt, message_cnt, other_cnt);
+                        /* Lock already held: use the non-locking purge */
+                        rd_kafka_q_purge0(rkq, rd_false);
+                } else {
+                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
+                                     "Not purging toppar fetch queue buffer."
+                                     " No ops present in the buffer.");
+                }
+        }
+        mtx_unlock(&rkq->rkq_lock);
+}
 
 /**
  * Helper method for purging queues when removing a toppar.
diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index b74daf8e2f..98ff431769 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -648,6 +648,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp, rd_kafka_fetch_pos_t query_pos, int backoff_ms); +void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp); + int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp, int purge_flags, rd_bool_t include_xmit_msgq); From 7ce0d72c19323161826145a9c6d8419beefe2554 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 29 Oct 2024 15:40:52 +0100 Subject: [PATCH 2/2] Purge internal fetch queue on removing the partition only for the consumer --- src/rdkafka_broker.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 48b36ce698..fffb819402 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -3497,7 +3497,10 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) { ? topic_err : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)); - rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp); + if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) { + rd_kafka_toppar_purge_internal_fetch_queue_maybe( + rktp); + } } rd_kafka_toppar_unlock(rktp);