From 6065784faea0585d00c5196adb2f20cfd220f517 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Fri, 8 Mar 2019 14:52:29 +0000 Subject: [PATCH] Issue 414: Add support for headers --- deps/librdkafka | 2 +- e2e/both.spec.js | 116 +++++++++++++++++++++++++++++++++++++++++++ examples/producer.md | 5 +- lib/producer.js | 5 +- package.json | 2 +- src/common.cc | 24 ++++++++- src/common.h | 2 +- src/producer.cc | 41 +++++++++++++-- src/producer.h | 6 ++- 9 files changed, 188 insertions(+), 15 deletions(-) diff --git a/deps/librdkafka b/deps/librdkafka index 849c066b..a3753a71 160000 --- a/deps/librdkafka +++ b/deps/librdkafka @@ -1 +1 @@ -Subproject commit 849c066b559950b02e37a69256f0cb7b04381d0e +Subproject commit a3753a7167507c22556e6174290b34d1e06128a4 diff --git a/e2e/both.spec.js b/e2e/both.spec.js index 8cb2deaf..33e14126 100644 --- a/e2e/both.spec.js +++ b/e2e/both.spec.js @@ -240,6 +240,72 @@ describe('Consumer/Producer', function() { }); }); + it('should be able to produce and consume messages with one header value as string: consumeLoop', function(done) { + var headers = [ + { key: "value" } + ]; + this.timeout(5000); + run_headers_test(done, headers); + }); + + it('should be able to produce and consume messages with one header value as buffer: consumeLoop', function(done) { + var headers = [ + { key: Buffer.from('value') } + ]; + this.timeout(5000); + run_headers_test(done, headers); + }); + + it('should be able to produce and consume messages with one header value as int: consumeLoop', function(done) { + var headers = [ + { key: 10 } + ]; + this.timeout(5000); + run_headers_test(done, headers); + }); + + it('should be able to produce and consume messages with one header value as float: consumeLoop', function(done) { + var headers = [ + { key: 1.11 } + ]; + this.timeout(5000); + run_headers_test(done, headers); + }); + + it('should be able to produce and consume messages with multiple headers value as buffer: consumeLoop', function(done) { + var headers = [ + { key1: Buffer.from('value1') }, + { key2: Buffer.from('value2') }, + { key3: Buffer.from('value3') }, + { key4: Buffer.from('value4') }, + ]; + this.timeout(5000); + run_headers_test(done, headers); + }); + + it('should be able to produce and consume messages with multiple headers value as string: consumeLoop', function(done) { + var headers = [ + { key1: 'value1' }, + { key2: 'value2' }, + { key3: 'value3' }, + { key4: 'value4' }, + ]; + this.timeout(5000); + run_headers_test(done, headers); + }); + + it('should be able to produce and consume messages with multiple headers with mixed values: consumeLoop', function(done) { + var headers = [ + { key1: 'value1' }, + { key2: Buffer.from('value2') }, + { key3: 100 }, + { key4: 10.1 }, + ]; + this.timeout(5000); + run_headers_test(done, headers); + }); + + it('should be able to produce and consume messages: empty key and empty value', function(done) { this.timeout(20000); var key = ''; @@ -408,4 +474,54 @@ describe('Consumer/Producer', function() { }); }); + function assert_headers_match(expectedHeaders, messageHeaders) { + t.equal(expectedHeaders.length, messageHeaders.length, 'Headers length does not match expected length'); + for (var i = 0; i < expectedHeaders.length; i++) { + var expectedKey = Object.keys(expectedHeaders[i])[0]; + var messageKey = Object.keys(messageHeaders[i]); + t.equal(messageKey.length, 1, 'Expected only one Header key'); + t.equal(expectedKey, messageKey[0], 'Expected key does not match message key'); + var expectedValue = Buffer.isBuffer(expectedHeaders[i][expectedKey]) ? + expectedHeaders[i][expectedKey].toString() : + expectedHeaders[i][expectedKey]; + var actualValue = messageHeaders[i][expectedKey].toString(); + t.equal(expectedValue, actualValue, 'invalid message header'); + } + } + + function run_headers_test(done, headers) { + var key = 'key'; + + crypto.randomBytes(4096, function(ex, buffer) { + + producer.setPollInterval(10); + + producer.once('delivery-report', function(err, report) { + if (!err) { + t.equal(topic, report.topic, 'invalid delivery-report topic'); + t.equal(key, report.key, 'invalid delivery-report key'); + t.ok(report.offset >= 0, 'invalid delivery-report offset'); + } + }); + + consumer.on('data', function(message) { + t.equal(buffer.toString(), message.value.toString(), 'invalid message value'); + t.equal(key, message.key, 'invalid message key'); + t.equal(topic, message.topic, 'invalid message topic'); + t.ok(message.offset >= 0, 'invalid message offset'); + assert_headers_match(headers, message.headers); + done(); + }); + + consumer.subscribe([topic]); + consumer.consume(); + + setTimeout(function() { + var timestamp = new Date().getTime(); + producer.produce(topic, null, buffer, key, timestamp, "", headers); + }, 2000); + + }); + } + }); diff --git a/examples/producer.md b/examples/producer.md index d325a49b..eda372c2 100644 --- a/examples/producer.md +++ b/examples/producer.md @@ -47,7 +47,10 @@ producer.on('ready', function(arg) { var key = "key-"+i; // if partition is set to -1, librdkafka will use the default partitioner var partition = -1; - producer.produce(topicName, partition, value, key); + var headers = [ + { header: "header value" } + ] + producer.produce(topicName, partition, value, key, new Date(), "". headers); } //need to keep polling for a while to ensure the delivery reports are received diff --git a/lib/producer.js b/lib/producer.js index 9ffb04cd..49e085c8 100644 --- a/lib/producer.js +++ b/lib/producer.js @@ -115,11 +115,12 @@ function Producer(conf, topicConf) { * @param {string} key - The key associated with the message. * @param {number|null} timestamp - Timestamp to send with the message. * @param {object} opaque - An object you want passed along with this message, if provided. + * @param {object} headers - A list of custom key value pairs that provide message metadata. * @throws {LibrdKafkaError} - Throws a librdkafka error if it failed. * @return {boolean} - returns an error if it failed, or true if not * @see Producer#produce */ -Producer.prototype.produce = function(topic, partition, message, key, timestamp, opaque) { +Producer.prototype.produce = function(topic, partition, message, key, timestamp, opaque, headers) { if (!this._isConnected) { throw new Error('Producer not connected'); } @@ -135,7 +136,7 @@ Producer.prototype.produce = function(topic, partition, message, key, timestamp, partition = partition == null ? this.defaultPartition : partition; return this._errorWrap( - this._client.produce(topic, partition, message, key, timestamp, opaque)); + this._client.produce(topic, partition, message, key, timestamp, opaque, headers)); }; diff --git a/package.json b/package.json index 1dbc438a..5bb69345 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "devDependencies": { "bluebird": "^3.5.3", "jsdoc": "^3.4.0", - "jshint": "^2.9.7", + "jshint": "^2.10.1", "mocha": "^5.2.0", "node-gyp": "3.x", "toolkit-jsdoc": "^1.0.0" diff --git a/src/common.cc b/src/common.cc index d9d7d6a3..ec4e20db 100644 --- a/src/common.cc +++ b/src/common.cc @@ -397,10 +397,12 @@ namespace Message { // Overload for all use cases except delivery reports v8::Local ToV8Object(RdKafka::Message *message) { - return ToV8Object(message, true); + return ToV8Object(message, true, true); } -v8::Local ToV8Object(RdKafka::Message *message, bool include_payload) { // NOLINT +v8::Local ToV8Object(RdKafka::Message *message, + bool include_payload, + bool include_headers) { if (message->err() == RdKafka::ERR_NO_ERROR) { v8::Local pack = Nan::New(); @@ -417,6 +419,24 @@ v8::Local ToV8Object(RdKafka::Message *message, bool include_payload Nan::Null()); } + RdKafka::Headers* headers; + if (((headers = message->headers()) != 0) && include_headers) { + v8::Local v8headers = Nan::New(); + int index = 0; + std::vector all = headers->get_all(); + for (std::vector::iterator it = all.begin(); + it != all.end(); it++) { + v8::Local v8header = Nan::New(); + Nan::Set(v8header, Nan::New(it->key()).ToLocalChecked(), + Nan::Encode(it->value_string(), + it->value_size(), Nan::Encoding::BUFFER)); + v8headers->Set(index, v8header); + index++; + } + Nan::Set(pack, + Nan::New("headers").ToLocalChecked(), v8headers); + } + Nan::Set(pack, Nan::New("size").ToLocalChecked(), Nan::New(message->len())); diff --git a/src/common.h b/src/common.h index 73c60aaa..1509aeed 100644 --- a/src/common.h +++ b/src/common.h @@ -119,7 +119,7 @@ v8::Local ToV8Object(RdKafka::Metadata*); namespace Message { v8::Local ToV8Object(RdKafka::Message*); -v8::Local ToV8Object(RdKafka::Message*, bool); +v8::Local ToV8Object(RdKafka::Message*, bool, bool); } diff --git a/src/producer.cc b/src/producer.cc index 74503f43..ff93884d 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -258,10 +258,11 @@ Baton Producer::Produce(void* message, size_t size, RdKafka::Topic* topic, * @return - A baton object with error code set if it failed. */ Baton Producer::Produce(void* message, size_t size, std::string topic, - int32_t partition, std::string *key, int64_t timestamp, void* opaque) { + int32_t partition, std::string *key, int64_t timestamp, void* opaque, + RdKafka::Headers* headers) { return Produce(message, size, topic, partition, key ? key->data() : NULL, key ? key->size() : 0, - timestamp, opaque); + timestamp, opaque, headers); } /** @@ -279,7 +280,7 @@ Baton Producer::Produce(void* message, size_t size, std::string topic, */ Baton Producer::Produce(void* message, size_t size, std::string topic, int32_t partition, const void *key, size_t key_len, - int64_t timestamp, void* opaque) { + int64_t timestamp, void* opaque, RdKafka::Headers* headers) { RdKafka::ErrorCode response_code; if (IsConnected()) { @@ -291,7 +292,7 @@ Baton Producer::Produce(void* message, size_t size, std::string topic, RdKafka::Producer::RK_MSG_COPY, message, size, key, key_len, - timestamp, opaque); + timestamp, headers, opaque); } else { response_code = RdKafka::ERR__STATE; } @@ -437,6 +438,32 @@ NAN_METHOD(Producer::NodeProduce) { // v8::Local object = Nan::New(persistent); } + std::vector headers; + if (info.Length() > 6 && !info[6]->IsUndefined()) { + v8::Local v8Headers = v8::Local::Cast(info[6]); + + if (v8Headers->Length() >= 1) { + for (unsigned int i = 0; i < v8Headers->Length(); i++) { + v8::Local header = v8Headers->Get(i)->ToObject(); + if (header.IsEmpty()) { + continue; + } + v8::Local props = header->GetOwnPropertyNames(); + Nan::MaybeLocal v8Key = Nan::To(props->Get(0)); + Nan::MaybeLocal v8Value = + Nan::To(header->Get(v8Key.ToLocalChecked())); + + Nan::Utf8String uKey(v8Key.ToLocalChecked()); + std::string key(*uKey); + + Nan::Utf8String uValue(v8Value.ToLocalChecked()); + std::string value(*uValue); + headers.push_back( + RdKafka::Headers::Header(key, value.c_str(), value.size())); + } + } +} + Producer* producer = ObjectWrap::Unwrap(info.This()); // Let the JS library throw if we need to so the error can be more rich @@ -446,12 +473,16 @@ NAN_METHOD(Producer::NodeProduce) { // Get string pointer for this thing Nan::Utf8String topicUTF8(info[0]->ToString()); std::string topic_name(*topicUTF8); + RdKafka::Headers *rd_headers = RdKafka::Headers::create(headers); Baton b = producer->Produce(message_buffer_data, message_buffer_length, topic_name, partition, key_buffer_data, key_buffer_length, - timestamp, opaque); + timestamp, opaque, rd_headers); error_code = static_cast(b.err()); + if (error_code != 0 && rd_headers) { + delete rd_headers; + } } else { // First parameter is a topic OBJECT Topic* topic = ObjectWrap::Unwrap(info[0].As()); diff --git a/src/producer.h b/src/producer.h index b7109d17..0b54215f 100644 --- a/src/producer.h +++ b/src/producer.h @@ -66,12 +66,14 @@ class Producer : public Connection { Baton Produce(void* message, size_t message_size, std::string topic, int32_t partition, std::string* key, - int64_t timestamp, void* opaque); + int64_t timestamp, void* opaque, + RdKafka::Headers* headers); Baton Produce(void* message, size_t message_size, std::string topic, int32_t partition, const void* key, size_t key_len, - int64_t timestamp, void* opaque); + int64_t timestamp, void* opaque, + RdKafka::Headers* headers); std::string Name();