Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ECS support and target option #20

Merged
merged 4 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.1.0
- Add ECS support. Add `target` option [#20](https://github.com/logstash-plugins/logstash-codec-es_bulk/pull/20)

## 3.0.8
- Fixed deeplink to Elasticsearch Reference [#18](https://github.com/logstash-plugins/logstash-codec-es_bulk/pull/18)

Expand Down
45 changes: 45 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,49 @@ Values in `additional_codecs` are prioritized over those specified in the
`codec` option. That is, the default `codec` is applied only if no codec
for the request's content-type is found in the `additional_codecs` setting.

[id="plugins-{type}s-{plugin}-ecs_metadata"]
==== Event Metadata and the Elastic Common Schema (ECS)

When ECS compatibility is disabled, the metadata is stored in the `[@metadata]` field.
When ECS is enabled, the metadata is stored in the `[@metadata][codec][es_bulk]` field.

[id="plugins-{type}s-{plugin}-options"]
==== ES Bulk Codec Configuration Options

[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-ecs_compatibility>> | <<string,string>>|No
| <<plugins-{type}s-{plugin}-target>> |<<string,string>>|No
|=======================================================================

[id="plugins-{type}s-{plugin}-ecs_compatibility"]
===== `ecs_compatibility`

* Value type is <<string,string>>
* Supported values are:
** `disabled`: unstructured metadata added at @metadata
** `v1`: uses `[@metadata][codec][es_bulk]` fields

Controls this plugin's compatibility with the {ecs-ref}[Elastic Common Schema (ECS)].

[id="plugins-{type}s-{plugin}-target"]
===== `target`

* Value type is <<string,string>>
* There is no default value for this setting.

Define the target field for placing the values. If this setting is not
set, the data will be stored at the root (top level) of the event.

For example, if you want data to be put under the `document` field:
[source,ruby]
----------------------------------
input {
kafka {
codec => es_bulk {
target => "[document]"
}
}
}
----------------------------------
30 changes: 26 additions & 4 deletions lib/logstash/codecs/es_bulk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
require "logstash/codecs/base"
require "logstash/codecs/line"
require "logstash/json"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
require 'logstash/plugin_mixins/event_support/event_factory_adapter'
require 'logstash/plugin_mixins/event_support/from_json_helper'

# This codec will decode the http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk format]
# into individual events, plus metadata into the `@metadata` field.
Expand All @@ -11,13 +16,30 @@
class LogStash::Codecs::ESBulk < LogStash::Codecs::Base
config_name "es_bulk"

include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck

extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter

include LogStash::PluginMixins::EventSupport::EventFactoryAdapter

# Defines a target field for placing decoded fields.
# If this setting is omitted, data gets stored at the root (top level) of the event.
#
# NOTE: the target is only relevant while decoding data into a new event.
config :target, :validate => :field_reference

public
def initialize(params={})
super(params)
@lines = LogStash::Codecs::Line.new
@lines.charset = "UTF-8"
@state = :initial
@metadata = Hash.new
@metadata_field = ecs_select[disabled: '[@metadata]', v1: '[@metadata][codec][es_bulk]']
end

def register
end

public
Expand All @@ -27,17 +49,17 @@ def decode(data)
line = LogStash::Json.load(bulk.get("message"))
case @state
when :metadata
event = LogStash::Event.new(line)
event.set("@metadata", @metadata)
event = targeted_event_factory.new_event(line)
event.set(@metadata_field, @metadata)
yield event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to event.set('[event][original]', bulk.get("message").dup.freeze) in ECS mode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are four options here. Considering the following input

      { "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
      { "field1" : "value1" }
      { "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
      { "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
      { "field1" : "value3" }
      { "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
      { "doc" : {"field2" : "value2"} }
  1. Store the whole input as event.original.
    The input generates four events. Each event saves the complete raw data.
    It could generate too much extra bandwidth

  2. Store { "field1" : "value1" } as event.original.
    This is the line when target_event_factory is called. Metadata is not included. The information is not complete.
    This is the result of event.set('[event][original]', bulk.get("message").dup.freeze)
    The information is not particularly interesting because the same stuff transforms into an event.

  3. tailor-make a new line to concatenate metadata { "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } } and data { "field1" : "value1" }
    This will be a full event that requires concatenating the decoded string

  4. do not store event.original

I am opened to save it in event.original if you think it is useful

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh right, it's tricky with the multi-line and the managed state ... was undecided on this making sense.
Reading your comments I am leaning towards 4. do not store event.original for now and ship as is...

@state = :initial
when :initial
@metadata = line[line.keys[0]]
@metadata["action"] = line.keys[0].to_s
@state = :metadata
if line.keys[0] == 'delete'
event = LogStash::Event.new()
event.set("@metadata", @metadata)
event = targeted_event_factory.new_event
event.set(@metadata_field, @metadata)
yield event
@state = :initial
end
Expand Down
5 changes: 4 additions & 1 deletion logstash-codec-es_bulk.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-codec-es_bulk'
s.version = '3.0.8'
s.version = '3.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads the Elasticsearch bulk format into separate events, along with metadata"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -22,6 +22,9 @@ Gem::Specification.new do |s|
# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-codec-line'
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.3'
s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0'
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'insist'
Expand Down
82 changes: 46 additions & 36 deletions spec/codecs/es_bulk_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,67 @@
require "logstash/codecs/es_bulk"
require "logstash/event"
require "insist"
require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'

describe LogStash::Codecs::ESBulk do
subject do
next LogStash::Codecs::ESBulk.new
end
describe LogStash::Codecs::ESBulk, :ecs_compatibility_support do
ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to test with setting the field to v8:

Suggested change
ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select|
ecs_compatibility_matrix(:disabled, :v1, :v8) do |ecs_select|

before(:each) do
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
end

context "#decode" do
it "should return 4 events from json data" do
data = <<-HERE
subject do
next LogStash::Codecs::ESBulk.new
end

context "#decode" do
it "should return 4 events from json data" do
data = <<-HERE
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }
HERE

count = 0
subject.decode(data) do |event|
case count
when 0
insist { event.get("[@metadata][_id]") } == "1"
insist { event.get("[@metadata][action]") } == "index"
insist { event.get("field1") } == "value1"
when 1
insist { event.get("[@metadata][_id]") } == "2"
insist { event.get("[@metadata][action]") } == "delete"
when 2
insist { event.get("[@metadata][_id]") } == "3"
insist { event.get("[@metadata][action]") } == "create"
insist { event.get("field1") } == "value3"
when 3
insist { event.get("[@metadata][_id]") } == "1"
insist { event.get("[@metadata][action]") } == "update"
insist { event.get("[doc][field2]") } == "value2"
HERE

metadata_field = ecs_select[disabled: '[@metadata]', v1: '[@metadata][codec][es_bulk]']

count = 0
subject.decode(data) do |event|
case count
when 0
insist { event.get("#{metadata_field}[_id]") } == "1"
insist { event.get("#{metadata_field}[action]") } == "index"
insist { event.get("field1") } == "value1"
when 1
insist { event.get("#{metadata_field}[_id]") } == "2"
insist { event.get("#{metadata_field}[action]") } == "delete"
when 2
insist { event.get("#{metadata_field}[_id]") } == "3"
insist { event.get("#{metadata_field}[action]") } == "create"
insist { event.get("field1") } == "value3"
when 3
insist { event.get("#{metadata_field}[_id]") } == "1"
insist { event.get("#{metadata_field}[action]") } == "update"
insist { event.get("[doc][field2]") } == "value2"
end
count += 1
end
count += 1
insist { count } == 4
end
insist { count } == 4
end
end

context "fail to process non-bulk event then continue" do
it "continues after a fail" do
decoded = false
subject.decode("something that isn't a bulk event\n") do |event|
decoded = true
context "fail to process non-bulk event then continue" do
it "continues after a fail" do
decoded = false
subject.decode("something that isn't a bulk event\n") do |event|
decoded = true
end
insist { decoded } == false
end
insist { decoded } == false
end

end

end