From ccb635d3f8d5bbb1c47750c5779d2e8db9edb393 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 30 Oct 2024 13:44:46 +0100 Subject: [PATCH] Fix to remove fetch queue messages that blocked the destroy of rdkafka instances (#4724) Circular dependencies from a partition fetch queue message to the same partition blocked the destroy of an instance, that happened in case the partition was removed from the cluster while it was being consumed. Solved by purging internal partition queue, after being stopped and removed, to allow reference count to reach zero and trigger a destroy. Purging internal fetch queue on removing the partition only for the consumer. --- CHANGELOG.md | 14 +++++++-- src/rdkafka_broker.c | 5 ++++ src/rdkafka_cgrp.c | 53 +--------------------------------- src/rdkafka_partition.c | 64 +++++++++++++++++++++++++++++++++++++++++ src/rdkafka_partition.h | 2 ++ 5 files changed, 84 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b7dc1b04f..56495ca393 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,10 @@ librdkafka v2.6.1 is a maintenance release: under some particular conditions (#4800). * Fix for retrieving offset commit metadata when it contains zeros and configured with `strndup` (#4876) - * Fix for a loop of ListOffset requests, happening in a Fetch From Follower - scenario, if such request is made to the follower (#4616, #4754, @kphelps). +* Fix for a loop of ListOffset requests, happening in a Fetch From Follower + scenario, if such request is made to the follower (#4616, #4754, @kphelps). +* Fix to remove fetch queue messages that blocked the destroy of rdkafka + instances (#4724) ## Fixes @@ -40,6 +42,14 @@ librdkafka v2.6.1 is a maintenance release: "Not leader for partition" error. Fixed by sending the request always to the leader. Happening since 1.5.0 (tested version) or previous ones (#4616, #4754, @kphelps). 
+* Issues: + Fix to remove fetch queue messages that blocked the destroy of rdkafka + instances. Circular dependencies from a partition fetch queue message to + the same partition blocked the destroy of an instance, that happened + in case the partition was removed from the cluster while it was being + consumed. Solved by purging internal partition queue, after being stopped + and removed, to allow reference count to reach zero and trigger a destroy. + Happening since 2.0.2 (#4724). diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 25b8c14d1f..fffb819402 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -3496,6 +3496,11 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) { : (topic_err ? topic_err : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)); + + if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) { + rd_kafka_toppar_purge_internal_fetch_queue_maybe( + rktp); + } } rd_kafka_toppar_unlock(rktp); diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 1faec81f85..741bcf9fa0 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -3474,10 +3474,6 @@ static void rd_kafka_cgrp_partition_add(rd_kafka_cgrp_t *rkcg, */ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp) { - int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0; - rd_kafka_op_t *rko; - rd_kafka_q_t *rkq; - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", "Group \"%s\": delete %s [%" PRId32 "]", rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str, @@ -3487,54 +3483,7 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP); rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP; - if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) { - /* Partition is being removed from the cluster and it's stopped, - * so rktp->rktp_fetchq->rkq_fwdq is NULL. 
- * Purge remaining operations in rktp->rktp_fetchq->rkq_q, - * while holding lock, to avoid circular references */ - rkq = rktp->rktp_fetchq; - mtx_lock(&rkq->rkq_lock); - rd_assert(!rkq->rkq_fwdq); - - rko = TAILQ_FIRST(&rkq->rkq_q); - while (rko) { - if (rko->rko_type != RD_KAFKA_OP_BARRIER && - rko->rko_type != RD_KAFKA_OP_FETCH) { - rd_kafka_log( - rkcg->rkcg_rk, LOG_WARNING, "PARTDEL", - "Purging toppar fetch queue buffer op" - "with unexpected type: %s", - rd_kafka_op2str(rko->rko_type)); - } - - if (rko->rko_type == RD_KAFKA_OP_BARRIER) - barrier_cnt++; - else if (rko->rko_type == RD_KAFKA_OP_FETCH) - message_cnt++; - else - other_cnt++; - - rko = TAILQ_NEXT(rko, rko_link); - cnt++; - } - - mtx_unlock(&rkq->rkq_lock); - - if (cnt) { - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", - "Purge toppar fetch queue buffer " - "containing %d op(s) " - "(%d barrier(s), %d message(s), %d other)" - " to avoid " - "circular references", - cnt, barrier_cnt, message_cnt, other_cnt); - rd_kafka_q_purge(rktp->rktp_fetchq); - } else { - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", - "Not purging toppar fetch queue buffer." - " No ops present in the buffer."); - } - } + rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp); rd_kafka_toppar_unlock(rktp); diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 361dd7077a..78ac9e8d14 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -1009,7 +1009,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp, rd_kafka_toppar_unlock(rktp); } +/** + * @brief Purge internal fetch queue if toppar is stopped + * (RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the cluster + * (RD_KAFKA_TOPPAR_F_REMOVE). Will be called from different places as it's + * removed starting from a metadata response and stopped from a rebalance or a + * consumer close. + * + * @remark Avoids circular dependencies in from `rktp_fetchq` ops to the same + * toppar that stop destroying a consumer. 
+ *
+ * @locks rd_kafka_toppar_lock() MUST be held
+ */
+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp) {
+        rd_kafka_q_t *rkq;
+        rkq = rktp->rktp_fetchq;
+        mtx_lock(&rkq->rkq_lock);
+        if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) &&
+            !rkq->rkq_fwdq) {
+                rd_kafka_op_t *rko;
+                int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
+
+                /* F_REMOVE is set and the fetch queue is not forwarded
+                 * (rkq_fwdq is NULL): partition is removed and stopped.
+                 * Tally the ops left in rkq->rkq_q by type, then purge them
+                 * while still holding rkq_lock, to avoid circular references */
+                rko = TAILQ_FIRST(&rkq->rkq_q);
+                while (rko) {
+                        if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
+                            rko->rko_type != RD_KAFKA_OP_FETCH) {
+                                rd_kafka_log(
+                                    rktp->rktp_rkt->rkt_rk, LOG_WARNING,
+                                    "PARTDEL",
+                                    "Purging toppar fetch queue buffer op "
+                                    "with unexpected type: %s",
+                                    rd_kafka_op2str(rko->rko_type));
+                        }
+
+                        if (rko->rko_type == RD_KAFKA_OP_BARRIER)
+                                barrier_cnt++;
+                        else if (rko->rko_type == RD_KAFKA_OP_FETCH)
+                                message_cnt++;
+                        else
+                                other_cnt++;
+
+                        rko = TAILQ_NEXT(rko, rko_link);
+                        cnt++;
+                }
+                if (cnt) {
+                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
+                                     "Purge toppar fetch queue buffer "
+                                     "containing %d op(s) "
+                                     "(%d barrier(s), %d message(s), %d other)"
+                                     " to avoid "
+                                     "circular references",
+                                     cnt, barrier_cnt, message_cnt, other_cnt);
+                        rd_kafka_q_purge0(rkq, rd_false);
+                } else {
+                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
+                                     "Not purging toppar fetch queue buffer."
+                                     " No ops present in the buffer.");
+                }
+        }
+        mtx_unlock(&rkq->rkq_lock);
+}
 
 /**
  * Helper method for purging queues when removing a toppar.
diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index b74daf8e2f..98ff431769 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -648,6 +648,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp, rd_kafka_fetch_pos_t query_pos, int backoff_ms); +void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp); + int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp, int purge_flags, rd_bool_t include_xmit_msgq);