Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for CRI-O based logs autodiscover #10687

Merged
merged 4 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...v7.0.0-beta1[Check the
- Added support for ingesting structured Elasticsearch server logs {pull}10428[10428]
- Populate more ECS fields in the Suricata module. {pull}10006[10006]
- Add module zeek. {issue}9931[9931] {pull}10034[10034]
- Add support for CRI-O based logs autodiscover {pull}10687[10687]

*Heartbeat*

Expand Down
6 changes: 4 additions & 2 deletions filebeat/autodiscover/builder/hints/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ func defaultConfig() config {
rawCfg := map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"ids": []string{
"${data.container.id}",
"paths": []string{
// To be able to use this builder with CRI-O replace paths with:
// /var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log
"/var/lib/docker/containers/${data.container.id}/*-json.log",
},
},
}
Expand Down
232 changes: 232 additions & 0 deletions filebeat/autodiscover/builder/hints/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,3 +399,235 @@ func TestGenerateHints(t *testing.T) {

}
}

func TestGenerateHintsWithPaths(t *testing.T) {
tests := []struct {
msg string
event bus.Event
path string
len int
result common.MapStr
}{
{
msg: "Empty event hints should return default config",
event: bus.Event{
"host": "1.2.3.4",
"kubernetes": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"pod": common.MapStr{
"name": "pod",
"uid": "12345",
},
},
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
},
path: "/var/lib/docker/containers/${data.kubernetes.container.id}/*-json.log",
len: 1,
result: common.MapStr{
"type": "docker",
"containers": map[string]interface{}{
"paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"},
},
"close_timeout": "true",
},
},
{
msg: "Hint with processors config must have a processors in the input config",
event: bus.Event{
"host": "1.2.3.4",
"kubernetes": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"pod": common.MapStr{
"name": "pod",
"uid": "12345",
},
},
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"hints": common.MapStr{
"logs": common.MapStr{
"processors": common.MapStr{
"1": common.MapStr{
"dissect": common.MapStr{
"tokenizer": "%{key1} %{key2}",
},
},
"drop_event": common.MapStr{},
},
},
},
},
len: 1,
path: "/var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log",
result: common.MapStr{
"type": "docker",
"containers": map[string]interface{}{
"paths": []interface{}{"/var/log/pods/12345/foobar/*.log"},
},
"close_timeout": "true",
"processors": []interface{}{
map[string]interface{}{
"dissect": map[string]interface{}{
"tokenizer": "%{key1} %{key2}",
},
},
map[string]interface{}{
"drop_event": nil,
},
},
},
},
{
msg: "Hint with module should attach input to its filesets",
event: bus.Event{
"host": "1.2.3.4",
"kubernetes": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"pod": common.MapStr{
"name": "pod",
"uid": "12345",
},
},
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"hints": common.MapStr{
"logs": common.MapStr{
"module": "apache2",
},
},
},
len: 1,
path: "/var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log",
result: common.MapStr{
"module": "apache2",
"error": map[string]interface{}{
"enabled": true,
"input": map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"stream": "all",
"paths": []interface{}{"/var/log/pods/12345/foobar/*.log"},
},
"close_timeout": "true",
},
},
"access": map[string]interface{}{
"enabled": true,
"input": map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"stream": "all",
"paths": []interface{}{"/var/log/pods/12345/foobar/*.log"},
},
"close_timeout": "true",
},
},
},
},
{
msg: "Hint with module should honor defined filesets",
event: bus.Event{
"host": "1.2.3.4",
"kubernetes": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"pod": common.MapStr{
"name": "pod",
"uid": "12345",
},
},
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"hints": common.MapStr{
"logs": common.MapStr{
"module": "apache2",
"fileset": "access",
},
},
},
len: 1,
path: "/var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log",
result: common.MapStr{
"module": "apache2",
"access": map[string]interface{}{
"enabled": true,
"input": map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"stream": "all",
"paths": []interface{}{"/var/log/pods/12345/foobar/*.log"},
},
"close_timeout": "true",
},
},
"error": map[string]interface{}{
"enabled": false,
"input": map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"stream": "all",
"paths": []interface{}{"/var/log/pods/12345/foobar/*.log"},
},
"close_timeout": "true",
},
},
},
},
}

for _, test := range tests {
cfg, _ := common.NewConfigFrom(map[string]interface{}{
"config": map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"paths": []string{
test.path,
},
},
"close_timeout": "true",
},
})

// Configure path for modules access
abs, _ := filepath.Abs("../../..")
err := paths.InitPaths(&paths.Path{
Home: abs,
})

l, err := NewLogHints(cfg)
if err != nil {
t.Fatal(err)
}

cfgs := l.CreateConfig(test.event)
assert.Equal(t, len(cfgs), test.len, test.msg)
if test.len != 0 {
config := common.MapStr{}
err := cfgs[0].Unpack(&config)
assert.Nil(t, err, test.msg)

assert.Equal(t, test.result, config, test.msg)
}

}
}
3 changes: 3 additions & 0 deletions filebeat/input/docker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var defaultConfig = config{
}

type config struct {
// List of containers' log files to tail
Containers containers `config:"containers"`

// Partial configures the input to join partial lines
Expand All @@ -43,6 +44,8 @@ type containers struct {
IDs []string `config:"ids"`
Path string `config:"path"`

Paths []string `config:"paths"`
jsoriano marked this conversation as resolved.
Show resolved Hide resolved

// Stream can be all, stdout or stderr
Stream string `config:"stream"`
}
39 changes: 33 additions & 6 deletions filebeat/input/docker/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,19 @@ func NewInput(
return nil, errors.Wrap(err, "reading docker input config")
}

// Docker input should make sure that no callers should ever pass empty strings as container IDs
// Docker input should make sure that no callers should ever pass empty strings as container IDs or paths
// Hence we explicitly make sure that we catch such things and print stack traces in the event of
// an invocation so that it can be fixed.
var ids []string
var ids, paths []string
for _, p := range config.Containers.Paths {
if p != "" {
paths = append(paths, p)
} else {
logger.Error("Docker paths can't be empty for Docker input config")
logger.Debugw("Empty path for Docker logfile was received", logp.Stack("stacktrace"))
}
}

for _, containerID := range config.Containers.IDs {
if containerID != "" {
ids = append(ids, containerID)
Expand All @@ -64,12 +73,23 @@ func NewInput(
}
}

if len(ids) == 0 {
return nil, errors.New("Docker input requires at least one entry under 'containers.ids'")
if len(ids) == 0 && len(paths) == 0 {
return nil, errors.New("Docker input requires at least one entry under 'containers.ids' or 'containers.paths'")
}

for idx, containerID := range ids {
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
// IDs + Path and Paths are mutually exclusive. Ensure that only one of them are set in a given configuration
if len(ids) != 0 && len(paths) != 0 {
return nil, errors.New("can not provide both 'containers.ids' and 'containers.paths' in the same input config")
}

if len(ids) != 0 {
for idx, containerID := range ids {
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
}
} else {
for idx, p := range paths {
cfg.SetString("paths", idx, p)
}
}

if err := checkStream(config.Containers.Stream); err != nil {
Expand All @@ -92,6 +112,13 @@ func NewInput(
return nil, errors.Wrap(err, "update input config")
}

if len(paths) != 0 {
// Set symlinks to true as CRI-O paths could point to symlinks instead of the actual path.
if err := cfg.SetBool("symlinks", -1, true); err != nil {
return nil, errors.Wrap(err, "update input config")
}
}

// Add stream to meta to ensure different state per stream
if config.Containers.Stream != "all" {
if context.Meta == nil {
Expand Down