Skip to content

Commit

Permalink
enrich container id from process id (elastic#15947) (elastic#17244)
Browse files Browse the repository at this point in the history
* enrich container id from process id

(cherry picked from commit 3cb957d)

Co-authored-by: Fang He <[email protected]>
  • Loading branch information
Carlos Pérez-Aradros Herce and BrookHF authored Mar 25, 2020
1 parent e1b2f23 commit 2206baf
Show file tree
Hide file tree
Showing 6 changed files with 469 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `add_cloudfoundry_metadata` processor to annotate events with Cloud Foundry application data. {pull}16621[16621]
- Add `translate_sid` processor on Windows for converting Windows security identifier (SID) values to names. {issue}7451[7451] {pull}16013[16013]
- Add support for kubernetes provider to recognize namespace level defaults {pull}16321[16321]
- Add capability of enrich `container.id` with process id in `add_process_metadata` processor {pull}15947[15947]
- Update RPM packages contained in Beat Docker images. {issue}17035[17035]
- Add Kerberos support to Kafka input and output. {pull}16781[16781]

Expand Down
85 changes: 74 additions & 11 deletions libbeat/processors/add_process_metadata/add_process_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/gosigar/cgroup"
)

const (
processorName = "add_process_metadata"
cacheExpiration = time.Second * 30
processorName = "add_process_metadata"
cacheExpiration = time.Second * 30
containerIDMapping = "container.id"
)

var (
Expand All @@ -47,14 +49,17 @@ var (

procCache = newProcessCache(cacheExpiration, gosysinfoProvider{})

processCgroupPaths = cgroup.ProcessCgroupPaths

instanceID atomic.Uint32
)

type addProcessMetadata struct {
config config
provider processMetadataProvider
log *logp.Logger
mappings common.MapStr
config config
provider processMetadataProvider
cidProvider cidProvider
log *logp.Logger
mappings common.MapStr
}

type processMetadata struct {
Expand All @@ -71,6 +76,10 @@ type processMetadataProvider interface {
GetProcessMetadata(pid int) (*processMetadata, error)
}

type cidProvider interface {
GetCid(pid int) (string, error)
}

func init() {
processors.RegisterPlugin(processorName, New)
jsprocessor.RegisterPlugin("AddProcessMetadata", New)
Expand All @@ -93,18 +102,50 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces
return nil, errors.Wrapf(err, "fail to unpack the %v configuration", processorName)
}

p := addProcessMetadata{
mappings, err := config.getMappings()

if err != nil {
return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName)
}

var p addProcessMetadata

p = addProcessMetadata{
config: config,
provider: provider,
log: log,
mappings: mappings,
}
if p.mappings, err = config.getMappings(); err != nil {
return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName)
// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled
if ok := containsValue(mappings, "container.id"); ok {
if config.CgroupCacheExpireTime != 0 {
p.log.Debug("Initializing cgroup cache")
evictionListener := func(k common.Key, v common.Value) {
p.log.Debugf("Evicted cached cgroups for PID=%v", k)
}

cgroupsCache := common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener)
cgroupsCache.StartJanitor(config.CgroupCacheExpireTime)
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, cgroupsCache)
} else {
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, nil)
}

}

return &p, nil
}

// check if the value exist in mapping
func containsValue(m common.MapStr, v string) bool {
for _, x := range m {
if x == v {
return true
}
}
return false
}

// Run enriches the given event with the host meta data
func (p *addProcessMetadata) Run(event *beat.Event) (*beat.Event, error) {
for _, pidField := range p.config.MatchPIDs {
Expand Down Expand Up @@ -156,6 +197,10 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul
}
meta := metaPtr.fields

if err = p.enrichContainerID(pid, meta); err != nil {
return nil, err
}

result = event.Clone()
for dest, sourceIf := range p.mappings {
source, castOk := sourceIf.(string)
Expand All @@ -168,23 +213,41 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul
return nil, errors.Errorf("target field '%s' already exists and overwrite_keys is false", dest)
}
}

value, err := meta.GetValue(source)
if err != nil {
// Should never happen
return nil, err
}

if _, err = result.Put(dest, value); err != nil {
return nil, err
}
}

return result, nil
}

// enrichContainerID adds container.id into meta for mapping to pickup
func (p *addProcessMetadata) enrichContainerID(pid int, meta common.MapStr) error {
if p.cidProvider == nil {
return nil
}
cid, err := p.cidProvider.GetCid(pid)
if err != nil {
return err
}
if _, err = meta.Put("container", common.MapStr{"id": cid}); err != nil {
return err
}
return nil
}

// String returns the processor representation formatted as a string
func (p *addProcessMetadata) String() string {
return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v]",
return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v, host_path=%v, cgroup_prefixes=%v]",
processorName, p.config.MatchPIDs, p.mappings, p.config.IgnoreMissing,
p.config.OverwriteKeys, p.config.RestrictedFields)
p.config.OverwriteKeys, p.config.RestrictedFields, p.config.HostPath, p.config.CgroupPrefixes)
}

func (p *processMetadata) toMap() common.MapStr {
Expand Down
Loading

0 comments on commit 2206baf

Please sign in to comment.