From b1042f0609ef4a693c9bea82017672ad76bc8ae0 Mon Sep 17 00:00:00 2001 From: Ram Cohen Date: Wed, 30 Mar 2022 12:49:14 +0300 Subject: [PATCH] Add tests for new scaleToZeroOnInvalidOffset configuration Signed-off-by: Ram Cohen --- pkg/scalers/kafka_scaler.go | 2 +- tests/scalers/kafka.test.ts | 263 +++++++++++++++++------------------- 2 files changed, 122 insertions(+), 143 deletions(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 2ce215532b8..d0ff5d393b5 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -360,7 +360,7 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset retVal = 0 } errMsg := fmt.Errorf( - "invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Scaling to %d", + "invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d", topic, s.metadata.group, partitionID, retVal) kafkaLog.V(0).Info(errMsg.Error()) return retVal, errMsg diff --git a/tests/scalers/kafka.test.ts b/tests/scalers/kafka.test.ts index 45368034f34..a1fb8a077a5 100644 --- a/tests/scalers/kafka.test.ts +++ b/tests/scalers/kafka.test.ts @@ -9,19 +9,19 @@ const defaultCluster = 'kafka-cluster' const timeToWait = 300 const defaultTopic = 'kafka-topic' const defaultTopic2 = 'kafka-topic-2' +const zeroInvalidOffsetTopic = 'kafka-topic-zero-invalid-offset' +const oneInvalidOffsetTopic = 'kafka-topic-one-invalid-offset' +const invalidOffsetGroup = 'invalidOffset' const defaultKafkaClient = 'kafka-client' const strimziOperatorVersion = '0.23.0' +const bootstrapServer = `${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092` const strimziOperatorYamlFile = tmp.fileSync() const kafkaClusterYamlFile = tmp.fileSync() const kafkaTopicYamlFile = tmp.fileSync() const kafkaClientYamlFile = tmp.fileSync() -const kafkaApplicationLatestYamlFile = tmp.fileSync() -const kafkaApplicationEarliestYamlFile = tmp.fileSync() -const kafkaApplicationMultipleTopicsYamlFile = tmp.fileSync() -const scaledObjectEarliestYamlFile = tmp.fileSync() -const scaledObjectLatestYamlFile = tmp.fileSync() -const scaledObjectMultipleTopicsYamlFile = tmp.fileSync() +const kafkaApplicationYamlFile = tmp.fileSync() +const scaledObjectYamlFile = tmp.fileSync() function deployFromYaml(t: Assertions, filename: string, yaml: string, name: string) { sh.exec(`echo Deploying ${name}`) @@ -38,6 +38,30 @@ function waitForReady(t: Assertions, app: string, name: string, condition: strin ) } +function commitPartition(topic: string, group: string) { + sh.exec(`echo Committing partition for ${topic}:${group}`) + return sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${bootstrapServer}" --topic ${topic} --group ${group} --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`).code == 0 +} + +function publishMessage(topic: string) { + sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${bootstrapServer} --topic ${topic}'`) + sh.exec(`sleep 5s`) +} + +function cleanup(t: Assertions) { + t.is( + 0, + sh.exec(`kubectl delete -f ${scaledObjectYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deleting Scaled Object should work.' + ) + t.is( + 0, + sh.exec(`kubectl delete -f ${kafkaApplicationYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deleting kafka application should work.' + ) + sh.exec(`sleep 10s`) +} + test.before('Set up, create necessary resources.', async t => { createNamespace(defaultNamespace) @@ -49,15 +73,25 @@ test.before('Set up, create necessary resources.', async t => { deployFromYaml(t, kafkaClusterYamlFile.name, kafkaClusterYaml, 'Kafka cluster') waitForReady(t, `kafka/${defaultCluster}`,'Kafka instance') + var kafkaTopicsYaml = + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', defaultTopic).replace('{{PARTITIONS}}', '3') + + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', defaultTopic2).replace('{{PARTITIONS}}', '3') + + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', zeroInvalidOffsetTopic).replace('{{PARTITIONS}}', '1') + + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', oneInvalidOffsetTopic).replace('{{PARTITIONS}}', '1') deployFromYaml(t, kafkaTopicYamlFile.name, kafkaTopicsYaml, 'Kafka topic') - waitForReady(t, `kafkatopic/${defaultTopic}`,'Kafka topic') - waitForReady(t, `kafkatopic/${defaultTopic2}`,'Kafka topic2') + waitForReady(t, `kafkatopic/${defaultTopic}`,defaultTopic) + waitForReady(t, `kafkatopic/${defaultTopic2}`,defaultTopic2) + waitForReady(t, `kafkatopic/${zeroInvalidOffsetTopic}`,zeroInvalidOffsetTopic) + waitForReady(t, `kafkatopic/${oneInvalidOffsetTopic}`,oneInvalidOffsetTopic) deployFromYaml(t, kafkaClientYamlFile.name, kafkaClientYaml, 'Kafka client') waitForReady(t, `pod/${defaultKafkaClient}`,'Kafka client') +}); + +test.serial('Applying ScaledObject earliest policy should not scale up pods', async t => { - deployFromYaml(t, kafkaApplicationEarliestYamlFile.name, kafkaApplicationEarliestYaml, 'Kafka application') - deployFromYaml(t, scaledObjectEarliestYamlFile.name, scaledObjectEarliestYaml, 'Scaled Object') + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationEarliestYaml, 'Kafka application') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectEarliestYaml, 'Scaled Object') waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), 'replica count should start out as 0') @@ -65,82 +99,48 @@ test.before('Set up, create necessary resources.', async t => { test.serial('Scale application with kafka messages.', async t => { for (let r = 1; r <= 3; r++) { - - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) - + publishMessage(defaultTopic) t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) } }) test.serial('Scale application beyond partition max.', async t => { - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) - + publishMessage(defaultTopic) t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) }) -test.serial('cleanup after earliest policy test', t=> { - t.is( - 0, - sh.exec(`kubectl delete -f ${scaledObjectEarliestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting Scaled Object should work.' - ) - t.is( - 0, - sh.exec(`kubectl delete -f ${kafkaApplicationEarliestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting kafka application should work.' - ) - - sh.exec(`sleep 30s`) -}) +test.serial('cleanup after earliest policy test', t => cleanup(t)) test.serial('Applying ScaledObject latest policy should not scale up pods', async t => { //Make the consumer commit the first offset for each partition. - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group latest --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) + t.true(commitPartition(defaultTopic, 'latest'), 'Commit partition should work') - deployFromYaml(t, kafkaApplicationLatestYamlFile.name, kafkaApplicationLatestYaml, 'Kafka application') + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationLatestYaml, 'Kafka application') sh.exec(`sleep 10s`) - deployFromYaml(t, scaledObjectLatestYamlFile.name, scaledObjectLatestYaml, 'Scaled Object') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectLatestYaml, 'Scaled Object') sh.exec(`sleep 5s`) t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`) }) - test.serial('Latest Scale object should scale with new messages', async t => { for (let r = 1; r <= 3; r++) { - - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) - + publishMessage(defaultTopic) t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) } }) -test.serial('Cleanup after latest policy test', t=> { - t.is( - 0, - sh.exec(`kubectl delete -f ${scaledObjectLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting Scaled Object should work.' - ) - t.is( - 0, - sh.exec(`kubectl delete -f ${kafkaApplicationLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting kafka application should work.' - ) - sh.exec(`sleep 10s`) -}) +test.serial('Cleanup after latest policy test', t => cleanup(t)) test.serial('Applying ScaledObject with multiple topics should scale up pods', async t => { // Make the consumer commit the all offsets for all topics in the group - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic2} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) + t.true(commitPartition(defaultTopic, 'multiTopic'), 'Commit partition should work') + t.true(commitPartition(defaultTopic2, 'multiTopic'), 'Commit partition should work') - deployFromYaml(t, kafkaApplicationMultipleTopicsYamlFile.name, kafkaApplicationMultipleTopicsYaml, 'Kafka application') + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationMultipleTopicsYaml, 'Kafka application') sh.exec(`sleep 5s`) - deployFromYaml(t, scaledObjectMultipleTopicsYamlFile.name, scaledObjectMultipleTopicsYaml, ' Scaled Object') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectMultipleTopicsYaml, 'Scaled Object') sh.exec(`sleep 5s`) // when lag is 0, scaled object is not active, replica = 0 @@ -148,8 +148,7 @@ test.serial('Applying ScaledObject with multiple topics should scale up pods', a // produce a single msg to the default topic // should turn scale object active, replica = 1 - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) + publishMessage(defaultTopic) t.true(await waitForDeploymentReplicaCount(1, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 1.`) // produce one more msg to the different topic within the same group @@ -157,44 +156,50 @@ test.serial('Applying ScaledObject with multiple topics should scale up pods', a // with lagThreshold as 1 -> making hpa AverageValue to 1 // this should turn nb of replicas to 2 // as desiredReplicaCount = totalLag / avgThreshold - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic2}'`) - sh.exec(`sleep 5s`) + publishMessage(defaultTopic2) t.true(await waitForDeploymentReplicaCount(2, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 2.`) // make it 3 cause why not? - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) + publishMessage(defaultTopic) t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) }) -test.serial('Cleanup after multiple topics test', t=> { - t.is( - 0, - sh.exec(`kubectl delete -f ${scaledObjectMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting Scaled Object should work.' - ) - t.is( - 0, - sh.exec(`kubectl delete -f ${kafkaApplicationMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting kafka application should work.' - ) +test.serial('Cleanup after multiple topics test', t => cleanup(t)) + +test.serial('Applying ScaledObject zeroOnInvalidOffset policy should not scale up pods', async t => { + + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationZeroOnInvalidYaml, 'Kafka application') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectZeroOnInvalidOffsetYaml, 'Scaled Object') + waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') + sh.exec(`sleep 30s`) + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 3000), `Replica count should be 0.`) }) +test.serial('cleanup after zeroOnInvalidOffset policy test', t => cleanup(t)) -test.after.always('Clean up, delete created resources.', t => { - const resources = [ - `${scaledObjectEarliestYamlFile.name}`, - `${scaledObjectLatestYamlFile.name}`, - `${scaledObjectMultipleTopicsYamlFile.name}`, +test.serial('Applying ScaledObject oneOnInvalidOffset policy should scale to one pod', async t => { + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationOneOnInvalidYaml, 'Kafka application') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectOneOnInvalidOffsetYaml, 'Scaled Object') + waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') + sh.exec(`sleep 30s`) + t.true(await waitForDeploymentReplicaCount(1, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 1.`) +}) + +test.serial('oneOnInvalidOffset Scale object should scale to zero when offset is set', async t => { + t.true(commitPartition(oneInvalidOffsetTopic, invalidOffsetGroup), 'Commit partition should work') + publishMessage(oneInvalidOffsetTopic) + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 60, 10000), `Replica count should scale down to 0.`) +}) - `${kafkaApplicationEarliestYamlFile.name}`, - `${kafkaApplicationLatestYamlFile.name}`, - `${kafkaApplicationMultipleTopicsYamlFile.name}`, +test.serial('cleanup after oneOnInvalidOffset policy test', t => cleanup(t)) + +test.after.always('Clean up, delete created resources.', t => { + const resources = [ `${kafkaClientYamlFile.name}`, `${kafkaTopicYamlFile.name}`, `${kafkaClusterYamlFile.name}`, - `${strimziOperatorYamlFile}` + `${strimziOperatorYamlFile.name}` ] for (const resource of resources) { @@ -213,7 +218,7 @@ metadata: spec: kafka: version: "2.6.0" - replicas: 3 + replicas: 1 listeners: - name: plain port: 9092 @@ -238,33 +243,21 @@ spec: topicOperator: {} userOperator: {}` -const kafkaTopicsYaml = `apiVersion: kafka.strimzi.io/v1beta2 +const kafkaTopicsTemplateYaml = `apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: - name: ${defaultTopic} + name: {{TOPIC}} labels: strimzi.io/cluster: ${defaultCluster} namespace: ${defaultNamespace} spec: - partitions: 3 + partitions: {{PARTITIONS}} replicas: 1 config: retention.ms: 604800000 segment.bytes: 1073741824 --- -apiVersion: kafka.strimzi.io/v1beta2 -kind: KafkaTopic -metadata: - name: ${defaultTopic2} - labels: - strimzi.io/cluster: ${defaultCluster} - namespace: ${defaultNamespace} -spec: - partitions: 3 - replicas: 1 - config: - retention.ms: 604800000 - segment.bytes: 1073741824` +` const kafkaClientYaml = `apiVersion: v1 kind: Pod @@ -280,7 +273,7 @@ spec: - -c - "exec tail -f /dev/null"` -const kafkaApplicationLatestYaml = ` +const kafkaApplicationTemplateYaml = ` apiVersion: apps/v1 kind: Deployment metadata: @@ -303,33 +296,12 @@ spec: command: - sh - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group latest --consumer-property enable.auto.commit=false"` - + - "kafka-console-consumer --bootstrap-server ${bootstrapServer} PARAMS --consumer-property enable.auto.commit=COMMIT"` -const kafkaApplicationEarliestYaml = ` -apiVersion: apps/v1 -kind: Deployment -metadata: - name: kafka-consumer - namespace: ${defaultNamespace} - labels: - app: kafka-consumer -spec: - selector: - matchLabels: - app: kafka-consumer - template: - metadata: - labels: - app: kafka-consumer - spec: - containers: - - name: kafka-consumer - image: confluentinc/cp-kafka:5.2.1 - command: - - sh - - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group earliest --from-beginning --consumer-property enable.auto.commit=false"` +const kafkaApplicationEarliestYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${defaultTopic} --group earliest --from-beginning`).replace(/COMMIT/g, 'false') +const kafkaApplicationLatestYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${defaultTopic} --group latest`).replace(/COMMIT/g, 'false') +const kafkaApplicationZeroOnInvalidYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${zeroInvalidOffsetTopic} --group ${invalidOffsetGroup}`).replace(/COMMIT/g, 'true') +const kafkaApplicationOneOnInvalidYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${oneInvalidOffsetTopic} --group ${invalidOffsetGroup} --from-beginning`).replace(/COMMIT/g, 'true') const kafkaApplicationMultipleTopicsYaml = ` apiVersion: apps/v1 @@ -357,18 +329,18 @@ spec: command: - sh - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic '${defaultTopic}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false" + - "kafka-console-consumer --bootstrap-server ${bootstrapServer} --topic '${defaultTopic}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false" - name: kafka-consumer-2 image: confluentinc/cp-kafka:5.2.1 command: - sh - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic '${defaultTopic2}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false"` + - "kafka-console-consumer --bootstrap-server ${bootstrapServer} --topic '${defaultTopic2}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false"` -const scaledObjectEarliestYaml = `apiVersion: keda.sh/v1alpha1 +const scaledObjectTemplateYaml = `apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: - name: kafka-consumer-earliest + name: kafka-consumer-GROUP namespace: ${defaultNamespace} spec: scaleTargetRef: @@ -377,15 +349,18 @@ spec: - type: kafka metadata: topic: ${defaultTopic} - bootstrapServers: ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 - consumerGroup: earliest + bootstrapServers: ${bootstrapServer} + consumerGroup: GROUP lagThreshold: '1' - offsetResetPolicy: 'earliest'` + offsetResetPolicy: 'GROUP'` -const scaledObjectLatestYaml = `apiVersion: keda.sh/v1alpha1 +const scaledObjectEarliestYaml = scaledObjectTemplateYaml.replace(/GROUP/g, 'earliest') +const scaledObjectLatestYaml = scaledObjectTemplateYaml.replace(/GROUP/g, 'latest') + +const scaledObjectMultipleTopicsYaml = `apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: - name: kafka-consumer-latest + name: kafka-consumer-multi-topic namespace: ${defaultNamespace} spec: scaleTargetRef: @@ -393,16 +368,15 @@ spec: triggers: - type: kafka metadata: - topic: ${defaultTopic} - bootstrapServers: ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 - consumerGroup: latest + bootstrapServers: ${bootstrapServer} + consumerGroup: multiTopic lagThreshold: '1' offsetResetPolicy: 'latest'` -const scaledObjectMultipleTopicsYaml = `apiVersion: keda.sh/v1alpha1 +const scaledObjectInvalidOffsetTemplateYaml = `apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: - name: kafka-consumer-multi-topic + name: kafka-consumer-on-invalid namespace: ${defaultNamespace} spec: scaleTargetRef: @@ -410,7 +384,12 @@ spec: triggers: - type: kafka metadata: - bootstrapServers: ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 - consumerGroup: multiTopic + topic: TOPIC + bootstrapServers: ${bootstrapServer} + consumerGroup: ${invalidOffsetGroup} lagThreshold: '1' + scaleToZeroOnInvalidOffset: 'VALUE' offsetResetPolicy: 'latest'` + +const scaledObjectZeroOnInvalidOffsetYaml = scaledObjectInvalidOffsetTemplateYaml.replace(/TOPIC/g, zeroInvalidOffsetTopic).replace(/VALUE/g, 'true') +const scaledObjectOneOnInvalidOffsetYaml = scaledObjectInvalidOffsetTemplateYaml.replace(/TOPIC/g, oneInvalidOffsetTopic).replace(/VALUE/g, 'false')