Skip to content

Commit

Permalink
Change API for common stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Jun 24, 2023
1 parent 6ddbaba commit 3a9a340
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 247 deletions.
13 changes: 6 additions & 7 deletions examples/describe_cluster.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2022, Magnus Edenhill
* Copyright (c) 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -150,7 +150,8 @@ print_cluster_info(const rd_kafka_DescribeCluster_result_t *clusterdesc) {
controller_id = rd_kafka_ClusterDescription_controller_id(desc);
node_cnt = rd_kafka_ClusterDescription_node_count(desc);
cluster_authorized_operations_cnt =
rd_kafka_ClusterDescription_cluster_authorized_operation_count(desc);
rd_kafka_ClusterDescription_cluster_authorized_operation_count(
desc);
cluster_id = rd_kafka_ClusterDescription_cluster_id(desc);

printf(
Expand All @@ -159,8 +160,7 @@ print_cluster_info(const rd_kafka_DescribeCluster_result_t *clusterdesc) {
cluster_id, controller_id, cluster_authorized_operations_cnt);
for (j = 0; j < cluster_authorized_operations_cnt; j++) {
acl_operation =
rd_kafka_ClusterDescription_authorized_operation(desc,
j);
rd_kafka_ClusterDescription_authorized_operation(desc, j);
printf("\t%s operation is allowed\n",
rd_kafka_AclOperation_name(acl_operation));
}
Expand Down Expand Up @@ -220,9 +220,8 @@ static void cmd_describe_cluster(rd_kafka_conf_t *conf, int argc, char **argv) {
fprintf(stderr, "%% Failed to set timeout: %s\n", errstr);
goto exit;
}
if ((error =
rd_kafka_AdminOptions_set_include_cluster_authorized_operations(
options, include_cluster_authorized_operations))) {
if ((error = rd_kafka_AdminOptions_set_include_authorized_operations(
options, include_cluster_authorized_operations))) {
fprintf(stderr,
"%% Failed to set require cluster authorized "
"operations: %s\n",
Expand Down
6 changes: 5 additions & 1 deletion examples/describe_consumer_groups.c
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc,
}
return 0;
}

/**
* @brief Parse an integer or fail.
*/
Expand All @@ -263,6 +264,7 @@ int64_t parse_int(const char *what, const char *str) {

return (int64_t)n;
}

/**
* @brief Call rd_kafka_DescribeConsumerGroups() with a list of
* groups.
Expand All @@ -282,7 +284,7 @@ cmd_describe_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) {
parse_int("include_authorized_operations", argv[0]);
if (include_authorized_operations < 0 ||
include_authorized_operations > 1)
usage("Require stable not a 0-1 int");
usage("include_authorized_operations not a 0-1 int");

if (argc >= 1) {
groups = (const char **)&argv[1];
Expand Down Expand Up @@ -376,6 +378,7 @@ int main(int argc, char **argv) {
*/
conf = rd_kafka_conf_new();


/*
* Parse common options
*/
Expand Down Expand Up @@ -408,5 +411,6 @@ int main(int argc, char **argv) {
}

cmd_describe_consumer_groups(conf, argc - optind, &argv[optind]);

return 0;
}
21 changes: 10 additions & 11 deletions examples/describe_topics.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2022, Magnus Edenhill
* Copyright (c) 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -187,23 +187,23 @@ static int print_topics_info(const rd_kafka_DescribeTopics_result_t *topicdesc,

printf("partition count is: %d\n", partition_cnt);
for (j = 0; j < partition_cnt; j++) {
const rd_kafka_error_t *partition_error;
rd_kafka_resp_err_t partition_err;
int leader, id, isr_cnt, replica_cnt, k;
partition_error =
partition_err =
rd_kafka_TopicDescription_partition_error(topic, j);

if (rd_kafka_error_code(partition_error)) {
if (partition_err != RD_KAFKA_RESP_ERR_NO_ERROR) {
printf(
"\tPartition at index %d has error[%" PRId32
"]: %s\n",
j, rd_kafka_error_code(partition_error),
rd_kafka_error_string(partition_error));
j, partition_err,
rd_kafka_err2str(partition_err));
continue;
}
printf("\tPartition at index %d succeeded\n", j);
id = rd_kafka_TopicDescription_partition_id(topic, j);
leader =
rd_kafka_TopicDescription_partition_leader(topic, j);
leader = rd_kafka_TopicDescription_partition_leader(
topic, j);
isr_cnt = rd_kafka_TopicDescription_partition_isr_count(
topic, j);
replica_cnt =
Expand Down Expand Up @@ -293,9 +293,8 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) {
fprintf(stderr, "%% Failed to set timeout: %s\n", errstr);
goto exit;
}
if ((error =
rd_kafka_AdminOptions_set_include_topic_authorized_operations(
options, include_topic_authorized_operations))) {
if ((error = rd_kafka_AdminOptions_set_include_authorized_operations(
options, include_topic_authorized_operations))) {
fprintf(stderr,
"%% Failed to set require topic authorized operations: "
"%s\n",
Expand Down
4 changes: 3 additions & 1 deletion src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -4794,7 +4794,9 @@ static void rd_kafka_ListGroups_resp_cb(rd_kafka_t *rk,

state->wait_cnt++;
error = rd_kafka_DescribeGroupsRequest(
rkb, 0, grps, i, rd_false, RD_KAFKA_REPLYQ(state->q, 0),
rkb, 0, grps, i,
rd_false /* include_authorized_operations */,
RD_KAFKA_REPLYQ(state->q, 0),
rd_kafka_DescribeGroups_resp_cb, state);
if (error) {
rd_kafka_DescribeGroups_resp_cb(
Expand Down
134 changes: 53 additions & 81 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -5367,9 +5367,9 @@ typedef int rd_kafka_event_type_t;
/** AlterConsumerGroupOffsets_result_t */
#define RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT 0x10000
/** DescribeTopics_result_t */
#define RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT 0x20000
#define RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT 0x100000
/** DescribeCluster_result_t */
#define RD_KAFKA_EVENT_DESCRIBECLUSTER_RESULT 0x40000
#define RD_KAFKA_EVENT_DESCRIBECLUSTER_RESULT 0x200000


/**
Expand Down Expand Up @@ -6942,15 +6942,16 @@ rd_kafka_error_t *rd_kafka_AdminOptions_set_require_stable_offsets(

/**
* @brief Whether broker should return authorized operations
* (DescribeConsumerGroups).
* (DescribeConsumerGroups, DescribeTopics, DescribeCluster).
*
* @param options Admin options.
* @param true_or_false Defaults to false.
*
* @return NULL on success, a new error instance that must be
* released with rd_kafka_error_destroy() in case of error.
*
* @remark This option is valid for DescribeConsumerGroups.
* @remark This option is valid for DescribeConsumerGroups, DescribeTopics,
* DescribeCluster.
*/
RD_EXPORT
rd_kafka_error_t *rd_kafka_AdminOptions_set_include_authorized_operations(
Expand All @@ -6975,41 +6976,6 @@ rd_kafka_error_t *rd_kafka_AdminOptions_set_match_consumer_group_states(
const rd_kafka_consumer_group_state_t *consumer_group_states,
size_t consumer_group_states_cnt);

/**
* @brief Whether broker should return cluster authorized operations
* (DescribeCluster).
*
* @param options Admin options.
* @param true_or_false Defaults to false.
*
* @return NULL on success, a new error instance that must be
* released with rd_kafka_error_destroy() in case of error.
*
* @remark This option is valid for DescribeCluster.
*/
RD_EXPORT
rd_kafka_error_t *
rd_kafka_AdminOptions_set_include_cluster_authorized_operations(
rd_kafka_AdminOptions_t *options,
int true_or_false);

/**
* @brief Whether broker should return topic authorized operations
* (DescribeTopics).
*
* @param options Admin options.
* @param true_or_false Defaults to false.
*
* @return NULL on success, a new error instance that must be
* released with rd_kafka_error_destroy() in case of error.
*
* @remark This option is valid for DescribeTopics.
*/
RD_EXPORT
rd_kafka_error_t *rd_kafka_AdminOptions_set_include_topic_authorized_operations(
rd_kafka_AdminOptions_t *options,
int true_or_false);

/**
* @brief Set application opaque value that can be extracted from the
* result event using rd_kafka_event_opaque()
Expand All @@ -7018,8 +6984,38 @@ RD_EXPORT void
rd_kafka_AdminOptions_set_opaque(rd_kafka_AdminOptions_t *options,
void *ev_opaque);



/**
* @enum rd_kafka_AclOperation_t
* @brief Apache Kafka ACL operation types. Common type for multiple Admin API
* functions.
*/
typedef enum rd_kafka_AclOperation_t {
RD_KAFKA_ACL_OPERATION_UNKNOWN = 0, /**< Unknown */
RD_KAFKA_ACL_OPERATION_ANY =
1, /**< In a filter, matches any AclOperation */
RD_KAFKA_ACL_OPERATION_ALL = 2, /**< ALL operation */
RD_KAFKA_ACL_OPERATION_READ = 3, /**< READ operation */
RD_KAFKA_ACL_OPERATION_WRITE = 4, /**< WRITE operation */
RD_KAFKA_ACL_OPERATION_CREATE = 5, /**< CREATE operation */
RD_KAFKA_ACL_OPERATION_DELETE = 6, /**< DELETE operation */
RD_KAFKA_ACL_OPERATION_ALTER = 7, /**< ALTER operation */
RD_KAFKA_ACL_OPERATION_DESCRIBE = 8, /**< DESCRIBE operation */
RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION =
9, /**< CLUSTER_ACTION operation */
RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS =
10, /**< DESCRIBE_CONFIGS operation */
RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS =
11, /**< ALTER_CONFIGS operation */
RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE =
12, /**< IDEMPOTENT_WRITE operation */
RD_KAFKA_ACL_OPERATION__CNT
} rd_kafka_AclOperation_t;

/**@}*/


/**
* @name Admin API - Topics
* @brief Topic related operations.
Expand Down Expand Up @@ -7826,7 +7822,7 @@ rd_kafka_DeleteRecords_result_offsets(
/**@}*/

/**
* @name Admin API - DescribeTopic
* @name Admin API - DescribeTopics
* @{
*/

Expand Down Expand Up @@ -7986,31 +7982,31 @@ const rd_kafka_resp_err_t rd_kafka_TopicDescription_partition_error(
int idx);

/**
* @brief Gets operation at idx index of topic authorized operations for the
* \p topicdesc topic.
* @brief Gets the topic authorized acl operations count for the \p topicdesc
* topic.
*
* @param topicdesc The topic description.
* @param idx The index for which element is needed.
*
* @return Authorized operation at given index.
* @return The topic authorized operations count.
*
*/
RD_EXPORT
const int rd_kafka_TopicDescription_authorized_operation(
const rd_kafka_TopicDescription_t *topicdesc,
size_t idx);
const int rd_kafka_TopicDescription_topic_authorized_operation_count(
const rd_kafka_TopicDescription_t *topicdesc);

/**
* @brief Gets the topic authorized acl operations count for the \p topicdesc
* topic.
* @brief Gets operation at idx index of topic authorized operations for the
* \p topicdesc topic.
*
* @param topicdesc The topic description.
* @param idx The index for which element is needed.
*
* @return The topic authorized operations count.
*
* @return Authorized operation at given index.
*/
RD_EXPORT
const int rd_kafka_TopicDescription_topic_authorized_operation_count(
const rd_kafka_TopicDescription_t *topicdesc);
const rd_kafka_AclOperation_t rd_kafka_TopicDescription_authorized_operation(
const rd_kafka_TopicDescription_t *topicdesc,
size_t idx);

/**
* @brief Gets the topic name for the \p topicdesc topic.
Expand Down Expand Up @@ -8040,6 +8036,7 @@ RD_EXPORT
const rd_kafka_error_t *
rd_kafka_TopicDescription_error(const rd_kafka_TopicDescription_t *topicdesc);


/**@}*/

/**
Expand Down Expand Up @@ -8131,7 +8128,7 @@ const int rd_kafka_ClusterDescription_cluster_authorized_operation_count(
* @return Authorized operation at given index.
*/
RD_EXPORT
const int rd_kafka_ClusterDescription_authorized_operation(
const rd_kafka_AclOperation_t rd_kafka_ClusterDescription_authorized_operation(
const rd_kafka_ClusterDescription_t *clusterdesc,
size_t idx);

Expand Down Expand Up @@ -8162,6 +8159,7 @@ const char *rd_kafka_ClusterDescription_cluster_id(

/**@}*/


/**
* @name Admin API - ListConsumerGroups
* @{
Expand Down Expand Up @@ -8406,7 +8404,7 @@ size_t rd_kafka_ConsumerGroupDescription_authorized_operation_count(
* @return Authorized operation at given index.
*/
RD_EXPORT
int rd_kafka_ConsumerGroupDescription_authorized_operation(
rd_kafka_AclOperation_t rd_kafka_ConsumerGroupDescription_authorized_operation(
const rd_kafka_ConsumerGroupDescription_t *grpdesc,
size_t idx);

Expand Down Expand Up @@ -8937,32 +8935,6 @@ RD_EXPORT const rd_kafka_error_t *
rd_kafka_acl_result_error(const rd_kafka_acl_result_t *aclres);


/**
* @enum rd_kafka_AclOperation_t
* @brief Apache Kafka ACL operation types.
*/
typedef enum rd_kafka_AclOperation_t {
RD_KAFKA_ACL_OPERATION_UNKNOWN = 0, /**< Unknown */
RD_KAFKA_ACL_OPERATION_ANY =
1, /**< In a filter, matches any AclOperation */
RD_KAFKA_ACL_OPERATION_ALL = 2, /**< ALL operation */
RD_KAFKA_ACL_OPERATION_READ = 3, /**< READ operation */
RD_KAFKA_ACL_OPERATION_WRITE = 4, /**< WRITE operation */
RD_KAFKA_ACL_OPERATION_CREATE = 5, /**< CREATE operation */
RD_KAFKA_ACL_OPERATION_DELETE = 6, /**< DELETE operation */
RD_KAFKA_ACL_OPERATION_ALTER = 7, /**< ALTER operation */
RD_KAFKA_ACL_OPERATION_DESCRIBE = 8, /**< DESCRIBE operation */
RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION =
9, /**< CLUSTER_ACTION operation */
RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS =
10, /**< DESCRIBE_CONFIGS operation */
RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS =
11, /**< ALTER_CONFIGS operation */
RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE =
12, /**< IDEMPOTENT_WRITE operation */
RD_KAFKA_ACL_OPERATION__CNT
} rd_kafka_AclOperation_t;

/**
* @returns a string representation of the \p acl_operation
*/
Expand Down
Loading

0 comments on commit 3a9a340

Please sign in to comment.