Feat/consumer offset no consumers #172

Merged · 30 commits · Jul 1, 2022
Commits
a1059b7
feat(kakfa): WIP
paologallinaharbur May 9, 2022
dd606b3
feat(kakfa): WIP
paologallinaharbur May 9, 2022
9be910e
No_Offset in constant
alvarocabanas Jun 22, 2022
21f933d
Add new consumerOffset args
alvarocabanas Jun 28, 2022
244083a
Create new topic Offset getter with cache
alvarocabanas Jun 28, 2022
232a6a8
Add inactive consumers to kafka offset collection
alvarocabanas Jun 28, 2022
8d8a218
Separate lag totals calculation in another file
alvarocabanas Jun 28, 2022
e4f4a43
Create interface for topicOffsetGetter
alvarocabanas Jun 28, 2022
29b0f24
Add topicLister with cache
alvarocabanas Jun 28, 2022
f1e1141
Make interface method public and goimports
alvarocabanas Jun 28, 2022
da7b5d0
Add first kafka offset collect unit test
alvarocabanas Jun 29, 2022
8b3d81d
Add all cases to kafka offset collect unit test
alvarocabanas Jun 29, 2022
0cda8e1
Parallelize tests
alvarocabanas Jun 29, 2022
7ad5735
Remove test Parallelization because of global args
alvarocabanas Jun 29, 2022
80fdee2
Fix linting problems
alvarocabanas Jun 29, 2022
dc636dd
remove test from linter
alvarocabanas Jun 29, 2022
6d40085
Fix linting errors
alvarocabanas Jun 29, 2022
310a25b
Reformat calculate lag totals functions
alvarocabanas Jun 29, 2022
a3292cf
gofmt the collect_test
alvarocabanas Jun 29, 2022
8c170cd
Fix docker-compose for integration tests
alvarocabanas Jun 29, 2022
5d05a1f
Recover docker-compose down and exit on integration-test
alvarocabanas Jun 29, 2022
475e5b5
Typo TopicExclusions
alvarocabanas Jun 30, 2022
f436791
Correct numEntities typo
alvarocabanas Jun 30, 2022
369f172
Update src/args/args.go
alvarocabanas Jun 30, 2022
0f32584
Extract channel result feeding from collect function
alvarocabanas Jun 30, 2022
4829250
Modify variable name
alvarocabanas Jun 30, 2022
20b565a
Refactor aggregate kafka metrics
alvarocabanas Jun 30, 2022
9350b80
Fix client aggregator and add condition to test
alvarocabanas Jun 30, 2022
1f792b3
Create interface for aggregators
alvarocabanas Jul 1, 2022
bdb257f
Export ClientAggregations for linter
alvarocabanas Jul 1, 2022
22 changes: 11 additions & 11 deletions src/args/args.go
@@ -68,19 +68,19 @@ type ArgumentList struct {
SaslGssapiDisableFASTNegotiation bool `default:"false" help:"Disable FAST negotiation."`

// Collection configuration
- LocalOnlyCollection bool `default:"false" help:"Collect only the metrics related to the configured bootstrap broker. Useful for distributed metric collection"`
- CollectBrokerTopicData bool `default:"true" help:"DEPRECATED -- currently it has no effect"`
- TopicMode string `default:"None" help:"Possible options are All, None, or List. If List, must also specify the list of topics to collect with the topic_list option."`
- TopicList string `default:"[]" help:"JSON array of strings with the names of topics to monitor. Only used if collect_topics is set to 'List'"`
- TopicRegex string `default:"" help:"A regex pattern that matches the list of topics to collect. Only used if collect_topics is set to 'Regex'"`
- TopicBucket string `default:"1/1" help:"Allows the partitioning of topic collection across multiple instances. The second number is the number of instances topics are partitioned across. The first number is the bucket number of the current instance, which should be between 1 and the second number."`
- CollectTopicSize bool `default:"false" help:"Enablement of on disk Topic size metric collection. This metric can be very resource intensive to collect especially against many topics."`
- CollectTopicOffset bool `default:"false" help:"Enablement of Topic offsets collection. This metric can be very resource intensive to collect especially against many topics."`
+ LocalOnlyCollection bool `default:"false" help:"Collect only the metrics related to the configured bootstrap broker. Useful for distributed metric collection"`
+ TopicMode string `default:"None" help:"Possible options are All, None, or List. If List, must also specify the list of topics to collect with the topic_list option."`
+ TopicList string `default:"[]" help:"JSON array of strings with the names of topics to monitor. Only used if collect_topics is set to 'List'"`
+ TopicRegex string `default:"" help:"A regex pattern that matches the list of topics to collect. Only used if collect_topics is set to 'Regex'"`
+ TopicBucket string `default:"1/1" help:"Allows the partitioning of topic collection across multiple instances. The second number is the number of instances topics are partitioned across. The first number is the bucket number of the current instance, which should be between 1 and the second number."`
+ CollectTopicSize bool `default:"false" help:"Enablement of on disk Topic size metric collection. This metric can be very resource intensive to collect especially against many topics."`
+ CollectTopicOffset bool `default:"false" help:"Enablement of Topic offsets collection. This metric can be very resource intensive to collect especially against many topics."`

// Consumer offset arguments
- ConsumerOffset bool `default:"false" help:"Populate consumer offset data"`
- ConsumerGroups string `default:"{}" help:"DEPRECATED -- JSON Object whitelist of consumer groups to their topics and topics to their partitions, in which to collect consumer offsets for."`
- ConsumerGroupRegex string `default:"" help:"A regex pattern matching the consumer groups to collect"`
+ ConsumerOffset bool `default:"false" help:"Populate consumer offset data"`
+ ConsumerGroupRegex string `default:"" help:"A regex pattern matching the consumer groups to collect"`
+ ConsumerGroupOffsetByTopic bool `default:"false" help:"Report consumer-group offset metrics by topic"`
+ InactiveConsumerGroupOffset bool `default:"false" help:"Collect offset from consumer-groups with inactive consumers"`

Timeout int `default:"10000" help:"Timeout in milliseconds per single JMX query."`

3 changes: 0 additions & 3 deletions src/args/args_test.go
@@ -42,7 +42,6 @@ func TestParseArgs(t *testing.T) {
TopicBucket: `1/3`,
Timeout: 1000,
ConsumerOffset: false,
- ConsumerGroups: "[]",
ConsumerGroupRegex: ".*",
SaslMechanism: "PLAIN",
SaslUsername: "admin3",
@@ -99,7 +98,6 @@
TopicBucket: TopicBucket{1, 3},
Timeout: 1000,
ConsumerOffset: false,
- ConsumerGroups: nil,
ConsumerGroupRegex: regexp.MustCompile(".*"),
SaslMechanism: "PLAIN",
SaslUsername: "admin3",
@@ -150,7 +148,6 @@ func TestDefaultArgs(t *testing.T) {
CollectTopicSize: false,
CollectTopicOffset: false,
ConsumerOffset: false,
- ConsumerGroups: nil,
ConsumerGroupRegex: nil,
SaslGssapiKerberosConfigPath: "/etc/krb5.conf",
SaslMechanism: "GSSAPI",
21 changes: 6 additions & 15 deletions src/args/parsed_args.go
@@ -91,9 +91,10 @@ type ParsedArguments struct {
CollectTopicOffset bool

// Consumer offset arguments
- ConsumerOffset bool
- ConsumerGroups ConsumerGroups
- ConsumerGroupRegex *regexp.Regexp
+ ConsumerOffset bool
+ ConsumerGroupRegex *regexp.Regexp
+ ConsumerGroupOffsetByTopic bool
+ InactiveConsumerGroupOffset bool

Timeout int `default:"10000" help:"Timeout in milliseconds per single JMX query."`

@@ -236,13 +237,6 @@ func ParseArgs(a ArgumentList) (*ParsedArguments, error) {
NumBuckets: numBuckets,
}

- // Parse consumser offset args
- consumerGroups, err := unmarshalConsumerGroups(a.ConsumerOffset, a.ConsumerGroups)
- if err != nil {
- log.Error("Error with Consumer Group configuration: %s", err.Error())
- return nil, err
- }

var consumerGroupRegex *regexp.Regexp
if a.ConsumerGroupRegex != "" {
consumerGroupRegex, err = regexp.Compile(a.ConsumerGroupRegex)
@@ -252,10 +246,6 @@
}
}

- if !a.CollectBrokerTopicData {
- log.Warn("CollectBrokerTopicData has been deprecated. Currently it has no effect")
- }

version, err := sarama.ParseKafkaVersion(a.KafkaVersion)
if err != nil {
return nil, fmt.Errorf("failed to parse kafka version: %s", err)
@@ -295,8 +285,9 @@ func ParseArgs(a ArgumentList) (*ParsedArguments, error) {
CollectTopicSize: a.CollectTopicSize,
CollectTopicOffset: a.CollectTopicOffset,
ConsumerOffset: a.ConsumerOffset,
- ConsumerGroups: consumerGroups,
ConsumerGroupRegex: consumerGroupRegex,
+ ConsumerGroupOffsetByTopic: a.ConsumerGroupOffsetByTopic,
+ InactiveConsumerGroupOffset: a.InactiveConsumerGroupOffset,
SaslMechanism: a.SaslMechanism,
SaslUsername: a.SaslUsername,
SaslPassword: a.SaslPassword,
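As an orientation aid (not part of the PR), here is a sketch of how the two new options flow through ParseArgs. It assumes the src/args package is imported; the field values are illustrative and connection-related fields are elided, so the real integration may enforce validation beyond what is shown here:

func exampleParseNewFlags() (*args.ParsedArguments, error) {
	al := args.ArgumentList{
		KafkaVersion:                "1.0.0", // must satisfy sarama.ParseKafkaVersion (see above)
		TopicMode:                   "None",
		TopicBucket:                 "1/1",
		ConsumerOffset:              true,
		ConsumerGroupRegex:          "^payments-.*", // compiled to a *regexp.Regexp by ParseArgs
		ConsumerGroupOffsetByTopic:  true,           // new: emit per-topic rollups as well
		InactiveConsumerGroupOffset: true,           // new: include groups with no active members
	}
	// On success, the two new booleans are copied through to ParsedArguments
	// exactly as the diff above shows.
	return args.ParseArgs(al)
}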
93 changes: 93 additions & 0 deletions src/consumeroffset/cgroup_metrics_aggregator.go
@@ -0,0 +1,93 @@
package consumeroffset

import (
"github.com/newrelic/infra-integrations-sdk/log"
)

type (
consumerGroupID string
topic string
)

type CGroupAggregations struct {
consumerGroupRollup map[consumerGroupID]int
consumerGroupMaxLagRollup map[consumerGroupID]int
cGroupActiveClientsRollup map[clientID]struct{}

topicRollup map[topic]int
topicMaxLagRollup map[topic]int
topicActiveClientsRollup map[topic]map[clientID]struct{}
}

type CGroupMetricsAggregator struct {
cgroupMetrics CGroupAggregations
consumerGroup string
consumerGroupOffsetByTopic bool
}

func NewCGroupMetricsAggregator(consumerGroup string, consumerGroupOffsetByTopic bool) *CGroupMetricsAggregator {
return &CGroupMetricsAggregator{
consumerGroup: consumerGroup,
consumerGroupOffsetByTopic: consumerGroupOffsetByTopic,
cgroupMetrics: CGroupAggregations{
consumerGroupRollup: make(map[consumerGroupID]int),
consumerGroupMaxLagRollup: make(map[consumerGroupID]int),
cGroupActiveClientsRollup: make(map[clientID]struct{}),
topicRollup: make(map[topic]int),
topicMaxLagRollup: make(map[topic]int),
topicActiveClientsRollup: make(map[topic]map[clientID]struct{}),
},
}
}

// WaitAndAggregateMetrics waits for data from partitionLagChan and aggregates it, returning once the channel closes
func (cma *CGroupMetricsAggregator) WaitAndAggregateMetrics(partitionLagChan chan partitionLagResult) {
log.Debug("Calculating consumer group lag rollup metrics for consumer group '%s'", cma.consumerGroup)
defer log.Debug("Finished calculating consumer group lag rollup metrics for consumer group '%s'", cma.consumerGroup)

for {
result, ok := <-partitionLagChan
if !ok {
break // channel has been closed
}

cGroupID := consumerGroupID(result.ConsumerGroup)
topicName := topic(result.Topic)
cID := clientID(result.ClientID)

// Add lag to the total lag for the consumer group
cma.cgroupMetrics.consumerGroupRollup[cGroupID] += result.Lag

// Calculate the max lag for the consumer group
if result.Lag > cma.cgroupMetrics.consumerGroupMaxLagRollup[cGroupID] {
cma.cgroupMetrics.consumerGroupMaxLagRollup[cGroupID] = result.Lag
}

// Add ClientID to number of active consumers
if cID != "" {
cma.cgroupMetrics.cGroupActiveClientsRollup[cID] = struct{}{}
}

// Add aggregation by topic
if cma.consumerGroupOffsetByTopic {
cma.cgroupMetrics.topicRollup[topicName] += result.Lag

// Calculate the max lag for the consumer group in this topic
if result.Lag > cma.cgroupMetrics.topicMaxLagRollup[topicName] {
cma.cgroupMetrics.topicMaxLagRollup[topicName] = result.Lag
}

// Add ClientID to number of active consumers for this topic
if cID != "" {
if _, ok := cma.cgroupMetrics.topicActiveClientsRollup[topicName]; !ok {
cma.cgroupMetrics.topicActiveClientsRollup[topicName] = map[clientID]struct{}{}
}
cma.cgroupMetrics.topicActiveClientsRollup[topicName][cID] = struct{}{}
}
}
}
}

func (cma *CGroupMetricsAggregator) GetAggregatedMetrics() CGroupAggregations {
return cma.cgroupMetrics
}
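A minimal driver for the aggregator above, assuming partitionLagResult is the package struct whose ConsumerGroup, Topic, ClientID, and Lag fields are read in the loop (the values below are invented):

func exampleCGroupRollup() CGroupAggregations {
	agg := NewCGroupMetricsAggregator("example-group", true) // true: also roll up by topic
	lagChan := make(chan partitionLagResult)
	done := make(chan struct{})
	go func() {
		agg.WaitAndAggregateMetrics(lagChan) // returns once lagChan is closed
		close(done)
	}()
	lagChan <- partitionLagResult{ConsumerGroup: "example-group", Topic: "orders", ClientID: "consumer-1", Lag: 5}
	lagChan <- partitionLagResult{ConsumerGroup: "example-group", Topic: "orders", ClientID: "consumer-2", Lag: 12}
	close(lagChan)
	<-done
	// Expected rollups: total lag 17, max lag 12, two active clients, and
	// (because offset-by-topic is on) topicRollup["orders"] == 17.
	return agg.GetAggregatedMetrics()
}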
40 changes: 40 additions & 0 deletions src/consumeroffset/client_metrics_aggregator.go
@@ -0,0 +1,40 @@
package consumeroffset

import "github.com/newrelic/infra-integrations-sdk/log"

type (
clientID string
ClientAggregations map[clientID]int
)

type ClientMetricsAggregator struct {
clientMetrics ClientAggregations
consumerGroup string
}

func NewClientMetricsAggregator(consumerGroup string) *ClientMetricsAggregator {
return &ClientMetricsAggregator{
consumerGroup: consumerGroup,
clientMetrics: make(map[clientID]int),
}
}

// WaitAndAggregateMetrics waits for data from partitionLagChan and aggregates it, returning once the channel closes
func (cma *ClientMetricsAggregator) WaitAndAggregateMetrics(partitionLagChan chan partitionLagResult) {
log.Debug("Calculating consumer lag rollup metrics for consumer group '%s'", cma.consumerGroup)
defer log.Debug("Finished calculating consumer lag rollup metrics for consumer group '%s'", cma.consumerGroup)

for {
result, ok := <-partitionLagChan
if !ok {
break // channel has been closed
}

// Add lag to the total lag for the client
cma.clientMetrics[clientID(result.ClientID)] += result.Lag
}
}

func (cma *ClientMetricsAggregator) GetAggregatedMetrics() ClientAggregations {
return cma.clientMetrics
}
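The commit log mentions "Create interface for aggregators" (1f792b3), but the definition falls outside this excerpt. The contract below is therefore an assumption: the smallest interface both aggregators shown here satisfy. GetAggregatedMetrics is omitted because the two implementations return different concrete types (CGroupAggregations vs. ClientAggregations):

// Assumed shape, not the PR's verbatim definition.
type metricsAggregator interface {
	WaitAndAggregateMetrics(partitionLagChan chan partitionLagResult)
}

Note that each aggregator needs its own channel, since a value sent on a channel is consumed by exactly one receiver.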
112 changes: 54 additions & 58 deletions src/consumeroffset/collect.go
@@ -7,13 +7,23 @@ import (
"sync"

"github.com/Shopify/sarama"

"github.com/newrelic/infra-integrations-sdk/data/attribute"
"github.com/newrelic/infra-integrations-sdk/integration"
"github.com/newrelic/infra-integrations-sdk/log"
"github.com/newrelic/nri-kafka/src/args"
"github.com/newrelic/nri-kafka/src/connection"
)

+ const (
+ nrConsumerGroupEntity = "ka-consumer-group"
+ nrConsumerGroupTopicEntity = "ka-consumer-group-topic"
+ nrConsumerEntity = "ka-consumer"
+ nrPartitionConsumerEntity = "ka-partition-consumer"
+ )

+ var ErrNoConsumerGroupRegex = errors.New("if consumer_offset is set, consumer_group_regex must also be set")

type partitionOffsets struct {
Topic string `metric_name:"topic" source_type:"attribute"`
Partition string `metric_name:"partition" source_type:"attribute"`
@@ -29,73 +39,59 @@ type TopicPartitions map[string][]int32
func Collect(client connection.Client, kafkaIntegration *integration.Integration) error {
clusterAdmin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
- return fmt.Errorf("failed to create cluster admin: %s", err)
+ return fmt.Errorf("failed to create cluster admin: %w", err)
}

- // Use the more modern collection method if the configuration exists
- if args.GlobalArgs.ConsumerGroupRegex != nil {
- consumerGroupMap, err := clusterAdmin.ListConsumerGroups()
- if err != nil {
- return fmt.Errorf("failed to get list of consumer groups: %s", err)
- }
- consumerGroupList := make([]string, 0, len(consumerGroupMap))
- for consumerGroup := range consumerGroupMap {
- consumerGroupList = append(consumerGroupList, consumerGroup)
- }
- log.Debug("Retrieved the list of consumer groups: %v", consumerGroupList)
+ if args.GlobalArgs.ConsumerGroupRegex == nil {
+ return ErrNoConsumerGroupRegex
+ }

- consumerGroups, err := clusterAdmin.DescribeConsumerGroups(consumerGroupList)
- if err != nil {
- return fmt.Errorf("failed to get consumer group descriptions: %s", err)
- }
- log.Debug("Retrieved the descriptions of all consumer groups")

- var unmatchedConsumerGroups []string
- var wg sync.WaitGroup
- for _, consumerGroup := range consumerGroups {
- if args.GlobalArgs.ConsumerGroupRegex.MatchString(consumerGroup.GroupId) {
- wg.Add(1)
- go collectOffsetsForConsumerGroup(client, clusterAdmin, consumerGroup.GroupId, consumerGroup.Members, kafkaIntegration, &wg)
- } else {
- unmatchedConsumerGroups = append(unmatchedConsumerGroups, consumerGroup.GroupId)
- }
- }
+ consumerGroupMap, err := clusterAdmin.ListConsumerGroups()
+ if err != nil {
+ return fmt.Errorf("failed to get list of consumer groups: %w", err)
+ }
+ consumerGroupList := make([]string, 0, len(consumerGroupMap))
+ for consumerGroup := range consumerGroupMap {
+ consumerGroupList = append(consumerGroupList, consumerGroup)
+ }
+ log.Debug("Retrieved the list of consumer groups: %v", consumerGroupList)

- if len(unmatchedConsumerGroups) > 0 {
- log.Debug("Skipped collecting consumer offsets for unmatched consumer groups %v", unmatchedConsumerGroups)
+ consumerGroups, err := clusterAdmin.DescribeConsumerGroups(consumerGroupList)
+ if err != nil {
+ return fmt.Errorf("failed to get consumer group descriptions: %w", err)
+ }
+ log.Debug("Retrieved the descriptions of all consumer groups")

+ topicOffsetGetter := NewSaramaTopicOffsetGetter(client)
+ cAdminConsumerGroupTopicLister := NewCAdminConsumerGroupTopicLister(clusterAdmin)

+ var unmatchedConsumerGroups []string
+ var wg sync.WaitGroup
+ for _, consumerGroup := range consumerGroups {
+ if args.GlobalArgs.ConsumerGroupRegex.MatchString(consumerGroup.GroupId) {
+ wg.Add(1)
+ go func(consumerGroup *sarama.GroupDescription) {
+ collectOffsetsForConsumerGroup(
+ cAdminConsumerGroupTopicLister,
+ consumerGroup.GroupId,
+ consumerGroup.Members,
+ kafkaIntegration,
+ topicOffsetGetter,
+ )
+ wg.Done()
+ }(consumerGroup)
+ } else {
+ unmatchedConsumerGroups = append(unmatchedConsumerGroups, consumerGroup.GroupId)
+ }
+ }

- wg.Wait()
- } else if len(args.GlobalArgs.ConsumerGroups) != 0 {
log.Warn("Argument 'consumer_groups' is deprecated and will be removed in a future version. Use 'consumer_group_regex' instead.")
// We retrieve the offsets for each group before calculating the high water mark
// so that the lag is never negative
for consumerGroup, topics := range args.GlobalArgs.ConsumerGroups {
topicPartitions := fillTopicPartitions(consumerGroup, topics, client)
if len(topicPartitions) == 0 {
log.Error("No topics specified for consumer group '%s'", consumerGroup)
continue
}

offsetData, err := getConsumerOffsets(consumerGroup, topicPartitions, client)
if err != nil {
log.Info("Failed to collect consumerOffsets for group %s: %v", consumerGroup, err)
}
highWaterMarks, err := getHighWaterMarks(topicPartitions, client)
if err != nil {
log.Info("Failed to collect highWaterMarks for group %s: %v", consumerGroup, err)
}

offsetStructs := populateOffsetStructs(offsetData, highWaterMarks)

if err := setMetrics(consumerGroup, offsetStructs, kafkaIntegration); err != nil {
log.Error("Error setting metrics for consumer group '%s': %s", consumerGroup, err.Error())
}
}
} else {
return errors.New("if consumer_offset is set, either consumer_group_regex or consumer_groups (deprecated) must also be set")
+ if len(unmatchedConsumerGroups) > 0 {
+ log.Debug("Skipped collecting consumer offsets for unmatched consumer groups %v", unmatchedConsumerGroups)
+ }

+ wg.Wait()

return nil
}

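The commits "Create new topic Offset getter with cache" and "Add topicLister with cache" refer to caching wrappers whose code is outside this excerpt. The sketch below only illustrates the idea, memoizing broker lookups so each topic partition's high water mark is fetched once per collection run. The type and method names are hypothetical, and it assumes connection.Client exposes sarama's GetOffset:

type cachedOffsetGetter struct {
	client connection.Client // hypothetical wrapper state; assumes a sarama-compatible GetOffset
	mu     sync.Mutex
	cache  map[string]map[int32]int64 // topic -> partition -> high water mark
}

func (c *cachedOffsetGetter) HighWaterMark(topic string, partition int32) (int64, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if hwm, ok := c.cache[topic][partition]; ok {
		return hwm, nil // cache hit: no extra broker round trip
	}
	hwm, err := c.client.GetOffset(topic, partition, sarama.OffsetNewest)
	if err != nil {
		return 0, err
	}
	if c.cache[topic] == nil {
		c.cache[topic] = map[int32]int64{}
	}
	c.cache[topic][partition] = hwm
	return hwm, nil
}

The mutex keeps the cache safe if the per-consumer-group goroutines in Collect share a single getter.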