Skip to content

Commit

Permalink
Minikube integration tests (seglo#346)
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo authored and JohnPreston committed May 4, 2022
1 parent be5fec5 commit ac49cdd
Show file tree
Hide file tree
Showing 20 changed files with 326 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/bug_report.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ A clear and concise description of what the bug is.

**To Reproduce**
Provide logs and application configuration.
See the [Troubleshooting](https://github.com/lightbend/kafka-lag-exporter#troubleshooting) section of the `README` for instructions on how to enable `DEBUG` logging.
See the [Troubleshooting](https://github.com/seglo/kafka-lag-exporter#troubleshooting) section of the `README` for instructions on how to enable `DEBUG` logging.

**Environment**
- Version
Expand Down
84 changes: 83 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ on:
- release-*
tags-ignore: [ v.* ]


jobs:
test:
name: Build and Test
Expand Down Expand Up @@ -43,6 +42,89 @@ jobs:
if: ${{ failure() }}
run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;

local-test:
name: Integration tests using Testcontainers
runs-on: ubuntu-18.04
strategy:
fail-fast: false
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Checkout GitHub merge
if: github.event.pull_request
run: |-
git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch
git checkout scratch
- name: Setup Scala with Java ${{ matrix.java-version }}
uses: olafurpg/setup-scala@v13
with:
java-version: [email protected]

- name: Cache Coursier cache
uses: coursier/cache-action@v6

- name: Run tests
run: sbt "IntegrationTest/testOnly com.lightbend.kafkalagexporter.integration.testcontainers.*"

- name: Print logs on failure
if: ${{ failure() }}
run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;

minikube-test:
name: Integration tests using Minikube
runs-on: ubuntu-18.04
strategy:
fail-fast: false
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Checkout GitHub merge
if: github.event.pull_request
run: |-
git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch
git checkout scratch
- name: Setup Scala with Java ${{ matrix.java-version }}
uses: olafurpg/setup-scala@v13
with:
java-version: [email protected]

- name: Cache Coursier cache
uses: coursier/cache-action@v6

- name: Start minikube
uses: medyagh/setup-minikube@master

- name: Setup Minikube
if: github.event.pull_request
run: |-
sbt docker:publishLocal
echo Minikube image loading..
minikube image load seglo/kafka-lag-exporter:latest
echo Install Strimzi and Kafka cluster..
helm repo add strimzi https://strimzi.io/charts/
helm install strimzi strimzi/strimzi-kafka-operator
kubectl apply -f ./examples/k8s/strimzi-kafka-cluster.yaml
kubectl wait kafka/strimzi-kafka-cluster --for=condition=Ready --timeout=300s
echo Running pods..
kubectl get po -A
- name: Run tests
run: sbt "IntegrationTest/testOnly com.lightbend.kafkalagexporter.integration.minikube.*"

- name: Print logs on failure
if: ${{ failure() }}
run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;

build-docker:
name: Build Docker Image
runs-on: ubuntu-18.04
Expand Down
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ lazy val kafkaLagExporter =
.enablePlugins(AutomateHeaderPlugin)
.enablePlugins(JavaAppPackaging)
.enablePlugins(DockerPlugin)
.configs(IntegrationTest.extend(Test))
.settings(Defaults.itSettings)
.settings(headerSettings(IntegrationTest))
.settings(commonSettings)
.settings(
name := "kafka-lag-exporter",
Expand Down
2 changes: 1 addition & 1 deletion charts/kafka-lag-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ akkaLogLevel: DEBUG

## You probably won't need to change anything below this line.
image:
repository: lightbend/kafka-lag-exporter
repository: seglo/kafka-lag-exporter
tag: 0.6.9-SNAPSHOT
# If digest is set it will be used instead of tag to specify the image
# digest: sha256:0f6387aa011e6eb7952ea211ac139bf8613ad3ef6954a1a5d910676d293bd610
Expand Down
13 changes: 13 additions & 0 deletions examples/k8s/kafka-lag-exporter-helm-values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
clusters:
- name: "default"
bootstrapBrokers: "strimzi-kafka-cluster-kafka-bootstrap.default.svc.cluster.local:9092"
pollIntervalSeconds: 1
service:
type: NodePort
rootLogLevel: INFO
kafkaLagExporterLogLevel: DEBUG
kafkaLogLevel: INFO
akkaLogLevel: DEBUG
image:
tag: latest
pullPolicy: Never
34 changes: 34 additions & 0 deletions examples/k8s/minikube-coredns-fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# coredns doesn't like references to nameservers on local loop back
# fix replaces `forward` with generic config pointing to google dns
# https://github.com/kubernetes/minikube/issues/7512#issuecomment-664348459
apiVersion: v1
kind: ConfigMap
metadata:
name: coredns
namespace: kube-system
data:
Corefile: |
.:53 {
errors
health {
lameduck 5s
}
ready
kubernetes cluster.local in-addr.arpa ip6.arpa {
pods insecure
fallthrough in-addr.arpa ip6.arpa
ttl 30
}
prometheus :9153
hosts {
192.168.49.1 host.minikube.internal
fallthrough
}
cache 30
loop
reload
loadbalance
forward . 8.8.8.8 8.8.4.4
}
5 changes: 5 additions & 0 deletions examples/k8s/scripts/get_nodeport_for_service.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
export SERVICE=$1
NODE_PORT=$(kubectl get service ${SERVICE} -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}') &&
NODE_IP=$(minikube ip) &&
echo ${NODE_IP}:${NODE_PORT}
35 changes: 35 additions & 0 deletions examples/k8s/strimzi-kafka-cluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: strimzi-kafka-cluster
spec:
kafka:
version: 3.0.0
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
- name: external
port: 9094
type: nodeport
tls: false
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
inter.broker.protocol.version: "3.0"
storage:
type: ephemeral
zookeeper:
replicas: 3
storage:
type: ephemeral
entityOperator: {}
12 changes: 12 additions & 0 deletions examples/k8s/strimzi-topic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: my-topic
labels:
strimzi.io/cluster: strimzi-kafka-cluster
spec:
partitions: 1
replicas: 1
config:
retention.ms: 7200000
segment.bytes: 1073741824
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Copyright (C) 2020-2022 Sean Glover <https://seanglover.com>
package com.lightbend.kafkalagexporter.integration

object ExporterPorts {
val IntegrationSpec = 8000
val MetricsEvictionSpec = 8001
val MetricsEvictionOnFailureSpec = 8002
val LocalIntegrationSpec = 8000
val MetricsEvictionSpec = 8991
val MetricsEvictionOnFailureSpec = 8992
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,32 @@
package com.lightbend.kafkalagexporter.integration

import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.kafka.testkit.scaladsl.KafkaSpec
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import com.lightbend.kafkalagexporter.Metrics._

import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.duration.DurationInt
import scala.util.Try

class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationSpec) {
trait IntegrationSpec extends KafkaSpec
with AnyWordSpecLike
with BeforeAndAfterEach
with Matchers
with ScalaFutures
with Eventually
with PrometheusUtils
with LagSim {

def exporterHostPort: String

implicit val patience: PatienceConfig = PatienceConfig(30.seconds, 2.second)

"kafka lag exporter" should {
val clusterName = "default"
val group = createGroupId(1)
val partition = "0"

Expand All @@ -39,7 +57,7 @@ class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationS
simulator.produceElements(totalOffsets)
simulator.consumeElements(offsetsToCommit)

eventually(scrapeAndAssert(exporterPort, "Assert offset-based metrics", rules: _*))
eventually(scrapeAndAssert(exporterHostPort, "Assert offset-based metrics", rules: _*))

simulator.shutdown()
}
Expand Down Expand Up @@ -68,7 +86,7 @@ class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationS
val isIncreasingRule = Rule.create(TimeLagMetric, isIncreasing, clusterName, group, topic, partition)

(1 to 3).foreach { i =>
eventually(scrapeAndAssert(exporterPort, s"Assert lag in time metrics are increasing ($i)", isIncreasingRule))
eventually(scrapeAndAssert(exporterHostPort, s"Assert lag in time metrics are increasing ($i)", isIncreasingRule))
}

testKit.stop(simulatorActor)
Expand All @@ -77,7 +95,7 @@ class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationS
"report poll time metric greater than 0 ms" in {
assertAllStagesStopped {
val rule = Rule.create(PollTimeMetric, (actual: String) => actual.toDouble should be > 0d, clusterName)
eventually(scrapeAndAssert(exporterPort, "Assert poll time metric", rule))
eventually(scrapeAndAssert(exporterHostPort, "Assert poll time metric", rule))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,29 @@

package com.lightbend.kafkalagexporter.integration

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.unmarshalling.Unmarshal
import com.lightbend.kafkalagexporter.MetricsSink.GaugeDefinition
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.slf4j.Logger

import scala.concurrent.{ExecutionContext, Future}
import scala.util.matching.Regex

/**
* Test utilities to parse the Prometheus health endpoint to assert metrics in integration tests.
*/
trait PrometheusUtils { self: SpecBase =>
trait PrometheusUtils extends ScalaFutures with Matchers {
implicit val system: ActorSystem
val log: Logger

private val http = Http()

def scrape(port: Int, rules: Rule*)(implicit ec: ExecutionContext): Future[List[Result]] = {
val request = HttpRequest(uri = s"http://localhost:$port/metrics")
def scrape(hostPort: String, rules: Rule*)(implicit ec: ExecutionContext): Future[List[Result]] = {
val request = HttpRequest(uri = s"http://$hostPort/metrics")
for {
HttpResponse(StatusCodes.OK, _, entity, _) <- http.singleRequest(request)
body <- Unmarshal(entity).to[String]
Expand All @@ -34,18 +41,18 @@ trait PrometheusUtils { self: SpecBase =>
}
}

def scrapeAndAssert(port: Int, description: String, rules: Rule*)
def scrapeAndAssert(hostPort: String, description: String, rules: Rule*)
(implicit ec: ExecutionContext): Unit =
scrapeAndAssert(port, description, _.assert(), rules: _*)
scrapeAndAssert(hostPort, description, _.assert(), rules: _*)

def scrapeAndAssertDne(port: Int, description: String, rules: Rule*)
def scrapeAndAssertDne(hostPort: String, description: String, rules: Rule*)
(implicit ec: ExecutionContext): Unit =
scrapeAndAssert(port, description, _.assertDne(), rules: _*)
scrapeAndAssert(hostPort, description, _.assertDne(), rules: _*)


private def scrapeAndAssert(port: Int, description: String, resultF: Result => Unit, rules: Rule*)
private def scrapeAndAssert(hostPort: String, description: String, resultF: Result => Unit, rules: Rule*)
(implicit ec: ExecutionContext): Unit = {
val results = scrape(port, rules: _*).futureValue
val results = scrape(hostPort, rules: _*).futureValue
log.debug("Start: {}", description)
results.foreach(resultF)
log.debug("End (Successful): {}", description)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.lightbend.kafkalagexporter.integration.minikube

import com.lightbend.kafkalagexporter.integration.IntegrationSpec

import scala.concurrent.duration.DurationInt

class MiniKubeIntegrationSpec extends MinikubeSpecBase with IntegrationSpec {
override implicit val patience: PatienceConfig = PatienceConfig(90.seconds, 2.second)
}
Loading

0 comments on commit ac49cdd

Please sign in to comment.