From 686f3fec9bb372e3308163535122b292b98981c1 Mon Sep 17 00:00:00 2001 From: fanhe Date: Wed, 29 Jan 2020 15:24:51 -0800 Subject: [PATCH 01/13] enrich container id from process id --- .../add_process_metadata.go | 40 +++++++++--- .../add_process_metadata_test.go | 52 +++++++++++++++- .../processors/add_process_metadata/config.go | 4 ++ .../docs/add_process_metadata.asciidoc | 12 ++++ .../gosiger_cid_provider.go | 62 +++++++++++++++++++ .../add_process_metadata/test_cid_provider.go | 28 +++++++++ 6 files changed, 188 insertions(+), 10 deletions(-) create mode 100644 libbeat/processors/add_process_metadata/gosiger_cid_provider.go create mode 100644 libbeat/processors/add_process_metadata/test_cid_provider.go diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 4b6270c9f6e..e6b8e0b7fa8 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -35,6 +35,8 @@ import ( const ( processorName = "add_process_metadata" cacheExpiration = time.Second * 30 + hostPath = "/" + cgroupPrefix = "/kubepods" ) var ( @@ -47,14 +49,17 @@ var ( procCache = newProcessCache(cacheExpiration, gosysinfoProvider{}) + gCidProvider = newCidProvider(hostPath, cgroupPrefix) + instanceID atomic.Uint32 ) type addProcessMetadata struct { - config config - provider processMetadataProvider - log *logp.Logger - mappings common.MapStr + config config + provider processMetadataProvider + cidProvider cidProvider + log *logp.Logger + mappings common.MapStr } type processMetadata struct { @@ -71,6 +76,10 @@ type processMetadataProvider interface { GetProcessMetadata(pid int) (*processMetadata, error) } +type cidProvider interface { + GetCid(pid int) (string, error) +} + func init() { processors.RegisterPlugin(processorName, New) jsprocessor.RegisterPlugin("AddProcessMetadata", New) @@ -78,10 +87,10 @@ func init() { // New 
constructs a new add_process_metadata processor. func New(cfg *common.Config) (processors.Processor, error) { - return newProcessMetadataProcessorWithProvider(cfg, &procCache) + return newProcessMetadataProcessorWithProvider(cfg, &procCache, gCidProvider) } -func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider processMetadataProvider) (proc processors.Processor, err error) { +func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider processMetadataProvider, cidProvider cidProvider) (proc processors.Processor, err error) { // Logging (each processor instance has a unique ID). var ( id = int(instanceID.Inc()) @@ -94,9 +103,10 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces } p := addProcessMetadata{ - config: config, - provider: provider, - log: log, + config: config, + provider: provider, + cidProvider: cidProvider, + log: log, } if p.mappings, err = config.getMappings(); err != nil { return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName) @@ -177,6 +187,18 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul return nil, err } } + + // enrich with the container id, if include_cid is set to true + if p.config.IncludeCid { + cid, err := p.cidProvider.GetCid(pid) + if err != nil { + return nil, err + } + if _, err = result.Put("cid", cid); err != nil { + return nil, err + } + } + return result, nil } diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go index 16d3f38fa71..0b06ac005da 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata_test.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go @@ -51,6 +51,10 @@ func TestAddProcessMetadata(t *testing.T) { }, } + tCidProvider := testCidProvider{ + 1: "test-cid", + } + for _, test := range []struct { description string config, event, expected 
common.MapStr @@ -385,13 +389,59 @@ func TestAddProcessMetadata(t *testing.T) { }, err: ErrNoProcess, }, + { + description: "env field (IncludeCid: true)", + config: common.MapStr{ + "match_pids": []string{"system.process.ppid"}, + "include_cid": true, + }, + event: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + }, + expected: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + "process": common.MapStr{ + "name": "systemd", + "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", + "executable": "/usr/lib/systemd/systemd", + "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, + "pid": 1, + "ppid": 0, + "start_time": startTime, + }, + "cid": "test-cid", + }, + }, + { + description: "env field (IncludeCid: true), process not found", + config: common.MapStr{ + "match_pids": []string{"ppid"}, + "include_cid": true, + }, + event: common.MapStr{ + "ppid": 42, + }, + expected: common.MapStr{ + "ppid": 42, + }, + err: ErrNoProcess, + }, } { t.Run(test.description, func(t *testing.T) { config, err := common.NewConfigFrom(test.config) if err != nil { t.Fatal(err) } - proc, err := newProcessMetadataProcessorWithProvider(config, testProcs) + + proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, tCidProvider) if test.initErr == nil { if err != nil { t.Fatal(err) diff --git a/libbeat/processors/add_process_metadata/config.go b/libbeat/processors/add_process_metadata/config.go index 87d375b0a09..f3089211295 100644 --- a/libbeat/processors/add_process_metadata/config.go +++ b/libbeat/processors/add_process_metadata/config.go @@ -35,6 +35,9 @@ type config struct { // RestrictedFields make restricted fields available (i.e. env). 
RestrictedFields bool `config:"restricted_fields"` + // IncludeCid make cid field available + IncludeCid bool `config:"include_cid"` + // MatchPIDs fields containing the PID to lookup. MatchPIDs []string `config:"match_pids" validate:"required"` @@ -75,6 +78,7 @@ func defaultConfig() config { IgnoreMissing: true, OverwriteKeys: false, RestrictedFields: false, + IncludeCid: false, } } diff --git a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc index ef984d6176f..3f4829a9e50 100644 --- a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc +++ b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc @@ -42,6 +42,14 @@ Optionally, the process environment can be included, too: } ... ------------------------------------------------------------------------------- + +Optionally, the container id can be included, too: +[source,json] +------------------------------------------------------------------------------- + ... + "cid": "9317bb0f0c8eef642ded08bf5da182061f7f512a3987b665521a724cc84651ac" + ... +------------------------------------------------------------------------------- It has the following settings: `match_pids`:: List of fields to lookup for a PID. The processor will @@ -65,3 +73,7 @@ set to `true`, this condition will be ignored. `restricted_fields`:: (Optional) By default, the `process.env` field is not output, to avoid leaking sensitive data. If `restricted_fields` is `true`, the field will be present in the output. + +`include_cid`:: (Optional) By default, this value is false, and the `cid` +container id field is not outputed. If `restricted_fields` is `true`, `cid` +container id can be enriched from cgroup, when running auditbeat on kubernetes. 
diff --git a/libbeat/processors/add_process_metadata/gosiger_cid_provider.go b/libbeat/processors/add_process_metadata/gosiger_cid_provider.go new file mode 100644 index 00000000000..70948ff233a --- /dev/null +++ b/libbeat/processors/add_process_metadata/gosiger_cid_provider.go @@ -0,0 +1,62 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_process_metadata + +import ( + "fmt" + "path/filepath" + "strings" + + "github.com/elastic/gosigar/cgroup" +) + +type gosigarCidProvider struct { + hostPath string + cgroupPrefix string +} + +func (p gosigarCidProvider) GetCid(pid int) (result string, err error) { + cgroups, err := cgroup.ProcessCgroupPaths(p.hostPath, pid) + if err != nil { + return "", fmt.Errorf("failed to read cgroups for pid=%v", pid) + } + cid := p.getCid(cgroups) + return cid, nil +} + +func newCidProvider(hostPath string, cgroupPrefix string) gosigarCidProvider { + return gosigarCidProvider{ + hostPath: hostPath, + cgroupPrefix: cgroupPrefix, + } +} + +// getCid checks all of the processes' paths to see if any +// of them are associated with Kubernetes. Kubernetes uses /kubepods/// when +// naming cgroups and we use this to determine the container ID. 
If no container +// ID is found then an empty string is returned. +// Example: +// /kubepods/besteffort/pod9b9e44c2-00fd-11ea-95e9-080027421ddf/2bb9fd4de339e5d4f094e78bb87636004acfe53f5668104addc761fe4a93588e +func (p gosigarCidProvider) getCid(cgroups map[string]string) string { + for _, path := range cgroups { + if strings.HasPrefix(path, p.cgroupPrefix) { + return filepath.Base(path) + } + } + return "" +} diff --git a/libbeat/processors/add_process_metadata/test_cid_provider.go b/libbeat/processors/add_process_metadata/test_cid_provider.go new file mode 100644 index 00000000000..6957ed7169a --- /dev/null +++ b/libbeat/processors/add_process_metadata/test_cid_provider.go @@ -0,0 +1,28 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package add_process_metadata + +type testCidProvider map[int]string + +func (p testCidProvider) GetCid(pid int) (result string, err error) { + cid, found := p[pid] + if !found { + return "", nil + } + return cid, nil +} From 32b77db5c370c5588861cbd853ccc15695001daf Mon Sep 17 00:00:00 2001 From: fanhe Date: Thu, 30 Jan 2020 12:12:27 -0800 Subject: [PATCH 02/13] maga fmt --- libbeat/processors/add_process_metadata/gosiger_cid_provider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/add_process_metadata/gosiger_cid_provider.go b/libbeat/processors/add_process_metadata/gosiger_cid_provider.go index 70948ff233a..e7abd8b91ea 100644 --- a/libbeat/processors/add_process_metadata/gosiger_cid_provider.go +++ b/libbeat/processors/add_process_metadata/gosiger_cid_provider.go @@ -50,7 +50,7 @@ func newCidProvider(hostPath string, cgroupPrefix string) gosigarCidProvider { // of them are associated with Kubernetes. Kubernetes uses /kubepods/// when // naming cgroups and we use this to determine the container ID. If no container // ID is found then an empty string is returned. 
-// Example: +// Example: // /kubepods/besteffort/pod9b9e44c2-00fd-11ea-95e9-080027421ddf/2bb9fd4de339e5d4f094e78bb87636004acfe53f5668104addc761fe4a93588e func (p gosigarCidProvider) getCid(cgroups map[string]string) string { for _, path := range cgroups { From b10fed26e166a94da5fa554b56487d871f080445 Mon Sep 17 00:00:00 2001 From: fanhe Date: Tue, 25 Feb 2020 13:26:37 -0800 Subject: [PATCH 03/13] configuration, field, and test change --- .../add_process_metadata.go | 32 ++++----- .../add_process_metadata_test.go | 71 ++++++++++++++++--- .../processors/add_process_metadata/config.go | 17 +++-- .../docs/add_process_metadata.asciidoc | 25 ++++--- .../gosiger_cid_provider.go | 24 ++++--- 5 files changed, 116 insertions(+), 53 deletions(-) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index e6b8e0b7fa8..788579dabd1 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -30,13 +30,12 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/processors" jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" + "github.com/elastic/gosigar/cgroup" ) const ( processorName = "add_process_metadata" cacheExpiration = time.Second * 30 - hostPath = "/" - cgroupPrefix = "/kubepods" ) var ( @@ -49,7 +48,9 @@ var ( procCache = newProcessCache(cacheExpiration, gosysinfoProvider{}) - gCidProvider = newCidProvider(hostPath, cgroupPrefix) + processCgroupPaths = cgroup.ProcessCgroupPaths + + // wantContainerID = false instanceID atomic.Uint32 ) @@ -87,10 +88,10 @@ func init() { // New constructs a new add_process_metadata processor. 
func New(cfg *common.Config) (processors.Processor, error) { - return newProcessMetadataProcessorWithProvider(cfg, &procCache, gCidProvider) + return newProcessMetadataProcessorWithProvider(cfg, &procCache) } -func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider processMetadataProvider, cidProvider cidProvider) (proc processors.Processor, err error) { +func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider processMetadataProvider) (proc processors.Processor, err error) { // Logging (each processor instance has a unique ID). var ( id = int(instanceID.Inc()) @@ -105,7 +106,7 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces p := addProcessMetadata{ config: config, provider: provider, - cidProvider: cidProvider, + cidProvider: newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths), log: log, } if p.mappings, err = config.getMappings(); err != nil { @@ -166,6 +167,14 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul } meta := metaPtr.fields + cid, err := p.cidProvider.GetCid(pid) + if err != nil { + return nil, err + } + if _, err = meta.Put("container", common.MapStr{"id": cid}); err != nil { + return nil, err + } + result = event.Clone() for dest, sourceIf := range p.mappings { source, castOk := sourceIf.(string) @@ -188,17 +197,6 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul } } - // enrich with the container id, if include_cid is set to true - if p.config.IncludeCid { - cid, err := p.cidProvider.GetCid(pid) - if err != nil { - return nil, err - } - if _, err = result.Put("cid", cid); err != nil { - return nil, err - } - } - return result, nil } diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go index 0b06ac005da..a463356c208 100644 --- 
a/libbeat/processors/add_process_metadata/add_process_metadata_test.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go @@ -51,8 +51,26 @@ func TestAddProcessMetadata(t *testing.T) { }, } - tCidProvider := testCidProvider{ - 1: "test-cid", + // mock of the cgroup processCgroupPaths + processCgroupPaths = func(_ string, pid int) (map[string]string, error) { + testMap := map[int]map[string]string{ + 1: map[string]string{ + "cpu": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "net_prio": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "blkio": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "perf_event": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "freezer": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "pids": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "hugetlb": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "cpuacct": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "cpuset": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "net_cls": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "devices": 
"/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "memory": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "name=systemd": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, + } + return testMap[pid], nil } for _, test := range []struct { @@ -87,6 +105,9 @@ func TestAddProcessMetadata(t *testing.T) { "ppid": 0, "start_time": startTime, }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, }, }, { @@ -165,6 +186,9 @@ func TestAddProcessMetadata(t *testing.T) { "ppid": 0, "start_time": startTime, }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, }, }, }, @@ -196,6 +220,9 @@ func TestAddProcessMetadata(t *testing.T) { "LANG": "en_US.UTF-8", }, }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, }, }, }, @@ -390,10 +417,9 @@ func TestAddProcessMetadata(t *testing.T) { err: ErrNoProcess, }, { - description: "env field (IncludeCid: true)", + description: "env field", config: common.MapStr{ - "match_pids": []string{"system.process.ppid"}, - "include_cid": true, + "match_pids": []string{"system.process.ppid"}, }, event: common.MapStr{ "system": common.MapStr{ @@ -417,14 +443,15 @@ func TestAddProcessMetadata(t *testing.T) { "ppid": 0, "start_time": startTime, }, - "cid": "test-cid", + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, }, }, { - description: "env field (IncludeCid: true), process not found", + description: "env field (IncludeContainer id), process not found", config: common.MapStr{ - "match_pids": []string{"ppid"}, - "include_cid": true, + "match_pids": []string{"ppid"}, }, 
event: common.MapStr{ "ppid": 42, @@ -434,6 +461,30 @@ func TestAddProcessMetadata(t *testing.T) { }, err: ErrNoProcess, }, + { + description: "container.id only", + config: common.MapStr{ + "match_pids": []string{"system.process.ppid"}, + "include_fields": []string{"container.id"}, + }, + event: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + }, + expected: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, + }, + }, } { t.Run(test.description, func(t *testing.T) { config, err := common.NewConfigFrom(test.config) @@ -441,7 +492,7 @@ func TestAddProcessMetadata(t *testing.T) { t.Fatal(err) } - proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, tCidProvider) + proc, err := newProcessMetadataProcessorWithProvider(config, testProcs) if test.initErr == nil { if err != nil { t.Fatal(err) diff --git a/libbeat/processors/add_process_metadata/config.go b/libbeat/processors/add_process_metadata/config.go index f3089211295..a7d1554682e 100644 --- a/libbeat/processors/add_process_metadata/config.go +++ b/libbeat/processors/add_process_metadata/config.go @@ -46,6 +46,12 @@ type config struct { // Fields is the list of fields to add to target. 
Fields []string `config:"include_fields"` + + // HostPath is the path where /proc reside + HostPath string `config:"host_path"` + + // CgroupPrefix is the prefix where the container id is inside cgroup + CgroupPrefixes []string `config:"cgroup_prefixes"` } // available fields by default @@ -59,6 +65,9 @@ var defaultFields = common.MapStr{ "ppid": nil, "start_time": nil, }, + "container": common.MapStr{ + "id": nil, + }, } // fields declared in here will only appear when requested explicitly @@ -78,7 +87,8 @@ func defaultConfig() config { IgnoreMissing: true, OverwriteKeys: false, RestrictedFields: false, - IncludeCid: false, + HostPath: "/", + CgroupPrefixes: []string{"/kubepods", "/docker"}, } } @@ -94,7 +104,7 @@ func (pf *config) getMappings() (mappings common.MapStr, err error) { } wantedFields := pf.Fields if len(wantedFields) == 0 { - wantedFields = []string{"process"} + wantedFields = []string{"process", "container"} } for _, docSrc := range wantedFields { dstField := fieldPrefix + docSrc @@ -103,9 +113,6 @@ func (pf *config) getMappings() (mappings common.MapStr, err error) { return nil, fmt.Errorf("field '%v' not found", docSrc) } if reqField != nil { - if len(wantedFields) != 1 { - return nil, fmt.Errorf("'%s' field cannot be used in conjunction with other fields", docSrc) - } for subField := range reqField.(common.MapStr) { key := dstField + "." + subField val := docSrc + "." 
+ subField diff --git a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc index 3f4829a9e50..85bbaa52a2d 100644 --- a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc +++ b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc @@ -27,7 +27,10 @@ The fields added to the event look as follows: "pid": 1, "ppid": 0, "start_time": "2018-08-22T08:44:50.684Z", -} +}, +"container": { + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1" +}, ------------------------------------------------------------------------------- Optionally, the process environment can be included, too: @@ -43,13 +46,6 @@ Optionally, the process environment can be included, too: ... ------------------------------------------------------------------------------- -Optionally, the container id can be included, too: -[source,json] -------------------------------------------------------------------------------- - ... - "cid": "9317bb0f0c8eef642ded08bf5da182061f7f512a3987b665521a724cc84651ac" - ... -------------------------------------------------------------------------------- It has the following settings: `match_pids`:: List of fields to lookup for a PID. The processor will @@ -74,6 +70,13 @@ set to `true`, this condition will be ignored. output, to avoid leaking sensitive data. If `restricted_fields` is `true`, the field will be present in the output. -`include_cid`:: (Optional) By default, this value is false, and the `cid` -container id field is not outputed. If `restricted_fields` is `true`, `cid` -container id can be enriched from cgroup, when running auditbeat on kubernetes. +`host_path`:: (Optional) By default, the `host_path` field is set to the root +directory of the host `/`. HostPath is the path where /proc reside. 
For different +runtime configuration of Kubernetes or docker, the host_path can be set to +overwrite the default `host_path` + +`cgroup_prefixes`:: (Optional) By default, the `cgroup_prefixes` field is set to +`/kubepods` and `/docker`. CgroupPrefix is the prefix where the container id is +inside cgroup. or different runtime configuration of Kubernetes or docker, the +cgroup_prefixes can be set to overwrite the default `cgroup_prefixes` + diff --git a/libbeat/processors/add_process_metadata/gosiger_cid_provider.go b/libbeat/processors/add_process_metadata/gosiger_cid_provider.go index e7abd8b91ea..cbce2a2d0df 100644 --- a/libbeat/processors/add_process_metadata/gosiger_cid_provider.go +++ b/libbeat/processors/add_process_metadata/gosiger_cid_provider.go @@ -21,17 +21,17 @@ import ( "fmt" "path/filepath" "strings" - - "github.com/elastic/gosigar/cgroup" ) type gosigarCidProvider struct { - hostPath string - cgroupPrefix string + hostPath string + cgroupPrefixes []string + processCgroupPaths func(string, int) (map[string]string, error) } func (p gosigarCidProvider) GetCid(pid int) (result string, err error) { - cgroups, err := cgroup.ProcessCgroupPaths(p.hostPath, pid) + cgroups, err := p.processCgroupPaths(p.hostPath, pid) + if err != nil { return "", fmt.Errorf("failed to read cgroups for pid=%v", pid) } @@ -39,10 +39,12 @@ func (p gosigarCidProvider) GetCid(pid int) (result string, err error) { return cid, nil } -func newCidProvider(hostPath string, cgroupPrefix string) gosigarCidProvider { +func newCidProvider(hostPath string, cgroupPrefixes []string, processCgroupPaths func(string, int) (map[string]string, error)) gosigarCidProvider { + return gosigarCidProvider{ - hostPath: hostPath, - cgroupPrefix: cgroupPrefix, + hostPath: hostPath, + cgroupPrefixes: cgroupPrefixes, + processCgroupPaths: processCgroupPaths, } } @@ -54,8 +56,10 @@ func newCidProvider(hostPath string, cgroupPrefix string) gosigarCidProvider { // 
/kubepods/besteffort/pod9b9e44c2-00fd-11ea-95e9-080027421ddf/2bb9fd4de339e5d4f094e78bb87636004acfe53f5668104addc761fe4a93588e func (p gosigarCidProvider) getCid(cgroups map[string]string) string { for _, path := range cgroups { - if strings.HasPrefix(path, p.cgroupPrefix) { - return filepath.Base(path) + for _, prefix := range p.cgroupPrefixes { + if strings.HasPrefix(path, prefix) { + return filepath.Base(path) + } } } return "" From 693237f126a75e9d6d5a029431c19f6899aca31c Mon Sep 17 00:00:00 2001 From: fanhe Date: Wed, 26 Feb 2020 14:02:00 -0800 Subject: [PATCH 04/13] rephrase doc, and avoid getting container id when disabled --- .../add_process_metadata.go | 55 +++++++++++++++---- .../docs/add_process_metadata.asciidoc | 13 ++--- 2 files changed, 50 insertions(+), 18 deletions(-) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 788579dabd1..f32c617eb32 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -50,8 +50,6 @@ var ( processCgroupPaths = cgroup.ProcessCgroupPaths - // wantContainerID = false - instanceID atomic.Uint32 ) @@ -103,19 +101,44 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces return nil, errors.Wrapf(err, "fail to unpack the %v configuration", processorName) } + mappings, err := config.getMappings() + + if err != nil { + return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName) + } + + var cidProvider gosigarCidProvider + + // don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled + if ok := containsValue(mappings, "container.id"); ok { + cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths) + } else { + dummyProcessCgroupPaths := func(_ string, pid int) (map[string]string, error) { + return map[string]string{}, nil + } 
+ cidProvider = newCidProvider(config.HostPath, []string{}, dummyProcessCgroupPaths) + } + p := addProcessMetadata{ config: config, provider: provider, - cidProvider: newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths), + cidProvider: cidProvider, log: log, - } - if p.mappings, err = config.getMappings(); err != nil { - return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName) + mappings: mappings, } return &p, nil } +func containsValue(m common.MapStr, v string) bool { + for _, x := range m { + if x == v { + return true + } + } + return false +} + // Run enriches the given event with the host meta data func (p *addProcessMetadata) Run(event *beat.Event) (*beat.Event, error) { for _, pidField := range p.config.MatchPIDs { @@ -167,11 +190,7 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul } meta := metaPtr.fields - cid, err := p.cidProvider.GetCid(pid) - if err != nil { - return nil, err - } - if _, err = meta.Put("container", common.MapStr{"id": cid}); err != nil { + if meta, err = p.enrichContainerID(pid, meta); err != nil { return nil, err } @@ -200,6 +219,20 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul return result, nil } +// add container.id into meta for mapping to pickup +func (p *addProcessMetadata) enrichContainerID(pid int, meta common.MapStr) (common.MapStr, error) { + cid, err := p.cidProvider.GetCid(pid) + if err != nil { + return nil, err + } + if cid != "" { + if _, err = meta.Put("container", common.MapStr{"id": cid}); err != nil { + return nil, err + } + } + return meta, nil +} + // String returns the processor representation formatted as a string func (p *addProcessMetadata) String() string { return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v]", diff --git a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc 
b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc index 85bbaa52a2d..3e662be0475 100644 --- a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc +++ b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc @@ -71,12 +71,11 @@ output, to avoid leaking sensitive data. If `restricted_fields` is `true`, the field will be present in the output. `host_path`:: (Optional) By default, the `host_path` field is set to the root -directory of the host `/`. HostPath is the path where /proc reside. For different -runtime configuration of Kubernetes or docker, the host_path can be set to -overwrite the default `host_path` +directory of the host `/`. This is the path where `/proc` is mounted. For different +runtime configuration of Kubernetes or Docker the `host_path` can be set to +overwrite the default. `cgroup_prefixes`:: (Optional) By default, the `cgroup_prefixes` field is set to -`/kubepods` and `/docker`. CgroupPrefix is the prefix where the container id is -inside cgroup. or different runtime configuration of Kubernetes or docker, the -cgroup_prefixes can be set to overwrite the default `cgroup_prefixes` - +`/kubepods` and `/docker`. This is the prefix where the container ID is inside +cgroup. For different runtime configuration of Kubernetes or Docker the +`cgroup_prefixes` can be set to overwrite the defaults. 
From fca4d9d6cf2809c03da96584114b90e632611b46 Mon Sep 17 00:00:00 2001 From: fanhe Date: Thu, 27 Feb 2020 14:40:49 -0800 Subject: [PATCH 05/13] fix test fail when container id in mapping but don't exist --- .../add_process_metadata/add_process_metadata.go | 15 +++++++++++++-- .../add_process_metadata/gosiger_cid_provider.go | 12 +++++++++++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index f32c617eb32..1096b6cf36a 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -206,11 +206,22 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul return nil, errors.Errorf("target field '%s' already exists and overwrite_keys is false", dest) } } - value, err := meta.GetValue(source) + + ok, err := meta.HasKey(source) if err != nil { - // Should never happen return nil, err } + + var value interface{} + + if ok { + value, err = meta.GetValue(source) + if err != nil { + // Should never happen + return nil, err + } + } + if _, err = result.Put(dest, value); err != nil { return nil, err } diff --git a/libbeat/processors/add_process_metadata/gosiger_cid_provider.go b/libbeat/processors/add_process_metadata/gosiger_cid_provider.go index cbce2a2d0df..420354a6ea8 100644 --- a/libbeat/processors/add_process_metadata/gosiger_cid_provider.go +++ b/libbeat/processors/add_process_metadata/gosiger_cid_provider.go @@ -19,6 +19,7 @@ package add_process_metadata import ( "fmt" + "os" "path/filepath" "strings" ) @@ -32,10 +33,19 @@ type gosigarCidProvider struct { func (p gosigarCidProvider) GetCid(pid int) (result string, err error) { cgroups, err := p.processCgroupPaths(p.hostPath, pid) - if err != nil { + switch err.(type) { + case nil: + // do no thing + case *os.PathError: + // os.PathError happens when the process 
don't exist, or not running in linux system + return "", nil + default: + // should never happen return "", fmt.Errorf("failed to read cgroups for pid=%v", pid) } + cid := p.getCid(cgroups) + return cid, nil } From 9547ccb7b8188dc536abd487e5e635e6d473fd1a Mon Sep 17 00:00:00 2001 From: fanhe Date: Thu, 27 Feb 2020 16:19:48 -0800 Subject: [PATCH 06/13] replace with nil when container id is disabled --- .../add_process_metadata.go | 60 +++++++++---------- .../processors/add_process_metadata/config.go | 3 - .../gosiger_cid_provider.go | 2 - 3 files changed, 27 insertions(+), 38 deletions(-) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 1096b6cf36a..7b8ad9764aa 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -51,6 +51,8 @@ var ( processCgroupPaths = cgroup.ProcessCgroupPaths instanceID atomic.Uint32 + + isContainerIDEnabled bool ) type addProcessMetadata struct { @@ -107,24 +109,24 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName) } - var cidProvider gosigarCidProvider + var p addProcessMetadata // don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled if ok := containsValue(mappings, "container.id"); ok { - cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths) + p = addProcessMetadata{ + config: config, + provider: provider, + cidProvider: newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths), + log: log, + mappings: mappings, + } } else { - dummyProcessCgroupPaths := func(_ string, pid int) (map[string]string, error) { - return map[string]string{}, nil + p = addProcessMetadata{ + config: config, + provider: provider, + log: log, + mappings: mappings, } - 
cidProvider = newCidProvider(config.HostPath, []string{}, dummyProcessCgroupPaths) - } - - p := addProcessMetadata{ - config: config, - provider: provider, - cidProvider: cidProvider, - log: log, - mappings: mappings, } return &p, nil @@ -190,7 +192,7 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul } meta := metaPtr.fields - if meta, err = p.enrichContainerID(pid, meta); err != nil { + if err = p.enrichContainerID(pid, meta); err != nil { return nil, err } @@ -207,21 +209,12 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul } } - ok, err := meta.HasKey(source) + value, err := meta.GetValue(source) if err != nil { + // Should never happen return nil, err } - var value interface{} - - if ok { - value, err = meta.GetValue(source) - if err != nil { - // Should never happen - return nil, err - } - } - if _, err = result.Put(dest, value); err != nil { return nil, err } @@ -230,18 +223,19 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul return result, nil } -// add container.id into meta for mapping to pickup -func (p *addProcessMetadata) enrichContainerID(pid int, meta common.MapStr) (common.MapStr, error) { +// addProcessMetadata add container.id into meta for mapping to pickup +func (p *addProcessMetadata) enrichContainerID(pid int, meta common.MapStr) error { + if p.cidProvider == nil { + return nil + } cid, err := p.cidProvider.GetCid(pid) if err != nil { - return nil, err + return err } - if cid != "" { - if _, err = meta.Put("container", common.MapStr{"id": cid}); err != nil { - return nil, err - } + if _, err = meta.Put("container", common.MapStr{"id": cid}); err != nil { + return err } - return meta, nil + return nil } // String returns the processor representation formatted as a string diff --git a/libbeat/processors/add_process_metadata/config.go b/libbeat/processors/add_process_metadata/config.go index a7d1554682e..6ef5755352f 100644 --- 
a/libbeat/processors/add_process_metadata/config.go +++ b/libbeat/processors/add_process_metadata/config.go @@ -35,9 +35,6 @@ type config struct { // RestrictedFields make restricted fields available (i.e. env). RestrictedFields bool `config:"restricted_fields"` - // IncludeCid make cid field available - IncludeCid bool `config:"include_cid"` - // MatchPIDs fields containing the PID to lookup. MatchPIDs []string `config:"match_pids" validate:"required"` diff --git a/libbeat/processors/add_process_metadata/gosiger_cid_provider.go b/libbeat/processors/add_process_metadata/gosiger_cid_provider.go index 420354a6ea8..2f3ecc99af3 100644 --- a/libbeat/processors/add_process_metadata/gosiger_cid_provider.go +++ b/libbeat/processors/add_process_metadata/gosiger_cid_provider.go @@ -32,7 +32,6 @@ type gosigarCidProvider struct { func (p gosigarCidProvider) GetCid(pid int) (result string, err error) { cgroups, err := p.processCgroupPaths(p.hostPath, pid) - switch err.(type) { case nil: // do no thing @@ -50,7 +49,6 @@ func (p gosigarCidProvider) GetCid(pid int) (result string, err error) { } func newCidProvider(hostPath string, cgroupPrefixes []string, processCgroupPaths func(string, int) (map[string]string, error)) gosigarCidProvider { - return gosigarCidProvider{ hostPath: hostPath, cgroupPrefixes: cgroupPrefixes, From 212610263caba7fcfb175f1e97c2054f433d107d Mon Sep 17 00:00:00 2001 From: fanhe Date: Thu, 27 Feb 2020 16:27:34 -0800 Subject: [PATCH 07/13] remove test_cid_provider as replaced with mocking cgroup.ProcessCgroupPaths --- .../add_process_metadata/test_cid_provider.go | 28 ------------------- 1 file changed, 28 deletions(-) delete mode 100644 libbeat/processors/add_process_metadata/test_cid_provider.go diff --git a/libbeat/processors/add_process_metadata/test_cid_provider.go b/libbeat/processors/add_process_metadata/test_cid_provider.go deleted file mode 100644 index 6957ed7169a..00000000000 --- a/libbeat/processors/add_process_metadata/test_cid_provider.go +++ 
/dev/null @@ -1,28 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package add_process_metadata - -type testCidProvider map[int]string - -func (p testCidProvider) GetCid(pid int) (result string, err error) { - cid, found := p[pid] - if !found { - return "", nil - } - return cid, nil -} From 068380bc6777c6a0562ed6d9113f892da4fea0c4 Mon Sep 17 00:00:00 2001 From: fanhe Date: Tue, 3 Mar 2020 11:49:43 -0800 Subject: [PATCH 08/13] clear up and godoc --- .../processors/add_process_metadata/add_process_metadata.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 7b8ad9764aa..91dc82444c8 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -51,8 +51,6 @@ var ( processCgroupPaths = cgroup.ProcessCgroupPaths instanceID atomic.Uint32 - - isContainerIDEnabled bool ) type addProcessMetadata struct { @@ -223,7 +221,7 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul return result, nil } -// addProcessMetadata add container.id into meta for 
mapping to pickup +// enrichContainerID add container.id into meta for mapping to pickup func (p *addProcessMetadata) enrichContainerID(pid int, meta common.MapStr) error { if p.cidProvider == nil { return nil From 8f722f22612e542c562839931fac6a1ce54ebdbb Mon Sep 17 00:00:00 2001 From: fanhe Date: Tue, 3 Mar 2020 12:02:04 -0800 Subject: [PATCH 09/13] fix typo --- libbeat/processors/add_process_metadata/add_process_metadata.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 91dc82444c8..1f39833bbf3 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -221,7 +221,7 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul return result, nil } -// enrichContainerID add container.id into meta for mapping to pickup +// enrichContainerID adds container.id into meta for mapping to pickup func (p *addProcessMetadata) enrichContainerID(pid int, meta common.MapStr) error { if p.cidProvider == nil { return nil From e3bd35a81ced6516fcb7b7545793e956e94f95ee Mon Sep 17 00:00:00 2001 From: fanhe Date: Fri, 13 Mar 2020 11:27:18 -0700 Subject: [PATCH 10/13] add default MatchPIDs, more debug info, fix typo --- .../add_process_metadata/add_process_metadata.go | 10 ++++++---- libbeat/processors/add_process_metadata/config.go | 1 + ...gosiger_cid_provider.go => gosigar_cid_provider.go} | 0 3 files changed, 7 insertions(+), 4 deletions(-) rename libbeat/processors/add_process_metadata/{gosiger_cid_provider.go => gosigar_cid_provider.go} (100%) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 1f39833bbf3..6d0d062f675 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ 
b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -34,8 +34,9 @@ import ( ) const ( - processorName = "add_process_metadata" - cacheExpiration = time.Second * 30 + processorName = "add_process_metadata" + cacheExpiration = time.Second * 30 + containerIDMapping = "container.id" ) var ( @@ -130,6 +131,7 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces return &p, nil } +// check if the value exist in mapping func containsValue(m common.MapStr, v string) bool { for _, x := range m { if x == v { @@ -238,9 +240,9 @@ func (p *addProcessMetadata) enrichContainerID(pid int, meta common.MapStr) erro // String returns the processor representation formatted as a string func (p *addProcessMetadata) String() string { - return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v]", + return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v, host_path=%v, cgroup_prefixes=%v]", processorName, p.config.MatchPIDs, p.mappings, p.config.IgnoreMissing, - p.config.OverwriteKeys, p.config.RestrictedFields) + p.config.OverwriteKeys, p.config.RestrictedFields, p.config.HostPath, p.config.CgroupPrefixes) } func (p *processMetadata) toMap() common.MapStr { diff --git a/libbeat/processors/add_process_metadata/config.go b/libbeat/processors/add_process_metadata/config.go index 6ef5755352f..86d807e7902 100644 --- a/libbeat/processors/add_process_metadata/config.go +++ b/libbeat/processors/add_process_metadata/config.go @@ -84,6 +84,7 @@ func defaultConfig() config { IgnoreMissing: true, OverwriteKeys: false, RestrictedFields: false, + MatchPIDs: []string{"process.pid", "process.ppid", "process.parent.pid", "process.parent.ppid"}, HostPath: "/", CgroupPrefixes: []string{"/kubepods", "/docker"}, } diff --git a/libbeat/processors/add_process_metadata/gosiger_cid_provider.go b/libbeat/processors/add_process_metadata/gosigar_cid_provider.go 
similarity index 100% rename from libbeat/processors/add_process_metadata/gosiger_cid_provider.go rename to libbeat/processors/add_process_metadata/gosigar_cid_provider.go From 0a38304d5d88d0469e19dbaa43cdaa15772190b3 Mon Sep 17 00:00:00 2001 From: fanhe Date: Wed, 18 Mar 2020 11:07:39 -0700 Subject: [PATCH 11/13] add cgroup cache --- .../add_process_metadata.go | 31 +++-- .../add_process_metadata_test.go | 111 ++++++++++++++++++ .../processors/add_process_metadata/config.go | 21 +++- .../docs/add_process_metadata.asciidoc | 28 +++-- .../gosigar_cid_provider.go | 63 ++++++++-- 5 files changed, 215 insertions(+), 39 deletions(-) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 6d0d062f675..3a2387cc9d2 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -110,22 +110,27 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces var p addProcessMetadata + p = addProcessMetadata{ + config: config, + provider: provider, + log: log, + mappings: mappings, + } // don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled if ok := containsValue(mappings, "container.id"); ok { - p = addProcessMetadata{ - config: config, - provider: provider, - cidProvider: newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths), - log: log, - mappings: mappings, - } - } else { - p = addProcessMetadata{ - config: config, - provider: provider, - log: log, - mappings: mappings, + if config.CgroupCacheExpireTime != 0 { + p.log.Debug("Initializing cgroup cache") + evictionListener := func(k common.Key, v common.Value) { + p.log.Debugf("Evicted cached cgroups for PID=%v", k) + } + + cgroupsCache := common.NewCacheWithRemovalListener(time.Duration(config.CgroupCacheExpireTime)*time.Second, 100, evictionListener) + 
cgroupsCache.StartJanitor(time.Duration(config.CgroupCacheCleanTime) * time.Second) + p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, cgroupsCache) + } else { + p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, nil) } + } return &p, nil diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go index a463356c208..fff7733c846 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata_test.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go @@ -485,6 +485,31 @@ func TestAddProcessMetadata(t *testing.T) { }, }, }, + { + description: "without cgroup cache", + config: common.MapStr{ + "match_pids": []string{"system.process.ppid"}, + "include_fields": []string{"container.id"}, + "cgroup_cache_expire_time": 0, + }, + event: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + }, + expected: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, + }, + }, } { t.Run(test.description, func(t *testing.T) { config, err := common.NewConfigFrom(test.config) @@ -522,6 +547,92 @@ func TestAddProcessMetadata(t *testing.T) { } } +func TestUsingCache(t *testing.T) { + logp.TestingSetup(logp.WithSelectors(processorName)) + + selfPID := os.Getpid() + + // mock of the cgroup processCgroupPaths + processCgroupPaths = func(_ string, pid int) (map[string]string, error) { + testMap := map[int]map[string]string{ + selfPID: map[string]string{ + "cpu": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "net_prio": 
"/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "blkio": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "perf_event": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "freezer": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "pids": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "hugetlb": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "cpuacct": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "cpuset": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "net_cls": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "devices": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "memory": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "name=systemd": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, + } + return testMap[pid], nil + } + + config, err := common.NewConfigFrom(common.MapStr{ + "match_pids": []string{"system.process.ppid"}, + "include_fields": []string{"container.id"}, + "target": "meta", + }) + + if err != nil { + t.Fatal(err) + } + proc, err := New(config) + if err != nil { + 
t.Fatal(err) + } + + ev := beat.Event{ + Fields: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": selfPID, + }, + }, + }, + } + + // first run + result, err := proc.Run(&ev) + if err != nil { + t.Fatal(err) + } + t.Log(result.Fields) + containerID, err := result.Fields.GetValue("meta.container.id") + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", containerID) + + ev = beat.Event{ + Fields: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": selfPID, + }, + }, + }, + } + + // cached result + result, err = proc.Run(&ev) + if err != nil { + t.Fatal(err) + } + t.Log(result.Fields) + containerID, err = result.Fields.GetValue("meta.container.id") + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", containerID) +} + func TestSelf(t *testing.T) { logp.TestingSetup(logp.WithSelectors(processorName)) config, err := common.NewConfigFrom(common.MapStr{ diff --git a/libbeat/processors/add_process_metadata/config.go b/libbeat/processors/add_process_metadata/config.go index 86d807e7902..549b5db888e 100644 --- a/libbeat/processors/add_process_metadata/config.go +++ b/libbeat/processors/add_process_metadata/config.go @@ -49,6 +49,13 @@ type config struct { // CgroupPrefix is the prefix where the container id is inside cgroup CgroupPrefixes []string `config:"cgroup_prefixes"` + + // CgroupCacheExpireTime is the length of time before cgroup cache elements expire in seconds, + // set to 0 to disable the cgroup cache + CgroupCacheExpireTime int `config:"cgroup_cache_expire_time"` + + // CgroupCacheCleanTime is the length of time cache clean up expired elements in seconds + CgroupCacheCleanTime int `config:"cgroup_cache_clean_time"` } // available fields by default @@ -81,12 +88,14 @@ func init() { func defaultConfig() config { return config{ - IgnoreMissing: true, - OverwriteKeys: 
false, - RestrictedFields: false, - MatchPIDs: []string{"process.pid", "process.ppid", "process.parent.pid", "process.parent.ppid"}, - HostPath: "/", - CgroupPrefixes: []string{"/kubepods", "/docker"}, + IgnoreMissing: true, + OverwriteKeys: false, + RestrictedFields: false, + MatchPIDs: []string{"process.pid", "process.ppid", "process.parent.pid", "process.parent.ppid"}, + HostPath: "/", + CgroupPrefixes: []string{"/kubepods", "/docker"}, + CgroupCacheExpireTime: 300, + CgroupCacheCleanTime: 5, } } diff --git a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc index 3e662be0475..16719b4800f 100644 --- a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc +++ b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc @@ -71,11 +71,23 @@ output, to avoid leaking sensitive data. If `restricted_fields` is `true`, the field will be present in the output. `host_path`:: (Optional) By default, the `host_path` field is set to the root -directory of the host `/`. This is the path where `/proc` is mounted. For different -runtime configuration of Kubernetes or Docker the `host_path` can be set to -overwrite the default. - -`cgroup_prefixes`:: (Optional) By default, the `cgroup_prefixes` field is set to -`/kubepods` and `/docker`. This is the prefix where the container ID is inside -cgroup. For different runtime configuration of Kubernetes or Docker the -`cgroup_prefixes` can be set to overwrite the defaults. +directory of the host `/`. This is the path where `/proc` is mounted. For +different runtime configurations of Kubernetes or Docker, the `host_path` can +be set to overwrite the default. + +`cgroup_prefixes`:: (Optional) By default, the `cgroup_prefixes` field is set +to `/kubepods` and `/docker`. This is the prefix where the container ID is +inside cgroup. 
For different runtime configurations of Kubernetes or Docker, +the `cgroup_prefixes` can be set to overwrite the defaults. + +`CgroupCacheExpireTime`:: (Optional) By default, the `CgroupCacheExpireTime` +is set to 5 minuses. This is the length of time before cgroup cache elements +expire in seconds. It can be set to 0 to disable the cgroup cache. In some +container runtimes technology like runc, the container's process is also +process in the host kernel, and will be affected by PID rollover/reuse. The +expire time needs to set smaller than the PIDs wrap around time to avoid wrong +container id. + +`CgroupCacheExpireTime`:: (Optional) By default, the `CgroupCacheExpireTime` +is set to 5 seconds. this is the length of time cache clean up expired elements +in seconds. diff --git a/libbeat/processors/add_process_metadata/gosigar_cid_provider.go b/libbeat/processors/add_process_metadata/gosigar_cid_provider.go index 2f3ecc99af3..9b95a65a4d3 100644 --- a/libbeat/processors/add_process_metadata/gosigar_cid_provider.go +++ b/libbeat/processors/add_process_metadata/gosigar_cid_provider.go @@ -18,29 +18,35 @@ package add_process_metadata import ( - "fmt" "os" "path/filepath" "strings" + "time" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/pkg/errors" +) + +const ( + providerName = "gosigar_cid_provider" + cgroupsCacheExpiration = 5 * time.Minute ) type gosigarCidProvider struct { + log *logp.Logger hostPath string cgroupPrefixes []string processCgroupPaths func(string, int) (map[string]string, error) + cgroupsCache *common.Cache } func (p gosigarCidProvider) GetCid(pid int) (result string, err error) { - cgroups, err := p.processCgroupPaths(p.hostPath, pid) - switch err.(type) { - case nil: - // do no thing - case *os.PathError: - // os.PathError happens when the process don't exist, or not running in linux system - return "", nil - default: - // should never happen - return "", fmt.Errorf("failed to read cgroups 
for pid=%v", pid) + + cgroups, err := p.getProcessCgroups(pid) + + if err != nil { + p.log.Debugf("failed to get cgroups for pid=%v: %v", pid, err) } cid := p.getCid(cgroups) @@ -48,12 +54,45 @@ func (p gosigarCidProvider) GetCid(pid int) (result string, err error) { return cid, nil } -func newCidProvider(hostPath string, cgroupPrefixes []string, processCgroupPaths func(string, int) (map[string]string, error)) gosigarCidProvider { +func newCidProvider(hostPath string, cgroupPrefixes []string, processCgroupPaths func(string, int) (map[string]string, error), cgroupsCache *common.Cache) gosigarCidProvider { return gosigarCidProvider{ + log: logp.NewLogger(providerName), hostPath: hostPath, cgroupPrefixes: cgroupPrefixes, processCgroupPaths: processCgroupPaths, + cgroupsCache: cgroupsCache, + } +} + +// getProcessCgroups returns a mapping of cgroup subsystem name to path. It +// returns an error if it failed to retrieve the cgroup info. +func (p gosigarCidProvider) getProcessCgroups(pid int) (map[string]string, error) { + + var cgroup map[string]string + var ok bool + + if p.cgroupsCache != nil { + if cgroup, ok = p.cgroupsCache.Get(pid).(map[string]string); ok { + p.log.Debugf("Using cached cgroups for pid=%v", pid) + return cgroup, nil + } } + + cgroup, err := p.processCgroupPaths(p.hostPath, pid) + switch err.(type) { + case nil, *os.PathError: + // do no thing when err is nil or when os.PathError happens because the process don't exist, + // or not running in linux system + default: + // should never happen + return cgroup, errors.Wrapf(err, "failed to read cgroups for pid=%v", pid) + } + + if p.cgroupsCache != nil { + p.cgroupsCache.Put(pid, cgroup) + } + + return cgroup, nil } // getCid checks all of the processes' paths to see if any From 231012bc0d11ecb79af1b711d6482386bf2f7ecd Mon Sep 17 00:00:00 2001 From: fanhe Date: Wed, 18 Mar 2020 11:45:55 -0700 Subject: [PATCH 12/13] mage fmt --- .../processors/add_process_metadata/gosigar_cid_provider.go | 3 ++- 1 file 
changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/processors/add_process_metadata/gosigar_cid_provider.go b/libbeat/processors/add_process_metadata/gosigar_cid_provider.go index 9b95a65a4d3..86a047f690a 100644 --- a/libbeat/processors/add_process_metadata/gosigar_cid_provider.go +++ b/libbeat/processors/add_process_metadata/gosigar_cid_provider.go @@ -23,9 +23,10 @@ import ( "strings" "time" + "github.com/pkg/errors" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/pkg/errors" ) const ( From 998ae9252cc51fd654e3ba3356d0a67b6b5aa0e4 Mon Sep 17 00:00:00 2001 From: fanhe Date: Thu, 19 Mar 2020 11:54:18 -0700 Subject: [PATCH 13/13] add CHANGELOG, change config --- CHANGELOG.next.asciidoc | 1 + .../add_process_metadata.go | 4 +-- .../add_process_metadata_test.go | 25 +++++++++++++++++++ .../processors/add_process_metadata/config.go | 9 +++---- .../docs/add_process_metadata.asciidoc | 18 ++++++------- .../gosigar_cid_provider.go | 4 +-- 6 files changed, 39 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 389462a3e8c..2a0a220e073 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -150,6 +150,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `add_cloudfoundry_metadata` processor to annotate events with Cloud Foundry application data. {pull}16621[16621 - Add support for kubernetes provider to recognize namespace level defaults {pull}16321[16321] - Add `translate_sid` processor on Windows for converting Windows security identifier (SID) values to names. 
{issue}7451[7451] {pull}16013[16013] +- Add the ability to enrich events with `container.id` based on the process ID in the `add_process_metadata` processor {pull}15947[15947] *Auditbeat* diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 3a2387cc9d2..66c8767590a 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -124,8 +124,8 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces p.log.Debugf("Evicted cached cgroups for PID=%v", k) } - cgroupsCache := common.NewCacheWithRemovalListener(time.Duration(config.CgroupCacheExpireTime)*time.Second, 100, evictionListener) - cgroupsCache.StartJanitor(time.Duration(config.CgroupCacheCleanTime) * time.Second) + cgroupsCache := common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener) + cgroupsCache.StartJanitor(config.CgroupCacheExpireTime) p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, cgroupsCache) } else { p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, nil) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go index fff7733c846..bb5eff7e193 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata_test.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go @@ -510,6 +510,31 @@ func TestAddProcessMetadata(t *testing.T) { }, }, }, + { + description: "custom cache expire time", + config: common.MapStr{ + "match_pids": []string{"system.process.ppid"}, + "include_fields": []string{"container.id"}, + "cgroup_cache_expire_time": 10 * time.Second, + }, + event: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + }, + expected: 
common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, + }, + }, } { t.Run(test.description, func(t *testing.T) { config, err := common.NewConfigFrom(test.config) diff --git a/libbeat/processors/add_process_metadata/config.go b/libbeat/processors/add_process_metadata/config.go index 549b5db888e..0ed65b1d778 100644 --- a/libbeat/processors/add_process_metadata/config.go +++ b/libbeat/processors/add_process_metadata/config.go @@ -19,6 +19,7 @@ package add_process_metadata import ( "fmt" + "time" "github.com/pkg/errors" @@ -52,10 +53,7 @@ type config struct { // CgroupCacheExpireTime is the length of time before cgroup cache elements expire in seconds, // set to 0 to disable the cgroup cache - CgroupCacheExpireTime int `config:"cgroup_cache_expire_time"` - - // CgroupCacheCleanTime is the length of time cache clean up expired elements in seconds - CgroupCacheCleanTime int `config:"cgroup_cache_clean_time"` + CgroupCacheExpireTime time.Duration `config:"cgroup_cache_expire_time"` } // available fields by default @@ -94,8 +92,7 @@ func defaultConfig() config { MatchPIDs: []string{"process.pid", "process.ppid", "process.parent.pid", "process.parent.ppid"}, HostPath: "/", CgroupPrefixes: []string{"/kubepods", "/docker"}, - CgroupCacheExpireTime: 300, - CgroupCacheCleanTime: 5, + CgroupCacheExpireTime: cacheExpiration, } } diff --git a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc index 16719b4800f..d3e71dca920 100644 --- a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc +++ b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc @@ -80,14 +80,10 @@ to `/kubepods` and `/docker`. This is the prefix where the container ID is inside cgroup. 
For different runtime configurations of Kubernetes or Docker, the `cgroup_prefixes` can be set to overwrite the defaults. -`CgroupCacheExpireTime`:: (Optional) By default, the `CgroupCacheExpireTime` -is set to 5 minuses. This is the length of time before cgroup cache elements -expire in seconds. It can be set to 0 to disable the cgroup cache. In some -container runtimes technology like runc, the container's process is also -process in the host kernel, and will be affected by PID rollover/reuse. The -expire time needs to set smaller than the PIDs wrap around time to avoid wrong -container id. -`CgroupCacheExpireTime`:: (Optional) By default, the `CgroupCacheExpireTime` -is set to 5 seconds. this is the length of time cache clean up expired elements -in seconds. +`cgroup_cache_expire_time`:: (Optional) By default, the +`cgroup_cache_expire_time` is set to 30 seconds. This is the length of time, in +seconds, before cgroup cache elements expire. It can be set to 0 to disable the +cgroup cache. With some container runtimes, such as runc, a container's process +is also a process in the host kernel and is affected by PID rollover/reuse. The +expire time needs to be set smaller than the PID wrap-around time to avoid +reporting a wrong container ID. diff --git a/libbeat/processors/add_process_metadata/gosigar_cid_provider.go b/libbeat/processors/add_process_metadata/gosigar_cid_provider.go index 86a047f690a..d1e3951475c 100644 --- a/libbeat/processors/add_process_metadata/gosigar_cid_provider.go +++ b/libbeat/processors/add_process_metadata/gosigar_cid_provider.go @@ -21,7 +21,6 @@ import ( "os" "path/filepath" "strings" - "time" "github.com/pkg/errors" @@ -30,8 +29,7 @@ import ( ) const ( - providerName = "gosigar_cid_provider" - cgroupsCacheExpiration = 5 * time.Minute + providerName = "gosigar_cid_provider" ) type gosigarCidProvider struct {