Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Header support for C++ API #1959

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions examples/kafkatest_verifiable_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,17 @@ void msg_consume(RdKafka::KafkaConsumer *consumer,
" [" << (int)msg->partition() << "] at offset " <<
msg->offset() << std::endl;

RdKafka::Headers *headers = msg->get_headers();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need for headers support in the verifiable_client, please remove these changes.

kafkatest_verifiable_client is used to run the Java system tests with the librdkafka client.

if (headers) {
std::vector<RdKafka::Headers::Header> sheaders = headers->get_all();
std::cout << "Headers length: " << sheaders.size() << std::endl;
for(std::vector<RdKafka::Headers::Header>::const_iterator it = sheaders.begin();
it != sheaders.end();
it++) {
std::cout << "Key: " << it->key() << " Value: " << it->value() << std::endl;
}
}

if (state.maxMessages >= 0 &&
state.consumer.consumedMessages >= state.maxMessages)
return;
Expand Down Expand Up @@ -834,31 +845,42 @@ int main (int argc, char **argv) {
msg << value_prefix << i;
while (true) {
RdKafka::ErrorCode resp;
if (create_time == -1) {
resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(msg.str().c_str()),
msg.str().size(), NULL, NULL);
} else {
resp = producer->produce(topics[0], partition,
RdKafka::Headers *headers = 0;
if (create_time == -1) {
resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(msg.str().c_str()),
msg.str().size(), NULL, NULL);
} else {
std::string name = "kafkaheader";
std::string val = "header_val";
std::vector<RdKafka::Headers::Header> headers_arr;
headers_arr.push_back(RdKafka::Headers::Header(name, val.c_str()));

headers = RdKafka::Headers::create(headers_arr, false);
resp = producer->produce(topics[0], partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(msg.str().c_str()),
msg.str().size(),
NULL, 0,
create_time,
NULL);
}
NULL, headers);
}

if (resp == RdKafka::ERR__QUEUE_FULL) {
producer->poll(100);
continue;
} else if (resp != RdKafka::ERR_NO_ERROR) {
headers->destroy_headers();
errorString("producer_send_error",
RdKafka::err2str(resp), topic->name(), NULL, msg.str());
state.producer.numErr++;
} else {
state.producer.numSent++;
}
if (headers) {
delete headers;
}
break;
}

Expand Down
1 change: 1 addition & 0 deletions src-cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_library(
ConfImpl.cpp
ConsumerImpl.cpp
HandleImpl.cpp
HeadersImpl.cpp
KafkaConsumerImpl.cpp
MessageImpl.cpp
MetadataImpl.cpp
Expand Down
49 changes: 49 additions & 0 deletions src-cpp/HeadersImpl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* librdkafka - Apache Kafka C/C++ library
*
* Copyright (c) 2014 Magnus Edenhill
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Copyright (c) 2014 Magnus Edenhill
* Copyright (c) 2018 Magnus Edenhill

* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include <iostream>
#include <string>
#include <list>
#include <cerrno>

#include "rdkafkacpp_int.h"

RdKafka::Headers *RdKafka::Headers::create(size_t initial_count, bool free_rd_headers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two spaces after type, should only be one.

return new RdKafka::HeadersImpl(initial_count, free_rd_headers);
}

RdKafka::Headers *RdKafka::Headers::create(const std::vector<Header> &headers, bool free_rd_headers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

free_rd_headers is an implementation detail, remove it from the public API.

if (headers.size() > 0) {
return new RdKafka::HeadersImpl(headers, free_rd_headers);
} else {
return 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would've assumed an empty Headers list here rather than NULL

}

}

RdKafka::Headers::~Headers() {}
2 changes: 1 addition & 1 deletion src-cpp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ LIBVER= 1
CXXSRCS= RdKafka.cpp ConfImpl.cpp HandleImpl.cpp \
ConsumerImpl.cpp ProducerImpl.cpp KafkaConsumerImpl.cpp \
TopicImpl.cpp TopicPartitionImpl.cpp MessageImpl.cpp \
QueueImpl.cpp MetadataImpl.cpp
HeadersImpl.cpp QueueImpl.cpp MetadataImpl.cpp

HDRS= rdkafkacpp.h

Expand Down
12 changes: 9 additions & 3 deletions src-cpp/ProducerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,19 @@ RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,

}


RdKafka::ErrorCode
RdKafka::ProducerImpl::produce (const std::string topic_name,
int32_t partition, int msgflags,
void *payload, size_t len,
const void *key, size_t key_len,
int64_t timestamp,
void *msg_opaque) {
int64_t timestamp, void *msg_opaque,
RdKafka::Headers *headers) {
rd_kafka_headers_t *hdrs;
if (headers) {
hdrs = headers->c_headers();
} else {
hdrs = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use NULL for pointers, not 0. And in this case you can initialize hdrs to NULL directly.

}
return
static_cast<RdKafka::ErrorCode>
(
Expand All @@ -162,6 +167,7 @@ RdKafka::ProducerImpl::produce (const std::string topic_name,
RD_KAFKA_V_KEY(key, key_len),
RD_KAFKA_V_TIMESTAMP(timestamp),
RD_KAFKA_V_OPAQUE(msg_opaque),
RD_KAFKA_V_HEADERS(hdrs),
RD_KAFKA_V_END)
);
}
142 changes: 139 additions & 3 deletions src-cpp/rdkafkacpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ extern "C" {
struct rd_kafka_s;
struct rd_kafka_topic_s;
struct rd_kafka_message_s;
struct rd_kafka_headers_s;
};

namespace RdKafka {


/**
* @name Miscellaneous APIs
* @{
Expand Down Expand Up @@ -396,6 +396,7 @@ std::string err2str(RdKafka::ErrorCode err);
/* Forward declarations */
class Producer;
class Message;
class Headers;
class Queue;
class Event;
class Topic;
Expand Down Expand Up @@ -1365,7 +1366,139 @@ class RD_EXPORT MessageTimestamp {
int64_t timestamp; /**< Milliseconds since epoch (UTC). */
};

/**
* @brief Headers object
*
* This object encapsulates the C implementation logic into a C++ object
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't reference the C API from the public C++ API: that'll require users to know both the C and C++ API.
Instead copy the relevant sections of the C doc strings here and modify for C++ use.

* for use in RdKafka::Messages object.
*/
class RD_EXPORT Headers {
public:
virtual ~Headers() = 0;

/**
* @brief Header object
*
* This object represents a single Header with key value pair
* and an ErrorCode
*
* @remark dynamic allocation of this object is not supported.
*
*/
class Header {
public:
Header(const std::string& key,
const char* value,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const char *value

RdKafka::ErrorCode err = ERR_NO_ERROR):
key_(key), err_(err) {
value_container_.assign(value);
};

std::string key() const {
return key_;
}

const char* value() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const char *value..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

header values may be NULL, and are to be considered binary, so use const void * and a size_t length instead

return value_container_.c_str();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

values may be NULL

}

RdKafka::ErrorCode err() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use of the error field?

Copy link
Author

@davidtrihy davidtrihy Aug 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error field was just so when you got one individual Header when it was returned you could pass back if there was an error or not on when trying to get that Header

return err_;
}

private:
std::string key_;
RdKafka::ErrorCode err_;
std::string value_container_;
void *operator new(size_t); /* Prevent dynamic allocation */
};

/**
* @brief create a new instance of the Headers object
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capital "C" in "Create", same for the other functions

*/
static Headers *create(size_t initial_size = 8, bool free_rd_headers = true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use default param values:
https://google.github.io/styleguide/cppguide.html#Default_Arguments

Document the @params.

I don't think free_rd_headers should be part of the public API


/**
* @brief create a new instance of the Headers object from a std::vector
*/
static Headers *create(const std::vector<Header> &headers, bool free_rd_headers = true);

/**
* @brief adds a Header to the end
*
* @returns An ErrorCode signalling a success or failure to add the Header.
*/
virtual ErrorCode add(const std::string& key, const char* value) = 0;

/**
* @brief removes all the Headers of a given key
*
* @returns An ErrorCode signalling a success or failure to remove the Header.
*/
virtual ErrorCode remove(const std::string& key) = 0;

/**
* @brief gets all of the Headers of a given key
*
* @returns a std::vector containing all the Headers of the given key.
*/
virtual std::vector<Header> get(const std::string &key) const = 0;

/**
* @brief gets the last occurrence of a Header of a given key
*
* @returns the Header if found, otherwise a Header with an ErrorCode
*/
virtual Header get_last(const std::string& key) const = 0;

/**
* @brief returns all the Headers of a Message
*
* @returns a std::vector containing all of the Headers of a message
*/
virtual std::vector<Header> get_all() const = 0;

/**
* @brief the count of all the Headers
*
* @returns a size_t count of all the headers
*/
virtual size_t size() const = 0;

/**
* @brief Returns the underlying librdkafka C rd_kafka_headers_t handle.
*
* @warning Calling the C API on this handle is not recommended and there
* is no official support for it, but for cases where the C++ API
* does not provide the underlying functionality this C handle can be
* used to interact directly with the core librdkafka API.
*
* @remark The lifetime of the returned pointer can be different than the lifetime
* of the Headers message due to how the producev function in the C API works
* if there is no error then the producev will take care of deallocation
* but if there is an error then it is the responsibility of the calling
* object to deallocate the underlying C implementation if an instance
* of the Headers object is created with free_rd_headers set to `false`
*
* @remark Include <rdkafka/rdkafka.h> prior to including
* <rdkafka/rdkafkacpp.h>
*
* @returns \c rd_kafka_headers_t*
*/
virtual struct rd_kafka_headers_s *c_headers() = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
virtual struct rd_kafka_headers_s *c_headers() = 0;
virtual struct rd_kafka_headers_s *c_ptrs() = 0;


/**
* @brief cleans up the underlying alocated C implementation headers if called
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allocated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when would you use this method?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably an artifact of some experimentation I was doing, with your comment mentioning detaching the headers from the rkmessage I could completely encapsulate the Headers within in this instance and let the destructor manage the lifetime of the rd_headers

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually come to think of it, I think this is still needed in the case where producev fails and doesn't clean up the C headers and in that case you will have to manually clean them up yourself because producev can't set the internal C headers ptr to NULL so the destructor will fail for a double free

*
* @remark Safe to call even if the Headers object is set to clean up when
* when the destructor is called
*
* @remark Safe to call even if the underlyng C pointer is set to null
*
* @returns an ErrorCode signalling if the the operation was attempted
*/
virtual ErrorCode destroy_headers() = 0;
};

/**
* @brief Message object
Expand Down Expand Up @@ -1428,6 +1561,9 @@ class RD_EXPORT Message {
/** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */
virtual void *msg_opaque () const = 0;

/** @returns The Headers instance for this Message (if applicable) */
virtual RdKafka::Headers *get_headers() = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add new methods to end of the class.


virtual ~Message () = 0;

/** @returns the latency in microseconds for a produced message measured
Expand Down Expand Up @@ -2147,8 +2283,8 @@ class RD_EXPORT Producer : public virtual Handle {
int msgflags,
void *payload, size_t len,
const void *key, size_t key_len,
int64_t timestamp,
void *msg_opaque) = 0;
int64_t timestamp, void *msg_opaque,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must not change existing APIs since that will break the API stability guarantee

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted thank you, I'll overload the method

RdKafka::Headers *headers) = 0;


/**
Expand Down
Loading