Skip to content

Commit

Permalink
Add TLS support to the mesos input plugin (#3769)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Feb 8, 2018
1 parent d467a20 commit 2950a3b
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 47 deletions.
12 changes: 10 additions & 2 deletions plugins/inputs/mesos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ For more information, please check the [Mesos Observability Metrics](http://meso
## Timeout, in ms.
timeout = 100
## A list of Mesos masters.
masters = ["localhost:5050"]
masters = ["http://localhost:5050"]
## Master metrics groups to be collected, by default, all enabled.
master_collections = [
"resources",
Expand All @@ -35,6 +35,13 @@ For more information, please check the [Mesos Observability Metrics](http://meso
# "tasks",
# "messages",
# ]

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
```

By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default
Expand Down Expand Up @@ -235,7 +242,8 @@ Mesos slave metric groups
### Tags:

- All master/slave measurements have the following tags:
- server
- server (network location of server: `host:port`)
- url (URL origin of server: `scheme://host:port`)
- role (master/slave)

- All master measurements have the extra tags:
Expand Down
182 changes: 137 additions & 45 deletions plugins/inputs/mesos/mesos.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import (
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
Expand All @@ -30,6 +33,20 @@ type Mesos struct {
Slaves []string
SlaveCols []string `toml:"slave_collections"`
//SlaveTasks bool

// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool

initialized bool
client *http.Client
masterURLs []*url.URL
slaveURLs []*url.URL
}

var allMetrics = map[Role][]string{
Expand All @@ -41,7 +58,7 @@ var sampleConfig = `
## Timeout, in ms.
timeout = 100
## A list of Mesos masters.
masters = ["localhost:5050"]
masters = ["http://localhost:5050"]
## Master metrics groups to be collected, by default, all enabled.
master_collections = [
"resources",
Expand All @@ -65,6 +82,13 @@ var sampleConfig = `
# "tasks",
# "messages",
# ]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
`

// SampleConfig returns a sample configuration block
Expand All @@ -77,7 +101,28 @@ func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters"
}

func (m *Mesos) SetDefaults() {
func parseURL(s string, role Role) (*url.URL, error) {
if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") {
host, port, err := net.SplitHostPort(s)
// no port specified
if err != nil {
host = s
switch role {
case MASTER:
port = "5050"
case SLAVE:
port = "5051"
}
}

s = "http://" + host + ":" + port
log.Printf("W! [inputs.mesos] Using %q as connection URL; please update your configuration to use an URL", s)
}

return url.Parse(s)
}

func (m *Mesos) initialize() error {
if len(m.MasterCols) == 0 {
m.MasterCols = allMetrics[MASTER]
}
Expand All @@ -87,41 +132,79 @@ func (m *Mesos) SetDefaults() {
}

if m.Timeout == 0 {
log.Println("I! [mesos] Missing timeout value, setting default value (100ms)")
log.Println("I! [inputs.mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100
}

rawQuery := "timeout=" + strconv.Itoa(m.Timeout) + "ms"

m.masterURLs = make([]*url.URL, 0, len(m.Masters))
for _, master := range m.Masters {
u, err := parseURL(master, MASTER)
if err != nil {
return err
}

u.RawQuery = rawQuery
m.masterURLs = append(m.masterURLs, u)
}

m.slaveURLs = make([]*url.URL, 0, len(m.Slaves))
for _, slave := range m.Slaves {
u, err := parseURL(slave, SLAVE)
if err != nil {
return err
}

u.RawQuery = rawQuery
m.slaveURLs = append(m.slaveURLs, u)
}

client, err := m.createHttpClient()
if err != nil {
return err
}
m.client = client

return nil
}

// Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
if !m.initialized {
err := m.initialize()
if err != nil {
return err
}
m.initialized = true
}

m.SetDefaults()
var wg sync.WaitGroup

for _, v := range m.Masters {
for _, master := range m.masterURLs {
wg.Add(1)
go func(c string) {
acc.AddError(m.gatherMainMetrics(c, ":5050", MASTER, acc))
go func(master *url.URL) {
acc.AddError(m.gatherMainMetrics(master, MASTER, acc))
wg.Done()
return
}(v)
}(master)
}

for _, v := range m.Slaves {
for _, slave := range m.slaveURLs {
wg.Add(1)
go func(c string) {
acc.AddError(m.gatherMainMetrics(c, ":5051", SLAVE, acc))
go func(slave *url.URL) {
acc.AddError(m.gatherMainMetrics(slave, SLAVE, acc))
wg.Done()
return
}(v)
}(slave)

// if !m.SlaveTasks {
// continue
// }

// wg.Add(1)
// go func(c string) {
// acc.AddError(m.gatherSlaveTaskMetrics(c, ":5051", acc))
// acc.AddError(m.gatherSlaveTaskMetrics(slave, acc))
// wg.Done()
// return
// }(v)
Expand All @@ -132,6 +215,24 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
return nil
}

func (m *Mesos) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
if err != nil {
return nil, err
}

client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsCfg,
},
Timeout: 4 * time.Second,
}

return client, nil
}

// metricsDiff() returns set names for removal
func metricsDiff(role Role, w []string) []string {
b := []string{}
Expand Down Expand Up @@ -393,38 +494,22 @@ func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
}
}

var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}

// TaskStats struct for JSON API output /monitor/statistics
type TaskStats struct {
ExecutorID string `json:"executor_id"`
FrameworkID string `json:"framework_id"`
Statistics map[string]interface{} `json:"statistics"`
}

func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error {
func (m *Mesos) gatherSlaveTaskMetrics(u *url.URL, acc telegraf.Accumulator) error {
var metrics []TaskStats

host, _, err := net.SplitHostPort(address)
if err != nil {
host = address
address = address + defaultPort
}

tags := map[string]string{
"server": host,
"server": u.Hostname(),
"url": urlTag(u),
}

ts := strconv.Itoa(m.Timeout) + "ms"

resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts)
resp, err := m.client.Get(withPath(u, "/monitor/statistics").String())

if err != nil {
return err
Expand Down Expand Up @@ -459,24 +544,31 @@ func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc t
return nil
}

func withPath(u *url.URL, path string) *url.URL {
c := *u
c.Path = path
return &c
}

func urlTag(u *url.URL) string {
c := *u
c.Path = ""
c.User = nil
c.RawQuery = ""
return c.String()
}

// This should not belong to the object
func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error {
func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}

host, _, err := net.SplitHostPort(a)
if err != nil {
host = a
a = a + defaultPort
}

tags := map[string]string{
"server": host,
"server": u.Hostname(),
"url": urlTag(u),
"role": string(role),
}

ts := strconv.Itoa(m.Timeout) + "ms"

resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
resp, err := m.client.Get(withPath(u, "/metrics/snapshot").String())

if err != nil {
return err
Expand Down
18 changes: 18 additions & 0 deletions plugins/inputs/mesos/mesos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

var masterMetrics map[string]interface{}
Expand Down Expand Up @@ -378,3 +380,19 @@ func TestSlaveFilter(t *testing.T) {
}
}
}

func TestWithPathDoesNotModify(t *testing.T) {
u, err := url.Parse("http://localhost:5051")
require.NoError(t, err)
v := withPath(u, "/xyzzy")
require.Equal(t, u.String(), "http://localhost:5051")
require.Equal(t, v.String(), "http://localhost:5051/xyzzy")
}

func TestURLTagDoesNotModify(t *testing.T) {
u, err := url.Parse("http://a:b@localhost:5051?timeout=1ms")
require.NoError(t, err)
v := urlTag(u)
require.Equal(t, u.String(), "http://a:b@localhost:5051?timeout=1ms")
require.Equal(t, v, "http://localhost:5051")
}

0 comments on commit 2950a3b

Please sign in to comment.