Skip to content

Commit

Permalink
Merge pull request #5 from signalfx/output-update-1
Browse files Browse the repository at this point in the history
minor bug fixes to SignalFx output and metadata plugins
  • Loading branch information
charless-splunk authored Jan 11, 2018
2 parents 8303b8e + b223db3 commit f055d7e
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 145 deletions.
2 changes: 1 addition & 1 deletion Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ github.com/Shopify/sarama 3b1b38866a79f06deddf0487d5c27ba0697ccd65
github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d
github.com/signalfx/com_signalfx_metrics_protobuf c2cf2ea6c74f6ca217767bc69e002f7a721c50df
github.com/signalfx/gohistogram 1ccfd2ff508314074672f4450a917011a2060408
github.com/signalfx/golib 18f12a260c4e6a0c93d4f37ecca95679c48a25d7
github.com/signalfx/golib v1.1.0
github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15
github.com/StackExchange/wmi f3e2bae1e0cb5aef83e319133eabfee30013a4a5
github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
Expand Down
12 changes: 4 additions & 8 deletions plugins/inputs/signalfx_metadata/aws-info.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ func (s *AWSInfo) GetAWSInfo() (info map[string]string) {
return
}

func buildAWSUniqueID(info map[string]string) (string, bool) {
var awsUniqueID string
var awsSet = false
func buildAWSUniqueID(info map[string]string) (awsUniqueID string, awsSet bool) {
if id, ok := info["aws_instance_id"]; ok {
if region, ok := info["aws_region"]; ok {
if account, ok := info["aws_account_id"]; ok {
Expand All @@ -53,7 +51,7 @@ func buildAWSUniqueID(info map[string]string) (string, bool) {
}
}
}
return awsUniqueID, awsSet
return
}

func processAWSInfo(info map[string]string, identity map[string]interface{}) {
Expand All @@ -75,12 +73,10 @@ func processAWSInfo(info map[string]string, identity map[string]interface{}) {
}
}

func requestAWSInfo() (map[string]interface{}, error) {
func requestAWSInfo() (identity map[string]interface{}, err error) {
var url = "http://169.254.169.254/latest/dynamic/instance-identity/document"
var identity map[string]interface{}
var httpClient = &http.Client{Timeout: 200 * time.Millisecond}
var raw []byte
var err error
var res *http.Response

// make the request
Expand All @@ -93,5 +89,5 @@ func requestAWSInfo() (map[string]interface{}, error) {
// parse the json response
err = json.Unmarshal(raw, &identity)
}
return identity, err
return
}
6 changes: 2 additions & 4 deletions plugins/inputs/signalfx_metadata/host-info.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,16 @@ func GetMemory() (info map[string]string) {
return
}

func getStringFromFile(pattern string, path string) (string, error) {
func getStringFromFile(pattern string, path string) (response string, err error) {
var file []byte
var match [][]byte
var err error
var reg = regexp.MustCompile(pattern)
var response string

if file, err = ioutil.ReadFile(path); err == nil {
match = reg.FindSubmatch(file)
if len(match) > 1 {
response = string(match[1])
}
}
return response, err
return
}
215 changes: 124 additions & 91 deletions plugins/inputs/signalfx_metadata/process-info.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"log"
"math"
"reflect"
"strconv"
Expand All @@ -14,31 +15,44 @@ import (
"github.com/shirou/gopsutil/process"
)

const numColumns = 11

// NewProcessInfo - returns a new ProcessInfo instance
func NewProcessInfo() *ProcessInfo {
return &ProcessInfo{make(map[int32]*process.Process)}
func NewProcessInfo(bufferSize int, numWorkers int) *ProcessInfo {
var s = &ProcessInfo{
processes: make(map[int32]*process.Process),
processIn: make(chan *workerInProcess, bufferSize),
bufferSize: bufferSize,
numWorkers: numWorkers,
}

for i := 0; i < numWorkers; i++ {
newWorkerProcess(s.processIn)
}

return s
}

// ProcessInfo - list of processes
type ProcessInfo struct {
processes map[int32]*process.Process
processes map[int32]*process.Process
processIn chan *workerInProcess
bufferSize int
numWorkers int
}

// GetTop - returns a map of process information
func (s *ProcessInfo) GetTop() ([]byte, error) {
var response = map[string]string{
"v": pluginVersion,
}
var byteResponse []byte
var top = make(map[string][]interface{})
func (s *ProcessInfo) GetTop() (response string, err error) {
start := time.Now()
var pids []int32
var pidList map[int32]bool
var err error

pidList = make(map[int32]bool)

var compressed bytes.Buffer
pids, err = process.Pids()
// store process instances in s.processes to collect accurate %cpu info
if err == nil {
var pidList = make(map[int32]bool, len(pids))
var top = make(map[string][]interface{}, len(pids))
var output = make(chan *workerOutProcess, s.bufferSize)

// Add missing processes to process list
for _, pid := range pids {
pidList[pid] = true
Expand All @@ -48,159 +62,178 @@ func (s *ProcessInfo) GetTop() ([]byte, error) {
}
}
}

// use separate go routine to push processes on to worker threads
for pid, proc := range s.processes {
// Remove dead processes from process list
if _, in := pidList[pid]; !in {
delete(s.processes, pid)
} else {
pid64 := int64(pid)
stringPid := strconv.FormatInt(pid64, 10)
top[stringPid] = GetProcessInfo(proc)
s.processIn <- &workerInProcess{
pid: pid,
proc: proc,
f: s.GetProcessInfo,
out: output,
}
}
}
if js, er := json.Marshal(top); er == nil {
compressed := compressByteArray(js)
base64ed := base64.StdEncoding.EncodeToString(compressed)
response["t"] = base64ed

// wait for all processes to return
count := 0
for msg := range output {
top[strconv.FormatInt(int64(msg.pid), 10)] = msg.out
count++
if count == len(pids) {
close(output)
}
}

if js, er := json.Marshal(top); er == nil {
compressed, err = compressByteArray(js)
}
byteResponse, err = json.Marshal(response)
}
return byteResponse, err
response = fmt.Sprintf("{\"t\":\"%s\",\"v\":\"%s\"}", base64.StdEncoding.EncodeToString(compressed.Bytes()), pluginVersion)
log.Printf("D! Input [signalfx-metadata] process list collection took %s \n", time.Since(start))
return
}

func compressByteArray(in []byte) []byte {
var buf bytes.Buffer
func compressByteArray(in []byte) (buf bytes.Buffer, err error) {
compressor := zlib.NewWriter(&buf)
if _, err := compressor.Write(in); err != nil {
panic(err)
}
if err := compressor.Close(); err != nil {
panic(err)
}
return buf.Bytes()
_, err = compressor.Write(in)
_ = compressor.Close()
return
}

func getProcessCommand(proc *process.Process) string {
var response = " "
func getProcessCommand(proc *process.Process) (response string) {
response = " "
if val, err := proc.Name(); err == nil {
response = val
}
return response
return
}

func getProcessCPUNiceValue(proc *process.Process) int32 {
var response = int32(0)
func getProcessCPUNiceValue(proc *process.Process) (response int32) {
if val, err := proc.Nice(); err == nil {
response = val
}
return response
return
}

func getProcessCPUPercent(proc *process.Process) float64 {
var response = float64(0)
func getProcessCPUPercent(proc *process.Process) (response float64) {
if val, err := proc.Percent(time.Duration(0)); err == nil {
response = val
}
return response
return
}

func getProcessCPUTime(proc *process.Process) string {
var response = " "
func getProcessCPUTime(proc *process.Process) (response string) {
response = " "
if val, err := proc.Times(); err == nil {
response = toTime(val.User + val.System)
}
return response
}

// GetProcessInfo - returns an array of information about a process
func GetProcessInfo(proc *process.Process) []interface{} {
var username = getProcessUsername(proc)
var priority = getProcessPriority(proc)
var cpuNiceValue = getProcessCPUNiceValue(proc)
var virtualMemory, residentMemory = getProcessMemoryInfo(proc)
var sharedMemory = getProcessMemoryExInfo(proc)
var status = getProcessStatus(proc)
var cpuPercent = getProcessCPUPercent(proc)
var memPercent = getProcessMemoryPercent(proc)
var cpuTime = getProcessCPUTime(proc)
var commandValue = getProcessCommand(proc)
return
}

type workerOutProcess struct {
pid int32
out []interface{}
}

type workerInProcess struct {
pid int32
proc *process.Process
f func(*process.Process) []interface{}
out chan *workerOutProcess
}

func newWorkerProcess(in chan *workerInProcess) {
go func() {
for msg := range in {
msg.out <- &workerOutProcess{
pid: msg.pid,
out: msg.f(msg.proc),
}
}
}()
}

// GetProcessInfo returns the top styled process list encoded in base64 and compressed
func (s *ProcessInfo) GetProcessInfo(proc *process.Process) []interface{} {
return []interface{}{
username,
priority,
cpuNiceValue,
virtualMemory,
residentMemory,
sharedMemory,
status,
cpuPercent,
memPercent,
cpuTime,
commandValue,
getProcessUsername(proc),
getProcessPriority(proc),
getProcessCPUNiceValue(proc),
getProcessVirtualMemoryInfo(proc),
getProcessResidentMemoryInfo(proc),
getProcessMemoryExInfo(proc),
getProcessStatus(proc),
getProcessCPUPercent(proc),
getProcessMemoryPercent(proc),
getProcessCPUTime(proc),
getProcessCommand(proc),
}
}

func getProcessMemoryExInfo(proc *process.Process) uint64 {
var response = uint64(0)
func getProcessMemoryExInfo(proc *process.Process) (response uint64) {
if val, err := proc.MemoryInfoEx(); err == nil {
// MemoryInfoEx is not implemented on mac so we must reflect it
memEx := reflect.ValueOf(val)
f := reflect.Indirect(memEx).FieldByName("Shared")
v := f.Uint()
response = v / 1024
}
return response
return
}

func getProcessMemoryInfo(proc *process.Process) (uint64, uint64) {
var virtualMemory = uint64(0)
var residentMemory = uint64(0)
func getProcessVirtualMemoryInfo(proc *process.Process) (virtualMemory uint64) {
if val, err := proc.MemoryInfo(); err == nil {
virtualMemory = val.VMS / 1024
}
return
}

func getProcessResidentMemoryInfo(proc *process.Process) (residentMemory uint64) {
if val, err := proc.MemoryInfo(); err == nil {
residentMemory = val.RSS / 1024
}
return virtualMemory, residentMemory
return
}

func getProcessMemoryPercent(proc *process.Process) float32 {
var response = float32(0)
func getProcessMemoryPercent(proc *process.Process) (response float32) {
if val, err := proc.MemoryPercent(); err == nil {
response = val
}
return response
return
}

func getProcessPriority(proc *process.Process) int32 {
var response = int32(0)
func getProcessPriority(proc *process.Process) (response int32) {
if val, err := proc.IOnice(); err == nil {
response = val
}
return response
return
}

func getProcessStatus(proc *process.Process) string {
var response = "D"
func getProcessStatus(proc *process.Process) (response string) {
response = "D"
if val, err := proc.Status(); err == nil {
response = val
}
return response
return
}

func getProcessUsername(proc *process.Process) string {
var response = " "
func getProcessUsername(proc *process.Process) (response string) {
response = " "
if val, err := proc.Username(); err == nil {
response = val
}
return response
return
}

func toTime(secs float64) string {
var response string
func toTime(secs float64) (response string) {
minutes := int(secs / 60)
seconds := int(math.Mod(secs, 60.0))
sec := seconds
dec := (seconds - sec) * 100
response = fmt.Sprintf("%02d:%02d.%02d", minutes, sec, dec)
return response
return
}
Loading

0 comments on commit f055d7e

Please sign in to comment.