Skip to content

Commit

Permalink
Add metric to represent a consumer group's total offset lag per topic (
Browse files Browse the repository at this point in the history
…seglo#93)

* Add metric of a consumer group's total offset lag and total lag per topic
* Update name of group/topic aggregate metric
  • Loading branch information
dylanmei authored and anbarasan committed Nov 24, 2019
1 parent b63226f commit 8dd4006
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 9 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ Labels: `cluster_name, group, is_simple_consumer`

The highest (maximum) lag in time for a given consumer group.

**`kafka_consumergroup_group_sum_lag`**

Labels: `cluster_name, group`

The sum, across all partitions consumed by this group, of the differences between the last produced offset and the last consumed offset.

**`kafka_consumergroup_group_topic_sum_lag`**

Labels: `cluster_name, group, topic`

The sum, across all partitions of this topic consumed by this group, of the differences between the last produced offset and the last consumed offset.

**`kafka_partition_latest_offset`**

Labels: `cluster_name, topic, partition`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,20 @@ object ConsumerGroupCollector {
GroupPartitionLag(gtp, offsetLag, timeLag)
}

for((group, values) <- groupLag.groupBy(_.gtp.id)) {
val maxOffsetLag = values.maxBy(_.offsetLag)
val maxTimeLag = values.maxBy(_.timeLag)
// For every consumer group: report the largest offset lag and time lag seen on
// any of its partitions, the offset lag summed over all of its partitions, and
// the offset lag summed separately for each topic it consumes.
groupLag.groupBy(_.gtp.id).foreach { case (group, lagsOfGroup) =>
  val worstOffsetLag = lagsOfGroup.maxBy(_.offsetLag)
  val worstTimeLag = lagsOfGroup.maxBy(_.timeLag)

  reporter ! Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, config.cluster.name, group, worstOffsetLag.offsetLag)
  reporter ! Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, config.cluster.name, group, worstTimeLag.timeLag)

  val groupTotalLag = lagsOfGroup.map(_.offsetLag).sum
  reporter ! Metrics.GroupValueMessage(Metrics.SumGroupOffsetLagMetric, config.cluster.name, group, groupTotalLag)

  lagsOfGroup.groupBy(_.gtp.topic).foreach { case (topic, lagsOfTopic) =>
    val topicTotalLag = lagsOfTopic.map(_.offsetLag).sum
    reporter ! Metrics.GroupTopicValueMessage(Metrics.SumGroupTopicOffsetLagMetric, config.cluster.name, group, topic, topicTotalLag)
  }
}
}

Expand Down Expand Up @@ -282,12 +290,18 @@ object ConsumerGroupCollector {
// Remove the group-level gauges (max offset lag, max time lag, summed offset lag)
// for each group in `groups`.
for (group <- groups) {
  reporter ! Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupOffsetLagMetric, config.cluster.name, group)
  reporter ! Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, config.cluster.name, group)
  reporter ! Metrics.GroupRemoveMetricMessage(Metrics.SumGroupOffsetLagMetric, config.cluster.name, group)
}

// Remove the per-partition gauges for each group/topic/partition in `gtps`.
for (gtp <- gtps) {
  reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.LastGroupOffsetMetric, config.cluster.name, gtp)
  reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.OffsetLagMetric, config.cluster.name, gtp)
  reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.TimeLagMetric, config.cluster.name, gtp)
}

// Remove the per-(group, topic) summed-lag gauge once for every distinct topic
// of each group found in `gtps`.
gtps.groupBy(_.id).foreach { case (group, partitionsOfGroup) =>
  partitionsOfGroup.map(_.topic).distinct.foreach { topic =>
    reporter ! Metrics.GroupTopicRemoveMetricMessage(Metrics.SumGroupTopicOffsetLagMetric, config.cluster.name, group, topic)
  }
}
}
}

Expand Down
42 changes: 37 additions & 5 deletions src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ package com.lightbend.kafkalagexporter
import com.lightbend.kafkalagexporter.MetricsSink._

object Metrics {
/**
  * Base for metric messages that are labelled by cluster only.
  *
  * `labels` yields a single label value: the cluster name.
  */
sealed trait ClusterMessage extends Message with Metric {
  def definition: GaugeDefinition
  def clusterName: String
  override def labels: List[String] = clusterName :: Nil
}

/** A [[ClusterMessage]] carrying a `value` for the gauge described by `definition`. */
final case class ClusterValueMessage(
  definition: GaugeDefinition,
  clusterName: String,
  value: Double
) extends ClusterMessage with MetricValue

sealed trait TopicPartitionMessage extends Message with Metric {
def definition: GaugeDefinition
def clusterName: String
Expand Down Expand Up @@ -54,17 +65,21 @@ object Metrics {
/** Carries a `value` for the gauge `definition`, identified by cluster name and group/topic/partition. */
final case class GroupPartitionValueMessage(
  definition: GaugeDefinition,
  clusterName: String,
  gtp: Domain.GroupTopicPartition,
  value: Double
) extends GroupPartitionMessage with MetricValue
/** Removal counterpart of the group/partition value message: same identifying fields, no value. */
final case class GroupPartitionRemoveMetricMessage(
  definition: GaugeDefinition,
  clusterName: String,
  gtp: Domain.GroupTopicPartition
) extends GroupPartitionMessage with RemoveMetric

sealed trait ClusterMessage extends Message with Metric {
sealed trait GroupTopicMessage extends Message with Metric {
def definition: GaugeDefinition
def clusterName: String
def group: String
def topic: String
override def labels: List[String] =
List(
clusterName
clusterName,
group,
topic
)
}

final case class ClusterValueMessage(definition: GaugeDefinition, clusterName: String, value: Double) extends ClusterMessage with MetricValue

/** Carries a `value` for the gauge `definition`, identified by cluster name, consumer group and topic. */
final case class GroupTopicValueMessage(
  definition: GaugeDefinition,
  clusterName: String,
  group: String,
  topic: String,
  value: Double
) extends GroupTopicMessage with MetricValue
/** Removal counterpart of the group/topic value message: same identifying fields, no value. */
final case class GroupTopicRemoveMetricMessage(
  definition: GaugeDefinition,
  clusterName: String,
  group: String,
  topic: String
) extends GroupTopicMessage with RemoveMetric

/** Label names for gauges reported per cluster, topic and partition. */
val topicPartitionLabels: List[String] = List("cluster_name", "topic", "partition")

Expand All @@ -84,6 +99,7 @@ object Metrics {

/** Label names for gauges reported per cluster only. */
val ClusterLabels: List[String] = List("cluster_name")


val MaxGroupOffsetLagMetric = GaugeDefinition(
"kafka_consumergroup_group_max_lag",
"Max group offset lag",
Expand All @@ -96,6 +112,12 @@ object Metrics {
groupLabels
)

/** Gauge `kafka_consumergroup_group_sum_lag`: a consumer group's offset lag summed over its partitions. */
val SumGroupOffsetLagMetric = GaugeDefinition("kafka_consumergroup_group_sum_lag", "Sum of group offset lag", groupLabels)

/** Label names for gauges reported per consumer group partition, including the consuming member's details. */
val groupPartitionLabels: List[String] =
  List("cluster_name", "group", "topic", "partition", "member_host", "consumer_id", "client_id")

val LastGroupOffsetMetric = GaugeDefinition(
Expand All @@ -116,8 +138,16 @@ object Metrics {
groupPartitionLabels
)

/** Label names for gauges reported per cluster, consumer group and topic. */
val groupTopicLabels: List[String] = List("cluster_name", "group", "topic")

/** Gauge `kafka_consumergroup_group_topic_sum_lag`: a group's offset lag summed over the partitions of one topic. */
val SumGroupTopicOffsetLagMetric = GaugeDefinition("kafka_consumergroup_group_topic_sum_lag", "Sum of group offset lag across topic partitions", groupTopicLabels)

val PollTimeMetric = GaugeDefinition(
"kafka_consumergroup_poll_time_seconds",
"kafka_consumergroup_poll_time_ms",
"Group time poll time",
ClusterLabels
)
Expand All @@ -130,6 +160,8 @@ object Metrics {
LastGroupOffsetMetric,
OffsetLagMetric,
TimeLagMetric,
SumGroupOffsetLagMetric,
SumGroupTopicOffsetLagMetric,
PollTimeMetric
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi

val metrics = reporter.receiveAll()

"report 7 metrics" in { metrics.length shouldBe 7 }
"report 9 metrics" in { metrics.length shouldBe 9 }

"earliest offset metric" in {
metrics should contain(
Expand Down Expand Up @@ -78,6 +78,14 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
"max group time lag metric" in {
  // The group-level max time lag gauge is expected to carry 0.02 for this fixture.
  val expectedMessage = GroupValueMessage(MaxGroupTimeLagMetric, config.cluster.name, groupId, value = 0.02)
  metrics should contain(expectedMessage)
}

"sum group offset lag metric" in {
  // The group-wide summed offset lag gauge is expected to carry 20 for this fixture.
  val expectedMessage = GroupValueMessage(SumGroupOffsetLagMetric, config.cluster.name, groupId, value = 20)
  metrics should contain(expectedMessage)
}

"sum topic offset lag metric" in {
  // The per-topic summed offset lag gauge is expected to carry 20 for this fixture.
  val expectedMessage = GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, config.cluster.name, groupId, topic, value = 20)
  metrics should contain(expectedMessage)
}
}

"ConsumerGroupCollector should calculate max group metrics and send" - {
Expand Down Expand Up @@ -126,6 +134,57 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
}
}

// Feeds the collector one offsets snapshot for a single group spanning four
// partitions and checks the group-wide and per-topic summed offset lag messages.
"ConsumerGroupCollector should sum the group offset lag metrics and send" - {
val reporter = TestInbox[MetricsSink.Message]()

val lookupTable = Table(20)
lookupTable.addPoint(Point(100, 100))

// Each of the four partitions starts from its own copy of the same lookup table.
val state = ConsumerGroupCollector.CollectorState(
topicPartitionTables = TopicPartitionTable(config.lookupTableSize, Map(
topicPartition0 -> lookupTable.copy(),
topicPartition1 -> lookupTable.copy(),
topicPartition2 -> lookupTable.copy(),
topic2Partition0 -> lookupTable.copy()
)),
)

val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state)
val testKit = BehaviorTestKit(behavior)

val newEarliestOffsets = PartitionOffsets(
topicPartition0 -> Point(offset = 0, time = 100),
topicPartition1 -> Point(offset = 0, time = 100),
topicPartition2 -> Point(offset = 0, time = 100),
topic2Partition0 -> Point(offset = 0, time = 100)
)
// Every partition's latest offset is 100.
val newLatestOffsets = PartitionOffsets(
topicPartition0 -> Point(offset = 100, time = 200),
topicPartition1 -> Point(offset = 100, time = 200),
topicPartition2 -> Point(offset = 100, time = 200),
topic2Partition0 -> Point(offset = 100, time = 200)
)
// The group's last offsets are 10, 20, 30 and 40.
val newLastGroupOffsets = GroupOffsets(
gtp0 -> Some(Point(offset = 10, time = 200)),
gtp1 -> Some(Point(offset = 20, time = 200)),
gtp2 -> Some(Point(offset = 30, time = 200)),
gt2p0 -> Some(Point(offset = 40, time = 200))
)

testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets))

// Collected after the snapshot has been processed; both assertions below read this.
val metrics = reporter.receiveAll()

"sum of offset lag metric" in {
// 300 matches (100-10) + (100-20) + (100-30) + (100-40), i.e. latest offset minus
// last group offset per partition. NOTE(review): the lag formula itself is computed
// outside this view - confirm against the collector.
metrics should contain(GroupValueMessage(SumGroupOffsetLagMetric, clusterName, groupId, value = 300))
}

"sum of offset lag by topic metric" in {
// 240 matches 90 + 80 + 70 and 60 matches the remaining partition; presumably
// gtp0..gtp2 belong to `topic` and gt2p0 to `topic2` - these fixtures are not
// defined in this view, verify against TestData.
metrics should contain(GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic, value = 240))
metrics should contain(GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic2, value = 60))
}
}

"ConsumerGroupCollector should report group offset, lag, and time lag as NaN when no group offsets found" - {
val reporter = TestInbox[MetricsSink.Message]()

Expand Down Expand Up @@ -188,6 +247,12 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
metrics should contain(
Metrics.TopicPartitionValueMessage(LatestOffsetMetric, config.cluster.name, topicPartition0, value = 200))
}

"topic offset lag metric" in {
  // Passes when at least one per-topic summed-lag message for this cluster,
  // group and topic carries NaN as its value.
  val reportedNaN = metrics.exists {
    case GroupTopicValueMessage(`SumGroupTopicOffsetLagMetric`, config.cluster.name, `groupId`, `topic`, value) => value.isNaN
    case _ => false
  }
  reportedNaN shouldBe true
}
}

"ConsumerGroupCollector should evict data when group metadata changes" - {
Expand Down Expand Up @@ -252,6 +317,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
metrics should contain(GroupPartitionRemoveMetricMessage(TimeLagMetric, clusterName, gtpSingleMember))
metrics should contain(GroupRemoveMetricMessage(MaxGroupTimeLagMetric, clusterName, groupId))
metrics should contain(GroupRemoveMetricMessage(MaxGroupOffsetLagMetric, clusterName, groupId))
metrics should contain(GroupTopicRemoveMetricMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic))
}

"remove metrics for topic partitions no longer being reported" - {
Expand Down Expand Up @@ -284,6 +350,8 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
metrics should contain(GroupPartitionRemoveMetricMessage(TimeLagMetric, clusterName, gtpSingleMember))
metrics should contain(GroupRemoveMetricMessage(MaxGroupTimeLagMetric, clusterName, groupId))
metrics should contain(GroupRemoveMetricMessage(MaxGroupOffsetLagMetric, clusterName, groupId))
metrics should contain(GroupRemoveMetricMessage(SumGroupOffsetLagMetric, clusterName, groupId))
metrics should contain(GroupTopicRemoveMetricMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic))
}

"topic partition in topic partition table removed" in {
Expand Down

0 comments on commit 8dd4006

Please sign in to comment.