diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bc7d8b561a6..2a1d49ed9bfe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Main (unreleased) - New Grafana Agent Flow components: + - `discovery.kubelet` collect scrape targets from the Kubelet API. (@gcampbell12) - `prometheus.exporter.kafka` collects metrics from Kafka Server. (@oliver-zhang) - `otelcol.processor.attributes` accepts telemetry data from other `otelcol` components and modifies attributes of a span, log, or metric. (@ptodev) diff --git a/component/all/all.go b/component/all/all.go index 840bc65b4cb8..1b23f04045c7 100644 --- a/component/all/all.go +++ b/component/all/all.go @@ -10,6 +10,7 @@ import ( _ "github.com/grafana/agent/component/discovery/docker" // Import discovery.docker _ "github.com/grafana/agent/component/discovery/file" // Import discovery.file _ "github.com/grafana/agent/component/discovery/gce" // Import discovery.gce + _ "github.com/grafana/agent/component/discovery/kubelet" // Import discovery.kubelet _ "github.com/grafana/agent/component/discovery/kubernetes" // Import discovery.kubernetes _ "github.com/grafana/agent/component/discovery/relabel" // Import discovery.relabel _ "github.com/grafana/agent/component/local/file" // Import local.file diff --git a/component/discovery/kubelet/kubelet.go b/component/discovery/kubelet/kubelet.go new file mode 100644 index 000000000000..98afc87dce5c --- /dev/null +++ b/component/discovery/kubelet/kubelet.go @@ -0,0 +1,278 @@ +// Package kubelet implements a discovery.kubelet component. +package kubelet + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/common/config" + "github.com/grafana/agent/component/discovery" + commonConfig "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/refresh" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/util/strutil" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + defaultKubeletRefreshInterval = 5 * time.Second + + metaLabelPrefix = model.MetaLabelPrefix + "kubernetes_" + namespaceLabel = metaLabelPrefix + "namespace" + presentValue = model.LabelValue("true") + podNameLabel = metaLabelPrefix + "pod_name" + podIPLabel = metaLabelPrefix + "pod_ip" + podContainerNameLabel = metaLabelPrefix + "pod_container_name" + podContainerImageLabel = metaLabelPrefix + "pod_container_image" + podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name" + podContainerPortNumberLabel = metaLabelPrefix + "pod_container_port_number" + podContainerPortProtocolLabel = metaLabelPrefix + "pod_container_port_protocol" + podContainerIsInit = metaLabelPrefix + "pod_container_init" + podReadyLabel = metaLabelPrefix + "pod_ready" + podPhaseLabel = metaLabelPrefix + "pod_phase" + podLabelPrefix = metaLabelPrefix + "pod_label_" + podLabelPresentPrefix = metaLabelPrefix + "pod_labelpresent_" + podAnnotationPrefix = metaLabelPrefix + "pod_annotation_" + podAnnotationPresentPrefix = metaLabelPrefix + "pod_annotationpresent_" + podNodeNameLabel = metaLabelPrefix + "pod_node_name" + podHostIPLabel = metaLabelPrefix + "pod_host_ip" + podUID = metaLabelPrefix + "pod_uid" + podControllerKind = metaLabelPrefix + "pod_controller_kind" + podControllerName = metaLabelPrefix + "pod_controller_name" +) + +var ( + defaultKubeletURL, _ = url.Parse("https://localhost:10250") +) + +func init() { + component.Register(component.Registration{ + Name: "discovery.kubelet", + Args: Arguments{}, + Exports: discovery.Exports{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +// Arguments configures the discovery.kubelet component. +type Arguments struct { + URL config.URL `river:"url,attr,optional"` + Interval time.Duration `river:"refresh_interval,attr,optional"` + HTTPClientConfig config.HTTPClientConfig `river:",squash"` + Namespaces []string `river:"namespaces,attr,optional"` +} + +// DefaultConfig holds defaults for SDConfig. +var DefaultConfig = Arguments{ + URL: config.URL{ + URL: defaultKubeletURL, + }, + HTTPClientConfig: config.DefaultHTTPClientConfig, +} + +// SetToDefault implements river.Defaulter. +func (args *Arguments) SetToDefault() { + *args = DefaultConfig +} + +// Validate implements river.Validator. +func (args *Arguments) Validate() error { + // We must explicitly Validate because HTTPClientConfig is squashed and it won't run otherwise + return args.HTTPClientConfig.Validate() +} + +// New returns a new instance of a discovery.kubelet component. +func New(opts component.Options, args Arguments) (*discovery.Component, error) { + return discovery.New(opts, args, func(args component.Arguments) (discovery.Discoverer, error) { + newArgs := args.(Arguments) + kubeletDiscovery, err := NewKubeletDiscovery(newArgs) + if err != nil { + return nil, err + } + interval := defaultKubeletRefreshInterval + if newArgs.Interval != 0 { + interval = newArgs.Interval + } + return refresh.NewDiscovery(opts.Logger, "kubelet", interval, kubeletDiscovery.Refresh), nil + }) +} + +type Discovery struct { + client *http.Client + url string + targetNamespaces []string +} + +func NewKubeletDiscovery(args Arguments) (*Discovery, error) { + transport, err := commonConfig.NewRoundTripperFromConfig(*args.HTTPClientConfig.Convert(), "kubelet_sd") + if err != nil { + return nil, err + } + client := &http.Client{ + Transport: transport, + Timeout: 30 * time.Second, + } + // ensure the path is the kubelet pods endpoint + args.URL.Path = "/pods" + return &Discovery{ + client: client, + url: args.URL.String(), + targetNamespaces: args.Namespaces, + }, nil +} + +func (d *Discovery) Refresh(ctx context.Context) ([]*targetgroup.Group, error) { + // Create a new GET request to the kubelet API server + req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, nil) + if err != nil { + return nil, fmt.Errorf("error creating kubelet pods request: %v", err) + } + + // Send the request to the kubelet + resp, err := d.client.Do(req) + if err != nil { + return nil, fmt.Errorf("error sending kublet pods request: %v", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("error response from kubelet: %v", resp.Status) + } + + // Read the response body + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %v", err) + } + + // Unmarshal the response body into a pod list + var podList v1.PodList + if err := json.Unmarshal(body, &podList); err != nil { + return nil, fmt.Errorf("error unmarshaling response body: %v", err) + } + + // Create a list of targets from the pods + var targetGroups []*targetgroup.Group + for _, pod := range podList.Items { + // Skip pods that are not in the one of the desired namespaces + if len(d.targetNamespaces) > 0 && !d.podInTargetNamespaces(pod) { + continue + } + targetGroups = append(targetGroups, d.buildPodTargetGroup(pod)) + } + + return targetGroups, nil +} + +func (d *Discovery) buildPodTargetGroup(pod v1.Pod) *targetgroup.Group { + tg := &targetgroup.Group{ + Source: podSource(pod), + } + // PodIP can be empty when a pod is starting or has been evicted. + if len(pod.Status.PodIP) == 0 { + return tg + } + + tg.Labels = podLabels(pod) + tg.Labels[namespaceLabel] = lv(pod.Namespace) + + containers := append(pod.Spec.Containers, pod.Spec.InitContainers...) + for i, c := range containers { + isInit := i >= len(pod.Spec.Containers) + for _, port := range c.Ports { + ports := strconv.FormatUint(uint64(port.ContainerPort), 10) + addr := net.JoinHostPort(pod.Status.PodIP, ports) + + tg.Targets = append(tg.Targets, model.LabelSet{ + model.AddressLabel: lv(addr), + podContainerNameLabel: lv(c.Name), + podContainerImageLabel: lv(c.Image), + podContainerPortNumberLabel: lv(ports), + podContainerPortNameLabel: lv(port.Name), + podContainerPortProtocolLabel: lv(string(port.Protocol)), + podContainerIsInit: lv(strconv.FormatBool(isInit)), + }) + } + } + + return tg +} + +func (d *Discovery) podInTargetNamespaces(pod v1.Pod) bool { + for _, ns := range d.targetNamespaces { + if pod.Namespace == ns { + return true + } + } + return false +} + +func podSource(pod v1.Pod) string { + return podSourceFromNamespaceAndName(pod.Namespace, pod.Name) +} + +func podSourceFromNamespaceAndName(namespace, name string) string { + return "pod/" + namespace + "/" + name +} + +func podLabels(pod v1.Pod) model.LabelSet { + ls := model.LabelSet{ + podNameLabel: lv(pod.ObjectMeta.Name), + podIPLabel: lv(pod.Status.PodIP), + podReadyLabel: podReady(pod), + podPhaseLabel: lv(string(pod.Status.Phase)), + podNodeNameLabel: lv(pod.Spec.NodeName), + podHostIPLabel: lv(pod.Status.HostIP), + podUID: lv(string(pod.ObjectMeta.UID)), + } + + createdBy := metav1.GetControllerOf(&pod) + if createdBy != nil { + if createdBy.Kind != "" { + ls[podControllerKind] = lv(createdBy.Kind) + } + if createdBy.Name != "" { + ls[podControllerName] = lv(createdBy.Name) + } + } + + for k, v := range pod.Labels { + ln := strutil.SanitizeLabelName(k) + ls[model.LabelName(podLabelPrefix+ln)] = lv(v) + ls[model.LabelName(podLabelPresentPrefix+ln)] = presentValue + } + + for k, v := range pod.Annotations { + ln := strutil.SanitizeLabelName(k) + ls[model.LabelName(podAnnotationPrefix+ln)] = lv(v) + ls[model.LabelName(podAnnotationPresentPrefix+ln)] = presentValue + } + + return ls +} + +func lv(s string) model.LabelValue { + return model.LabelValue(s) +} + +func podReady(pod v1.Pod) model.LabelValue { + for _, cond := range pod.Status.Conditions { + if cond.Type == v1.PodReady { + return lv(strings.ToLower(string(cond.Status))) + } + } + return lv(strings.ToLower(string(v1.ConditionUnknown))) +} diff --git a/component/discovery/kubelet/kubelet_test.go b/component/discovery/kubelet/kubelet_test.go new file mode 100644 index 000000000000..3d068efa1b01 --- /dev/null +++ b/component/discovery/kubelet/kubelet_test.go @@ -0,0 +1,36 @@ +package kubelet + +import ( + "testing" + + "github.com/grafana/agent/pkg/river" + "github.com/stretchr/testify/require" +) + +func TestRiverConfig(t *testing.T) { + var exampleRiverConfig = ` + bearer_token_file = "/path/to/file.token" +` + + var args Arguments + err := river.Unmarshal([]byte(exampleRiverConfig), &args) + require.NoError(t, err) +} + +func TestBadRiverConfig(t *testing.T) { + var exampleRiverConfig = ` + bearer_token = "token" + bearer_token_file = "/path/to/file.token" +` + + // Make sure the squashed HTTPClientConfig Validate function is being utilized correctly + var args Arguments + err := river.Unmarshal([]byte(exampleRiverConfig), &args) + require.ErrorContains(t, err, "at most one of bearer_token & bearer_token_file must be configured") + + // Make sure that URL defaults to https://localhost:10250 + var args2 Arguments + err = river.Unmarshal([]byte{}, &args2) + require.NoError(t, err) + require.Equal(t, args2.URL.String(), "https://localhost:10250") +} diff --git a/docs/sources/flow/reference/components/discovery.kubelet.md b/docs/sources/flow/reference/components/discovery.kubelet.md new file mode 100644 index 000000000000..66db23683212 --- /dev/null +++ b/docs/sources/flow/reference/components/discovery.kubelet.md @@ -0,0 +1,149 @@ +--- +title: discovery.kubelet +labels: + stage: beta +--- + +# discovery.kubelet + +`discovery.kubelet` discovers Kubernetes pods running on the specified Kubelet +and exposes them as scrape targets. + +## Usage + +```river +discovery.kubelet "LABEL" { + bearer_token_file = "TOKEN_FILE" +} +``` + +## Requirements + +* The Kubelet must be reachable from the `grafana-agent` pod network. +* Follow the [Kubelet authorization](https://kubernetes.io/docs/reference/access-authn-authz/kubelet-authn-authz/#kubelet-authorization) + documentation to configure authentication to the Kubelet API. + +## Arguments + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`url` | `string` | URL of the Kubelet server. | | no +`bearer_token` | `secret` | Bearer token to authenticate with. | | no +`bearer_token_file` | `string` | File containing a bearer token to authenticate with. | | no +`refresh_interval` | `duration` | How often the Kubelet should be polled for scrape targets | `5s` | no +`namespaces` | `list(string)` | A list of namespaces to extract target pods from | | no + +One of the following authentication methods must be provided if kubelet authentication is enabled + - [`bearer_token` argument](#arguments). + - [`bearer_token_file` argument](#arguments). + - [`authorization` block][authorization]. + +The `namespaces` list limits the namespaces to discover resources in. If +omitted, all namespaces are searched. + +## Blocks + +The following blocks are supported inside the definition of +`discovery.kubelet`: + +Hierarchy | Block | Description | Required +--------- | ----- | ----------- | -------- +authorization | [authorization][] | Configure generic authorization to the endpoint. | no +tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no + +[authorization]: #authorization-block +[tls_config]: #tls_config-block + +### authorization block + +{{< docs/shared lookup="flow/reference/components/authorization-block.md" source="agent" >}} + +### tls_config block + +{{< docs/shared lookup="flow/reference/components/tls-config-block.md" source="agent" >}} + +## Exported fields + +The following fields are exported and can be referenced by other components: + +Name | Type | Description +---- | ---- | ----------- +`targets` | `list(map(string))` | The set of targets discovered from the Kubelet API. + +Each target includes the following labels: + +* `__address__`: The target address to scrape derived from the pod IP and container port. +* `__meta_kubernetes_namespace`: The namespace of the pod object. +* `__meta_kubernetes_pod_name`: The name of the pod object. +* `__meta_kubernetes_pod_ip`: The pod IP of the pod object. +* `__meta_kubernetes_pod_label_`: Each label from the pod object. +* `__meta_kubernetes_pod_labelpresent_`: `true` for each label from + the pod object. +* `__meta_kubernetes_pod_annotation_`: Each annotation from the + pod object. +* `__meta_kubernetes_pod_annotationpresent_`: `true` for each + annotation from the pod object. +* `__meta_kubernetes_pod_container_init`: `true` if the container is an + `InitContainer`. +* `__meta_kubernetes_pod_container_name`: Name of the container the target + address points to. +* `__meta_kubernetes_pod_container_id`: ID of the container the target address + points to. The ID is in the form `://`. +* `__meta_kubernetes_pod_container_image`: The image the container is using. +* `__meta_kubernetes_pod_container_port_name`: Name of the container port. +* `__meta_kubernetes_pod_container_port_number`: Number of the container port. +* `__meta_kubernetes_pod_container_port_protocol`: Protocol of the container + port. +* `__meta_kubernetes_pod_ready`: Set to `true` or `false` for the pod's ready + state. +* `__meta_kubernetes_pod_phase`: Set to `Pending`, `Running`, `Succeeded`, `Failed` or + `Unknown` in the lifecycle. +* `__meta_kubernetes_pod_node_name`: The name of the node the pod is scheduled + onto. +* `__meta_kubernetes_pod_host_ip`: The current host IP of the pod object. +* `__meta_kubernetes_pod_uid`: The UID of the pod object. +* `__meta_kubernetes_pod_controller_kind`: Object kind of the pod controller. +* `__meta_kubernetes_pod_controller_name`: Name of the pod controller. + +> **Note**: The Kubelet API used by this component is an internal API and therefore the +> data in the response returned from the API cannot be guaranteed between different versions +> of the Kubelet. + +## Component health + +`discovery.kubelet` is reported as unhealthy when given an invalid +configuration. In those cases, exported fields retain their last healthy +values. + +## Debug information + +`discovery.kubelet` does not expose any component-specific debug information. + +### Debug metrics + +`discovery.kubelet` does not expose any component-specific debug metrics. + +## Examples + +### Bearer token file authentication + +This example uses a bearer token file to authenticate to the Kubelet API: + +```river +discovery.kubelet "k8s_pods" { + bearer_token_file = "/var/run/secrets/kubernetes.io/serviceaccount/token" +} +``` + +### Limit searched namespaces + +This example limits the namespaces where pods are discovered using the `namespaces` argument: + +```river +discovery.kubelet "k8s_pods" { + bearer_token_file = "/var/run/secrets/kubernetes.io/serviceaccount/token" + namespaces = ["default", "kube-system"] +} +```