diff --git a/cmd/main.go b/cmd/main.go index 2dc8e84037..e3ba18f6e5 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,16 +18,14 @@ package main import ( "context" - "net/http" "time" flag "github.com/spf13/pflag" - "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics" logsapi "k8s.io/component-base/logs/api/v1" json "k8s.io/component-base/logs/json" - "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" ) @@ -58,17 +56,9 @@ func main() { }() } - cloud.RegisterMetrics() if options.ServerOptions.HttpEndpoint != "" { - mux := http.NewServeMux() - mux.Handle("/metrics", legacyregistry.HandlerWithReset()) - go func() { - err := http.ListenAndServe(options.ServerOptions.HttpEndpoint, mux) - if err != nil { - klog.ErrorS(err, "failed to listen & serve metrics", "endpoint", options.ServerOptions.HttpEndpoint) - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } - }() + r := metrics.InitializeRecorder() + r.InitializeMetricsHandler(options.ServerOptions.HttpEndpoint, "/metrics") } drv, err := driver.NewDriver( diff --git a/pkg/cloud/aws_metrics.go b/pkg/cloud/aws_metrics.go deleted file mode 100644 index 5f48369abd..0000000000 --- a/pkg/cloud/aws_metrics.go +++ /dev/null @@ -1,75 +0,0 @@ -//go:build !providerless -// +build !providerless - -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cloud - -import ( - "sync" - - "k8s.io/component-base/metrics" - "k8s.io/component-base/metrics/legacyregistry" -) - -var ( - awsAPIMetric = metrics.NewHistogramVec( - &metrics.HistogramOpts{ - Name: "cloudprovider_aws_api_request_duration_seconds", - Help: "Latency of AWS API calls", - StabilityLevel: metrics.ALPHA, - }, - []string{"request"}) - - awsAPIErrorMetric = metrics.NewCounterVec( - &metrics.CounterOpts{ - Name: "cloudprovider_aws_api_request_errors", - Help: "AWS API errors", - StabilityLevel: metrics.ALPHA, - }, - []string{"request"}) - - awsAPIThrottlesMetric = metrics.NewCounterVec( - &metrics.CounterOpts{ - Name: "cloudprovider_aws_api_throttled_requests_total", - Help: "AWS API throttled requests", - StabilityLevel: metrics.ALPHA, - }, - []string{"operation_name"}) -) - -func recordAWSMetric(actionName string, timeTaken float64, err error) { - if err != nil { - awsAPIErrorMetric.With(metrics.Labels{"request": actionName}).Inc() - } else { - awsAPIMetric.With(metrics.Labels{"request": actionName}).Observe(timeTaken) - } -} - -func recordAWSThrottlesMetric(operation string) { - awsAPIThrottlesMetric.With(metrics.Labels{"operation_name": operation}).Inc() -} - -var registerOnce sync.Once - -func RegisterMetrics() { - registerOnce.Do(func() { - legacyregistry.MustRegister(awsAPIMetric) - legacyregistry.MustRegister(awsAPIErrorMetric) - legacyregistry.MustRegister(awsAPIThrottlesMetric) - }) -} diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 30baee8b37..aa493497f5 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -245,7 +245,6 @@ var _ Cloud = &cloud{} // NewCloud returns a new instance of AWS cloud // It panics if session is invalid func NewCloud(region string, awsSdkDebugLog bool, userAgentExtra string) (Cloud, error) { - RegisterMetrics() return newEC2Cloud(region, awsSdkDebugLog, userAgentExtra) } diff --git a/pkg/cloud/handlers.go b/pkg/cloud/handlers.go index d90797fde4..b1ffef26d3 100644 --- a/pkg/cloud/handlers.go +++ b/pkg/cloud/handlers.go @@ -20,19 +20,32 @@ import ( "time" "github.com/aws/aws-sdk-go/aws/request" - + "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics" "k8s.io/klog/v2" ) -// RecordRequestsComplete is added to the Complete chain; called after any request +// RecordRequestsHandler is added to the Complete chain; called after any request func RecordRequestsHandler(r *request.Request) { - recordAWSMetric(operationName(r), time.Since(r.Time).Seconds(), r.Error) + labels := map[string]string{ + "request": operationName(r), + } + + if r.Error != nil { + metrics.Recorder().IncreaseCount("cloudprovider_aws_api_request_errors", labels) + } else { + duration := time.Since(r.Time).Seconds() + metrics.Recorder().ObserveHistogram("cloudprovider_aws_api_request_duration_seconds", duration, labels, nil) + } } -// RecordThrottlesAfterRetry is added to the AfterRetry chain; called after any error +// RecordThrottledRequestsHandler is added to the AfterRetry chain; called after any error func RecordThrottledRequestsHandler(r *request.Request) { + labels := map[string]string{ + "operation_name": operationName(r), + } + if r.IsErrorThrottle() { - recordAWSThrottlesMetric(operationName(r)) + metrics.Recorder().IncreaseCount("cloudprovider_aws_api_throttled_requests_total", labels) klog.InfoS("Got RequestLimitExceeded error on AWS request", "request", describeRequest(r)) } } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 0000000000..4ad4aab788 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,149 @@ +package metrics + +import ( + "net/http" + "sync" + "time" + + "k8s.io/component-base/metrics" + "k8s.io/klog/v2" +) + +var ( + r *metricRecorder // singleton instance of metricRecorder + once sync.Once +) + +type metricRecorder struct { + registry metrics.KubeRegistry + metrics map[string]interface{} +} + +// Recorder returns the singleton instance of metricRecorder. +// nil is returned if the recorder is not initialized. +func Recorder() *metricRecorder { + return r +} + +// InitializeRecorder initializes a new metricRecorder instance if it hasn't been initialized. +func InitializeRecorder() *metricRecorder { + once.Do(func() { + r = &metricRecorder{ + registry: metrics.NewKubeRegistry(), + metrics: make(map[string]interface{}), + } + }) + return r +} + +// IncreaseCount increases the counter metric by 1. +func (m *metricRecorder) IncreaseCount(name string, labels map[string]string) { + if m == nil { + return // recorder is not initialized + } + + metric, ok := m.metrics[name] + + if !ok { + klog.V(4).InfoS("Metric not found, registering", "name", name, "labels", labels) + m.registerCounterVec(name, "ebs_csi_aws_com metric", getLabelNames(labels)) + m.IncreaseCount(name, labels) + return + } + + metric.(*metrics.CounterVec).With(metrics.Labels(labels)).Inc() +} + +// ObserveHistogram records the given value in the histogram metric. +func (m *metricRecorder) ObserveHistogram(name string, value float64, labels map[string]string, buckets []float64) { + if m == nil { + return // recorder is not initialized + } + metric, ok := m.metrics[name] + + if !ok { + klog.V(4).InfoS("Metric not found, registering", "name", name, "labels", labels, "buckets", buckets) + m.registerHistogramVec(name, "ebs_csi_aws_com metric", getLabelNames(labels), buckets) + m.ObserveHistogram(name, value, labels, buckets) + return + } + + metric.(*metrics.HistogramVec).With(metrics.Labels(labels)).Observe(value) +} + +// InitializeMetricsHandler starts a new HTTP server to expose the metrics. +func (m *metricRecorder) InitializeMetricsHandler(address, path string) { + if m == nil { + klog.InfoS("InitializeMetricsHandler: metric recorder is not initialized") + return + } + + mux := http.NewServeMux() + mux.Handle(path, metrics.HandlerFor( + m.registry, + metrics.HandlerOpts{ + ErrorHandling: metrics.ContinueOnError, + })) + + server := &http.Server{ + Addr: address, + Handler: mux, + ReadTimeout: 3 * time.Second, + } + + go func() { + klog.InfoS("Metric server listening", "address", address, "path", path) + + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + klog.ErrorS(err, "Failed to start metric server", "address", address, "path", path) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + }() +} + +func (m *metricRecorder) registerHistogramVec(name, help string, labels []string, buckets []float64) { + if _, exists := m.metrics[name]; exists { + return + } + histogram := createHistogramVec(name, help, labels, buckets) + m.metrics[name] = histogram + m.registry.MustRegister(histogram) +} + +func (m *metricRecorder) registerCounterVec(name, help string, labels []string) { + if _, exists := m.metrics[name]; exists { + return + } + counter := createCounterVec(name, help, labels) + m.metrics[name] = counter + m.registry.MustRegister(counter) +} + +func createHistogramVec(name, help string, labels []string, buckets []float64) *metrics.HistogramVec { + opts := &metrics.HistogramOpts{ + Name: name, + Help: help, + StabilityLevel: metrics.ALPHA, + Buckets: buckets, + } + return metrics.NewHistogramVec(opts, labels) +} + +func createCounterVec(name, help string, labels []string) *metrics.CounterVec { + return metrics.NewCounterVec( + &metrics.CounterOpts{ + Name: name, + Help: help, + StabilityLevel: metrics.ALPHA, + }, + labels, + ) +} + +func getLabelNames(labels map[string]string) []string { + names := make([]string, 0, len(labels)) + for n := range labels { + names = append(names, n) + } + return names +} diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go new file mode 100644 index 0000000000..27b9d4f0eb --- /dev/null +++ b/pkg/metrics/metrics_test.go @@ -0,0 +1,95 @@ +package metrics + +import ( + "strings" + "testing" + + "k8s.io/component-base/metrics/testutil" +) + +func TestMetricRecorder(t *testing.T) { + tests := []struct { + name string + exec func(m *metricRecorder) + expected string + recorder bool + }{ + { + name: "TestMetricRecorder: IncreaseCounterMetric", + exec: func(m *metricRecorder) { + m.IncreaseCount("test_counter", map[string]string{"key": "value"}) + }, + expected: ` + # HELP test_counter ebs_csi_aws_com metric + # TYPE test_counter counter + test_counter{key="value"} 1 + `, + recorder: true, + }, + { + name: "TestMetricRecorder: ObserveHistogramMetric", + exec: func(m *metricRecorder) { + m.ObserveHistogram("test_histogram", 1.5, map[string]string{"key": "value"}, []float64{1, 2, 3}) + }, + expected: ` + # HELP test_histogram ebs_csi_aws_com metric + # TYPE test_histogram histogram + test_histogram_bucket{key="value",le="1"} 0 + test_histogram_bucket{key="value",le="2"} 1 + test_histogram_bucket{key="value",le="3"} 1 + test_histogram_sum{key="value"} 1.5 + test_histogram_count{key="value"} 1 + `, + recorder: true, + }, + { + name: "TestMetricRecorder: Re-register metric", + exec: func(m *metricRecorder) { + m.IncreaseCount("test_re_register_counter", map[string]string{"key": "value1"}) + m.registerCounterVec("test_re_register_counter", "ebs_csi_aws_com metric", []string{"key"}) + m.IncreaseCount("test_re_register_counter", map[string]string{"key": "value1"}) + m.IncreaseCount("test_re_register_counter", map[string]string{"key": "value2"}) + }, + expected: ` + # HELP test_re_register_counter ebs_csi_aws_com metric + # TYPE test_re_register_counter counter + test_re_register_counter{key="value1"} 2 + test_re_register_counter{key="value2"} 1 + `, + recorder: true, + }, + { + name: "TestMetricRecorder: Recorder not initialized", + exec: func(m *metricRecorder) { + m.IncreaseCount("test_not_initialized_counter", map[string]string{"key": "value"}) + }, + expected: ``, + recorder: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.recorder { + InitializeRecorder() + } + m := Recorder() + + tt.exec(m) + + if err := testutil.GatherAndCompare(m.registry, strings.NewReader(tt.expected), getMetricNameFromExpected(tt.expected)); err != nil { + t.Fatal(err) + } + }) + } +} + +func getMetricNameFromExpected(expected string) string { + lines := strings.Split(expected, "\n") + for _, line := range lines { + if strings.Contains(line, "{") { + return strings.Split(line, "{")[0] + } + } + return "" +}