diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b30d180a019a..b41690fb77bb 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -13,6 +13,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d ==== Breaking changes *Affecting all Beats* +- Rename the `filters` section to `processors`. {pull}1944[1944] *Metricbeat* diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 745cfd401f15..bebcecea8129 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -259,20 +259,21 @@ filebeat.prospectors: # default is the number of logical CPUs available in the system. #max_procs: -#================================ Filters ===================================== +#================================ Processors ===================================== -# This section defines a list of filtering rules that are applied one by one -# starting with the exported event: +# Processors are used to reduce the number of fields in the exported event or to +# enhance the event with external meta data. This section defines a list of processors +# that are applied one by one and the first one receives the initial event: # # event -> filter1 -> event1 -> filter2 ->event2 ... # -# Supported actions: drop_fields, drop_event, include_fields +# Supported processors: drop_fields, drop_event, include_fields # -# For example, the following filter configuration uses multiple actions to keep +# For example, you can use the following processors to keep # the fields that contain CPU load percentages, but remove the fields that # contain CPU ticks values: # -#filters: +#processors: #- include_fields: # fields: ["cpu"] #- drop_fields: @@ -280,7 +281,7 @@ filebeat.prospectors: # # The following example drops the events that have the HTTP response code 200: # -#filters: +#processors: #- drop_event: # equals: # http.code: 200 diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index c8b6b05209f2..608a4585555f 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -110,7 +110,7 @@ geoip: {%- if drop_fields or drop_event or include_fields %} #================================ Filters ===================================== -filters: +processors: {%- if include_fields %} - include_fields: diff --git a/filebeat/tests/system/test_filtering.py b/filebeat/tests/system/test_processors.py similarity index 100% rename from filebeat/tests/system/test_filtering.py rename to filebeat/tests/system/test_processors.py diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index cf3ab04c7f4c..8fd6e45dc4cf 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -43,10 +43,10 @@ import ( "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" - _ "github.com/elastic/beats/libbeat/filter/rules" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/paths" + "github.com/elastic/beats/libbeat/processors" + _ "github.com/elastic/beats/libbeat/processors/actions" "github.com/elastic/beats/libbeat/publisher" svc "github.com/elastic/beats/libbeat/service" "github.com/satori/go.uuid" @@ -107,16 +107,16 @@ type Beat struct { Config BeatConfig // Common Beat configuration data. Publisher *publisher.Publisher // Publisher - filters *filter.Filters // Filters + processors *processors.Processors // Processors } // BeatConfig struct contains the basic configuration of every beat type BeatConfig struct { - Shipper publisher.ShipperConfig `config:",inline"` - Output map[string]*common.Config `config:"output"` - Logging logp.Logging `config:"logging"` - Filters filter.FilterPluginConfig `config:"filters"` - Path paths.Path `config:"path"` + Shipper publisher.ShipperConfig `config:",inline"` + Output map[string]*common.Config `config:"output"` + Logging logp.Logging `config:"logging"` + Processors processors.PluginConfig `config:"processors"` + Path paths.Path `config:"path"` } // Run initializes and runs a Beater implementation. name is the name of the @@ -223,9 +223,9 @@ func (bc *instance) config() error { // log paths values to help with troubleshooting logp.Info(paths.Paths.String()) - bc.data.filters, err = filter.New(bc.data.Config.Filters) + bc.data.processors, err = processors.New(bc.data.Config.Processors) if err != nil { - return fmt.Errorf("error initializing filters: %v", err) + return fmt.Errorf("error initializing processors: %v", err) } if bc.data.Config.Shipper.MaxProcs != nil { @@ -251,7 +251,7 @@ func (bc *instance) setup() error { return fmt.Errorf("error initializing publisher: %v", err) } - bc.data.Publisher.RegisterFilter(bc.data.filters) + bc.data.Publisher.RegisterProcessors(bc.data.processors) err = bc.beater.Setup(bc.data) if err != nil { return err diff --git a/libbeat/docs/filtering.asciidoc b/libbeat/docs/filtering.asciidoc index 8a36958f47ad..24cb7f72c583 100644 --- a/libbeat/docs/filtering.asciidoc +++ b/libbeat/docs/filtering.asciidoc @@ -9,8 +9,9 @@ //// include::../../libbeat/docs/filtering.asciidoc[] ////////////////////////////////////////////////////////////////////////// -Generic filtering is available to all Beats through libbeat. With generic filtering, you can reduce the number of -fields that are exported by the Beat by defining a list of filter actions that are applied to each event before it's -sent to the defined output. The filter actions are executed in the order that they are defined in the config file. +You can defined processors in any Beat as it is part of libbeat. Processors are used to reduce the number of +the exported fields, but also to enhance them with additional meta data. +Each processor receives an event, applies a defined action to the event and returns it. In case you define a list of +processors, then they are executed in the order they are defined in the configuration file. -You can define the filter actions under the `filters` section of the {beatname_uc} configuration file. +The processors are defined in the {beatname_uc} configuration file. diff --git a/libbeat/docs/filteringconfig.asciidoc b/libbeat/docs/filteringconfig.asciidoc index 54516ad997cc..df747dbace53 100644 --- a/libbeat/docs/filteringconfig.asciidoc +++ b/libbeat/docs/filteringconfig.asciidoc @@ -11,26 +11,28 @@ ////////////////////////////////////////////////////////////////////////// [[configuration-filter]] -=== Filters Configuration +=== Processors -You can set a list of filter actions in the `filters` section of the +{beatname_lc}.yml+ config file to reduce the number +You can define a set of `processors` in the +{beatname_lc}.yml+ config file to reduce the number of fields that are exported by the Beat. -If more filtering rules are defined, then they are executed in the order they are defined. The initial event is passed to the first filtering rule and what results from it is passed to the second filtering rule until all the filtering rules are applied. The condition that is used in the following filtering rules is running against the event that is received as input and it might defer from the original event. +If more processors are defined, then they are executed in the order they are defined. The initial event is passed to the +first processor and what results from it is passed to the second processor until all processors are applied. The +condition is checked against the event that is received as input and it might defer from the original event. [source,yaml] ------- -event -> filter action 1 -> event1 -> filter action 2 -> event2 ... +event -> processor 1 -> event1 -> processor 2 -> event2 ... ------- See <> for the full list of possible fields. -Each filter action receives a condition and optionally a set of arguments. The action is executed only if the condition +Each processor receives a condition and optionally a set of arguments. The action is executed only if the condition is fulfilled. [source,yaml] ------ -filters: +processors: - action1: condition1 [arguments] @@ -44,7 +46,7 @@ filters: See <> for specific {beatname_uc} examples. [[filtering-condition]] -==== Filtering condition +==== Condition Each condition receives a field to compare or multiple fields under the same condition and then `AND` is used between them. You can see a list of the <>. @@ -138,7 +140,7 @@ range: cpu.user_p: 0.8 ------ -==== Filtering Actions +==== Actions The supported filter actions are: @@ -156,7 +158,7 @@ optional and if it's missing then the defined fields are always exported. The `@ [source,yaml] ------- -filters: +processors: - include_fields: [condition] fields: ["field1", "field2", ...] @@ -178,7 +180,7 @@ even if they show up in the `drop_fields` list. [source,yaml] ----------------------------------------------------- -filters: +processors: - drop_fields: [condition] fields: ["field1", "field2", ...] @@ -195,7 +197,7 @@ without one all the events are dropped. [source,yaml] ------ -filters: +processors: - drop_event: condition ------ diff --git a/libbeat/filter/filter.go b/libbeat/filter/filter.go deleted file mode 100644 index 1c6ebcc91d70..000000000000 --- a/libbeat/filter/filter.go +++ /dev/null @@ -1,87 +0,0 @@ -package filter - -import ( - "fmt" - "strings" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" -) - -type Filters struct { - list []FilterRule -} - -func New(config FilterPluginConfig) (*Filters, error) { - - filters := Filters{} - - for _, filter := range config { - - if len(filter) != 1 { - return nil, fmt.Errorf("each filtering rule needs to have exactly one action, but found %d actions.", len(filter)) - } - - for filterName, cfg := range filter { - - constructor, exists := filterConstructors[filterName] - if !exists { - return nil, fmt.Errorf("the filtering rule %s doesn't exist", filterName) - } - - plugin, err := constructor(cfg) - if err != nil { - return nil, err - } - - filters.addRule(plugin) - } - } - - logp.Debug("filter", "filters: %v", filters) - return &filters, nil -} - -func (filters *Filters) addRule(filter FilterRule) { - - if filters.list == nil { - filters.list = []FilterRule{} - } - filters.list = append(filters.list, filter) -} - -// Applies a sequence of filtering rules and returns the filtered event -func (filters *Filters) Filter(event common.MapStr) common.MapStr { - - // Check if filters are set, just return event if not - if len(filters.list) == 0 { - return event - } - - // clone the event at first, before starting filtering - filtered := event.Clone() - var err error - - for _, filter := range filters.list { - filtered, err = filter.Filter(filtered) - if err != nil { - logp.Debug("filter", "fail to apply filtering rule %s: %s", filter, err) - } - if filtered == nil { - // drop event - return nil - } - } - - return filtered -} - -func (filters Filters) String() string { - s := []string{} - - for _, filter := range filters.list { - - s = append(s, filter.String()) - } - return strings.Join(s, ", ") -} diff --git a/libbeat/filter/registry.go b/libbeat/filter/registry.go deleted file mode 100644 index 8d281f4f711e..000000000000 --- a/libbeat/filter/registry.go +++ /dev/null @@ -1,28 +0,0 @@ -package filter - -import ( - "fmt" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" -) - -type FilterRule interface { - Filter(event common.MapStr) (common.MapStr, error) - String() string -} - -type FilterConstructor func(config common.Config) (FilterRule, error) - -var filterConstructors = map[string]FilterConstructor{} - -func RegisterPlugin(name string, constructor FilterConstructor) error { - - logp.Debug("filter", "Register plugin %s", name) - - if _, exists := filterConstructors[name]; exists { - return fmt.Errorf("plugin %s already registered", name) - } - filterConstructors[name] = constructor - return nil -} diff --git a/libbeat/filter/rules/drop_event.go b/libbeat/processors/actions/drop_event.go similarity index 67% rename from libbeat/filter/rules/drop_event.go rename to libbeat/processors/actions/drop_event.go index 7b9e60d60539..cc98b2cb6dca 100644 --- a/libbeat/filter/rules/drop_event.go +++ b/libbeat/processors/actions/drop_event.go @@ -1,27 +1,27 @@ -package rules +package actions import ( "fmt" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" + "github.com/elastic/beats/libbeat/processors" ) type DropEvent struct { - Cond *filter.Condition + Cond *processors.Condition } type DropEventConfig struct { - filter.ConditionConfig `config:",inline"` + processors.ConditionConfig `config:",inline"` } func init() { - if err := filter.RegisterPlugin("drop_event", newDropEvent); err != nil { + if err := processors.RegisterPlugin("drop_event", newDropEvent); err != nil { panic(err) } } -func newDropEvent(c common.Config) (filter.FilterRule, error) { +func newDropEvent(c common.Config) (processors.Processor, error) { f := DropEvent{} @@ -36,7 +36,7 @@ func newDropEvent(c common.Config) (filter.FilterRule, error) { return nil, fmt.Errorf("fail to unpack the drop_event configuration: %s", err) } - cond, err := filter.NewCondition(config.ConditionConfig) + cond, err := processors.NewCondition(config.ConditionConfig) if err != nil { return nil, err } @@ -48,14 +48,14 @@ func newDropEvent(c common.Config) (filter.FilterRule, error) { func (f *DropEvent) CheckConfig(c common.Config) error { for _, field := range c.GetFields() { - if !filter.AvailableCondition(field) { + if !processors.AvailableCondition(field) { return fmt.Errorf("unexpected %s option in the drop_event configuration", field) } } return nil } -func (f *DropEvent) Filter(event common.MapStr) (common.MapStr, error) { +func (f *DropEvent) Run(event common.MapStr) (common.MapStr, error) { if f.Cond != nil && !f.Cond.Check(event) { return event, nil diff --git a/libbeat/filter/rules/drop_fields.go b/libbeat/processors/actions/drop_fields.go similarity index 73% rename from libbeat/filter/rules/drop_fields.go rename to libbeat/processors/actions/drop_fields.go index 37e74dd74f71..5db3ca35bbe5 100644 --- a/libbeat/filter/rules/drop_fields.go +++ b/libbeat/processors/actions/drop_fields.go @@ -1,31 +1,31 @@ -package rules +package actions import ( "fmt" "strings" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" + "github.com/elastic/beats/libbeat/processors" ) type DropFields struct { Fields []string // condition - Cond *filter.Condition + Cond *processors.Condition } type DropFieldsConfig struct { - Fields []string `config:"fields"` - filter.ConditionConfig `config:",inline"` + Fields []string `config:"fields"` + processors.ConditionConfig `config:",inline"` } func init() { - if err := filter.RegisterPlugin("drop_fields", newDropFields); err != nil { + if err := processors.RegisterPlugin("drop_fields", newDropFields); err != nil { panic(err) } } -func newDropFields(c common.Config) (filter.FilterRule, error) { +func newDropFields(c common.Config) (processors.Processor, error) { f := DropFields{} @@ -41,7 +41,7 @@ func newDropFields(c common.Config) (filter.FilterRule, error) { } /* remove read only fields */ - for _, readOnly := range filter.MandatoryExportedFields { + for _, readOnly := range processors.MandatoryExportedFields { for i, field := range config.Fields { if readOnly == field { config.Fields = append(config.Fields[:i], config.Fields[i+1:]...) @@ -50,7 +50,7 @@ func newDropFields(c common.Config) (filter.FilterRule, error) { } f.Fields = config.Fields - cond, err := filter.NewCondition(config.ConditionConfig) + cond, err := processors.NewCondition(config.ConditionConfig) if err != nil { return nil, err } @@ -64,7 +64,7 @@ func (f *DropFields) CheckConfig(c common.Config) error { complete := false for _, field := range c.GetFields() { - if !filter.AvailableCondition(field) { + if !processors.AvailableCondition(field) { if field != "fields" { return fmt.Errorf("unexpected %s option in the drop_fields configuration", field) } @@ -80,7 +80,7 @@ func (f *DropFields) CheckConfig(c common.Config) error { return nil } -func (f *DropFields) Filter(event common.MapStr) (common.MapStr, error) { +func (f *DropFields) Run(event common.MapStr) (common.MapStr, error) { if f.Cond != nil && !f.Cond.Check(event) { return event, nil diff --git a/libbeat/filter/rules/include_fields.go b/libbeat/processors/actions/include_fields.go similarity index 76% rename from libbeat/filter/rules/include_fields.go rename to libbeat/processors/actions/include_fields.go index 31fcd8751fb3..741c4c34cf9e 100644 --- a/libbeat/filter/rules/include_fields.go +++ b/libbeat/processors/actions/include_fields.go @@ -1,31 +1,31 @@ -package rules +package actions import ( "fmt" "strings" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" + "github.com/elastic/beats/libbeat/processors" ) type IncludeFields struct { Fields []string // condition - Cond *filter.Condition + Cond *processors.Condition } type IncludeFieldsConfig struct { - Fields []string `config:"fields"` - filter.ConditionConfig `config:",inline"` + Fields []string `config:"fields"` + processors.ConditionConfig `config:",inline"` } func init() { - if err := filter.RegisterPlugin("include_fields", newIncludeFields); err != nil { + if err := processors.RegisterPlugin("include_fields", newIncludeFields); err != nil { panic(err) } } -func newIncludeFields(c common.Config) (filter.FilterRule, error) { +func newIncludeFields(c common.Config) (processors.Processor, error) { f := IncludeFields{} @@ -41,7 +41,7 @@ func newIncludeFields(c common.Config) (filter.FilterRule, error) { } /* add read only fields if they are not yet */ - for _, readOnly := range filter.MandatoryExportedFields { + for _, readOnly := range processors.MandatoryExportedFields { found := false for _, field := range config.Fields { if readOnly == field { @@ -54,7 +54,7 @@ func newIncludeFields(c common.Config) (filter.FilterRule, error) { } f.Fields = config.Fields - cond, err := filter.NewCondition(config.ConditionConfig) + cond, err := processors.NewCondition(config.ConditionConfig) if err != nil { return nil, err } @@ -68,7 +68,7 @@ func (f *IncludeFields) CheckConfig(c common.Config) error { complete := false for _, field := range c.GetFields() { - if !filter.AvailableCondition(field) { + if !processors.AvailableCondition(field) { if field != "fields" { return fmt.Errorf("unexpected %s option in the include_fields configuration", field) } @@ -84,7 +84,7 @@ func (f *IncludeFields) CheckConfig(c common.Config) error { return nil } -func (f *IncludeFields) Filter(event common.MapStr) (common.MapStr, error) { +func (f *IncludeFields) Run(event common.MapStr) (common.MapStr, error) { if f.Cond != nil && !f.Cond.Check(event) { return event, nil diff --git a/libbeat/filter/condition.go b/libbeat/processors/condition.go similarity index 96% rename from libbeat/filter/condition.go rename to libbeat/processors/condition.go index c4f44dc5c8d8..2efa08634f5d 100644 --- a/libbeat/filter/condition.go +++ b/libbeat/processors/condition.go @@ -1,4 +1,4 @@ -package filter +package processors import ( "fmt" @@ -68,7 +68,7 @@ func NewCondition(config ConditionConfig) (*Condition, error) { return &c, nil } -func (c *Condition) setEquals(cfg *ConditionFilter) error { +func (c *Condition) setEquals(cfg *ConditionFields) error { c.equals = map[string]EqualsValue{} @@ -88,7 +88,7 @@ func (c *Condition) setEquals(cfg *ConditionFilter) error { return nil } -func (c *Condition) setContains(cfg *ConditionFilter) error { +func (c *Condition) setContains(cfg *ConditionFields) error { c.contains = map[string]string{} @@ -104,7 +104,7 @@ func (c *Condition) setContains(cfg *ConditionFilter) error { return nil } -func (c *Condition) setRegexp(cfg *ConditionFilter) error { +func (c *Condition) setRegexp(cfg *ConditionFields) error { var err error @@ -124,7 +124,7 @@ func (c *Condition) setRegexp(cfg *ConditionFilter) error { return nil } -func (c *Condition) setRange(cfg *ConditionFilter) error { +func (c *Condition) setRange(cfg *ConditionFields) error { c.rangexp = map[string]RangeValue{} diff --git a/libbeat/filter/condition_test.go b/libbeat/processors/condition_test.go similarity index 85% rename from libbeat/filter/condition_test.go rename to libbeat/processors/condition_test.go index 31abc7dda797..dbd94b0ed6f8 100644 --- a/libbeat/filter/condition_test.go +++ b/libbeat/processors/condition_test.go @@ -1,4 +1,4 @@ -package filter +package processors import ( "testing" @@ -16,24 +16,24 @@ func TestBadCondition(t *testing.T) { configs := []ConditionConfig{ ConditionConfig{ - Equals: &ConditionFilter{fields: map[string]interface{}{ + Equals: &ConditionFields{fields: map[string]interface{}{ "proc.pid": 0.08, }}, }, ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "gtr": 0.3, }}, }, ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "gt": "fdfdd", }}, }, ConditionConfig{ - Regexp: &ConditionFilter{fields: map[string]interface{}{ + Regexp: &ConditionFields{fields: map[string]interface{}{ "proc.name": "58gdhsga-=kw++w00", }}, }, @@ -67,20 +67,20 @@ func TestEqualsCondition(t *testing.T) { configs := []ConditionConfig{ ConditionConfig{ - Equals: &ConditionFilter{fields: map[string]interface{}{ + Equals: &ConditionFields{fields: map[string]interface{}{ "type": "process", }}, }, ConditionConfig{ - Equals: &ConditionFilter{fields: map[string]interface{}{ + Equals: &ConditionFields{fields: map[string]interface{}{ "type": "process", "proc.pid": 305, }}, }, ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "proc.cpu.total_p.gt": 0.5, }}, }, @@ -121,14 +121,14 @@ func TestContainsCondition(t *testing.T) { configs := []ConditionConfig{ ConditionConfig{ - Contains: &ConditionFilter{fields: map[string]interface{}{ + Contains: &ConditionFields{fields: map[string]interface{}{ "proc.name": "sec", "proc.username": "monica", }}, }, ConditionConfig{ - Contains: &ConditionFilter{fields: map[string]interface{}{ + Contains: &ConditionFields{fields: map[string]interface{}{ "type": "process", "proc.name": "secddd", }}, @@ -169,19 +169,19 @@ func TestRegexpCondition(t *testing.T) { configs := []ConditionConfig{ ConditionConfig{ - Regexp: &ConditionFilter{fields: map[string]interface{}{ + Regexp: &ConditionFields{fields: map[string]interface{}{ "source": "apache2/error.*", }}, }, ConditionConfig{ - Regexp: &ConditionFilter{fields: map[string]interface{}{ + Regexp: &ConditionFields{fields: map[string]interface{}{ "source": "apache2/access.*", }}, }, ConditionConfig{ - Regexp: &ConditionFilter{fields: map[string]interface{}{ + Regexp: &ConditionFields{fields: map[string]interface{}{ "source": "apache2/error.*", "message": "[client 1.2.3.4]", }}, @@ -224,27 +224,27 @@ func TestRangeCondition(t *testing.T) { configs := []ConditionConfig{ ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "http.code.gte": 400, "http.code.lt": 500, }}, }, ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "bytes_out.gte": 2800, }}, }, ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "bytes_out.gte": 2800, "responsetime.gt": 30, }}, }, ConditionConfig{ - Range: &ConditionFilter{fields: map[string]interface{}{ + Range: &ConditionFields{fields: map[string]interface{}{ "proc.cpu.total_p.gte": 0.5, }}, }, diff --git a/libbeat/filter/config.go b/libbeat/processors/config.go similarity index 87% rename from libbeat/filter/config.go rename to libbeat/processors/config.go index dc2d2f568b54..550203fe41be 100644 --- a/libbeat/filter/config.go +++ b/libbeat/processors/config.go @@ -1,4 +1,4 @@ -package filter +package processors import ( "fmt" @@ -9,22 +9,22 @@ import ( ) type ConditionConfig struct { - Equals *ConditionFilter `config:"equals"` - Contains *ConditionFilter `config:"contains"` - Regexp *ConditionFilter `config:"regexp"` - Range *ConditionFilter `config:"range"` + Equals *ConditionFields `config:"equals"` + Contains *ConditionFields `config:"contains"` + Regexp *ConditionFields `config:"regexp"` + Range *ConditionFields `config:"range"` } -type ConditionFilter struct { +type ConditionFields struct { fields map[string]interface{} } -type FilterPluginConfig []map[string]common.Config +type PluginConfig []map[string]common.Config // fields that should be always exported var MandatoryExportedFields = []string{"@timestamp", "type"} -func (f *ConditionFilter) Unpack(to interface{}) error { +func (f *ConditionFields) Unpack(to interface{}) error { m, ok := to.(map[string]interface{}) if !ok { return fmt.Errorf("wrong type, expect map") diff --git a/libbeat/processors/processor.go b/libbeat/processors/processor.go new file mode 100644 index 000000000000..5d15c4679772 --- /dev/null +++ b/libbeat/processors/processor.go @@ -0,0 +1,85 @@ +package processors + +import ( + "fmt" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type Processors struct { + list []Processor +} + +func New(config PluginConfig) (*Processors, error) { + + processors := Processors{} + + for _, processor := range config { + + if len(processor) != 1 { + return nil, fmt.Errorf("each processor needs to have exactly one action, but found %d actions.", + len(processor)) + } + + for processorName, cfg := range processor { + + constructor, exists := constructors[processorName] + if !exists { + return nil, fmt.Errorf("the processor %s doesn't exist", processorName) + } + + plugin, err := constructor(cfg) + if err != nil { + return nil, err + } + + processors.addProcessor(plugin) + } + } + + logp.Debug("processors", "Processors: %v", processors) + return &processors, nil +} + +func (processors *Processors) addProcessor(p Processor) { + + processors.list = append(processors.list, p) +} + +// Applies a sequence of processing rules and returns the filtered event +func (processors *Processors) Run(event common.MapStr) common.MapStr { + + // Check if processors are set, just return event if not + if len(processors.list) == 0 { + return event + } + + // clone the event at first, before starting filtering + filtered := event.Clone() + var err error + + for _, p := range processors.list { + filtered, err = p.Run(filtered) + if err != nil { + logp.Debug("filter", "fail to apply processor %s: %s", p, err) + } + if filtered == nil { + // drop event + return nil + } + } + + return filtered +} + +func (processors Processors) String() string { + s := []string{} + + for _, p := range processors.list { + + s = append(s, p.String()) + } + return strings.Join(s, ", ") +} diff --git a/libbeat/filter/filter_test.go b/libbeat/processors/processor_test.go similarity index 82% rename from libbeat/filter/filter_test.go rename to libbeat/processors/processor_test.go index 3fceeb912089..a25ed78e5f28 100644 --- a/libbeat/filter/filter_test.go +++ b/libbeat/processors/processor_test.go @@ -1,36 +1,36 @@ -package filter_test +package processors_test import ( "testing" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" - _ "github.com/elastic/beats/libbeat/filter/rules" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" + _ "github.com/elastic/beats/libbeat/processors/actions" "github.com/stretchr/testify/assert" ) -func GetFilters(t *testing.T, yml []map[string]interface{}) *filter.Filters { +func GetProcessors(t *testing.T, yml []map[string]interface{}) *processors.Processors { - config := filter.FilterPluginConfig{} + config := processors.PluginConfig{} - for _, rule := range yml { + for _, action := range yml { c := map[string]common.Config{} - for name, ruleYml := range rule { - ruleConfig, err := common.NewConfigFrom(ruleYml) + for name, actionYml := range action { + actionConfig, err := common.NewConfigFrom(actionYml) assert.Nil(t, err) - c[name] = *ruleConfig + c[name] = *actionConfig } config = append(config, c) } - filters, err := filter.New(config) + list, err := processors.New(config) assert.Nil(t, err) - return filters + return list } @@ -54,21 +54,21 @@ func TestBadConfig(t *testing.T) { }, } - config := filter.FilterPluginConfig{} + config := processors.PluginConfig{} - for _, rule := range yml { + for _, action := range yml { c := map[string]common.Config{} - for name, ruleYml := range rule { - ruleConfig, err := common.NewConfigFrom(ruleYml) + for name, actionYml := range action { + actionConfig, err := common.NewConfigFrom(actionYml) assert.Nil(t, err) - c[name] = *ruleConfig + c[name] = *actionConfig } config = append(config, c) } - _, err := filter.New(config) + _, err := processors.New(config) assert.NotNil(t, err) } @@ -90,7 +90,7 @@ func TestIncludeFields(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -118,7 +118,7 @@ func TestIncludeFields(t *testing.T) { "type": "process", } - filteredEvent := filters.Filter(event) + processedEvent := processors.Run(event) expectedEvent := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -136,7 +136,7 @@ func TestIncludeFields(t *testing.T) { "type": "process", } - assert.Equal(t, expectedEvent, filteredEvent) + assert.Equal(t, expectedEvent, processedEvent) } func TestIncludeFields1(t *testing.T) { @@ -156,7 +156,7 @@ func TestIncludeFields1(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -184,14 +184,14 @@ func TestIncludeFields1(t *testing.T) { "type": "process", } - filteredEvent := filters.Filter(event) + processedEvent := processors.Run(event) expectedEvent := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", "type": "process", } - assert.Equal(t, expectedEvent, filteredEvent) + assert.Equal(t, expectedEvent, processedEvent) } func TestDropFields(t *testing.T) { @@ -207,7 +207,7 @@ func TestDropFields(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -235,7 +235,7 @@ func TestDropFields(t *testing.T) { "type": "process", } - filteredEvent := filters.Filter(event) + processedEvent := processors.Run(event) expectedEvent := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -250,7 +250,7 @@ func TestDropFields(t *testing.T) { "type": "process", } - assert.Equal(t, expectedEvent, filteredEvent) + assert.Equal(t, expectedEvent, processedEvent) } func TestMultipleIncludeFields(t *testing.T) { @@ -275,7 +275,7 @@ func TestMultipleIncludeFields(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event1 := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -341,8 +341,8 @@ func TestMultipleIncludeFields(t *testing.T) { "type": "process", } - actual1 := filters.Filter(event1) - actual2 := filters.Filter(event2) + actual1 := processors.Run(event1) + actual2 := processors.Run(event2) assert.Equal(t, expected1, actual1) assert.Equal(t, expected2, actual2) @@ -366,7 +366,7 @@ func TestDropEvent(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -394,9 +394,9 @@ func TestDropEvent(t *testing.T) { "type": "process", } - filteredEvent := filters.Filter(event) + processedEvent := processors.Run(event) - assert.Nil(t, filteredEvent) + assert.Nil(t, processedEvent) } func TestEmptyCondition(t *testing.T) { @@ -411,7 +411,7 @@ func TestEmptyCondition(t *testing.T) { }, } - filters := GetFilters(t, yml) + processors := GetProcessors(t, yml) event := common.MapStr{ "@timestamp": "2016-01-24T18:35:19.308Z", @@ -439,9 +439,9 @@ func TestEmptyCondition(t *testing.T) { "type": "process", } - filteredEvent := filters.Filter(event) + processedEvent := processors.Run(event) - assert.Nil(t, filteredEvent) + assert.Nil(t, processedEvent) } func TestBadCondition(t *testing.T) { @@ -460,21 +460,21 @@ func TestBadCondition(t *testing.T) { }, } - config := filter.FilterPluginConfig{} + config := processors.PluginConfig{} - for _, rule := range yml { + for _, action := range yml { c := map[string]common.Config{} - for name, ruleYml := range rule { - ruleConfig, err := common.NewConfigFrom(ruleYml) + for name, actionYml := range action { + actionConfig, err := common.NewConfigFrom(actionYml) assert.Nil(t, err) - c[name] = *ruleConfig + c[name] = *actionConfig } config = append(config, c) } - _, err := filter.New(config) + _, err := processors.New(config) assert.NotNil(t, err) } @@ -495,21 +495,21 @@ func TestMissingFields(t *testing.T) { }, } - config := filter.FilterPluginConfig{} + config := processors.PluginConfig{} - for _, rule := range yml { + for _, action := range yml { c := map[string]common.Config{} - for name, ruleYml := range rule { - ruleConfig, err := common.NewConfigFrom(ruleYml) + for name, actionYml := range action { + actionConfig, err := common.NewConfigFrom(actionYml) assert.Nil(t, err) - c[name] = *ruleConfig + c[name] = *actionConfig } config = append(config, c) } - _, err := filter.New(config) + _, err := processors.New(config) assert.NotNil(t, err) } diff --git a/libbeat/processors/registry.go b/libbeat/processors/registry.go new file mode 100644 index 000000000000..b24133311470 --- /dev/null +++ b/libbeat/processors/registry.go @@ -0,0 +1,28 @@ +package processors + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type Processor interface { + Run(event common.MapStr) (common.MapStr, error) + String() string +} + +type Constructor func(config common.Config) (Processor, error) + +var constructors = map[string]Constructor{} + +func RegisterPlugin(name string, constructor Constructor) error { + + logp.Debug("processors", "Register plugin %s", name) + + if _, exists := constructors[name]; exists { + return fmt.Errorf("plugin %s already registered", name) + } + constructors[name] = constructor + return nil +} diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go index b5bed1904ff3..2940d9947082 100644 --- a/libbeat/publisher/client.go +++ b/libbeat/publisher/client.go @@ -165,8 +165,8 @@ func (c *client) filterEvent(event common.MapStr) *common.MapStr { } - // filter the event by applying the configured rules - publishEvent := c.publisher.Filters.Filter(event) + // process the event by applying the configured actions + publishEvent := c.publisher.Processors.Run(event) if publishEvent == nil { // the event is dropped logp.Debug("publish", "Drop event %s", event.StringToPrint()) diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go index ca31b93737ea..dc99e1bc909e 100644 --- a/libbeat/publisher/publish.go +++ b/libbeat/publisher/publish.go @@ -9,9 +9,9 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/op" - "github.com/elastic/beats/libbeat/filter" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" + "github.com/elastic/beats/libbeat/processors" "github.com/nranchev/go-libGeoIP" // load supported output plugins @@ -57,7 +57,7 @@ type Publisher struct { TopologyOutput outputs.TopologyOutputer IgnoreOutgoing bool GeoLite *libgeo.GeoIP - Filters *filter.Filters + Processors *processors.Processors globalEventMetadata common.EventMetadata // Fields and tags to add to each event. @@ -172,9 +172,9 @@ func (publisher *Publisher) PublishTopology(params ...string) error { return nil } -func (publisher *Publisher) RegisterFilter(filters *filter.Filters) error { +func (publisher *Publisher) RegisterProcessors(list *processors.Processors) error { - publisher.Filters = filters + publisher.Processors = list return nil } diff --git a/metricbeat/beater/event.go b/metricbeat/beater/event.go index f87a73a7d9ee..a88a832e1bca 100644 --- a/metricbeat/beater/event.go +++ b/metricbeat/beater/event.go @@ -4,8 +4,8 @@ import ( "time" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" ) const ( @@ -23,7 +23,7 @@ type eventBuilder struct { fetchDuration time.Duration event common.MapStr fetchErr error - filters *filter.Filters + filters *processors.Processors metadata common.EventMetadata } @@ -43,7 +43,7 @@ func (b eventBuilder) build() (common.MapStr, error) { // Apply filters. if b.filters != nil { - event = b.filters.Filter(event) + event = b.filters.Run(event) } event = common.MapStr{ diff --git a/metricbeat/beater/event_test.go b/metricbeat/beater/event_test.go index 11b02eb6b6c0..94f0efe22944 100644 --- a/metricbeat/beater/event_test.go +++ b/metricbeat/beater/event_test.go @@ -33,7 +33,7 @@ var builder = eventBuilder{ fetchDuration: elapsed, // event // fetchErr - // filters + // processors // metadata } diff --git a/metricbeat/beater/module.go b/metricbeat/beater/module.go index 90aa64437593..d2e74b451876 100644 --- a/metricbeat/beater/module.go +++ b/metricbeat/beater/module.go @@ -7,8 +7,8 @@ import ( "time" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/metricbeat/mb" "github.com/joeshaw/multierror" @@ -34,7 +34,7 @@ var ( // Use NewModuleWrapper or NewModuleWrappers to construct new ModuleWrappers. type ModuleWrapper struct { mb.Module - filters *filter.Filters + filters *processors.Processors metricSets []*metricSetWrapper // List of pointers to its associated MetricSets. } @@ -76,7 +76,7 @@ func NewModuleWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Modul var errs multierror.Errors for k, v := range modules { debugf("Initializing Module type '%s': %T=%+v", k.Name(), k, k) - f, err := filter.New(k.Config().Filters) + f, err := processors.New(k.Config().Filters) if err != nil { errs = append(errs, errors.Wrapf(err, "module %s", k.Name())) continue diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 8b0f3a9da658..5a552e6767db 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -8,7 +8,7 @@ import ( "time" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/filter" + "github.com/elastic/beats/libbeat/processors" ) // Module interfaces @@ -100,13 +100,13 @@ func (b *BaseMetricSet) Host() string { // ModuleConfig is the base configuration data for all Modules. type ModuleConfig struct { - Hosts []string `config:"hosts"` - Period time.Duration `config:"period" validate:"positive"` - Timeout time.Duration `config:"timeout" validate:"positive"` - Module string `config:"module" validate:"required"` - MetricSets []string `config:"metricsets" validate:"required"` - Enabled bool `config:"enabled"` - Filters filter.FilterPluginConfig `config:"filters"` + Hosts []string `config:"hosts"` + Period time.Duration `config:"period" validate:"positive"` + Timeout time.Duration `config:"timeout" validate:"positive"` + Module string `config:"module" validate:"required"` + MetricSets []string `config:"metricsets" validate:"required"` + Enabled bool `config:"enabled"` + Filters processors.PluginConfig `config:"filters"` common.EventMetadata `config:",inline"` // Fields and tags to add to events. } diff --git a/metricbeat/tests/system/config/metricbeat.yml.j2 b/metricbeat/tests/system/config/metricbeat.yml.j2 index b90bb07c50fa..98acbd966119 100644 --- a/metricbeat/tests/system/config/metricbeat.yml.j2 +++ b/metricbeat/tests/system/config/metricbeat.yml.j2 @@ -104,8 +104,8 @@ geoip: {%- if drop_fields or drop_event or include_fields %} -#================================ Filters ===================================== -filters: +#================================ Processors ===================================== +processors: {%- if include_fields %} - include_fields: diff --git a/packetbeat/packetbeat.full.yml b/packetbeat/packetbeat.full.yml index b52ade68353e..a244da5742d8 100644 --- a/packetbeat/packetbeat.full.yml +++ b/packetbeat/packetbeat.full.yml @@ -429,20 +429,21 @@ packetbeat.protocols.nfs: # default is the number of logical CPUs available in the system. #max_procs: -#================================ Filters ===================================== +#================================ Processors ===================================== -# This section defines a list of filtering rules that are applied one by one -# starting with the exported event: +# Processors are used to reduce the number of fields in the exported event or to +# enhance the event with external meta data. This section defines a list of processors +# that are applied one by one and the first one receives the initial event: # # event -> filter1 -> event1 -> filter2 ->event2 ... # -# Supported actions: drop_fields, drop_event, include_fields +# Supported processors: drop_fields, drop_event, include_fields # -# For example, the following filter configuration uses multiple actions to keep +# For example, you can use the following processors to keep # the fields that contain CPU load percentages, but remove the fields that # contain CPU ticks values: # -#filters: +#processors: #- include_fields: # fields: ["cpu"] #- drop_fields: @@ -450,7 +451,7 @@ packetbeat.protocols.nfs: # # The following example drops the events that have the HTTP response code 200: # -#filters: +#processors: #- drop_event: # equals: # http.code: 200 diff --git a/packetbeat/tests/system/config/packetbeat.yml.j2 b/packetbeat/tests/system/config/packetbeat.yml.j2 index 1b028cecb8f0..07e9eaaa7ece 100644 --- a/packetbeat/tests/system/config/packetbeat.yml.j2 +++ b/packetbeat/tests/system/config/packetbeat.yml.j2 @@ -176,8 +176,8 @@ geoip: {%- if drop_fields or drop_event or include_fields %} -#================================ Filters ===================================== -filters: +#================================ Processors ===================================== +processors: {%- if include_fields %} - include_fields: diff --git a/packetbeat/tests/system/test_0060_filtering.py b/packetbeat/tests/system/test_0060_processors.py similarity index 100% rename from packetbeat/tests/system/test_0060_filtering.py rename to packetbeat/tests/system/test_0060_processors.py diff --git a/winlogbeat/winlogbeat.full.yml b/winlogbeat/winlogbeat.full.yml index ee03acfc820e..2cf65a8e93ee 100644 --- a/winlogbeat/winlogbeat.full.yml +++ b/winlogbeat/winlogbeat.full.yml @@ -80,10 +80,11 @@ winlogbeat.event_logs: # default is the number of logical CPUs available in the system. #max_procs: -#================================ Filters ===================================== +#================================ Processors ===================================== -# This section defines a list of filtering rules that are applied one by one -# starting with the exported event: +# Processors are used to reduce the number of fields in the exported event or to +# enhance the event with external meta data. This section defines a list of processors +# that are applied one by one and the first one receives the initial event: # # event -> filter1 -> event1 -> filter2 ->event2 ... #