-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enrich container id from process id #15947
Changes from all commits
686f3fe
32b77db
b10fed2
693237f
fca4d9d
9547ccb
2126102
068380b
8f722f2
e3bd35a
0a38304
231012b
998ae92
5f79ddf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -30,11 +30,13 @@ import ( | |||
"github.com/elastic/beats/v7/libbeat/logp" | ||||
"github.com/elastic/beats/v7/libbeat/processors" | ||||
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" | ||||
"github.com/elastic/gosigar/cgroup" | ||||
) | ||||
|
||||
const ( | ||||
processorName = "add_process_metadata" | ||||
cacheExpiration = time.Second * 30 | ||||
processorName = "add_process_metadata" | ||||
cacheExpiration = time.Second * 30 | ||||
containerIDMapping = "container.id" | ||||
) | ||||
|
||||
var ( | ||||
|
@@ -47,14 +49,17 @@ var ( | |||
|
||||
procCache = newProcessCache(cacheExpiration, gosysinfoProvider{}) | ||||
|
||||
processCgroupPaths = cgroup.ProcessCgroupPaths | ||||
|
||||
instanceID atomic.Uint32 | ||||
) | ||||
|
||||
type addProcessMetadata struct { | ||||
config config | ||||
provider processMetadataProvider | ||||
log *logp.Logger | ||||
mappings common.MapStr | ||||
config config | ||||
provider processMetadataProvider | ||||
cidProvider cidProvider | ||||
log *logp.Logger | ||||
mappings common.MapStr | ||||
} | ||||
|
||||
type processMetadata struct { | ||||
|
@@ -71,6 +76,10 @@ type processMetadataProvider interface { | |||
GetProcessMetadata(pid int) (*processMetadata, error) | ||||
} | ||||
|
||||
type cidProvider interface { | ||||
GetCid(pid int) (string, error) | ||||
} | ||||
|
||||
func init() { | ||||
processors.RegisterPlugin(processorName, New) | ||||
jsprocessor.RegisterPlugin("AddProcessMetadata", New) | ||||
|
@@ -93,18 +102,50 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces | |||
return nil, errors.Wrapf(err, "fail to unpack the %v configuration", processorName) | ||||
} | ||||
|
||||
p := addProcessMetadata{ | ||||
mappings, err := config.getMappings() | ||||
|
||||
if err != nil { | ||||
return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName) | ||||
} | ||||
|
||||
var p addProcessMetadata | ||||
|
||||
p = addProcessMetadata{ | ||||
config: config, | ||||
provider: provider, | ||||
log: log, | ||||
mappings: mappings, | ||||
} | ||||
if p.mappings, err = config.getMappings(); err != nil { | ||||
return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName) | ||||
// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled | ||||
if ok := containsValue(mappings, "container.id"); ok { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd suggest using built in functions to check keys. beats/libbeat/common/mapstr.go Line 161 in 78e481d
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case, it is checking if the value exist or not instead of the key. |
||||
if config.CgroupCacheExpireTime != 0 { | ||||
p.log.Debug("Initializing cgroup cache") | ||||
evictionListener := func(k common.Key, v common.Value) { | ||||
p.log.Debugf("Evicted cached cgroups for PID=%v", k) | ||||
} | ||||
|
||||
cgroupsCache := common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener) | ||||
cgroupsCache.StartJanitor(config.CgroupCacheExpireTime) | ||||
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, cgroupsCache) | ||||
} else { | ||||
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, nil) | ||||
} | ||||
|
||||
} | ||||
|
||||
return &p, nil | ||||
} | ||||
|
||||
// check if the value exist in mapping | ||||
func containsValue(m common.MapStr, v string) bool { | ||||
for _, x := range m { | ||||
if x == v { | ||||
return true | ||||
} | ||||
} | ||||
return false | ||||
} | ||||
|
||||
// Run enriches the given event with the host meta data | ||||
func (p *addProcessMetadata) Run(event *beat.Event) (*beat.Event, error) { | ||||
for _, pidField := range p.config.MatchPIDs { | ||||
|
@@ -156,6 +197,10 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul | |||
} | ||||
meta := metaPtr.fields | ||||
|
||||
if err = p.enrichContainerID(pid, meta); err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
result = event.Clone() | ||||
for dest, sourceIf := range p.mappings { | ||||
source, castOk := sourceIf.(string) | ||||
|
@@ -168,23 +213,41 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul | |||
return nil, errors.Errorf("target field '%s' already exists and overwrite_keys is false", dest) | ||||
} | ||||
} | ||||
|
||||
value, err := meta.GetValue(source) | ||||
if err != nil { | ||||
// Should never happen | ||||
return nil, err | ||||
} | ||||
|
||||
if _, err = result.Put(dest, value); err != nil { | ||||
return nil, err | ||||
} | ||||
} | ||||
|
||||
return result, nil | ||||
} | ||||
|
||||
// enrichContainerID adds container.id into meta for mapping to pickup | ||||
func (p *addProcessMetadata) enrichContainerID(pid int, meta common.MapStr) error { | ||||
if p.cidProvider == nil { | ||||
return nil | ||||
} | ||||
cid, err := p.cidProvider.GetCid(pid) | ||||
if err != nil { | ||||
return err | ||||
} | ||||
if _, err = meta.Put("container", common.MapStr{"id": cid}); err != nil { | ||||
return err | ||||
} | ||||
return nil | ||||
} | ||||
|
||||
// String returns the processor representation formatted as a string | ||||
func (p *addProcessMetadata) String() string { | ||||
return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v]", | ||||
return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v, host_path=%v, cgroup_prefixes=%v]", | ||||
processorName, p.config.MatchPIDs, p.mappings, p.config.IgnoreMissing, | ||||
p.config.OverwriteKeys, p.config.RestrictedFields) | ||||
p.config.OverwriteKeys, p.config.RestrictedFields, p.config.HostPath, p.config.CgroupPrefixes) | ||||
} | ||||
|
||||
func (p *processMetadata) toMap() common.MapStr { | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.