Skip to content

Commit

Permalink
feat: adding version metadata to be able to specify kafka broker vers…
Browse files Browse the repository at this point in the history
…ion (#1866)

Signed-off-by: Flavien Chantelot <[email protected]>
  • Loading branch information
Dorn- authored Jun 14, 2021
1 parent 1fbb5c6 commit d11b1d6
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
13 changes: 12 additions & 1 deletion pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type kafkaMetadata struct {
lagThreshold int64
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool
version sarama.KafkaVersion

// SASL
saslType kafkaSaslType
Expand Down Expand Up @@ -191,6 +192,16 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
meta.allowIdleConsumers = t
}

meta.version = sarama.V1_0_0_0
if val, ok := config.TriggerMetadata["version"]; ok {
val = strings.TrimSpace(val)
version, err := sarama.ParseKafkaVersion(val)
if err != nil {
return meta, fmt.Errorf("error parsing kafka version: %s", err)
}
meta.version = version
}

return meta, nil
}

Expand Down Expand Up @@ -224,7 +235,7 @@ func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) {

func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin, error) {
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.Version = metadata.version

if metadata.saslType != KafkaSASLTypeNone {
config.Net.SASL.Enable = true
Expand Down
4 changes: 4 additions & 0 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest", false},
// failure, no topic
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false},
// failure, version not supported
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success, more brokers
Expand All @@ -66,6 +68,8 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success, allowIdleConsumers is true
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), true},
// success, version supported
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), true},
}

var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
Expand Down

0 comments on commit d11b1d6

Please sign in to comment.