From a0dfa0a389c25e65aea46d99812ce9bb524482ef Mon Sep 17 00:00:00 2001 From: Monica Sarbu Date: Tue, 4 Aug 2015 22:45:59 +0200 Subject: [PATCH 1/2] Get basic info like mem, cpu times, state, name for each process, using sigar lib --- etc/topbeat.dev.yml | 57 +++++++++++ main.go | 35 ++++--- sigar.go | 224 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 303 insertions(+), 13 deletions(-) create mode 100644 etc/topbeat.dev.yml create mode 100644 sigar.go diff --git a/etc/topbeat.dev.yml b/etc/topbeat.dev.yml new file mode 100644 index 00000000000..38972534cf1 --- /dev/null +++ b/etc/topbeat.dev.yml @@ -0,0 +1,57 @@ +################### Topbeat Configuration Example ######################### + +############################# Input ############################################ +input: + period: 1 + +############################# Shipper ############################################ +shipper: + + # The name of the shipper that publishes the network data. It can be used to group + # all the transactions sent by a single shipper in the web interface. + # If this options is not defined, the hostname is used. + name: + + # The tags of the shipper are included in their own field with each + # transaction published. Tags make it easy to group transactions by different + # logical properties. + #tags: ["service1"] + +############################# Output ############################################ + +# Configure what outputs to use when sending the data collected by topbeat. +# You can enable one or multiple outputs by setting enabled option to true. +output: + + # Elasticsearch as output + # Options: + # host, port: where Elasticsearch is listening on + # save_topology: specify if the topology is saved in Elasticsearch + elasticsearch: + enabled: false + hosts: ["localhost:9200"] + save_topology: false + + # Redis as output + # Options: + # host, port: where Redis is listening on + # save_topology: specify if the topology is saved in Redis + #redis: + # enabled: true + # host: localhost + # port: 6379 + # save_topology: true + + # File as output + # Options: + # path: where to save the files + # filename: name of the files + # rotate_every_kb: maximum size of the files in path + # number of files: maximum number of files in path + file: + enabled: true + path: "/tmp/topbeat" + filename: topbeat + rotate_every_kb: 1000 + number_of_files: 7 + diff --git a/main.go b/main.go index 021e72bbf33..94dd7cb213a 100644 --- a/main.go +++ b/main.go @@ -13,9 +13,6 @@ import ( "github.com/elastic/libbeat/publisher" "github.com/elastic/libbeat/service" "github.com/monicasarbu/gotop/cpu" - "github.com/monicasarbu/gotop/load" - "github.com/monicasarbu/gotop/mem" - "github.com/monicasarbu/gotop/proc" ) // You can overwrite these, e.g.: go build -ldflags "-X main.Version 1.0.0-beta3" @@ -49,42 +46,54 @@ func (t *Topbeat) Run() error { for t.isAlive { time.Sleep(1 * time.Second) - load_stat, err := load.Load() + load_stat, err := GetSystemLoad() if err != nil { - logp.Err("Error reading load statistics: %v", err) + logp.Err("Fail to get load statistics: %v", err) continue } - - cpu_stat, err := cpu.Cpu_times_percent(0) + cpu_stat, err := GetCpuTimes() if err != nil { - logp.Err("Error reading cpu times: %v", err) + logp.Err("Fail to get cpu times: %v", err) continue } - mem_stat, err := mem.Virtual_memory() + mem_stat, err := GetMemory() if err != nil { - logp.Err("Error reading memory statistics: %v", err) + logp.Err("Fail to get memory details: %v", err) continue } + swap_stat, err := GetSwap() + if err != nil { + logp.Err("Fail to get swap details: %v", err) + } - pids := proc.Pids() - procs := []proc.Process{} + pids, err := Pids() + if err != nil { + logp.Err("Fail to get the list of pids: %v", err) + } + procs := []Process{} + procs_ignored := 0 for _, pid := range pids { - process, err := proc.GetProcess(pid) + process, err := GetProcess(pid) if err != nil { logp.Err("Error geting the process %d: %v", pid, err) + procs_ignored += 1 continue } procs = append(procs, *process) + logp.Debug("topbeat", "Process: %s", process) } + logp.Debug("topbeat", "Processes %d, ignored %d", len(pids), procs_ignored) + event := common.MapStr{ "timestamp": common.Time(time.Now()), "type": "top", "load": load_stat, "cpu": cpu_stat, "mem": mem_stat, + "swap": swap_stat, "procs": procs, } diff --git a/sigar.go b/sigar.go new file mode 100644 index 00000000000..9637687a279 --- /dev/null +++ b/sigar.go @@ -0,0 +1,224 @@ +package main + +import ( + "fmt" + + "github.com/cloudfoundry/gosigar" + "github.com/elastic/libbeat/logp" +) + +type SystemLoad struct { + Load1 float64 `json:"load1"` + Load5 float64 `json:"load5"` + Load15 float64 `json:"load15"` +} + +type CpuTimes struct { + User uint64 `json:"user"` + Nice uint64 `json:"nice"` + System uint64 `json:"system"` + Idle uint64 `json:"idle"` + IOWait uint64 `json:"iowait"` + Irq uint64 `json:"irq"` + SoftIrq uint64 `json:"softirq"` + Steal uint64 `json:"steal"` +} + +type MemStat struct { + Total uint64 `json:"total"` + Used uint64 `json:"used"` + Free uint64 `json:"free"` + ActualUsed uint64 `json:"actual_used"` + ActualFree uint64 `json:"actual_free"` +} + +type ProcMemStat struct { + Size uint64 `json:"size"` + Resident uint64 `json:"rss"` + Share uint64 `json:"share"` +} + +type ProcCpuTime struct { + User uint64 `json:"user"` + System uint64 `json:"system"` + Total uint64 `json:"total"` + Start string `json:"start_time"` +} + +type Process struct { + Pid int `json:"pid"` + Ppid int `json:"ppid"` + Name string `json:"name"` + State string `json:"state"` + Mem ProcMemStat `json:"mem"` + Cpu ProcCpuTime `json:"cpu"` +} + +func (p *Process) String() string { + + return fmt.Sprintf("pid: %d, ppid: %d, name: %s, state: %s, mem: %s, cpu: %s", + p.Pid, p.Ppid, p.Name, p.State, p.Mem.String(), p.Cpu.String()) +} + +func (m *ProcMemStat) String() string { + + return fmt.Sprintf("%d size, %d rss, %d share", m.Size, m.Resident, m.Share) +} + +func (t *ProcCpuTime) String() string { + return fmt.Sprintf("started at %s, %d total, %d us, %d sys", t.Start, t.Total, t.User, t.System) + +} + +func (m *MemStat) String() string { + + return fmt.Sprintf("%d total, %d used, %d actual used, %d free, %d actual free", m.Total, m.Used, m.ActualUsed, + m.Free, m.ActualFree) +} + +func (t *SystemLoad) String() string { + + return fmt.Sprintf("%.2f %.2f %.2f", t.Load1, t.Load5, t.Load15) +} + +func (t *CpuTimes) String() string { + + return fmt.Sprintf("%d user, %d system, %d nice, %d iddle, %d iowait, %d irq, %d softirq, %d steal", + t.User, t.System, t.Nice, t.Idle, t.IOWait, t.Irq, t.SoftIrq, t.Steal) +} + +func GetSystemLoad() (*SystemLoad, error) { + + concreteSigar := sigar.ConcreteSigar{} + avg, err := concreteSigar.GetLoadAverage() + if err != nil { + return nil, err + } + logp.Debug("topbeat", "load %v\n", avg) + + return &SystemLoad{ + Load1: avg.One, + Load5: avg.Five, + Load15: avg.Fifteen, + }, nil +} + +func GetCpuTimes() (*CpuTimes, error) { + + cpu := sigar.Cpu{} + err := cpu.Get() + if err != nil { + return nil, err + } + + logp.Debug("topbeat", "cpu times %v\n", cpu) + + return &CpuTimes{ + User: cpu.User, + Nice: cpu.Nice, + System: cpu.Sys, + Idle: cpu.Idle, + IOWait: cpu.Wait, + Irq: cpu.Irq, + SoftIrq: cpu.SoftIrq, + Steal: cpu.Stolen, + }, nil +} + +func GetMemory() (*MemStat, error) { + + mem := sigar.Mem{} + err := mem.Get() + if err != nil { + return nil, err + } + return &MemStat{ + Total: mem.Total, + Used: mem.Used, + Free: mem.Free, + ActualFree: mem.ActualFree, + ActualUsed: mem.ActualUsed, + }, nil +} + +func GetSwap() (*MemStat, error) { + + swap := sigar.Swap{} + err := swap.Get() + if err != nil { + return nil, err + } + return &MemStat{ + Total: swap.Total, + Used: swap.Used, + Free: swap.Free, + }, nil + +} + +func Pids() ([]int, error) { + + pids := sigar.ProcList{} + err := pids.Get() + if err != nil { + return nil, err + } + return pids.List, nil +} + +func getProcState(b byte) string { + + switch b { + case 'S': + return "sleeping" + case 'R': + return "running" + case 'D': + return "idle" + case 'T': + return "stopped" + case 'Z': + return "zombie" + } + return "" +} + +func GetProcess(pid int) (*Process, error) { + + state := sigar.ProcState{} + mem := sigar.ProcMem{} + time := sigar.ProcTime{} + + err := state.Get(pid) + if err != nil { + return nil, err + } + + err = mem.Get(pid) + if err != nil { + return nil, err + } + + err = time.Get(pid) + if err != nil { + return nil, err + } + + return &Process{ + Pid: pid, + Ppid: state.Ppid, + Name: state.Name, + State: getProcState(byte(state.State)), + Mem: ProcMemStat{ + Size: mem.Size / 1024, + Resident: mem.Resident / 1024, + Share: mem.Share / 1024, + }, + Cpu: ProcCpuTime{ + Start: time.FormatStartTime(), + Total: time.Total, + User: time.User, + System: time.Sys, + }, + }, nil +} From 32f6f45e074a45ff844d9fadc9ec84f03699450b Mon Sep 17 00:00:00 2001 From: Monica Sarbu Date: Wed, 5 Aug 2015 18:28:37 +0200 Subject: [PATCH 2/2] Insert in ES a document for each process There are two types of documents that are inserted in ES: 1. index=topbeat-*, type=system for system wide statistics (memory, swqp, cpu times, system load) 2. index=topbeat-*, type=proc for per process statistics (memory, cpu times, ppid, state, process name) in order to be able to easily build per process statistics in Kibana, like the graphs describing the evolution of CPU usage for a certain process. By default, all the processes that are running on a server are exported to ES. You can configure a subset of processes by using regular expressions. The default configuration is: input: period: 1 // every second statistics procs: [".*"] // all processes are exported to ES --- config.go | 1 + etc/topbeat.dev.yml | 18 ++++++++----- main.go | 62 ++++++++++++++++++++++++++++++++------------- sigar.go | 10 ++++---- 4 files changed, 62 insertions(+), 29 deletions(-) diff --git a/config.go b/config.go index 5c3d8a89a3e..91742fd073e 100644 --- a/config.go +++ b/config.go @@ -8,6 +8,7 @@ import ( type TopConfig struct { Period *int64 + Procs *[]string } type ConfigSettings struct { diff --git a/etc/topbeat.dev.yml b/etc/topbeat.dev.yml index 38972534cf1..9b4bf191001 100644 --- a/etc/topbeat.dev.yml +++ b/etc/topbeat.dev.yml @@ -4,6 +4,9 @@ input: period: 1 + procs: [".*"] + + ############################# Shipper ############################################ shipper: @@ -28,9 +31,10 @@ output: # host, port: where Elasticsearch is listening on # save_topology: specify if the topology is saved in Elasticsearch elasticsearch: - enabled: false + enabled: true hosts: ["localhost:9200"] save_topology: false + index: "topbeat" # Redis as output # Options: @@ -48,10 +52,10 @@ output: # filename: name of the files # rotate_every_kb: maximum size of the files in path # number of files: maximum number of files in path - file: - enabled: true - path: "/tmp/topbeat" - filename: topbeat - rotate_every_kb: 1000 - number_of_files: 7 + #file: + # enabled: true + # path: "/tmp/topbeat" + # filename: topbeat + # rotate_every_kb: 1000 + # number_of_files: 7 diff --git a/main.go b/main.go index 94dd7cb213a..f24e99d4e8e 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "os" + "regexp" "runtime" "time" @@ -12,7 +13,6 @@ import ( "github.com/elastic/libbeat/logp" "github.com/elastic/libbeat/publisher" "github.com/elastic/libbeat/service" - "github.com/monicasarbu/gotop/cpu" ) // You can overwrite these, e.g.: go build -ldflags "-X main.Version 1.0.0-beta3" @@ -22,6 +22,7 @@ var Name = "topbeat" type Topbeat struct { isAlive bool period time.Duration + procs []string events chan common.MapStr } @@ -33,14 +34,32 @@ func (t *Topbeat) Init(config TopConfig, events chan common.MapStr) error { } else { t.period = 1 * time.Second } + if config.Procs != nil { + t.procs = *config.Procs + } else { + t.procs = []string{".*"} //all processes + } + + logp.Debug("topbeat", "Init toppbeat") + logp.Debug("topbeat", "Follow processes %q\n", t.procs) logp.Debug("topbeat", "Period %v\n", t.period) t.events = events return nil } +func (t *Topbeat) MatchProcess(name string) bool { + + for _, reg := range t.procs { + matched, _ := regexp.MatchString(reg, name) + if matched { + return true + } + } + return false +} + func (t *Topbeat) Run() error { - _, _ = cpu.Cpu_times_percent(0) t.isAlive = true for t.isAlive { @@ -48,53 +67,62 @@ func (t *Topbeat) Run() error { load_stat, err := GetSystemLoad() if err != nil { - logp.Err("Fail to get load statistics: %v", err) + logp.Warn("Getting load statistics: %v", err) continue } cpu_stat, err := GetCpuTimes() if err != nil { - logp.Err("Fail to get cpu times: %v", err) + logp.Warn("Getting cpu times: %v", err) continue } mem_stat, err := GetMemory() if err != nil { - logp.Err("Fail to get memory details: %v", err) + logp.Warn("Getting memory details: %v", err) continue } swap_stat, err := GetSwap() if err != nil { - logp.Err("Fail to get swap details: %v", err) + logp.Warn("Getting swap details: %v", err) } pids, err := Pids() if err != nil { - logp.Err("Fail to get the list of pids: %v", err) + logp.Warn("Getting the list of pids: %v", err) } - procs := []Process{} - procs_ignored := 0 for _, pid := range pids { process, err := GetProcess(pid) if err != nil { - logp.Err("Error geting the process %d: %v", pid, err) - procs_ignored += 1 + logp.Debug("topbeat", "Skip process %d: %v", pid, err) continue } - procs = append(procs, *process) - logp.Debug("topbeat", "Process: %s", process) - } - logp.Debug("topbeat", "Processes %d, ignored %d", len(pids), procs_ignored) + if t.MatchProcess(process.Name) { + + logp.Debug("topbeat", "Process: %s", process) + + event := common.MapStr{ + "timestamp": common.Time(time.Now()), + "type": "proc", + "proc.pid": process.Pid, + "proc.ppid": process.Ppid, + "proc.name": process.Name, + "proc.state": process.State, + "proc.mem": process.Mem, + "proc.cpu": process.Cpu, + } + t.events <- event + } + } event := common.MapStr{ "timestamp": common.Time(time.Now()), - "type": "top", + "type": "system", "load": load_stat, "cpu": cpu_stat, "mem": mem_stat, "swap": swap_stat, - "procs": procs, } t.events <- event diff --git a/sigar.go b/sigar.go index 9637687a279..394f309f73e 100644 --- a/sigar.go +++ b/sigar.go @@ -133,11 +133,11 @@ func GetMemory() (*MemStat, error) { return nil, err } return &MemStat{ - Total: mem.Total, - Used: mem.Used, - Free: mem.Free, - ActualFree: mem.ActualFree, - ActualUsed: mem.ActualUsed, + Total: mem.Total / 1024, + Used: mem.Used / 1024, + Free: mem.Free / 1024, + ActualFree: mem.ActualFree / 1024, + ActualUsed: mem.ActualUsed / 1024, }, nil }