Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Add metric to represent a consumer group's total offset lag per topic #93

Merged
merged 2 commits into from
Nov 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ Labels: `cluster_name, group, is_simple_consumer`

The highest (maximum) lag in time for a given consumer group.

**`kafka_consumergroup_group_sum_lag`**

Labels: `cluster_name, group`

The sum of the difference between the last produced offset and the last consumed offset of all partitions for this group.

**`kafka_consumergroup_group_topic_sum_lag`**

Labels: `cluster_name, group, topic`

The sum of the difference between the last produced offset and the last consumed offset of all partitions in this topic for this group.

**`kafka_partition_latest_offset`**

Labels: `cluster_name, topic, partition`
Expand All @@ -101,7 +113,6 @@ Labels: `cluster_name, topic, partition`

The earliest offset available for topic partition. Kafka Lag Exporter will calculate a set of partitions for all consumer groups available and then poll for the earliest available offset. The earliest available offset is used in the calculation of other metrics provided, so it is exported for informational purposes. For example, the accompanying Grafana dashboard makes use of it to visualize the offset-based volume of a partition in certain panels.


### Labels

Each metric may include the following labels when reported. If you define the `labels` property for configuration of a cluster then those labels will also be included.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,21 @@ object ConsumerGroupCollector {
GroupPartitionLag(gtp, offsetLag, timeLag)
}

for((group, values) <- groupLag.groupBy(_.gtp.id)) {
val maxOffsetLag = values.maxBy(_.offsetLag)
val maxTimeLag = values.maxBy(_.timeLag)
for((group, groupValues) <- groupLag.groupBy(_.gtp.id)) {
val maxOffsetLag = groupValues.maxBy(_.offsetLag)
val maxTimeLag = groupValues.maxBy(_.timeLag)

reporter ! Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, config.cluster.name, group, maxOffsetLag.offsetLag)
reporter ! Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, config.cluster.name, group, maxTimeLag.timeLag)

dylanmei marked this conversation as resolved.
Show resolved Hide resolved
val sumOffsetLag = groupValues.map(_.offsetLag).sum
reporter ! Metrics.GroupValueMessage(Metrics.SumGroupOffsetLagMetric, config.cluster.name, group, sumOffsetLag)

for((topic, topicValues) <- groupValues.groupBy(_.gtp.topic)) {
val topicOffsetLag = topicValues.map(_.offsetLag).sum

reporter ! Metrics.GroupTopicValueMessage(Metrics.SumGroupTopicOffsetLagMetric, config.cluster.name, group, topic, topicOffsetLag)
}
}
}

Expand Down Expand Up @@ -269,12 +278,18 @@ object ConsumerGroupCollector {
groups.foreach { group =>
reporter ! Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupOffsetLagMetric, config.cluster.name, group)
reporter ! Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, config.cluster.name, group)
reporter ! Metrics.GroupRemoveMetricMessage(Metrics.SumGroupOffsetLagMetric, config.cluster.name, group)
}
gtps.foreach { gtp =>
reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.LastGroupOffsetMetric, config.cluster.name, gtp)
reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.OffsetLagMetric, config.cluster.name, gtp)
reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.TimeLagMetric, config.cluster.name, gtp)
}

for {
(group, gtps) <- gtps.groupBy(_.id)
topic <- gtps.map(_.topic).distinct
} reporter ! Metrics.GroupTopicRemoveMetricMessage(Metrics.SumGroupTopicOffsetLagMetric, config.cluster.name, group, topic)
}
}
}
37 changes: 34 additions & 3 deletions src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ object Metrics {
def topicPartition: Domain.TopicPartition
override def labels: List[String] =
List(
clusterName,
topicPartition.topic,
clusterName, topicPartition.topic,
topicPartition.partition.toString
)
}
Expand Down Expand Up @@ -54,6 +53,22 @@ object Metrics {
final case class GroupPartitionValueMessage(definition: GaugeDefinition, clusterName: String, gtp: Domain.GroupTopicPartition, value: Double) extends GroupPartitionMessage with MetricValue
final case class GroupPartitionRemoveMetricMessage(definition: GaugeDefinition, clusterName: String, gtp: Domain.GroupTopicPartition) extends GroupPartitionMessage with RemoveMetric

sealed trait GroupTopicMessage extends Message with Metric {
def definition: GaugeDefinition
def clusterName: String
def group: String
def topic: String
override def labels: List[String] =
List(
clusterName,
group,
topic
)
}

final case class GroupTopicValueMessage(definition: GaugeDefinition, clusterName: String, group: String, topic: String, value: Double) extends GroupTopicMessage with MetricValue
final case class GroupTopicRemoveMetricMessage(definition: GaugeDefinition, clusterName: String, group: String, topic: String) extends GroupTopicMessage with RemoveMetric

val topicPartitionLabels = List("cluster_name", "topic", "partition")

val LatestOffsetMetric = GaugeDefinition(
Expand Down Expand Up @@ -82,6 +97,12 @@ object Metrics {
groupLabels
)

val SumGroupOffsetLagMetric = GaugeDefinition(
"kafka_consumergroup_group_sum_lag",
"Sum of group offset lag",
groupLabels
)

val groupPartitionLabels = List("cluster_name", "group", "topic", "partition", "member_host", "consumer_id", "client_id")

val LastGroupOffsetMetric = GaugeDefinition(
Expand All @@ -102,13 +123,23 @@ object Metrics {
groupPartitionLabels
)

val groupTopicLabels = List("cluster_name", "group", "topic")

val SumGroupTopicOffsetLagMetric = GaugeDefinition(
"kafka_consumergroup_group_topic_sum_lag",
"Sum of group offset lag across topic partitions",
groupTopicLabels
)

val definitions = List(
LatestOffsetMetric,
EarliestOffsetMetric,
MaxGroupOffsetLagMetric,
MaxGroupTimeLagMetric,
LastGroupOffsetMetric,
OffsetLagMetric,
TimeLagMetric
TimeLagMetric,
SumGroupOffsetLagMetric,
SumGroupTopicOffsetLagMetric
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi

val metrics = reporter.receiveAll()

"report 7 metrics" in { metrics.length shouldBe 7 }
"report 9 metrics" in { metrics.length shouldBe 9 }

"earliest offset metric" in {
metrics should contain(
Expand Down Expand Up @@ -78,6 +78,14 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
"max group time lag metric" in {
metrics should contain(GroupValueMessage(MaxGroupTimeLagMetric, config.cluster.name, groupId, value = 0.02))
}

"sum group offset lag metric" in {
metrics should contain(GroupValueMessage(SumGroupOffsetLagMetric, config.cluster.name, groupId, value = 20))
}

"sum topic offset lag metric" in {
metrics should contain(GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, config.cluster.name, groupId, topic, value = 20))
}
}

"ConsumerGroupCollector should calculate max group metrics and send" - {
Expand Down Expand Up @@ -126,6 +134,57 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
}
}

"ConsumerGroupCollector should sum the group offset lag metrics and send" - {
val reporter = TestInbox[MetricsSink.Message]()

val lookupTable = Table(20)
lookupTable.addPoint(Point(100, 100))

val state = ConsumerGroupCollector.CollectorState(
topicPartitionTables = TopicPartitionTable(config.lookupTableSize, Map(
topicPartition0 -> lookupTable.copy(),
topicPartition1 -> lookupTable.copy(),
topicPartition2 -> lookupTable.copy(),
topic2Partition0 -> lookupTable.copy()
)),
)

val behavior = ConsumerGroupCollector.collector(config, client, reporter.ref, state)
val testKit = BehaviorTestKit(behavior)

val newEarliestOffsets = PartitionOffsets(
topicPartition0 -> Point(offset = 0, time = 100),
topicPartition1 -> Point(offset = 0, time = 100),
topicPartition2 -> Point(offset = 0, time = 100),
topic2Partition0 -> Point(offset = 0, time = 100)
)
val newLatestOffsets = PartitionOffsets(
topicPartition0 -> Point(offset = 100, time = 200),
topicPartition1 -> Point(offset = 100, time = 200),
topicPartition2 -> Point(offset = 100, time = 200),
topic2Partition0 -> Point(offset = 100, time = 200)
)
val newLastGroupOffsets = GroupOffsets(
gtp0 -> Some(Point(offset = 10, time = 200)),
gtp1 -> Some(Point(offset = 20, time = 200)),
gtp2 -> Some(Point(offset = 30, time = 200)),
gt2p0 -> Some(Point(offset = 40, time = 200))
)

testKit.run(ConsumerGroupCollector.OffsetsSnapshot(timestamp = timestampNow, List(groupId), newEarliestOffsets, newLatestOffsets, newLastGroupOffsets))

val metrics = reporter.receiveAll()

"sum of offset lag metric" in {
metrics should contain(GroupValueMessage(SumGroupOffsetLagMetric, clusterName, groupId, value = 300))
}

"sum of offset lag by topic metric" in {
metrics should contain(GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic, value = 240))
metrics should contain(GroupTopicValueMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic2, value = 60))
}
}

"ConsumerGroupCollector should report group offset, lag, and time lag as NaN when no group offsets found" - {
val reporter = TestInbox[MetricsSink.Message]()

Expand Down Expand Up @@ -188,6 +247,12 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
metrics should contain(
Metrics.TopicPartitionValueMessage(LatestOffsetMetric, config.cluster.name, topicPartition0, value = 200))
}

"topic offset lag metric" in {
metrics.collectFirst {
case GroupTopicValueMessage(`SumGroupTopicOffsetLagMetric`, config.cluster.name, `groupId`, `topic`, value) if value.isNaN => true
}.nonEmpty shouldBe true
}
}

"ConsumerGroupCollector should evict data when group metadata changes" - {
Expand Down Expand Up @@ -252,6 +317,7 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
metrics should contain(GroupPartitionRemoveMetricMessage(TimeLagMetric, clusterName, gtpSingleMember))
metrics should contain(GroupRemoveMetricMessage(MaxGroupTimeLagMetric, clusterName, groupId))
metrics should contain(GroupRemoveMetricMessage(MaxGroupOffsetLagMetric, clusterName, groupId))
metrics should contain(GroupTopicRemoveMetricMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic))
}

"remove metrics for topic partitions no longer being reported" - {
Expand Down Expand Up @@ -284,6 +350,8 @@ class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData wi
metrics should contain(GroupPartitionRemoveMetricMessage(TimeLagMetric, clusterName, gtpSingleMember))
metrics should contain(GroupRemoveMetricMessage(MaxGroupTimeLagMetric, clusterName, groupId))
metrics should contain(GroupRemoveMetricMessage(MaxGroupOffsetLagMetric, clusterName, groupId))
metrics should contain(GroupRemoveMetricMessage(SumGroupOffsetLagMetric, clusterName, groupId))
metrics should contain(GroupTopicRemoveMetricMessage(SumGroupTopicOffsetLagMetric, clusterName, groupId, topic))
}

"topic partition in topic partition table removed" in {
Expand Down