From 5c2c52e4e2a24a22663da88716c5c4352ff5f09e Mon Sep 17 00:00:00 2001 From: Sean Glover Date: Sun, 3 Apr 2022 13:39:44 -0400 Subject: [PATCH 1/6] decouple prometheusutils --- examples/k8s/minikube-coredns-fix.yaml | 34 +++++++++++++++++++ examples/k8s/strimzi-kafka-cluster.yaml | 33 ++++++++++++++++++ .../integration/ExporterPorts.scala | 4 +-- .../integration/MinikubeIntegrationSpec.scala | 20 +++++++++++ .../integration/PrometheusUtils.scala | 9 ++++- 5 files changed, 97 insertions(+), 3 deletions(-) create mode 100644 examples/k8s/minikube-coredns-fix.yaml create mode 100644 examples/k8s/strimzi-kafka-cluster.yaml create mode 100644 src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala diff --git a/examples/k8s/minikube-coredns-fix.yaml b/examples/k8s/minikube-coredns-fix.yaml new file mode 100644 index 00000000..32c8b934 --- /dev/null +++ b/examples/k8s/minikube-coredns-fix.yaml @@ -0,0 +1,34 @@ +# coredns doesn't like references to nameservers on local loop back +# fix replaces `forward` with generic config pointing to google dns +# https://github.com/kubernetes/minikube/issues/7512#issuecomment-664348459 +apiVersion: v1 +kind: ConfigMap +metadata: + name: coredns + namespace: kube-system +data: + Corefile: | + .:53 { + errors + health { + lameduck 5s + } + ready + kubernetes cluster.local in-addr.arpa ip6.arpa { + pods insecure + fallthrough in-addr.arpa ip6.arpa + ttl 30 + } + prometheus :9153 + hosts { + 192.168.49.1 host.minikube.internal + fallthrough + } + cache 30 + loop + reload + loadbalance + forward . 8.8.8.8 8.8.4.4 + } + + diff --git a/examples/k8s/strimzi-kafka-cluster.yaml b/examples/k8s/strimzi-kafka-cluster.yaml new file mode 100644 index 00000000..6ec4fcdc --- /dev/null +++ b/examples/k8s/strimzi-kafka-cluster.yaml @@ -0,0 +1,33 @@ +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: strimzi-kafka-cluster +spec: + kafka: + version: 3.0.0 + replicas: 1 + listeners: + - name: plain + port: 9092 + type: internal + tls: false + - name: tls + port: 9093 + type: internal + tls: true + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + default.replication.factor: 1 + min.insync.replicas: 1 + inter.broker.protocol.version: "3.0" + storage: + type: ephemeral + zookeeper: + replicas: 3 + storage: + type: ephemeral + entityOperator: + topicOperator: {} + userOperator: {} \ No newline at end of file diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala index 3f2309a8..c6e3375c 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala @@ -8,6 +8,6 @@ package com.lightbend.kafkalagexporter.integration object ExporterPorts { val IntegrationSpec = 8000 - val MetricsEvictionSpec = 8001 - val MetricsEvictionOnFailureSpec = 8002 + val MetricsEvictionSpec = 8991 + val MetricsEvictionOnFailureSpec = 8992 } diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala new file mode 100644 index 00000000..227edf1a --- /dev/null +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2018-2022 Lightbend Inc. + * Copyright (C) 2022 Sean Glover + */ + +//package com.lightbend.kafkalagexporter.integration +// +//import org.scalatest.BeforeAndAfterEach +//import org.scalatest.concurrent.{Eventually, ScalaFutures} +//import org.scalatest.matchers.should.Matchers +//import org.scalatest.wordspec.AnyWordSpecLike +// +//class MinikubeIntegrationSpec extends AnyWordSpecLike +//with BeforeAndAfterEach +//with Matchers +//with ScalaFutures +//with Eventually +//with PrometheusUtils { +// +//} diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala index 79fbb269..a0bc031a 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala @@ -5,10 +5,14 @@ package com.lightbend.kafkalagexporter.integration +import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes} import akka.http.scaladsl.unmarshalling.Unmarshal import com.lightbend.kafkalagexporter.MetricsSink.GaugeDefinition +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.slf4j.Logger import scala.concurrent.{ExecutionContext, Future} import scala.util.matching.Regex @@ -16,7 +20,10 @@ import scala.util.matching.Regex /** * Test utilities to parse the Prometheus health endpoint to assert metrics in integration tests. */ -trait PrometheusUtils { self: SpecBase => +trait PrometheusUtils extends ScalaFutures with Matchers { + implicit val system: ActorSystem + val log: Logger + private val http = Http() def scrape(port: Int, rules: Rule*)(implicit ec: ExecutionContext): Future[List[Result]] = { From beab9a4000117e50e838d6b490ebedecd860d4a7 Mon Sep 17 00:00:00 2001 From: Sean Glover Date: Sun, 3 Apr 2022 19:20:20 -0400 Subject: [PATCH 2/6] local minikube --- .github/ISSUE_TEMPLATE/bug_report.md | 2 +- .github/workflows/ci.yaml | 39 +++++++++ charts/kafka-lag-exporter/values.yaml | 2 +- .../k8s/kafka-lag-exporter-helm-values.yaml | 13 +++ .../k8s/scripts/get_nodeport_for_service.sh | 5 ++ examples/k8s/strimzi-kafka-cluster.yaml | 4 + examples/k8s/strimzi-topic-template.yaml | 12 +++ examples/k8s/strimzi-topic.yaml | 12 +++ src/test/resources/logback-test.xml | 2 +- .../integration/IntegrationSpec.scala | 6 +- .../MetricsEvictionOnFailureSpec.scala | 4 +- .../integration/MetricsEvictionSpec.scala | 4 +- .../integration/MinikubeIntegrationSpec.scala | 82 +++++++++++++++---- .../integration/MinikubeSpecBase.scala | 74 +++++++++++++++++ .../integration/PrometheusUtils.scala | 16 ++-- .../integration/SpecBase.scala | 2 + 16 files changed, 246 insertions(+), 33 deletions(-) create mode 100644 examples/k8s/kafka-lag-exporter-helm-values.yaml create mode 100755 examples/k8s/scripts/get_nodeport_for_service.sh create mode 100644 examples/k8s/strimzi-topic-template.yaml create mode 100644 examples/k8s/strimzi-topic.yaml create mode 100644 src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeSpecBase.scala diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index 7ce1e6ce..14d90097 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -12,7 +12,7 @@ A clear and concise description of what the bug is. **To Reproduce** Provide logs and application configuration. -See the [Troubleshooting](https://github.com/lightbend/kafka-lag-exporter#troubleshooting) section of the `README` for instructions on how to enable `DEBUG` logging. +See the [Troubleshooting](https://github.com/seglo/kafka-lag-exporter#troubleshooting) section of the `README` for instructions on how to enable `DEBUG` logging. **Environment** - Version diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 134da270..e8d63111 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -42,6 +42,45 @@ jobs: - name: Print logs on failure if: ${{ failure() }} run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \; + minikube-test: + name: End to end test on Minikube + runs-on: ubuntu-18.04 + strategy: + fail-fast: false + steps: + - name: Checkout + uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Checkout GitHub merge + if: github.event.pull_request + run: |- + git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch + git checkout scratch + + - name: Setup Scala with Java ${{ matrix.java-version }} + uses: olafurpg/setup-scala@v13 + with: + java-version: openjdk@1.17 + + - name: Cache Coursier cache + uses: coursier/cache-action@v6 + + - name: Setup Minikube + if: github.event.pull_request + run: |- + sbt docker:publishLocal + minikube image load seglo/kafka-lag-exporter:latest + + helm repo add strimzi https://strimzi.io/charts/ + helm install strimzi strimzi/strimzi-kafka-operator + kubectl apply -f ./examples/strimzi-kafka-cluster.yaml + + sbt testOnly *.MinikubeIntegrationSpec + - name: Print logs on failure + if: ${{ failure() }} + run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \; build-docker: name: Build Docker Image diff --git a/charts/kafka-lag-exporter/values.yaml b/charts/kafka-lag-exporter/values.yaml index 46d684dc..487c983c 100644 --- a/charts/kafka-lag-exporter/values.yaml +++ b/charts/kafka-lag-exporter/values.yaml @@ -110,7 +110,7 @@ akkaLogLevel: DEBUG ## You probably won't need to change anything below this line. image: - repository: lightbend/kafka-lag-exporter + repository: seglo/kafka-lag-exporter tag: 0.6.9-SNAPSHOT # If digest is set it will be used instead of tag to specify the image # digest: sha256:0f6387aa011e6eb7952ea211ac139bf8613ad3ef6954a1a5d910676d293bd610 diff --git a/examples/k8s/kafka-lag-exporter-helm-values.yaml b/examples/k8s/kafka-lag-exporter-helm-values.yaml new file mode 100644 index 00000000..146be269 --- /dev/null +++ b/examples/k8s/kafka-lag-exporter-helm-values.yaml @@ -0,0 +1,13 @@ +clusters: + - name: "default" + bootstrapBrokers: "strimzi-kafka-cluster-kafka-bootstrap.default.svc.cluster.local:9092" +pollIntervalSeconds: 1 +service: + type: NodePort +rootLogLevel: INFO +kafkaLagExporterLogLevel: DEBUG +kafkaLogLevel: INFO +akkaLogLevel: DEBUG +image: + tag: latest + pullPolicy: Never diff --git a/examples/k8s/scripts/get_nodeport_for_service.sh b/examples/k8s/scripts/get_nodeport_for_service.sh new file mode 100755 index 00000000..abd03f05 --- /dev/null +++ b/examples/k8s/scripts/get_nodeport_for_service.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash +export SERVICE=$1 +NODE_PORT=$(kubectl get service ${SERVICE} -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}') && +NODE_IP=$(minikube ip) && +echo ${NODE_IP}:${NODE_PORT} diff --git a/examples/k8s/strimzi-kafka-cluster.yaml b/examples/k8s/strimzi-kafka-cluster.yaml index 6ec4fcdc..87742803 100644 --- a/examples/k8s/strimzi-kafka-cluster.yaml +++ b/examples/k8s/strimzi-kafka-cluster.yaml @@ -15,6 +15,10 @@ spec: port: 9093 type: internal tls: true + - name: external + port: 9094 + type: nodeport + tls: false config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 diff --git a/examples/k8s/strimzi-topic-template.yaml b/examples/k8s/strimzi-topic-template.yaml new file mode 100644 index 00000000..1baaf5c8 --- /dev/null +++ b/examples/k8s/strimzi-topic-template.yaml @@ -0,0 +1,12 @@ +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + name: {{TOPIC_NAME}} + labels: + strimzi.io/cluster: strimzi-kafka-cluster +spec: + partitions: {{PARTITIONS}} + replicas: {{REPLICAS}} + config: + retention.ms: 7200000 + segment.bytes: 1073741824 diff --git a/examples/k8s/strimzi-topic.yaml b/examples/k8s/strimzi-topic.yaml new file mode 100644 index 00000000..eee99e2e --- /dev/null +++ b/examples/k8s/strimzi-topic.yaml @@ -0,0 +1,12 @@ +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + name: my-topic + labels: + strimzi.io/cluster: strimzi-kafka-cluster +spec: + partitions: 1 + replicas: 1 + config: + retention.ms: 7200000 + segment.bytes: 1073741824 diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 7af7f6c1..301dc167 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -18,7 +18,7 @@ - + diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala index f14cee35..a019b397 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala @@ -39,7 +39,7 @@ class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationS simulator.produceElements(totalOffsets) simulator.consumeElements(offsetsToCommit) - eventually(scrapeAndAssert(exporterPort, "Assert offset-based metrics", rules: _*)) + eventually(scrapeAndAssert(exporterHostPort, "Assert offset-based metrics", rules: _*)) simulator.shutdown() } @@ -68,7 +68,7 @@ class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationS val isIncreasingRule = Rule.create(TimeLagMetric, isIncreasing, clusterName, group, topic, partition) (1 to 3).foreach { i => - eventually(scrapeAndAssert(exporterPort, s"Assert lag in time metrics are increasing ($i)", isIncreasingRule)) + eventually(scrapeAndAssert(exporterHostPort, s"Assert lag in time metrics are increasing ($i)", isIncreasingRule)) } testKit.stop(simulatorActor) @@ -77,7 +77,7 @@ class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationS "report poll time metric greater than 0 ms" in { assertAllStagesStopped { val rule = Rule.create(PollTimeMetric, (actual: String) => actual.toDouble should be > 0d, clusterName) - eventually(scrapeAndAssert(exporterPort, "Assert poll time metric", rule)) + eventually(scrapeAndAssert(exporterHostPort, "Assert poll time metric", rule)) } } } diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionOnFailureSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionOnFailureSpec.scala index 79061620..732f8ee5 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionOnFailureSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionOnFailureSpec.scala @@ -32,11 +32,11 @@ class MetricsEvictionOnFailureSpec extends SpecBase(exporterPort = ExporterPorts simulator.consumeElements(offsetsToCommit) simulator.shutdown() - eventually(scrapeAndAssert(exporterPort, "Assert offset-based metrics", rules: _*)) + eventually(scrapeAndAssert(exporterHostPort, "Assert offset-based metrics", rules: _*)) stopKafka() - eventually(scrapeAndAssertDne(exporterPort, "Assert offset-based metrics no longer exist", rules: _*)) + eventually(scrapeAndAssertDne(exporterHostPort, "Assert offset-based metrics no longer exist", rules: _*)) } } } diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionSpec.scala index 27a5f13f..7ff7f8eb 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionSpec.scala @@ -34,11 +34,11 @@ class MetricsEvictionSpec extends SpecBase(exporterPort = ExporterPorts.MetricsE simulator.consumeElements(offsetsToCommit) simulator.shutdown() - eventually(scrapeAndAssert(exporterPort, "Assert offset-based metrics", rules: _*)) + eventually(scrapeAndAssert(exporterHostPort, "Assert offset-based metrics", rules: _*)) adminClient.deleteConsumerGroups(List(group).asJava) - eventually(scrapeAndAssertDne(exporterPort, "Assert offset-based metrics no longer exist", rules: _*)) + eventually(scrapeAndAssertDne(exporterHostPort, "Assert offset-based metrics no longer exist", rules: _*)) } } } diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala index 227edf1a..949490e2 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala @@ -3,18 +3,70 @@ * Copyright (C) 2022 Sean Glover */ -//package com.lightbend.kafkalagexporter.integration -// -//import org.scalatest.BeforeAndAfterEach -//import org.scalatest.concurrent.{Eventually, ScalaFutures} -//import org.scalatest.matchers.should.Matchers -//import org.scalatest.wordspec.AnyWordSpecLike -// -//class MinikubeIntegrationSpec extends AnyWordSpecLike -//with BeforeAndAfterEach -//with Matchers -//with ScalaFutures -//with Eventually -//with PrometheusUtils { -// -//} +package com.lightbend.kafkalagexporter.integration + +import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped +import com.lightbend.kafkalagexporter.Metrics.{EarliestOffsetMetric, LastGroupOffsetMetric, LatestOffsetMetric, MaxGroupOffsetLagMetric, MaxGroupTimeLagMetric, OffsetLagMetric, TimeLagMetric} +import org.scalactic.source.Position +import org.scalatest.concurrent.PatienceConfiguration +import org.scalatest.time._ + +import scala.concurrent.duration.DurationInt + +class MinikubeIntegrationSpec extends MinikubeSpecBase { + + override def beforeAll(): Unit = { + setUp() + install() + waitForExporterService() + } + + override def afterAll(): Unit = { + cleanUp() + uninstall() + } + + "kafka lag exporter on minikube" should { + "install" in { + succeed + } + + "report lag" in { + val topic = createTopic() + println(s"created $topic") + succeed + } + + val group = createGroupId(1) + val partition = "0" + val clusterName = "default" + + "reports offset-based lag metrics" in { + assertAllStagesStopped { + val topic = createTopic(1, 1, 1) + + val offsetsToCommit = 5 + val totalOffsets = 10 + + val rules = List( + Rule.create(LatestOffsetMetric, (actual: String) => actual shouldBe (totalOffsets + 1).toDouble.toString, clusterName, topic, partition), + Rule.create(EarliestOffsetMetric, (actual: String) => actual shouldBe 0.toDouble.toString, clusterName, topic, partition), + Rule.create(LastGroupOffsetMetric, (actual: String) => actual shouldBe offsetsToCommit.toDouble.toString, clusterName, group, topic, partition), + Rule.create(OffsetLagMetric, (actual: String) => actual shouldBe (offsetsToCommit + 1).toDouble.toString, clusterName, group, topic, partition), + // TODO: update test so we can assert actual lag in time. keep producer running for more than two polling cycles. + Rule.create(TimeLagMetric, (_: String) => (), clusterName, group, topic, partition), + Rule.create(MaxGroupOffsetLagMetric, (actual: String) => actual shouldBe (offsetsToCommit + 1).toDouble.toString, clusterName, group), + Rule.create(MaxGroupTimeLagMetric, (_: String) => (), clusterName, group) + ) + + val simulator = new LagSimulator(topic, group) + simulator.produceElements(totalOffsets) + simulator.consumeElements(offsetsToCommit) + + eventually(scrapeAndAssert(exporterHostPort, "Assert offset-based metrics", rules: _*)) + + simulator.shutdown() + } + } + } +} diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeSpecBase.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeSpecBase.scala new file mode 100644 index 00000000..fde1ea03 --- /dev/null +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeSpecBase.scala @@ -0,0 +1,74 @@ +package com.lightbend.kafkalagexporter.integration + +import akka.actor.typed.ActorSystem +import akka.kafka.testkit.KafkaTestkitTestcontainersSettings +import akka.kafka.testkit.scaladsl.{KafkaSpec, ScalatestKafkaSpec, TestcontainersKafkaPerClassLike} +import com.lightbend.kafkalagexporter.{KafkaClusterManager, MainApp} +import com.typesafe.config.{Config, ConfigFactory} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.concurrent.{Eventually, ScalaFutures} +import org.scalatest.matchers.should.Matchers +import org.scalatest.time._ +import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.{Logger, LoggerFactory} + +import scala.concurrent.duration._ +import scala.util.Random +import sys.process._ + +abstract class MinikubeSpecBase + extends KafkaSpec(-1) + with AnyWordSpecLike + with BeforeAndAfterEach + with BeforeAndAfterAll + with Matchers + with ScalaFutures + with Eventually + with PrometheusUtils + with LagSim { + + private[this] val log: Logger = LoggerFactory.getLogger(getClass) + + implicit val patience: PatienceConfig = PatienceConfig(30.seconds, 2.seconds) + + val rnd: String = Random.alphanumeric.take(5).mkString + + override def createGroupId(suffix: Int): String = s"group-$suffix-$rnd" + override def createTopicName(suffix: Int): String = s"topic-$suffix-$rnd" + + override val bootstrapServers: String = + getNodePortForService("strimzi-kafka-cluster-kafka-external-bootstrap") + + def exporterHostPort: String = getNodePortForService("kafka-lag-exporter-service") + + def waitForExporterService(): Unit = { + val exp = timeout(Span(90, Seconds)) + eventually(exp)(scrape(exporterHostPort)) + } + + + +// private val topicCounter = new AtomicInteger() +// private def nextNumber(): Int = topicCounter.incrementAndGet() +// def createTopicName(suffix: Int): String = s"topic-$suffix-$nextNumber" +// +// override def createTopic(): String = createTopic(0, 1, 1) +// +// def createTopic(suffix: Int, partitions: Int, replication: Int): String = { +// val topicName = createTopicName(suffix) +// s"cat strimzi-topic-template.yaml | sed \"s/{{TOPIC_NAME}}/$topicName/g;s/{{PARTITIONS}}/$partitions/g;s/{{REPLICAS}}/$replication/g\" | kubectl apply -f -" ! +// topicName +// } + + def getNodePortForService(service: String): String = { + val nodePort = s"./examples/k8s/scripts/get_nodeport_for_service.sh $service".!!.trim + log.info(s"NodePort host and port for service '$service': $nodePort") + nodePort + } + + def install(): Unit = + "helm install kafka-lag-exporter ./charts/kafka-lag-exporter --values ./examples/k8s/kafka-lag-exporter-helm-values.yaml".! + + def uninstall(): Unit = "helm uninstall kafka-lag-exporter".! +} + diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala index a0bc031a..76cfb886 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala @@ -26,8 +26,8 @@ trait PrometheusUtils extends ScalaFutures with Matchers { private val http = Http() - def scrape(port: Int, rules: Rule*)(implicit ec: ExecutionContext): Future[List[Result]] = { - val request = HttpRequest(uri = s"http://localhost:$port/metrics") + def scrape(hostPort: String, rules: Rule*)(implicit ec: ExecutionContext): Future[List[Result]] = { + val request = HttpRequest(uri = s"http://$hostPort/metrics") for { HttpResponse(StatusCodes.OK, _, entity, _) <- http.singleRequest(request) body <- Unmarshal(entity).to[String] @@ -41,18 +41,18 @@ trait PrometheusUtils extends ScalaFutures with Matchers { } } - def scrapeAndAssert(port: Int, description: String, rules: Rule*) + def scrapeAndAssert(hostPort: String, description: String, rules: Rule*) (implicit ec: ExecutionContext): Unit = - scrapeAndAssert(port, description, _.assert(), rules: _*) + scrapeAndAssert(hostPort, description, _.assert(), rules: _*) - def scrapeAndAssertDne(port: Int, description: String, rules: Rule*) + def scrapeAndAssertDne(hostPort: String, description: String, rules: Rule*) (implicit ec: ExecutionContext): Unit = - scrapeAndAssert(port, description, _.assertDne(), rules: _*) + scrapeAndAssert(hostPort, description, _.assertDne(), rules: _*) - private def scrapeAndAssert(port: Int, description: String, resultF: Result => Unit, rules: Rule*) + private def scrapeAndAssert(hostPort: String, description: String, resultF: Result => Unit, rules: Rule*) (implicit ec: ExecutionContext): Unit = { - val results = scrape(port, rules: _*).futureValue + val results = scrape(hostPort, rules: _*).futureValue log.debug("Start: {}", description) results.foreach(resultF) log.debug("End (Successful): {}", description) diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/SpecBase.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/SpecBase.scala index 0cd64385..bea1f7b2 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/SpecBase.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/SpecBase.scala @@ -43,6 +43,8 @@ abstract class SpecBase(val exporterPort: Int) val clusterName = "default" + val exporterHostPort = s"localhost:$exporterPort" + def config: Config = ConfigFactory.parseString(s""" |kafka-lag-exporter { | reporters.prometheus.port = $exporterPort From e430bc2f56f5076ac9261c53482dc27557779b81 Mon Sep 17 00:00:00 2001 From: Sean Glover Date: Sun, 3 Apr 2022 19:57:23 -0400 Subject: [PATCH 3/6] cleanup --- .github/workflows/ci.yaml | 3 + .../integration/ExporterPorts.scala | 2 +- .../integration/IntegrationSpec.scala | 25 ++++++- .../{SpecBase.scala => LocalSpecBase.scala} | 9 +-- .../MetricsEvictionOnFailureSpec.scala | 6 +- .../integration/MetricsEvictionSpec.scala | 5 +- .../integration/MinikubeIntegrationSpec.scala | 72 ------------------- .../integration/MinikubeSpecBase.scala | 57 ++++++--------- 8 files changed, 60 insertions(+), 119 deletions(-) rename src/test/scala/com/lightbend/kafkalagexporter/integration/{SpecBase.scala => LocalSpecBase.scala} (92%) delete mode 100644 src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e8d63111..2696711c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -67,6 +67,9 @@ jobs: - name: Cache Coursier cache uses: coursier/cache-action@v6 + - name: Start minikube + uses: medyagh/setup-minikube@master + - name: Setup Minikube if: github.event.pull_request run: |- diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala index c6e3375c..d1c1ebb4 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala @@ -7,7 +7,7 @@ Copyright (C) 2020-2022 Sean Glover package com.lightbend.kafkalagexporter.integration object ExporterPorts { - val IntegrationSpec = 8000 + val LocalIntegrationSpec = 8000 val MetricsEvictionSpec = 8991 val MetricsEvictionOnFailureSpec = 8992 } diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala index a019b397..d078426a 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala @@ -6,14 +6,37 @@ package com.lightbend.kafkalagexporter.integration import akka.actor.testkit.typed.scaladsl.ActorTestKit +import akka.kafka.testkit.scaladsl.KafkaSpec import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import com.lightbend.kafkalagexporter.Metrics._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.concurrent.{Eventually, ScalaFutures} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.duration.DurationInt import scala.util.Try -class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationSpec) { +class LocalIntegrationSpec extends LocalSpecBase(exporterPort = ExporterPorts.LocalIntegrationSpec) with IntegrationSpec +class MiniKubeIntegrationSpec extends MinikubeSpecBase with IntegrationSpec { + override implicit val patience: PatienceConfig = PatienceConfig(90.seconds, 2.second) +} + +trait IntegrationSpec extends KafkaSpec + with AnyWordSpecLike + with BeforeAndAfterEach + with Matchers + with ScalaFutures + with Eventually + with PrometheusUtils + with LagSim { + + def exporterHostPort: String + + implicit val patience: PatienceConfig = PatienceConfig(30.seconds, 2.second) "kafka lag exporter" should { + val clusterName = "default" val group = createGroupId(1) val partition = "0" diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/SpecBase.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/LocalSpecBase.scala similarity index 92% rename from src/test/scala/com/lightbend/kafkalagexporter/integration/SpecBase.scala rename to src/test/scala/com/lightbend/kafkalagexporter/integration/LocalSpecBase.scala index bea1f7b2..3451bf02 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/SpecBase.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/LocalSpecBase.scala @@ -19,21 +19,16 @@ import org.slf4j.{Logger, LoggerFactory} import scala.concurrent.Await import scala.concurrent.duration._ -abstract class SpecBase(val exporterPort: Int) +abstract class LocalSpecBase(val exporterPort: Int) extends ScalatestKafkaSpec(-1) with AnyWordSpecLike with BeforeAndAfterEach with TestcontainersKafkaPerClassLike - with Matchers - with ScalaFutures with Eventually - with PrometheusUtils - with LagSim { + with PrometheusUtils { private[this] val log: Logger = LoggerFactory.getLogger(getClass) - implicit val patience: PatienceConfig = PatienceConfig(30 seconds, 2 second) - override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system) .withConfigureKafka { brokerContainers => brokerContainers.foreach(_.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")) diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionOnFailureSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionOnFailureSpec.scala index 732f8ee5..6741f003 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionOnFailureSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionOnFailureSpec.scala @@ -7,7 +7,11 @@ package com.lightbend.kafkalagexporter.integration import com.lightbend.kafkalagexporter.Metrics._ -class MetricsEvictionOnFailureSpec extends SpecBase(exporterPort = ExporterPorts.MetricsEvictionOnFailureSpec) { +import scala.concurrent.duration.DurationInt + +class MetricsEvictionOnFailureSpec extends LocalSpecBase(exporterPort = ExporterPorts.MetricsEvictionOnFailureSpec) with LagSim { + implicit val patience: PatienceConfig = PatienceConfig(30.seconds, 2.second) + "kafka lag exporter" should { "not report metrics for group members or partitions after a failure" in { val group = createGroupId(1) diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionSpec.scala index 7ff7f8eb..d8f28762 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionSpec.scala @@ -7,9 +7,12 @@ package com.lightbend.kafkalagexporter.integration import com.lightbend.kafkalagexporter.Metrics._ +import scala.concurrent.duration.DurationInt import scala.jdk.CollectionConverters._ -class MetricsEvictionSpec extends SpecBase(exporterPort = ExporterPorts.MetricsEvictionSpec) { +class MetricsEvictionSpec extends LocalSpecBase(exporterPort = ExporterPorts.MetricsEvictionSpec) with LagSim { + implicit val patience: PatienceConfig = PatienceConfig(30.seconds, 2.second) + "kafka lag exporter" should { "not report metrics for group members or partitions that no longer exist" in { val group = createGroupId(1) diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala deleted file mode 100644 index 949490e2..00000000 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeIntegrationSpec.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (C) 2018-2022 Lightbend Inc. - * Copyright (C) 2022 Sean Glover - */ - -package com.lightbend.kafkalagexporter.integration - -import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped -import com.lightbend.kafkalagexporter.Metrics.{EarliestOffsetMetric, LastGroupOffsetMetric, LatestOffsetMetric, MaxGroupOffsetLagMetric, MaxGroupTimeLagMetric, OffsetLagMetric, TimeLagMetric} -import org.scalactic.source.Position -import org.scalatest.concurrent.PatienceConfiguration -import org.scalatest.time._ - -import scala.concurrent.duration.DurationInt - -class MinikubeIntegrationSpec extends MinikubeSpecBase { - - override def beforeAll(): Unit = { - setUp() - install() - waitForExporterService() - } - - override def afterAll(): Unit = { - cleanUp() - uninstall() - } - - "kafka lag exporter on minikube" should { - "install" in { - succeed - } - - "report lag" in { - val topic = createTopic() - println(s"created $topic") - succeed - } - - val group = createGroupId(1) - val partition = "0" - val clusterName = "default" - - "reports offset-based lag metrics" in { - assertAllStagesStopped { - val topic = createTopic(1, 1, 1) - - val offsetsToCommit = 5 - val totalOffsets = 10 - - val rules = List( - Rule.create(LatestOffsetMetric, (actual: String) => actual shouldBe (totalOffsets + 1).toDouble.toString, clusterName, topic, partition), - Rule.create(EarliestOffsetMetric, (actual: String) => actual shouldBe 0.toDouble.toString, clusterName, topic, partition), - Rule.create(LastGroupOffsetMetric, (actual: String) => actual shouldBe offsetsToCommit.toDouble.toString, clusterName, group, topic, partition), - Rule.create(OffsetLagMetric, (actual: String) => actual shouldBe (offsetsToCommit + 1).toDouble.toString, clusterName, group, topic, partition), - // TODO: update test so we can assert actual lag in time. keep producer running for more than two polling cycles. - Rule.create(TimeLagMetric, (_: String) => (), clusterName, group, topic, partition), - Rule.create(MaxGroupOffsetLagMetric, (actual: String) => actual shouldBe (offsetsToCommit + 1).toDouble.toString, clusterName, group), - Rule.create(MaxGroupTimeLagMetric, (_: String) => (), clusterName, group) - ) - - val simulator = new LagSimulator(topic, group) - simulator.produceElements(totalOffsets) - simulator.consumeElements(offsetsToCommit) - - eventually(scrapeAndAssert(exporterHostPort, "Assert offset-based metrics", rules: _*)) - - simulator.shutdown() - } - } - } -} diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeSpecBase.scala b/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeSpecBase.scala index fde1ea03..a682ca4c 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeSpecBase.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeSpecBase.scala @@ -1,11 +1,12 @@ +/* + * Copyright (C) 2018-2022 Lightbend Inc. + * Copyright (C) 2022 Sean Glover + */ + package com.lightbend.kafkalagexporter.integration -import akka.actor.typed.ActorSystem -import akka.kafka.testkit.KafkaTestkitTestcontainersSettings -import akka.kafka.testkit.scaladsl.{KafkaSpec, ScalatestKafkaSpec, TestcontainersKafkaPerClassLike} -import com.lightbend.kafkalagexporter.{KafkaClusterManager, MainApp} -import com.typesafe.config.{Config, ConfigFactory} -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import akka.kafka.testkit.scaladsl.KafkaSpec +import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.{Eventually, ScalaFutures} import org.scalatest.matchers.should.Matchers import org.scalatest.time._ @@ -13,52 +14,36 @@ import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.{Logger, LoggerFactory} import scala.concurrent.duration._ +import scala.sys.process._ import scala.util.Random -import sys.process._ -abstract class MinikubeSpecBase - extends KafkaSpec(-1) +abstract class MinikubeSpecBase extends KafkaSpec(-1) with AnyWordSpecLike - with BeforeAndAfterEach with BeforeAndAfterAll with Matchers - with ScalaFutures with Eventually - with PrometheusUtils - with LagSim { + with PrometheusUtils { private[this] val log: Logger = LoggerFactory.getLogger(getClass) - - implicit val patience: PatienceConfig = PatienceConfig(30.seconds, 2.seconds) - - val rnd: String = Random.alphanumeric.take(5).mkString - - override def createGroupId(suffix: Int): String = s"group-$suffix-$rnd" - override def createTopicName(suffix: Int): String = s"topic-$suffix-$rnd" - override val bootstrapServers: String = getNodePortForService("strimzi-kafka-cluster-kafka-external-bootstrap") - def exporterHostPort: String = getNodePortForService("kafka-lag-exporter-service") + override def beforeAll(): Unit = { + setUp() + install() + } - def waitForExporterService(): Unit = { - val exp = timeout(Span(90, Seconds)) - eventually(exp)(scrape(exporterHostPort)) + override def afterAll(): Unit = { + cleanUp() + uninstall() } + def rnd: String = Random.alphanumeric.take(5).mkString + override def createGroupId(suffix: Int): String = s"group-$suffix-$rnd" + override def createTopicName(suffix: Int): String = s"topic-$suffix-$rnd" -// private val topicCounter = new AtomicInteger() -// private def nextNumber(): Int = topicCounter.incrementAndGet() -// def createTopicName(suffix: Int): String = s"topic-$suffix-$nextNumber" -// -// override def createTopic(): String = createTopic(0, 1, 1) -// -// def createTopic(suffix: Int, partitions: Int, replication: Int): String = { -// val topicName = createTopicName(suffix) -// s"cat strimzi-topic-template.yaml | sed \"s/{{TOPIC_NAME}}/$topicName/g;s/{{PARTITIONS}}/$partitions/g;s/{{REPLICAS}}/$replication/g\" | kubectl apply -f -" ! -// topicName -// } + def exporterHostPort: String = getNodePortForService("kafka-lag-exporter-service") def getNodePortForService(service: String): String = { val nodePort = s"./examples/k8s/scripts/get_nodeport_for_service.sh $service".!!.trim From 1472827863775364316b278be3ca05b6faeeb21e Mon Sep 17 00:00:00 2001 From: Sean Glover Date: Sun, 3 Apr 2022 20:40:44 -0400 Subject: [PATCH 4/6] ci --- .github/workflows/ci.yaml | 42 +++++++++++++++++-- build.sbt | 3 ++ examples/k8s/strimzi-kafka-cluster.yaml | 4 +- examples/k8s/strimzi-topic-template.yaml | 12 ------ .../integration/ExporterPorts.scala | 0 .../integration/IntegrationSpec.scala | 6 +-- .../kafkalagexporter/integration/LagSim.scala | 0 .../integration/PrometheusUtils.scala | 0 .../minikube/MiniKubeIntegrationSpec.scala | 9 ++++ .../minikube}/MinikubeSpecBase.scala | 7 ++-- .../testcontainers/LocalIntegrationSpec.scala | 5 +++ .../testcontainers}/LocalSpecBase.scala | 6 +-- .../MetricsEvictionOnFailureSpec.scala | 3 +- .../testcontainers}/MetricsEvictionSpec.scala | 3 +- 14 files changed, 68 insertions(+), 32 deletions(-) delete mode 100644 examples/k8s/strimzi-topic-template.yaml rename src/{test => it}/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala (100%) rename src/{test => it}/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala (93%) rename src/{test => it}/scala/com/lightbend/kafkalagexporter/integration/LagSim.scala (100%) rename src/{test => it}/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala (100%) create mode 100644 src/it/scala/com/lightbend/kafkalagexporter/integration/minikube/MiniKubeIntegrationSpec.scala rename src/{test/scala/com/lightbend/kafkalagexporter/integration => it/scala/com/lightbend/kafkalagexporter/integration/minikube}/MinikubeSpecBase.scala (90%) create mode 100644 src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/LocalIntegrationSpec.scala rename src/{test/scala/com/lightbend/kafkalagexporter/integration => it/scala/com/lightbend/kafkalagexporter/integration/testcontainers}/LocalSpecBase.scala (93%) rename src/{test/scala/com/lightbend/kafkalagexporter/integration => it/scala/com/lightbend/kafkalagexporter/integration/testcontainers}/MetricsEvictionOnFailureSpec.scala (93%) rename src/{test/scala/com/lightbend/kafkalagexporter/integration => it/scala/com/lightbend/kafkalagexporter/integration/testcontainers}/MetricsEvictionSpec.scala (93%) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 2696711c..58a1f0f7 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -42,8 +42,41 @@ jobs: - name: Print logs on failure if: ${{ failure() }} run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \; + + local-test: + name: Integration tests using Testcontainers + runs-on: ubuntu-18.04 + strategy: + fail-fast: false + steps: + - name: Checkout + uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Checkout GitHub merge + if: github.event.pull_request + run: |- + git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch + git checkout scratch + + - name: Setup Scala with Java ${{ matrix.java-version }} + uses: olafurpg/setup-scala@v13 + with: + java-version: openjdk@1.17 + + - name: Cache Coursier cache + uses: coursier/cache-action@v6 + + - name: Run tests + run: sbt "IntegrationTest/testOnly com.lightbend.kafkalagexporter.integration.testcontainers.* + + - name: Print logs on failure + if: ${{ failure() }} + run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \; + minikube-test: - name: End to end test on Minikube + name: Integration tests using Minikube runs-on: ubuntu-18.04 strategy: fail-fast: false @@ -78,9 +111,12 @@ jobs: helm repo add strimzi https://strimzi.io/charts/ helm install strimzi strimzi/strimzi-kafka-operator - kubectl apply -f ./examples/strimzi-kafka-cluster.yaml + kubectl apply -f ./examples/k8s/strimzi-kafka-cluster.yaml + kubectl wait kafka/strimzi-kafka-cluster --for=condition=Ready --timeout=300s + + - name: Run tests + run: sbt "IntegrationTest/testOnly com.lightbend.kafkalagexporter.integration.minikube.* - sbt testOnly *.MinikubeIntegrationSpec - name: Print logs on failure if: ${{ failure() }} run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \; diff --git a/build.sbt b/build.sbt index 041f9849..a675265d 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,9 @@ lazy val kafkaLagExporter = .enablePlugins(AutomateHeaderPlugin) .enablePlugins(JavaAppPackaging) .enablePlugins(DockerPlugin) + .configs(IntegrationTest.extend(Test)) + .settings(Defaults.itSettings) + .settings(headerSettings(IntegrationTest)) .settings(commonSettings) .settings( name := "kafka-lag-exporter", diff --git a/examples/k8s/strimzi-kafka-cluster.yaml b/examples/k8s/strimzi-kafka-cluster.yaml index 87742803..9a41a811 100644 --- a/examples/k8s/strimzi-kafka-cluster.yaml +++ b/examples/k8s/strimzi-kafka-cluster.yaml @@ -32,6 +32,4 @@ spec: replicas: 3 storage: type: ephemeral - entityOperator: - topicOperator: {} - userOperator: {} \ No newline at end of file + entityOperator: {} diff --git a/examples/k8s/strimzi-topic-template.yaml b/examples/k8s/strimzi-topic-template.yaml deleted file mode 100644 index 1baaf5c8..00000000 --- a/examples/k8s/strimzi-topic-template.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: kafka.strimzi.io/v1beta2 -kind: KafkaTopic -metadata: - name: {{TOPIC_NAME}} - labels: - strimzi.io/cluster: strimzi-kafka-cluster -spec: - partitions: {{PARTITIONS}} - replicas: {{REPLICAS}} - config: - retention.ms: 7200000 - segment.bytes: 1073741824 diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala similarity index 100% rename from src/test/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala rename to src/it/scala/com/lightbend/kafkalagexporter/integration/ExporterPorts.scala diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala similarity index 93% rename from src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala rename to src/it/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala index d078426a..279ef1e1 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala +++ b/src/it/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala @@ -9,6 +9,7 @@ import akka.actor.testkit.typed.scaladsl.ActorTestKit import akka.kafka.testkit.scaladsl.KafkaSpec import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import com.lightbend.kafkalagexporter.Metrics._ +import com.lightbend.kafkalagexporter.integration.minikube.MinikubeSpecBase import org.scalatest.BeforeAndAfterEach import org.scalatest.concurrent.{Eventually, ScalaFutures} import org.scalatest.matchers.should.Matchers @@ -17,11 +18,6 @@ import org.scalatest.wordspec.AnyWordSpecLike import scala.concurrent.duration.DurationInt import scala.util.Try -class LocalIntegrationSpec extends LocalSpecBase(exporterPort = ExporterPorts.LocalIntegrationSpec) with IntegrationSpec -class MiniKubeIntegrationSpec extends MinikubeSpecBase with IntegrationSpec { - override implicit val patience: PatienceConfig = PatienceConfig(90.seconds, 2.second) -} - trait IntegrationSpec extends KafkaSpec with AnyWordSpecLike with BeforeAndAfterEach diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/LagSim.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/LagSim.scala similarity index 100% rename from src/test/scala/com/lightbend/kafkalagexporter/integration/LagSim.scala rename to src/it/scala/com/lightbend/kafkalagexporter/integration/LagSim.scala diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala similarity index 100% rename from src/test/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala rename to src/it/scala/com/lightbend/kafkalagexporter/integration/PrometheusUtils.scala diff --git a/src/it/scala/com/lightbend/kafkalagexporter/integration/minikube/MiniKubeIntegrationSpec.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/minikube/MiniKubeIntegrationSpec.scala new file mode 100644 index 00000000..28ce47aa --- /dev/null +++ b/src/it/scala/com/lightbend/kafkalagexporter/integration/minikube/MiniKubeIntegrationSpec.scala @@ -0,0 +1,9 @@ +package com.lightbend.kafkalagexporter.integration.minikube + +import com.lightbend.kafkalagexporter.integration.IntegrationSpec + +import scala.concurrent.duration.DurationInt + +class MiniKubeIntegrationSpec extends MinikubeSpecBase with IntegrationSpec { + override implicit val patience: PatienceConfig = PatienceConfig(90.seconds, 2.second) +} diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeSpecBase.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/minikube/MinikubeSpecBase.scala similarity index 90% rename from src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeSpecBase.scala rename to src/it/scala/com/lightbend/kafkalagexporter/integration/minikube/MinikubeSpecBase.scala index a682ca4c..246525ab 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/MinikubeSpecBase.scala +++ b/src/it/scala/com/lightbend/kafkalagexporter/integration/minikube/MinikubeSpecBase.scala @@ -3,17 +3,16 @@ * Copyright (C) 2022 Sean Glover */ -package com.lightbend.kafkalagexporter.integration +package com.lightbend.kafkalagexporter.integration.minikube import akka.kafka.testkit.scaladsl.KafkaSpec +import com.lightbend.kafkalagexporter.integration.PrometheusUtils import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.{Eventually, ScalaFutures} +import org.scalatest.concurrent.Eventually import org.scalatest.matchers.should.Matchers -import org.scalatest.time._ import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.{Logger, LoggerFactory} -import scala.concurrent.duration._ import scala.sys.process._ import scala.util.Random diff --git a/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/LocalIntegrationSpec.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/LocalIntegrationSpec.scala new file mode 100644 index 00000000..9285ee1f --- /dev/null +++ b/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/LocalIntegrationSpec.scala @@ -0,0 +1,5 @@ +package com.lightbend.kafkalagexporter.integration.testcontainers + +import com.lightbend.kafkalagexporter.integration.{ExporterPorts, IntegrationSpec} + +class LocalIntegrationSpec extends LocalSpecBase(exporterPort = ExporterPorts.LocalIntegrationSpec) with IntegrationSpec diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/LocalSpecBase.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/LocalSpecBase.scala similarity index 93% rename from src/test/scala/com/lightbend/kafkalagexporter/integration/LocalSpecBase.scala rename to src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/LocalSpecBase.scala index 3451bf02..ed56c7d7 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/LocalSpecBase.scala +++ b/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/LocalSpecBase.scala @@ -3,16 +3,16 @@ * Copyright (C) 2022 Sean Glover */ -package com.lightbend.kafkalagexporter.integration +package com.lightbend.kafkalagexporter.integration.testcontainers import akka.actor.typed.ActorSystem import akka.kafka.testkit.KafkaTestkitTestcontainersSettings import akka.kafka.testkit.scaladsl.{ScalatestKafkaSpec, TestcontainersKafkaPerClassLike} +import com.lightbend.kafkalagexporter.integration.PrometheusUtils import com.lightbend.kafkalagexporter.{KafkaClusterManager, MainApp} import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.BeforeAndAfterEach -import org.scalatest.concurrent.{Eventually, ScalaFutures} -import org.scalatest.matchers.should.Matchers +import org.scalatest.concurrent.Eventually import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.{Logger, LoggerFactory} diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionOnFailureSpec.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/MetricsEvictionOnFailureSpec.scala similarity index 93% rename from src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionOnFailureSpec.scala rename to src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/MetricsEvictionOnFailureSpec.scala index 6741f003..17384973 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionOnFailureSpec.scala +++ b/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/MetricsEvictionOnFailureSpec.scala @@ -3,9 +3,10 @@ * Copyright (C) 2022 Sean Glover */ -package com.lightbend.kafkalagexporter.integration +package com.lightbend.kafkalagexporter.integration.testcontainers import com.lightbend.kafkalagexporter.Metrics._ +import com.lightbend.kafkalagexporter.integration.{ExporterPorts, LagSim} import scala.concurrent.duration.DurationInt diff --git a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionSpec.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/MetricsEvictionSpec.scala similarity index 93% rename from src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionSpec.scala rename to src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/MetricsEvictionSpec.scala index d8f28762..df7a0791 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/integration/MetricsEvictionSpec.scala +++ b/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/MetricsEvictionSpec.scala @@ -3,9 +3,10 @@ * Copyright (C) 2022 Sean Glover */ -package com.lightbend.kafkalagexporter.integration +package com.lightbend.kafkalagexporter.integration.testcontainers import com.lightbend.kafkalagexporter.Metrics._ +import com.lightbend.kafkalagexporter.integration.{ExporterPorts, LagSim} import scala.concurrent.duration.DurationInt import scala.jdk.CollectionConverters._ From 6324300d26c17423ca66c701e01f795d2b32ec31 Mon Sep 17 00:00:00 2001 From: Sean Glover Date: Sun, 3 Apr 2022 20:48:52 -0400 Subject: [PATCH 5/6] cleanup --- .github/workflows/ci.yaml | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 58a1f0f7..cae2094d 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -42,7 +42,7 @@ jobs: - name: Print logs on failure if: ${{ failure() }} run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \; - + local-test: name: Integration tests using Testcontainers runs-on: ubuntu-18.04 @@ -69,7 +69,7 @@ jobs: uses: coursier/cache-action@v6 - name: Run tests - run: sbt "IntegrationTest/testOnly com.lightbend.kafkalagexporter.integration.testcontainers.* + run: sbt "IntegrationTest/testOnly com.lightbend.kafkalagexporter.integration.testcontainers.*" - name: Print logs on failure if: ${{ failure() }} @@ -107,15 +107,20 @@ jobs: if: github.event.pull_request run: |- sbt docker:publishLocal + echo Minikube image load minikube image load seglo/kafka-lag-exporter:latest + echo Install Strimzi and Kafka cluster helm repo add strimzi https://strimzi.io/charts/ helm install strimzi strimzi/strimzi-kafka-operator kubectl apply -f ./examples/k8s/strimzi-kafka-cluster.yaml kubectl wait kafka/strimzi-kafka-cluster --for=condition=Ready --timeout=300s + echo Running pods + kubectl get po -A + - name: Run tests - run: sbt "IntegrationTest/testOnly com.lightbend.kafkalagexporter.integration.minikube.* + run: sbt "IntegrationTest/testOnly com.lightbend.kafkalagexporter.integration.minikube.*" - name: Print logs on failure if: ${{ failure() }} From b01ae8ff4c9ac6821d3ed8aab89a68b6d152f24f Mon Sep 17 00:00:00 2001 From: Sean Glover Date: Sun, 3 Apr 2022 21:04:02 -0400 Subject: [PATCH 6/6] cleanup --- .github/workflows/ci.yaml | 7 +++---- .../kafkalagexporter/integration/IntegrationSpec.scala | 5 ++--- .../integration/testcontainers/LocalSpecBase.scala | 3 --- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index cae2094d..83d702b0 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -9,7 +9,6 @@ on: - release-* tags-ignore: [ v.* ] - jobs: test: name: Build and Test @@ -107,16 +106,16 @@ jobs: if: github.event.pull_request run: |- sbt docker:publishLocal - echo Minikube image load + echo Minikube image loading.. minikube image load seglo/kafka-lag-exporter:latest - echo Install Strimzi and Kafka cluster + echo Install Strimzi and Kafka cluster.. helm repo add strimzi https://strimzi.io/charts/ helm install strimzi strimzi/strimzi-kafka-operator kubectl apply -f ./examples/k8s/strimzi-kafka-cluster.yaml kubectl wait kafka/strimzi-kafka-cluster --for=condition=Ready --timeout=300s - echo Running pods + echo Running pods.. kubectl get po -A - name: Run tests diff --git a/src/it/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala index 279ef1e1..38aa34ed 100644 --- a/src/it/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala +++ b/src/it/scala/com/lightbend/kafkalagexporter/integration/IntegrationSpec.scala @@ -9,12 +9,11 @@ import akka.actor.testkit.typed.scaladsl.ActorTestKit import akka.kafka.testkit.scaladsl.KafkaSpec import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import com.lightbend.kafkalagexporter.Metrics._ -import com.lightbend.kafkalagexporter.integration.minikube.MinikubeSpecBase import org.scalatest.BeforeAndAfterEach -import org.scalatest.concurrent.{Eventually, ScalaFutures} +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike - import scala.concurrent.duration.DurationInt import scala.util.Try diff --git a/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/LocalSpecBase.scala b/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/LocalSpecBase.scala index ed56c7d7..6a068d85 100644 --- a/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/LocalSpecBase.scala +++ b/src/it/scala/com/lightbend/kafkalagexporter/integration/testcontainers/LocalSpecBase.scala @@ -14,7 +14,6 @@ import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.BeforeAndAfterEach import org.scalatest.concurrent.Eventually import org.scalatest.wordspec.AnyWordSpecLike -import org.slf4j.{Logger, LoggerFactory} import scala.concurrent.Await import scala.concurrent.duration._ @@ -27,8 +26,6 @@ abstract class LocalSpecBase(val exporterPort: Int) with Eventually with PrometheusUtils { - private[this] val log: Logger = LoggerFactory.getLogger(getClass) - override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system) .withConfigureKafka { brokerContainers => brokerContainers.foreach(_.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"))