diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e61dbfeedd..662631d449f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,10 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md - TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX)) +### Fixes + +- **General:** Use metricName from GetMetricsSpec in ScaledJobs instead of `queueLength` ([#3032](https://github.com/kedacore/keda/issue/3032)) + ### Deprecations - TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX)) diff --git a/pkg/scalers/external_scaler.go b/pkg/scalers/external_scaler.go index a0cd8857c4e..aa929704f4b 100644 --- a/pkg/scalers/external_scaler.go +++ b/pkg/scalers/external_scaler.go @@ -198,13 +198,13 @@ func (s *externalScaler) GetMetrics(ctx context.Context, metricName string, metr defer done() // Remove the sX- prefix as the external scaler shouldn't have to know about it - metricName, err = RemoveIndexFromMetricName(s.metadata.scalerIndex, metricName) + metricNameWithoutIndex, err := RemoveIndexFromMetricName(s.metadata.scalerIndex, metricName) if err != nil { return metrics, err } request := &pb.GetMetricsRequest{ - MetricName: metricName, + MetricName: metricNameWithoutIndex, ScaledObjectRef: &s.scaledObjectRef, } @@ -216,7 +216,7 @@ func (s *externalScaler) GetMetrics(ctx context.Context, metricName string, metr for _, metricResult := range response.MetricValues { metric := external_metrics.ExternalMetricValue{ - MetricName: metricResult.MetricName, + MetricName: metricName, Value: *resource.NewQuantity(metricResult.MetricValue, resource.DecimalSI), Timestamp: metav1.Now(), } diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go index 315abba36da..d77a2f7b550 100644 --- a/pkg/scaling/cache/scalers_cache.go +++ b/pkg/scaling/cache/scalers_cache.go @@ -271,7 +271,7 @@ func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav targetAverageValue = getTargetAverageValue(metricSpecs) - metrics, err := s.Scaler.GetMetrics(ctx, "queueLength", nil) + metrics, err := s.Scaler.GetMetrics(ctx, metricSpecs[0].External.Metric.Name, nil) if err != nil { scalerLogger.V(1).Info("Error getting scaler metrics, but continue", "Error", err) c.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) @@ -281,12 +281,12 @@ func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav var metricValue int64 for _, m := range metrics { - if m.MetricName == "queueLength" { + if m.MetricName == metricSpecs[0].External.Metric.Name { metricValue, _ = m.Value.AsInt64() queueLength += metricValue } } - scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, "queueLength", queueLength, "targetAverageValue", targetAverageValue) + scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue) if isTriggerActive { isActive = true diff --git a/pkg/scaling/cache/scalers_cache_test.go b/pkg/scaling/cache/scalers_cache_test.go index 694ab68705f..db7f3a8937f 100644 --- a/pkg/scaling/cache/scalers_cache_test.go +++ b/pkg/scaling/cache/scalers_cache_test.go @@ -21,56 +21,60 @@ import ( func TestTargetAverageValue(t *testing.T) { // count = 0 specs := []v2beta2.MetricSpec{} + metricName := "s0-messageCount" targetAverageValue := getTargetAverageValue(specs) assert.Equal(t, int64(0), targetAverageValue) // 1 1 specs = []v2beta2.MetricSpec{ - createMetricSpec(1), - createMetricSpec(1), + createMetricSpec(1, metricName), + createMetricSpec(1, metricName), } targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, int64(1), targetAverageValue) // 5 5 3 specs = []v2beta2.MetricSpec{ - createMetricSpec(5), - createMetricSpec(5), - createMetricSpec(3), + createMetricSpec(5, metricName), + createMetricSpec(5, metricName), + createMetricSpec(3, metricName), } targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, int64(4), targetAverageValue) // 5 5 4 specs = []v2beta2.MetricSpec{ - createMetricSpec(5), - createMetricSpec(5), - createMetricSpec(3), + createMetricSpec(5, metricName), + createMetricSpec(5, metricName), + createMetricSpec(3, metricName), } targetAverageValue = getTargetAverageValue(specs) assert.Equal(t, int64(4), targetAverageValue) } -func createMetricSpec(averageValue int64) v2beta2.MetricSpec { +func createMetricSpec(averageValue int64, metricName string) v2beta2.MetricSpec { qty := resource.NewQuantity(averageValue, resource.DecimalSI) return v2beta2.MetricSpec{ External: &v2beta2.ExternalMetricSource{ Target: v2beta2.MetricTarget{ AverageValue: qty, }, + Metric: v2beta2.MetricIdentifier{ + Name: metricName, + }, }, } } func TestIsScaledJobActive(t *testing.T) { + metricName := "s0-queueLength" ctrl := gomock.NewController(t) recorder := record.NewFakeRecorder(1) - // Keep the current behavior // Assme 1 trigger only scaledJobSingle := createScaledObject(100, "") // testing default = max scalerSingle := []ScalerBuilder{{ - Scaler: createScaler(ctrl, int64(20), int64(2), true), + Scaler: createScaler(ctrl, int64(20), int64(2), true, metricName), Factory: func() (scalers.Scaler, error) { - return createScaler(ctrl, int64(20), int64(2), true), nil + return createScaler(ctrl, int64(20), int64(2), true, metricName), nil }, }} @@ -88,9 +92,9 @@ func TestIsScaledJobActive(t *testing.T) { // Non-Active trigger only scalerSingle = []ScalerBuilder{{ - Scaler: createScaler(ctrl, int64(0), int64(2), false), + Scaler: createScaler(ctrl, int64(0), int64(2), false, metricName), Factory: func() (scalers.Scaler, error) { - return createScaler(ctrl, int64(0), int64(2), false), nil + return createScaler(ctrl, int64(0), int64(2), false, metricName), nil }, }} @@ -108,34 +112,34 @@ func TestIsScaledJobActive(t *testing.T) { // Test the valiation scalerTestDatam := []scalerTestData{ - newScalerTestData(100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 20, 20), - newScalerTestData(100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 5, 2), - newScalerTestData(100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 12, 9), - newScalerTestData(100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 27), - newScalerTestData(25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 25), + newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 20, 20), + newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 5, 2), + newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 12, 9), + newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 27), + newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 25), } for index, scalerTestData := range scalerTestDatam { scaledJob := createScaledObject(scalerTestData.MaxReplicaCount, scalerTestData.MultipleScalersCalculation) scalersToTest := []ScalerBuilder{{ - Scaler: createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive), + Scaler: createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), Factory: func() (scalers.Scaler, error) { - return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive), nil + return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), nil }, }, { - Scaler: createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive), + Scaler: createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), Factory: func() (scalers.Scaler, error) { - return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive), nil + return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), nil }, }, { - Scaler: createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive), + Scaler: createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), Factory: func() (scalers.Scaler, error) { - return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive), nil + return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), nil }, }, { - Scaler: createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive), + Scaler: createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), Factory: func() (scalers.Scaler, error) { - return createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive), nil + return createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), nil }, }} @@ -155,6 +159,7 @@ func TestIsScaledJobActive(t *testing.T) { } func newScalerTestData( + metricName string, maxReplicaCount int, multipleScalersCalculation string, scaler1QueueLength, //nolint:golint,unparam @@ -173,6 +178,7 @@ func newScalerTestData( resultQueueLength, resultMaxLength int) scalerTestData { return scalerTestData{ + MetricName: metricName, MaxReplicaCount: int32(maxReplicaCount), MultipleScalersCalculation: multipleScalersCalculation, Scaler1QueueLength: int64(scaler1QueueLength), @@ -194,6 +200,7 @@ func newScalerTestData( } type scalerTestData struct { + MetricName string MaxReplicaCount int32 MultipleScalersCalculation string Scaler1QueueLength int64 @@ -231,10 +238,10 @@ func createScaledObject(maxReplicaCount int32, multipleScalersCalculation string } } -func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64, isActive bool) *mock_scalers.MockScaler { - metricName := "queueLength" +func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64, isActive bool, metricName string) *mock_scalers.MockScaler { scaler := mock_scalers.NewMockScaler(ctrl) - metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(averageValue)} + metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(averageValue, metricName)} + metrics := []external_metrics.ExternalMetricValue{ { MetricName: metricName, @@ -243,7 +250,7 @@ func createScaler(ctrl *gomock.Controller, queueLength int64, averageValue int64 } scaler.EXPECT().IsActive(gomock.Any()).Return(isActive, nil) scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs) - scaler.EXPECT().GetMetrics(gomock.Any(), metricName, nil).Return(metrics, nil) + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Any(), nil).Return(metrics, nil) scaler.EXPECT().Close(gomock.Any()) return scaler } diff --git a/tests/scalers/external-scaler.sj.test.ts b/tests/scalers/external-scaler.sj.test.ts new file mode 100644 index 00000000000..d204d888015 --- /dev/null +++ b/tests/scalers/external-scaler.sj.test.ts @@ -0,0 +1,155 @@ +import * as sh from "shelljs" +import test from "ava" +import { createNamespace, createYamlFile, waitForDeploymentReplicaCount, waitForJobCount } from "./helpers" + +const testName = "test-external-scaler-sj" +const testNamespace = `${testName}-ns` +const scalerName = `${testName}-scaler` +const serviceName = `${testName}-service` +const deploymentName = `${testName}-deployment` +const scaledJobName = `${testName}-scaled-job` + +const maxReplicaCount = 3 +const threshold = 10 + +test.before(async t => { + sh.config.silent = true + + // Create Kubernetes Namespace + createNamespace(testNamespace) + + // Create external scaler deployment + t.is( + sh.exec(`kubectl apply -f ${createYamlFile(scalerYaml)} -n ${testNamespace}`).code, + 0, + "Createing a external scaler deployment should work" + ) + + // Create service + t.is( + sh.exec(`kubectl apply -f ${createYamlFile(serviceYaml)} -n ${testNamespace}`).code, + 0, + "Createing a service should work" + ) + + // Create scaled job + t.is( + sh.exec(`kubectl apply -f ${createYamlFile(scaledJobYaml.replace("{{VALUE}}", "0"))} -n ${testNamespace}`).code, + 0, + "Creating a scaled job should work" + ) + + t.true(await waitForJobCount(0, testNamespace, 60, 1000),`Replica count should be 0 after 1 minute`) +}) + +test.serial("Deployment should scale up to maxReplicaCount", async t => { + // Modify scaled job's metricValue to induce scaling + t.is( + sh.exec(`kubectl apply -f ${createYamlFile(scaledJobYaml.replace("{{VALUE}}", `${threshold * maxReplicaCount}`))} -n ${testNamespace}`).code, + 0, + "Modifying scaled job should work" + ) + + t.true(await waitForJobCount(maxReplicaCount, testNamespace, 60, 1000),`Replica count should be ${maxReplicaCount} after 1 minute`) +}) + +test.serial("Deployment should scale back down to 0", async t => { + // Modify scaled job's metricValue to induce scaling + t.is( + sh.exec(`kubectl apply -f ${createYamlFile(scaledJobYaml.replace("{{VALUE}}", "0"))} -n ${testNamespace}`).code, + 0, + "Modifying scaled job should work" + ) + + t.true(await waitForJobCount(0, testNamespace, 120, 1000),`Replica count should be 0 after 2 minute`) +}) + +test.after.always("Clean up E2E K8s objects", async t => { + const resources = [ + `scaledjob.keda.sh/${scaledJobName}`, + `deployments.apps/${deploymentName}`, + `service/${serviceName}`, + `deployments.apps/${scalerName}`, + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} -n ${testNamespace}`) + } + + sh.exec(`kubectl delete ns ${testNamespace}`) +}) + +// YAML Definitions for Kubernetes resources +// External Scaler Deployment +const scalerYaml = +` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ${scalerName} + namespace: ${testNamespace} +spec: + replicas: 1 + selector: + matchLabels: + app: ${scalerName} + template: + metadata: + labels: + app: ${scalerName} + spec: + containers: + - name: scaler + image: ghcr.io/kedacore/tests-external-scaler-e2e:latest + imagePullPolicy: Always + ports: + - containerPort: 6000 +` + +const serviceYaml = +` +apiVersion: v1 +kind: Service +metadata: + name: ${serviceName} + namespace: ${testNamespace} +spec: + ports: + - port: 6000 + targetPort: 6000 + selector: + app: ${scalerName} +` + +// scaled job +const scaledJobYaml = +` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: ${scaledJobName} + namespace: ${testNamespace} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 5 + maxReplicaCount: ${maxReplicaCount} + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 10 + triggers: + - type: external + metadata: + scalerAddress: ${serviceName}.${testNamespace}:6000 + metricThreshold: "${threshold}" + metricValue: "{{VALUE}}" +` diff --git a/tests/scalers/external-scaler.test.ts b/tests/scalers/external-scaler.so.test.ts similarity index 97% rename from tests/scalers/external-scaler.test.ts rename to tests/scalers/external-scaler.so.test.ts index 450f932e3ad..24a2191d415 100644 --- a/tests/scalers/external-scaler.test.ts +++ b/tests/scalers/external-scaler.so.test.ts @@ -2,7 +2,7 @@ import * as sh from "shelljs" import test from "ava" import { createNamespace, createYamlFile, waitForDeploymentReplicaCount } from "./helpers" -const testName = "test-external-scaler" +const testName = "test-external-scaler-so" const testNamespace = `${testName}-ns` const scalerName = `${testName}-scaler` const serviceName = `${testName}-service` @@ -67,7 +67,7 @@ test.serial("Deployment should scale up to minReplicaCount", async t => { test.serial("Deployment should scale up to maxReplicaCount", async t => { // Modify scaled object's metricValue to induce scaling t.is( - sh.exec(`kubectl apply -f ${createYamlFile(scaledObjectYaml.replace("{{VALUE}}", `${threshold * 2}`))} -n ${testNamespace}`).code, + sh.exec(`kubectl apply -f ${createYamlFile(scaledObjectYaml.replace("{{VALUE}}", `${threshold * maxReplicaCount}`))} -n ${testNamespace}`).code, 0, "Modifying scaled object should work" ) diff --git a/tests/scalers/helpers.ts b/tests/scalers/helpers.ts index 5af7677d0fe..8d80c37b9d3 100644 --- a/tests/scalers/helpers.ts +++ b/tests/scalers/helpers.ts @@ -25,6 +25,26 @@ export async function waitForDeploymentReplicaCount(target: number, name: string return false } +export async function waitForJobCount(target: number, namespace: string, iterations = 10, interval = 3000): Promise { + for (let i = 0; i < iterations; i++) { + let jobCountStr = sh.exec(`kubectl get job --namespace ${namespace} | wc -l`).stdout.replace(/[\r\n]/g,"") + try { + let jobCount = parseInt(jobCountStr, 10) + // This method counts also the header line in the output, so we have to remove 1 if the jobCount is > 1 + if (jobCount > 0) { + jobCount-- + } + + if (jobCount === target) { + return true + } + } catch { } + + await sleep(interval) + } + return false +} + export async function createNamespace(namespace: string) { const namespaceFile = tmp.fileSync() fs.writeFileSync(namespaceFile.name, namespaceTemplate.replace('{{NAMESPACE}}', namespace)) diff --git a/tests/scalers/mongodb.test.ts b/tests/scalers/mongodb.test.ts index ec79d11d73a..7d8b7d8d4d0 100644 --- a/tests/scalers/mongodb.test.ts +++ b/tests/scalers/mongodb.test.ts @@ -2,7 +2,7 @@ import * as fs from 'fs' import * as sh from 'shelljs' import * as tmp from 'tmp' import test from 'ava' -import { createNamespace } from './helpers' +import { createNamespace, waitForJobCount } from './helpers' const mongoDBNamespace = 'mongodb' const testNamespace = 'mongodb-test' @@ -61,14 +61,11 @@ test.before(t => { }) -test.serial('Job should have 0 job on start', t => { - const jobCount = sh.exec( - `kubectl get job --namespace ${testNamespace}` - ).stdout - t.is(jobCount, '', 'job count should start out as 0') +test.serial('Job should have 0 job on start', async t => { + t.true(await waitForJobCount(0, testNamespace, 10, 1000), `job count should start out as 0`) }) -test.serial(`Job should scale to 5 then back to 0`, t => { +test.serial(`Job should scale to 5 then back to 0`, async t => { // insert data to mongodb const InsertJS = `db.${mongodbCollection}.insert([ {"region":"eu-1","state":"running","plan":"planA","goods":"apple"}, @@ -85,33 +82,12 @@ test.serial(`Job should scale to 5 then back to 0`, t => { sh.exec(`kubectl exec -n ${mongoDBNamespace} ${mongoDBPod} -- mongo --eval \'${InsertJS}\'`).code, 'insert 5 mongo record' ) + let maxJobCount = 5 + t.true(await waitForJobCount(maxJobCount, testNamespace, 60, 1000), `Job count should be ${maxJobCount} after 60 seconds`) - let jobCount = '0' - // maxJobCount = real Job + first line of output - const maxJobCount = '6' + // Process elements - for (let i = 0; i < 30 && jobCount !== maxJobCount; i++) { - jobCount = sh.exec( - `kubectl get job --namespace ${testNamespace} | wc -l` - ).stdout.replace(/[\r\n]/g,"") - - if (jobCount !== maxJobCount) { - sh.exec('sleep 2s') - } - } - - t.is(maxJobCount, jobCount, `Job count should be ${maxJobCount} after 60 seconds`) - - for (let i = 0; i < 36 && jobCount !== '0'; i++) { - jobCount = sh.exec( - `kubectl get job --namespace ${testNamespace} | wc -l` - ).stdout.replace(/[\r\n]/g,"") - if (jobCount !== '0') { - sh.exec('sleep 5s') - } - } - - t.is('0', jobCount, 'Job count should be 0 after 3 minutes') + t.true(await waitForJobCount(0, testNamespace, 180, 1000), `Job count should be 0 after 3 minutes`) }) test.after.always.cb('clean up mongodb deployment', t => {