Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add scraping for Prometheus endpoint in Kubernetes #3901

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c
github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98
github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483
github.com/ericchiang/k8s 677cf3318ef83bf681a38821f81a233a9be09641
github.com/ghodss/yaml 0ca9ea5df5451ffdf184b4428c902747c2c11cd7
github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5
github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a
Expand Down
16 changes: 16 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ in Prometheus format.
## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

# Scrape Kubernetes service for prometheus annotations.
# prometheus.io/scrape: Enable scraping for this service
# prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
# prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true

## Use bearer token for authorization
# bearer_token = /path/to/bearer/token

Expand All @@ -37,6 +43,16 @@ by looking up all A records assigned to the hostname as described in
This method can be used to locate all
[Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services).

#### Kubernetes scraping

Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes
pods.
Currently the following annotation are supported:

* `prometheus.io/scrape` Enable scraping for this pod
* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default metrics).
* `prometheus.io/port` Used to override the port, the default value is 9102

#### Bearer Token

If set, the file specified by the `bearer_token` parameter will be read on
Expand Down
161 changes: 161 additions & 0 deletions plugins/inputs/prometheus/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package prometheus

import (
"context"
"fmt"
"io/ioutil"
"log"
"net/url"
"os"
"strings"

"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"github.com/ghodss/yaml"
)

// loadClient parses a kubeconfig from a file and returns a Kubernetes
// client. It does not support extensions or client auth providers.
func loadClient(kubeconfigPath string) (*k8s.Client, error) {
data, err := ioutil.ReadFile(kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("read kubeconfig: %v", err)
}

// Unmarshal YAML into a Kubernetes config object.
var config k8s.Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("unmarshal kubeconfig: %v", err)
}
return k8s.NewClient(&config)
}

func start(p *Prometheus) error {
client, err := k8s.NewInClusterClient()
if err != nil {
client, err = loadClient(fmt.Sprintf("%v/.kube/config", os.Getenv("HOME")))
glinton marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Fatal(err)
glinton marked this conversation as resolved.
Show resolved Hide resolved
}
}
type payload struct {
eventype string
pod *corev1.Pod
}
in := make(chan payload)
go func() {
glinton marked this conversation as resolved.
Show resolved Hide resolved
var pod corev1.Pod
watcher, err := client.Watch(context.Background(), "", &pod)
glinton marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't able to find this in the library documentation, does the watch send all pods initially?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ran a test on our cluster and it started registering all pods. So yes :)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi I think this patch is very useful. so i have tested it.
Existing pods are all good. But if i create a new pod, it does not work for new pod.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, will investigate. Thanks for the feedback

if err != nil {
log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err)
}
defer watcher.Close()

for {
cm := new(corev1.Pod)
eventType, err := watcher.Next(cm)
if err != nil {
log.Println()
glinton marked this conversation as resolved.
Show resolved Hide resolved
}
in <- payload{eventType, cm}
}
}()

go func() {
for {
select {
case <-p.done:
log.Printf("I! [inputs.prometheus] shutting dow\n")
glinton marked this conversation as resolved.
Show resolved Hide resolved
return
case payload := <-in:
cm := payload.pod
eventType := payload.eventype

if err != nil {
glinton marked this conversation as resolved.
Show resolved Hide resolved
log.Printf("E! [inputs.prometheus] watcher encountered and error: %v", err)
glinton marked this conversation as resolved.
Show resolved Hide resolved
break
}
switch eventType {
case k8s.EventAdded:
registerPod(cm, p)
case k8s.EventDeleted:
unregisterPod(cm, p)
case k8s.EventModified:
}
}
}
}()

return nil
}

func registerPod(pod *corev1.Pod, p *Prometheus) {
targetURL := scrapeURL(pod)
if targetURL != nil {
glinton marked this conversation as resolved.
Show resolved Hide resolved
log.Printf("I! [inputs.prometheus] will scrape metrics from %v\n", *targetURL)
p.lock.Lock()
glinton marked this conversation as resolved.
Show resolved Hide resolved
// add annotation as metrics tags
tags := pod.GetMetadata().GetAnnotations()
tags["pod_name"] = pod.GetMetadata().GetName()
tags["namespace"] = pod.GetMetadata().GetNamespace()
// add labels as metrics tags
for k, v := range pod.GetMetadata().GetLabels() {
tags[k] = v
}
URL, err := url.Parse(*targetURL)
if err != nil {
log.Printf("E! [inputs.prometheus] could not parse URL %q: %v", targetURL, err)
return
}
podURL := p.AddressToURL(URL, URL.Hostname())
p.kubernetesPods = append(p.kubernetesPods, URLAndAddress{URL: podURL, Address: URL.Hostname(), OriginalURL: URL, Tags: tags})
glinton marked this conversation as resolved.
Show resolved Hide resolved
p.lock.Unlock()
}
}

func scrapeURL(pod *corev1.Pod) *string {
glinton marked this conversation as resolved.
Show resolved Hide resolved
scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"]
if pod.Status.GetPodIP() == "" {
// return as if scrape was disabled, we will be notified again once the pod
// has an IP
log.Println("pod doesn't have an IP")
glinton marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
if scrape == "true" {
glinton marked this conversation as resolved.
Show resolved Hide resolved
path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"]
port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"]
if port == "" {
port = "9102" // default
}
if path == "" {
path = "/metrics"
}
if !strings.HasPrefix(path, "/") {
path = "/" + path
}

ip := pod.Status.GetPodIP()
x := fmt.Sprintf("http://%v:%v%v", ip, port, path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using url.URL to create the url, which would avoid the need to fixup the path above and potentially handle other cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always using http seems like a potential issue to me. What if the service is using https?

return &x
}
return nil
}

func unregisterPod(pod *corev1.Pod, p *Prometheus) {
url := scrapeURL(pod)
if url != nil {
glinton marked this conversation as resolved.
Show resolved Hide resolved
p.lock.Lock()
defer p.lock.Unlock()
log.Printf("D! [inputs.prometheus] registred a delete request for %v in namespace %v\n", pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace())
glinton marked this conversation as resolved.
Show resolved Hide resolved
var result []URLAndAddress
for _, v := range p.kubernetesPods {
if v.URL.String() != *url {
result = append(result, v)
} else {
log.Printf("D! [inputs.prometheus] will stop scraping for %v\n", *url)
}

}
p.kubernetesPods = result
}
}
89 changes: 89 additions & 0 deletions plugins/inputs/prometheus/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package prometheus

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"

v1 "github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
)

func TestScrapeURLNoAnnotations(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.GetMetadata().Annotations = map[string]string{}
url := scrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotationsNoScrape(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.Metadata.Name = str("myPod")
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"}
url := scrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotations(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
url := scrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPort(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"}
url := scrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9000/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPath(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"}
url := scrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}

func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"}
url := scrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}

func TestAddPod(t *testing.T) {
prom := &Prometheus{lock: &sync.Mutex{}}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
assert.Equal(t, 1, len(prom.kubernetesPods))
}
func TestAddMultiplePods(t *testing.T) {
prom := &Prometheus{lock: &sync.Mutex{}}

p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
p.Metadata.Name = str("Pod2")
registerPod(p, prom)
assert.Equal(t, 2, len(prom.kubernetesPods))
}
func TestDeletePods(t *testing.T) {
prom := &Prometheus{lock: &sync.Mutex{}}

p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
unregisterPod(p, prom)
assert.Equal(t, 0, len(prom.kubernetesPods))
}

func pod() *v1.Pod {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}}
p.Status.PodIP = str("127.0.0.1")
p.Metadata.Name = str("myPod")
p.Metadata.Namespace = str("default")
return p
}

func str(x string) *string {
return &x
}
46 changes: 36 additions & 10 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type Prometheus struct {
InsecureSkipVerify bool

client *http.Client

// Should we scrape Kubernetes services for prometheus annotations
MonitorPods bool `toml:"monitor_kubernetes_pods"`
lock *sync.Mutex
kubernetesPods []URLAndAddress
done chan struct{}
}

var sampleConfig = `
Expand All @@ -48,6 +54,12 @@ var sampleConfig = `

## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

# Scrape Kubernetes pods for prometheus annotations.
# prometheus.io/scrape: Enable scraping for this pod
# prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
# prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true

## Use bearer token for authorization
# bearer_token = /path/to/bearer/token
Expand Down Expand Up @@ -96,6 +108,7 @@ type URLAndAddress struct {
OriginalURL *url.URL
URL *url.URL
Address string
Tags map[string]string
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be a good idea to rename this struct... perhaps we merge this with the Target struct?


func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) {
Expand All @@ -109,11 +122,17 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) {

allURLs = append(allURLs, URLAndAddress{URL: URL, OriginalURL: URL})
}
p.lock.Lock()
defer p.lock.Unlock()
// loop through all pods scraped via the prometheus annotation on the pods
allURLs = append(allURLs, p.kubernetesPods...)

for _, service := range p.KubernetesServices {
URL, err := url.Parse(service)
if err != nil {
return nil, err
}

resolvedAddresses, err := net.LookupHost(URL.Hostname())
if err != nil {
log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", URL.Host, err)
Expand Down Expand Up @@ -157,15 +176,6 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
return nil
}

var tr = &http.Transport{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}

func (p *Prometheus) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
Expand Down Expand Up @@ -226,6 +236,9 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
if u.Address != "" {
tags["address"] = u.Address
}
for k, v := range u.Tags {
tags[k] = v
}

switch metric.Type() {
case telegraf.Counter:
Expand All @@ -244,8 +257,21 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
return nil
}

// Start will start the Kubernetes scraping if enabled in the configuration
func (p *Prometheus) Start(a telegraf.Accumulator) error {
if p.MonitorPods {
return start(p)
}
return nil
}

func (p *Prometheus) Stop() {
close(p.done)
}

func init() {
inputs.Add("prometheus", func() telegraf.Input {
return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}}
return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3},
lock: &sync.Mutex{}}
})
}
Loading