diff --git a/CHANGELOG.md b/CHANGELOG.md index f29a8402d90..607121845d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310)) - **Event Hubs Scaler:** Support Azure Active Direcotry Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569)) - **Metrics API Scaler:** Add unsafeSsl paramater to skip certificate validation when connecting over HTTPS ([#3728](https://github.com/kedacore/keda/discussions/3728)) +- **Pulsar Scaler:** Add support for partitioned topics ([#3833](https://github.com/kedacore/keda/issues/3833)) ### Fixes diff --git a/pkg/scalers/pulsar_scaler.go b/pkg/scalers/pulsar_scaler.go index bca66308b98..79db0c581e1 100644 --- a/pkg/scalers/pulsar_scaler.go +++ b/pkg/scalers/pulsar_scaler.go @@ -28,6 +28,7 @@ type pulsarScaler struct { type pulsarMetadata struct { adminURL string topic string + isPartitionedTopic bool subscription string msgBacklogThreshold int64 activationMsgBacklogThreshold int64 @@ -48,6 +49,7 @@ const ( pulsarMetricType = "External" defaultMsgBacklogThreshold = 10 enable = "enable" + stringTrue = "true" ) type pulsarSubscription struct { @@ -140,7 +142,13 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) { } topic := strings.ReplaceAll(meta.topic, "persistent://", "") - meta.statsURL = meta.adminURL + "/admin/v2/persistent/" + topic + "/stats" + if config.TriggerMetadata["isPartitionedTopic"] == stringTrue { + meta.isPartitionedTopic = true + meta.statsURL = meta.adminURL + "/admin/v2/persistent/" + topic + "/partitioned-stats" + } else { + meta.isPartitionedTopic = false + meta.statsURL = meta.adminURL + "/admin/v2/persistent/" + topic + "/stats" + } switch { case config.TriggerMetadata["subscriptionFromEnv"] != 
"": diff --git a/pkg/scalers/pulsar_scaler_test.go b/pkg/scalers/pulsar_scaler_test.go index 16638981d88..b78fe9d2333 100644 --- a/pkg/scalers/pulsar_scaler_test.go +++ b/pkg/scalers/pulsar_scaler_test.go @@ -3,18 +3,20 @@ package scalers import ( "context" "fmt" + "strings" "testing" "github.com/go-logr/logr" ) type parsePulsarMetadataTestData struct { - metadata map[string]string - isError bool - isActive bool - adminURL string - topic string - subscription string + metadata map[string]string + isError bool + isActive bool + isPartitionedTopic bool + adminURL string + topic string + subscription string } type parsePulsarAuthParamsTestData struct { @@ -51,18 +53,20 @@ var validPulsarWithoutAuthParams = map[string]string{} var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{ // failure, no adminURL - {map[string]string{}, true, false, "", "", ""}, - {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, "http://172.20.0.151:80", "", ""}, - {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, "http://172.20.0.151:80", "", ""}, - {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, "http://172.20.0.151:80", "", ""}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic"}, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", ""}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub1"}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub2"}, false, true, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub2"}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub3"}, false, false, 
"http://172.20.0.151:80", "persistent://public/default/my-topic", "sub3"}, - {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"}, + {map[string]string{}, true, false, false, "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic"}, true, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub1"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub2"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub2"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub3"}, false, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub3"}, + {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"}, + {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", 
"persistent://public/default/my-topic", "sub1"}, + {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "isPartitionedTopic": "true", "subscription": "sub1"}, false, false, true, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"}, // tls - {map[string]string{"adminURL": "https://localhost:8443", "tls": "enable", "cert": "certdata", "key": "keydata", "ca": "cadata", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, "https://localhost:8443", "persistent://public/default/my-topic", "sub1"}, + {map[string]string{"adminURL": "https://localhost:8443", "tls": "enable", "cert": "certdata", "key": "keydata", "ca": "cadata", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1"}, } var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{ @@ -89,6 +93,24 @@ func TestParsePulsarMetadata(t *testing.T) { t.Errorf("Expected adminURL %s but got %s\n", testData.adminURL, meta.adminURL) } + if !testData.isError { + if testData.isPartitionedTopic { + if !meta.isPartitionedTopic { + t.Errorf("Expected isPartitionedTopic %t but got %t\n", testData.isPartitionedTopic, meta.isPartitionedTopic) + } + if !strings.HasSuffix(meta.statsURL, "/partitioned-stats") { + t.Errorf("Expected statsURL to end with /partitioned-stats but got %s\n", meta.statsURL) + } + } else { + if meta.isPartitionedTopic { + t.Errorf("Expected isPartitionedTopic %t but got %t\n", testData.isPartitionedTopic, meta.isPartitionedTopic) + } + if !strings.HasSuffix(meta.statsURL, "/stats") { + t.Errorf("Expected statsURL to end with /stats but got %s\n", meta.statsURL) + } + } + } + if meta.topic != testData.topic { t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic) } diff --git a/tests/scalers/pulsar/helper/helper.go b/tests/scalers/pulsar/helper/helper.go new 
file mode 100644 index 00000000000..3c0e5625f74 --- /dev/null +++ b/tests/scalers/pulsar/helper/helper.go @@ -0,0 +1,247 @@ +//go:build e2e +// +build e2e + +package helper + +import ( + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + "github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + apachePulsarVersion = "2.10.2" + messageCount = 3 + minReplicaCount = 0 + maxReplicaCount = 5 + msgBacklog = 10 +) + +type templateData struct { + ApachePulsarVersion string + TestName string // Used for most resource names + NumPartitions int // Use 0 to create a non-partitioned topic + MessageCount int + MinReplicaCount int + MaxReplicaCount int + MsgBacklog int +} + +const pulsarStatefulsetTemplate = ` +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: {{.TestName}} + namespace: {{.TestName}} + labels: + app: pulsar +spec: + selector: + matchLabels: + app: pulsar + replicas: 1 + serviceName: {{.TestName}} + template: + metadata: + labels: + app: pulsar + spec: + containers: + - name: pulsar + image: apachepulsar/pulsar:{{.ApachePulsarVersion}} + imagePullPolicy: IfNotPresent + readinessProbe: + tcpSocket: + port: 8080 + ports: + - name: pulsar + containerPort: 6650 + protocol: TCP + - name: admin + containerPort: 8080 + protocol: TCP + env: + - name: PULSAR_PREFIX_tlsRequireTrustedClientCertOnConnect + value: "true" + command: + - sh + - -c + args: ["bin/apply-config-from-env.py conf/client.conf && bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"] +` + +const pulsarServiceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.TestName}} + namespace: {{.TestName}} +spec: + type: ClusterIP + ports: + - name: http + port: 8080 + targetPort: 8080 + protocol: TCP + - name: pulsar + port: 6650 + targetPort: 6650 + protocol: TCP + selector: 
+ app: pulsar +` + +const consumerTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.TestName}}-consumer + namespace: {{.TestName}} + labels: + app: pulsar-consumer +spec: + selector: + matchLabels: + app: pulsar-consumer + template: + metadata: + labels: + app: pulsar-consumer + spec: + containers: + - name: pulsar-consumer + image: ghcr.io/pulsar-sigs/pulsar-client:v0.3.1 + imagePullPolicy: IfNotPresent + readinessProbe: + tcpSocket: + port: 9494 + args: ["consumer","--broker","pulsar://{{.TestName}}.{{.TestName}}:6650","--topic","persistent://public/default/keda","--subscription-name","keda","--consume-time","200"] + +` + +const scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.TestName}} + namespace: {{.TestName}} +spec: + scaleTargetRef: + name: {{.TestName}}-consumer + pollingInterval: 5 # Optional. Default: 30 seconds + cooldownPeriod: 30 # Optional. Default: 300 seconds + maxReplicaCount: {{.MaxReplicaCount}} + minReplicaCount: {{.MinReplicaCount}} + triggers: + - type: pulsar + metadata: + isPartitionedTopic: {{if .NumPartitions}}"true"{{else}}"false"{{end}} + msgBacklog: "{{.MsgBacklog}}" + activationMsgBacklogThreshold: "5" + adminURL: http://{{.TestName}}.{{.TestName}}:8080 + topic: persistent://public/default/keda + subscription: keda + ` + +const topicPublishJobTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: {{.TestName}}-producer + namespace: {{.TestName}} +spec: + template: + spec: + containers: + - name: pulsar-producer + image: apachepulsar/pulsar:{{.ApachePulsarVersion}} + imagePullPolicy: IfNotPresent + args: ["bin/pulsar-perf produce --service-url pulsar://{{.TestName}}.{{.TestName}}:6650 --num-messages {{.MessageCount}}{{if .NumPartitions}} --partitions {{.NumPartitions}}{{end}} persistent://public/default/keda"] + restartPolicy: Never + backoffLimit: 4 +` + +func TestScalerWithConfig(t *testing.T, testName string, numPartitions int) { + //
setup + t.Log("--- setting up ---") + + // Create kubernetes resources + kc := helper.GetKubernetesClient(t) + data, templates := getTemplateData(testName, numPartitions) + + helper.CreateKubernetesResources(t, kc, testName, data, templates) + + assert.True(t, helper.WaitForStatefulsetReplicaReadyCount(t, kc, testName, testName, 1, 300, 1), + "replica count should be 1 after 5 minute") + + helper.KubectlApplyWithTemplate(t, data, "consumerTemplate", consumerTemplate) + + // run consumer for create subscription + assert.True(t, helper.WaitForDeploymentReplicaReadyCount(t, kc, getConsumerDeploymentName(testName), testName, 1, 300, 1), + "replica count should be 1 after 5 minute") + + helper.KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) + + assert.True(t, helper.WaitForDeploymentReplicaReadyCount(t, kc, getConsumerDeploymentName(testName), testName, 0, 60, 1), + "replica count should be 0 after a minute") + + testActivation(t, kc, data) + // scale up + testScaleUp(t, kc, data) + // scale down + testScaleDown(t, kc, testName) + + // cleanup + helper.KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) + helper.KubectlDeleteWithTemplate(t, data, "publishJobTemplate", topicPublishJobTemplate) + + helper.DeleteKubernetesResources(t, kc, testName, data, templates) +} + +func getTemplateData(testName string, numPartitions int) (templateData, []helper.Template) { + return templateData{ + ApachePulsarVersion: apachePulsarVersion, + TestName: testName, + NumPartitions: numPartitions, + MessageCount: messageCount, + MinReplicaCount: minReplicaCount, + MaxReplicaCount: maxReplicaCount, + MsgBacklog: msgBacklog, + }, []helper.Template{ + {Name: "statefulsetTemplate", Config: pulsarStatefulsetTemplate}, + {Name: "serviceTemplate", Config: pulsarServiceTemplate}, + } +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing activation ---") + // publish message and less 
than MsgBacklog + helper.KubectlApplyWithTemplate(t, data, "publishJobTemplate", topicPublishJobTemplate) + helper.AssertReplicaCountNotChangeDuringTimePeriod(t, kc, getConsumerDeploymentName(data.TestName), data.TestName, data.MinReplicaCount, 60) + helper.KubectlDeleteWithTemplate(t, data, "publishJobTemplate", topicPublishJobTemplate) +} + +func testScaleUp(t *testing.T, kc *kubernetes.Clientset, data templateData) { + data.MessageCount = 100 + helper.KubectlApplyWithTemplate(t, data, "publishJobTemplate", topicPublishJobTemplate) + assert.True(t, helper.WaitForDeploymentReplicaReadyCount(t, kc, getConsumerDeploymentName(data.TestName), data.TestName, 5, 300, 1), + "replica count should be 5 after 5 minute") +} + +func testScaleDown(t *testing.T, kc *kubernetes.Clientset, testName string) { + t.Log("--- testing scale down ---") + // Check if deployment scale down to 0 after 5 minutes + assert.True(t, helper.WaitForDeploymentReplicaReadyCount(t, kc, getConsumerDeploymentName(testName), testName, 0, 300, 1), + "Replica count should be 0 after 5 minutes") +} + +func getConsumerDeploymentName(testName string) string { + return fmt.Sprintf("%s-consumer", testName) +} diff --git a/tests/scalers/pulsar/pulsar_non_partitioned_topic/pulsar_non_partitioned_topic_test.go b/tests/scalers/pulsar/pulsar_non_partitioned_topic/pulsar_non_partitioned_topic_test.go new file mode 100644 index 00000000000..a2841dcaa10 --- /dev/null +++ b/tests/scalers/pulsar/pulsar_non_partitioned_topic/pulsar_non_partitioned_topic_test.go @@ -0,0 +1,19 @@ +//go:build e2e +// +build e2e + +package pulsar_non_partitioned_topic_test + +import ( + "testing" + + pulsar "github.com/kedacore/keda/v2/tests/scalers/pulsar/helper" +) + +const ( + testName = "pulsar-non-partitioned-topic-test" + numPartitions = 0 +) + +func TestScaler(t *testing.T) { + pulsar.TestScalerWithConfig(t, testName, numPartitions) +} diff --git a/tests/scalers/pulsar/pulsar_partitioned_topic/pulsar_partitioned_topic_test.go 
b/tests/scalers/pulsar/pulsar_partitioned_topic/pulsar_partitioned_topic_test.go new file mode 100644 index 00000000000..310d186560d --- /dev/null +++ b/tests/scalers/pulsar/pulsar_partitioned_topic/pulsar_partitioned_topic_test.go @@ -0,0 +1,19 @@ +//go:build e2e +// +build e2e + +package pulsar_partitioned_topic_test + +import ( + "testing" + + pulsar "github.com/kedacore/keda/v2/tests/scalers/pulsar/helper" +) + +const ( + testName = "pulsar-partitioned-topic-test" + numPartitions = 2 +) + +func TestScaler(t *testing.T) { + pulsar.TestScalerWithConfig(t, testName, numPartitions) +} diff --git a/tests/scalers/pulsar/pulsar_test.go b/tests/scalers/pulsar/pulsar_test.go deleted file mode 100644 index ba3f751261b..00000000000 --- a/tests/scalers/pulsar/pulsar_test.go +++ /dev/null @@ -1,254 +0,0 @@ -//go:build e2e -// +build e2e - -package pulsar_test - -import ( - "fmt" - "testing" - - "github.com/joho/godotenv" - "github.com/stretchr/testify/assert" - "k8s.io/client-go/kubernetes" - - . 
"github.com/kedacore/keda/v2/tests/helper" -) - -// Load environment variables from .env file -var _ = godotenv.Load("../../.env") - -const ( - testName = "pulsar-test" -) - -var ( - testNamespace = fmt.Sprintf("%s-ns", testName) - statefulSetName = fmt.Sprintf("%s-sts", testName) - scaledObjectName = fmt.Sprintf("%s-so", testName) - consumerDeploymentName = fmt.Sprintf("%s-consumer-deploy", testName) - producerJobName = fmt.Sprintf("%s-producer-job", testName) - messageCount = 3 - minReplicaCount = 0 - maxReplicaCount = 5 - msgBacklog = 10 -) - -type templateData struct { - TestNamespace string - StatefulSetName string - MessageCount int - ScaledObjectName string - ConsumerDeploymentName string - ProducerJobName string - MinReplicaCount int - MaxReplicaCount int - MsgBacklog int -} - -const statefulsetTemplate = ` -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: {{.StatefulSetName}} - namespace: {{.TestNamespace}} - labels: - app: pulsar -spec: - selector: - matchLabels: - app: pulsar - replicas: 1 - serviceName: {{.StatefulSetName}} - template: - metadata: - labels: - app: pulsar - spec: - containers: - - name: pulsar - image: apachepulsar/pulsar:2.10.0 - imagePullPolicy: IfNotPresent - readinessProbe: - tcpSocket: - port: 8080 - ports: - - name: pulsar - containerPort: 6650 - protocol: TCP - - name: admin - containerPort: 8080 - protocol: TCP - env: - - name: PULSAR_PREFIX_tlsRequireTrustedClientCertOnConnect - value: "true" - command: - - sh - - -c - args: ["bin/apply-config-from-env.py conf/client.conf && bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"] -` - -const consumerTemplate = ` -apiVersion: apps/v1 -kind: Deployment -metadata: - name: {{.ConsumerDeploymentName}} - namespace: {{.TestNamespace}} - labels: - app: pulsar-consumer -spec: - selector: - matchLabels: - app: pulsar-consumer - template: - metadata: - labels: - app: pulsar-consumer - spec: - containers: - - name: pulsar-consumer - image: 
ghcr.io/pulsar-sigs/pulsar-client:v0.3.1 - imagePullPolicy: IfNotPresent - readinessProbe: - tcpSocket: - port: 9494 - args: ["consumer","--broker","pulsar://{{.StatefulSetName}}.{{.TestNamespace}}:6650","--topic","persistent://public/default/keda","--subscription-name","keda","--consume-time","200"] - -` - -const scaledObjectTemplate = ` -apiVersion: keda.sh/v1alpha1 -kind: ScaledObject -metadata: - name: {{.ScaledObjectName}} - namespace: {{.TestNamespace}} -spec: - scaleTargetRef: - name: {{.ConsumerDeploymentName}} - pollingInterval: 5 # Optional. Default: 30 seconds - cooldownPeriod: 30 # Optional. Default: 300 seconds - maxReplicaCount: {{.MaxReplicaCount}} - minReplicaCount: {{.MinReplicaCount}} - triggers: - - type: pulsar - metadata: - msgBacklog: "{{.MsgBacklog}}" - activationMsgBacklogThreshold: "5" - adminURL: http://{{.StatefulSetName}}.{{.TestNamespace}}:8080 - topic: persistent://public/default/keda - subscription: keda - ` - -const publishJobTemplate = ` -apiVersion: batch/v1 -kind: Job -metadata: - name: {{.ProducerJobName}} - namespace: {{.TestNamespace}} -spec: - template: - spec: - containers: - - name: pulsar-client - image: ghcr.io/pulsar-sigs/pulsar-client:v0.3.1 - imagePullPolicy: IfNotPresent - args: ["producer", "--broker","pulsar://{{.StatefulSetName}}.{{.TestNamespace}}:6650","--topic","persistent://public/default/keda","--message-num","{{.MessageCount}}"] - restartPolicy: Never - backoffLimit: 4 -` - -const serviceTemplate = ` -apiVersion: v1 -kind: Service -metadata: - name: {{.StatefulSetName}} - namespace: {{.TestNamespace}} -spec: - type: ClusterIP - ports: - - name: http - port: 8080 - targetPort: 8080 - protocol: TCP - - name: pulsar - port: 6650 - targetPort: 6650 - protocol: TCP - selector: - app: pulsar -` - -func getTemplateData() (templateData, []Template) { - return templateData{ - TestNamespace: testNamespace, - StatefulSetName: statefulSetName, - MessageCount: messageCount, - ScaledObjectName: scaledObjectName, - 
ConsumerDeploymentName: consumerDeploymentName, - ProducerJobName: producerJobName, - MinReplicaCount: minReplicaCount, - MaxReplicaCount: maxReplicaCount, - MsgBacklog: msgBacklog, - }, []Template{ - {Name: "statefulsetTemplate", Config: statefulsetTemplate}, - {Name: "serviceTemplate", Config: serviceTemplate}, - } -} - -func TestScaler(t *testing.T) { - // setup - t.Log("--- setting up ---") - - // Create kubernetes resources - kc := GetKubernetesClient(t) - data, templates := getTemplateData() - - CreateKubernetesResources(t, kc, testNamespace, data, templates) - - assert.True(t, WaitForStatefulsetReplicaReadyCount(t, kc, statefulSetName, testNamespace, 1, 300, 1), - "replica count should be 1 after 5 minute") - - KubectlApplyWithTemplate(t, data, "consumerTemplate", consumerTemplate) - - // run consumer for create subcription - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, consumerDeploymentName, testNamespace, 1, 300, 1), - "replica count should be 1 after 5 minute") - - KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) - - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, consumerDeploymentName, testNamespace, 0, 60, 1), - "replica count should be 0 after a minute") - - testActivation(t, kc, data) - // scale up - testScaleUp(t, kc, data) - // scale down - testScaleDown(t, kc) - - // cleanup - KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) - KubectlDeleteWithTemplate(t, data, "publishJobTemplate", publishJobTemplate) - - DeleteKubernetesResources(t, kc, testNamespace, data, templates) -} - -func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { - t.Log("--- testing activation ---") - // publish message and less than MsgBacklog - KubectlApplyWithTemplate(t, data, "publishJobTemplate", publishJobTemplate) - AssertReplicaCountNotChangeDuringTimePeriod(t, kc, consumerDeploymentName, testNamespace, minReplicaCount, 60) - KubectlDeleteWithTemplate(t, data, 
"publishJobTemplate", publishJobTemplate) -} - -func testScaleUp(t *testing.T, kc *kubernetes.Clientset, data templateData) { - data.MessageCount = 100 - KubectlApplyWithTemplate(t, data, "publishJobTemplate", publishJobTemplate) - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, consumerDeploymentName, testNamespace, 5, 300, 1), - "replica count should be 5 after 5 minute") -} - -func testScaleDown(t *testing.T, kc *kubernetes.Clientset) { - t.Log("--- testing scale down ---") - // Check if deployment scale down to 0 after 5 minutes - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, consumerDeploymentName, testNamespace, 0, 300, 1), - "Replica count should be 0 after 5 minutes") -}