-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Header support for C++ API #1959
Conversation
1380e59
to
3a2d188
Compare
3a2d188
to
cba970d
Compare
cba970d
to
f2f8466
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good effort!
Left some comments. Also need to follow the existing style.
@@ -457,6 +457,17 @@ void msg_consume(RdKafka::KafkaConsumer *consumer, | |||
" [" << (int)msg->partition() << "] at offset " << | |||
msg->offset() << std::endl; | |||
|
|||
RdKafka::Headers *headers = msg->get_headers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no need for headers support in the verifiable_client, please remove these changes.
kafkatest_verifiable_client is used to run the Java system tests with the librdkafka client.
src-cpp/HeadersImpl.cpp
Outdated
|
||
#include "rdkafkacpp_int.h" | ||
|
||
RdKafka::Headers *RdKafka::Headers::create(size_t initial_count, bool free_rd_headers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
two spaces after type, should only be one.
src-cpp/HeadersImpl.cpp
Outdated
if (headers.size() > 0) { | ||
return new RdKafka::HeadersImpl(headers, free_rd_headers); | ||
} else { | ||
return 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would've assumed an empty Headers list here rather than NULL
src-cpp/ProducerImpl.cpp
Outdated
if (headers) { | ||
hdrs = headers->c_headers(); | ||
} else { | ||
hdrs = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use NULL for pointers, not 0. And in this case you can initialize hdrs
to NULL directly.
src-cpp/rdkafkacpp.h
Outdated
return key_; | ||
} | ||
|
||
const char* value() const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const char *value..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
header values may be NULL, and are to be considered binary, so use const void * and a size_t length instead
RdKafka::Producer::RK_MSG_COPY, | ||
(void *)message, message ? strlen(message) : 0, | ||
(void *)"key", 3, 0, NULL, produce_headers); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
produce a message without headers to make sure that works as well
tests/0085-headers.cpp
Outdated
* the messages. */ | ||
} | ||
RdKafka::Headers *headers = msg->get_headers(); | ||
if (!headers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should try this on a message without headers and see that nothing breaks.
tests/0085-headers.cpp
Outdated
} | ||
|
||
static void test_one_header_null_msg () { | ||
Test::Say("Test one header in consumed message with a null value message.\n"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this very valuable, the message value / key are not really related to the headers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just on this one when I produce messages with null or empty values the headers do not come through on the consume, is this expected behaviour?
@@ -283,6 +284,7 @@ struct test tests[] = { | |||
_TEST(0083_cb_event, 0, TEST_BRKVER(0,9,0,0)), | |||
_TEST(0084_destroy_flags_local, TEST_F_LOCAL), | |||
_TEST(0084_destroy_flags, 0), | |||
_TEST(0085_headers, 0, TEST_BRKVER(0,11,0,0)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add to CMakeLists.txt too.
This goes for HeaderImpl.cpp too.
src-cpp/HeadersImpl.cpp
Outdated
return new RdKafka::HeadersImpl(initial_count, free_rd_headers); | ||
} | ||
|
||
RdKafka::Headers *RdKafka::Headers::create(const std::vector<Header> &headers, bool free_rd_headers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
free_rd_headers is an implementation detail, remove it from the public API.
@davidtrihy thanks so much for pushing this forward! I know you're also looking to get this upstream in |
bump @davidtrihy |
@alexander-alvarez sorry I've been busy with my day job around this and we're working with the forked version at the moment, I nearly have all the changes requested done but I'm just trying to get the tests working properly again. I'll try and get it done in the next week when I hopefully have some spare time. |
@davidtrihy friendly ping. I'd help out myself, but my C++ is only enough to read it and sort of understand what's going on. |
@alexander-alvarez Hey sorry been very busy but I'll spend some time at it tonight and tomorrow to address the comments, the node-rdkafka maintainer is happy with the changes on that side but I'll address the rest of the issues on this soon! |
@davidtrihy Friendly reminder from your neighborly open source C++ freeloader. |
@edenhill apologies about the lateness in getting the changes requested for this one, been very busy with work and I needed to find some time! I hope this suffices and if you have any further feedback let me know and I'll try and address ASAP so we can get this over the line. |
If there's any further feedback I'll address it ASAP so I can get this over the line |
Thanks @davidtrihy I owe you a 🍺 or 🍸 (or whatever you want lol) if you stop by NYC |
looks like there's more conflicts :( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good stuff, needs some changes!
@@ -3,7 +3,7 @@ | |||
|
|||
Property | C/P | Range | Default | Description | |||
-----------------------------------------|-----|-----------------|--------------:|-------------------------- | |||
builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support. <br>*Type: CSV flags* | |||
builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_plain, sasl_scram, plugins | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support. <br>*Type: CSV flags* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CONFIGURATION.md is auto-generated from your build options, which seem to lack cyrus/gssapi support. Please don't commit this file unless there were actual changes to rdkafka_conf.c
/* | ||
* librdkafka - Apache Kafka C/C++ library | ||
* | ||
* Copyright (c) 2014 Magnus Edenhill |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Copyright (c) 2014 Magnus Edenhill | |
* Copyright (c) 2018 Magnus Edenhill |
/** | ||
* @brief Headers object | ||
* | ||
* This object encapsulates the C implementation logic into a C++ object |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't reference the C API from the public C++ API: that'll require users to know both the C and C++ API.
Instead copy the relevant sections of the C doc strings here and modify for C++ use.
* | ||
* @param key the string value for the header key | ||
* | ||
* @param value the bytes of the header value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- ", or NULL"
* | ||
* @param value_size the length in bytes of the header value | ||
*/ | ||
Header(const std::string &key, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Header should be a virtual class and thus not have a public constructor, only a factory method.
See e.g., Topic class.
size_t value_size, | ||
const RdKafka::ErrorCode &err): | ||
key_(key), err_(err), value_size_(value_size) { | ||
value_ = copy_value(value, value_size); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be more efficient to just reference the underlying C header value data, if possible
rd_kafka_headers_t *hdrsp; | ||
rd_kafka_resp_err_t err; | ||
|
||
if (rkmessage->len > 0 && !(err = rd_kafka_message_detach_headers(rkmessage, &hdrsp))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can remove the err variable, it is not used
Headers* get_headers() { | ||
return headers_; | ||
} | ||
|
||
RdKafka::Topic *topic_; | ||
rd_kafka_message_t *rkmessage_; | ||
bool free_rkmessage_; | ||
/* For error signalling by the C++ layer the .._err_ message is | ||
* used as a place holder and rkmessage_ is set to point to it. */ | ||
rd_kafka_message_t rkmessage_err_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this meant to be a pointer? Having the entire struct here for an error code seems wasteful.
/* | ||
* librdkafka - Apache Kafka C library | ||
* | ||
* Copyright (c) 2012-2015, Magnus Edenhill |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Copyright (c) 2012-2015, Magnus Edenhill | |
* Copyright (c) 2018, Magnus Edenhill |
@@ -1451,6 +1713,9 @@ class RD_EXPORT Message { | |||
* @returns \c rd_kafka_message_t* | |||
*/ | |||
virtual struct rd_kafka_message_s *c_ptr () = 0; | |||
|
|||
/** @returns The Headers instance for this Message (if applicable) */ | |||
virtual RdKafka::Headers *get_headers() = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
virtual RdKafka::Headers *get_headers() = 0; | |
virtual RdKafka::Headers *headers() = 0; |
@davidtrihy It's close to the finish line |
Bump @davidtrihy |
I am patiently waiting for this as well. Any idea on when this will be completed? |
I'm picking this up to do the finishing touches |
Made the final changes and created a new PR here: #2109 Please review and test out. |
Producing and consuming header support in the C++ API.
Two test cases added in the e2e tests, was trying to add test cases for header support for empty/null messages but it appears that is not supported and the headers do not get attached. Is this correct behaviour?
Reference issue #1861