From 8d1a55203ed13ec94dc4fb226000631039b2272a Mon Sep 17 00:00:00 2001 From: Soren Mathiasen Date: Fri, 16 Mar 2018 10:43:51 +0200 Subject: [PATCH 01/13] Add scraping for Prometheus endpoint in Kubernetes This will allow users to add Prometheus annotations in services in Kubernetes, and have telegraf scan for them and add them to the list of endpoints to collect metrics from --- Godeps | 16 +++ plugins/inputs/prometheus/README.md | 16 +++ plugins/inputs/prometheus/kubernetes.go | 116 +++++++++++++++++++ plugins/inputs/prometheus/kubernetes_test.go | 83 +++++++++++++ plugins/inputs/prometheus/prometheus.go | 54 +++++++-- plugins/inputs/prometheus/prometheus_test.go | 4 + 6 files changed, 279 insertions(+), 10 deletions(-) create mode 100644 plugins/inputs/prometheus/kubernetes.go create mode 100644 plugins/inputs/prometheus/kubernetes_test.go diff --git a/Godeps b/Godeps index 4b5dd401221ef..cd08b6a28e905 100644 --- a/Godeps +++ b/Godeps @@ -1,4 +1,8 @@ collectd.org 2ce144541b8903101fb8f1483cc0497a68798122 +github.com/PuerkitoBio/purell fd18e053af8a4ff11039269006e8037ff374ce0e +github.com/PuerkitoBio/urlesc de5bf2ad457846296e2031421a34e2568e304e35 +github.com/Shopify/sarama 3b1b38866a79f06deddf0487d5c27ba0697ccd65 +github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07 @@ -17,15 +21,22 @@ github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98 github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483 +github.com/emicklei/go-restful 2dd44038f0b95ae693b266c5f87593b5d2fdd78d github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 +github.com/go-openapi/jsonpointer 779f45308c19820f1a69e9a4cd965f496e0da10f +github.com/go-openapi/jsonreference 36d33bfe519efae5632669801b180bf1a245da3b +github.com/go-openapi/spec a4fa9574c7aa73b2fc54e251eb9524d0482bb592 +github.com/go-openapi/swag cf0bdb963811675a4d7e74901cefc7411a1df939 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438 github.com/gogo/protobuf 7b6c6391c4ff245962047fc1e2c6e08b1cdfa0e8 +github.com/golang/glog 23def4e6c14b4da8ac2ed8007337bc5eb5007998 github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93 github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7 github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7 github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc +github.com/google/gofuzz 24818f796faf91cd76ec7bddd72458fbced7a6c1 github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 @@ -39,6 +50,7 @@ github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 +github.com/mailru/easyjson 5f62e4f3afa2f576dc86531b7df4d966b19ef8f8 github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c github.com/Microsoft/go-winio ce2922f643c8fd76b46cadc7f404a06282678b34 github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 @@ -93,3 +105,7 @@ gopkg.in/mgo.v2 3f83fa5005286a7fe593b055f0d7771a7dce4655 gopkg.in/olivere/elastic.v5 3113f9b9ad37509fe5f8a0e5e91c96fdc4435e26 gopkg.in/tomb.v1 dd632973f1e7218eb1089048e0798ec9ae7dceb8 gopkg.in/yaml.v2 4c78c975fe7c825c6d1466c42be594d1d6f3aba6 +k8s.io/api 5584376ceeffeb13a2e98b5e9f0e9dab37de4bab +k8s.io/apimachinery 18a564baac720819100827c16fdebcadb05b2d0d +k8s.io/client-go a3c9a30b35fc56f709f4428827b807c90af99fc3 +k8s.io/kube-openapi 39a7bf85c140f972372c2a0d1ee40adbf0c8bfe1 \ No newline at end of file diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index ac7405014a8ce..66bfb681a3c55 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -14,6 +14,12 @@ in Prometheus format. ## An array of Kubernetes services to scrape metrics from. # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] + # Scrape Kubernetes service for prometheus annotations. + # prometheus.io/scrape: Enable scraping for this service + # prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. + # prometheus.io/port: If port is not 9102 use this annotation + # kubernetes_scraping = true + ## Use bearer token for authorization # bearer_token = /path/to/bearer/token @@ -37,6 +43,16 @@ by looking up all A records assigned to the hostname as described in This method can be used to locate all [Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services). +#### Kubernetes scraping + +Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes +pods. +Currently the following annotation are supported: + +* `prometheus.io/scrape` Enable scraping for this pod +* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default metrics). +* `prometheus.io/port` Used to override the port, the default value is 9102 + #### Bearer Token If set, the file specified by the `bearer_token` parameter will be read on diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go new file mode 100644 index 0000000000000..b30221c54f308 --- /dev/null +++ b/plugins/inputs/prometheus/kubernetes.go @@ -0,0 +1,116 @@ +package prometheus + +import ( + "fmt" + "log" + "strings" + "time" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/rest" +) + +func start(p *Prometheus) error { + config, err := rest.InClusterConfig() + if err != nil { + return err + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return err + } + watchlist := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything()) + _, controller := cache.NewInformer( + watchlist, + &v1.Pod{}, + time.Second*0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + registerPod(pod, p) + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + unregisterPod(pod, p) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + podPod := oldObj.(*v1.Pod) + newPod := newObj.(*v1.Pod) + unregisterPod(podPod, p) + registerPod(newPod, p) + }, + }, + ) + + go controller.Run(wait.NeverStop) + return nil +} + +func registerPod(pod *v1.Pod, p *Prometheus) { + url := scrapeURL(pod) + if url != nil { + log.Printf("Will scrape metrics from %v\n", *url) + p.lock.Lock() + // add annotation as metrics tags + tags := pod.GetAnnotations() + tags["pod_name"] = pod.Name + tags["namespace"] = pod.Namespace + // add labels as metrics tags + for k, v := range pod.GetLabels() { + tags[k] = v + } + p.KubernetesPods = append(p.KubernetesPods, Target{url: *url, tags: tags}) + p.lock.Unlock() + } +} + +func scrapeURL(pod *v1.Pod) *string { + scrape := pod.ObjectMeta.Annotations["prometheus.io/scrape"] + if pod.Status.PodIP == "" { + // return as if scrape was disabled, we will be notified again once the pod + // has an IP + return nil + } + if scrape == "true" { + path := pod.ObjectMeta.Annotations["prometheus.io/path"] + port := pod.ObjectMeta.Annotations["prometheus.io/port"] + if port == "" { + port = "9102" // default + } + if path == "" { + path = "/metrics" + } + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + + ip := pod.Status.PodIP + x := fmt.Sprintf("http://%v:%v%v", ip, port, path) + return &x + } + return nil +} + +func unregisterPod(pod *v1.Pod, p *Prometheus) { + url := scrapeURL(pod) + if url != nil { + p.lock.Lock() + defer p.lock.Unlock() + log.Printf("Registred a delete request for %v in namespace '%v'\n", pod.Name, pod.Namespace) + var result []Target + for _, v := range p.KubernetesPods { + if v.url != *url { + result = append(result, v) + } else { + log.Printf("Will stop scraping for %v\n", *url) + } + + } + p.KubernetesPods = result + } +} diff --git a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go new file mode 100644 index 0000000000000..2e3caba38e06e --- /dev/null +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -0,0 +1,83 @@ +package prometheus + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestScrapeURLNoAnnotations(t *testing.T) { + p := &v1.Pod{} + p.Annotations = map[string]string{} + url := scrapeURL(p) + assert.Nil(t, url) +} +func TestScrapeURLAnnotationsNoScrape(t *testing.T) { + p := &v1.Pod{} + p.Name = "myPod" + p.Annotations = map[string]string{"prometheus.io/scrape": "false"} + url := scrapeURL(p) + assert.Nil(t, url) +} +func TestScrapeURLAnnotations(t *testing.T) { + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + url := scrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) +} +func TestScrapeURLAnnotationsCustomPort(t *testing.T) { + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} + url := scrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) +} +func TestScrapeURLAnnotationsCustomPath(t *testing.T) { + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} + url := scrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) +} + +func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} + url := scrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) +} + +func TestAddPod(t *testing.T) { + prom := &Prometheus{lock: &sync.Mutex{}} + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + assert.Equal(t, 1, len(prom.KubernetesPods)) +} +func TestAddMultiplePods(t *testing.T) { + prom := &Prometheus{lock: &sync.Mutex{}} + + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + p.Name = "Pod2" + registerPod(p, prom) + assert.Equal(t, 2, len(prom.KubernetesPods)) +} +func TestDeletePods(t *testing.T) { + prom := &Prometheus{lock: &sync.Mutex{}} + + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + unregisterPod(p, prom) + assert.Equal(t, 0, len(prom.KubernetesPods)) +} + +func pod() *v1.Pod { + p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default"}} + p.Status.PodIP = "127.0.0.1" + p.Name = "myPod" + return p +} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 2a8a6b284b206..1a7bfe12b5c14 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -18,6 +18,10 @@ import ( const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3` +type Target struct { + url string + tags map[string]string +} type Prometheus struct { // An array of urls to scrape metrics from. URLs []string `toml:"urls"` @@ -25,6 +29,11 @@ type Prometheus struct { // An array of Kubernetes services to scrape metrics from. KubernetesServices []string + // Should we scrape Kubernetes services for prometheus annotations + KubernetesScraping bool `toml:"kubernetes_scraping"` + lock *sync.Mutex + KubernetesPods []Target + // Bearer Token authorization file path BearerToken string `toml:"bearer_token"` @@ -48,6 +57,12 @@ var sampleConfig = ` ## An array of Kubernetes services to scrape metrics from. # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] + + # Scrape Kubernetes pods for prometheus annotations. + # prometheus.io/scrape: Enable scraping for this pod + # prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. + # prometheus.io/port: If port is not 9102 use this annotation + # kubernetes_scraping = true ## Use bearer token for authorization # bearer_token = /path/to/bearer/token @@ -96,6 +111,7 @@ type URLAndAddress struct { OriginalURL *url.URL URL *url.URL Address string + Tags map[string]string } func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { @@ -109,11 +125,25 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { allURLs = append(allURLs, URLAndAddress{URL: URL, OriginalURL: URL}) } + p.lock.Lock() + defer p.lock.Unlock() + // loop through all pods scraped via the prometheus annotation on the pods + for _, pod := range p.KubernetesPods { + URL, err := url.Parse(pod.url) + if err != nil { + log.Printf("prometheus: Could not parse url %s, skipping it. Error: %s", pod.url, err) + continue + } + podURL := p.AddressToURL(URL, URL.Hostname()) + allURLs = append(allURLs, URLAndAddress{URL: podURL, Address: URL.Hostname(), OriginalURL: URL, Tags: pod.tags}) + } + for _, service := range p.KubernetesServices { URL, err := url.Parse(service) if err != nil { return nil, err } + resolvedAddresses, err := net.LookupHost(URL.Hostname()) if err != nil { log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", URL.Host, err) @@ -157,15 +187,6 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error { return nil } -var tr = &http.Transport{ - ResponseHeaderTimeout: time.Duration(3 * time.Second), -} - -var client = &http.Client{ - Transport: tr, - Timeout: time.Duration(4 * time.Second), -} - func (p *Prometheus) createHttpClient() (*http.Client, error) { tlsCfg, err := internal.GetTLSConfig( p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify) @@ -226,6 +247,9 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error if u.Address != "" { tags["address"] = u.Address } + for k, v := range u.Tags { + tags[k] = v + } switch metric.Type() { case telegraf.Counter: @@ -244,8 +268,18 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error return nil } +// Start will start the Kubernetes scraping if enabled in the configuration +func (p *Prometheus) Start(a telegraf.Accumulator) error { + if p.KubernetesScraping { + return start(p) + } + return nil +} + +func (p *Prometheus) Stop() {} + func init() { inputs.Add("prometheus", func() telegraf.Input { - return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}} + return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}, lock: &sync.Mutex{}} }) } diff --git a/plugins/inputs/prometheus/prometheus_test.go b/plugins/inputs/prometheus/prometheus_test.go index 9a2982ff989bf..b16e020451684 100644 --- a/plugins/inputs/prometheus/prometheus_test.go +++ b/plugins/inputs/prometheus/prometheus_test.go @@ -5,6 +5,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "sync" "testing" "time" @@ -37,6 +38,7 @@ func TestPrometheusGeneratesMetrics(t *testing.T) { defer ts.Close() p := &Prometheus{ + lock: &sync.Mutex{}, URLs: []string{ts.URL}, } @@ -60,6 +62,7 @@ func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) { defer ts.Close() p := &Prometheus{ + lock: &sync.Mutex{}, KubernetesServices: []string{ts.URL}, } u, _ := url.Parse(ts.URL) @@ -89,6 +92,7 @@ func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) { defer ts.Close() p := &Prometheus{ + lock: &sync.Mutex{}, URLs: []string{ts.URL}, KubernetesServices: []string{"http://random.telegraf.local:88/metrics"}, } From 7cdc0e7001d7808256083f2fb03f50eab0ae5ae8 Mon Sep 17 00:00:00 2001 From: Soren Mathiasen Date: Mon, 30 Apr 2018 16:47:42 +0200 Subject: [PATCH 02/13] fixup! Add scraping for Prometheus endpoint in Kubernetes --- Godeps | 18 +-- plugins/inputs/prometheus/README.md | 2 +- plugins/inputs/prometheus/kubernetes.go | 161 ++++++++++++------- plugins/inputs/prometheus/kubernetes_test.go | 48 +++--- plugins/inputs/prometheus/prometheus.go | 36 ++--- 5 files changed, 147 insertions(+), 118 deletions(-) diff --git a/Godeps b/Godeps index cd08b6a28e905..07421e929ce7e 100644 --- a/Godeps +++ b/Godeps @@ -1,8 +1,4 @@ collectd.org 2ce144541b8903101fb8f1483cc0497a68798122 -github.com/PuerkitoBio/purell fd18e053af8a4ff11039269006e8037ff374ce0e -github.com/PuerkitoBio/urlesc de5bf2ad457846296e2031421a34e2568e304e35 -github.com/Shopify/sarama 3b1b38866a79f06deddf0487d5c27ba0697ccd65 -github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07 @@ -21,22 +17,17 @@ github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98 github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483 -github.com/emicklei/go-restful 2dd44038f0b95ae693b266c5f87593b5d2fdd78d +github.com/ericchiang/k8s 677cf3318ef83bf681a38821f81a233a9be09641 +github.com/ghodss/yaml 0ca9ea5df5451ffdf184b4428c902747c2c11cd7 github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 -github.com/go-openapi/jsonpointer 779f45308c19820f1a69e9a4cd965f496e0da10f -github.com/go-openapi/jsonreference 36d33bfe519efae5632669801b180bf1a245da3b -github.com/go-openapi/spec a4fa9574c7aa73b2fc54e251eb9524d0482bb592 -github.com/go-openapi/swag cf0bdb963811675a4d7e74901cefc7411a1df939 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438 github.com/gogo/protobuf 7b6c6391c4ff245962047fc1e2c6e08b1cdfa0e8 -github.com/golang/glog 23def4e6c14b4da8ac2ed8007337bc5eb5007998 github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93 github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7 github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7 github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc -github.com/google/gofuzz 24818f796faf91cd76ec7bddd72458fbced7a6c1 github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 @@ -50,7 +41,6 @@ github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 -github.com/mailru/easyjson 5f62e4f3afa2f576dc86531b7df4d966b19ef8f8 github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c github.com/Microsoft/go-winio ce2922f643c8fd76b46cadc7f404a06282678b34 github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 @@ -105,7 +95,3 @@ gopkg.in/mgo.v2 3f83fa5005286a7fe593b055f0d7771a7dce4655 gopkg.in/olivere/elastic.v5 3113f9b9ad37509fe5f8a0e5e91c96fdc4435e26 gopkg.in/tomb.v1 dd632973f1e7218eb1089048e0798ec9ae7dceb8 gopkg.in/yaml.v2 4c78c975fe7c825c6d1466c42be594d1d6f3aba6 -k8s.io/api 5584376ceeffeb13a2e98b5e9f0e9dab37de4bab -k8s.io/apimachinery 18a564baac720819100827c16fdebcadb05b2d0d -k8s.io/client-go a3c9a30b35fc56f709f4428827b807c90af99fc3 -k8s.io/kube-openapi 39a7bf85c140f972372c2a0d1ee40adbf0c8bfe1 \ No newline at end of file diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index 66bfb681a3c55..0d91ae469abd1 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -18,7 +18,7 @@ in Prometheus format. # prometheus.io/scrape: Enable scraping for this service # prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. # prometheus.io/port: If port is not 9102 use this annotation - # kubernetes_scraping = true + # monitor_kubernetes_pods = true ## Use bearer token for authorization # bearer_token = /path/to/bearer/token diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index b30221c54f308..102ec9e0dd576 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -1,84 +1,129 @@ package prometheus import ( + "context" "fmt" + "io/ioutil" "log" + "net/url" + "os" "strings" - "time" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/rest" + "github.com/ericchiang/k8s" + corev1 "github.com/ericchiang/k8s/apis/core/v1" + "github.com/ghodss/yaml" ) -func start(p *Prometheus) error { - config, err := rest.InClusterConfig() +// loadClient parses a kubeconfig from a file and returns a Kubernetes +// client. It does not support extensions or client auth providers. +func loadClient(kubeconfigPath string) (*k8s.Client, error) { + data, err := ioutil.ReadFile(kubeconfigPath) if err != nil { - return err + return nil, fmt.Errorf("read kubeconfig: %v", err) } - clientset, err := kubernetes.NewForConfig(config) + + // Unmarshal YAML into a Kubernetes config object. + var config k8s.Config + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("unmarshal kubeconfig: %v", err) + } + return k8s.NewClient(&config) +} + +func start(p *Prometheus) error { + client, err := k8s.NewInClusterClient() if err != nil { - return err + client, err = loadClient(fmt.Sprintf("%v/.kube/config", os.Getenv("HOME"))) + if err != nil { + log.Fatal(err) + } } - watchlist := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything()) - _, controller := cache.NewInformer( - watchlist, - &v1.Pod{}, - time.Second*0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - pod := obj.(*v1.Pod) - registerPod(pod, p) - }, - DeleteFunc: func(obj interface{}) { - pod := obj.(*v1.Pod) - unregisterPod(pod, p) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - podPod := oldObj.(*v1.Pod) - newPod := newObj.(*v1.Pod) - unregisterPod(podPod, p) - registerPod(newPod, p) - }, - }, - ) + type payload struct { + eventype string + pod *corev1.Pod + } + in := make(chan payload) + go func() { + var pod corev1.Pod + watcher, err := client.Watch(context.Background(), "", &pod) + if err != nil { + log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err) + } + defer watcher.Close() + + for { + cm := new(corev1.Pod) + eventType, err := watcher.Next(cm) + if err != nil { + log.Println() + } + in <- payload{eventType, cm} + } + }() + + go func() { + for { + select { + case <-p.done: + log.Printf("I! [inputs.prometheus] shutting dow\n") + return + case payload := <-in: + cm := payload.pod + eventType := payload.eventype + + if err != nil { + log.Printf("E! [inputs.prometheus] watcher encountered and error: %v", err) + break + } + switch eventType { + case k8s.EventAdded: + registerPod(cm, p) + case k8s.EventDeleted: + unregisterPod(cm, p) + case k8s.EventModified: + } + } + } + }() - go controller.Run(wait.NeverStop) return nil } -func registerPod(pod *v1.Pod, p *Prometheus) { - url := scrapeURL(pod) - if url != nil { - log.Printf("Will scrape metrics from %v\n", *url) +func registerPod(pod *corev1.Pod, p *Prometheus) { + targetURL := scrapeURL(pod) + if targetURL != nil { + log.Printf("I! [inputs.prometheus] will scrape metrics from %v\n", *targetURL) p.lock.Lock() // add annotation as metrics tags - tags := pod.GetAnnotations() - tags["pod_name"] = pod.Name - tags["namespace"] = pod.Namespace + tags := pod.GetMetadata().GetAnnotations() + tags["pod_name"] = pod.GetMetadata().GetName() + tags["namespace"] = pod.GetMetadata().GetNamespace() // add labels as metrics tags - for k, v := range pod.GetLabels() { + for k, v := range pod.GetMetadata().GetLabels() { tags[k] = v } - p.KubernetesPods = append(p.KubernetesPods, Target{url: *url, tags: tags}) + URL, err := url.Parse(*targetURL) + if err != nil { + log.Printf("E! [inputs.prometheus] could not parse URL %q: %v", targetURL, err) + return + } + podURL := p.AddressToURL(URL, URL.Hostname()) + p.kubernetesPods = append(p.kubernetesPods, URLAndAddress{URL: podURL, Address: URL.Hostname(), OriginalURL: URL, Tags: tags}) p.lock.Unlock() } } -func scrapeURL(pod *v1.Pod) *string { - scrape := pod.ObjectMeta.Annotations["prometheus.io/scrape"] - if pod.Status.PodIP == "" { +func scrapeURL(pod *corev1.Pod) *string { + scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] + if pod.Status.GetPodIP() == "" { // return as if scrape was disabled, we will be notified again once the pod // has an IP + log.Println("pod doesn't have an IP") return nil } if scrape == "true" { - path := pod.ObjectMeta.Annotations["prometheus.io/path"] - port := pod.ObjectMeta.Annotations["prometheus.io/port"] + path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"] + port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"] if port == "" { port = "9102" // default } @@ -89,28 +134,28 @@ func scrapeURL(pod *v1.Pod) *string { path = "/" + path } - ip := pod.Status.PodIP + ip := pod.Status.GetPodIP() x := fmt.Sprintf("http://%v:%v%v", ip, port, path) return &x } return nil } -func unregisterPod(pod *v1.Pod, p *Prometheus) { +func unregisterPod(pod *corev1.Pod, p *Prometheus) { url := scrapeURL(pod) if url != nil { p.lock.Lock() defer p.lock.Unlock() - log.Printf("Registred a delete request for %v in namespace '%v'\n", pod.Name, pod.Namespace) - var result []Target - for _, v := range p.KubernetesPods { - if v.url != *url { + log.Printf("D! [inputs.prometheus] registred a delete request for %v in namespace %v\n", pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace()) + var result []URLAndAddress + for _, v := range p.kubernetesPods { + if v.URL.String() != *url { result = append(result, v) } else { - log.Printf("Will stop scraping for %v\n", *url) + log.Printf("D! [inputs.prometheus] will stop scraping for %v\n", *url) } } - p.KubernetesPods = result + p.kubernetesPods = result } } diff --git a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go index 2e3caba38e06e..4e85c198d2534 100644 --- a/plugins/inputs/prometheus/kubernetes_test.go +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -5,45 +5,46 @@ import ( "testing" "github.com/stretchr/testify/assert" - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + v1 "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" ) func TestScrapeURLNoAnnotations(t *testing.T) { - p := &v1.Pod{} - p.Annotations = map[string]string{} + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} + p.GetMetadata().Annotations = map[string]string{} url := scrapeURL(p) assert.Nil(t, url) } func TestScrapeURLAnnotationsNoScrape(t *testing.T) { - p := &v1.Pod{} - p.Name = "myPod" - p.Annotations = map[string]string{"prometheus.io/scrape": "false"} + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} + p.Metadata.Name = str("myPod") + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"} url := scrapeURL(p) assert.Nil(t, url) } func TestScrapeURLAnnotations(t *testing.T) { p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} url := scrapeURL(p) assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) } func TestScrapeURLAnnotationsCustomPort(t *testing.T) { p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} url := scrapeURL(p) assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) } func TestScrapeURLAnnotationsCustomPath(t *testing.T) { p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} url := scrapeURL(p) assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) } func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} url := scrapeURL(p) assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) } @@ -51,33 +52,38 @@ func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { func TestAddPod(t *testing.T) { prom := &Prometheus{lock: &sync.Mutex{}} p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) - assert.Equal(t, 1, len(prom.KubernetesPods)) + assert.Equal(t, 1, len(prom.kubernetesPods)) } func TestAddMultiplePods(t *testing.T) { prom := &Prometheus{lock: &sync.Mutex{}} p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) - p.Name = "Pod2" + p.Metadata.Name = str("Pod2") registerPod(p, prom) - assert.Equal(t, 2, len(prom.KubernetesPods)) + assert.Equal(t, 2, len(prom.kubernetesPods)) } func TestDeletePods(t *testing.T) { prom := &Prometheus{lock: &sync.Mutex{}} p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) unregisterPod(p, prom) - assert.Equal(t, 0, len(prom.KubernetesPods)) + assert.Equal(t, 0, len(prom.kubernetesPods)) } func pod() *v1.Pod { - p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default"}} - p.Status.PodIP = "127.0.0.1" - p.Name = "myPod" + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}} + p.Status.PodIP = str("127.0.0.1") + p.Metadata.Name = str("myPod") + p.Metadata.Namespace = str("default") return p } + +func str(x string) *string { + return &x +} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 1a7bfe12b5c14..13b485e07d771 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -18,10 +18,6 @@ import ( const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3` -type Target struct { - url string - tags map[string]string -} type Prometheus struct { // An array of urls to scrape metrics from. URLs []string `toml:"urls"` @@ -29,11 +25,6 @@ type Prometheus struct { // An array of Kubernetes services to scrape metrics from. KubernetesServices []string - // Should we scrape Kubernetes services for prometheus annotations - KubernetesScraping bool `toml:"kubernetes_scraping"` - lock *sync.Mutex - KubernetesPods []Target - // Bearer Token authorization file path BearerToken string `toml:"bearer_token"` @@ -49,6 +40,12 @@ type Prometheus struct { InsecureSkipVerify bool client *http.Client + + // Should we scrape Kubernetes services for prometheus annotations + MonitorPods bool `toml:"monitor_kubernetes_pods"` + lock *sync.Mutex + kubernetesPods []URLAndAddress + done chan struct{} } var sampleConfig = ` @@ -62,7 +59,7 @@ var sampleConfig = ` # prometheus.io/scrape: Enable scraping for this pod # prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. # prometheus.io/port: If port is not 9102 use this annotation - # kubernetes_scraping = true + # monitor_kubernetes_pods = true ## Use bearer token for authorization # bearer_token = /path/to/bearer/token @@ -128,15 +125,7 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { p.lock.Lock() defer p.lock.Unlock() // loop through all pods scraped via the prometheus annotation on the pods - for _, pod := range p.KubernetesPods { - URL, err := url.Parse(pod.url) - if err != nil { - log.Printf("prometheus: Could not parse url %s, skipping it. Error: %s", pod.url, err) - continue - } - podURL := p.AddressToURL(URL, URL.Hostname()) - allURLs = append(allURLs, URLAndAddress{URL: podURL, Address: URL.Hostname(), OriginalURL: URL, Tags: pod.tags}) - } + allURLs = append(allURLs, p.kubernetesPods...) for _, service := range p.KubernetesServices { URL, err := url.Parse(service) @@ -270,16 +259,19 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error // Start will start the Kubernetes scraping if enabled in the configuration func (p *Prometheus) Start(a telegraf.Accumulator) error { - if p.KubernetesScraping { + if p.MonitorPods { return start(p) } return nil } -func (p *Prometheus) Stop() {} +func (p *Prometheus) Stop() { + close(p.done) +} func init() { inputs.Add("prometheus", func() telegraf.Input { - return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}, lock: &sync.Mutex{}} + return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}, + lock: &sync.Mutex{}} }) } From 3dff47fc7c852971794884b61501abdad6d99997 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Mon, 8 Oct 2018 16:35:34 -0600 Subject: [PATCH 03/13] Start addressing feedback and requested changes --- plugins/inputs/prometheus/kubernetes.go | 133 +++++++++++++----------- 1 file changed, 72 insertions(+), 61 deletions(-) diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index 102ec9e0dd576..38b57454d25fc 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -19,13 +19,13 @@ import ( func loadClient(kubeconfigPath string) (*k8s.Client, error) { data, err := ioutil.ReadFile(kubeconfigPath) if err != nil { - return nil, fmt.Errorf("read kubeconfig: %v", err) + return nil, fmt.Errorf("read kubeconfig: %s", err.Error()) } // Unmarshal YAML into a Kubernetes config object. var config k8s.Config if err := yaml.Unmarshal(data, &config); err != nil { - return nil, fmt.Errorf("unmarshal kubeconfig: %v", err) + return nil, fmt.Errorf("unmarshal kubeconfig: %s", err.Error()) } return k8s.NewClient(&config) } @@ -33,9 +33,10 @@ func loadClient(kubeconfigPath string) (*k8s.Client, error) { func start(p *Prometheus) error { client, err := k8s.NewInClusterClient() if err != nil { + // TODO: (glinton) kubeconfig file option in config client, err = loadClient(fmt.Sprintf("%v/.kube/config", os.Getenv("HOME"))) if err != nil { - log.Fatal(err) + return err } } type payload struct { @@ -43,11 +44,13 @@ func start(p *Prometheus) error { pod *corev1.Pod } in := make(chan payload) + // TODO: (glinton) make sure this isn't leaked go func() { var pod corev1.Pod + // TODO: (glinton) reconnect watcher, track new pods watcher, err := client.Watch(context.Background(), "", &pod) if err != nil { - log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err) + log.Printf("E! [inputs.prometheus] unable to watch resources: %s", err.Error()) } defer watcher.Close() @@ -55,7 +58,7 @@ func start(p *Prometheus) error { cm := new(corev1.Pod) eventType, err := watcher.Next(cm) if err != nil { - log.Println() + log.Printf("E! [inputs.prometheus] unable to watch next: %s", err.Error()) } in <- payload{eventType, cm} } @@ -65,16 +68,12 @@ func start(p *Prometheus) error { for { select { case <-p.done: - log.Printf("I! [inputs.prometheus] shutting dow\n") + log.Printf("I! [inputs.prometheus] shutting down\n") return case payload := <-in: cm := payload.pod eventType := payload.eventype - if err != nil { - log.Printf("E! [inputs.prometheus] watcher encountered and error: %v", err) - break - } switch eventType { case k8s.EventAdded: registerPod(cm, p) @@ -90,72 +89,84 @@ func start(p *Prometheus) error { } func registerPod(pod *corev1.Pod, p *Prometheus) { - targetURL := scrapeURL(pod) - if targetURL != nil { - log.Printf("I! [inputs.prometheus] will scrape metrics from %v\n", *targetURL) - p.lock.Lock() - // add annotation as metrics tags - tags := pod.GetMetadata().GetAnnotations() - tags["pod_name"] = pod.GetMetadata().GetName() - tags["namespace"] = pod.GetMetadata().GetNamespace() - // add labels as metrics tags - for k, v := range pod.GetMetadata().GetLabels() { - tags[k] = v - } - URL, err := url.Parse(*targetURL) - if err != nil { - log.Printf("E! [inputs.prometheus] could not parse URL %q: %v", targetURL, err) - return - } - podURL := p.AddressToURL(URL, URL.Hostname()) - p.kubernetesPods = append(p.kubernetesPods, URLAndAddress{URL: podURL, Address: URL.Hostname(), OriginalURL: URL, Tags: tags}) - p.lock.Unlock() + targetURL := getScrapeURL(pod) + if targetURL == nil { + return + } + + log.Printf("I! [inputs.prometheus] will scrape metrics from %v\n", *targetURL) + // add annotation as metrics tags + tags := pod.GetMetadata().GetAnnotations() + tags["pod_name"] = pod.GetMetadata().GetName() + tags["namespace"] = pod.GetMetadata().GetNamespace() + // add labels as metrics tags + for k, v := range pod.GetMetadata().GetLabels() { + tags[k] = v } + URL, err := url.Parse(*targetURL) + if err != nil { + log.Printf("E! [inputs.prometheus] could not parse URL %q: %v", targetURL, err) + return + } + podURL := p.AddressToURL(URL, URL.Hostname()) + p.lock.Lock() + p.kubernetesPods = append(p.kubernetesPods, + URLAndAddress{ + URL: podURL, + Address: URL.Hostname(), + OriginalURL: URL, + Tags: tags}) + p.lock.Unlock() } -func scrapeURL(pod *corev1.Pod) *string { +func getScrapeURL(pod *corev1.Pod) *string { scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] + if scrape != "true" { + return nil + } if pod.Status.GetPodIP() == "" { // return as if scrape was disabled, we will be notified again once the pod // has an IP - log.Println("pod doesn't have an IP") return nil } - if scrape == "true" { - path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"] - port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"] - if port == "" { - port = "9102" // default - } - if path == "" { - path = "/metrics" - } - if !strings.HasPrefix(path, "/") { - path = "/" + path - } - ip := pod.Status.GetPodIP() - x := fmt.Sprintf("http://%v:%v%v", ip, port, path) - return &x + path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"] + port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"] + if port == "" { + port = "9102" // default } - return nil + if path == "" { + path = "/metrics" + } + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + + ip := pod.Status.GetPodIP() + // TODO: (glinton) use url and specify scheme + x := fmt.Sprintf("http://%s:%s%s", ip, port, path) + + return &x } func unregisterPod(pod *corev1.Pod, p *Prometheus) { - url := scrapeURL(pod) - if url != nil { - p.lock.Lock() - defer p.lock.Unlock() - log.Printf("D! [inputs.prometheus] registred a delete request for %v in namespace %v\n", pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace()) - var result []URLAndAddress - for _, v := range p.kubernetesPods { - if v.URL.String() != *url { - result = append(result, v) - } else { - log.Printf("D! [inputs.prometheus] will stop scraping for %v\n", *url) - } + url := getScrapeURL(pod) + if url == nil { + return + } + p.lock.Lock() + defer p.lock.Unlock() + log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s\n", + pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace()) + var result []URLAndAddress + for _, v := range p.kubernetesPods { + if v.URL.String() != *url { + result = append(result, v) + } else { + log.Printf("D! [inputs.prometheus] will stop scraping for %v\n", *url) } - p.kubernetesPods = result + } + p.kubernetesPods = result } From 43eaf995626eb13af2fa59e890534c123fb702a8 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 24 Oct 2018 10:02:07 -0600 Subject: [PATCH 04/13] Continue implementing suggested changes --- Gopkg.lock | 30 ++++++++++++++++++++++ plugins/inputs/prometheus/kubernetes.go | 33 ++++++++++++++++--------- plugins/inputs/prometheus/prometheus.go | 3 +++ 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 80a6277bd4211..232a032f641aa 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -355,6 +355,32 @@ revision = "36d01c2b4cbeb3d2a12063e4880ce30800af9560" version = "v1.1.1" +[[projects]] + digest = "1:99a0607f79d36202b64b674c0464781549917cfc4bfb88037aaa98b31e124a18" + name = "github.com/ericchiang/k8s" + packages = [ + ".", + "apis/apiextensions/v1beta1", + "apis/core/v1", + "apis/meta/v1", + "apis/resource", + "runtime", + "runtime/schema", + "util/intstr", + "watch/versioned", + ] + pruneopts = "" + revision = "d1bbc0cffaf9849ddcae7b9efffae33e2dd52e9a" + version = "v1.2.0" + +[[projects]] + digest = "1:b13707423743d41665fd23f0c36b2f37bb49c30e94adb813319c44188a51ba22" + name = "github.com/ghodss/yaml" + packages = ["."] + pruneopts = "" + revision = "0ca9ea5df5451ffdf184b4428c902747c2c11cd7" + version = "v1.0.0" + [[projects]] digest = "1:858b7fe7b0f4bc7ef9953926828f2816ea52d01a88d72d1c45bc8c108f23c356" name = "github.com/go-ini/ini" @@ -1331,6 +1357,10 @@ "github.com/docker/docker/api/types/swarm", "github.com/docker/docker/client", "github.com/eclipse/paho.mqtt.golang", + "github.com/ericchiang/k8s", + "github.com/ericchiang/k8s/apis/core/v1", + "github.com/ericchiang/k8s/apis/meta/v1", + "github.com/ghodss/yaml", "github.com/go-logfmt/logfmt", "github.com/go-redis/redis", "github.com/go-sql-driver/mysql", diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index 38b57454d25fc..1f68e24d5894c 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -33,8 +33,11 @@ func loadClient(kubeconfigPath string) (*k8s.Client, error) { func start(p *Prometheus) error { client, err := k8s.NewInClusterClient() if err != nil { - // TODO: (glinton) kubeconfig file option in config - client, err = loadClient(fmt.Sprintf("%v/.kube/config", os.Getenv("HOME"))) + configLocation := fmt.Sprintf("%s/.kube/config", os.Getenv("HOME")) + if p.KubeConfig != "" { + configLocation = p.KubeConfig + } + client, err = loadClient(configLocation) if err != nil { return err } @@ -43,11 +46,11 @@ func start(p *Prometheus) error { eventype string pod *corev1.Pod } + in := make(chan payload) - // TODO: (glinton) make sure this isn't leaked go func() { var pod corev1.Pod - // TODO: (glinton) reconnect watcher, track new pods + rewatch: watcher, err := client.Watch(context.Background(), "", &pod) if err != nil { log.Printf("E! [inputs.prometheus] unable to watch resources: %s", err.Error()) @@ -55,12 +58,19 @@ func start(p *Prometheus) error { defer watcher.Close() for { - cm := new(corev1.Pod) - eventType, err := watcher.Next(cm) - if err != nil { - log.Printf("E! [inputs.prometheus] unable to watch next: %s", err.Error()) + select { + case <-p.done: + log.Printf("I! [inputs.prometheus] shutting down\n") + return + default: + cm := new(corev1.Pod) + eventType, err := watcher.Next(cm) + if err != nil { + log.Printf("D! [inputs.prometheus] unable to watch next: %s", err.Error()) + goto rewatch + } + in <- payload{eventType, cm} } - in <- payload{eventType, cm} } }() @@ -124,7 +134,8 @@ func getScrapeURL(pod *corev1.Pod) *string { if scrape != "true" { return nil } - if pod.Status.GetPodIP() == "" { + ip := pod.Status.GetPodIP() + if ip == "" { // return as if scrape was disabled, we will be notified again once the pod // has an IP return nil @@ -142,8 +153,6 @@ func getScrapeURL(pod *corev1.Pod) *string { path = "/" + path } - ip := pod.Status.GetPodIP() - // TODO: (glinton) use url and specify scheme x := fmt.Sprintf("http://%s:%s%s", ip, port, path) return &x diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 7a8b27a0468e1..a289da1987ad4 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -26,6 +26,9 @@ type Prometheus struct { // An array of Kubernetes services to scrape metrics from. KubernetesServices []string + // Location of kubernetes config file + KubeConfig string + // Bearer Token authorization file path BearerToken string `toml:"bearer_token"` From b1f933c332e4f21c21cbb0e206f46564a409041f Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 24 Oct 2018 16:33:50 -0600 Subject: [PATCH 05/13] Update tests --- plugins/inputs/prometheus/kubernetes.go | 2 +- plugins/inputs/prometheus/kubernetes_test.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index 1f68e24d5894c..1fb86ab484355 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -115,7 +115,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { } URL, err := url.Parse(*targetURL) if err != nil { - log.Printf("E! [inputs.prometheus] could not parse URL %q: %v", targetURL, err) + log.Printf("E! [inputs.prometheus] could not parse URL %q: %v", *targetURL, err) return } podURL := p.AddressToURL(URL, URL.Hostname()) diff --git a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go index 4e85c198d2534..6a7b431205dc5 100644 --- a/plugins/inputs/prometheus/kubernetes_test.go +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -13,39 +13,39 @@ import ( func TestScrapeURLNoAnnotations(t *testing.T) { p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} p.GetMetadata().Annotations = map[string]string{} - url := scrapeURL(p) + url := getScrapeURL(p) assert.Nil(t, url) } func TestScrapeURLAnnotationsNoScrape(t *testing.T) { p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} p.Metadata.Name = str("myPod") p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"} - url := scrapeURL(p) + url := getScrapeURL(p) assert.Nil(t, url) } func TestScrapeURLAnnotations(t *testing.T) { p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} - url := scrapeURL(p) + url := getScrapeURL(p) assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) } func TestScrapeURLAnnotationsCustomPort(t *testing.T) { p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} - url := scrapeURL(p) + url := getScrapeURL(p) assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) } func TestScrapeURLAnnotationsCustomPath(t *testing.T) { p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} - url := scrapeURL(p) + url := getScrapeURL(p) assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) } func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} - url := scrapeURL(p) + url := getScrapeURL(p) assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) } From 09bf31a257edba7580b4162f0dc86c1cc4f66f19 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Wed, 31 Oct 2018 17:19:38 -0600 Subject: [PATCH 06/13] Address some feedback --- plugins/inputs/prometheus/README.md | 14 ++++--- plugins/inputs/prometheus/kubernetes.go | 54 ++++++++++++------------- plugins/inputs/prometheus/prometheus.go | 30 ++++++++------ 3 files changed, 53 insertions(+), 45 deletions(-) diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index cd0b0e1705623..6d64886aaa8bb 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -14,10 +14,13 @@ in Prometheus format. ## An array of Kubernetes services to scrape metrics from. # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] - # Scrape Kubernetes service for prometheus annotations. - # prometheus.io/scrape: Enable scraping for this service - # prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. - # prometheus.io/port: If port is not 9102 use this annotation + ## Kubernetes config file to create client from. + # kube_config = "/path/to/kubernetes.config" + + ## Scrape Kubernetes pods for the following prometheus annotations: + ## - prometheus.io/scrape: Enable scraping for this pod + ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. + ## - prometheus.io/port: If port is not 9102 use this annotation # monitor_kubernetes_pods = true ## Use bearer token for authorization @@ -46,7 +49,8 @@ This method can be used to locate all #### Kubernetes scraping Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes -pods. +pods. Currently, you can run this plugin in your kubernetes cluster, or we use the kubeconfig +file to determine where to monitor. Currently the following annotation are supported: * `prometheus.io/scrape` Enable scraping for this pod diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index 1fb86ab484355..6b75913443392 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -8,6 +8,7 @@ import ( "net/url" "os" "strings" + "sync" "github.com/ericchiang/k8s" corev1 "github.com/ericchiang/k8s/apis/core/v1" @@ -19,13 +20,13 @@ import ( func loadClient(kubeconfigPath string) (*k8s.Client, error) { data, err := ioutil.ReadFile(kubeconfigPath) if err != nil { - return nil, fmt.Errorf("read kubeconfig: %s", err.Error()) + return nil, fmt.Errorf("failed reading '%s': %s", kubeconfigPath err.Error()) } // Unmarshal YAML into a Kubernetes config object. var config k8s.Config if err := yaml.Unmarshal(data, &config); err != nil { - return nil, fmt.Errorf("unmarshal kubeconfig: %s", err.Error()) + return nil, err } return k8s.NewClient(&config) } @@ -33,7 +34,7 @@ func loadClient(kubeconfigPath string) (*k8s.Client, error) { func start(p *Prometheus) error { client, err := k8s.NewInClusterClient() if err != nil { - configLocation := fmt.Sprintf("%s/.kube/config", os.Getenv("HOME")) + configLocation := filepath.Join(user.Current().HomeDir, ".kube/config") if p.KubeConfig != "" { configLocation = p.KubeConfig } @@ -47,38 +48,27 @@ func start(p *Prometheus) error { pod *corev1.Pod } + p.wg = sync.WaitGroup in := make(chan payload) + + p.ctx, p.cancel = context.WithCancel(context.Background()) + + wg.Add(1) go func() { + defer wg.Done() var pod corev1.Pod rewatch: - watcher, err := client.Watch(context.Background(), "", &pod) + watcher, err := client.Watch(p.ctx, "", &pod) if err != nil { log.Printf("E! [inputs.prometheus] unable to watch resources: %s", err.Error()) + return } defer watcher.Close() for { select { - case <-p.done: - log.Printf("I! [inputs.prometheus] shutting down\n") - return - default: - cm := new(corev1.Pod) - eventType, err := watcher.Next(cm) - if err != nil { - log.Printf("D! [inputs.prometheus] unable to watch next: %s", err.Error()) - goto rewatch - } - in <- payload{eventType, cm} - } - } - }() - - go func() { - for { - select { - case <-p.done: - log.Printf("I! [inputs.prometheus] shutting down\n") + case <-ctx.Done(): + log.Printf("I! [inputs.prometheus] shutting down") return case payload := <-in: cm := payload.pod @@ -91,6 +81,14 @@ func start(p *Prometheus) error { unregisterPod(cm, p) case k8s.EventModified: } + default: + cm := new(corev1.Pod) + eventType, err := watcher.Next(cm) + if err != nil { + log.Printf("D! [inputs.prometheus] unable to watch next: %s", err.Error()) + goto rewatch + } + in <- payload{eventType, cm} } } }() @@ -104,7 +102,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { return } - log.Printf("I! [inputs.prometheus] will scrape metrics from %v\n", *targetURL) + log.Printf("I! [inputs.prometheus] will scrape metrics from %s", *targetURL) // add annotation as metrics tags tags := pod.GetMetadata().GetAnnotations() tags["pod_name"] = pod.GetMetadata().GetName() @@ -115,7 +113,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { } URL, err := url.Parse(*targetURL) if err != nil { - log.Printf("E! [inputs.prometheus] could not parse URL %q: %v", *targetURL, err) + log.Printf("E! [inputs.prometheus] could not parse URL %s: %s", *targetURL, err.Error()) return } podURL := p.AddressToURL(URL, URL.Hostname()) @@ -166,14 +164,14 @@ func unregisterPod(pod *corev1.Pod, p *Prometheus) { p.lock.Lock() defer p.lock.Unlock() - log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s\n", + log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s", pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace()) var result []URLAndAddress for _, v := range p.kubernetesPods { if v.URL.String() != *url { result = append(result, v) } else { - log.Printf("D! [inputs.prometheus] will stop scraping for %v\n", *url) + log.Printf("D! [inputs.prometheus] will stop scraping for %s", *url) } } diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index a289da1987ad4..f4585a1e5ba84 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -1,6 +1,7 @@ package prometheus import ( + "context" "errors" "fmt" "io/ioutil" @@ -40,9 +41,11 @@ type Prometheus struct { // Should we scrape Kubernetes services for prometheus annotations MonitorPods bool `toml:"monitor_kubernetes_pods"` - lock *sync.Mutex + lock sync.Mutex kubernetesPods []URLAndAddress - done chan struct{} + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup } var sampleConfig = ` @@ -51,11 +54,14 @@ var sampleConfig = ` ## An array of Kubernetes services to scrape metrics from. # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] - - # Scrape Kubernetes pods for prometheus annotations. - # prometheus.io/scrape: Enable scraping for this pod - # prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. - # prometheus.io/port: If port is not 9102 use this annotation + + ## Kubernetes config file to create client from. + # kube_config = "/path/to/kubernetes.config" + + ## Scrape Kubernetes pods for the following prometheus annotations: + ## - prometheus.io/scrape: Enable scraping for this pod + ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. + ## - prometheus.io/port: If port is not 9102 use this annotation # monitor_kubernetes_pods = true ## Use bearer token for authorization @@ -113,7 +119,7 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { for _, u := range p.URLs { URL, err := url.Parse(u) if err != nil { - log.Printf("prometheus: Could not parse %s, skipping it. Error: %s", u, err) + log.Printf("prometheus: Could not parse %s, skipping it. Error: %s", u, err.Error()) continue } @@ -132,7 +138,7 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { resolvedAddresses, err := net.LookupHost(URL.Hostname()) if err != nil { - log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", URL.Host, err) + log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", URL.Host, err.Error()) continue } for _, resolved := range resolvedAddresses { @@ -262,12 +268,12 @@ func (p *Prometheus) Start(a telegraf.Accumulator) error { } func (p *Prometheus) Stop() { - close(p.done) + p.cancel() + wg.Wait() } func init() { inputs.Add("prometheus", func() telegraf.Input { - return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}, - lock: &sync.Mutex{}} + return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}} }) } From 7c666fdcb8fb410553cacf1ffead506f6f167184 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Thu, 1 Nov 2018 17:25:13 -0600 Subject: [PATCH 07/13] Allow configurable scheme --- plugins/inputs/prometheus/README.md | 2 + plugins/inputs/prometheus/kubernetes.go | 72 ++++++++++++++++--------- plugins/inputs/prometheus/prometheus.go | 4 +- 3 files changed, 52 insertions(+), 26 deletions(-) diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index 6d64886aaa8bb..5c4b858860dd0 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -19,6 +19,8 @@ in Prometheus format. ## Scrape Kubernetes pods for the following prometheus annotations: ## - prometheus.io/scrape: Enable scraping for this pod + ## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to + ## set this to `https` & most likely set the tls config. ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. ## - prometheus.io/port: If port is not 9102 use this annotation # monitor_kubernetes_pods = true diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index 6b75913443392..4fcb0b0990150 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -5,10 +5,12 @@ import ( "fmt" "io/ioutil" "log" + "net" "net/url" - "os" - "strings" + "os/user" + "path/filepath" "sync" + "time" "github.com/ericchiang/k8s" corev1 "github.com/ericchiang/k8s/apis/core/v1" @@ -20,7 +22,7 @@ import ( func loadClient(kubeconfigPath string) (*k8s.Client, error) { data, err := ioutil.ReadFile(kubeconfigPath) if err != nil { - return nil, fmt.Errorf("failed reading '%s': %s", kubeconfigPath err.Error()) + return nil, fmt.Errorf("failed reading '%s': %s", kubeconfigPath, err.Error()) } // Unmarshal YAML into a Kubernetes config object. @@ -34,7 +36,11 @@ func loadClient(kubeconfigPath string) (*k8s.Client, error) { func start(p *Prometheus) error { client, err := k8s.NewInClusterClient() if err != nil { - configLocation := filepath.Join(user.Current().HomeDir, ".kube/config") + u, err := user.Current() + if err != nil { + return fmt.Errorf("Failed to get current user - %s", err.Error()) + } + configLocation := filepath.Join(u.HomeDir, ".kube/config") if p.KubeConfig != "" { configLocation = p.KubeConfig } @@ -43,22 +49,22 @@ func start(p *Prometheus) error { return err } } + type payload struct { eventype string pod *corev1.Pod } - p.wg = sync.WaitGroup - in := make(chan payload) - p.ctx, p.cancel = context.WithCancel(context.Background()) + p.wg = sync.WaitGroup{} + in := make(chan payload) - wg.Add(1) + p.wg.Add(1) go func() { - defer wg.Done() - var pod corev1.Pod + defer p.wg.Done() + pod := &corev1.Pod{} rewatch: - watcher, err := client.Watch(p.ctx, "", &pod) + watcher, err := client.Watch(p.ctx, "", &corev1.Pod{}) if err != nil { log.Printf("E! [inputs.prometheus] unable to watch resources: %s", err.Error()) return @@ -67,28 +73,36 @@ func start(p *Prometheus) error { for { select { - case <-ctx.Done(): + case <-p.ctx.Done(): log.Printf("I! [inputs.prometheus] shutting down") return - case payload := <-in: - cm := payload.pod - eventType := payload.eventype + case rcvdPayload := <-in: + pod = rcvdPayload.pod + eventType := rcvdPayload.eventype switch eventType { case k8s.EventAdded: - registerPod(cm, p) + registerPod(pod, p) case k8s.EventDeleted: - unregisterPod(cm, p) + unregisterPod(pod, p) case k8s.EventModified: } default: - cm := new(corev1.Pod) - eventType, err := watcher.Next(cm) + pod = &corev1.Pod{} + // An error here means we need to reconnect the watcher. + eventType, err := watcher.Next(pod) if err != nil { log.Printf("D! [inputs.prometheus] unable to watch next: %s", err.Error()) - goto rewatch + watcher.Close() + select { + case <-p.ctx.Done(): + log.Printf("I! [inputs.prometheus] shutting down") + return + case <-time.After(time.Second): + goto rewatch + } } - in <- payload{eventType, cm} + in <- payload{eventType, pod} } } }() @@ -139,19 +153,27 @@ func getScrapeURL(pod *corev1.Pod) *string { return nil } + scheme := pod.GetMetadata().GetAnnotations()["prometheus.io/scheme"] path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"] port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"] + + if scheme == "" { + scheme = "http" + } if port == "" { - port = "9102" // default + port = "9102" } if path == "" { path = "/metrics" } - if !strings.HasPrefix(path, "/") { - path = "/" + path + + u := &url.URL{ + Scheme: scheme, + Host: net.JoinHostPort(ip, port), + Path: path, } - x := fmt.Sprintf("http://%s:%s%s", ip, port, path) + x := u.String() return &x } diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index f4585a1e5ba84..dc113f18d5b1b 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -60,6 +60,8 @@ var sampleConfig = ` ## Scrape Kubernetes pods for the following prometheus annotations: ## - prometheus.io/scrape: Enable scraping for this pod + ## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to + ## set this to 'https' & most likely set the tls config. ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. ## - prometheus.io/port: If port is not 9102 use this annotation # monitor_kubernetes_pods = true @@ -269,7 +271,7 @@ func (p *Prometheus) Start(a telegraf.Accumulator) error { func (p *Prometheus) Stop() { p.cancel() - wg.Wait() + p.wg.Wait() } func init() { From 1bb20b2837e7992b8656dead149d703a723c030f Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Fri, 2 Nov 2018 10:29:26 -0600 Subject: [PATCH 08/13] Remove goto --- plugins/inputs/prometheus/kubernetes.go | 98 +++++++++++++------------ 1 file changed, 53 insertions(+), 45 deletions(-) diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index 4fcb0b0990150..6933ff8ec29ba 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -2,6 +2,7 @@ package prometheus import ( "context" + "errors" "fmt" "io/ioutil" "log" @@ -17,6 +18,11 @@ import ( "github.com/ghodss/yaml" ) +type payload struct { + eventype string + pod *corev1.Pod +} + // loadClient parses a kubeconfig from a file and returns a Kubernetes // client. It does not support extensions or client auth providers. func loadClient(kubeconfigPath string) (*k8s.Client, error) { @@ -50,11 +56,6 @@ func start(p *Prometheus) error { } } - type payload struct { - eventype string - pod *corev1.Pod - } - p.ctx, p.cancel = context.WithCancel(context.Background()) p.wg = sync.WaitGroup{} in := make(chan payload) @@ -62,47 +63,10 @@ func start(p *Prometheus) error { p.wg.Add(1) go func() { defer p.wg.Done() - pod := &corev1.Pod{} - rewatch: - watcher, err := client.Watch(p.ctx, "", &corev1.Pod{}) - if err != nil { - log.Printf("E! [inputs.prometheus] unable to watch resources: %s", err.Error()) - return - } - defer watcher.Close() - for { - select { - case <-p.ctx.Done(): - log.Printf("I! [inputs.prometheus] shutting down") - return - case rcvdPayload := <-in: - pod = rcvdPayload.pod - eventType := rcvdPayload.eventype - - switch eventType { - case k8s.EventAdded: - registerPod(pod, p) - case k8s.EventDeleted: - unregisterPod(pod, p) - case k8s.EventModified: - } - default: - pod = &corev1.Pod{} - // An error here means we need to reconnect the watcher. - eventType, err := watcher.Next(pod) - if err != nil { - log.Printf("D! [inputs.prometheus] unable to watch next: %s", err.Error()) - watcher.Close() - select { - case <-p.ctx.Done(): - log.Printf("I! [inputs.prometheus] shutting down") - return - case <-time.After(time.Second): - goto rewatch - } - } - in <- payload{eventType, pod} + err := watch(p, client, in) + if err == nil { + break } } }() @@ -110,6 +74,50 @@ func start(p *Prometheus) error { return nil } +func watch(p *Prometheus, client *k8s.Client, in chan payload) error { + pod := &corev1.Pod{} + watcher, err := client.Watch(p.ctx, "", &corev1.Pod{}) + if err != nil { + log.Printf("E! [inputs.prometheus] unable to watch resources: %s", err.Error()) + return err + } + defer watcher.Close() + + for { + select { + case <-p.ctx.Done(): + log.Printf("I! [inputs.prometheus] shutting down") + return nil + case rcvdPayload := <-in: + pod = rcvdPayload.pod + eventType := rcvdPayload.eventype + + switch eventType { + case k8s.EventAdded: + registerPod(pod, p) + case k8s.EventDeleted: + unregisterPod(pod, p) + case k8s.EventModified: + } + default: + pod = &corev1.Pod{} + // An error here means we need to reconnect the watcher. + eventType, err := watcher.Next(pod) + if err != nil { + log.Printf("D! [inputs.prometheus] unable to watch next: %s", err.Error()) + select { + case <-p.ctx.Done(): + log.Printf("I! [inputs.prometheus] shutting down") + return nil + case <-time.After(time.Second): + return errors.New("Watcher closed") + } + } + in <- payload{eventType, pod} + } + } +} + func registerPod(pod *corev1.Pod, p *Prometheus) { targetURL := getScrapeURL(pod) if targetURL == nil { From c4203e264233cd7e081b78a7300e8adbd888bbda Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Fri, 2 Nov 2018 10:35:35 -0600 Subject: [PATCH 09/13] Improve backoff --- plugins/inputs/prometheus/kubernetes.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index 6933ff8ec29ba..ff621cecd862f 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -63,10 +63,18 @@ func start(p *Prometheus) error { p.wg.Add(1) go func() { defer p.wg.Done() + t := time.NewTimer(time.Second) for { - err := watch(p, client, in) - if err == nil { - break + select { + case <-p.ctx.Done(): + t.Stop() + log.Printf("I! [inputs.prometheus] shutting down") + return + case <-t.C: + err := watch(p, client, in) + if err == nil { + break + } } } }() @@ -105,13 +113,7 @@ func watch(p *Prometheus, client *k8s.Client, in chan payload) error { eventType, err := watcher.Next(pod) if err != nil { log.Printf("D! [inputs.prometheus] unable to watch next: %s", err.Error()) - select { - case <-p.ctx.Done(): - log.Printf("I! [inputs.prometheus] shutting down") - return nil - case <-time.After(time.Second): - return errors.New("Watcher closed") - } + return errors.New("Watcher closed") } in <- payload{eventType, pod} } From 79c60a3ba66ec4f863250c7446160e8faea2674e Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Fri, 2 Nov 2018 10:40:52 -0600 Subject: [PATCH 10/13] Fix tests --- plugins/inputs/prometheus/kubernetes_test.go | 6 +++--- plugins/inputs/prometheus/prometheus_test.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go index 6a7b431205dc5..1c803e8da4020 100644 --- a/plugins/inputs/prometheus/kubernetes_test.go +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -50,14 +50,14 @@ func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { } func TestAddPod(t *testing.T) { - prom := &Prometheus{lock: &sync.Mutex{}} + prom := &Prometheus{lock: sync.Mutex{}} p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) assert.Equal(t, 1, len(prom.kubernetesPods)) } func TestAddMultiplePods(t *testing.T) { - prom := &Prometheus{lock: &sync.Mutex{}} + prom := &Prometheus{lock: sync.Mutex{}} p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} @@ -67,7 +67,7 @@ func TestAddMultiplePods(t *testing.T) { assert.Equal(t, 2, len(prom.kubernetesPods)) } func TestDeletePods(t *testing.T) { - prom := &Prometheus{lock: &sync.Mutex{}} + prom := &Prometheus{lock: sync.Mutex{}} p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} diff --git a/plugins/inputs/prometheus/prometheus_test.go b/plugins/inputs/prometheus/prometheus_test.go index b16e020451684..835faeab9a390 100644 --- a/plugins/inputs/prometheus/prometheus_test.go +++ b/plugins/inputs/prometheus/prometheus_test.go @@ -38,7 +38,7 @@ func TestPrometheusGeneratesMetrics(t *testing.T) { defer ts.Close() p := &Prometheus{ - lock: &sync.Mutex{}, + lock: sync.Mutex{}, URLs: []string{ts.URL}, } @@ -62,7 +62,7 @@ func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) { defer ts.Close() p := &Prometheus{ - lock: &sync.Mutex{}, + lock: sync.Mutex{}, KubernetesServices: []string{ts.URL}, } u, _ := url.Parse(ts.URL) @@ -92,7 +92,7 @@ func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) { defer ts.Close() p := &Prometheus{ - lock: &sync.Mutex{}, + lock: sync.Mutex{}, URLs: []string{ts.URL}, KubernetesServices: []string{"http://random.telegraf.local:88/metrics"}, } From 372e47daea4d1bff15c1063d300d14c917a075ec Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Fri, 2 Nov 2018 15:54:34 -0600 Subject: [PATCH 11/13] Address feedback --- Gopkg.lock | 57 +++++++------------- plugins/inputs/prometheus/README.md | 7 +-- plugins/inputs/prometheus/kubernetes.go | 42 ++++++--------- plugins/inputs/prometheus/kubernetes_test.go | 7 ++- plugins/inputs/prometheus/prometheus_test.go | 4 -- 5 files changed, 44 insertions(+), 73 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 5b30978cd2095..be6d901bdf47d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -4,7 +4,12 @@ [[projects]] digest = "1:972f38a9c879a4920d1e3a3d3438104b6c06163bfa3e6f4064adb00468d40587" name = "cloud.google.com/go" - packages = ["civil"] + packages = [ + "civil", + "compute/metadata", + "internal/version", + "monitoring/apiv3", + ] pruneopts = "" revision = "c728a003b238b26cef9ab6753a5dc424b331c3ad" version = "v0.27.0" @@ -381,14 +386,6 @@ revision = "d1bbc0cffaf9849ddcae7b9efffae33e2dd52e9a" version = "v1.2.0" -[[projects]] - digest = "1:b13707423743d41665fd23f0c36b2f37bb49c30e94adb813319c44188a51ba22" - name = "github.com/ghodss/yaml" - packages = ["."] - pruneopts = "" - revision = "0ca9ea5df5451ffdf184b4428c902747c2c11cd7" - version = "v1.0.0" - [[projects]] digest = "1:858b7fe7b0f4bc7ef9953926828f2816ea52d01a88d72d1c45bc8c108f23c356" name = "github.com/go-ini/ini" @@ -505,14 +502,6 @@ revision = "3af367b6b30c263d47e8895973edcca9a49cf029" version = "v0.2.0" -[[projects]] - digest = "1:e097a364f4e8d8d91b9b9eeafb992d3796a41fde3eb548c1a87eb9d9f60725cf" - name = "github.com/googleapis/gax-go" - packages = ["."] - pruneopts = "" - revision = "317e0006254c44a0ac427cc52a0e083ff0b9622f" - version = "v2.0.0" - [[projects]] digest = "1:c1d7e883c50a26ea34019320d8ae40fad86c9e5d56e63a1ba2cb618cef43e986" name = "github.com/google/uuid" @@ -521,6 +510,14 @@ revision = "064e2069ce9c359c118179501254f67d7d37ba24" version = "0.2" +[[projects]] + digest = "1:e097a364f4e8d8d91b9b9eeafb992d3796a41fde3eb548c1a87eb9d9f60725cf" + name = "github.com/googleapis/gax-go" + packages = ["."] + pruneopts = "" + revision = "317e0006254c44a0ac427cc52a0e083ff0b9622f" + version = "v2.0.0" + [[projects]] digest = "1:dbbeb8ddb0be949954c8157ee8439c2adfd8dc1c9510eb44a6e58cb68c3dce28" name = "github.com/gorilla/context" @@ -1081,14 +1078,6 @@ pruneopts = "" revision = "46796da1b0b4794e1e341883a399f12cc7574b55" -[[projects]] - branch = "master" - digest = "1:2fcfc6c3fb8dfe0d80d7789272230d3ac7db15022b66817113f98d9fff880225" - name = "github.com/zensqlmonitor/go-mssqldb" - packages = ["."] - pruneopts = "" - revision = "e8fbf836e44e86764eba398361d1825651709547" - [[projects]] digest = "1:8c8ec859c77fccd10a347b7219b597c4c21c448949e8bdf3fc3e6f4c78f952b4" name = "go.opencensus.io" @@ -1160,16 +1149,15 @@ [[projects]] branch = "master" - digest = "1:b697592485cb412be4188c08ca0beed9aab87f36b86418e21acc4a3998f63734" + digest = "0:" name = "golang.org/x/oauth2" packages = [ ".", + "clientcredentials", "google", "internal", "jws", "jwt", - "clientcredentials", - "internal", ] pruneopts = "" revision = "d2e6202438beef2727060aa7cabdd924d92ebfd9" @@ -1242,7 +1230,7 @@ revision = "19ff8768a5c0b8e46ea281065664787eefc24121" [[projects]] - digest = "1:c1771ca6060335f9768dff6558108bc5ef6c58506821ad43377ee23ff059e472" + digest = "0:" name = "google.golang.org/appengine" packages = [ ".", @@ -1257,13 +1245,6 @@ "internal/socket", "internal/urlfetch", "socket", - "cloudsql", - "internal", - "internal/base", - "internal/datastore", - "internal/log", - "internal/remote_api", - "internal/urlfetch", "urlfetch", ] pruneopts = "" @@ -1431,6 +1412,7 @@ "github.com/Shopify/sarama", "github.com/StackExchange/wmi", "github.com/aerospike/aerospike-client-go", + "github.com/alecthomas/units", "github.com/amir/raidman", "github.com/apache/thrift/lib/go/thrift", "github.com/aws/aws-sdk-go/aws", @@ -1454,12 +1436,12 @@ "github.com/ericchiang/k8s", "github.com/ericchiang/k8s/apis/core/v1", "github.com/ericchiang/k8s/apis/meta/v1", - "github.com/ghodss/yaml", "github.com/go-logfmt/logfmt", "github.com/go-redis/redis", "github.com/go-sql-driver/mysql", "github.com/gobwas/glob", "github.com/golang/protobuf/proto", + "github.com/golang/protobuf/ptypes/empty", "github.com/golang/protobuf/ptypes/timestamp", "github.com/google/go-cmp/cmp", "github.com/gorilla/mux", @@ -1531,6 +1513,7 @@ "google.golang.org/grpc", "google.golang.org/grpc/codes", "google.golang.org/grpc/credentials", + "google.golang.org/grpc/metadata", "google.golang.org/grpc/status", "gopkg.in/gorethink/gorethink.v3", "gopkg.in/ldap.v2", diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index 5c4b858860dd0..d963e4d418fb8 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -55,9 +55,10 @@ pods. Currently, you can run this plugin in your kubernetes cluster, or we use t file to determine where to monitor. Currently the following annotation are supported: -* `prometheus.io/scrape` Enable scraping for this pod -* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default metrics). -* `prometheus.io/port` Used to override the port, the default value is 9102 +* `prometheus.io/scrape` Enable scraping for this pod. +* `prometheus.io/scheme` If the metrics endpoint is secured then you will need to set this to `https` & most likely set the tls config. (default 'http') +* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default '/metrics') +* `prometheus.io/port` Used to override the port. (default 9102) #### Bearer Token diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index ff621cecd862f..d8416d9bc91bf 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -2,7 +2,6 @@ package prometheus import ( "context" - "errors" "fmt" "io/ioutil" "log" @@ -15,7 +14,7 @@ import ( "github.com/ericchiang/k8s" corev1 "github.com/ericchiang/k8s/apis/core/v1" - "github.com/ghodss/yaml" + "gopkg.in/yaml.v2" ) type payload struct { @@ -28,7 +27,7 @@ type payload struct { func loadClient(kubeconfigPath string) (*k8s.Client, error) { data, err := ioutil.ReadFile(kubeconfigPath) if err != nil { - return nil, fmt.Errorf("failed reading '%s': %s", kubeconfigPath, err.Error()) + return nil, fmt.Errorf("failed reading '%s': %v", kubeconfigPath, err) } // Unmarshal YAML into a Kubernetes config object. @@ -44,7 +43,7 @@ func start(p *Prometheus) error { if err != nil { u, err := user.Current() if err != nil { - return fmt.Errorf("Failed to get current user - %s", err.Error()) + return fmt.Errorf("Failed to get current user - %v", err) } configLocation := filepath.Join(u.HomeDir, ".kube/config") if p.KubeConfig != "" { @@ -63,20 +62,18 @@ func start(p *Prometheus) error { p.wg.Add(1) go func() { defer p.wg.Done() - t := time.NewTimer(time.Second) for { select { case <-p.ctx.Done(): - t.Stop() - log.Printf("I! [inputs.prometheus] shutting down") - return - case <-t.C: + break + case <-time.After(time.Second): err := watch(p, client, in) if err == nil { break } } } + log.Printf("D! [inputs.prometheus] shutting down") }() return nil @@ -86,7 +83,7 @@ func watch(p *Prometheus, client *k8s.Client, in chan payload) error { pod := &corev1.Pod{} watcher, err := client.Watch(p.ctx, "", &corev1.Pod{}) if err != nil { - log.Printf("E! [inputs.prometheus] unable to watch resources: %s", err.Error()) + log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err) return err } defer watcher.Close() @@ -94,11 +91,15 @@ func watch(p *Prometheus, client *k8s.Client, in chan payload) error { for { select { case <-p.ctx.Done(): - log.Printf("I! [inputs.prometheus] shutting down") return nil - case rcvdPayload := <-in: - pod = rcvdPayload.pod - eventType := rcvdPayload.eventype + default: + pod = &corev1.Pod{} + // An error here means we need to reconnect the watcher. + eventType, err := watcher.Next(pod) + if err != nil { + log.Printf("D! [inputs.prometheus] unable to watch next: %v", err) + return err + } switch eventType { case k8s.EventAdded: @@ -107,15 +108,6 @@ func watch(p *Prometheus, client *k8s.Client, in chan payload) error { unregisterPod(pod, p) case k8s.EventModified: } - default: - pod = &corev1.Pod{} - // An error here means we need to reconnect the watcher. - eventType, err := watcher.Next(pod) - if err != nil { - log.Printf("D! [inputs.prometheus] unable to watch next: %s", err.Error()) - return errors.New("Watcher closed") - } - in <- payload{eventType, pod} } } } @@ -126,7 +118,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { return } - log.Printf("I! [inputs.prometheus] will scrape metrics from %s", *targetURL) + log.Printf("D! [inputs.prometheus] will scrape metrics from %s", *targetURL) // add annotation as metrics tags tags := pod.GetMetadata().GetAnnotations() tags["pod_name"] = pod.GetMetadata().GetName() @@ -137,7 +129,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { } URL, err := url.Parse(*targetURL) if err != nil { - log.Printf("E! [inputs.prometheus] could not parse URL %s: %s", *targetURL, err.Error()) + log.Printf("E! [inputs.prometheus] could not parse URL %s: %v", *targetURL, err) return } podURL := p.AddressToURL(URL, URL.Hostname()) diff --git a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go index 1c803e8da4020..2afdbc5ec53b5 100644 --- a/plugins/inputs/prometheus/kubernetes_test.go +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -1,7 +1,6 @@ package prometheus import ( - "sync" "testing" "github.com/stretchr/testify/assert" @@ -50,14 +49,14 @@ func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { } func TestAddPod(t *testing.T) { - prom := &Prometheus{lock: sync.Mutex{}} + prom := &Prometheus{} p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) assert.Equal(t, 1, len(prom.kubernetesPods)) } func TestAddMultiplePods(t *testing.T) { - prom := &Prometheus{lock: sync.Mutex{}} + prom := &Prometheus{} p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} @@ -67,7 +66,7 @@ func TestAddMultiplePods(t *testing.T) { assert.Equal(t, 2, len(prom.kubernetesPods)) } func TestDeletePods(t *testing.T) { - prom := &Prometheus{lock: sync.Mutex{}} + prom := &Prometheus{} p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} diff --git a/plugins/inputs/prometheus/prometheus_test.go b/plugins/inputs/prometheus/prometheus_test.go index 835faeab9a390..9a2982ff989bf 100644 --- a/plugins/inputs/prometheus/prometheus_test.go +++ b/plugins/inputs/prometheus/prometheus_test.go @@ -5,7 +5,6 @@ import ( "net/http" "net/http/httptest" "net/url" - "sync" "testing" "time" @@ -38,7 +37,6 @@ func TestPrometheusGeneratesMetrics(t *testing.T) { defer ts.Close() p := &Prometheus{ - lock: sync.Mutex{}, URLs: []string{ts.URL}, } @@ -62,7 +60,6 @@ func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) { defer ts.Close() p := &Prometheus{ - lock: sync.Mutex{}, KubernetesServices: []string{ts.URL}, } u, _ := url.Parse(ts.URL) @@ -92,7 +89,6 @@ func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) { defer ts.Close() p := &Prometheus{ - lock: sync.Mutex{}, URLs: []string{ts.URL}, KubernetesServices: []string{"http://random.telegraf.local:88/metrics"}, } From c5f952fb2ff6db8b52578dcb7ed36ebfe8989d74 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Fri, 2 Nov 2018 16:02:42 -0600 Subject: [PATCH 12/13] Pass context down --- plugins/inputs/prometheus/kubernetes.go | 14 ++++++-------- plugins/inputs/prometheus/prometheus.go | 5 +++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index d8416d9bc91bf..51d7e04a9a261 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -38,7 +38,7 @@ func loadClient(kubeconfigPath string) (*k8s.Client, error) { return k8s.NewClient(&config) } -func start(p *Prometheus) error { +func (p *Prometheus) start(ctx context.Context) error { client, err := k8s.NewInClusterClient() if err != nil { u, err := user.Current() @@ -55,7 +55,6 @@ func start(p *Prometheus) error { } } - p.ctx, p.cancel = context.WithCancel(context.Background()) p.wg = sync.WaitGroup{} in := make(chan payload) @@ -64,24 +63,23 @@ func start(p *Prometheus) error { defer p.wg.Done() for { select { - case <-p.ctx.Done(): + case <-ctx.Done(): break case <-time.After(time.Second): - err := watch(p, client, in) + err := p.watch(ctx, client, in) if err == nil { break } } } - log.Printf("D! [inputs.prometheus] shutting down") }() return nil } -func watch(p *Prometheus, client *k8s.Client, in chan payload) error { +func (p *Prometheus) watch(ctx context.Context, client *k8s.Client, in chan payload) error { pod := &corev1.Pod{} - watcher, err := client.Watch(p.ctx, "", &corev1.Pod{}) + watcher, err := client.Watch(ctx, "", &corev1.Pod{}) if err != nil { log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err) return err @@ -90,7 +88,7 @@ func watch(p *Prometheus, client *k8s.Client, in chan payload) error { for { select { - case <-p.ctx.Done(): + case <-ctx.Done(): return nil default: pod = &corev1.Pod{} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index dc113f18d5b1b..a9987a6610de0 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -43,7 +43,6 @@ type Prometheus struct { MonitorPods bool `toml:"monitor_kubernetes_pods"` lock sync.Mutex kubernetesPods []URLAndAddress - ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } @@ -264,7 +263,9 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error // Start will start the Kubernetes scraping if enabled in the configuration func (p *Prometheus) Start(a telegraf.Accumulator) error { if p.MonitorPods { - return start(p) + var ctx context.Context + ctx, p.cancel = context.WithCancel(context.Background()) + return p.start(ctx) } return nil } From f6d05ed85775a6946f10bcc9f1437386ecff4720 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Mon, 5 Nov 2018 12:43:30 -0700 Subject: [PATCH 13/13] Move error logging up --- plugins/inputs/prometheus/kubernetes.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index 51d7e04a9a261..4faf2d55efb02 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -56,7 +56,6 @@ func (p *Prometheus) start(ctx context.Context) error { } p.wg = sync.WaitGroup{} - in := make(chan payload) p.wg.Add(1) go func() { @@ -64,11 +63,11 @@ func (p *Prometheus) start(ctx context.Context) error { for { select { case <-ctx.Done(): - break + return case <-time.After(time.Second): - err := p.watch(ctx, client, in) - if err == nil { - break + err := p.watch(ctx, client) + if err != nil { + log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err) } } } @@ -77,11 +76,10 @@ func (p *Prometheus) start(ctx context.Context) error { return nil } -func (p *Prometheus) watch(ctx context.Context, client *k8s.Client, in chan payload) error { +func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error { pod := &corev1.Pod{} watcher, err := client.Watch(ctx, "", &corev1.Pod{}) if err != nil { - log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err) return err } defer watcher.Close() @@ -95,7 +93,6 @@ func (p *Prometheus) watch(ctx context.Context, client *k8s.Client, in chan payl // An error here means we need to reconnect the watcher. eventType, err := watcher.Next(pod) if err != nil { - log.Printf("D! [inputs.prometheus] unable to watch next: %v", err) return err }