Skip to content

Commit

Permalink
Add brokers_sorted to internal metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Sep 4, 2023
1 parent 2c7f5e1 commit e6d4d63
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 40 deletions.
23 changes: 12 additions & 11 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -7917,7 +7917,7 @@ void rd_kafka_TopicCollection_destroy(rd_kafka_TopicCollection_t *topics) {
*/
static rd_kafka_TopicPartitionInfo_t *rd_kafka_TopicPartitionInfo_new(
const struct rd_kafka_metadata_partition *partition,
const struct rd_kafka_metadata_broker *brokers,
const struct rd_kafka_metadata_broker *brokers_sorted,
const rd_kafka_metadata_broker_internal_t *brokers_internal,
int broker_cnt) {
size_t i;
Expand All @@ -7930,25 +7930,26 @@ static rd_kafka_TopicPartitionInfo_t *rd_kafka_TopicPartitionInfo_new(

if (partition->leader >= 0) {
pinfo->leader = rd_kafka_Node_new_from_brokers(
partition->leader, brokers, brokers_internal, broker_cnt);
partition->leader, brokers_sorted, brokers_internal,
broker_cnt);
}

if (pinfo->isr_cnt > 0) {
pinfo->isr =
rd_calloc(pinfo->isr_cnt, sizeof(rd_kafka_Node_t *));
for (i = 0; i < pinfo->isr_cnt; i++)
pinfo->isr[i] = rd_kafka_Node_new_from_brokers(
partition->isrs[i], brokers, brokers_internal,
broker_cnt);
partition->isrs[i], brokers_sorted,
brokers_internal, broker_cnt);
}

if (pinfo->replica_cnt > 0) {
pinfo->replicas =
rd_calloc(pinfo->replica_cnt, sizeof(rd_kafka_Node_t *));
for (i = 0; i < pinfo->replica_cnt; i++)
pinfo->replicas[i] = rd_kafka_Node_new_from_brokers(
partition->replicas[i], brokers, brokers_internal,
broker_cnt);
partition->replicas[i], brokers_sorted,
brokers_internal, broker_cnt);
}

return pinfo;
Expand Down Expand Up @@ -7988,7 +7989,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new(
const char *topic,
const struct rd_kafka_metadata_partition *partitions,
int partition_cnt,
const struct rd_kafka_metadata_broker *brokers,
const struct rd_kafka_metadata_broker *brokers_sorted,
const rd_kafka_metadata_broker_internal_t *brokers_internal,
int broker_cnt,
const rd_kafka_AclOperation_t *authorized_operations,
Expand Down Expand Up @@ -8020,8 +8021,8 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new(
for (i = 0; i < partition_cnt; i++)
topicdesc->partitions[i] =
rd_kafka_TopicPartitionInfo_new(
&partitions[i], brokers, brokers_internal,
broker_cnt);
&partitions[i], brokers_sorted,
brokers_internal, broker_cnt);
}
return topicdesc;
}
Expand Down Expand Up @@ -8211,7 +8212,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,
&authorized_operation_cnt);
topicdesc = rd_kafka_TopicDescription_new(
md->topics[i].topic, md->topics[i].partitions,
md->topics[i].partition_cnt, md->brokers,
md->topics[i].partition_cnt, mdi->brokers_sorted,
mdi->brokers, md->broker_cnt, authorized_operations,
authorized_operation_cnt,
mdi->topics[i].is_internal, NULL);
Expand Down Expand Up @@ -8372,7 +8373,7 @@ rd_kafka_ClusterDescription_new(const rd_kafka_metadata_internal_t *mdi) {

if (mdi->controller_id >= 0)
clusterdesc->controller = rd_kafka_Node_new_from_brokers(
mdi->controller_id, md->brokers, mdi->brokers,
mdi->controller_id, mdi->brokers_sorted, mdi->brokers,
md->broker_cnt);

clusterdesc->authorized_operations =
Expand Down
47 changes: 20 additions & 27 deletions src/rdkafka_aux.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,45 +254,38 @@ rd_kafka_Node_t *rd_kafka_Node_new(int32_t id,
*
* @return A new allocated Node object.
* Use rd_kafka_Node_destroy() to free when done.
* @remark The \p brokers_internal array is asumed to be sorted by node id,
* while the \p brokers array is not. This is the current default from
* Metadata response parsing.
* @remark The \p brokers_sorted and \p brokers_internal arrays are asumed to be
* sorted by id.
*/
rd_kafka_Node_t *rd_kafka_Node_new_from_brokers(
int32_t id,
const struct rd_kafka_metadata_broker *brokers,
const struct rd_kafka_metadata_broker *brokers_sorted,
const rd_kafka_metadata_broker_internal_t *brokers_internal,
int broker_cnt) {
int i;
rd_kafka_Node_t *node = rd_calloc(1, sizeof(*node));
node->id = id;
struct rd_kafka_metadata_broker key_sorted = {.id = id};
rd_kafka_metadata_broker_internal_t key_internal = {.id = id};

/* Brokers are not sorted, so we must iterate through all of them.
* However, the amount of brokers is small, and this function is only
* called while using the Admin API. */
for (i = 0; i < broker_cnt; i++) {
rd_kafka_metadata_broker_internal_t key = {.id = id};
rd_kafka_metadata_broker_internal_t *broker_internal = NULL;
char *rack_id = NULL;
struct rd_kafka_metadata_broker *broker =
bsearch(&key_sorted, brokers_sorted, broker_cnt,
sizeof(struct rd_kafka_metadata_broker),
rd_kafka_metadata_broker_cmp);

if (id != brokers[i].id)
continue;
rd_kafka_metadata_broker_internal_t *broker_internal =
bsearch(&key_internal, brokers_internal, broker_cnt,
sizeof(rd_kafka_metadata_broker_internal_t),
rd_kafka_metadata_broker_internal_cmp);

node->host = rd_strdup(brokers[i].host);
node->port = brokers[i].port;
node->id = id;

broker_internal =
bsearch(&key, brokers_internal, broker_cnt,
sizeof(rd_kafka_metadata_broker_internal_t),
rd_kafka_metadata_broker_internal_cmp);
if (!broker)
return node;

if (broker_internal)
rack_id = broker_internal->rack_id;
node->host = rd_strdup(broker->host);
node->port = broker->port;
if (broker_internal && broker_internal->rack_id)
node->rack_id = rd_strdup(broker_internal->rack_id);

if (rack_id)
node->rack_id = rd_strdup(rack_id);
break;
}
return node;
}

Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_aux.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ rd_kafka_Node_t *rd_kafka_Node_new(int32_t id,

rd_kafka_Node_t *rd_kafka_Node_new_from_brokers(
int32_t id,
const struct rd_kafka_metadata_broker *brokers,
const struct rd_kafka_metadata_broker *brokers_sorted,
const rd_kafka_metadata_broker_internal_t *brokers_internal,
int broker_cnt);

Expand Down
21 changes: 21 additions & 0 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ int rd_kafka_metadata_broker_internal_cmp(const void *_a, const void *_b) {
return RD_CMP(a->id, b->id);
}


/**
* @brief Id comparator for struct rd_kafka_metadata_broker*
*/
int rd_kafka_metadata_broker_cmp(const void *_a, const void *_b) {
const struct rd_kafka_metadata_broker *a = _a;
const struct rd_kafka_metadata_broker *b = _b;
return RD_CMP(a->id, b->id);
}


/**
* @brief Id comparator for rd_kafka_metadata_partition_internal_t
*/
Expand Down Expand Up @@ -549,6 +560,12 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rkbuf, "%d internal brokers: tmpabuf memory shortage",
md->broker_cnt);

if (!(mdi->brokers_sorted = rd_tmpabuf_alloc(
&tbuf, md->broker_cnt * sizeof(*mdi->brokers_sorted))))
rd_kafka_buf_parse_fail(
rkbuf, "%d sorted brokers: tmpabuf memory shortage",
md->broker_cnt);

for (i = 0; i < md->broker_cnt; i++) {
rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].id);
rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf,
Expand Down Expand Up @@ -585,6 +602,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,

qsort(mdi->brokers, md->broker_cnt, sizeof(mdi->brokers[i]),
rd_kafka_metadata_broker_internal_cmp);
memcpy(mdi->brokers_sorted, md->brokers,
sizeof(*mdi->brokers_sorted) * md->broker_cnt);
qsort(mdi->brokers_sorted, md->broker_cnt, sizeof(*mdi->brokers_sorted),
rd_kafka_metadata_broker_cmp);

/* Read TopicMetadata */
rd_kafka_buf_read_arraycnt(rkbuf, &md->topic_cnt, RD_KAFKAP_TOPICS_MAX);
Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ typedef struct rd_kafka_metadata_internal_s {
be kept the first field so the pointer
can be cast to *rd_kafka_metadata_internal_t
when needed */
/* Identical to metadata->brokers, but sorted by broker id. */
struct rd_kafka_metadata_broker *brokers_sorted;
/* Internal metadata brokers. Same count as metadata.broker_cnt.
* Sorted by broker id. */
rd_kafka_metadata_broker_internal_t *brokers;
Expand Down Expand Up @@ -177,6 +179,8 @@ int rd_kafka_metadata_partition_id_cmp(const void *_a, const void *_b);

int rd_kafka_metadata_broker_internal_cmp(const void *_a, const void *_b);

int rd_kafka_metadata_broker_cmp(const void *_a, const void *_b);

void rd_kafka_metadata_partition_clear(
struct rd_kafka_metadata_partition *rkmp);

Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2218,7 +2218,7 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
rd_kafka_replyq_t use_replyq = replyq;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_Metadata, 0, 10, &features);
rkb, RD_KAFKAP_Metadata, 0, 12, &features);

rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1,
4 + (50 * topic_cnt) + 1,
Expand Down

0 comments on commit e6d4d63

Please sign in to comment.