diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b7dc1b04..56495ca39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,10 @@ librdkafka v2.6.1 is a maintenance release: under some particular conditions (#4800). * Fix for retrieving offset commit metadata when it contains zeros and configured with `strndup` (#4876) - * Fix for a loop of ListOffset requests, happening in a Fetch From Follower - scenario, if such request is made to the follower (#4616, #4754, @kphelps). + * Fix for a loop of ListOffset requests, happening in a Fetch From Follower + scenario, if such request is made to the follower (#4616, #4754, @kphelps). + * Fix to remove fetch queue messages that blocked the destroy of rdkafka + instances (#4724). ## Fixes @@ -40,6 +42,14 @@ librdkafka v2.6.1 is a maintenance release: "Not leader for partition" error. Fixed by sending the request always to the leader. Happening since 1.5.0 (tested version) or previous ones (#4616, #4754, @kphelps). + * Issues: #4724. + Fix to remove fetch queue messages that blocked the destroy of rdkafka + instances. Circular dependencies from a partition fetch queue message to + the same partition blocked the destroy of an instance; this happened when + the partition was removed from the cluster while it was being consumed. + Solved by purging the internal partition queue, after the partition is + stopped and removed, to allow the reference count to reach zero and trigger + a destroy. Happening since 2.0.2 (#4724). diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 25b8c14d1..fffb81940 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -3496,6 +3496,11 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) { : (topic_err ? 
topic_err : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)); + + if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) { + rd_kafka_toppar_purge_internal_fetch_queue_maybe( + rktp); + } } rd_kafka_toppar_unlock(rktp); diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 1faec81f8..741bcf9fa 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -3474,10 +3474,6 @@ static void rd_kafka_cgrp_partition_add(rd_kafka_cgrp_t *rkcg, */ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp) { - int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0; - rd_kafka_op_t *rko; - rd_kafka_q_t *rkq; - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", "Group \"%s\": delete %s [%" PRId32 "]", rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str, @@ -3487,54 +3483,7 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP); rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP; - if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) { - /* Partition is being removed from the cluster and it's stopped, - * so rktp->rktp_fetchq->rkq_fwdq is NULL. 
- * Purge remaining operations in rktp->rktp_fetchq->rkq_q, - * while holding lock, to avoid circular references */ - rkq = rktp->rktp_fetchq; - mtx_lock(&rkq->rkq_lock); - rd_assert(!rkq->rkq_fwdq); - - rko = TAILQ_FIRST(&rkq->rkq_q); - while (rko) { - if (rko->rko_type != RD_KAFKA_OP_BARRIER && - rko->rko_type != RD_KAFKA_OP_FETCH) { - rd_kafka_log( - rkcg->rkcg_rk, LOG_WARNING, "PARTDEL", - "Purging toppar fetch queue buffer op" - "with unexpected type: %s", - rd_kafka_op2str(rko->rko_type)); - } - - if (rko->rko_type == RD_KAFKA_OP_BARRIER) - barrier_cnt++; - else if (rko->rko_type == RD_KAFKA_OP_FETCH) - message_cnt++; - else - other_cnt++; - - rko = TAILQ_NEXT(rko, rko_link); - cnt++; - } - - mtx_unlock(&rkq->rkq_lock); - - if (cnt) { - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", - "Purge toppar fetch queue buffer " - "containing %d op(s) " - "(%d barrier(s), %d message(s), %d other)" - " to avoid " - "circular references", - cnt, barrier_cnt, message_cnt, other_cnt); - rd_kafka_q_purge(rktp->rktp_fetchq); - } else { - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", - "Not purging toppar fetch queue buffer." - " No ops present in the buffer."); - } - } + rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp); rd_kafka_toppar_unlock(rktp); diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 361dd7077..78ac9e8d1 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -1009,7 +1009,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp, rd_kafka_toppar_unlock(rktp); } +/** + * @brief Purge internal fetch queue if toppar is stopped + * (RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the cluster + * (RD_KAFKA_TOPPAR_F_REMOVE). Will be called from different places as it's + * removed starting from a metadata response and stopped from a rebalance or a + * consumer close. + * + * @remark Avoids circular dependencies from `rktp_fetchq` ops to the same + * toppar that prevent a consumer from being destroyed. 
+ * + * @locks rd_kafka_toppar_lock() MUST be held. Acquires rktp_fetchq->rkq_lock. + */ +void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp) { + rd_kafka_q_t *rkq; + rkq = rktp->rktp_fetchq; + mtx_lock(&rkq->rkq_lock); + if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) && + !rktp->rktp_fetchq->rkq_fwdq) { + rd_kafka_op_t *rko; + int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0; + + /* Partition is being removed from the cluster and it's stopped, + * so rktp->rktp_fetchq->rkq_fwdq is NULL. + * Purge remaining operations in rktp->rktp_fetchq->rkq_q, + * while holding lock, to avoid circular references */ + rko = TAILQ_FIRST(&rkq->rkq_q); + while (rko) { + if (rko->rko_type != RD_KAFKA_OP_BARRIER && + rko->rko_type != RD_KAFKA_OP_FETCH) { + rd_kafka_log( + rktp->rktp_rkt->rkt_rk, LOG_WARNING, + "PARTDEL", + "Purging toppar fetch queue buffer op " + "with unexpected type: %s", + rd_kafka_op2str(rko->rko_type)); + } + + if (rko->rko_type == RD_KAFKA_OP_BARRIER) + barrier_cnt++; + else if (rko->rko_type == RD_KAFKA_OP_FETCH) + message_cnt++; + else + other_cnt++; + + rko = TAILQ_NEXT(rko, rko_link); + cnt++; + } + if (cnt) { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL", + "Purge toppar fetch queue buffer " + "containing %d op(s) " + "(%d barrier(s), %d message(s), %d other)" + " to avoid " + "circular references", + cnt, barrier_cnt, message_cnt, other_cnt); + rd_kafka_q_purge0(rktp->rktp_fetchq, rd_false); + } else { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL", + "Not purging toppar fetch queue buffer." + " No ops present in the buffer."); + } + } + mtx_unlock(&rkq->rkq_lock); +} /** * Helper method for purging queues when removing a toppar. 
diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index b74daf8e2..98ff43176 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -648,6 +648,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp, rd_kafka_fetch_pos_t query_pos, int backoff_ms); +void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp); + int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp, int purge_flags, rd_bool_t include_xmit_msgq);