From e9e888484d30645bb0fd268afe939578a7484f0b Mon Sep 17 00:00:00 2001 From: Bryce McDonnell Date: Wed, 12 Oct 2016 10:04:59 -0400 Subject: [PATCH 1/6] Adds TLSConfiguration class --- lib/logstash-logger/device.rb | 1 + lib/logstash-logger/device/kafka.rb | 3 + lib/logstash-logger/device/kafka_new.rb | 83 +++++++++++++ logstash-logger.gemspec | 1 + spec/device/kafka_new_spec.rb | 156 ++++++++++++++++++++++++ 5 files changed, 244 insertions(+) create mode 100644 lib/logstash-logger/device/kafka_new.rb create mode 100644 spec/device/kafka_new_spec.rb diff --git a/lib/logstash-logger/device.rb b/lib/logstash-logger/device.rb index 96b9164..8cd81b8 100644 --- a/lib/logstash-logger/device.rb +++ b/lib/logstash-logger/device.rb @@ -12,6 +12,7 @@ module Device autoload :Unix, 'logstash-logger/device/unix' autoload :Redis, 'logstash-logger/device/redis' autoload :Kafka, 'logstash-logger/device/kafka' + autoload :KafkaNew, 'logstash-logger/device/kafka_new' autoload :File, 'logstash-logger/device/file' autoload :IO, 'logstash-logger/device/io' autoload :Stdout, 'logstash-logger/device/stdout' diff --git a/lib/logstash-logger/device/kafka.rb b/lib/logstash-logger/device/kafka.rb index a7e12b9..72e0a59 100644 --- a/lib/logstash-logger/device/kafka.rb +++ b/lib/logstash-logger/device/kafka.rb @@ -13,6 +13,9 @@ class Kafka < Connectable attr_accessor :hosts, :topic, :producer, :backoff def initialize(opts) + + # TODO: improve this message and update README + warn "[DEPRECATED] Poseidon is deprecated, update client" super host = opts[:host] || DEFAULT_HOST port = opts[:port] || DEFAULT_PORT diff --git a/lib/logstash-logger/device/kafka_new.rb b/lib/logstash-logger/device/kafka_new.rb new file mode 100644 index 0000000..9196aca --- /dev/null +++ b/lib/logstash-logger/device/kafka_new.rb @@ -0,0 +1,83 @@ +module LogStashLogger + module Device + class KafkaNew < Connectable + class TLSConfiguration + attr_reader :ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key + + def 
initialize(opts = {}) + @ssl_ca_cert = opts[:ssl_ca_cert] + @ssl_client_cert = opts[:ssl_client_cert] + @ssl_client_cert_key = opts[:ssl_client_cert_key] + end + + def cert_bundle + @cert_bundle ||= all_cert_params? ? cert_params_as_hash : {} + end + + def valid? + all_cert_params? || no_cert_params? + end + + def invalid? + !valid? + end + private + + def cert_params_as_hash + { ssl_ca_cert: @ssl_ca_cert, + ssl_client_cert: @ssl_client_cert, + ssl_client_cert_key: @ssl_client_cert_key, + } + end + + def all_cert_params? + cert_params_as_hash.values.compact.length == valid_cert_params_length + end + + def no_cert_params? + cert_params_as_hash.values.compact.empty? + end + + def valid_cert_params_length + cert_params_as_hash.keys.length + end + end + + attr_reader :topic, :brokers, :cert_bundle, :kafka_tls_configurator + + + # TODO: support client_id + def initialize(opts = {}, kafka_tls_configurator = TLSConfiguration) + require 'ruby-kafka' + + @kafka_tls_configurator = kafka_tls_configurator + @brokers = make_brokers_array(opts[:brokers]) + make_cert_bundle(opts) + end + + def connect + connect_opts = @cert_bundle.merge({ seed_brokers: @brokers }) + ::Kafka.new(connect_opts) + end + + private + + def make_brokers_array(opt) + case opt + when Array + opt + when String + opt.split("\s") + end + end + + def make_cert_bundle(opts) + tls_conf = kafka_tls_configurator.new(opts) + if tls_conf.invalid? 
+ fail ArgumentError, "all ssl parameters (ssl_ca_cert, ssl_client_cert and ssl_client_cert_key) are required, or omit all of them to disable TLS" + end + @cert_bundle ||= tls_conf.cert_bundle + end + end + end + end diff --git a/logstash-logger.gemspec b/logstash-logger.gemspec index eb57766..e3b49d5 100644 --- a/logstash-logger.gemspec +++ b/logstash-logger.gemspec @@ -26,6 +26,7 @@ Gem::Specification.new do |gem| end gem.add_development_dependency 'redis' gem.add_development_dependency 'poseidon' + gem.add_development_dependency 'ruby-kafka' if RUBY_VERSION < '2' || defined?(JRUBY_VERSION) gem.add_development_dependency 'SyslogLogger' diff --git a/spec/device/kafka_new_spec.rb b/spec/device/kafka_new_spec.rb new file mode 100644 index 0000000..e32250c --- /dev/null +++ b/spec/device/kafka_new_spec.rb @@ -0,0 +1,156 @@ +require 'logstash-logger' + +describe LogStashLogger::Device::KafkaNew::TLSConfiguration do + context "with TLS" do + let(:complete_bundle) do + # NOTE these keys are obviously fake.
We don't actually make connections + { + ssl_ca_cert: "----BEGIN CERT---- lkajdslkajdsjk ----END CERT---", + ssl_client_cert: "----BEGIN CERT---- lkajdslkajdsjk ----END CERT---", + ssl_client_cert_key: "----PRIVATE---- lkajdslkajdsjk ----PRIVATE---", + } + end + + context "when complete params are passed in" do + let(:instance) { described_class.new(complete_bundle) } + + it "returns a populated cert bundle when all ssl params are provided" do + expect(instance.cert_bundle).to_not be_empty + expect(instance.valid?).to be_truthy + end + end + + context "when incomplete params are passed in" do + [:ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key].each do |param| + it "is invalid when #{param} is not provided" do + opts = complete_bundle + opts[param] = nil + instance = described_class.new(opts) + expect(instance.cert_bundle).to be_empty + expect(instance.valid?).to be_falsey + end + end + end + end + + context "without TLS" do + let(:instance) { subject } + + it "returns an empty hash when no ssl params are initialized" do + expect(instance.cert_bundle).to be_empty + expect(instance.valid?).to be_truthy + end + end + + context "#invalid?" do + it 'is just the opposite of valid?'
do + expect(subject).to receive(:valid?).and_return(false) + expect(subject.invalid?).to be_truthy + + expect(subject).to receive(:valid?).and_return(true) + expect(subject.invalid?).to be_falsey + end + end + +end + +describe LogStashLogger::Device::KafkaNew do + include_context 'device' + + let(:broker_hosts) { "localhost:9300 localhost:9232" } + let(:instance) { LogStashLogger::Device::KafkaNew.new({brokers: broker_hosts}) } + + describe "initializing" do + context "brokers" do + context "when array" do + it "sets the brokers array to @brokers" do + brokers = %w(localhost:9300 localhost:9232) + instance = LogStashLogger::Device::KafkaNew.new({brokers: brokers}) + + expect(instance.brokers).to be_kind_of Array + expect(instance.brokers.length).to eql(2) + end + + it 'sets the brokers to an array if a string is passed in' do + brokers = "localhost:9300 localhost:9232" + instance = LogStashLogger::Device::KafkaNew.new({brokers: brokers}) + expect(instance.brokers).to be_kind_of Array + expect(instance.brokers.length).to eql(2) + end + end + end + + context "topic" do + it 'sets the topic to the option provided' do + instance = described + end + end + + context "cert bundle" do + # YUCK! ruby-kafka does not presently allow reading certain variables + module ::Kafka + class Client + attr_reader :connection_builder + end + end + + + module ::Kafka + class ConnectionBuilder + attr_reader :ssl_context + end + end + + let(:complete_bundle) do + # NOTE these keys are obviously fake. 
We don't actually make connections + { + ssl_ca_cert: "----BEGIN CERT---- lkajdslkajdsjk ----END CERT---", + ssl_client_cert: "----BEGIN CERT---- lkajdslkajdsjk ----END CERT---", + ssl_client_cert_key: "----PRIVATE---- lkajdslkajdsjk ----PRIVATE---", + } + end + + context "no certs" do + it 'creates a connection without an ssl_context' do + connection = instance.connect + expect(connection.connection_builder.ssl_context).to be_nil + end + end + + context "partial certs passed in" do + it 'fails if the complete cert suite is not passed in' do + [:ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key].each do |param| + opts = complete_bundle.merge(brokers: broker_hosts) + opts[param] = nil + expect { + LogStashLogger::Device::KafkaNew.new(opts).connect + }.to raise_error( ArgumentError ) + end + end + end + + context "complete cert bundle" do + + it 'correctly passes in the cert bundle to the Kafka Client' do + opts = complete_bundle.merge(brokers: broker_hosts) + + expect_any_instance_of(::Kafka::Client).to receive(:build_ssl_context) + .with(opts[:ssl_ca_cert], opts[:ssl_client_cert], opts[:ssl_client_cert_key]) + .and_return(true) + + LogStashLogger::Device::KafkaNew.new(opts).connect + end + end + end + end + + describe "connecting" do + context "without certs" do + it "creates a connection object" do + # watch out for naming conflicts with poseidon! 
+ # Both gems 'own' the namespace 'Kafka' + expect(instance.connect).to be_kind_of ::Kafka::Client + end + end + end +end From 08ae61835ad9f9e7dc9779f38402ae161f056476 Mon Sep 17 00:00:00 2001 From: Bryce McDonnell Date: Wed, 12 Oct 2016 10:43:59 -0400 Subject: [PATCH 2/6] Support for client_id, write_one message --- lib/logstash-logger/device/kafka_new.rb | 28 ++++-- spec/device/kafka_new_spec.rb | 112 +++++++++++++++--------- 2 files changed, 93 insertions(+), 47 deletions(-) diff --git a/lib/logstash-logger/device/kafka_new.rb b/lib/logstash-logger/device/kafka_new.rb index 9196aca..509f6d2 100644 --- a/lib/logstash-logger/device/kafka_new.rb +++ b/lib/logstash-logger/device/kafka_new.rb @@ -43,25 +43,43 @@ def valid_cert_params_length end end - attr_reader :topic, :brokers, :cert_bundle, :kafka_tls_configurator + attr_reader :topic, :brokers, :cert_bundle, :kafka_tls_configurator, + :client_id - # TODO: support client_id def initialize(opts = {}, kafka_tls_configurator = TLSConfiguration) require 'ruby-kafka' + super(opts) + @client_id = opts[:client_id] + @topic = opts[:topic] || raise_no_topic_set! @kafka_tls_configurator = kafka_tls_configurator @brokers = make_brokers_array(opts[:brokers]) make_cert_bundle(opts) end - def connect - connect_opts = @cert_bundle.merge({ seed_brokers: @brokers }) - ::Kafka.new(connect_opts) + def connection + @connection ||= ::Kafka.new(kafka_client_connection_hash) + end + + def write_one(message, topic=@topic) + kproducer = connection.producer + kproducer.produce(message, topic: topic) + kproducer.deliver_messages end private + def kafka_client_connection_hash + { seed_brokers: @brokers, + client_id: @client_id, + }.merge(@cert_bundle) + end + + def raise_no_topic_set! 
+ fail ArgumentError, "a topic must be configured" + end + def make_brokers_array(opt) case opt when Array diff --git a/spec/device/kafka_new_spec.rb b/spec/device/kafka_new_spec.rb index e32250c..32da1b8 100644 --- a/spec/device/kafka_new_spec.rb +++ b/spec/device/kafka_new_spec.rb @@ -11,6 +11,8 @@ } end + + context "when complete params are passed in" do let(:instance) { described_class.new(complete_bundle) } @@ -58,14 +60,19 @@ include_context 'device' let(:broker_hosts) { "localhost:9300 localhost:9232" } - let(:instance) { LogStashLogger::Device::KafkaNew.new({brokers: broker_hosts}) } + let(:opts) do + { topic: 'hello-world', + brokers: broker_hosts, + } + end + let(:instance) { LogStashLogger::Device::KafkaNew.new(opts) } describe "initializing" do context "brokers" do context "when array" do it "sets the brokers array to @brokers" do brokers = %w(localhost:9300 localhost:9232) - instance = LogStashLogger::Device::KafkaNew.new({brokers: brokers}) + instance = LogStashLogger::Device::KafkaNew.new(opts.merge({brokers: brokers})) expect(instance.brokers).to be_kind_of Array expect(instance.brokers.length).to eql(2) @@ -73,7 +80,7 @@ it 'sets the brokers to an array if a string is passed in' do brokers = "localhost:9300 localhost:9232" - instance = LogStashLogger::Device::KafkaNew.new({brokers: brokers}) + instance = LogStashLogger::Device::KafkaNew.new(opts.merge({brokers: brokers})) expect(instance.brokers).to be_kind_of Array expect(instance.brokers.length).to eql(2) end @@ -82,11 +89,19 @@ context "topic" do it 'sets the topic to the option provided' do - instance = described + instance = described_class.new(topic: 'hello-world') + expect(instance.topic).to eql('hello-world') + end + + it 'raises an exception if no topic is set' do + expect { + described_class.new(topic: nil) + }.to raise_error(ArgumentError) end end - context "cert bundle" do + + context "Client Introspection" do # YUCK! 
ruby-kafka does not presently allow reading certain variables module ::Kafka class Client @@ -94,63 +109,76 @@ class Client end end - module ::Kafka class ConnectionBuilder - attr_reader :ssl_context + attr_reader :ssl_context, :client_id end end - let(:complete_bundle) do - # NOTE these keys are obviously fake. We don't actually make connections - { - ssl_ca_cert: "----BEGIN CERT---- lkajdslkajdsjk ----END CERT---", - ssl_client_cert: "----BEGIN CERT---- lkajdslkajdsjk ----END CERT---", - ssl_client_cert_key: "----PRIVATE---- lkajdslkajdsjk ----PRIVATE---", - } + context "client_id" do + it 'sets a client_id when connecting if one is passed in to the options' do + instance = described_class.new(opts.merge(client_id: 'hello-world')) + expect(instance.client_id).to eql('hello-world') + expect(instance.connection.connection_builder.client_id).to eql('hello-world') + end end - context "no certs" do - it 'creates a connection without an ssl_context' do - connection = instance.connect - expect(connection.connection_builder.ssl_context).to be_nil + context "cert bundle" do + let(:complete_bundle) do + # NOTE these keys are obviously fake. 
We don't actually make connections + { + ssl_ca_cert: "----BEGIN CERT---- lkajdslkajdsjk ----END CERT---", + ssl_client_cert: "----BEGIN CERT---- lkajdslkajdsjk ----END CERT---", + ssl_client_cert_key: "----PRIVATE---- lkajdslkajdsjk ----PRIVATE---", + } end - end - context "partial certs passed in" do - it 'fails if the complete cert suite is not passed in' do - [:ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key].each do |param| - opts = complete_bundle.merge(brokers: broker_hosts) - opts[param] = nil - expect { - LogStashLogger::Device::KafkaNew.new(opts).connect - }.to raise_error( ArgumentError ) + context "no certs" do + it 'creates a connection without an ssl_context' do + connection = instance.connection + expect(connection.connection_builder.ssl_context).to be_nil end end - end - context "complete cert bundle" do + context "partial certs passed in" do + it 'fails if the complete cert suite is not passed in' do + [:ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key].each do |param| + opts = complete_bundle.merge(brokers: broker_hosts) + opts[param] = nil + expect { + LogStashLogger::Device::KafkaNew.new(opts).connection + }.to raise_error( ArgumentError ) + end + end + end - it 'correctly passes in the cert bundle to the Kafka Client' do - opts = complete_bundle.merge(brokers: broker_hosts) + context "complete cert bundle" do - expect_any_instance_of(::Kafka::Client).to receive(:build_ssl_context) - .with(opts[:ssl_ca_cert], opts[:ssl_client_cert], opts[:ssl_client_cert_key]) - .and_return(true) + it 'correctly passes in the cert bundle to the Kafka Client' do + certopts = complete_bundle.merge(opts) - LogStashLogger::Device::KafkaNew.new(opts).connect + expect_any_instance_of(::Kafka::Client).to receive(:build_ssl_context) + .with(certopts[:ssl_ca_cert], certopts[:ssl_client_cert], certopts[:ssl_client_cert_key]) + .and_return(true) + + LogStashLogger::Device::KafkaNew.new(certopts).connection + end end end end end - describe "connecting" do - context 
"without certs" do - it "creates a connection object" do - # watch out for naming conflicts with poseidon! - # Both gems 'own' the namespace 'Kafka' - expect(instance.connect).to be_kind_of ::Kafka::Client - end + describe "writing single message to broker" do + it 'writes the message to the topic' do + producer = double('producer', produce: true) + connect_double = double("connection", producer: producer) + instance = described_class.new(opts) + + expect(instance).to receive(:connection).and_return(connect_double) + expect(connect_double).to receive(:producer) + expect(producer).to receive(:produce).and_return(true) + expect(producer).to receive(:deliver_messages).and_return(true) + instance.write_one("hello world") end end end From ce5e3a76a30369f22fe138679ad5d8f4cc290d07 Mon Sep 17 00:00:00 2001 From: Bryce McDonnell Date: Wed, 12 Oct 2016 11:00:09 -0400 Subject: [PATCH 3/6] Write messages to topic --- lib/logstash-logger/device/kafka_new.rb | 18 +++++++++-- spec/device/kafka_new_spec.rb | 42 +++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/lib/logstash-logger/device/kafka_new.rb b/lib/logstash-logger/device/kafka_new.rb index 509f6d2..19d49d2 100644 --- a/lib/logstash-logger/device/kafka_new.rb +++ b/lib/logstash-logger/device/kafka_new.rb @@ -63,13 +63,25 @@ def connection end def write_one(message, topic=@topic) - kproducer = connection.producer - kproducer.produce(message, topic: topic) - kproducer.deliver_messages + write_messages_to_broker_and_deliver do |producer| + producer.produce(message, topic: topic) + end + end + + def write_batch(messages, topic = @topic) + write_messages_to_broker_and_deliver do |producer| + messages.each {|message| producer.produce(message, topic: topic) } + end end private + def write_messages_to_broker_and_deliver(&block) + kproducer = connection.producer + block.call(kproducer) if block_given? 
+ kproducer.deliver_messages + end + def kafka_client_connection_hash { seed_brokers: @brokers, client_id: @client_id, diff --git a/spec/device/kafka_new_spec.rb b/spec/device/kafka_new_spec.rb index 32da1b8..eea1bb3 100644 --- a/spec/device/kafka_new_spec.rb +++ b/spec/device/kafka_new_spec.rb @@ -174,11 +174,53 @@ class ConnectionBuilder connect_double = double("connection", producer: producer) instance = described_class.new(opts) + # NOTE: this is stubbing out the ruby-kafka API expect(instance).to receive(:connection).and_return(connect_double) expect(connect_double).to receive(:producer) expect(producer).to receive(:produce).and_return(true) expect(producer).to receive(:deliver_messages).and_return(true) instance.write_one("hello world") end + + it 'is capable of writing to a different topic than instantiated' do + producer = double('producer', produce: lambda {|message, topic| "hi" }) + connect_double = double("connection", producer: producer) + instance = described_class.new(opts) + + message = 'hello world' + topic = 'my topic' + # NOTE: this is stubbing out the ruby-kafka API + expect(instance).to receive(:connection).and_return(connect_double) + expect(connect_double).to receive(:producer) + expect(producer).to receive(:produce). + with(message, topic: topic). + and_return(true) + expect(producer).to receive(:deliver_messages).and_return(true) + + instance.write_one(message, topic) + end + end + + describe "writing a batch of messages to the broker" do + it "writes the messages to the topic" do + producer = double('producer', produce: lambda {|message, topic| "hi" }) + connect_double = double("connection", producer: producer) + instance = described_class.new(opts) + + messages = ['hello world', 'goodbye world'] + topic = 'my topic' + # NOTE: this is stubbing out the ruby-kafka API + expect(instance).to receive(:connection).and_return(connect_double) + expect(connect_double).to receive(:producer) + expect(producer).to receive(:produce).
+ with(messages.first, topic: topic). + and_return(true) + expect(producer).to receive(:produce). + with(messages.last, topic: topic). + and_return(true) + expect(producer).to receive(:deliver_messages).and_return(true) + + instance.write_batch(messages, topic) + end end end From b4f3f9e8114a31d5eae2ef71158b6fe58aa6da37 Mon Sep 17 00:00:00 2001 From: Bryce McDonnell Date: Wed, 12 Oct 2016 11:26:48 -0400 Subject: [PATCH 4/6] Update README, create annoying deprecation message --- README.md | 43 ++++++++++++++++--------- lib/logstash-logger/device/kafka.rb | 11 +++++-- lib/logstash-logger/device/kafka_new.rb | 2 +- lib/logstash-logger/version.rb | 2 +- 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index c09bb20..a99986f 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,9 @@ file_logger = LogStashLogger.new(type: :file, path: 'log/development.log', sync: unix_logger = LogStashLogger.new(type: :unix, path: '/tmp/sock') syslog_logger = LogStashLogger.new(type: :syslog) redis_logger = LogStashLogger.new(type: :redis) -kafka_logger = LogStashLogger.new(type: :kafka) +** NOTE: The current kafka class will be deprecated in a future version. + For now, migrate to using kafka_new** +kafka_logger = LogStashLogger.new(type: :kafka_new) stdout_logger = LogStashLogger.new(type: :stdout) stderr_logger = LogStashLogger.new(type: :stderr) io_logger = LogStashLogger.new(type: :io, io: io) @@ -454,26 +456,37 @@ config.logstash.port = 6379 #### Kafka -Add the poseidon gem to your Gemfile: +Add the ruby-kafka gem to your Gemfile: - gem 'poseidon' + gem 'ruby-kafka' ```ruby +## NOTE: A future version of this gem will remove the current +# implementation of the kafka client. This will be a breaking change. 
Use +# kafka_new to ensure forward compatibility # Required -config.logstash.type = :kafka -# Optional, will default to the 'logstash' topic -config.logstash.path = 'logstash' - -# Optional, will default to the 'logstash-logger' producer -config.logstash.producer = 'logstash-logger' - -# Optional, will default to localhost:9092 host/port -config.logstash.hosts = ['localhost:9092'] - -# Optional, will default to 1s backoff -config.logstash.backoff = 1 +config.logstash.type = :kafka_new +# Required +config.logstash.topic = 'logstash-topic' + +# Required, can be in one of two formats: +# String format (splits on single space): +config.logstash.brokers = 'localhost:9092 some-other-host.net:9300' +# Array format +config.logstash.brokers = %w(localhost:9092 some-other-host.net:9300) + +# Optional, defaults to 'ruby-kafka' +config.logstash.client_id = 'logstash-client-alpha' + +# Optional, transmit over TLS +# NOTE: either 0 or all 3 ssl_parameters must be provided for a +# successful connection. An exception will be raised if 1 or 2 params +# are provided +config.logstash.ssl_ca_cert = ENV['CLOUDKAFKA_CA'] +config.logstash.ssl_client_cert = ENV['CLOUDKAFKA_CERT'] +config.logstash.ssl_client_cert_key = ENV['CLOUDKAFKA_PRIVATE_KEY'] ``` #### File diff --git a/lib/logstash-logger/device/kafka.rb b/lib/logstash-logger/device/kafka.rb index 72e0a59..b39f4b2 100644 --- a/lib/logstash-logger/device/kafka.rb +++ b/lib/logstash-logger/device/kafka.rb @@ -12,10 +12,15 @@ class Kafka < Connectable attr_accessor :hosts, :topic, :producer, :backoff - def initialize(opts) + @@deprecation_message = <<-MSG + [DEPRECATION WARNING] + Poseidon client will be deprecated and requires different configuration + parameters (but they are similar).
Update your Kafka configuration to + use :kafka_new to ensure forward compatibility + MSG - # TODO: improve this message and update README - warn "[DEPRECATED] Poseidon is deprecated, update client" + def initialize(opts) + warn @@deprecation_message super host = opts[:host] || DEFAULT_HOST port = opts[:port] || DEFAULT_PORT diff --git a/lib/logstash-logger/device/kafka_new.rb b/lib/logstash-logger/device/kafka_new.rb index 19d49d2..01da7f7 100644 --- a/lib/logstash-logger/device/kafka_new.rb +++ b/lib/logstash-logger/device/kafka_new.rb @@ -21,6 +21,7 @@ def valid? def invalid? !valid? end + private def cert_params_as_hash @@ -46,7 +47,6 @@ def valid_cert_params_length attr_reader :topic, :brokers, :cert_bundle, :kafka_tls_configurator, :client_id - def initialize(opts = {}, kafka_tls_configurator = TLSConfiguration) require 'ruby-kafka' super(opts) diff --git a/lib/logstash-logger/version.rb b/lib/logstash-logger/version.rb index 105bccc..aa86227 100644 --- a/lib/logstash-logger/version.rb +++ b/lib/logstash-logger/version.rb @@ -1,3 +1,3 @@ module LogStashLogger - VERSION = "0.19.2" + VERSION = "0.19.3" end From 169b6c30173da9bb2803289f223239e149957c6f Mon Sep 17 00:00:00 2001 From: Bryce McDonnell Date: Wed, 12 Oct 2016 11:56:07 -0400 Subject: [PATCH 5/6] a bit of cleanup after testing --- lib/logstash-logger/device.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/logstash-logger/device.rb b/lib/logstash-logger/device.rb index 8cd81b8..798ebac 100644 --- a/lib/logstash-logger/device.rb +++ b/lib/logstash-logger/device.rb @@ -51,6 +51,7 @@ def self.device_klass_for(type) when :file then File when :redis then Redis when :kafka then Kafka + when :kafka_new then KafkaNew when :io then IO when :stdout then Stdout when :stderr then Stderr From f0a3c7c04022c284cc068d73165582299798b8b5 Mon Sep 17 00:00:00 2001 From: Bryce McDonnell Date: Thu, 13 Oct 2016 16:11:02 -0400 Subject: [PATCH 6/6] Style cleanup and double check topic is set --- 
lib/logstash-logger/device/kafka_new.rb | 14 ++++++++------ spec/device/kafka_new_spec.rb | 4 ---- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/lib/logstash-logger/device/kafka_new.rb b/lib/logstash-logger/device/kafka_new.rb index 01da7f7..388b55b 100644 --- a/lib/logstash-logger/device/kafka_new.rb +++ b/lib/logstash-logger/device/kafka_new.rb @@ -62,15 +62,17 @@ def connection @connection ||= ::Kafka.new(kafka_client_connection_hash) end - def write_one(message, topic=@topic) + def write_one(message, topic=nil) + topic ||= @topic write_messages_to_broker_and_deliver do |producer| producer.produce(message, topic: topic) end end - def write_batch(messages, topic = @topic) + def write_batch(messages, topic=nil) + topic ||= @topic write_messages_to_broker_and_deliver do |producer| - messages.each {|message| producer.produce(message, topic: topic) } + messages.each {|msg| producer.produce(msg, topic: topic) } end end @@ -83,9 +85,9 @@ def write_messages_to_broker_and_deliver(&block) end def kafka_client_connection_hash - { seed_brokers: @brokers, - client_id: @client_id, - }.merge(@cert_bundle) + { seed_brokers: @brokers, + client_id: @client_id, + }.merge(@cert_bundle) end def raise_no_topic_set! diff --git a/spec/device/kafka_new_spec.rb b/spec/device/kafka_new_spec.rb index eea1bb3..7ea62d4 100644 --- a/spec/device/kafka_new_spec.rb +++ b/spec/device/kafka_new_spec.rb @@ -11,8 +11,6 @@ } end - - context "when complete params are passed in" do let(:instance) { described_class.new(complete_bundle) } @@ -53,7 +51,6 @@ expect(subject.invalid?).to be_falsey end end - end describe LogStashLogger::Device::KafkaNew do @@ -100,7 +97,6 @@ end end - context "Client Introspection" do # YUCK! ruby-kafka does not presently allow reading certain variables module ::Kafka