Skip to content

Commit

Permalink
Fix calls with empty list (#4480)
Browse files Browse the repository at this point in the history
DescribeTopics and ListOffsets should return an empty array if and empty array is passed as parameter.
Consistent with Java and user friendly for users using filter and map.
  • Loading branch information
emasab authored Oct 24, 2023
1 parent af11adc commit 95a542c
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 65 deletions.
4 changes: 2 additions & 2 deletions examples/describe_topics.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ static int print_topics_info(const rd_kafka_DescribeTopics_result_t *topicdesc,
fprintf(stderr, "No matching topics found\n");
return 1;
} else {
fprintf(stderr, "No topics in cluster\n");
fprintf(stderr, "No topics requested\n");
}
}

Expand All @@ -281,7 +281,7 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) {
rd_kafka_error_t *error;
int retval = 0;
int topics_cnt = 0;
const int min_argc = 2;
const int min_argc = 1;
int include_topic_authorized_operations;

if (argc < min_argc)
Expand Down
17 changes: 14 additions & 3 deletions examples/list_offsets.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,21 @@ static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) {
* @brief Print list offsets result information.
*/
static int
print_list_offsets_result_info(const rd_kafka_ListOffsets_result_t *result) {
print_list_offsets_result_info(const rd_kafka_ListOffsets_result_t *result,
int req_cnt) {
const rd_kafka_ListOffsetsResultInfo_t **result_infos;
size_t cnt;
size_t i;
result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt);
printf("ListOffsets results:\n");
if (cnt == 0) {
if (req_cnt > 0) {
fprintf(stderr, "No matching partitions found\n");
return 1;
} else {
fprintf(stderr, "No partitions requested\n");
}
}
for (i = 0; i < cnt; i++) {
const rd_kafka_topic_partition_t *topic_partition =
rd_kafka_ListOffsetsResultInfo_topic_partition(
Expand Down Expand Up @@ -177,7 +186,8 @@ static void cmd_list_offsets(rd_kafka_conf_t *conf, int argc, char **argv) {
rd_kafka_event_t *event = NULL;
rd_kafka_error_t *error = NULL;
int i;
int retval = 0;
int retval = 0;
int partitions = 0;
rd_kafka_topic_partition_list_t *rktpars;

if ((argc - 1) % 3 != 0) {
Expand All @@ -193,6 +203,7 @@ static void cmd_list_offsets(rd_kafka_conf_t *conf, int argc, char **argv) {
rktpars, argv[i], parse_int("partition", argv[i + 1]))
->offset = parse_int("offset", argv[i + 2]);
}
partitions = rktpars->cnt;

/*
* Create consumer instance
Expand Down Expand Up @@ -254,7 +265,7 @@ static void cmd_list_offsets(rd_kafka_conf_t *conf, int argc, char **argv) {
* partitions may have errors. */
const rd_kafka_ListOffsets_result_t *result;
result = rd_kafka_event_ListOffsets_result(event);
retval = print_list_offsets_result_info(result);
retval = print_list_offsets_result_info(result, partitions);
}


Expand Down
145 changes: 96 additions & 49 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -4450,28 +4450,44 @@ void rd_kafka_ListOffsets(rd_kafka_t *rk,
rd_kafka_admin_request_op_result_cb_set(
rko_fanout, rd_kafka_ListOffsets_handle_result);

if (topic_partitions->cnt == 0) {
rd_kafka_admin_result_fail(
rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG,
"At least one partition is required");
goto err;
}
if (topic_partitions->cnt) {
for (i = 0; i < topic_partitions->cnt; i++) {
if (!topic_partitions->elems[i].topic[0]) {
rd_kafka_admin_result_fail(
rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG,
"Partition topic name at index %d must be "
"non-empty",
i);
goto err;
}
if (topic_partitions->elems[i].partition < 0) {
rd_kafka_admin_result_fail(
rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG,
"Partition at index %d cannot be negative",
i);
goto err;
}
}

topic_partitions_sorted = rd_list_new(
topic_partitions->cnt, rd_kafka_topic_partition_destroy_free);
for (i = 0; i < topic_partitions->cnt; i++)
rd_list_add(
topic_partitions_sorted,
rd_kafka_topic_partition_copy(&topic_partitions->elems[i]));

rd_list_sort(topic_partitions_sorted, rd_kafka_topic_partition_cmp);
if (rd_list_find_duplicate(topic_partitions_sorted,
rd_kafka_topic_partition_cmp)) {
topic_partitions_sorted =
rd_list_new(topic_partitions->cnt,
rd_kafka_topic_partition_destroy_free);
for (i = 0; i < topic_partitions->cnt; i++)
rd_list_add(topic_partitions_sorted,
rd_kafka_topic_partition_copy(
&topic_partitions->elems[i]));

rd_kafka_admin_result_fail(
rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG,
"Partitions must not contain duplicates");
goto err;
rd_list_sort(topic_partitions_sorted,
rd_kafka_topic_partition_cmp);
if (rd_list_find_duplicate(topic_partitions_sorted,
rd_kafka_topic_partition_cmp)) {

rd_kafka_admin_result_fail(
rko_fanout, RD_KAFKA_RESP_ERR__INVALID_ARG,
"Partitions must not contain duplicates");
goto err;
}
}

for (i = 0; i < topic_partitions->cnt; i++) {
Expand All @@ -4493,14 +4509,24 @@ void rd_kafka_ListOffsets(rd_kafka_t *rk,
rd_list_add(&rko_fanout->rko_u.admin_request.args,
copied_topic_partitions);

/* Async query for partition leaders */
rd_kafka_topic_partition_list_query_leaders_async(
rk, copied_topic_partitions,
rd_kafka_admin_timeout_remains(rko_fanout),
RD_KAFKA_REPLYQ(rk->rk_ops, 0),
rd_kafka_ListOffsets_leaders_queried_cb, rko_fanout);
if (topic_partitions->cnt) {
/* Async query for partition leaders */
rd_kafka_topic_partition_list_query_leaders_async(
rk, copied_topic_partitions,
rd_kafka_admin_timeout_remains(rko_fanout),
RD_KAFKA_REPLYQ(rk->rk_ops, 0),
rd_kafka_ListOffsets_leaders_queried_cb, rko_fanout);
} else {
/* Empty list */
rd_kafka_op_t *rko_result =
rd_kafka_admin_result_new(rko_fanout);
/* Enqueue empty result on application queue, we're done. */
rd_kafka_admin_result_enq(rko_fanout, rko_result);
rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
rd_true /*destroy*/);
}

rd_list_destroy(topic_partitions_sorted);
RD_IF_FREE(topic_partitions_sorted, rd_list_destroy);
return;
err:
RD_IF_FREE(topic_partitions_sorted, rd_list_destroy);
Expand Down Expand Up @@ -8721,39 +8747,60 @@ void rd_kafka_DescribeTopics(rd_kafka_t *rk,
rk, RD_KAFKA_OP_DESCRIBETOPICS,
RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT, &cbs, options, rkqu->rkqu_q);

if (topics->topics_cnt == 0) {
rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG,
"No topics to describe");
rd_kafka_admin_common_worker_destroy(rk, rko,
rd_true /*destroy*/);
return;
}

rd_list_init(&rko->rko_u.admin_request.args, (int)topics->topics_cnt,
rd_free);
for (i = 0; i < topics->topics_cnt; i++)
rd_list_add(&rko->rko_u.admin_request.args,
rd_strdup(topics->topics[i]));

/* Check for duplicates.
* Make a temporary copy of the topic list and sort it to check for
* duplicates, we don't want the original list sorted since we want
* to maintain ordering. */
rd_list_init(&dup_list, rd_list_cnt(&rko->rko_u.admin_request.args),
NULL);
rd_list_copy_to(&dup_list, &rko->rko_u.admin_request.args, NULL, NULL);
rd_list_sort(&dup_list, rd_kafka_DescribeTopics_cmp);
if (rd_list_find_duplicate(&dup_list, rd_kafka_DescribeTopics_cmp)) {
if (rd_list_cnt(&rko->rko_u.admin_request.args)) {
int j;
char *topic_name;
/* Check for duplicates.
* Make a temporary copy of the topic list and sort it to check
* for duplicates, we don't want the original list sorted since
* we want to maintain ordering. */
rd_list_init(&dup_list,
rd_list_cnt(&rko->rko_u.admin_request.args), NULL);
rd_list_copy_to(&dup_list, &rko->rko_u.admin_request.args, NULL,
NULL);
rd_list_sort(&dup_list, rd_kafka_DescribeTopics_cmp);
if (rd_list_find_duplicate(&dup_list,
rd_kafka_DescribeTopics_cmp)) {
rd_list_destroy(&dup_list);
rd_kafka_admin_result_fail(
rko, RD_KAFKA_RESP_ERR__INVALID_ARG,
"Duplicate topics not allowed");
rd_kafka_admin_common_worker_destroy(
rk, rko, rd_true /*destroy*/);
return;
}

/* Check for empty topics. */
RD_LIST_FOREACH(topic_name, &rko->rko_u.admin_request.args, j) {
if (!topic_name[0]) {
rd_list_destroy(&dup_list);
rd_kafka_admin_result_fail(
rko, RD_KAFKA_RESP_ERR__INVALID_ARG,
"Empty topic name at index %d isn't "
"allowed",
j);
rd_kafka_admin_common_worker_destroy(
rk, rko, rd_true /*destroy*/);
return;
}
}

rd_list_destroy(&dup_list);
rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG,
"Duplicate topics not allowed");
rd_kafka_q_enq(rk->rk_ops, rko);
} else {
/* Empty list */
rd_kafka_op_t *rko_result = rd_kafka_admin_result_new(rko);
/* Enqueue empty result on application queue, we're done. */
rd_kafka_admin_result_enq(rko, rko_result);
rd_kafka_admin_common_worker_destroy(rk, rko,
rd_true /*destroy*/);
return;
}

rd_list_destroy(&dup_list);
rd_kafka_q_enq(rk->rk_ops, rko);
}

/**@}*/
Expand Down
74 changes: 63 additions & 11 deletions tests/0081-admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3160,7 +3160,7 @@ static void do_test_DescribeTopics(const char *what,
rd_kafka_queue_t *q;
#define TEST_DESCRIBE_TOPICS_CNT 3
char *topic_names[TEST_DESCRIBE_TOPICS_CNT];
rd_kafka_TopicCollection_t *topics;
rd_kafka_TopicCollection_t *topics, *empty_topics;
rd_kafka_AdminOptions_t *options;
rd_kafka_event_t *rkev;
const rd_kafka_error_t *error;
Expand Down Expand Up @@ -3197,11 +3197,11 @@ static void do_test_DescribeTopics(const char *what,
}
topics = rd_kafka_TopicCollection_of_topic_names(
(const char **)topic_names, TEST_DESCRIBE_TOPICS_CNT);
empty_topics = rd_kafka_TopicCollection_of_topic_names(NULL, 0);

test_CreateTopics_simple(rk, NULL, topic_names, 1, 1, NULL);
test_wait_topic_exists(rk, topic_names[0], 10000);

/* Call DescribeTopics. */
options =
rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DESCRIBETOPICS);
TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout(
Expand All @@ -3210,10 +3210,40 @@ static void do_test_DescribeTopics(const char *what,
rd_kafka_AdminOptions_set_include_authorized_operations(
options, include_authorized_operations));

TIMING_START(&timing, "DescribeTopics");
/* Call DescribeTopics with empty topics. */
TIMING_START(&timing, "DescribeTopics empty");
rd_kafka_DescribeTopics(rk, empty_topics, options, q);
TIMING_ASSERT_LATER(&timing, 0, 50);

/* Check DescribeTopics results. */
rkev = test_wait_admin_result(q, RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT,
tmout_multip(20 * 1000));
TEST_ASSERT(rkev, "Expected DescribeTopicsResult on queue");

/* Extract result. */
res = rd_kafka_event_DescribeTopics_result(rkev);
TEST_ASSERT(res, "Expected DescribeTopics result, not %s",
rd_kafka_event_name(rkev));

err = rd_kafka_event_error(rkev);
errstr2 = rd_kafka_event_error_string(rkev);
TEST_ASSERT(!err, "Expected success, not %s: %s",
rd_kafka_err2name(err), errstr2);

result_topics =
rd_kafka_DescribeTopics_result_topics(res, &result_topics_cnt);

/* Check no result is received. */
TEST_ASSERT((int)result_topics_cnt == 0,
"Expected 0 topics in result, got %d",
(int)result_topics_cnt);

rd_kafka_event_destroy(rkev);

/* Call DescribeTopics with all of them. */
TIMING_START(&timing, "DescribeTopics all");
rd_kafka_DescribeTopics(rk, topics, options, q);
TIMING_ASSERT_LATER(&timing, 0, 50);
rd_kafka_AdminOptions_destroy(options);

/* Check DescribeTopics results. */
rkev = test_wait_admin_result(q, RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT,
Expand Down Expand Up @@ -3302,6 +3332,7 @@ static void do_test_DescribeTopics(const char *what,
"Authorized operations should be NULL when not requested");
}

rd_kafka_AdminOptions_destroy(options);
rd_kafka_event_destroy(rkev);

/* If we don't have authentication/authorization set up in our
Expand Down Expand Up @@ -3410,6 +3441,7 @@ static void do_test_DescribeTopics(const char *what,
rd_kafka_queue_destroy(q);

rd_kafka_TopicCollection_destroy(topics);
rd_kafka_TopicCollection_destroy(empty_topics);


TEST_LATER_CHECK();
Expand Down Expand Up @@ -4946,8 +4978,11 @@ static void do_test_ListOffsets(const char *what,
rd_kafka_event_t *event;
rd_kafka_queue_t *q;
rd_kafka_t *p;
size_t i = 0;
rd_kafka_topic_partition_list_t *topic_partitions;
size_t i = 0, cnt = 0;
rd_kafka_topic_partition_list_t *topic_partitions,
*empty_topic_partitions;
const rd_kafka_ListOffsets_result_t *result;
const rd_kafka_ListOffsetsResultInfo_t **result_infos;
int64_t basetimestamp = 10000000;
int64_t timestamps[] = {
basetimestamp + 100,
Expand Down Expand Up @@ -5011,9 +5046,29 @@ static void do_test_ListOffsets(const char *what,
TEST_CALL_ERROR__(rd_kafka_AdminOptions_set_isolation_level(
options, RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED));

topic_partitions = rd_kafka_topic_partition_list_new(1);
topic_partitions = rd_kafka_topic_partition_list_new(1);
empty_topic_partitions = rd_kafka_topic_partition_list_new(0);
rd_kafka_topic_partition_list_add(topic_partitions, topic, 0);

/* Call ListOffsets with empty partition list */
rd_kafka_ListOffsets(rk, empty_topic_partitions, options, q);
rd_kafka_topic_partition_list_destroy(empty_topic_partitions);
/* Wait for results */
event = rd_kafka_queue_poll(q, -1 /*indefinitely*/);
if (!event)
TEST_FAIL("Event missing");

TEST_CALL_ERR__(rd_kafka_event_error(event));

result = rd_kafka_event_ListOffsets_result(event);
result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt);
rd_kafka_event_destroy(event);

TEST_ASSERT(!cnt,
"Expected empty result info array, got %" PRIusz
" result infos",
cnt);

for (i = 0; i < RD_ARRAY_SIZE(test_fixtures); i++) {
rd_bool_t retry = rd_true;
rd_kafka_topic_partition_list_t *topic_partitions_copy;
Expand All @@ -5036,6 +5091,7 @@ static void do_test_ListOffsets(const char *what,
topic_partitions_copy->elems[0].offset = test_fixture.query;

while (retry) {
size_t j;
rd_kafka_resp_err_t err;
/* Call ListOffsets */
rd_kafka_ListOffsets(rk, topic_partitions_copy, options,
Expand All @@ -5056,10 +5112,6 @@ static void do_test_ListOffsets(const char *what,
rd_kafka_err2name(err));
}

const rd_kafka_ListOffsets_result_t *result;
const rd_kafka_ListOffsetsResultInfo_t **result_infos;
size_t cnt;
size_t j;
result = rd_kafka_event_ListOffsets_result(event);
result_infos =
rd_kafka_ListOffsets_result_infos(result, &cnt);
Expand Down

0 comments on commit 95a542c

Please sign in to comment.