Skip to content

Commit

Permalink
Merge pull request #4 from medallia/aurora
Browse files Browse the repository at this point in the history
Add Apache Aurora Input Plugin
  • Loading branch information
connorgorman authored May 11, 2017
2 parents 115ba45 + b7eb8df commit 6510de9
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 0 deletions.
12 changes: 12 additions & 0 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,18 @@
# # ]


# # Telegraf plugin for gather Apache Aurora metrics
# [[inputs.aurora]]
# ## Timeout, in ms.
# timeout = 100
# ## Aurora Master
# master = "localhost:8081"
# ## Http Prefix
# prefix = "http"
# ## Numeric values only
# numeric = true


# # Read metrics from one or many MongoDB servers
# [[inputs.mongodb]]
# ## An array of URI to gather stats about. Specify an ip or hostname
Expand Down
190 changes: 190 additions & 0 deletions plugins/inputs/aurora/aurora.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package aurora

import (
"fmt"
"io/ioutil"
"log"
"net/http"
"regexp"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)

type Aurora struct {
Timeout int `toml:"timeout"`
Master string `toml:"master"`
HttpPrefix string `toml:"prefix"`
Numeric bool `toml:"numeric"`
}

var sampleConfig = `
## Timeout, in ms.
timeout = 100
## Aurora Master
master = "localhost:8081"
## Http Prefix
prefix = "http"
## Numeric values only
numeric = true
`

// SampleConfig returns a sample configuration block
func (a *Aurora) SampleConfig() string {
return sampleConfig
}

// Description just returns a short description of the Mesos plugin
func (a *Aurora) Description() string {
return "Telegraf plugin for gathering metrics from N Apache Aurora Masters"
}

func (a *Aurora) SetDefaults() {
if a.Timeout == 0 {
log.Println("I! [aurora] Missing timeout value, setting default value (1000ms)")
a.Timeout = 1000
} else if a.HttpPrefix == "" {
log.Println("I! [aurora] Missing http prefix value, setting default value (http)")
a.HttpPrefix = "http"
}
}


// Converts string values taken from aurora vars to numeric values for wavefront
func convertToNumeric(value string) (interface{}, bool) {
var err error
var val interface{}
if val, err = strconv.ParseFloat(value, 64); err == nil {
return val, true
}
if val, err = strconv.ParseBool(value); err != nil {
return val.(bool), false
}
return val, true
}

// Matches job keys like sla_role2/prod2/jobname2_job_uptime_50.00_sec
func isJobMetric(key string) bool {
// Regex for matching job specific tasks
re := regexp.MustCompile("^sla_(.*?)/(.*?)/.*")
return re.MatchString(key)
}

// Checks if the job key starts with task_store indicating it's a task store metric
func isTaskStore(key string) bool {
return strings.HasPrefix(key, "task_store_")
}

// Checks if the key is a framework_registered which is used to determine leader election
func isFramework(key string) bool {
return strings.HasPrefix(key, "framework_registered")
}

// This function parses a job metric key like sla_role2/prod2/jobname2_job_uptime_50.00_sec
// It returns the fields and the tags associated with those fields
func parseJobSpecificMetric(key string, value interface{}) (map[string]interface{}, map[string]string) {
// cut off the sla_
key = key[4:]
// We have previous checked if this is a job metric using isJobMetric so we know there will be 2 slashes
slashSplit := strings.Split(key, "/")
role := slashSplit[0]
env := slashSplit[1]
underscoreIdx := strings.Index(slashSplit[2], "_")
job := slashSplit[2][:underscoreIdx]
metric := slashSplit[2][underscoreIdx+1:]

fields := make(map[string]interface{})
fields[metric] = value

tags := make(map[string]string)
tags["role"] = role
tags["env"] = env
tags["job"] = job
return fields, tags
}

// This function takes a metric like task_store_DRAINED and generates aurora.task.store.DRAINED
func parseTaskStore(key string, value interface{}) (map[string]interface{}, map[string]string) {
metric := "task.store." + strings.Replace(key[len("task_store_"):], "_", ".", -1)
fields := make(map[string]interface{})
fields[metric] = value
tags := make(map[string]string)
return fields, tags
}

// This method parses the value out of the variable line which is always in the last place
// It returns the key, value, and error
func (a *Aurora) parseMetric(line string) (string, interface{}, error) {
splitIdx := strings.Index(line, " ")
if splitIdx == -1 {
return "", nil, fmt.Errorf("Invalid metric line %s has no value", line)
}
key := line[:splitIdx]
value := line[splitIdx+1:]
// If numeric is true and the metric is not numeric then ignore
numeric, isNumeric := convertToNumeric(value)
if a.Numeric && !isNumeric {
return "", nil, fmt.Errorf("Value is rejected due to being non-numeric")
}
return key, numeric, nil
}

// Gather() metrics from given list of Aurora Masters
func (a *Aurora) Gather(acc telegraf.Accumulator) error {
a.SetDefaults()

client := http.Client{
Timeout: time.Duration(a.Timeout) * time.Second,
}
url := fmt.Sprintf("%s://%s/vars", a.HttpPrefix, a.Master)
resp, err := client.Get(url)
if err != nil {
return err
}

data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
defer resp.Body.Close()

lines := strings.Split(string(data), "\n")
for _, line := range lines {
if isJobMetric(line) {
key, value, err := a.parseMetric(line)
if err != nil {
continue
}
fields, tags := parseJobSpecificMetric(key, value)
// Per job there are different tags so need to add a field per line
acc.AddFields("aurora", fields, tags)
} else if isTaskStore(line) {
key, value, err := a.parseMetric(line)
if err != nil {
continue
}
fields, tags := parseTaskStore(key, value)
acc.AddFields("aurora", fields, tags)
} else if isFramework(line) {
key, value, err := a.parseMetric(line)
if err != nil {
continue
}
fields := map[string]interface{}{
key: value,
}
tags := make(map[string]string)
acc.AddFields("aurora", fields, tags)
}
}
return nil
}

func init() {
inputs.Add("aurora", func() telegraf.Input {
return &Aurora{}
})
}
124 changes: 124 additions & 0 deletions plugins/inputs/aurora/aurora_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package aurora

import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)

var masterServer *httptest.Server

func getRawMetrics() string {
return `assigner_launch_failures 0
cron_job_triggers 240
sla_cluster_mtta_ms 18
sla_disk_small_mttr_ms 1029
sla_cpu_small_mtta_ms 17
jvm_prop_java.endorsed.dirs /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/endorsed
sla_role2/prod2/jobname2_job_uptime_50.00_sec 25`
}

func TestMain(m *testing.M) {
metrics := getRawMetrics()

masterRouter := http.NewServeMux()
masterRouter.HandleFunc("/vars", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, metrics)
})
masterServer = httptest.NewServer(masterRouter)

rc := m.Run()

masterServer.Close()
os.Exit(rc)
}

func TestConvertToNumeric(t *testing.T) {
if _, isNumeric := convertToNumeric("0.000"); !isNumeric {
t.Fatalf("0.000 should have been numeric")
}
if _, isNumeric := convertToNumeric("7"); !isNumeric {
t.Fatalf("7 should have been numeric")
}
if boolVal, isNumeric := convertToNumeric("true"); !isNumeric {
if val := boolVal.(int); val != 1 {
t.Fatalf("true should have been converted to a 1")
}
t.Fatalf("true should have been numeric")
}
if boolVal, isNumeric := convertToNumeric("false"); !isNumeric {
if val := boolVal.(int); val != 0 {
t.Fatalf("false should have been converted to a 0")
}
t.Fatalf("false should have been numeric")
}
if _, isNumeric := convertToNumeric("&"); isNumeric {
t.Fatalf("& should not be numeric")
}
}

func TestIsJobMetric(t *testing.T) {
var notJobMetrics = []string{
"assigner_launch_failures",
"cron_job_triggers",
"sla_cluster_mtta_ms",
"sla_disk_small_mttr_ms",
"sla_cpu_small_mtta_ms",
}
for _, metric := range notJobMetrics {
if isJobMetric(metric) {
t.Fatalf("%s should not be a job metric", metric)
}
}
var isJobMetrics = []string{
"sla_role2/prod2/jobname2_job_uptime_50.00_sec",
}
for _, metric := range isJobMetrics {
if !isJobMetric(metric) {
t.Fatalf("%s should be a job metric", metric)
}
}
}

func TestParseJobSpecificMetric(t *testing.T) {
var expectedFields = map[string]interface{}{
"job_uptime_50.00_sec": 0,
}
var expectedTags = map[string]string{
"role": "role2",
"env": "prod2",
"job": "jobname2",
}
key := "sla_role2/prod2/jobname2_job_uptime_50.00_sec"
value := 0
fields, tags := parseJobSpecificMetric(key, value)
assert.Equal(t, fields, expectedFields)
assert.Equal(t, tags, expectedTags)
}

func TestAuroraMaster(t *testing.T) {
var acc testutil.Accumulator

m := Aurora{
Master: masterServer.Listener.Addr().String(),
Timeout: 10,
HttpPrefix: "http",
Numeric: true,
}

err := m.Gather(&acc)
if err != nil {
t.Error(err)
}

var referenceMetrics = map[string]interface{}{
"job_uptime_50.00_sec": 25.0,
}
acc.AssertContainsFields(t, "aurora", referenceMetrics)
}

0 comments on commit 6510de9

Please sign in to comment.