diff --git a/Rakefile b/Rakefile
index 714735e..cae5884 100644
--- a/Rakefile
+++ b/Rakefile
@@ -1,12 +1,11 @@
-require "gem_publisher"
+require 'gem_publisher'
 
-desc "Publish gem to RubyGems.org"
+desc 'Publish gem to RubyGems.org'
 task :publish_gem do |t|
-  gem = GemPublisher.publish_if_updated("logstash-output-kafka.gemspec", :rubygems)
+  gem = GemPublisher.publish_if_updated('logstash-output-kafka.gemspec', :rubygems)
   puts "Published #{gem}" if gem
 end
 
 task :default do
-  system("rake -T")
+  system('rake -T')
 end
-
diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index 1f9949e..d9cfc0b 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -101,40 +101,39 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
   config :send_buffer_bytes, :validate => :number, :default => 100 * 1024
   # The client id is a user-specified string sent in each request to help trace calls. It should
   # logically identify the application making the request.
-  config :client_id, :validate => :string, :default => ""
+  config :client_id, :validate => :string, :default => ''
 
   public
   def register
     require 'jruby-kafka'
     options = {
-      :topic_id => @topic_id,
-      :broker_list => @broker_list,
-      :compression_codec => @compression_codec,
-      :compressed_topics => @compressed_topics,
-      :request_required_acks => @request_required_acks,
-      :serializer_class => @serializer_class,
-      :partitioner_class => @partitioner_class,
-      :request_timeout_ms => @request_timeout_ms,
-      :producer_type => @producer_type,
-      :key_serializer_class => @key_serializer_class,
-      :message_send_max_retries => @message_send_max_retries,
-      :retry_backoff_ms => @retry_backoff_ms,
-      :topic_metadata_refresh_interval_ms => @topic_metadata_refresh_interval_ms,
-      :queue_buffering_max_ms => @queue_buffering_max_ms,
-      :queue_buffering_max_messages => @queue_buffering_max_messages,
-      :queue_enqueue_timeout_ms => @queue_enqueue_timeout_ms,
-      :batch_num_messages => @batch_num_messages,
-      :send_buffer_bytes => @send_buffer_bytes,
-      :client_id => @client_id
+        :broker_list => @broker_list,
+        :compression_codec => @compression_codec,
+        :compressed_topics => @compressed_topics,
+        :request_required_acks => @request_required_acks,
+        :serializer_class => @serializer_class,
+        :partitioner_class => @partitioner_class,
+        :request_timeout_ms => @request_timeout_ms,
+        :producer_type => @producer_type,
+        :key_serializer_class => @key_serializer_class,
+        :message_send_max_retries => @message_send_max_retries,
+        :retry_backoff_ms => @retry_backoff_ms,
+        :topic_metadata_refresh_interval_ms => @topic_metadata_refresh_interval_ms,
+        :queue_buffering_max_ms => @queue_buffering_max_ms,
+        :queue_buffering_max_messages => @queue_buffering_max_messages,
+        :queue_enqueue_timeout_ms => @queue_enqueue_timeout_ms,
+        :batch_num_messages => @batch_num_messages,
+        :send_buffer_bytes => @send_buffer_bytes,
+        :client_id => @client_id
     }
     @producer = Kafka::Producer.new(options)
-    @producer.connect()
+    @producer.connect
 
     @logger.info('Registering kafka producer', :topic_id => @topic_id, :broker_list => @broker_list)
 
     @codec.on_event do |event|
       begin
-        @producer.sendMsg(@topic_id,nil,event)
+        @producer.send_msg(@topic_id,nil,event)
       rescue LogStash::ShutdownSignal
         @logger.info('Kafka producer got shutdown signal')
       rescue => e
@@ -153,4 +152,7 @@ def receive(event)
     @codec.encode(event)
   end
 
+  def teardown
+    @producer.close
+  end
 end #class LogStash::Outputs::Kafka
diff --git a/logstash-output-kafka.gemspec b/logstash-output-kafka.gemspec
index df10be1..10447b4 100644
--- a/logstash-output-kafka.gemspec
+++ b/logstash-output-kafka.gemspec
@@ -3,12 +3,12 @@ Gem::Specification.new do |s|
   s.name = 'logstash-output-kafka'
   s.version = '0.1.0'
   s.licenses = ['Apache License (2.0)']
-  s.summary = "Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker"
-  s.description = "Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker"
-  s.authors = ["Elasticsearch"]
+  s.summary = 'Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker'
+  s.description = 'Output events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker'
+  s.authors = ['Elasticsearch']
   s.email = 'richard.pijnenburg@elasticsearch.com'
-  s.homepage = "http://logstash.net/"
-  s.require_paths = ["lib"]
+  s.homepage = 'http://logstash.net/'
+  s.require_paths = ['lib']
 
   # Files
   s.files = `git ls-files`.split($\)
@@ -17,7 +17,7 @@ Gem::Specification.new do |s|
   s.test_files = s.files.grep(%r{^(test|spec|features)/})
 
   # Special flag to let us know this is actually a logstash plugin
-  s.metadata = { "logstash_plugin" => "true", "group" => "output" }
+  s.metadata = { 'logstash_plugin' => 'true', 'group' => 'output'}
 
   # Jar dependencies
   s.requirements << "jar 'org.apache.kafka:kafka_2.10', '0.8.1.1'"
@@ -26,7 +26,6 @@ Gem::Specification.new do |s|
 
   s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0'
   s.add_runtime_dependency 'jar-dependencies', ['~> 0.1.0']
-  s.add_runtime_dependency 'jruby-kafka', ['>=0.1.0']
+  s.add_runtime_dependency 'jruby-kafka', ['>=0.2.1']
 
 end
-
diff --git a/spec/outputs/kafka.rb b/spec/outputs/kafka.rb
index 0a87b97..06f3daa 100644
--- a/spec/outputs/kafka.rb
+++ b/spec/outputs/kafka.rb
@@ -3,34 +3,34 @@
 require 'rspec'
 require 'insist'
 require 'logstash/namespace'
-require "logstash/timestamp"
+require 'logstash/timestamp'
 require 'logstash/outputs/kafka'
 
 describe LogStash::Outputs::Kafka do
 
-  let (:kafka_config) {{"topic_id" => "test"}}
+  let (:kafka_config) {{:topic_id => 'test'}}
 
   it 'should populate kafka config with default values' do
     kafka = LogStash::Outputs::Kafka.new(kafka_config)
-    insist {kafka.broker_list} == "localhost:9092"
-    insist {kafka.topic_id} == "test"
-    insist {kafka.compression_codec} == "none"
-    insist {kafka.serializer_class} == "kafka.serializer.StringEncoder"
-    insist {kafka.partitioner_class} == "kafka.producer.DefaultPartitioner"
-    insist {kafka.producer_type} == "sync"
+    insist {kafka.broker_list} == 'localhost:9092'
+    insist {kafka.topic_id} == 'test'
+    insist {kafka.compression_codec} == 'none'
+    insist {kafka.serializer_class} == 'kafka.serializer.StringEncoder'
+    insist {kafka.partitioner_class} == 'kafka.producer.DefaultPartitioner'
+    insist {kafka.producer_type} == 'sync'
   end
 
-  it "should register and load kafka jars without errors" do
+  it 'should register and load kafka jars without errors' do
     kafka = LogStash::Outputs::Kafka.new(kafka_config)
     kafka.register
   end
 
-  it "should send logstash event to kafka broker" do
+  it 'should send logstash event to kafka broker' do
     timestamp = LogStash::Timestamp.now
     expect_any_instance_of(Kafka::Producer)
-      .to receive(:sendMsg)
-      .with("test", nil, "{\"message\":\"hello world\",\"host\":\"test\",\"@timestamp\":\"#{timestamp}\",\"@version\":\"1\"}")
-    e = LogStash::Event.new({"message" => "hello world", "host" => "test", "@timestamp" => timestamp})
+      .to receive(:send_msg)
+      .with('test', nil, "{\"message\":\"hello world\",\"host\":\"test\",\"@timestamp\":\"#{timestamp}\",\"@version\":\"1\"}")
+    e = LogStash::Event.new({:message => 'hello world', :host => 'test', '@timestamp' => timestamp})
     kafka = LogStash::Outputs::Kafka.new(kafka_config)
     kafka.register
     kafka.receive(e)