Skip to content

Commit

Permalink
Add add_observer_metadata processor (#11394)
Browse files Browse the repository at this point in the history
Resolves #11379 via addition of new add_observer_metadata processor.

In addition to creating the processor this PR extracts out the common operations between add_observer_metadata and add_host_metadata for geo and netinfo fields into a new processors/util package.

Please note that the observer ECS field does not contain the same values that host does. See the ECS Observer Spec for more info.
  • Loading branch information
andrewvc authored Apr 24, 2019
1 parent 1e080fd commit 1d94462
Show file tree
Hide file tree
Showing 16 changed files with 731 additions and 159 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- New processor: `truncate_fields`. {pull}11297[11297]
- Allow a beat to ship monitoring data directly to an Elasticsearch monitoring clsuter. {pull}9260[9260]
- Updated go-seccomp-bpf library to v1.1.0 which updates syscall lists for Linux v5.0. {pull}NNNN[NNNN]
- Add `add_observer_metadata` processor. {pull}11394[11394]

*Auditbeat*

Expand All @@ -144,6 +145,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Heartbeat*

- Enable `add_observer_metadata` processor in default config. {pull}11394[11394]

*Journalbeat*

*Metricbeat*
Expand Down
5 changes: 5 additions & 0 deletions heartbeat/heartbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ output.elasticsearch:
# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"

#================================ Processors =====================================
processors:
- add_observer_metadata: ~


#================================ Logging =====================================

# Sets log level. The default log level is info.
Expand Down
4 changes: 4 additions & 0 deletions heartbeat/scripts/post_process_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
if m:
section_name = m.group(1)
if section_name == "Processors":
output += line # include section name in output
output += "processors:\n"
output += " - add_observer_metadata: ~\n"
output += "\n\n"
inside_processor_section = True
else:
inside_processor_section = False
Expand Down
1 change: 1 addition & 0 deletions libbeat/cmd/instance/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
_ "github.com/elastic/beats/libbeat/processors/add_host_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_locale"
_ "github.com/elastic/beats/libbeat/processors/add_observer_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_process_metadata"
_ "github.com/elastic/beats/libbeat/processors/communityid"
_ "github.com/elastic/beats/libbeat/processors/dissect"
Expand Down
77 changes: 76 additions & 1 deletion libbeat/docs/processors-using.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ The supported processors are:
* <<add-docker-metadata,`add_docker_metadata`>>
* <<add-fields, `add_fields`>>
* <<add-host-metadata,`add_host_metadata`>>
* <<add-observer-metadata,`add_observer_metadata`>>
* <<add-kubernetes-metadata,`add_kubernetes_metadata`>>
* <<add-labels, `add_labels`>>
* <<add-locale,`add_locale`>>
Expand Down Expand Up @@ -1173,7 +1174,7 @@ It has the following settings:


The `add_host_metadata` processor annotates each event with relevant metadata from the host machine.
The fields added to the event are looking as following:
The fields added to the event look like the following:

[source,json]
-------------------------------------------------------------------------------
Expand Down Expand Up @@ -1205,6 +1206,80 @@ The fields added to the event are looking as following:
}
-------------------------------------------------------------------------------

[[add-observer-metadata]]
=== Add Observer metadata

beta[]

[source,yaml]
-------------------------------------------------------------------------------
processors:
- add_observer_metadata:
netinfo.enabled: false
cache.ttl: 5m
geo:
name: nyc-dc1-rack1
location: 40.7128, -74.0060
continent_name: North America
country_iso_code: US
region_name: New York
region_iso_code: NY
city_name: New York
-------------------------------------------------------------------------------

It has the following settings:

`netinfo.enabled`:: (Optional) Default false. Include IP addresses and MAC addresses as fields observer.ip and observer.mac

`cache.ttl`:: (Optional) The processor uses an internal cache for the observer metadata. This sets the cache expiration time. The default is 5m, negative values disable caching altogether.

`geo.name`:: User definable token to be used for identifying a discrete location. Frequently a datacenter, rack, or similar.

`geo.location`:: Longitude and latitude in comma separated format.

`geo.continent_name`:: Name of the continent.

`geo.country_name`:: Name of the country.

`geo.region_name`:: Name of the region.

`geo.city_name`:: Name of the city.

`geo.country_iso_code`:: ISO country code.

`geo.region_iso_code`:: ISO region code.


The `add_geo_metadata` processor annotates each event with relevant metadata from the observer machine.
The fields added to the event look like the following:

[source,json]
-------------------------------------------------------------------------------
{
"observer" : {
"hostname" : "avce",
"type" : "heartbeat",
"vendor" : "elastic",
"ip" : [
"192.168.1.251",
"fe80::64b2:c3ff:fe5b:b974",
],
"mac" : [
"dc:c1:02:6f:1b:ed",
],
"geo": {
"continent_name": "North America",
"country_iso_code": "US",
"region_name": "New York",
"region_iso_code": "NY",
"city_name": "New York",
"name": "nyc-dc1-rack1",
"location": "40.7128, -74.0060"
}
}
}
-------------------------------------------------------------------------------

[[dissect]]
=== Dissect strings

Expand Down
88 changes: 5 additions & 83 deletions libbeat/processors/add_host_metadata/add_host_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@ package add_host_metadata

import (
"fmt"
"net"
"regexp"
"sync"
"time"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/metric/system/host"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/util"
"github.com/elastic/go-sysinfo"
)

Expand Down Expand Up @@ -67,40 +65,9 @@ func New(cfg *common.Config) (processors.Processor, error) {
p.loadData()

if config.Geo != nil {
if len(config.Geo.Location) > 0 {
// Regexp matching a number with an optional decimal component
// Valid numbers: '123', '123.23', etc.
latOrLon := `\-?\d+(\.\d+)?`

// Regexp matching a pair of lat lon coordinates.
// e.g. 40.123, -92.929
locRegexp := `^\s*` + // anchor to start of string with optional whitespace
latOrLon + // match the latitude
`\s*\,\s*` + // match the separator. optional surrounding whitespace
latOrLon + // match the longitude
`\s*$` //optional whitespace then end anchor

if m, _ := regexp.MatchString(locRegexp, config.Geo.Location); !m {
return nil, errors.New(fmt.Sprintf("Invalid lat,lon string for add_host_metadata: %s", config.Geo.Location))
}
}

geoFields := common.MapStr{
"name": config.Geo.Name,
"location": config.Geo.Location,
"continent_name": config.Geo.ContinentName,
"country_iso_code": config.Geo.CountryISOCode,
"region_name": config.Geo.RegionName,
"region_iso_code": config.Geo.RegionISOCode,
"city_name": config.Geo.CityName,
}
// Delete any empty values
blankStringMatch := regexp.MustCompile(`^\s*$`)
for k, v := range geoFields {
vStr := v.(string)
if blankStringMatch.MatchString(vStr) {
delete(geoFields, k)
}
geoFields, err := util.GeoConfigToMap(*config.Geo)
if err != nil {
return nil, err
}
p.geoData = common.MapStr{"host": common.MapStr{"geo": geoFields}}
}
Expand Down Expand Up @@ -151,7 +118,7 @@ func (p *addHostMetadata) loadData() error {
data := host.MapHostInfo(h.Info())
if p.config.NetInfoEnabled {
// IP-address and MAC-address
var ipList, hwList, err = p.getNetInfo()
var ipList, hwList, err = util.GetNetInfo()
if err != nil {
logp.Info("Error when getting network information %v", err)
}
Expand All @@ -171,51 +138,6 @@ func (p *addHostMetadata) loadData() error {
return nil
}

func (p *addHostMetadata) getNetInfo() ([]string, []string, error) {
var ipList []string
var hwList []string

// Get all interfaces and loop through them
ifaces, err := net.Interfaces()
if err != nil {
return nil, nil, err
}

// Keep track of all errors
var errs multierror.Errors

for _, i := range ifaces {
// Skip loopback interfaces
if i.Flags&net.FlagLoopback == net.FlagLoopback {
continue
}

hw := i.HardwareAddr.String()
// Skip empty hardware addresses
if hw != "" {
hwList = append(hwList, hw)
}

addrs, err := i.Addrs()
if err != nil {
// If we get an error, keep track of it and continue with the next interface
errs = append(errs, err)
continue
}

for _, addr := range addrs {
switch v := addr.(type) {
case *net.IPNet:
ipList = append(ipList, v.IP.String())
case *net.IPAddr:
ipList = append(ipList, v.IP.String())
}
}
}

return ipList, hwList, errs.Err()
}

func (p *addHostMetadata) String() string {
return fmt.Sprintf("%v=[netinfo.enabled=[%v], cache.ttl=[%v]]",
processorName, p.config.NetInfoEnabled, p.config.CacheTTL)
Expand Down
71 changes: 12 additions & 59 deletions libbeat/processors/add_host_metadata/add_host_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -169,77 +168,31 @@ func TestConfigGeoEnabled(t *testing.T) {
newEvent, err := p.Run(event)
assert.NoError(t, err)

for configKey, configValue := range config {
t.Run(fmt.Sprintf("Check of %s", configKey), func(t *testing.T) {
v, err := newEvent.GetValue(fmt.Sprintf("host.%s", configKey))
assert.NoError(t, err)
assert.Equal(t, configValue, v, "Could not find in %s", newEvent)
})
}
eventGeoField, err := newEvent.GetValue("host.geo")
require.NoError(t, err)

assert.Len(t, eventGeoField, len(config))
}

func TestPartialGeo(t *testing.T) {
func TestConfigGeoDisabled(t *testing.T) {
event := &beat.Event{
Fields: common.MapStr{},
Timestamp: time.Now(),
}

config := map[string]interface{}{
"geo.name": "yerevan-am",
"geo.city_name": " ",
}
config := map[string]interface{}{}

testConfig, err := common.NewConfigFrom(config)
assert.NoError(t, err)
require.NoError(t, err)

p, err := New(testConfig)
require.NoError(t, err)

newEvent, err := p.Run(event)
assert.NoError(t, err)

v, err := newEvent.Fields.GetValue("host.geo.name")
assert.NoError(t, err)
assert.Equal(t, "yerevan-am", v)

missing := []string{"continent_name", "country_name", "country_iso_code", "region_name", "region_iso_code", "city_name"}

for _, k := range missing {
path := "host.geo." + k
v, err = newEvent.Fields.GetValue(path)

assert.Equal(t, common.ErrKeyNotFound, err, "din expect to find %v", path)
}
}

func TestGeoLocationValidation(t *testing.T) {
locations := []struct {
str string
valid bool
}{
{"40.177200, 44.503490", true},
{"-40.177200, -44.503490", true},
{"garbage", false},
{"9999999999", false},
}

for _, location := range locations {
t.Run(fmt.Sprintf("Location %s validation should be %t", location.str, location.valid), func(t *testing.T) {

conf, err := common.NewConfigFrom(map[string]interface{}{
"geo": map[string]interface{}{
"location": location.str,
},
})
require.NoError(t, err)

_, err = New(conf)
require.NoError(t, err)

if location.valid {
require.NoError(t, err)
} else {
require.Error(t, err)
}
})
}
eventGeoField, err := newEvent.GetValue("host.geo")
assert.Error(t, err)
assert.Equal(t, nil, eventGeoField)
}
Loading

0 comments on commit 1d94462

Please sign in to comment.