diff --git a/INTRODUCTION.md b/INTRODUCTION.md index 7d96f8b8fd..7fbe9adbcf 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1975,7 +1975,7 @@ release of librdkafka. | 0 | Produce | 9 | 7 | | 1 | Fetch | 15 | 11 | | 2 | ListOffsets | 8 | 5 | -| 3 | Metadata | 12 | 9 | +| 3 | Metadata | 12 | 12 | | 8 | OffsetCommit | 8 | 7 | | 9 | OffsetFetch | 8 | 7 | | 10 | FindCoordinator | 4 | 2 | diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 33481ba1ac..cbcff5000a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -8,6 +8,7 @@ set( rdbuf.c rdcrc32.c rdfnv1a.c + rdbase64.c rdkafka.c rdkafka_assignor.c rdkafka_broker.c diff --git a/src/Makefile b/src/Makefile index 26df5723b8..1c43f0b017 100644 --- a/src/Makefile +++ b/src/Makefile @@ -52,7 +52,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \ rdkafka_msgset_writer.c rdkafka_msgset_reader.c \ rdkafka_header.c rdkafka_admin.c rdkafka_aux.c \ rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \ - rdkafka_txnmgr.c rdkafka_coord.c \ + rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \ rdvarint.c rdbuf.c rdmap.c rdunittest.c \ rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \ rdkafka_error.c rdkafka_fetcher.c \ diff --git a/src/rdbase64.c b/src/rdbase64.c new file mode 100644 index 0000000000..81c386ed80 --- /dev/null +++ b/src/rdbase64.c @@ -0,0 +1,130 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2023 Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rdbase64.h" + +#if WITH_SSL +#include +#endif + +/** + * @brief Base64 encode binary input \p in, and write base64-encoded string + * and it's size to \p out. out->ptr will be NULL in case of some issue + * with the conversion or the conversion is not supported. + * + * @post out->ptr must be freed after use. + */ +void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) { + +#if !WITH_SSL + out->ptr = NULL; + return; +#endif + + size_t max_len; + + /* OpenSSL takes an |int| argument so the input cannot exceed that. */ + if (in->size > INT_MAX) { + out->ptr = NULL; + return; + } + + max_len = (((in->size + 2) / 3) * 4) + 1; + out->ptr = rd_malloc(max_len); + if (out->ptr == NULL) + return; + + out->size = EVP_EncodeBlock((uint8_t *)out->ptr, (uint8_t *)in->ptr, + (int)in->size); + + rd_assert(out->size < max_len); + out->ptr[out->size] = 0; +} + + +/** + * @brief Base64 encode binary input \p in. + * @returns a newly allocated, base64-encoded string or NULL in case of some + * issue with the conversion or the conversion is not supported. + * + * @post Returned string must be freed after use. + */ +char *rd_base64_encode_str(const rd_chariov_t *in) { + rd_chariov_t out; + rd_base64_encode(in, &out); + return out.ptr; +} + + +/** + * @brief Base64 decode input string \p in. Ignores leading and trailing + * whitespace. + * @returns * 0 on successes in which case a newly allocated binary string is + * set in out (and size). + * * -1 on invalid Base64. + * * -2 on conversion not supported. + */ +int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out) { + +#if !WITH_SSL + return -2; +#endif + + size_t ret_len; + + /* OpenSSL takes an |int| argument, so |in->size| must not exceed + * that. */ + if (in->size % 4 != 0 || in->size > INT_MAX) { + return -1; + } + + ret_len = ((in->size / 4) * 3); + out->ptr = rd_malloc(ret_len + 1); + + if (EVP_DecodeBlock((uint8_t *)out->ptr, (uint8_t *)in->ptr, + (int)in->size) == -1) { + rd_free(out->ptr); + out->ptr = NULL; + return -1; + } + + /* EVP_DecodeBlock will pad the output with trailing NULs and count + * them in the return value. */ + if (in->size > 1 && in->ptr[in->size - 1] == '=') { + if (in->size > 2 && in->ptr[in->size - 2] == '=') { + ret_len -= 2; + } else { + ret_len -= 1; + } + } + + out->ptr[ret_len] = 0; + out->size = ret_len; + + return 0; +} \ No newline at end of file diff --git a/src/rdbase64.h b/src/rdbase64.h new file mode 100644 index 0000000000..fd9e7a209f --- /dev/null +++ b/src/rdbase64.h @@ -0,0 +1,41 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2023 Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#ifndef _RDBASE64_H_ +#define _RDBASE64_H_ + +#include "rd.h" + +void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out); + +char *rd_base64_encode_str(const rd_chariov_t *in); + +int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out); + +#endif /* _RDBASE64_H_ */ \ No newline at end of file diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h index 7845beff90..ccd563cc6c 100644 --- a/src/rdkafka_buf.h +++ b/src/rdkafka_buf.h @@ -1206,7 +1206,6 @@ rd_kafka_buf_update_i64(rd_kafka_buf_t *rkbuf, size_t of, int64_t v) { rd_kafka_buf_update(rkbuf, of, &v, sizeof(v)); } - /** * @brief Write standard (2-byte header) or KIP-482 COMPACT_STRING to buffer. * @@ -1428,4 +1427,21 @@ void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf, rd_kafka_make_req_cb_t *make_cb, void *make_opaque, void (*free_make_opaque_cb)(void *make_opaque)); + + +#define rd_kafka_buf_read_uuid(rkbuf, uuid) \ + do { \ + rd_kafka_buf_read_i64(rkbuf, \ + &((uuid)->most_significant_bits)); \ + rd_kafka_buf_read_i64(rkbuf, \ + &((uuid)->least_significant_bits)); \ + (uuid)->base64str[0] = '\0'; \ + } while (0) + +static RD_UNUSED void rd_kafka_buf_write_uuid(rd_kafka_buf_t *rkbuf, + rd_kafka_uuid_t *uuid) { + rd_kafka_buf_write_i64(rkbuf, uuid->most_significant_bits); + rd_kafka_buf_write_i64(rkbuf, uuid->least_significant_bits); +} + #endif /* _RDKAFKA_BUF_H_ */ diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index f6d2bfbb49..f96edf6583 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -593,6 +593,11 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err); rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf, md->topics[i].topic); + + if (ApiVersion >= 10) { + rd_kafka_buf_read_uuid(rkbuf, &mdi->topics[i].topic_id); + } + if (ApiVersion >= 1) { int8_t is_internal; rd_kafka_buf_read_i8(rkbuf, &is_internal); diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 03586618db..8a8f16dbfa 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -54,6 +54,7 @@ typedef struct rd_kafka_metadata_topic_internal_s { * same count as metadata.topics[i].partition_cnt. * Sorted by Partition Id. */ rd_kafka_metadata_partition_internal_t *partitions; + rd_kafka_uuid_t topic_id; } rd_kafka_metadata_topic_internal_t; diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 24fce04106..9e0efa79f0 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -30,8 +30,10 @@ #define _RDKAFKA_PROTO_H_ +#include "rdstring.h" #include "rdendian.h" #include "rdvarint.h" +#include "rdbase64.h" /* Protocol defines */ #include "rdkafka_protocol.h" @@ -565,6 +567,73 @@ typedef struct rd_kafka_buf_s rd_kafka_buf_t; (8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4) +/** + * @brief UUID + * + * @param most_significant_bits - most significant 64 bits for the UUID + * @param least_significant_bits - least significant 64 bits for the UUID + * @param base64str - base64 encoding for the uuid. By default, it is lazy + * loaded. Use function `rd_kafka_uuid_base64str()` as a getter for this field. + */ +typedef struct rd_kafka_uuid_s { + int64_t most_significant_bits; + int64_t least_significant_bits; + char base64str[23]; +} rd_kafka_uuid_t; + +#define RD_KAFKA_ZERO_UUID \ + { 0, 0, "" } + +#define RD_KAFKA_METADATA_TOPIC_ID \ + { 0, 1, "" } + +/** + * Initialize given UUID to zero UUID. + * + * @param uuid - UUID to initialize. + */ +static RD_INLINE RD_UNUSED void rd_kafka_uuid_init(rd_kafka_uuid_t *uuid) { + memset(uuid, 0, sizeof(*uuid)); +} + +/** + * @brief Computes base64 encoding for the given uuid string. + * @param uuid UUID for which base64 encoding is required. + * + * @return base64 encoded string for the given UUID or NULL in case of some + * issue with the conversion or the conversion is not supported. + */ +static RD_INLINE RD_UNUSED char * +rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid) { + if (strlen(uuid->base64str)) + return uuid->base64str; + + rd_chariov_t in_base64; + char *out_base64_str; + char *uuid_bytes; + uint64_t input_uuid[2]; + + input_uuid[0] = htobe64(uuid->most_significant_bits); + input_uuid[1] = htobe64(uuid->least_significant_bits); + uuid_bytes = (char *)input_uuid; + in_base64.ptr = uuid_bytes; + in_base64.size = sizeof(uuid->most_significant_bits) + + sizeof(uuid->least_significant_bits); + + out_base64_str = rd_base64_encode_str(&in_base64); + if (!out_base64_str) + return NULL; + + rd_strlcpy(uuid->base64str, out_base64_str, + 23 /* Removing extra ('=') padding */); + rd_free(out_base64_str); + return uuid->base64str; +} + +static RD_INLINE RD_UNUSED void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid) { + rd_free(uuid); +} + /** * @name Producer ID and Epoch for the Idempotent Producer diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 8d0789cfc7..b01973a2cf 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2216,7 +2216,7 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb, int *full_incr = NULL; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_Metadata, 0, 9, &features); + rkb, RD_KAFKAP_Metadata, 0, 12, &features); rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1, 4 + (50 * topic_cnt) + 1, @@ -2305,6 +2305,7 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb, if (topic_cnt > 0) { char *topic; int i; + rd_kafka_uuid_t zero_uuid = RD_KAFKA_ZERO_UUID; /* Maintain a copy of the topics list so we can purge * hints from the metadata cache on error. */ @@ -2312,6 +2313,12 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb, rd_list_copy(topics, rd_list_string_copy, NULL); RD_LIST_FOREACH(topic, topics, i) { + if (ApiVersion >= 10) { + /* FIXME: Not supporting topic id in the request + * right now. Update this to correct topic + * id once KIP-516 is fully implemented. */ + rd_kafka_buf_write_uuid(rkbuf, &zero_uuid); + } rd_kafka_buf_write_str(rkbuf, topic, -1); /* Tags for previous topic */ rd_kafka_buf_write_tags(rkbuf); @@ -2337,7 +2344,7 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb, "on broker auto.create.topics.enable configuration"); } - if (ApiVersion >= 8 && ApiVersion < 10) { + if (ApiVersion >= 8 && ApiVersion <= 10) { /* TODO: implement KIP-430 */ /* IncludeClusterAuthorizedOperations */ rd_kafka_buf_write_bool(rkbuf, rd_false); diff --git a/src/rdkafka_sasl_oauthbearer_oidc.c b/src/rdkafka_sasl_oauthbearer_oidc.c index 9fa0972a39..57a6d2e30e 100644 --- a/src/rdkafka_sasl_oauthbearer_oidc.c +++ b/src/rdkafka_sasl_oauthbearer_oidc.c @@ -37,25 +37,7 @@ #include #include "rdhttp.h" #include "rdkafka_sasl_oauthbearer_oidc.h" - - -/** - * @brief Base64 encode binary input \p in, and write base64-encoded string - * and it's size to \p out - */ -static void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) { - size_t max_len; - - max_len = (((in->size + 2) / 3) * 4) + 1; - out->ptr = rd_malloc(max_len); - rd_assert(out->ptr); - - out->size = EVP_EncodeBlock((uint8_t *)out->ptr, (uint8_t *)in->ptr, - (int)in->size); - - rd_assert(out->size <= max_len); - out->ptr[out->size] = 0; -} +#include "rdbase64.h" /** @@ -84,6 +66,7 @@ static char *rd_kafka_oidc_build_auth_header(const char *client_id, client_authorization_in.size--; rd_base64_encode(&client_authorization_in, &client_authorization_out); + rd_assert(client_authorization_out.ptr); authorization_base64_header_size = strlen("Authorization: Basic ") + client_authorization_out.size + 1; diff --git a/src/rdkafka_sasl_scram.c b/src/rdkafka_sasl_scram.c index 32f13a4c04..01a6cd75e4 100644 --- a/src/rdkafka_sasl_scram.c +++ b/src/rdkafka_sasl_scram.c @@ -38,6 +38,7 @@ #include "rdkafka_sasl_int.h" #include "rdrand.h" #include "rdunittest.h" +#include "rdbase64.h" #if WITH_SSL @@ -142,77 +143,6 @@ static char *rd_kafka_sasl_scram_get_attr(const rd_chariov_t *inbuf, } -/** - * @brief Base64 encode binary input \p in - * @returns a newly allocated, base64-encoded string or NULL on error. - */ -static char *rd_base64_encode(const rd_chariov_t *in) { - char *ret; - size_t ret_len, max_len; - - /* OpenSSL takes an |int| argument so the input cannot exceed that. */ - if (in->size > INT_MAX) { - return NULL; - } - - /* This does not overflow given the |INT_MAX| bound, above. */ - max_len = (((in->size + 2) / 3) * 4) + 1; - ret = rd_malloc(max_len); - if (ret == NULL) { - return NULL; - } - - ret_len = - EVP_EncodeBlock((uint8_t *)ret, (uint8_t *)in->ptr, (int)in->size); - assert(ret_len < max_len); - ret[ret_len] = 0; - - return ret; -} - - -/** - * @brief Base64 decode input string \p in. Ignores leading and trailing - * whitespace. - * @returns -1 on invalid Base64, or 0 on successes in which case a - * newly allocated binary string is set in out (and size). - */ -static int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out) { - size_t ret_len; - - /* OpenSSL takes an |int| argument, so |in->size| must not exceed - * that. */ - if (in->size % 4 != 0 || in->size > INT_MAX) { - return -1; - } - - ret_len = ((in->size / 4) * 3); - out->ptr = rd_malloc(ret_len + 1); - - if (EVP_DecodeBlock((uint8_t *)out->ptr, (uint8_t *)in->ptr, - (int)in->size) == -1) { - rd_free(out->ptr); - out->ptr = NULL; - return -1; - } - - /* EVP_DecodeBlock will pad the output with trailing NULs and count - * them in the return value. */ - if (in->size > 1 && in->ptr[in->size - 1] == '=') { - if (in->size > 2 && in->ptr[in->size - 2] == '=') { - ret_len -= 2; - } else { - ret_len -= 1; - } - } - - out->ptr[ret_len] = 0; - out->size = ret_len; - - return 0; -} - - /** * @brief Perform H(str) hash function and stores the result in \p out * which must be at least EVP_MAX_MD_SIZE. @@ -443,7 +373,7 @@ static int rd_kafka_sasl_scram_build_client_final_message( } /* Store the Base64 encoded ServerSignature for quick comparison */ - state->ServerSignatureB64 = rd_base64_encode(&ServerSignature); + state->ServerSignatureB64 = rd_base64_encode_str(&ServerSignature); if (state->ServerSignatureB64 == NULL) { rd_free(client_final_msg_wo_proof.ptr); return -1; @@ -468,7 +398,7 @@ static int rd_kafka_sasl_scram_build_client_final_message( /* Base64 encoded ClientProof */ - ClientProofB64 = rd_base64_encode(&ClientProof); + ClientProofB64 = rd_base64_encode_str(&ClientProof); if (ClientProofB64 == NULL) { rd_free(client_final_msg_wo_proof.ptr); return -1; diff --git a/win32/librdkafka.vcxproj b/win32/librdkafka.vcxproj index 2735fca9ca..a7f267e89e 100644 --- a/win32/librdkafka.vcxproj +++ b/win32/librdkafka.vcxproj @@ -102,6 +102,7 @@ + @@ -174,6 +175,7 @@ +