Skip to content

Commit

Permalink
[fix](third party) fix hang when destroy of rdkafka instances (#44913)
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored Dec 3, 2024
1 parent 8bc3c43 commit 802ba71
Showing 1 changed file with 110 additions and 1 deletion.
111 changes: 110 additions & 1 deletion thirdparty/patches/librdkafka-1.9.2.patch
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,19 @@

--- src/rdkafka_broker.c
+++ src/rdkafka_broker.c
@@ -5461,7 +5461,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
@@ -3288,6 +3288,11 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) {
: (topic_err
? topic_err
: RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION));
+
+ if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
+ rd_kafka_toppar_purge_internal_fetch_queue_maybe(
+ rktp);
+ }
}

rd_kafka_toppar_unlock(rktp);
@@ -5461,7 +5466,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
*/
void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {

Expand All @@ -78,3 +90,100 @@
rd_assert(TAILQ_EMPTY(&rkb->rkb_monitors));
rd_assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
rd_assert(TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs));
--- src/rdkafka_cgrp.c
+++ src/rdkafka_cgrp.c
@@ -2734,6 +2734,9 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg,
rd_kafka_toppar_lock(rktp);
rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;
+
+ rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp);
+
rd_kafka_toppar_unlock(rktp);

rd_list_remove(&rkcg->rkcg_toppars, rktp);
--- src/rdkafka_partition.c
+++ src/rdkafka_partition.c
@@ -959,7 +959,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp,
rd_kafka_toppar_unlock(rktp);
}

+/**
+ * @brief Purge the internal fetch queue if the toppar has been removed from
+ *        the cluster (RD_KAFKA_TOPPAR_F_REMOVE) and is stopped, which is
+ *        detected here by its fetch queue no longer being forwarded
+ *        (rkq_fwdq == NULL). Called from several places, as removal starts
+ *        from a metadata response and stopping from a rebalance or a close.
+ *
+ * @remark Avoids circular references from `rktp_fetchq` ops back to the same
+ *         toppar, which would otherwise block destruction of the consumer.
+ *
+ * @locks rd_kafka_toppar_lock() MUST be held
+ */
+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp) {
+        rd_kafka_q_t *rkq;
+        rkq = rktp->rktp_fetchq;
+        mtx_lock(&rkq->rkq_lock);
+        if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) &&
+            !rkq->rkq_fwdq) {
+                rd_kafka_op_t *rko;
+                int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
+
+                /* Partition is being removed from the cluster and it's stopped,
+                 * so rkq->rkq_fwdq is NULL.
+                 * Tally the ops remaining in rkq->rkq_q, then purge them
+                 * while holding the queue lock, to avoid circular references */
+                rko = TAILQ_FIRST(&rkq->rkq_q);
+                while (rko) {
+                        if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
+                            rko->rko_type != RD_KAFKA_OP_FETCH) {
+                                rd_kafka_log(
+                                    rktp->rktp_rkt->rkt_rk, LOG_WARNING,
+                                    "PARTDEL",
+                                    "Purging toppar fetch queue buffer op "
+                                    "with unexpected type: %s",
+                                    rd_kafka_op2str(rko->rko_type));
+                        }
+
+                        if (rko->rko_type == RD_KAFKA_OP_BARRIER)
+                                barrier_cnt++;
+                        else if (rko->rko_type == RD_KAFKA_OP_FETCH)
+                                message_cnt++;
+                        else
+                                other_cnt++;

+                        rko = TAILQ_NEXT(rko, rko_link);
+                        cnt++;
+                }
+
+                if (cnt) {
+                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
+                                     "Purge toppar fetch queue buffer "
+                                     "containing %d op(s) "
+                                     "(%d barrier(s), %d message(s), %d other)"
+                                     " to avoid "
+                                     "circular references",
+                                     cnt, barrier_cnt, message_cnt, other_cnt);
+                        rd_kafka_q_purge0(rkq, rd_false);
+                } else {
+                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
+                                     "Not purging toppar fetch queue buffer."
+                                     " No ops present in the buffer.");
+                }
+        }
+        mtx_unlock(&rkq->rkq_lock);
+}

/**
* Helper method for purging queues when removing a toppar.
--- src/rdkafka_partition.h
+++ src/rdkafka_partition.h
@@ -541,6 +541,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp,
int64_t query_offset,
int backoff_ms);

+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp);
+
int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp,
int purge_flags,
rd_bool_t include_xmit_msgq);

0 comments on commit 802ba71

Please sign in to comment.