From be8886e61cbe019e4636beed602a4b9eb8ed4dc9 Mon Sep 17 00:00:00 2001 From: kevin Date: Sun, 25 Dec 2022 20:14:17 +0800 Subject: [PATCH 1/6] Add scaler metric latency support Add scaler metric latency support Signed-off-by: kevin --- pkg/prommetrics/adapter/adapter_prommetrics.go | 15 +++++++++++++++ pkg/prommetrics/prommetrics.go | 15 +++++++++++++++ pkg/provider/provider.go | 4 ++++ pkg/scaling/scale_handler.go | 3 +++ 4 files changed, 37 insertions(+) diff --git a/pkg/prommetrics/adapter/adapter_prommetrics.go b/pkg/prommetrics/adapter/adapter_prommetrics.go index fddcc7066e5..f58fb4174cf 100644 --- a/pkg/prommetrics/adapter/adapter_prommetrics.go +++ b/pkg/prommetrics/adapter/adapter_prommetrics.go @@ -45,6 +45,15 @@ var ( }, metricLabels, ) + scalerMetricsLatency = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "keda_metrics_adapter", + Subsystem: "scaler", + Name: "metrics_latency", + Help: "Scaler Metrics Latency", + }, + metricLabels, + ) scalerErrors = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "keda_metrics_adapter", @@ -74,6 +83,7 @@ func init() { registry = prometheus.NewRegistry() registry.MustRegister(scalerErrorsTotal) registry.MustRegister(scalerMetricsValue) + registry.MustRegister(scalerMetricsLatency) registry.MustRegister(scalerErrors) registry.MustRegister(scaledObjectErrors) } @@ -104,6 +114,11 @@ func (metricsServer PrometheusMetricServer) RecordHPAScalerMetric(namespace stri scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) } +// RecordHPAScalerLatency create a measurement of the latency to external metric +func (metricsServer PrometheusMetricServer) RecordHPAScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) { + scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) +} + // RecordHPAScalerError counts the number of errors occurred in trying get an external metric used by the HPA func (metricsServer PrometheusMetricServer) RecordHPAScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) { if err != nil { diff --git a/pkg/prommetrics/prommetrics.go b/pkg/prommetrics/prommetrics.go index 193128ef29c..a6e154ee1c5 100644 --- a/pkg/prommetrics/prommetrics.go +++ b/pkg/prommetrics/prommetrics.go @@ -55,6 +55,15 @@ var ( }, metricLabels, ) + scalerMetricsLatency = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: DefaultPromMetricsNamespace, + Subsystem: "scaler", + Name: "metrics_latency", + Help: "Scaler Metrics Latency", + }, + metricLabels, + ) scalerErrors = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: DefaultPromMetricsNamespace, @@ -96,6 +105,7 @@ var ( func init() { metrics.Registry.MustRegister(scalerErrorsTotal) metrics.Registry.MustRegister(scalerMetricsValue) + metrics.Registry.MustRegister(scalerMetricsLatency) metrics.Registry.MustRegister(scalerErrors) metrics.Registry.MustRegister(scaledObjectErrors) @@ -108,6 +118,11 @@ func RecordScalerMetric(namespace string, scaledObject string, scaler string, sc scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) } +// RecordScalerLatency create a measurement of the latency to external metric +func RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) { + scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) +} + // RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA func RecordScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) { if err != nil { diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 7241bb6a14e..128a990e321 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/go-logr/logr" apiErrors "k8s.io/apimachinery/pkg/api/errors" @@ -166,7 +167,10 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, } // Filter only the desired metric if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) { + startTime := time.Now() metrics, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric) + scalerLatency := time.Since(startTime).Milliseconds() + promMetricsServer.RecordHPAScalerLatency(namespace, scaledObject.Name, scalerName, scalerIndex, info.Metric, float64(scalerLatency)) metrics, err = fallback.GetMetricsWithFallback(ctx, p.client, logger, metrics, err, info.Metric, scaledObject, metricSpec) if err != nil { scalerError = true diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 9b87b46790d..3fdceb7e9cf 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -440,7 +440,10 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN } if !metricsFoundInCache { + startTime := time.Now() metrics, err = cache.GetMetricsForScaler(ctx, scalerIndex, metricName) + scalerLatency := time.Since(startTime).Milliseconds() + prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(scalerLatency)) h.logger.V(1).Info("Getting metrics from scaler", "scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName, "scaler", scalerName, "metricName", metricSpec.External.Metric.Name, "metrics", metrics, "scalerError", err) } metrics, err = fallback.GetMetricsWithFallback(ctx, h.client, h.logger, metrics, err, metricName, scaledObject, metricSpec) From 0ca2891d39076792a7a1d36f82ce53cc87616cf3 Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 6 Jan 2023 16:48:01 +0800 Subject: [PATCH 2/6] Add e2e test Add e2e test Signed-off-by: kevin --- .../adapter/adapter_prommetrics.go | 15 ------------ pkg/provider/provider.go | 4 ---- .../prometheus_metrics_test.go | 24 +++++++++++++++++++ 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/pkg/prommetrics/adapter/adapter_prommetrics.go b/pkg/prommetrics/adapter/adapter_prommetrics.go index f58fb4174cf..fddcc7066e5 100644 --- a/pkg/prommetrics/adapter/adapter_prommetrics.go +++ b/pkg/prommetrics/adapter/adapter_prommetrics.go @@ -45,15 +45,6 @@ var ( }, metricLabels, ) - scalerMetricsLatency = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "keda_metrics_adapter", - Subsystem: "scaler", - Name: "metrics_latency", - Help: "Scaler Metrics Latency", - }, - metricLabels, - ) scalerErrors = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "keda_metrics_adapter", @@ -83,7 +74,6 @@ func init() { registry = prometheus.NewRegistry() registry.MustRegister(scalerErrorsTotal) registry.MustRegister(scalerMetricsValue) - registry.MustRegister(scalerMetricsLatency) registry.MustRegister(scalerErrors) registry.MustRegister(scaledObjectErrors) } @@ -114,11 +104,6 @@ func (metricsServer PrometheusMetricServer) RecordHPAScalerMetric(namespace stri scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) } -// RecordHPAScalerLatency create a measurement of the latency to external metric -func (metricsServer PrometheusMetricServer) RecordHPAScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) { - scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) -} - // RecordHPAScalerError counts the number of errors occurred in trying get an external metric used by the HPA func (metricsServer PrometheusMetricServer) RecordHPAScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) { if err != nil { diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 128a990e321..7241bb6a14e 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -21,7 +21,6 @@ import ( "fmt" "strings" "sync" - "time" "github.com/go-logr/logr" apiErrors "k8s.io/apimachinery/pkg/api/errors" @@ -167,10 +166,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, } // Filter only the desired metric if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) { - startTime := time.Now() metrics, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric) - scalerLatency := time.Since(startTime).Milliseconds() - promMetricsServer.RecordHPAScalerLatency(namespace, scaledObject.Name, scalerName, scalerIndex, info.Metric, float64(scalerLatency)) metrics, err = fallback.GetMetricsWithFallback(ctx, p.client, logger, metrics, err, info.Metric, scaledObject, metricSpec) if err != nil { scalerError = true diff --git a/tests/internals/prometheus_metrics/prometheus_metrics_test.go b/tests/internals/prometheus_metrics/prometheus_metrics_test.go index 24e142e7658..ec0de8c3a55 100644 --- a/tests/internals/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/internals/prometheus_metrics/prometheus_metrics_test.go @@ -225,6 +225,7 @@ func TestScaler(t *testing.T) { "replica count should be 2 after 2 minute") testScalerMetricValue(t) + testScalerMetricLatency(t) testMetricsServerScalerMetricValue(t) testOperatorMetrics(t, kc, data) @@ -286,6 +287,29 @@ func testScalerMetricValue(t *testing.T) { } } +func testScalerMetricLatency(t *testing.T) { + t.Log("--- testing scaler metric latency ---") + + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) + + if val, ok := family["keda_scaler_metrics_latency"]; ok { + var found bool + metrics := val.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == "scaledObject" && *label.Value == scaledObjectName { + assert.Equal(t, float64(0), *metric.Gauge.Value) + found = true + } + } + } + assert.Equal(t, true, found) + } else { + t.Errorf("metric not available") + } +} + // [DEPRECATED] handle exporting Prometheus metrics from Operator to Metrics Server func testMetricsServerScalerMetricValue(t *testing.T) { t.Log("--- testing scaler metric value in metrics server ---") From fa1858200a76d48fde913793740e69304edd829f Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 6 Jan 2023 17:09:41 +0800 Subject: [PATCH 3/6] Add changelog & fix static checks Add changelog & fix static checks Signed-off-by: kevin --- CHANGELOG.md | 1 + .../prometheus_metrics/prometheus_metrics_test.go | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 84da4a92191..0136d27f2bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio Here is an overview of all **stable** additions: +- **Prometheus Metrics**: Introduce scaler latency in Prometheus metrics. ([#4037](https://github.com/kedacore/keda/issues/4037)) - **General**: Introduce new ArangoDB Scaler ([#4000](https://github.com/kedacore/keda/issues/4000)) Here is an overview of all new **experimental** features: diff --git a/tests/internals/prometheus_metrics/prometheus_metrics_test.go b/tests/internals/prometheus_metrics/prometheus_metrics_test.go index ec0de8c3a55..ef8ca1c7310 100644 --- a/tests/internals/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/internals/prometheus_metrics/prometheus_metrics_test.go @@ -21,6 +21,7 @@ import ( const ( testName = "prometheus-metrics-test" + labelScaledObject = "scaledObject" ) var ( @@ -275,7 +276,7 @@ func testScalerMetricValue(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == "scaledObject" && *label.Value == scaledObjectName { + if *label.Name == labelScaledObject && *label.Value == scaledObjectName { assert.Equal(t, float64(4), *metric.Gauge.Value) found = true } @@ -298,7 +299,7 @@ func testScalerMetricLatency(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == "scaledObject" && *label.Value == scaledObjectName { + if *label.Name == labelScaledObject && *label.Value == scaledObjectName { assert.Equal(t, float64(0), *metric.Gauge.Value) found = true } @@ -322,7 +323,7 @@ func testMetricsServerScalerMetricValue(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == "scaledObject" && *label.Value == scaledObjectName { + if *label.Name == labelScaledObject && *label.Value == scaledObjectName { assert.Equal(t, float64(4), *metric.Gauge.Value) found = true } From fad3925f99877076fb1cf25de1132ee81869ffb3 Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 6 Jan 2023 17:58:04 +0800 Subject: [PATCH 4/6] Fix scaled_object_errors Fix scaled_object_errors Signed-off-by: kevin --- pkg/prommetrics/prommetrics.go | 2 +- tests/internals/prometheus_metrics/prometheus_metrics_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/prommetrics/prommetrics.go b/pkg/prommetrics/prommetrics.go index a6e154ee1c5..9303d9439c1 100644 --- a/pkg/prommetrics/prommetrics.go +++ b/pkg/prommetrics/prommetrics.go @@ -76,7 +76,7 @@ var ( scaledObjectErrors = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: DefaultPromMetricsNamespace, - Subsystem: "scaled", + Subsystem: "scaled_object", Name: "errors", Help: "Number of scaled object errors", }, diff --git a/tests/internals/prometheus_metrics/prometheus_metrics_test.go b/tests/internals/prometheus_metrics/prometheus_metrics_test.go index ef8ca1c7310..6fd48587078 100644 --- a/tests/internals/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/internals/prometheus_metrics/prometheus_metrics_test.go @@ -20,7 +20,7 @@ import ( ) const ( - testName = "prometheus-metrics-test" + testName = "prometheus-metrics-test" labelScaledObject = "scaledObject" ) From c62d21ad0e2090d46d9315672122b38bd1100a4f Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 6 Jan 2023 18:15:05 +0800 Subject: [PATCH 5/6] move the logic for measuring latency inside cache.GetMetricsForScaler() move the logic for measuring latency inside cache.GetMetricsForScaler() Signed-off-by: kevin --- pkg/provider/provider.go | 2 +- pkg/scaling/cache/scalers_cache.go | 16 +++++++++------- pkg/scaling/scale_handler.go | 8 ++++---- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 7241bb6a14e..f529f585dd1 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -166,7 +166,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, } // Filter only the desired metric if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) { - metrics, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric) + metrics, _, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric) metrics, err = fallback.GetMetricsWithFallback(ctx, p.client, logger, metrics, err, info.Metric, scaledObject, metricSpec) if err != nil { scalerError = true diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go index 813ed7b4bfc..35dd2e2c717 100644 --- a/pkg/scaling/cache/scalers_cache.go +++ b/pkg/scaling/cache/scalers_cache.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math" + "time" v2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" @@ -69,24 +70,25 @@ func (c *ScalersCache) GetPushScalers() []scalers.PushScaler { return result } -// GetMetricsForScaler returns metric value for a scaler identified by the metric name +// GetMetricsForScaler returns metric value and latency for a scaler identified by the metric name // and by the input index (from the list of scalers in this ScaledObject) -func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, error) { +func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, int64, error) { if index < 0 || index >= len(c.Scalers) { - return nil, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers)) + return nil, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers)) } + startTime := time.Now() m, _, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName) if err == nil { - return m, nil + return m, time.Since(startTime).Milliseconds(), nil } ns, err := c.refreshScaler(ctx, index) if err != nil { - return nil, err + return nil, -1, err } - + startTime = time.Now() m, _, err = ns.GetMetricsAndActivity(ctx, metricName) - return m, err + return m, time.Since(startTime).Milliseconds(), err } // GetScaledObjectState returns whether the input ScaledObject is active as a first parameters, diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 3fdceb7e9cf..0a4da0b771b 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -440,10 +440,10 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN } if !metricsFoundInCache { - startTime := time.Now() - metrics, err = cache.GetMetricsForScaler(ctx, scalerIndex, metricName) - scalerLatency := time.Since(startTime).Milliseconds() - prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(scalerLatency)) + metrics, latency, err := cache.GetMetricsForScaler(ctx, scalerIndex, metricName) + if latency != -1 { + prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency)) + } h.logger.V(1).Info("Getting metrics from scaler", "scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName, "scaler", scalerName, "metricName", metricSpec.External.Metric.Name, "metrics", metrics, "scalerError", err) } metrics, err = fallback.GetMetricsWithFallback(ctx, h.client, h.logger, metrics, err, metricName, scaledObject, metricSpec) From e4ea07ed5b1034c4fbe0fd15110322ec68a78ab7 Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 6 Jan 2023 18:56:22 +0800 Subject: [PATCH 6/6] fix ut fix ut Signed-off-by: kevin --- pkg/scaling/scale_handler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 0a4da0b771b..67141aea3e0 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -440,7 +440,8 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN } if !metricsFoundInCache { - metrics, latency, err := cache.GetMetricsForScaler(ctx, scalerIndex, metricName) + var latency int64 + metrics, latency, err = cache.GetMetricsForScaler(ctx, scalerIndex, metricName) if latency != -1 { prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency)) }