diff --git a/lib/logstash/codecs/avro.rb b/lib/logstash/codecs/avro.rb index 39544e9..ebd30f1 100644 --- a/lib/logstash/codecs/avro.rb +++ b/lib/logstash/codecs/avro.rb @@ -60,6 +60,8 @@ class LogStash::Codecs::Avro < LogStash::Codecs::Base # tag events with `_avroparsefailure` when decode fails config :tag_on_failure, :validate => :boolean, :default => false + config :encode_as_base64, :validate => :boolean, :default => true + def open_and_read(uri_string) open(uri_string).read end @@ -67,6 +69,20 @@ def open_and_read(uri_string) public def register @schema = Avro::Schema.parse(open_and_read(schema_uri)) + if encode_as_base64 + define_singleton_method :encode do |event| + buffer = StringIO.new + write_event(event, buffer) + @on_event.call(event, Base64.strict_encode64(buffer.string)) + end + else + define_singleton_method :encode do |event| + buffer = StringIO.new + buffer.set_encoding("ASCII-8BIT") + write_event(event, buffer) + @on_event.call(event, buffer.string) + end + end end public @@ -84,12 +100,8 @@ def decode(data) end end - public - def encode(event) - dw = Avro::IO::DatumWriter.new(@schema) - buffer = StringIO.new - encoder = Avro::IO::BinaryEncoder.new(buffer) - dw.write(event.to_hash, encoder) - @on_event.call(event, Base64.strict_encode64(buffer.string)) + private + def write_event(event, buffer) + Avro::IO::DatumWriter.new(@schema).write(event.to_hash, Avro::IO::BinaryEncoder.new(buffer)) end end