diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f61d750aed..b06c86dda1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ - **Datadog Scaler:** Several improvements, including a new optional parameter `metricUnavailableValue` to fill data when no Datadog metric was returned ([#2657](https://github.com/kedacore/keda/issues/2657)) - **Datadog Scaler:** Rely on Datadog API to validate the query ([2761](https://github.com/kedacore/keda/issues/2761)) - **Kafka Scaler:** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608)) +- **Kafka Scaler:** New `scaleToZeroOnInvalidOffset` to control behavior when partitions have an invalid offset ([#2033](https://github.com/kedacore/keda/issues/2033)[#2612](https://github.com/kedacore/keda/issues/2612)) - **Metric API Scaler:** Improve error handling on not-ok response ([#2317](https://github.com/kedacore/keda/issues/2317)) - **Prometheus Scaler:** Check and properly inform user that `threshold` is not set ([#2793](https://github.com/kedacore/keda/issues/2793)) - **Prometheus Scaler:** Support for `X-Scope-OrgID` header ([#2667](https://github.com/kedacore/keda/issues/2667)) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 01a691f3552..170d7eed425 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -35,6 +35,10 @@ type kafkaMetadata struct { allowIdleConsumers bool version sarama.KafkaVersion + // If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can + // occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612 + scaleToZeroOnInvalidOffset bool + // SASL saslType kafkaSaslType username string @@ -101,6 +105,53 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) { }, nil } +func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error { + meta.saslType = KafkaSASLTypeNone + if val, ok := config.AuthParams["sasl"]; ok { + val = strings.TrimSpace(val) + mode := kafkaSaslType(val) + + if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 { + if config.AuthParams["username"] == "" { + return errors.New("no username given") + } + meta.username = strings.TrimSpace(config.AuthParams["username"]) + + if config.AuthParams["password"] == "" { + return errors.New("no password given") + } + meta.password = strings.TrimSpace(config.AuthParams["password"]) + meta.saslType = mode + } else { + return fmt.Errorf("err SASL mode %s given", mode) + } + } + + meta.enableTLS = false + if val, ok := config.AuthParams["tls"]; ok { + val = strings.TrimSpace(val) + + if val == "enable" { + certGiven := config.AuthParams["cert"] != "" + keyGiven := config.AuthParams["key"] != "" + if certGiven && !keyGiven { + return errors.New("key must be provided with cert") + } + if keyGiven && !certGiven { + return errors.New("cert must be provided with key") + } + meta.ca = config.AuthParams["ca"] + meta.cert = config.AuthParams["cert"] + meta.key = config.AuthParams["key"] + meta.enableTLS = true + } else if val != "disable" { + return fmt.Errorf("err incorrect value for TLS given: %s", val) + } + } + + return nil +} + func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { meta := kafkaMetadata{} switch { @@ -128,7 +179,7 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { meta.topic = config.TriggerMetadata["topic"] default: meta.topic = "" - kafkaLog.V(1).Info(fmt.Sprintf("cosumer group %s has no topic specified, "+ + kafkaLog.V(1).Info(fmt.Sprintf("consumer group %s has no topic specified, "+ "will use all topics subscribed by the consumer group for scaling", meta.group)) } @@ -152,47 +203,8 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { meta.lagThreshold = t } - meta.saslType = KafkaSASLTypeNone - if val, ok := config.AuthParams["sasl"]; ok { - val = strings.TrimSpace(val) - mode := kafkaSaslType(val) - - if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 { - if config.AuthParams["username"] == "" { - return meta, errors.New("no username given") - } - meta.username = strings.TrimSpace(config.AuthParams["username"]) - - if config.AuthParams["password"] == "" { - return meta, errors.New("no password given") - } - meta.password = strings.TrimSpace(config.AuthParams["password"]) - meta.saslType = mode - } else { - return meta, fmt.Errorf("err SASL mode %s given", mode) - } - } - - meta.enableTLS = false - if val, ok := config.AuthParams["tls"]; ok { - val = strings.TrimSpace(val) - - if val == "enable" { - certGiven := config.AuthParams["cert"] != "" - keyGiven := config.AuthParams["key"] != "" - if certGiven && !keyGiven { - return meta, errors.New("key must be provided with cert") - } - if keyGiven && !certGiven { - return meta, errors.New("cert must be provided with key") - } - meta.ca = config.AuthParams["ca"] - meta.cert = config.AuthParams["cert"] - meta.key = config.AuthParams["key"] - meta.enableTLS = true - } else if val != "disable" { - return meta, fmt.Errorf("err incorrect value for TLS given: %s", val) - } + if err := parseKafkaAuthParams(config, &meta); err != nil { + return meta, err } meta.allowIdleConsumers = false @@ -204,6 +216,15 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { meta.allowIdleConsumers = t } + meta.scaleToZeroOnInvalidOffset = false + if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok { + t, err := strconv.ParseBool(val) + if err != nil { + return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %s", err) + } + meta.scaleToZeroOnInvalidOffset = t + } + meta.version = sarama.V1_0_0_0 if val, ok := config.TriggerMetadata["version"]; ok { val = strings.TrimSpace(val) @@ -343,17 +364,25 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, error) { block := offsets.GetBlock(topic, partitionID) if block == nil { - kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID), "") - return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID) + errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID) + kafkaLog.Error(errMsg, "") + return 0, errMsg } consumerOffset := block.Offset if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest { - kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID)) - return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID) + retVal := int64(1) + if s.metadata.scaleToZeroOnInvalidOffset { + retVal = 0 + } + msg := fmt.Sprintf( + "invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d", + topic, s.metadata.group, partitionID, retVal) + kafkaLog.V(0).Info(msg) + return retVal, nil } if _, found := topicPartitionOffsets[topic]; !found { - return 0, fmt.Errorf("error finding parition offset for topic %s", topic) + return 0, fmt.Errorf("error finding partition offset for topic %s", topic) } latestOffset := topicPartitionOffsets[topic][partitionID] if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest { diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 7dfcae20ebe..166e37d628f 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -191,6 +191,17 @@ func TestKafkaAuthParams(t *testing.T) { if meta.enableTLS != testData.enableTLS { t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS) } + if meta.enableTLS { + if meta.ca != testData.authParams["ca"] { + t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.enableTLS) + } + if meta.cert != testData.authParams["cert"] { + t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.cert) + } + if meta.key != testData.authParams["key"] { + t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key) + } + } } } diff --git a/tests/scalers/kafka.test.ts b/tests/scalers/kafka.test.ts index 102190c8148..a1fb8a077a5 100644 --- a/tests/scalers/kafka.test.ts +++ b/tests/scalers/kafka.test.ts @@ -1,311 +1,233 @@ import * as fs from 'fs' import * as sh from 'shelljs' import * as tmp from 'tmp' -import test from 'ava'; -import { createNamespace } from './helpers'; +import test, { Assertions } from 'ava'; +import { createNamespace, waitForDeploymentReplicaCount } from './helpers'; const defaultNamespace = 'kafka-test' const defaultCluster = 'kafka-cluster' const timeToWait = 300 const defaultTopic = 'kafka-topic' const defaultTopic2 = 'kafka-topic-2' +const zeroInvalidOffsetTopic = 'kafka-topic-zero-invalid-offset' +const oneInvalidOffsetTopic = 'kafka-topic-one-invalid-offset' +const invalidOffsetGroup = 'invalidOffset' const defaultKafkaClient = 'kafka-client' -const strimziOperatorVersion = '0.18.0' -const commandToCheckReplicas = `kubectl get deployments/kafka-consumer --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"` +const strimziOperatorVersion = '0.23.0' +const bootstrapServer = `${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092` const strimziOperatorYamlFile = tmp.fileSync() const kafkaClusterYamlFile = tmp.fileSync() const kafkaTopicYamlFile = tmp.fileSync() const kafkaClientYamlFile = tmp.fileSync() -const kafkaApplicationLatestYamlFile = tmp.fileSync() -const kafkaApplicationEarliestYamlFile = tmp.fileSync() -const kafkaApplicationMultipleTopicsYamlFile = tmp.fileSync() -const scaledObjectEarliestYamlFile = tmp.fileSync() -const scaledObjectLatestYamlFile = tmp.fileSync() -const scaledObjectMultipleTopicsYamlFile = tmp.fileSync() - -test.before('Set up, create necessary resources.', t => { - sh.config.silent = true - createNamespace(defaultNamespace) - - const strimziOperatorYaml = sh.exec(`curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/${strimziOperatorVersion}/strimzi-cluster-operator-${strimziOperatorVersion}.yaml`).stdout - fs.writeFileSync(strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`)) - t.is( - 0, - sh.exec(`kubectl apply -f ${strimziOperatorYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Strimzi operator should work.' - ) +const kafkaApplicationYamlFile = tmp.fileSync() +const scaledObjectYamlFile = tmp.fileSync() - fs.writeFileSync(kafkaClusterYamlFile.name, kafkaClusterYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaClusterYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka cluster instance should work.' - ) - t.is( - 0, - sh.exec(`kubectl wait kafka/${defaultCluster} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - 'Kafka instance should be ready within given time limit.' - ) +function deployFromYaml(t: Assertions, filename: string, yaml: string, name: string) { + sh.exec(`echo Deploying ${name}`) + fs.writeFileSync(filename, yaml) + t.is(0, sh.exec(`kubectl apply -f ${filename} --namespace ${defaultNamespace}`).code, `Deploying ${name} should work.`) +} - fs.writeFileSync(kafkaTopicYamlFile.name, kafkaTopicsYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaTopicYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka topic should work.' - ) - t.is( +function waitForReady(t: Assertions, app: string, name: string, condition: string = 'Ready') { + sh.exec(`echo Waiting for ${app} for ${timeToWait} seconds to be ${condition}`) + t.is( 0, - sh.exec(`kubectl wait kafkatopic/${defaultTopic} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - 'Kafka topic should be ready withlanguage-mattersin given time limit.' - ) - t.is( - 0, - sh.exec(`kubectl wait kafkatopic/${defaultTopic2} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - 'Kafka topic2 should be ready within given time limit.' + sh.exec(`kubectl wait ${app} --for=condition=${condition} --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, + `${name} should be ready within given time limit.` ) +} - fs.writeFileSync(kafkaClientYamlFile.name, kafkaClientYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaClientYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka client should work.' - ) - t.is( - 0, - sh.exec(`kubectl wait pod/${defaultKafkaClient} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - 'Kafka client should be ready within given time limit.' - ) +function commitPartition(topic: string, group: string) { + sh.exec(`echo Committing partition for ${topic}:${group}`) + return sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${bootstrapServer}" --topic ${topic} --group ${group} --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`).code == 0 +} - fs.writeFileSync(kafkaApplicationEarliestYamlFile.name, kafkaApplicationEarliestYaml) +function publishMessage(topic: string) { + sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${bootstrapServer} --topic ${topic}'`) + sh.exec(`sleep 5s`) +} - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaApplicationEarliestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka application should work.' +function cleanup(t: Assertions) { + t.is( + 0, + sh.exec(`kubectl delete -f ${scaledObjectYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deleting Scaled Object should work.' ) - fs.writeFileSync(scaledObjectEarliestYamlFile.name, scaledObjectEarliestYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${scaledObjectEarliestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Scaled Object should work.' - ) - t.is( - 0, - sh.exec(`kubectl wait deployment/kafka-consumer --for=condition=Available --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - 'Kafka application should be ready within given time limit.' - ) - waitForReplicaCount(0, commandToCheckReplicas) - t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') -}); - -function waitForReplicaCount(desiredReplicaCount: number, commandToCheck: string) { - let replicaCount = undefined - let changed = undefined - for (let i = 0; i < 10; i++) { - changed = false - // checks the replica count 3 times, it tends to fluctuate from the beginning - for (let j = 0; j < 3; j++) { - replicaCount = sh.exec(commandToCheck).stdout - if (replicaCount === desiredReplicaCount.toString()) { - sh.exec('sleep 2s') - } else { - changed = true - break - } - } - if (changed === false) { - return - } else { - sh.exec('sleep 3s') - } - } + t.is( + 0, + sh.exec(`kubectl delete -f ${kafkaApplicationYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deleting kafka application should work.' + ) + sh.exec(`sleep 10s`) } -test.serial('Scale application with kafka messages.', t => { - for (let r = 1; r <= 3; r++) { - - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) +test.before('Set up, create necessary resources.', async t => { + createNamespace(defaultNamespace) - waitForReplicaCount(r, commandToCheckReplicas) + sh.config.silent = true + const strimziOperatorYaml = sh.exec(`curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/${strimziOperatorVersion}/strimzi-cluster-operator-${strimziOperatorVersion}.yaml`).stdout + sh.config.silent = false + + deployFromYaml(t, strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`), 'Strimzi operator') + deployFromYaml(t, kafkaClusterYamlFile.name, kafkaClusterYaml, 'Kafka cluster') + waitForReady(t, `kafka/${defaultCluster}`,'Kafka instance') + + var kafkaTopicsYaml = + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', defaultTopic).replace('{{PARTITIONS}}', '3') + + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', defaultTopic2).replace('{{PARTITIONS}}', '3') + + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', zeroInvalidOffsetTopic).replace('{{PARTITIONS}}', '1') + + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', oneInvalidOffsetTopic).replace('{{PARTITIONS}}', '1') + deployFromYaml(t, kafkaTopicYamlFile.name, kafkaTopicsYaml, 'Kafka topic') + waitForReady(t, `kafkatopic/${defaultTopic}`,defaultTopic) + waitForReady(t, `kafkatopic/${defaultTopic2}`,defaultTopic2) + waitForReady(t, `kafkatopic/${zeroInvalidOffsetTopic}`,zeroInvalidOffsetTopic) + waitForReady(t, `kafkatopic/${oneInvalidOffsetTopic}`,oneInvalidOffsetTopic) + + deployFromYaml(t, kafkaClientYamlFile.name, kafkaClientYaml, 'Kafka client') + waitForReady(t, `pod/${defaultKafkaClient}`,'Kafka client') +}); - t.is(r.toString(), sh.exec(commandToCheckReplicas).stdout, `Replica count should be ${r}.`) - } -}) +test.serial('Applying ScaledObject earliest policy should not scale up pods', async t => { -test.serial('Scale application beyond partition max.', t => { - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationEarliestYaml, 'Kafka application') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectEarliestYaml, 'Scaled Object') + waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') - waitForReplicaCount(3, commandToCheckReplicas) + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), 'replica count should start out as 0') +}); - t.is('3', sh.exec(commandToCheckReplicas).stdout, `Replica count should be 3.`) +test.serial('Scale application with kafka messages.', async t => { + for (let r = 1; r <= 3; r++) { + publishMessage(defaultTopic) + t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) + } }) -test.serial('cleanup after earliest policy test', t=> { - t.is( - 0, - sh.exec(`kubectl delete -f ${scaledObjectEarliestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting Scaled Object should work.' - ) - t.is( - 0, - sh.exec(`kubectl delete -f ${kafkaApplicationEarliestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting kafka application should work.' - ) - - sh.exec(`sleep 30s`) +test.serial('Scale application beyond partition max.', async t => { + publishMessage(defaultTopic) + t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) }) -test.serial('Applying ScaledObject latest policy should not scale up pods', t => { +test.serial('cleanup after earliest policy test', t => cleanup(t)) + +test.serial('Applying ScaledObject latest policy should not scale up pods', async t => { //Make the consumer commit the first offset for each partition. - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group latest --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) + t.true(commitPartition(defaultTopic, 'latest'), 'Commit partition should work') - fs.writeFileSync(kafkaApplicationLatestYamlFile.name, kafkaApplicationLatestYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaApplicationLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka application should work.' - ) + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationLatestYaml, 'Kafka application') sh.exec(`sleep 10s`) - fs.writeFileSync(scaledObjectLatestYamlFile.name, scaledObjectLatestYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${scaledObjectLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Scaled Object should work.' - ) + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectLatestYaml, 'Scaled Object') sh.exec(`sleep 5s`) - waitForReplicaCount(1, commandToCheckReplicas) - t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`) }) - -test.serial('Latest Scale object should scale with new messages', t => { +test.serial('Latest Scale object should scale with new messages', async t => { for (let r = 1; r <= 3; r++) { - - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) - - waitForReplicaCount(r, commandToCheckReplicas) - - t.is(r.toString(), sh.exec(commandToCheckReplicas).stdout, `Replica count should be ${r}.`) + publishMessage(defaultTopic) + t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) } }) -test.serial('Cleanup after latest policy test', t=> { - t.is( - 0, - sh.exec(`kubectl delete -f ${scaledObjectLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting Scaled Object should work.' - ) - t.is( - 0, - sh.exec(`kubectl delete -f ${kafkaApplicationLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting kafka application should work.' - ) - sh.exec(`sleep 10s`) -}) +test.serial('Cleanup after latest policy test', t => cleanup(t)) -test.serial('Applying ScaledObject with multiple topics should scale up pods', t => { +test.serial('Applying ScaledObject with multiple topics should scale up pods', async t => { // Make the consumer commit the all offsets for all topics in the group - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic2} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) - - fs.writeFileSync(kafkaApplicationMultipleTopicsYamlFile.name, kafkaApplicationMultipleTopicsYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaApplicationMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka application should work.' - ) - sh.exec(`sleep 5s`) - fs.writeFileSync(scaledObjectMultipleTopicsYamlFile.name, scaledObjectMultipleTopicsYaml) + t.true(commitPartition(defaultTopic, 'multiTopic'), 'Commit partition should work') + t.true(commitPartition(defaultTopic2, 'multiTopic'), 'Commit partition should work') - t.is( - 0, - sh.exec(`kubectl apply -f ${scaledObjectMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Scaled Object should work.' - ) + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationMultipleTopicsYaml, 'Kafka application') + sh.exec(`sleep 5s`) + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectMultipleTopicsYaml, 'Scaled Object') sh.exec(`sleep 5s`) // when lag is 0, scaled object is not active, replica = 0 - waitForReplicaCount(0, commandToCheckReplicas) - t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`) // produce a single msg to the default topic // should turn scale object active, replica = 1 - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) - waitForReplicaCount(1, commandToCheckReplicas) - t.is('1', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 1.') + publishMessage(defaultTopic) + t.true(await waitForDeploymentReplicaCount(1, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 1.`) // produce one more msg to the different topic within the same group // will turn total consumer group lag to 2. // with lagThreshold as 1 -> making hpa AverageValue to 1 // this should turn nb of replicas to 2 // as desiredReplicaCount = totalLag / avgThreshold - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic2}'`) - sh.exec(`sleep 5s`) - waitForReplicaCount(2, commandToCheckReplicas) - t.is('2', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 2.') + publishMessage(defaultTopic2) + t.true(await waitForDeploymentReplicaCount(2, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 2.`) // make it 3 cause why not? - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) - waitForReplicaCount(3, commandToCheckReplicas) - t.is('3', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 3.') + publishMessage(defaultTopic) + t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) }) -test.serial('Cleanup after multiple topics test', t=> { - t.is( - 0, - sh.exec(`kubectl delete -f ${scaledObjectMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting Scaled Object should work.' - ) - t.is( - 0, - sh.exec(`kubectl delete -f ${kafkaApplicationMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting kafka application should work.' - ) +test.serial('Cleanup after multiple topics test', t => cleanup(t)) + +test.serial('Applying ScaledObject zeroOnInvalidOffset policy should not scale up pods', async t => { + + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationZeroOnInvalidYaml, 'Kafka application') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectZeroOnInvalidOffsetYaml, 'Scaled Object') + waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') + sh.exec(`sleep 30s`) + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 3000), `Replica count should be 0.`) }) +test.serial('cleanup after zeroOnInvalidOffset policy test', t => cleanup(t)) -test.after.always('Clean up, delete created resources.', t => { - const resources = [ - `${scaledObjectEarliestYamlFile.name}`, - `${scaledObjectLatestYamlFile.name}`, - `${scaledObjectMultipleTopicsYamlFile.name}`, +test.serial('Applying ScaledObject oneOnInvalidOffset policy should scale to one pod', async t => { + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationOneOnInvalidYaml, 'Kafka application') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectOneOnInvalidOffsetYaml, 'Scaled Object') + waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') + sh.exec(`sleep 30s`) + t.true(await waitForDeploymentReplicaCount(1, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 1.`) +}) - `${kafkaApplicationEarliestYamlFile.name}`, - `${kafkaApplicationLatestYamlFile.name}`, - `${kafkaApplicationMultipleTopicsYamlFile.name}`, +test.serial('oneOnInvalidOffset Scale object should scale to zero when offset is set', async t => { + t.true(commitPartition(oneInvalidOffsetTopic, invalidOffsetGroup), 'Commit partition should work') + publishMessage(oneInvalidOffsetTopic) + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 60, 10000), `Replica count should scale down to 0.`) +}) + +test.serial('cleanup after oneOnInvalidOffset policy test', t => cleanup(t)) + +test.after.always('Clean up, delete created resources.', t => { + const resources = [ `${kafkaClientYamlFile.name}`, `${kafkaTopicYamlFile.name}`, `${kafkaClusterYamlFile.name}`, - `${strimziOperatorYamlFile}` + `${strimziOperatorYamlFile.name}` ] for (const resource of resources) { - sh.exec(`kubectl delete ${resource} --namespace ${defaultNamespace}`) + sh.exec(`echo Deleting resource from file ${resource}`) + sh.exec(`kubectl delete -f ${resource} --namespace ${defaultNamespace}`) } + sh.exec(`echo Deleting namespace ${defaultNamespace}`) sh.exec(`kubectl delete namespace ${defaultNamespace}`) }) -const kafkaClusterYaml = `apiVersion: kafka.strimzi.io/v1beta1 +const kafkaClusterYaml = `apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: ${defaultCluster} namespace: ${defaultNamespace} spec: kafka: - version: 2.5.0 - replicas: 3 + version: "2.6.0" + replicas: 1 listeners: - plain: {} - tls: {} + - name: plain + port: 9092 + type: internal + tls: false + - name: tls + port: 9093 + type: internal + tls: true config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 @@ -321,33 +243,21 @@ spec: topicOperator: {} userOperator: {}` -const kafkaTopicsYaml = `apiVersion: kafka.strimzi.io/v1beta1 +const kafkaTopicsTemplateYaml = `apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: - name: ${defaultTopic} + name: {{TOPIC}} labels: strimzi.io/cluster: ${defaultCluster} namespace: ${defaultNamespace} spec: - partitions: 3 + partitions: {{PARTITIONS}} replicas: 1 config: retention.ms: 604800000 segment.bytes: 1073741824 --- -apiVersion: kafka.strimzi.io/v1beta1 -kind: KafkaTopic -metadata: - name: ${defaultTopic2} - labels: - strimzi.io/cluster: ${defaultCluster} - namespace: ${defaultNamespace} -spec: - partitions: 3 - replicas: 1 - config: - retention.ms: 604800000 - segment.bytes: 1073741824` +` const kafkaClientYaml = `apiVersion: v1 kind: Pod @@ -363,7 +273,7 @@ spec: - -c - "exec tail -f /dev/null"` -const kafkaApplicationLatestYaml = ` +const kafkaApplicationTemplateYaml = ` apiVersion: apps/v1 kind: Deployment metadata: @@ -386,33 +296,12 @@ spec: command: - sh - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group latest --consumer-property enable.auto.commit=false"` + - "kafka-console-consumer --bootstrap-server ${bootstrapServer} PARAMS --consumer-property enable.auto.commit=COMMIT"` - -const kafkaApplicationEarliestYaml = ` -apiVersion: apps/v1 -kind: Deployment -metadata: - name: kafka-consumer - namespace: ${defaultNamespace} - labels: - app: kafka-consumer -spec: - selector: - matchLabels: - app: kafka-consumer - template: - metadata: - labels: - app: kafka-consumer - spec: - containers: - - name: kafka-consumer - image: confluentinc/cp-kafka:5.2.1 - command: - - sh - - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group earliest --from-beginning --consumer-property enable.auto.commit=false"` +const kafkaApplicationEarliestYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${defaultTopic} --group earliest --from-beginning`).replace(/COMMIT/g, 'false') +const kafkaApplicationLatestYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${defaultTopic} --group latest`).replace(/COMMIT/g, 'false') +const kafkaApplicationZeroOnInvalidYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${zeroInvalidOffsetTopic} --group ${invalidOffsetGroup}`).replace(/COMMIT/g, 'true') +const kafkaApplicationOneOnInvalidYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${oneInvalidOffsetTopic} --group ${invalidOffsetGroup} --from-beginning`).replace(/COMMIT/g, 'true') const kafkaApplicationMultipleTopicsYaml = ` apiVersion: apps/v1 @@ -440,18 +329,18 @@ spec: command: - sh - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic '${defaultTopic}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false" + - "kafka-console-consumer --bootstrap-server ${bootstrapServer} --topic '${defaultTopic}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false" - name: kafka-consumer-2 image: confluentinc/cp-kafka:5.2.1 command: - sh - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic '${defaultTopic2}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false"` + - "kafka-console-consumer --bootstrap-server ${bootstrapServer} --topic '${defaultTopic2}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false"` -const scaledObjectEarliestYaml = `apiVersion: keda.sh/v1alpha1 +const scaledObjectTemplateYaml = `apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: - name: kafka-consumer-earliest + name: kafka-consumer-GROUP namespace: ${defaultNamespace} spec: scaleTargetRef: @@ -460,15 +349,18 @@ spec: - type: kafka metadata: topic: ${defaultTopic} - bootstrapServers: ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 - consumerGroup: earliest + bootstrapServers: ${bootstrapServer} + consumerGroup: GROUP lagThreshold: '1' - offsetResetPolicy: 'earliest'` + offsetResetPolicy: 'GROUP'` -const scaledObjectLatestYaml = `apiVersion: keda.sh/v1alpha1 +const scaledObjectEarliestYaml = scaledObjectTemplateYaml.replace(/GROUP/g, 'earliest') +const scaledObjectLatestYaml = scaledObjectTemplateYaml.replace(/GROUP/g, 'latest') + +const scaledObjectMultipleTopicsYaml = `apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: - name: kafka-consumer-latest + name: kafka-consumer-multi-topic namespace: ${defaultNamespace} spec: scaleTargetRef: @@ -476,16 +368,15 @@ spec: triggers: - type: kafka metadata: - topic: ${defaultTopic} - bootstrapServers: ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 - consumerGroup: latest + bootstrapServers: ${bootstrapServer} + consumerGroup: multiTopic lagThreshold: '1' offsetResetPolicy: 'latest'` -const scaledObjectMultipleTopicsYaml = `apiVersion: keda.sh/v1alpha1 +const scaledObjectInvalidOffsetTemplateYaml = `apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: - name: kafka-consumer-multi-topic + name: kafka-consumer-on-invalid namespace: ${defaultNamespace} spec: scaleTargetRef: @@ -493,7 +384,12 @@ spec: triggers: - type: kafka metadata: - bootstrapServers: ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 - consumerGroup: multiTopic + topic: TOPIC + bootstrapServers: ${bootstrapServer} + consumerGroup: ${invalidOffsetGroup} lagThreshold: '1' + scaleToZeroOnInvalidOffset: 'VALUE' offsetResetPolicy: 'latest'` + +const scaledObjectZeroOnInvalidOffsetYaml = scaledObjectInvalidOffsetTemplateYaml.replace(/TOPIC/g, zeroInvalidOffsetTopic).replace(/VALUE/g, 'true') +const scaledObjectOneOnInvalidOffsetYaml = scaledObjectInvalidOffsetTemplateYaml.replace(/TOPIC/g, oneInvalidOffsetTopic).replace(/VALUE/g, 'false')