Skip to content

Commit

Permalink
refactor(kafka): rename extensions to oauthExtensions
Browse files Browse the repository at this point in the history
Signed-off-by: qvalentin <[email protected]>
  • Loading branch information
qvalentin committed May 3, 2023
1 parent b2a1d81 commit 128a561
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 17 deletions.
22 changes: 11 additions & 11 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type kafkaMetadata struct {
// OAUTHBEARER
scopes []string
oauthTokenEndpointURI string
extensions map[string]string
oauthExtensions map[string]string

// TLS
enableTLS bool
Expand Down Expand Up @@ -165,16 +165,16 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
}
meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"])

meta.extensions = make(map[string]string)
for _, extension := range strings.Split(config.AuthParams["extensions"], ",") {
if extension == "" {
continue
meta.oauthExtensions = make(map[string]string)
oauthExtensionsRaw := config.AuthParams["oauthExtensions"]
if oauthExtensionsRaw != "" {
for _, extension := range strings.Split(oauthExtensionsRaw, ",") {
splittedExtension := strings.Split(extension, "=")
if len(splittedExtension) != 2 {
return errors.New("invalid OAuthBearer extension, must be of format key=value")
}
meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1]
}
splittedExtension := strings.Split(extension, "=")
if len(splittedExtension) != 2 {
return errors.New("invalid OAuthBearer extension, must be of format key=value")
}
meta.extensions[splittedExtension[0]] = splittedExtension[1]
}
}
} else {
Expand Down Expand Up @@ -395,7 +395,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin

if metadata.saslType == KafkaSASLTypeOAuthbearer {
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
config.Net.SASL.TokenProvider = OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes, metadata.extensions)
config.Net.SASL.TokenProvider = OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes, metadata.oauthExtensions)
}

client, err := sarama.NewClient(metadata.bootstrapServers, config)
Expand Down
12 changes: 6 additions & 6 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,11 @@ var parseKafkaOAuthbrearerAuthParamsTestDataset = []parseKafkaAuthParamsTestData
// failure, SASL OAUTHBEARER + TLS missing oauthTokenEndpointUri
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "", "tls": "disable"}, true, false},
// success, SASL OAUTHBEARER + extension
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "extensions": "extension_foo=bar"}, false, false},
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "oauthExtensions": "extension_foo=bar"}, false, false},
// success, SASL OAUTHBEARER + multiple extensions
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "extensions": "extension_foo=bar,extension_baz=baz"}, false, false},
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "oauthExtensions": "extension_foo=bar,extension_baz=baz"}, false, false},
// failure, SASL OAUTHBEARER + bad extension
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "extensions": "extension_foo=bar,extension_bazbaz"}, true, false},
{map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable", "oauthExtensions": "extension_foo=bar,extension_bazbaz"}, true, false},
}

var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
Expand Down Expand Up @@ -390,9 +390,9 @@ func TestKafkaOAuthbrearerAuthParams(t *testing.T) {
t.Errorf("Expected scopes to be set to %v but got %v\n", strings.Count(testData.authParams["scopes"], ","), len(meta.scopes))
}
}
if err == nil && testData.authParams["extensions"] != "" {
if len(meta.extensions) != strings.Count(testData.authParams["extensions"], ",")+1 {
t.Errorf("Expected number of extensions to be set to %v but got %v\n", strings.Count(testData.authParams["extensions"], ",")+1, len(meta.extensions))
if err == nil && testData.authParams["oauthExtensions"] != "" {
if len(meta.oauthExtensions) != strings.Count(testData.authParams["oauthExtensions"], ",")+1 {
t.Errorf("Expected number of extensions to be set to %v but got %v\n", strings.Count(testData.authParams["oauthExtensions"], ",")+1, len(meta.oauthExtensions))
}
}
}
Expand Down

0 comments on commit 128a561

Please sign in to comment.