Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ECS support #36

Merged
merged 10 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.3.0
- Add ECS support. Add target option and event.original [#36](https://github.com/logstash-plugins/logstash-codec-avro/pull/36)

## 3.2.4
- [DOC] Add clarifications on partial deserialization [#35](https://github.com/logstash-plugins/logstash-codec-avro/pull/35)

Expand Down
39 changes: 39 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ This plugin is used to serialize Logstash events as
Avro datums, as well as deserializing Avro datums into
Logstash events.

[id="plugins-{type}s-{plugin}-ecs_metadata"]
==== Event Metadata and the Elastic Common Schema (ECS)

The plugin behaves the same regardless of ECS compatibility, except adding the original message to `[event][original]`.

==== Encoding

This codec is for serializing individual Logstash events
Expand Down Expand Up @@ -76,12 +81,25 @@ output {
[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-ecs_compatibility>> | <<string,string>>|No
| <<plugins-{type}s-{plugin}-schema_uri>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-tag_on_failure>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-target>> |<<string,string>>|No
|=======================================================================

&nbsp;

[id="plugins-{type}s-{plugin}-ecs_compatibility"]
===== `ecs_compatibility`

* Value type is <<string,string>>
* Supported values are:
** `disabled`: Avro data added at root level
** `v1`,`v8`: Elastic Common Schema compliant behavior (`[event][original]` is also added)

Controls this plugin's compatibility with the {ecs-ref}[Elastic Common Schema (ECS)].


[id="plugins-{type}s-{plugin}-schema_uri"]
===== `schema_uri`

Expand All @@ -104,4 +122,25 @@ example:

tag events with `_avroparsefailure` when decode fails

[id="plugins-{type}s-{plugin}-target"]
===== `target`

* Value type is <<string,string>>
* There is no default value for this setting.
* This is only relevant when decode data into an event

Define the target field for placing the values. If this setting is not
set, the Avro data will be stored at the root (top level) of the event.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we include this note from the commented code?
Note that the target is only relevant when decoding data into a new event.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is important. many thanks

*Example*
[source,ruby]
----------------------------------
input {
kafka {
codec => avro {
schema_uri => "/tmp/schema.avsc"
target => "[document]"
}
}
}
----------------------------------
37 changes: 30 additions & 7 deletions lib/logstash/codecs/avro.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@
require "logstash/event"
require "logstash/timestamp"
require "logstash/util"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
require 'logstash/plugin_mixins/event_support/event_factory_adapter'

# Read serialized Avro records as Logstash events
#
# This plugin is used to serialize Logstash events as
# Avro datums, as well as deserializing Avro datums into
# This plugin is used to serialize Logstash events as
# Avro datums, as well as deserializing Avro datums into
# Logstash events.
#
# ==== Encoding
#
# This codec is for serializing individual Logstash events
# as Avro datums that are Avro binary blobs. It does not encode
#
# This codec is for serializing individual Logstash events
# as Avro datums that are Avro binary blobs. It does not encode
# Logstash events into an Avro file.
#
#
Expand Down Expand Up @@ -48,6 +52,12 @@
class LogStash::Codecs::Avro < LogStash::Codecs::Base
config_name "avro"

include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck

extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter

include LogStash::PluginMixins::EventSupport::EventFactoryAdapter

# schema path to fetch the schema from.
# This can be a 'http' or 'file' scheme URI
Expand All @@ -60,11 +70,22 @@ class LogStash::Codecs::Avro < LogStash::Codecs::Base
# tag events with `_avroparsefailure` when decode fails
config :tag_on_failure, :validate => :boolean, :default => false

# Defines a target field for placing decoded fields.
# If this setting is omitted, data gets stored at the root (top level) of the event.
#
# NOTE: the target is only relevant while decoding data into a new event.
config :target, :validate => :field_reference

def open_and_read(uri_string)
open(uri_string).read
end

public
def initialize(*params)
super
@original_field = ecs_select[disabled: nil, v1: '[event][original]']
end

def register
@schema = Avro::Schema.parse(open_and_read(schema_uri))
end
Expand All @@ -74,11 +95,13 @@ def decode(data)
datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data)
decoder = Avro::IO::BinaryDecoder.new(datum)
datum_reader = Avro::IO::DatumReader.new(@schema)
yield LogStash::Event.new(datum_reader.read(decoder))
event = targeted_event_factory.new_event(datum_reader.read(decoder))
event.set(@original_field, data.dup.freeze) if @original_field
yield event
rescue => e
if tag_on_failure
@logger.error("Avro parse error, original data now in message field", :error => e)
yield LogStash::Event.new("message" => data, "tags" => ["_avroparsefailure"])
yield event_factory.new_event("message" => data, "tags" => ["_avroparsefailure"])
else
raise e
end
Expand Down
7 changes: 5 additions & 2 deletions logstash-codec-avro.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-codec-avro'
s.version = '3.2.4'
s.version = '3.3.0'
s.platform = 'java'
s.licenses = ['Apache-2.0']
s.summary = "Reads serialized Avro records as Logstash events"
Expand All @@ -22,10 +22,13 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"

s.add_runtime_dependency "avro" #(Apache 2.0 license)
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.3'
s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0'
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'insist'

end

175 changes: 106 additions & 69 deletions spec/codecs/avro_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,80 +5,115 @@
require 'base64'
require 'logstash/codecs/avro'
require 'logstash/event'
require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'

describe LogStash::Codecs::Avro do
describe LogStash::Codecs::Avro, :ecs_compatibility_support, :aggregate_failures do

context "non binary data" do
let (:avro_config) {{ 'schema_uri' => '
ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select|
before(:each) do
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
end

context "non binary data" do
let (:avro_config) {{ 'schema_uri' => '
{"type": "record", "name": "Test",
"fields": [{"name": "foo", "type": ["null", "string"]},
{"name": "bar", "type": "int"}]}' }}
let (:test_event) {LogStash::Event.new({ "foo" => "hello", "bar" => 10 })}
let (:test_event_hash) { { "foo" => "hello", "bar" => 10 } }
let (:test_event) {LogStash::Event.new(test_event_hash)}

subject do
allow_any_instance_of(LogStash::Codecs::Avro).to \
subject do
allow_any_instance_of(LogStash::Codecs::Avro).to \
receive(:open_and_read).and_return(avro_config['schema_uri'])
next LogStash::Codecs::Avro.new(avro_config)
end
next LogStash::Codecs::Avro.new(avro_config)
end

context "#decode" do
it "should return an LogStash::Event from raw and base64 encoded avro data" do
schema = Avro::Schema.parse(avro_config['schema_uri'])
dw = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(test_event.to_hash, encoder)

subject.decode(Base64.strict_encode64(buffer.string)) do |event|
insist {event.is_a? LogStash::Event}
insist {event.get("foo")} == test_event.get("foo")
insist {event.get("bar")} == test_event.get("bar")
end
subject.decode(buffer.string) do |event|
insist {event.is_a? LogStash::Event}
insist {event.get("foo")} == test_event.get("foo")
insist {event.get("bar")} == test_event.get("bar")
context "#decode" do
it "should return an LogStash::Event from raw and base64 encoded avro data" do
schema = Avro::Schema.parse(avro_config['schema_uri'])
dw = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(test_event.to_hash, encoder)

subject.decode(Base64.strict_encode64(buffer.string)) do |event|
insist {event.is_a? LogStash::Event}
insist {event.get("foo")} == test_event.get("foo")
insist {event.get("bar")} == test_event.get("bar")
expect(event.get('[event][original]')).to eq(Base64.strict_encode64(buffer.string)) if ecs_compatibility != :disabled
end
subject.decode(buffer.string) do |event|
insist {event.is_a? LogStash::Event}
insist {event.get("foo")} == test_event.get("foo")
insist {event.get("bar")} == test_event.get("bar")
expect(event.get('[event][original]')).to eq(buffer.string) if ecs_compatibility != :disabled
end
end
end

it "should throw exception if decoding fails" do
expect {subject.decode("not avro") {|_| }}.to raise_error NoMethodError
it "should throw exception if decoding fails" do
expect {subject.decode("not avro") {|_| }}.to raise_error NoMethodError
end
end
end

context "#decode with tag_on_failure" do
let (:avro_config) {super.merge("tag_on_failure" => true)}
context "#decode with tag_on_failure" do
let (:avro_config) {{ 'schema_uri' => '
{"type": "record", "name": "Test",
"fields": [{"name": "foo", "type": ["null", "string"]},
{"name": "bar", "type": "int"}]}',
'tag_on_failure' => true}}

it "should tag event on failure" do
subject.decode("not avro") do |event|
insist {event.is_a? LogStash::Event}
insist {event.get("tags")} == ["_avroparsefailure"]
it "should tag event on failure" do
subject.decode("not avro") do |event|
insist {event.is_a? LogStash::Event}
insist {event.get("tags")} == ["_avroparsefailure"]
end
end
end
end

context "#encode" do
it "should return avro data from a LogStash::Event" do
got_event = false
subject.on_event do |event, data|
context "#decode with target" do
let(:avro_target) { "avro_target" }
let (:avro_config) {{ 'schema_uri' => '
{"type": "record", "name": "Test",
"fields": [{"name": "foo", "type": ["null", "string"]},
{"name": "bar", "type": "int"}]}',
'target' => avro_target}}

it "should return an LogStash::Event with content in target" do
schema = Avro::Schema.parse(avro_config['schema_uri'])
datum = StringIO.new(Base64.strict_decode64(data))
decoder = Avro::IO::BinaryDecoder.new(datum)
datum_reader = Avro::IO::DatumReader.new(schema)
record = datum_reader.read(decoder)

insist {record["foo"]} == test_event.get("foo")
insist {record["bar"]} == test_event.get("bar")
insist {event.is_a? LogStash::Event}
got_event = true
dw = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(test_event.to_hash, encoder)

subject.decode(buffer.string) do |event|
insist {event.get("[#{avro_target}][foo]")} == test_event.get("foo")
insist {event.get("[#{avro_target}][bar]")} == test_event.get("bar")
end
end
subject.encode(test_event)
insist {got_event}
end

context "binary data" do
context "#encode" do
it "should return avro data from a LogStash::Event" do
got_event = false
subject.on_event do |event, data|
schema = Avro::Schema.parse(avro_config['schema_uri'])
datum = StringIO.new(Base64.strict_decode64(data))
decoder = Avro::IO::BinaryDecoder.new(datum)
datum_reader = Avro::IO::DatumReader.new(schema)
record = datum_reader.read(decoder)

insist {record["foo"]} == test_event.get("foo")
insist {record["bar"]} == test_event.get("bar")
insist {event.is_a? LogStash::Event}
got_event = true
end
subject.encode(test_event)
insist {got_event}
end

context "binary data" do

let (:avro_config) {{ 'schema_uri' => '{"namespace": "com.systems.test.data",
let (:avro_config) {{ 'schema_uri' => '{"namespace": "com.systems.test.data",
"type": "record",
"name": "TestRecord",
"fields": [
Expand All @@ -87,29 +122,31 @@
{"name": "latitude", "type": ["double", "null"]}
]
}' }}
let (:test_event) {LogStash::Event.new({ "name" => "foo", "longitude" => 21.01234.to_f, "latitude" => 111.0123.to_f })}
let (:test_event) {LogStash::Event.new({ "name" => "foo", "longitude" => 21.01234.to_f, "latitude" => 111.0123.to_f })}

subject do
allow_any_instance_of(LogStash::Codecs::Avro).to \
subject do
allow_any_instance_of(LogStash::Codecs::Avro).to \
receive(:open_and_read).and_return(avro_config['schema_uri'])
next LogStash::Codecs::Avro.new(avro_config)
end
next LogStash::Codecs::Avro.new(avro_config)
end

it "should correctly encode binary data" do
schema = Avro::Schema.parse(avro_config['schema_uri'])
dw = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(test_event.to_hash, encoder)
it "should correctly encode binary data" do
schema = Avro::Schema.parse(avro_config['schema_uri'])
dw = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(test_event.to_hash, encoder)

subject.decode(Base64.strict_encode64(buffer.string)) do |event|
insist {event.is_a? LogStash::Event}
insist {event.get("name")} == test_event.get("name")
insist {event.get("longitude")} == test_event.get("longitude")
insist {event.get("latitude")} == test_event.get("latitude")
subject.decode(Base64.strict_encode64(buffer.string)) do |event|
insist {event.is_a? LogStash::Event}
insist {event.get("name")} == test_event.get("name")
insist {event.get("longitude")} == test_event.get("longitude")
insist {event.get("latitude")} == test_event.get("latitude")
end
end
end
end

end
end
end