Skip to content

Commit

Permalink
Implement using /metrics/resource Kubelet endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: JunYang <[email protected]>
  • Loading branch information
yangjunmyfm192085 committed May 27, 2021
1 parent d766094 commit bd5008e
Show file tree
Hide file tree
Showing 7 changed files with 856 additions and 3 deletions.
1 change: 1 addition & 0 deletions manifests/base/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ rules:
- pods
- nodes
- nodes/stats
- nodes/metrics
- namespaces
- configmaps
verbs:
Expand Down
2 changes: 1 addition & 1 deletion pkg/scraper/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ import (

// KubeletMetricsInterface knows how to fetch metrics from the Kubelet
type KubeletMetricsInterface interface {
// GetSummary fetches summary metrics from the given Kubelet
// GetMetrics fetches Resource metrics from the given Kubelet
GetMetrics(ctx context.Context, node *v1.Node) (*storage.MetricsBatch, error)
}
164 changes: 164 additions & 0 deletions pkg/scraper/client/resource/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2018 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resource

import (
"bytes"
"context"
"fmt"
"io"

"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"

"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"

"k8s.io/client-go/rest"
"sigs.k8s.io/metrics-server/pkg/scraper/client"
"sigs.k8s.io/metrics-server/pkg/storage"
"sigs.k8s.io/metrics-server/pkg/utils"

corev1 "k8s.io/api/core/v1"
)

type kubeletClient struct {
defaultPort int
useNodeStatusPort bool
client *http.Client
scheme string
addrResolver utils.NodeAddressResolver
buffers sync.Pool
}

func NewClient(config client.KubeletClientConfig) (*kubeletClient, error) {
transport, err := rest.TransportFor(&config.Client)
if err != nil {
return nil, fmt.Errorf("unable to construct transport: %v", err)
}

c := &http.Client{
Transport: transport,
}
return &kubeletClient{
addrResolver: utils.NewPriorityNodeAddressResolver(config.AddressTypePriority),
defaultPort: config.DefaultPort,
client: c,
scheme: config.Scheme,
useNodeStatusPort: config.UseNodeStatusPort,
buffers: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
}, nil
}

var _ client.KubeletMetricsInterface = (*kubeletClient)(nil)

type ErrNotFound struct {
endpoint string
}

func (err *ErrNotFound) Error() string {
return fmt.Sprintf("%q not found", err.endpoint)
}

func (kc *kubeletClient) GetMetrics(ctx context.Context, node *corev1.Node) (*storage.MetricsBatch, error) {
port := kc.defaultPort
nodeStatusPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
if kc.useNodeStatusPort && nodeStatusPort != 0 {
port = nodeStatusPort
}
addr, err := kc.addrResolver.NodeAddress(node)
if err != nil {
return nil, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err)
}
url := url.URL{
Scheme: kc.scheme,
Host: net.JoinHostPort(addr, strconv.Itoa(port)),
Path: "/metrics/resource",
}

req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return nil, err
}
client := kc.client
if client == nil {
client = http.DefaultClient
}
resource, err := kc.makeRequest(client, req.WithContext(ctx))
if err != nil {
return nil, err
}
samples, err := decodeProm004(resource)
if err != nil {
return nil, err
}
return decodeBatch(samples, node.Name), err
}

func decodeProm004(resource string) ([]*model.Sample, error) {
dec := expfmt.NewDecoder(strings.NewReader(resource), expfmt.FmtText)
decoder := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{},
}

var samples []*model.Sample
for {
var v model.Vector
if err := decoder.Decode(&v); err != nil {
if err == io.EOF {
// Expected loop termination condition.
return samples, nil
}
return nil, err
}
samples = append(samples, v...)
}
}

func (kc *kubeletClient) makeRequest(client *http.Client, req *http.Request) (string, error) {
response, err := client.Do(req)
if err != nil {
return "", err
}
defer response.Body.Close()
b := kc.getBuffer()
defer kc.returnBuffer(b)
_, err = io.Copy(b, response.Body)
if err != nil {
return "", err
}
if response.StatusCode != http.StatusOK {
return "", fmt.Errorf("request failed - %q.", response.Status)
}
return b.String(), nil
}

func (kc *kubeletClient) getBuffer() *bytes.Buffer {
return kc.buffers.Get().(*bytes.Buffer)
}

func (kc *kubeletClient) returnBuffer(b *bytes.Buffer) {
b.Reset()
kc.buffers.Put(b)
}
105 changes: 105 additions & 0 deletions pkg/scraper/client/resource/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2021 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resource

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/prometheus/common/model"
)

func TestClient(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Decode Suite")
}

var _ = Describe("Decode", func() {
var (
metricsOrigData string
)
It("should use the decode node result from the CPU", func() {
metricsOrigData = `# HELP node_cpu_usage_seconds_total [ALPHA] Cumulative cpu time consumed by the node in core-seconds
# TYPE node_cpu_usage_seconds_total counter
node_cpu_usage_seconds_total 497090.546884895 1621165414370
`

By("decoding")
samples, err := decodeProm004(metricsOrigData)

By("verifying that the scrape CumulativeCpuUsed and Timestamp are as expected")
Expect(err).Should(BeNil())
Expect(len(samples)).To(Equal(1))
Expect(string(samples[0].Metric[model.MetricNameLabel])).To(Equal("node_cpu_usage_seconds_total"))
Expect(float64(samples[0].Value)).To(Equal(497090.546884895))
Expect(int(samples[0].Timestamp)).To(Equal(1621165414370))
})

It("should use the decode node result from the Memory", func() {
metricsOrigData = `# HELP node_memory_working_set_bytes [ALPHA] Current working set of the node in bytes
# TYPE node_memory_working_set_bytes gauge
node_memory_working_set_bytes 8.391589888e+09 1621165414370
`
By("decoding")
samples, err := decodeProm004(metricsOrigData)

By("verifying that the scrape MemoryUsage and Timestamp are as expected")
Expect(err).Should(BeNil())
Expect(len(samples)).To(Equal(1))
Expect(string(samples[0].Metric[model.MetricNameLabel])).To(Equal("node_memory_working_set_bytes"))
Expect(float64(samples[0].Value)).To(Equal(8.391589888e+09))
Expect(int(samples[0].Timestamp)).To(Equal(1621165414370))
})

It("should use the decode pod result from the CumulativeCpuUsed", func() {
metricsOrigData = `# HELP container_cpu_usage_seconds_total [ALPHA] Cumulative cpu time consumed by the container in core-seconds
# TYPE container_cpu_usage_seconds_total counter
container_cpu_usage_seconds_total{container="container1",namespace="default",pod="pod1"} 15625.474020075 1621165406043
`

By("decoding")
samples, err := decodeProm004(metricsOrigData)

By("verifying that the scrape CumulativeCpuUsed and Timestamp are as expected")
Expect(err).Should(BeNil())
Expect(len(samples)).To(Equal(1))
Expect(string(samples[0].Metric[model.MetricNameLabel])).To(Equal("container_cpu_usage_seconds_total"))
Expect(string(samples[0].Metric["namespace"])).To(Equal("default"))
Expect(string(samples[0].Metric["pod"])).To(Equal("pod1"))
Expect(string(samples[0].Metric["container"])).To(Equal("container1"))
Expect(float64(samples[0].Value)).To(Equal(15625.474020075))
Expect(int(samples[0].Timestamp)).To(Equal(1621165406043))
})

It("should use the decode pod result from the Memory", func() {
metricsOrigData = `# HELP container_memory_working_set_bytes [ALPHA] Current working set of the container in bytes
# TYPE container_memory_working_set_bytes gauge
container_memory_working_set_bytes{container="container1",namespace="default",pod="pod1"} 1.6244736e+07 1621165406043
`
By("decoding")
samples, err := decodeProm004(metricsOrigData)

By("verifying that the scrape MemoryUsage and Timestamp are as expected")
Expect(err).Should(BeNil())
Expect(len(samples)).To(Equal(1))
Expect(string(samples[0].Metric[model.MetricNameLabel])).To(Equal("container_memory_working_set_bytes"))
Expect(string(samples[0].Metric["namespace"])).To(Equal("default"))
Expect(string(samples[0].Metric["pod"])).To(Equal("pod1"))
Expect(string(samples[0].Metric["container"])).To(Equal("container1"))
Expect(float64(samples[0].Value)).To(Equal(1.6244736e+07))
Expect(int(samples[0].Timestamp)).To(Equal(1621165406043))
})
})
Loading

0 comments on commit bd5008e

Please sign in to comment.