Skip to content

Commit

Permalink
Issue 414: Add support for headers
Browse files Browse the repository at this point in the history
davidtrihy-genesys authored and webmakersteve committed Jun 1, 2019
1 parent a2f3f78 commit 6065784
Showing 9 changed files with 188 additions and 15 deletions.
2 changes: 1 addition & 1 deletion deps/librdkafka
116 changes: 116 additions & 0 deletions e2e/both.spec.js
Original file line number Diff line number Diff line change
@@ -240,6 +240,72 @@ describe('Consumer/Producer', function() {
});
});

it('should be able to produce and consume messages with one header value as string: consumeLoop', function(done) {
var headers = [
{ key: "value" }
];
this.timeout(5000);
run_headers_test(done, headers);
});

it('should be able to produce and consume messages with one header value as buffer: consumeLoop', function(done) {
var headers = [
{ key: Buffer.from('value') }
];
this.timeout(5000);
run_headers_test(done, headers);
});

it('should be able to produce and consume messages with one header value as int: consumeLoop', function(done) {
var headers = [
{ key: 10 }
];
this.timeout(5000);
run_headers_test(done, headers);
});

it('should be able to produce and consume messages with one header value as float: consumeLoop', function(done) {
var headers = [
{ key: 1.11 }
];
this.timeout(5000);
run_headers_test(done, headers);
});

it('should be able to produce and consume messages with multiple headers value as buffer: consumeLoop', function(done) {
var headers = [
{ key1: Buffer.from('value1') },
{ key2: Buffer.from('value2') },
{ key3: Buffer.from('value3') },
{ key4: Buffer.from('value4') },
];
this.timeout(5000);
run_headers_test(done, headers);
});

it('should be able to produce and consume messages with multiple headers value as string: consumeLoop', function(done) {
var headers = [
{ key1: 'value1' },
{ key2: 'value2' },
{ key3: 'value3' },
{ key4: 'value4' },
];
this.timeout(5000);
run_headers_test(done, headers);
});

it('should be able to produce and consume messages with multiple headers with mixed values: consumeLoop', function(done) {
var headers = [
{ key1: 'value1' },
{ key2: Buffer.from('value2') },
{ key3: 100 },
{ key4: 10.1 },
];
this.timeout(5000);
run_headers_test(done, headers);
});


it('should be able to produce and consume messages: empty key and empty value', function(done) {
this.timeout(20000);
var key = '';
@@ -408,4 +474,54 @@ describe('Consumer/Producer', function() {
});
});

function assert_headers_match(expectedHeaders, messageHeaders) {
t.equal(expectedHeaders.length, messageHeaders.length, 'Headers length does not match expected length');
for (var i = 0; i < expectedHeaders.length; i++) {
var expectedKey = Object.keys(expectedHeaders[i])[0];
var messageKey = Object.keys(messageHeaders[i]);
t.equal(messageKey.length, 1, 'Expected only one Header key');
t.equal(expectedKey, messageKey[0], 'Expected key does not match message key');
var expectedValue = Buffer.isBuffer(expectedHeaders[i][expectedKey]) ?
expectedHeaders[i][expectedKey].toString() :
expectedHeaders[i][expectedKey];
var actualValue = messageHeaders[i][expectedKey].toString();
t.equal(expectedValue, actualValue, 'invalid message header');
}
}

function run_headers_test(done, headers) {
var key = 'key';

crypto.randomBytes(4096, function(ex, buffer) {

producer.setPollInterval(10);

producer.once('delivery-report', function(err, report) {
if (!err) {
t.equal(topic, report.topic, 'invalid delivery-report topic');
t.equal(key, report.key, 'invalid delivery-report key');
t.ok(report.offset >= 0, 'invalid delivery-report offset');
}
});

consumer.on('data', function(message) {
t.equal(buffer.toString(), message.value.toString(), 'invalid message value');
t.equal(key, message.key, 'invalid message key');
t.equal(topic, message.topic, 'invalid message topic');
t.ok(message.offset >= 0, 'invalid message offset');
assert_headers_match(headers, message.headers);
done();
});

consumer.subscribe([topic]);
consumer.consume();

setTimeout(function() {
var timestamp = new Date().getTime();
producer.produce(topic, null, buffer, key, timestamp, "", headers);
}, 2000);

});
}

});
5 changes: 4 additions & 1 deletion examples/producer.md
Original file line number Diff line number Diff line change
@@ -47,7 +47,10 @@ producer.on('ready', function(arg) {
var key = "key-"+i;
// if partition is set to -1, librdkafka will use the default partitioner
var partition = -1;
producer.produce(topicName, partition, value, key);
var headers = [
{ header: "header value" }
]
producer.produce(topicName, partition, value, key, new Date(), "". headers);
}

//need to keep polling for a while to ensure the delivery reports are received
5 changes: 3 additions & 2 deletions lib/producer.js
Original file line number Diff line number Diff line change
@@ -115,11 +115,12 @@ function Producer(conf, topicConf) {
* @param {string} key - The key associated with the message.
* @param {number|null} timestamp - Timestamp to send with the message.
* @param {object} opaque - An object you want passed along with this message, if provided.
* @param {object} headers - A list of custom key value pairs that provide message metadata.
* @throws {LibrdKafkaError} - Throws a librdkafka error if it failed.
* @return {boolean} - returns an error if it failed, or true if not
* @see Producer#produce
*/
Producer.prototype.produce = function(topic, partition, message, key, timestamp, opaque) {
Producer.prototype.produce = function(topic, partition, message, key, timestamp, opaque, headers) {
if (!this._isConnected) {
throw new Error('Producer not connected');
}
@@ -135,7 +136,7 @@ Producer.prototype.produce = function(topic, partition, message, key, timestamp,
partition = partition == null ? this.defaultPartition : partition;

return this._errorWrap(
this._client.produce(topic, partition, message, key, timestamp, opaque));
this._client.produce(topic, partition, message, key, timestamp, opaque, headers));

};

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@
"devDependencies": {
"bluebird": "^3.5.3",
"jsdoc": "^3.4.0",
"jshint": "^2.9.7",
"jshint": "^2.10.1",
"mocha": "^5.2.0",
"node-gyp": "3.x",
"toolkit-jsdoc": "^1.0.0"
24 changes: 22 additions & 2 deletions src/common.cc
Original file line number Diff line number Diff line change
@@ -397,10 +397,12 @@ namespace Message {

// Overload for all use cases except delivery reports
v8::Local<v8::Object> ToV8Object(RdKafka::Message *message) {
return ToV8Object(message, true);
return ToV8Object(message, true, true);
}

v8::Local<v8::Object> ToV8Object(RdKafka::Message *message, bool include_payload) { // NOLINT
v8::Local<v8::Object> ToV8Object(RdKafka::Message *message,
bool include_payload,
bool include_headers) {
if (message->err() == RdKafka::ERR_NO_ERROR) {
v8::Local<v8::Object> pack = Nan::New<v8::Object>();

@@ -417,6 +419,24 @@ v8::Local<v8::Object> ToV8Object(RdKafka::Message *message, bool include_payload
Nan::Null());
}

RdKafka::Headers* headers;
if (((headers = message->headers()) != 0) && include_headers) {
v8::Local<v8::Array> v8headers = Nan::New<v8::Array>();
int index = 0;
std::vector<RdKafka::Headers::Header> all = headers->get_all();
for (std::vector<RdKafka::Headers::Header>::iterator it = all.begin();
it != all.end(); it++) {
v8::Local<v8::Object> v8header = Nan::New<v8::Object>();
Nan::Set(v8header, Nan::New<v8::String>(it->key()).ToLocalChecked(),
Nan::Encode(it->value_string(),
it->value_size(), Nan::Encoding::BUFFER));
v8headers->Set(index, v8header);
index++;
}
Nan::Set(pack,
Nan::New<v8::String>("headers").ToLocalChecked(), v8headers);
}

Nan::Set(pack, Nan::New<v8::String>("size").ToLocalChecked(),
Nan::New<v8::Number>(message->len()));

2 changes: 1 addition & 1 deletion src/common.h
Original file line number Diff line number Diff line change
@@ -119,7 +119,7 @@ v8::Local<v8::Object> ToV8Object(RdKafka::Metadata*);
namespace Message {

v8::Local<v8::Object> ToV8Object(RdKafka::Message*);
v8::Local<v8::Object> ToV8Object(RdKafka::Message*, bool);
v8::Local<v8::Object> ToV8Object(RdKafka::Message*, bool, bool);

}

41 changes: 36 additions & 5 deletions src/producer.cc
Original file line number Diff line number Diff line change
@@ -258,10 +258,11 @@ Baton Producer::Produce(void* message, size_t size, RdKafka::Topic* topic,
* @return - A baton object with error code set if it failed.
*/
Baton Producer::Produce(void* message, size_t size, std::string topic,
int32_t partition, std::string *key, int64_t timestamp, void* opaque) {
int32_t partition, std::string *key, int64_t timestamp, void* opaque,
RdKafka::Headers* headers) {
return Produce(message, size, topic, partition,
key ? key->data() : NULL, key ? key->size() : 0,
timestamp, opaque);
timestamp, opaque, headers);
}

/**
@@ -279,7 +280,7 @@ Baton Producer::Produce(void* message, size_t size, std::string topic,
*/
Baton Producer::Produce(void* message, size_t size, std::string topic,
int32_t partition, const void *key, size_t key_len,
int64_t timestamp, void* opaque) {
int64_t timestamp, void* opaque, RdKafka::Headers* headers) {
RdKafka::ErrorCode response_code;

if (IsConnected()) {
@@ -291,7 +292,7 @@ Baton Producer::Produce(void* message, size_t size, std::string topic,
RdKafka::Producer::RK_MSG_COPY,
message, size,
key, key_len,
timestamp, opaque);
timestamp, headers, opaque);
} else {
response_code = RdKafka::ERR__STATE;
}
@@ -437,6 +438,32 @@ NAN_METHOD(Producer::NodeProduce) {
// v8::Local<v8::Object> object = Nan::New(persistent);
}

std::vector<RdKafka::Headers::Header> headers;
if (info.Length() > 6 && !info[6]->IsUndefined()) {
v8::Local<v8::Array> v8Headers = v8::Local<v8::Array>::Cast(info[6]);

if (v8Headers->Length() >= 1) {
for (unsigned int i = 0; i < v8Headers->Length(); i++) {
v8::Local<v8::Object> header = v8Headers->Get(i)->ToObject();
if (header.IsEmpty()) {
continue;
}
v8::Local<v8::Array> props = header->GetOwnPropertyNames();
Nan::MaybeLocal<v8::String> v8Key = Nan::To<v8::String>(props->Get(0));
Nan::MaybeLocal<v8::String> v8Value =
Nan::To<v8::String>(header->Get(v8Key.ToLocalChecked()));

Nan::Utf8String uKey(v8Key.ToLocalChecked());
std::string key(*uKey);

Nan::Utf8String uValue(v8Value.ToLocalChecked());
std::string value(*uValue);
headers.push_back(
RdKafka::Headers::Header(key, value.c_str(), value.size()));
}
}
}

Producer* producer = ObjectWrap::Unwrap<Producer>(info.This());

// Let the JS library throw if we need to so the error can be more rich
@@ -446,12 +473,16 @@ NAN_METHOD(Producer::NodeProduce) {
// Get string pointer for this thing
Nan::Utf8String topicUTF8(info[0]->ToString());
std::string topic_name(*topicUTF8);
RdKafka::Headers *rd_headers = RdKafka::Headers::create(headers);

Baton b = producer->Produce(message_buffer_data, message_buffer_length,
topic_name, partition, key_buffer_data, key_buffer_length,
timestamp, opaque);
timestamp, opaque, rd_headers);

error_code = static_cast<int>(b.err());
if (error_code != 0 && rd_headers) {
delete rd_headers;
}
} else {
// First parameter is a topic OBJECT
Topic* topic = ObjectWrap::Unwrap<Topic>(info[0].As<v8::Object>());
6 changes: 4 additions & 2 deletions src/producer.h
Original file line number Diff line number Diff line change
@@ -66,12 +66,14 @@ class Producer : public Connection {
Baton Produce(void* message, size_t message_size,
std::string topic, int32_t partition,
std::string* key,
int64_t timestamp, void* opaque);
int64_t timestamp, void* opaque,
RdKafka::Headers* headers);

Baton Produce(void* message, size_t message_size,
std::string topic, int32_t partition,
const void* key, size_t key_len,
int64_t timestamp, void* opaque);
int64_t timestamp, void* opaque,
RdKafka::Headers* headers);

std::string Name();

0 comments on commit 6065784

Please sign in to comment.