diff --git a/CHANGELOG.md b/CHANGELOG.md index 828f261..39bc244 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.4.0 + - Add `encoding` option to select the encoding of Avro payload, could be `binary` or `base64` [#39](https://github.com/logstash-plugins/logstash-codec-avro/pull/39) + ## 3.3.1 - Pin avro gem to 1.10.x, as 1.11+ requires ruby 2.6+ [#37](https://github.com/logstash-plugins/logstash-codec-avro/pull/37) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 51df2c4..64caff5 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -81,6 +81,7 @@ output { [cols="<,<,<",options="header",] |======================================================================= |Setting |Input type|Required +| <> | <>, one of `["binary", "base64"]`|No | <> | <>|No | <> |<>|Yes | <> |<>|No @@ -99,6 +100,17 @@ output { Controls this plugin's compatibility with the {ecs-ref}[Elastic Common Schema (ECS)]. +[id="plugins-{type}s-{plugin}-encoding"] +===== `encoding` + +* Value can be any of: `binary`, `base64` +* Default value is `base64` + +Set encoding for Avro's payload. +Use `base64` (default) to indicate that this codec sends or expects to receive base64-encoded bytes. + +Set this option to `binary` to indicate that this codec sends or expects to receive binary Avro data. + [id="plugins-{type}s-{plugin}-schema_uri"] ===== `schema_uri` diff --git a/lib/logstash/codecs/avro.rb b/lib/logstash/codecs/avro.rb index 09a5528..bf65e34 100644 --- a/lib/logstash/codecs/avro.rb +++ b/lib/logstash/codecs/avro.rb @@ -59,6 +59,14 @@ class LogStash::Codecs::Avro < LogStash::Codecs::Base include LogStash::PluginMixins::EventSupport::EventFactoryAdapter + BINARY_ENCODING = "binary".freeze + BASE64_ENCODING = "base64".freeze + + # Set encoding for Avro's payload. + # Use `base64` (default) encoding to convert the raw binary bytes to a `base64` encoded string. + # Set this option to `binary` to use the plain binary bytes. + config :encoding, :validate => [BINARY_ENCODING, BASE64_ENCODING], :default => BASE64_ENCODING + # schema path to fetch the schema from. # This can be a 'http' or 'file' scheme URI # example: @@ -92,7 +100,11 @@ def register public def decode(data) - datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data) + if encoding == BASE64_ENCODING + datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data) + else + datum = StringIO.new(data) + end decoder = Avro::IO::BinaryDecoder.new(datum) datum_reader = Avro::IO::DatumReader.new(@schema) event = targeted_event_factory.new_event(datum_reader.read(decoder)) @@ -113,6 +125,10 @@ def encode(event) buffer = StringIO.new encoder = Avro::IO::BinaryEncoder.new(buffer) dw.write(event.to_hash, encoder) - @on_event.call(event, Base64.strict_encode64(buffer.string)) + if encoding == BASE64_ENCODING + @on_event.call(event, Base64.strict_encode64(buffer.string)) + else + @on_event.call(event, buffer.string) + end end end diff --git a/logstash-codec-avro.gemspec b/logstash-codec-avro.gemspec index cd333a7..ef79e76 100644 --- a/logstash-codec-avro.gemspec +++ b/logstash-codec-avro.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-codec-avro' - s.version = '3.3.1' + s.version = '3.4.0' s.platform = 'java' s.licenses = ['Apache-2.0'] s.summary = "Reads serialized Avro records as Logstash events" diff --git a/spec/codecs/avro_spec.rb b/spec/codecs/avro_spec.rb index 5c62686..a584f80 100644 --- a/spec/codecs/avro_spec.rb +++ b/spec/codecs/avro_spec.rb @@ -55,6 +55,35 @@ end end + context "with binary encoding" do + let (:avro_config) { super().merge('encoding' => 'binary') } + + it "should return an LogStash::Event from raw and base64 encoded avro data" do + schema = Avro::Schema.parse(avro_config['schema_uri']) + dw = Avro::IO::DatumWriter.new(schema) + buffer = StringIO.new + encoder = Avro::IO::BinaryEncoder.new(buffer) + dw.write(test_event.to_hash, encoder) + + subject.decode(buffer.string) do |event| + expect(event).to be_a_kind_of(LogStash::Event) + expect(event.get("foo")).to eq(test_event.get("foo")) + expect(event.get("bar")).to eq(test_event.get("bar")) + expect(event.get('[event][original]')).to eq(buffer.string) if ecs_compatibility != :disabled + end + end + + it "should raise an error if base64 encoded data is provided" do + schema = Avro::Schema.parse(avro_config['schema_uri']) + dw = Avro::IO::DatumWriter.new(schema) + buffer = StringIO.new + encoder = Avro::IO::BinaryEncoder.new(buffer) + dw.write(test_event.to_hash, encoder) + + expect {subject.decode(Base64.strict_encode64(buffer.string))}.to raise_error + end + end + context "#decode with tag_on_failure" do let (:avro_config) {{ 'schema_uri' => ' {"type": "record", "name": "Test", @@ -111,6 +140,28 @@ insist {got_event} end + context "with binary encoding" do + let (:avro_config) { super().merge('encoding' => 'binary') } + + it "should return avro data from a LogStash::Event not base64 encoded" do + got_event = false + subject.on_event do |event, data| + schema = Avro::Schema.parse(avro_config['schema_uri']) + datum = StringIO.new(data) + decoder = Avro::IO::BinaryDecoder.new(datum) + datum_reader = Avro::IO::DatumReader.new(schema) + record = datum_reader.read(decoder) + + expect(event).to be_a_kind_of(LogStash::Event) + expect(event.get("foo")).to eq(test_event.get("foo")) + expect(event.get("bar")).to eq(test_event.get("bar")) + got_event = true + end + subject.encode(test_event) + expect(got_event).to be true + end + end + context "binary data" do let (:avro_config) {{ 'schema_uri' => '{"namespace": "com.systems.test.data",