-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add ECS support #36
add ECS support #36
Changes from 8 commits
f9e849c
8b410ea
6c936f9
79b40d4
c28264c
678c579
1dbcb9a
18e67e8
5f92fc8
84f111b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -26,6 +26,11 @@ This plugin is used to serialize Logstash events as | |||||||||
Avro datums, as well as deserializing Avro datums into | ||||||||||
Logstash events. | ||||||||||
|
||||||||||
[id="plugins-{type}s-{plugin}-ecs_metadata"] | ||||||||||
==== Event Metadata and the Elastic Common Schema (ECS) | ||||||||||
|
||||||||||
The plugin behaves the same regardless of ECS compatibility, except adding the original message to `[event][original]`. | ||||||||||
|
||||||||||
==== Encoding | ||||||||||
|
||||||||||
This codec is for serializing individual Logstash events | ||||||||||
|
@@ -76,12 +81,25 @@ output { | |||||||||
[cols="<,<,<",options="header",] | ||||||||||
|======================================================================= | ||||||||||
|Setting |Input type|Required | ||||||||||
| <<plugins-{type}s-{plugin}-ecs_compatibility>> | <<string,string>>|No | ||||||||||
| <<plugins-{type}s-{plugin}-schema_uri>> |<<string,string>>|Yes | ||||||||||
| <<plugins-{type}s-{plugin}-tag_on_failure>> |<<boolean,boolean>>|No | ||||||||||
| <<plugins-{type}s-{plugin}-target>> |<<string,string>>|No | ||||||||||
|======================================================================= | ||||||||||
|
||||||||||
| ||||||||||
|
||||||||||
[id="plugins-{type}s-{plugin}-ecs_compatibility"] | ||||||||||
===== `ecs_compatibility` | ||||||||||
|
||||||||||
* Value type is <<string,string>> | ||||||||||
* Supported values are: | ||||||||||
** `disabled`: Avro data added at root level | ||||||||||
** `v1`,`v8`: Elastic Common Schema compliant behavior (`[event][original]` is also added) | ||||||||||
|
||||||||||
Controls this plugin's compatibility with the {ecs-ref}[Elastic Common Schema (ECS)]. | ||||||||||
|
||||||||||
|
||||||||||
[id="plugins-{type}s-{plugin}-schema_uri"] | ||||||||||
===== `schema_uri` | ||||||||||
|
||||||||||
|
@@ -104,4 +122,24 @@ example: | |||||||||
|
||||||||||
tag events with `_avroparsefailure` when decode fails | ||||||||||
|
||||||||||
[id="plugins-{type}s-{plugin}-target"] | ||||||||||
===== `target` | ||||||||||
|
||||||||||
* Value type is <<string,string>> | ||||||||||
* There is no default value for this setting. | ||||||||||
|
||||||||||
Define the target field for placing the row values. If this setting is not | ||||||||||
set, the Avro data will be stored at the root (top level) of the event. | ||||||||||
|
||||||||||
For example, if you want data to be put under the `document` field: | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
[source,ruby] | ||||||||||
---------------------------------- | ||||||||||
input { | ||||||||||
kafka { | ||||||||||
codec => avro { | ||||||||||
schema_uri => "/tmp/schema.avsc" | ||||||||||
target => "[document]" | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
---------------------------------- | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,17 +6,22 @@ | |
require "logstash/event" | ||
require "logstash/timestamp" | ||
require "logstash/util" | ||
require 'logstash/plugin_mixins/ecs_compatibility_support' | ||
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check' | ||
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter' | ||
require 'logstash/plugin_mixins/event_support/event_factory_adapter' | ||
require 'logstash/plugin_mixins/event_support/from_json_helper' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ✂️ we do need the |
||
|
||
# Read serialized Avro records as Logstash events | ||
# | ||
# This plugin is used to serialize Logstash events as | ||
# Avro datums, as well as deserializing Avro datums into | ||
# This plugin is used to serialize Logstash events as | ||
# Avro datums, as well as deserializing Avro datums into | ||
# Logstash events. | ||
# | ||
# ==== Encoding | ||
# | ||
# This codec is for serializing individual Logstash events | ||
# as Avro datums that are Avro binary blobs. It does not encode | ||
# | ||
# This codec is for serializing individual Logstash events | ||
# as Avro datums that are Avro binary blobs. It does not encode | ||
# Logstash events into an Avro file. | ||
# | ||
# | ||
|
@@ -48,6 +53,12 @@ | |
class LogStash::Codecs::Avro < LogStash::Codecs::Base | ||
config_name "avro" | ||
|
||
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) | ||
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck | ||
|
||
extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter | ||
|
||
include LogStash::PluginMixins::EventSupport::EventFactoryAdapter | ||
|
||
# schema path to fetch the schema from. | ||
# This can be a 'http' or 'file' scheme URI | ||
|
@@ -60,11 +71,22 @@ class LogStash::Codecs::Avro < LogStash::Codecs::Base | |
# tag events with `_avroparsefailure` when decode fails | ||
config :tag_on_failure, :validate => :boolean, :default => false | ||
|
||
# Defines a target field for placing decoded fields. | ||
# If this setting is omitted, data gets stored at the root (top level) of the event. | ||
# | ||
# NOTE: the target is only relevant while decoding data into a new event. | ||
config :target, :validate => :field_reference | ||
|
||
def open_and_read(uri_string) | ||
open(uri_string).read | ||
end | ||
|
||
public | ||
def initialize(*params) | ||
super | ||
@original_field = ecs_select[disabled: nil, v1: '[event][original]'] | ||
end | ||
|
||
def register | ||
@schema = Avro::Schema.parse(open_and_read(schema_uri)) | ||
end | ||
|
@@ -74,11 +96,13 @@ def decode(data) | |
datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data) | ||
decoder = Avro::IO::BinaryDecoder.new(datum) | ||
datum_reader = Avro::IO::DatumReader.new(@schema) | ||
yield LogStash::Event.new(datum_reader.read(decoder)) | ||
event = targeted_event_factory.new_event(datum_reader.read(decoder)) | ||
event.set(@original_field, data.dup.freeze) if @original_field | ||
yield event | ||
rescue => e | ||
if tag_on_failure | ||
@logger.error("Avro parse error, original data now in message field", :error => e) | ||
yield LogStash::Event.new("message" => data, "tags" => ["_avroparsefailure"]) | ||
yield event_factory.new_event("message" => data, "tags" => ["_avroparsefailure"]) | ||
else | ||
raise e | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we include this note from the commented code?
Note that the target is only relevant when decoding data into a new event.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is important. many thanks