diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e83e029db..a5ff101fd2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# librdkafka NEXT + +## Fixes + +### General fixes + + * Fix accesses to freed metadata cache mutexes on client termination (#3279) + + # librdkafka v1.6.1 librdkafka v1.6.1 is a maintenance release. diff --git a/src/rdkafka.c b/src/rdkafka.c index c2e264bd07..21dd9cd68a 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -1223,8 +1223,14 @@ static void rd_kafka_destroy_internal (rd_kafka_t *rk) { /* Destroy the coordinator cache */ rd_kafka_coord_cache_destroy(&rk->rk_coord_cache); - /* Destroy metadata cache */ - rd_kafka_metadata_cache_destroy(rk); + /* Purge metadata cache. + * #3279: + * We mustn't call cache_destroy() here since there might be outstanding + * broker rkos that hold references to the metadata cache lock, + * and these brokers are destroyed below. So to avoid a circular + * dependency refcnt deadlock we first purge the cache here + * and destroy it after the brokers are destroyed. */ + rd_kafka_metadata_cache_purge(rk, rd_true/*observers too*/); rd_kafka_wrunlock(rk); @@ -1292,6 +1298,11 @@ static void rd_kafka_destroy_internal (rd_kafka_t *rk) { rd_assert(!*"All mock clusters must be destroyed prior to " "rd_kafka_t destroy"); } + + /* Destroy metadata cache */ + rd_kafka_wrlock(rk); + rd_kafka_metadata_cache_destroy(rk); + rd_kafka_wrunlock(rk); } /** diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 4b849d7512..8dad539986 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -184,6 +184,7 @@ void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk); void rd_kafka_metadata_cache_init (rd_kafka_t *rk); void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk); +void rd_kafka_metadata_cache_purge (rd_kafka_t *rk, rd_bool_t purge_observers); int rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms); void rd_kafka_metadata_cache_dump (FILE *fp, rd_kafka_t *rk); diff --git a/src/rdkafka_metadata_cache.c b/src/rdkafka_metadata_cache.c index 0569423a0f..a08a5abc8a 100644 --- a/src/rdkafka_metadata_cache.c +++ b/src/rdkafka_metadata_cache.c @@ -267,9 +267,9 @@ rd_kafka_metadata_cache_insert (rd_kafka_t *rk, /** * @brief Purge the metadata cache * - * @locks rd_kafka_wrlock() + * @locks_required rd_kafka_wrlock() */ -static void rd_kafka_metadata_cache_purge (rd_kafka_t *rk) { +void rd_kafka_metadata_cache_purge (rd_kafka_t *rk, rd_bool_t purge_observers) { struct rd_kafka_metadata_cache_entry *rkmce; int was_empty = TAILQ_EMPTY(&rk->rk_metadata_cache.rkmc_expiry); @@ -281,6 +281,9 @@ static void rd_kafka_metadata_cache_purge (rd_kafka_t *rk) { if (!was_empty) rd_kafka_metadata_cache_propagate_changes(rk); + + if (purge_observers) + rd_list_clear(&rk->rk_metadata_cache.rkmc_observers); } @@ -369,7 +372,7 @@ void rd_kafka_metadata_cache_update (rd_kafka_t *rk, md->topic_cnt); if (abs_update) - rd_kafka_metadata_cache_purge(rk); + rd_kafka_metadata_cache_purge(rk, rd_false/*not observers*/); for (i = 0 ; i < md->topic_cnt ; i++) @@ -545,15 +548,15 @@ void rd_kafka_metadata_cache_init (rd_kafka_t *rk) { } /** - * @brief Purge and destroy metadata cache + * @brief Purge and destroy metadata cache. * - * @locks rd_kafka_wrlock() + * @locks_required rd_kafka_wrlock() */ void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk) { rd_list_destroy(&rk->rk_metadata_cache.rkmc_observers); rd_kafka_timer_stop(&rk->rk_timers, &rk->rk_metadata_cache.rkmc_query_tmr, 1/*lock*/); - rd_kafka_metadata_cache_purge(rk); + rd_kafka_metadata_cache_purge(rk, rd_true/*observers too*/); mtx_destroy(&rk->rk_metadata_cache.rkmc_full_lock); mtx_destroy(&rk->rk_metadata_cache.rkmc_cnd_lock); cnd_destroy(&rk->rk_metadata_cache.rkmc_cnd);