From 9e779ce7faed927e96c8a4bbdd54945900a6ed76 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Mon, 13 Aug 2018 16:25:10 +0100 Subject: [PATCH 1/4] Header support for C++ API --- examples/kafkatest_verifiable_client.cpp | 40 +++- src-cpp/CMakeLists.txt | 1 + src-cpp/HeadersImpl.cpp | 49 +++++ src-cpp/Makefile | 2 +- src-cpp/ProducerImpl.cpp | 12 +- src-cpp/rdkafkacpp.h | 135 ++++++++++++- src-cpp/rdkafkacpp_int.h | 148 ++++++++++++++- tests/0054-offset_time.cpp | 2 +- tests/0057-invalid_topic.cpp | 4 +- tests/0059-bsearch.cpp | 2 +- tests/0065-yield.cpp | 2 +- tests/0070-null_empty.cpp | 2 +- tests/0085-headers.cpp | 219 ++++++++++++++++++++++ tests/CMakeLists.txt | 1 + tests/test.c | 3 + win32/librdkafkacpp/librdkafkacpp.vcxproj | 1 + win32/tests/tests.vcxproj | 1 + 17 files changed, 594 insertions(+), 30 deletions(-) create mode 100644 src-cpp/HeadersImpl.cpp create mode 100644 tests/0085-headers.cpp diff --git a/examples/kafkatest_verifiable_client.cpp b/examples/kafkatest_verifiable_client.cpp index 542ef823f6..4c9e1e1a64 100644 --- a/examples/kafkatest_verifiable_client.cpp +++ b/examples/kafkatest_verifiable_client.cpp @@ -457,6 +457,17 @@ void msg_consume(RdKafka::KafkaConsumer *consumer, " [" << (int)msg->partition() << "] at offset " << msg->offset() << std::endl; + RdKafka::Headers *headers = msg->get_headers(); + if (headers) { + std::vector sheaders = headers->get_all(); + std::cout << "Headers length: " << sheaders.size() << std::endl; + for(std::vector::const_iterator it = sheaders.begin(); + it != sheaders.end(); + it++) { + std::cout << "Key: " << (*it).key << " Value: " << (*it).value << std::endl; + } + } + if (state.maxMessages >= 0 && state.consumer.consumedMessages >= state.maxMessages) return; @@ -831,31 +842,42 @@ int main (int argc, char **argv) { msg << value_prefix << i; while (true) { RdKafka::ErrorCode resp; - if (create_time == -1) { - resp = producer->produce(topic, partition, - RdKafka::Producer::RK_MSG_COPY /* Copy payload */, - 
const_cast(msg.str().c_str()), - msg.str().size(), NULL, NULL); - } else { - resp = producer->produce(topics[0], partition, + RdKafka::Headers *headers = 0; + if (create_time == -1) { + resp = producer->produce(topic, partition, + RdKafka::Producer::RK_MSG_COPY /* Copy payload */, + const_cast(msg.str().c_str()), + msg.str().size(), NULL, NULL); + } else { + std::string name = "kafkaheader"; + std::string val = "header_val"; + std::vector headers_arr; + headers_arr.push_back(RdKafka::Headers::Header(name, val.c_str())); + + headers = RdKafka::Headers::create(headers_arr, false); + resp = producer->produce(topics[0], partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast(msg.str().c_str()), msg.str().size(), NULL, 0, create_time, - NULL); - } + NULL, headers); + } if (resp == RdKafka::ERR__QUEUE_FULL) { producer->poll(100); continue; } else if (resp != RdKafka::ERR_NO_ERROR) { + headers->destroy_headers(); errorString("producer_send_error", RdKafka::err2str(resp), topic->name(), NULL, msg.str()); state.producer.numErr++; } else { state.producer.numSent++; } + if (headers) { + delete headers; + } break; } diff --git a/src-cpp/CMakeLists.txt b/src-cpp/CMakeLists.txt index f3deebf28b..18a7c49b82 100644 --- a/src-cpp/CMakeLists.txt +++ b/src-cpp/CMakeLists.txt @@ -5,6 +5,7 @@ set( ConfImpl.cpp ConsumerImpl.cpp HandleImpl.cpp + HeadersImpl.cpp KafkaConsumerImpl.cpp MessageImpl.cpp MetadataImpl.cpp diff --git a/src-cpp/HeadersImpl.cpp b/src-cpp/HeadersImpl.cpp new file mode 100644 index 0000000000..27a655a612 --- /dev/null +++ b/src-cpp/HeadersImpl.cpp @@ -0,0 +1,49 @@ +/* + * librdkafka - Apache Kafka C/C++ library + * + * Copyright (c) 2014 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. 
Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include + +#include "rdkafkacpp_int.h" + +RdKafka::Headers *RdKafka::Headers::create(size_t initial_count, bool free_rd_headers) { + return new RdKafka::HeadersImpl(initial_count, free_rd_headers); +} + +RdKafka::Headers *RdKafka::Headers::create(const std::vector
&headers, bool free_rd_headers) { + if (headers.size() > 0) { + return new RdKafka::HeadersImpl(headers, free_rd_headers); + } else { + return 0; + } + +} + +RdKafka::Headers::~Headers() {} diff --git a/src-cpp/Makefile b/src-cpp/Makefile index c55007deb1..5a41ed51a2 100644 --- a/src-cpp/Makefile +++ b/src-cpp/Makefile @@ -5,7 +5,7 @@ LIBVER= 1 CXXSRCS= RdKafka.cpp ConfImpl.cpp HandleImpl.cpp \ ConsumerImpl.cpp ProducerImpl.cpp KafkaConsumerImpl.cpp \ TopicImpl.cpp TopicPartitionImpl.cpp MessageImpl.cpp \ - QueueImpl.cpp MetadataImpl.cpp + HeadersImpl.cpp QueueImpl.cpp MetadataImpl.cpp HDRS= rdkafkacpp.h diff --git a/src-cpp/ProducerImpl.cpp b/src-cpp/ProducerImpl.cpp index 456bc33787..cd6a1e4838 100644 --- a/src-cpp/ProducerImpl.cpp +++ b/src-cpp/ProducerImpl.cpp @@ -143,14 +143,19 @@ RdKafka::ProducerImpl::produce (RdKafka::Topic *topic, } - RdKafka::ErrorCode RdKafka::ProducerImpl::produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, - void *msg_opaque) { + int64_t timestamp, void *msg_opaque, + RdKafka::Headers *headers) { + rd_kafka_headers_t *hdrs; + if (headers) { + hdrs = headers->c_headers(); + } else { + hdrs = 0; + } return static_cast ( @@ -162,6 +167,7 @@ RdKafka::ProducerImpl::produce (const std::string topic_name, RD_KAFKA_V_KEY(key, key_len), RD_KAFKA_V_TIMESTAMP(timestamp), RD_KAFKA_V_OPAQUE(msg_opaque), + RD_KAFKA_V_HEADERS(hdrs), RD_KAFKA_V_END) ); } diff --git a/src-cpp/rdkafkacpp.h b/src-cpp/rdkafkacpp.h index fa7a5014d0..16acc1238f 100644 --- a/src-cpp/rdkafkacpp.h +++ b/src-cpp/rdkafkacpp.h @@ -75,11 +75,11 @@ extern "C" { struct rd_kafka_s; struct rd_kafka_topic_s; struct rd_kafka_message_s; -} + struct rd_kafka_headers_s; +}; namespace RdKafka { - /** * @name Miscellaneous APIs * @{ @@ -448,6 +448,7 @@ std::string err2str(RdKafka::ErrorCode err); /* Forward declarations */ class Producer; class Message; +class Headers; class Queue; class 
Event; class Topic; @@ -1449,7 +1450,130 @@ class RD_EXPORT MessageTimestamp { int64_t timestamp; /**< Milliseconds since epoch (UTC). */ }; +/** + * @brief Headers object + * + * This object encapsulates the C implementation logic into a C++ object + * for use in RdKafka::Messages object. + */ +class RD_EXPORT Headers { + public: + virtual ~Headers() = 0; + + /** + * @brief Header object + * + * This object represents a single Header with key value pair + * and an ErrorCode + * + * @remark dynamic allocation of this object is not supported. + * + */ + class Header { + public: + Header(const std::string& key, + const char* value, + RdKafka::ErrorCode err = ERR_NO_ERROR): + key(key), err(err) { + // Safe managed copy of the value preserving the bytes + value_container_ = value; + this->value = value_container_.c_str(); + }; + + std::string key; + const char* value; + RdKafka::ErrorCode err; + private: + std::string value_container_; + void *operator new(size_t); /* Prevent dynamic allocation */ + }; + + /** + * @brief create a new instance of the Headers object + */ + static Headers *create(size_t initial_size = 8, bool free_rd_headers = true); + + /** + * @brief create a new instance of the Headers object from a std::vector + */ + static Headers *create(const std::vector
&headers, bool free_rd_headers = true); + /** + * @brief adds a Header to the end + * + * @returns An ErrorCode signalling a success or failure to add the Header. + */ + virtual ErrorCode add(const std::string& key, const char* value) = 0; + + /** + * @brief removes all the Headers of a given key + * + * @returns An ErrorCode signalling a success or failure to remove the Header. + */ + virtual ErrorCode remove(const std::string& key) = 0; + + /** + * @brief gets all of the Headers of a given key + * + * @returns a std::vector containing all the Headers of the given key. + */ + virtual std::vector
get(const std::string &key) const = 0; + + /** + * @brief gets the last occurrence of a Header of a given key + * + * @returns the Header if found, otherwise a Header with an ErrorCode + */ + virtual Header get_last(const std::string& key) const = 0; + + /** + * @brief returns all the Headers of a Message + * + * @returns a std::vector containing all of the Headers of a message + */ + virtual std::vector
get_all() const = 0; + + /** + * @brief the count of all the Headers + * + * @returns a size_t count of all the headers + */ + virtual size_t size() const = 0; + + /** + * @brief Returns the underlying librdkafka C rd_kafka_headers_t handle. + * + * @warning Calling the C API on this handle is not recommended and there + * is no official support for it, but for cases where the C++ API + * does not provide the underlying functionality this C handle can be + * used to interact directly with the core librdkafka API. + * + * @remark The lifetime of the returned pointer can be different than the lifetime + * of the Headers message due to how the producev function in the C API works + * if there is no error then the producev will take care of deallocation + * but if there is an error then it is the responsibility of the calling + * object to deallocate the underlying C implementation if an instance + * of the Headers object is created with free_rd_headers set to `false` + * + * @remark Include prior to including + * + * + * @returns \c rd_kafka_headers_t* + */ + virtual struct rd_kafka_headers_s *c_headers() = 0; + + /** + * @brief cleans up the underlying alocated C implementation headers if called + * + * @remark Safe to call even if the Headers object is set to clean up when + * when the destructor is called + * + * @remark Safe to call even if the underlyng C pointer is set to null + * + * @returns an ErrorCode signalling if the the operation was attempted + */ + virtual ErrorCode destroy_headers() = 0; +}; /** * @brief Message object @@ -1531,6 +1655,9 @@ class RD_EXPORT Message { /** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */ virtual void *msg_opaque () const = 0; + /** @returns The Headers instance for this Message (if applicable) */ + virtual RdKafka::Headers *get_headers() = 0; + virtual ~Message () = 0; /** @returns the latency in microseconds for a produced message measured @@ -2257,8 +2384,8 @@ class RD_EXPORT Producer : public 
virtual Handle { int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, - void *msg_opaque) = 0; + int64_t timestamp, void *msg_opaque, + RdKafka::Headers *headers) = 0; /** diff --git a/src-cpp/rdkafkacpp_int.h b/src-cpp/rdkafkacpp_int.h index 60e5a0f8f8..8c4f9d4846 100644 --- a/src-cpp/rdkafkacpp_int.h +++ b/src-cpp/rdkafkacpp_int.h @@ -48,7 +48,6 @@ typedef int mode_t; namespace RdKafka { - void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque); void log_cb_trampoline (const rd_kafka_t *rk, int level, const char *fac, const char *buf); @@ -120,6 +119,122 @@ class EventImpl : public Event { bool fatal_; }; +class HeadersImpl : public Headers { + public: + HeadersImpl (size_t initial_size, bool free_rd_headers): + headers_ (rd_kafka_headers_new(initial_size)), free_headers_ (free_rd_headers) {} + + HeadersImpl (rd_kafka_headers_t *headers): + headers_ (headers), free_headers_ (false) {}; + + HeadersImpl (const std::vector
&headers, bool free_rd_headers): + free_headers_ (free_rd_headers) { + if (headers.size() > 0) { + headers_ = rd_kafka_headers_new(headers.size()); + from_vector(headers); + } else { + headers_ = rd_kafka_headers_new(8); + } + } + + ~HeadersImpl() { + if(free_headers_ && headers_) { + rd_kafka_headers_destroy(headers_); + } + } + + ErrorCode add(const std::string& key, const char* value) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_add(headers_, + key.c_str(), key.size(), + value, strlen(value)); + return static_cast(err); + } + + ErrorCode remove(const std::string& key) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_remove (headers_, key.c_str()); + return static_cast(err); + } + + std::vector get(const std::string &key) const { + std::vector headers; + const void *value; + size_t size; + rd_kafka_resp_err_t err; + for (size_t idx = 0; + !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ;\ + idx++) { + if (value) { + const char* casted_value = static_cast(value); + headers.push_back(Headers::Header(key, casted_value)); + } + } + return headers; + } + + Headers::Header get_last(const std::string& key) const { + const void *value; + size_t size; + rd_kafka_resp_err_t err; + err = rd_kafka_header_get_last (headers_, key.c_str(), &value, &size); + const char* casted_value = static_cast(value); + ErrorCode cpp_error = static_cast(err); + return Headers::Header(key, casted_value, cpp_error); + } + + std::vector get_all() const { + std::vector headers; + size_t idx = 0; + const char *name; + const void *valuep; + size_t size; + while (!rd_kafka_header_get_all(headers_, idx++, + &name, &valuep, &size)) { + if (valuep != NULL) { + const char* casted_value = static_cast(valuep); + headers.push_back(Headers::Header(name, casted_value)); + } + } + return headers; + } + + size_t size() const { + return rd_kafka_header_cnt(headers_); + } + + struct rd_kafka_headers_s* c_headers() { + return headers_; + } + + ErrorCode destroy_headers() { + if 
(headers_) { + rd_kafka_headers_destroy(headers_); + headers_ = 0; + return RdKafka::ERR_NO_ERROR; + } else { + return RdKafka::ERR_OPERATION_NOT_ATTEMPTED; + } + } + + private: + void from_vector(const std::vector
&headers) { + if (headers.size() > 0) { + for (std::vector
::const_iterator it = headers.begin(); + it != headers.end(); + it++) { + this->add(it->key, it->value); + } + } + } + + HeadersImpl(HeadersImpl const&) /*= delete*/; + HeadersImpl& operator=(HeadersImpl const&) /*= delete*/; + + rd_kafka_headers_t* headers_; + bool free_headers_; +}; + class MessageImpl : public Message { public: @@ -128,17 +243,21 @@ class MessageImpl : public Message { rd_kafka_message_destroy(const_cast(rkmessage_)); if (key_) delete key_; + delete headers_; }; MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage): - topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) {} + topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL), + headers_(get_headers_from_rkmessage(rkmessage)) {} MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage, bool dofree): - topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL) { } + topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL), + headers_(get_headers_from_rkmessage(rkmessage)) {} MessageImpl (rd_kafka_message_t *rkmessage): - topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) { + topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL), + headers_(get_headers_from_rkmessage(rkmessage)) { if (rkmessage->rkt) { /* Possibly NULL */ topic_ = static_cast(rd_kafka_topic_opaque(rkmessage->rkt)); @@ -147,10 +266,11 @@ class MessageImpl : public Message { /* Create errored message */ MessageImpl (RdKafka::Topic *topic, RdKafka::ErrorCode err): - topic_(topic), free_rkmessage_(false), key_(NULL) { + topic_(topic), free_rkmessage_(false), key_(NULL), headers_(0) { rkmessage_ = &rkmessage_err_; memset(&rkmessage_err_, 0, sizeof(rkmessage_err_)); rkmessage_err_.err = static_cast(err); + } std::string errstr() const { @@ -205,6 +325,10 @@ class MessageImpl : public Message { return rd_kafka_message_latency(rkmessage_); } + Headers* get_headers() { + return headers_; + } + struct 
rd_kafka_message_s *c_ptr () { return rkmessage_; } @@ -220,8 +344,18 @@ class MessageImpl : public Message { * used as a place holder and rkmessage_ is set to point to it. */ rd_kafka_message_t rkmessage_err_; mutable std::string *key_; /* mutable because it's a cached value */ + RdKafka::Headers *headers_; private: + RdKafka::Headers* get_headers_from_rkmessage(rd_kafka_message_t *rkmessage) { + rd_kafka_headers_t *hdrsp; + rd_kafka_resp_err_t err; + + if (rkmessage->len > 0 && !(err = rd_kafka_message_headers(rkmessage, &hdrsp))) { + return new HeadersImpl(hdrsp); + } + return 0; + } /* "delete" copy ctor + copy assignment, for safety of key_ */ MessageImpl(MessageImpl const&) /*= delete*/; MessageImpl& operator=(MessageImpl const&) /*= delete*/; @@ -915,8 +1049,8 @@ class ProducerImpl : virtual public Producer, virtual public HandleImpl { int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, - void *msg_opaque); + int64_t timestamp, void *msg_opaque, + RdKafka::Headers *headers); ErrorCode flush (int timeout_ms) { return static_cast(rd_kafka_flush(rk_, diff --git a/tests/0054-offset_time.cpp b/tests/0054-offset_time.cpp index b550f2a9a3..c1641887b0 100644 --- a/tests/0054-offset_time.cpp +++ b/tests/0054-offset_time.cpp @@ -104,7 +104,7 @@ static void test_offset_time (void) { for (int ti = 0 ; ti < timestamp_cnt*2 ; ti += 2) { err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, (void *)topic.c_str(), topic.size(), NULL, 0, - timestamps[ti], NULL); + timestamps[ti], NULL, NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); } diff --git a/tests/0057-invalid_topic.cpp b/tests/0057-invalid_topic.cpp index d95ada65c3..66cf9e14c6 100644 --- a/tests/0057-invalid_topic.cpp +++ b/tests/0057-invalid_topic.cpp @@ -85,13 +85,13 @@ static void test_invalid_topic (void) { for (int i = -1 ; i < 3 ; i++) { err = p->produce(topic_bad, i, RdKafka::Producer::RK_MSG_COPY, - (void 
*)"bad", 4, NULL, 0, 0, NULL); + (void *)"bad", 4, NULL, 0, 0, NULL, NULL); if (err) /* Error is probably delayed until delivery report */ check_err(err, RdKafka::ERR_TOPIC_EXCEPTION); err = p->produce(topic_good, i, RdKafka::Producer::RK_MSG_COPY, - (void *)"good", 5, NULL, 0, 0, NULL); + (void *)"good", 5, NULL, 0, 0, NULL, NULL); check_err(err, RdKafka::ERR_NO_ERROR); } diff --git a/tests/0059-bsearch.cpp b/tests/0059-bsearch.cpp index 20f598efef..8caf84497c 100644 --- a/tests/0059-bsearch.cpp +++ b/tests/0059-bsearch.cpp @@ -132,7 +132,7 @@ static void do_test_bsearch (void) { err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, (void *)topic.c_str(), topic.size(), NULL, 0, timestamp, - i == 357 ? (void *)1 /*golden*/ : NULL); + i == 357 ? (void *)1 /*golden*/ : NULL, NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); timestamp += 100 + (timestamp % 9); diff --git a/tests/0065-yield.cpp b/tests/0065-yield.cpp index ffbf1c6d7a..2d6323d359 100644 --- a/tests/0065-yield.cpp +++ b/tests/0065-yield.cpp @@ -93,7 +93,7 @@ static void do_test_producer (bool do_yield) { for (int i = 0 ; i < msgcnt ; i++) { err = p->produce(topic, 0, RdKafka::Producer::RK_MSG_COPY, - (void *)"hi", 2, NULL, 0, 0, NULL); + (void *)"hi", 2, NULL, 0, 0, NULL, NULL); if (err) Test::Fail("produce() failed: " + RdKafka::err2str(err)); } diff --git a/tests/0070-null_empty.cpp b/tests/0070-null_empty.cpp index 68502f06d0..a0850b0854 100644 --- a/tests/0070-null_empty.cpp +++ b/tests/0070-null_empty.cpp @@ -111,7 +111,7 @@ static void do_test_null_empty (bool api_version_request) { (void *)msgs[i+1], msgs[i+1] ? strlen(msgs[i+1]) : 0, /* Key */ (void *)msgs[i], msgs[i] ? 
strlen(msgs[i]) : 0, - 0, NULL); + 0, NULL, NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); } diff --git a/tests/0085-headers.cpp b/tests/0085-headers.cpp new file mode 100644 index 0000000000..36e35fa969 --- /dev/null +++ b/tests/0085-headers.cpp @@ -0,0 +1,219 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include +#include "testcpp.h" + +class MyCbs : public RdKafka::OffsetCommitCb, public RdKafka::EventCb { + public: + int seen_commit; + int seen_stats; + + void offset_commit_cb (RdKafka::ErrorCode err, + std::vector&offsets) { + seen_commit++; + Test::Say("Got commit callback!\n"); + } + + void event_cb (RdKafka::Event &event) { + switch (event.type()) + { + case RdKafka::Event::EVENT_STATS: + Test::Say("Got stats callback!\n"); + seen_stats++; + break; + default: + break; + } + } +}; + +static void assert_all_headers_match(RdKafka::Headers *actual, + std::vector &expected) { + if (actual->size() != expected.size()) { + Test::Fail(tostr() << "Expected headers length to equal" + << expected.size() << " instead equals " << actual->size() << "\n"); + } + + std::vector actual_headers = actual->get_all(); + for(size_t i = 0; i < actual_headers.size(); i++) { + RdKafka::Headers::Header actual_header = actual_headers[i]; + RdKafka::Headers::Header expected_header = expected[i]; + std::string actual_key = actual_header.key; + std::string actual_value = std::string(actual_header.value); + std::string expected_key = expected_header.key; + std::string expected_value = std::string(expected_header.value); + if (actual_key != expected_key) { + Test::Fail(tostr() << "Header key does not match, expected '" + << actual_key << "' but got '" << expected_key << "'\n"); + } + if (actual_value != expected_value) { + Test::Fail(tostr() << "Header value does not match, expected '" + << actual_value << "' but got '" << expected_value << "'\n"); + } + } +} + +static void test_n_headers (int n, const char* message) { + std::string topic = Test::mk_topic_name("0085-headers", 1); + RdKafka::Conf *conf; + std::string errstr; + + Test::conf_init(&conf, NULL, 0); + + Test::conf_set(conf, "group.id", topic); + Test::conf_set(conf, "group.id", topic); + Test::conf_set(conf, "socket.timeout.ms", "10000"); + Test::conf_set(conf, "enable.auto.commit", "false"); + Test::conf_set(conf, 
"enable.partition.eof", "false"); + Test::conf_set(conf, "auto.offset.reset", "earliest"); + Test::conf_set(conf, "statistics.interval.ms", "1000"); + + MyCbs cbs; + cbs.seen_commit = 0; + cbs.seen_stats = 0; + if (conf->set("offset_commit_cb", (RdKafka::OffsetCommitCb *)&cbs, errstr) != + RdKafka::Conf::CONF_OK) + Test::Fail("Failed to set commit callback: " + errstr); + if (conf->set("event_cb", (RdKafka::EventCb *)&cbs, errstr) != + RdKafka::Conf::CONF_OK) + Test::Fail("Failed to set event callback: " + errstr); + + RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); + if (!p) + Test::Fail("Failed to create Producer: " + errstr); + + RdKafka::ErrorCode err; + + std::vector headers_arr; + for (int i = 0; i < n; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + headers_arr.push_back(RdKafka::Headers::Header(key, val.c_str())); + } + RdKafka::Headers *produce_headers = RdKafka::Headers::create(headers_arr, false); + + err = p->produce(topic, 0, + RdKafka::Producer::RK_MSG_COPY, + (void *)message, message ? 
strlen(message) : 0, + (void *)"key", 3, 0, NULL, produce_headers); + + p->flush(tmout_multip(10000)); + + if (p->outq_len() > 0) + Test::Fail(tostr() << "Expected producer to be flushed, " << + p->outq_len() << " messages remain"); + + RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); + if (!c) + Test::Fail("Failed to create KafkaConsumer: " + errstr); + + std::vector topics; + topics.push_back(topic); + if ((err = c->subscribe(topics))) + Test::Fail("subscribe failed: " + RdKafka::err2str(err)); + + int cnt = 0; + while (!cbs.seen_commit || !cbs.seen_stats) { + RdKafka::Message *msg = c->consume(tmout_multip(1000)); + if (!msg->err()) { + cnt++; + Test::Say(tostr() << "Received message #" << cnt << "\n"); + if (cnt > 10) + Test::Fail(tostr() << "Should've seen the " + "offset commit (" << cbs.seen_commit << ") and " + "stats callbacks (" << cbs.seen_stats << ") by now"); + + /* Commit the first message to trigger the offset commit_cb */ + if (cnt == 1) { + err = c->commitAsync(msg); + if (err) + Test::Fail("commitAsync() failed: " + RdKafka::err2str(err)); + rd_sleep(1); /* Sleep to simulate slow processing, making sure + * that the offset commit callback op gets + * inserted on the consume queue in front of + * the messages. */ + } + RdKafka::Headers *headers = msg->get_headers(); + if (!headers) { + Test::Fail("Expected RdKafka::Message to contain headers"); + } + + assert_all_headers_match(headers, headers_arr); + + } else if (msg->err() == RdKafka::ERR__TIMED_OUT) + ; /* Stil rebalancing? 
*/ + else + Test::Fail("consume() failed: " + msg->errstr()); + delete msg; + } + + c->close(); + delete c; + delete p; + delete conf; + +} + +static void test_one_header () { + Test::Say("Test one header in consumed message.\n"); + std::string val = "valid"; + test_n_headers(1, val.c_str()); +} + +static void test_ten_headers () { + Test::Say("Test ten headers in consumed message.\n"); + std::string val = "valid"; + test_n_headers(10, val.c_str()); +} + +static void test_one_header_null_msg () { + Test::Say("Test one header in consumed message with a null value message.\n"); + test_n_headers(1, NULL); +} + +static void test_one_header_empty_msg () { + Test::Say("Test one header in consumed message with an empty value message.\n"); + std::string val = ""; + test_n_headers(1, val.c_str()); +} + +extern "C" { + int main_0085_headers (int argc, char **argv) { + test_one_header(); + test_ten_headers(); + // These two tests fail and I'm not sure if this is correct behaviour + // test_one_header_null_msg(); + // test_one_header_empty_msg(); + return 0; + } +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9f94aa31b0..c824409111 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -77,6 +77,7 @@ set( 0082-fetch_max_bytes.cpp 0083-cb_event.c 0084-destroy_flags.c + 0085-headers.cpp 0086-purge.c 0088-produce_metadata_timeout.c 0089-max_poll_interval.c diff --git a/tests/test.c b/tests/test.c index 890eca2605..4125c1bbd9 100644 --- a/tests/test.c +++ b/tests/test.c @@ -177,11 +177,13 @@ _TEST_DECL(0082_fetch_max_bytes); _TEST_DECL(0083_cb_event); _TEST_DECL(0084_destroy_flags_local); _TEST_DECL(0084_destroy_flags); +_TEST_DECL(0085_headers); _TEST_DECL(0086_purge_local); _TEST_DECL(0086_purge_remote); _TEST_DECL(0088_produce_metadata_timeout); _TEST_DECL(0089_max_poll_interval); + /* Manual tests */ _TEST_DECL(8000_idle); @@ -289,6 +291,7 @@ struct test tests[] = { _TEST(0083_cb_event, 0, TEST_BRKVER(0,9,0,0)), _TEST(0084_destroy_flags_local, 
TEST_F_LOCAL), _TEST(0084_destroy_flags, 0), + _TEST(0085_headers, 0, TEST_BRKVER(0,11,0,0)), _TEST(0086_purge_local, TEST_F_LOCAL), _TEST(0086_purge_remote, 0), #if WITH_SOCKEM diff --git a/win32/librdkafkacpp/librdkafkacpp.vcxproj b/win32/librdkafkacpp/librdkafkacpp.vcxproj index 789c0d127f..40cbabc8bd 100644 --- a/win32/librdkafkacpp/librdkafkacpp.vcxproj +++ b/win32/librdkafkacpp/librdkafkacpp.vcxproj @@ -85,6 +85,7 @@ + diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index c1174b6f48..9b013c6d63 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -167,6 +167,7 @@ + From 415832010f5ff2c3641a5157b4fb77a385efac29 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Mon, 13 Aug 2018 16:25:10 +0100 Subject: [PATCH 2/4] Header support for C++ API --- src-cpp/rdkafkacpp.h | 26 ++++++--- src-cpp/rdkafkacpp_int.h | 116 +++++++++++++++++++++++++++++++++++++++ tests/test.c | 1 - 3 files changed, 134 insertions(+), 9 deletions(-) diff --git a/src-cpp/rdkafkacpp.h b/src-cpp/rdkafkacpp.h index 16acc1238f..8e61ece983 100644 --- a/src-cpp/rdkafkacpp.h +++ b/src-cpp/rdkafkacpp.h @@ -1474,16 +1474,25 @@ class RD_EXPORT Headers { Header(const std::string& key, const char* value, RdKafka::ErrorCode err = ERR_NO_ERROR): - key(key), err(err) { - // Safe managed copy of the value preserving the bytes - value_container_ = value; - this->value = value_container_.c_str(); + key_(key), err_(err) { + value_container_.assign(value); }; - - std::string key; - const char* value; - RdKafka::ErrorCode err; + + std::string key() const { + return key_; + } + + const char* value() const { + return value_container_.c_str(); + } + + RdKafka::ErrorCode err() const { + return err_; + } + private: + std::string key_; + RdKafka::ErrorCode err_; std::string value_container_; void *operator new(size_t); /* Prevent dynamic allocation */ }; @@ -1533,6 +1542,7 @@ class RD_EXPORT Headers { */ virtual std::vector
get_all() const = 0; + /** * @brief the count of all the Headers * diff --git a/src-cpp/rdkafkacpp_int.h b/src-cpp/rdkafkacpp_int.h index 8c4f9d4846..776c3babb5 100644 --- a/src-cpp/rdkafkacpp_int.h +++ b/src-cpp/rdkafkacpp_int.h @@ -235,6 +235,122 @@ class HeadersImpl : public Headers { bool free_headers_; }; +class HeadersImpl : public Headers { + public: + HeadersImpl (size_t initial_size, bool free_rd_headers): + headers_ (rd_kafka_headers_new(initial_size)), free_headers_ (free_rd_headers) {} + + HeadersImpl (rd_kafka_headers_t *headers): + headers_ (headers), free_headers_ (false) {}; + + HeadersImpl (const std::vector
&headers, bool free_rd_headers): + free_headers_ (free_rd_headers) { + if (headers.size() > 0) { + headers_ = rd_kafka_headers_new(headers.size()); + from_vector(headers); + } else { + headers_ = rd_kafka_headers_new(8); + } + } + + ~HeadersImpl() { + if(free_headers_ && headers_) { + rd_kafka_headers_destroy(headers_); + } + } + + ErrorCode add(const std::string& key, const char* value) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_add(headers_, + key.c_str(), key.size(), + value, strlen(value)); + return static_cast(err); + } + + ErrorCode remove(const std::string& key) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_remove (headers_, key.c_str()); + return static_cast(err); + } + + std::vector get(const std::string &key) const { + std::vector headers; + const void *value; + size_t size; + rd_kafka_resp_err_t err; + for (size_t idx = 0; + !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ;\ + idx++) { + if (value) { + const char* casted_value = static_cast(value); + headers.push_back(Headers::Header(key, casted_value)); + } + } + return headers; + } + + Headers::Header get_last(const std::string& key) const { + const void *value; + size_t size; + rd_kafka_resp_err_t err; + err = rd_kafka_header_get_last (headers_, key.c_str(), &value, &size); + const char* casted_value = static_cast(value); + ErrorCode cpp_error = static_cast(err); + return Headers::Header(key, casted_value, cpp_error); + } + + std::vector get_all() const { + std::vector headers; + size_t idx = 0; + const char *name; + const void *valuep; + size_t size; + while (!rd_kafka_header_get_all(headers_, idx++, + &name, &valuep, &size)) { + if (valuep != NULL) { + const char* casted_value = static_cast(valuep); + headers.push_back(Headers::Header(name, casted_value)); + } + } + return headers; + } + + size_t size() const { + return rd_kafka_header_cnt(headers_); + } + + struct rd_kafka_headers_s* c_headers() { + return headers_; + } + + ErrorCode destroy_headers() { + if 
(headers_) { + rd_kafka_headers_destroy(headers_); + headers_ = 0; + return RdKafka::ERR_NO_ERROR; + } else { + return RdKafka::ERR_OPERATION_NOT_ATTEMPTED; + } + } + + private: + void from_vector(const std::vector
&headers) { + if (headers.size() > 0) { + for (std::vector
::const_iterator it = headers.begin(); + it != headers.end(); + it++) { + this->add(it->key(), it->value()); + } + } + } + + HeadersImpl(HeadersImpl const&) /*= delete*/; + HeadersImpl& operator=(HeadersImpl const&) /*= delete*/; + + rd_kafka_headers_t* headers_; + bool free_headers_; +}; + class MessageImpl : public Message { public: diff --git a/tests/test.c b/tests/test.c index 4125c1bbd9..a63ac64153 100644 --- a/tests/test.c +++ b/tests/test.c @@ -183,7 +183,6 @@ _TEST_DECL(0086_purge_remote); _TEST_DECL(0088_produce_metadata_timeout); _TEST_DECL(0089_max_poll_interval); - /* Manual tests */ _TEST_DECL(8000_idle); From 5d4504e054e37af3fa062e40d81b6b7f7bdacdc8 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Mon, 15 Oct 2018 15:25:05 +0000 Subject: [PATCH 3/4] Applied most review comments --- examples/kafkatest_verifiable_client.cpp | 40 +-- src-cpp/HeadersImpl.cpp | 10 +- src-cpp/ProducerImpl.cpp | 26 +- src-cpp/rdkafkacpp.h | 181 +++++++++-- src-cpp/rdkafkacpp_int.h | 74 +++-- tests/0054-offset_time.cpp | 2 +- tests/0057-invalid_topic.cpp | 4 +- tests/0059-bsearch.cpp | 2 +- tests/0065-yield.cpp | 2 +- tests/0070-null_empty.cpp | 2 +- tests/0085-headers.cpp | 397 ++++++++++++++++------- 11 files changed, 531 insertions(+), 209 deletions(-) diff --git a/examples/kafkatest_verifiable_client.cpp b/examples/kafkatest_verifiable_client.cpp index 4c9e1e1a64..542ef823f6 100644 --- a/examples/kafkatest_verifiable_client.cpp +++ b/examples/kafkatest_verifiable_client.cpp @@ -457,17 +457,6 @@ void msg_consume(RdKafka::KafkaConsumer *consumer, " [" << (int)msg->partition() << "] at offset " << msg->offset() << std::endl; - RdKafka::Headers *headers = msg->get_headers(); - if (headers) { - std::vector sheaders = headers->get_all(); - std::cout << "Headers length: " << sheaders.size() << std::endl; - for(std::vector::const_iterator it = sheaders.begin(); - it != sheaders.end(); - it++) { - std::cout << "Key: " << (*it).key << " Value: " << (*it).value << std::endl; - } - } 
- if (state.maxMessages >= 0 && state.consumer.consumedMessages >= state.maxMessages) return; @@ -842,42 +831,31 @@ int main (int argc, char **argv) { msg << value_prefix << i; while (true) { RdKafka::ErrorCode resp; - RdKafka::Headers *headers = 0; - if (create_time == -1) { - resp = producer->produce(topic, partition, - RdKafka::Producer::RK_MSG_COPY /* Copy payload */, - const_cast(msg.str().c_str()), - msg.str().size(), NULL, NULL); - } else { - std::string name = "kafkaheader"; - std::string val = "header_val"; - std::vector headers_arr; - headers_arr.push_back(RdKafka::Headers::Header(name, val.c_str())); - - headers = RdKafka::Headers::create(headers_arr, false); - resp = producer->produce(topics[0], partition, + if (create_time == -1) { + resp = producer->produce(topic, partition, + RdKafka::Producer::RK_MSG_COPY /* Copy payload */, + const_cast(msg.str().c_str()), + msg.str().size(), NULL, NULL); + } else { + resp = producer->produce(topics[0], partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast(msg.str().c_str()), msg.str().size(), NULL, 0, create_time, - NULL, headers); - } + NULL); + } if (resp == RdKafka::ERR__QUEUE_FULL) { producer->poll(100); continue; } else if (resp != RdKafka::ERR_NO_ERROR) { - headers->destroy_headers(); errorString("producer_send_error", RdKafka::err2str(resp), topic->name(), NULL, msg.str()); state.producer.numErr++; } else { state.producer.numSent++; } - if (headers) { - delete headers; - } break; } diff --git a/src-cpp/HeadersImpl.cpp b/src-cpp/HeadersImpl.cpp index 27a655a612..d7b5f9357d 100644 --- a/src-cpp/HeadersImpl.cpp +++ b/src-cpp/HeadersImpl.cpp @@ -33,15 +33,15 @@ #include "rdkafkacpp_int.h" -RdKafka::Headers *RdKafka::Headers::create(size_t initial_count, bool free_rd_headers) { - return new RdKafka::HeadersImpl(initial_count, free_rd_headers); +RdKafka::Headers *RdKafka::Headers::create(size_t initial_count) { + return new RdKafka::HeadersImpl(initial_count, false); } -RdKafka::Headers 
*RdKafka::Headers::create(const std::vector
&headers, bool free_rd_headers) { +RdKafka::Headers *RdKafka::Headers::create(const std::vector
&headers) { if (headers.size() > 0) { - return new RdKafka::HeadersImpl(headers, free_rd_headers); + return new RdKafka::HeadersImpl(headers, false); } else { - return 0; + return new RdKafka::HeadersImpl(8, false); } } diff --git a/src-cpp/ProducerImpl.cpp b/src-cpp/ProducerImpl.cpp index cd6a1e4838..5df139e72b 100644 --- a/src-cpp/ProducerImpl.cpp +++ b/src-cpp/ProducerImpl.cpp @@ -143,6 +143,27 @@ RdKafka::ProducerImpl::produce (RdKafka::Topic *topic, } +RdKafka::ErrorCode +RdKafka::ProducerImpl::produce (const std::string topic_name, + int32_t partition, int msgflags, + void *payload, size_t len, + const void *key, size_t key_len, + int64_t timestamp, void *msg_opaque) { + return + static_cast + ( + rd_kafka_producev(rk_, + RD_KAFKA_V_TOPIC(topic_name.c_str()), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_MSGFLAGS(msgflags), + RD_KAFKA_V_VALUE(payload, len), + RD_KAFKA_V_KEY(key, key_len), + RD_KAFKA_V_TIMESTAMP(timestamp), + RD_KAFKA_V_OPAQUE(msg_opaque), + RD_KAFKA_V_END) + ); +} + RdKafka::ErrorCode RdKafka::ProducerImpl::produce (const std::string topic_name, int32_t partition, int msgflags, @@ -150,12 +171,11 @@ RdKafka::ProducerImpl::produce (const std::string topic_name, const void *key, size_t key_len, int64_t timestamp, void *msg_opaque, RdKafka::Headers *headers) { - rd_kafka_headers_t *hdrs; + rd_kafka_headers_t *hdrs = NULL; if (headers) { hdrs = headers->c_headers(); - } else { - hdrs = 0; } + return static_cast ( diff --git a/src-cpp/rdkafkacpp.h b/src-cpp/rdkafkacpp.h index 8e61ece983..70685a4ff2 100644 --- a/src-cpp/rdkafkacpp.h +++ b/src-cpp/rdkafkacpp.h @@ -50,9 +50,10 @@ #include #include #include +#include +#include #include - #ifdef _MSC_VER #undef RD_EXPORT #ifdef LIBRDKAFKA_STATICLIB @@ -1455,6 +1456,8 @@ class RD_EXPORT MessageTimestamp { * * This object encapsulates the C implementation logic into a C++ object * for use in RdKafka::Messages object. 
+ * + * @remark Requires Apache Kafka >= 0.11.0 brokers */ class RD_EXPORT Headers { public: @@ -1467,55 +1470,175 @@ class RD_EXPORT Headers { * and an ErrorCode * * @remark dynamic allocation of this object is not supported. - * */ class Header { public: - Header(const std::string& key, - const char* value, - RdKafka::ErrorCode err = ERR_NO_ERROR): - key_(key), err_(err) { - value_container_.assign(value); + /** + * @brief Header object to encapsulate a single Header + * + * @param key the string value for the header key + * + * @param value the bytes of the header value + * + * @param value_size the length in bytes of the header value + */ + Header(const std::string &key, + const void *value, + size_t value_size): + key_(key), err_(ERR_NO_ERROR), value_size_(value_size) { + value_ = copy_value(value, value_size); + }; + + /** + * @brief Header object to encapsulate a single Header + * + * @param key the string value for the header key + * + * @param value the bytes of the header value + * + * @param value_size the length in bytes of the header value + * + * @param err the error code if one returned + * + * @remark The error code is used for when the Header is constructed internally + * by using something like RdKafka::Headers::get_last which constructs + * a Header encapsulating the ErrorCode in the process + */ + Header(const std::string &key, + const void *value, + size_t value_size, + const RdKafka::ErrorCode &err): + key_(key), err_(err), value_size_(value_size) { + value_ = copy_value(value, value_size); }; + + /** + * @brief Copy constructor + * + * @param other the other Header used for the copy constructor + */ + Header(const Header &other) + { + key_ = other.key_; + err_ = other.err_; + value_size_ = other.value_size_; + + value_ = copy_value(other.value_, value_size_); + } + + Header& operator=(const Header &other) + { + if(&other == this) { + return *this; + } + + key_ = other.key_; + err_ = other.err_; + value_size_ = other.value_size_; + + value_ = 
copy_value(other.value_, value_size_); + + return *this; + } + + ~Header() { + if (value_ != NULL) { + free(value_); + } + } + /** @returns Key the Key associated with this Header */ std::string key() const { return key_; } - const char* value() const { - return value_container_.c_str(); + /** @returns Value returns the binary value */ + const void *value() const { + return value_; + } + + /** @returns Value returns the value casted to a C string */ + const char *value_string() const { + return static_cast(value_); } + /** @returns Value Size the length of the Value in bytes */ + size_t value_size() const { + return value_size_; + } + + /** @returns Error Code the error code of this Header (usually ERR_NO_ERROR) */ RdKafka::ErrorCode err() const { return err_; } private: + char *copy_value(const void* value, size_t value_size) { + char * dest = NULL; + if (value != NULL) { + dest = (char*) malloc(value_size + 1); + memcpy(dest, (char*)value, value_size); + dest[value_size] = '\0'; + } + return dest; + } std::string key_; RdKafka::ErrorCode err_; - std::string value_container_; + char *value_; + size_t value_size_; void *operator new(size_t); /* Prevent dynamic allocation */ }; /** * @brief create a new instance of the Headers object + * + * @params initial_size initial size to set the Headers list to + * + * @returns Empty Headers list set to the initial size */ - static Headers *create(size_t initial_size = 8, bool free_rd_headers = true); + static Headers *create(size_t initial_size); /** * @brief create a new instance of the Headers object from a std::vector + * + * @params headers std::vector of RdKafka::Headers::Header objects + * + * @returns Headers list from std::vector set to the size of the std::vector */ - static Headers *create(const std::vector
&headers, bool free_rd_headers = true); + static Headers *create(const std::vector
&headers); /** * @brief adds a Header to the end + * + * @param key the header key as a std::string + * + * @param value the value as a binary value + * + * @param value_size the size of the value added * - * @returns An ErrorCode signalling a success or failure to add the Header. + * @returns An ErrorCode signalling a success or failure to add the header. */ - virtual ErrorCode add(const std::string& key, const char* value) = 0; + virtual ErrorCode add(const std::string& key, const void* value, size_t value_size) = 0; + + /** + * @brief adds a Header to the end + * + * @param key the header key as a std::string + * + * @param value the value as a std::string + * + * @remark convenience method for adding a std::string as a value for the header + * + * @returns An ErrorCode signalling a success or failure to add the header. + */ + virtual ErrorCode add(const std::string& key, const std::string &value) = 0; /** * @brief removes all the Headers of a given key + * + * @param key the header key as a std::string you want to remove + * + * @remark if duplicate keys exist this will remove all of them * * @returns An ErrorCode signalling a success or failure to remove the Header. */ @@ -1523,6 +1646,10 @@ class RD_EXPORT Headers { /** * @brief gets all of the Headers of a given key + * + * @param key the header key as a std::string you want to get + * + * @remark if duplicate keys exist this will return them all as a std::vector * * @returns a std::vector containing all the Headers of the given key. 
*/ @@ -1530,6 +1657,10 @@ class RD_EXPORT Headers { /** * @brief gets the last occurrence of a Header of a given key + * + * @param key the header key as a std::string you want to get + * + * @remark this will only return the most recently added header * * @returns the Header if found, otherwise a Header with an ErrorCode */ @@ -1573,16 +1704,14 @@ class RD_EXPORT Headers { virtual struct rd_kafka_headers_s *c_headers() = 0; /** - * @brief cleans up the underlying alocated C implementation headers if called + * @brief cleans up the underlying allocated C implementation headers if called * * @remark Safe to call even if the Headers object is set to clean up when * when the destructor is called * * @remark Safe to call even if the underlyng C pointer is set to null - * - * @returns an ErrorCode signalling if the the operation was attempted */ - virtual ErrorCode destroy_headers() = 0; + virtual void destroy_headers() = 0; }; /** @@ -1665,9 +1794,6 @@ class RD_EXPORT Message { /** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */ virtual void *msg_opaque () const = 0; - /** @returns The Headers instance for this Message (if applicable) */ - virtual RdKafka::Headers *get_headers() = 0; - virtual ~Message () = 0; /** @returns the latency in microseconds for a produced message measured @@ -1696,6 +1822,9 @@ class RD_EXPORT Message { * @brief Returns the message's persistance status in the topic log. */ virtual Status status () const = 0; + + /** @returns The Headers instance for this Message (if applicable) */ + virtual RdKafka::Headers *get_headers() = 0; }; /**@}*/ @@ -2390,6 +2519,16 @@ class RD_EXPORT Producer : public virtual Handle { * message timestamp (microseconds since beginning of epoch, UTC). * Otherwise identical to produce() above. 
*/ + virtual ErrorCode produce (const std::string topic_name, int32_t partition, + int msgflags, + void *payload, size_t len, + const void *key, size_t key_len, + int64_t timestamp, void *msg_opaque) = 0; + + /** + * @brief produce() variant that that allows for Header support on produce + * Otherwise identical to produce() above. + */ virtual ErrorCode produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, diff --git a/src-cpp/rdkafkacpp_int.h b/src-cpp/rdkafkacpp_int.h index 776c3babb5..782ee5f031 100644 --- a/src-cpp/rdkafkacpp_int.h +++ b/src-cpp/rdkafkacpp_int.h @@ -240,8 +240,8 @@ class HeadersImpl : public Headers { HeadersImpl (size_t initial_size, bool free_rd_headers): headers_ (rd_kafka_headers_new(initial_size)), free_headers_ (free_rd_headers) {} - HeadersImpl (rd_kafka_headers_t *headers): - headers_ (headers), free_headers_ (false) {}; + HeadersImpl (rd_kafka_headers_t *headers, bool free_rd_headers): + headers_ (headers), free_headers_ (free_rd_headers) {}; HeadersImpl (const std::vector
&headers, bool free_rd_headers): free_headers_ (free_rd_headers) { @@ -254,20 +254,28 @@ class HeadersImpl : public Headers { } ~HeadersImpl() { - if(free_headers_ && headers_) { + if (free_headers_ && headers_) { rd_kafka_headers_destroy(headers_); } } - ErrorCode add(const std::string& key, const char* value) { + ErrorCode add(const std::string &key, const void *value, size_t value_size) { rd_kafka_resp_err_t err; err = rd_kafka_header_add(headers_, key.c_str(), key.size(), - value, strlen(value)); + value, value_size); return static_cast(err); } - ErrorCode remove(const std::string& key) { + ErrorCode add(const std::string &key, const std::string &value) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_add(headers_, + key.c_str(), key.size(), + value.c_str(), value.size()); + return static_cast(err); + } + + ErrorCode remove(const std::string &key) { rd_kafka_resp_err_t err; err = rd_kafka_header_remove (headers_, key.c_str()); return static_cast(err); @@ -279,11 +287,10 @@ class HeadersImpl : public Headers { size_t size; rd_kafka_resp_err_t err; for (size_t idx = 0; - !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ;\ + !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ; idx++) { if (value) { - const char* casted_value = static_cast(value); - headers.push_back(Headers::Header(key, casted_value)); + headers.push_back(Headers::Header(key, value, size)); } } return headers; @@ -293,23 +300,21 @@ class HeadersImpl : public Headers { const void *value; size_t size; rd_kafka_resp_err_t err; - err = rd_kafka_header_get_last (headers_, key.c_str(), &value, &size); - const char* casted_value = static_cast(value); + err = rd_kafka_header_get_last(headers_, key.c_str(), &value, &size); ErrorCode cpp_error = static_cast(err); - return Headers::Header(key, casted_value, cpp_error); + return Headers::Header(key, value, size, cpp_error); } std::vector get_all() const { std::vector headers; size_t idx = 0; const char *name; - 
const void *valuep; + const void *value; size_t size; while (!rd_kafka_header_get_all(headers_, idx++, - &name, &valuep, &size)) { - if (valuep != NULL) { - const char* casted_value = static_cast(valuep); - headers.push_back(Headers::Header(name, casted_value)); + &name, &value, &size)) { + if (value != NULL) { + headers.push_back(Headers::Header(name, value, size)); } } return headers; @@ -323,13 +328,10 @@ class HeadersImpl : public Headers { return headers_; } - ErrorCode destroy_headers() { + void destroy_headers() { if (headers_) { rd_kafka_headers_destroy(headers_); headers_ = 0; - return RdKafka::ERR_NO_ERROR; - } else { - return RdKafka::ERR_OPERATION_NOT_ATTEMPTED; } } @@ -339,7 +341,7 @@ class HeadersImpl : public Headers { for (std::vector
::const_iterator it = headers.begin(); it != headers.end(); it++) { - this->add(it->key(), it->value()); + this->add(it->key(), it->value(), it->value_size()); } } } @@ -358,8 +360,9 @@ class MessageImpl : public Message { if (free_rkmessage_) rd_kafka_message_destroy(const_cast(rkmessage_)); if (key_) - delete key_; - delete headers_; + delete key_; + if (headers_) + delete headers_; }; MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage): @@ -382,11 +385,10 @@ class MessageImpl : public Message { /* Create errored message */ MessageImpl (RdKafka::Topic *topic, RdKafka::ErrorCode err): - topic_(topic), free_rkmessage_(false), key_(NULL), headers_(0) { + topic_(topic), free_rkmessage_(false), key_(NULL), headers_(NULL) { rkmessage_ = &rkmessage_err_; memset(&rkmessage_err_, 0, sizeof(rkmessage_err_)); rkmessage_err_.err = static_cast(err); - } std::string errstr() const { @@ -441,10 +443,6 @@ class MessageImpl : public Message { return rd_kafka_message_latency(rkmessage_); } - Headers* get_headers() { - return headers_; - } - struct rd_kafka_message_s *c_ptr () { return rkmessage_; } @@ -453,6 +451,10 @@ class MessageImpl : public Message { return static_cast(rd_kafka_message_status(rkmessage_)); } + Headers* get_headers() { + return headers_; + } + RdKafka::Topic *topic_; rd_kafka_message_t *rkmessage_; bool free_rkmessage_; @@ -467,10 +469,10 @@ class MessageImpl : public Message { rd_kafka_headers_t *hdrsp; rd_kafka_resp_err_t err; - if (rkmessage->len > 0 && !(err = rd_kafka_message_headers(rkmessage, &hdrsp))) { - return new HeadersImpl(hdrsp); + if (rkmessage->len > 0 && !(err = rd_kafka_message_detach_headers(rkmessage, &hdrsp))) { + return new HeadersImpl(hdrsp, free_rkmessage_); } - return 0; + return NULL; } /* "delete" copy ctor + copy assignment, for safety of key_ */ MessageImpl(MessageImpl const&) /*= delete*/; @@ -1161,6 +1163,12 @@ class ProducerImpl : virtual public Producer, virtual public HandleImpl { const std::vector *key, void 
*msg_opaque); + ErrorCode produce (const std::string topic_name, int32_t partition, + int msgflags, + void *payload, size_t len, + const void *key, size_t key_len, + int64_t timestamp, void *msg_opaque); + ErrorCode produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, diff --git a/tests/0054-offset_time.cpp b/tests/0054-offset_time.cpp index c1641887b0..b550f2a9a3 100644 --- a/tests/0054-offset_time.cpp +++ b/tests/0054-offset_time.cpp @@ -104,7 +104,7 @@ static void test_offset_time (void) { for (int ti = 0 ; ti < timestamp_cnt*2 ; ti += 2) { err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, (void *)topic.c_str(), topic.size(), NULL, 0, - timestamps[ti], NULL, NULL); + timestamps[ti], NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); } diff --git a/tests/0057-invalid_topic.cpp b/tests/0057-invalid_topic.cpp index 66cf9e14c6..d95ada65c3 100644 --- a/tests/0057-invalid_topic.cpp +++ b/tests/0057-invalid_topic.cpp @@ -85,13 +85,13 @@ static void test_invalid_topic (void) { for (int i = -1 ; i < 3 ; i++) { err = p->produce(topic_bad, i, RdKafka::Producer::RK_MSG_COPY, - (void *)"bad", 4, NULL, 0, 0, NULL, NULL); + (void *)"bad", 4, NULL, 0, 0, NULL); if (err) /* Error is probably delayed until delivery report */ check_err(err, RdKafka::ERR_TOPIC_EXCEPTION); err = p->produce(topic_good, i, RdKafka::Producer::RK_MSG_COPY, - (void *)"good", 5, NULL, 0, 0, NULL, NULL); + (void *)"good", 5, NULL, 0, 0, NULL); check_err(err, RdKafka::ERR_NO_ERROR); } diff --git a/tests/0059-bsearch.cpp b/tests/0059-bsearch.cpp index 8caf84497c..20f598efef 100644 --- a/tests/0059-bsearch.cpp +++ b/tests/0059-bsearch.cpp @@ -132,7 +132,7 @@ static void do_test_bsearch (void) { err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY, (void *)topic.c_str(), topic.size(), NULL, 0, timestamp, - i == 357 ? (void *)1 /*golden*/ : NULL, NULL); + i == 357 ? 
(void *)1 /*golden*/ : NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); timestamp += 100 + (timestamp % 9); diff --git a/tests/0065-yield.cpp b/tests/0065-yield.cpp index 2d6323d359..ffbf1c6d7a 100644 --- a/tests/0065-yield.cpp +++ b/tests/0065-yield.cpp @@ -93,7 +93,7 @@ static void do_test_producer (bool do_yield) { for (int i = 0 ; i < msgcnt ; i++) { err = p->produce(topic, 0, RdKafka::Producer::RK_MSG_COPY, - (void *)"hi", 2, NULL, 0, 0, NULL, NULL); + (void *)"hi", 2, NULL, 0, 0, NULL); if (err) Test::Fail("produce() failed: " + RdKafka::err2str(err)); } diff --git a/tests/0070-null_empty.cpp b/tests/0070-null_empty.cpp index a0850b0854..68502f06d0 100644 --- a/tests/0070-null_empty.cpp +++ b/tests/0070-null_empty.cpp @@ -111,7 +111,7 @@ static void do_test_null_empty (bool api_version_request) { (void *)msgs[i+1], msgs[i+1] ? strlen(msgs[i+1]) : 0, /* Key */ (void *)msgs[i], msgs[i] ? strlen(msgs[i]) : 0, - 0, NULL, NULL); + 0, NULL); if (err != RdKafka::ERR_NO_ERROR) Test::Fail("Produce failed: " + RdKafka::err2str(err)); } diff --git a/tests/0085-headers.cpp b/tests/0085-headers.cpp index 36e35fa969..72f7189d48 100644 --- a/tests/0085-headers.cpp +++ b/tests/0085-headers.cpp @@ -29,45 +29,36 @@ #include #include "testcpp.h" -class MyCbs : public RdKafka::OffsetCommitCb, public RdKafka::EventCb { - public: - int seen_commit; - int seen_stats; - - void offset_commit_cb (RdKafka::ErrorCode err, - std::vector&offsets) { - seen_commit++; - Test::Say("Got commit callback!\n"); - } - - void event_cb (RdKafka::Event &event) { - switch (event.type()) - { - case RdKafka::Event::EVENT_STATS: - Test::Say("Got stats callback!\n"); - seen_stats++; - break; - default: - break; - } - } -}; - static void assert_all_headers_match(RdKafka::Headers *actual, - std::vector &expected) { - if (actual->size() != expected.size()) { - Test::Fail(tostr() << "Expected headers length to equal" - << expected.size() << " instead equals " << 
actual->size() << "\n"); + RdKafka::Headers *expected) { + if (!actual) { + Test::Fail("Expected RdKafka::Message to contain headers"); + } + if (actual->size() != expected->size()) { + Test::Fail(tostr() << "Expected headers length to equal " + << expected->size() << " instead equals " << actual->size() << "\n"); } std::vector actual_headers = actual->get_all(); + std::vector expected_headers = expected->get_all(); + Test::Say(tostr() << "Header size " << actual_headers.size() << "\n"); for(size_t i = 0; i < actual_headers.size(); i++) { RdKafka::Headers::Header actual_header = actual_headers[i]; - RdKafka::Headers::Header expected_header = expected[i]; - std::string actual_key = actual_header.key; - std::string actual_value = std::string(actual_header.value); - std::string expected_key = expected_header.key; - std::string expected_value = std::string(expected_header.value); + RdKafka::Headers::Header expected_header = expected_headers[i]; + std::string actual_key = actual_header.key(); + std::string actual_value = std::string( + actual_header.value_string(), + actual_header.value_size() + ); + std::string expected_key = expected_header.key(); + std::string expected_value = std::string( + actual_header.value_string(), + expected_header.value_size() + ); + + Test::Say(tostr() << "Expected Key " << expected_key << " Expected val " << expected_value + << " Actual key " << actual_key << " Actual val " << actual_value << "\n"); + if (actual_key != expected_key) { Test::Fail(tostr() << "Header key does not match, expected '" << actual_key << "' but got '" << expected_key << "'\n"); @@ -79,7 +70,8 @@ static void assert_all_headers_match(RdKafka::Headers *actual, } } -static void test_n_headers (int n, const char* message) { +static void test_headers (RdKafka::Headers *produce_headers, + RdKafka::Headers *compare_headers) { std::string topic = Test::mk_topic_name("0085-headers", 1); RdKafka::Conf *conf; std::string errstr; @@ -87,22 +79,6 @@ static void test_n_headers 
(int n, const char* message) { Test::conf_init(&conf, NULL, 0); Test::conf_set(conf, "group.id", topic); - Test::conf_set(conf, "group.id", topic); - Test::conf_set(conf, "socket.timeout.ms", "10000"); - Test::conf_set(conf, "enable.auto.commit", "false"); - Test::conf_set(conf, "enable.partition.eof", "false"); - Test::conf_set(conf, "auto.offset.reset", "earliest"); - Test::conf_set(conf, "statistics.interval.ms", "1000"); - - MyCbs cbs; - cbs.seen_commit = 0; - cbs.seen_stats = 0; - if (conf->set("offset_commit_cb", (RdKafka::OffsetCommitCb *)&cbs, errstr) != - RdKafka::Conf::CONF_OK) - Test::Fail("Failed to set commit callback: " + errstr); - if (conf->set("event_cb", (RdKafka::EventCb *)&cbs, errstr) != - RdKafka::Conf::CONF_OK) - Test::Fail("Failed to set event callback: " + errstr); RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); if (!p) @@ -110,21 +86,9 @@ static void test_n_headers (int n, const char* message) { RdKafka::ErrorCode err; - std::vector headers_arr; - for (int i = 0; i < n; ++i) { - std::stringstream key_s; - key_s << "header_" << i; - std::string key = key_s.str(); - std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - headers_arr.push_back(RdKafka::Headers::Header(key, val.c_str())); - } - RdKafka::Headers *produce_headers = RdKafka::Headers::create(headers_arr, false); - err = p->produce(topic, 0, RdKafka::Producer::RK_MSG_COPY, - (void *)message, message ? 
strlen(message) : 0, + (void *)"message", 7, (void *)"key", 3, 0, NULL, produce_headers); p->flush(tmout_multip(10000)); @@ -136,84 +100,297 @@ static void test_n_headers (int n, const char* message) { RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); if (!c) Test::Fail("Failed to create KafkaConsumer: " + errstr); - - std::vector topics; - topics.push_back(topic); - if ((err = c->subscribe(topics))) - Test::Fail("subscribe failed: " + RdKafka::err2str(err)); + std::vector parts; + parts.push_back(RdKafka::TopicPartition::create(topic, 0, + RdKafka::Topic::OFFSET_BEGINNING)); + err = c->assign(parts); + if (err != RdKafka::ERR_NO_ERROR) + Test::Fail("assign() failed: " + RdKafka::err2str(err)); + RdKafka::TopicPartition::destroy(parts); int cnt = 0; - while (!cbs.seen_commit || !cbs.seen_stats) { - RdKafka::Message *msg = c->consume(tmout_multip(1000)); - if (!msg->err()) { + bool running = true; + + while (running) { + RdKafka::Message *msg = c->consume(10000); + Test::Say(tostr() << msg->err()); + if (msg->err() == RdKafka::ERR_NO_ERROR) { cnt++; Test::Say(tostr() << "Received message #" << cnt << "\n"); - if (cnt > 10) - Test::Fail(tostr() << "Should've seen the " - "offset commit (" << cbs.seen_commit << ") and " - "stats callbacks (" << cbs.seen_stats << ") by now"); - - /* Commit the first message to trigger the offset commit_cb */ - if (cnt == 1) { - err = c->commitAsync(msg); - if (err) - Test::Fail("commitAsync() failed: " + RdKafka::err2str(err)); - rd_sleep(1); /* Sleep to simulate slow processing, making sure - * that the offset commit callback op gets - * inserted on the consume queue in front of - * the messages. 
*/ - } RdKafka::Headers *headers = msg->get_headers(); - if (!headers) { - Test::Fail("Expected RdKafka::Message to contain headers"); + if (compare_headers->size() > 0) { + assert_all_headers_match(headers, compare_headers); + } else { + if (headers != 0) { + Test::Fail("Expected get_headers to return a NULL pointer"); + } } - - assert_all_headers_match(headers, headers_arr); - - } else if (msg->err() == RdKafka::ERR__TIMED_OUT) - ; /* Stil rebalancing? */ - else + running = false; + } else if (msg->err() == RdKafka::ERR__TIMED_OUT) { + Test::Say("I'm rebalancing?"); + /* Stil rebalancing? */ + } else { Test::Fail("consume() failed: " + msg->errstr()); + } delete msg; } - c->close(); delete c; delete p; delete conf; - } static void test_one_header () { Test::Say("Test one header in consumed message.\n"); - std::string val = "valid"; - test_n_headers(1, val.c_str()); + int num_hdrs = 1; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, val); + compare_headers->add(key, val); + } + test_headers(produce_headers, compare_headers); } static void test_ten_headers () { Test::Say("Test ten headers in consumed message.\n"); - std::string val = "valid"; - test_n_headers(10, val.c_str()); + int num_hdrs = 10; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, val); + compare_headers->add(key, val); + } + 
test_headers(produce_headers, compare_headers); } -static void test_one_header_null_msg () { - Test::Say("Test one header in consumed message with a null value message.\n"); - test_n_headers(1, NULL); +static void test_add_with_void_param () { + Test::Say("Test adding one header using add method that takes void*.\n"); + int num_hdrs = 1; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, val.c_str(), val.size()); + compare_headers->add(key, val.c_str(), val.size()); + } + test_headers(produce_headers, compare_headers); } -static void test_one_header_empty_msg () { - Test::Say("Test one header in consumed message with an empty value message.\n"); - std::string val = ""; - test_n_headers(1, val.c_str()); +static void test_no_headers () { + Test::Say("Test no headers produced.\n"); + int num_hdrs = 0; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, val); + compare_headers->add(key, val); + } + test_headers(produce_headers, compare_headers); +} + +static void test_header_with_null_value () { + Test::Say("Test one header with null value.\n"); + int num_hdrs = 1; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::stringstream key_s; + key_s << "header_" << i; + 
std::string key = key_s.str(); + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, NULL, 0); + compare_headers->add(key, NULL, 0); + } + test_headers(produce_headers, compare_headers); +} + +static void test_duplicate_keys () { + Test::Say("Test multiple headers with duplicate keys.\n"); + int num_hdrs = 4; + RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + for (int i = 0; i < num_hdrs; ++i) { + std::string dup_key = "dup_key"; + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(dup_key, val); + compare_headers->add(dup_key, val); + } + test_headers(produce_headers, compare_headers); +} + +static void test_remove_after_add () { + Test::Say("Test removing after adding headers.\n"); + int num_hdrs = 1; + RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + + // Add one unique key + std::string key_one = "key1"; + std::string val_one = "val_one"; + headers->add(key_one, val_one); + + // Add a second unique key + std::string key_two = "key2"; + std::string val_two = "val_two"; + headers->add(key_two, val_one); + + // Assert header length is 2 + size_t expected_size = 2; + if (headers->size() != expected_size) { + Test::Fail(tostr() << "Expected header->size() to equal " + << expected_size << ", instead got " + << headers->size() << "\n"); + } + + // Remove key_one and assert headers == 1 + headers->remove(key_one); + size_t expected_remove_size = 1; + if (headers->size() != expected_remove_size) { + Test::Fail(tostr() << "Expected header->size() to equal " + << expected_remove_size << ", instead got " + << headers->size() << "\n"); + } +} + +static void test_remove_all_duplicate_keys () { + Test::Say("Test removing duplicate keys removes all headers.\n"); + int num_hdrs = 4; + RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + 
+
+  // Add one unique key
+  std::string key_one = "key1";
+  std::string val_one = "val_one";
+  headers->add(key_one, val_one);
+
+  // Add 2 duplicate keys
+  std::string dup_key = "dup_key";
+  std::string val_two = "val_two";
+  headers->add(dup_key, val_one);
+  headers->add(dup_key, val_two);
+
+  // Assert header length is 3
+  size_t expected_size = 3;
+  if (headers->size() != expected_size) {
+    Test::Fail(tostr() << "Expected header->size() to equal "
+                       << expected_size << ", instead got "
+                       << headers->size() << "\n");
+  }
+
+  // Remove dup_key and assert headers == 1
+  headers->remove(dup_key);
+  size_t expected_size_remove = 1;
+  if (headers->size() != expected_size_remove) {
+    Test::Fail(tostr() << "Expected header->size() to equal "
+                       << expected_size_remove << ", instead got "
+                       << headers->size() << "\n");
+  }
+}
+
+static void test_get_last_gives_last_added_val () {
+  Test::Say("Test get_last returns the last added value of duplicate keys.\n");
+  int num_hdrs = 1;
+  RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs);
+
+  // Add three values under one duplicate key
+  std::string dup_key = "dup_key";
+  std::string val_one = "val_one";
+  std::string val_two = "val_two";
+  std::string val_three = "val_three";
+  headers->add(dup_key, val_one);
+  headers->add(dup_key, val_two);
+  headers->add(dup_key, val_three);
+
+  // Assert header length is 3
+  size_t expected_size = 3;
+  if (headers->size() != expected_size) {
+    Test::Fail(tostr() << "Expected header->size() to equal "
+                       << expected_size << ", instead got "
+                       << headers->size() << "\n");
+  }
+
+  // Get last of duplicate key and assert it equals val_three
+  RdKafka::Headers::Header last = headers->get_last(dup_key);
+  std::string value = std::string(last.value_string());
+  if (value != val_three) {
+    Test::Fail(tostr() << "Expected get_last to return " << val_three
+                       << " as the value of the header instead got "
+                       << value << "\n");
+  }
+}
+
+static void test_get_of_key_returns_all () {
+  Test::Say("Test get returns all 
the headers of a duplicate key.\n");
+  int num_hdrs = 1;
+  RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs);
+
+  // Add a unique key plus three values under a duplicate key
+  std::string unique_key = "unique";
+  std::string dup_key = "dup_key";
+  std::string val_one = "val_one";
+  std::string val_two = "val_two";
+  std::string val_three = "val_three";
+  headers->add(unique_key, val_one);
+  headers->add(dup_key, val_one);
+  headers->add(dup_key, val_two);
+  headers->add(dup_key, val_three);
+
+  // Assert header length is 4
+  size_t expected_size = 4;
+  if (headers->size() != expected_size) {
+    Test::Fail(tostr() << "Expected header->size() to equal "
+                       << expected_size << ", instead got "
+                       << headers->size() << "\n");
+  }
+
+  // Get all of the duplicate key
+  std::vector<RdKafka::Headers::Header> get = headers->get(dup_key);
+  size_t expected_get_size = 3;
+  if (get.size() != expected_get_size) {
+    Test::Fail(tostr() << "Expected get.size() to equal "
+                       << expected_get_size << ", instead got "
+                       << get.size() << "\n");
+  }
 }
 
 extern "C" {
   int main_0085_headers (int argc, char **argv) {
     test_one_header();
     test_ten_headers();
-    // These two tests fail and I'm not sure if this is correct behaviour
-    // test_one_header_null_msg();
-    // test_one_header_empty_msg();
+    test_add_with_void_param();
+    test_no_headers();
+    test_header_with_null_value();
+    test_duplicate_keys();
+    test_remove_after_add();
+    test_remove_all_duplicate_keys();
+    test_get_last_gives_last_added_val();
+    test_get_of_key_returns_all();
     return 0;
   }
 }

From 2b098ad4c55f3b2eaa18a034c8a8229672bdc69b Mon Sep 17 00:00:00 2001
From: Magnus Edenhill
Date: Thu, 15 Nov 2018 13:39:10 +0100
Subject: [PATCH 4/4] API changes, and some refactoring and cleanup of C++
 Headers

---
 examples/rdkafka_example.cpp | 68 +++++---
 src-cpp/HeadersImpl.cpp | 14 +-
 src-cpp/ProducerImpl.cpp | 43 +++--
 src-cpp/rdkafkacpp.h | 273 +++++++++++++++-----------------
 src-cpp/rdkafkacpp_int.h | 235 +++++++++-------------------
 tests/0085-headers.cpp | 293 
+++++++++++++++++------------------ 6 files changed, 418 insertions(+), 508 deletions(-) diff --git a/examples/rdkafka_example.cpp b/examples/rdkafka_example.cpp index 944843c3dc..dec77476ba 100644 --- a/examples/rdkafka_example.cpp +++ b/examples/rdkafka_example.cpp @@ -203,6 +203,8 @@ class MyHashPartitionerCb : public RdKafka::PartitionerCb { }; void msg_consume(RdKafka::Message* message, void* opaque) { + const RdKafka::Headers *headers; + switch (message->err()) { case RdKafka::ERR__TIMED_OUT: break; @@ -213,6 +215,20 @@ void msg_consume(RdKafka::Message* message, void* opaque) { if (message->key()) { std::cout << "Key: " << *message->key() << std::endl; } + headers = message->headers(); + if (headers) { + std::vector hdrs = headers->get_all(); + for (size_t i = 0 ; i < hdrs.size() ; i++) { + const RdKafka::Headers::Header hdr = hdrs[i]; + + if (hdr.value() != NULL) + printf(" Header: %s = \"%.*s\"\n", + hdr.key().c_str(), + (int)hdr.value_size(), (const char *)hdr.value()); + else + printf(" Header: %s = NULL\n", hdr.key().c_str()); + } + } printf("%.*s\n", static_cast(message->len()), static_cast(message->payload())); @@ -479,6 +495,8 @@ int main (int argc, char **argv) { /* Set delivery report callback */ conf->set("dr_cb", &ex_dr_cb, errstr); + conf->set("default_topic_conf", tconf, errstr); + /* * Create producer using accumulated global configuration. */ @@ -490,15 +508,6 @@ int main (int argc, char **argv) { std::cout << "% Created producer " << producer->name() << std::endl; - /* - * Create topic handle. - */ - RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, - tconf, errstr); - if (!topic) { - std::cerr << "Failed to create topic: " << errstr << std::endl; - exit(1); - } /* * Read messages from stdin and produce to broker. 
@@ -509,20 +518,36 @@ int main (int argc, char **argv) { continue; } + RdKafka::Headers *headers = RdKafka::Headers::create(); + headers->add("my header", "header value"); + headers->add("other header", "yes"); + /* * Produce message */ RdKafka::ErrorCode resp = - producer->produce(topic, partition, - RdKafka::Producer::RK_MSG_COPY /* Copy payload */, - const_cast(line.c_str()), line.size(), - NULL, NULL); - if (resp != RdKafka::ERR_NO_ERROR) - std::cerr << "% Produce failed: " << - RdKafka::err2str(resp) << std::endl; - else - std::cerr << "% Produced message (" << line.size() << " bytes)" << - std::endl; + producer->produce(topic_str, partition, + RdKafka::Producer::RK_MSG_COPY /* Copy payload */, + /* Value */ + const_cast(line.c_str()), line.size(), + /* Key */ + NULL, 0, + /* Timestamp (defaults to now) */ + 0, + /* Message headers, if any */ + headers, + /* Per-message opaque value passed to + * delivery report */ + NULL); + if (resp != RdKafka::ERR_NO_ERROR) { + std::cerr << "% Produce failed: " << + RdKafka::err2str(resp) << std::endl; + delete headers; /* Headers are automatically deleted on produce() + * success. */ + } else { + std::cerr << "% Produced message (" << line.size() << " bytes)" << + std::endl; + } producer->poll(0); } @@ -533,7 +558,6 @@ int main (int argc, char **argv) { producer->poll(1000); } - delete topic; delete producer; @@ -635,7 +659,7 @@ int main (int argc, char **argv) { RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000); if (err != RdKafka::ERR_NO_ERROR) { - std::cerr << "%% Failed to acquire metadata: " + std::cerr << "%% Failed to acquire metadata: " << RdKafka::err2str(err) << std::endl; run = 0; break; @@ -649,6 +673,8 @@ int main (int argc, char **argv) { } + delete conf; + delete tconf; /* * Wait for RdKafka to decommission. 
diff --git a/src-cpp/HeadersImpl.cpp b/src-cpp/HeadersImpl.cpp index d7b5f9357d..b31912c677 100644 --- a/src-cpp/HeadersImpl.cpp +++ b/src-cpp/HeadersImpl.cpp @@ -33,17 +33,15 @@ #include "rdkafkacpp_int.h" -RdKafka::Headers *RdKafka::Headers::create(size_t initial_count) { - return new RdKafka::HeadersImpl(initial_count, false); +RdKafka::Headers *RdKafka::Headers::create() { + return new RdKafka::HeadersImpl(); } RdKafka::Headers *RdKafka::Headers::create(const std::vector
&headers) { - if (headers.size() > 0) { - return new RdKafka::HeadersImpl(headers, false); - } else { - return new RdKafka::HeadersImpl(8, false); - } - + if (headers.size() > 0) + return new RdKafka::HeadersImpl(headers); + else + return new RdKafka::HeadersImpl(); } RdKafka::Headers::~Headers() {} diff --git a/src-cpp/ProducerImpl.cpp b/src-cpp/ProducerImpl.cpp index 5df139e72b..c8631fd694 100644 --- a/src-cpp/ProducerImpl.cpp +++ b/src-cpp/ProducerImpl.cpp @@ -169,25 +169,34 @@ RdKafka::ProducerImpl::produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, void *msg_opaque, - RdKafka::Headers *headers) { + int64_t timestamp, + RdKafka::Headers *headers, + void *msg_opaque) { rd_kafka_headers_t *hdrs = NULL; + RdKafka::HeadersImpl *headersimpl = NULL; + rd_kafka_resp_err_t err; + if (headers) { - hdrs = headers->c_headers(); + headersimpl = static_cast(headers); + hdrs = headersimpl->c_ptr(); } - return - static_cast - ( - rd_kafka_producev(rk_, - RD_KAFKA_V_TOPIC(topic_name.c_str()), - RD_KAFKA_V_PARTITION(partition), - RD_KAFKA_V_MSGFLAGS(msgflags), - RD_KAFKA_V_VALUE(payload, len), - RD_KAFKA_V_KEY(key, key_len), - RD_KAFKA_V_TIMESTAMP(timestamp), - RD_KAFKA_V_OPAQUE(msg_opaque), - RD_KAFKA_V_HEADERS(hdrs), - RD_KAFKA_V_END) - ); + err = rd_kafka_producev(rk_, + RD_KAFKA_V_TOPIC(topic_name.c_str()), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_MSGFLAGS(msgflags), + RD_KAFKA_V_VALUE(payload, len), + RD_KAFKA_V_KEY(key, key_len), + RD_KAFKA_V_TIMESTAMP(timestamp), + RD_KAFKA_V_OPAQUE(msg_opaque), + RD_KAFKA_V_HEADERS(hdrs), + RD_KAFKA_V_END); + + if (!err && headersimpl) { + /* A successful producev() call will destroy the C headers. 
*/ + headersimpl->c_headers_destroyed(); + delete headers; + } + + return static_cast(err); } diff --git a/src-cpp/rdkafkacpp.h b/src-cpp/rdkafkacpp.h index 70685a4ff2..0787b8ccd3 100644 --- a/src-cpp/rdkafkacpp.h +++ b/src-cpp/rdkafkacpp.h @@ -76,7 +76,6 @@ extern "C" { struct rd_kafka_s; struct rd_kafka_topic_s; struct rd_kafka_message_s; - struct rd_kafka_headers_s; }; namespace RdKafka { @@ -1451,22 +1450,24 @@ class RD_EXPORT MessageTimestamp { int64_t timestamp; /**< Milliseconds since epoch (UTC). */ }; + /** * @brief Headers object * - * This object encapsulates the C implementation logic into a C++ object - * for use in RdKafka::Messages object. + * Represents message headers. + * + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers * * @remark Requires Apache Kafka >= 0.11.0 brokers */ class RD_EXPORT Headers { - public: +public: virtual ~Headers() = 0; /** * @brief Header object * - * This object represents a single Header with key value pair + * This object represents a single Header with a key value pair * and an ErrorCode * * @remark dynamic allocation of this object is not supported. @@ -1477,111 +1478,107 @@ class RD_EXPORT Headers { * @brief Header object to encapsulate a single Header * * @param key the string value for the header key - * - * @param value the bytes of the header value - * + * @param value the bytes of the header value, or NULL * @param value_size the length in bytes of the header value + * + * @remark key and value are copied. 
+ * */ Header(const std::string &key, const void *value, size_t value_size): key_(key), err_(ERR_NO_ERROR), value_size_(value_size) { - value_ = copy_value(value, value_size); - }; + value_ = copy_value(value, value_size); + } /** * @brief Header object to encapsulate a single Header * * @param key the string value for the header key - * * @param value the bytes of the header value - * * @param value_size the length in bytes of the header value - * * @param err the error code if one returned * - * @remark The error code is used for when the Header is constructed internally - * by using something like RdKafka::Headers::get_last which constructs + * @remark The error code is used for when the Header is constructed + * internally by using RdKafka::Headers::get_last which constructs * a Header encapsulating the ErrorCode in the process */ Header(const std::string &key, const void *value, size_t value_size, - const RdKafka::ErrorCode &err): - key_(key), err_(err), value_size_(value_size) { + const RdKafka::ErrorCode err): + key_(key), err_(err), value_size_(value_size) { value_ = copy_value(value, value_size); - }; + } /** * @brief Copy constructor * - * @param other the other Header used for the copy constructor + * @param other other Header used for the copy constructor */ - Header(const Header &other) - { - key_ = other.key_; - err_ = other.err_; - value_size_ = other.value_size_; - - value_ = copy_value(other.value_, value_size_); + Header(const Header &other): + key_(other.key_), err_(other.err_), value_size_(other.value_size_) { + value_ = copy_value(other.value_, value_size_); } Header& operator=(const Header &other) { - if(&other == this) { - return *this; - } + if (&other == this) { + return *this; + } - key_ = other.key_; - err_ = other.err_; - value_size_ = other.value_size_; + key_ = other.key_; + err_ = other.err_; + value_size_ = other.value_size_; - value_ = copy_value(other.value_, value_size_); - - return *this; + value_ = copy_value(other.value_, 
value_size_); + + return *this; } ~Header() { - if (value_ != NULL) { - free(value_); - } + if (value_ != NULL) + free(value_); } - - /** @returns Key the Key associated with this Header */ + + /** @returns the key/name associated with this Header */ std::string key() const { - return key_; + return key_; } - /** @returns Value returns the binary value */ + /** @returns returns the binary value, or NULL */ const void *value() const { - return value_; + return value_; } - /** @returns Value returns the value casted to a C string */ + /** @returns returns the value casted to a nul-terminated C string, + * or NULL. */ const char *value_string() const { - return static_cast(value_); + return static_cast(value_); } - /** @returns Value Size the length of the Value in bytes */ + /** @returns Value Size the length of the Value in bytes */ size_t value_size() const { - return value_size_; + return value_size_; } - /** @returns Error Code the error code of this Header (usually ERR_NO_ERROR) */ + /** @returns the error code of this Header (usually ERR_NO_ERROR) */ RdKafka::ErrorCode err() const { - return err_; + return err_; } - - private: - char *copy_value(const void* value, size_t value_size) { - char * dest = NULL; - if (value != NULL) { - dest = (char*) malloc(value_size + 1); - memcpy(dest, (char*)value, value_size); - dest[value_size] = '\0'; - } - return dest; + + private: + char *copy_value(const void *value, size_t value_size) { + if (!value) + return NULL; + + char *dest = (char *)malloc(value_size + 1); + memcpy(dest, (const char *)value, value_size); + dest[value_size] = '\0'; + + return dest; } + std::string key_; RdKafka::ErrorCode err_; char *value_; @@ -1590,130 +1587,103 @@ class RD_EXPORT Headers { }; /** - * @brief create a new instance of the Headers object + * @brief Create a new instance of the Headers object * - * @params initial_size initial size to set the Headers list to - * - * @returns Empty Headers list set to the initial size + * @returns an 
empty Headers list */ - static Headers *create(size_t initial_size); + static Headers *create(); /** - * @brief create a new instance of the Headers object from a std::vector + * @brief Create a new instance of the Headers object from a std::vector * - * @params headers std::vector of RdKafka::Headers::Header objects + * @params headers std::vector of RdKafka::Headers::Header objects. + * The headers are copied, not referenced. * - * @returns Headers list from std::vector set to the size of the std::vector + * @returns a Headers list from std::vector set to the size of the std::vector */ static Headers *create(const std::vector
&headers); - /** - * @brief adds a Header to the end - * - * @param key the header key as a std::string - * - * @param value the value as a binary value + /** + * @brief Adds a Header to the end of the list. * - * @param value_size the size of the value added + * @param key header key/name + * @param value binary value, or NULL + * @param value_size size of the value * - * @returns An ErrorCode signalling a success or failure to add the header. + * @returns an ErrorCode signalling success or failure to add the header. */ - virtual ErrorCode add(const std::string& key, const void* value, size_t value_size) = 0; + virtual ErrorCode add(const std::string &key, const void *value, + size_t value_size) = 0; - /** - * @brief adds a Header to the end - * - * @param key the header key as a std::string + /** + * @brief Adds a Header to the end of the list. + * + * Convenience method for adding a std::string as a value for the header. * - * @param value the value as a std::string + * @param key header key/name + * @param value value string * - * @remark convenience method for adding a std::string as a value for the header + * @returns an ErrorCode signalling success or failure to add the header. + */ + virtual ErrorCode add(const std::string &key, const std::string &value) = 0; + + /** + * @brief Adds a Header to the end of the list. + * + * This method makes a copy of the passed header. + * + * @param header Existing header to copy * - * @returns An ErrorCode signalling a success or failure to add the header. + * @returns an ErrorCode signalling success or failure to add the header. 
*/ - virtual ErrorCode add(const std::string& key, const std::string &value) = 0; + virtual ErrorCode add(const Header &header) = 0; - /** - * @brief removes all the Headers of a given key + /** + * @brief Removes all the Headers of a given key * - * @param key the header key as a std::string you want to remove + * @param key header key/name to remove * - * @remark if duplicate keys exist this will remove all of them - * * @returns An ErrorCode signalling a success or failure to remove the Header. */ - virtual ErrorCode remove(const std::string& key) = 0; + virtual ErrorCode remove(const std::string &key) = 0; - /** - * @brief gets all of the Headers of a given key + /** + * @brief Gets all of the Headers of a given key * - * @param key the header key as a std::string you want to get + * @param key header key/name * - * @remark if duplicate keys exist this will return them all as a std::vector + * @remark If duplicate keys exist this will return them all as a std::vector * * @returns a std::vector containing all the Headers of the given key. */ virtual std::vector
get(const std::string &key) const = 0; - /** - * @brief gets the last occurrence of a Header of a given key + /** + * @brief Gets the last occurrence of a Header of a given key * - * @param key the header key as a std::string you want to get + * @param key header key/name * - * @remark this will only return the most recently added header + * @remark This will only return the most recently added header * - * @returns the Header if found, otherwise a Header with an ErrorCode + * @returns the Header if found, otherwise a Header with an err set to + * ERR__NOENT. */ - virtual Header get_last(const std::string& key) const = 0; + virtual Header get_last(const std::string &key) const = 0; - /** - * @brief returns all the Headers of a Message + /** + * @brief Returns all Headers * - * @returns a std::vector containing all of the Headers of a message + * @returns a std::vector containing all of the Headers */ virtual std::vector
get_all() const = 0; - - /** - * @brief the count of all the Headers - * - * @returns a size_t count of all the headers - */ - virtual size_t size() const = 0; - /** - * @brief Returns the underlying librdkafka C rd_kafka_headers_t handle. - * - * @warning Calling the C API on this handle is not recommended and there - * is no official support for it, but for cases where the C++ API - * does not provide the underlying functionality this C handle can be - * used to interact directly with the core librdkafka API. - * - * @remark The lifetime of the returned pointer can be different than the lifetime - * of the Headers message due to how the producev function in the C API works - * if there is no error then the producev will take care of deallocation - * but if there is an error then it is the responsibility of the calling - * object to deallocate the underlying C implementation if an instance - * of the Headers object is created with free_rd_headers set to `false` - * - * @remark Include prior to including - * - * - * @returns \c rd_kafka_headers_t* - */ - virtual struct rd_kafka_headers_s *c_headers() = 0; - - /** - * @brief cleans up the underlying allocated C implementation headers if called - * - * @remark Safe to call even if the Headers object is set to clean up when - * when the destructor is called - * - * @remark Safe to call even if the underlyng C pointer is set to null + * @returns the number of headers. */ - virtual void destroy_headers() = 0; + virtual size_t size() const = 0; }; + /** * @brief Message object * @@ -1823,8 +1793,19 @@ class RD_EXPORT Message { */ virtual Status status () const = 0; - /** @returns The Headers instance for this Message (if applicable) */ - virtual RdKafka::Headers *get_headers() = 0; + /** @returns the Headers instance for this Message, or NULL if there + * are no headers. + * + * @remark The lifetime of the Headers are the same as the Message. 
*/ + virtual RdKafka::Headers *headers () = 0; + + /** @returns the Headers instance for this Message (if applicable). + * If NULL is returned the reason is given in \p err, which + * is either ERR__NOENT if there were no headers, or another + * error code if header parsing failed. + * + * @remark The lifetime of the Headers are the same as the Message. */ + virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0; }; /**@}*/ @@ -2528,13 +2509,17 @@ class RD_EXPORT Producer : public virtual Handle { /** * @brief produce() variant that that allows for Header support on produce * Otherwise identical to produce() above. + * + * @warning The \p headers will be freed/deleted if the produce() call + * succeeds, or left untouched if produce() fails. */ virtual ErrorCode produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, void *msg_opaque, - RdKafka::Headers *headers) = 0; + int64_t timestamp, + RdKafka::Headers *headers, + void *msg_opaque) = 0; /** diff --git a/src-cpp/rdkafkacpp_int.h b/src-cpp/rdkafkacpp_int.h index 782ee5f031..4b04fce066 100644 --- a/src-cpp/rdkafkacpp_int.h +++ b/src-cpp/rdkafkacpp_int.h @@ -119,16 +119,16 @@ class EventImpl : public Event { bool fatal_; }; + class HeadersImpl : public Headers { public: - HeadersImpl (size_t initial_size, bool free_rd_headers): - headers_ (rd_kafka_headers_new(initial_size)), free_headers_ (free_rd_headers) {} + HeadersImpl (): + headers_ (rd_kafka_headers_new(8)) {} HeadersImpl (rd_kafka_headers_t *headers): - headers_ (headers), free_headers_ (false) {}; + headers_ (headers) {} - HeadersImpl (const std::vector
&headers, bool free_rd_headers): - free_headers_ (free_rd_headers) { + HeadersImpl (const std::vector
&headers) { if (headers.size() > 0) { headers_ = rd_kafka_headers_new(headers.size()); from_vector(headers); @@ -138,128 +138,20 @@ class HeadersImpl : public Headers { } ~HeadersImpl() { - if(free_headers_ && headers_) { + if (headers_) { rd_kafka_headers_destroy(headers_); } } - ErrorCode add(const std::string& key, const char* value) { + ErrorCode add(const std::string& key, const char *value) { rd_kafka_resp_err_t err; err = rd_kafka_header_add(headers_, key.c_str(), key.size(), - value, strlen(value)); - return static_cast(err); - } - - ErrorCode remove(const std::string& key) { - rd_kafka_resp_err_t err; - err = rd_kafka_header_remove (headers_, key.c_str()); + value, -1); return static_cast(err); } - std::vector get(const std::string &key) const { - std::vector headers; - const void *value; - size_t size; - rd_kafka_resp_err_t err; - for (size_t idx = 0; - !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ;\ - idx++) { - if (value) { - const char* casted_value = static_cast(value); - headers.push_back(Headers::Header(key, casted_value)); - } - } - return headers; - } - - Headers::Header get_last(const std::string& key) const { - const void *value; - size_t size; - rd_kafka_resp_err_t err; - err = rd_kafka_header_get_last (headers_, key.c_str(), &value, &size); - const char* casted_value = static_cast(value); - ErrorCode cpp_error = static_cast(err); - return Headers::Header(key, casted_value, cpp_error); - } - - std::vector get_all() const { - std::vector headers; - size_t idx = 0; - const char *name; - const void *valuep; - size_t size; - while (!rd_kafka_header_get_all(headers_, idx++, - &name, &valuep, &size)) { - if (valuep != NULL) { - const char* casted_value = static_cast(valuep); - headers.push_back(Headers::Header(name, casted_value)); - } - } - return headers; - } - - size_t size() const { - return rd_kafka_header_cnt(headers_); - } - - struct rd_kafka_headers_s* c_headers() { - return headers_; - } - - ErrorCode 
destroy_headers() { - if (headers_) { - rd_kafka_headers_destroy(headers_); - headers_ = 0; - return RdKafka::ERR_NO_ERROR; - } else { - return RdKafka::ERR_OPERATION_NOT_ATTEMPTED; - } - } - - private: - void from_vector(const std::vector
&headers) { - if (headers.size() > 0) { - for (std::vector
::const_iterator it = headers.begin(); - it != headers.end(); - it++) { - this->add(it->key, it->value); - } - } - } - - HeadersImpl(HeadersImpl const&) /*= delete*/; - HeadersImpl& operator=(HeadersImpl const&) /*= delete*/; - - rd_kafka_headers_t* headers_; - bool free_headers_; -}; - -class HeadersImpl : public Headers { - public: - HeadersImpl (size_t initial_size, bool free_rd_headers): - headers_ (rd_kafka_headers_new(initial_size)), free_headers_ (free_rd_headers) {} - - HeadersImpl (rd_kafka_headers_t *headers, bool free_rd_headers): - headers_ (headers), free_headers_ (free_rd_headers) {}; - - HeadersImpl (const std::vector
&headers, bool free_rd_headers): - free_headers_ (free_rd_headers) { - if (headers.size() > 0) { - headers_ = rd_kafka_headers_new(headers.size()); - from_vector(headers); - } else { - headers_ = rd_kafka_headers_new(8); - } - } - - ~HeadersImpl() { - if (free_headers_ && headers_) { - rd_kafka_headers_destroy(headers_); - } - } - - ErrorCode add(const std::string &key, const void *value, size_t value_size) { + ErrorCode add(const std::string& key, const void *value, size_t value_size) { rd_kafka_resp_err_t err; err = rd_kafka_header_add(headers_, key.c_str(), key.size(), @@ -275,7 +167,15 @@ class HeadersImpl : public Headers { return static_cast(err); } - ErrorCode remove(const std::string &key) { + ErrorCode add(const Header &header) { + rd_kafka_resp_err_t err; + err = rd_kafka_header_add(headers_, + header.key().c_str(), header.key().size(), + header.value(), header.value_size()); + return static_cast(err); + } + + ErrorCode remove(const std::string& key) { rd_kafka_resp_err_t err; err = rd_kafka_header_remove (headers_, key.c_str()); return static_cast(err); @@ -287,11 +187,10 @@ class HeadersImpl : public Headers { size_t size; rd_kafka_resp_err_t err; for (size_t idx = 0; - !(err = rd_kafka_header_get(headers_, idx, key.c_str(), &value, &size)) ; + !(err = rd_kafka_header_get(headers_, idx, key.c_str(), + &value, &size)) ; idx++) { - if (value) { - headers.push_back(Headers::Header(key, value, size)); - } + headers.push_back(Headers::Header(key, value, size)); } return headers; } @@ -301,21 +200,19 @@ class HeadersImpl : public Headers { size_t size; rd_kafka_resp_err_t err; err = rd_kafka_header_get_last(headers_, key.c_str(), &value, &size); - ErrorCode cpp_error = static_cast(err); - return Headers::Header(key, value, size, cpp_error); + return Headers::Header(key, value, size, + static_cast(err)); } std::vector get_all() const { std::vector headers; size_t idx = 0; const char *name; - const void *value; + const void *valuep; size_t size; while 
(!rd_kafka_header_get_all(headers_, idx++, - &name, &value, &size)) { - if (value != NULL) { - headers.push_back(Headers::Header(name, value, size)); - } + &name, &valuep, &size)) { + headers.push_back(Headers::Header(name, valuep, size)); } return headers; } @@ -324,36 +221,34 @@ class HeadersImpl : public Headers { return rd_kafka_header_cnt(headers_); } - struct rd_kafka_headers_s* c_headers() { - return headers_; + /** @brief Reset the C headers pointer to NULL. */ + void c_headers_destroyed() { + headers_ = NULL; } - void destroy_headers() { - if (headers_) { - rd_kafka_headers_destroy(headers_); - headers_ = 0; - } + /** @returns the underlying C headers, or NULL. */ + rd_kafka_headers_t *c_ptr() { + return headers_; } - - private: + + +private: void from_vector(const std::vector
&headers) { - if (headers.size() > 0) { - for (std::vector
::const_iterator it = headers.begin(); - it != headers.end(); - it++) { - this->add(it->key(), it->value(), it->value_size()); - } - } + if (headers.size() == 0) + return; + for (std::vector
::const_iterator it = headers.begin(); + it != headers.end(); it++) + this->add(*it); } HeadersImpl(HeadersImpl const&) /*= delete*/; HeadersImpl& operator=(HeadersImpl const&) /*= delete*/; - rd_kafka_headers_t* headers_; - bool free_headers_; + rd_kafka_headers_t *headers_; }; + class MessageImpl : public Message { public: ~MessageImpl () { @@ -367,16 +262,16 @@ class MessageImpl : public Message { MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage): topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL), - headers_(get_headers_from_rkmessage(rkmessage)) {} + headers_(NULL) {} MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage, bool dofree): topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL), - headers_(get_headers_from_rkmessage(rkmessage)) {} + headers_(NULL) {} MessageImpl (rd_kafka_message_t *rkmessage): topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL), - headers_(get_headers_from_rkmessage(rkmessage)) { + headers_(NULL) { if (rkmessage->rkt) { /* Possibly NULL */ topic_ = static_cast(rd_kafka_topic_opaque(rkmessage->rkt)); @@ -451,8 +346,27 @@ class MessageImpl : public Message { return static_cast(rd_kafka_message_status(rkmessage_)); } - Headers* get_headers() { - return headers_; + Headers *headers () { + ErrorCode err; + return headers(&err); + } + + Headers *headers (ErrorCode *err) { + *err = ERR_NO_ERROR; + + if (!headers_) { + rd_kafka_headers_t *c_hdrs; + rd_kafka_resp_err_t c_err; + + if ((c_err = rd_kafka_message_detach_headers(rkmessage_, &c_hdrs))) { + *err = static_cast(c_err); + return NULL; + } + + headers_ = new HeadersImpl(c_hdrs); + } + + return headers_; } RdKafka::Topic *topic_; @@ -462,21 +376,13 @@ class MessageImpl : public Message { * used as a place holder and rkmessage_ is set to point to it. 
*/ rd_kafka_message_t rkmessage_err_; mutable std::string *key_; /* mutable because it's a cached value */ - RdKafka::Headers *headers_; private: - RdKafka::Headers* get_headers_from_rkmessage(rd_kafka_message_t *rkmessage) { - rd_kafka_headers_t *hdrsp; - rd_kafka_resp_err_t err; - - if (rkmessage->len > 0 && !(err = rd_kafka_message_detach_headers(rkmessage, &hdrsp))) { - return new HeadersImpl(hdrsp, free_rkmessage_); - } - return NULL; - } /* "delete" copy ctor + copy assignment, for safety of key_ */ MessageImpl(MessageImpl const&) /*= delete*/; MessageImpl& operator=(MessageImpl const&) /*= delete*/; + + RdKafka::Headers *headers_; }; @@ -1173,8 +1079,9 @@ class ProducerImpl : virtual public Producer, virtual public HandleImpl { int msgflags, void *payload, size_t len, const void *key, size_t key_len, - int64_t timestamp, void *msg_opaque, - RdKafka::Headers *headers); + int64_t timestamp, + RdKafka::Headers *headers, + void *msg_opaque); ErrorCode flush (int timeout_ms) { return static_cast(rd_kafka_flush(rk_, diff --git a/tests/0085-headers.cpp b/tests/0085-headers.cpp index 72f7189d48..2ce24b6e3e 100644 --- a/tests/0085-headers.cpp +++ b/tests/0085-headers.cpp @@ -29,22 +29,27 @@ #include #include "testcpp.h" + +static RdKafka::Producer *producer; +static RdKafka::KafkaConsumer *consumer; +static std::string topic; + static void assert_all_headers_match(RdKafka::Headers *actual, - RdKafka::Headers *expected) { + const RdKafka::Headers *expected) { if (!actual) { Test::Fail("Expected RdKafka::Message to contain headers"); } if (actual->size() != expected->size()) { - Test::Fail(tostr() << "Expected headers length to equal " + Test::Fail(tostr() << "Expected headers length to equal " << expected->size() << " instead equals " << actual->size() << "\n"); } std::vector actual_headers = actual->get_all(); std::vector expected_headers = expected->get_all(); - Test::Say(tostr() << "Header size " << actual_headers.size() << "\n"); + Test::Say(3, tostr() << "Header 
size " << actual_headers.size() << "\n"); for(size_t i = 0; i < actual_headers.size(); i++) { RdKafka::Headers::Header actual_header = actual_headers[i]; - RdKafka::Headers::Header expected_header = expected_headers[i]; + const RdKafka::Headers::Header expected_header = expected_headers[i]; std::string actual_key = actual_header.key(); std::string actual_value = std::string( actual_header.value_string(), @@ -56,185 +61,108 @@ static void assert_all_headers_match(RdKafka::Headers *actual, expected_header.value_size() ); - Test::Say(tostr() << "Expected Key " << expected_key << " Expected val " << expected_value - << " Actual key " << actual_key << " Actual val " << actual_value << "\n"); + Test::Say(3, + tostr() << + "Expected Key " << expected_key << + ", Expected val " << expected_value << + ", Actual key " << actual_key << + ", Actual val " << actual_value << "\n"); if (actual_key != expected_key) { - Test::Fail(tostr() << "Header key does not match, expected '" + Test::Fail(tostr() << "Header key does not match, expected '" << actual_key << "' but got '" << expected_key << "'\n"); } if (actual_value != expected_value) { - Test::Fail(tostr() << "Header value does not match, expected '" + Test::Fail(tostr() << "Header value does not match, expected '" << actual_value << "' but got '" << expected_value << "'\n"); } } } static void test_headers (RdKafka::Headers *produce_headers, - RdKafka::Headers *compare_headers) { - std::string topic = Test::mk_topic_name("0085-headers", 1); - RdKafka::Conf *conf; - std::string errstr; - - Test::conf_init(&conf, NULL, 0); - - Test::conf_set(conf, "group.id", topic); - - RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); - if (!p) - Test::Fail("Failed to create Producer: " + errstr); + const RdKafka::Headers *compare_headers) { RdKafka::ErrorCode err; - err = p->produce(topic, 0, - RdKafka::Producer::RK_MSG_COPY, - (void *)"message", 7, - (void *)"key", 3, 0, NULL, produce_headers); + err = producer->produce(topic, 
0, + RdKafka::Producer::RK_MSG_COPY, + (void *)"message", 7, + (void *)"key", 3, 0, produce_headers, NULL); + if (err) + Test::Fail("produce() failed: " + RdKafka::err2str(err)); - p->flush(tmout_multip(10000)); + producer->flush(tmout_multip(10*1000)); - if (p->outq_len() > 0) + if (producer->outq_len() > 0) Test::Fail(tostr() << "Expected producer to be flushed, " << - p->outq_len() << " messages remain"); - - RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); - if (!c) - Test::Fail("Failed to create KafkaConsumer: " + errstr); - std::vector parts; - parts.push_back(RdKafka::TopicPartition::create(topic, 0, - RdKafka::Topic::OFFSET_BEGINNING)); - err = c->assign(parts); - if (err != RdKafka::ERR_NO_ERROR) - Test::Fail("assign() failed: " + RdKafka::err2str(err)); - RdKafka::TopicPartition::destroy(parts); + producer->outq_len() << " messages remain"); int cnt = 0; bool running = true; while (running) { - RdKafka::Message *msg = c->consume(10000); - Test::Say(tostr() << msg->err()); + RdKafka::Message *msg = consumer->consume(10*1000); + if (msg->err() == RdKafka::ERR_NO_ERROR) { cnt++; - Test::Say(tostr() << "Received message #" << cnt << "\n"); - RdKafka::Headers *headers = msg->get_headers(); + RdKafka::Headers *headers = msg->headers(); if (compare_headers->size() > 0) { assert_all_headers_match(headers, compare_headers); } else { if (headers != 0) { - Test::Fail("Expected get_headers to return a NULL pointer"); + Test::Fail("Expected headers to return a NULL pointer"); } } running = false; - } else if (msg->err() == RdKafka::ERR__TIMED_OUT) { - Test::Say("I'm rebalancing?"); - /* Stil rebalancing? 
*/ } else { Test::Fail("consume() failed: " + msg->errstr()); } delete msg; } - c->close(); - delete c; - delete p; - delete conf; } -static void test_one_header () { - Test::Say("Test one header in consumed message.\n"); - int num_hdrs = 1; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); +static void test_headers (int num_hdrs) { + Test::Say(tostr() << "Test " << num_hdrs << + " headers in consumed message.\n"); + RdKafka::Headers *produce_headers = RdKafka::Headers::create(); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(); for (int i = 0; i < num_hdrs; ++i) { std::stringstream key_s; key_s << "header_" << i; std::string key = key_s.str(); - std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - produce_headers->add(key, val); - compare_headers->add(key, val); - } - test_headers(produce_headers, compare_headers); -} -static void test_ten_headers () { - Test::Say("Test ten headers in consumed message.\n"); - int num_hdrs = 10; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); - for (int i = 0; i < num_hdrs; ++i) { - std::stringstream key_s; - key_s << "header_" << i; - std::string key = key_s.str(); - std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - produce_headers->add(key, val); - compare_headers->add(key, val); - } - test_headers(produce_headers, compare_headers); -} - -static void test_add_with_void_param () { - Test::Say("Test adding one header using add method that takes void*.\n"); - int num_hdrs = 1; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); - for (int i = 0; i < num_hdrs; ++i) { - std::stringstream key_s; - key_s << "header_" << i; - std::string key = key_s.str(); - 
std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - produce_headers->add(key, val.c_str(), val.size()); - compare_headers->add(key, val.c_str(), val.size()); - } - test_headers(produce_headers, compare_headers); -} - -static void test_no_headers () { - Test::Say("Test no headers produced.\n"); - int num_hdrs = 0; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); - for (int i = 0; i < num_hdrs; ++i) { - std::stringstream key_s; - key_s << "header_" << i; - std::string key = key_s.str(); - std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - produce_headers->add(key, val); - compare_headers->add(key, val); - } - test_headers(produce_headers, compare_headers); -} - -static void test_header_with_null_value () { - Test::Say("Test one header with null value.\n"); - int num_hdrs = 1; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); - for (int i = 0; i < num_hdrs; ++i) { - std::stringstream key_s; - key_s << "header_" << i; - std::string key = key_s.str(); - std::stringstream val_s; - val_s << "value_" << i; - std::string val = val_s.str(); - produce_headers->add(key, NULL, 0); - compare_headers->add(key, NULL, 0); + if ((i % 4) == 0) { + /* NULL value */ + produce_headers->add(key, NULL, 0); + compare_headers->add(key, NULL, 0); + } else if ((i % 5) == 0) { + /* Empty value, use different methods for produce + * and compare to make sure they behave the same way. 
*/ + std::string val = ""; + produce_headers->add(key, val); + compare_headers->add(key, "", 0); + } else if ((i % 6) == 0) { + /* Binary value (no nul-term) */ + produce_headers->add(key, "binary", 6); + compare_headers->add(key, "binary"); /* auto-nul-terminated */ + } else { + /* Standard string value */ + std::stringstream val_s; + val_s << "value_" << i; + std::string val = val_s.str(); + produce_headers->add(key, val); + compare_headers->add(key, val); + } } test_headers(produce_headers, compare_headers); + delete compare_headers; } static void test_duplicate_keys () { Test::Say("Test multiple headers with duplicate keys.\n"); int num_hdrs = 4; - RdKafka::Headers *produce_headers = RdKafka::Headers::create(num_hdrs); - RdKafka::Headers *compare_headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *produce_headers = RdKafka::Headers::create(); + RdKafka::Headers *compare_headers = RdKafka::Headers::create(); for (int i = 0; i < num_hdrs; ++i) { std::string dup_key = "dup_key"; std::stringstream val_s; @@ -244,12 +172,12 @@ static void test_duplicate_keys () { compare_headers->add(dup_key, val); } test_headers(produce_headers, compare_headers); + delete compare_headers; } static void test_remove_after_add () { Test::Say("Test removing after adding headers.\n"); - int num_hdrs = 1; - RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *headers = RdKafka::Headers::create(); // Add one unique key std::string key_one = "key1"; @@ -264,7 +192,7 @@ static void test_remove_after_add () { // Assert header length is 2 size_t expected_size = 2; if (headers->size() != expected_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_size << ", instead got " << headers->size() << "\n"); } @@ -273,16 +201,17 @@ static void test_remove_after_add () { headers->remove(key_one); size_t expected_remove_size = 1; if (headers->size() != 
expected_remove_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_remove_size << ", instead got " << headers->size() << "\n"); } + + delete headers; } static void test_remove_all_duplicate_keys () { Test::Say("Test removing duplicate keys removes all headers.\n"); - int num_hdrs = 4; - RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *headers = RdKafka::Headers::create(); // Add one unique key std::string key_one = "key1"; @@ -298,7 +227,7 @@ static void test_remove_all_duplicate_keys () { // Assert header length is 3 size_t expected_size = 3; if (headers->size() != expected_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_size << ", instead got " << headers->size() << "\n"); } @@ -307,16 +236,17 @@ static void test_remove_all_duplicate_keys () { headers->remove(dup_key); size_t expected_size_remove = 1; if (headers->size() != expected_size_remove) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_size_remove << ", instead got " << headers->size() << "\n"); } + + delete headers; } static void test_get_last_gives_last_added_val () { Test::Say("Test get_last returns the last added value of duplicate keys.\n"); - int num_hdrs = 1; - RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *headers = RdKafka::Headers::create(); // Add two duplicate keys std::string dup_key = "dup_key"; @@ -330,7 +260,7 @@ static void test_get_last_gives_last_added_val () { // Assert header length is 3 size_t expected_size = 3; if (headers->size() != expected_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_size << ", instead got " << headers->size() << "\n"); } @@ -339,16 
+269,17 @@ static void test_get_last_gives_last_added_val () { RdKafka::Headers::Header last = headers->get_last(dup_key); std::string value = std::string(last.value_string()); if (value != val_three) { - Test::Fail(tostr() << "Expected get_last to return " << val_two + Test::Fail(tostr() << "Expected get_last to return " << val_two << " as the value of the header instead got " << value << "\n"); } + + delete headers; } static void test_get_of_key_returns_all () { Test::Say("Test get returns all the headers of a duplicate key.\n"); - int num_hdrs = 1; - RdKafka::Headers *headers = RdKafka::Headers::create(num_hdrs); + RdKafka::Headers *headers = RdKafka::Headers::create(); // Add two duplicate keys std::string unique_key = "unique"; @@ -364,7 +295,7 @@ static void test_get_of_key_returns_all () { // Assert header length is 4 size_t expected_size = 4; if (headers->size() != expected_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_size << ", instead got " << headers->size() << "\n"); } @@ -373,24 +304,78 @@ static void test_get_of_key_returns_all () { std::vector get = headers->get(dup_key); size_t expected_get_size = 3; if (get.size() != expected_get_size) { - Test::Fail(tostr() << "Expected header->size() to equal " + Test::Fail(tostr() << "Expected header->size() to equal " << expected_get_size << ", instead got " << headers->size() << "\n"); } + + delete headers; +} + +static void test_failed_produce () { + + RdKafka::Headers *headers = RdKafka::Headers::create(); + headers->add("my", "header"); + + RdKafka::ErrorCode err; + + err = producer->produce(topic, 999 /* invalid partition */, + RdKafka::Producer::RK_MSG_COPY, + (void *)"message", 7, + (void *)"key", 3, 0, headers, NULL); + if (!err) + Test::Fail("Expected produce() to fail"); + + delete headers; } extern "C" { int main_0085_headers (int argc, char **argv) { - test_one_header(); - test_ten_headers(); - 
test_add_with_void_param(); - test_no_headers(); - test_header_with_null_value(); + topic = Test::mk_topic_name("0085-headers", 1); + + RdKafka::Conf *conf; + std::string errstr; + + Test::conf_init(&conf, NULL, 0); + + RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr); + if (!p) + Test::Fail("Failed to create Producer: " + errstr); + + Test::conf_set(conf, "group.id", topic); + + RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr); + if (!c) + Test::Fail("Failed to create KafkaConsumer: " + errstr); + + delete conf; + + std::vector parts; + parts.push_back(RdKafka::TopicPartition::create(topic, 0, + RdKafka::Topic:: + OFFSET_BEGINNING)); + RdKafka::ErrorCode err = c->assign(parts); + if (err != RdKafka::ERR_NO_ERROR) + Test::Fail("assign() failed: " + RdKafka::err2str(err)); + RdKafka::TopicPartition::destroy(parts); + + producer = p; + consumer = c; + + test_headers(0); + test_headers(1); + test_headers(261); test_duplicate_keys(); test_remove_after_add(); test_remove_all_duplicate_keys(); test_get_last_gives_last_added_val(); test_get_of_key_returns_all(); + test_failed_produce(); + + c->close(); + delete c; + delete p; + return 0; } }