Skip to content

Commit

Permalink
WIP: metrics redo
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Romani <[email protected]>
  • Loading branch information
ffromani committed Oct 31, 2023
1 parent 6c3d536 commit ac3f168
Show file tree
Hide file tree
Showing 30 changed files with 2,866 additions and 90 deletions.
11 changes: 8 additions & 3 deletions cmd/resource-topology-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (

"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/config"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/k8shelpers"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/metrics"
metricssrv "github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/metrics/server"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/podres"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/podres/middleware/podexclude"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/podres/middleware/sharedcpuspool"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/podres/middleware/terminalpods"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/prometheus"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/resourcemonitor"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/resourcetopologyexporter"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/version"
Expand Down Expand Up @@ -77,9 +78,13 @@ func main() {
}
}

err = prometheus.InitPrometheus(parsedArgs.RTE.PrometheusMode)
err = metrics.Setup()
if err != nil {
klog.Fatalf("failed to start prometheus server: %v", err)
klog.Fatalf("failed to setup metrics: %v", err)
}
err = metricssrv.Setup(parsedArgs.RTE.MetricsMode, metricssrv.NewDefaultConfig())
if err != nil {
klog.Fatalf("failed to setup metrics server: %v", err)
}

hnd := resourcemonitor.Handle{
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/stretchr/testify v1.8.1
go.uber.org/ratelimit v0.2.0
google.golang.org/grpc v1.58.3
gopkg.in/fsnotify.v1 v1.4.7
k8s.io/api v0.27.6
k8s.io/apiextensions-apiserver v0.27.6
k8s.io/apimachinery v0.27.6
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
Expand Down
8 changes: 4 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (

"github.com/k8stopologyawareschedwg/podfingerprint"

metricssrv "github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/metrics/server"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/nrtupdater"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/podres/middleware/podexclude"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/podres/middleware/sharedcpuspool"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/prometheus"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/resourcemonitor"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/resourcetopologyexporter"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/version"
Expand Down Expand Up @@ -87,7 +87,7 @@ func LoadArgs(args ...string) (ProgArgs, error) {

var configPath string
var pfpMethod string
var promMode string
var metricsMode string
flags := flag.NewFlagSet(version.ProgramName, flag.ExitOnError)

klog.InitFlags(flags)
Expand Down Expand Up @@ -115,7 +115,7 @@ func LoadArgs(args ...string) (ProgArgs, error) {
flags.StringVar(&pArgs.RTE.PodResourcesSocketPath, "podresources-socket", "unix:///podresources/kubelet.sock", "Pod Resource Socket path to use.")
flags.BoolVar(&pArgs.RTE.PodReadinessEnable, "podreadiness", true, "Custom condition injection using Podreadiness.")
flags.BoolVar(&pArgs.RTE.AddNRTOwnerEnable, "add-nrt-owner", true, "RTE will inject NRT's related node as OwnerReference to ensure cleanup if the node is deleted.")
flags.StringVar(&promMode, "metrics-mode", prometheus.ServingDisabled, fmt.Sprintf("Select the mode to expose metrics endpoint. Valid options: %s", prometheus.ServingModeSupported()))
flags.StringVar(&metricsMode, "metrics-mode", metricssrv.ServingDisabled, fmt.Sprintf("Select the mode to expose metrics endpoint. Valid options: %s", metricssrv.ServingModeSupported()))

refCnt := flags.String("reference-container", "", "Reference container, used to learn about the shared cpu pool\n See: https://github.com/kubernetes/kubernetes/issues/102190\n format of spec is namespace/podname/containername.\n Alternatively, you can use the env vars REFERENCE_NAMESPACE, REFERENCE_POD_NAME, REFERENCE_CONTAINER_NAME.")

Expand Down Expand Up @@ -153,7 +153,7 @@ Special targets:
pArgs.RTE.ReferenceContainer = sharedcpuspool.ContainerIdentFromEnv()
}

pArgs.RTE.PrometheusMode, err = prometheus.ServingModeIsSupported(promMode)
pArgs.RTE.MetricsMode, err = metricssrv.ServingModeIsSupported(metricsMode)
if err != nil {
return pArgs, err
}
Expand Down
100 changes: 29 additions & 71 deletions pkg/prometheus/prometheus.go → pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,28 @@
package prometheus
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"fmt"
"net/http"
"os"
"strconv"
"strings"

"k8s.io/klog/v2"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const prometheusDefaultPort = "2112"

const (
ServingDisabled = "disabled"
ServingHTTP = "http" // plaintext
)

func ServingModeIsSupported(value string) (string, error) {
val := strings.ToLower(value)
switch val {
case ServingDisabled:
return val, nil
case ServingHTTP:
return val, nil
default:
return val, fmt.Errorf("unsupported method %q", value)
}
}

func ServingModeSupported() string {
modes := []string{
ServingDisabled,
ServingHTTP,
}
return strings.Join(modes, ",")
}

var nodeName string

var (
Expand All @@ -65,19 +47,6 @@ var (
}, []string{"node", "trigger"})
)

func getNodeName() (string, error) {
var err error

val, ok := os.LookupEnv("NODE_NAME")
if !ok {
val, err = os.Hostname()
if err != nil {
return "", err
}
}
return val, nil
}

func UpdateNodeResourceTopologyWritesMetric(operation, trigger string) {
NodeResourceTopologyWrites.With(prometheus.Labels{
"node": nodeName,
Expand Down Expand Up @@ -108,35 +77,24 @@ func UpdateWakeupDelayMetric(trigger string, wakeupDelay float64) {
}).Set(wakeupDelay)
}

func InitPrometheus(mode string) error {
if mode == ServingDisabled {
klog.Infof("prometheus endpoint disabled")
return nil
}

func Setup() error {
var err error
var port = prometheusDefaultPort

if envValue, ok := os.LookupEnv("METRICS_PORT"); ok {
if _, err = strconv.Atoi(envValue); err != nil {
return fmt.Errorf("the env variable PROMETHEUS_PORT has inccorrect value %q: %w", envValue, err)
}
port = envValue
}

nodeName, err = getNodeName()
if err != nil {
return err
}
return nil
}

http.Handle("/metrics", promhttp.Handler())
addr := fmt.Sprintf("0.0.0.0:%s", port)
func getNodeName() (string, error) {
var err error

go func() {
if err = http.ListenAndServe(addr, nil); err != nil {
klog.Fatalf("failed to run prometheus server; %v", err)
val, ok := os.LookupEnv("NODE_NAME")
if !ok {
val, err = os.Hostname()
if err != nil {
return "", err
}
}()

return nil
}
return val, nil
}
136 changes: 136 additions & 0 deletions pkg/metrics/server/checksums.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"bytes"
"crypto/sha256"
"errors"
"io"
"os"

"k8s.io/klog/v2"
)

type checksums struct {
cert []byte
key []byte
authCA []byte
}

func (ck checksums) IsValid() bool {
return len(ck.cert) > 0 && len(ck.key) > 0 && len(ck.authCA) > 0
}

func (ck checksums) Equal(ck2 checksums) bool {
if !bytes.Equal(ck.cert, ck2.cert) {
klog.Infof("cert changed: %x -> %x", ck.cert, ck2.cert)
return false
}
if !bytes.Equal(ck.key, ck2.key) {
klog.Infof("key changed: %x -> %x", ck.key, ck2.key)
return false
}
if !bytes.Equal(ck.authCA, ck2.authCA) {
klog.Infof("auth CA changed: %x -> %x", ck.authCA, ck2.authCA)
return false
}
return true
}

func newChecksums() checksums {
return newChecksumsWithPaths(tlsCert, tlsKey, authCAFile)
}

func allFilesReady() bool {
return allFilesReadyWithPaths(tlsCert, tlsKey, authCAFile)
}

// newChecksumsWithPaths is meant for testing purposes
func newChecksumsWithPaths(tlsCertPath, tlsKeyPath, authCAFilePath string) checksums {
return checksums{
cert: checksumFile(tlsCertPath),
key: checksumFile(tlsKeyPath),
authCA: checksumFile(authCAFilePath),
}
}

// allFilesReadyWithPaths is meant for testing purposes
func allFilesReadyWithPaths(tlsCertPath, tlsKeyPath, authCAFilePath string) bool {
type entrySpec struct {
path string
desc string
}

entries := []entrySpec{
{
path: tlsCertPath,
desc: "TLS cert",
},
{
path: tlsKeyPath,
desc: "TLS key",
},
{
path: authCAFilePath,
desc: "auth CA",
},
}

for _, entry := range entries {
ok, err := fileExistsAndNotEmpty(entry.path)
if err != nil {
klog.Warningf("error checking if changed %s file empty/exists: %v", entry.desc, err)
return false
}
if !ok {
klog.V(2).Infof("%s missing or empty, certificates will not be rotated", entry.desc)
return false
}
}

return true
}

func checksumFile(fName string) []byte {
file, err := os.Open(fName)
if err != nil {
klog.Errorf("failed to open file %v for checksum: %v", fName, err)
return []byte{}
}
defer file.Close()

hash := sha256.New()

if _, err = io.Copy(hash, file); err != nil {
klog.Errorf("failed to compute checksum for file %v: %v", fName, err)
return []byte{}
}

return hash.Sum(nil)
}

func fileExistsAndNotEmpty(fName string) (bool, error) {
fi, err := os.Stat(fName)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return false, nil
}
return false, err
}
return (fi.Size() != 0), nil
}
Loading

0 comments on commit ac3f168

Please sign in to comment.