diff --git a/controllers/keda/hpa_test.go b/controllers/keda/hpa_test.go
index e3ac8892ceb..898587eb8cd 100644
--- a/controllers/keda/hpa_test.go
+++ b/controllers/keda/hpa_test.go
@@ -133,8 +133,8 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc
 	scalersCache := cache.ScalersCache{
 		Scalers: []cache.ScalerBuilder{{
 			Scaler: scaler,
-			Factory: func() (scalers.Scaler, error) {
-				return scaler, nil
+			Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+				return scaler, &scalers.ScalerConfig{}, nil
 			},
 		}},
 		Logger: logr.Discard(),
diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go
index 90678609ae9..1a928a533d0 100644
--- a/controllers/keda/scaledobject_controller.go
+++ b/controllers/keda/scaledobject_controller.go
@@ -246,6 +246,11 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
 		return "ScaledObject doesn't have correct Idle/Min/Max Replica Counts specification", err
 	}

+	err = r.checkTriggerNamesAreUnique(scaledObject)
+	if err != nil {
+		return "ScaledObject doesn't have correct triggers specification", err
+	}
+
 	// Create a new HPA or update existing one according to ScaledObject
 	newHPACreated, err := r.ensureHPAForScaledObjectExists(ctx, logger, scaledObject, &gvkr)
 	if err != nil {
@@ -357,6 +362,27 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
 	return gvkr, nil
 }

+// checkTriggerNamesAreUnique checks that all triggerNames in ScaledObject are unique
+func (r *ScaledObjectReconciler) checkTriggerNamesAreUnique(scaledObject *kedav1alpha1.ScaledObject) error {
+	triggersCount := len(scaledObject.Spec.Triggers)
+
+	if triggersCount > 1 {
+		triggerNames := make(map[string]bool, triggersCount)
+		for i := 0; i < triggersCount; i++ {
+			name := scaledObject.Spec.Triggers[i].Name
+			if name != "" {
+				if _, found := triggerNames[name]; found {
+					// found duplicate name
+					return fmt.Errorf("triggerName=%s is defined multiple times in the ScaledObject, but it must be unique", name)
+				}
+				triggerNames[name] = true
+			}
+		}
+	}
+
+	return nil
+}
+
 // checkReplicaCountBoundsAreValid checks that Idle/Min/Max ReplicaCount defined in ScaledObject are correctly specified
 // ie. that Min is not greater then Max or Idle greater or equal to Min
 func (r *ScaledObjectReconciler) checkReplicaCountBoundsAreValid(scaledObject *kedav1alpha1.ScaledObject) error {
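For reference, a standalone sketch of the uniqueness check introduced above (not part of the diff; the trigger struct is a simplified stand-in for kedav1alpha1.ScaleTriggers): triggers without an explicit name are skipped, and the first repeated non-empty name is reported.

// Standalone sketch (not part of the diff): mirrors the duplicate-name check
// above using a stand-in type instead of kedav1alpha1.ScaleTriggers.
package main

import "fmt"

// trigger is a minimal stand-in for the real ScaleTriggers type.
type trigger struct {
	Type string
	Name string
}

// checkTriggerNamesAreUnique returns an error when two triggers share a
// non-empty name; unnamed triggers are ignored, as in the controller code.
func checkTriggerNamesAreUnique(triggers []trigger) error {
	if len(triggers) > 1 {
		seen := make(map[string]bool, len(triggers))
		for _, t := range triggers {
			if t.Name == "" {
				continue
			}
			if seen[t.Name] {
				return fmt.Errorf("triggerName=%s is defined multiple times in the ScaledObject, but it must be unique", t.Name)
			}
			seen[t.Name] = true
		}
	}
	return nil
}

func main() {
	triggers := []trigger{
		{Type: "cron", Name: "non-unique"},
		{Type: "cron", Name: "non-unique"},
	}
	fmt.Println(checkTriggerNamesAreUnique(triggers)) // reports the duplicate name
}
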
diff --git a/controllers/keda/scaledobject_controller_test.go b/controllers/keda/scaledobject_controller_test.go
index e429c7065b3..c5a24d41639 100644
--- a/controllers/keda/scaledobject_controller_test.go
+++ b/controllers/keda/scaledobject_controller_test.go
@@ -105,8 +105,9 @@ var _ = Describe("ScaledObjectController", func() {

 			testScalers = append(testScalers, cache.ScalerBuilder{
 				Scaler: s,
-				Factory: func() (scalers.Scaler, error) {
-					return scalers.NewPrometheusScaler(config)
+				Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+					scaler, err := scalers.NewPrometheusScaler(config)
+					return scaler, config, err
 				},
 			})
 			for _, metricSpec := range s.GetMetricSpecForScaling(context.Background()) {
@@ -161,8 +162,8 @@ var _ = Describe("ScaledObjectController", func() {
 			scalersCache := cache.ScalersCache{
 				Scalers: []cache.ScalerBuilder{{
 					Scaler: s,
-					Factory: func() (scalers.Scaler, error) {
-						return s, nil
+					Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+						return s, config, nil
 					},
 				}},
 			}
@@ -205,8 +206,8 @@ var _ = Describe("ScaledObjectController", func() {

 			testScalers = append(testScalers, cache.ScalerBuilder{
 				Scaler: s,
-				Factory: func() (scalers.Scaler, error) {
-					return s, nil
+				Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+					return s, config, nil
 				},
 			})
 		}
@@ -659,6 +660,62 @@ var _ = Describe("ScaledObjectController", func() {
 				return so.Status.Conditions.GetReadyCondition().Status
 			}, 20*time.Second).Should(Equal(metav1.ConditionFalse))
 		})
+
+		It("doesn't allow non-unique triggerName in ScaledObject", func() {
+			deploymentName := "non-unique-triggername"
+			soName := "so-" + deploymentName
+
+			triggerName := "non-unique"
+
+			// Create the scaling target.
+			err := k8sClient.Create(context.Background(), generateDeployment(deploymentName))
+			Expect(err).ToNot(HaveOccurred())
+
+			var five int32 = 5
+			var ten int32 = 10
+
+			// Create the ScaledObject with two triggers
+			so := &kedav1alpha1.ScaledObject{
+				ObjectMeta: metav1.ObjectMeta{Name: soName, Namespace: "default"},
+				Spec: kedav1alpha1.ScaledObjectSpec{
+					ScaleTargetRef: &kedav1alpha1.ScaleTarget{
+						Name: deploymentName,
+					},
+					IdleReplicaCount: &ten,
+					MinReplicaCount:  &five,
+					Triggers: []kedav1alpha1.ScaleTriggers{
+						{
+							Type: "cron",
+							Name: triggerName,
+							Metadata: map[string]string{
+								"timezone":        "UTC",
+								"start":           "0 * * * *",
+								"end":             "1 * * * *",
+								"desiredReplicas": "1",
+							},
+						},
+						{
+							Type: "cron",
+							Name: triggerName,
+							Metadata: map[string]string{
+								"timezone":        "UTC",
+								"start":           "10 * * * *",
+								"end":             "11 * * * *",
+								"desiredReplicas": "1",
+							},
+						},
+					},
+				},
+			}
+			err = k8sClient.Create(context.Background(), so)
+			Ω(err).ToNot(HaveOccurred())
+
+			Eventually(func() metav1.ConditionStatus {
+				err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
+				Ω(err).ToNot(HaveOccurred())
+				return so.Status.Conditions.GetReadyCondition().Status
+			}, 20*time.Second).Should(Equal(metav1.ConditionFalse))
+		})
 	})

 	It("scaleobject ready condition 'False/Unknow' to 'True' will requeue", func() {
diff --git a/pkg/prommetrics/adapter_prommetrics.go b/pkg/prommetrics/adapter_prommetrics.go
index 7572b47635a..b240ad81097 100644
--- a/pkg/prommetrics/adapter_prommetrics.go
+++ b/pkg/prommetrics/adapter_prommetrics.go
@@ -100,21 +100,21 @@ func (metricsServer PrometheusMetricServer) NewServer(address string, pattern st
 }

 // RecordHPAScalerMetric create a measurement of the external metric used by the HPA
-func (metricsServer PrometheusMetricServer) RecordHPAScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, scalerName string, metric string, value float64) {
-	scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, scalerName, metric)).Set(value)
+func (metricsServer PrometheusMetricServer) RecordHPAScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
+	scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value)
 }

 // RecordHPAScalerError counts the number of errors occurred in trying get an external metric used by the HPA
-func (metricsServer PrometheusMetricServer) RecordHPAScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, scalerName string, metric string, err error) {
+func (metricsServer PrometheusMetricServer) RecordHPAScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
 	if err != nil {
-		scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, scalerName, metric)).Inc()
+		scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Inc()
 		// scaledObjectErrors.With(prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject}).Inc()
 		metricsServer.RecordScalerObjectError(namespace, scaledObject, err)
 		scalerErrorsTotal.With(prometheus.Labels{}).Inc()
 		return
 	}
 	// initialize metric with 0 if not already set
-	_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, scalerName, metric))
+	_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric))
 	if errscaler != nil {
 		log.Fatalf("Unable to write to serve custom metrics: %v", errscaler)
 	}
@@ -135,6 +135,6 @@ func (metricsServer PrometheusMetricServer) RecordScalerObjectError(namespace st
 	}
 }

-func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, scalerName string, metric string) prometheus.Labels {
-	return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "scalerName": scalerName, "metric": metric}
+func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string) prometheus.Labels {
+	return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric}
 }
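Dropping the scalerName parameter only works if the metric vectors are declared with the same reduced label set, since client_golang panics when the labels passed to With do not match the declaration. A standalone sketch of that invariant follows (not part of the diff; the metric name is hypothetical).

// Standalone sketch (not part of the diff), using prometheus/client_golang
// directly: the label map passed to With must exactly match the labels the
// vector was declared with, which is why removing "scalerName" from getLabels
// has to go together with removing it from the metric definitions.
package main

import (
	"fmt"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

var scalerMetricsValue = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "keda_metrics_adapter",
		Subsystem: "scaler",
		Name:      "metrics_value_example", // hypothetical name for this sketch
		Help:      "Metric value reported by a scaler",
	},
	[]string{"namespace", "scaledObject", "scaler", "scalerIndex", "metric"},
)

func getLabels(namespace, scaledObject, scaler string, scalerIndex int, metric string) prometheus.Labels {
	return prometheus.Labels{
		"namespace":    namespace,
		"scaledObject": scaledObject,
		"scaler":       scaler,
		"scalerIndex":  strconv.Itoa(scalerIndex),
		"metric":       metric,
	}
}

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(scalerMetricsValue)

	// A label map with a missing or extra key would make With panic.
	scalerMetricsValue.With(getLabels("default", "my-so", "cronScaler", 0, "s0-cron-UTC")).Set(1)
	fmt.Println("recorded gauge with the reduced label set")
}
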
log.Fatalf("Unable to write to serve custom metrics: %v", errscaler) } @@ -135,6 +135,6 @@ func (metricsServer PrometheusMetricServer) RecordScalerObjectError(namespace st } } -func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, scalerName string, metric string) prometheus.Labels { - return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "scalerName": scalerName, "metric": metric} +func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string) prometheus.Labels { + return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric} } diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 208d43ae22d..c1ae91303e5 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -103,12 +103,15 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, return nil, fmt.Errorf("error when getting scalers %s", err) } + // let's check metrics for all scalers in a ScaledObject scalerError := false - - for scalerIndex, scalerPair := range cache.GetScalers() { - metricSpecs := scalerPair.Scaler.GetMetricSpecForScaling(ctx) - scalerName := strings.Replace(fmt.Sprintf("%T", scalerPair.Scaler), "*scalers.", "", 1) - triggerName := scalerPair.TriggerName + scalers, scalerConfigs := cache.GetScalers() + for scalerIndex := 0; scalerIndex < len(scalers); scalerIndex++ { + metricSpecs := scalers[scalerIndex].GetMetricSpecForScaling(ctx) + scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1) + if scalerConfigs[scalerIndex].TriggerName != "" { + scalerName = scalerConfigs[scalerIndex].TriggerName + } for _, metricSpec := range metricSpecs { // skip cpu/memory resource scaler @@ -121,15 +124,15 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, metrics, err = p.getMetricsWithFallback(ctx, metrics, err, info.Metric, scaledObject, metricSpec) if err != nil { scalerError = true - logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scalerPair.Scaler) + logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scalerName) } else { for _, metric := range metrics { metricValue := metric.Value.AsApproximateFloat64() - metricsServer.RecordHPAScalerMetric(namespace, scaledObject.Name, scalerName, scalerIndex, triggerName, metric.MetricName, metricValue) + metricsServer.RecordHPAScalerMetric(namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue) } matchingMetrics = append(matchingMetrics, metrics...) 
diff --git a/pkg/scalers/scaler.go b/pkg/scalers/scaler.go
index a55a7e509ff..210e6c6195a 100644
--- a/pkg/scalers/scaler.go
+++ b/pkg/scalers/scaler.go
@@ -78,6 +78,9 @@ type ScalerConfig struct {
 	// The timeout to be used on all HTTP requests from the controller
 	GlobalHTTPTimeout time.Duration

+	// Name of the trigger
+	TriggerName string
+
 	// TriggerMetadata
 	TriggerMetadata map[string]string

diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go
index 01017b6ae4d..314c361fa12 100644
--- a/pkg/scaling/cache/scalers_cache.go
+++ b/pkg/scaling/cache/scalers_cache.go
@@ -42,25 +42,20 @@ type ScalersCache struct {
 }

 type ScalerBuilder struct {
-	Scaler      scalers.Scaler
-	TriggerName string
-	Factory     func() (scalers.Scaler, error)
+	Scaler       scalers.Scaler
+	ScalerConfig scalers.ScalerConfig
+	Factory      func() (scalers.Scaler, *scalers.ScalerConfig, error)
 }

-type ScalerPair struct {
-	Scaler      scalers.Scaler
-	TriggerName string
-}
-
-func (c *ScalersCache) GetScalers() []ScalerPair {
-	result := make([]ScalerPair, 0, len(c.Scalers))
+func (c *ScalersCache) GetScalers() ([]scalers.Scaler, []scalers.ScalerConfig) {
+	scalersList := make([]scalers.Scaler, 0, len(c.Scalers))
+	configsList := make([]scalers.ScalerConfig, 0, len(c.Scalers))
 	for _, s := range c.Scalers {
-		result = append(result, ScalerPair{
-			Scaler:      s.Scaler,
-			TriggerName: s.TriggerName,
-		})
+		scalersList = append(scalersList, s.Scaler)
+		configsList = append(configsList, s.ScalerConfig)
 	}
-	return result
+
+	return scalersList, configsList
 }

 func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
@@ -211,14 +206,15 @@ func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scale
 	}

 	sb := c.Scalers[id]
-	ns, err := sb.Factory()
+	ns, sConfig, err := sb.Factory()
 	if err != nil {
 		return nil, err
 	}

 	c.Scalers[id] = ScalerBuilder{
-		Scaler:  ns,
-		Factory: sb.Factory,
+		Scaler:       ns,
+		ScalerConfig: *sConfig,
+		Factory:      sb.Factory,
 	}
 	sb.Scaler.Close(ctx)

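GetScalers now returns two index-aligned slices instead of a slice of pairs. The standalone sketch below (not part of the diff; simplified stand-in types) illustrates that contract: callers iterate by index so the scaler at position i stays paired with the config at position i.

// Standalone sketch (not part of the diff): the new GetScalers contract is a
// pair of index-aligned slices, so scalersList[i] always corresponds to
// configsList[i]. Types here are simplified stand-ins for the cache package.
package main

import "fmt"

type scaler interface{ Name() string }

type fakeScaler struct{ name string }

func (f fakeScaler) Name() string { return f.name }

type scalerConfig struct{ TriggerName string }

type scalerBuilder struct {
	Scaler       scaler
	ScalerConfig scalerConfig
}

type scalersCache struct{ Scalers []scalerBuilder }

// GetScalers returns the scalers and their configs as parallel slices.
func (c *scalersCache) GetScalers() ([]scaler, []scalerConfig) {
	scalersList := make([]scaler, 0, len(c.Scalers))
	configsList := make([]scalerConfig, 0, len(c.Scalers))
	for _, s := range c.Scalers {
		scalersList = append(scalersList, s.Scaler)
		configsList = append(configsList, s.ScalerConfig)
	}
	return scalersList, configsList
}

func main() {
	c := &scalersCache{Scalers: []scalerBuilder{
		{Scaler: fakeScaler{"cron"}, ScalerConfig: scalerConfig{TriggerName: "nightly"}},
		{Scaler: fakeScaler{"prometheus"}, ScalerConfig: scalerConfig{}},
	}}

	// Iterate by index so each scaler stays paired with its config.
	scalers, configs := c.GetScalers()
	for i := range scalers {
		fmt.Printf("%s -> triggerName=%q\n", scalers[i].Name(), configs[i].TriggerName)
	}
}
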
diff --git a/pkg/scaling/cache/scalers_cache_test.go b/pkg/scaling/cache/scalers_cache_test.go
index 4036f18704e..10d651f46ac 100644
--- a/pkg/scaling/cache/scalers_cache_test.go
+++ b/pkg/scaling/cache/scalers_cache_test.go
@@ -73,8 +73,8 @@ func TestIsScaledJobActive(t *testing.T) {
 	scaledJobSingle := createScaledObject(0, 100, "") // testing default = max
 	scalerSingle := []ScalerBuilder{{
 		Scaler: createScaler(ctrl, int64(20), int64(2), true, metricName),
-		Factory: func() (scalers.Scaler, error) {
-			return createScaler(ctrl, int64(20), int64(2), true, metricName), nil
+		Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+			return createScaler(ctrl, int64(20), int64(2), true, metricName), &scalers.ScalerConfig{}, nil
 		},
 	}}

@@ -93,8 +93,8 @@ func TestIsScaledJobActive(t *testing.T) {
 	// Non-Active trigger only
 	scalerSingle = []ScalerBuilder{{
 		Scaler: createScaler(ctrl, int64(0), int64(2), false, metricName),
-		Factory: func() (scalers.Scaler, error) {
-			return createScaler(ctrl, int64(0), int64(2), false, metricName), nil
+		Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+			return createScaler(ctrl, int64(0), int64(2), false, metricName), &scalers.ScalerConfig{}, nil
 		},
 	}}

@@ -123,23 +123,23 @@ func TestIsScaledJobActive(t *testing.T) {
 		scaledJob := createScaledObject(scalerTestData.MinReplicaCount, scalerTestData.MaxReplicaCount, scalerTestData.MultipleScalersCalculation)
 		scalersToTest := []ScalerBuilder{{
 			Scaler: createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName),
-			Factory: func() (scalers.Scaler, error) {
-				return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), nil
+			Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+				return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil
 			},
 		}, {
 			Scaler: createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName),
-			Factory: func() (scalers.Scaler, error) {
-				return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), nil
+			Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+				return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil
 			},
 		}, {
 			Scaler: createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName),
-			Factory: func() (scalers.Scaler, error) {
-				return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), nil
+			Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+				return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil
 			},
 		}, {
 			Scaler: createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName),
-			Factory: func() (scalers.Scaler, error) {
-				return createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), nil
+			Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+				return createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil
 			},
 		}}

@@ -167,8 +167,8 @@ func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T
 	scaledJobSingle := createScaledObject(1, 100, "") // testing default = max
 	scalerSingle := []ScalerBuilder{{
 		Scaler: createScaler(ctrl, int64(0), int64(1), true, metricName),
-		Factory: func() (scalers.Scaler, error) {
-			return createScaler(ctrl, int64(0), int64(1), true, metricName), nil
+		Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
+			return createScaler(ctrl, int64(0), int64(1), true, metricName), &scalers.ScalerConfig{}, nil
 		},
 	}}
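The test file above repeats the same three-result factory closure for every case. A hypothetical helper like the standalone sketch below (not part of the diff; all names are made up, with simplified stand-in types) could wrap a prebuilt scaler and an optional config into the new Factory shape.

// Standalone sketch (not part of the diff): a hypothetical test helper that
// turns a ready-made scaler plus an optional config into the three-result
// factory signature used by ScalerBuilder after this change.
package main

import "fmt"

type scaler interface{ IsActive() bool }

type stubScaler struct{ active bool }

func (s stubScaler) IsActive() bool { return s.active }

type scalerConfig struct{ TriggerName string }

// constantFactory returns a factory that always yields the given scaler and config.
func constantFactory(s scaler, cfg *scalerConfig) func() (scaler, *scalerConfig, error) {
	if cfg == nil {
		cfg = &scalerConfig{}
	}
	return func() (scaler, *scalerConfig, error) { return s, cfg, nil }
}

func main() {
	factory := constantFactory(stubScaler{active: true}, &scalerConfig{TriggerName: "queue-depth"})
	s, cfg, err := factory()
	fmt.Println(s.IsActive(), cfg.TriggerName, err) // true queue-depth <nil>
}
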
diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go
index dd6789ba6f6..4bde8291454 100644
--- a/pkg/scaling/scale_handler.go
+++ b/pkg/scaling/scale_handler.go
@@ -298,17 +298,18 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp
 	for i, t := range withTriggers.Spec.Triggers {
 		triggerIndex, trigger := i, t
-		factory := func() (scalers.Scaler, error) {
+		factory := func() (scalers.Scaler, *scalers.ScalerConfig, error) {
 			if podTemplateSpec != nil {
 				resolvedEnv, err = resolver.ResolveContainerEnv(ctx, h.client, logger, &podTemplateSpec.Spec, containerName, withTriggers.Namespace)
 				if err != nil {
-					return nil, fmt.Errorf("error resolving secrets for ScaleTarget: %s", err)
+					return nil, nil, fmt.Errorf("error resolving secrets for ScaleTarget: %s", err)
 				}
 			}

 			config := &scalers.ScalerConfig{
 				ScalableObjectName:      withTriggers.Name,
 				ScalableObjectNamespace: withTriggers.Namespace,
 				ScalableObjectType:      withTriggers.Kind,
+				TriggerName:             trigger.Name,
 				TriggerMetadata:         trigger.Metadata,
 				ResolvedEnv:             resolvedEnv,
 				AuthParams:              make(map[string]string),
@@ -319,13 +320,14 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp

 			config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(ctx, h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace)
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}

-			return buildScaler(ctx, h.client, trigger.Type, config)
+			scaler, err := buildScaler(ctx, h.client, trigger.Type, config)
+			return scaler, config, err
 		}

-		scaler, err := factory()
+		scaler, config, err := factory()
 		if err != nil {
 			h.recorder.Event(withTriggers, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
 			h.logger.Error(err, "error resolving auth params", "scalerIndex", triggerIndex, "object", withTriggers)
@@ -339,9 +341,9 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp
 		}

 		result = append(result, cache.ScalerBuilder{
-			Scaler:      scaler,
-			TriggerName: trigger.Name,
-			Factory:     factory,
+			Scaler:       scaler,
+			ScalerConfig: *config,
+			Factory:      factory,
 		})
 	}

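For context, the standalone sketch below (not part of the diff; simplified stand-in types) shows the shape of the per-trigger factory closure after this change: each closure captures its own trigger, builds a fresh config carrying the trigger name, and returns it alongside the scaler so the cache can store it and a later refresh recovers the same config.

// Standalone sketch (not part of the diff): the per-trigger factory pattern
// used in buildScalers, with stand-in types instead of the real KEDA packages.
package main

import "fmt"

type trigger struct {
	Type     string
	Name     string
	Metadata map[string]string
}

type scalerConfig struct {
	TriggerName     string
	TriggerMetadata map[string]string
}

type scaler struct{ kind string } // stand-in for a built scalers.Scaler

// buildScaler mirrors the real builder's signature; cfg would drive the real construction.
func buildScaler(triggerType string, cfg *scalerConfig) (*scaler, error) {
	return &scaler{kind: triggerType}, nil
}

type scalerBuilder struct {
	Scaler       *scaler
	ScalerConfig scalerConfig
	Factory      func() (*scaler, *scalerConfig, error)
}

func buildScalers(triggers []trigger) ([]scalerBuilder, error) {
	var result []scalerBuilder
	for _, t := range triggers {
		trig := t // capture the loop variable so each closure sees its own trigger
		factory := func() (*scaler, *scalerConfig, error) {
			config := &scalerConfig{
				TriggerName:     trig.Name,
				TriggerMetadata: trig.Metadata,
			}
			s, err := buildScaler(trig.Type, config)
			return s, config, err
		}

		s, config, err := factory()
		if err != nil {
			return nil, err
		}
		// Store the dereferenced config next to the scaler, as ScalerBuilder does.
		result = append(result, scalerBuilder{Scaler: s, ScalerConfig: *config, Factory: factory})
	}
	return result, nil
}

func main() {
	builders, _ := buildScalers([]trigger{{Type: "cron", Name: "nightly"}, {Type: "prometheus"}})
	for _, b := range builders {
		fmt.Println(b.Scaler.kind, b.ScalerConfig.TriggerName)
	}
}
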
errors.New("some error")) scaler.EXPECT().Close(gomock.Any()) - return scaler, nil + return scaler, &scalers.ScalerConfig{}, nil } - failingScaler, err := failingFactory() + failingScaler, _, err := failingFactory() assert.Nil(t, err) scaledObject := &kedav1alpha1.ScaledObject{