diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 7dfdeb65a8..a08a871544 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -112,7 +112,7 @@ blocks: value: -std=gnu90 # Test minimum C standard, default in CentOS 7 prologue: commands: - - docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY + - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' jobs: - name: 'Build and integration tests' commands: @@ -147,7 +147,7 @@ blocks: type: s1-prod-ubuntu20-04-amd64-2 prologue: commands: - - docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY + - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' epilogue: commands: - '[[ -z $SEMAPHORE_GIT_TAG_NAME ]] || artifact push workflow artifacts/ --destination artifacts/${ARTIFACT_KEY}/' @@ -203,7 +203,7 @@ blocks: type: s1-prod-ubuntu20-04-arm64-1 prologue: commands: - - docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY + - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' epilogue: commands: - '[[ -z $SEMAPHORE_GIT_TAG_NAME ]] || artifact push workflow artifacts/ --destination artifacts/${ARTIFACT_KEY}/' diff --git a/CHANGELOG.md b/CHANGELOG.md index a05e01308b..edb700213a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ librdkafka v2.3.1 is a maintenance release: * Upgrade OpenSSL to v3.0.12 (while building from source) with various security fixes, check the [release notes](https://www.openssl.org/news/cl30.txt). + * Integration tests can be started in KRaft mode and run against any + GitHub Kafka branch other than the released versions. * Fixed a bug causing duplicate message consumption from a stale fetch start offset in some particular cases (#4636) diff --git a/src/rdkafka.c b/src/rdkafka.c index 7427fa171b..566d2e065d 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -64,6 +64,7 @@ #endif #include "rdtime.h" +#include "rdmap.h" #include "crc32c.h" #include "rdunittest.h" @@ -5178,6 +5179,17 @@ const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) { return uuid->base64str; } +unsigned int rd_kafka_Uuid_hash(const rd_kafka_Uuid_t *uuid) { + unsigned char bytes[16]; + memcpy(bytes, &uuid->most_significant_bits, 8); + memcpy(&bytes[8], &uuid->least_significant_bits, 8); + return rd_bytes_hash(bytes, 16); +} + +unsigned int rd_kafka_Uuid_map_hash(const void *key) { + return rd_kafka_Uuid_hash(key); +} + int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) { return uuid->least_significant_bits; } diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 5b3fec0043..7dd9eae66a 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -3101,6 +3101,12 @@ int rd_kafka_topic_partition_by_id_cmp(const void *_a, const void *_b) { return are_topic_ids_different || RD_CMP(a->partition, b->partition); } +static int rd_kafka_topic_partition_by_id_cmp_opaque(const void *_a, + const void *_b, + void *opaque) { + return rd_kafka_topic_partition_by_id_cmp(_a, _b); +} + /** @brief Compare only the topic */ int rd_kafka_topic_partition_cmp_topic(const void *_a, const void *_b) { const rd_kafka_topic_partition_t *a = _a; @@ -3114,13 +3120,22 @@ static int rd_kafka_topic_partition_cmp_opaque(const void *_a, return rd_kafka_topic_partition_cmp(_a, _b); } -/** @returns a hash of the topic and partition */ +/** @returns a hash of the topic name and partition */ unsigned int rd_kafka_topic_partition_hash(const void *_a) { const rd_kafka_topic_partition_t *a = _a; int r = 31 * 17 + a->partition; return 31 * r + rd_string_hash(a->topic, -1); } +/** @returns a hash of the topic id and partition */ +unsigned int rd_kafka_topic_partition_hash_by_id(const void *_a) { + const rd_kafka_topic_partition_t *a = _a; + const rd_kafka_Uuid_t topic_id = + rd_kafka_topic_partition_get_topic_id(a); + int r = 31 * 17 + a->partition; + return 31 * r + rd_kafka_Uuid_hash(&topic_id); +} + /** @@ -3327,6 +3342,12 @@ void rd_kafka_topic_partition_list_sort_by_topic( rktparlist, rd_kafka_topic_partition_cmp_opaque, NULL); } +void rd_kafka_topic_partition_list_sort_by_topic_id( + rd_kafka_topic_partition_list_t *rktparlist) { + rd_kafka_topic_partition_list_sort( + rktparlist, rd_kafka_topic_partition_by_id_cmp_opaque, NULL); +} + rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset( rd_kafka_topic_partition_list_t *rktparlist, const char *topic, @@ -4493,3 +4514,161 @@ const char *rd_kafka_fetch_pos2str(const rd_kafka_fetch_pos_t fetchpos) { return ret[idx]; } + +typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *, + void *) map_toppar_void_t; + +/** + * @brief Calculates \p a ∩ \p b using \p cmp and \p hash . + * Ordered following \p a order. Elements are copied from \p a. + */ +static rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_intersection0( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b, + int(cmp)(const void *_a, const void *_b), + unsigned int(hash)(const void *_a)) { + rd_kafka_topic_partition_t *rktpar; + rd_kafka_topic_partition_list_t *ret = + rd_kafka_topic_partition_list_new(a->cnt < b->cnt ? a->cnt + : b->cnt); + map_toppar_void_t b_map = + RD_MAP_INITIALIZER(b->cnt, cmp, hash, NULL, NULL); + RD_KAFKA_TPLIST_FOREACH(rktpar, b) { + RD_MAP_SET(&b_map, rktpar, rktpar); + } + RD_KAFKA_TPLIST_FOREACH(rktpar, a) { + if ((RD_MAP_GET(&b_map, rktpar) != NULL) == 1) { + rd_kafka_topic_partition_list_add_copy(ret, rktpar); + } + } + RD_MAP_DESTROY(&b_map); + return ret; +} + +/** + * @brief Calculates \p a - \p b using \p cmp and \p hash . + * Ordered following \p a order. Elements are copied from \p a. + */ +static rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_difference0(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b, + int(cmp)(const void *_a, + const void *_b), + unsigned int(hash)(const void *_a)) { + rd_kafka_topic_partition_t *rktpar; + rd_kafka_topic_partition_list_t *ret = + rd_kafka_topic_partition_list_new(a->cnt); + map_toppar_void_t b_map = + RD_MAP_INITIALIZER(b->cnt, cmp, hash, NULL, NULL); + RD_KAFKA_TPLIST_FOREACH(rktpar, b) { + RD_MAP_SET(&b_map, rktpar, rktpar); + } + RD_KAFKA_TPLIST_FOREACH(rktpar, a) { + if ((RD_MAP_GET(&b_map, rktpar) != NULL) == 0) { + rd_kafka_topic_partition_list_add_copy(ret, rktpar); + } + } + RD_MAP_DESTROY(&b_map); + return ret; +} + +/** + * @brief Calculates \p a ∪ \p b using \p cmp and \p hash . + * Ordered following \p a order for elements in \p a + * and \p b order for elements only in \p b. + * Elements are copied the same way. + */ +static rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_union0(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b, + int(cmp)(const void *_a, const void *_b), + unsigned int(hash)(const void *_a)) { + + rd_kafka_topic_partition_list_t *b_minus_a = + rd_kafka_topic_partition_list_difference0(b, a, cmp, hash); + rd_kafka_topic_partition_list_t *ret = + rd_kafka_topic_partition_list_new(a->cnt + b_minus_a->cnt); + + rd_kafka_topic_partition_list_add_list(ret, a); + rd_kafka_topic_partition_list_add_list(ret, b_minus_a); + + rd_kafka_topic_partition_list_destroy(b_minus_a); + return ret; +} + +/** + * @brief Calculates \p a ∩ \p b using topic name and partition id. + * Ordered following \p a order. Elements are copied from \p a. + */ +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_intersection_by_name( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_intersection0( + a, b, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash); +} + +/** + * @brief Calculates \p a - \p b using topic name and partition id. + * Ordered following \p a order. Elements are copied from \p a. + */ +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_difference_by_name( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_difference0( + a, b, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash); +} + +/** + * @brief Calculates \p a ∪ \p b using topic name and partition id. + * Ordered following \p a order for elements in \p a + * and \p b order for elements only in \p b. + * Elements are copied the same way. + */ +rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_union_by_name( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_union0( + a, b, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash); +} + +/** + * @brief Calculates \p a ∩ \p b using topic id and partition id. + * Ordered following \p a order. Elements are copied from \p a. + */ +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_intersection_by_id( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_intersection0( + a, b, rd_kafka_topic_partition_by_id_cmp, + rd_kafka_topic_partition_hash_by_id); +} + +/** + * @brief Calculates \p a - \p b using topic id and partition id. + * Ordered following \p a order. Elements are copied from \p a. + */ +rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_difference_by_id( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_difference0( + a, b, rd_kafka_topic_partition_by_id_cmp, + rd_kafka_topic_partition_hash_by_id); +} + +/** + * @brief Calculates \p a ∪ \p b using topic id and partition id. + * Ordered following \p a order for elements in \p a + * and \p b order for elements only in \p b. + * Elements are copied the same way. + */ +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_union_by_id(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + return rd_kafka_topic_partition_list_union0( + a, b, rd_kafka_topic_partition_by_id_cmp, + rd_kafka_topic_partition_hash_by_id); +} diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index 56b4a76138..cdb023d87a 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -773,6 +773,9 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_topic( void rd_kafka_topic_partition_list_sort_by_topic( rd_kafka_topic_partition_list_t *rktparlist); +void rd_kafka_topic_partition_list_sort_by_topic_id( + rd_kafka_topic_partition_list_t *rktparlist); + void rd_kafka_topic_partition_list_reset_offsets( rd_kafka_topic_partition_list_t *rktparlist, int64_t offset); @@ -1122,4 +1125,31 @@ static RD_UNUSED RD_INLINE void rd_kafka_toppar_set_offset_validation_position( rktp->rktp_offset_validation_pos = offset_validation_pos; } +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_intersection_by_name( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_difference_by_name( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_union_by_name(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_intersection_by_id( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + +rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_difference_by_id( + rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + +rd_kafka_topic_partition_list_t * +rd_kafka_topic_partition_list_union_by_id(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b); + #endif /* _RDKAFKA_PARTITION_H_ */ diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index ee392a6a38..cf4153f03d 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -603,6 +603,10 @@ rd_kafka_Uuid_t rd_kafka_Uuid_random(); const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid); +unsigned int rd_kafka_Uuid_hash(const rd_kafka_Uuid_t *uuid); + +unsigned int rd_kafka_Uuid_map_hash(const void *key); + /** * @name Producer ID and Epoch for the Idempotent Producer * @{ diff --git a/src/rdkafka_ssl.c b/src/rdkafka_ssl.c index 85f745cb9c..0dd7e509da 100644 --- a/src/rdkafka_ssl.c +++ b/src/rdkafka_ssl.c @@ -476,7 +476,8 @@ static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans, param = SSL_get0_param(rktrans->rktrans_ssl); - if (!X509_VERIFY_PARAM_set1_host(param, name, 0)) + if (!X509_VERIFY_PARAM_set1_host(param, name, + strnlen(name, sizeof(name)))) goto fail; } #else diff --git a/src/rdmap.c b/src/rdmap.c index 8e1a0546cc..1e82bcb9a2 100644 --- a/src/rdmap.c +++ b/src/rdmap.c @@ -2,6 +2,7 @@ * librdkafka - The Apache Kafka C/C++ library * * Copyright (c) 2020-2022, Magnus Edenhill + * 2023, Confluent Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -237,6 +238,21 @@ unsigned int rd_map_str_hash(const void *key) { } +/** + * @returns a djb2 hash of \p bytes. + * + * @param len \p bytes will be hashed up to \p len. + */ +unsigned int rd_bytes_hash(unsigned char *bytes, size_t len) { + unsigned int hash = 5381; + size_t i; + + for (i = 0; i < len; i++) + hash = ((hash << 5) + hash) + bytes[i]; + + return hash; +} + /** * @name Unit tests diff --git a/src/rdmap.h b/src/rdmap.h index bea8a1aca6..b8e3feb97b 100644 --- a/src/rdmap.h +++ b/src/rdmap.h @@ -2,6 +2,7 @@ * librdkafka - The Apache Kafka C/C++ library * * Copyright (c) 2020-2022, Magnus Edenhill + * 2023, Confluent Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -249,6 +250,10 @@ int rd_map_str_cmp(const void *a, const void *b); */ unsigned int rd_map_str_hash(const void *a); +/** + * @brief Bytes hash function (djb2). + */ +unsigned int rd_bytes_hash(unsigned char *bytes, size_t len); /** diff --git a/tests/LibrdkafkaTestApp.py b/tests/LibrdkafkaTestApp.py index 6dff9b4ea3..40fdd12341 100644 --- a/tests/LibrdkafkaTestApp.py +++ b/tests/LibrdkafkaTestApp.py @@ -191,7 +191,7 @@ def __init__(self, cluster, version, conf=None, if tests is not None: self.env_add('TESTS', ','.join(tests)) - def start_cmd(self): + def finalize_env(self): self.env_add( 'KAFKA_PATH', self.cluster.get_all( @@ -214,7 +214,8 @@ def start_cmd(self): # Provide a HTTPS REST endpoint for the HTTP client tests. self.env_add( 'RD_UT_HTTP_URL', - 'https://jsonplaceholder.typicode.com/users') + 'https://jsonplaceholder.typicode.com/users', + False) # Per broker env vars for b in [x for x in self.cluster.apps if isinstance( @@ -222,14 +223,20 @@ def start_cmd(self): self.env_add('BROKER_ADDRESS_%d' % b.appid, ','.join([x for x in b.conf['listeners'].split(',') - if x.startswith(self.security_protocol)])) + if x.startswith(self.security_protocol)]), + False) # Add each broker pid as an env so they can be killed # indivdidually. - self.env_add('BROKER_PID_%d' % b.appid, str(b.proc.pid)) + self.env_add('BROKER_PID_%d' % b.appid, str(b.proc.pid), False) # JMX port, if available jmx_port = b.conf.get('jmx_port', None) if jmx_port is not None: - self.env_add('BROKER_JMX_PORT_%d' % b.appid, str(jmx_port)) + self.env_add( + 'BROKER_JMX_PORT_%d' % + b.appid, str(jmx_port), False) + + def start_cmd(self): + self.finalize_env() extra_args = list() if not self.local_tests: diff --git a/tests/broker_version_tests.py b/tests/broker_version_tests.py index d84836974a..c451e02471 100755 --- a/tests/broker_version_tests.py +++ b/tests/broker_version_tests.py @@ -48,6 +48,8 @@ def test_it(version, deploy=True, conf={}, rdkconf={}, tests=None, cluster.start(timeout=30) if conf.get('test_mode', '') == 'bash': + rdkafka.finalize_env() + if inherit_env: env = dict(os.environ, **rdkafka.env) else: diff --git a/tests/requirements.txt b/tests/requirements.txt index 3096e9c884..bd7777d3a1 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,2 +1,2 @@ -trivup/trivup-0.12.3.tar.gz +trivup/trivup-0.12.4.tar.gz jsoncomment diff --git a/tests/test.c b/tests/test.c index 654e2b6355..dfdb2df0e0 100644 --- a/tests/test.c +++ b/tests/test.c @@ -1804,17 +1804,14 @@ int main(int argc, char **argv) { TEST_SAY("Git version: %s\n", test_git_version); - if (!strcmp(test_broker_version_str, "trunk")) - test_broker_version_str = "9.9.9.9"; /* for now */ - d = 0; if (sscanf(test_broker_version_str, "%d.%d.%d.%d", &a, &b, &c, &d) < 3) { - printf( - "%% Expected broker version to be in format " - "N.N.N (N=int), not %s\n", - test_broker_version_str); - exit(1); + TEST_SAY( + "Non-numeric broker version, setting version" + " to 9.9.9.9\n"); + test_broker_version_str = "9.9.9.9"; + sscanf(test_broker_version_str, "%d.%d.%d.%d", &a, &b, &c, &d); } test_broker_version = TEST_BRKVER(a, b, c, d); TEST_SAY("Broker version: %s (%d.%d.%d.%d)\n", test_broker_version_str, diff --git a/tests/trivup/trivup-0.12.3.tar.gz b/tests/trivup/trivup-0.12.3.tar.gz deleted file mode 100644 index 1782154bf5..0000000000 Binary files a/tests/trivup/trivup-0.12.3.tar.gz and /dev/null differ diff --git a/tests/trivup/trivup-0.12.4.tar.gz b/tests/trivup/trivup-0.12.4.tar.gz new file mode 100644 index 0000000000..52f5be3df3 Binary files /dev/null and b/tests/trivup/trivup-0.12.4.tar.gz differ