diff --git a/examples/describe_topics.c b/examples/describe_topics.c index 5085b36725..56cd3b6010 100644 --- a/examples/describe_topics.c +++ b/examples/describe_topics.c @@ -136,51 +136,59 @@ int64_t parse_int(const char *what, const char *str) { return (int64_t)n; } - /** - * @brief Print partition information. + * @brief Print node information. */ -static void print_partition_info(const rd_kafka_TopicDescription_t *topic, - int partition_idx) { - rd_kafka_resp_err_t partition_err; - int leader, id, isr_cnt, replica_cnt, k; - partition_err = - rd_kafka_TopicDescription_partition_error(topic, partition_idx); - - if (partition_err != RD_KAFKA_RESP_ERR_NO_ERROR) { - printf("\tPartition at index %d has error[%" PRId32 "]: %s\n", - partition_idx, partition_err, - rd_kafka_err2str(partition_err)); +static void print_node_info(const rd_kafka_Node_t *node) { + if (!node) { + printf("\t\t(null)\n"); return; } - printf("\tPartition at index %d succeeded\n", partition_idx); - id = rd_kafka_TopicDescription_partition_id(topic, partition_idx); - leader = - rd_kafka_TopicDescription_partition_leader(topic, partition_idx); - isr_cnt = - rd_kafka_TopicDescription_partition_isr_count(topic, partition_idx); - replica_cnt = rd_kafka_TopicDescription_partition_replica_count( - topic, partition_idx); - printf("\tPartition has id: %d with leader: %d\n", id, leader); + + printf("\t\tNode [id: %" PRId32 + ", host: %s" + ", port: %" PRIu16 ", rack %s ]\n", + rd_kafka_Node_id(node), rd_kafka_Node_host(node), + rd_kafka_Node_port(node), rd_kafka_Node_rack_id(node)); +} + +/** + * @brief Print partition information. 
+ */ +static void +print_partition_info(const rd_kafka_TopicPartitionInfo_t *partition) { + size_t k; + int id; + const rd_kafka_Node_t **isr; + size_t isr_cnt; + const rd_kafka_Node_t **replicas; + size_t replica_cnt; + + id = rd_kafka_TopicPartitionInfo_partition(partition); + printf("\tPartition id: %d\n", id); + + printf("\tPartition leader: \n"); + print_node_info(rd_kafka_TopicPartitionInfo_leader(partition)); + + isr = rd_kafka_TopicPartitionInfo_isr(partition, &isr_cnt); if (isr_cnt) { printf( "\tThe in-sync replica count is: %d, they " - "are: ", - isr_cnt); + "are: \n", + (int)isr_cnt); for (k = 0; k < isr_cnt; k++) - printf("%d ", rd_kafka_TopicDescription_partition_isr( - topic, partition_idx, k)); - printf("\n"); + print_node_info(isr[k]); } else printf("\tThe in-sync replica count is 0\n"); + replicas = rd_kafka_TopicPartitionInfo_replicas(partition, &replica_cnt); if (replica_cnt) { - printf("\tThe replica count is: %d, they are: ", replica_cnt); + printf( + "\tThe replica count is: %d, they " + "are: \n", + (int)replica_cnt); for (k = 0; k < replica_cnt; k++) - printf("%d ", - rd_kafka_TopicDescription_partition_replica( - topic, partition_idx, k)); - printf("\n"); + print_node_info(replicas[k]); } else printf("\tThe replica count is 0\n"); } @@ -189,14 +197,14 @@ static void print_partition_info(const rd_kafka_TopicDescription_t *topic, * @brief Print topic information. 
*/ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) { - int j, acl_operation; + size_t j; const rd_kafka_error_t *error; - const char *topic_name = rd_kafka_TopicDescription_topic_name(topic); - int topic_authorized_operations_cnt = - rd_kafka_TopicDescription_topic_authorized_operation_count(topic); - int partition_cnt = - rd_kafka_TopicDescription_topic_partition_count(topic); - error = rd_kafka_TopicDescription_error(topic); + const char *topic_name = rd_kafka_TopicDescription_name(topic); + error = rd_kafka_TopicDescription_error(topic); + const rd_kafka_AclOperation_t *authorized_operations; + size_t authorized_operations_cnt; + const rd_kafka_TopicPartitionInfo_t **partitions; + size_t partition_cnt; if (rd_kafka_error_code(error)) { printf("Topic: %s has error[%" PRId32 "]: %s\n", topic_name, @@ -205,20 +213,25 @@ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) { return; } + authorized_operations = rd_kafka_TopicDescription_authorized_operations( + topic, &authorized_operations_cnt); + printf( "Topic: %s succeeded, has %d topic authorized operations " "allowed, they are:\n", - topic_name, topic_authorized_operations_cnt); - for (j = 0; j < topic_authorized_operations_cnt; j++) { - acl_operation = - rd_kafka_TopicDescription_authorized_operation(topic, j); + topic_name, (int)authorized_operations_cnt); + + for (j = 0; j < authorized_operations_cnt; j++) printf("\t%s operation is allowed\n", - rd_kafka_AclOperation_name(acl_operation)); - } + rd_kafka_AclOperation_name(authorized_operations[j])); + + + partitions = + rd_kafka_TopicDescription_partitions(topic, &partition_cnt); - printf("partition count is: %d\n", partition_cnt); + printf("partition count is: %d\n", (int)partition_cnt); for (j = 0; j < partition_cnt; j++) { - print_partition_info(topic, j); + print_partition_info(partitions[j]); printf("\n"); } } @@ -257,8 +270,9 @@ static int print_topics_info(const rd_kafka_DescribeTopics_result_t *topicdesc, * 
topics. */ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) { - rd_kafka_t *rk = NULL; - const char **topics = NULL; + rd_kafka_t *rk = NULL; + const char **topic_names = NULL; + rd_kafka_TopicCollection_t *topics = NULL; char errstr[512]; rd_kafka_AdminOptions_t *options = NULL; rd_kafka_event_t *event = NULL; @@ -277,8 +291,10 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) { include_topic_authorized_operations > 1) usage("include_topic_authorized_operations not a 0-1 int"); - topics = (const char **)&argv[1]; - topics_cnt = argc - 1; + topic_names = (const char **)&argv[1]; + topics_cnt = argc - 1; + topics = + rd_kafka_TopicCollection_new_from_names(topic_names, topics_cnt); /* * Create producer instance @@ -315,7 +331,7 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) { } /* Call DescribeTopics */ - rd_kafka_DescribeTopics(rk, topics, topics_cnt, options, queue); + rd_kafka_DescribeTopics(rk, topics, options, queue); /* Wait for results */ event = rd_kafka_queue_poll(queue, -1 /* indefinitely but limited by @@ -348,6 +364,8 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) { exit: /* Cleanup. */ + if (topics) + rd_kafka_TopicCollection_destroy(topics); if (event) rd_kafka_event_destroy(event); if (options) diff --git a/src/rdkafka.h b/src/rdkafka.h index 1ef6e2f7c8..c9b97603b9 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -4973,6 +4973,16 @@ const char *rd_kafka_Node_host(const rd_kafka_Node_t *node); RD_EXPORT uint16_t rd_kafka_Node_port(const rd_kafka_Node_t *node); +/** + * @brief Get the rack of \p node. + * + * @param node The Node instance + * + * @return The node rack id. May be NULL. + */ +RD_EXPORT +const char *rd_kafka_Node_rack_id(const rd_kafka_Node_t *node); + /**@}*/ @@ -7984,12 +7994,44 @@ rd_kafka_DeleteRecords_result_offsets( * @{ */ +/** + * @brief Represents a collection of topics, to be passed to DescribeTopics. 
+ * + */ +typedef struct rd_kafka_TopicCollection_s rd_kafka_TopicCollection_t; + +/** + * @brief TopicPartition represents a partition in the DescribeTopics result. + * + */ +typedef struct rd_kafka_TopicPartitionInfo_s rd_kafka_TopicPartitionInfo_t; + /** * @brief DescribeTopics result type. * */ typedef struct rd_kafka_TopicDescription_s rd_kafka_TopicDescription_t; +/** + * @brief Creates a new TopicCollection for passing to rd_kafka_DescribeTopics. + * + * @param topics A list of topics. + * @param topics_cnt Count of topics. + * + * @return a newly allocated TopicCollection object. Must be freed using + * rd_kafka_TopicCollection_destroy when done. + */ +RD_EXPORT +rd_kafka_TopicCollection_t * +rd_kafka_TopicCollection_new_from_names(const char **topics, size_t topics_cnt); + +/** + * @brief Destroy and free a TopicCollection object created with + * rd_kafka_TopicCollection_new_* methods. + */ +RD_EXPORT void +rd_kafka_TopicCollection_destroy(rd_kafka_TopicCollection_t *topics); + /** * @brief Describe topics as specified by the \p topics * array of size \p topics_cnt elements. @@ -8007,8 +8049,7 @@ typedef struct rd_kafka_TopicDescription_s rd_kafka_TopicDescription_t; */ RD_EXPORT void rd_kafka_DescribeTopics(rd_kafka_t *rk, - const char **topics, - size_t topics_cnt, + const rd_kafka_TopicCollection_t *topics, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu); @@ -8026,160 +8067,120 @@ const rd_kafka_TopicDescription_t **rd_kafka_DescribeTopics_result_topics( const rd_kafka_DescribeTopics_result_t *result, size_t *cntp); + /** - * @brief Gets the topic partition count for the \p topicdesc topic. + * @brief Gets an array of partitions for the \p topicdesc topic. * * @param topicdesc The topic description. + * @param cntp is updated to the number of partitions in the array. * - * @return The topic partition count. + * @return An array of TopicPartitionInfos. 
+ * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p topicdesc object. */ RD_EXPORT -const int rd_kafka_TopicDescription_topic_partition_count( - const rd_kafka_TopicDescription_t *topicdesc); +const rd_kafka_TopicPartitionInfo_t **rd_kafka_TopicDescription_partitions( + const rd_kafka_TopicDescription_t *topicdesc, + size_t *cntp); /** - * @brief Gets the partition id for partition at index position for the - * \p topicdesc topic. + * @brief Gets the partition id for \p partition. * - * @param topicdesc The topic description. - * @param idx Index for the partitions. + * @param partition The partition info. * * @return The partition id. */ RD_EXPORT -const int rd_kafka_TopicDescription_partition_id( - const rd_kafka_TopicDescription_t *topicdesc, - int idx); +const int rd_kafka_TopicPartitionInfo_partition( + const rd_kafka_TopicPartitionInfo_t *partition); + /** - * @brief Gets the partition leader for partition at index position for the - * \p topicdesc topic. + * @brief Gets the partition leader for \p partition. * - * @param topicdesc The topic description. - * @param idx Index for the partitions. + * @param partition The partition info. * * @return The partition leader. - */ -RD_EXPORT -const int rd_kafka_TopicDescription_partition_leader( - const rd_kafka_TopicDescription_t *topicdesc, - int idx); - -/** - * @brief Gets the partition in-sync replica count for partition at index - * position for the \p topicdesc topic. * - * @param topicdesc The topic description. - * @param idx Index for the partitions. - * - * @return The partition replica count. + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p partition object. 
*/ RD_EXPORT -const int rd_kafka_TopicDescription_partition_isr_count( - const rd_kafka_TopicDescription_t *topicdesc, - int idx); +const rd_kafka_Node_t *rd_kafka_TopicPartitionInfo_leader( + const rd_kafka_TopicPartitionInfo_t *partition); /** - * @brief Gets the in-sync replica at index \p replica_idx for the partition - * at the index \p partition_idx for the topic \p topicdesc. + * @brief Gets the partition in-sync replicas for \p partition. * - * @param topicdesc The topic description. - * @param partition_idx Index for the partitions. - * @param isr_idx Index for the in-sync replica. - * - * @return The partition in-sync replica. - */ -RD_EXPORT -const int rd_kafka_TopicDescription_partition_isr( - const rd_kafka_TopicDescription_t *topicdesc, - int partition_idx, - int isr_idx); - -/** - * @brief Gets the partition replica count for partition at index position for - * the \p topicdesc topic. + * @param partition The partition info. + * @param cntp is updated with in-sync replicas count. * - * @param topicdesc The topic description. - * @param idx Index for the partitions. + * @return The in-sync replica nodes. * - * @return The partition replica count. + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p partition object. */ RD_EXPORT -const int rd_kafka_TopicDescription_partition_replica_count( - const rd_kafka_TopicDescription_t *topicdesc, - int idx); - +const rd_kafka_Node_t ** +rd_kafka_TopicPartitionInfo_isr(const rd_kafka_TopicPartitionInfo_t *partition, + size_t *cntp); /** - * @brief Gets the partition replica at index \p replica_idx for the partition - * at the index \p partition_idx for the topic \p topicdesc. + * @brief Gets the partition replicas for \p partition. * - * @param topicdesc The topic description. - * @param partition_idx Index for the partitions. - * @param replica_idx Index for the replica. + * @param partition The partition info. + * @param cntp is updated with partition replicas count. 
* - * @return The partition replica. - */ -RD_EXPORT -const int rd_kafka_TopicDescription_partition_replica( - const rd_kafka_TopicDescription_t *topicdesc, - int partition_idx, - int replica_idx); - -/** - * @brief Gets the partition error for partition at index position for the \p - * topicdesc topic. - * - * @param topicdesc The topic description. - * @param idx Index for the partitions. + * @return The partition replicas nodes. * - * @return The partition error. + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p partition object. */ RD_EXPORT -const rd_kafka_resp_err_t rd_kafka_TopicDescription_partition_error( - const rd_kafka_TopicDescription_t *topicdesc, - int idx); +const rd_kafka_Node_t **rd_kafka_TopicPartitionInfo_replicas( + const rd_kafka_TopicPartitionInfo_t *partition, + size_t *cntp); /** - * @brief Gets the topic authorized acl operations count for the \p topicdesc - * topic. + * @brief Gets the topic authorized ACL operations for the \p topicdesc topic. * * @param topicdesc The topic description. + * @param cntp is updated with authorized ACL operations count. * - * @return The topic authorized operations count. + * @return The topic authorized operations. * + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p topicdesc object. */ RD_EXPORT -const int rd_kafka_TopicDescription_topic_authorized_operation_count( - const rd_kafka_TopicDescription_t *topicdesc); +const rd_kafka_AclOperation_t *rd_kafka_TopicDescription_authorized_operations( + const rd_kafka_TopicDescription_t *topicdesc, + size_t *cntp); /** - * @brief Gets operation at idx index of topic authorized operations for the - * \p topicdesc topic. + * @brief Gets the topic name for the \p topicdesc topic. * * @param topicdesc The topic description. - * @param idx The index for which element is needed. * - * @return Authorized operation at given index. + * @return The topic name. 
+ * + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p topicdesc object. */ RD_EXPORT -const rd_kafka_AclOperation_t rd_kafka_TopicDescription_authorized_operation( - const rd_kafka_TopicDescription_t *topicdesc, - size_t idx); +const char * +rd_kafka_TopicDescription_name(const rd_kafka_TopicDescription_t *topicdesc); /** - * @brief Gets the topic name for the \p topicdesc topic. + * @brief Gets if the \p topicdesc topic is internal. * * @param topicdesc The topic description. * - * @return The topic name. - * - * @remark The lifetime of the returned memory is the same - * as the lifetime of the \p topicdesc object. + * @return 1 if the topic is internal to Kafka, 0 otherwise. */ RD_EXPORT -const char *rd_kafka_TopicDescription_topic_name( +int rd_kafka_TopicDescription_is_internal( const rd_kafka_TopicDescription_t *topicdesc); /** diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 6d520fb52b..6b4775794e 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -7860,6 +7860,97 @@ rd_kafka_DescribeConsumerGroups_result_groups( * */ +rd_kafka_TopicCollection_t * +rd_kafka_TopicCollection_new_from_names(const char **topics, + size_t topics_cnt) { + size_t i; + rd_kafka_TopicCollection_t *ret = + rd_calloc(1, sizeof(rd_kafka_TopicCollection_t)); + + ret->topics_cnt = topics_cnt; + if (!ret->topics_cnt) + return ret; + + ret->topics = rd_calloc(sizeof(char *), topics_cnt); + for (i = 0; i < topics_cnt; i++) + ret->topics[i] = rd_strdup(topics[i]); + + return ret; +} + +void rd_kafka_TopicCollection_destroy(rd_kafka_TopicCollection_t *topics) { + size_t i; + + for (i = 0; i < topics->topics_cnt; i++) + rd_free(topics->topics[i]); + + RD_IF_FREE(topics->topics, rd_free); + rd_free(topics); +} + +/** + * @brief Create a new TopicPartitionInfo object. + * + * @return A newly allocated TopicPartitionInfo. Use + * rd_kafka_TopicPartitionInfo_destroy() to free when done. 
+ */ +static rd_kafka_TopicPartitionInfo_t *rd_kafka_TopicPartitionInfo_new( + const struct rd_kafka_metadata_partition *partition, + const struct rd_kafka_metadata_broker *brokers, + const rd_kafka_metadata_broker_internal_t *brokers_internal, + int broker_cnt) { + size_t i; + rd_kafka_TopicPartitionInfo_t *pinfo = + rd_calloc(1, sizeof(rd_kafka_TopicPartitionInfo_t)); + + pinfo->partition = partition->id; + pinfo->isr_cnt = partition->isr_cnt; + pinfo->replica_cnt = partition->replica_cnt; + + if (partition->leader >= 0) { + pinfo->leader = rd_kafka_Node_new_from_brokers( + partition->leader, brokers, brokers_internal, broker_cnt); + } + + if (pinfo->isr_cnt > 0) { + pinfo->isr = + rd_calloc(pinfo->isr_cnt, sizeof(rd_kafka_Node_t *)); + for (i = 0; i < pinfo->isr_cnt; i++) + pinfo->isr[i] = rd_kafka_Node_new_from_brokers( + partition->isrs[i], brokers, brokers_internal, + broker_cnt); + } + + if (pinfo->replica_cnt > 0) { + pinfo->replicas = + rd_calloc(sizeof(rd_kafka_Node_t *), pinfo->replica_cnt); + for (i = 0; i < pinfo->replica_cnt; i++) + pinfo->replicas[i] = rd_kafka_Node_new_from_brokers( + partition->replicas[i], brokers, brokers_internal, + broker_cnt); + } + + return pinfo; +} + +/** + * @brief Destroy and deallocate a TopicPartitionInfo. + */ +static void +rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) { + int i; + RD_IF_FREE(pinfo->leader, rd_kafka_Node_destroy); + + for (i = 0; i < pinfo->isr_cnt; i++) + rd_kafka_Node_destroy(pinfo->isr[i]); + RD_IF_FREE(pinfo->isr, rd_free); + + for (i = 0; i < pinfo->replica_cnt; i++) + rd_kafka_Node_destroy(pinfo->replicas[i]); + RD_IF_FREE(pinfo->replicas, rd_free); + + rd_free(pinfo); +} /** * @brief Create a new TopicDescription object. @@ -7872,36 +7963,45 @@ rd_kafka_DescribeConsumerGroups_result_groups( * @return A newly allocated TopicDescription object. * Use rd_kafka_TopicDescription_destroy() to free when done. 
 */ -static rd_kafka_TopicDescription_t * -rd_kafka_TopicDescription_new(const char *topic, - struct rd_kafka_metadata_partition *partitions, - int partition_cnt, - const rd_list_t *topic_authorized_operations, - rd_kafka_error_t *error) { +static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( + const char *topic, + const struct rd_kafka_metadata_partition *partitions, + int partition_cnt, + const struct rd_kafka_metadata_broker *brokers, + const rd_kafka_metadata_broker_internal_t *brokers_internal, + int broker_cnt, + const rd_list_t *topic_authorized_operations, + rd_bool_t is_internal, + rd_kafka_error_t *error) { rd_kafka_TopicDescription_t *topicdesc; int i; topicdesc = rd_calloc(1, sizeof(*topicdesc)); topicdesc->topic = rd_strdup(topic); topicdesc->partition_cnt = partition_cnt; + topicdesc->is_internal = is_internal; if (error) topicdesc->error = rd_kafka_error_copy(error); - if (topic_authorized_operations == NULL) - rd_list_init(&topicdesc->authorized_operations, 0, rd_free); - else { - rd_list_init_copy(&topicdesc->authorized_operations, - topic_authorized_operations); - rd_list_copy_to(&topicdesc->authorized_operations, - topic_authorized_operations, - rd_kafka_AclOperation_copy, NULL); + if (topic_authorized_operations) { + rd_kafka_AclOperation_t *acl_operation; + topicdesc->authorized_operations_cnt = + rd_list_cnt(topic_authorized_operations); + topicdesc->authorized_operations = + rd_malloc(sizeof(rd_kafka_AclOperation_t) * + topicdesc->authorized_operations_cnt); + RD_LIST_FOREACH(acl_operation, topic_authorized_operations, i) { + topicdesc->authorized_operations[i] = *acl_operation; + } } if (partitions) { - topicdesc->partitions = rd_calloc(sizeof(*partitions), partition_cnt); + topicdesc->partitions = rd_calloc(partition_cnt, sizeof(rd_kafka_TopicPartitionInfo_t *)); for (i = 0; i < partition_cnt; i++) - rd_kafka_copy_metadata_partition( - &partitions[i], &topicdesc->partitions[i]); + topicdesc->partitions[i] = + rd_kafka_TopicPartitionInfo_new( + &partitions[i], brokers, brokers_internal, + broker_cnt); } return topicdesc; 
 } @@ -7917,7 +8017,8 @@ rd_kafka_TopicDescription_new, static rd_kafka_TopicDescription_t * rd_kafka_TopicDescription_new_error(const char *topic, rd_kafka_error_t *error) { - return rd_kafka_TopicDescription_new(topic, NULL, 0, NULL, error); + return rd_kafka_TopicDescription_new(topic, NULL, 0, NULL, NULL, 0, + NULL, rd_false, error); } static void @@ -7926,12 +8027,12 @@ rd_kafka_TopicDescription_destroy(rd_kafka_TopicDescription_t *topicdesc) { RD_IF_FREE(topicdesc->topic, rd_free); RD_IF_FREE(topicdesc->error, rd_kafka_error_destroy); + RD_IF_FREE(topicdesc->authorized_operations, rd_free); for (i = 0; i < topicdesc->partition_cnt; i++) - rd_kafka_metadata_partition_clear(&topicdesc->partitions[i]); + rd_kafka_TopicPartitionInfo_destroy(topicdesc->partitions[i]); RD_IF_FREE(topicdesc->partitions, rd_free); - rd_list_destroy(&topicdesc->authorized_operations); rd_free(topicdesc); } @@ -7939,71 +8039,54 @@ static void rd_kafka_TopicDescription_free(void *ptr) { rd_kafka_TopicDescription_destroy(ptr); } -const rd_kafka_AclOperation_t rd_kafka_TopicDescription_authorized_operation( - const rd_kafka_TopicDescription_t *topicdesc, - size_t idx) { - rd_kafka_AclOperation_t *entry = - rd_list_elem(&topicdesc->authorized_operations, idx); - return *entry; +const int rd_kafka_TopicPartitionInfo_partition( + const rd_kafka_TopicPartitionInfo_t *partition) { + return partition->partition; } -const int rd_kafka_TopicDescription_topic_authorized_operation_count( - const rd_kafka_TopicDescription_t *topicdesc) { - return rd_list_cnt(&topicdesc->authorized_operations); +const rd_kafka_Node_t *rd_kafka_TopicPartitionInfo_leader( + const rd_kafka_TopicPartitionInfo_t *partition) { + return partition->leader; } -const int rd_kafka_TopicDescription_partition_id( - const rd_kafka_TopicDescription_t *topicdesc, - int idx) { - return topicdesc->partitions[idx].id; -} - -const int rd_kafka_TopicDescription_partition_leader( - const rd_kafka_TopicDescription_t 
*topicdesc, - int idx) { - return topicdesc->partitions[idx].leader; -} -const int rd_kafka_TopicDescription_partition_isr_count( - const rd_kafka_TopicDescription_t *topicdesc, - int idx) { - return topicdesc->partitions[idx].isr_cnt; +const rd_kafka_Node_t ** +rd_kafka_TopicPartitionInfo_isr(const rd_kafka_TopicPartitionInfo_t *partition, + size_t *cntp) { + *cntp = partition->isr_cnt; + return partition->isr; } -const int rd_kafka_TopicDescription_partition_replica_count( - const rd_kafka_TopicDescription_t *topicdesc, - int idx) { - return topicdesc->partitions[idx].replica_cnt; +const rd_kafka_Node_t **rd_kafka_TopicPartitionInfo_replicas( + const rd_kafka_TopicPartitionInfo_t *partition, + size_t *cntp) { + *cntp = partition->replica_cnt; + return partition->replicas; } -const int rd_kafka_TopicDescription_partition_isr( +const rd_kafka_TopicPartitionInfo_t **rd_kafka_TopicDescription_partitions( const rd_kafka_TopicDescription_t *topicdesc, - int partition_idx, - int isr_idx) { - return topicdesc->partitions[partition_idx].isrs[isr_idx]; + size_t *cntp) { + *cntp = topicdesc->partition_cnt; + return topicdesc->partitions; } -const int rd_kafka_TopicDescription_partition_replica( +const rd_kafka_AclOperation_t *rd_kafka_TopicDescription_authorized_operations( const rd_kafka_TopicDescription_t *topicdesc, - int partition_idx, - int replica_idx) { - return topicdesc->partitions[partition_idx].replicas[replica_idx]; + size_t *cntp) { + *cntp = topicdesc->authorized_operations_cnt; + return topicdesc->authorized_operations; } -const rd_kafka_resp_err_t rd_kafka_TopicDescription_partition_error( - const rd_kafka_TopicDescription_t *topicdesc, - int idx) { - return topicdesc->partitions[idx].err; -} -const int rd_kafka_TopicDescription_topic_partition_count( - const rd_kafka_TopicDescription_t *topicdesc) { - return topicdesc->partition_cnt; +const char * +rd_kafka_TopicDescription_name(const rd_kafka_TopicDescription_t *topicdesc) { + return topicdesc->topic; } 
-const char *rd_kafka_TopicDescription_topic_name( +int rd_kafka_TopicDescription_is_internal( const rd_kafka_TopicDescription_t *topicdesc) { - return topicdesc->topic; + return topicdesc->is_internal; } const rd_kafka_error_t * @@ -8105,8 +8188,9 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req, mdi->topics[i].topic_authorized_operations); topicdesc = rd_kafka_TopicDescription_new( md->topics[i].topic, md->topics[i].partitions, - md->topics[i].partition_cnt, authorized_operations, - NULL); + md->topics[i].partition_cnt, md->brokers, + mdi->brokers, md->broker_cnt, authorized_operations, + mdi->topics[i].is_internal, NULL); if (authorized_operations) rd_list_destroy(authorized_operations); } else { @@ -8132,8 +8216,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req, } void rd_kafka_DescribeTopics(rd_kafka_t *rk, - const char **topics, - size_t topics_cnt, + const rd_kafka_TopicCollection_t *topics, const rd_kafka_AdminOptions_t *options, rd_kafka_queue_t *rkqu) { rd_kafka_op_t *rko; @@ -8151,7 +8234,7 @@ void rd_kafka_DescribeTopics(rd_kafka_t *rk, rk, RD_KAFKA_OP_DESCRIBETOPICS, RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT, &cbs, options, rkqu->rkqu_q); - if (topics_cnt == 0) { + if (topics->topics_cnt == 0) { rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG, "No topics to describe"); rd_kafka_admin_common_worker_destroy(rk, rko, @@ -8159,10 +8242,11 @@ void rd_kafka_DescribeTopics(rd_kafka_t *rk, return; } - rd_list_init(&rko->rko_u.admin_request.args, (int)topics_cnt, rd_free); - for (i = 0; i < topics_cnt; i++) + rd_list_init(&rko->rko_u.admin_request.args, (int)topics->topics_cnt, + rd_free); + for (i = 0; i < topics->topics_cnt; i++) rd_list_add(&rko->rko_u.admin_request.args, - rd_strdup(topics[i])); + rd_strdup(topics->topics[i])); /* Check for duplicates. 
* Make a temporary copy of the topic list and sort it to check for diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index a87740e05d..d488e6963b 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -494,17 +494,42 @@ struct rd_kafka_ConsumerGroupDescription_s { * @name DescribeTopics * @{ */ + +/** + * @brief TopicCollection contains a list of topics. + * + */ +struct rd_kafka_TopicCollection_s { + char **topics; /**< List of topic names. */ + size_t topics_cnt; /**< Count of topic names. */ +}; + +/** + * @brief TopicPartition result type in DescribeTopics result. + * + */ +struct rd_kafka_TopicPartitionInfo_s { + int partition; /**< Partition id. */ + rd_kafka_Node_t *leader; /**< Leader of the partition. */ + size_t isr_cnt; /**< Count of insync replicas. */ + rd_kafka_Node_t **isr; /**< List of in sync replica nodes. */ + size_t replica_cnt; /**< Count of partition replicas. */ + rd_kafka_Node_t **replicas; /**< List of replica nodes. */ +}; + /** * @struct DescribeTopics result */ struct rd_kafka_TopicDescription_s { - char *topic; /**< Topic name */ - int partition_cnt; /**< Number of partitions in \p partitions*/ - struct rd_kafka_metadata_partition *partitions; /**< Partitions */ - rd_kafka_error_t *error; /**< Topic error reported by broker */ - rd_list_t - authorized_operations; /**< Operations allowed for topic. - Type: (rd_kafka_AclOperation_t *) */ + char *topic; /**< Topic name */ + int partition_cnt; /**< Number of partitions in \p partitions*/ + rd_bool_t is_internal; /**< Is the topic is internal to Kafka? */ + rd_kafka_TopicPartitionInfo_t **partitions; /**< Partitions */ + rd_kafka_error_t *error; /**< Topic error reported by broker */ + size_t authorized_operations_cnt; /**< Count of operations allowed for + topic.*/ + rd_kafka_AclOperation_t + *authorized_operations; /**< Operations allowed for topic. 
 */ }; /**@}*/ diff --git a/src/rdkafka_aux.c index 92877fe2c1..c2bc3e8d33 100644 --- a/src/rdkafka_aux.c +++ b/src/rdkafka_aux.c @@ -248,6 +248,54 @@ rd_kafka_Node_t *rd_kafka_Node_new(int32_t id, return ret; } +/** + * @brief Create a new Node object given a node id, and use broker information + * to populate other fields. + * + * @return A newly allocated Node object. + * Use rd_kafka_Node_destroy() to free when done. + * @remark The \p brokers_internal array is assumed to be sorted by node id, + * while the \p brokers array is not. This is the current default from + * Metadata response parsing. + */ +rd_kafka_Node_t *rd_kafka_Node_new_from_brokers( + int32_t id, + const struct rd_kafka_metadata_broker *brokers, + const rd_kafka_metadata_broker_internal_t *brokers_internal, + int broker_cnt) { + int i; + rd_kafka_Node_t *node = rd_calloc(1, sizeof(*node)); + node->id = id; + + /* Brokers are not sorted, so we must iterate through all of them. + * However, the amount of brokers is small, and this function is only + * called while using the Admin API. 
*/ + for (i = 0; i < broker_cnt; i++) { + rd_kafka_metadata_broker_internal_t key = {.id = id}; + rd_kafka_metadata_broker_internal_t *broker_internal = NULL; + char *rack_id = NULL; + + if (id != brokers[i].id) + continue; + + node->host = rd_strdup(brokers[i].host); + node->port = brokers[i].port; + + broker_internal = + bsearch(&key, brokers_internal, broker_cnt, + sizeof(rd_kafka_metadata_broker_internal_t), + rd_kafka_metadata_broker_internal_cmp); + + if (broker_internal) + rack_id = broker_internal->rack_id; + + if (rack_id) + node->rack_id = rd_strdup(rack_id); + break; + } + return node; +} + /** * @brief Copy \p src Node object * @@ -287,3 +335,7 @@ const char *rd_kafka_Node_host(const rd_kafka_Node_t *node) { uint16_t rd_kafka_Node_port(const rd_kafka_Node_t *node) { return node->port; } + +const char *rd_kafka_Node_rack_id(const rd_kafka_Node_t *node) { + return node->rack_id; +} diff --git a/src/rdkafka_aux.h b/src/rdkafka_aux.h index b1865f9400..d2714621e1 100644 --- a/src/rdkafka_aux.h +++ b/src/rdkafka_aux.h @@ -116,6 +116,12 @@ rd_kafka_Node_t *rd_kafka_Node_new(int32_t id, uint16_t port, const char *rack_id); +rd_kafka_Node_t *rd_kafka_Node_new_from_brokers( + int32_t id, + const struct rd_kafka_metadata_broker *brokers, + const rd_kafka_metadata_broker_internal_t *brokers_internal, + int broker_cnt); + rd_kafka_Node_t *rd_kafka_Node_copy(const rd_kafka_Node_t *src); void rd_kafka_Node_destroy(rd_kafka_Node_t *node); diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index be06f0d985..1ea32f372a 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -637,10 +637,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb, rd_kafka_buf_read_uuid(rkbuf, &mdi->topics[i].topic_id); } - if (ApiVersion >= 1) { - int8_t is_internal; - rd_kafka_buf_read_i8(rkbuf, &is_internal); - } + if (ApiVersion >= 1) + rd_kafka_buf_read_bool(rkbuf, + &mdi->topics[i].is_internal); /* PartitionMetadata */ rd_kafka_buf_read_arraycnt(rkbuf, 
&md->topics[i].partition_cnt, diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index bc13014a09..58f68a73cf 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -57,6 +57,7 @@ typedef struct rd_kafka_metadata_topic_internal_s { rd_kafka_uuid_t topic_id; int32_t topic_authorized_operations; /**< ACL operations allowed for topic */ + rd_bool_t is_internal; /**< Is topic internal to Kafka? */ } rd_kafka_metadata_topic_internal_t; diff --git a/tests/0080-admin_ut.c b/tests/0080-admin_ut.c index ebbf3a453a..cebbf9c8e6 100644 --- a/tests/0080-admin_ut.c +++ b/tests/0080-admin_ut.c @@ -774,6 +774,7 @@ static void do_test_DescribeTopics(const char *what, rd_kafka_queue_t *q; #define TEST_DESCRIBE_TOPICS_CNT 4 const char *topic_names[TEST_DESCRIBE_TOPICS_CNT]; + rd_kafka_TopicCollection_t *topics; rd_kafka_AdminOptions_t *options = NULL; int exp_timeout = MY_SOCKET_TIMEOUT_MS; int i; @@ -797,6 +798,9 @@ static void do_test_DescribeTopics(const char *what, topic_names[i] = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); } + topics = rd_kafka_TopicCollection_new_from_names( + topic_names, TEST_DESCRIBE_TOPICS_CNT); + if (with_options) { options = rd_kafka_AdminOptions_new( rk, RD_KAFKA_ADMIN_OP_DESCRIBETOPICS); @@ -825,8 +829,7 @@ static void do_test_DescribeTopics(const char *what, TIMING_START(&timing, "DescribeTopics"); TEST_SAY("Call DescribeTopics, timeout is %dms\n", exp_timeout); - rd_kafka_DescribeTopics(rk, topic_names, TEST_DESCRIBE_TOPICS_CNT, - options, q); + rd_kafka_DescribeTopics(rk, topics, options, q); TIMING_ASSERT_LATER(&timing, 0, 50); /* Poll result queue */ @@ -865,6 +868,7 @@ static void do_test_DescribeTopics(const char *what, for (i = 0; i < TEST_DESCRIBE_TOPICS_CNT; i++) { rd_free((char *)topic_names[i]); } + rd_kafka_TopicCollection_destroy(topics); if (options) rd_kafka_AdminOptions_destroy(options); diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 75a010cfaf..6a2288006b 100644 --- a/tests/0081-admin.c +++ 
b/tests/0081-admin.c @@ -3146,6 +3146,29 @@ static void do_test_DescribeConsumerGroups(const char *what, } \ } while (0) +static void +xtest_match_authorized_operations(const rd_kafka_AclOperation_t *expected, + size_t expected_cnt, + const rd_kafka_AclOperation_t *actual, + size_t actual_cnt) { + size_t i, j; + TEST_ASSERT(expected_cnt == actual_cnt, + "Expected %" PRIusz " authorized operations, got %" PRIusz, + expected_cnt, actual_cnt); + + for (i = 0; i < expected_cnt; i++) { + for (j = 0; j < actual_cnt; j++) + if (expected[i] == actual[j]) + break; + + if (j == actual_cnt) + TEST_FAIL( + "Did not find expected authorized operation in " + "result %s\n", + rd_kafka_AclOperation_name(expected[i])); + } +} + /** * @brief Test DescribeTopics: create a topic, describe it, and then * delete it. @@ -3161,7 +3184,8 @@ static void do_test_DescribeTopics(const char *what, rd_bool_t include_authorized_operations) { rd_kafka_queue_t *q; #define TEST_DESCRIBE_TOPICS_CNT 3 - char *topics[TEST_DESCRIBE_TOPICS_CNT]; + char *topic_names[TEST_DESCRIBE_TOPICS_CNT]; + rd_kafka_TopicCollection_t *topics; rd_kafka_AdminOptions_t *options; rd_kafka_event_t *rkev; const rd_kafka_error_t *error; @@ -3169,6 +3193,8 @@ static void do_test_DescribeTopics(const char *what, test_timing_t timing; const rd_kafka_DescribeTopics_result_t *res; const rd_kafka_TopicDescription_t **result_topics; + const rd_kafka_TopicPartitionInfo_t **partitions; + size_t partitions_cnt; size_t result_topics_cnt; char errstr[128]; const char *errstr2; @@ -3177,7 +3203,8 @@ static void do_test_DescribeTopics(const char *what, const char *principal; rd_kafka_AclBinding_t *acl_bindings[1]; int i; - int authorized_operations_cnt; + const rd_kafka_AclOperation_t *authorized_operations; + size_t authorized_operations_cnt; SUB_TEST_QUICK( "%s DescribeTopics with %s, request_timeout %d, " @@ -3189,10 +3216,14 @@ static void do_test_DescribeTopics(const char *what, /* Only create one topic, the others will be 
non-existent. */ for (i = 0; i < TEST_DESCRIBE_TOPICS_CNT; i++) { - rd_strdupa(&topics[i], test_mk_topic_name(__FUNCTION__, 1)); + rd_strdupa(&topic_names[i], + test_mk_topic_name(__FUNCTION__, 1)); } - test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL); - test_wait_topic_exists(rk, topics[0], 10000); + topics = rd_kafka_TopicCollection_new_from_names( + (const char **)topic_names, TEST_DESCRIBE_TOPICS_CNT); + + test_CreateTopics_simple(rk, NULL, topic_names, 1, 1, NULL); + test_wait_topic_exists(rk, topic_names[0], 10000); /* Call DescribeTopics. */ options = @@ -3204,8 +3235,7 @@ static void do_test_DescribeTopics(const char *what, options, include_authorized_operations)); TIMING_START(&timing, "DescribeTopics"); - rd_kafka_DescribeTopics(rk, (const char **)topics, - TEST_DESCRIBE_TOPICS_CNT, options, q); + rd_kafka_DescribeTopics(rk, topics, options, q); TIMING_ASSERT_LATER(&timing, 0, 50); rd_kafka_AdminOptions_destroy(options); @@ -3251,35 +3281,39 @@ static void do_test_DescribeTopics(const char *what, } /* Check fields inside the first (existent) topic. 
*/ - TEST_ASSERT( - strcmp(rd_kafka_TopicDescription_topic_name(result_topics[0]), - topics[0]) == 0, - "Expected topic name %s, got %s", topics[0], - rd_kafka_TopicDescription_topic_name(result_topics[0])); - TEST_ASSERT( - rd_kafka_TopicDescription_topic_partition_count(result_topics[0]) == - 1, - "Expected %d partitions, got %d", 1, - rd_kafka_TopicDescription_topic_partition_count(result_topics[0])); + TEST_ASSERT(strcmp(rd_kafka_TopicDescription_name(result_topics[0]), + topic_names[0]) == 0, + "Expected topic name %s, got %s", topic_names[0], + rd_kafka_TopicDescription_name(result_topics[0])); + + partitions = rd_kafka_TopicDescription_partitions(result_topics[0], + &partitions_cnt); + + TEST_ASSERT(partitions_cnt == 1, "Expected %d partitions, got %" PRIusz, + 1, partitions_cnt); + + TEST_ASSERT(rd_kafka_TopicPartitionInfo_partition(partitions[0]) == 0, + "Expected partition id to be %d, got %d", 0, + rd_kafka_TopicPartitionInfo_partition(partitions[0])); if (include_authorized_operations) { - authorized_operations_cnt = - rd_kafka_TopicDescription_topic_authorized_operation_count( - result_topics[0]); - TEST_ASSERT(authorized_operations_cnt == 8, - "Expected 8 operations allowed before creating " - "ACLs, got %d.", - authorized_operations_cnt); - test_match_authorized_operations( - authorized_operations_cnt, result_topics[0], - rd_kafka_TopicDescription_authorized_operation, 8, + const rd_kafka_AclOperation_t expected[] = { RD_KAFKA_ACL_OPERATION_ALTER, RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS, RD_KAFKA_ACL_OPERATION_CREATE, RD_KAFKA_ACL_OPERATION_DELETE, RD_KAFKA_ACL_OPERATION_DESCRIBE, RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS, - RD_KAFKA_ACL_OPERATION_READ, RD_KAFKA_ACL_OPERATION_WRITE); + RD_KAFKA_ACL_OPERATION_READ, + RD_KAFKA_ACL_OPERATION_WRITE}; + + authorized_operations = + rd_kafka_TopicDescription_authorized_operations( + result_topics[0], &authorized_operations_cnt); + + xtest_match_authorized_operations(expected, 8, + authorized_operations, + 
authorized_operations_cnt); } rd_kafka_event_destroy(rkev); @@ -3303,7 +3337,7 @@ static void do_test_DescribeTopics(const char *what, /* Change authorized operations for the principal which we're * using to connect to the broker. */ acl_bindings[0] = rd_kafka_AclBinding_new( - RD_KAFKA_RESOURCE_TOPIC, topics[0], + RD_KAFKA_RESOURCE_TOPIC, topic_names[0], RD_KAFKA_RESOURCE_PATTERN_LITERAL, principal, "*", RD_KAFKA_ACL_OPERATION_READ, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, NULL, 0); @@ -3321,8 +3355,7 @@ static void do_test_DescribeTopics(const char *what, 1)); TIMING_START(&timing, "DescribeTopics"); - rd_kafka_DescribeTopics(rk, (const char **)topics, - TEST_DESCRIBE_TOPICS_CNT, options, q); + rd_kafka_DescribeTopics(rk, topics, options, q); TIMING_ASSERT_LATER(&timing, 0, 50); rd_kafka_AdminOptions_destroy(options); @@ -3356,17 +3389,18 @@ static void do_test_DescribeTopics(const char *what, rd_kafka_error_string(error)); /* Check if ACLs changed. */ - authorized_operations_cnt = - rd_kafka_TopicDescription_topic_authorized_operation_count( - result_topics[0]); - TEST_ASSERT(authorized_operations_cnt == 2, - "Expected 2 operations allowed after creating " - "ACLs, got %d.", - authorized_operations_cnt); - test_match_authorized_operations( - authorized_operations_cnt, result_topics[0], - rd_kafka_TopicDescription_authorized_operation, 2, - RD_KAFKA_ACL_OPERATION_READ, RD_KAFKA_ACL_OPERATION_DESCRIBE); + { + const rd_kafka_AclOperation_t expected[] = { + RD_KAFKA_ACL_OPERATION_READ, + RD_KAFKA_ACL_OPERATION_DESCRIBE}; + authorized_operations = + rd_kafka_TopicDescription_authorized_operations( + result_topics[0], &authorized_operations_cnt); + + xtest_match_authorized_operations(expected, 2, + authorized_operations, + authorized_operations_cnt); + } rd_kafka_event_destroy(rkev); /* @@ -3375,7 +3409,7 @@ static void do_test_DescribeTopics(const char *what, * and describe. 
*/ acl_bindings[0] = rd_kafka_AclBinding_new( - RD_KAFKA_RESOURCE_TOPIC, topics[0], + RD_KAFKA_RESOURCE_TOPIC, topic_names[0], RD_KAFKA_RESOURCE_PATTERN_LITERAL, principal, "*", RD_KAFKA_ACL_OPERATION_DELETE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, NULL, 0); @@ -3384,10 +3418,12 @@ static void do_test_DescribeTopics(const char *what, rd_kafka_AclBinding_destroy(acl_bindings[0]); done: - test_DeleteTopics_simple(rk, NULL, topics, 1, NULL); + test_DeleteTopics_simple(rk, NULL, topic_names, 1, NULL); if (!rkqu) rd_kafka_queue_destroy(q); + rd_kafka_TopicCollection_destroy(topics); + TEST_LATER_CHECK(); #undef TEST_DESCRIBE_TOPICS_CNT