Skip to content

Commit

Permalink
Add source path matching to add_docker_metadata processor (#4495)
Browse files Browse the repository at this point in the history
* Add `extract_field` internal processor

* Add `match_source` to `add_docker_metadata` processor

This should be useful to enrich events coming from docker logs, as it
will parse the container id from the source path.

* Move common docker fields to libbeat

* Fix some messages and tests
  • Loading branch information
exekias authored and andrewkroh committed Jun 15, 2017
1 parent 5c12a5f commit 6bc3d4b
Show file tree
Hide file tree
Showing 20 changed files with 572 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
*Affecting all Beats*

- New cli subcommands interface. {pull}4420[4420]
- Allow source path matching in `add_docker_metadata` processor {pull}4495[4495]

*Filebeat*

Expand Down
42 changes: 42 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ grouped in the following categories:
* <<exported-fields-auditd>>
* <<exported-fields-beat>>
* <<exported-fields-cloud>>
* <<exported-fields-docker-processor>>
* <<exported-fields-icinga>>
* <<exported-fields-kubernetes-processor>>
* <<exported-fields-log>>
Expand Down Expand Up @@ -600,6 +601,47 @@ Name of the project in Google Cloud.
Region in which this host is running.
[[exported-fields-docker-processor]]
== docker Fields
beta[]
Docker stats collected from Docker.
[float]
=== docker.container.id
type: keyword
Unique container id.
[float]
=== docker.container.image
type: keyword
Name of the image the container was built on.
[float]
=== docker.container.name
type: keyword
Container name.
[float]
=== docker.container.labels
type: object
Image labels.
[[exported-fields-icinga]]
== Icinga Fields
Expand Down
4 changes: 3 additions & 1 deletion filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,10 @@ filebeat.prospectors:
#
#processors:
#- add_docker_metadata:
# host: "unix:///var/run/docker.sock"
# match_source: true
# match_source_index: 4
# match_fields: ["system.process.cgroup.id"]
# host: "unix:///var/run/docker.sock"
# # To connect to Docker over TLS you must specify a client and CA certificate.
# #ssl:
# # certificate_authority: "/etc/pki/root/ca.pem"
Expand Down
42 changes: 42 additions & 0 deletions heartbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ grouped in the following categories:
* <<exported-fields-beat>>
* <<exported-fields-cloud>>
* <<exported-fields-common>>
* <<exported-fields-docker-processor>>
* <<exported-fields-http>>
* <<exported-fields-icmp>>
* <<exported-fields-kubernetes-processor>>
Expand Down Expand Up @@ -255,6 +256,47 @@ required: True
Indicator if monitor could validate the service to be available.
[[exported-fields-docker-processor]]
== docker Fields
beta[]
Docker stats collected from Docker.
[float]
=== docker.container.id
type: keyword
Unique container id.
[float]
=== docker.container.image
type: keyword
Name of the image the container was built on.
[float]
=== docker.container.name
type: keyword
Container name.
[float]
=== docker.container.labels
type: object
Image labels.
[[exported-fields-http]]
== HTTP Monitor Fields
Expand Down
4 changes: 3 additions & 1 deletion heartbeat/heartbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,10 @@ heartbeat.scheduler:
#
#processors:
#- add_docker_metadata:
# host: "unix:///var/run/docker.sock"
# match_source: true
# match_source_index: 4
# match_fields: ["system.process.cgroup.id"]
# host: "unix:///var/run/docker.sock"
# # To connect to Docker over TLS you must specify a client and CA certificate.
# #ssl:
# # certificate_authority: "/etc/pki/root/ca.pem"
Expand Down
4 changes: 3 additions & 1 deletion libbeat/_meta/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@
#
#processors:
#- add_docker_metadata:
# host: "unix:///var/run/docker.sock"
# match_source: true
# match_source_index: 4
# match_fields: ["system.process.cgroup.id"]
# host: "unix:///var/run/docker.sock"
# # To connect to Docker over TLS you must specify a client and CA certificate.
# #ssl:
# # certificate_authority: "/etc/pki/root/ca.pem"
Expand Down
32 changes: 21 additions & 11 deletions libbeat/docs/processors-using.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -573,20 +573,30 @@ from Docker containers:
* Image
* Labels

Currently it supports enriching events by matching existing fields with the
container ID in them, for example, cgroup IDs retrieved by metricbeat system
module:

[source,yaml]
-------------------------------------------------------------------------------
processors:
- add_docker_metadata:
host: "unix:///var/run/docker.sock"
match_fields: ["system.process.cgroup.id"]
host: "unix:///var/run/docker.sock"
# To connect to Docker over TLS you must specify a client and CA certificate.
#ssl:
# certificate_authority: "/etc/pki/root/ca.pem"
# certificate: "/etc/pki/client/cert.pem"
# key: "/etc/pki/client/cert.key"
match_source: true
match_source_index: 4
# To connect to Docker over TLS you must specify a client and CA certificate.
#ssl:
# certificate_authority: "/etc/pki/root/ca.pem"
# certificate: "/etc/pki/client/cert.pem"
# key: "/etc/pki/client/cert.key"
-------------------------------------------------------------------------------

It has the following settings:

`host`:: (Optional) Docker socket (UNIX or TCP socket). It uses
`unix:///var/run/docker.sock` by default.
`match_fields`:: (Optional) A list of fields to match a container id, at least
one of them should hold a container id to get the event enriched.
`match_source`:: (Optional) Match container id from a log path present in
`source` field. Enabled by default.
`match_source_index`:: (Optional) Index in the source path split by / to look
for container id. It defaults to 4 to match
`/var/lib/docker/containers/<container_id>/*.log`
91 changes: 91 additions & 0 deletions libbeat/processors/actions/extract_field.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package actions

import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

type extract_field struct {
Field string
Separator string
Index int
Target string
}

/*
This one won't be registered (yet)
func init() {
processors.RegisterPlugin("extract_field",
configChecked(NewExtractField,
requireFields("field", "separator", "index", "target"),
allowedFields("field", "separator", "index", "target", "when")))
}
*/

func NewExtractField(c common.Config) (processors.Processor, error) {
config := struct {
Field string `config:"field"`
Separator string `config:"separator"`
Index int `config:"index"`
Target string `config:"target"`
}{}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the extract_field configuration: %s", err)
}

/* remove read only fields */
for _, readOnly := range processors.MandatoryExportedFields {
if config.Field == readOnly {
return nil, fmt.Errorf("%s is a read only field, cannot override", readOnly)
}
}

f := extract_field{
Field: config.Field,
Separator: config.Separator,
Index: config.Index,
Target: config.Target,
}
return f, nil
}

func (f extract_field) Run(event common.MapStr) (common.MapStr, error) {
fieldValue, err := event.GetValue(f.Field)
if err != nil {
return nil, fmt.Errorf("error getting field '%s' from event", f.Field)
}

value, ok := fieldValue.(string)
if !ok {
return nil, fmt.Errorf("could not get a string from field '%s'", f.Field)
}

parts := strings.Split(value, f.Separator)
parts = deleteEmpty(parts)
if len(parts) < f.Index+1 {
return nil, fmt.Errorf("index is out of range for field '%s'", f.Field)
}

event.Put(f.Target, parts[f.Index])

return event, nil
}

func (f extract_field) String() string {
return "extract_field=" + f.Target
}

func deleteEmpty(s []string) []string {
var r []string
for _, str := range s {
if str != "" {
r = append(r, str)
}
}
return r
}
90 changes: 90 additions & 0 deletions libbeat/processors/actions/extract_field_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package actions

import (
"testing"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/stretchr/testify/assert"
)

func TestCommonPaths(t *testing.T) {
var tests = []struct {
Value, Field, Separator, Target, Result string
Index int
}{
// Common docker case
{
Value: "/var/lib/docker/containers/f1510836197d7c34da22cf796dba5640f87c04de5c95cf0adc11b85f1e1c1528/f1510836197d7c34da22cf796dba5640f87c04de5c95cf0adc11b85f1e1c1528-json.log",
Field: "source",
Separator: "/",
Target: "docker.container.id",
Index: 4,
Result: "f1510836197d7c34da22cf796dba5640f87c04de5c95cf0adc11b85f1e1c1528",
},
{
Value: "/var/lib/foo/bar",
Field: "other_field",
Separator: "/",
Target: "destination",
Index: 3,
Result: "bar",
},
{
Value: "-var-lib-foo-bar",
Field: "source",
Separator: "-",
Target: "destination",
Index: 2,
Result: "foo",
},
{
Value: "*var*lib*foo*bar",
Field: "source",
Separator: "*",
Target: "destination",
Index: 0,
Result: "var",
},
}

for _, test := range tests {
var testConfig, _ = common.NewConfigFrom(map[string]interface{}{
"field": test.Field,
"separator": test.Separator,
"index": test.Index,
"target": test.Target,
})

// Configure input to
input := common.MapStr{
test.Field: test.Value,
}

actual := runExtractField(t, testConfig, input)

result, err := actual.GetValue(test.Target)
if err != nil {
t.Fatalf("could not get target field: %s", err)
}
assert.Equal(t, result.(string), test.Result)
}
}

func runExtractField(t *testing.T, config *common.Config, input common.MapStr) common.MapStr {
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})
}

p, err := NewExtractField(*config)
if err != nil {
t.Fatalf("error initializing extract_field: %s", err)
}

actual, err := p.Run(input)
if err != nil {
t.Fatalf("error running extract_field: %s", err)
}

return actual
}
Loading

0 comments on commit 6bc3d4b

Please sign in to comment.