Skip to content

Commit

Permalink
[metricbeat][Kubernetes] Update token on the first HTTP 401 when maki…
Browse files Browse the repository at this point in the history
…ng requests to kubelet API (#40636) (#40655)

* Update token on HTTP 401

---------

Signed-off-by: constanca <[email protected]>
  • Loading branch information
mergify[bot] authored Aug 29, 2024
1 parent 4791cfd commit 45ef1d5
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Metricbeat*

- Fix first HTTP 401 error when fetching metrics from the Kubelet API caused by a token update {pull}40636[40636]
- Fix Azure Monitor 429 error by causing metricbeat to retry the request again. {pull}38294[38294]
- Fix fields not being parsed correctly in postgresql/database {issue}25301[25301] {pull}37720[37720]
- rabbitmq/queue - Change the mapping type of `rabbitmq.queue.consumers.utilisation.pct` to `scaled_float` from `long` because the values fall within the range of `[0.0, 1.0]`. Previously, conversion to integer resulted in reporting either `0` or `1`.
Expand Down
57 changes: 36 additions & 21 deletions metricbeat/helper/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,30 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"

"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent-libs/useragent"

"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/metricbeat/helper/dialer"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent-libs/useragent"
)

var userAgent = useragent.UserAgent("Metricbeat", version.GetDefaultVersion(), version.Commit(), version.BuildTime().String())

// HTTP is a custom HTTP Client that handle the complexity of connection and retrieving information
// from HTTP endpoint.
type HTTP struct {
hostData mb.HostData
client *http.Client // HTTP client that is reused across requests.
headers http.Header
name string
uri string
method string
body []byte
hostData mb.HostData
bearerFile string
client *http.Client // HTTP client that is reused across requests.
headers http.Header
name string
uri string
method string
body []byte
}

// NewHTTP creates new http helper
Expand All @@ -57,7 +59,7 @@ func NewHTTP(base mb.BaseMetricSet) (*HTTP, error) {
return NewHTTPFromConfig(config, base.HostData())
}

// newHTTPWithConfig creates a new http helper from some configuration
// NewHTTPFromConfig newHTTPWithConfig creates a new http helper from some configuration
func NewHTTPFromConfig(config Config, hostData mb.HostData) (*HTTP, error) {
headers := http.Header{}
if config.Headers == nil {
Expand Down Expand Up @@ -96,12 +98,13 @@ func NewHTTPFromConfig(config Config, hostData mb.HostData) (*HTTP, error) {
}

return &HTTP{
hostData: hostData,
client: client,
headers: headers,
method: "GET",
uri: hostData.SanitizedURI,
body: nil,
hostData: hostData,
bearerFile: config.BearerTokenFile,
client: client,
headers: headers,
method: "GET",
uri: hostData.SanitizedURI,
body: nil,
}, nil
}

Expand All @@ -126,7 +129,7 @@ func (h *HTTP) FetchResponse() (*http.Response, error) {

resp, err := h.client.Do(req)
if err != nil {
return nil, fmt.Errorf("error making http request: %v", err)
return nil, fmt.Errorf("error making http request: %w", err)
}

return resp, nil
Expand Down Expand Up @@ -179,7 +182,7 @@ func (h *HTTP) FetchContent() ([]byte, error) {
return nil, fmt.Errorf("HTTP error %d in %s: %s", resp.StatusCode, h.name, resp.Status)
}

return ioutil.ReadAll(resp.Body)
return io.ReadAll(resp.Body)
}

// FetchScanner returns a Scanner for the content.
Expand Down Expand Up @@ -210,11 +213,23 @@ func (h *HTTP) FetchJSON() (map[string]interface{}, error) {
return data, nil
}

// getAuthHeaderFromToken reads a bearer authorizaiton token from the given file
func (h *HTTP) RefreshAuthorizationHeader() (bool, error) {
if h.bearerFile != "" {
header, err := getAuthHeaderFromToken(h.bearerFile)
if err != nil {
return false, err
}
h.headers.Set("Authorization", header)
return true, nil
}
return false, nil
}

// getAuthHeaderFromToken reads a bearer authorization token from the given file
func getAuthHeaderFromToken(path string) (string, error) {
var token string

b, err := ioutil.ReadFile(path)
b, err := os.ReadFile(path)
if err != nil {
return "", fmt.Errorf("reading bearer token file: %w", err)
}
Expand Down
37 changes: 37 additions & 0 deletions metricbeat/helper/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"runtime"
"testing"
"time"
Expand Down Expand Up @@ -294,6 +295,42 @@ func TestUserAgentCheck(t *testing.T) {
assert.Contains(t, ua, "Metricbeat")
}

func TestRefreshAuthorizationHeader(t *testing.T) {
path := t.TempDir()
bearerFileName := "token"
bearerFilePath := filepath.Join(path, bearerFileName)

getAuth := func(helper *HTTP) string {
for k, v := range helper.headers {
if k == "Authorization" {
return v[0]
}
}
return ""
}

firstToken := "token-1"
err := os.WriteFile(bearerFilePath, []byte(firstToken), 0644)
assert.NoError(t, err)

helper := &HTTP{bearerFile: bearerFilePath, headers: make(http.Header)}
updated, err := helper.RefreshAuthorizationHeader()
assert.NoError(t, err)
assert.True(t, updated)
expected := fmt.Sprintf("Bearer %s", firstToken)
assert.Equal(t, expected, getAuth(helper))

secondToken := "token-2"
err = os.WriteFile(bearerFilePath, []byte(secondToken), 0644)
assert.NoError(t, err)

updated, err = helper.RefreshAuthorizationHeader()
assert.NoError(t, err)
assert.True(t, updated)
expected = fmt.Sprintf("Bearer %s", secondToken)
assert.Equal(t, expected, getAuth(helper))
}

func checkTimeout(t *testing.T, h *HTTP) {
t.Helper()

Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/kubernetes/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ package container
import (
"fmt"

"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/mapstr"
)

const (
Expand Down
25 changes: 23 additions & 2 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package kubernetes

import (
"fmt"
httpnet "net/http"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -123,7 +125,7 @@ func (m *module) GetStateMetricsFamilies(prometheus p.Prometheus) ([]*p.MetricFa
defer m.kubeStateMetricsCache.lock.Unlock()

now := time.Now()
// NOTE: These entries will be never removed, this can be a leak if
// NOTE: These entries will never be removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
familiesCache := m.kubeStateMetricsCache.getCacheMapEntry(m.cacheHash)
Expand All @@ -142,13 +144,32 @@ func (m *module) GetKubeletStats(http *helper.HTTP) ([]byte, error) {

now := time.Now()

// NOTE: These entries will be never removed, this can be a leak if
// NOTE: These entries will never be removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
statsCache := m.kubeletStatsCache.getCacheMapEntry(m.cacheHash)

// Check if the last time we tried to make a request to the Kubelet API ended in a 401 Unauthorized error.
// If this is the case, we should not keep making requests.
errorUnauthorisedMsg := fmt.Sprintf("HTTP error %d", httpnet.StatusUnauthorized)
if statsCache.lastFetchErr != nil && strings.Contains(statsCache.lastFetchErr.Error(), errorUnauthorisedMsg) {
return statsCache.sharedStats, statsCache.lastFetchErr
}

// If this is the first request, or it has passed more time than config.period, we should
// make a request to the Kubelet API again to get the last metrics' values.
if statsCache.lastFetchTimestamp.IsZero() || now.Sub(statsCache.lastFetchTimestamp) > m.Config().Period {
statsCache.sharedStats, statsCache.lastFetchErr = http.FetchContent()

// If we got an unauthorized error from our HTTP request, it is possible the token has expired.
// We should update the Authorization header in that case. We only try this for the first time
// we get HTTP 401 to avoid getting in a loop in case the cause of the error is something different.
if statsCache.lastFetchErr != nil && strings.Contains(statsCache.lastFetchErr.Error(), errorUnauthorisedMsg) {
if _, err := http.RefreshAuthorizationHeader(); err == nil {
statsCache.sharedStats, statsCache.lastFetchErr = http.FetchContent()
}
}

statsCache.lastFetchTimestamp = now
}

Expand Down

0 comments on commit 45ef1d5

Please sign in to comment.