Skip to content

Commit

Permalink
Don't destroy metadata cache until brokers are destroyed (confluentin…
Browse files Browse the repository at this point in the history
…c#3279)

As ops on broker queues may have references to the metadata cache's mutex

(cherry picked from commit 51c49f6)
  • Loading branch information
edenhill authored and azat committed Aug 19, 2021
1 parent 43491d3 commit b8554f1
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 8 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# librdkafka NEXT

## Fixes

### General fixes

* Fix accesses to freed metadata cache mutexes on client termination (#3279)


# librdkafka v1.6.1

librdkafka v1.6.1 is a maintenance release.
Expand Down
15 changes: 13 additions & 2 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1223,8 +1223,14 @@ static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
/* Destroy the coordinator cache */
rd_kafka_coord_cache_destroy(&rk->rk_coord_cache);

/* Destroy metadata cache */
rd_kafka_metadata_cache_destroy(rk);
/* Purge metadata cache.
* #3279:
* We mustn't call cache_destroy() here since there might be outstanding
* broker rkos that hold references to the metadata cache lock,
* and these brokers are destroyed below. So to avoid a circular
* dependency refcnt deadlock we first purge the cache here
* and destroy it after the brokers are destroyed. */
rd_kafka_metadata_cache_purge(rk, rd_true/*observers too*/);

rd_kafka_wrunlock(rk);

Expand Down Expand Up @@ -1292,6 +1298,11 @@ static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
rd_assert(!*"All mock clusters must be destroyed prior to "
"rd_kafka_t destroy");
}

/* Destroy metadata cache */
rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_destroy(rk);
rd_kafka_wrunlock(rk);
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk);

void rd_kafka_metadata_cache_init (rd_kafka_t *rk);
void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk);
void rd_kafka_metadata_cache_purge (rd_kafka_t *rk, rd_bool_t purge_observers);
int rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms);
void rd_kafka_metadata_cache_dump (FILE *fp, rd_kafka_t *rk);

Expand Down
15 changes: 9 additions & 6 deletions src/rdkafka_metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ rd_kafka_metadata_cache_insert (rd_kafka_t *rk,
/**
* @brief Purge the metadata cache
*
* @locks rd_kafka_wrlock()
* @locks_required rd_kafka_wrlock()
*/
static void rd_kafka_metadata_cache_purge (rd_kafka_t *rk) {
void rd_kafka_metadata_cache_purge (rd_kafka_t *rk, rd_bool_t purge_observers) {
struct rd_kafka_metadata_cache_entry *rkmce;
int was_empty = TAILQ_EMPTY(&rk->rk_metadata_cache.rkmc_expiry);

Expand All @@ -281,6 +281,9 @@ static void rd_kafka_metadata_cache_purge (rd_kafka_t *rk) {

if (!was_empty)
rd_kafka_metadata_cache_propagate_changes(rk);

if (purge_observers)
rd_list_clear(&rk->rk_metadata_cache.rkmc_observers);
}


Expand Down Expand Up @@ -369,7 +372,7 @@ void rd_kafka_metadata_cache_update (rd_kafka_t *rk,
md->topic_cnt);

if (abs_update)
rd_kafka_metadata_cache_purge(rk);
rd_kafka_metadata_cache_purge(rk, rd_false/*not observers*/);


for (i = 0 ; i < md->topic_cnt ; i++)
Expand Down Expand Up @@ -545,15 +548,15 @@ void rd_kafka_metadata_cache_init (rd_kafka_t *rk) {
}

/**
* @brief Purge and destroy metadata cache
* @brief Purge and destroy metadata cache.
*
* @locks rd_kafka_wrlock()
* @locks_required rd_kafka_wrlock()
*/
void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk) {
rd_list_destroy(&rk->rk_metadata_cache.rkmc_observers);
rd_kafka_timer_stop(&rk->rk_timers,
&rk->rk_metadata_cache.rkmc_query_tmr, 1/*lock*/);
rd_kafka_metadata_cache_purge(rk);
rd_kafka_metadata_cache_purge(rk, rd_true/*observers too*/);
mtx_destroy(&rk->rk_metadata_cache.rkmc_full_lock);
mtx_destroy(&rk->rk_metadata_cache.rkmc_cnd_lock);
cnd_destroy(&rk->rk_metadata_cache.rkmc_cnd);
Expand Down

0 comments on commit b8554f1

Please sign in to comment.