Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.procstat): Allow multiple selection criteria #14948

Merged
merged 8 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions plugins/inputs/procstat/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,34 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## the native finder performs the search directly in a manor dependent on the
## platform. Default is 'pgrep'
# pid_finder = "pgrep"

## New-style filtering configuration (multiple filter sections are allowed)
# [[inputs.procstat.filter]]
# ## Name of the filter added as 'filter' tag
# name = "shell"
#
# ## Service filters, only one is allowed
# ## Systemd unit names (wildcards are supported)
# # systemd_units = []
# ## CGroup name or path (wildcards are supported)
# # cgroups = []
# ## Supervisor service names of hypervisorctl management
# # supervisor_units = []
# ## Windows service names
# # win_service = []
#
# ## Process filters, multiple are allowed
# ## Regular expressions to use for matching againt the full command
# # patterns = ['.*']
# ## List of users owning the process (wildcards are supported)
# # users = ['*']
# ## List of executable paths of the process (wildcards are supported)
# # executables = ['*']
# ## List of process names (wildcards are supported)
# # process_names = ['*']
# ## Recursion depth for determining children of the matched processes
# ## A negative value means all children with infinite depth
# # recursion_depth = 0
```

### Windows support
Expand Down
226 changes: 226 additions & 0 deletions plugins/inputs/procstat/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package procstat

import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"

"github.com/influxdata/telegraf/filter"
"github.com/shirou/gopsutil/v3/process"
)

type Filter struct {
Name string `toml:"name"`
PidFiles []string `toml:"pid_files"`
SystemdUnits []string `toml:"systemd_units"`
SupervisorUnits []string `toml:"supervisor_units"`
WinService []string `toml:"win_services"`
CGroups []string `toml:"cgroups"`
Patterns []string `toml:"patterns"`
Users []string `toml:"users"`
Executables []string `toml:"executables"`
ProcessNames []string `toml:"process_names"`
RecursionDepth int `toml:"recursion_depth"`

filterSupervisorUnit string
filterCmds []*regexp.Regexp
filterUser filter.Filter
filterExecutable filter.Filter
filterProcessName filter.Filter
}

func (f *Filter) Init() error {
if f.Name == "" {
return errors.New("filter must be named")
}

// Check for only one service selector being active
var active []string
if len(f.PidFiles) > 0 {
active = append(active, "pid_files")
}
if len(f.CGroups) > 0 {
active = append(active, "cgroups")
}
if len(f.SystemdUnits) > 0 {
active = append(active, "systemd_units")
}
if len(f.SupervisorUnits) > 0 {
active = append(active, "supervisor_units")
}
if len(f.WinService) > 0 {
active = append(active, "win_services")
}
if len(active) > 1 {
return fmt.Errorf("cannot select multiple services %q", strings.Join(active, ", "))
}

// Prepare the filters
f.filterCmds = make([]*regexp.Regexp, 0, len(f.Patterns))
for _, p := range f.Patterns {
re, err := regexp.Compile(p)
if err != nil {
return fmt.Errorf("compiling pattern %q of filter %q failed: %w", p, f.Name, err)
}
f.filterCmds = append(f.filterCmds, re)
}

f.filterSupervisorUnit = strings.TrimSpace(strings.Join(f.SupervisorUnits, " "))

var err error
if f.filterUser, err = filter.Compile(f.Users); err != nil {
return fmt.Errorf("compiling users filter for %q failed: %w", f.Name, err)
}
if f.filterExecutable, err = filter.Compile(f.Executables); err != nil {
return fmt.Errorf("compiling executables filter for %q failed: %w", f.Name, err)
}
if f.filterProcessName, err = filter.Compile(f.ProcessNames); err != nil {
return fmt.Errorf("compiling process-names filter for %q failed: %w", f.Name, err)
}

return nil
}

func (f *Filter) ApplyFilter() ([]processGroup, error) {
// Determine processes on service level. if there is no constraint on the
// services, use all processes for matching.
var groups []processGroup
switch {
case len(f.PidFiles) > 0:
g, err := findByPidFiles(f.PidFiles)
if err != nil {
return nil, err
}
groups = append(groups, g...)
case len(f.CGroups) > 0:
g, err := findByCgroups(f.CGroups)
if err != nil {
return nil, err
}
groups = append(groups, g...)
case len(f.SystemdUnits) > 0:
g, err := findBySystemdUnits(f.CGroups)
if err != nil {
return nil, err
}
groups = append(groups, g...)
case f.filterSupervisorUnit != "":
g, err := findBySupervisorUnits(f.filterSupervisorUnit)
if err != nil {
return nil, err
}
groups = append(groups, g...)
case len(f.WinService) > 0:
g, err := findByWindowsServices(f.WinService)
if err != nil {
return nil, err
}
groups = append(groups, g...)
default:
procs, err := process.Processes()
if err != nil {
return nil, err
}
groups = append(groups, processGroup{processes: procs, tags: make(map[string]string)})
}

// Filter by additional properties such as users, patterns etc
result := make([]processGroup, 0, len(groups))
for _, g := range groups {
var matched []*process.Process
for _, p := range g.processes {
// Users
if f.filterUser != nil {
if username, err := p.Username(); err != nil || !f.filterUser.Match(username) {
// Errors can happen if we don't have permissions or the process no longer exists
continue
}
}

// Executables
if f.filterExecutable != nil {
if exe, err := p.Exe(); err != nil || !f.filterExecutable.Match(exe) {
continue
}
}

// Process names
if f.filterProcessName != nil {
if name, err := p.Name(); err != nil || !f.filterProcessName.Match(name) {
continue
}
}

// Patterns
if len(f.filterCmds) > 0 {
cmd, err := p.Cmdline()
if err != nil {
// This can happen if we don't have permissions or the process no longer exists
continue
}
var found bool
for _, re := range f.filterCmds {
if re.MatchString(cmd) {
found = true
break
}
}
if !found {
continue
}
}

matched = append(matched, p)
}
result = append(result, processGroup{processes: matched, tags: g.tags})
}

// Resolve children down to the requested depth
previous := result
for depth := 0; depth < f.RecursionDepth || f.RecursionDepth < 0; depth++ {
children := make([]processGroup, 0, len(previous))
for _, group := range previous {
for _, p := range group.processes {
c, err := getChildren(p)
if err != nil {
return nil, fmt.Errorf("unable to get children of process %d: %w", p.Pid, err)
}
if len(c) == 0 {
continue
}

tags := make(map[string]string, len(group.tags)+1)
for k, v := range group.tags {
tags[k] = v
}
tags["parent_pid"] = strconv.FormatInt(int64(p.Pid), 10)
children = append(children, processGroup{
processes: c,
tags: tags,
})
}
}
if len(children) == 0 {
break
}
result = append(result, children...)
previous = children
}

return result, nil
}

func getChildren(p *process.Process) ([]*process.Process, error) {
children, err := p.Children()
// Check for cases that do not really mean error but rather means that there
// is no match.
switch {
case err == nil,
errors.Is(err, process.ErrorNoChildren),
strings.Contains(err.Error(), "exit status 1"):
return children, nil
}
return nil, fmt.Errorf("unable to get children of process %d: %w", p.Pid, err)
}
45 changes: 45 additions & 0 deletions plugins/inputs/procstat/os_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
package procstat

import (
"context"
"errors"
"fmt"

"github.com/coreos/go-systemd/v22/dbus"
"github.com/shirou/gopsutil/v3/process"
)

Expand All @@ -31,3 +34,45 @@ func collectMemmap(proc Process, prefix string, fields map[string]any) {
fields[prefix+"memory_swap"] = memMap.Swap
}
}

func findBySystemdUnits(units []string) ([]processGroup, error) {
ctx := context.Background()
conn, err := dbus.NewSystemConnectionContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to connect to systemd: %w", err)
}
defer conn.Close()

sdunits, err := conn.ListUnitsByPatternsContext(ctx, []string{"enabled", "disabled", "static"}, units)
if err != nil {
return nil, fmt.Errorf("failed to list units: %w", err)
}

groups := make([]processGroup, 0, len(sdunits))
for _, u := range sdunits {
prop, err := conn.GetUnitTypePropertyContext(ctx, u.Name, "Service", "MainPID")
if err != nil {
// This unit might not be a service or similar
continue
}
raw := prop.Value.Value()
pid, ok := raw.(uint32)
if !ok {
return nil, fmt.Errorf("failed to parse PID %v of unit %q: invalid type %T", raw, u, raw)
}
p, err := process.NewProcess(int32(pid))
if err != nil {
return nil, fmt.Errorf("failed to find process for PID %d of unit %q: %w", pid, u, err)
}
groups = append(groups, processGroup{
processes: []*process.Process{p},
tags: map[string]string{"systemd_unit": u.Name},
})
}

return groups, nil
}

func findByWindowsServices(_ []string) ([]processGroup, error) {
return nil, nil
}
8 changes: 8 additions & 0 deletions plugins/inputs/procstat/os_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,11 @@ func queryPidWithWinServiceName(_ string) (uint32, error) {
}

func collectMemmap(Process, string, map[string]any) {}

func findBySystemdUnits(_ []string) ([]processGroup, error) {
return nil, nil
}

func findByWindowsServices(_ []string) ([]processGroup, error) {
return nil, nil
}
27 changes: 27 additions & 0 deletions plugins/inputs/procstat/os_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package procstat

import (
"errors"
"fmt"
"unsafe"

"github.com/shirou/gopsutil/v3/process"
Expand Down Expand Up @@ -55,3 +56,29 @@ func queryPidWithWinServiceName(winServiceName string) (uint32, error) {
}

func collectMemmap(Process, string, map[string]any) {}

func findBySystemdUnits(_ []string) ([]processGroup, error) {
return nil, nil
}

func findByWindowsServices(services []string) ([]processGroup, error) {
groups := make([]processGroup, 0, len(services))
for _, service := range services {
pid, err := queryPidWithWinServiceName(service)
if err != nil {
return nil, fmt.Errorf("failed to query PID of service %q: %w", service, err)
}

p, err := process.NewProcess(int32(pid))
if err != nil {
return nil, fmt.Errorf("failed to find process for PID %d of service %q: %w", pid, service, err)
}

groups = append(groups, processGroup{
processes: []*process.Process{p},
tags: map[string]string{"win_service": service},
})
}

return groups, nil
}
Loading
Loading