Skip to content

Commit

Permalink
Merge branch 'master' into dev_fix_duplicate_messages_cooperative_ass…
Browse files Browse the repository at this point in the history
…ignor
  • Loading branch information
emasab authored Mar 21, 2024
2 parents 94fd544 + 267367c commit f11be5a
Show file tree
Hide file tree
Showing 15 changed files with 274 additions and 19 deletions.
6 changes: 3 additions & 3 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ blocks:
value: -std=gnu90 # Test minimum C standard, default in CentOS 7
prologue:
commands:
- docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
jobs:
- name: 'Build and integration tests'
commands:
Expand Down Expand Up @@ -147,7 +147,7 @@ blocks:
type: s1-prod-ubuntu20-04-amd64-2
prologue:
commands:
- docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
epilogue:
commands:
- '[[ -z $SEMAPHORE_GIT_TAG_NAME ]] || artifact push workflow artifacts/ --destination artifacts/${ARTIFACT_KEY}/'
Expand Down Expand Up @@ -203,7 +203,7 @@ blocks:
type: s1-prod-ubuntu20-04-arm64-1
prologue:
commands:
- docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
epilogue:
commands:
- '[[ -z $SEMAPHORE_GIT_TAG_NAME ]] || artifact push workflow artifacts/ --destination artifacts/${ARTIFACT_KEY}/'
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ librdkafka v2.3.1 is a maintenance release:

* Upgrade OpenSSL to v3.0.12 (while building from source) with various security fixes,
check the [release notes](https://www.openssl.org/news/cl30.txt).
* Integration tests can be started in KRaft mode and run against any
  GitHub Kafka branch, in addition to the released versions.
* Fixed a bug causing duplicate message consumption from a stale
fetch start offset in some particular cases (#4636)

Expand Down
12 changes: 12 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#endif

#include "rdtime.h"
#include "rdmap.h"
#include "crc32c.h"
#include "rdunittest.h"

Expand Down Expand Up @@ -5178,6 +5179,17 @@ const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) {
return uuid->base64str;
}

/**
 * @brief Hash the 128 bits of \p uuid (most significant bits first,
 *        in host byte order) with the byte hash function.
 *
 * @returns an unsigned int hash of \p uuid.
 */
unsigned int rd_kafka_Uuid_hash(const rd_kafka_Uuid_t *uuid) {
        unsigned char data[sizeof(uuid->most_significant_bits) +
                           sizeof(uuid->least_significant_bits)];

        memcpy(data, &uuid->most_significant_bits,
               sizeof(uuid->most_significant_bits));
        memcpy(data + sizeof(uuid->most_significant_bits),
               &uuid->least_significant_bits,
               sizeof(uuid->least_significant_bits));

        return rd_bytes_hash(data, sizeof(data));
}

/**
 * @brief rd_map_t-compatible hash adapter: \p key is a
 *        `const rd_kafka_Uuid_t *` map key.
 */
unsigned int rd_kafka_Uuid_map_hash(const void *key) {
        return rd_kafka_Uuid_hash(key);
}

/** @returns the least significant 64 bits of \p uuid. */
int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) {
        return uuid->least_significant_bits;
}
Expand Down
181 changes: 180 additions & 1 deletion src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -3101,6 +3101,12 @@ int rd_kafka_topic_partition_by_id_cmp(const void *_a, const void *_b) {
return are_topic_ids_different || RD_CMP(a->partition, b->partition);
}

/**
 * @brief Same as rd_kafka_topic_partition_by_id_cmp() but with an
 *        (ignored) \p opaque, for use as a sort comparator.
 */
static int rd_kafka_topic_partition_by_id_cmp_opaque(const void *_a,
                                                     const void *_b,
                                                     void *opaque) {
        return rd_kafka_topic_partition_by_id_cmp(_a, _b);
}

/** @brief Compare only the topic */
int rd_kafka_topic_partition_cmp_topic(const void *_a, const void *_b) {
const rd_kafka_topic_partition_t *a = _a;
Expand All @@ -3114,13 +3120,22 @@ static int rd_kafka_topic_partition_cmp_opaque(const void *_a,
return rd_kafka_topic_partition_cmp(_a, _b);
}

/** @returns a hash of the topic name and partition */
unsigned int rd_kafka_topic_partition_hash(const void *_a) {
        const rd_kafka_topic_partition_t *a = _a;
        /* Accumulate in unsigned: signed `31 * r` may overflow,
         * which is undefined behavior. The resulting value is
         * identical to the previous int-based computation. */
        unsigned int r = 31u * 17u + (unsigned int)a->partition;
        return 31u * r + rd_string_hash(a->topic, -1);
}

/** @returns a hash of the topic id and partition */
unsigned int rd_kafka_topic_partition_hash_by_id(const void *_a) {
        const rd_kafka_topic_partition_t *a = _a;
        const rd_kafka_Uuid_t topic_id =
            rd_kafka_topic_partition_get_topic_id(a);
        /* Accumulate in unsigned: signed `31 * r` may overflow,
         * which is undefined behavior. The resulting value is
         * identical to the previous int-based computation. */
        unsigned int r = 31u * 17u + (unsigned int)a->partition;
        return 31u * r + rd_kafka_Uuid_hash(&topic_id);
}



/**
Expand Down Expand Up @@ -3327,6 +3342,12 @@ void rd_kafka_topic_partition_list_sort_by_topic(
rktparlist, rd_kafka_topic_partition_cmp_opaque, NULL);
}

/** @brief Sort \p rktparlist by topic id, then partition. */
void rd_kafka_topic_partition_list_sort_by_topic_id(
    rd_kafka_topic_partition_list_t *rktparlist) {
        rd_kafka_topic_partition_list_sort(
            rktparlist, rd_kafka_topic_partition_by_id_cmp_opaque, NULL);
}

rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset(
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic,
Expand Down Expand Up @@ -4493,3 +4514,161 @@ const char *rd_kafka_fetch_pos2str(const rd_kafka_fetch_pos_t fetchpos) {

return ret[idx];
}

/** @brief Map from topic partition to a generic pointer value,
 *         used below for membership tests on partition lists. */
typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *,
                    void *) map_toppar_void_t;

/**
 * @brief Calculates \p a ∩ \p b using \p cmp and \p hash .
 *        Ordered following \p a order. Elements are copied from \p a.
 *
 * @returns a newly allocated list, to be destroyed by the caller.
 */
static rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_intersection0(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b,
    int(cmp)(const void *_a, const void *_b),
    unsigned int(hash)(const void *_a)) {
        rd_kafka_topic_partition_t *rktpar;
        /* Intersection size is bounded by the smaller input. */
        rd_kafka_topic_partition_list_t *ret =
            rd_kafka_topic_partition_list_new(a->cnt < b->cnt ? a->cnt
                                                              : b->cnt);
        /* Index b for O(1) membership tests while scanning a. */
        map_toppar_void_t b_map =
            RD_MAP_INITIALIZER(b->cnt, cmp, hash, NULL, NULL);
        RD_KAFKA_TPLIST_FOREACH(rktpar, b) {
                RD_MAP_SET(&b_map, rktpar, rktpar);
        }
        RD_KAFKA_TPLIST_FOREACH(rktpar, a) {
                /* Simplified from `(... != NULL) == 1`: plain
                 * membership test. */
                if (RD_MAP_GET(&b_map, rktpar) != NULL) {
                        rd_kafka_topic_partition_list_add_copy(ret, rktpar);
                }
        }
        RD_MAP_DESTROY(&b_map);
        return ret;
}

/**
 * @brief Calculates \p a - \p b using \p cmp and \p hash .
 *        Ordered following \p a order. Elements are copied from \p a.
 *
 * @returns a newly allocated list, to be destroyed by the caller.
 */
static rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_difference0(rd_kafka_topic_partition_list_t *a,
                                          rd_kafka_topic_partition_list_t *b,
                                          int(cmp)(const void *_a,
                                                   const void *_b),
                                          unsigned int(hash)(const void *_a)) {
        rd_kafka_topic_partition_t *rktpar;
        /* Difference size is bounded by a's size. */
        rd_kafka_topic_partition_list_t *ret =
            rd_kafka_topic_partition_list_new(a->cnt);
        /* Index b for O(1) membership tests while scanning a. */
        map_toppar_void_t b_map =
            RD_MAP_INITIALIZER(b->cnt, cmp, hash, NULL, NULL);
        RD_KAFKA_TPLIST_FOREACH(rktpar, b) {
                RD_MAP_SET(&b_map, rktpar, rktpar);
        }
        RD_KAFKA_TPLIST_FOREACH(rktpar, a) {
                /* Simplified from `(... != NULL) == 0`: keep elements
                 * of a that are absent from b. */
                if (RD_MAP_GET(&b_map, rktpar) == NULL) {
                        rd_kafka_topic_partition_list_add_copy(ret, rktpar);
                }
        }
        RD_MAP_DESTROY(&b_map);
        return ret;
}

/**
 * @brief Calculates \p a ∪ \p b using \p cmp and \p hash .
 *        Ordered following \p a order for elements in \p a
 *        and \p b order for elements only in \p b.
 *        Elements are copied the same way.
 */
static rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_union0(rd_kafka_topic_partition_list_t *a,
                                     rd_kafka_topic_partition_list_t *b,
                                     int(cmp)(const void *_a, const void *_b),
                                     unsigned int(hash)(const void *_a)) {
        rd_kafka_topic_partition_list_t *only_in_b, *result;

        /* Union = all of a, followed by the elements of b not in a. */
        only_in_b =
            rd_kafka_topic_partition_list_difference0(b, a, cmp, hash);
        result =
            rd_kafka_topic_partition_list_new(a->cnt + only_in_b->cnt);

        rd_kafka_topic_partition_list_add_list(result, a);
        rd_kafka_topic_partition_list_add_list(result, only_in_b);

        rd_kafka_topic_partition_list_destroy(only_in_b);
        return result;
}

/**
 * @brief Calculates \p a ∩ \p b using topic name and partition id.
 *        Ordered following \p a order. Elements are copied from \p a.
 *
 * @returns a newly allocated list, to be destroyed with
 *          rd_kafka_topic_partition_list_destroy().
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_intersection_by_name(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b) {
        return rd_kafka_topic_partition_list_intersection0(
            a, b, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash);
}

/**
 * @brief Calculates \p a - \p b using topic name and partition id.
 *        Ordered following \p a order. Elements are copied from \p a.
 *
 * @returns a newly allocated list, to be destroyed with
 *          rd_kafka_topic_partition_list_destroy().
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_difference_by_name(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b) {
        return rd_kafka_topic_partition_list_difference0(
            a, b, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash);
}

/**
 * @brief Calculates \p a ∪ \p b using topic name and partition id.
 *        Ordered following \p a order for elements in \p a
 *        and \p b order for elements only in \p b.
 *        Elements are copied the same way.
 *
 * @returns a newly allocated list, to be destroyed with
 *          rd_kafka_topic_partition_list_destroy().
 */
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_union_by_name(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b) {
        return rd_kafka_topic_partition_list_union0(
            a, b, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash);
}

/**
 * @brief Calculates \p a ∩ \p b using topic id and partition id.
 *        Ordered following \p a order. Elements are copied from \p a.
 *
 * @returns a newly allocated list, to be destroyed with
 *          rd_kafka_topic_partition_list_destroy().
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_intersection_by_id(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b) {
        return rd_kafka_topic_partition_list_intersection0(
            a, b, rd_kafka_topic_partition_by_id_cmp,
            rd_kafka_topic_partition_hash_by_id);
}

/**
 * @brief Calculates \p a - \p b using topic id and partition id.
 *        Ordered following \p a order. Elements are copied from \p a.
 *
 * @returns a newly allocated list, to be destroyed with
 *          rd_kafka_topic_partition_list_destroy().
 */
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_difference_by_id(
    rd_kafka_topic_partition_list_t *a,
    rd_kafka_topic_partition_list_t *b) {
        return rd_kafka_topic_partition_list_difference0(
            a, b, rd_kafka_topic_partition_by_id_cmp,
            rd_kafka_topic_partition_hash_by_id);
}

/**
 * @brief Calculates \p a ∪ \p b using topic id and partition id.
 *        Ordered following \p a order for elements in \p a
 *        and \p b order for elements only in \p b.
 *        Elements are copied the same way.
 *
 * @returns a newly allocated list, to be destroyed with
 *          rd_kafka_topic_partition_list_destroy().
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_union_by_id(rd_kafka_topic_partition_list_t *a,
                                          rd_kafka_topic_partition_list_t *b) {
        return rd_kafka_topic_partition_list_union0(
            a, b, rd_kafka_topic_partition_by_id_cmp,
            rd_kafka_topic_partition_hash_by_id);
}
30 changes: 30 additions & 0 deletions src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,9 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_topic(
void rd_kafka_topic_partition_list_sort_by_topic(
rd_kafka_topic_partition_list_t *rktparlist);

void rd_kafka_topic_partition_list_sort_by_topic_id(
rd_kafka_topic_partition_list_t *rktparlist);

void rd_kafka_topic_partition_list_reset_offsets(
rd_kafka_topic_partition_list_t *rktparlist,
int64_t offset);
Expand Down Expand Up @@ -1122,4 +1125,31 @@ static RD_UNUSED RD_INLINE void rd_kafka_toppar_set_offset_validation_position(
rktp->rktp_offset_validation_pos = offset_validation_pos;
}

rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_intersection_by_name(
rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_difference_by_name(
rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_union_by_name(rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_intersection_by_id(
rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_difference_by_id(
rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_union_by_id(rd_kafka_topic_partition_list_t *a,
rd_kafka_topic_partition_list_t *b);

#endif /* _RDKAFKA_PARTITION_H_ */
4 changes: 4 additions & 0 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,10 @@ rd_kafka_Uuid_t rd_kafka_Uuid_random();

const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid);

unsigned int rd_kafka_Uuid_hash(const rd_kafka_Uuid_t *uuid);

unsigned int rd_kafka_Uuid_map_hash(const void *key);

/**
* @name Producer ID and Epoch for the Idempotent Producer
* @{
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_ssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans,

param = SSL_get0_param(rktrans->rktrans_ssl);

if (!X509_VERIFY_PARAM_set1_host(param, name, 0))
if (!X509_VERIFY_PARAM_set1_host(param, name,
strnlen(name, sizeof(name))))
goto fail;
}
#else
Expand Down
16 changes: 16 additions & 0 deletions src/rdmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2020-2022, Magnus Edenhill
* 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -237,6 +238,21 @@ unsigned int rd_map_str_hash(const void *key) {
}


/**
 * @returns a djb2 hash of \p bytes.
 *
 * @param len \p bytes will be hashed up to \p len.
 */
unsigned int rd_bytes_hash(unsigned char *bytes, size_t len) {
        const unsigned char *end = bytes + len;
        unsigned int hash        = 5381;

        /* djb2: hash = hash * 33 + byte (`* 33` == `(h << 5) + h`). */
        while (bytes < end)
                hash = hash * 33 + *bytes++;

        return hash;
}


/**
* @name Unit tests
Expand Down
5 changes: 5 additions & 0 deletions src/rdmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2020-2022, Magnus Edenhill
* 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -249,6 +250,10 @@ int rd_map_str_cmp(const void *a, const void *b);
*/
unsigned int rd_map_str_hash(const void *a);

/**
* @brief Bytes hash function (djb2).
*/
unsigned int rd_bytes_hash(unsigned char *bytes, size_t len);


/**
Expand Down
Loading

0 comments on commit f11be5a

Please sign in to comment.