Skip to content

Commit

Permalink
feat(kafka): WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
paologallinaharbur authored and alvarocabanas committed Jun 21, 2022
1 parent 2ee397d commit 252744a
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 71 deletions.
63 changes: 10 additions & 53 deletions src/consumeroffset/kafka_offset_collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,55 +221,6 @@ func createFetchRequest(topicPartitions TopicPartitions, client connection.Clien
return request
}

// populateOffsetStructs takes a map of offsets and high water marks and
// populates an array of partitionOffsets which can then be marshalled into metric
// sets
// populateOffsetStructs takes a map of consumer offsets and a map of high
// water marks and builds the slice of partitionOffsets that is later
// marshalled into metric sets.
//
// For every (topic, partition) present in hwms it looks up the matching
// consumer offset in offsets. When the offset is missing or was not
// collected (sarama reports -1), ConsumerOffset and ConsumerLag are left
// nil so those metrics are omitted instead of being reported with a bogus
// value.
func populateOffsetStructs(offsets, hwms groupOffsets) []*partitionOffsets {
	var poffsets []*partitionOffsets
	for topic, partitions := range hwms {
		for partition, hwm := range partitions {
			// Shadow the loop variable: &hwm below must point at this
			// iteration's value. Without the shadow every appended struct
			// aliases the single shared loop variable, so all entries would
			// report the last partition's high water mark (Go <1.22 loop
			// variable semantics).
			hwm := hwm

			offsetPointer := func() *int64 {
				topicOffsets, ok := offsets[topic]
				if !ok || len(topicOffsets) == 0 {
					// Fixed format string: it previously had a single %s verb
					// but was passed both topic and partition.
					log.Error("Offset not collected for topic %s, partition %d", topic, partition)
					return nil
				}

				offset, ok := topicOffsets[partition]
				if !ok || offset == -1 {
					log.Error("Offset not collected for topic %s, partition %d", topic, partition)
					return nil
				}

				// offset is a fresh local per lookup, so taking its address
				// is safe.
				return &offset
			}()

			// Lag is only meaningful when a consumer offset was collected.
			lag := func() *int64 {
				if offsetPointer == nil {
					return nil
				}

				returnLag := hwm - *offsetPointer
				return &returnLag
			}()

			poffsets = append(poffsets, &partitionOffsets{
				Topic:          topic,
				Partition:      strconv.Itoa(int(partition)),
				ConsumerOffset: offsetPointer,
				HighWaterMark:  &hwm,
				ConsumerLag:    lag,
			})
		}
	}

	return poffsets
}

func collectOffsetsForConsumerGroup(client connection.Client, clusterAdmin sarama.ClusterAdmin, consumerGroup string, members map[string]*sarama.GroupMemberDescription, kafkaIntegration *integration.Integration, wg *sync.WaitGroup) {
defer wg.Done()
log.Debug("Collecting offsets for consumer group '%s'", consumerGroup)
Expand Down Expand Up @@ -298,13 +249,14 @@ func collectOffsetsForConsumerGroup(client connection.Client, clusterAdmin saram

for topicName, topic := range topicMap {
topicPartitions := map[string][]int32{}
for partitionID, _ := range topic.ReplicaAssignment {
topicPartitions[topicName] = append(topicPartitions[topicName], partitionID)
for i := int32(0); i < topic.NumPartitions; i++ {
topicPartitions[topicName] = append(topicPartitions[topicName], i)
}

listGroupsResponse, _ := clusterAdmin.ListConsumerGroupOffsets(consumerGroup, topicPartitions)

for _, partitionMap := range listGroupsResponse.Blocks {

for partition, block := range partitionMap {

if block.Offset == -1 {
Expand All @@ -319,17 +271,22 @@ func collectOffsetsForConsumerGroup(client connection.Client, clusterAdmin saram
log.Error("Failed to get hwm for topic %s, partition %d: %s", topic, partition, err)
return
}

log.Error("%q %d %d %d", topicName, partition, offSetPartition[topicName][partition], block.Offset)

lag := offSetPartition[topicName][partition] - block.Offset

// Calculate the max lag for the consumer group
if lag > maxLag {
maxLag = lag
}

// Add lag to the total lag for the consumer group
totalLag += totalLag + lag
totalLag = totalLag + lag
log.Error("lag %d total %d", lag, totalLag)

}
}

}

err = ms.SetMetric("consumerGroup.totalLag", totalLag, metric.GAUGE)
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ services:
"-jar",
"./target/kafka_dummy-1.0-jar-with-dependencies.jar"
]
command: ["consumer","kafka1:9092","topicA","groupA"]
command: ["consumer","kafka1:9092","topicB","groupA"]
networks:
- kfk
depends_on:
Expand Down
34 changes: 17 additions & 17 deletions tests/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,55 +327,55 @@ func TestKafkaIntegration_bootstrap_inventory(t *testing.T) {
}
}

func TestKafkaIntegration_consumer_offset(t *testing.T) {
bootstrapDiscoverConfigInventory := func(command []string) []string {
func TestKafkaIntegration_producer_test(t *testing.T) {
consumerConfig := func(command []string) []string {
return append(
bootstrapDiscoverConfig(command),
"--consumer_offset",
"--consumer_group_regex", ".*",
command,
"--producers", "[{\"name\": \"kafka_dummy_producer\", \"host\": \"kafka_dummy_producer\", \"port\": 1089}]",
)
}

stdout, stderr, err := runIntegration(t, bootstrapDiscoverConfigInventory)

stdout, stderr, err := runIntegration(t, consumerConfig)
assert.NotNil(t, stderr, "unexpected stderr")
assert.NoError(t, err, "Unexpected error")

schemaPath := filepath.Join("json-schema-files", "kafka-schema-consumer-offset.json")
schemaPath := filepath.Join("json-schema-files", "kafka-schema-producer.json")
err = jsonschema.Validate(schemaPath, stdout)
assert.NoError(t, err, "The output of kafka integration doesn't have expected format.")
}

func TestKafkaIntegration_producer_test(t *testing.T) {
func TestKafkaIntegration_consumer_test(t *testing.T) {
consumerConfig := func(command []string) []string {
return append(
command,
"--producers", "[{\"name\": \"kafka_dummy_producer\", \"host\": \"kafka_dummy_producer\", \"port\": 1089}]",
"--consumers", "[{\"name\": \"kafka_dummy_consumer\", \"host\": \"kafka_dummy_consumer\", \"port\": 1087},{\"name\": \"kafka_dummy_consumer2\", \"host\": \"kafka_dummy_consumer2\", \"port\": 1088}]",
)
}

stdout, stderr, err := runIntegration(t, consumerConfig)
assert.NotNil(t, stderr, "unexpected stderr")
assert.NoError(t, err, "Unexpected error")

schemaPath := filepath.Join("json-schema-files", "kafka-schema-producer.json")
schemaPath := filepath.Join("json-schema-files", "kafka-schema-consumer.json")
err = jsonschema.Validate(schemaPath, stdout)
assert.NoError(t, err, "The output of kafka integration doesn't have expected format.")
}

func TestKafkaIntegration_consumer_test(t *testing.T) {
consumerConfig := func(command []string) []string {
func TestKafkaIntegration_consumer_offset(t *testing.T) {
bootstrapDiscoverConfigInventory := func(command []string) []string {
return append(
command,
"--consumers", "[{\"name\": \"kafka_dummy_consumer\", \"host\": \"kafka_dummy_consumer\", \"port\": 1087},{\"name\": \"kafka_dummy_consumer2\", \"host\": \"kafka_dummy_consumer2\", \"port\": 1088}]",
bootstrapDiscoverConfig(command),
"--consumer_offset",
"--consumer_group_regex", ".*",
)
}

stdout, stderr, err := runIntegration(t, consumerConfig)
stdout, stderr, err := runIntegration(t, bootstrapDiscoverConfigInventory)

assert.NotNil(t, stderr, "unexpected stderr")
assert.NoError(t, err, "Unexpected error")

schemaPath := filepath.Join("json-schema-files", "kafka-schema-consumer.json")
schemaPath := filepath.Join("json-schema-files", "kafka-schema-consumer-offset.json")
err = jsonschema.Validate(schemaPath, stdout)
assert.NoError(t, err, "The output of kafka integration doesn't have expected format.")
}

0 comments on commit 252744a

Please sign in to comment.