Skip to content

Commit

Permalink
Metadata cache by topic id (#4676)
Browse files Browse the repository at this point in the history
improve documentation of Metadata functions
  • Loading branch information
emasab committed Apr 15, 2024
1 parent f4ca373 commit 414fb34
Show file tree
Hide file tree
Showing 10 changed files with 389 additions and 160 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ librdkafka v2.4.0 is a feature release:
max period of 1 ms (#4671).
* Fixed a bug causing duplicate message consumption from a stale
fetch start offset in some particular cases (#4636)
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)


## Upgrade considerations
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,7 @@ rd_kafka_admin_MetadataRequest(rd_kafka_broker_t *rkb,
rd_kafka_replyq_t replyq,
void *opaque) {
return rd_kafka_MetadataRequest_resp_cb(
rkb, topics, reason,
rkb, topics, NULL, reason,
rd_false /* No admin operation requires topic creation. */,
include_cluster_authorized_operations,
include_topic_authorized_operations,
Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -6462,10 +6462,10 @@ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,

rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));

if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_GENERIC)
return;

if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_GENERIC)
if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
return;

/*
Expand Down
132 changes: 94 additions & 38 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -474,12 +474,14 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_metadata_internal_t *mdi = NULL;
rd_kafka_metadata_t *md = NULL;
size_t rkb_namelen;
const int log_decode_errors = LOG_ERR;
rd_list_t *missing_topics = NULL;

const rd_list_t *requested_topics = request_topics;
rd_bool_t all_topics = rd_false;
rd_bool_t cgrp_update = rd_false;
const int log_decode_errors = LOG_ERR;
rd_list_t *missing_topics = NULL;
rd_list_t *missing_topic_ids = NULL;

const rd_list_t *requested_topics = request_topics;
const rd_list_t *requested_topic_ids = NULL;
rd_bool_t all_topics = rd_false;
rd_bool_t cgrp_update = rd_false;
rd_bool_t has_reliable_leader_epochs =
rd_kafka_has_reliable_leader_epochs(rkb);
int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
Expand All @@ -496,8 +498,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_bool_t compute_racks = has_client_rack;

if (request) {
requested_topics = request->rkbuf_u.Metadata.topics;
all_topics = request->rkbuf_u.Metadata.all_topics;
requested_topics = request->rkbuf_u.Metadata.topics;
requested_topic_ids = request->rkbuf_u.Metadata.topic_ids;
all_topics = request->rkbuf_u.Metadata.all_topics;
cgrp_update =
request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp;
compute_racks |= request->rkbuf_u.Metadata.force_racks;
Expand All @@ -519,6 +522,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
if (requested_topics)
missing_topics =
rd_list_copy(requested_topics, rd_list_string_copy, NULL);
if (requested_topic_ids)
missing_topic_ids =
rd_list_copy(requested_topic_ids, rd_list_Uuid_copy, NULL);

rd_kafka_broker_lock(rkb);
rkb_namelen = strlen(rkb->rkb_name) + 1;
Expand Down Expand Up @@ -635,6 +641,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,

if (ApiVersion >= 10) {
rd_kafka_buf_read_uuid(rkbuf, &mdi->topics[i].topic_id);
} else {
mdi->topics[i].topic_id = RD_KAFKA_UUID_ZERO;
}

if (ApiVersion >= 1)
Expand Down Expand Up @@ -831,39 +839,42 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i],
&mdi->topics[i]);

// TODO: Should be done for requested_topic_ids as well.
if (requested_topics) {
if (requested_topics)
rd_list_free_cb(missing_topics,
rd_list_remove_cmp(missing_topics,
md->topics[i].topic,
(void *)strcmp));
if (!all_topics) {
/* Only update cache when not asking
* for all topics. */

rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers,
md->broker_cnt);
cache_changes++;
rd_kafka_wrunlock(rk);
}
if (requested_topic_ids)
rd_list_free_cb(
missing_topic_ids,
rd_list_remove_cmp(missing_topic_ids,
&mdi->topics[i].topic_id,
(void *)rd_kafka_Uuid_ptr_cmp));
if (!all_topics) {
/* Only update cache when not asking
* for all topics. */

rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers, md->broker_cnt);
cache_changes++;
rd_kafka_wrunlock(rk);
}
}

// TODO: Should be done for missing_topic_ids as well.
/* Requested topics not seen in metadata? Propogate to topic code. */
if (missing_topics) {
char *topic;
rd_rkb_dbg(rkb, TOPIC, "METADATA",
"%d/%d requested topic(s) seen in metadata",
"%d/%d requested topic(s) seen in metadata"
" (lookup by name)",
rd_list_cnt(requested_topics) -
rd_list_cnt(missing_topics),
rd_list_cnt(requested_topics));
Expand All @@ -890,6 +901,42 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}
}
}
if (missing_topic_ids) {
rd_kafka_Uuid_t *topic_id;
rd_rkb_dbg(rkb, TOPIC, "METADATA",
"%d/%d requested topic(s) seen in metadata"
" (lookup by id)",
rd_list_cnt(requested_topic_ids) -
rd_list_cnt(missing_topic_ids),
rd_list_cnt(requested_topic_ids));
for (i = 0; i < rd_list_cnt(missing_topic_ids); i++) {
rd_kafka_Uuid_t *missing_topic_id =
missing_topic_ids->rl_elems[i];
rd_rkb_dbg(rkb, TOPIC, "METADATA", "wanted %s",
rd_kafka_Uuid_base64str(missing_topic_id));
}
RD_LIST_FOREACH(topic_id, missing_topic_ids, i) {
rd_kafka_topic_t *rkt;

rd_kafka_rdlock(rk);
rkt = rd_kafka_topic_find_by_topic_id(rkb->rkb_rk,
*topic_id);
rd_kafka_rdunlock(rk);
if (rkt) {
/* Received metadata response contained no
* information about topic 'rkt' and thus
* indicates the topic is not available in the
* cluster.
* Mark the topic as non-existent */
rd_kafka_topic_wrlock(rkt);
rd_kafka_topic_set_notexists(
rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
rd_kafka_topic_wrunlock(rkt);

rd_kafka_topic_destroy0(rkt);
}
}
}


rd_kafka_wrlock(rkb->rkb_rk);
Expand Down Expand Up @@ -959,12 +1006,12 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_metadata_cache_propagate_changes(rk);
rd_kafka_metadata_cache_expiry_start(rk);
}


// TODO: Should be done for requested_topic_ids as well.
/* Remove cache hints for the originally requested topics. */
if (requested_topics)
rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
if (requested_topic_ids)
rd_kafka_metadata_cache_purge_hints_by_id(rk,
requested_topic_ids);

rd_kafka_wrunlock(rkb->rkb_rk);

Expand All @@ -980,7 +1027,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
* which may contain only a sub-set of the subscribed topics (namely
* the effective subscription of available topics) as to not
* propagate non-included topics as non-existent. */
if (cgrp_update && (requested_topics || all_topics))
if (cgrp_update &&
(requested_topics || requested_topic_ids || all_topics))
rd_kafka_cgrp_metadata_update_check(rkb->rkb_rk->rk_cgrp,
rd_true /*do join*/);

Expand All @@ -993,10 +1041,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}

done:

// TODO: Should be done for requested_topic_ids as well.
if (missing_topics)
rd_list_destroy(missing_topics);
if (missing_topic_ids)
rd_list_destroy(missing_topic_ids);

/* This metadata request was triggered by someone wanting
* the metadata information back as a reply, so send that reply now.
Expand All @@ -1011,18 +1059,26 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
err_parse:
err = rkbuf->rkbuf_err;
err:
// TODO: Should be done for requested_topic_ids as well.
if (requested_topics) {
/* Failed requests shall purge cache hints for
* the requested topics. */
rd_kafka_wrlock(rkb->rkb_rk);
rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
rd_kafka_wrunlock(rkb->rkb_rk);
}
if (requested_topic_ids) {
/* Failed requests shall purge cache hints for
* the requested topics. */
rd_kafka_wrlock(rkb->rkb_rk);
rd_kafka_metadata_cache_purge_hints_by_id(rk,
requested_topic_ids);
rd_kafka_wrunlock(rkb->rkb_rk);
}

// TODO: Should be done for requested_topic_ids as well.
if (missing_topics)
rd_list_destroy(missing_topics);
if (missing_topic_ids)
rd_list_destroy(missing_topic_ids);
rd_tmpabuf_destroy(&tbuf);

return err;
Expand Down
12 changes: 11 additions & 1 deletion src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ rd_kafka_metadata_new_topic_with_partition_replicas_mock(int replication_factor,
*/

struct rd_kafka_metadata_cache_entry {
rd_avl_node_t rkmce_avlnode; /* rkmc_avl */
rd_avl_node_t rkmce_avlnode; /* rkmc_avl */
rd_avl_node_t rkmce_avlnode_by_id; /* rkmc_avl_by_id */
TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */
rd_ts_t rkmce_ts_expires; /* Expire time */
rd_ts_t rkmce_ts_insert; /* Insert time */
Expand All @@ -243,6 +244,7 @@ struct rd_kafka_metadata_cache_entry {

struct rd_kafka_metadata_cache {
rd_avl_t rkmc_avl;
rd_avl_t rkmc_avl_by_id;
TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry;
rd_kafka_timer_t rkmc_expiry_tmr;
int rkmc_cnt;
Expand All @@ -269,6 +271,8 @@ struct rd_kafka_metadata_cache {


int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic);
int rd_kafka_metadata_cache_delete_by_topic_id(rd_kafka_t *rk,
const rd_kafka_Uuid_t topic_id);
void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk);
int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts);
void rd_kafka_metadata_cache_topic_update(
Expand All @@ -282,8 +286,14 @@ void rd_kafka_metadata_cache_topic_update(
void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find_by_id(rd_kafka_t *rk,
const rd_kafka_Uuid_t topic_id,
int valid);
void rd_kafka_metadata_cache_purge_hints(rd_kafka_t *rk,
const rd_list_t *topics);
void rd_kafka_metadata_cache_purge_hints_by_id(rd_kafka_t *rk,
const rd_list_t *topic_ids);
int rd_kafka_metadata_cache_hint(rd_kafka_t *rk,
const rd_list_t *topics,
rd_list_t *dst,
Expand Down
Loading

0 comments on commit 414fb34

Please sign in to comment.