Skip to content

Commit

Permalink
Implement using /metrics/resource Kubelet endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: JunYang <[email protected]>
  • Loading branch information
yangjunmyfm192085 committed May 29, 2021
1 parent d766094 commit c6398d6
Show file tree
Hide file tree
Showing 24 changed files with 946 additions and 1,782 deletions.
3 changes: 0 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,6 @@ verify-generated: update-generated

.PHONY: update-generated
update-generated:
# pkg/scraper/client/summary/types_easyjson.go:
go install -mod=readonly github.com/mailru/easyjson/easyjson
$(GOPATH)/bin/easyjson -all pkg/scraper/client/summary/types.go
# pkg/api/generated/openapi/zz_generated.openapi.go
go install -mod=readonly k8s.io/kube-openapi/cmd/openapi-gen
$(GOPATH)/bin/openapi-gen --logtostderr -i k8s.io/metrics/pkg/apis/metrics/v1beta1,k8s.io/apimachinery/pkg/apis/meta/v1,k8s.io/apimachinery/pkg/api/resource,k8s.io/apimachinery/pkg/version -p pkg/api/generated/openapi/ -O zz_generated.openapi -o $(REPO_DIR) -h $(REPO_DIR)/scripts/boilerplate.go.txt -r /dev/null
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/go-openapi/spec v0.20.3
github.com/google/addlicense v0.0.0-20210428195630-6d92264d7170
github.com/google/go-cmp v0.5.5
github.com/mailru/easyjson v0.7.7
github.com/mailru/easyjson v0.7.7 // indirect
github.com/onsi/ginkgo v1.13.0
github.com/onsi/gomega v1.11.0
github.com/prometheus/common v0.25.0
Expand All @@ -21,7 +21,6 @@ require (
k8s.io/klog/hack/tools v0.0.0-20210512110738-02ca14bed863
k8s.io/klog/v2 v2.8.0
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7
k8s.io/kubelet v0.21.1
k8s.io/metrics v0.21.1
sigs.k8s.io/mdtoc v1.0.1
)
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -878,8 +878,6 @@ k8s.io/klog/v2 v2.8.0 h1:Q3gmuM9hKEjefWFFYF0Mat+YyFJvsUyYuwyNNJ5C9Ts=
k8s.io/klog/v2 v2.8.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec=
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 h1:vEx13qjvaZ4yfObSSXW7BrMc/KQBBT/Jyee8XtLf4x0=
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE=
k8s.io/kubelet v0.21.1 h1:JeZsCr3GN2Kjg3gn21jLU10RFu0APUK/vdpFWa8P8Nw=
k8s.io/kubelet v0.21.1/go.mod h1:poOR6Iaa5WqytFOp0egXFV8c2XTLFxaXTdj5njUlnVY=
k8s.io/metrics v0.21.1 h1:Xlfrjdda/WWHxG6/h6ACykxb1RByy5EIT862Vc81IYQ=
k8s.io/metrics v0.21.1/go.mod h1:pyDVLsLe++FIGDBFU80NcW4xMFsuiVTWL8Zfi7+PpNo=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g7yaSHkYPkpgelw=
Expand Down
2 changes: 1 addition & 1 deletion manifests/base/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ rules:
resources:
- pods
- nodes
- nodes/stats
- nodes/metrics
- namespaces
- configmaps
verbs:
Expand Down
2 changes: 1 addition & 1 deletion pkg/scraper/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ import (

// KubeletMetricsInterface knows how to fetch metrics from the Kubelet
type KubeletMetricsInterface interface {
	// GetMetrics fetches resource metrics from the given node's Kubelet and
	// returns them as a storage.MetricsBatch.
	GetMetrics(ctx context.Context, node *v1.Node) (*storage.MetricsBatch, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package summary
package resource

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"

"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"

"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"

"k8s.io/client-go/rest"
"sigs.k8s.io/metrics-server/pkg/scraper/client"

"sigs.k8s.io/metrics-server/pkg/storage"

"github.com/mailru/easyjson"
"sigs.k8s.io/metrics-server/pkg/utils"

corev1 "k8s.io/api/core/v1"

"sigs.k8s.io/metrics-server/pkg/utils"
)

// kubeletClient fetches resource metrics from a node's Kubelet over HTTP.
//
// Field roles, as far as the visible code shows:
//   - defaultPort is the initial port used when building the request URL.
//   - useNodeStatusPort: presumably selects the port advertised in the node's
//     status instead of defaultPort — TODO confirm against GetMetrics.
//   - client is the HTTP client the request is sent with.
//   - scheme is the URL scheme of the request.
//   - addrResolver resolves the address of the node to connect to.
//   - buffers is a pool of reusable buffers. NOTE(review): no use is visible
//     in the new request path — confirm whether it is still needed.
type kubeletClient struct {
	defaultPort       int
	useNodeStatusPort bool
	client            *http.Client
	scheme            string
	addrResolver      utils.NodeAddressResolver
	buffers           sync.Pool
}

func NewClient(config client.KubeletClientConfig) (*kubeletClient, error) {
transport, err := rest.TransportFor(&config.Client)
if err != nil {
Expand All @@ -60,42 +71,12 @@ func NewClient(config client.KubeletClientConfig) (*kubeletClient, error) {
}, nil
}

type kubeletClient struct {
defaultPort int
useNodeStatusPort bool
client *http.Client
scheme string
addrResolver utils.NodeAddressResolver
buffers sync.Pool
}

var _ client.KubeletMetricsInterface = (*kubeletClient)(nil)

func (kc *kubeletClient) makeRequestAndGetValue(client *http.Client, req *http.Request, value easyjson.Unmarshaler) error {
// TODO(directxman12): support validating certs by hostname
response, err := client.Do(req)
if err != nil {
return err
}
defer response.Body.Close()
b := kc.getBuffer()
defer kc.returnBuffer(b)
_, err = io.Copy(b, response.Body)
if err != nil {
return err
}
body := b.Bytes()
if response.StatusCode != http.StatusOK {
return fmt.Errorf("GET %q: bad status code %q", req.URL, response.Status)
}

err = easyjson.Unmarshal(body, value)
if err != nil {
return fmt.Errorf("GET %q: failed to parse output: %w", req.URL, err)
}
return nil
}

// GetMetrics gets metrics from the kubelet /metrics/resource endpoint.
//
// 1. Send a request and fetch data from the kubelet /metrics/resource endpoint.
// 2. Use the Prometheus decoder to decode the data.
func (kc *kubeletClient) GetMetrics(ctx context.Context, node *corev1.Node) (*storage.MetricsBatch, error) {
port := kc.defaultPort
nodeStatusPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
Expand All @@ -104,33 +85,64 @@ func (kc *kubeletClient) GetMetrics(ctx context.Context, node *corev1.Node) (*st
}
addr, err := kc.addrResolver.NodeAddress(node)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to extract connection information for node %q: %v", node.Name, err)
}
url := url.URL{
Scheme: kc.scheme,
Host: net.JoinHostPort(addr, strconv.Itoa(port)),
Path: "/stats/summary",
RawQuery: "only_cpu_and_memory=true",
Scheme: kc.scheme,
Host: net.JoinHostPort(addr, strconv.Itoa(port)),
Path: "/metrics/resource",
}

req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return nil, err
}
summary := &Summary{}
client := kc.client
if client == nil {
client = http.DefaultClient
//1. Send request, fetch data from kubelet/metrics/resource endpoint
resource, err := kc.makeRequest(kc.client, req.WithContext(ctx))
if err != nil {
return nil, err
}
//2. Use prometheus decoder to decode data
samples, err := decodeProm004(resource)
if err != nil {
return nil, err
}
err = kc.makeRequestAndGetValue(client, req.WithContext(ctx), summary)
return decodeBatch(summary), err
return decodeBatch(samples, node.Name), err
}

func (kc *kubeletClient) getBuffer() *bytes.Buffer {
return kc.buffers.Get().(*bytes.Buffer)
// decodeProm004 parses payload as Prometheus text exposition format and
// returns every sample it contains, in the order they were decoded.
//
// Input that yields no samples produces a nil slice and a nil error. Any
// decode error other than io.EOF aborts parsing and is returned unchanged.
func decodeProm004(payload string) ([]*model.Sample, error) {
	sampleDec := expfmt.SampleDecoder{
		Dec:  expfmt.NewDecoder(strings.NewReader(payload), expfmt.FmtText),
		Opts: &expfmt.DecodeOptions{},
	}

	var collected []*model.Sample
	for {
		// A fresh vector per iteration: each Decode call yields the samples
		// of one metric family.
		var batch model.Vector
		switch err := sampleDec.Decode(&batch); err {
		case nil:
			collected = append(collected, batch...)
		case io.EOF:
			// Expected loop termination condition.
			return collected, nil
		default:
			return nil, err
		}
	}
}

func (kc *kubeletClient) returnBuffer(b *bytes.Buffer) {
b.Reset()
kc.buffers.Put(b)
// makeRequest sends req with the given HTTP client and returns the complete
// response body as a string.
//
// A nil client falls back to http.DefaultClient. Any status other than
// 200 OK is reported as an error naming the request method and URL; the body
// is not returned in that case. Errors from sending the request are returned
// unchanged; errors from reading the body are wrapped with the request
// method and URL.
func (kc *kubeletClient) makeRequest(client *http.Client, req *http.Request) (string, error) {
	if client == nil {
		// Keep a zero-value kubeletClient usable instead of panicking.
		client = http.DefaultClient
	}
	response, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer response.Body.Close()
	if response.StatusCode != http.StatusOK {
		return "", fmt.Errorf("%s %q: bad status code %q", req.Method, req.URL, response.Status)
	}
	b, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return "", fmt.Errorf("%s %q: failed to read response body: %w", req.Method, req.URL, err)
	}
	return string(b), nil
}
105 changes: 105 additions & 0 deletions pkg/scraper/client/resource/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2021 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resource

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/prometheus/common/model"
)

// TestClient is the `go test` entry point for this package's Ginkgo specs: it
// registers Ginkgo's Fail as the Gomega failure handler and then runs the
// specs under the suite name "Decode Suite".
func TestClient(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Decode Suite")
}

// Specs for decodeProm004. Each spec feeds a single-sample payload in the
// Prometheus text exposition format and checks the decoded metric name,
// labels, value and timestamp.
//
// Timestamps are compared as int64: the millisecond values do not fit in a
// 32-bit int, so passing them to Equal as untyped constants (which default to
// int) would fail to compile on 32-bit platforms.
var _ = Describe("Decode", func() {
	It("should use the decode node result from the CPU", func() {
		metricsOrigData := `# HELP node_cpu_usage_seconds_total [ALPHA] Cumulative cpu time consumed by the node in core-seconds
# TYPE node_cpu_usage_seconds_total counter
node_cpu_usage_seconds_total 497090.546884895 1621165414370
`

		By("decoding")
		samples, err := decodeProm004(metricsOrigData)

		By("verifying that the scrape CumulativeCpuUsed and Timestamp are as expected")
		Expect(err).NotTo(HaveOccurred())
		Expect(samples).To(HaveLen(1))
		Expect(string(samples[0].Metric[model.MetricNameLabel])).To(Equal("node_cpu_usage_seconds_total"))
		Expect(float64(samples[0].Value)).To(Equal(497090.546884895))
		Expect(int64(samples[0].Timestamp)).To(Equal(int64(1621165414370)))
	})

	It("should use the decode node result from the Memory", func() {
		metricsOrigData := `# HELP node_memory_working_set_bytes [ALPHA] Current working set of the node in bytes
# TYPE node_memory_working_set_bytes gauge
node_memory_working_set_bytes 8.391589888e+09 1621165414370
`
		By("decoding")
		samples, err := decodeProm004(metricsOrigData)

		By("verifying that the scrape MemoryUsage and Timestamp are as expected")
		Expect(err).NotTo(HaveOccurred())
		Expect(samples).To(HaveLen(1))
		Expect(string(samples[0].Metric[model.MetricNameLabel])).To(Equal("node_memory_working_set_bytes"))
		Expect(float64(samples[0].Value)).To(Equal(8.391589888e+09))
		Expect(int64(samples[0].Timestamp)).To(Equal(int64(1621165414370)))
	})

	It("should use the decode pod result from the CumulativeCpuUsed", func() {
		metricsOrigData := `# HELP container_cpu_usage_seconds_total [ALPHA] Cumulative cpu time consumed by the container in core-seconds
# TYPE container_cpu_usage_seconds_total counter
container_cpu_usage_seconds_total{container="container1",namespace="default",pod="pod1"} 15625.474020075 1621165406043
`

		By("decoding")
		samples, err := decodeProm004(metricsOrigData)

		By("verifying that the scrape CumulativeCpuUsed and Timestamp are as expected")
		Expect(err).NotTo(HaveOccurred())
		Expect(samples).To(HaveLen(1))
		Expect(string(samples[0].Metric[model.MetricNameLabel])).To(Equal("container_cpu_usage_seconds_total"))
		Expect(string(samples[0].Metric["namespace"])).To(Equal("default"))
		Expect(string(samples[0].Metric["pod"])).To(Equal("pod1"))
		Expect(string(samples[0].Metric["container"])).To(Equal("container1"))
		Expect(float64(samples[0].Value)).To(Equal(15625.474020075))
		Expect(int64(samples[0].Timestamp)).To(Equal(int64(1621165406043)))
	})

	It("should use the decode pod result from the Memory", func() {
		metricsOrigData := `# HELP container_memory_working_set_bytes [ALPHA] Current working set of the container in bytes
# TYPE container_memory_working_set_bytes gauge
container_memory_working_set_bytes{container="container1",namespace="default",pod="pod1"} 1.6244736e+07 1621165406043
`
		By("decoding")
		samples, err := decodeProm004(metricsOrigData)

		By("verifying that the scrape MemoryUsage and Timestamp are as expected")
		Expect(err).NotTo(HaveOccurred())
		Expect(samples).To(HaveLen(1))
		Expect(string(samples[0].Metric[model.MetricNameLabel])).To(Equal("container_memory_working_set_bytes"))
		Expect(string(samples[0].Metric["namespace"])).To(Equal("default"))
		Expect(string(samples[0].Metric["pod"])).To(Equal("pod1"))
		Expect(string(samples[0].Metric["container"])).To(Equal("container1"))
		Expect(float64(samples[0].Value)).To(Equal(1.6244736e+07))
		Expect(int64(samples[0].Timestamp)).To(Equal(int64(1621165406043)))
	})
})
Loading

0 comments on commit c6398d6

Please sign in to comment.