From 7da634d5d86075439a7d3287ab56a06cdf0a7268 Mon Sep 17 00:00:00 2001 From: linalinn <10799908+linalinn@users.noreply.github.com> Date: Fri, 6 Dec 2019 14:39:13 +0100 Subject: [PATCH 1/6] Kubernetes PodLabels --- plugins/inputs/kubernetes/kubernetes.go | 133 +++++++++++++----- .../inputs/kubernetes/kubernetes_podlist.go | 15 ++ plugins/inputs/kubernetes/kubernetes_test.go | 53 ++++++- 3 files changed, 163 insertions(+), 38 deletions(-) create mode 100644 plugins/inputs/kubernetes/kubernetes_podlist.go diff --git a/plugins/inputs/kubernetes/kubernetes.go b/plugins/inputs/kubernetes/kubernetes.go index 45093a57bd732..72143b41552df 100644 --- a/plugins/inputs/kubernetes/kubernetes.go +++ b/plugins/inputs/kubernetes/kubernetes.go @@ -3,6 +3,7 @@ package kubernetes import ( "encoding/json" "fmt" + "github.com/influxdata/telegraf/filter" "io/ioutil" "net/http" "net/url" @@ -23,6 +24,11 @@ type Kubernetes struct { BearerToken string `toml:"bearer_token"` BearerTokenString string `toml:"bearer_token_string"` + LabelInclude []string `toml:"label_include"` + LabelExclude []string `toml:"label_exclude"` + + labelFilter filter.Filter + // HTTP Timeout specified as a string - 3s, 1m, 1h ResponseTimeout internal.Duration @@ -42,6 +48,11 @@ var sampleConfig = ` ## OR # bearer_token_string = "abc_123" + # Labels to include and exclude + # An empty array for include and exclude will include all labels + label_include = [] + label_exclude = [] + ## Set response_timeout (default 5 seconds) # response_timeout = "5s" @@ -75,6 +86,7 @@ func (k *Kubernetes) Description() string { } func (k *Kubernetes) Init() error { + // If neither are provided, use the default service account. if k.BearerToken == "" && k.BearerTokenString == "" { k.BearerToken = defaultServiceAccountPath @@ -88,6 +100,12 @@ func (k *Kubernetes) Init() error { k.BearerTokenString = strings.TrimSpace(string(token)) } + labelFilter, err := filter.NewIncludeExcludeFilter(k.LabelInclude, k.LabelExclude) + if err != nil { + return err + } + k.labelFilter = labelFilter + return nil } @@ -107,48 +125,20 @@ func buildURL(endpoint string, base string) (*url.URL, error) { } func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error { - url := fmt.Sprintf("%s/stats/summary", baseURL) - var req, err = http.NewRequest("GET", url, nil) - var resp *http.Response - - tlsCfg, err := k.ClientConfig.TLSConfig() + summaryMetrics := &SummaryMetrics{} + err := k.LoadJson(fmt.Sprintf("%s/stats/summary", baseURL), summaryMetrics) if err != nil { return err } - if k.RoundTripper == nil { - // Set default values - if k.ResponseTimeout.Duration < time.Second { - k.ResponseTimeout.Duration = time.Second * 5 - } - k.RoundTripper = &http.Transport{ - TLSHandshakeTimeout: 5 * time.Second, - TLSClientConfig: tlsCfg, - ResponseHeaderTimeout: k.ResponseTimeout.Duration, - } - } - - req.Header.Set("Authorization", "Bearer "+k.BearerTokenString) - req.Header.Add("Accept", "application/json") - - resp, err = k.RoundTripper.RoundTrip(req) + podInfos, err := k.gatherPodInfo(baseURL) if err != nil { - return fmt.Errorf("error making HTTP request to %s: %s", url, err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("%s returned HTTP status %s", url, resp.Status) + return err } - summaryMetrics := &SummaryMetrics{} - err = json.NewDecoder(resp.Body).Decode(summaryMetrics) - if err != nil { - return fmt.Errorf(`Error parsing response: %s`, err) - } buildSystemContainerMetrics(summaryMetrics, acc) buildNodeMetrics(summaryMetrics, acc) - buildPodMetrics(summaryMetrics, acc) + buildPodMetrics(baseURL, summaryMetrics, podInfos, acc) return nil } @@ -200,8 +190,74 @@ func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) acc.AddFields("kubernetes_node", fields, tags) } -func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) { +func (k *Kubernetes) gatherPodInfo(baseURL string) ([]PodInfo, error) { + var podapi Podlist + err := k.LoadJson(fmt.Sprintf("%s/pods", baseURL), &podapi) + if err != nil { + return nil, err + } + + var podinfo []PodInfo + + for i := 0; i < len(podapi.Items); i++ { + var meta PodInfo + err = json.Unmarshal(podapi.Items[i]["metadata"], &meta) + if err != nil { + fmt.Printf(`Error parsing response: %s\n`, err) + return nil, fmt.Errorf(`Error parsing response: %s`, err) + } + for key := range meta.Labels { + if !k.labelFilter.Match(key) { + delete(meta.Labels, key) + } + } + podinfo = append(podinfo, meta) + + } + + return podinfo, nil + +} + +func (k *Kubernetes) LoadJson(url string, v interface{}) error { + var req, err = http.NewRequest("GET", url, nil) + var resp *http.Response + tlsCfg, err := k.ClientConfig.TLSConfig() + if err != nil { + return err + } + if k.RoundTripper == nil { + if k.ResponseTimeout.Duration < time.Second { + k.ResponseTimeout.Duration = time.Second * 5 + } + k.RoundTripper = &http.Transport{ + TLSHandshakeTimeout: 5 * time.Second, + TLSClientConfig: tlsCfg, + ResponseHeaderTimeout: k.ResponseTimeout.Duration, + } + } + req.Header.Set("Authorization", "Bearer "+k.BearerTokenString) + req.Header.Add("Accept", "application/json") + resp, err = k.RoundTripper.RoundTrip(req) + if err != nil { + return fmt.Errorf("error making HTTP request to %s: %s", url, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %s", url, resp.Status) + } + + err = json.NewDecoder(resp.Body).Decode(v) + if err != nil { + return fmt.Errorf(`Error parsing response: %s`, err) + } + + return nil +} + +func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []PodInfo, acc telegraf.Accumulator) error { for _, pod := range summaryMetrics.Pods { + for _, container := range pod.Containers { tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, @@ -209,6 +265,14 @@ func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) { "container_name": container.Name, "pod_name": pod.PodRef.Name, } + for i := range podInfo { + if podInfo[i].Name == pod.PodRef.Name && podInfo[i].NameSpace == pod.PodRef.Namespace { + for k, v := range podInfo[i].Labels { + tags[k] = v + } + } + } + fields := make(map[string]interface{}) fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds @@ -252,4 +316,5 @@ func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) { fields["tx_errors"] = pod.Network.TXErrors acc.AddFields("kubernetes_pod_network", fields, tags) } + return nil } diff --git a/plugins/inputs/kubernetes/kubernetes_podlist.go b/plugins/inputs/kubernetes/kubernetes_podlist.go new file mode 100644 index 0000000000000..21e4199fa1b3b --- /dev/null +++ b/plugins/inputs/kubernetes/kubernetes_podlist.go @@ -0,0 +1,15 @@ +package kubernetes + +import "encoding/json" + +type Podlist struct { + Kind string `json:"kind"` + ApiVersion string `json:"apiVersion"` + Items []map[string]json.RawMessage `json:"items"` +} + +type PodInfo struct { + Name string `json:"name"` + NameSpace string `json:"namespace"` + Labels map[string]string `json:"labels"` +} diff --git a/plugins/inputs/kubernetes/kubernetes_test.go b/plugins/inputs/kubernetes/kubernetes_test.go index 081bca03aa536..84ecb1da353cf 100644 --- a/plugins/inputs/kubernetes/kubernetes_test.go +++ b/plugins/inputs/kubernetes/kubernetes_test.go @@ -2,6 +2,7 @@ package kubernetes import ( "fmt" + "github.com/influxdata/telegraf/filter" "net/http" "net/http/httptest" "testing" @@ -12,13 +13,23 @@ import ( func TestKubernetesStats(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - fmt.Fprintln(w, response) + if r.RequestURI == "/stats/summary" { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, responseStatsSummery) + } + if r.RequestURI == "/pods" { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, responsePods) + } + })) defer ts.Close() + labelFilter, _ := filter.NewIncludeExcludeFilter([]string{"app", "superkey"}, nil) + k := &Kubernetes{ - URL: ts.URL, + URL: ts.URL, + labelFilter: labelFilter, } var acc testutil.Accumulator @@ -89,6 +100,8 @@ func TestKubernetesStats(t *testing.T) { "container_name": "foocontainer", "namespace": "foons", "pod_name": "foopod", + "app": "foo", + "superkey": "foobar", } acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags) @@ -112,6 +125,8 @@ func TestKubernetesStats(t *testing.T) { "container_name": "stopped-container", "namespace": "foons", "pod_name": "stopped-pod", + "app": "foo-stop", + "superkey": "superfoo", } acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags) @@ -143,7 +158,37 @@ func TestKubernetesStats(t *testing.T) { } -var response = ` +var responsePods = ` +{ + "kind": "PodList", + "apiVersion": "v1", + "metadata": {}, + "items": [ + { + "metadata": { + "name": "foopod", + "namespace": "foons", + "labels": { + "superkey": "foobar", + "app": "foo" + } + } + }, + { + "metadata": { + "name": "stopped-pod", + "namespace": "foons", + "labels": { + "superkey": "superfoo", + "app": "foo-stop" + } + } + } + ] +} +` + +var responseStatsSummery = ` { "node": { "nodeName": "node1", From b7be8b469d7a3c8cb6cd9a008c7c9b644c94b5da Mon Sep 17 00:00:00 2001 From: linalinn <10799908+linalinn@users.noreply.github.com> Date: Fri, 6 Dec 2019 14:59:51 +0100 Subject: [PATCH 2/6] Removed unused return value --- plugins/inputs/kubernetes/kubernetes.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/plugins/inputs/kubernetes/kubernetes.go b/plugins/inputs/kubernetes/kubernetes.go index 72143b41552df..21b72c993efdd 100644 --- a/plugins/inputs/kubernetes/kubernetes.go +++ b/plugins/inputs/kubernetes/kubernetes.go @@ -135,7 +135,6 @@ func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) err if err != nil { return err } - buildSystemContainerMetrics(summaryMetrics, acc) buildNodeMetrics(summaryMetrics, acc) buildPodMetrics(baseURL, summaryMetrics, podInfos, acc) @@ -255,9 +254,8 @@ func (k *Kubernetes) LoadJson(url string, v interface{}) error { return nil } -func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []PodInfo, acc telegraf.Accumulator) error { +func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []PodInfo, acc telegraf.Accumulator) { for _, pod := range summaryMetrics.Pods { - for _, container := range pod.Containers { tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, @@ -316,5 +314,4 @@ func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []P fields["tx_errors"] = pod.Network.TXErrors acc.AddFields("kubernetes_pod_network", fields, tags) } - return nil } From 0dae34d28f2f459f88181b76c9e32a2d632ded13 Mon Sep 17 00:00:00 2001 From: linalinn <10799908+linalinn@users.noreply.github.com> Date: Fri, 6 Dec 2019 15:16:20 +0100 Subject: [PATCH 3/6] Update README.md --- plugins/inputs/kubernetes/README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/kubernetes/README.md b/plugins/inputs/kubernetes/README.md index a094b7b29203f..fec9fff94c72c 100644 --- a/plugins/inputs/kubernetes/README.md +++ b/plugins/inputs/kubernetes/README.md @@ -1,6 +1,6 @@ # Kubernetes Input Plugin -This input plugin talks to the kubelet api using the `/stats/summary` endpoint to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet. +This input plugin talks to the kubelet api using the `/stats/summary` and `/pods` endpoint to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet. To find the ip address of the host you are running on you can issue a command like the following: @@ -44,6 +44,11 @@ avoid cardinality issues: ## OR # bearer_token_string = "abc_123" + # Labels to include and exclude + # An empty array for include and exclude will include all labels + #label_include = [] + #label_exclude = [] + ## Set response_timeout (default 5 seconds) # response_timeout = "5s" From da1438ed2dc1e9ca0e66203ccdb88dab56970a20 Mon Sep 17 00:00:00 2001 From: linalinn <10799908+linalinn@users.noreply.github.com> Date: Mon, 9 Dec 2019 10:52:28 +0100 Subject: [PATCH 4/6] renamed and add label_exclude = ["*"] to init() --- plugins/inputs/kubernetes/README.md | 4 ++-- plugins/inputs/kubernetes/kubernetes.go | 17 ++++++++++------- ...kubernetes_podlist.go => kubernetes_pods.go} | 4 ++-- 3 files changed, 14 insertions(+), 11 deletions(-) rename plugins/inputs/kubernetes/{kubernetes_podlist.go => kubernetes_pods.go} (82%) diff --git a/plugins/inputs/kubernetes/README.md b/plugins/inputs/kubernetes/README.md index fec9fff94c72c..2a286e962ec8b 100644 --- a/plugins/inputs/kubernetes/README.md +++ b/plugins/inputs/kubernetes/README.md @@ -46,8 +46,8 @@ avoid cardinality issues: # Labels to include and exclude # An empty array for include and exclude will include all labels - #label_include = [] - #label_exclude = [] + # label_include = [] + # label_exclude = ["*"] ## Set response_timeout (default 5 seconds) # response_timeout = "5s" diff --git a/plugins/inputs/kubernetes/kubernetes.go b/plugins/inputs/kubernetes/kubernetes.go index 21b72c993efdd..b8215f4aa2218 100644 --- a/plugins/inputs/kubernetes/kubernetes.go +++ b/plugins/inputs/kubernetes/kubernetes.go @@ -50,8 +50,8 @@ var sampleConfig = ` # Labels to include and exclude # An empty array for include and exclude will include all labels - label_include = [] - label_exclude = [] + # label_include = [] + # label_exclude = ["*"] ## Set response_timeout (default 5 seconds) # response_timeout = "5s" @@ -71,7 +71,10 @@ const ( func init() { inputs.Add("kubernetes", func() telegraf.Input { - return &Kubernetes{} + return &Kubernetes{ + LabelInclude: []string{}, + LabelExclude: []string{"*"}, + } }) } @@ -190,7 +193,7 @@ func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) } func (k *Kubernetes) gatherPodInfo(baseURL string) ([]PodInfo, error) { - var podapi Podlist + var podapi Pods err := k.LoadJson(fmt.Sprintf("%s/pods", baseURL), &podapi) if err != nil { return nil, err @@ -263,9 +266,9 @@ func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []P "container_name": container.Name, "pod_name": pod.PodRef.Name, } - for i := range podInfo { - if podInfo[i].Name == pod.PodRef.Name && podInfo[i].NameSpace == pod.PodRef.Namespace { - for k, v := range podInfo[i].Labels { + for _, info := range podInfo { + if info.Name == pod.PodRef.Name && info.Namespace == pod.PodRef.Namespace { + for k, v := range info.Labels { tags[k] = v } } diff --git a/plugins/inputs/kubernetes/kubernetes_podlist.go b/plugins/inputs/kubernetes/kubernetes_pods.go similarity index 82% rename from plugins/inputs/kubernetes/kubernetes_podlist.go rename to plugins/inputs/kubernetes/kubernetes_pods.go index 21e4199fa1b3b..9e37b7f797c71 100644 --- a/plugins/inputs/kubernetes/kubernetes_podlist.go +++ b/plugins/inputs/kubernetes/kubernetes_pods.go @@ -2,7 +2,7 @@ package kubernetes import "encoding/json" -type Podlist struct { +type Pods struct { Kind string `json:"kind"` ApiVersion string `json:"apiVersion"` Items []map[string]json.RawMessage `json:"items"` @@ -10,6 +10,6 @@ type Podlist struct { type PodInfo struct { Name string `json:"name"` - NameSpace string `json:"namespace"` + Namespace string `json:"namespace"` Labels map[string]string `json:"labels"` } From 43cafea7ca0fa3dffea20491c57f618db634caf1 Mon Sep 17 00:00:00 2001 From: linalinn <10799908+linalinn@users.noreply.github.com> Date: Mon, 9 Dec 2019 11:16:29 +0100 Subject: [PATCH 5/6] removed second Json Decode --- plugins/inputs/kubernetes/kubernetes.go | 38 +++++++------------- plugins/inputs/kubernetes/kubernetes_pods.go | 14 ++++---- 2 files changed, 20 insertions(+), 32 deletions(-) diff --git a/plugins/inputs/kubernetes/kubernetes.go b/plugins/inputs/kubernetes/kubernetes.go index b8215f4aa2218..2342d5f4d7da0 100644 --- a/plugins/inputs/kubernetes/kubernetes.go +++ b/plugins/inputs/kubernetes/kubernetes.go @@ -140,7 +140,7 @@ func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) err } buildSystemContainerMetrics(summaryMetrics, acc) buildNodeMetrics(summaryMetrics, acc) - buildPodMetrics(baseURL, summaryMetrics, podInfos, acc) + buildPodMetrics(baseURL, summaryMetrics, podInfos, k.labelFilter, acc) return nil } @@ -192,33 +192,17 @@ func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) acc.AddFields("kubernetes_node", fields, tags) } -func (k *Kubernetes) gatherPodInfo(baseURL string) ([]PodInfo, error) { - var podapi Pods - err := k.LoadJson(fmt.Sprintf("%s/pods", baseURL), &podapi) +func (k *Kubernetes) gatherPodInfo(baseURL string) ([]Metadata, error) { + var podApi Pods + err := k.LoadJson(fmt.Sprintf("%s/pods", baseURL), &podApi) if err != nil { return nil, err } - - var podinfo []PodInfo - - for i := 0; i < len(podapi.Items); i++ { - var meta PodInfo - err = json.Unmarshal(podapi.Items[i]["metadata"], &meta) - if err != nil { - fmt.Printf(`Error parsing response: %s\n`, err) - return nil, fmt.Errorf(`Error parsing response: %s`, err) - } - for key := range meta.Labels { - if !k.labelFilter.Match(key) { - delete(meta.Labels, key) - } - } - podinfo = append(podinfo, meta) - + var podInfos []Metadata + for _, podMetadata := range podApi.Items { + podInfos = append(podInfos, podMetadata.Metadata) } - - return podinfo, nil - + return podInfos, nil } func (k *Kubernetes) LoadJson(url string, v interface{}) error { @@ -257,7 +241,7 @@ func (k *Kubernetes) LoadJson(url string, v interface{}) error { return nil } -func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []PodInfo, acc telegraf.Accumulator) { +func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []Metadata, labelFilter filter.Filter, acc telegraf.Accumulator) { for _, pod := range summaryMetrics.Pods { for _, container := range pod.Containers { tags := map[string]string{ @@ -269,7 +253,9 @@ func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []P for _, info := range podInfo { if info.Name == pod.PodRef.Name && info.Namespace == pod.PodRef.Namespace { for k, v := range info.Labels { - tags[k] = v + if labelFilter.Match(k) { + tags[k] = v + } } } } diff --git a/plugins/inputs/kubernetes/kubernetes_pods.go b/plugins/inputs/kubernetes/kubernetes_pods.go index 9e37b7f797c71..672608e54fe25 100644 --- a/plugins/inputs/kubernetes/kubernetes_pods.go +++ b/plugins/inputs/kubernetes/kubernetes_pods.go @@ -1,14 +1,16 @@ package kubernetes -import "encoding/json" - type Pods struct { - Kind string `json:"kind"` - ApiVersion string `json:"apiVersion"` - Items []map[string]json.RawMessage `json:"items"` + Kind string `json:"kind"` + ApiVersion string `json:"apiVersion"` + Items []Item `json:"items"` +} + +type Item struct { + Metadata Metadata `json:"metadata"` } -type PodInfo struct { +type Metadata struct { Name string `json:"name"` Namespace string `json:"namespace"` Labels map[string]string `json:"labels"` From d81e986a678e8f71f9a07f081f1f4ef843e402a9 Mon Sep 17 00:00:00 2001 From: linalinn <10799908+linalinn@users.noreply.github.com> Date: Mon, 9 Dec 2019 11:23:49 +0100 Subject: [PATCH 6/6] Add exclude Label to UnitTest --- plugins/inputs/kubernetes/kubernetes_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/kubernetes/kubernetes_test.go b/plugins/inputs/kubernetes/kubernetes_test.go index 84ecb1da353cf..faf40be3e1000 100644 --- a/plugins/inputs/kubernetes/kubernetes_test.go +++ b/plugins/inputs/kubernetes/kubernetes_test.go @@ -170,7 +170,8 @@ var responsePods = ` "namespace": "foons", "labels": { "superkey": "foobar", - "app": "foo" + "app": "foo", + "exclude": "exclude0" } } }, @@ -180,7 +181,8 @@ var responsePods = ` "namespace": "foons", "labels": { "superkey": "superfoo", - "app": "foo-stop" + "app": "foo-stop", + "exclude": "exclude1" } } }