diff --git a/README.md b/README.md
index 2212f86f..be91999a 100644
--- a/README.md
+++ b/README.md
@@ -1,17 +1,209 @@
-# Kafka Metrics Tools
+# Kafka Lag Exporter
 
-> A set of tools and libraries to expose Kafka performance metrics in the K8s and Prometheus ecosystem
-
-## [Kafka Lag Exporter](./kafka-lag-exporter/README.md)
+> A Kafka consumer group lag exporter for Kubernetes
 
 The Kafka Lag Exporter is a Prometheus Exporter which will calculate the consumer lag for all consumer groups running
-in a Kafka cluster. It exports several consumer group related metrics, including an extrapolation of consumer group
-lag in seconds. See kafka-lag-exporter/README.md for more information.
+in a Kafka cluster. It exports several consumer group related metrics, including an interpolation or extrapolation of
+consumer group lag in seconds.
+
+We can calculate a reasonable approximation of consumer lag in seconds by applying a linear extrapolation formula to
+predict the time at which a consumer will reach the latest available partition offset, based on previously measured
+consumer group consumed offsets for the same partition.
+
+For each poll interval we associate all the latest consumed offsets with the current system time (wall clock). After at
+least two measurements are made we can extrapolate the time at which an arbitrary future offset will be consumed. As a
+refresher, linear interpolation and extrapolation are simply ways of estimating the coordinates of a point that lies
+along a known slope. [Read this post for more details.](https://math.tutorvista.com/calculus/extrapolation.html)
+
+## Metrics
+
+The following metrics are exposed:
+
+* `kafka_consumergroup_group_lag_seconds` - Extrapolated lag in seconds for each partition.
+* `kafka_consumergroup_group_max_lag_seconds` - Max extrapolated lag in seconds for each consumer group.
+* `kafka_consumergroup_group_lag` - Lag in offsets for each partition (latest offset - last consumed offset).
+* `kafka_consumergroup_group_max_lag` - Max offset lag for each consumer group.
+* `kafka_consumergroup_group_offset` - Last consumed offset for each consumer group partition.
+* `kafka_partition_latest_offset` - Latest offset available for each partition.
+
+## Configuration
+
+Configuration details for the Helm Chart can be found in the [`values.yaml`](./charts/kafka-lag-exporter/values.yaml)
+file of the accompanying Helm Chart.
+
+## Install with Helm
+
+You can install the chart from the local filesystem.
+
+```
+helm install ./charts/kafka-lag-exporter
+```
+
+Or from the Lightbend Helm Chart repository.
+
+```
+helm repo add lightbend https://repo.lightbend.com/helm-charts
+helm install lightbend/kafka-lag-exporter
+```
+
+### Examples
+
+Install with the [Strimzi](https://strimzi.io/) Kafka discovery feature.
+See [Strimzi Kafka Cluster Watcher](#strimzi-kafka-cluster-watcher) for more details.
+
+```
+helm install ./charts/kafka-lag-exporter \
+  --name kafka-lag-exporter \
+  --namespace myproject \
+  --set watchers.strimzi=true
+```
+
+Install with a statically defined cluster at the CLI.
+
+```
+helm install ./charts/kafka-lag-exporter \
+  --name kafka-lag-exporter \
+  --namespace myproject \
+  --set clusters\[0\].name=my-cluster \
+  --set clusters\[0\].bootstrapBrokers=my-cluster-kafka-bootstrap:9092
+```
+
+Run a debug install (`DEBUG` logging, debug Helm chart install, force the Docker pull policy to `Always`).
+
+```
+helm install ./charts/kafka-lag-exporter \
+  --name kafka-lag-exporter \
+  --namespace myproject \
+  --set image.pullPolicy=Always \
+  --set logLevel=DEBUG \
+  --set clusters\[0\].name=my-cluster \
+  --set clusters\[0\].bootstrapBrokers=my-cluster-kafka-bootstrap.myproject:9092 \
+  --debug
+```
+
+### View the health endpoint
+
+To view the Prometheus health endpoint from outside your Kubernetes cluster, use `kubectl port-forward`.
+
+Ex)
+
+```
+kubectl port-forward service/kafka-lag-exporter-service 8080:8000 --namespace myproject
+```
+
+### Exporter logs
+
+To view the logs of the exporter, identify the pod name of the exporter and use the `kubectl logs` command.
+
+Ex)
+
+```
+kubectl logs {POD_ID} --namespace myproject -f
+```
+
+## Testing with local `docker-compose.yaml`
+
+A Docker Compose cluster with producers and multiple consumer groups is defined in `./docker/docker-compose.yaml`. This
+is useful to manually test the project locally, without K8s infrastructure. These images are based on the popular
+[`wurstmeister`](https://hub.docker.com/r/wurstmeister/kafka/) Apache Kafka Docker images. Make sure the version of
+these images matches the version of Kafka you wish to test against.
+
+Remove any previous volume state.
+
+```
+docker-compose rm -f
+```
+
+Start up the cluster in the foreground.
+
+```
+docker-compose up
+```
+
+## Strimzi Kafka Cluster Watcher
+
+When you install the chart with `--set watchers.strimzi=true`, the exporter will create a new `ClusterRole` and
+`ClusterRoleBinding` to allow for the automatic discovery of [Strimzi](https://strimzi.io/) Kafka clusters. The exporter will watch for
+`Kafka` resources to be created or destroyed. If the cluster already exists, or is created while the exporter is
+online, then the exporter will automatically begin to collect consumer group metadata and export it. If a `Kafka`
+resource is destroyed, it will stop collecting consumer group metadata for that cluster.
+
+The exporter will name the cluster after the `Kafka` resource's `metadata.name` field.
+
+## Grafana Dashboard
+
+A sample Grafana dashboard is provided in `./grafana/`. It can be imported into a Grafana server that is configured
+with a Prometheus datasource that is reading the Kafka Lag Exporter's Prometheus health endpoint.
+
+The dashboard contains several high-level user-configurable variables.
+
+* **Namespace** - The namespace of the Kafka cluster. Only 1 namespace can be selected at a time.
+* **Cluster Name** - The name of the Kafka cluster. Only 1 cluster name can be selected at a time.
+* **Consumer Group** - The name of the Consumer Group. This is a multi-select list which allows you to view the dashboard
+for one, several, or all consumer groups.
+
+This dashboard has several rows that are described below.
+
+* **All Consumer Group Lag** - A high-level set of 4 panels.
+  * Max lag in seconds per group
+  * Lag in seconds per group partition
+  * Max lag in offsets per group
+  * Lag in offsets per group partition
+* **Consumer Group Lag Per Group** - One panel for each consumer group that shows the sum of lag for all partitions on the
+left Y axis. The right Y axis shows the sum of the latest and last consumed offsets for all group partitions.
+* **Kafka Lag Exporter JVM Metrics** - Critical JVM metrics for the Kafka Lag Exporter itself.
+
+Example snapshot of the dashboard showing all consumer groups (2)
+
+![Kafka Lag Exporter Dashboard](./grafana/example_dashboard_snapshot.png)
+
+## Release Process
+
+1. Update the Change Log
+2. Run `./scripts/release.sh`, which will do the following:
+  * Run the `compile` and `test` targets. A pre-compile task will automatically update the version in the Helm Chart.
+  * Publish the Docker image to DockerHub at `lightbend/kafka-lag-exporter`. If not publishing to the `lightbend`
+    repository, update the `./build.sbt` file with the correct repository, or publish locally instead
+    (`sbt docker:publishLocal`).
+  * Bundle the Helm Chart into a tarball artifact. The `helm package` command will output the artifact in the CWD it is
+    executed from.
+3. Upload the tarball to a Helm Chart Repository.
+
+## Change log
+
+0.4.0
+
+* Add integration tests using Embedded Kafka #11
+* Replace lag in time implementation with interpolation table implementation #5
+* Remove `spark-event-exporter`. See the [`spark-committer`](https://github.com/lightbend/spark-committer) GitHub
+project to commit offsets in Spark Structured Streaming back to Kafka. #9
+* Implement backoff strategy for Kafka connections in Kafka Lag Exporter #6
+
+0.3.6
+
+* Add `kafka-client-timeout` config.
+* Tune retry and timeout logic of the Kafka admin client and consumer
+* Use a backoff strategy to restart the offset collection logic when transient runtime exceptions are encountered
+* Terminate when the Prometheus HTTP server can't start (i.e. the port can't be bound)
+
+0.3.1
+
+* Default partition to 0 (instead of omitting it from being reported) when a consumer group returns no offset for a
+group partition
+* Use `akkaSource` for the actor path in logging
+
+0.3.0
+
+* Bugfix: Parse `poll-interval` in seconds
+* Rename metric from `kafka_consumergroup_latest_offset` to `kafka_partition_latest_offset`
+* Use JVM 8 experimental cgroup memory awareness flags when running the exporter in a container
+* Use snake_case for metric label names
+* Sample Grafana Dashboard
+
+0.2.0
+
+* Strimzi cluster auto discovery
 
-## [Spark Event Exporter](./spark-event-exporter/README.md)
+0.1.0
 
-Spark Event Exporter is a library you can include in your Spark driver application which can output several performance
-metrics including Kafka client lag, lag in seconds, last read offset, as well as input and processed records per
-second per streaming source. See spark-event-exporter/README.md for more information.
+* Initial release
 
-This project was developed by [Sean Glover](https://github.com/seglo) at [Lightbend](https://www.lightbend.com). For a production-ready system with support for Kafka, Spark, with this project, Akka, Play, Lagom, and other tools on Kubernetes and OpenShift, see [Lightbend Platform](https://www.lightbend.com/lightbend-platform).
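+## Appendix: lag-in-seconds estimation sketch
+
+The extrapolation described at the top of this README can be illustrated with a small, self-contained Scala sketch.
+This is an illustrative sketch only, not the exporter's actual implementation (which uses an interpolation lookup
+table, as noted in the 0.4.0 change log entry), and all names in it are made up for this example.
+
+```
+// Illustrative sketch only: estimate consumer lag in seconds by linear extrapolation
+// from two consumed-offset measurements for one partition.
+object LagEstimationSketch {
+  // A consumed offset observed at wall-clock time `timestampMs`.
+  final case class Measurement(offset: Long, timestampMs: Long)
+
+  // Predict when the consumer will reach `latestOffset`, assuming it keeps consuming at
+  // the rate observed between `prev` and `curr`, and return the distance from the most
+  // recent measurement in seconds.
+  def extrapolatedLagSeconds(prev: Measurement, curr: Measurement, latestOffset: Long): Double = {
+    val dOffset = (curr.offset - prev.offset).toDouble
+    val dTime   = (curr.timestampMs - prev.timestampMs).toDouble
+    if (dOffset <= 0 || dTime <= 0) 0.0 // no progress between measurements: no meaningful prediction
+    else {
+      val msPerOffset = dTime / dOffset // slope of the time-vs-offset line
+      val predictedCatchUpTimeMs = curr.timestampMs + (latestOffset - curr.offset) * msPerOffset
+      math.max(0.0, (predictedCatchUpTimeMs - curr.timestampMs) / 1000.0)
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    // The consumer read offset 100 at t=0s and offset 200 at t=10s (10 offsets/second).
+    // The latest available offset is 250, so it should catch up roughly 5 seconds later.
+    val lag = extrapolatedLagSeconds(Measurement(100, 0L), Measurement(200, 10000L), latestOffset = 250)
+    println(f"Estimated lag: $lag%.1f seconds") // ~5.0
+  }
+}
+```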
diff --git a/build.sbt b/build.sbt index b681f190..d54f162c 100644 --- a/build.sbt +++ b/build.sbt @@ -2,62 +2,15 @@ import com.typesafe.sbt.packager.docker.{Cmd, ExecCmd} import Dependencies._ import com.typesafe.sbt.packager.docker.DockerPlugin.autoImport.{dockerCommands, dockerUsername} -lazy val root = - Project(id = "root", base = file(".")) - .settings( - name := "root", - skip in publish := true - ) - .withId("root") - .settings(commonSettings) - .aggregate( - kafkaMetricsTools, - kafkaLagExporter, -// sparkEventExporter - ) - -lazy val kafkaMetricsTools = - module("kafka-metrics-tools") - .settings( - description := "Tools to help get and report Kafka client metrics", - libraryDependencies ++= Vector( - Kafka, - AkkaTyped, - AkkaSlf4j, - Logback, - Prometheus, - PrometheusHotSpot, - PrometheusHttpServer, - DropwizardMetrics, - ScalaJava8Compat, - ScalaTest, - AkkaTypedTestKit, - MockitoScala - ) - ) - -//lazy val sparkEventExporter = -// module("spark-event-exporter") -// .dependsOn(kafkaMetricsTools % "compile->compile;test->test") -// .settings( -// description := "Spark event exporter exposes offset and throughput related metrics for Spark Streaming apps", -// libraryDependencies ++= Vector( -// Spark, -// SparkSql, -// ScalaTest, -// AkkaTypedTestKit, -// MockitoScala -// ) -// ) - lazy val updateHelmChartVersions = taskKey[Unit]("Update Helm Chart versions") lazy val kafkaLagExporter = - module("kafka-lag-exporter") - .dependsOn(kafkaMetricsTools % "compile->compile;test->test") + Project(id = "kafka-lag-exporter", base = file(".")) .enablePlugins(JavaAppPackaging) .enablePlugins(DockerPlugin) + .settings(commonSettings) .settings( + name := "kafka-lag-exporter", description := "Kafka lag exporter finds and reports Kafka consumer group lag metrics", libraryDependencies ++= Vector( LightbendConfig, @@ -66,6 +19,10 @@ lazy val kafkaLagExporter = AkkaSlf4j, Fabric8Model, Fabric8Client, + Prometheus, + PrometheusHotSpot, + PrometheusHttpServer, + ScalaJava8Compat, Logback, ScalaTest, AkkaTypedTestKit, @@ -97,9 +54,7 @@ lazy val kafkaLagExporter = ) lazy val commonSettings = Seq( - organization := "com.lightbend.kafka", - bintrayOrganization := Some("lightbend"), - bintrayRepository := "pipelines-internal", + organization := "com.lightbend.kafkalagexporter", publishMavenStyle := false, bintrayOmitLicense := true, scalaVersion := Version.Scala, @@ -119,12 +74,3 @@ lazy val commonSettings = Seq( scalacOptions in (Compile, console) := (scalacOptions in (Global)).value.filter(_ == "-Ywarn-unused-import"), scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value, ) - -def module(moduleId: String): Project = { - Project(id = moduleId, base = file(moduleId)) - .settings( - name := moduleId - ) - .withId(moduleId) - .settings(commonSettings) -} \ No newline at end of file diff --git a/kafka-lag-exporter/README.md b/kafka-lag-exporter/README.md deleted file mode 100644 index cc2d8ab8..00000000 --- a/kafka-lag-exporter/README.md +++ /dev/null @@ -1,201 +0,0 @@ -# Kafka Lag Exporter - -> A Kafka consumer group lag exporter for Kubernetes - -The Kafka Lag Exporter is a Prometheus Exporter which will calculate the consumer lag for all consumer groups running -in a Kafka cluster. It exports several consumer group related metrics, including an extrapolation of consumer group -lag in seconds. 
- -We can calculate a reasonable approximation of consumer lag in seconds by applying a linear extrapolation formula to -predict the time that a consumer will reach the latest partition offset available based on previously measured -consumer group consumed offsets for the same partition. - -For each poll interval we associate all the latest consumed offsets with current system time (wall clock). After at -least two measurements are made we can extrapolate at what time an arbitrary offset in the future will be consumed. As -a refresher, linear interpolation and extrapolation is just estimating a point on a slope and estimating its -coordinates. [Read this post for more details.](https://math.tutorvista.com/calculus/extrapolation.html) - -## Metrics - -The following metrics are exposed: - -* `kafka_consumergroup_group_lag_seconds` - Extrapolated lag in seconds for each partition. -* `kafka_consumergroup_group_max_lag_seconds` - Max extrapolated lag in seconds for each consumer group. -* `kafka_consumergroup_group_lag` - Lag in offsets for each partition (latest offset - last consumed offset). -* `kafka_consumergroup_group_max_lag` - Max offset lag for each consumer group. -* `kafka_consumergroup_group_offset` - Last consumed offset for each consumer group partition. -* `kafka_partition_latest_offset` - Latest offset available for each partition. - -## Configuration - -Details for configuration for the Helm Chart can be found in the [`values.yaml`](./charts/kafka-lag-exporter/values.yaml) -file of the accompanying Helm Chart. - -## Install with Helm - -You can install the chart from the local filesystem. - -``` -helm install ./charts/kafka-lag-exporter -``` - -Or from the lightbend Helm Chart repository. - -``` -helm repo add lightbend https://repo.lightbend.com/helm-charts -helm install lightbend/kafka-lag-exporter -``` - -### Examples - -Install with the [Strimzi](https://strimzi.io/) Kafka discovery feature. -See [Strimzi Kafka Cluster Watcher](#strimzi-kafka-cluster-watcher) for more details. - -``` -helm install ./charts/kafka-lag-exporter \ - --name kafka-lag-exporter \ - --namespace myproject \ - --set watchers.strimzi=true -``` - -Install with statically defined cluster at the CLI. - -``` -helm install ./charts/kafka-lag-exporter \ - --name kafka-lag-exporter \ - --namespace myproject \ - --set clusters\[0\].name=my-cluster \ - --set clusters\[0\].bootstrapBrokers=my-cluster-kafka-bootstrap:9092 -``` - -Run a debug install (`DEBUG` logging, debug helm chart install, force docker pull policy to `Always`). - -``` -helm install ./charts/kafka-lag-exporter \ - --name kafka-lag-exporter \ - --namespace myproject \ - --set image.pullPolicy=Always \ - --set logLevel=DEBUG \ - --set clusters\[0\].name=my-cluster \ - --set clusters\[0\].bootstrapBrokers=my-cluster-kafka-bootstrap.myproject:9092 \ - --debug -``` - -### View the health endpoint - -To view the Prometheus health endpoint from outside your Kubernetes cluster, use `kubectl port-forward`. - -Ex) - -``` -kubectl port-forward service/kafka-lag-exporter-service 8080:8000 --namespace myproject -``` - -### Exporter logs - -To view the logs of the exporter, identify the pod name of the exporter and use the `kubectl logs` command. - -Ex) - -``` -kubectl logs {POD_ID} --namespace myproject -f -``` - -## Testing with local `docker-compose.yaml` - -A Docker Compose cluster with producers and multiple consumer groups is defined in `./docker/docker-compose.yaml`. This -is useful to manually test the project locally, without K8s infrastructure. 
These images are based on the popular -[`wurstmeister`](https://hub.docker.com/r/wurstmeister/kafka/) Apache Kafka Docker images. Confirm you match up the -version of these images with the correct version of Kafka you wish to test. - -Remove any previous volume state. - -``` -docker-compose rm -f -``` - -Start up the cluster in the foreground. - -``` -docker-compose up -``` - -## Strimzi Kafka Cluster Watcher - -When you install the chart with `--set watchers.strimzi=true` then the exporter will create a new `ClusterRole` and -`ClusterRoleBinding` to allow for the automatic discovery of [Strimzi](https://strimzi.io/) Kafka clusters. The exporter will watch for -`Kafka` resources to be created or destroyed. If the cluster already exists, or was created while the exporter was -online then it will automatically begin to collect consumer group metadata and export it. If a `Kafka` resource is -destroyed then it will stop collecting consumer group metadata for that cluster. - -The exporter will name the cluster the same as `Kafka` resources `metadata.name` field. - -## Grafana Dashboard - -A sample Grafana dashboard is provided in `./grafana/`. It can be imported into a Grafana server that is configured -with a Prometheus datasource that is reading the Kafka Lag Exporter's Prometheus health endpoint. - -The dashboard contains several high level user-configurable variables. - -* **Namespace** - The namespace of the Kafka cluster. Only 1 namespace can be selected at a time. -* **Cluster Name** - The name of the Kafka cluster. Only 1 cluster name can be selected at a time. -* **Consumer Group** - The name of the Consumer Group. This is a multi-select list which allows you to view the dashboard -for 1 to All consumer groups. - -This dashboard has several rows that are described below. - -* **All Consumer Group Lag** - A high level set of 4 panels. - * Max lag in seconds per group - * Lag in seconds per group partition - * Max lag in offsets per group - * Lag in offsets per group partition -* **Consumer Group Lag Per Group** - One panel for each consumer group that shows a sum of lag for all partitions on the -left Y axis. The right Y axis has the sum of latest and last consumed offsets for all group partitions. -* **Kafka Lag Exporter JVM Metrics** - Critical JVM metrics for the Kafka Lag Exporter itself. - -Example snapshot of dashboard showing all Consumer Groups (2) - -![Kafka Lag Exporter Dashboard](./grafana/example_dashboard_snapshot.png) - -## Release Process - -1. Update the Change Log -2. Run `./scripts/release.sh` which will do the following: - * Run `compile` and `test` targets. A pre-compile task will automatically update the version in the Helm Chart. - * Publish docker image to DockerHub at `lightbend/kafka-lag-exporter`. If not publishing to `lightbend` repository, - update `./build.sbt` file with the correct repository, or publish locally instead (`sbt docker:publishLocal`). - * Bundle Helm Chart into a tarball artifact. The `helm package` command will output the artifact in the CWD it is - executed from. -3. Upload the tarball to a Helm Chart Repository. - -## Change log - -0.3.6 - -* Add `kafka-client-timeout` config. -* Tune retry and timeout logic of Kafka admin client and consumer -* Use backoff strategy restarting offset collection logic when transient runtime exceptions are encountered -* Terminate when Prometheus HTTP server can't start (i.e. 
port can't be bound) - -0.3.1 - -* Default partition to 0 (instead of omitting it from being reported) when a consumer group returns no offset for a -group partition -* Use `akkaSource` for actor path in logging - -0.3.0 - -* Bugfix: Parse `poll-interval` in seconds -* Rename metric from `kafka_consumergroup_latest_offset` to `kafka_partition_latest_offset` -* Use JVM 8 experimental cgroup memory awareness flags when running exporter in container -* Use snakecase for metric label names -* Sample Grafana Dashboard - -0.2.0 - -* Strimzi cluster auto discovery - -0.1.0 - -* Initial release - diff --git a/kafka-lag-exporter/src/main/resources/logback.xml b/kafka-lag-exporter/src/main/resources/logback.xml deleted file mode 100644 index 7b4e5175..00000000 --- a/kafka-lag-exporter/src/main/resources/logback.xml +++ /dev/null @@ -1,15 +0,0 @@ - - - - - - - %date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg %ex%n - - - - - - - - \ No newline at end of file diff --git a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/CodahaleMetricsSink.scala b/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/CodahaleMetricsSink.scala deleted file mode 100644 index 166d7734..00000000 --- a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/CodahaleMetricsSink.scala +++ /dev/null @@ -1,54 +0,0 @@ -package com.lightbend.kafka.kafkametricstools - -import com.codahale.metrics.{Gauge, MetricRegistry} -import com.lightbend.kafka.kafkametricstools.CodahaleMetricsSink.SparkGauge -import com.lightbend.kafka.kafkametricstools.MetricsSink.{Metric, MetricDefinitions} - -object CodahaleMetricsSink { - def apply(registry: MetricRegistry, definitions: MetricDefinitions, newMetricRegistered: () => Unit): MetricsSink = - new CodahaleMetricsSink(registry, definitions, newMetricRegistered) - - class SparkGauge extends Gauge[Double] { - private var _value: Double = 0 - def setValue(value: Double): Unit = _value = value - override def getValue: Double = _value - } -} - -class CodahaleMetricsSink private(registry: MetricRegistry, definitions: MetricDefinitions, newMetricRegistered: () => Unit) - extends MetricsSink { - - private def upsertGauge(metricType: Class[_], labels: List[String]): SparkGauge = { - def newGauge(name: String): SparkGauge = { - val gauge = new SparkGauge - registry.register(name, gauge) - newMetricRegistered() - gauge - } - - val defn = definitions.getOrElse(metricType, throw new IllegalArgumentException(s"No metric with type $metricType defined")) - val metricName = encodeNameWithLabels(labels, defn) - - if (registry.getGauges.containsKey(metricName)) - registry.getGauges().get(metricName).asInstanceOf[SparkGauge] - else - newGauge(metricName) - } - - /** - * Encodes label names and values into the metric name to make parsing easier downstream. - * i.e. 
label_one=some-value,label_two=some-other.value,label_three=yet-another-value - */ - private def encodeNameWithLabels( - labels: List[String], - defn: MetricsSink.GaugeDefinition - ) = { - defn.name + "," + defn.label - .zip(labels) - .map { case (name, value) => s"$name=$value" } - .mkString(",") - } - override def report(m: Metric): Unit = { - upsertGauge(m.getClass, m.labels).setValue(m.value) - } -} diff --git a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/Config.scala b/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/Config.scala deleted file mode 100644 index fee57d99..00000000 --- a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/Config.scala +++ /dev/null @@ -1,11 +0,0 @@ -package com.lightbend.kafka.kafkametricstools - -import scala.concurrent.duration.FiniteDuration - -trait SimpleConfig { - def pollInterval: FiniteDuration - def port: Int - def clientGroupId: String -} - -final case class KafkaCluster(name: String, bootstrapBrokers: String) diff --git a/kafka-metrics-tools/src/test/scala/com/lightbend/kafka/kafkametricstools/CodahaleMetricsSinkSpec.scala b/kafka-metrics-tools/src/test/scala/com/lightbend/kafka/kafkametricstools/CodahaleMetricsSinkSpec.scala deleted file mode 100644 index 0480b07a..00000000 --- a/kafka-metrics-tools/src/test/scala/com/lightbend/kafka/kafkametricstools/CodahaleMetricsSinkSpec.scala +++ /dev/null @@ -1,47 +0,0 @@ -package com.lightbend.kafka.kafkametricstools -import com.codahale.metrics.MetricRegistry -import com.lightbend.kafka.kafkametricstools.MetricsSink.{GaugeDefinition, Message, Metric, MetricDefinitions} -import org.scalatest.{FlatSpec, Matchers} - -import scala.collection.JavaConverters._ - -class CodahaleMetricsSinkSpec extends FlatSpec with Matchers { - case class TestMetric(clusterName: String, providedName: String, topic: String, partition: Int, value: Double) - extends Message with Metric { - override def labels: List[String] = - List( - clusterName, - providedName, - topic, - partition.toString - ) - } - val metricDefinitions: MetricDefinitions = Map( - classOf[TestMetric] -> GaugeDefinition( - "test_metric", - "Test Metric", - "cluster_name", "provided_name", "topic", "partition" - ) - ) - "CodahaleMetricsSink" should "delimit metric labels correctly" in { - - val newMetricRegistered = () => {} - val registry = new MetricRegistry - - val sink = CodahaleMetricsSink( - registry, - metricDefinitions, - newMetricRegistered - ) - - val m = TestMetric("pipelines-strimzi", "application.streamlet.inlet", "application.otherstreamlet.outlet", 0, 47) - - sink.report(m) - - val (gaugeName, gauge) = registry.getGauges().asScala.head - - gauge shouldNot be(null) - gauge.getValue should be(47) - gaugeName should be("test_metric,cluster_name=pipelines-strimzi,provided_name=application.streamlet.inlet,topic=application.otherstreamlet.outlet,partition=0") - } -} diff --git a/lib/jmx-prometheus-exporter-config.yaml b/lib/jmx-prometheus-exporter-config.yaml deleted file mode 100644 index c09b3eaa..00000000 --- a/lib/jmx-prometheus-exporter-config.yaml +++ /dev/null @@ -1,15 +0,0 @@ ---- -startDelaySeconds: 10 -lowercaseOutputName: true -lowercaseOutputLabelNames: true -#whitelistObjectNames: ["org.apache.cassandra.metrics:*"] -#blacklistObjectNames: ["org.apache.cassandra.metrics:type=ColumnFamily,*"] -#rules: -#- pattern: 'org.apache.cassandra.metrics<>Value: (\d+)' -# name: cassandra_$1_$2 -# value: $3 -# valueFactor: 0.001 -# labels: {} -# help: "Cassandra metric $1 $2" -# 
type: GAUGE -# attrNameSnakeCase: false \ No newline at end of file diff --git a/lib/jmx_prometheus_javaagent-0.3.1.jar b/lib/jmx_prometheus_javaagent-0.3.1.jar deleted file mode 100644 index 1408b9f3..00000000 Binary files a/lib/jmx_prometheus_javaagent-0.3.1.jar and /dev/null differ diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 258c3381..0c2b33ad 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -5,17 +5,23 @@ object Version { val Akka = "2.5.20" val Prometheus = "0.5.0" val Fabric8 = "4.1.0" - val Spark = "2.4.0" } object Dependencies { - val LightbendConfig = "com.typesafe" % "config" % "1.3.2" /* * Dependencies on com.fasterxml.jackson.core:jackson-databind and com.fasterxml.jackson.core:jackson-core - * conflict with Spark, so in spark-event-exporter exclude all transient dependencies in the - * com.fasterxml.jackson.core organization. + * conflict with Spark and Kafka, so in spark-committer exclude all transient dependencies in the + * com.fasterxml.jackson.core organization from Kafka deps. + * + * Kafka dependencies on Java logging frameworks interfere with correctly setting app logging in spark-committer's + * integration tests. */ - val Kafka = "org.apache.kafka" %% "kafka" % "2.1.0" excludeAll ExclusionRule("com.fasterxml.jackson.core") + private val jacksonExclusionRule = ExclusionRule("com.fasterxml.jackson.core") + private val log4jExclusionRule = ExclusionRule("log4j") + private val slf4jExclusionRule = ExclusionRule("org.slf4j") + + val LightbendConfig = "com.typesafe" % "config" % "1.3.2" + val Kafka = "org.apache.kafka" %% "kafka" % "2.1.0" excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule) val AkkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % Version.Akka val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka val Logback = "ch.qos.logback" % "logback-classic" % "1.2.3" @@ -24,14 +30,11 @@ object Dependencies { val PrometheusHttpServer = "io.prometheus" % "simpleclient_httpserver" % Version.Prometheus val Fabric8Model = "io.fabric8" % "kubernetes-model" % Version.Fabric8 val Fabric8Client = "io.fabric8" % "kubernetes-client" % Version.Fabric8 - val DropwizardMetrics = "io.dropwizard.metrics" % "metrics-core" % "3.1.5" - val Spark = "org.apache.spark" %% "spark-core" % Version.Spark - val SparkSql = "org.apache.spark" %% "spark-sql" % Version.Spark val ScalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0" val ScalaTest = "org.scalatest" %% "scalatest" % "3.0.5" % Test val AkkaTypedTestKit = "com.typesafe.akka" %% "akka-actor-testkit-typed" % Version.Akka % Test val MockitoScala = "org.mockito" %% "mockito-scala" % "1.0.8" % Test - val AlpakkaKafkaTestKit = "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "1.0.1" % Test + val AlpakkaKafkaTestKit = "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "1.0.1" % Test excludeAll(jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule) val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.8" % Test } \ No newline at end of file diff --git a/spark-event-exporter/README.md b/spark-event-exporter/README.md deleted file mode 100644 index 274aefd3..00000000 --- a/spark-event-exporter/README.md +++ /dev/null @@ -1,50 +0,0 @@ -# Spark Event Exporter - -> Exports metrics from data in Spark's `StreamingQueryListener` events - -Spark Event Exporter is a library you can include in your Spark driver application that can output several performance -metrics including Kafka client lag, lag in seconds, last read 
offset, as well as input and processed records per -second per streaming source. - -Spark Event Exporter uses the Spark `StreamingQueryListener` to obtain this event data that can then be directly -exported as a Prometheus endpoint, or integrated into Spark's own metrics system as custom metrics for the Spark -driver application. - -The following metrics are exported. - -* `spark_kafka_last_offset` - The last offset read for a partition. -* `spark_kafka_last_offset_lag` - The lag for a partition. -* `spark_kafka_last_offset_lag_seconds` - The extrapolated lag in seconds for a partition. -* `spark_kafka_input_records_per_second` - The input records per second for a Spark streaming source. -* `spark_kafka_processed_records_per_second` - The processed records per second for a Spark streaming source. - -## Documentation TODO: - -- Configuration -- Metrics sink configuration -- Metric labels - -## Building Yourself - -[SBT] is used to build this project. - -* Run `sbt test` to compile everything and run the tests -* Run `sbt package` to create the binary artifacts -* Run `sbt publishLocal` to create artifacts and publish to your local Ivy repository -* Run `sbt publishM2` to create artifacts and publish to your local Maven repository -* Run `sbt release` to upgrade to the next version and publish Ivy artifacts to bintray - -## Change log - -0.3.6 - -* Final refinements before open sourcing the project. - -0.3.5 - -* Encode labels into Codahale metric names using key=value convention to make them easier to parse. - -0.3.2 - -* Initial release - diff --git a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/Config.scala b/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/Config.scala deleted file mode 100644 index 5c414b22..00000000 --- a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/Config.scala +++ /dev/null @@ -1,46 +0,0 @@ -package com.lightbend.kafka.sparkeventexporter -import java.util.UUID - -import com.lightbend.kafka.kafkametricstools.KafkaCluster -import org.apache.spark.SparkEnv -import org.apache.spark.sql.SparkSession -import scala.concurrent.duration._ - -sealed trait MetricsSinkConfig - -/** - * Exposes an internal prometheus HTTP metrics endpoint - */ -final case class PrometheusEndpointSinkConfig(port: Int = 8080) extends MetricsSinkConfig - -/** - * Uses Spark's existing metrics system. This will result in a lack of fidelity in terms of the number of labels/tags - * that can be expressed per metric. 
- */ -case object SparkMetricsSinkConfig extends MetricsSinkConfig - -final case class Config( - cluster: KafkaCluster, - providedName: String, - sparkSession: SparkSession, - sparkEnv: SparkEnv, - metricsSink: MetricsSinkConfig, - kafkaClientTimeout: FiniteDuration = 10 seconds, - clientGroupId: String = s"spark-event-exporter-${UUID.randomUUID()}" - ) { - require(cluster.bootstrapBrokers != null && cluster.bootstrapBrokers != "", - "You must provide the Kafka bootstrap brokers connection string") - require(sparkSession != null, "You must provide a SparkSession object") - - override def toString: String = { - s""" - |Kafka cluster: - | Name: ${cluster.name} - | Bootstrap brokers: ${cluster.bootstrapBrokers} - |Provided name: $providedName - |Metrics sink: $metricsSink - |Kafka client timeout ms: ${kafkaClientTimeout.toMillis} - |Client consumer group id: $clientGroupId - """.stripMargin - } -} diff --git a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/SparkEventExporter.scala b/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/SparkEventExporter.scala deleted file mode 100644 index de949014..00000000 --- a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/SparkEventExporter.scala +++ /dev/null @@ -1,69 +0,0 @@ -package com.lightbend.kafka.sparkeventexporter - -import java.util.concurrent.Executors - -import akka.actor.typed.{ActorSystem, Terminated} -import com.lightbend.kafka.kafkametricstools.{CodahaleMetricsSink, KafkaClient, MetricsSink, PrometheusEndpointSink} -import com.lightbend.kafka.sparkeventexporter.internal.{ExporterManager, Metrics} -import org.apache.spark.lightbend.sparkeventexporter.SparkEventExporterSource - -import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext} - -object SparkEventExporter { - /** - * Create a new SparkEventExporter instance with the provided configuration - */ - def apply(config: Config): SparkEventExporter = new SparkEventExporter(config) - def initialize(config: Config): SparkEventExporter = apply(config) -} - -final class SparkEventExporter(config: Config) { - import config._ - - // Cached thread pool for various Kafka calls for non-blocking I/O - private val kafkaClientEc = ExecutionContext.fromExecutor(Executors.newCachedThreadPool()) - - private val clientCreator = () => KafkaClient(cluster.bootstrapBrokers, clientGroupId, kafkaClientTimeout)(kafkaClientEc) - - private val metricsSinkCreator = () => metricsSink match { - case PrometheusEndpointSinkConfig(port) => PrometheusEndpointSink(port, Metrics.metricDefinitions) - case SparkMetricsSinkConfig => createCodahaleMetricsSink - } - - private val system = ActorSystem(ExporterManager.init(config, cluster, metricsSinkCreator, clientCreator), "spark-event-exporter") - - private def createCodahaleMetricsSink: MetricsSink = { - val sparkEventSource = new SparkEventExporterSource() - - /* - * Due to limitations with the dropwizard metrics library we must encode metric labels such as "topic" and "partition" - * into the metric name itself, because labels/tags are naturally supported. This requires us to register metrics - * at runtime, but they're only added to Spark's internal metric registry when a metric `Source` is registered. - * Therefore we have to re-register the whole set of metrics, in order to pick up new metrics that are created. This - * can be done by removing and registering the source again each time a new metric is added. 
- * - * This could be optimized by re-registering metrics based on a timer, but since an application's is likely only - * going to generate new metrics at the beginning of its lifecycle it probably doesn't have much of a performance - * impact. - */ - val newMetricRegistered = () => { - sparkEnv.metricsSystem.removeSource(sparkEventSource) - sparkEnv.metricsSystem.registerSource(sparkEventSource) - } - - CodahaleMetricsSink( - sparkEventSource.metricRegistry, - Metrics.metricDefinitions, - newMetricRegistered - ) - } - - /** - * Synchronous call to stop the SparkEventExporter actor system - */ - def stop(timeout: FiniteDuration = 5 seconds): Terminated = { - system ! ExporterManager.Stop - Await.result(system.whenTerminated, timeout) - } -} diff --git a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/Domain.scala b/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/Domain.scala deleted file mode 100644 index 63c47c31..00000000 --- a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/Domain.scala +++ /dev/null @@ -1,12 +0,0 @@ -package com.lightbend.kafka.sparkeventexporter.internal -import com.lightbend.kafka.kafkametricstools.Domain.{Measurements, TopicPartition} - -object Domain { - final case class SourceMetrics( - inputRecordsPerSecond: Double, - outputRecordsPerSecond: Double, - endOffsets: Map[TopicPartition, Measurements.Single] - ) - - final case class Query(sparkAppId: String, timestamp: Long, sourceMetrics: List[SourceMetrics]) -} diff --git a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/ExporterManager.scala b/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/ExporterManager.scala deleted file mode 100644 index 389ff5d5..00000000 --- a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/ExporterManager.scala +++ /dev/null @@ -1,49 +0,0 @@ -package com.lightbend.kafka.sparkeventexporter.internal -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy} -import com.lightbend.kafka.kafkametricstools.KafkaClient.KafkaClientContract -import com.lightbend.kafka.kafkametricstools.{KafkaCluster, MetricsReporter, MetricsSink} -import com.lightbend.kafka.sparkeventexporter.Config -import com.lightbend.kafka.sparkeventexporter.internal.MetricCollector.CollectorState -import org.apache.spark.sql.streaming.StreamingQueryListener -import scala.concurrent.duration._ - -object ExporterManager { - sealed trait Message - sealed trait Stop extends Message - final case object Stop extends Stop - - def init( - config: Config, - cluster: KafkaCluster, - metricsSinkCreator: () => MetricsSink, - clientCreator: () => KafkaClientContract): Behavior[Message] = - Behaviors.supervise[Message] { - Behaviors.setup[Message] { context => - context.log.info("Starting Spark Events Exporter with configuration: \n{}", config) - - val metricsSinkInst = metricsSinkCreator() - val reporter: ActorRef[MetricsSink.Message] = - context.spawn(MetricsReporter.init(metricsSinkInst), "lag-reporter") - val collectorState = CollectorState(config.providedName, cluster) - val collector: ActorRef[MetricCollector.Message] = context.spawn( - MetricCollector.init(collectorState, clientCreator, reporter),"offset-collector") - - val listener: StreamingQueryListener = - MetricsStreamingQueryListener(collector) - config.sparkSession.streams.addListener(listener) - - main(reporter, 
collector) - } - }.onFailure(SupervisorStrategy.restartWithBackoff(200 milliseconds, 30 seconds, 0.2)) - - def main( - reporter: ActorRef[MetricsSink.Message], - collector: ActorRef[MetricCollector.Message]): Behavior[Message] = Behaviors.receive { - case (context, _: Stop) => - context.log.info("Attempting graceful shutdown") - collector ! MetricCollector.Stop - reporter ! MetricsSink.Stop - Behaviors.stopped - } -} diff --git a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/MetricCollector.scala b/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/MetricCollector.scala deleted file mode 100644 index 9007d460..00000000 --- a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/MetricCollector.scala +++ /dev/null @@ -1,142 +0,0 @@ -package com.lightbend.kafka.sparkeventexporter.internal -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ActorRef, Behavior, PostStop} -import com.lightbend.kafka.kafkametricstools.Domain.{Measurements, PartitionOffsets, TopicPartition} -import com.lightbend.kafka.kafkametricstools.KafkaClient.KafkaClientContract -import com.lightbend.kafka.kafkametricstools.{KafkaCluster, MetricsSink} -import com.lightbend.kafka.sparkeventexporter.internal.Domain.{Query, SourceMetrics} - -import scala.util.{Failure, Success} - -object MetricCollector { - sealed trait Message - sealed trait Stop extends Message - final case object Stop extends Stop - - final case class QueryResults(query: Query) extends Message - - final case class NewOffsets( - sparkAppId: String, - sourceMetrics: List[SourceMetrics], - latestOffsets: PartitionOffsets, - lastOffsets: PartitionOffsets, - ) extends Message - - - final case class CollectorState( - name: String, - cluster: KafkaCluster, - latestOffsets: PartitionOffsets = PartitionOffsets(), - lastOffsets: PartitionOffsets = PartitionOffsets(), - ) - - def init(state: CollectorState, - clientCreator: () => KafkaClientContract, - reporter: ActorRef[MetricsSink.Message]): Behavior[Message] = Behaviors.setup { _ => - collector(clientCreator(), reporter, state) - } - - def collector( - client: KafkaClientContract, - reporter: ActorRef[MetricsSink.Message], - state: CollectorState - ): Behavior[Message] = Behaviors.receive { - case (context, queryResults: QueryResults) => - context.log.debug(s"Processing Spark Event Metrics:\n$queryResults") - - val sourceMetrics = queryResults.query.sourceMetrics - val sparkAppId = queryResults.query.sparkAppId - - val topicPartitions: Set[TopicPartition] = for { - source: Domain.SourceMetrics <- sourceMetrics.toSet - (topicPartition, _) <- source.endOffsets - } yield topicPartition - - val lastOffsets = sourceMetrics.foldLeft(PartitionOffsets()) { (acc, source) => - acc ++ source.endOffsets - } - - client.getLatestOffsets(queryResults.query.timestamp, topicPartitions) match { - case Success(latestOffsets) => - val newOffsets = NewOffsets(sparkAppId, sourceMetrics, latestOffsets, lastOffsets) - context.self ! newOffsets - case Failure(e) => - context.log.error(e, "An error occurred while retrieving latest offsets") - context.self ! 
Stop - } - - Behaviors.same - case (context, newOffsets: NewOffsets) => - val mergedState = mergeState(state, newOffsets) - - context.log.debug("Reporting Metrics") - reportThroughputMetrics(newOffsets.sparkAppId, newOffsets.sourceMetrics, state, reporter) - //reportLatestOffsetMetrics(newOffsets.sparkAppId, mergedState, reporter) - reportConsumerMetrics(newOffsets.sparkAppId, mergedState, reporter) - - collector(client, reporter, mergedState) - case (context, _: Stop) => - Behaviors.stopped { - Behaviors.receiveSignal { - case (_, PostStop) => - client.close() - context.log.info("Gracefully stopped polling and Kafka client for cluster: {}", state.cluster.name) - Behaviors.same - } - } - } - - private def reportThroughputMetrics( - sparkAppId: String, - sourceMetrics: List[SourceMetrics], - state: CollectorState, - reporter: ActorRef[MetricsSink.Message]): Unit = { - for(sourceMetric <- sourceMetrics) { - /* - * A Spark Query subscription could contain more than 1 topic, but throughput is only available per Source - */ - val sourceTopics = sourceMetric.endOffsets.keys.map(_.topic).toSet.mkString(",") - reporter ! Metrics.InputRecordsPerSecondMetric(state.cluster.name, sparkAppId, state.name, sourceTopics, sourceMetric.inputRecordsPerSecond) - reporter ! Metrics.ProcessedRecordsPerSecondMetric(state.cluster.name, sparkAppId, state.name, sourceTopics, sourceMetric.outputRecordsPerSecond) - } - } - - private def reportConsumerMetrics( - sparkAppId: String, - state: CollectorState, - reporter: ActorRef[MetricsSink.Message] - ): Unit = { - for { - (tp, measurement: Measurements.Double) <- state.lastOffsets - latestOffset <- state.latestOffsets.get(tp) - } { - val offsetLag = measurement.offsetLag(latestOffset.offset) - val timeLag = measurement.timeLag(latestOffset.offset) - - reporter ! Metrics.LastOffsetMetric(state.cluster.name, sparkAppId, state.name, tp, measurement.b.offset) - reporter ! Metrics.OffsetLagMetric(state.cluster.name, sparkAppId, state.name, tp, offsetLag) - reporter ! Metrics.TimeLagMetric(state.cluster.name, sparkAppId, state.name, tp, timeLag) - } - } - -// private def reportLatestOffsetMetrics( -// sparkAppId: String, -// state: CollectorState, -// reporter: ActorRef[MetricsSink.Message] -// ): Unit = { -// for ((tp, measurement: Measurements.Single) <- state.latestOffsets) -// reporter ! 
Metrics.LatestOffsetMetric(state.cluster.name, sparkAppId, state.name, tp, measurement.offset) -// } - - private def mergeState(state: CollectorState, newOffsets: NewOffsets): CollectorState = { - val mergedOffsets: PartitionOffsets = for { - (topicPartition, newMeasurement: Measurements.Single) <- newOffsets.lastOffsets - } yield { - topicPartition -> state.lastOffsets - .get(topicPartition) - .map(measurement => measurement.addMeasurement(newMeasurement)) - .getOrElse(newMeasurement) - } - state.copy(latestOffsets = newOffsets.latestOffsets, lastOffsets = mergedOffsets) - } -} diff --git a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/Metrics.scala b/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/Metrics.scala deleted file mode 100644 index 2e1407ad..00000000 --- a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/Metrics.scala +++ /dev/null @@ -1,75 +0,0 @@ -package com.lightbend.kafka.sparkeventexporter.internal -import com.lightbend.kafka.kafkametricstools.Domain.TopicPartition -import com.lightbend.kafka.kafkametricstools.MetricsSink.{GaugeDefinition, Message, Metric, MetricDefinitions} - -object Metrics { - sealed trait SparkOffsetMetric extends Message with Metric { - def clusterName: String - def sparkAppId: String - def name: String - def topicPartition: TopicPartition - - override def labels: List[String] = - List( - clusterName, - name, - topicPartition.topic, - topicPartition.partition.toString - ) - } - - final case class LatestOffsetMetric(clusterName: String, sparkAppId: String, name: String, topicPartition: TopicPartition, value: Double) extends SparkOffsetMetric - final case class LastOffsetMetric(clusterName: String, sparkAppId: String, name: String, topicPartition: TopicPartition, value: Double) extends SparkOffsetMetric - final case class OffsetLagMetric(clusterName: String, sparkAppId: String, name: String, topicPartition: TopicPartition, value: Double) extends SparkOffsetMetric - final case class TimeLagMetric(clusterName: String, sparkAppId: String, name: String, topicPartition: TopicPartition, value: Double) extends SparkOffsetMetric - - sealed trait SparkThroughputMetric extends Message with Metric { - def clusterName: String - def sparkAppId: String - def name: String - def sourceTopics: String - - override def labels: List[String] = - List( - clusterName, - name, - sourceTopics - ) - } - - final case class InputRecordsPerSecondMetric(clusterName: String, sparkAppId: String, name: String, sourceTopics: String, value: Double) extends SparkThroughputMetric - final case class ProcessedRecordsPerSecondMetric(clusterName: String, sparkAppId: String, name: String, sourceTopics: String, value: Double) extends SparkThroughputMetric - - val metricDefinitions: MetricDefinitions = Map( - classOf[LatestOffsetMetric] -> GaugeDefinition( - "spark_kafka_partition_latest_offset", - "Latest offset of a partition", - "cluster_name", "provided_name", "topic", "partition" - ), - classOf[LastOffsetMetric] -> GaugeDefinition( - "spark_kafka_last_offset", - "Last consumed offset of a partition", - "cluster_name", "provided_name", "topic", "partition" - ), - classOf[OffsetLagMetric] -> GaugeDefinition( - "spark_kafka_last_offset_lag", - "Last consumed offset lag of a partition", - "cluster_name", "provided_name", "topic", "partition" - ), - classOf[TimeLagMetric] -> GaugeDefinition( - "spark_kafka_last_offset_lag_seconds", - "Last consumed offset time lag of a partition", - 
"cluster_name", "provided_name", "topic", "partition" - ), - classOf[InputRecordsPerSecondMetric] -> GaugeDefinition( - "spark_kafka_input_records_per_second", - "Input records per second for a source", - "cluster_name", "provided_name", "source_topics" - ), - classOf[ProcessedRecordsPerSecondMetric] -> GaugeDefinition( - "spark_kafka_processed_records_per_second", - "Processed records per second for a source", - "cluster_name", "provided_name", "source_topics" - ) - ) -} diff --git a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/MetricsStreamingQueryListener.scala b/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/MetricsStreamingQueryListener.scala deleted file mode 100644 index 16f57027..00000000 --- a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/MetricsStreamingQueryListener.scala +++ /dev/null @@ -1,19 +0,0 @@ -package com.lightbend.kafka.sparkeventexporter.internal - -import akka.actor.typed.ActorRef -import org.apache.spark.sql.streaming.StreamingQueryListener -import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent} - -object MetricsStreamingQueryListener { - def apply(collector: ActorRef[MetricCollector.Message]): MetricsStreamingQueryListener = - new MetricsStreamingQueryListener(collector) -} - -final class MetricsStreamingQueryListener private[internal](collector: ActorRef[MetricCollector.Message]) extends StreamingQueryListener() { - override def onQueryStarted(event: QueryStartedEvent): Unit = () - override def onQueryTerminated(event: QueryTerminatedEvent): Unit = () - override def onQueryProgress(event: QueryProgressEvent): Unit = { - val results = SparkEventAdapter.parseProgress(event.progress) - collector ! MetricCollector.QueryResults(results) - } -} diff --git a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/SparkEventAdapter.scala b/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/SparkEventAdapter.scala deleted file mode 100644 index 3f16fa47..00000000 --- a/spark-event-exporter/src/main/scala/com/lightbend/kafka/sparkeventexporter/internal/SparkEventAdapter.scala +++ /dev/null @@ -1,118 +0,0 @@ -package com.lightbend.kafka.sparkeventexporter.internal -import java.time.Instant - -import com.lightbend.kafka.kafkametricstools.Domain.{Measurements, TopicPartition} -import com.lightbend.kafka.sparkeventexporter.internal.Domain.{Query, SourceMetrics} -import org.apache.spark.sql.streaming.{SourceProgress, StreamingQueryProgress} -import org.json4s.JsonAST.{JInt, JObject} -import org.json4s.jackson.JsonMethods.parseOpt - -import scala.util.Try - -object SparkEventAdapter { - /** - * Parse a `org.apache.spark.sql.streaming.StreamingQueryProgress` for relevant metric labels and values. 
- * - * An example of this JSON could be - * - * ``` - * { - * "id" : "9a8b3024-d197-4689-9075-43c312e33637", - * "runId" : "1b88089d-b62a-4194-89db-2e22708c0fad", - * "name" : null, - * "timestamp" : "2019-02-04T18:22:08.230Z", - * "batchId" : 1, - * "numInputRows" : 1832, - * "inputRowsPerSecond" : 48.09156297579671, - * "processedRowsPerSecond" : 184.9384211588936, - * "durationMs" : { - * "addBatch" : 9291, - * "getBatch" : 3, - * "getEndOffset" : 1, - * "queryPlanning" : 391, - * "setOffsetRange" : 17, - * "triggerExecution" : 9906, - * "walCommit" : 173 - * }, - * "eventTime" : { - * "avg" : "2019-02-04T18:21:52.239Z", - * "max" : "2019-02-04T18:22:07.000Z", - * "min" : "2019-02-04T18:21:38.000Z", - * "watermark" : "2019-02-04T18:20:38.000Z" - * }, - * "stateOperators" : [ { - * "numRowsTotal" : 3, - * "numRowsUpdated" : 2, - * "memoryUsedBytes" : 113431, - * "customMetrics" : { - * "loadedMapCacheHitCount" : 400, - * "loadedMapCacheMissCount" : 0, - * "stateOnCurrentVersionSizeBytes" : 22047 - * } - * } ], - * "sources" : [ { - * "description" : "KafkaV2[Subscribe[call-record-pipeline-seglo.cdr-validator.out-1]]", - * "startOffset" : { - * "call-record-pipeline-seglo.cdr-validator.out-1" : { - * "0" : 12382, - * "1" : 12126, - * } - * }, - * "endOffset" : { - * "call-record-pipeline-seglo.cdr-validator.out-1" : { - * "0" : 12428, - * "1" : 12156, - * } - * }, - * "numInputRows" : 1832, - * "inputRowsPerSecond" : 48.09156297579671, - * "processedRowsPerSecond" : 184.9384211588936 - * } ], - * "sink" : { - * "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider@6598181e" - * } - * ``` - */ - def parseProgress(qp: StreamingQueryProgress): Query = { - val sparkQueryId: String = qp.id.toString - val timestamp: Long = Instant.parse(qp.timestamp).toEpochMilli - - val sourceMetrics = qp.sources.toList.map { sp: SourceProgress => - val endOffsets = parseEndOffsets(sp.endOffset, timestamp) - SourceMetrics(sp.inputRowsPerSecond, sp.processedRowsPerSecond, endOffsets) - } - - - Query(sparkQueryId, timestamp, sourceMetrics) - } - - /** - * Parse the `endOffsets` JSON happy path from the Spark `org.apache.spark.sql.streaming.SourceProgress` object. 
- * - * An example of this JSON could be: - * - * ``` - * { - * "call-record-pipeline-seglo.cdr-validator.out-1" : { - * "0" : 12477, - * "1" : 12293, - * "2" : 11274 - * } - * } - * ``` - */ - def parseEndOffsets(endOffsetsJson: String, timestamp: Long): Map[TopicPartition, Measurements.Single] = { - val endOffsets = parseOpt(endOffsetsJson) - - val lastOffsets = for { - JObject(topicJson) <- endOffsets.toList - (topic, JObject(offsetsJson)) <- topicJson - (partitionString, JInt(offsetBigInt)) <- offsetsJson - offset = offsetBigInt.toLong - partition <- Try(partitionString.toInt).toOption - } - yield TopicPartition(topic, partition) -> Measurements.Single(offset, timestamp) - - lastOffsets.toMap - } -} diff --git a/spark-event-exporter/src/main/scala/org/apache/spark/lightbend/sparkeventexporter/SparkEventExporterSource.scala b/spark-event-exporter/src/main/scala/org/apache/spark/lightbend/sparkeventexporter/SparkEventExporterSource.scala deleted file mode 100644 index 265c10ce..00000000 --- a/spark-event-exporter/src/main/scala/org/apache/spark/lightbend/sparkeventexporter/SparkEventExporterSource.scala +++ /dev/null @@ -1,11 +0,0 @@ -package org.apache.spark.lightbend.sparkeventexporter - -import com.codahale.metrics.MetricRegistry -import org.apache.spark.metrics.source.Source - -class SparkEventExporterSource extends Source with Serializable { - private val _metricRegistry = new MetricRegistry - override def sourceName: String = "SparkEventExporter" - override def metricRegistry: MetricRegistry = _metricRegistry -} - diff --git a/spark-event-exporter/src/test/scala/com/lightbend/kafka/sparkeventexporter/internal/MetricCollectorSpec.scala b/spark-event-exporter/src/test/scala/com/lightbend/kafka/sparkeventexporter/internal/MetricCollectorSpec.scala deleted file mode 100644 index 0120507f..00000000 --- a/spark-event-exporter/src/test/scala/com/lightbend/kafka/sparkeventexporter/internal/MetricCollectorSpec.scala +++ /dev/null @@ -1,75 +0,0 @@ -package com.lightbend.kafka.sparkeventexporter.internal - -import akka.actor.testkit.typed.scaladsl.{BehaviorTestKit, TestInbox} -import com.lightbend.kafka.kafkametricstools -import com.lightbend.kafka.kafkametricstools.Domain._ -import com.lightbend.kafka.kafkametricstools.KafkaClient.KafkaClientContract -import com.lightbend.kafka.kafkametricstools.{KafkaCluster, MetricsSink} -import com.lightbend.kafka.sparkeventexporter.internal.Domain.SourceMetrics -import org.mockito.MockitoSugar -import org.scalatest.{Matchers, _} - -class MetricCollectorSpec extends FreeSpec with Matchers with kafkametricstools.TestData with MockitoSugar { - val client: KafkaClientContract = mock[KafkaClientContract] - val sparkAppId = "my-spark-id-uuid" - - "MetricCollectorSpec should send" - { - val reporter = TestInbox[MetricsSink.Message]() - - val state = MetricCollector.CollectorState( - name = "my-app-foo", - cluster = KafkaCluster("kafka-cluster-name", ""), - latestOffsets = PartitionOffsets() + (topicPartition0 -> Measurements.Single(offset = 100, timestamp = 100)), - lastOffsets = PartitionOffsets() + (topicPartition0 -> Measurements.Single(offset = 90, timestamp = 100)) - ) - - val providedName = state.name - val clusterName = state.cluster.name - - val behavior = MetricCollector.collector(client, reporter.ref, state) - val testKit = BehaviorTestKit(behavior) - - val newOffsets = MetricCollector.NewOffsets( - sparkAppId = sparkAppId, - sourceMetrics = List(SourceMetrics(1000, 500, Map(topicPartition0 -> Measurements.Single(180, timestamp = 200)))), - 
latestOffsets = PartitionOffsets() + (topicPartition0 -> Measurements.Single(offset = 200, timestamp = 200)), - lastOffsets = PartitionOffsets() + (topicPartition0 -> Measurements.Single(offset = 180, timestamp = 200)) - ) - - testKit.run(newOffsets) - - val metrics = reporter.receiveAll() - - "report 5 metrics" in { metrics.length shouldBe 5 } - -// "latest offset metric" in { -// metrics should contain( -// Metrics.LatestOffsetMetric(clusterName, sparkAppId, providedName, topicPartition0, value = 200)) -// } - - "last group offset metric" in { - metrics should contain( - Metrics.LastOffsetMetric(clusterName, sparkAppId, providedName, topicPartition0, value = 180)) - } - - "offset lag metric" in { - metrics should contain( - Metrics.OffsetLagMetric(clusterName, sparkAppId, providedName, topicPartition0, value = 20)) - } - - "time lag metric" in { - metrics should contain( - Metrics.TimeLagMetric(clusterName, sparkAppId, providedName, topicPartition0, value = 0.022)) - } - - "input throughput metric" in { - metrics should contain( - Metrics.InputRecordsPerSecondMetric(clusterName, sparkAppId, providedName, topicPartition0.topic, 1000)) - } - - "processed throughput metric" in { - metrics should contain( - Metrics.ProcessedRecordsPerSecondMetric(clusterName, sparkAppId, providedName, topicPartition0.topic, 500)) - } - } -} diff --git a/spark-event-exporter/src/test/scala/com/lightbend/kafka/sparkeventexporter/internal/SparkEventAdapterSpec.scala b/spark-event-exporter/src/test/scala/com/lightbend/kafka/sparkeventexporter/internal/SparkEventAdapterSpec.scala deleted file mode 100644 index 86c664d2..00000000 --- a/spark-event-exporter/src/test/scala/com/lightbend/kafka/sparkeventexporter/internal/SparkEventAdapterSpec.scala +++ /dev/null @@ -1,31 +0,0 @@ -package com.lightbend.kafka.sparkeventexporter.internal - -import com.lightbend.kafka.kafkametricstools -import com.lightbend.kafka.kafkametricstools.Domain.{Measurements, PartitionOffsets, TopicPartition} -import org.mockito.MockitoSugar -import org.scalatest.{Matchers, _} - -class SparkEventAdapterSpec extends FreeSpec with Matchers with kafkametricstools.TestData with MockitoSugar { - "SparkEventAdapterSpec" - { - "parseEndOffsets" in { - val endOffsetsJson = - """ - |{ - | "call-record-pipeline-seglo.cdr-validator.out-1" : { - | "0" : 12477, - | "1" : 12293, - | "2" : 11274 - | } - |} - """.stripMargin - - val offsets: PartitionOffsets = SparkEventAdapter.parseEndOffsets(endOffsetsJson, 0) - - offsets shouldBe Map( - TopicPartition("call-record-pipeline-seglo.cdr-validator.out-1", 0) -> Measurements.Single(12477, 0), - TopicPartition("call-record-pipeline-seglo.cdr-validator.out-1", 1) -> Measurements.Single(12293, 0), - TopicPartition("call-record-pipeline-seglo.cdr-validator.out-1", 2) -> Measurements.Single(11274, 0) - ) - } - } -} diff --git a/spark-event-exporter/src/main/resources/logback.xml b/src/main/resources/logback.xml similarity index 88% rename from spark-event-exporter/src/main/resources/logback.xml rename to src/main/resources/logback.xml index 7b4e5175..ded95953 100644 --- a/spark-event-exporter/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -8,7 +8,7 @@ - + diff --git a/kafka-lag-exporter/src/main/resources/reference.conf b/src/main/resources/reference.conf similarity index 100% rename from kafka-lag-exporter/src/main/resources/reference.conf rename to src/main/resources/reference.conf diff --git a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/AppConfig.scala 
b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala similarity index 90% rename from kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/AppConfig.scala rename to src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala index fa29ca0e..c22a3044 100644 --- a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/AppConfig.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala @@ -1,6 +1,5 @@ -package com.lightbend.kafka.kafkalagexporter +package com.lightbend.kafkalagexporter -import com.lightbend.kafka.kafkametricstools.{KafkaCluster, SimpleConfig} import com.typesafe.config.Config import scala.collection.JavaConverters._ @@ -26,9 +25,9 @@ object AppConfig { } } +final case class KafkaCluster(name: String, bootstrapBrokers: String) final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, port: Int, clientGroupId: String, - clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) - extends SimpleConfig { + clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) { override def toString(): String = { val clusterString = if (clusters.isEmpty) diff --git a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/ConsumerGroupCollector.scala b/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala similarity index 96% rename from kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/ConsumerGroupCollector.scala rename to src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala index 81072e18..52d8ffdf 100644 --- a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/ConsumerGroupCollector.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala @@ -1,20 +1,19 @@ -package com.lightbend.kafka.kafkalagexporter +package com.lightbend.kafkalagexporter import java.time.Clock import akka.actor.Cancellable import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ActorRef, Behavior, PostStop, SupervisorStrategy} -import com.lightbend.kafka.kafkametricstools.KafkaClient.KafkaClientContract -import com.lightbend.kafka.kafkametricstools.{Domain, LookupTable, MetricsSink} +import com.lightbend.kafkalagexporter.KafkaClient.KafkaClientContract import scala.collection.immutable -import scala.concurrent.duration.{FiniteDuration, _} +import scala.concurrent.duration._ import scala.concurrent.{ExecutionContextExecutor, Future} import scala.util.{Failure, Success} object ConsumerGroupCollector { - import com.lightbend.kafka.kafkametricstools.Domain._ + import Domain._ sealed trait Message sealed trait Collect extends Message diff --git a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/Domain.scala b/src/main/scala/com/lightbend/kafkalagexporter/Domain.scala similarity index 96% rename from kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/Domain.scala rename to src/main/scala/com/lightbend/kafkalagexporter/Domain.scala index 750c5020..5e8ba63b 100644 --- a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/Domain.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/Domain.scala @@ -1,4 +1,4 @@ -package com.lightbend.kafka.kafkametricstools +package com.lightbend.kafkalagexporter object Domain { final case class TopicPartition(topic: String, partition: Int) diff --git a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/KafkaClient.scala 
b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala similarity index 98% rename from kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/KafkaClient.scala rename to src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala index 988f9f87..0849116f 100644 --- a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/KafkaClient.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala @@ -1,11 +1,11 @@ -package com.lightbend.kafka.kafkametricstools +package com.lightbend.kafkalagexporter import java.time.Duration import java.util.Properties import java.util.concurrent.TimeUnit import java.{lang, util} -import com.lightbend.kafka.kafkametricstools.KafkaClient.KafkaClientContract +import com.lightbend.kafkalagexporter.KafkaClient.KafkaClientContract import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.common.serialization.StringDeserializer diff --git a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/KafkaClusterManager.scala b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClusterManager.scala similarity index 90% rename from kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/KafkaClusterManager.scala rename to src/main/scala/com/lightbend/kafkalagexporter/KafkaClusterManager.scala index a77273d2..1894e22c 100644 --- a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/KafkaClusterManager.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClusterManager.scala @@ -1,10 +1,9 @@ -package com.lightbend.kafka.kafkalagexporter +package com.lightbend.kafkalagexporter import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed._ -import com.lightbend.kafka.kafkalagexporter.watchers.Watcher -import com.lightbend.kafka.kafkametricstools.KafkaClient.KafkaClientContract -import com.lightbend.kafka.kafkametricstools.{KafkaCluster, MetricsReporter, MetricsSink} +import akka.actor.typed.{ActorRef, Behavior, ChildFailed} +import com.lightbend.kafkalagexporter.KafkaClient.KafkaClientContract +import com.lightbend.kafkalagexporter.watchers.Watcher object KafkaClusterManager { sealed trait Message @@ -77,5 +76,3 @@ object KafkaClusterManager { Behaviors.same } } - - diff --git a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/LookupTable.scala b/src/main/scala/com/lightbend/kafkalagexporter/LookupTable.scala similarity index 98% rename from kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/LookupTable.scala rename to src/main/scala/com/lightbend/kafkalagexporter/LookupTable.scala index e680cdee..5e51dbc1 100644 --- a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/LookupTable.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/LookupTable.scala @@ -1,4 +1,5 @@ -package com.lightbend.kafka.kafkametricstools +package com.lightbend.kafkalagexporter + import scala.collection.mutable object LookupTable { diff --git a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/MainApp.scala b/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala similarity index 89% rename from kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/MainApp.scala rename to src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala index 945d891c..82fd31f5 100644 --- a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/MainApp.scala +++ 
b/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala @@ -1,9 +1,8 @@ -package com.lightbend.kafka.kafkalagexporter +package com.lightbend.kafkalagexporter import java.util.concurrent.Executors import akka.actor.typed.ActorSystem -import com.lightbend.kafka.kafkametricstools.{KafkaClient, PrometheusEndpointSink} import com.typesafe.config.{Config, ConfigFactory} import scala.concurrent.duration._ diff --git a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/Metrics.scala b/src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala similarity index 94% rename from kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/Metrics.scala rename to src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala index ae563894..75315dab 100644 --- a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/Metrics.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala @@ -1,7 +1,6 @@ -package com.lightbend.kafka.kafkalagexporter +package com.lightbend.kafkalagexporter -import com.lightbend.kafka.kafkametricstools.Domain -import com.lightbend.kafka.kafkametricstools.MetricsSink.{GaugeDefinition, Message, Metric, MetricDefinitions} +import com.lightbend.kafkalagexporter.MetricsSink._ object Metrics { final case class LatestOffsetMetric(clusterName: String, topicPartition: Domain.TopicPartition, value: Double) diff --git a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/MetricsReporter.scala b/src/main/scala/com/lightbend/kafkalagexporter/MetricsReporter.scala similarity index 86% rename from kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/MetricsReporter.scala rename to src/main/scala/com/lightbend/kafkalagexporter/MetricsReporter.scala index 4e05accc..c3560d47 100644 --- a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/MetricsReporter.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/MetricsReporter.scala @@ -1,8 +1,8 @@ -package com.lightbend.kafka.kafkametricstools +package com.lightbend.kafkalagexporter import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{Behavior, PostStop} -import com.lightbend.kafka.kafkametricstools.MetricsSink.{Message, Metric, Stop} +import com.lightbend.kafkalagexporter.MetricsSink.{Message, Metric, Stop} object MetricsReporter { def init( diff --git a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/MetricsSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala similarity index 78% rename from kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/MetricsSink.scala rename to src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala index a9de7d83..32047570 100644 --- a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/MetricsSink.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala @@ -1,6 +1,6 @@ -package com.lightbend.kafka.kafkametricstools +package com.lightbend.kafkalagexporter -import com.lightbend.kafka.kafkametricstools.MetricsSink.Metric +import com.lightbend.kafkalagexporter.MetricsSink.Metric object MetricsSink { trait Message diff --git a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/PrometheusEndpointSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala similarity index 91% rename from kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/PrometheusEndpointSink.scala rename to 
src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala index 5924f4cc..08d36174 100644 --- a/kafka-metrics-tools/src/main/scala/com/lightbend/kafka/kafkametricstools/PrometheusEndpointSink.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala @@ -1,8 +1,9 @@ -package com.lightbend.kafka.kafkametricstools -import com.lightbend.kafka.kafkametricstools.MetricsSink.{Metric, MetricDefinitions} -import io.prometheus.client.{CollectorRegistry, Gauge} +package com.lightbend.kafkalagexporter + +import com.lightbend.kafkalagexporter.MetricsSink.{Metric, MetricDefinitions} import io.prometheus.client.exporter.HTTPServer import io.prometheus.client.hotspot.DefaultExports +import io.prometheus.client.{CollectorRegistry, Gauge} import scala.util.Try diff --git a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/watchers/StrimziClient.scala b/src/main/scala/com/lightbend/kafkalagexporter/watchers/StrimziClient.scala similarity index 96% rename from kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/watchers/StrimziClient.scala rename to src/main/scala/com/lightbend/kafkalagexporter/watchers/StrimziClient.scala index 6e1fe478..4b3e3748 100644 --- a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/watchers/StrimziClient.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/watchers/StrimziClient.scala @@ -1,9 +1,9 @@ -package com.lightbend.kafka.kafkalagexporter.watchers +package com.lightbend.kafkalagexporter.watchers import java.lang import com.fasterxml.jackson.annotation.JsonIgnoreProperties -import com.lightbend.kafka.kafkametricstools.KafkaCluster +import com.lightbend.kafkalagexporter.KafkaCluster import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinitionBuilder import io.fabric8.kubernetes.client.{Watcher => FWatcher, _} import io.fabric8.kubernetes.client.dsl.FilterWatchListMultiDeletable diff --git a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/watchers/StrimziClusterWatcher.scala b/src/main/scala/com/lightbend/kafkalagexporter/watchers/StrimziClusterWatcher.scala similarity index 85% rename from kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/watchers/StrimziClusterWatcher.scala rename to src/main/scala/com/lightbend/kafkalagexporter/watchers/StrimziClusterWatcher.scala index c055e69a..c305d09b 100644 --- a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/watchers/StrimziClusterWatcher.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/watchers/StrimziClusterWatcher.scala @@ -1,9 +1,10 @@ -package com.lightbend.kafka.kafkalagexporter.watchers +package com.lightbend.kafkalagexporter.watchers import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ActorRef, Behavior, PostStop} -import com.lightbend.kafka.kafkalagexporter.KafkaClusterManager -import com.lightbend.kafka.kafkametricstools.KafkaCluster +import com.lightbend.kafkalagexporter.KafkaCluster +import com.lightbend.kafkalagexporter.KafkaClusterManager + object StrimziClusterWatcher { val name: String = "strimzi" diff --git a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/watchers/Watcher.scala b/src/main/scala/com/lightbend/kafkalagexporter/watchers/Watcher.scala similarity index 83% rename from kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/watchers/Watcher.scala rename to src/main/scala/com/lightbend/kafkalagexporter/watchers/Watcher.scala index de3f981e..92ad6dd6 
100644 --- a/kafka-lag-exporter/src/main/scala/com/lightbend/kafka/kafkalagexporter/watchers/Watcher.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/watchers/Watcher.scala @@ -1,9 +1,9 @@ -package com.lightbend.kafka.kafkalagexporter.watchers +package com.lightbend.kafkalagexporter.watchers import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.ActorContext -import com.lightbend.kafka.kafkametricstools.KafkaCluster -import com.lightbend.kafka.kafkalagexporter.{AppConfig, KafkaClusterManager} +import com.lightbend.kafkalagexporter.{AppConfig, KafkaCluster} +import com.lightbend.kafkalagexporter.KafkaClusterManager object Watcher { diff --git a/kafka-lag-exporter/src/test/resources/logback.xml b/src/test/resources/logback.xml similarity index 88% rename from kafka-lag-exporter/src/test/resources/logback.xml rename to src/test/resources/logback.xml index b85e8381..d2f70d94 100644 --- a/kafka-lag-exporter/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -8,7 +8,7 @@ - + diff --git a/kafka-lag-exporter/src/test/resources/reference.conf b/src/test/resources/reference.conf similarity index 100% rename from kafka-lag-exporter/src/test/resources/reference.conf rename to src/test/resources/reference.conf diff --git a/kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/ConsumerGroupCollectorSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala similarity index 94% rename from kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/ConsumerGroupCollectorSpec.scala rename to src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala index 88206e58..89cbec67 100644 --- a/kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/ConsumerGroupCollectorSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollectorSpec.scala @@ -1,18 +1,16 @@ -package com.lightbend.kafka.kafkalagexporter +package com.lightbend.kafkalagexporter import java.time.{Clock, Instant, ZoneId} import akka.actor.testkit.typed.scaladsl.{BehaviorTestKit, TestInbox} -import com.lightbend.kafka.kafkametricstools -import com.lightbend.kafka.kafkametricstools.KafkaClient.KafkaClientContract -import com.lightbend.kafka.kafkametricstools.LookupTable._ -import com.lightbend.kafka.kafkametricstools.{Domain, LookupTable, MetricsSink} +import com.lightbend.kafkalagexporter.KafkaClient.KafkaClientContract +import com.lightbend.kafkalagexporter.LookupTable._ import org.mockito.MockitoSugar import org.scalatest.{Matchers, _} import scala.concurrent.duration._ -class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with kafkametricstools.TestData with MockitoSugar { +class ConsumerGroupCollectorSpec extends FreeSpec with Matchers with TestData with MockitoSugar { val client: KafkaClientContract = mock[KafkaClientContract] val config = ConsumerGroupCollector.CollectorConfig(0 seconds, 20, "default", "", Clock.fixed(Instant.ofEpochMilli(0), ZoneId.systemDefault())) diff --git a/kafka-metrics-tools/src/test/scala/com/lightbend/kafka/kafkametricstools/LookupTableSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/LookupTableSpec.scala similarity index 98% rename from kafka-metrics-tools/src/test/scala/com/lightbend/kafka/kafkametricstools/LookupTableSpec.scala rename to src/test/scala/com/lightbend/kafkalagexporter/LookupTableSpec.scala index eeda89bc..014cf260 100644 --- a/kafka-metrics-tools/src/test/scala/com/lightbend/kafka/kafkametricstools/LookupTableSpec.scala 
+++ b/src/test/scala/com/lightbend/kafkalagexporter/LookupTableSpec.scala @@ -1,9 +1,10 @@ -package com.lightbend.kafka.kafkametricstools +package com.lightbend.kafkalagexporter + import org.scalatest.{FreeSpec, Matchers} class LookupTableSpec extends FreeSpec with Matchers { - import LookupTable._ + import com.lightbend.kafkalagexporter.LookupTable._ "LookupTable" - { "lookupOffset" - { diff --git a/kafka-metrics-tools/src/test/scala/com/lightbend/kafka/kafkametricstools/TestData.scala b/src/test/scala/com/lightbend/kafkalagexporter/TestData.scala similarity index 87% rename from kafka-metrics-tools/src/test/scala/com/lightbend/kafka/kafkametricstools/TestData.scala rename to src/test/scala/com/lightbend/kafkalagexporter/TestData.scala index 7b4cdda5..974c7815 100644 --- a/kafka-metrics-tools/src/test/scala/com/lightbend/kafka/kafkametricstools/TestData.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/TestData.scala @@ -1,6 +1,6 @@ -package com.lightbend.kafka.kafkametricstools +package com.lightbend.kafkalagexporter -import com.lightbend.kafka.kafkametricstools.Domain.{ConsumerGroup, ConsumerGroupMember, GroupTopicPartition, TopicPartition} +import com.lightbend.kafkalagexporter.Domain._ trait TestData { val topicPartition0 = TopicPartition("test-topic", 0) diff --git a/kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/IntegrationSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala similarity index 93% rename from kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/IntegrationSpec.scala rename to src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala index 88d2ffe6..b871cbe9 100644 --- a/kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/IntegrationSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala @@ -1,8 +1,8 @@ -package com.lightbend.kafka.kafkalagexporter.integration +package com.lightbend.kafkalagexporter.integration import akka.actor.testkit.typed.scaladsl.ActorTestKit import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped -import com.lightbend.kafka.kafkalagexporter.Metrics._ +import com.lightbend.kafkalagexporter.Metrics._ import org.scalatest.BeforeAndAfterEach import scala.concurrent.duration._ @@ -52,7 +52,7 @@ class IntegrationSpec extends SpecBase(kafkaPort = 9094, exporterPort = 8000) wi var lastLagInTime: Double = 0 - def isIncreasing: String => Unit = (actual: String) => { + val isIncreasing: String => Unit = (actual: String) => { val parsedDoubleTry = Try(actual.toDouble) assert(parsedDoubleTry.isSuccess) val parsedDouble = parsedDoubleTry.get diff --git a/kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/LagSim.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/LagSim.scala similarity index 97% rename from kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/LagSim.scala rename to src/test/scala/com/lightbend/kafkalagexporter/integration/LagSim.scala index 27ffd693..99bff132 100644 --- a/kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/LagSim.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/LagSim.scala @@ -1,4 +1,5 @@ -package com.lightbend.kafka.kafkalagexporter.integration +package com.lightbend.kafkalagexporter.integration + import akka.actor.Cancellable import akka.actor.typed.scaladsl.Behaviors import 
akka.actor.typed.{Behavior, PostStop} diff --git a/kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/PrometheusUtils.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala similarity index 96% rename from kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/PrometheusUtils.scala rename to src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala index 06f55e02..d9781a7e 100644 --- a/kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/PrometheusUtils.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala @@ -1,10 +1,11 @@ -package com.lightbend.kafka.kafkalagexporter.integration +package com.lightbend.kafkalagexporter.integration + import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes} import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.Materializer -import com.lightbend.kafka.kafkalagexporter.Metrics +import com.lightbend.kafkalagexporter.Metrics import org.scalatest.Matchers import org.scalatest.concurrent.ScalaFutures import org.slf4j.{Logger, LoggerFactory} diff --git a/kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/SpecBase.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/SpecBase.scala similarity index 92% rename from kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/SpecBase.scala rename to src/test/scala/com/lightbend/kafkalagexporter/integration/SpecBase.scala index eacd0fd1..299ccaf9 100644 --- a/kafka-lag-exporter/src/test/scala/com/lightbend/kafka/kafkalagexporter/integration/SpecBase.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/SpecBase.scala @@ -1,8 +1,9 @@ -package com.lightbend.kafka.kafkalagexporter.integration +package com.lightbend.kafkalagexporter.integration import akka.actor.typed.ActorSystem import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec} -import com.lightbend.kafka.kafkalagexporter.{KafkaClusterManager, MainApp} +import com.lightbend.kafkalagexporter.MainApp +import com.lightbend.kafkalagexporter.KafkaClusterManager import com.typesafe.config.{Config, ConfigFactory} import net.manub.embeddedkafka.EmbeddedKafkaConfig import org.scalatest.concurrent.{Eventually, ScalaFutures}