diff --git a/CHANGELOG.md b/CHANGELOG.md index 5542eb7c9c..34d3e357a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ librdkafka v2.3.1 is a maintenance release: * Fix pipeline inclusion of static binaries (#4666) * Fix to main loop timeout calculation leading to a tight loop for a max period of 1 ms (#4671). + * [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers) + Continue partial implementation by adding a metadata cache by topic id + and updating the topic id corresponding to the partition name (#4676) ## Fixes diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index fe15ea39d3..a6fd83ad74 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -1507,7 +1507,7 @@ rd_kafka_admin_MetadataRequest(rd_kafka_broker_t *rkb, rd_kafka_replyq_t replyq, void *opaque) { return rd_kafka_MetadataRequest_resp_cb( - rkb, topics, reason, + rkb, topics, NULL, reason, rd_false /* No admin operation requires topic creation. */, include_cluster_authorized_operations, include_topic_authorized_operations, diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index d969d63927..6ed898e3bb 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -6462,10 +6462,10 @@ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg, rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread)); - if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0) + if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_GENERIC) return; - if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_GENERIC) + if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0) return; /* diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index 7365be645f..bc8e5bc5ee 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -474,12 +474,14 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_metadata_internal_t *mdi = NULL; rd_kafka_metadata_t *md = NULL; size_t rkb_namelen; - const int log_decode_errors = LOG_ERR; - rd_list_t *missing_topics = NULL; - - const rd_list_t *requested_topics = request_topics; - rd_bool_t all_topics = rd_false; - rd_bool_t cgrp_update = rd_false; + const int log_decode_errors = LOG_ERR; + rd_list_t *missing_topics = NULL; + rd_list_t *missing_topic_ids = NULL; + + const rd_list_t *requested_topics = request_topics; + const rd_list_t *requested_topic_ids = NULL; + rd_bool_t all_topics = rd_false; + rd_bool_t cgrp_update = rd_false; rd_bool_t has_reliable_leader_epochs = rd_kafka_has_reliable_leader_epochs(rkb); int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion; @@ -496,8 +498,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_bool_t compute_racks = has_client_rack; if (request) { - requested_topics = request->rkbuf_u.Metadata.topics; - all_topics = request->rkbuf_u.Metadata.all_topics; + requested_topics = request->rkbuf_u.Metadata.topics; + requested_topic_ids = request->rkbuf_u.Metadata.topic_ids; + all_topics = request->rkbuf_u.Metadata.all_topics; cgrp_update = request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp; compute_racks |= request->rkbuf_u.Metadata.force_racks; @@ -519,6 +522,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, if (requested_topics) missing_topics = rd_list_copy(requested_topics, rd_list_string_copy, NULL); + if (requested_topic_ids) + missing_topic_ids = + rd_list_copy(requested_topic_ids, rd_list_Uuid_copy, NULL); rd_kafka_broker_lock(rkb); rkb_namelen = strlen(rkb->rkb_name) + 1; @@ -833,39 +839,42 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i], &mdi->topics[i]); - // TODO: Should be done for requested_topic_ids as well. - if (requested_topics) { + if (requested_topics) rd_list_free_cb(missing_topics, rd_list_remove_cmp(missing_topics, md->topics[i].topic, (void *)strcmp)); - if (!all_topics) { - /* Only update cache when not asking - * for all topics. */ - - rd_kafka_wrlock(rk); - rd_kafka_metadata_cache_topic_update( - rk, &md->topics[i], &mdi->topics[i], - rd_false /*propagate later*/, - /* use has_client_rack rather than - compute_racks. We need cached rack ids - only in case we need to rejoin the group - if they change and client.rack is set - (KIP-881). */ - has_client_rack, mdi->brokers, - md->broker_cnt); - cache_changes++; - rd_kafka_wrunlock(rk); - } + if (requested_topic_ids) + rd_list_free_cb( + missing_topic_ids, + rd_list_remove_cmp(missing_topic_ids, + &mdi->topics[i].topic_id, + (void *)rd_kafka_Uuid_ptr_cmp)); + if (!all_topics) { + /* Only update cache when not asking + * for all topics. */ + + rd_kafka_wrlock(rk); + rd_kafka_metadata_cache_topic_update( + rk, &md->topics[i], &mdi->topics[i], + rd_false /*propagate later*/, + /* use has_client_rack rather than + compute_racks. We need cached rack ids + only in case we need to rejoin the group + if they change and client.rack is set + (KIP-881). */ + has_client_rack, mdi->brokers, md->broker_cnt); + cache_changes++; + rd_kafka_wrunlock(rk); } } - // TODO: Should be done for missing_topic_ids as well. /* Requested topics not seen in metadata? Propogate to topic code. */ if (missing_topics) { char *topic; rd_rkb_dbg(rkb, TOPIC, "METADATA", - "%d/%d requested topic(s) seen in metadata", + "%d/%d requested topic(s) seen in metadata" + " (lookup by name)", rd_list_cnt(requested_topics) - rd_list_cnt(missing_topics), rd_list_cnt(requested_topics)); @@ -892,6 +901,42 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, } } } + if (missing_topic_ids) { + rd_kafka_Uuid_t *topic_id; + rd_rkb_dbg(rkb, TOPIC, "METADATA", + "%d/%d requested topic(s) seen in metadata" + " (lookup by id)", + rd_list_cnt(requested_topic_ids) - + rd_list_cnt(missing_topic_ids), + rd_list_cnt(requested_topic_ids)); + for (i = 0; i < rd_list_cnt(missing_topic_ids); i++) { + rd_kafka_Uuid_t *missing_topic_id = + missing_topic_ids->rl_elems[i]; + rd_rkb_dbg(rkb, TOPIC, "METADATA", "wanted %s", + rd_kafka_Uuid_base64str(missing_topic_id)); + } + RD_LIST_FOREACH(topic_id, missing_topic_ids, i) { + rd_kafka_topic_t *rkt; + + rd_kafka_rdlock(rk); + rkt = rd_kafka_topic_find_by_topic_id(rkb->rkb_rk, + *topic_id); + rd_kafka_rdunlock(rk); + if (rkt) { + /* Received metadata response contained no + * information about topic 'rkt' and thus + * indicates the topic is not available in the + * cluster. + * Mark the topic as non-existent */ + rd_kafka_topic_wrlock(rkt); + rd_kafka_topic_set_notexists( + rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC); + rd_kafka_topic_wrunlock(rkt); + + rd_kafka_topic_destroy0(rkt); + } + } + } rd_kafka_wrlock(rkb->rkb_rk); @@ -961,12 +1006,12 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_metadata_cache_propagate_changes(rk); rd_kafka_metadata_cache_expiry_start(rk); } - - - // TODO: Should be done for requested_topic_ids as well. /* Remove cache hints for the originally requested topics. */ if (requested_topics) rd_kafka_metadata_cache_purge_hints(rk, requested_topics); + if (requested_topic_ids) + rd_kafka_metadata_cache_purge_hints_by_id(rk, + requested_topic_ids); rd_kafka_wrunlock(rkb->rkb_rk); @@ -982,7 +1027,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, * which may contain only a sub-set of the subscribed topics (namely * the effective subscription of available topics) as to not * propagate non-included topics as non-existent. */ - if (cgrp_update && (requested_topics || all_topics)) + if (cgrp_update && + (requested_topics || requested_topic_ids || all_topics)) rd_kafka_cgrp_metadata_update_check(rkb->rkb_rk->rk_cgrp, rd_true /*do join*/); @@ -995,10 +1041,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, } done: - - // TODO: Should be done for requested_topic_ids as well. if (missing_topics) rd_list_destroy(missing_topics); + if (missing_topic_ids) + rd_list_destroy(missing_topic_ids); /* This metadata request was triggered by someone wanting * the metadata information back as a reply, so send that reply now. @@ -1013,7 +1059,6 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, err_parse: err = rkbuf->rkbuf_err; err: - // TODO: Should be done for requested_topic_ids as well. if (requested_topics) { /* Failed requests shall purge cache hints for * the requested topics. */ @@ -1021,10 +1066,19 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_metadata_cache_purge_hints(rk, requested_topics); rd_kafka_wrunlock(rkb->rkb_rk); } + if (requested_topic_ids) { + /* Failed requests shall purge cache hints for + * the requested topics. */ + rd_kafka_wrlock(rkb->rkb_rk); + rd_kafka_metadata_cache_purge_hints_by_id(rk, + requested_topic_ids); + rd_kafka_wrunlock(rkb->rkb_rk); + } - // TODO: Should be done for requested_topic_ids as well. if (missing_topics) rd_list_destroy(missing_topics); + if (missing_topic_ids) + rd_list_destroy(missing_topic_ids); rd_tmpabuf_destroy(&tbuf); return err; diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 213bf2b896..2b78a16b5d 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -219,7 +219,8 @@ rd_kafka_metadata_new_topic_with_partition_replicas_mock(int replication_factor, */ struct rd_kafka_metadata_cache_entry { - rd_avl_node_t rkmce_avlnode; /* rkmc_avl */ + rd_avl_node_t rkmce_avlnode; /* rkmc_avl */ + rd_avl_node_t rkmce_avlnode_by_id; /* rkmc_avl_by_id */ TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */ rd_ts_t rkmce_ts_expires; /* Expire time */ rd_ts_t rkmce_ts_insert; /* Insert time */ @@ -243,6 +244,7 @@ struct rd_kafka_metadata_cache_entry { struct rd_kafka_metadata_cache { rd_avl_t rkmc_avl; + rd_avl_t rkmc_avl_by_id; TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry; rd_kafka_timer_t rkmc_expiry_tmr; int rkmc_cnt; @@ -269,6 +271,8 @@ struct rd_kafka_metadata_cache { int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic); +int rd_kafka_metadata_cache_delete_by_topic_id(rd_kafka_t *rk, + const rd_kafka_Uuid_t topic_id); void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk); int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts); void rd_kafka_metadata_cache_topic_update( @@ -282,8 +286,14 @@ void rd_kafka_metadata_cache_topic_update( void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk); struct rd_kafka_metadata_cache_entry * rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid); +struct rd_kafka_metadata_cache_entry * +rd_kafka_metadata_cache_find_by_id(rd_kafka_t *rk, + const rd_kafka_Uuid_t, + int valid); void rd_kafka_metadata_cache_purge_hints(rd_kafka_t *rk, const rd_list_t *topics); +void rd_kafka_metadata_cache_purge_hints_by_id(rd_kafka_t *rk, + const rd_list_t *topics); int rd_kafka_metadata_cache_hint(rd_kafka_t *rk, const rd_list_t *topics, rd_list_t *dst, diff --git a/src/rdkafka_metadata_cache.c b/src/rdkafka_metadata_cache.c index b3bad4de8d..33cc71febf 100644 --- a/src/rdkafka_metadata_cache.c +++ b/src/rdkafka_metadata_cache.c @@ -80,8 +80,14 @@ static RD_INLINE void rd_kafka_metadata_cache_delete(rd_kafka_t *rk, struct rd_kafka_metadata_cache_entry *rkmce, int unlink_avl) { - if (unlink_avl) + if (unlink_avl) { RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl, rkmce); + if (!RD_KAFKA_UUID_IS_ZERO( + rkmce->rkmce_metadata_internal_topic.topic_id)) { + RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl_by_id, + rkmce); + } + } TAILQ_REMOVE(&rk->rk_metadata_cache.rkmc_expiry, rkmce, rkmce_link); rd_kafka_assert(NULL, rk->rk_metadata_cache.rkmc_cnt > 0); rk->rk_metadata_cache.rkmc_cnt--; @@ -103,6 +109,21 @@ int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic) { return rkmce ? 1 : 0; } +/** + * @brief Delete cache entry by topic id + * @locks rd_kafka_wrlock() + * @returns 1 if entry was found and removed, else 0. + */ +int rd_kafka_metadata_cache_delete_by_topic_id(rd_kafka_t *rk, + const rd_kafka_Uuid_t topic_id) { + struct rd_kafka_metadata_cache_entry *rkmce; + + rkmce = rd_kafka_metadata_cache_find_by_id(rk, topic_id, 1); + if (rkmce) + rd_kafka_metadata_cache_delete(rk, rkmce, 1); + return rkmce ? 1 : 0; +} + static int rd_kafka_metadata_cache_evict(rd_kafka_t *rk); /** @@ -221,6 +242,25 @@ rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid) { return NULL; } +/** + * @brief Find cache entry by topic id + * + * @param valid: entry must be valid (not hint) + * + * @locks rd_kafka_*lock() + */ +struct rd_kafka_metadata_cache_entry * +rd_kafka_metadata_cache_find_by_id(rd_kafka_t *rk, + const rd_kafka_Uuid_t topic_id, + int valid) { + struct rd_kafka_metadata_cache_entry skel, *rkmce; + skel.rkmce_metadata_internal_topic.topic_id = topic_id; + rkmce = RD_AVL_FIND(&rk->rk_metadata_cache.rkmc_avl_by_id, &skel); + if (rkmce && (!valid || RD_KAFKA_METADATA_CACHE_VALID(rkmce))) + return rkmce; + return NULL; +} + /** * @brief Partition (id) comparator @@ -247,7 +287,7 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert( rd_bool_t include_racks, rd_kafka_metadata_broker_internal_t *brokers_internal, size_t broker_cnt) { - struct rd_kafka_metadata_cache_entry *rkmce, *old; + struct rd_kafka_metadata_cache_entry *rkmce, *old, *old_by_id = NULL; rd_tmpabuf_t tbuf; int i; @@ -350,8 +390,28 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert( /* Insert (and replace existing) entry. */ old = RD_AVL_INSERT(&rk->rk_metadata_cache.rkmc_avl, rkmce, rkmce_avlnode); - if (old) + /* Insert (and replace existing) entry into the AVL tree sorted + * by topic id. */ + if (!RD_KAFKA_UUID_IS_ZERO( + rkmce->rkmce_metadata_internal_topic.topic_id)) { + /* If topic id isn't zero insert cache entry into this tree */ + old_by_id = RD_AVL_INSERT(&rk->rk_metadata_cache.rkmc_avl_by_id, + rkmce, rkmce_avlnode_by_id); + } else if (old && !RD_KAFKA_UUID_IS_ZERO( + old->rkmce_metadata_internal_topic.topic_id)) { + /* If it had a topic id, remove it from the tree */ + RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl_by_id, old); + } + if (old) { + /* Delete and free old cache entry */ rd_kafka_metadata_cache_delete(rk, old, 0); + } + if (old_by_id && old_by_id != old) { + /* If there was a different cache entry in this tree, + * remove and free it. */ + RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl, old_by_id); + rd_kafka_metadata_cache_delete(rk, old_by_id, 0); + } /* Explicitly not freeing the tmpabuf since rkmce points to its * memory. */ @@ -414,6 +474,8 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) { * For permanent errors (authorization failures), we keep * the entry cached for metadata.max.age.ms. * + * @return 1 on metadata change, 0 when no change was applied + * * @remark The cache expiry timer will not be updated/started, * call rd_kafka_metadata_cache_expiry_start() instead. * @@ -427,24 +489,38 @@ void rd_kafka_metadata_cache_topic_update( rd_bool_t include_racks, rd_kafka_metadata_broker_internal_t *brokers, size_t broker_cnt) { - rd_ts_t now = rd_clock(); + struct rd_kafka_metadata_cache_entry *rkmce = NULL; + rd_ts_t now = rd_clock(); rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000); int changed = 1; + if (unlikely(!mdt->topic)) { + rkmce = + rd_kafka_metadata_cache_find_by_id(rk, mdit->topic_id, 1); + if (!rkmce) + return; + } - /* Cache unknown topics for a short while (100ms) to allow the cgrp - * logic to find negative cache hits. */ - if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) - ts_expires = RD_MIN(ts_expires, now + (100 * 1000)); - - if (!mdt->err || - mdt->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED || - mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) - rd_kafka_metadata_cache_insert(rk, mdt, mdit, now, ts_expires, - include_racks, brokers, - broker_cnt); - else - changed = - rd_kafka_metadata_cache_delete_by_name(rk, mdt->topic); + if (!mdt->topic) { + /* Cache entry found but no topic name: + * delete it. */ + changed = rd_kafka_metadata_cache_delete_by_topic_id( + rk, mdit->topic_id); + } else { + /* Cache unknown topics for a short while (100ms) to allow the + * cgrp logic to find negative cache hits. */ + if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) + ts_expires = RD_MIN(ts_expires, now + (100 * 1000)); + + if (!mdt->err || + mdt->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED || + mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) + rd_kafka_metadata_cache_insert( + rk, mdt, mdit, now, ts_expires, include_racks, + brokers, broker_cnt); + else + changed = rd_kafka_metadata_cache_delete_by_name( + rk, mdt->topic); + } if (changed && propagate) rd_kafka_metadata_cache_propagate_changes(rk); @@ -485,6 +561,40 @@ void rd_kafka_metadata_cache_purge_hints(rd_kafka_t *rk, } } +/** + * @brief Remove cache hints for topic ids in \p topic_ids + * This is done when the Metadata response has been parsed and + * replaced hints with existing topic information, thus this will + * only remove unmatched topics from the cache. + * + * @locks rd_kafka_wrlock() + */ +void rd_kafka_metadata_cache_purge_hints_by_id(rd_kafka_t *rk, + const rd_list_t *topic_ids) { + const rd_kafka_Uuid_t *topic_id; + int i; + int cnt = 0; + + RD_LIST_FOREACH(topic_id, topic_ids, i) { + struct rd_kafka_metadata_cache_entry *rkmce; + + if (!(rkmce = rd_kafka_metadata_cache_find_by_id(rk, *topic_id, + 0 /*any*/)) || + RD_KAFKA_METADATA_CACHE_VALID(rkmce)) + continue; + + rd_kafka_metadata_cache_delete(rk, rkmce, 1 /*unlink avl*/); + cnt++; + } + + if (cnt > 0) { + rd_kafka_dbg(rk, METADATA, "METADATA", + "Purged %d/%d cached topic hint(s)", cnt, + rd_list_cnt(topic_ids)); + rd_kafka_metadata_cache_propagate_changes(rk); + } +} + /** * @brief Inserts a non-valid entry for topics in \p topics indicating @@ -589,6 +699,16 @@ static int rd_kafka_metadata_cache_entry_cmp(const void *_a, const void *_b) { return strcmp(a->rkmce_mtopic.topic, b->rkmce_mtopic.topic); } +/** + * @brief Cache entry comparator (on topic id) + */ +static int rd_kafka_metadata_cache_entry_by_id_cmp(const void *_a, + const void *_b) { + const struct rd_kafka_metadata_cache_entry *a = _a, *b = _b; + return rd_kafka_Uuid_cmp(a->rkmce_metadata_internal_topic.topic_id, + b->rkmce_metadata_internal_topic.topic_id); +} + /** * @brief Initialize the metadata cache @@ -598,6 +718,8 @@ static int rd_kafka_metadata_cache_entry_cmp(const void *_a, const void *_b) { void rd_kafka_metadata_cache_init(rd_kafka_t *rk) { rd_avl_init(&rk->rk_metadata_cache.rkmc_avl, rd_kafka_metadata_cache_entry_cmp, 0); + rd_avl_init(&rk->rk_metadata_cache.rkmc_avl_by_id, + rd_kafka_metadata_cache_entry_by_id_cmp, 0); TAILQ_INIT(&rk->rk_metadata_cache.rkmc_expiry); mtx_init(&rk->rk_metadata_cache.rkmc_full_lock, mtx_plain); mtx_init(&rk->rk_metadata_cache.rkmc_cnd_lock, mtx_plain); @@ -620,6 +742,7 @@ void rd_kafka_metadata_cache_destroy(rd_kafka_t *rk) { mtx_destroy(&rk->rk_metadata_cache.rkmc_cnd_lock); cnd_destroy(&rk->rk_metadata_cache.rkmc_cnd); rd_avl_destroy(&rk->rk_metadata_cache.rkmc_avl); + rd_avl_destroy(&rk->rk_metadata_cache.rkmc_avl_by_id); } diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 7f202a5e29..686e9c7b62 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -597,19 +597,22 @@ typedef struct rd_kafka_Uuid_s { 0, 1, "" \ } -/** - * Initialize given UUID to zero UUID. - * - * @param uuid UUID to initialize. - */ -static RD_INLINE RD_UNUSED void rd_kafka_Uuid_init(rd_kafka_Uuid_t *uuid) { - memset(uuid, 0, sizeof(*uuid)); -} - static RD_INLINE RD_UNUSED int rd_kafka_Uuid_cmp(rd_kafka_Uuid_t a, rd_kafka_Uuid_t b) { - return (a.most_significant_bits - b.most_significant_bits) || - (a.least_significant_bits - b.least_significant_bits); + if (a.most_significant_bits < b.most_significant_bits) + return -1; + if (a.most_significant_bits > b.most_significant_bits) + return 1; + if (a.least_significant_bits < b.least_significant_bits) + return -1; + if (a.least_significant_bits > b.least_significant_bits) + return 1; + return 0; +} + +static RD_INLINE RD_UNUSED int rd_kafka_Uuid_ptr_cmp(void *a, void *b) { + rd_kafka_Uuid_t *a_uuid = a, *b_uuid = b; + return rd_kafka_Uuid_cmp(*a_uuid, *b_uuid); } rd_kafka_Uuid_t rd_kafka_Uuid_random(); diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 694e061e81..1ed614f542 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2532,31 +2532,63 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk, rd_kafka_op_destroy(rko); } - /** - * @brief Internal implementation of MetadataRequest (does not send). + * @brief Internal implementation of MetadataRequest. + * + * - !topics && !topic_ids: only request brokers (if supported by + * broker, else all topics) + * - topics.cnt == 0 || topic_ids.cnt == 0: all topics in cluster + * are requested + * - topics.cnt > 0 || topic_ids.cnt > 0: only specified topics + * are requested + * + * @param topics A list of topic names (char *) to request. + * @param topic_ids A list of topic ids (rd_kafka_Uuid_t *) to request. + * @param reason Metadata request reason + * @param allow_auto_create_topics Allow broker-side auto topic creation. + * This is best-effort, depending on broker + * config and version. + * @param include_cluster_authorized_operations Request for cluster + * authorized operations. + * @param include_topic_authorized_operations Request for topic + * authorized operations. + * @param cgrp_update Update cgrp in parse_Metadata (see comment there). + * @param force_racks Force partition to rack mapping computation in + * parse_Metadata (see comment there). + * @param rko (optional) rko with replyq for handling response. + * Specifying an rko forces a metadata request even if + * there is already a matching one in-transit. + * @param resp_cb Callback to be used for handling response. + * @param replyq replyq on which response is handled. + * @param force rd_true: force a full request (including all topics and + * brokers) even if there is such a request already + * in flight. + * rd_false: check if there are multiple outstanding full + * requests, and don't send one if there is already + * one present. (See note below.) + * @param opaque (optional) parameter to be passed to resp_cb. * - * @param force - rd_true: force a full request (including all topics and - * brokers) even if there is such a request already - * in flight. - * - rd_false: check if there are multiple outstanding full - * requests, and don't send one if there is already - * one present. (See note below.) + * @return Error code: + * If full metadata for all topics is requested (or + * all brokers, which results in all-topics on older brokers) and + * there is already a full request in transit then this function + * will return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS, + * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. * - * If full metadata for all topics is requested (or - * all brokers, which results in all-topics on older brokers) and there is - * already a full request in transit then this function will return - * RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS otherwise RD_KAFKA_RESP_ERR_NO_ERROR. - * If \p rko is non-NULL or if \p force is true, the request is sent regardless. + * @remark Either \p topics or \p topic_ids must be set, but not both. + * If \p rko is specified, \p resp_cb, \p replyq, \p force, \p opaque + * should be NULL or rd_false. + * @remark \p include_cluster_authorized_operations and + * \p include_topic_authorized_operations should not be set unless this + * MetadataRequest is for an admin operation. * - * \p include_cluster_authorized_operations should not be set unless this - * MetadataRequest is for an admin operation. \sa - * rd_kafka_MetadataRequest_admin(). + * @sa rd_kafka_MetadataRequest(). + * @sa rd_kafka_MetadataRequest_resp_cb(). */ static rd_kafka_resp_err_t rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb, const rd_list_t *topics, - rd_list_t *topic_ids, + const rd_list_t *topic_ids, const char *reason, rd_bool_t allow_auto_create_topics, rd_bool_t include_cluster_authorized_operations, @@ -2790,48 +2822,18 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR_NO_ERROR; } - /** - * @brief Construct a MetadataRequest which uses an optional rko, and the - * default handler callback. - * @sa rd_kafka_MetadataRequest. - */ -static rd_kafka_resp_err_t -rd_kafka_MetadataRequest_op(rd_kafka_broker_t *rkb, - const rd_list_t *topics, - rd_list_t *topic_ids, - const char *reason, - rd_bool_t allow_auto_create_topics, - rd_bool_t include_cluster_authorized_operations, - rd_bool_t include_topic_authorized_operations, - rd_bool_t cgrp_update, - rd_bool_t force_racks, - rd_kafka_op_t *rko) { - return rd_kafka_MetadataRequest0( - rkb, topics, topic_ids, reason, allow_auto_create_topics, - include_cluster_authorized_operations, - include_topic_authorized_operations, cgrp_update, force_racks, rko, - /* We use the default rd_kafka_handle_Metadata rather than a custom - resp_cb */ - NULL, - /* Use default replyq which works with the default handler - rd_kafka_handle_Metadata. */ - RD_KAFKA_NO_REPLYQ, - /* If the request needs to be forced, rko_u.metadata.force will be - set. We don't provide an explicit parameter force. */ - rd_false, NULL); -} - -/** - * @brief Construct MetadataRequest (does not send) - * - * \p topics is a list of topic names (char *) to request. + * @brief Construct and enqueue a MetadataRequest * - * !topics - only request brokers (if supported by broker, else - * all topics) - * topics.cnt==0 - all topics in cluster are requested - * topics.cnt >0 - only specified topics are requested + * - !topics && !topic_ids: only request brokers (if supported by + * broker, else all topics) + * - topics.cnt == 0 || topic_ids.cnt == 0: all topics in cluster + * are requested + * - topics.cnt > 0 || topic_ids.cnt > 0: only specified topics + * are requested * + * @param topics A list of topic names (char *) to request. + * @param topic_ids A list of topic ids (rd_kafka_Uuid_t *) to request. * @param reason - metadata request reason * @param allow_auto_create_topics - allow broker-side auto topic creation. * This is best-effort, depending on broker @@ -2843,11 +2845,15 @@ rd_kafka_MetadataRequest_op(rd_kafka_broker_t *rkb, * Specifying an rko forces a metadata request even if * there is already a matching one in-transit. * - * If full metadata for all topics is requested (or - * all brokers, which results in all-topics on older brokers) and there is - * already a full request in transit then this function will return - * RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS otherwise RD_KAFKA_RESP_ERR_NO_ERROR. - * If \p rko is non-NULL, the request is sent regardless. + * @return Error code: + * If full metadata for all topics is requested (or + * all brokers, which results in all-topics on older brokers) and + * there is already a full request in transit then this function + * will return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS, + * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. + * If \p rko is non-NULL, the request is sent regardless. + * + * @remark Either \p topics or \p topic_ids must be set, but not both. */ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb, const rd_list_t *topics, @@ -2857,41 +2863,64 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb, rd_bool_t cgrp_update, rd_bool_t force_racks, rd_kafka_op_t *rko) { - return rd_kafka_MetadataRequest_op( + return rd_kafka_MetadataRequest0( rkb, topics, topic_ids, reason, allow_auto_create_topics, - /* Cluster and Topic authorized operations are used by admin - * operations only. For non-admin operation cases, NEVER set them to - * true, since it changes the metadata max version to be 10, until - * KIP-700 can be implemented. */ - rd_false, rd_false, cgrp_update, force_racks, rko); + rd_false /*don't include cluster authorized operations*/, + rd_false /*don't include topic authorized operations*/, cgrp_update, + force_racks, rko, + /* We use the default rd_kafka_handle_Metadata rather than a custom + resp_cb */ + NULL, + /* Use default replyq which works with the default handler + rd_kafka_handle_Metadata. */ + RD_KAFKA_NO_REPLYQ, + /* If the request needs to be forced, rko_u.metadata.force will be + set. We don't provide an explicit parameter force. */ + rd_false, NULL); } - /** - * @brief Construct MetadataRequest for use with AdminAPI (does not send). + * @brief Construct and enqueue a MetadataRequest which use + * response callback \p resp_cb instead of a rko. * - * \p topics is a list of topic names (char *) to request. + * - !topics && !topic_ids: only request brokers (if supported by + * broker, else all topics) + * - topics.cnt == 0 || topic_ids.cnt == 0: all topics in cluster + * are requested + * - topics.cnt > 0 || topic_ids.cnt > 0: only specified topics + * are requested * - * !topics - only request brokers (if supported by broker, else - * all topics) - * topics.cnt==0 - all topics in cluster are requested - * topics.cnt >0 - only specified topics are requested + * @param topics A list of topic names (char *) to request. + * @param topic_ids A list of topic ids (rd_kafka_Uuid_t *) to request. + * @param reason Metadata request reason + * @param allow_auto_create_topics Allow broker-side auto topic creation. + * This is best-effort, depending on broker + * config and version. + * @param include_cluster_authorized_operations Request for cluster + * authorized operations. + * @param include_topic_authorized_operations Request for topic + * authorized operations. + * @param cgrp_update Update cgrp in parse_Metadata (see comment there). + * @param force_racks Force partition to rack mapping computation in + * parse_Metadata (see comment there). + * @param resp_cb Callback to be used for handling response. + * @param replyq replyq on which response is handled. + * @param force Force request even if in progress. + * @param opaque (optional) parameter to be passed to resp_cb. * - * @param reason - metadata request reason - * @param include_cluster_authorized_operations - request for cluster - * authorized operations. - * @param include_topic_authorized_operations - request for topic authorized - * operations. - * @param cgrp_update - Update cgrp in parse_Metadata (see comment there). - * @param force_racks - Force partition to rack mapping computation in - * parse_Metadata (see comment there). - * @param resp_cb - callback to be used for handling response. - * @param replyq - replyq on which response is handled. - * @param opaque - (optional) parameter to be passed to resp_cb. + * @return Error code: + * If full metadata for all topics is requested (or + * all brokers, which results in all-topics on older brokers) and + * there is already a full request in transit then this function + * will return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS, + * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. + * + * @remark Either \p topics or \p topic_ids must be set, but not both. */ rd_kafka_resp_err_t rd_kafka_MetadataRequest_resp_cb( rd_kafka_broker_t *rkb, const rd_list_t *topics, + const rd_list_t *topics_ids, const char *reason, rd_bool_t allow_auto_create_topics, rd_bool_t include_cluster_authorized_operations, @@ -2903,11 +2932,10 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest_resp_cb( rd_bool_t force, void *opaque) { return rd_kafka_MetadataRequest0( - rkb, topics, NULL, reason, allow_auto_create_topics, + rkb, topics, topics_ids, reason, allow_auto_create_topics, include_cluster_authorized_operations, include_topic_authorized_operations, cgrp_update, force_racks, - NULL /* No op - using custom resp_cb. */, resp_cb, replyq, - rd_true /* Admin operation metadata requests are always forced. */, + NULL /* No op - using custom resp_cb. */, resp_cb, replyq, force, opaque); } diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index bbb3b747b9..9852759682 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -314,6 +314,7 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb, rd_kafka_resp_err_t rd_kafka_MetadataRequest_resp_cb( rd_kafka_broker_t *rkb, const rd_list_t *topics, + const rd_list_t *topic_ids, const char *reason, rd_bool_t allow_auto_create_topics, rd_bool_t include_cluster_authorized_operations,