Skip to content

Commit

Permalink
Add support for GSSAPI in Kafka scaler (kedacore#4851)
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Novichenok <[email protected]>
Signed-off-by: novicr <[email protected]>
Co-authored-by: Roman Novichenok <[email protected]>
Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: anton.lysina <[email protected]>
  • Loading branch information
3 people authored and toniiiik committed Jan 15, 2024
1 parent 0b2590c commit 9c65f62
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 39 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Here is an overview of all new **experimental** features:
### Improvements

- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836))

### Fixes

Expand All @@ -77,7 +78,6 @@ New deprecation(s):
### Breaking Changes

- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

### Other

- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
Expand Down
184 changes: 151 additions & 33 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -70,6 +71,11 @@ type kafkaMetadata struct {
username string
password string

// GSSAPI
keytabPath string
realm string
kerberosConfigPath string

// OAUTHBEARER
scopes []string
oauthTokenEndpointURI string
Expand Down Expand Up @@ -102,6 +108,7 @@ const (
KafkaSASLTypeSCRAMSHA256 kafkaSaslType = "scram_sha256"
KafkaSASLTypeSCRAMSHA512 kafkaSaslType = "scram_sha512"
KafkaSASLTypeOAuthbearer kafkaSaslType = "oauthbearer"
KafkaSASLTypeGSSAPI kafkaSaslType = "gssapi"
)

const (
Expand Down Expand Up @@ -165,39 +172,18 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
saslAuthType = strings.TrimSpace(saslAuthType)
mode := kafkaSaslType(saslAuthType)

if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
return errors.New("no password given")
switch {
case mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer:
err := parseSaslParams(config, meta, mode)
if err != nil {
return err
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode

if mode == KafkaSASLTypeOAuthbearer {
meta.scopes = strings.Split(config.AuthParams["scopes"], ",")

if config.AuthParams["oauthTokenEndpointUri"] == "" {
return errors.New("no oauth token endpoint uri given")
}
meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"])

meta.oauthExtensions = make(map[string]string)
oauthExtensionsRaw := config.AuthParams["oauthExtensions"]
if oauthExtensionsRaw != "" {
for _, extension := range strings.Split(oauthExtensionsRaw, ",") {
splittedExtension := strings.Split(extension, "=")
if len(splittedExtension) != 2 {
return errors.New("invalid OAuthBearer extension, must be of format key=value")
}
meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1]
}
}
case mode == KafkaSASLTypeGSSAPI:
err := parseKerberosParams(config, meta, mode)
if err != nil {
return err
}
} else {
default:
return fmt.Errorf("err SASL mode %s given", mode)
}
}
Expand Down Expand Up @@ -265,10 +251,109 @@ func parseTLS(config *ScalerConfig, meta *kafkaMetadata) error {
meta.keyPassword = ""
}
meta.enableTLS = true
return nil
}

func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if (config.AuthParams["password"] == "" && config.AuthParams["keytab"] == "") ||
(config.AuthParams["password"] != "" && config.AuthParams["keytab"] != "") {
return errors.New("exactly one of 'password' or 'keytab' must be provided for GSSAPI authentication")
}
if config.AuthParams["password"] != "" {
meta.password = strings.TrimSpace(config.AuthParams["password"])
} else {
path, err := saveToFile(config.AuthParams["keytab"])
if err != nil {
return fmt.Errorf("error saving keytab to file: %w", err)
}
meta.keytabPath = path
}

if config.AuthParams["realm"] == "" {
return errors.New("no realm given")
}
meta.realm = strings.TrimSpace(config.AuthParams["realm"])

if config.AuthParams["kerberosConfig"] == "" {
return errors.New("no Kerberos configuration file (kerberosConfig) given")
}
path, err := saveToFile(config.AuthParams["kerberosConfig"])
if err != nil {
return fmt.Errorf("error saving kerberosConfig to file: %w", err)
}
meta.kerberosConfigPath = path

meta.saslType = mode
return nil
}

func parseSaslParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
return errors.New("no password given")
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode

if mode == KafkaSASLTypeOAuthbearer {
meta.scopes = strings.Split(config.AuthParams["scopes"], ",")

if config.AuthParams["oauthTokenEndpointUri"] == "" {
return errors.New("no oauth token endpoint uri given")
}
meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"])

meta.oauthExtensions = make(map[string]string)
oauthExtensionsRaw := config.AuthParams["oauthExtensions"]
if oauthExtensionsRaw != "" {
for _, extension := range strings.Split(oauthExtensionsRaw, ",") {
splittedExtension := strings.Split(extension, "=")
if len(splittedExtension) != 2 {
return errors.New("invalid OAuthBearer extension, must be of format key=value")
}
meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1]
}
}
}
return nil
}

func saveToFile(content string) (string, error) {
data := []byte(content)

tempKrbDir := fmt.Sprintf("%s%c%s", os.TempDir(), os.PathSeparator, "kerberos")
err := os.MkdirAll(tempKrbDir, 0700)
if err != nil {
return "", fmt.Errorf(`error creating temporary directory: %s. Error: %w
Note, when running in a container a writable /tmp/kerberos emptyDir must be mounted. Refer to documentation`, tempKrbDir, err)
}

tempFile, err := os.CreateTemp(tempKrbDir, "krb_*")
if err != nil {
return "", fmt.Errorf("error creating temporary file: %w", err)
}
defer tempFile.Close()

_, err = tempFile.Write(data)
if err != nil {
return "", fmt.Errorf("error writing to temporary file: %w", err)
}

// Get the temporary file's name
tempFilename := tempFile.Name()

return tempFilename, nil
}

func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata, error) {
meta := kafkaMetadata{}
switch {
Expand Down Expand Up @@ -400,7 +485,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config := sarama.NewConfig()
config.Version = metadata.version

if metadata.saslType != KafkaSASLTypeNone {
if metadata.saslType != KafkaSASLTypeNone && metadata.saslType != KafkaSASLTypeGSSAPI {
config.Net.SASL.Enable = true
config.Net.SASL.User = metadata.username
config.Net.SASL.Password = metadata.password
Expand Down Expand Up @@ -434,6 +519,22 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config.Net.SASL.TokenProvider = OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes, metadata.oauthExtensions)
}

if metadata.saslType == KafkaSASLTypeGSSAPI {
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
config.Net.SASL.GSSAPI.ServiceName = "kafka"
config.Net.SASL.GSSAPI.Username = metadata.username
config.Net.SASL.GSSAPI.Realm = metadata.realm
config.Net.SASL.GSSAPI.KerberosConfigPath = metadata.kerberosConfigPath
if metadata.keytabPath != "" {
config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
config.Net.SASL.GSSAPI.KeyTabPath = metadata.keytabPath
} else {
config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
config.Net.SASL.GSSAPI.Password = metadata.password
}
}

client, err := sarama.NewClient(metadata.bootstrapServers, config)
if err != nil {
return nil, nil, fmt.Errorf("error creating kafka client: %w", err)
Expand Down Expand Up @@ -594,7 +695,24 @@ func (s *kafkaScaler) Close(context.Context) error {
if s.admin == nil {
return nil
}
return s.admin.Close()

err := s.admin.Close()
if err != nil {
return err
}

// clean up any temporary files
if strings.TrimSpace(s.metadata.kerberosConfigPath) != "" {
if err := os.Remove(s.metadata.kerberosConfigPath); err != nil {
return err
}
}
if strings.TrimSpace(s.metadata.keytabPath) != "" {
if err := os.Remove(s.metadata.keytabPath); err != nil {
return err
}
}
return nil
}

func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
Expand Down
Loading

0 comments on commit 9c65f62

Please sign in to comment.