diff --git a/src/rdkafka.h b/src/rdkafka.h index f9a9a7a0f6..2c8bb93bab 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -8147,7 +8147,8 @@ const rd_kafka_Node_t **rd_kafka_TopicPartitionInfo_replicas( * @param topicdesc The topic description. * @param cntp is updated with authorized ACL operations count. * - * @return The topic authorized operations. + * @return The topic authorized operations. Is NULL if operations were not + * requested. * * @remark The lifetime of the returned memory is the same * as the lifetime of the \p topicdesc object. @@ -8242,7 +8243,8 @@ const rd_kafka_Node_t **rd_kafka_DescribeCluster_result_nodes( * @param result The result of DescribeCluster. * @param cntp is updated with authorized ACL operations count. * - * @return The cluster authorized operations. + * @return The cluster authorized operations. Is NULL if operations were not + * requested. * @remark The lifetime of the returned memory is the same * as the lifetime of the \p result object. */ @@ -8509,7 +8511,8 @@ const char *rd_kafka_ConsumerGroupDescription_partition_assignor( * @param grpdesc The group description. * @param cntp is updated with authorized ACL operations count. * - * @return The group authorized operations. + * @return The group authorized operations. Is NULL if operations were not + * requested. * * @remark The lifetime of the returned memory is the same * as the lifetime of the \p grpdesc object. diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 9fca65b599..db0e12e09e 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -7107,12 +7107,12 @@ const rd_kafka_error_t **rd_kafka_ListConsumerGroups_result_errors( * * @param authorized_operations returned by RPC, containing operations encoded * per-bit. - * @param cntp is set to the count of the operations. - * @returns rd_kafka_AclOperation_t * + * @param cntp is set to the count of the operations, or -1 if the operations + * were not requested. + * @returns rd_kafka_AclOperation_t *. May be NULL. */ static rd_kafka_AclOperation_t * -rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations, - size_t *cntp) { +rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations, int *cntp) { rd_kafka_AclOperation_t i; int j = 0; int count = 0; @@ -7120,7 +7120,7 @@ rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations, /* In case of authorized_operations not requested, return NULL. */ if (authorized_operations < 0) { - *cntp = 0; + *cntp = -1; return NULL; } @@ -7131,11 +7131,14 @@ rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations, count += ((authorized_operations >> i) & 1); *cntp = count; - if (!count) - return operations; + /* In case no operations exist, allocate 1 byte so that the returned + * pointer is non-NULL. A NULL pointer implies that authorized + * operations were not requested. */ + if (count == 0) + return rd_malloc(1); - j = 0; operations = rd_malloc(sizeof(rd_kafka_AclOperation_t) * count); + j = 0; for (i = RD_KAFKA_ACL_OPERATION_READ; i < RD_KAFKA_ACL_OPERATION__CNT; i++) { if ((authorized_operations >> i) & 1) { @@ -7147,6 +7150,38 @@ rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations, return operations; } +/** + * @brief Copy a list of rd_kafka_AclOperation_t. + * + * @param src Array of rd_kafka_AclOperation_t to copy from. May be NULL if + * authorized operations were not requested. + * @param authorized_operations_cnt Count of \p src. May be -1 if authorized + * operations were not requested. + * @returns Copy of \p src. May be NULL. + */ +static rd_kafka_AclOperation_t * +rd_kafka_AuthorizedOperations_copy(const rd_kafka_AclOperation_t *src, + int authorized_operations_cnt) { + size_t copy_bytes = 0; + rd_kafka_AclOperation_t *dst = NULL; + + if (authorized_operations_cnt == -1 || src == NULL) + return NULL; + + /* Allocate and copy 1 byte so that the returned pointer + * is non-NULL. A NULL pointer implies that authorized operations were + * not requested. */ + if (authorized_operations_cnt == 0) + copy_bytes = 1; + else + copy_bytes = + sizeof(rd_kafka_AclOperation_t) * authorized_operations_cnt; + + dst = rd_malloc(copy_bytes); + memcpy(dst, src, copy_bytes); + return dst; +} + /** * @brief Create a new MemberDescription object. This object is used for * creating a ConsumerGroupDescription. @@ -7279,7 +7314,7 @@ rd_kafka_ConsumerGroupDescription_new( const rd_list_t *members, const char *partition_assignor, const rd_kafka_AclOperation_t *authorized_operations, - size_t authorized_operations_cnt, + int authorized_operations_cnt, rd_kafka_consumer_group_state_t state, const rd_kafka_Node_t *coordinator, rd_kafka_error_t *error) { @@ -7300,14 +7335,8 @@ rd_kafka_ConsumerGroupDescription_new( : rd_strdup(partition_assignor); grpdesc->authorized_operations_cnt = authorized_operations_cnt; - if (authorized_operations_cnt) { - grpdesc->authorized_operations = - rd_malloc(sizeof(rd_kafka_AclOperation_t) * - authorized_operations_cnt); - memcpy(grpdesc->authorized_operations, authorized_operations, - sizeof(rd_kafka_AclOperation_t) * - authorized_operations_cnt); - } + grpdesc->authorized_operations = rd_kafka_AuthorizedOperations_copy( + authorized_operations, authorized_operations_cnt); grpdesc->state = state; if (coordinator != NULL) @@ -7406,7 +7435,7 @@ const rd_kafka_AclOperation_t * rd_kafka_ConsumerGroupDescription_authorized_operations( const rd_kafka_ConsumerGroupDescription_t *grpdesc, size_t *cntp) { - *cntp = grpdesc->authorized_operations_cnt; + *cntp = RD_MAX(grpdesc->authorized_operations_cnt, 0); return grpdesc->authorized_operations; } @@ -7557,7 +7586,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, char *group_id = NULL, *group_state = NULL, *proto_type = NULL, *proto = NULL, *host = NULL; rd_kafka_AclOperation_t *operations = NULL; - size_t operation_cnt; + int operation_cnt; api_version = rd_kafka_buf_ApiVersion(reply); if (api_version >= 1) { @@ -7579,7 +7608,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, node = rd_kafka_Node_new(nodeid, host, port, NULL); while (cnt-- > 0) { int16_t error_code; - int32_t authorized_operations = 0; + int32_t authorized_operations = -1; rd_kafkap_str_t GroupId, GroupState, ProtocolType, ProtocolData; rd_bool_t is_simple_consumer_group, is_consumer_protocol_type; int32_t member_cnt; @@ -7996,7 +8025,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( const rd_kafka_metadata_broker_internal_t *brokers_internal, int broker_cnt, const rd_kafka_AclOperation_t *authorized_operations, - size_t authorized_operations_cnt, + int authorized_operations_cnt, rd_bool_t is_internal, rd_kafka_error_t *error) { rd_kafka_TopicDescription_t *topicdesc; @@ -8009,14 +8038,8 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( topicdesc->error = rd_kafka_error_copy(error); topicdesc->authorized_operations_cnt = authorized_operations_cnt; - if (authorized_operations_cnt) { - topicdesc->authorized_operations = - rd_malloc(sizeof(rd_kafka_AclOperation_t) * - authorized_operations_cnt); - memcpy(topicdesc->authorized_operations, authorized_operations, - sizeof(rd_kafka_AclOperation_t) * - authorized_operations_cnt); - } + topicdesc->authorized_operations = rd_kafka_AuthorizedOperations_copy( + authorized_operations, authorized_operations_cnt); if (partitions) { topicdesc->partitions = @@ -8099,7 +8122,7 @@ const rd_kafka_TopicPartitionInfo_t **rd_kafka_TopicDescription_partitions( const rd_kafka_AclOperation_t *rd_kafka_TopicDescription_authorized_operations( const rd_kafka_TopicDescription_t *topicdesc, size_t *cntp) { - *cntp = topicdesc->authorized_operations_cnt; + *cntp = RD_MAX(topicdesc->authorized_operations_cnt, 0); return topicdesc->authorized_operations; } @@ -8211,7 +8234,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req, if (md->topics[i].err == RD_KAFKA_RESP_ERR_NO_ERROR) { rd_kafka_AclOperation_t *authorized_operations; - size_t authorized_operation_cnt; + int authorized_operation_cnt; authorized_operations = rd_kafka_AuthorizedOperations_parse( mdi->topics[i].topic_authorized_operations, @@ -8365,7 +8388,7 @@ rd_kafka_DescribeCluster_result_authorized_operations( size_t *cntp) { const rd_kafka_ClusterDescription_t *clusterdesc = rd_kafka_DescribeCluster_result_description(result); - *cntp = clusterdesc->authorized_operations_cnt; + *cntp = RD_MAX(clusterdesc->authorized_operations_cnt, 0); return clusterdesc->authorized_operations; } diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index 0cc69a383b..4eb015fad0 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -482,9 +482,11 @@ struct rd_kafka_ConsumerGroupDescription_s { rd_kafka_consumer_group_state_t state; /** Consumer group coordinator. */ rd_kafka_Node_t *coordinator; - /** Count of operations allowed for topic.*/ - size_t authorized_operations_cnt; - /** Operations allowed for topic. */ + /** Count of operations allowed for topic. -1 indicates operations not + * requested.*/ + int authorized_operations_cnt; + /** Operations allowed for topic. May be NULL if operations were not + * requested */ rd_kafka_AclOperation_t *authorized_operations; /** Group specific error. */ rd_kafka_error_t *error; @@ -527,11 +529,13 @@ struct rd_kafka_TopicDescription_s { int partition_cnt; /**< Number of partitions in \p partitions*/ rd_bool_t is_internal; /**< Is the topic is internal to Kafka? */ rd_kafka_TopicPartitionInfo_t **partitions; /**< Partitions */ - rd_kafka_error_t *error; /**< Topic error reported by broker */ - size_t authorized_operations_cnt; /**< Count of operations allowed for - topic.*/ + rd_kafka_error_t *error; /**< Topic error reported by broker */ + int authorized_operations_cnt; /**< Count of operations allowed for + * topic. -1 indicates operations not + * requested. */ rd_kafka_AclOperation_t - *authorized_operations; /**< Operations allowed for topic. */ + *authorized_operations; /**< Operations allowed for topic. May be + * NULL if operations were not requested */ }; /**@}*/ @@ -544,14 +548,16 @@ struct rd_kafka_TopicDescription_s { * @struct DescribeCluster result - internal type. */ typedef struct rd_kafka_ClusterDescription_s { - char *cluster_id; /**< Cluster id */ - rd_kafka_Node_t *controller; /**< Current controller. */ - size_t node_cnt; /**< Count of brokers in the cluster. */ - rd_kafka_Node_t **nodes; /**< Brokers in the cluster. */ - size_t authorized_operations_cnt; /**< Count of operations allowed for - cluster.*/ + char *cluster_id; /**< Cluster id */ + rd_kafka_Node_t *controller; /**< Current controller. */ + size_t node_cnt; /**< Count of brokers in the cluster. */ + rd_kafka_Node_t **nodes; /**< Brokers in the cluster. */ + int authorized_operations_cnt; /**< Count of operations allowed for + * cluster. -1 indicates operations not + * requested. */ rd_kafka_AclOperation_t - *authorized_operations; /**< Operations allowed for cluster. */ + *authorized_operations; /**< Operations allowed for cluster. May be + * NULL if operations were not requested */ } rd_kafka_ClusterDescription_t; diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 69df84946c..2b8cbb3ea2 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -2987,11 +2987,16 @@ static void do_test_DescribeConsumerGroups(const char *what, rd_kafka_ConsumerGroupDescription_error(act)); rd_kafka_consumer_group_state_t state = rd_kafka_ConsumerGroupDescription_state(act); - rd_kafka_ConsumerGroupDescription_authorized_operations( - act, &authorized_operation_cnt); + const rd_kafka_AclOperation_t *authorized_operations = + rd_kafka_ConsumerGroupDescription_authorized_operations( + act, &authorized_operation_cnt); TEST_ASSERT( authorized_operation_cnt == 0, - "Authorized operations returned when not requested\n"); + "Authorized operation count should be 0, is %" PRIusz, + authorized_operation_cnt); + TEST_ASSERT( + authorized_operations == NULL, + "Authorized operations should be NULL when not requested"); TEST_ASSERT( strcmp(exp->group_id, rd_kafka_ConsumerGroupDescription_group_id(act)) == @@ -3266,6 +3271,8 @@ static void do_test_DescribeTopics(const char *what, "Expected partion id to be %d, got %d", 0, rd_kafka_TopicPartitionInfo_partition(partitions[0])); + authorized_operations = rd_kafka_TopicDescription_authorized_operations( + result_topics[0], &authorized_operations_cnt); if (include_authorized_operations) { const rd_kafka_AclOperation_t expected[] = { RD_KAFKA_ACL_OPERATION_ALTER, @@ -3277,14 +3284,19 @@ static void do_test_DescribeTopics(const char *what, RD_KAFKA_ACL_OPERATION_READ, RD_KAFKA_ACL_OPERATION_WRITE}; - authorized_operations = - rd_kafka_TopicDescription_authorized_operations( - result_topics[0], &authorized_operations_cnt); - test_match_authorized_operations(expected, 8, authorized_operations, authorized_operations_cnt); + } else { + TEST_ASSERT( + authorized_operations_cnt == 0, + "Authorized operation count should be 0, is %" PRIusz, + authorized_operations_cnt); + TEST_ASSERT( + authorized_operations == NULL, + "Authorized operations should be NULL when not requested"); } + rd_kafka_event_destroy(rkev); /* If we don't have authentication/authorization set up in our @@ -3481,6 +3493,9 @@ static void do_test_DescribeCluster(const char *what, TEST_ASSERT(rd_kafka_Node_port(nodes[0]), "Expected first node of cluster to have a port"); + authorized_operations = + rd_kafka_DescribeCluster_result_authorized_operations( + res, &authorized_operations_cnt); if (include_authorized_operations) { const rd_kafka_AclOperation_t expected[] = { RD_KAFKA_ACL_OPERATION_ALTER, @@ -3490,12 +3505,18 @@ static void do_test_DescribeCluster(const char *what, RD_KAFKA_ACL_OPERATION_DESCRIBE, RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS, RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE}; - authorized_operations = - rd_kafka_DescribeCluster_result_authorized_operations( - res, &authorized_operations_cnt); + test_match_authorized_operations(expected, 7, authorized_operations, authorized_operations_cnt); + } else { + TEST_ASSERT( + authorized_operations_cnt == 0, + "Authorized operation count should be 0, is %" PRIusz, + authorized_operations_cnt); + TEST_ASSERT( + authorized_operations == NULL, + "Authorized operations should be NULL when not requested"); } rd_kafka_event_destroy(rkev);