Skip to content

Commit

Permalink
Address comments for DescribeTopics API
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Aug 31, 2023
1 parent 85f7f91 commit 59ab5bb
Show file tree
Hide file tree
Showing 10 changed files with 507 additions and 281 deletions.
122 changes: 70 additions & 52 deletions examples/describe_topics.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,51 +136,59 @@ int64_t parse_int(const char *what, const char *str) {
return (int64_t)n;
}


/**
* @brief Print partition information.
* @brief Print node information.
*/
static void print_partition_info(const rd_kafka_TopicDescription_t *topic,
int partition_idx) {
rd_kafka_resp_err_t partition_err;
int leader, id, isr_cnt, replica_cnt, k;
partition_err =
rd_kafka_TopicDescription_partition_error(topic, partition_idx);

if (partition_err != RD_KAFKA_RESP_ERR_NO_ERROR) {
printf("\tPartition at index %d has error[%" PRId32 "]: %s\n",
partition_idx, partition_err,
rd_kafka_err2str(partition_err));
static void print_node_info(const rd_kafka_Node_t *node) {
if (!node) {
printf("\t\t(null)\n");
return;
}
printf("\tPartition at index %d succeeded\n", partition_idx);
id = rd_kafka_TopicDescription_partition_id(topic, partition_idx);
leader =
rd_kafka_TopicDescription_partition_leader(topic, partition_idx);
isr_cnt =
rd_kafka_TopicDescription_partition_isr_count(topic, partition_idx);
replica_cnt = rd_kafka_TopicDescription_partition_replica_count(
topic, partition_idx);
printf("\tPartition has id: %d with leader: %d\n", id, leader);

printf("\t\tNode [id: %" PRId32
", host: %s"
", port: %" PRIu16 ", rack %s ]\n",
rd_kafka_Node_id(node), rd_kafka_Node_host(node),
rd_kafka_Node_port(node), rd_kafka_Node_rack_id(node));
}

/**
* @brief Print partition information.
*/
static void
print_partition_info(const rd_kafka_TopicPartitionInfo_t *partition) {
size_t k;
int id;
const rd_kafka_Node_t **isr;
size_t isr_cnt;
const rd_kafka_Node_t **replicas;
size_t replica_cnt;

id = rd_kafka_TopicPartitionInfo_partition(partition);
printf("\tPartition id: %d\n", id);

printf("\tPartition leader: \n");
print_node_info(rd_kafka_TopicPartitionInfo_leader(partition));

isr = rd_kafka_TopicPartitionInfo_isr(partition, &isr_cnt);
if (isr_cnt) {
printf(
"\tThe in-sync replica count is: %d, they "
"are: ",
isr_cnt);
"are: \n",
(int)isr_cnt);
for (k = 0; k < isr_cnt; k++)
printf("%d ", rd_kafka_TopicDescription_partition_isr(
topic, partition_idx, k));
printf("\n");
print_node_info(isr[k]);
} else
printf("\tThe in-sync replica count is 0\n");

replicas = rd_kafka_TopicPartitionInfo_isr(partition, &replica_cnt);
if (replica_cnt) {
printf("\tThe replica count is: %d, they are: ", replica_cnt);
printf(
"\tThe replica count is: %d, they "
"are: \n",
(int)replica_cnt);
for (k = 0; k < replica_cnt; k++)
printf("%d ",
rd_kafka_TopicDescription_partition_replica(
topic, partition_idx, k));
printf("\n");
print_node_info(replicas[k]);
} else
printf("\tThe replica count is 0\n");
}
Expand All @@ -189,14 +197,14 @@ static void print_partition_info(const rd_kafka_TopicDescription_t *topic,
* @brief Print topic information.
*/
static void print_topic_info(const rd_kafka_TopicDescription_t *topic) {
int j, acl_operation;
size_t j;
const rd_kafka_error_t *error;
const char *topic_name = rd_kafka_TopicDescription_topic_name(topic);
int topic_authorized_operations_cnt =
rd_kafka_TopicDescription_topic_authorized_operation_count(topic);
int partition_cnt =
rd_kafka_TopicDescription_topic_partition_count(topic);
error = rd_kafka_TopicDescription_error(topic);
const char *topic_name = rd_kafka_TopicDescription_name(topic);
error = rd_kafka_TopicDescription_error(topic);
const rd_kafka_AclOperation_t *authorized_operations;
size_t authorized_operations_cnt;
const rd_kafka_TopicPartitionInfo_t **partitions;
size_t partition_cnt;

if (rd_kafka_error_code(error)) {
printf("Topic: %s has error[%" PRId32 "]: %s\n", topic_name,
Expand All @@ -205,20 +213,25 @@ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) {
return;
}

authorized_operations = rd_kafka_TopicDescription_authorized_operations(
topic, &authorized_operations_cnt);

printf(
"Topic: %s succeeded, has %d topic authorized operations "
"allowed, they are:\n",
topic_name, topic_authorized_operations_cnt);
for (j = 0; j < topic_authorized_operations_cnt; j++) {
acl_operation =
rd_kafka_TopicDescription_authorized_operation(topic, j);
topic_name, (int)authorized_operations_cnt);

for (j = 0; j < authorized_operations_cnt; j++)
printf("\t%s operation is allowed\n",
rd_kafka_AclOperation_name(acl_operation));
}
rd_kafka_AclOperation_name(authorized_operations[j]));


partitions =
rd_kafka_TopicDescription_partitions(topic, &partition_cnt);

printf("partition count is: %d\n", partition_cnt);
printf("partition count is: %d\n", (int)partition_cnt);
for (j = 0; j < partition_cnt; j++) {
print_partition_info(topic, j);
print_partition_info(partitions[j]);
printf("\n");
}
}
Expand Down Expand Up @@ -257,8 +270,9 @@ static int print_topics_info(const rd_kafka_DescribeTopics_result_t *topicdesc,
* topics.
*/
static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) {
rd_kafka_t *rk = NULL;
const char **topics = NULL;
rd_kafka_t *rk = NULL;
const char **topic_names = NULL;
rd_kafka_TopicCollection_t *topics = NULL;
char errstr[512];
rd_kafka_AdminOptions_t *options = NULL;
rd_kafka_event_t *event = NULL;
Expand All @@ -277,8 +291,10 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) {
include_topic_authorized_operations > 1)
usage("include_topic_authorized_operations not a 0-1 int");

topics = (const char **)&argv[1];
topics_cnt = argc - 1;
topic_names = (const char **)&argv[1];
topics_cnt = argc - 1;
topics =
rd_kafka_TopicCollection_new_from_names(topic_names, topics_cnt);

/*
* Create producer instance
Expand Down Expand Up @@ -315,7 +331,7 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) {
}

/* Call DescribeTopics */
rd_kafka_DescribeTopics(rk, topics, topics_cnt, options, queue);
rd_kafka_DescribeTopics(rk, topics, options, queue);

/* Wait for results */
event = rd_kafka_queue_poll(queue, -1 /* indefinitely but limited by
Expand Down Expand Up @@ -348,6 +364,8 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) {

exit:
/* Cleanup. */
if (topics)
rd_kafka_TopicCollection_destroy(topics);
if (event)
rd_kafka_event_destroy(event);
if (options)
Expand Down
Loading

0 comments on commit 59ab5bb

Please sign in to comment.