Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add encoding selector option (base64 or binary) #39

Merged
merged 7 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.4.0
- Add `encoding` option to select the encoding of Avro payload, could be `binary` or `base64` [#39](https://github.com/logstash-plugins/logstash-codec-avro/pull/39)

## 3.3.1
- Pin avro gem to 1.10.x, as 1.11+ requires ruby 2.6+ [#37](https://github.com/logstash-plugins/logstash-codec-avro/pull/37)

Expand Down
12 changes: 12 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ output {
[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-encoding>> | <<string,string>>, one of `["binary", "base64"]`|No
| <<plugins-{type}s-{plugin}-ecs_compatibility>> | <<string,string>>|No
| <<plugins-{type}s-{plugin}-schema_uri>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-tag_on_failure>> |<<boolean,boolean>>|No
Expand All @@ -99,6 +100,17 @@ output {

Controls this plugin's compatibility with the {ecs-ref}[Elastic Common Schema (ECS)].

[id="plugins-{type}s-{plugin}-encoding"]
===== `encoding`

* Value can be any of: `binary`, `base64`
* Default value is `base64`

Set encoding for Avro's payload.
Use `base64` (default) to indicate that this codec sends or expects to receive base64-encoded bytes.

Set this option to `binary` to indicate that this codec sends or expects to receive binary Avro data.


[id="plugins-{type}s-{plugin}-schema_uri"]
===== `schema_uri`
Expand Down
20 changes: 18 additions & 2 deletions lib/logstash/codecs/avro.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ class LogStash::Codecs::Avro < LogStash::Codecs::Base

include LogStash::PluginMixins::EventSupport::EventFactoryAdapter

BINARY_ENCODING = "binary".freeze
BASE64_ENCODING = "base64".freeze

# Set encoding for Avro's payload.
# Use `base64` (default) encoding to convert the raw binary bytes to a `base64` encoded string.
# Set this option to `binary` to use the plain binary bytes.
config :encoding, :validate => [BINARY_ENCODING, BASE64_ENCODING], :default => BASE64_ENCODING

# schema path to fetch the schema from.
# This can be a 'http' or 'file' scheme URI
# example:
Expand Down Expand Up @@ -92,7 +100,11 @@ def register

public
def decode(data)
datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data)
if encoding == BASE64_ENCODING
datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data)
else
datum = StringIO.new(data)
end
decoder = Avro::IO::BinaryDecoder.new(datum)
datum_reader = Avro::IO::DatumReader.new(@schema)
event = targeted_event_factory.new_event(datum_reader.read(decoder))
Expand All @@ -113,6 +125,10 @@ def encode(event)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(event.to_hash, encoder)
@on_event.call(event, Base64.strict_encode64(buffer.string))
if encoding == BASE64_ENCODING
@on_event.call(event, Base64.strict_encode64(buffer.string))
else
@on_event.call(event, buffer.string)
end
end
end
2 changes: 1 addition & 1 deletion logstash-codec-avro.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-codec-avro'
s.version = '3.3.1'
s.version = '3.4.0'
s.platform = 'java'
s.licenses = ['Apache-2.0']
s.summary = "Reads serialized Avro records as Logstash events"
Expand Down
51 changes: 51 additions & 0 deletions spec/codecs/avro_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,35 @@
end
end

context "with binary encoding" do
let (:avro_config) { super().merge('encoding' => 'binary') }

it "should return an LogStash::Event from raw and base64 encoded avro data" do
schema = Avro::Schema.parse(avro_config['schema_uri'])
dw = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(test_event.to_hash, encoder)

subject.decode(buffer.string) do |event|
expect(event).to be_a_kind_of(LogStash::Event)
expect(event.get("foo")).to eq(test_event.get("foo"))
expect(event.get("bar")).to eq(test_event.get("bar"))
expect(event.get('[event][original]')).to eq(buffer.string) if ecs_compatibility != :disabled
end
end

it "should raise an error if base64 encoded data is provided" do
schema = Avro::Schema.parse(avro_config['schema_uri'])
dw = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(test_event.to_hash, encoder)

expect {subject.decode(Base64.strict_encode64(buffer.string))}.to raise_error
end
end

context "#decode with tag_on_failure" do
let (:avro_config) {{ 'schema_uri' => '
{"type": "record", "name": "Test",
Expand Down Expand Up @@ -111,6 +140,28 @@
insist {got_event}
end

context "with binary encoding" do
let (:avro_config) { super().merge('encoding' => 'binary') }

it "should return avro data from a LogStash::Event not base64 encoded" do
got_event = false
subject.on_event do |event, data|
schema = Avro::Schema.parse(avro_config['schema_uri'])
datum = StringIO.new(data)
decoder = Avro::IO::BinaryDecoder.new(datum)
datum_reader = Avro::IO::DatumReader.new(schema)
record = datum_reader.read(decoder)

expect(event).to be_a_kind_of(LogStash::Event)
expect(event.get("foo")).to eq(test_event.get("foo"))
expect(event.get("bar")).to eq(test_event.get("bar"))
got_event = true
end
subject.encode(test_event)
expect(got_event).to be true
end
end

context "binary data" do

let (:avro_config) {{ 'schema_uri' => '{"namespace": "com.systems.test.data",
Expand Down