Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Apache Aurora Input Plugin #4

Merged
merged 1 commit into from
May 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,18 @@
# # ]


# # Telegraf plugin for gather Apache Aurora metrics
# [[inputs.aurora]]
# ## Timeout, in ms.
# timeout = 100
# ## Aurora Master
# master = "localhost:8081"
# ## Http Prefix
# prefix = "http"
# ## Numeric values only
# numeric = true


# # Read metrics from one or many MongoDB servers
# [[inputs.mongodb]]
# ## An array of URI to gather stats about. Specify an ip or hostname
Expand Down
190 changes: 190 additions & 0 deletions plugins/inputs/aurora/aurora.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package aurora

import (
"fmt"
"io/ioutil"
"log"
"net/http"
"regexp"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)

type Aurora struct {
Timeout int `toml:"timeout"`
Master string `toml:"master"`
HttpPrefix string `toml:"prefix"`
Numeric bool `toml:"numeric"`
}

var sampleConfig = `
## Timeout, in ms.
timeout = 100
## Aurora Master
master = "localhost:8081"
## Http Prefix
prefix = "http"
## Numeric values only
numeric = true
`

// SampleConfig returns a sample configuration block
func (a *Aurora) SampleConfig() string {
return sampleConfig
}

// Description just returns a short description of the Mesos plugin
func (a *Aurora) Description() string {
return "Telegraf plugin for gathering metrics from N Apache Aurora Masters"
}

func (a *Aurora) SetDefaults() {
if a.Timeout == 0 {
log.Println("I! [aurora] Missing timeout value, setting default value (1000ms)")
a.Timeout = 1000

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be the value found in SampleConfig?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe the value in sampleConfig should be that one

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed to be 1000 everywhere

} else if a.HttpPrefix == "" {
log.Println("I! [aurora] Missing http prefix value, setting default value (http)")
a.HttpPrefix = "http"
}
}


// Converts string values taken from aurora vars to numeric values for wavefront
func convertToNumeric(value string) (interface{}, bool) {
var err error
var val interface{}
if val, err = strconv.ParseFloat(value, 64); err == nil {
return val, true
}
if val, err = strconv.ParseBool(value); err != nil {
return val.(bool), false

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this assertion cause an error?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out I don't need the cast. Thanks

}
return val, true
}

// Matches job keys like sla_role2/prod2/jobname2_job_uptime_50.00_sec
func isJobMetric(key string) bool {
// Regex for matching job specific tasks
re := regexp.MustCompile("^sla_(.*?)/(.*?)/.*")
return re.MatchString(key)
}

// Checks if the job key starts with task_store indicating it's a task store metric
func isTaskStore(key string) bool {
return strings.HasPrefix(key, "task_store_")
}

// Checks if the key is a framework_registered which is used to determine leader election
func isFramework(key string) bool {
return strings.HasPrefix(key, "framework_registered")
}

// This function parses a job metric key like sla_role2/prod2/jobname2_job_uptime_50.00_sec
// It returns the fields and the tags associated with those fields
func parseJobSpecificMetric(key string, value interface{}) (map[string]interface{}, map[string]string) {
// cut off the sla_
key = key[4:]
// We have previous checked if this is a job metric using isJobMetric so we know there will be 2 slashes
slashSplit := strings.Split(key, "/")
role := slashSplit[0]
env := slashSplit[1]
underscoreIdx := strings.Index(slashSplit[2], "_")
job := slashSplit[2][:underscoreIdx]
metric := slashSplit[2][underscoreIdx+1:]

fields := make(map[string]interface{})
fields[metric] = value

tags := make(map[string]string)
tags["role"] = role
tags["env"] = env
tags["job"] = job
return fields, tags
}

// This function takes a metric like task_store_DRAINED and generates aurora.task.store.DRAINED
func parseTaskStore(key string, value interface{}) (map[string]interface{}, map[string]string) {
metric := "task.store." + strings.Replace(key[len("task_store_"):], "_", ".", -1)
fields := make(map[string]interface{})
fields[metric] = value
tags := make(map[string]string)
return fields, tags
}

// This method parses the value out of the variable line which is always in the last place
// It returns the key, value, and error
func (a *Aurora) parseMetric(line string) (string, interface{}, error) {
splitIdx := strings.Index(line, " ")
if splitIdx == -1 {
return "", nil, fmt.Errorf("Invalid metric line %s has no value", line)
}
key := line[:splitIdx]
value := line[splitIdx+1:]
// If numeric is true and the metric is not numeric then ignore
numeric, isNumeric := convertToNumeric(value)
if a.Numeric && !isNumeric {
return "", nil, fmt.Errorf("Value is rejected due to being non-numeric")
}
return key, numeric, nil
}

// Gather() metrics from given list of Aurora Masters
func (a *Aurora) Gather(acc telegraf.Accumulator) error {
a.SetDefaults()

client := http.Client{
Timeout: time.Duration(a.Timeout) * time.Second,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How long is this supposed to be?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. That's too long

}
url := fmt.Sprintf("%s://%s/vars", a.HttpPrefix, a.Master)
resp, err := client.Get(url)
if err != nil {
return err
}

data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
defer resp.Body.Close()

lines := strings.Split(string(data), "\n")
for _, line := range lines {
if isJobMetric(line) {
key, value, err := a.parseMetric(line)
if err != nil {
continue
}
fields, tags := parseJobSpecificMetric(key, value)
// Per job there are different tags so need to add a field per line
acc.AddFields("aurora", fields, tags)
} else if isTaskStore(line) {
key, value, err := a.parseMetric(line)
if err != nil {
continue
}
fields, tags := parseTaskStore(key, value)
acc.AddFields("aurora", fields, tags)
} else if isFramework(line) {
key, value, err := a.parseMetric(line)
if err != nil {
continue
}
fields := map[string]interface{}{
key: value,
}
tags := make(map[string]string)
acc.AddFields("aurora", fields, tags)
}
}
return nil
}

func init() {
inputs.Add("aurora", func() telegraf.Input {
return &Aurora{}
})
}
124 changes: 124 additions & 0 deletions plugins/inputs/aurora/aurora_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package aurora

import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)

var masterServer *httptest.Server

func getRawMetrics() string {
return `assigner_launch_failures 0
cron_job_triggers 240
sla_cluster_mtta_ms 18
sla_disk_small_mttr_ms 1029
sla_cpu_small_mtta_ms 17
jvm_prop_java.endorsed.dirs /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/endorsed
sla_role2/prod2/jobname2_job_uptime_50.00_sec 25`
}

func TestMain(m *testing.M) {
metrics := getRawMetrics()

masterRouter := http.NewServeMux()
masterRouter.HandleFunc("/vars", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, metrics)
})
masterServer = httptest.NewServer(masterRouter)

rc := m.Run()

masterServer.Close()
os.Exit(rc)
}

func TestConvertToNumeric(t *testing.T) {
if _, isNumeric := convertToNumeric("0.000"); !isNumeric {
t.Fatalf("0.000 should have been numeric")
}
if _, isNumeric := convertToNumeric("7"); !isNumeric {
t.Fatalf("7 should have been numeric")
}
if boolVal, isNumeric := convertToNumeric("true"); !isNumeric {
if val := boolVal.(int); val != 1 {
t.Fatalf("true should have been converted to a 1")
}
t.Fatalf("true should have been numeric")
}
if boolVal, isNumeric := convertToNumeric("false"); !isNumeric {
if val := boolVal.(int); val != 0 {
t.Fatalf("false should have been converted to a 0")
}
t.Fatalf("false should have been numeric")
}
if _, isNumeric := convertToNumeric("&"); isNumeric {
t.Fatalf("& should not be numeric")
}
}

func TestIsJobMetric(t *testing.T) {
var notJobMetrics = []string{
"assigner_launch_failures",
"cron_job_triggers",
"sla_cluster_mtta_ms",
"sla_disk_small_mttr_ms",
"sla_cpu_small_mtta_ms",
}
for _, metric := range notJobMetrics {
if isJobMetric(metric) {
t.Fatalf("%s should not be a job metric", metric)
}
}
var isJobMetrics = []string{
"sla_role2/prod2/jobname2_job_uptime_50.00_sec",
}
for _, metric := range isJobMetrics {
if !isJobMetric(metric) {
t.Fatalf("%s should be a job metric", metric)
}
}
}

func TestParseJobSpecificMetric(t *testing.T) {
var expectedFields = map[string]interface{}{
"job_uptime_50.00_sec": 0,
}
var expectedTags = map[string]string{
"role": "role2",
"env": "prod2",
"job": "jobname2",
}
key := "sla_role2/prod2/jobname2_job_uptime_50.00_sec"
value := 0
fields, tags := parseJobSpecificMetric(key, value)
assert.Equal(t, fields, expectedFields)
assert.Equal(t, tags, expectedTags)
}

func TestAuroraMaster(t *testing.T) {
var acc testutil.Accumulator

m := Aurora{
Master: masterServer.Listener.Addr().String(),
Timeout: 10,
HttpPrefix: "http",
Numeric: true,
}

err := m.Gather(&acc)
if err != nil {
t.Error(err)
}

var referenceMetrics = map[string]interface{}{
"job_uptime_50.00_sec": 25.0,
}
acc.AssertContainsFields(t, "aurora", referenceMetrics)
}