This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Ability to extend and configure the desired sink for reporting lag metrics, adding support for pushing lag metrics to InfluxDB as well. #130

Closed
wants to merge 2 commits into from
3 changes: 2 additions & 1 deletion build.sbt
@@ -40,6 +40,7 @@ lazy val kafkaLagExporter =
EmbeddedKafka,
AkkaHttp
),
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-influxdb" % "1.1.2",
dockerRepository := Option(System.getenv("DOCKER_REPOSITORY")).orElse(None),
dockerUsername := Option(System.getenv("DOCKER_USERNAME")).orElse(Some("lightbend")),
// Based on best practices found in OpenShift Creating images guidelines
@@ -216,4 +217,4 @@ lazy val packageJavaApp = ReleaseStep(
val ref = extracted.get(thisProjectRef)
extracted.runAggregated(packageBin in Universal in ref, st)
}
)
)
23 changes: 16 additions & 7 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
@@ -20,7 +20,16 @@ object AppConfig {
val c = config.getConfig("kafka-lag-exporter")
val pollInterval = c.getDuration("poll-interval").toScala
val lookupTableSize = c.getInt("lookup-table-size")
val port = c.getInt("port")
val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList
val sinkType = if (c.hasPath("sink"))
c.getString("sink")
else "PrometheusEndpointSink"

val sinkConfig = sinkType match {
case "PrometheusEndpointSink" => new PrometheusEndpointSinkConfig(sinkType, metricWhitelist, c)
case "InfluxDBPusherSink" => new InfluxDBPusherSinkConfig(sinkType, metricWhitelist, c)
}

val clientGroupId = c.getString("client-group-id")
val kafkaClientTimeout = c.getDuration("kafka-client-timeout").toScala
val clusters = c.getConfigList("clusters").asScala.toList.map { clusterConfig =>
@@ -66,8 +75,8 @@
)
}
val strimziWatcher = c.getString("watchers.strimzi").toBoolean
val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList
AppConfig(pollInterval, lookupTableSize, port, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher, metricWhitelist)

AppConfig(pollInterval, lookupTableSize, sinkConfig, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher)
}

// Copied from Alpakka Kafka
@@ -120,8 +129,8 @@ final case class KafkaCluster(name: String, bootstrapBrokers: String,
""".stripMargin
}
}
final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, port: Int, clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean, metricWhitelist: List[String]) {
final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, sinkConfig: SinkConfig, clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) {
override def toString(): String = {
val clusterString =
if (clusters.isEmpty)
@@ -130,8 +139,8 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
s"""
|Poll interval: $pollInterval
|Lookup table size: $lookupTableSize
|Prometheus metrics endpoint port: $port
|Prometheus metrics whitelist: [${metricWhitelist.mkString(", ")}]
|Sink: ${sinkConfig.sinkType}
|Sink metrics whitelist: [${sinkConfig.metricWhitelist.mkString(", ")}]
|Admin client consumer group id: $clientGroupId
|Kafka client timeout: $clientTimeout
|Statically defined Clusters:
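For context, a minimal sketch (not part of this diff) of how the new sink key drives the dispatch added above. The key names are the ones read by AppConfig, InfluxDBPusherSinkConfig and PrometheusEndpointSinkConfig in this pull request; the values are illustrative only, and the fragment omits the rest of the kafka-lag-exporter configuration (clusters, poll-interval, and so on).

import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

val c = ConfigFactory.parseString(
  """
    |kafka-lag-exporter {
    |  sink = "InfluxDBPusherSink"   // defaults to "PrometheusEndpointSink" when absent
    |  metric-whitelist = [".*"]
    |  end-point = "http://localhost"
    |  port = 8086
    |}
  """.stripMargin).getConfig("kafka-lag-exporter")

val whitelist = c.getStringList("metric-whitelist").asScala.toList

// Mirrors the dispatch added to AppConfig.apply above; names not covered by the match would fail.
val sinkConfig = c.getString("sink") match {
  case "InfluxDBPusherSink"     => new InfluxDBPusherSinkConfig("InfluxDBPusherSink", whitelist, c)
  case "PrometheusEndpointSink" => new PrometheusEndpointSinkConfig("PrometheusEndpointSink", whitelist, c)
}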
154 changes: 154 additions & 0 deletions src/main/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSink.scala
@@ -0,0 +1,154 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter

import com.lightbend.kafkalagexporter.MetricsSink._
import com.lightbend.kafkalagexporter.InfluxDBPusherSink.{ClusterGlobalLabels, ClusterName}
import org.influxdb.{InfluxDB, InfluxDBFactory, BatchOptions}
import org.influxdb.InfluxDB.ConsistencyLevel
import org.influxdb.dto.{Query, Point, BatchPoints}
import java.io.IOException
import java.util.function.Consumer
import java.util.function.BiConsumer
import org.influxdb.dto.QueryResult
import java.lang.Iterable
import com.typesafe.scalalogging.Logger

import scala.util.Try

object InfluxDBPusherSink
{
type ClusterName = String
type GlobalLabels = Map[String, String]
type ClusterGlobalLabels = Map[ClusterName, GlobalLabels]

def apply(sinkConfig: InfluxDBPusherSinkConfig, definitions: MetricDefinitions, clusterGlobalLabels: ClusterGlobalLabels): MetricsSink =
{
Try(new InfluxDBPusherSink(sinkConfig, definitions, clusterGlobalLabels))
.fold(t => throw new IOException("Could not create Influx DB Pusher Sink", t), sink => sink)
}
}

class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, definitions: MetricDefinitions, clusterGlobalLabels: ClusterGlobalLabels) extends MetricsSink {

val logger = Logger("InfluxDBPusherSink")
val influxDB = connect()
createDatabase()
enableBatching()

private[kafkalagexporter] val globalLabelNames: List[String] = {
clusterGlobalLabels.values.flatMap(_.keys).toList.distinct
}

override def report(m: MetricValue): Unit = {
if (sinkConfig.metricWhitelist.exists(m.definition.name.matches)) {
if (!m.value.isNaN) {
write(m)
}
}
}

def write(m: MetricValue): Unit = {
try {
val point = buildPoint(m)
if (sinkConfig.async)
writeAsync(point)
else
writeSync(point)
} catch {
case t: Throwable =>
handlingFailure(t)
}
}

def writeAsync(point: Point): Unit = {
influxDB.write(point)
}

def writeSync(point: Point): Unit = {
val batchPoints = BatchPoints
.database(sinkConfig.databaseName)
.tag("async", "true")
.consistency(ConsistencyLevel.ALL)
.build()

batchPoints.point(point)
influxDB.write(batchPoints)
}

def buildPoint(m: MetricValue): Point = {
val point = Point.measurement(m.definition.name)
val fields = m.definition.labels zip m.labels
fields.foreach { field => point.tag(field._1, field._2) }
point.addField("value", m.value)
return point.build()
}

override def remove(m: RemoveMetric): Unit = {
if (sinkConfig.metricWhitelist.exists(m.definition.name.matches))
logger.warn("Remove is not supported by InfluxDBPusherSink")
}

def enableBatching(): Unit = {
if (sinkConfig.async) {
influxDB.setDatabase(sinkConfig.databaseName)
influxDB.enableBatch(BatchOptions.DEFAULTS.exceptionHandler(createExceptionHandler()))
}
}

def connect(): InfluxDB =
{
val url = sinkConfig.endPoint + ":" + sinkConfig.port
if (!sinkConfig.username.isEmpty) return InfluxDBFactory.connect(url, sinkConfig.username, sinkConfig.password)
else return InfluxDBFactory.connect(url)
}

def createDatabase() =
{
influxDB.query(new Query("CREATE DATABASE " + sinkConfig.databaseName, sinkConfig.databaseName), successQueryHandler(), failQueryHandler())
}

def successQueryHandler(): Consumer[QueryResult] =
{
return new Consumer[QueryResult] {
override def accept(result:QueryResult): Unit = {
logger.info(result.toString())
}
}
}

def failQueryHandler(): Consumer[Throwable] =
{
return new Consumer[Throwable] {
override def accept(throwable:Throwable): Unit = {
handlingFailure(throwable)
}
}
}

def createExceptionHandler(): BiConsumer[Iterable[Point], Throwable] =
{
return new BiConsumer[Iterable[Point], Throwable] {
override def accept(failedPoints:Iterable[Point], throwable:Throwable): Unit = {
handlingFailure(throwable)
}
}
}

def handlingFailure(t: Throwable): Unit = {
logger.error("Unrecoverable exception, will stop ", t)
stop()
throw t
}

override def stop(): Unit = {
influxDB.close()
}

def getGlobalLabelValuesOrDefault(clusterName: ClusterName): List[String] = {
val globalLabelValuesForCluster = clusterGlobalLabels.getOrElse(clusterName, Map.empty)
globalLabelNames.map(l => globalLabelValuesForCluster.getOrElse(l, ""))
}
}
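As a reading aid, a hedged sketch of the kind of point buildPoint assembles, assuming the exporter's existing kafka_consumergroup_group_lag gauge and illustrative label values; the resulting line-protocol entry is shown as a comment.

import org.influxdb.dto.Point

// Written roughly as the line-protocol entry:
//   kafka_consumergroup_group_lag,cluster_name=local,group=g1,topic=t1,partition=0 value=42.0
val examplePoint = Point.measurement("kafka_consumergroup_group_lag")
  .tag("cluster_name", "local")   // each metric label becomes a tag
  .tag("group", "g1")
  .tag("topic", "t1")
  .tag("partition", "0")
  .addField("value", 42.0d)       // the gauge value is stored as a single field
  .build()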
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter

import com.typesafe.config.Config

class InfluxDBPusherSinkConfig(sinkType: String, metricWhitelist: List[String], config: Config) extends SinkConfig(sinkType, metricWhitelist, config)
{
val endPoint: String = config.getString("end-point")
val port: Int = config.getInt("port")
val defaultDatabase: String = "kafka_lag_exporter"
val databaseName: String = if (config.hasPath("database")) config.getString("database") else defaultDatabase
val username: String = if (config.hasPath("username")) config.getString("username") else ""
val password: String = if (config.hasPath("password")) config.getString("password") else ""
val async: Boolean = if (config.hasPath("async")) config.getBoolean("async") else true
}
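A small, hedged usage sketch under the assumption that only the two required keys are supplied; the host name is illustrative, and the remaining fields fall back to the defaults defined above (database "kafka_lag_exporter", empty credentials, async = true).

import com.typesafe.config.ConfigFactory

val influxConfig = new InfluxDBPusherSinkConfig(
  "InfluxDBPusherSink",
  List(".*"),
  ConfigFactory.parseString(
    """
      |end-point = "http://influxdb.example.com"
      |port = 8086
    """.stripMargin)
)
// influxConfig.databaseName == "kafka_lag_exporter", influxConfig.async == true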
11 changes: 7 additions & 4 deletions src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala
@@ -9,7 +9,6 @@ import java.util.concurrent.Executors
import akka.actor.typed.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.HTTPServer

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
@@ -31,9 +30,13 @@ object MainApp extends App {

val clientCreator = (cluster: KafkaCluster) =>
KafkaClient(cluster, appConfig.clientGroupId, appConfig.clientTimeout)(kafkaClientEc)
val server = new HTTPServer(appConfig.port)
val endpointCreator = () => PrometheusEndpointSink(Metrics.definitions, appConfig.metricWhitelist,
appConfig.clustersGlobalLabels(), server, CollectorRegistry.defaultRegistry)

val endpointCreator = () => appConfig.sinkConfig.sinkType match {
case "PrometheusEndpointSink" => PrometheusEndpointSink(appConfig.sinkConfig.asInstanceOf[PrometheusEndpointSinkConfig], Metrics.definitions,
appConfig.clustersGlobalLabels(), CollectorRegistry.defaultRegistry)
case "InfluxDBPusherSink" => InfluxDBPusherSink(appConfig.sinkConfig.asInstanceOf[InfluxDBPusherSinkConfig], Metrics.definitions,
appConfig.clustersGlobalLabels())
}

ActorSystem(
KafkaClusterManager.init(appConfig, endpointCreator, clientCreator), "kafka-lag-exporter")
@@ -24,7 +24,7 @@ object MetricsReporter {
case (context, Stop(sender)) =>
Behaviors.stopped { () =>
metricsSink.stop()
context.log.info("Gracefully stopped Prometheus metrics endpoint HTTP server")
context.log.info("Gracefully stopped metrics sink")
sender ! KafkaClusterManager.Done
}
case (context, m) =>
@@ -18,14 +18,18 @@ object PrometheusEndpointSink {
type ClusterGlobalLabels = Map[ClusterName, GlobalLabels]
type Metrics = Map[GaugeDefinition, Gauge]

def apply(definitions: MetricDefinitions, metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
server: HTTPServer, registry: CollectorRegistry): MetricsSink = {
Try(new PrometheusEndpointSink(definitions, metricWhitelist, clusterGlobalLabels, server, registry))
def apply(sinkConfig: PrometheusEndpointSinkConfig, definitions: MetricDefinitions, clusterGlobalLabels: ClusterGlobalLabels, registry: CollectorRegistry): MetricsSink = {
Try(new PrometheusEndpointSink(sinkConfig, definitions, clusterGlobalLabels, new HTTPServer(sinkConfig.port), registry))
.fold(t => throw new Exception("Could not create Prometheus Endpoint", t), sink => sink)
}

def apply(sinkConfig: PrometheusEndpointSinkConfig, definitions: MetricDefinitions, clusterGlobalLabels: ClusterGlobalLabels, server: HTTPServer, registry: CollectorRegistry): MetricsSink = {
Try(new PrometheusEndpointSink(sinkConfig, definitions, clusterGlobalLabels, server, registry))
.fold(t => throw new Exception("Could not create Prometheus Endpoint", t), sink => sink)
}
}

class PrometheusEndpointSink private(definitions: MetricDefinitions, metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
class PrometheusEndpointSink private(sinkConfig: PrometheusEndpointSinkConfig, definitions: MetricDefinitions, clusterGlobalLabels: ClusterGlobalLabels,
server: HTTPServer, registry: CollectorRegistry) extends MetricsSink {
DefaultExports.initialize()

@@ -34,24 +38,24 @@ class PrometheusEndpointSink private(definitions: MetricDefinitions, metricWhite
}

private val metrics: Metrics = {
definitions.filter(d => metricWhitelist.exists(d.name.matches)).map { d =>
definitions.filter(d => sinkConfig.metricWhitelist.exists(d.name.matches)).map { d =>
d -> Gauge.build()
.name(d.name)
.help(d.help)
.labelNames(globalLabelNames ++ d.labels: _*)
.register(registry)
}.toMap
}
}

override def report(m: MetricValue): Unit = {
if (metricWhitelist.exists(m.definition.name.matches)) {
if (sinkConfig.metricWhitelist.exists(m.definition.name.matches)) {
val metric = metrics.getOrElse(m.definition, throw new IllegalArgumentException(s"No metric with definition ${m.definition.name} registered"))
metric.labels(getGlobalLabelValuesOrDefault(m.clusterName) ++ m.labels: _*).set(m.value)
}
}

override def remove(m: RemoveMetric): Unit = {
if (metricWhitelist.exists(m.definition.name.matches)) {
if (sinkConfig.metricWhitelist.exists(m.definition.name.matches)) {
for {
gauge <- metrics.get(m.definition)
} {
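A hedged sketch of how the new four-argument apply overload might be called; port 8000, the whitelist and the cluster name are illustrative, while Metrics.definitions is the repository's existing metric catalogue. The five-argument overload, which accepts a pre-built HTTPServer, is mainly useful when the caller (for example a test) manages the server itself.

import com.typesafe.config.ConfigFactory
import io.prometheus.client.CollectorRegistry

val promConfig = new PrometheusEndpointSinkConfig(
  "PrometheusEndpointSink", List(".*"), ConfigFactory.parseString("port = 8000"))

// Builds its own HTTPServer on promConfig.port and registers the gauges with the default registry.
val promSink = PrometheusEndpointSink(
  promConfig, Metrics.definitions, Map("local-cluster" -> Map.empty[String, String]),
  CollectorRegistry.defaultRegistry)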
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter

import com.typesafe.config.Config
import java.net.ServerSocket
import scala.util.{Failure, Success, Try}

class PrometheusEndpointSinkConfig(sinkType: String, metricWhitelist: List[String], config: Config) extends SinkConfig(sinkType, metricWhitelist, config)
{
val port: Int = getPort(config)

def getPort(config: Config): Int = {
if (config.hasPath("port"))
config.getInt("port")
else
Try(new ServerSocket(0)) match {
case Success(socket) =>
val freePort = socket.getLocalPort
socket.close()
freePort
case Failure(exception) => throw exception
}
}
}
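The fallback above uses the common free-port idiom; a standalone sketch of the same trick:

import java.net.ServerSocket

// Binding to port 0 lets the OS pick any available port; the probe socket is closed
// immediately and only its port number is kept for the Prometheus HTTP endpoint.
val probe = new ServerSocket(0)
val freePort = probe.getLocalPort
probe.close()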
10 changes: 10 additions & 0 deletions src/main/scala/com/lightbend/kafkalagexporter/SinkConfig.scala
@@ -0,0 +1,10 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter
import com.typesafe.config.{Config}

abstract class SinkConfig(val sinkType: String, val metricWhitelist: List[String], val config: Config)
{
}
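To illustrate the extension point, a purely hypothetical third sink configuration; GraphiteSinkConfig and its keys are not part of this pull request.

import com.typesafe.config.Config

// Hypothetical example only: shows how SinkConfig is meant to be subclassed for a new sink.
class GraphiteSinkConfig(sinkType: String, metricWhitelist: List[String], config: Config)
  extends SinkConfig(sinkType, metricWhitelist, config) {
  val host: String = config.getString("graphite-host")
  val port: Int = config.getInt("graphite-port")
}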