Skip to content

Commit

Permalink
feat(inputs.procstat): Allow multiple selection criteria (#14948)
Browse files Browse the repository at this point in the history
Co-authored-by: Joshua Powers <[email protected]>
  • Loading branch information
srebhan and powersj authored Apr 19, 2024
1 parent fa0dbba commit 2acae45
Show file tree
Hide file tree
Showing 8 changed files with 678 additions and 34 deletions.
28 changes: 28 additions & 0 deletions plugins/inputs/procstat/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,34 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## the native finder performs the search directly in a manor dependent on the
## platform. Default is 'pgrep'
# pid_finder = "pgrep"

## New-style filtering configuration (multiple filter sections are allowed)
# [[inputs.procstat.filter]]
# ## Name of the filter added as 'filter' tag
# name = "shell"
#
# ## Service filters, only one is allowed
# ## Systemd unit names (wildcards are supported)
# # systemd_units = []
# ## CGroup name or path (wildcards are supported)
# # cgroups = []
# ## Supervisor service names of hypervisorctl management
# # supervisor_units = []
# ## Windows service names
# # win_service = []
#
# ## Process filters, multiple are allowed
# ## Regular expressions to use for matching againt the full command
# # patterns = ['.*']
# ## List of users owning the process (wildcards are supported)
# # users = ['*']
# ## List of executable paths of the process (wildcards are supported)
# # executables = ['*']
# ## List of process names (wildcards are supported)
# # process_names = ['*']
# ## Recursion depth for determining children of the matched processes
# ## A negative value means all children with infinite depth
# # recursion_depth = 0
```

### Windows support
Expand Down
226 changes: 226 additions & 0 deletions plugins/inputs/procstat/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package procstat

import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"

"github.com/influxdata/telegraf/filter"
"github.com/shirou/gopsutil/v3/process"
)

type Filter struct {
Name string `toml:"name"`
PidFiles []string `toml:"pid_files"`
SystemdUnits []string `toml:"systemd_units"`
SupervisorUnits []string `toml:"supervisor_units"`
WinService []string `toml:"win_services"`
CGroups []string `toml:"cgroups"`
Patterns []string `toml:"patterns"`
Users []string `toml:"users"`
Executables []string `toml:"executables"`
ProcessNames []string `toml:"process_names"`
RecursionDepth int `toml:"recursion_depth"`

filterSupervisorUnit string
filterCmds []*regexp.Regexp
filterUser filter.Filter
filterExecutable filter.Filter
filterProcessName filter.Filter
}

func (f *Filter) Init() error {
if f.Name == "" {
return errors.New("filter must be named")
}

// Check for only one service selector being active
var active []string
if len(f.PidFiles) > 0 {
active = append(active, "pid_files")
}
if len(f.CGroups) > 0 {
active = append(active, "cgroups")
}
if len(f.SystemdUnits) > 0 {
active = append(active, "systemd_units")
}
if len(f.SupervisorUnits) > 0 {
active = append(active, "supervisor_units")
}
if len(f.WinService) > 0 {
active = append(active, "win_services")
}
if len(active) > 1 {
return fmt.Errorf("cannot select multiple services %q", strings.Join(active, ", "))
}

// Prepare the filters
f.filterCmds = make([]*regexp.Regexp, 0, len(f.Patterns))
for _, p := range f.Patterns {
re, err := regexp.Compile(p)
if err != nil {
return fmt.Errorf("compiling pattern %q of filter %q failed: %w", p, f.Name, err)
}
f.filterCmds = append(f.filterCmds, re)
}

f.filterSupervisorUnit = strings.TrimSpace(strings.Join(f.SupervisorUnits, " "))

var err error
if f.filterUser, err = filter.Compile(f.Users); err != nil {
return fmt.Errorf("compiling users filter for %q failed: %w", f.Name, err)
}
if f.filterExecutable, err = filter.Compile(f.Executables); err != nil {
return fmt.Errorf("compiling executables filter for %q failed: %w", f.Name, err)
}
if f.filterProcessName, err = filter.Compile(f.ProcessNames); err != nil {
return fmt.Errorf("compiling process-names filter for %q failed: %w", f.Name, err)
}

return nil
}

func (f *Filter) ApplyFilter() ([]processGroup, error) {
// Determine processes on service level. if there is no constraint on the
// services, use all processes for matching.
var groups []processGroup
switch {
case len(f.PidFiles) > 0:
g, err := findByPidFiles(f.PidFiles)
if err != nil {
return nil, err
}
groups = append(groups, g...)
case len(f.CGroups) > 0:
g, err := findByCgroups(f.CGroups)
if err != nil {
return nil, err
}
groups = append(groups, g...)
case len(f.SystemdUnits) > 0:
g, err := findBySystemdUnits(f.CGroups)
if err != nil {
return nil, err
}
groups = append(groups, g...)
case f.filterSupervisorUnit != "":
g, err := findBySupervisorUnits(f.filterSupervisorUnit)
if err != nil {
return nil, err
}
groups = append(groups, g...)
case len(f.WinService) > 0:
g, err := findByWindowsServices(f.WinService)
if err != nil {
return nil, err
}
groups = append(groups, g...)
default:
procs, err := process.Processes()
if err != nil {
return nil, err
}
groups = append(groups, processGroup{processes: procs, tags: make(map[string]string)})
}

// Filter by additional properties such as users, patterns etc
result := make([]processGroup, 0, len(groups))
for _, g := range groups {
var matched []*process.Process
for _, p := range g.processes {
// Users
if f.filterUser != nil {
if username, err := p.Username(); err != nil || !f.filterUser.Match(username) {
// Errors can happen if we don't have permissions or the process no longer exists
continue
}
}

// Executables
if f.filterExecutable != nil {
if exe, err := p.Exe(); err != nil || !f.filterExecutable.Match(exe) {
continue
}
}

// Process names
if f.filterProcessName != nil {
if name, err := p.Name(); err != nil || !f.filterProcessName.Match(name) {
continue
}
}

// Patterns
if len(f.filterCmds) > 0 {
cmd, err := p.Cmdline()
if err != nil {
// This can happen if we don't have permissions or the process no longer exists
continue
}
var found bool
for _, re := range f.filterCmds {
if re.MatchString(cmd) {
found = true
break
}
}
if !found {
continue
}
}

matched = append(matched, p)
}
result = append(result, processGroup{processes: matched, tags: g.tags})
}

// Resolve children down to the requested depth
previous := result
for depth := 0; depth < f.RecursionDepth || f.RecursionDepth < 0; depth++ {
children := make([]processGroup, 0, len(previous))
for _, group := range previous {
for _, p := range group.processes {
c, err := getChildren(p)
if err != nil {
return nil, fmt.Errorf("unable to get children of process %d: %w", p.Pid, err)
}
if len(c) == 0 {
continue
}

tags := make(map[string]string, len(group.tags)+1)
for k, v := range group.tags {
tags[k] = v
}
tags["parent_pid"] = strconv.FormatInt(int64(p.Pid), 10)
children = append(children, processGroup{
processes: c,
tags: tags,
})
}
}
if len(children) == 0 {
break
}
result = append(result, children...)
previous = children
}

return result, nil
}

func getChildren(p *process.Process) ([]*process.Process, error) {
children, err := p.Children()
// Check for cases that do not really mean error but rather means that there
// is no match.
switch {
case err == nil,
errors.Is(err, process.ErrorNoChildren),
strings.Contains(err.Error(), "exit status 1"):
return children, nil
}
return nil, fmt.Errorf("unable to get children of process %d: %w", p.Pid, err)
}
45 changes: 45 additions & 0 deletions plugins/inputs/procstat/os_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
package procstat

import (
"context"
"errors"
"fmt"

"github.com/coreos/go-systemd/v22/dbus"
"github.com/shirou/gopsutil/v3/process"
)

Expand All @@ -31,3 +34,45 @@ func collectMemmap(proc Process, prefix string, fields map[string]any) {
fields[prefix+"memory_swap"] = memMap.Swap
}
}

func findBySystemdUnits(units []string) ([]processGroup, error) {
ctx := context.Background()
conn, err := dbus.NewSystemConnectionContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to connect to systemd: %w", err)
}
defer conn.Close()

sdunits, err := conn.ListUnitsByPatternsContext(ctx, []string{"enabled", "disabled", "static"}, units)
if err != nil {
return nil, fmt.Errorf("failed to list units: %w", err)
}

groups := make([]processGroup, 0, len(sdunits))
for _, u := range sdunits {
prop, err := conn.GetUnitTypePropertyContext(ctx, u.Name, "Service", "MainPID")
if err != nil {
// This unit might not be a service or similar
continue
}
raw := prop.Value.Value()
pid, ok := raw.(uint32)
if !ok {
return nil, fmt.Errorf("failed to parse PID %v of unit %q: invalid type %T", raw, u, raw)
}
p, err := process.NewProcess(int32(pid))
if err != nil {
return nil, fmt.Errorf("failed to find process for PID %d of unit %q: %w", pid, u, err)
}
groups = append(groups, processGroup{
processes: []*process.Process{p},
tags: map[string]string{"systemd_unit": u.Name},
})
}

return groups, nil
}

func findByWindowsServices(_ []string) ([]processGroup, error) {
return nil, nil
}
8 changes: 8 additions & 0 deletions plugins/inputs/procstat/os_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,11 @@ func queryPidWithWinServiceName(_ string) (uint32, error) {
}

func collectMemmap(Process, string, map[string]any) {}

func findBySystemdUnits(_ []string) ([]processGroup, error) {
return nil, nil
}

func findByWindowsServices(_ []string) ([]processGroup, error) {
return nil, nil
}
27 changes: 27 additions & 0 deletions plugins/inputs/procstat/os_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package procstat

import (
"errors"
"fmt"
"unsafe"

"github.com/shirou/gopsutil/v3/process"
Expand Down Expand Up @@ -55,3 +56,29 @@ func queryPidWithWinServiceName(winServiceName string) (uint32, error) {
}

func collectMemmap(Process, string, map[string]any) {}

func findBySystemdUnits(_ []string) ([]processGroup, error) {
return nil, nil
}

func findByWindowsServices(services []string) ([]processGroup, error) {
groups := make([]processGroup, 0, len(services))
for _, service := range services {
pid, err := queryPidWithWinServiceName(service)
if err != nil {
return nil, fmt.Errorf("failed to query PID of service %q: %w", service, err)
}

p, err := process.NewProcess(int32(pid))
if err != nil {
return nil, fmt.Errorf("failed to find process for PID %d of service %q: %w", pid, service, err)
}

groups = append(groups, processGroup{
processes: []*process.Process{p},
tags: map[string]string{"win_service": service},
})
}

return groups, nil
}
Loading

0 comments on commit 2acae45

Please sign in to comment.