Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scraping for Prometheus endpoint in Kubernetes #4920

Merged
merged 16 commits into from
Nov 5, 2018
Merged
30 changes: 30 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ in Prometheus format.
## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

# Scrape Kubernetes service for prometheus annotations.
# prometheus.io/scrape: Enable scraping for this service
# prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
# prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true

## Use bearer token for authorization
# bearer_token = /path/to/bearer/token

Expand All @@ -37,6 +43,16 @@ by looking up all A records assigned to the hostname as described in
This method can be used to locate all
[Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services).

#### Kubernetes scraping

Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes
glinton marked this conversation as resolved.
Show resolved Hide resolved
pods.
Currently the following annotation are supported:

* `prometheus.io/scrape` Enable scraping for this pod
* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default metrics).
* `prometheus.io/port` Used to override the port, the default value is 9102
glinton marked this conversation as resolved.
Show resolved Hide resolved

#### Bearer Token

If set, the file specified by the `bearer_token` parameter will be read on
Expand Down
181 changes: 181 additions & 0 deletions plugins/inputs/prometheus/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package prometheus

import (
"context"
"fmt"
"io/ioutil"
"log"
"net/url"
"os"
"strings"

"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"github.com/ghodss/yaml"
glinton marked this conversation as resolved.
Show resolved Hide resolved
)

// loadClient parses a kubeconfig from a file and returns a Kubernetes
// client. It does not support extensions or client auth providers.
func loadClient(kubeconfigPath string) (*k8s.Client, error) {
data, err := ioutil.ReadFile(kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("read kubeconfig: %s", err.Error())
glinton marked this conversation as resolved.
Show resolved Hide resolved
}

// Unmarshal YAML into a Kubernetes config object.
var config k8s.Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("unmarshal kubeconfig: %s", err.Error())
glinton marked this conversation as resolved.
Show resolved Hide resolved
}
return k8s.NewClient(&config)
}

func start(p *Prometheus) error {
client, err := k8s.NewInClusterClient()
if err != nil {
configLocation := fmt.Sprintf("%s/.kube/config", os.Getenv("HOME"))
glinton marked this conversation as resolved.
Show resolved Hide resolved
if p.KubeConfig != "" {
configLocation = p.KubeConfig
}
client, err = loadClient(configLocation)
if err != nil {
return err
}
}
type payload struct {
eventype string
pod *corev1.Pod
}

in := make(chan payload)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove, unused.

go func() {
var pod corev1.Pod
rewatch:
watcher, err := client.Watch(context.Background(), "", &pod)
if err != nil {
log.Printf("E! [inputs.prometheus] unable to watch resources: %s", err.Error())
glinton marked this conversation as resolved.
Show resolved Hide resolved
}
defer watcher.Close()
glinton marked this conversation as resolved.
Show resolved Hide resolved
glinton marked this conversation as resolved.
Show resolved Hide resolved

for {
select {
case <-p.done:
log.Printf("I! [inputs.prometheus] shutting down\n")
return
default:
cm := new(corev1.Pod)
eventType, err := watcher.Next(cm)
if err != nil {
log.Printf("D! [inputs.prometheus] unable to watch next: %s", err.Error())
goto rewatch
}
in <- payload{eventType, cm}
}
}
}()

go func() {
for {
select {
case <-p.done:
log.Printf("I! [inputs.prometheus] shutting down\n")
return
case payload := <-in:
glinton marked this conversation as resolved.
Show resolved Hide resolved
cm := payload.pod
eventType := payload.eventype

switch eventType {
case k8s.EventAdded:
registerPod(cm, p)
case k8s.EventDeleted:
unregisterPod(cm, p)
case k8s.EventModified:
}
}
}
}()

return nil
}

func registerPod(pod *corev1.Pod, p *Prometheus) {
targetURL := getScrapeURL(pod)
if targetURL == nil {
return
}

log.Printf("I! [inputs.prometheus] will scrape metrics from %v\n", *targetURL)
glinton marked this conversation as resolved.
Show resolved Hide resolved
// add annotation as metrics tags
tags := pod.GetMetadata().GetAnnotations()
tags["pod_name"] = pod.GetMetadata().GetName()
tags["namespace"] = pod.GetMetadata().GetNamespace()
// add labels as metrics tags
for k, v := range pod.GetMetadata().GetLabels() {
tags[k] = v
}
URL, err := url.Parse(*targetURL)
if err != nil {
log.Printf("E! [inputs.prometheus] could not parse URL %q: %v", *targetURL, err)
return
}
podURL := p.AddressToURL(URL, URL.Hostname())
p.lock.Lock()
p.kubernetesPods = append(p.kubernetesPods,
URLAndAddress{
URL: podURL,
Address: URL.Hostname(),
OriginalURL: URL,
Tags: tags})
p.lock.Unlock()
}

func getScrapeURL(pod *corev1.Pod) *string {
scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"]
if scrape != "true" {
return nil
}
ip := pod.Status.GetPodIP()
if ip == "" {
// return as if scrape was disabled, we will be notified again once the pod
// has an IP
return nil
}

path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"]
port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"]
if port == "" {
port = "9102" // default
}
if path == "" {
path = "/metrics"
}
if !strings.HasPrefix(path, "/") {
path = "/" + path
}

x := fmt.Sprintf("http://%s:%s%s", ip, port, path)
glinton marked this conversation as resolved.
Show resolved Hide resolved

return &x
}

func unregisterPod(pod *corev1.Pod, p *Prometheus) {
url := getScrapeURL(pod)
if url == nil {
return
}

p.lock.Lock()
defer p.lock.Unlock()
log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s\n",
pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace())
var result []URLAndAddress
for _, v := range p.kubernetesPods {
if v.URL.String() != *url {
result = append(result, v)
} else {
log.Printf("D! [inputs.prometheus] will stop scraping for %v\n", *url)
}

}
p.kubernetesPods = result
}
89 changes: 89 additions & 0 deletions plugins/inputs/prometheus/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package prometheus

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"

v1 "github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
)

func TestScrapeURLNoAnnotations(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.GetMetadata().Annotations = map[string]string{}
url := getScrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotationsNoScrape(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.Metadata.Name = str("myPod")
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"}
url := getScrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotations(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPort(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9000/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPath(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}

func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}

func TestAddPod(t *testing.T) {
prom := &Prometheus{lock: &sync.Mutex{}}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
assert.Equal(t, 1, len(prom.kubernetesPods))
}
func TestAddMultiplePods(t *testing.T) {
prom := &Prometheus{lock: &sync.Mutex{}}

p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
p.Metadata.Name = str("Pod2")
registerPod(p, prom)
assert.Equal(t, 2, len(prom.kubernetesPods))
}
func TestDeletePods(t *testing.T) {
prom := &Prometheus{lock: &sync.Mutex{}}

p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
unregisterPod(p, prom)
assert.Equal(t, 0, len(prom.kubernetesPods))
}

func pod() *v1.Pod {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}}
p.Status.PodIP = str("127.0.0.1")
p.Metadata.Name = str("myPod")
p.Metadata.Namespace = str("default")
return p
}

func str(x string) *string {
return &x
}
Loading