Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor Apache Kafka scaler config (kedacore#5804)
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Wozniak <[email protected]>
Signed-off-by: Ranjith Gopal <[email protected]>
wozniakjan authored and Ranjith Gopal committed Jun 24, 2024
1 parent 0b32629 commit fff7eac
Showing 4 changed files with 284 additions and 368 deletions.
408 changes: 116 additions & 292 deletions pkg/scalers/apache_kafka_scaler.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ import (
"crypto/tls"
"errors"
"fmt"
"strconv"
"strings"

"github.com/go-logr/logr"
@@ -49,41 +48,81 @@ type apacheKafkaScaler struct {
}

type apacheKafkaMetadata struct {
bootstrapServers []string
group string
topic []string
partitionLimitation []int32
lagThreshold int64
activationLagThreshold int64
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool
excludePersistentLag bool
BootstrapServers []string `keda:"name=bootstrapServers, order=triggerMetadata;resolvedEnv"`
Group string `keda:"name=consumerGroup, order=triggerMetadata;resolvedEnv"`
Topic []string `keda:"name=topic, order=triggerMetadata;resolvedEnv, optional"`
PartitionLimitation []int `keda:"name=partitionLimitation, order=triggerMetadata, optional, range"`
LagThreshold int64 `keda:"name=lagThreshold, order=triggerMetadata, default=10"`
ActivationLagThreshold int64 `keda:"name=activationLagThreshold, order=triggerMetadata, default=0"`
OffsetResetPolicy offsetResetPolicy `keda:"name=offsetResetPolicy, order=triggerMetadata, enum=earliest;latest, default=latest"`
AllowIdleConsumers bool `keda:"name=allowIdleConsumers, order=triggerMetadata, optional"`
ExcludePersistentLag bool `keda:"name=excludePersistentLag, order=triggerMetadata, optional"`

// If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can
// occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612
scaleToZeroOnInvalidOffset bool
limitToPartitionsWithLag bool
ScaleToZeroOnInvalidOffset bool `keda:"name=scaleToZeroOnInvalidOffset, order=triggerMetadata, optional"`
LimitToPartitionsWithLag bool `keda:"name=limitToPartitionsWithLag, order=triggerMetadata, optional"`

// SASL
saslType kafkaSaslType
username string
password string
SASLType kafkaSaslType `keda:"name=sasl, order=triggerMetadata;authParams, enum=none;plaintext;scram_sha256;scram_sha512;gssapi;aws_msk_iam, default=none"`
Username string `keda:"name=username, order=authParams, optional"`
Password string `keda:"name=password, order=authParams, optional"`

// MSK
awsRegion string
awsEndpoint string
awsAuthorization awsutils.AuthorizationMetadata
AWSRegion string `keda:"name=awsRegion, order=triggerMetadata, optional"`
AWSEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"`
AWSAuthorization awsutils.AuthorizationMetadata

// TLS
enableTLS bool
cert string
key string
keyPassword string
ca string
TLS string `keda:"name=tls, order=triggerMetadata;authParams, enum=enable;disable, default=disable"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`
CA string `keda:"name=ca, order=authParams, optional"`

triggerIndex int
}

// enableTLS reports whether the trigger metadata requests an encrypted
// connection, i.e. the parsed `tls` option equals the enable sentinel.
func (a *apacheKafkaMetadata) enableTLS() bool {
	switch a.TLS {
	case stringEnable:
		return true
	default:
		return false
	}
}

// Validate checks cross-field constraints that the declarative `keda` tags
// cannot express. It is called by the typed-config parser after all fields
// have been populated. It may also normalize metadata: a partitionLimitation
// given without explicit topics is silently dropped, matching the pre-refactor
// behavior of logging and ignoring the setting.
func (a *apacheKafkaMetadata) Validate() error {
	if a.LagThreshold <= 0 {
		return errors.New("lagThreshold must be a positive number")
	}
	// Zero is the documented default and means "activate on any lag",
	// so only strictly negative values are rejected.
	if a.ActivationLagThreshold < 0 {
		return errors.New("activationLagThreshold must be 0 or a positive number")
	}
	if a.AllowIdleConsumers && a.LimitToPartitionsWithLag {
		return errors.New("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously")
	}
	if len(a.Topic) == 0 && a.LimitToPartitionsWithLag {
		return errors.New("topic must be specified when using limitToPartitionsWithLag")
	}
	if len(a.Topic) == 0 && len(a.PartitionLimitation) > 0 {
		// no specific topics set, ignoring partitionLimitation setting
		a.PartitionLimitation = nil
	}
	// cert and key only make sense as a pair; allow neither or both.
	if a.enableTLS() && ((a.Cert == "") != (a.Key == "")) {
		return errors.New("can't set only one of cert or key when using TLS")
	}
	switch a.SASLType {
	// SCRAM variants need credentials just like plaintext; the original
	// (pre-typed-config) parser enforced this for all three mechanisms.
	case KafkaSASLTypePlaintext, KafkaSASLTypeSCRAMSHA256, KafkaSASLTypeSCRAMSHA512:
		if a.Username == "" || a.Password == "" {
			return fmt.Errorf("username and password must be set when using SASL type %q", a.SASLType)
		}
	case KafkaSASLTypeMskIam:
		if a.AWSRegion == "" {
			return errors.New("awsRegion must be set when using AWS MSK IAM")
		}
		if !a.enableTLS() {
			return errors.New("TLS must be enabled when using AWS MSK IAM")
		}
	}
	return nil
}

const (
KafkaSASLTypeMskIam = "aws_msk_iam"
)
@@ -95,13 +134,12 @@ func NewApacheKafkaScaler(ctx context.Context, config *scalersconfig.ScalerConfi
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

logger := InitializeLogger(config, "apache_kafka_scaler")

kafkaMetadata, err := parseApacheKafkaMetadata(config, logger)
kafkaMetadata, err := parseApacheKafkaMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing kafka metadata: %w", err)
}

logger := InitializeLogger(config, "apache_kafka_scaler")
client, err := getApacheKafkaClient(ctx, kafkaMetadata, logger)
if err != nil {
return nil, err
@@ -119,246 +157,32 @@ func NewApacheKafkaScaler(ctx context.Context, config *scalersconfig.ScalerConfi
}

func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apacheKafkaMetadata) error {
meta.enableTLS = false
enableTLS := false
if val, ok := config.TriggerMetadata["tls"]; ok {
switch val {
case stringEnable:
enableTLS = true
case stringDisable:
enableTLS = false
default:
return fmt.Errorf("error incorrect TLS value given, got %s", val)
}
}

if val, ok := config.AuthParams["tls"]; ok {
val = strings.TrimSpace(val)
if enableTLS {
return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together")
}
switch val {
case stringEnable:
enableTLS = true
case stringDisable:
enableTLS = false
default:
return fmt.Errorf("error incorrect TLS value given, got %s", val)
}
}

if enableTLS {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
if value, found := config.AuthParams["keyPassword"]; found {
meta.keyPassword = value
} else {
meta.keyPassword = ""
}
meta.enableTLS = true
}

meta.saslType = KafkaSASLTypeNone
var saslAuthType string
switch {
case config.TriggerMetadata["sasl"] != "":
saslAuthType = config.TriggerMetadata["sasl"]
default:
saslAuthType = ""
if config.TriggerMetadata["sasl"] != "" && config.AuthParams["sasl"] != "" {
return errors.New("unable to set `sasl` in both ScaledObject and TriggerAuthentication together")
}
if val, ok := config.AuthParams["sasl"]; ok {
if saslAuthType != "" {
return errors.New("unable to set `sasl` in both ScaledObject and TriggerAuthentication together")
}
saslAuthType = val
if config.TriggerMetadata["tls"] != "" && config.AuthParams["tls"] != "" {
return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together")
}

if saslAuthType != "" {
saslAuthType = strings.TrimSpace(saslAuthType)
switch mode := kafkaSaslType(saslAuthType); mode {
case KafkaSASLTypeMskIam:
meta.saslType = mode
if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}
if !meta.enableTLS {
return errors.New("TLS is required for MSK")
}
if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
return errors.New("no awsRegion given")
}
auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
if err != nil {
return err
}
meta.awsAuthorization = auth
case KafkaSASLTypePlaintext:
fallthrough
case KafkaSASLTypeSCRAMSHA256:
fallthrough
case KafkaSASLTypeSCRAMSHA512:
if val, ok := config.AuthParams["username"]; ok {
meta.username = strings.TrimSpace(val)
} else {
return errors.New("no username given")
}
if val, ok := config.AuthParams["password"]; ok {
meta.password = strings.TrimSpace(val)
} else {
return errors.New("no password given")
}
case KafkaSASLTypeOAuthbearer:
return errors.New("SASL/OAUTHBEARER is not implemented yet")
default:
return fmt.Errorf("err sasl type %q given", mode)
if meta.SASLType == KafkaSASLTypeMskIam {
auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
if err != nil {
return err
}
meta.AWSAuthorization = auth
}

return nil
}

func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (apacheKafkaMetadata, error) {
meta := apacheKafkaMetadata{}
switch {
case config.TriggerMetadata["bootstrapServersFromEnv"] != "":
meta.bootstrapServers = strings.Split(config.ResolvedEnv[config.TriggerMetadata["bootstrapServersFromEnv"]], ",")
case config.TriggerMetadata["bootstrapServers"] != "":
meta.bootstrapServers = strings.Split(config.TriggerMetadata["bootstrapServers"], ",")
default:
return meta, errors.New("no bootstrapServers given")
}

switch {
case config.TriggerMetadata["consumerGroupFromEnv"] != "":
meta.group = config.ResolvedEnv[config.TriggerMetadata["consumerGroupFromEnv"]]
case config.TriggerMetadata["consumerGroup"] != "":
meta.group = config.TriggerMetadata["consumerGroup"]
default:
return meta, errors.New("no consumer group given")
}

switch {
case config.TriggerMetadata["topicFromEnv"] != "":
meta.topic = strings.Split(config.ResolvedEnv[config.TriggerMetadata["topicFromEnv"]], ",")
case config.TriggerMetadata["topic"] != "":
meta.topic = strings.Split(config.TriggerMetadata["topic"], ",")
default:
meta.topic = []string{}
logger.V(1).Info(fmt.Sprintf("consumer group %q has no topics specified, "+
"will use all topics subscribed by the consumer group for scaling", meta.group))
}

meta.partitionLimitation = nil
partitionLimitationMetadata := strings.TrimSpace(config.TriggerMetadata["partitionLimitation"])
if partitionLimitationMetadata != "" {
if meta.topic == nil || len(meta.topic) == 0 {
logger.V(1).Info("no specific topics set, ignoring partitionLimitation setting")
} else {
pattern := config.TriggerMetadata["partitionLimitation"]
parsed, err := kedautil.ParseInt32List(pattern)
if err != nil {
return meta, fmt.Errorf("error parsing in partitionLimitation '%s': %w", pattern, err)
}
meta.partitionLimitation = parsed
logger.V(0).Info(fmt.Sprintf("partition limit active '%s'", pattern))
}
}

meta.offsetResetPolicy = defaultOffsetResetPolicy

if config.TriggerMetadata["offsetResetPolicy"] != "" {
policy := offsetResetPolicy(config.TriggerMetadata["offsetResetPolicy"])
if policy != earliest && policy != latest {
return meta, fmt.Errorf("err offsetResetPolicy policy %q given", policy)
}
meta.offsetResetPolicy = policy
}

meta.lagThreshold = defaultKafkaLagThreshold

if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %q: %w", lagThresholdMetricName, err)
}
if t <= 0 {
return meta, fmt.Errorf("%q must be positive number", lagThresholdMetricName)
}
meta.lagThreshold = t
}

meta.activationLagThreshold = defaultKafkaActivationLagThreshold

if val, ok := config.TriggerMetadata[activationLagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %q: %w", activationLagThresholdMetricName, err)
}
if t < 0 {
return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName)
}
meta.activationLagThreshold = t
func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig) (apacheKafkaMetadata, error) {
meta := apacheKafkaMetadata{triggerIndex: config.TriggerIndex}
if err := config.TypedConfig(&meta); err != nil {
return meta, fmt.Errorf("error parsing kafka metadata: %w", err)
}

if err := parseApacheKafkaAuthParams(config, &meta); err != nil {
return meta, err
}

meta.allowIdleConsumers = false
if val, ok := config.TriggerMetadata["allowIdleConsumers"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing allowIdleConsumers: %w", err)
}
meta.allowIdleConsumers = t
}

meta.excludePersistentLag = false
if val, ok := config.TriggerMetadata["excludePersistentLag"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing excludePersistentLag: %w", err)
}
meta.excludePersistentLag = t
}

meta.scaleToZeroOnInvalidOffset = false
if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %w", err)
}
meta.scaleToZeroOnInvalidOffset = t
}

meta.limitToPartitionsWithLag = false
if val, ok := config.TriggerMetadata["limitToPartitionsWithLag"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing limitToPartitionsWithLag: %w", err)
}
meta.limitToPartitionsWithLag = t

if meta.allowIdleConsumers && meta.limitToPartitionsWithLag {
return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously")
}
if len(meta.topic) == 0 && meta.limitToPartitionsWithLag {
return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag")
}
}

meta.triggerIndex = config.TriggerIndex
return meta, nil
}

@@ -367,51 +191,51 @@ func getApacheKafkaClient(ctx context.Context, metadata apacheKafkaMetadata, log
var tlsConfig *tls.Config
var err error

logger.V(4).Info(fmt.Sprintf("Kafka SASL type %s", metadata.saslType))
if metadata.enableTLS {
tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.cert, metadata.key, metadata.keyPassword, metadata.ca, false)
logger.V(4).Info(fmt.Sprintf("Kafka SASL type %s", metadata.SASLType))
if metadata.enableTLS() {
tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.Cert, metadata.Key, metadata.KeyPassword, metadata.CA, false)
if err != nil {
return nil, err
}
}

switch metadata.saslType {
switch metadata.SASLType {
case KafkaSASLTypeNone:
saslMechanism = nil
case KafkaSASLTypePlaintext:
saslMechanism = plain.Mechanism{
Username: metadata.username,
Password: metadata.password,
Username: metadata.Username,
Password: metadata.Password,
}
case KafkaSASLTypeSCRAMSHA256:
saslMechanism, err = scram.Mechanism(scram.SHA256, metadata.username, metadata.password)
saslMechanism, err = scram.Mechanism(scram.SHA256, metadata.Username, metadata.Password)
if err != nil {
return nil, err
}
case KafkaSASLTypeSCRAMSHA512:
saslMechanism, err = scram.Mechanism(scram.SHA512, metadata.username, metadata.password)
saslMechanism, err = scram.Mechanism(scram.SHA512, metadata.Username, metadata.Password)
if err != nil {
return nil, err
}
case KafkaSASLTypeOAuthbearer:
return nil, errors.New("SASL/OAUTHBEARER is not implemented yet")
case KafkaSASLTypeMskIam:
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization)
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AWSRegion, metadata.AWSAuthorization)
if err != nil {
return nil, err
}

saslMechanism = aws_msk_iam_v2.NewMechanism(*cfg)
default:
return nil, fmt.Errorf("err sasl type %q given", metadata.saslType)
return nil, fmt.Errorf("err sasl type %q given", metadata.SASLType)
}

transport := &kafka.Transport{
TLS: tlsConfig,
SASL: saslMechanism,
}
client := kafka.Client{
Addr: kafka.TCP(metadata.bootstrapServers...),
Addr: kafka.TCP(metadata.BootstrapServers...),
Transport: transport,
}
if err != nil {
@@ -430,30 +254,30 @@ func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string]
}
s.logger.V(1).Info(fmt.Sprintf("Listed topics %v", metadata.Topics))

if len(s.metadata.topic) == 0 {
if len(s.metadata.Topic) == 0 {
// in case of empty topic name, we will get all topics that the consumer group is subscribed to
describeGrpReq := &kafka.DescribeGroupsRequest{
Addr: s.client.Addr,
GroupIDs: []string{
s.metadata.group,
s.metadata.Group,
},
}
describeGrp, err := s.client.DescribeGroups(ctx, describeGrpReq)
if err != nil {
return nil, fmt.Errorf("error describing group: %w", err)
}
if len(describeGrp.Groups[0].Members) == 0 {
return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.group, describeGrp.Groups[0].GroupState)
return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.Group, describeGrp.Groups[0].GroupState)
}
s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.group, describeGrp))
s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.Group, describeGrp))

result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
for _, partition := range topic.Partitions {
// if no partition limitations are specified, all partitions are considered
if (len(s.metadata.partitionLimitation) == 0) ||
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
if (len(s.metadata.PartitionLimitation) == 0) ||
(len(s.metadata.PartitionLimitation) > 0 && kedautil.Contains(s.metadata.PartitionLimitation, partition.ID)) {
partitions = append(partitions, partition.ID)
}
}
@@ -464,10 +288,10 @@ func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string]
result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
if kedautil.Contains(s.metadata.topic, topic.Name) {
if kedautil.Contains(s.metadata.Topic, topic.Name) {
for _, partition := range topic.Partitions {
if (len(s.metadata.partitionLimitation) == 0) ||
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
if (len(s.metadata.PartitionLimitation) == 0) ||
(len(s.metadata.PartitionLimitation) > 0 && kedautil.Contains(s.metadata.PartitionLimitation, partition.ID)) {
partitions = append(partitions, partition.ID)
}
}
@@ -481,7 +305,7 @@ func (s *apacheKafkaScaler) getConsumerOffsets(ctx context.Context, topicPartiti
response, err := s.client.OffsetFetch(
ctx,
&kafka.OffsetFetchRequest{
GroupID: s.metadata.group,
GroupID: s.metadata.Group,
Topics: topicPartitions,
},
)
@@ -514,14 +338,14 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co
}

consumerOffset := consumerOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
if consumerOffset == invalidOffset && s.metadata.OffsetResetPolicy == latest {
retVal := int64(1)
if s.metadata.scaleToZeroOnInvalidOffset {
if s.metadata.ScaleToZeroOnInvalidOffset {
retVal = 0
}
msg := fmt.Sprintf(
"invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d",
topic, s.metadata.group, partitionID, retVal)
topic, s.metadata.Group, partitionID, retVal)
s.logger.V(1).Info(msg)
return retVal, retVal, nil
}
@@ -530,15 +354,15 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co
return 0, 0, fmt.Errorf("error finding partition offset for topic %s", topic)
}
producerOffset := producerOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
if s.metadata.scaleToZeroOnInvalidOffset {
if consumerOffset == invalidOffset && s.metadata.OffsetResetPolicy == earliest {
if s.metadata.ScaleToZeroOnInvalidOffset {
return 0, 0, nil
}
return producerOffset, producerOffset, nil
}

// This code block tries to prevent KEDA Kafka trigger from scaling the scale target based on erroneous events
if s.metadata.excludePersistentLag {
if s.metadata.ExcludePersistentLag {
switch previousOffset, found := s.previousOffsets[topic][partitionID]; {
case !found:
// No record of previous offset, so store current consumer offset
@@ -558,8 +382,8 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co
}
}

s.logger.V(4).Info(fmt.Sprintf("Consumer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.group, partitionID, consumerOffset))
s.logger.V(4).Info(fmt.Sprintf("Producer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.group, partitionID, producerOffset))
s.logger.V(4).Info(fmt.Sprintf("Consumer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.Group, partitionID, consumerOffset))
s.logger.V(4).Info(fmt.Sprintf("Producer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.Group, partitionID, producerOffset))

return producerOffset - consumerOffset, producerOffset - consumerOffset, nil
}
@@ -578,17 +402,17 @@ func (s *apacheKafkaScaler) Close(context.Context) error {

func (s *apacheKafkaScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var metricName string
if s.metadata.topic != nil && len(s.metadata.topic) > 0 {
metricName = fmt.Sprintf("kafka-%s", strings.Join(s.metadata.topic, ","))
if s.metadata.Topic != nil && len(s.metadata.Topic) > 0 {
metricName = fmt.Sprintf("kafka-%s", strings.Join(s.metadata.Topic, ","))
} else {
metricName = fmt.Sprintf("kafka-%s-topics", s.metadata.group)
metricName = fmt.Sprintf("kafka-%s-topics", s.metadata.Group)
}

externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(metricName)),
},
Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold),
Target: GetMetricTarget(s.metricType, s.metadata.LagThreshold),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: kafkaMetricType}
return []v2.MetricSpec{metricSpec}
@@ -639,7 +463,7 @@ func (s *apacheKafkaScaler) GetMetricsAndActivity(ctx context.Context, metricNam
}
metric := GenerateMetricInMili(metricName, float64(totalLag))

return []external_metrics.ExternalMetricValue{metric}, totalLagWithPersistent > s.metadata.activationLagThreshold, nil
return []external_metrics.ExternalMetricValue{metric}, totalLagWithPersistent > s.metadata.ActivationLagThreshold, nil
}

// getTotalLag returns totalLag, totalLagWithPersistent, error
@@ -678,19 +502,19 @@ func (s *apacheKafkaScaler) getTotalLag(ctx context.Context) (int64, int64, erro
}
totalTopicPartitions += (int64)(len(partitionsOffsets))
}
s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, topicPartitions, s.metadata.lagThreshold))
s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, topicPartitions, s.metadata.LagThreshold))

s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Consumer offsets %v, producer offsets %v", consumerOffsets, producerOffsets))

if !s.metadata.allowIdleConsumers || s.metadata.limitToPartitionsWithLag {
if !s.metadata.AllowIdleConsumers || s.metadata.LimitToPartitionsWithLag {
// don't scale out beyond the number of topicPartitions or partitionsWithLag depending on settings
upperBound := totalTopicPartitions
if s.metadata.limitToPartitionsWithLag {
if s.metadata.LimitToPartitionsWithLag {
upperBound = partitionsWithLag
}

if (totalLag / s.metadata.lagThreshold) > upperBound {
totalLag = upperBound * s.metadata.lagThreshold
if (totalLag / s.metadata.LagThreshold) > upperBound {
totalLag = upperBound * s.metadata.LagThreshold
}
}
return totalLag, totalLagWithPersistent, nil
118 changes: 59 additions & 59 deletions pkg/scalers/apache_kafka_scaler_test.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ type parseApacheKafkaMetadataTestData struct {
brokers []string
group string
topic []string
partitionLimitation []int32
partitionLimitation []int
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool
excludePersistentLag bool
@@ -68,13 +68,13 @@ var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{
// failure, no consumer group
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", nil, nil, "latest", false, false, false},
// success, no topics
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false},
// success, ignore partitionLimitation if no topics
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false},
// success, no limitation with whitespaced limitation value
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false},
// success, no limitation
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false},
// failure, lagThreshold is negative value
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// failure, lagThreshold is 0
@@ -86,11 +86,11 @@ var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{
// success
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success, partitionLimitation as list
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false},
// success, partitionLimitation as range
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false},
// success, partitionLimitation mixed list + ranges
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false, false},
// failure, partitionLimitation wrong data type
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success, more brokers
@@ -120,7 +120,7 @@ var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{
// success, allowIdleConsumers can be set when limitToPartitionsWithLag is false
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true", "limitToPartitionsWithLag": "false"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false, false},
// failure, topic must be specified when limitToPartitionsWithLag is true
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, true},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, true},
}

var parseApacheKafkaAuthParamsTestDataset = []parseApacheKafkaAuthParamsTestData{
@@ -243,10 +243,10 @@ var apacheKafkaMetricIdentifiers = []apacheKafkaMetricIdentifier{

func TestApacheKafkaGetBrokers(t *testing.T) {
for _, testData := range parseApacheKafkaMetadataTestDataset {
meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithAuthParams}, logr.Discard())
meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithAuthParams})
getBrokerApacheKafkaTestBase(t, meta, testData, err)

meta, err = parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithoutAuthParams}, logr.Discard())
meta, err = parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithoutAuthParams})
getBrokerApacheKafkaTestBase(t, meta, testData, err)
}
}
@@ -258,77 +258,77 @@ func getBrokerApacheKafkaTestBase(t *testing.T, meta apacheKafkaMetadata, testDa
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if len(meta.bootstrapServers) != testData.numBrokers {
t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
if len(meta.BootstrapServers) != testData.numBrokers {
t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.BootstrapServers))
}
if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.bootstrapServers)
if !reflect.DeepEqual(testData.brokers, meta.BootstrapServers) {
t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.BootstrapServers)
}
if meta.group != testData.group {
t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
if meta.Group != testData.group {
t.Errorf("Expected group %s but got %s\n", testData.group, meta.Group)
}
if !reflect.DeepEqual(testData.topic, meta.topic) {
t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.topic)
if !reflect.DeepEqual(testData.topic, meta.Topic) {
t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.Topic)
}
if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.partitionLimitation)
if !reflect.DeepEqual(testData.partitionLimitation, meta.PartitionLimitation) {
t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.PartitionLimitation)
}
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
if err == nil && meta.OffsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.OffsetResetPolicy)
}
if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers {
t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers)
if err == nil && meta.AllowIdleConsumers != testData.allowIdleConsumers {
t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.AllowIdleConsumers)
}
if err == nil && meta.excludePersistentLag != testData.excludePersistentLag {
t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag)
if err == nil && meta.ExcludePersistentLag != testData.excludePersistentLag {
t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.ExcludePersistentLag)
}
if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag {
t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag)
if err == nil && meta.LimitToPartitionsWithLag != testData.limitToPartitionsWithLag {
t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.LimitToPartitionsWithLag)
}

expectedLagThreshold, er := parseExpectedLagThreshold(testData.metadata)
if er != nil {
t.Errorf("Unable to convert test data lagThreshold %s to string", testData.metadata["lagThreshold"])
}

if meta.lagThreshold != expectedLagThreshold && meta.lagThreshold != defaultKafkaLagThreshold {
t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.lagThreshold, defaultKafkaLagThreshold, expectedLagThreshold)
if meta.LagThreshold != expectedLagThreshold && meta.LagThreshold != defaultKafkaLagThreshold {
t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.LagThreshold, defaultKafkaLagThreshold, expectedLagThreshold)
}
}
func TestApacheKafkaAuthParams(t *testing.T) {
// Testing tls and sasl value in TriggerAuthentication
for _, testData := range parseApacheKafkaAuthParamsTestDataset {
meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: validApacheKafkaMetadata, AuthParams: testData.authParams}, logr.Discard())
for i, testData := range parseApacheKafkaAuthParamsTestDataset {
meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: validApacheKafkaMetadata, AuthParams: testData.authParams})

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
t.Error(i, "Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
t.Error(i, "Expected error but got success")
}
// we can ignore what tls is set if there is error
if err == nil && meta.enableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %#v but got %#v\n", testData.enableTLS, meta.enableTLS)
if err == nil && meta.enableTLS() != testData.enableTLS {
t.Errorf("%v Expected enableTLS to be set to %#v but got %#v\n", i, testData.enableTLS, meta.enableTLS())
}
if err == nil && meta.enableTLS {
if meta.ca != testData.authParams["ca"] {
t.Errorf("Expected ca to be set to %#v but got %#v\n", testData.authParams["ca"], meta.ca)
if err == nil && meta.enableTLS() {
if meta.CA != testData.authParams["ca"] {
t.Errorf("%v Expected ca to be set to %#v but got %#v\n", i, testData.authParams["ca"], meta.CA)
}
if meta.cert != testData.authParams["cert"] {
t.Errorf("Expected cert to be set to %#v but got %#v\n", testData.authParams["cert"], meta.cert)
if meta.Cert != testData.authParams["cert"] {
t.Errorf("%v Expected cert to be set to %#v but got %#v\n", i, testData.authParams["cert"], meta.Cert)
}
if meta.key != testData.authParams["key"] {
t.Errorf("Expected key to be set to %#v but got %#v\n", testData.authParams["key"], meta.key)
if meta.Key != testData.authParams["key"] {
t.Errorf("%v Expected key to be set to %#v but got %#v\n", i, testData.authParams["key"], meta.Key)
}
if meta.keyPassword != testData.authParams["keyPassword"] {
t.Errorf("Expected key to be set to %#v but got %#v\n", testData.authParams["keyPassword"], meta.key)
if meta.KeyPassword != testData.authParams["keyPassword"] {
t.Errorf("%v Expected key to be set to %#v but got %#v\n", i, testData.authParams["keyPassword"], meta.Key)
}
}
}

// Testing tls and sasl value in scaledObject
for id, testData := range parseApacheKafkaAuthParamsTestDataset2 {
meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}, logr.Discard())
meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})

if err != nil && !testData.isError {
t.Errorf("Test case: %#v. Expected success but got error %#v", id, err)
@@ -337,21 +337,21 @@ func TestApacheKafkaAuthParams(t *testing.T) {
t.Errorf("Test case: %#v. Expected error but got success", id)
}
if !testData.isError {
if testData.metadata["tls"] == stringTrue && !meta.enableTLS {
t.Errorf("Test case: %#v. Expected tls to be set to %#v but got %#v\n", id, testData.metadata["tls"], meta.enableTLS)
if testData.metadata["tls"] == stringTrue && !meta.enableTLS() {
t.Errorf("Test case: %#v. Expected tls to be set to %#v but got %#v\n", id, testData.metadata["tls"], meta.enableTLS())
}
if meta.enableTLS {
if meta.ca != testData.authParams["ca"] {
t.Errorf("Test case: %#v. Expected ca to be set to %#v but got %#v\n", id, testData.authParams["ca"], meta.ca)
if meta.enableTLS() {
if meta.CA != testData.authParams["ca"] {
t.Errorf("Test case: %#v. Expected ca to be set to %#v but got %#v\n", id, testData.authParams["ca"], meta.CA)
}
if meta.cert != testData.authParams["cert"] {
t.Errorf("Test case: %#v. Expected cert to be set to %#v but got %#v\n", id, testData.authParams["cert"], meta.cert)
if meta.Cert != testData.authParams["cert"] {
t.Errorf("Test case: %#v. Expected cert to be set to %#v but got %#v\n", id, testData.authParams["cert"], meta.Cert)
}
if meta.key != testData.authParams["key"] {
t.Errorf("Test case: %#v. Expected key to be set to %#v but got %#v\n", id, testData.authParams["key"], meta.key)
if meta.Key != testData.authParams["key"] {
t.Errorf("Test case: %#v. Expected key to be set to %#v but got %#v\n", id, testData.authParams["key"], meta.Key)
}
if meta.keyPassword != testData.authParams["keyPassword"] {
t.Errorf("Test case: %#v. Expected key to be set to %#v but got %#v\n", id, testData.authParams["keyPassword"], meta.keyPassword)
if meta.KeyPassword != testData.authParams["keyPassword"] {
t.Errorf("Test case: %#v. Expected key to be set to %#v but got %#v\n", id, testData.authParams["keyPassword"], meta.KeyPassword)
}
}
}
@@ -360,7 +360,7 @@ func TestApacheKafkaAuthParams(t *testing.T) {

func TestApacheKafkaGetMetricSpecForScaling(t *testing.T) {
for _, testData := range apacheKafkaMetricIdentifiers {
meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validApacheKafkaWithAuthParams, TriggerIndex: testData.triggerIndex}, logr.Discard())
meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validApacheKafkaWithAuthParams, TriggerIndex: testData.triggerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
94 changes: 77 additions & 17 deletions pkg/scalers/scalersconfig/typed_config.go
Original file line number Diff line number Diff line change
@@ -75,6 +75,7 @@ const (
nameTag = "name"
enumTag = "enum"
exclusiveSetTag = "exclusiveSet"
rangeTag = "range"
)

// Params is a struct that represents the parameter list that can be used in the keda tag
@@ -105,6 +106,9 @@ type Params struct {

// ExclusiveSet is the 'exclusiveSet' tag parameter defining the list of values that are mutually exclusive
ExclusiveSet []string

// RangeSeparator is the 'range' tag parameter defining the separator for range values
RangeSeparator string
}

// IsNested is a function that returns true if the parameter is nested
@@ -134,7 +138,7 @@ func (sc *ScalerConfig) TypedConfig(typedConfig any) (err error) {
// this shouldn't happen, but calling certain reflection functions may result in panic
// if it does, it's better to return a error with stacktrace and reject parsing config
// rather than crashing KEDA
err = fmt.Errorf("failed to parse typed config %T resulted in panic\n%v", r, debug.Stack())
err = fmt.Errorf("failed to parse typed config %T resulted in panic\n%v", r, string(debug.Stack()))
}
}()
err = sc.parseTypedConfig(typedConfig, false)
@@ -242,14 +246,14 @@ func (sc *ScalerConfig) setValue(field reflect.Value, params Params) error {
}
return sc.parseTypedConfig(field.Addr().Interface(), params.Optional)
}
if err := setConfigValueHelper(valFromConfig, field); err != nil {
if err := setConfigValueHelper(params, valFromConfig, field); err != nil {
return fmt.Errorf("unable to set param %q value %q: %w", params.Name, valFromConfig, err)
}
return nil
}

// setConfigValueURLParams is a function that sets the value of the url.Values field
func setConfigValueURLParams(valFromConfig string, field reflect.Value) error {
func setConfigValueURLParams(params Params, valFromConfig string, field reflect.Value) error {
field.Set(reflect.MakeMap(reflect.MapOf(field.Type().Key(), field.Type().Elem())))
vals, err := url.ParseQuery(valFromConfig)
if err != nil {
@@ -258,7 +262,7 @@ func setConfigValueURLParams(valFromConfig string, field reflect.Value) error {
for k, vs := range vals {
ifcMapKeyElem := reflect.New(field.Type().Key()).Elem()
ifcMapValueElem := reflect.New(field.Type().Elem()).Elem()
if err := setConfigValueHelper(k, ifcMapKeyElem); err != nil {
if err := setConfigValueHelper(params, k, ifcMapKeyElem); err != nil {
return fmt.Errorf("map key %q: %w", k, err)
}
for _, v := range vs {
@@ -270,7 +274,7 @@ func setConfigValueURLParams(valFromConfig string, field reflect.Value) error {
}

// setConfigValueMap is a function that sets the value of the map field
func setConfigValueMap(valFromConfig string, field reflect.Value) error {
func setConfigValueMap(params Params, valFromConfig string, field reflect.Value) error {
field.Set(reflect.MakeMap(reflect.MapOf(field.Type().Key(), field.Type().Elem())))
split := strings.Split(valFromConfig, elemSeparator)
for _, s := range split {
@@ -282,34 +286,81 @@ func setConfigValueMap(valFromConfig string, field reflect.Value) error {
key := strings.TrimSpace(kv[0])
val := strings.TrimSpace(kv[1])
ifcKeyElem := reflect.New(field.Type().Key()).Elem()
if err := setConfigValueHelper(key, ifcKeyElem); err != nil {
if err := setConfigValueHelper(params, key, ifcKeyElem); err != nil {
return fmt.Errorf("map key %q: %w", key, err)
}
ifcValueElem := reflect.New(field.Type().Elem()).Elem()
if err := setConfigValueHelper(val, ifcValueElem); err != nil {
if err := setConfigValueHelper(params, val, ifcValueElem); err != nil {
return fmt.Errorf("map key %q, value %q: %w", key, val, err)
}
field.SetMapIndex(ifcKeyElem, ifcValueElem)
}
return nil
}

// canRange is a function that checks if the value can be ranged
func canRange(valFromConfig, elemRangeSeparator string, field reflect.Value) bool {
if elemRangeSeparator == "" {
return false
}
if field.Kind() != reflect.Slice {
return false
}
elemIfc := reflect.New(field.Type().Elem()).Interface()
elemVal := reflect.ValueOf(elemIfc).Elem()
if !elemVal.CanInt() {
return false
}
return strings.Contains(valFromConfig, elemRangeSeparator)
}

// setConfigValueRange expands a textual range such as "5-10" (delimited by
// params.RangeSeparator) into individual integer elements appended to the
// slice field. Both bounds are inclusive; when the start bound exceeds the
// end bound, nothing is appended and no error is returned.
func setConfigValueRange(params Params, valFromConfig string, field reflect.Value) error {
	bounds := strings.Split(valFromConfig, params.RangeSeparator)
	if len(bounds) != 2 {
		return fmt.Errorf("expected format start%vend, got %q", params.RangeSeparator, valFromConfig)
	}
	// Decode each bound into a freshly allocated value of the slice's element
	// type; json.Unmarshal descends through the pointer held by the interface.
	lower := reflect.New(field.Type().Elem()).Interface()
	upper := reflect.New(field.Type().Elem()).Interface()
	if err := json.Unmarshal([]byte(bounds[0]), &lower); err != nil {
		return fmt.Errorf("unable to parse start value %q: %w", bounds[0], err)
	}
	if err := json.Unmarshal([]byte(bounds[1]), &upper); err != nil {
		return fmt.Errorf("unable to parse end value %q: %w", bounds[1], err)
	}

	start := reflect.ValueOf(lower).Elem().Int()
	end := reflect.ValueOf(upper).Elem().Int()
	for i := start; i <= end; i++ {
		elem := reflect.New(field.Type().Elem()).Elem()
		elem.SetInt(i)
		field.Set(reflect.Append(field, elem))
	}
	return nil
}

// setConfigValueSlice is a function that sets the value of the slice field
func setConfigValueSlice(valFromConfig string, field reflect.Value) error {
func setConfigValueSlice(params Params, valFromConfig string, field reflect.Value) error {
elemIfc := reflect.New(field.Type().Elem()).Interface()
split := strings.Split(valFromConfig, elemSeparator)
for i, s := range split {
s := strings.TrimSpace(s)
if err := setConfigValueHelper(s, reflect.ValueOf(elemIfc).Elem()); err != nil {
return fmt.Errorf("slice element %d: %w", i, err)
if canRange(s, params.RangeSeparator, field) {
if err := setConfigValueRange(params, s, field); err != nil {
return fmt.Errorf("slice element %d: %w", i, err)
}
} else {
if err := setConfigValueHelper(params, s, reflect.ValueOf(elemIfc).Elem()); err != nil {
return fmt.Errorf("slice element %d: %w", i, err)
}
field.Set(reflect.Append(field, reflect.ValueOf(elemIfc).Elem()))
}
field.Set(reflect.Append(field, reflect.ValueOf(elemIfc).Elem()))
}
return nil
}

// setParamValueHelper is a function that sets the value of the parameter
func setConfigValueHelper(valFromConfig string, field reflect.Value) error {
func setConfigValueHelper(params Params, valFromConfig string, field reflect.Value) error {
paramValue := reflect.ValueOf(valFromConfig)
if paramValue.Type().AssignableTo(field.Type()) {
field.SetString(valFromConfig)
@@ -320,13 +371,13 @@ func setConfigValueHelper(valFromConfig string, field reflect.Value) error {
return nil
}
if field.Type() == reflect.TypeOf(url.Values{}) {
return setConfigValueURLParams(valFromConfig, field)
return setConfigValueURLParams(params, valFromConfig, field)
}
if field.Kind() == reflect.Map {
return setConfigValueMap(valFromConfig, field)
return setConfigValueMap(params, valFromConfig, field)
}
if field.Kind() == reflect.Slice {
return setConfigValueSlice(valFromConfig, field)
return setConfigValueSlice(params, valFromConfig, field)
}
if field.CanInterface() {
ifc := reflect.New(field.Type()).Interface()
@@ -356,8 +407,10 @@ func (sc *ScalerConfig) configParamValue(params Params) (string, bool) {
// this is checked when parsing the tags but adding as default case to avoid any potential future problems
return "", false
}
if param, ok := m[key]; ok && param != "" {
return strings.TrimSpace(param), true
param, ok := m[key]
param = strings.TrimSpace(param)
if ok && param != "" {
return param, true
}
}
return "", params.IsNested()
@@ -413,6 +466,13 @@ func paramsFromTag(tag string, field reflect.StructField) (Params, error) {
if len(tsplit) > 1 {
params.ExclusiveSet = strings.Split(tsplit[1], tagValueSeparator)
}
case rangeTag:
if len(tsplit) == 1 {
params.RangeSeparator = "-"
}
if len(tsplit) == 2 {
params.RangeSeparator = strings.TrimSpace(tsplit[1])
}
case "":
continue
default:
32 changes: 32 additions & 0 deletions pkg/scalers/scalersconfig/typed_config_test.go
Original file line number Diff line number Diff line change
@@ -515,3 +515,35 @@ func TestNoParsingOrder(t *testing.T) {
Expect(err).To(BeNil())
Expect(tsdm.DefaultVal2).To(Equal("dv"))
}

// TestRange tests the range param
func TestRange(t *testing.T) {
	RegisterTestingT(t)

	config := &ScalerConfig{
		TriggerMetadata: map[string]string{
			"range":       "5-10",
			"multiRange":  "5-10, 15-20",
			"dottedRange": "2..7",
			"wrongRange":  "5..3",
		},
	}

	// Each field exercises a different 'range' tag form: explicit separator,
	// implicit default separator, custom dotted separator, and an inverted
	// range that must produce no elements.
	type rangeStruct struct {
		Range       []int `keda:"name=range, order=triggerMetadata, range=-"`
		MultiRange  []int `keda:"name=multiRange, order=triggerMetadata, range"`
		DottedRange []int `keda:"name=dottedRange, order=triggerMetadata, range=.."`
		WrongRange  []int `keda:"name=wrongRange, order=triggerMetadata, range=.."`
	}

	parsed := rangeStruct{}
	Expect(config.TypedConfig(&parsed)).To(BeNil())

	Expect(parsed.Range).To(HaveLen(6))
	Expect(parsed.Range).To(ConsistOf(5, 6, 7, 8, 9, 10))
	Expect(parsed.MultiRange).To(HaveLen(12))
	Expect(parsed.MultiRange).To(ConsistOf(5, 6, 7, 8, 9, 10, 15, 16, 17, 18, 19, 20))
	Expect(parsed.DottedRange).To(HaveLen(6))
	Expect(parsed.DottedRange).To(ConsistOf(2, 3, 4, 5, 6, 7))
	Expect(parsed.WrongRange).To(BeEmpty())
}

0 comments on commit fff7eac

Please sign in to comment.