diff --git a/CHANGELOG.md b/CHANGELOG.md index 59c68d022d8..a5ed109977e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features: - **Azure Pipelines Scaler**: New configuration parameter `requireAllDemands` to scale only if jobs request all demands provided by the scaling definition ([#4138](https://github.com/kedacore/keda/issues/4138)) - **Hashicorp Vault**: Add support to secrets backend version 1 ([#2645](https://github.com/kedacore/keda/issues/2645)) - **Kafka Scaler**: Improve error logging for `GetBlock` method ([#4232](https://github.com/kedacore/keda/issues/4232)) +- **Kafka Scaler**: Add support to use `tls` and `sasl` in ScaledObject ([#4232](https://github.com/kedacore/keda/issues/4322)) - **Prometheus Scaler**: Add custom headers and custom auth support ([#4208](https://github.com/kedacore/keda/issues/4208)) - **RabbitMQ Scaler**: Add TLS support ([#967](https://github.com/kedacore/keda/issues/967)) - **Redis Scalers**: Add support to Redis 7 ([#4052](https://github.com/kedacore/keda/issues/4052)) diff --git a/pkg/scalers/azure_blob_scaler.go b/pkg/scalers/azure_blob_scaler.go index 68c9204359e..9324c742095 100644 --- a/pkg/scalers/azure_blob_scaler.go +++ b/pkg/scalers/azure_blob_scaler.go @@ -139,7 +139,7 @@ func parseAzureBlobMetadata(config *ScalerConfig, logger logr.Logger) (*azure.Bl meta.EndpointSuffix = endpointSuffix // before triggerAuthentication CRD, pod identity was configured using this property - if val, ok := config.TriggerMetadata["useAAdPodIdentity"]; ok && config.PodIdentity.Provider == "" && val == "true" { + if val, ok := config.TriggerMetadata["useAAdPodIdentity"]; ok && config.PodIdentity.Provider == "" && val == stringTrue { config.PodIdentity = kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure} } diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 3e3ff1174c1..1505dd748fb 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -25,6 +25,11 @@ type kafkaScaler struct { previousOffsets map[string]map[int32]int64 } +const ( + stringEnable = "enable" + stringDisable = "disable" +) + type kafkaMetadata struct { bootstrapServers []string group string @@ -121,9 +126,23 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) { func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error { meta.saslType = KafkaSASLTypeNone + var saslAuthType string + switch { + case config.TriggerMetadata["sasl"] != "": + saslAuthType = config.TriggerMetadata["sasl"] + default: + saslAuthType = "" + } + if val, ok := config.AuthParams["sasl"]; ok { - val = strings.TrimSpace(val) - mode := kafkaSaslType(val) + if saslAuthType != "" { + return errors.New("unable to set `sasl` in both ScaledObject and TriggerAuthentication together") + } + saslAuthType = val + } + + if saslAuthType != "" { + mode := kafkaSaslType(saslAuthType) if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer { if config.AuthParams["username"] == "" { @@ -151,30 +170,51 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error { } meta.enableTLS = false + enableTLS := false + if val, ok := config.TriggerMetadata["tls"]; ok { + switch val { + case stringEnable: + enableTLS = true + case stringDisable: + enableTLS = false + default: + return fmt.Errorf("error incorrect TLS value given, got %s", val) + } + } + if val, ok := config.AuthParams["tls"]; ok { val = strings.TrimSpace(val) + if enableTLS { + return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together") + } + switch val { + case stringEnable: + enableTLS = true + case stringDisable: + enableTLS = false + default: + return fmt.Errorf("error incorrect TLS value given, got %s", val) + } + } - if val == "enable" { - certGiven := config.AuthParams["cert"] != "" - keyGiven := config.AuthParams["key"] != "" - if certGiven && !keyGiven { - return errors.New("key must be provided with cert") - } - if keyGiven && !certGiven { - return errors.New("cert must be provided with key") - } - meta.ca = config.AuthParams["ca"] - meta.cert = config.AuthParams["cert"] - meta.key = config.AuthParams["key"] - if value, found := config.AuthParams["keyPassword"]; found { - meta.keyPassword = value - } else { - meta.keyPassword = "" - } - meta.enableTLS = true - } else if val != "disable" { - return fmt.Errorf("err incorrect value for TLS given: %s", val) + if enableTLS { + certGiven := config.AuthParams["cert"] != "" + keyGiven := config.AuthParams["key"] != "" + if certGiven && !keyGiven { + return errors.New("key must be provided with cert") + } + if keyGiven && !certGiven { + return errors.New("cert must be provided with key") + } + meta.ca = config.AuthParams["ca"] + meta.cert = config.AuthParams["cert"] + meta.key = config.AuthParams["key"] + if value, found := config.AuthParams["keyPassword"]; found { + meta.keyPassword = value + } else { + meta.keyPassword = "" } + meta.enableTLS = true } return nil diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 7a1df4b7d62..d9eb7fb6b6f 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -29,6 +29,14 @@ type parseKafkaAuthParamsTestData struct { enableTLS bool } +// Testing the case where `tls` and `sasl` are specified in ScaledObject +type parseAuthParamsTestDataSecondAuthMethod struct { + metadata map[string]string + authParams map[string]string + isError bool + enableTLS bool +} + type kafkaMetricIdentifier struct { metadataTestData *parseKafkaMetadataTestData scalerIndex int @@ -158,8 +166,65 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ // failure, SASL + TLS, missing key {map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "tls": "enable", "ca": "caaa", "cert": "ceert"}, true, false}, } +var parseAuthParamsTestDataset = []parseAuthParamsTestDataSecondAuthMethod{ + // success, SASL plaintext + {map[string]string{"sasl": "plaintext", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false}, + // success, SASL scram_sha256 + {map[string]string{"sasl": "scram_sha256", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false}, + // success, SASL scram_sha512 + {map[string]string{"sasl": "scram_sha512", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false}, + // success, TLS only + {map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "cert": "ceert", "key": "keey"}, false, true}, + // success, TLS cert/key and assumed public CA + {map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"cert": "ceert", "key": "keey"}, false, true}, + // success, TLS cert/key + key password and assumed public CA + {map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true}, + // success, TLS CA only + {map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa"}, false, true}, + // success, SASL + TLS + {map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true}, + // success, SASL + TLS explicitly disabled + {map[string]string{"sasl": "plaintext", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false}, + // success, SASL OAUTHBEARER + TLS explicitly disabled + {map[string]string{"sasl": "oauthbearer", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com"}, false, false}, + // failure, SASL OAUTHBEARER + TLS explicitly disable + bad SASL type + {map[string]string{"sasl": "foo", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com"}, true, false}, + // success, SASL OAUTHBEARER + TLS missing scope + {map[string]string{"sasl": "oauthbearer", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "oauthTokenEndpointUri": "https://website.com"}, false, false}, + // failure, SASL OAUTHBEARER + TLS missing oauthTokenEndpointUri + {map[string]string{"sasl": "oauthbearer", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": ""}, true, false}, + // failure, SASL incorrect type + {map[string]string{"sasl": "foo", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, true, false}, + // failure, SASL missing username + {map[string]string{"sasl": "plaintext", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"password": "admin"}, true, false}, + // failure, SASL missing password + {map[string]string{"sasl": "plaintext", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin"}, true, false}, + // failure, TLS missing cert + {map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "key": "keey"}, true, false}, + // failure, TLS missing key + {map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "cert": "ceert"}, true, false}, + // failure, TLS invalid + {map[string]string{"tls": "random", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, + // failure, SASL + TLS, incorrect SASL type + {map[string]string{"sasl": "foo", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, + // failure, SASL + TLS, incorrect tls + {map[string]string{"sasl": "plaintext", "tls": "foo", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, + // failure, SASL + TLS, missing username + {map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, + // failure, SASL + TLS, missing password + {map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, + // failure, SASL + TLS, missing cert + {map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "key": "keey"}, true, false}, + // failure, SASL + TLS, missing key + {map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert"}, true, false}, + + // failure, setting SASL values in both places + {map[string]string{"sasl": "scram_sha512", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"sasl": "scram_sha512", "username": "admin", "password": "admin"}, true, false}, + // failure, setting TLS values in both places + {map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, true}, +} -var parseKafkaOAuthbreakerAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ +var parseKafkaOAuthbrearerAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ // success, SASL OAUTHBEARER + TLS {map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable"}, false, false}, // success, SASL OAUTHBEARER + TLS multiple scopes @@ -243,6 +308,7 @@ func TestGetBrokers(t *testing.T) { } func TestKafkaAuthParams(t *testing.T) { + // Testing tls and sasl value in TriggerAuthentication for _, testData := range parseKafkaAuthParamsTestDataset { meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard()) @@ -270,10 +336,41 @@ func TestKafkaAuthParams(t *testing.T) { } } } + + // Testing tls and sasl value in scaledObject + for id, testData := range parseAuthParamsTestDataset { + meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}, logr.Discard()) + + if err != nil && !testData.isError { + t.Errorf("Test case: %v. Expected success but got error %v", id, err) + } + if testData.isError && err == nil { + t.Errorf("Test case: %v. Expected error but got success", id) + } + if !testData.isError { + if testData.metadata["tls"] == "true" && !meta.enableTLS { + t.Errorf("Test case: %v. Expected tls to be set to %v but got %v\n", id, testData.metadata["tls"], meta.enableTLS) + } + if meta.enableTLS { + if meta.ca != testData.authParams["ca"] { + t.Errorf("Test case: %v. Expected ca to be set to %v but got %v\n", id, testData.authParams["ca"], meta.ca) + } + if meta.cert != testData.authParams["cert"] { + t.Errorf("Test case: %v. Expected cert to be set to %v but got %v\n", id, testData.authParams["cert"], meta.cert) + } + if meta.key != testData.authParams["key"] { + t.Errorf("Test case: %v. Expected key to be set to %v but got %v\n", id, testData.authParams["key"], meta.key) + } + if meta.keyPassword != testData.authParams["keyPassword"] { + t.Errorf("Test case: %v. Expected key to be set to %v but got %v\n", id, testData.authParams["keyPassword"], meta.keyPassword) + } + } + } + } } -func TestKafkaOAuthbreakerAuthParams(t *testing.T) { - for _, testData := range parseKafkaOAuthbreakerAuthParamsTestDataset { +func TestKafkaOAuthbrearerAuthParams(t *testing.T) { + for _, testData := range parseKafkaOAuthbrearerAuthParamsTestDataset { meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard()) if err != nil && !testData.isError {