Skip to content

Commit

Permalink
Add ability to collect pod labels to Kubernetes input (influxdata#6764)
Browse files Browse the repository at this point in the history
  • Loading branch information
linalinn authored and Mathieu Lecarme committed Apr 17, 2020
1 parent d109d12 commit 9af177e
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 41 deletions.
7 changes: 6 additions & 1 deletion plugins/inputs/kubernetes/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Kubernetes Input Plugin

This input plugin talks to the kubelet api using the `/stats/summary` endpoint to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet.
This input plugin talks to the kubelet API using the `/stats/summary` and `/pods` endpoints to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a Kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet.

To find the ip address of the host you are running on you can issue a command like the following:

Expand Down Expand Up @@ -44,6 +44,11 @@ avoid cardinality issues:
## OR
# bearer_token_string = "abc_123"

## Pod labels to add as tags, selected by include and exclude patterns.
## An empty array for both include and exclude will include all labels.
# label_include = []
# label_exclude = ["*"]

## Set response_timeout (default 5 seconds)
# response_timeout = "5s"

Expand Down
123 changes: 87 additions & 36 deletions plugins/inputs/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"encoding/json"
"fmt"
"github.com/influxdata/telegraf/filter"
"io/ioutil"
"net/http"
"net/url"
Expand All @@ -23,6 +24,11 @@ type Kubernetes struct {
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`

LabelInclude []string `toml:"label_include"`
LabelExclude []string `toml:"label_exclude"`

labelFilter filter.Filter

// HTTP Timeout specified as a string - 3s, 1m, 1h
ResponseTimeout internal.Duration

Expand All @@ -42,6 +48,11 @@ var sampleConfig = `
## OR
# bearer_token_string = "abc_123"
# Labels to include and exclude
# An empty array for include and exclude will include all labels
# label_include = []
# label_exclude = ["*"]
## Set response_timeout (default 5 seconds)
# response_timeout = "5s"
Expand All @@ -60,7 +71,10 @@ const (

func init() {
inputs.Add("kubernetes", func() telegraf.Input {
return &Kubernetes{}
return &Kubernetes{
LabelInclude: []string{},
LabelExclude: []string{"*"},
}
})
}

Expand All @@ -75,6 +89,7 @@ func (k *Kubernetes) Description() string {
}

func (k *Kubernetes) Init() error {

// If neither are provided, use the default service account.
if k.BearerToken == "" && k.BearerTokenString == "" {
k.BearerToken = defaultServiceAccountPath
Expand All @@ -88,6 +103,12 @@ func (k *Kubernetes) Init() error {
k.BearerTokenString = strings.TrimSpace(string(token))
}

labelFilter, err := filter.NewIncludeExcludeFilter(k.LabelInclude, k.LabelExclude)
if err != nil {
return err
}
k.labelFilter = labelFilter

return nil
}

Expand All @@ -107,48 +128,19 @@ func buildURL(endpoint string, base string) (*url.URL, error) {
}

func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error {
url := fmt.Sprintf("%s/stats/summary", baseURL)
var req, err = http.NewRequest("GET", url, nil)
var resp *http.Response

tlsCfg, err := k.ClientConfig.TLSConfig()
summaryMetrics := &SummaryMetrics{}
err := k.LoadJson(fmt.Sprintf("%s/stats/summary", baseURL), summaryMetrics)
if err != nil {
return err
}

if k.RoundTripper == nil {
// Set default values
if k.ResponseTimeout.Duration < time.Second {
k.ResponseTimeout.Duration = time.Second * 5
}
k.RoundTripper = &http.Transport{
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: k.ResponseTimeout.Duration,
}
}

req.Header.Set("Authorization", "Bearer "+k.BearerTokenString)
req.Header.Add("Accept", "application/json")

resp, err = k.RoundTripper.RoundTrip(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
}

summaryMetrics := &SummaryMetrics{}
err = json.NewDecoder(resp.Body).Decode(summaryMetrics)
podInfos, err := k.gatherPodInfo(baseURL)
if err != nil {
return fmt.Errorf(`Error parsing response: %s`, err)
return err
}
buildSystemContainerMetrics(summaryMetrics, acc)
buildNodeMetrics(summaryMetrics, acc)
buildPodMetrics(summaryMetrics, acc)
buildPodMetrics(baseURL, summaryMetrics, podInfos, k.labelFilter, acc)
return nil
}

Expand Down Expand Up @@ -200,7 +192,56 @@ func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator)
acc.AddFields("kubernetes_node", fields, tags)
}

func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
// gatherPodInfo queries the "/pods" endpoint under baseURL and returns the
// metadata section of every item in the response, in the order the items were
// decoded. Any error from loading or decoding the response is returned as-is
// with a nil slice.
func (k *Kubernetes) gatherPodInfo(baseURL string) ([]Metadata, error) {
	endpoint := fmt.Sprintf("%s/pods", baseURL)

	podList := Pods{}
	if err := k.LoadJson(endpoint, &podList); err != nil {
		return nil, err
	}

	// Left nil (not pre-allocated) so an empty pod list yields a nil slice.
	var metadata []Metadata
	for i := range podList.Items {
		metadata = append(metadata, podList.Items[i].Metadata)
	}
	return metadata, nil
}

// LoadJson issues a GET request to url and decodes the JSON response body
// into v, which must be a pointer suitable for encoding/json.
//
// The request carries the plugin's bearer token in the Authorization header
// and asks for "application/json". If no RoundTripper has been set yet, a
// default http.Transport is created from the plugin's TLS settings and cached
// on k for subsequent calls; at that point a ResponseTimeout shorter than one
// second is replaced by five seconds.
//
// An error is returned when the request cannot be constructed (for example a
// malformed url), the TLS configuration is invalid, the round trip fails, the
// server answers with anything other than 200 OK, or the body is not valid
// JSON for v.
func (k *Kubernetes) LoadJson(url string, v interface{}) error {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		// Must be checked before req is used: on failure req is nil and
		// setting headers on it would panic.
		return err
	}

	tlsCfg, err := k.ClientConfig.TLSConfig()
	if err != nil {
		return err
	}

	if k.RoundTripper == nil {
		// Set default values.
		if k.ResponseTimeout.Duration < time.Second {
			k.ResponseTimeout.Duration = time.Second * 5
		}
		k.RoundTripper = &http.Transport{
			TLSHandshakeTimeout:   5 * time.Second,
			TLSClientConfig:       tlsCfg,
			ResponseHeaderTimeout: k.ResponseTimeout.Duration,
		}
	}

	req.Header.Set("Authorization", "Bearer "+k.BearerTokenString)
	req.Header.Add("Accept", "application/json")

	resp, err := k.RoundTripper.RoundTrip(req)
	if err != nil {
		return fmt.Errorf("error making HTTP request to %s: %s", url, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
	}

	if err := json.NewDecoder(resp.Body).Decode(v); err != nil {
		return fmt.Errorf(`Error parsing response: %s`, err)
	}

	return nil
}

func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []Metadata, labelFilter filter.Filter, acc telegraf.Accumulator) {
for _, pod := range summaryMetrics.Pods {
for _, container := range pod.Containers {
tags := map[string]string{
Expand All @@ -209,6 +250,16 @@ func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
"container_name": container.Name,
"pod_name": pod.PodRef.Name,
}
for _, info := range podInfo {
if info.Name == pod.PodRef.Name && info.Namespace == pod.PodRef.Namespace {
for k, v := range info.Labels {
if labelFilter.Match(k) {
tags[k] = v
}
}
}
}

fields := make(map[string]interface{})
fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores
fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds
Expand Down
17 changes: 17 additions & 0 deletions plugins/inputs/kubernetes/kubernetes_pods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kubernetes

// Pods is the top-level JSON document that the plugin decodes from the
// kubelet "/pods" endpoint. Only the fields listed here are decoded; any
// other keys in the response are ignored by encoding/json.
type Pods struct {
// Kind is the "kind" field of the response.
Kind string `json:"kind"`
// ApiVersion is the "apiVersion" field of the response.
ApiVersion string `json:"apiVersion"`
// Items holds one entry per pod in the response.
Items []Item `json:"items"`
}

// Item is a single entry of Pods.Items. Only its "metadata" object is
// decoded.
type Item struct {
Metadata Metadata `json:"metadata"`
}

// Metadata is the identifying information and labels of one pod, as found
// in the "metadata" object of a pod list item.
type Metadata struct {
// Name is the pod name.
Name string `json:"name"`
// Namespace is the namespace the pod belongs to.
Namespace string `json:"namespace"`
// Labels maps label keys to label values.
Labels map[string]string `json:"labels"`
}
55 changes: 51 additions & 4 deletions plugins/inputs/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"fmt"
"github.com/influxdata/telegraf/filter"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -12,13 +13,23 @@ import (

func TestKubernetesStats(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, response)
if r.RequestURI == "/stats/summary" {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, responseStatsSummery)
}
if r.RequestURI == "/pods" {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, responsePods)
}

}))
defer ts.Close()

labelFilter, _ := filter.NewIncludeExcludeFilter([]string{"app", "superkey"}, nil)

k := &Kubernetes{
URL: ts.URL,
URL: ts.URL,
labelFilter: labelFilter,
}

var acc testutil.Accumulator
Expand Down Expand Up @@ -89,6 +100,8 @@ func TestKubernetesStats(t *testing.T) {
"container_name": "foocontainer",
"namespace": "foons",
"pod_name": "foopod",
"app": "foo",
"superkey": "foobar",
}
acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags)

Expand All @@ -112,6 +125,8 @@ func TestKubernetesStats(t *testing.T) {
"container_name": "stopped-container",
"namespace": "foons",
"pod_name": "stopped-pod",
"app": "foo-stop",
"superkey": "superfoo",
}
acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags)

Expand Down Expand Up @@ -143,7 +158,39 @@ func TestKubernetesStats(t *testing.T) {

}

var response = `
// responsePods is the canned JSON body the test server returns for the
// "/pods" request. It lists two pods in namespace "foons" ("foopod" and
// "stopped-pod"), each carrying the labels "superkey", "app" and "exclude".
var responsePods = `
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {},
"items": [
{
"metadata": {
"name": "foopod",
"namespace": "foons",
"labels": {
"superkey": "foobar",
"app": "foo",
"exclude": "exclude0"
}
}
},
{
"metadata": {
"name": "stopped-pod",
"namespace": "foons",
"labels": {
"superkey": "superfoo",
"app": "foo-stop",
"exclude": "exclude1"
}
}
}
]
}
`

var responseStatsSummery = `
{
"node": {
"nodeName": "node1",
Expand Down

0 comments on commit 9af177e

Please sign in to comment.