From 1d944620660b7119fbe76099ba38213c673fae67 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Wed, 24 Apr 2019 08:43:13 -0500 Subject: [PATCH] Add `add_observer_metadata processor` (#11394) Resolves #11379 via addition of new add_observer_metadata processor. In addition to creating the processor this PR extracts out the common operations between add_observer_metadata and add_host_metadata for geo and netinfo fields into a new processors/util package. Please note that the observer ECS field does not contain the same values that host does. See the ECS Observer Spec for more info. --- CHANGELOG.next.asciidoc | 3 + heartbeat/heartbeat.yml | 5 + heartbeat/scripts/post_process_config.py | 4 + libbeat/cmd/instance/imports.go | 1 + libbeat/docs/processors-using.asciidoc | 77 +++++++- .../add_host_metadata/add_host_metadata.go | 88 +--------- .../add_host_metadata_test.go | 71 ++------ .../processors/add_host_metadata/config.go | 21 +-- .../add_observer_metadata.go | 154 ++++++++++++++++ .../add_observer_metadata_test.go | 164 ++++++++++++++++++ .../add_observer_metadata/config.go | 39 +++++ libbeat/processors/processor.go | 7 +- .../javascript/module/processor/processor.go | 2 + libbeat/processors/util/geo.go | 77 ++++++++ libbeat/processors/util/geo_test.go | 110 ++++++++++++ libbeat/processors/util/netinfo.go | 67 +++++++ 16 files changed, 731 insertions(+), 159 deletions(-) create mode 100644 libbeat/processors/add_observer_metadata/add_observer_metadata.go create mode 100644 libbeat/processors/add_observer_metadata/add_observer_metadata_test.go create mode 100644 libbeat/processors/add_observer_metadata/config.go create mode 100644 libbeat/processors/util/geo.go create mode 100644 libbeat/processors/util/geo_test.go create mode 100644 libbeat/processors/util/netinfo.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3022081f41d..428f0c11632 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -119,6 +119,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - New processor: `truncate_fields`. {pull}11297[11297] - Allow a beat to ship monitoring data directly to an Elasticsearch monitoring clsuter. {pull}9260[9260] - Updated go-seccomp-bpf library to v1.1.0 which updates syscall lists for Linux v5.0. {pull}NNNN[NNNN] +- Add `add_observer_metadata` processor. {pull}11394[11394] *Auditbeat* @@ -144,6 +145,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Heartbeat* +- Enable `add_observer_metadata` processor in default config. {pull}11394[11394] + *Journalbeat* *Metricbeat* diff --git a/heartbeat/heartbeat.yml b/heartbeat/heartbeat.yml index 258bc31d198..cfeaf7485e2 100644 --- a/heartbeat/heartbeat.yml +++ b/heartbeat/heartbeat.yml @@ -126,6 +126,11 @@ output.elasticsearch: # Client Certificate Key #ssl.key: "/etc/pki/client/cert.key" +#================================ Processors ===================================== +processors: + - add_observer_metadata: ~ + + #================================ Logging ===================================== # Sets log level. The default log level is info. diff --git a/heartbeat/scripts/post_process_config.py b/heartbeat/scripts/post_process_config.py index e66d6a7d0e3..38d9dbcd634 100755 --- a/heartbeat/scripts/post_process_config.py +++ b/heartbeat/scripts/post_process_config.py @@ -22,6 +22,10 @@ if m: section_name = m.group(1) if section_name == "Processors": + output += line # include section name in output + output += "processors:\n" + output += " - add_observer_metadata: ~\n" + output += "\n\n" inside_processor_section = True else: inside_processor_section = False diff --git a/libbeat/cmd/instance/imports.go b/libbeat/cmd/instance/imports.go index 70cb46a49e7..413961edd47 100644 --- a/libbeat/cmd/instance/imports.go +++ b/libbeat/cmd/instance/imports.go @@ -29,6 +29,7 @@ import ( _ "github.com/elastic/beats/libbeat/processors/add_host_metadata" _ "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata" _ "github.com/elastic/beats/libbeat/processors/add_locale" + _ "github.com/elastic/beats/libbeat/processors/add_observer_metadata" _ "github.com/elastic/beats/libbeat/processors/add_process_metadata" _ "github.com/elastic/beats/libbeat/processors/communityid" _ "github.com/elastic/beats/libbeat/processors/dissect" diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index 139f06160b9..9a44d923fc5 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -198,6 +198,7 @@ The supported processors are: * <> * <> * <> + * <> * <> * <> * <> @@ -1173,7 +1174,7 @@ It has the following settings: The `add_host_metadata` processor annotates each event with relevant metadata from the host machine. -The fields added to the event are looking as following: +The fields added to the event look like the following: [source,json] ------------------------------------------------------------------------------- @@ -1205,6 +1206,80 @@ The fields added to the event are looking as following: } ------------------------------------------------------------------------------- +[[add-observer-metadata]] +=== Add Observer metadata + +beta[] + +[source,yaml] +------------------------------------------------------------------------------- +processors: +- add_observer_metadata: + netinfo.enabled: false + cache.ttl: 5m + geo: + name: nyc-dc1-rack1 + location: 40.7128, -74.0060 + continent_name: North America + country_iso_code: US + region_name: New York + region_iso_code: NY + city_name: New York +------------------------------------------------------------------------------- + +It has the following settings: + +`netinfo.enabled`:: (Optional) Default false. Include IP addresses and MAC addresses as fields observer.ip and observer.mac + +`cache.ttl`:: (Optional) The processor uses an internal cache for the observer metadata. This sets the cache expiration time. The default is 5m, negative values disable caching altogether. + +`geo.name`:: User definable token to be used for identifying a discrete location. Frequently a datacenter, rack, or similar. + +`geo.location`:: Longitude and latitude in comma separated format. + +`geo.continent_name`:: Name of the continent. + +`geo.country_name`:: Name of the country. + +`geo.region_name`:: Name of the region. + +`geo.city_name`:: Name of the city. + +`geo.country_iso_code`:: ISO country code. + +`geo.region_iso_code`:: ISO region code. + + +The `add_geo_metadata` processor annotates each event with relevant metadata from the observer machine. +The fields added to the event look like the following: + +[source,json] +------------------------------------------------------------------------------- +{ + "observer" : { + "hostname" : "avce", + "type" : "heartbeat", + "vendor" : "elastic", + "ip" : [ + "192.168.1.251", + "fe80::64b2:c3ff:fe5b:b974", + ], + "mac" : [ + "dc:c1:02:6f:1b:ed", + ], + "geo": { + "continent_name": "North America", + "country_iso_code": "US", + "region_name": "New York", + "region_iso_code": "NY", + "city_name": "New York", + "name": "nyc-dc1-rack1", + "location": "40.7128, -74.0060" + } + } +} +------------------------------------------------------------------------------- + [[dissect]] === Dissect strings diff --git a/libbeat/processors/add_host_metadata/add_host_metadata.go b/libbeat/processors/add_host_metadata/add_host_metadata.go index 883d06fe225..1e1266deaf2 100644 --- a/libbeat/processors/add_host_metadata/add_host_metadata.go +++ b/libbeat/processors/add_host_metadata/add_host_metadata.go @@ -19,12 +19,9 @@ package add_host_metadata import ( "fmt" - "net" - "regexp" "sync" "time" - "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/beat" @@ -32,6 +29,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/metric/system/host" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/util" "github.com/elastic/go-sysinfo" ) @@ -67,40 +65,9 @@ func New(cfg *common.Config) (processors.Processor, error) { p.loadData() if config.Geo != nil { - if len(config.Geo.Location) > 0 { - // Regexp matching a number with an optional decimal component - // Valid numbers: '123', '123.23', etc. - latOrLon := `\-?\d+(\.\d+)?` - - // Regexp matching a pair of lat lon coordinates. - // e.g. 40.123, -92.929 - locRegexp := `^\s*` + // anchor to start of string with optional whitespace - latOrLon + // match the latitude - `\s*\,\s*` + // match the separator. optional surrounding whitespace - latOrLon + // match the longitude - `\s*$` //optional whitespace then end anchor - - if m, _ := regexp.MatchString(locRegexp, config.Geo.Location); !m { - return nil, errors.New(fmt.Sprintf("Invalid lat,lon string for add_host_metadata: %s", config.Geo.Location)) - } - } - - geoFields := common.MapStr{ - "name": config.Geo.Name, - "location": config.Geo.Location, - "continent_name": config.Geo.ContinentName, - "country_iso_code": config.Geo.CountryISOCode, - "region_name": config.Geo.RegionName, - "region_iso_code": config.Geo.RegionISOCode, - "city_name": config.Geo.CityName, - } - // Delete any empty values - blankStringMatch := regexp.MustCompile(`^\s*$`) - for k, v := range geoFields { - vStr := v.(string) - if blankStringMatch.MatchString(vStr) { - delete(geoFields, k) - } + geoFields, err := util.GeoConfigToMap(*config.Geo) + if err != nil { + return nil, err } p.geoData = common.MapStr{"host": common.MapStr{"geo": geoFields}} } @@ -151,7 +118,7 @@ func (p *addHostMetadata) loadData() error { data := host.MapHostInfo(h.Info()) if p.config.NetInfoEnabled { // IP-address and MAC-address - var ipList, hwList, err = p.getNetInfo() + var ipList, hwList, err = util.GetNetInfo() if err != nil { logp.Info("Error when getting network information %v", err) } @@ -171,51 +138,6 @@ func (p *addHostMetadata) loadData() error { return nil } -func (p *addHostMetadata) getNetInfo() ([]string, []string, error) { - var ipList []string - var hwList []string - - // Get all interfaces and loop through them - ifaces, err := net.Interfaces() - if err != nil { - return nil, nil, err - } - - // Keep track of all errors - var errs multierror.Errors - - for _, i := range ifaces { - // Skip loopback interfaces - if i.Flags&net.FlagLoopback == net.FlagLoopback { - continue - } - - hw := i.HardwareAddr.String() - // Skip empty hardware addresses - if hw != "" { - hwList = append(hwList, hw) - } - - addrs, err := i.Addrs() - if err != nil { - // If we get an error, keep track of it and continue with the next interface - errs = append(errs, err) - continue - } - - for _, addr := range addrs { - switch v := addr.(type) { - case *net.IPNet: - ipList = append(ipList, v.IP.String()) - case *net.IPAddr: - ipList = append(ipList, v.IP.String()) - } - } - } - - return ipList, hwList, errs.Err() -} - func (p *addHostMetadata) String() string { return fmt.Sprintf("%v=[netinfo.enabled=[%v], cache.ttl=[%v]]", processorName, p.config.NetInfoEnabled, p.config.CacheTTL) diff --git a/libbeat/processors/add_host_metadata/add_host_metadata_test.go b/libbeat/processors/add_host_metadata/add_host_metadata_test.go index 750fbc4670a..57f4358b65d 100644 --- a/libbeat/processors/add_host_metadata/add_host_metadata_test.go +++ b/libbeat/processors/add_host_metadata/add_host_metadata_test.go @@ -23,9 +23,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" @@ -169,77 +168,31 @@ func TestConfigGeoEnabled(t *testing.T) { newEvent, err := p.Run(event) assert.NoError(t, err) - for configKey, configValue := range config { - t.Run(fmt.Sprintf("Check of %s", configKey), func(t *testing.T) { - v, err := newEvent.GetValue(fmt.Sprintf("host.%s", configKey)) - assert.NoError(t, err) - assert.Equal(t, configValue, v, "Could not find in %s", newEvent) - }) - } + eventGeoField, err := newEvent.GetValue("host.geo") + require.NoError(t, err) + + assert.Len(t, eventGeoField, len(config)) } -func TestPartialGeo(t *testing.T) { +func TestConfigGeoDisabled(t *testing.T) { event := &beat.Event{ Fields: common.MapStr{}, Timestamp: time.Now(), } - config := map[string]interface{}{ - "geo.name": "yerevan-am", - "geo.city_name": " ", - } + config := map[string]interface{}{} testConfig, err := common.NewConfigFrom(config) - assert.NoError(t, err) + require.NoError(t, err) p, err := New(testConfig) require.NoError(t, err) newEvent, err := p.Run(event) - assert.NoError(t, err) - - v, err := newEvent.Fields.GetValue("host.geo.name") - assert.NoError(t, err) - assert.Equal(t, "yerevan-am", v) - - missing := []string{"continent_name", "country_name", "country_iso_code", "region_name", "region_iso_code", "city_name"} - - for _, k := range missing { - path := "host.geo." + k - v, err = newEvent.Fields.GetValue(path) - - assert.Equal(t, common.ErrKeyNotFound, err, "din expect to find %v", path) - } -} -func TestGeoLocationValidation(t *testing.T) { - locations := []struct { - str string - valid bool - }{ - {"40.177200, 44.503490", true}, - {"-40.177200, -44.503490", true}, - {"garbage", false}, - {"9999999999", false}, - } - - for _, location := range locations { - t.Run(fmt.Sprintf("Location %s validation should be %t", location.str, location.valid), func(t *testing.T) { - - conf, err := common.NewConfigFrom(map[string]interface{}{ - "geo": map[string]interface{}{ - "location": location.str, - }, - }) - require.NoError(t, err) - - _, err = New(conf) + require.NoError(t, err) - if location.valid { - require.NoError(t, err) - } else { - require.Error(t, err) - } - }) - } + eventGeoField, err := newEvent.GetValue("host.geo") + assert.Error(t, err) + assert.Equal(t, nil, eventGeoField) } diff --git a/libbeat/processors/add_host_metadata/config.go b/libbeat/processors/add_host_metadata/config.go index 03fff0c6b98..6a3940019c9 100644 --- a/libbeat/processors/add_host_metadata/config.go +++ b/libbeat/processors/add_host_metadata/config.go @@ -19,25 +19,16 @@ package add_host_metadata import ( "time" + + "github.com/elastic/beats/libbeat/processors/util" ) // Config for add_host_metadata processor. type Config struct { - NetInfoEnabled bool `config:"netinfo.enabled"` // Add IP and MAC to event - CacheTTL time.Duration `config:"cache.ttl"` - Geo *GeoConfig `config:"geo"` - Name string `config:"name"` -} - -// GeoConfig contains geo configuration data. -type GeoConfig struct { - Name string `config:"name"` - Location string `config:"location"` - ContinentName string `config:"continent_name"` - CountryISOCode string `config:"country_iso_code"` - RegionName string `config:"region_name"` - RegionISOCode string `config:"region_iso_code"` - CityName string `config:"city_name"` + NetInfoEnabled bool `config:"netinfo.enabled"` // Add IP and MAC to event + CacheTTL time.Duration `config:"cache.ttl"` + Geo *util.GeoConfig `config:"geo"` + Name string `config:"name"` } func defaultConfig() Config { diff --git a/libbeat/processors/add_observer_metadata/add_observer_metadata.go b/libbeat/processors/add_observer_metadata/add_observer_metadata.go new file mode 100644 index 00000000000..050e971dd97 --- /dev/null +++ b/libbeat/processors/add_observer_metadata/add_observer_metadata.go @@ -0,0 +1,154 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_observer_metadata + +import ( + "fmt" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/util" + "github.com/elastic/go-sysinfo" +) + +func init() { + processors.RegisterPlugin("add_observer_metadata", New) +} + +type observerMetadata struct { + lastUpdate struct { + time.Time + sync.Mutex + } + data common.MapStrPointer + geoData common.MapStr + config Config +} + +const ( + processorName = "add_observer_metadata" +) + +// New creates a new instance of the add_observer_metadata processor. +func New(cfg *common.Config) (processors.Processor, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, errors.Wrapf(err, "fail to unpack the %v configuration", processorName) + } + + p := &observerMetadata{ + config: config, + data: common.NewMapStrPointer(nil), + } + p.loadData() + + if config.Geo != nil { + geoFields, err := util.GeoConfigToMap(*config.Geo) + if err != nil { + return nil, err + } + + p.geoData = common.MapStr{"observer": common.MapStr{"geo": geoFields}} + } + + return p, nil +} + +// Run enriches the given event with the observer meta data +func (p *observerMetadata) Run(event *beat.Event) (*beat.Event, error) { + err := p.loadData() + if err != nil { + return nil, err + } + + keyExists, _ := event.Fields.HasKey("observer") + + if p.config.Overwrite || !keyExists { + if p.config.Overwrite { + event.Fields.Delete("observer") + } + event.Fields.DeepUpdate(p.data.Get().Clone()) + + if len(p.geoData) > 0 { + event.Fields.DeepUpdate(p.geoData) + } + } + + return event, nil +} + +func (p *observerMetadata) expired() bool { + if p.config.CacheTTL <= 0 { + return true + } + + p.lastUpdate.Lock() + defer p.lastUpdate.Unlock() + + if p.lastUpdate.Add(p.config.CacheTTL).After(time.Now()) { + return false + } + p.lastUpdate.Time = time.Now() + return true +} + +func (p *observerMetadata) loadData() error { + if !p.expired() { + return nil + } + + h, err := sysinfo.Host() + if err != nil { + return err + } + + hostInfo := h.Info() + data := common.MapStr{ + "observer": common.MapStr{ + "hostname": hostInfo.Hostname, + }, + } + if p.config.NetInfoEnabled { + // IP-address and MAC-address + var ipList, hwList, err = util.GetNetInfo() + if err != nil { + logp.Info("Error when getting network information %v", err) + } + + if len(ipList) > 0 { + data.Put("observer.ip", ipList) + } + if len(hwList) > 0 { + data.Put("observer.mac", hwList) + } + } + + p.data.Set(data) + return nil +} + +func (p *observerMetadata) String() string { + return fmt.Sprintf("%v=[netinfo.enabled=[%v], cache.ttl=[%v]]", + processorName, p.config.NetInfoEnabled, p.config.CacheTTL) +} diff --git a/libbeat/processors/add_observer_metadata/add_observer_metadata_test.go b/libbeat/processors/add_observer_metadata/add_observer_metadata_test.go new file mode 100644 index 00000000000..e686388ccc6 --- /dev/null +++ b/libbeat/processors/add_observer_metadata/add_observer_metadata_test.go @@ -0,0 +1,164 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_observer_metadata + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestConfigDefault(t *testing.T) { + event := &beat.Event{ + Fields: common.MapStr{}, + Timestamp: time.Now(), + } + testConfig, err := common.NewConfigFrom(map[string]interface{}{}) + assert.NoError(t, err) + + p, err := New(testConfig) + + newEvent, err := p.Run(event) + assert.NoError(t, err) + + v, err := newEvent.GetValue("observer.ip") + assert.Error(t, err) + assert.Nil(t, v) + + v, err = newEvent.GetValue("observer.mac") + assert.Error(t, err) + assert.Nil(t, v) +} + +func TestOverwriteFalse(t *testing.T) { + event := &beat.Event{ + Fields: common.MapStr{"observer": common.MapStr{"foo": "bar"}}, + Timestamp: time.Now(), + } + testConfig, err := common.NewConfigFrom(map[string]interface{}{}) + require.NoError(t, err) + + p, err := New(testConfig) + + newEvent, err := p.Run(event) + require.NoError(t, err) + + v, err := newEvent.GetValue("observer") + require.NoError(t, err) + assert.Equal(t, common.MapStr{"foo": "bar"}, v) +} + +func TestOverwriteTrue(t *testing.T) { + event := &beat.Event{ + Fields: common.MapStr{"observer": common.MapStr{"foo": "bar"}}, + Timestamp: time.Now(), + } + testConfig, err := common.NewConfigFrom(map[string]interface{}{"overwrite": true}) + require.NoError(t, err) + + p, err := New(testConfig) + + newEvent, err := p.Run(event) + require.NoError(t, err) + + v, err := newEvent.GetValue("observer.hostname") + require.NoError(t, err) + assert.NotNil(t, v) +} + +func TestConfigNetInfoEnabled(t *testing.T) { + event := &beat.Event{ + Fields: common.MapStr{}, + Timestamp: time.Now(), + } + testConfig, err := common.NewConfigFrom(map[string]interface{}{ + "netinfo.enabled": true, + }) + assert.NoError(t, err) + + p, err := New(testConfig) + + newEvent, err := p.Run(event) + assert.NoError(t, err) + + v, err := newEvent.GetValue("observer.ip") + assert.NoError(t, err) + assert.NotNil(t, v) + + v, err = newEvent.GetValue("observer.mac") + assert.NoError(t, err) + assert.NotNil(t, v) +} + +func TestConfigGeoEnabled(t *testing.T) { + event := &beat.Event{ + Fields: common.MapStr{}, + Timestamp: time.Now(), + } + + config := map[string]interface{}{ + "geo.name": "yerevan-am", + "geo.location": "40.177200, 44.503490", + "geo.continent_name": "Asia", + "geo.country_iso_code": "AM", + "geo.region_name": "Erevan", + "geo.region_iso_code": "AM-ER", + "geo.city_name": "Yerevan", + } + + testConfig, err := common.NewConfigFrom(config) + assert.NoError(t, err) + + p, err := New(testConfig) + require.NoError(t, err) + + newEvent, err := p.Run(event) + assert.NoError(t, err) + + eventGeoField, err := newEvent.GetValue("observer.geo") + require.NoError(t, err) + + assert.Len(t, eventGeoField, len(config)) +} + +func TestConfigGeoDisabled(t *testing.T) { + event := &beat.Event{ + Fields: common.MapStr{}, + Timestamp: time.Now(), + } + + config := map[string]interface{}{} + + testConfig, err := common.NewConfigFrom(config) + require.NoError(t, err) + + p, err := New(testConfig) + require.NoError(t, err) + + newEvent, err := p.Run(event) + require.NoError(t, err) + + eventGeoField, err := newEvent.GetValue("observer.geo") + assert.Error(t, err) + assert.Equal(t, nil, eventGeoField) +} diff --git a/libbeat/processors/add_observer_metadata/config.go b/libbeat/processors/add_observer_metadata/config.go new file mode 100644 index 00000000000..111ed9a3f1d --- /dev/null +++ b/libbeat/processors/add_observer_metadata/config.go @@ -0,0 +1,39 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_observer_metadata + +import ( + "time" + + "github.com/elastic/beats/libbeat/processors/util" +) + +// Config for add_host_metadata processor. +type Config struct { + Overwrite bool `config:"overwrite"` // Overwrite if observer fields already exist + NetInfoEnabled bool `config:"netinfo.enabled"` // Add IP and MAC to event + CacheTTL time.Duration `config:"cache.ttl"` + Geo *util.GeoConfig `config:"geo"` +} + +func defaultConfig() Config { + return Config{ + NetInfoEnabled: false, + CacheTTL: 5 * time.Minute, + } +} diff --git a/libbeat/processors/processor.go b/libbeat/processors/processor.go index 4ca42788a97..89f2fde9a28 100644 --- a/libbeat/processors/processor.go +++ b/libbeat/processors/processor.go @@ -71,7 +71,12 @@ func New(config PluginConfig) (*Processors, error) { gen, exists := registry.reg[actionName] if !exists { - return nil, errors.Errorf("the processor action %s does not exist", actionName) + var validActions []string + for k := range registry.reg { + validActions = append(validActions, k) + + } + return nil, errors.Errorf("the processor action %s does not exist. Valid actions: %v", actionName, strings.Join(validActions, ", ")) } actionCfg.PrintDebugf("Configure processor action '%v' with:", actionName) diff --git a/libbeat/processors/script/javascript/module/processor/processor.go b/libbeat/processors/script/javascript/module/processor/processor.go index 9acb38cfed5..6cc9a4fdd12 100644 --- a/libbeat/processors/script/javascript/module/processor/processor.go +++ b/libbeat/processors/script/javascript/module/processor/processor.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/libbeat/processors/add_host_metadata" "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata" "github.com/elastic/beats/libbeat/processors/add_locale" + "github.com/elastic/beats/libbeat/processors/add_observer_metadata" "github.com/elastic/beats/libbeat/processors/add_process_metadata" "github.com/elastic/beats/libbeat/processors/communityid" "github.com/elastic/beats/libbeat/processors/dissect" @@ -44,6 +45,7 @@ var constructors = map[string]processors.Constructor{ "AddFields": actions.CreateAddFields, "AddHostMetadata": add_host_metadata.New, "AddKubernetesMetadata": add_kubernetes_metadata.New, + "AddObserverMetadata": add_observer_metadata.New, "AddLocale": add_locale.New, "AddProcessMetadata": add_process_metadata.New, "CommunityID": communityid.New, diff --git a/libbeat/processors/util/geo.go b/libbeat/processors/util/geo.go new file mode 100644 index 00000000000..d8decf0de9c --- /dev/null +++ b/libbeat/processors/util/geo.go @@ -0,0 +1,77 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package util + +import ( + "fmt" + "regexp" + + "github.com/elastic/beats/libbeat/common" +) + +// GeoConfig contains geo configuration data. +type GeoConfig struct { + Name string `config:"name"` + Location string `config:"location"` + ContinentName string `config:"continent_name"` + CountryISOCode string `config:"country_iso_code"` + RegionName string `config:"region_name"` + RegionISOCode string `config:"region_iso_code"` + CityName string `config:"city_name"` +} + +// GeoConfigToMap converts `geo` sections to a `common.MapStr`. +func GeoConfigToMap(config GeoConfig) (common.MapStr, error) { + if len(config.Location) > 0 { + // Regexp matching a number with an optional decimal component + // Valid numbers: '123', '123.23', etc. + latOrLon := `\-?\d+(\.\d+)?` + + // Regexp matching a pair of lat lon coordinates. + // e.g. 40.123, -92.929 + locRegexp := `^\s*` + // anchor to start of string with optional whitespace + latOrLon + // match the latitude + `\s*\,\s*` + // match the separator. optional surrounding whitespace + latOrLon + // match the longitude + `\s*$` //optional whitespace then end anchor + + if m, _ := regexp.MatchString(locRegexp, config.Location); !m { + return nil, fmt.Errorf("Invalid lat,lon string for add_observer_metadata: %s", config.Location) + } + } + + geoFields := common.MapStr{ + "name": config.Name, + "location": config.Location, + "continent_name": config.ContinentName, + "country_iso_code": config.CountryISOCode, + "region_name": config.RegionName, + "region_iso_code": config.RegionISOCode, + "city_name": config.CityName, + } + // Delete any empty values + blankStringMatch := regexp.MustCompile(`^\s*$`) + for k, v := range geoFields { + vStr := v.(string) + if blankStringMatch.MatchString(vStr) { + delete(geoFields, k) + } + } + + return geoFields, nil +} diff --git a/libbeat/processors/util/geo_test.go b/libbeat/processors/util/geo_test.go new file mode 100644 index 00000000000..cd7334c766c --- /dev/null +++ b/libbeat/processors/util/geo_test.go @@ -0,0 +1,110 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package util + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/libbeat/common" +) + +// parseGeoConfig converts the map into a GeoConfig. +// Going through go-ucfg we test the config to struct transform / validation. +func parseConfig(t *testing.T, configMap map[string]interface{}) GeoConfig { + config, err := common.NewConfigFrom(configMap) + require.NoError(t, err) + + geoConfig := GeoConfig{} + err = config.Unpack(&geoConfig) + require.NoError(t, err) + + return geoConfig +} + +func TestConfigGeoEnabled(t *testing.T) { + config := map[string]interface{}{ + "name": "yerevan-am", + "location": "40.177200, 44.503490", + "continent_name": "Asia", + "country_iso_code": "AM", + "region_name": "Erevan", + "region_iso_code": "AM-ER", + "city_name": "Yerevan", + } + + geoMap, err := GeoConfigToMap(parseConfig(t, config)) + require.NoError(t, err) + + for configKey, configValue := range config { + t.Run(fmt.Sprintf("Check of %s", configKey), func(t *testing.T) { + v, ok := geoMap[configKey] + assert.True(t, ok, "key has entry") + assert.Equal(t, configValue, v) + }) + } +} + +func TestPartialGeo(t *testing.T) { + config := map[string]interface{}{ + "name": "yerevan-am", + "city_name": " ", + } + + geoMap, err := GeoConfigToMap(parseConfig(t, config)) + require.NoError(t, err) + + assert.Equal(t, "yerevan-am", geoMap["name"]) + + missing := []string{"continent_name", "country_name", "country_iso_code", "region_name", "region_iso_code", "city_name"} + + for _, k := range missing { + _, exists := geoMap[k] + assert.False(t, exists, "key should %s should not exist", k) + } +} + +func TestGeoLocationValidation(t *testing.T) { + locations := []struct { + str string + valid bool + }{ + {"40.177200, 44.503490", true}, + {"-40.177200, -44.503490", true}, + {"garbage", false}, + {"9999999999", false}, + } + + for _, location := range locations { + t.Run(fmt.Sprintf("Location %s validation should be %t", location.str, location.valid), func(t *testing.T) { + + geoConfig := parseConfig(t, common.MapStr{"location": location.str}) + geoMap, err := GeoConfigToMap(geoConfig) + + if location.valid { + require.NoError(t, err) + require.Equal(t, location.str, geoMap["location"]) + } else { + require.Error(t, err) + } + }) + } +} diff --git a/libbeat/processors/util/netinfo.go b/libbeat/processors/util/netinfo.go new file mode 100644 index 00000000000..1d4464df3e0 --- /dev/null +++ b/libbeat/processors/util/netinfo.go @@ -0,0 +1,67 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package util + +import ( + "net" + + "github.com/joeshaw/multierror" +) + +// GetNetInfo returns lists of IPs and MACs for the machine it is executed on. +func GetNetInfo() (ipList []string, hwList []string, err error) { + // Get all interfaces and loop through them + ifaces, err := net.Interfaces() + if err != nil { + return nil, nil, err + } + + // Keep track of all errors + var errs multierror.Errors + + for _, i := range ifaces { + // Skip loopback interfaces + if i.Flags&net.FlagLoopback == net.FlagLoopback { + continue + } + + hw := i.HardwareAddr.String() + // Skip empty hardware addresses + if hw != "" { + hwList = append(hwList, hw) + } + + addrs, err := i.Addrs() + if err != nil { + // If we get an error, keep track of it and continue with the next interface + errs = append(errs, err) + continue + } + + for _, addr := range addrs { + switch v := addr.(type) { + case *net.IPNet: + ipList = append(ipList, v.IP.String()) + case *net.IPAddr: + ipList = append(ipList, v.IP.String()) + } + } + } + + return ipList, hwList, errs.Err() +}