Skip to content

Commit

Permalink
Limit Kafka Partitions KEDA operates on (#3879)
Browse files Browse the repository at this point in the history
Signed-off-by: Tobias Krause <[email protected]>
Co-authored-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
tobiaskrause and zroubalik authored Dec 7, 2022
1 parent b04cf7f commit ccd6705
Show file tree
Hide file tree
Showing 5 changed files with 398 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General:** Support disable keep http connection alive([#3874](https://github.com/kedacore/keda/issues/3874)
- **General:** Improve the function used to normalize metric names ([#3789](https://github.com/kedacore/keda/issues/3789)
- **Apache Kafka Scaler:** SASL/OAuthbearer Implementation ([#3681](https://github.com/kedacore/keda/issues/3681))
- **Apache Kafka Scaler:** Limit Kafka Partitions KEDA operates on ([#3830](https://github.com/kedacore/keda/issues/3830))
- **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610))
- **Azure Event Hub Scaler:** Support Azure Active Direcotry Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569))
- **Azure Event Hub Scaler:** Support using connection strings for Event Hub namespace instead of the Event Hub itself. ([#3922](https://github.com/kedacore/keda/issues/3922))
Expand Down
40 changes: 37 additions & 3 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type kafkaMetadata struct {
bootstrapServers []string
group string
topic string
partitionLimitation []int32
lagThreshold int64
activationLagThreshold int64
offsetResetPolicy offsetResetPolicy
Expand Down Expand Up @@ -205,6 +206,21 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
"will use all topics subscribed by the consumer group for scaling", meta.group))
}

meta.partitionLimitation = nil
if config.TriggerMetadata["partitionLimitation"] != "" {
if meta.topic == "" {
logger.V(1).Info("no specific topic set, ignoring partitionLimitation setting")
} else {
pattern := config.TriggerMetadata["partitionLimitation"]
parsed, err := kedautil.ParseInt32List(pattern)
if err != nil {
return meta, fmt.Errorf("error parsing in partitionLimitation '%s': %s", pattern, err)
}
meta.partitionLimitation = parsed
logger.V(0).Info(fmt.Sprintf("partition limit active '%s'", pattern))
}
}

meta.offsetResetPolicy = defaultOffsetResetPolicy

if config.TriggerMetadata["offsetResetPolicy"] != "" {
Expand Down Expand Up @@ -378,15 +394,33 @@ func (s *kafkaScaler) getTopicPartitions() (map[string][]int32, error) {
s.logger.Error(errMsg, "")
}
partitionMetadata := topicMetadata.Partitions
partitions := make([]int32, len(partitionMetadata))
for i, p := range partitionMetadata {
partitions[i] = p.ID
var partitions []int32
for _, p := range partitionMetadata {
if s.isActivePartition(p.ID) {
partitions = append(partitions, p.ID)
}
}
if len(partitions) == 0 {
return nil, fmt.Errorf("expected at least one active partition within the topic '%s'", topicMetadata.Name)
}

topicPartitions[topicMetadata.Name] = partitions
}
return topicPartitions, nil
}

func (s *kafkaScaler) isActivePartition(pID int32) bool {
if s.metadata.partitionLimitation == nil {
return true
}
for _, _pID := range s.metadata.partitionLimitation {
if pID == _pID {
return true
}
}
return false
}

func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, topicPartitions)
if err != nil {
Expand Down
233 changes: 208 additions & 25 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ import (
"strings"
"testing"

"github.com/Shopify/sarama"
"github.com/go-logr/logr"
)

type parseKafkaMetadataTestData struct {
metadata map[string]string
isError bool
numBrokers int
brokers []string
group string
topic string
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool
metadata map[string]string
isError bool
numBrokers int
brokers []string
group string
topic string
partitionLimitation []int32
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool
}

type parseKafkaAuthParamsTestData struct {
Expand Down Expand Up @@ -52,35 +54,45 @@ var validWithoutAuthParams = map[string]string{}

var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
// failure, no bootstrapServers
{map[string]string{}, true, 0, nil, "", "", "", false},
{map[string]string{}, true, 0, nil, "", "", nil, "", false},
// failure, no consumer group
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest", false},
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", nil, "latest", false},
// success, no topic
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false},
// success, ignore partitionLimitation if no topic
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false},
// failure, version not supported
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false},
// failure, lagThreshold is negative value
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false},
// failure, lagThreshold is 0
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false},
// failure, activationLagThreshold is not int
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false},
// success
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false},
// success, partitionLimitation as list
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false},
// success, partitionLimitation as range
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false},
// success, partitionLimitation mixed list + ranges
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false},
// failure, partitionLimitation wrong data type
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false},
// success, more brokers
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false},
// success, offsetResetPolicy policy latest
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false},
// failure, offsetResetPolicy policy wrong
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", "", false},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, "", false},
// success, offsetResetPolicy policy earliest
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("earliest"), false},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("earliest"), false},
// failure, allowIdleConsumers malformed
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false},
// success, allowIdleConsumers is true
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), true},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true},
// success, version supported
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), true},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true},
}

var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
Expand Down Expand Up @@ -150,8 +162,8 @@ var parseKafkaOAuthbreakerAuthParamsTestDataset = []parseKafkaAuthParamsTestData
}

var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
{&parseKafkaMetadataTestDataset[7], 0, "s0-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[7], 1, "s1-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[8], 0, "s0-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[8], 1, "s1-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"},
}

Expand All @@ -177,6 +189,9 @@ func TestGetBrokers(t *testing.T) {
if meta.topic != testData.topic {
t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
}
if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
t.Errorf("Expected %v but got %v\n", testData.partitionLimitation, meta.partitionLimitation)
}
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
}
Expand All @@ -201,6 +216,9 @@ func TestGetBrokers(t *testing.T) {
if meta.topic != testData.topic {
t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
}
if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
t.Errorf("Expected %v but got %v\n", testData.partitionLimitation, meta.partitionLimitation)
}
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
}
Expand Down Expand Up @@ -273,3 +291,168 @@ func TestKafkaGetMetricSpecForScaling(t *testing.T) {
}
}
}

func TestGetTopicPartitions(t *testing.T) {
testData := []struct {
name string
metadata map[string]string
partitionIds []int32
exp map[string][]int32
}{
{"success_all_partitions", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2"}, []int32{1, 2}, map[string][]int32{"my-topic": {1, 2}}},
{"success_partial_partitions", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3"}, []int32{1, 2, 3, 4, 5, 6}, map[string][]int32{"my-topic": {1, 2, 3}}},
}

for _, tt := range testData {
t.Run(tt.name, func(t *testing.T) {
meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: tt.metadata, AuthParams: validWithAuthParams}, logr.Discard())
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockKafkaScaler := kafkaScaler{"", meta, nil, &MockClusterAdmin{partitionIds: tt.partitionIds}, logr.Discard()}

patitions, err := mockKafkaScaler.getTopicPartitions()

if !reflect.DeepEqual(tt.exp, patitions) {
t.Errorf("Expected %v but got %v\n", tt.exp, patitions)
}

if err != nil {
t.Error("Expected success but got error", err)
}
})
}
}

type MockClusterAdmin struct {
partitionIds []int32
}

func (m *MockClusterAdmin) CreateTopic(topic string, detail *sarama.TopicDetail, validateOnly bool) error {
return nil
}
func (m *MockClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error) {
return nil, nil
}

func (m *MockClusterAdmin) DescribeTopics(topics []string) (metadata []*sarama.TopicMetadata, err error) {
metadatas := make([]*sarama.TopicMetadata, len(topics))

partitionMetadata := make([]*sarama.PartitionMetadata, len(m.partitionIds))
for i, id := range m.partitionIds {
partitionMetadata[i] = &sarama.PartitionMetadata{ID: id}
}

for i, name := range topics {
metadatas[i] = &sarama.TopicMetadata{Name: name, Partitions: partitionMetadata}
}
return metadatas, nil
}

func (m *MockClusterAdmin) DeleteTopic(topic string) error {
return nil
}

func (m *MockClusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
return nil
}

func (m *MockClusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
return nil
}

func (m *MockClusterAdmin) ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, err error) {
return nil, nil
}

func (m *MockClusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
return nil
}

func (m *MockClusterAdmin) DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error) {
return nil, nil
}

func (m *MockClusterAdmin) AlterConfig(resourceType sarama.ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
return nil
}

func (m *MockClusterAdmin) IncrementalAlterConfig(resourceType sarama.ConfigResourceType, name string, entries map[string]sarama.IncrementalAlterConfigsEntry, validateOnly bool) error {
return nil
}

func (m *MockClusterAdmin) CreateACL(resource sarama.Resource, acl sarama.Acl) error {
return nil
}

func (m *MockClusterAdmin) CreateACLs([]*sarama.ResourceAcls) error {
return nil
}

func (m *MockClusterAdmin) ListAcls(filter sarama.AclFilter) ([]sarama.ResourceAcls, error) {
return nil, nil
}

func (m *MockClusterAdmin) DeleteACL(filter sarama.AclFilter, validateOnly bool) ([]sarama.MatchingAcl, error) {
return nil, nil
}

func (m *MockClusterAdmin) ListConsumerGroups() (map[string]string, error) {
return nil, nil
}

func (m *MockClusterAdmin) DescribeConsumerGroups(groups []string) ([]*sarama.GroupDescription, error) {
return nil, nil
}

func (m *MockClusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
return nil, nil
}

func (m *MockClusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
return nil
}

func (m *MockClusterAdmin) DeleteConsumerGroup(group string) error {
return nil
}

func (m *MockClusterAdmin) DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error) {
return nil, 0, nil
}

func (m *MockClusterAdmin) DescribeLogDirs(brokers []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error) {
return nil, nil
}

func (m *MockClusterAdmin) DescribeUserScramCredentials(users []string) ([]*sarama.DescribeUserScramCredentialsResult, error) {
return nil, nil
}

func (m *MockClusterAdmin) DeleteUserScramCredentials(delete []sarama.AlterUserScramCredentialsDelete) ([]*sarama.AlterUserScramCredentialsResult, error) {
return nil, nil
}

func (m *MockClusterAdmin) UpsertUserScramCredentials(upsert []sarama.AlterUserScramCredentialsUpsert) ([]*sarama.AlterUserScramCredentialsResult, error) {
return nil, nil
}

func (m *MockClusterAdmin) DescribeClientQuotas(components []sarama.QuotaFilterComponent, strict bool) ([]sarama.DescribeClientQuotasEntry, error) {
return nil, nil
}

func (m *MockClusterAdmin) AlterClientQuotas(entity []sarama.QuotaEntityComponent, op sarama.ClientQuotasOp, validateOnly bool) error {
return nil
}

func (m *MockClusterAdmin) Controller() (*sarama.Broker, error) {
return nil, nil
}

func (m *MockClusterAdmin) RemoveMemberFromConsumerGroup(groupID string, groupInstanceIds []string) (*sarama.LeaveGroupResponse, error) {
return nil, nil
}

func (m *MockClusterAdmin) Close() error {
return nil
}
Loading

0 comments on commit ccd6705

Please sign in to comment.