Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kafka scaler sasl #486

Merged
merged 5 commits into from
Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/stretchr/testify v1.4.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
google.golang.org/api v0.10.0
google.golang.org/genproto v0.0.0-20191002211648-c459b9ce5143
google.golang.org/grpc v1.24.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,9 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8 h1:3SVOIvH7Ae1KRYy
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
case "aws-cloudwatch":
return scalers.NewAwsCloudwatchScaler(resolvedEnv, triggerMetadata, authParams)
case "kafka":
return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata)
return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata, authParams)
case "rabbitmq":
return scalers.NewRabbitMQScaler(resolvedEnv, triggerMetadata, authParams)
case "azure-eventhub":
Expand Down
115 changes: 104 additions & 11 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package scalers

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"strconv"
Expand All @@ -17,19 +19,38 @@ import (
)

type kafkaScaler struct {
resolvedSecrets map[string]string
metadata kafkaMetadata
client sarama.Client
admin sarama.ClusterAdmin
metadata kafkaMetadata
client sarama.Client
admin sarama.ClusterAdmin
}

type kafkaMetadata struct {
brokers []string
group string
topic string
lagThreshold int64

// auth
authMode kafkaAuthMode
username string
password string

// ssl
cert string
key string
ca string
}

type kafkaAuthMode string

const (
kafkaAuthModeForNone kafkaAuthMode = "none"
kafkaAuthModeForSaslPlaintext kafkaAuthMode = "sasl_plaintext"
kafkaAuthModeForSaslScramSha256 kafkaAuthMode = "sasl_scram_sha256"
kafkaAuthModeForSaslScramSha512 kafkaAuthMode = "sasl_scram_sha512"
kafkaAuthModeForSaslSSL kafkaAuthMode = "sasl_ssl"
)

const (
lagThresholdMetricName = "lagThreshold"
kafkaMetricType = "External"
Expand All @@ -39,8 +60,8 @@ const (
var kafkaLog = logf.Log.WithName("kafka_scaler")

// NewKafkaScaler creates a new kafkaScaler
func NewKafkaScaler(resolvedSecrets, metadata map[string]string) (Scaler, error) {
kafkaMetadata, err := parseKafkaMetadata(metadata)
func NewKafkaScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
kafkaMetadata, err := parseKafkaMetadata(resolvedEnv, metadata, authParams)
if err != nil {
return nil, fmt.Errorf("error parsing kafka metadata: %s", err)
}
Expand All @@ -51,14 +72,13 @@ func NewKafkaScaler(resolvedSecrets, metadata map[string]string) (Scaler, error)
}

return &kafkaScaler{
client: client,
admin: admin,
metadata: kafkaMetadata,
resolvedSecrets: resolvedSecrets,
client: client,
admin: admin,
metadata: kafkaMetadata,
}, nil
}

func parseKafkaMetadata(metadata map[string]string) (kafkaMetadata, error) {
func parseKafkaMetadata(resolvedEnv, metadata, authParams map[string]string) (kafkaMetadata, error) {
meta := kafkaMetadata{}

if metadata["brokerList"] == "" {
Expand Down Expand Up @@ -86,6 +106,45 @@ func parseKafkaMetadata(metadata map[string]string) (kafkaMetadata, error) {
meta.lagThreshold = t
}

meta.authMode = kafkaAuthModeForNone
if val, ok := authParams["authMode"]; ok {
mode := kafkaAuthMode(val)
if mode != kafkaAuthModeForNone && mode != kafkaAuthModeForSaslPlaintext && mode != kafkaAuthModeForSaslSSL && mode != kafkaAuthModeForSaslScramSha256 && mode != kafkaAuthModeForSaslScramSha512 {
jeffhollan marked this conversation as resolved.
Show resolved Hide resolved
return meta, fmt.Errorf("err auth mode %s given", mode)
}

meta.authMode = mode
}

if meta.authMode != kafkaAuthModeForNone {
if authParams["username"] == "" {
return meta, errors.New("no username given")
}
meta.username = authParams["username"]

if authParams["password"] == "" {
return meta, errors.New("no password given")
}
meta.password = authParams["password"]
}

if meta.authMode == kafkaAuthModeForSaslSSL {
if authParams["ca"] == "" {
return meta, errors.New("no ca given")
}
meta.ca = authParams["ca"]

if authParams["cert"] == "" {
return meta, errors.New("no cert given")
}
meta.cert = authParams["cert"]

if authParams["key"] == "" {
return meta, errors.New("no key given")
}
meta.key = authParams["key"]
}

return meta, nil
}

Expand Down Expand Up @@ -118,6 +177,40 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0

if ok := metadata.authMode == kafkaAuthModeForSaslPlaintext || metadata.authMode == kafkaAuthModeForSaslSSL || metadata.authMode == kafkaAuthModeForSaslScramSha256 || metadata.authMode == kafkaAuthModeForSaslScramSha512; ok {
config.Net.SASL.Enable = true
config.Net.SASL.User = metadata.username
config.Net.SASL.Password = metadata.password
}

if metadata.authMode == kafkaAuthModeForSaslSSL {
cert, err := tls.X509KeyPair([]byte(metadata.cert), []byte(metadata.key))
if err != nil {
return nil, nil, fmt.Errorf("error parse X509KeyPair: %s", err)
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(metadata.ca))

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}

config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}

if metadata.authMode == kafkaAuthModeForSaslScramSha256 {
jeffhollan marked this conversation as resolved.
Show resolved Hide resolved
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
}

if metadata.authMode == kafkaAuthModeForSaslScramSha512 {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
}

client, err := sarama.NewClient(metadata.brokers, config)
if err != nil {
return nil, nil, fmt.Errorf("error creating kafka client: %s", err)
Expand Down
34 changes: 32 additions & 2 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,47 @@ var validMetadata = map[string]string{
"topic": "my-topic",
}

// A complete valid authParams example for sasl, with username and passwd
var validWithAuthParams = map[string]string{
"authMode": "sasl_plaintext",
"username": "admin",
"passwd": "admin",
}

// A complete valid authParams example for sasl, without username and passwd
var validWithoutAuthParams = map[string]string{}

var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{}, true, 0, nil, "", ""},
{map[string]string{"brokerList": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", ""},
{map[string]string{"brokerList": "foo:9092,bar:9092"}, true, 2, []string{"foo:9092", "bar:9092"}, "", ""},
{map[string]string{"brokerList": "a", "consumerGroup": "my-group"}, true, 1, []string{"a"}, "my-group", ""},
jeffhollan marked this conversation as resolved.
Show resolved Hide resolved
{validMetadata, false, 2, []string{"broker1:9092", "broker2:9092"}, "my-group", "my-topic"},
}

func TestGetBrokers(t *testing.T) {
for _, testData := range parseKafkaMetadataTestDataset {
meta, err := parseKafkaMetadata(testData.metadata)
meta, err := parseKafkaMetadata(nil, testData.metadata, validWithAuthParams)

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if len(meta.brokers) != testData.numBrokers {
t.Errorf("Expected %d brokers but got %d\n", testData.numBrokers, len(meta.brokers))
}
if !reflect.DeepEqual(testData.brokers, meta.brokers) {
t.Errorf("Expected %v but got %v\n", testData.brokers, meta.brokers)
}
if meta.group != testData.group {
t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
}
if meta.topic != testData.topic {
t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
}

meta, err = parseKafkaMetadata(nil, testData.metadata, validWithoutAuthParams)

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
Expand Down
36 changes: 36 additions & 0 deletions pkg/scalers/kafka_scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package scalers

import (
"crypto/sha256"
"crypto/sha512"
"hash"

"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}