From 802ba712ac9a8190539356bbfd294e103199e40b Mon Sep 17 00:00:00 2001 From: hui lai Date: Tue, 3 Dec 2024 21:34:49 +0800 Subject: [PATCH] [fix](third party) fix hang when destroy of rdkafka instances (#44913) Related PR: https://github.com/confluentinc/librdkafka/pull/4724 --- thirdparty/patches/librdkafka-1.9.2.patch | 111 +++++++++++++++++++++- 1 file changed, 110 insertions(+), 1 deletion(-) diff --git a/thirdparty/patches/librdkafka-1.9.2.patch b/thirdparty/patches/librdkafka-1.9.2.patch index b13e740bc5c36a..3caac08f79dacd 100644 --- a/thirdparty/patches/librdkafka-1.9.2.patch +++ b/thirdparty/patches/librdkafka-1.9.2.patch @@ -67,7 +67,19 @@ --- src/rdkafka_broker.c +++ src/rdkafka_broker.c -@@ -5461,7 +5461,9 @@ static int rd_kafka_broker_thread_main(void *arg) { +@@ -3288,6 +3288,11 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) { + : (topic_err + ? topic_err + : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)); ++ ++ if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) { ++ rd_kafka_toppar_purge_internal_fetch_queue_maybe( ++ rktp); ++ } + } + + rd_kafka_toppar_unlock(rktp); +@@ -5461,7 +5466,9 @@ static int rd_kafka_broker_thread_main(void *arg) { */ void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) { @@ -78,3 +90,100 @@ rd_assert(TAILQ_EMPTY(&rkb->rkb_monitors)); rd_assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs)); rd_assert(TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs)); +--- src/rdkafka_cgrp.c ++++ src/rdkafka_cgrp.c +@@ -2734,6 +2734,9 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, + rd_kafka_toppar_lock(rktp); + rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP); + rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP; ++ ++ rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp); ++ + rd_kafka_toppar_unlock(rktp); + + rd_list_remove(&rkcg->rkcg_toppars, rktp); +--- src/rdkafka_partition.c ++++ src/rdkafka_partition.c +@@ -959,7 +959,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp, + rd_kafka_toppar_unlock(rktp); + } + ++/** ++ * @brief Purge internal fetch queue if toppar is stopped ++ * (RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the cluster ++ * (RD_KAFKA_TOPPAR_F_REMOVE). Will be called from different places as it's ++ * removed starting from a metadata response and stopped from a rebalance or a ++ * consumer close. ++ * ++ * @remark Avoids circular dependencies in from `rktp_fetchq` ops to the same ++ * toppar that stop destroying a consumer. ++ * ++ * @locks rd_kafka_toppar_lock() MUST be held ++ */ ++void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp) { ++ rd_kafka_q_t *rkq; ++ rkq = rktp->rktp_fetchq; ++ mtx_lock(&rkq->rkq_lock); ++ if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE && ++ !rktp->rktp_fetchq->rkq_fwdq) { ++ rd_kafka_op_t *rko; ++ int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0; ++ ++ /* Partition is being removed from the cluster and it's stopped, ++ * so rktp->rktp_fetchq->rkq_fwdq is NULL. ++ * Purge remaining operations in rktp->rktp_fetchq->rkq_q, ++ * while holding lock, to avoid circular references */ ++ rko = TAILQ_FIRST(&rkq->rkq_q); ++ while (rko) { ++ if (rko->rko_type != RD_KAFKA_OP_BARRIER && ++ rko->rko_type != RD_KAFKA_OP_FETCH) { ++ rd_kafka_log( ++ rktp->rktp_rkt->rkt_rk, LOG_WARNING, ++ "PARTDEL", ++ "Purging toppar fetch queue buffer op" ++ "with unexpected type: %s", ++ rd_kafka_op2str(rko->rko_type)); ++ } ++ ++ if (rko->rko_type == RD_KAFKA_OP_BARRIER) ++ barrier_cnt++; ++ else if (rko->rko_type == RD_KAFKA_OP_FETCH) ++ message_cnt++; ++ else ++ other_cnt++; + ++ rko = TAILQ_NEXT(rko, rko_link); ++ cnt++; ++ } ++ ++ if (cnt) { ++ rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL", ++ "Purge toppar fetch queue buffer " ++ "containing %d op(s) " ++ "(%d barrier(s), %d message(s), %d other)" ++ " to avoid " ++ "circular references", ++ cnt, barrier_cnt, message_cnt, other_cnt); ++ rd_kafka_q_purge0(rktp->rktp_fetchq, rd_false); ++ } else { ++ rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL", ++ "Not purging toppar fetch queue buffer." ++ " No ops present in the buffer."); ++ } ++ } ++ mtx_unlock(&rkq->rkq_lock); ++} + + /** + * Helper method for purging queues when removing a toppar. +--- src/rdkafka_partition.h ++++ src/rdkafka_partition.h +@@ -541,6 +541,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp, + int64_t query_offset, + int backoff_ms); + ++void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp); ++ + int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp, + int purge_flags, + rd_bool_t include_xmit_msgq);