Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add input plugin for DC/OS #3519

Merged
merged 15 commits into from
Nov 29, 2017
Merged
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ github.com/couchbase/go-couchbase bfe555a140d53dc1adf390f1a1d4b0fd4ceadb28
github.com/couchbase/gomemcached 4a25d2f4e1dea9ea7dd76dfd943407abf9b07d29
github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6
github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76
github.com/dgrijalva/jwt-go dbeaa9332f19a944acb5736b4456cfcc02140e29
github.com/docker/docker f5ec1e2936dcbe7b5001c2b817188b095c700c27
github.com/docker/go-connections 990a1a1a70b0da4c4cb70e117971a4f0babfbf1a
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/consul"
_ "github.com/influxdata/telegraf/plugins/inputs/couchbase"
_ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
_ "github.com/influxdata/telegraf/plugins/inputs/dcos"
_ "github.com/influxdata/telegraf/plugins/inputs/disque"
_ "github.com/influxdata/telegraf/plugins/inputs/dmcache"
_ "github.com/influxdata/telegraf/plugins/inputs/dns_query"
209 changes: 209 additions & 0 deletions plugins/inputs/dcos/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# DC/OS Input Plugin

This input plugin gathers metrics from a DC/OS cluster's [metrics component](https://docs.mesosphere.com/1.10/metrics/).

**Series Cardinality Warning**

Depending on the work load of your DC/OS cluster, this plugin can quickly
create a high number of series which, when unchecked, can cause high load on
your database.

- Use [measurement filtering](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#measurement-filtering) liberally to exclude unneeded metrics as well as the node, container, and app inclue/exclude options.
- Write to a database with an appropriate [retention policy](https://docs.influxdata.com/influxdb/v1.3/concepts/glossary/#retention-policy-rp).
- Limit the number of series allowed in your database using the `max-series-per-database` and `max-values-per-tag` settings.
- Consider enabling the [TSI](https://docs.influxdata.com/influxdb/v1.3/about_the_project/releasenotes-changelog/#release-notes-8) engine.
- Monitor your [series cardinality](https://docs.influxdata.com/influxdb/v1.3/troubleshooting/frequently-asked-questions/#how-can-i-query-for-series-cardinality).

### Configuration:
```toml
[[inputs.dcos]]
## The DC/OS cluster URL.
cluster_url = "https://dcos-master-1"

## The ID of the service account.
service_account_id = "telegraf"
## The private key file for the service account.
service_account_private_key = "/etc/telegraf/telegraf-sa-key.pem"

## Path containing login token. If set, will read on every gather.
# token_file = "/home/dcos/.dcos/token"

## In all filter options if both include and exclude are empty all items
## will be collected. Arrays may contain glob patterns.
##
## Node IDs to collect metrics from. If a node is excluded, no metrics will
## be collected for its containers or apps.
# node_include = []
# node_exclude = []
## Container IDs to collect container metrics from.
# container_include = []
# container_exclude = []
## Container IDs to collect app metrics from.
# app_include = []
# app_exclude = []

## Maximum concurrent connections to the cluster.
# max_connections = 10
## Maximum time to receive a response from cluster.
# response_timeout = "20s"

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## If false, skip chain & host verification
# insecure_skip_verify = true

## Recommended filtering to reduce series cardinality.
# [inputs.dcos.tagdrop]
# path = ["/var/lib/mesos/slave/slaves/*"]
```

#### Enterprise Authentication

When using Enterprise DC/OS, it is recommended to use a service account to
authenticate with the cluster.

The plugin requires the following permissions:
```
dcos:adminrouter:ops:system-metrics full
dcos:adminrouter:ops:mesos full
```

Follow the directions to [create a service account and assign permissions](https://docs.mesosphere.com/1.10/security/service-auth/custom-service-auth/).

Quick configuration using the Enterprise CLI:
```
dcos security org service-accounts keypair telegraf-sa-key.pem telegraf-sa-cert.pem
dcos security org service-accounts create -p telegraf-sa-cert.pem -d "Telegraf DC/OS input plugin" telegraf
dcos security org users grant telegraf dcos:adminrouter:ops:system-metrics full
dcos security org users grant telegraf dcos:adminrouter:ops:mesos full
```

#### Open Source Authentication

The Open Source DC/OS does not provide service accounts. Instead you can use
of the following options:

1. [Disable authentication](https://dcos.io/docs/1.10/security/managing-authentication/#authentication-opt-out)
2. Use the `token_file` parameter to read a authentication token from a file.

Then `token_file` can be set by using the [dcos cli] to login periodically.
The cli can login for at most XXX days, you will need to ensure the cli
performs a new login before this time expires.
```
dcos auth login --username foo --password bar
dcos config show core.dcos_acs_token > ~/.dcos/token
```

Another option to create a `token_file` is to generate a token using the
cluster secret. This will allow you to set the expiration date manually or
even create a never expiring token. However, if the cluster secret or the
token is compromised it cannot be revoked and may require a full reinstall of
the cluster. For more information on this technique reference
[this blog post](https://medium.com/@richardgirges/authenticating-open-source-dc-os-with-third-party-services-125fa33a5add).

### Metrics:

Please consult the [Metrics Reference](https://docs.mesosphere.com/1.10/metrics/reference/)
for details on interprete field interpretation.

- dcos_node
- tags:
- cluster
- hostname
- path (filesystem fields only)
- interface (network fields only)
- fields:
- system_uptime (float)
- cpu_cores (float)
- cpu_total (float)
- cpu_user (float)
- cpu_system (float)
- cpu_idle (float)
- cpu_wait (float)
- load_1min (float)
- load_5min (float)
- load_15min (float)
- filesystem_capacity_total_bytes (int)
- filesystem_capacity_used_bytes (int)
- filesystem_capacity_free_bytes (int)
- filesystem_inode_total (float)
- filesystem_inode_used (float)
- filesystem_inode_free (float)
- memory_total_bytes (int)
- memory_free_bytes (int)
- memory_buffers_bytes (int)
- memory_cached_bytes (int)
- swap_total_bytes (int)
- swap_free_bytes (int)
- swap_used_bytes (int)
- network_in_bytes (int)
- network_out_bytes (int)
- network_in_packets (float)
- network_out_packets (float)
- network_in_dropped (float)
- network_out_dropped (float)
- network_in_errors (float)
- network_out_errors (float)
- process_count (float)

- dcos_container
- tags:
- cluster
- hostname
- container_id
- task_name
- fields:
- cpus_limit (float)
- cpus_system_time (float)
- cpus_throttled_time (float)
- cpus_user_time (float)
- disk_limit_bytes (int)
- disk_used_bytes (int)
- mem_limit_bytes (int)
- mem_total_bytes (int)
- net_rx_bytes (int)
- net_rx_dropped (float)
- net_rx_errors (float)
- net_rx_packets (float)
- net_tx_bytes (int)
- net_tx_dropped (float)
- net_tx_errors (float)
- net_tx_packets (float)

- dcos_app
- tags:
- cluster
- hostname
- container_id
- task_name
- fields:
- fields are application specific

### Example Output:

```
dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/boot filesystem_capacity_free_bytes=918188032i,filesystem_capacity_total_bytes=1063256064i,filesystem_capacity_used_bytes=145068032i,filesystem_inode_free=523958,filesystem_inode_total=524288,filesystem_inode_used=330 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=dummy0 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=docker0 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18 cpu_cores=2,cpu_idle=81.62,cpu_system=4.19,cpu_total=13.670000000000002,cpu_user=9.48,cpu_wait=0,load_15min=0.7,load_1min=0.22,load_5min=0.6,memory_buffers_bytes=970752i,memory_cached_bytes=1830473728i,memory_free_bytes=1178636288i,memory_total_bytes=3975073792i,process_count=198,swap_free_bytes=859828224i,swap_total_bytes=859828224i,swap_used_bytes=0i,system_uptime=18874 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=lo network_in_bytes=1090992450i,network_in_dropped=0,network_in_errors=0,network_in_packets=1546938,network_out_bytes=1090992450i,network_out_dropped=0,network_out_errors=0,network_out_packets=1546938 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/ filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=minuteman network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=210i,network_out_dropped=0,network_out_errors=0,network_out_packets=3 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=eth0 network_in_bytes=539886216i,network_in_dropped=1,network_in_errors=0,network_in_packets=979808,network_out_bytes=112395836i,network_out_dropped=0,network_out_errors=0,network_out_packets=891239 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=spartan network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=210i,network_out_dropped=0,network_out_errors=0,network_out_packets=3 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/var/lib/docker/overlay filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=vtep1024 network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18,path=/var/lib/docker/plugins filesystem_capacity_free_bytes=1668378624i,filesystem_capacity_total_bytes=6641680384i,filesystem_capacity_used_bytes=4973301760i,filesystem_inode_free=3107856,filesystem_inode_total=3248128,filesystem_inode_used=140272 1511859222000000000
dcos_node,cluster=enterprise,hostname=192.168.122.18,interface=d-dcos network_in_bytes=0i,network_in_dropped=0,network_in_errors=0,network_in_packets=0,network_out_bytes=0i,network_out_dropped=0,network_out_errors=0,network_out_packets=0 1511859222000000000
dcos_app,cluster=enterprise,container_id=9a78d34a-3bbf-467e-81cf-a57737f154ee,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000
dcos_container,cluster=enterprise,container_id=cbf19b77-3b8d-4bcf-b81f-824b67279629,hostname=192.168.122.18 cpus_limit=0.3,cpus_system_time=307.31,cpus_throttled_time=102.029930607,cpus_user_time=268.57,disk_limit_bytes=268435456i,disk_used_bytes=30953472i,mem_limit_bytes=570425344i,mem_total_bytes=13316096i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000
dcos_app,cluster=enterprise,container_id=cbf19b77-3b8d-4bcf-b81f-824b67279629,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000
dcos_container,cluster=enterprise,container_id=5725e219-f66e-40a8-b3ab-519d85f4c4dc,hostname=192.168.122.18,task_name=hello-world cpus_limit=0.6,cpus_system_time=25.6,cpus_throttled_time=327.977109217,cpus_user_time=566.54,disk_limit_bytes=0i,disk_used_bytes=0i,mem_limit_bytes=1107296256i,mem_total_bytes=335941632i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000
dcos_app,cluster=enterprise,container_id=5725e219-f66e-40a8-b3ab-519d85f4c4dc,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000
dcos_app,cluster=enterprise,container_id=c76e1488-4fb7-4010-a4cf-25725f8173f9,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000
dcos_container,cluster=enterprise,container_id=cbe0b2f9-061f-44ac-8f15-4844229e8231,hostname=192.168.122.18,task_name=telegraf cpus_limit=0.2,cpus_system_time=8.109999999,cpus_throttled_time=93.183916045,cpus_user_time=17.97,disk_limit_bytes=0i,disk_used_bytes=0i,mem_limit_bytes=167772160i,mem_total_bytes=0i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000
dcos_container,cluster=enterprise,container_id=b64115de-3d2a-431d-a805-76e7c46453f1,hostname=192.168.122.18 cpus_limit=0.2,cpus_system_time=2.69,cpus_throttled_time=20.064861214,cpus_user_time=6.56,disk_limit_bytes=268435456i,disk_used_bytes=29360128i,mem_limit_bytes=297795584i,mem_total_bytes=13733888i,net_rx_bytes=0i,net_rx_dropped=0,net_rx_errors=0,net_rx_packets=0,net_tx_bytes=0i,net_tx_dropped=0,net_tx_errors=0,net_tx_packets=0 1511859222000000000
dcos_app,cluster=enterprise,container_id=b64115de-3d2a-431d-a805-76e7c46453f1,hostname=192.168.122.18 container_received_bytes_per_sec=0,container_throttled_bytes_per_sec=0 1511859222000000000
```
332 changes: 332 additions & 0 deletions plugins/inputs/dcos/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
package dcos

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

jwt "github.com/dgrijalva/jwt-go"
)

const (
// How long to stayed logged in for
loginDuration = 65 * time.Minute
)

// Client is an interface for communicating with the DC/OS API.
type Client interface {
SetToken(token string)

Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error)
GetSummary(ctx context.Context) (*Summary, error)
GetContainers(ctx context.Context, node string) ([]Container, error)
GetNodeMetrics(ctx context.Context, node string) (*Metrics, error)
GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error)
GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error)
}

type APIError struct {
StatusCode int
Title string
Description string
}

// Login is request data for logging in.
type Login struct {
UID string `json:"uid"`
Exp int64 `json:"exp"`
Token string `json:"token"`
}

// LoginError is the response when login fails.
type LoginError struct {
Title string `json:"title"`
Description string `json:"description"`
}

// LoginAuth is the response to a successful login.
type LoginAuth struct {
Token string `json:"token"`
}

// Slave is a node in the cluster.
type Slave struct {
ID string `json:"id"`
}

// Summary provides high level cluster wide information.
type Summary struct {
Cluster string
Slaves []Slave
}

// Container is a container on a node.
type Container struct {
ID string
}

type DataPoint struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Unit string `json:"unit"`
Value float64 `json:"value"`
}

// Metrics are the DCOS metrics
type Metrics struct {
Datapoints []DataPoint `json:"datapoints"`
Dimensions map[string]interface{} `json:"dimensions"`
}

// AuthToken is the authentication token.
type AuthToken struct {
Text string
Expire time.Time
}

// ClusterClient is a Client that uses the cluster URL.
type ClusterClient struct {
clusterURL *url.URL
httpClient *http.Client
credentials *Credentials
token string
semaphore chan struct{}
}

type claims struct {
UID string `json:"uid"`
jwt.StandardClaims
}

func (e APIError) Error() string {
if e.Description != "" {
return fmt.Sprintf("%s: %s", e.Title, e.Description)
}
return e.Title
}

func NewClusterClient(
clusterURL *url.URL,
timeout time.Duration,
maxConns int,
tlsConfig *tls.Config,
) *ClusterClient {
httpClient := &http.Client{
Transport: &http.Transport{
MaxIdleConns: maxConns,
TLSClientConfig: tlsConfig,
},
Timeout: timeout,
}
semaphore := make(chan struct{}, maxConns)

c := &ClusterClient{
clusterURL: clusterURL,
httpClient: httpClient,
semaphore: semaphore,
}
return c
}

func (c *ClusterClient) SetToken(token string) {
c.token = token
}

func (c *ClusterClient) Login(ctx context.Context, sa *ServiceAccount) (*AuthToken, error) {
token, err := c.createLoginToken(sa)
if err != nil {
return nil, err
}

exp := time.Now().Add(loginDuration)

body := &Login{
UID: sa.AccountID,
Exp: exp.Unix(),
Token: token,
}

octets, err := json.Marshal(body)
if err != nil {
return nil, err
}

req, err := http.NewRequest("POST", c.url("/acs/api/v1/auth/login"), bytes.NewBuffer(octets))
if err != nil {
return nil, err
}
req.Header.Add("Content-Type", "application/json")

req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
auth := &LoginAuth{}
dec := json.NewDecoder(resp.Body)
err = dec.Decode(auth)
if err != nil {
return nil, err
}

token := &AuthToken{
Text: auth.Token,
Expire: exp,
}
return token, nil
}

loginError := &LoginError{}
dec := json.NewDecoder(resp.Body)
err = dec.Decode(loginError)
if err != nil {
err := &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
}
return nil, err
}

err = &APIError{
StatusCode: resp.StatusCode,
Title: loginError.Title,
Description: loginError.Description,
}
return nil, err
}

func (c *ClusterClient) GetSummary(ctx context.Context) (*Summary, error) {
summary := &Summary{}
err := c.doGet(ctx, c.url("/mesos/master/state-summary"), summary)
if err != nil {
return nil, err
}

return summary, nil
}

func (c *ClusterClient) GetContainers(ctx context.Context, node string) ([]Container, error) {
list := []string{}

path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers", node)
err := c.doGet(ctx, c.url(path), &list)
if err != nil {
return nil, err
}

containers := make([]Container, 0, len(list))
for _, c := range list {
containers = append(containers, Container{ID: c})

}

return containers, nil
}

func (c *ClusterClient) getMetrics(ctx context.Context, url string) (*Metrics, error) {
metrics := &Metrics{}

err := c.doGet(ctx, url, metrics)
if err != nil {
return nil, err
}

return metrics, nil
}

func (c *ClusterClient) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) {
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/node", node)
return c.getMetrics(ctx, c.url(path))
}

func (c *ClusterClient) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) {
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s", node, container)
return c.getMetrics(ctx, c.url(path))
}

func (c *ClusterClient) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) {
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s/app", node, container)
return c.getMetrics(ctx, c.url(path))
}

func createGetRequest(url string, token string) (*http.Request, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}

if token != "" {
req.Header.Add("Authorization", "token="+token)
}
req.Header.Add("Accept", "application/json")

return req, nil
}

func (c *ClusterClient) doGet(ctx context.Context, url string, v interface{}) error {
req, err := createGetRequest(url, c.token)
if err != nil {
return err
}

select {
case c.semaphore <- struct{}{}:
break
case <-ctx.Done():
return ctx.Err()
}

resp, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
<-c.semaphore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly add this to the defer above

return err
}
defer func() {
resp.Body.Close()
<-c.semaphore
}()

// Clear invalid token if unauthorized
if resp.StatusCode == http.StatusUnauthorized {
c.token = ""
}

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
}
}

if resp.StatusCode == http.StatusNoContent {
return nil
}

err = json.NewDecoder(resp.Body).Decode(v)
return err
}

func (c *ClusterClient) url(path string) string {
url := c.clusterURL
url.Path = path
return url.String()
}

func (c *ClusterClient) createLoginToken(sa *ServiceAccount) (string, error) {
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims{
UID: sa.AccountID,
StandardClaims: jwt.StandardClaims{
// How long we have to login with this token
ExpiresAt: int64(5 * time.Minute / time.Second),
},
})
return token.SignedString(sa.PrivateKey)
}
232 changes: 232 additions & 0 deletions plugins/inputs/dcos/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
package dcos

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"

jwt "github.com/dgrijalva/jwt-go"
"github.com/stretchr/testify/require"
)

const (
privateKey = `-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQCwlGyzVp9cqtwiNCgCnaR0kilPZhr4xFBcnXxvQ8/uzOHaWKxj
XWR38cKR3gPh5+4iSmzMdo3HDJM5ks6imXGnp+LPOA5iNewnpLNs7UxA2arwKH/6
4qIaAXAtf5jE46wZIMgc2EW9wGL3dxC0JY8EXPpBFB/3J8gADkorFR8lwwIDAQAB
AoGBAJaFHxfMmjHK77U0UnrQWFSKFy64cftmlL4t/Nl3q7L68PdIKULWZIMeEWZ4
I0UZiFOwr4em83oejQ1ByGSwekEuiWaKUI85IaHfcbt+ogp9hY/XbOEo56OPQUAd
bEZv1JqJOqta9Ug1/E1P9LjEEyZ5F5ubx7813rxAE31qKtKJAkEA1zaMlCWIr+Rj
hGvzv5rlHH3wbOB4kQFXO4nqj3J/ttzR5QiJW24STMDcbNngFlVcDVju56LrNTiD
dPh9qvl7nwJBANILguR4u33OMksEZTYB7nQZSurqXsq6382zH7pTl29ANQTROHaM
PKC8dnDWq8RGTqKuvWblIzzGIKqIMovZo10CQC96T0UXirITFolOL3XjvAuvFO1Q
EAkdXJs77805m0dCK+P1IChVfiAEpBw3bKJArpAbQIlFfdI953JUp5SieU0CQEub
BSSEKMjh/cxu6peEHnb/262vayuCFKkQPu1sxWewLuVrAe36EKCy9dcsDmv5+rgo
Odjdxc9Madm4aKlaT6kCQQCpAgeblDrrxTrNQ+Typzo37PlnQrvI+0EceAUuJ72G
P0a+YZUeHNRqT2pPN9lMTAZGGi3CtcF2XScbLNEBeXge
-----END RSA PRIVATE KEY-----`
)

func TestLogin(t *testing.T) {
var tests = []struct {
name string
responseCode int
responseBody string
expectedError error
expectedToken string
}{
{
name: "Login successful",
responseCode: 200,
responseBody: `{"token": "XXX.YYY.ZZZ"}`,
expectedError: nil,
expectedToken: "XXX.YYY.ZZZ",
},
{
name: "Unauthorized Error",
responseCode: http.StatusUnauthorized,
responseBody: `{"title": "x", "description": "y"}`,
expectedError: &APIError{http.StatusUnauthorized, "x", "y"},
expectedToken: "",
},
}

key, err := jwt.ParseRSAPrivateKeyFromPEM([]byte(privateKey))
require.NoError(t, err)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tt.responseCode)
fmt.Fprintln(w, tt.responseBody)
})
ts := httptest.NewServer(handler)
u, err := url.Parse(ts.URL)
require.NoError(t, err)

ctx := context.Background()
sa := &ServiceAccount{
AccountID: "telegraf",
PrivateKey: key,
}
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
auth, err := client.Login(ctx, sa)

require.Equal(t, tt.expectedError, err)

if tt.expectedToken != "" {
require.Equal(t, tt.expectedToken, auth.Text)
} else {
require.Nil(t, auth)
}

ts.Close()
})
}
}

func TestGetSummary(t *testing.T) {
var tests = []struct {
name string
responseCode int
responseBody string
expectedValue *Summary
expectedError error
}{
{
name: "No nodes",
responseCode: 200,
responseBody: `{"cluster": "a", "slaves": []}`,
expectedValue: &Summary{Cluster: "a", Slaves: []Slave{}},
expectedError: nil,
},
{
name: "Unauthorized Error",
responseCode: http.StatusUnauthorized,
responseBody: `<html></html>`,
expectedValue: nil,
expectedError: &APIError{StatusCode: http.StatusUnauthorized, Title: "401 Unauthorized"},
},
{
name: "Has nodes",
responseCode: 200,
responseBody: `{"cluster": "a", "slaves": [{"id": "a"}, {"id": "b"}]}`,
expectedValue: &Summary{
Cluster: "a",
Slaves: []Slave{
Slave{ID: "a"},
Slave{ID: "b"},
},
},
expectedError: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// check the path
w.WriteHeader(tt.responseCode)
fmt.Fprintln(w, tt.responseBody)
})
ts := httptest.NewServer(handler)
u, err := url.Parse(ts.URL)
require.NoError(t, err)

ctx := context.Background()
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
summary, err := client.GetSummary(ctx)

require.Equal(t, tt.expectedError, err)
require.Equal(t, tt.expectedValue, summary)

ts.Close()
})
}

}

func TestGetNodeMetrics(t *testing.T) {
var tests = []struct {
name string
responseCode int
responseBody string
expectedValue *Metrics
expectedError error
}{
{
name: "Empty Body",
responseCode: 200,
responseBody: `{}`,
expectedValue: &Metrics{},
expectedError: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// check the path
w.WriteHeader(tt.responseCode)
fmt.Fprintln(w, tt.responseBody)
})
ts := httptest.NewServer(handler)
u, err := url.Parse(ts.URL)
require.NoError(t, err)

ctx := context.Background()
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
m, err := client.GetNodeMetrics(ctx, "foo")

require.Equal(t, tt.expectedError, err)
require.Equal(t, tt.expectedValue, m)

ts.Close()
})
}

}

func TestGetContainerMetrics(t *testing.T) {
var tests = []struct {
name string
responseCode int
responseBody string
expectedValue *Metrics
expectedError error
}{
{
name: "204 No Contents",
responseCode: 204,
responseBody: ``,
expectedValue: &Metrics{},
expectedError: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// check the path
w.WriteHeader(tt.responseCode)
fmt.Fprintln(w, tt.responseBody)
})
ts := httptest.NewServer(handler)
u, err := url.Parse(ts.URL)
require.NoError(t, err)

ctx := context.Background()
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
m, err := client.GetContainerMetrics(ctx, "foo", "bar")

require.Equal(t, tt.expectedError, err)
require.Equal(t, tt.expectedValue, m)

ts.Close()
})
}

}
72 changes: 72 additions & 0 deletions plugins/inputs/dcos/creds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package dcos

import (
"context"
"crypto/rsa"
"fmt"
"io/ioutil"
"strings"
"time"
"unicode/utf8"
)

const (
// How long before expiration to renew token
relogDuration = 5 * time.Minute
)

type Credentials interface {
Token(ctx context.Context, client Client) (string, error)
IsExpired() bool
}

type ServiceAccount struct {
AccountID string
PrivateKey *rsa.PrivateKey

auth *AuthToken
}

type TokenCreds struct {
Path string
}

type NullCreds struct {
}

func (c *ServiceAccount) Token(ctx context.Context, client Client) (string, error) {
auth, err := client.Login(ctx, c)
if err != nil {
return "", err
}
c.auth = auth
return auth.Text, nil
}

func (c *ServiceAccount) IsExpired() bool {
return c.auth.Text != "" || c.auth.Expire.Add(relogDuration).After(time.Now())
}

func (c *TokenCreds) Token(ctx context.Context, client Client) (string, error) {
octets, err := ioutil.ReadFile(c.Path)
if err != nil {
return "", fmt.Errorf("Error reading token file %q: %s", c.Path, err)
}
if !utf8.Valid(octets) {
return "", fmt.Errorf("Token file does not contain utf-8 encoded text: %s", c.Path)
}
token := strings.TrimSpace(string(octets))
return token, nil
}

func (c *TokenCreds) IsExpired() bool {
return true
}

func (c *NullCreds) Token(ctx context.Context, client Client) (string, error) {
return "", nil
}

func (c *NullCreds) IsExpired() bool {
return true
}
435 changes: 435 additions & 0 deletions plugins/inputs/dcos/dcos.go

Large diffs are not rendered by default.

441 changes: 441 additions & 0 deletions plugins/inputs/dcos/dcos_test.go

Large diffs are not rendered by default.