diff --git a/go.mod b/go.mod index e2a5d99392b5d..8c6f2231066da 100644 --- a/go.mod +++ b/go.mod @@ -205,7 +205,7 @@ require ( golang.org/x/net v0.24.0 golang.org/x/oauth2 v0.20.0 golang.org/x/sync v0.7.0 - golang.org/x/sys v0.19.0 + golang.org/x/sys v0.20.0 golang.org/x/term v0.19.0 golang.org/x/text v0.14.0 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20211230205640-daad0b7ba671 @@ -454,7 +454,7 @@ require ( github.com/twmb/murmur3 v1.1.7 // indirect github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect github.com/uber/jaeger-lib v2.4.1+incompatible // indirect - github.com/vishvananda/netlink v1.2.1-beta.2 // indirect + github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21 github.com/vishvananda/netns v0.0.4 github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect diff --git a/go.sum b/go.sum index 0a37b4b5e1c89..2632d14be5ac3 100644 --- a/go.sum +++ b/go.sum @@ -2256,6 +2256,8 @@ github.com/vapourismo/knx-go v0.0.0-20240217175130-922a0d50c241/go.mod h1:aGkV5x github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= github.com/vishvananda/netlink v1.2.1-beta.2 h1:Llsql0lnQEbHj0I1OuKyp8otXp0r3q0mPkuhwHfStVs= github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= +github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21 h1:tcHUxOT8j/R+0S+A1j8D2InqguXFNxAiij+8QFOlX7Y= +github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21/go.mod h1:whJevzBpTrid75eZy99s3DqCmy05NfibNaF2Ol5Ox5A= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= @@ -2752,6 +2754,7 @@ golang.org/x/sys v0.5.0/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -2759,6 +2762,8 @@ golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/plugins/inputs/procstat/README.md b/plugins/inputs/procstat/README.md index 5f91f400450d9..378a30a8bee36 100644 --- a/plugins/inputs/procstat/README.md +++ b/plugins/inputs/procstat/README.md @@ -64,17 +64,33 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## result in a large number of series, especially with short-lived processes, ## creating high cardinality at the output. 
## Available options are: - ## cmdline -- full commandline - ## pid -- ID of the process - ## ppid -- ID of the process' parent - ## status -- state of the process - ## user -- username owning the process + ## cmdline -- full commandline + ## pid -- ID of the process + ## ppid -- ID of the process' parent + ## status -- state of the process + ## user -- username owning the process + ## protocol -- protocol type of the process socket # tag_with = [] ## Properties to collect - ## Available options are "cpu", "limits", "memory", "mmap" + ## Available options are + ## cpu -- CPU usage statistics + ## limits -- set resource limits + ## memory -- memory usage statistics + ## mmap -- mapped memory usage statistics (caution: can cause high load) + ## sockets -- socket statistics for protocols in 'socket_protocols' (Linux only) # properties = ["cpu", "limits", "memory", "mmap"] + ## Protocol filter for the sockets property (Linux only) + ## Available options are + ## all -- all of the protocols below + ## tcp4 -- TCP socket statistics for IPv4 + ## tcp6 -- TCP socket statistics for IPv6 + ## udp4 -- UDP socket statistics for IPv4 + ## udp6 -- UDP socket statistics for IPv6 + ## unix -- Unix socket statistics + # socket_protocols = ["all"] + ## Method to use when finding process IDs. Can be one of 'pgrep', or ## 'native'. 
The pgrep finder calls the pgrep executable in the PATH while ## the native finder performs the search directly in a manner dependent on the @@ -141,8 +157,8 @@ Below are an example set of tags and fields: - procstat - tags: - - pid (when `pid_tag` is true) - - cmdline (when 'cmdline_tag' is true) + - pid (if requested) + - cmdline (if requested) - process_name - pidfile (when defined) - exe (when defined) @@ -231,6 +247,36 @@ Below are an example set of tags and fields: - pid_count (int) - running (int) - result_code (int, success = 0, lookup_error = 1) +- procstat_socket (if configured, Linux only) + - tags: + - pid (if requested) + - protocol (if requested) + - cmdline (if requested) + - process_name + - pidfile (when defined) + - exe (when defined) + - pattern (when defined) + - user (when selected) + - systemd_unit (when defined) + - cgroup (when defined) + - cgroup_full (when cgroup or systemd_unit is used with glob) + - supervisor_unit (when defined) + - win_service (when defined) + - fields: + - protocol + - state + - pid + - src + - src_port (tcp and udp sockets only) + - dest (tcp and udp sockets only) + - dest_port (tcp and udp sockets only) + - bytes_received (tcp sockets only) + - bytes_sent (tcp sockets only) + - lost (tcp sockets only) + - retransmits (tcp sockets only) + - rx_queue + - tx_queue + - inode (unix sockets only) + - name (unix sockets only, if available) + - peer (unix sockets only, if available) *NOTE: Resource limit > 2147483647 will be reported as 2147483647.* @@ -239,4 +285,5 @@ Below are an example set of tags and fields: ```text procstat_lookup,host=prash-laptop,pattern=influxd,pid_finder=pgrep,result=success pid_count=1i,running=1i,result_code=0i 1582089700000000000 procstat,host=prash-laptop,pattern=influxd,process_name=influxd,user=root 
involuntary_context_switches=151496i,child_minor_faults=1061i,child_major_faults=8i,cpu_time_user=2564.81,pid=32025i,major_faults=8609i,created_at=1580107536000000000i,voluntary_context_switches=1058996i,cpu_time_system=616.98,memory_swap=0i,memory_locked=0i,memory_usage=1.7797634601593018,num_threads=18i,cpu_time_iowait=0,memory_rss=148643840i,memory_vms=1435688960i,memory_data=0i,memory_stack=0i,minor_faults=1856550i 1582089700000000000 +procstat_socket,host=prash-laptop,process_name=browser,protocol=tcp4 bytes_received=826987i,bytes_sent=32869i,dest="192.168.0.2",dest_port=443i,lost=0i,pid=32025i,retransmits=0i,rx_queue=0i,src="192.168.0.1",src_port=52106i,state="established",tx_queue=0i 1582089700000000000 ``` diff --git a/plugins/inputs/procstat/os_linux.go b/plugins/inputs/procstat/os_linux.go index 2b25e7be7fe11..9e276b8de533c 100644 --- a/plugins/inputs/procstat/os_linux.go +++ b/plugins/inputs/procstat/os_linux.go @@ -3,17 +3,25 @@ package procstat import ( + "bytes" "context" "errors" "fmt" "os" - - "github.com/prometheus/procfs" + "strconv" + "strings" + "syscall" "github.com/coreos/go-systemd/v22/dbus" + "github.com/prometheus/procfs" + gopsnet "github.com/shirou/gopsutil/v3/net" "github.com/shirou/gopsutil/v3/process" + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" ) +const supportsSocketStat = true + func processName(p *process.Process) (string, error) { return p.Exe() } @@ -103,3 +111,277 @@ func collectTotalReadWrite(proc Process) (r, w uint64, err error) { return stat.RChar, stat.WChar, nil } + +/* Socket statistics functions */ +type SocketState uint8 + +func (s SocketState) String() string { + switch uint8(s) { + case unix.BPF_TCP_ESTABLISHED: + return "established" + case unix.BPF_TCP_SYN_SENT: + return "syn-sent" + case unix.BPF_TCP_SYN_RECV: + return "syn-recv" + case unix.BPF_TCP_FIN_WAIT1: + return "fin-wait1" + case unix.BPF_TCP_FIN_WAIT2: + return "fin-wait2" + case unix.BPF_TCP_TIME_WAIT: + return "time-wait" + case 
unix.BPF_TCP_CLOSE: + return "close" + case unix.BPF_TCP_CLOSE_WAIT: + return "close-wait" + case unix.BPF_TCP_LAST_ACK: + return "last-ack" + case unix.BPF_TCP_LISTEN: + return "listen" + case unix.BPF_TCP_CLOSING: + return "closing" + case unix.BPF_TCP_NEW_SYN_RECV: + return "sync-recv" + } + + return "unknown" +} + +func mapFdToInode(pid int32, fd uint32) (uint32, error) { + fn := fmt.Sprintf("/proc/%d/fd/%d", pid, fd) + link, err := os.Readlink(fn) + if err != nil { + return 0, fmt.Errorf("reading link failed: %w", err) + } + target := strings.TrimPrefix(link, "socket:[") + target = strings.TrimSuffix(target, "]") + inode, err := strconv.ParseUint(target, 10, 32) + if err != nil { + return 0, fmt.Errorf("parsing link %q: %w", link, err) + } + + return uint32(inode), nil +} + +func statsTCP(conns []gopsnet.ConnectionStat, family uint8) ([]map[string]interface{}, error) { + if len(conns) == 0 { + return nil, nil + } + + // For TCP we need the inode for each connection to relate the connection + // statistics to the actual process socket. Therefore, map the + // file-descriptors to inodes using the /proc//fd entries. + inodes := make(map[uint32]gopsnet.ConnectionStat, len(conns)) + for _, c := range conns { + inode, err := mapFdToInode(c.Pid, c.Fd) + if err != nil { + panic(fmt.Errorf("mapping fd %d of pid %d failed: %w", c.Fd, c.Pid, err)) + } + inodes[inode] = c + } + + // Get the TCP socket statistics from the netlink socket. + responses, err := netlink.SocketDiagTCPInfo(family) + if err != nil { + return nil, fmt.Errorf("connecting to diag socket failed: %w", err) + } + + // Filter the responses via the inodes belonging to the process + fieldslist := make([]map[string]interface{}, 0) + for _, r := range responses { + c, found := inodes[r.InetDiagMsg.INode] + if !found { + // The inode does not belong to the process. 
+ continue + } + + var proto string + switch r.InetDiagMsg.Family { + case syscall.AF_INET: + proto = "tcp4" + case syscall.AF_INET6: + proto = "tcp6" + default: + continue + } + + fmt.Printf("inetdiag: %+v\n", r.InetDiagMsg) + fields := map[string]interface{}{ + "protocol": proto, + "state": SocketState(r.InetDiagMsg.State).String(), + "pid": c.Pid, + "src": r.InetDiagMsg.ID.Source.String(), + "src_port": r.InetDiagMsg.ID.SourcePort, + "dest": r.InetDiagMsg.ID.Destination.String(), + "dest_port": r.InetDiagMsg.ID.DestinationPort, + "bytes_received": r.TCPInfo.Bytes_received, + "bytes_sent": r.TCPInfo.Bytes_sent, + "lost": r.TCPInfo.Lost, + "retransmits": r.TCPInfo.Retransmits, + "rx_queue": r.InetDiagMsg.RQueue, + "tx_queue": r.InetDiagMsg.WQueue, + } + fieldslist = append(fieldslist, fields) + } + + return fieldslist, nil +} + +func statsUDP(conns []gopsnet.ConnectionStat, family uint8) ([]map[string]interface{}, error) { + if len(conns) == 0 { + return nil, nil + } + + // For UDP we need the inode for each connection to relate the connection + // statistics to the actual process socket. Therefore, map the + // file-descriptors to inodes using the /proc//fd entries. + inodes := make(map[uint32]gopsnet.ConnectionStat, len(conns)) + for _, c := range conns { + inode, err := mapFdToInode(c.Pid, c.Fd) + if err != nil { + panic(fmt.Errorf("mapping fd %d of pid %d failed: %w", c.Fd, c.Pid, err)) + } + inodes[inode] = c + } + + // Get the UDP socket statistics from the netlink socket. + responses, err := netlink.SocketDiagUDPInfo(family) + if err != nil { + return nil, fmt.Errorf("connecting to diag socket failed: %w", err) + } + + // Filter the responses via the inodes belonging to the process + fieldslist := make([]map[string]interface{}, 0) + for _, r := range responses { + c, found := inodes[r.InetDiagMsg.INode] + if !found { + // The inode does not belong to the process. 
+ continue + } + + var proto string + switch r.InetDiagMsg.Family { + case syscall.AF_INET: + proto = "udp4" + case syscall.AF_INET6: + proto = "udp6" + default: + continue + } + + fields := map[string]interface{}{ + "protocol": proto, + "state": SocketState(r.InetDiagMsg.State).String(), + "pid": c.Pid, + "src": r.InetDiagMsg.ID.Source.String(), + "src_port": r.InetDiagMsg.ID.SourcePort, + "dest": r.InetDiagMsg.ID.Destination.String(), + "dest_port": r.InetDiagMsg.ID.DestinationPort, + "rx_queue": r.InetDiagMsg.RQueue, + "tx_queue": r.InetDiagMsg.WQueue, + } + fieldslist = append(fieldslist, fields) + } + + return fieldslist, nil +} + +func statsUnix(conns []gopsnet.ConnectionStat) ([]map[string]interface{}, error) { + // We need to read the inode for each connection to relate the connection + // statistics to the actual process socket. Therefore, map the + // file-descriptors to inodes using the /proc//fd entries. + inodes := make(map[uint32]gopsnet.ConnectionStat, len(conns)) + for _, c := range conns { + inodes[c.Fd] = c + } + + // Get the UDP socket statistics from the netlink socket. + responses, err := netlink.UnixSocketDiagInfo() + if err != nil { + return nil, fmt.Errorf("connecting to diag socket failed: %w", err) + } + + // Filter the responses via the inodes belonging to the process + fieldslist := make([]map[string]interface{}, 0) + for _, r := range responses { + c, found := inodes[r.DiagMsg.INode] + if !found { + // The inode does not belong to the process. 
+ continue + } + + fields := map[string]interface{}{ + "protocol": "unix", + "state": SocketState(r.DiagMsg.State).String(), + "pid": c.Pid, + "src": c.Laddr.IP, + "rx_queue": r.Queue.RQueue, + "tx_queue": r.Queue.WQueue, + "inode": c.Fd, + } + if r.Name != nil { + fields["name"] = strings.Trim(*r.Name, " \t\n\r") + } + if r.Peer != nil { + fields["peer"] = *r.Peer + } + fieldslist = append(fieldslist, fields) + } + + return fieldslist, nil +} + +func unixConnectionsPid(pid int32) ([]gopsnet.ConnectionStat, error) { + file := fmt.Sprintf("/proc/%d/net/unix", pid) + + // Read the contents of the /proc file with a single read sys call. + // This minimizes duplicates in the returned connections + // For more info: + // https://github.com/shirou/gopsutil/pull/361 + contents, err := os.ReadFile(file) + if err != nil { + return nil, err + } + + lines := bytes.Split(contents, []byte("\n")) + conns := make([]gopsnet.ConnectionStat, 0, len(lines)-1) + duplicate := make(map[string]bool, len(conns)) + // skip first line + for _, line := range lines[1:] { + tokens := strings.Fields(string(line)) + if len(tokens) < 6 { + continue + } + st, err := strconv.Atoi(tokens[4]) + if err != nil { + return nil, err + } + inode, err := strconv.Atoi(tokens[6]) + if err != nil { + return nil, err + } + + var path string + if len(tokens) == 8 { + path = tokens[len(tokens)-1] + } + + c := gopsnet.ConnectionStat{ + Fd: uint32(inode), + Family: unix.AF_UNIX, + Type: uint32(st), + Laddr: gopsnet.Addr{IP: path}, + Pid: pid, + Status: "NONE", + } + + // Check if we already go this connection + key := fmt.Sprintf("%d-%s:%d-%s:%d-%s", c.Type, c.Laddr.IP, c.Laddr.Port, c.Raddr.IP, c.Raddr.Port, c.Status) + if duplicate[key] { + continue + } + duplicate[key] = true + conns = append(conns, c) + } + + return conns, nil +} diff --git a/plugins/inputs/procstat/os_others.go b/plugins/inputs/procstat/os_others.go index 105226940e0f5..9a4a75898fa54 100644 --- a/plugins/inputs/procstat/os_others.go +++ 
b/plugins/inputs/procstat/os_others.go @@ -5,27 +5,46 @@ package procstat import ( "errors" + "github.com/shirou/gopsutil/v3/net" + gopsnet "github.com/shirou/gopsutil/v3/net" "github.com/shirou/gopsutil/v3/process" ) +const supportsSocketStat = false + func processName(p *process.Process) (string, error) { return p.Exe() } -func queryPidWithWinServiceName(_ string) (uint32, error) { +func queryPidWithWinServiceName(string) (uint32, error) { return 0, errors.New("os not supporting win_service option") } func collectMemmap(Process, string, map[string]any) {} -func findBySystemdUnits(_ []string) ([]processGroup, error) { +func findBySystemdUnits([]string) ([]processGroup, error) { return nil, nil } -func findByWindowsServices(_ []string) ([]processGroup, error) { +func findByWindowsServices([]string) ([]processGroup, error) { return nil, nil } -func collectTotalReadWrite(_ Process) (r, w uint64, err error) { +func collectTotalReadWrite(Process) (r, w uint64, err error) { return 0, 0, errors.ErrUnsupported } + +func unixConnectionsPid(int32) ([]gopsnet.ConnectionStat, error) { + return nil, errors.ErrUnsupported +} + +func statsTCP([]net.ConnectionStat, uint8) ([]map[string]interface{}, error) { + return nil, errors.ErrUnsupported +} + +func statsUDP([]net.ConnectionStat, uint8) ([]map[string]interface{}, error) { + return nil, errors.ErrUnsupported +} +func statsUnix([]net.ConnectionStat) ([]map[string]interface{}, error) { + return nil, errors.ErrUnsupported +} diff --git a/plugins/inputs/procstat/os_windows.go b/plugins/inputs/procstat/os_windows.go index e93bcf58b8f0b..eccc107390020 100644 --- a/plugins/inputs/procstat/os_windows.go +++ b/plugins/inputs/procstat/os_windows.go @@ -7,11 +7,15 @@ import ( "fmt" "unsafe" + "github.com/shirou/gopsutil/v3/net" + gopsnet "github.com/shirou/gopsutil/v3/net" "github.com/shirou/gopsutil/v3/process" "golang.org/x/sys/windows" "golang.org/x/sys/windows/svc/mgr" ) +const supportsSocketStat = false + func processName(p 
*process.Process) (string, error) { return p.Name() } @@ -57,7 +61,7 @@ func queryPidWithWinServiceName(winServiceName string) (uint32, error) { func collectMemmap(Process, string, map[string]any) {} -func findBySystemdUnits(_ []string) ([]processGroup, error) { +func findBySystemdUnits([]string) ([]processGroup, error) { return nil, nil } @@ -83,6 +87,21 @@ func findByWindowsServices(services []string) ([]processGroup, error) { return groups, nil } -func collectTotalReadWrite(_ Process) (r, w uint64, err error) { +func collectTotalReadWrite(Process) (r, w uint64, err error) { return 0, 0, errors.ErrUnsupported } + +func unixConnectionsPid(int32) ([]gopsnet.ConnectionStat, error) { + return nil, errors.ErrUnsupported +} + +func statsTCP([]net.ConnectionStat, uint8) ([]map[string]interface{}, error) { + return nil, errors.ErrUnsupported +} + +func statsUDP([]net.ConnectionStat, uint8) ([]map[string]interface{}, error) { + return nil, errors.ErrUnsupported +} +func statsUnix([]net.ConnectionStat) ([]map[string]interface{}, error) { + return nil, errors.ErrUnsupported +} diff --git a/plugins/inputs/procstat/process.go b/plugins/inputs/procstat/process.go index bb92bc6750a40..0b918579b6819 100644 --- a/plugins/inputs/procstat/process.go +++ b/plugins/inputs/procstat/process.go @@ -2,10 +2,13 @@ package procstat import ( "errors" + "fmt" "runtime" "strconv" + "syscall" "time" + gopsnet "github.com/shirou/gopsutil/v3/net" "github.com/shirou/gopsutil/v3/process" "github.com/influxdata/telegraf" @@ -17,7 +20,7 @@ type Process interface { Name() (string, error) SetTag(string, string) MemoryMaps(bool) (*[]process.MemoryMapsStat, error) - Metric(string, *collectionConfig) telegraf.Metric + Metrics(string, *collectionConfig, time.Time) ([]telegraf.Metric, error) } type PIDFinder interface { @@ -66,7 +69,7 @@ func (p *Proc) percent(_ time.Duration) (float64, error) { } // Add metrics a single Process -func (p *Proc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric { 
+func (p *Proc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) { if prefix != "" { prefix += "_" } @@ -245,5 +248,109 @@ func (p *Proc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric { } } - return metric.New("procstat", p.tags, fields, time.Time{}) + metrics := []telegraf.Metric{metric.New("procstat", p.tags, fields, t)} + + // Collect the socket statistics if requested + if cfg.features["sockets"] { + for _, protocol := range cfg.socketProtos { + // Get the requested connections for the PID + var fieldlist []map[string]interface{} + switch protocol { + case "all": + conns, err := gopsnet.ConnectionsPid(protocol, p.Pid) + if err != nil { + return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid) + } + var connsTCPv4, connsTCPv6, connsUDPv4, connsUDPv6 []gopsnet.ConnectionStat + for _, c := range conns { + switch { + case c.Family == syscall.AF_INET && c.Type == syscall.SOCK_STREAM: + connsTCPv4 = append(connsTCPv4, c) + case c.Family == syscall.AF_INET6 && c.Type == syscall.SOCK_STREAM: + connsTCPv6 = append(connsTCPv6, c) + case c.Family == syscall.AF_INET && c.Type == syscall.SOCK_DGRAM: + connsUDPv4 = append(connsUDPv4, c) + case c.Family == syscall.AF_INET6 && c.Type == syscall.SOCK_DGRAM: + connsUDPv6 = append(connsUDPv6, c) + } + } + connsUnix, err := unixConnectionsPid(p.Pid) + if err != nil { + return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid) + } + + fl, err := statsTCP(connsTCPv4, syscall.AF_INET) + if err != nil { + return metrics, fmt.Errorf("cannot get statistics for \"tcp4\" of PID %d", p.Pid) + } + fieldlist = append(fieldlist, fl...) + + fl, err = statsTCP(connsTCPv6, syscall.AF_INET6) + if err != nil { + return metrics, fmt.Errorf("cannot get statistics for \"tcp6\" of PID %d", p.Pid) + } + fieldlist = append(fieldlist, fl...) 
+ + fl, err = statsUDP(connsUDPv4, syscall.AF_INET) + if err != nil { + return metrics, fmt.Errorf("cannot get statistics for \"udp4\" of PID %d", p.Pid) + } + fieldlist = append(fieldlist, fl...) + + fl, err = statsUDP(connsUDPv6, syscall.AF_INET6) + if err != nil { + return metrics, fmt.Errorf("cannot get statistics for \"udp6\" of PID %d", p.Pid) + } + fieldlist = append(fieldlist, fl...) + + fl, err = statsUnix(connsUnix) + if err != nil { + return metrics, fmt.Errorf("cannot get statistics for \"unix\" of PID %d", p.Pid) + } + fieldlist = append(fieldlist, fl...) + case "tcp4", "tcp6": + family := uint8(syscall.AF_INET) + if protocol == "tcp6" { + family = syscall.AF_INET6 + } + conns, err := gopsnet.ConnectionsPid(protocol, p.Pid) + if err != nil { + return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid) + } + if fieldlist, err = statsTCP(conns, family); err != nil { + return metrics, fmt.Errorf("cannot get statistics for %q of PID %d", protocol, p.Pid) + } + case "udp4", "udp6": + family := uint8(syscall.AF_INET) + if protocol == "udp6" { + family = syscall.AF_INET6 + } + conns, err := gopsnet.ConnectionsPid(protocol, p.Pid) + if err != nil { + return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid) + } + if fieldlist, err = statsUDP(conns, family); err != nil { + return metrics, fmt.Errorf("cannot get statistics for %q of PID %d", protocol, p.Pid) + } + case "unix": + conns, err := unixConnectionsPid(p.Pid) + if err != nil { + return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid) + } + if fieldlist, err = statsUnix(conns); err != nil { + return metrics, fmt.Errorf("cannot get statistics for %q of PID %d", protocol, p.Pid) + } + } + + for _, fields := range fieldlist { + if cfg.tagging["protocol"] { + p.tags["protocol"] = fields["protocol"].(string) + delete(fields, "protocol") + } + metrics = append(metrics, metric.New("procstat_socket", p.tags, fields, t)) + } + 
} + } + + return metrics, nil } diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index 51ed32393ed2c..6c630768c6327 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -30,9 +30,10 @@ var execCommand = exec.Command type PID int32 type collectionConfig struct { - solarisMode bool - tagging map[string]bool - features map[string]bool + solarisMode bool + tagging map[string]bool + features map[string]bool + socketProtos []string } type Procstat struct { @@ -53,6 +54,7 @@ type Procstat struct { WinService string `toml:"win_service"` Mode string `toml:"mode"` Properties []string `toml:"properties"` + SocketProtocols []string `toml:"socket_protocols"` TagWith []string `toml:"tag_with"` Filter []Filter `toml:"filter"` Log telegraf.Logger `toml:"-"` @@ -95,7 +97,7 @@ func (p *Procstat) Init() error { p.cfg.tagging = make(map[string]bool, len(p.TagWith)) for _, tag := range p.TagWith { switch tag { - case "cmdline", "pid", "ppid", "status", "user": + case "cmdline", "pid", "ppid", "status", "user", "protocol": default: return fmt.Errorf("invalid 'tag_with' setting %q", tag) } @@ -107,6 +109,30 @@ func (p *Procstat) Init() error { for _, prop := range p.Properties { switch prop { case "cpu", "limits", "memory", "mmap": + case "sockets": + if !supportsSocketStat { + return errors.New("socket statistics are not supported on your platform") + } + if len(p.SocketProtocols) == 0 { + p.SocketProtocols = []string{"all"} + } + protos := make(map[string]bool, len(p.SocketProtocols)) + for _, proto := range p.SocketProtocols { + switch proto { + case "all": + if len(protos) > 0 || len(p.SocketProtocols) > 1 { + return errors.New("additional 'socket_protocol' settings besides 'all' are not allowed") + } + case "tcp4", "tcp6", "udp4", "udp6", "unix": + default: + return fmt.Errorf("invalid 'socket_protocol' setting %q", proto) + } + if protos[proto] { + return fmt.Errorf("duplicate %q in 'socket_protocol' 
setting", proto) + } + protos[proto] = true + p.cfg.socketProtos = append(p.cfg.socketProtos, proto) + } default: return fmt.Errorf("invalid 'properties' setting %q", prop) } @@ -252,9 +278,15 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error { p.processes[pid] = proc } running[pid] = true - m := proc.Metric(p.Prefix, &p.cfg) - m.SetTime(now) - acc.AddMetric(m) + metrics, err := proc.Metrics(p.Prefix, &p.cfg, now) + if err != nil { + // Continue after logging an error as there might still be + // metrics available + acc.AddError(err) + } + for _, m := range metrics { + acc.AddMetric(m) + } } } @@ -351,9 +383,15 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error { p.processes[pid] = proc } running[pid] = true - m := proc.Metric(p.Prefix, &p.cfg) - m.SetTime(now) - acc.AddMetric(m) + metrics, err := proc.Metrics(p.Prefix, &p.cfg, now) + if err != nil { + // Continue after logging an error as there might still be + // metrics available + acc.AddError(err) + } + for _, m := range metrics { + acc.AddMetric(m) + } } } diff --git a/plugins/inputs/procstat/procstat_test.go b/plugins/inputs/procstat/procstat_test.go index 5221d831406bc..883ea5bb4629d 100644 --- a/plugins/inputs/procstat/procstat_test.go +++ b/plugins/inputs/procstat/procstat_test.go @@ -142,7 +142,7 @@ func (p *testProc) MemoryMaps(bool) (*[]process.MemoryMapsStat, error) { return &[]process.MemoryMapsStat{}, nil } -func (p *testProc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric { +func (p *testProc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) { if prefix != "" { prefix += "_" } @@ -212,7 +212,7 @@ func (p *testProc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric fields[prefix+"user"] = "testuser" } - return metric.New("procstat", tags, fields, time.Time{}) + return []telegraf.Metric{metric.New("procstat", tags, fields, t)}, nil } var pid = PID(42) diff --git a/plugins/inputs/procstat/sample.conf 
b/plugins/inputs/procstat/sample.conf index 20e003d5b31c5..818d9ebe4ee9d 100644 --- a/plugins/inputs/procstat/sample.conf +++ b/plugins/inputs/procstat/sample.conf @@ -35,17 +35,33 @@ ## result in a large number of series, especially with short-lived processes, ## creating high cardinality at the output. ## Available options are: - ## cmdline -- full commandline - ## pid -- ID of the process - ## ppid -- ID of the process' parent - ## status -- state of the process - ## user -- username owning the process + ## cmdline -- full commandline + ## pid -- ID of the process + ## ppid -- ID of the process' parent + ## status -- state of the process + ## user -- username owning the process + ## protocol -- protocol type of the process socket # tag_with = [] ## Properties to collect - ## Available options are "cpu", "limits", "memory", "mmap" + ## Available options are + ## cpu -- CPU usage statistics + ## limits -- set resource limits + ## memory -- memory usage statistics + ## mmap -- mapped memory usage statistics (caution: can cause high load) + ## sockets -- socket statistics for protocols in 'socket_protocols' (Linux only) # properties = ["cpu", "limits", "memory", "mmap"] + ## Protocol filter for the sockets property (Linux only) + ## Available options are + ## all -- all of the protocols below + ## tcp4 -- TCP socket statistics for IPv4 + ## tcp6 -- TCP socket statistics for IPv6 + ## udp4 -- UDP socket statistics for IPv4 + ## udp6 -- UDP socket statistics for IPv6 + ## unix -- Unix socket statistics + # socket_protocols = ["all"] + ## Method to use when finding process IDs. Can be one of 'pgrep', or ## 'native'. The pgrep finder calls the pgrep executable in the PATH while ## the native finder performs the search directly in a manner dependent on the