Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Minikube integration tests #346

Merged
merged 6 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/bug_report.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ A clear and concise description of what the bug is.

**To Reproduce**
Provide logs and application configuration.
See the [Troubleshooting](https://github.com/lightbend/kafka-lag-exporter#troubleshooting) section of the `README` for instructions on how to enable `DEBUG` logging.
See the [Troubleshooting](https://github.com/seglo/kafka-lag-exporter#troubleshooting) section of the `README` for instructions on how to enable `DEBUG` logging.

**Environment**
- Version
Expand Down
84 changes: 83 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ on:
- release-*
tags-ignore: [ v.* ]


jobs:
test:
name: Build and Test
Expand Down Expand Up @@ -43,6 +42,89 @@ jobs:
if: ${{ failure() }}
run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;

# Runs the Testcontainers-based integration tests on every CI run.
local-test:
  name: Integration tests using Testcontainers
  # NOTE(review): ubuntu-18.04 runners are deprecated on GitHub Actions;
  # consider ubuntu-latest.
  runs-on: ubuntu-18.04
  strategy:
    fail-fast: false
  steps:
    - name: Checkout
      uses: actions/checkout@v2
      with:
        fetch-depth: 0

    # For PRs, test the merge result rather than the branch head.
    - name: Checkout GitHub merge
      if: github.event.pull_request
      run: |-
        git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch
        git checkout scratch

    # This job defines no matrix, so `${{ matrix.java-version }}` in the
    # step name would render empty; name the step without it.
    - name: Setup Scala and Java
      uses: olafurpg/setup-scala@v13
      with:
        # TODO confirm: the java-version value was redacted by the scrape
        # ("[email protected]"); restore the intended adopt@<version> string.
        java-version: [email protected]

    - name: Cache Coursier cache
      uses: coursier/cache-action@v6

    - name: Run tests
      run: sbt "IntegrationTest/testOnly com.lightbend.kafkalagexporter.integration.testcontainers.*"

    - name: Print logs on failure
      if: ${{ failure() }}
      run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;

# Runs the integration tests against a real Minikube cluster with a
# Strimzi-managed Kafka broker.
minikube-test:
  name: Integration tests using Minikube
  # NOTE(review): ubuntu-18.04 runners are deprecated on GitHub Actions;
  # consider ubuntu-latest.
  runs-on: ubuntu-18.04
  strategy:
    fail-fast: false
  steps:
    - name: Checkout
      uses: actions/checkout@v2
      with:
        fetch-depth: 0

    # For PRs, test the merge result rather than the branch head.
    - name: Checkout GitHub merge
      if: github.event.pull_request
      run: |-
        git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch
        git checkout scratch

    # This job defines no matrix, so `${{ matrix.java-version }}` in the
    # step name would render empty; name the step without it.
    - name: Setup Scala and Java
      uses: olafurpg/setup-scala@v13
      with:
        # TODO confirm: the java-version value was redacted by the scrape
        # ("[email protected]"); restore the intended adopt@<version> string.
        java-version: [email protected]

    - name: Cache Coursier cache
      uses: coursier/cache-action@v6

    - name: Start minikube
      uses: medyagh/setup-minikube@master

    # Build the exporter image locally, load it into Minikube, and install a
    # Strimzi operator plus a single-broker Kafka cluster for the tests.
    # NOTE(review): this step is gated on pull requests while "Run tests" is
    # not, so pushes to branches would run the tests without a cluster —
    # confirm whether the gate (or the tests) should apply to both.
    - name: Setup Minikube
      if: github.event.pull_request
      run: |-
        sbt docker:publishLocal
        echo Minikube image loading..
        minikube image load seglo/kafka-lag-exporter:latest

        echo Install Strimzi and Kafka cluster..
        helm repo add strimzi https://strimzi.io/charts/
        helm install strimzi strimzi/strimzi-kafka-operator
        kubectl apply -f ./examples/k8s/strimzi-kafka-cluster.yaml
        kubectl wait kafka/strimzi-kafka-cluster --for=condition=Ready --timeout=300s

        echo Running pods..
        kubectl get po -A

    - name: Run tests
      run: sbt "IntegrationTest/testOnly com.lightbend.kafkalagexporter.integration.minikube.*"

    - name: Print logs on failure
      if: ${{ failure() }}
      run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;

build-docker:
name: Build Docker Image
runs-on: ubuntu-18.04
Expand Down
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ lazy val kafkaLagExporter =
.enablePlugins(AutomateHeaderPlugin)
.enablePlugins(JavaAppPackaging)
.enablePlugins(DockerPlugin)
.configs(IntegrationTest.extend(Test))
.settings(Defaults.itSettings)
.settings(headerSettings(IntegrationTest))
.settings(commonSettings)
.settings(
name := "kafka-lag-exporter",
Expand Down
2 changes: 1 addition & 1 deletion charts/kafka-lag-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ akkaLogLevel: DEBUG

## You probably won't need to change anything below this line.
image:
repository: lightbend/kafka-lag-exporter
repository: seglo/kafka-lag-exporter
tag: 0.6.9-SNAPSHOT
# If digest is set it will be used instead of tag to specify the image
# digest: sha256:0f6387aa011e6eb7952ea211ac139bf8613ad3ef6954a1a5d910676d293bd610
Expand Down
13 changes: 13 additions & 0 deletions examples/k8s/kafka-lag-exporter-helm-values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Helm values used by the Minikube integration tests to install
# kafka-lag-exporter (see .github/workflows/ci.yaml, minikube-test job).
clusters:
  - name: "default"
    # Strimzi's in-cluster bootstrap service (default namespace), plain
    # listener on 9092 — see examples/k8s/strimzi-kafka-cluster.yaml.
    bootstrapBrokers: "strimzi-kafka-cluster-kafka-bootstrap.default.svc.cluster.local:9092"
    # Poll every second so tests observe metric updates quickly.
    pollIntervalSeconds: 1
service:
  # NodePort so the test harness outside the cluster can scrape /metrics.
  type: NodePort
rootLogLevel: INFO
kafkaLagExporterLogLevel: DEBUG
kafkaLogLevel: INFO
akkaLogLevel: DEBUG
image:
  # Use the locally built image loaded with `minikube image load`; Never pull
  # so the local `latest` tag is not replaced from a remote registry.
  tag: latest
  pullPolicy: Never
34 changes: 34 additions & 0 deletions examples/k8s/minikube-coredns-fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# coredns doesn't like references to nameservers on local loopback;
# this fix replaces `forward` with generic config pointing to Google DNS.
# https://github.com/kubernetes/minikube/issues/7512#issuecomment-664348459
apiVersion: v1
kind: ConfigMap
metadata:
  name: coredns
  namespace: kube-system
data:
  Corefile: |
    .:53 {
        errors
        health {
            lameduck 5s
        }
        ready
        kubernetes cluster.local in-addr.arpa ip6.arpa {
            pods insecure
            fallthrough in-addr.arpa ip6.arpa
            ttl 30
        }
        prometheus :9153
        hosts {
            192.168.49.1 host.minikube.internal
            fallthrough
        }
        cache 30
        loop
        reload
        loadbalance
        # Forward everything else to Google public DNS instead of the host's
        # loopback resolver, which CoreDNS rejects.
        forward . 8.8.8.8 8.8.4.4
    }


5 changes: 5 additions & 0 deletions examples/k8s/scripts/get_nodeport_for_service.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# Print the externally reachable <minikube-ip>:<node-port> address for the
# first port of a NodePort service, e.g. `./get_nodeport_for_service.sh kafka-lag-exporter`.
# Requires kubectl and minikube on the PATH.
set -euo pipefail

# Fail with a usage message when the service name argument is missing.
SERVICE="${1:?usage: $0 <service-name>}"

# Quote expansions so service names never undergo word splitting/globbing.
NODE_PORT=$(kubectl get service "${SERVICE}" -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}')
NODE_IP=$(minikube ip)
echo "${NODE_IP}:${NODE_PORT}"
35 changes: 35 additions & 0 deletions examples/k8s/strimzi-kafka-cluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Single-broker Strimzi Kafka cluster used by the Minikube integration tests
# (applied by .github/workflows/ci.yaml and awaited with `kubectl wait`).
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: strimzi-kafka-cluster
spec:
  kafka:
    version: 3.0.0
    # One broker is sufficient for CI.
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      # NodePort listener so test code outside the cluster can connect.
      - name: external
        port: 9094
        type: nodeport
        tls: false
    config:
      # All replication factors lowered to 1 to match the single broker.
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.0"
    # Ephemeral storage: no persistence is needed for throwaway CI clusters.
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator: {}
12 changes: 12 additions & 0 deletions examples/k8s/strimzi-topic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Test topic managed by the Strimzi topic operator.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    # Binds this topic to the cluster defined in strimzi-kafka-cluster.yaml.
    strimzi.io/cluster: strimzi-kafka-cluster
spec:
  # Single partition/replica to match the single-broker CI cluster.
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000      # 2 hours
    segment.bytes: 1073741824  # 1 GiB
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Copyright (C) 2020-2022 Sean Glover <https://seanglover.com>
package com.lightbend.kafkalagexporter.integration

/** Fixed Prometheus exporter HTTP ports assigned to each integration spec so
  * the suites can run concurrently without binding the same port.
  *
  * Note: the scraped diff interleaved the pre- and post-change definitions,
  * which would not compile (duplicate vals); only the updated set is kept.
  */
object ExporterPorts {
  val LocalIntegrationSpec = 8000
  val MetricsEvictionSpec = 8991
  val MetricsEvictionOnFailureSpec = 8992
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,32 @@
package com.lightbend.kafkalagexporter.integration

import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.kafka.testkit.scaladsl.KafkaSpec
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import com.lightbend.kafkalagexporter.Metrics._

import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.duration.DurationInt
import scala.util.Try

class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationSpec) {
trait IntegrationSpec extends KafkaSpec
with AnyWordSpecLike
with BeforeAndAfterEach
with Matchers
with ScalaFutures
with Eventually
with PrometheusUtils
with LagSim {

def exporterHostPort: String

implicit val patience: PatienceConfig = PatienceConfig(30.seconds, 2.second)

"kafka lag exporter" should {
val clusterName = "default"
val group = createGroupId(1)
val partition = "0"

Expand All @@ -39,7 +57,7 @@ class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationS
simulator.produceElements(totalOffsets)
simulator.consumeElements(offsetsToCommit)

eventually(scrapeAndAssert(exporterPort, "Assert offset-based metrics", rules: _*))
eventually(scrapeAndAssert(exporterHostPort, "Assert offset-based metrics", rules: _*))

simulator.shutdown()
}
Expand Down Expand Up @@ -68,7 +86,7 @@ class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationS
val isIncreasingRule = Rule.create(TimeLagMetric, isIncreasing, clusterName, group, topic, partition)

(1 to 3).foreach { i =>
eventually(scrapeAndAssert(exporterPort, s"Assert lag in time metrics are increasing ($i)", isIncreasingRule))
eventually(scrapeAndAssert(exporterHostPort, s"Assert lag in time metrics are increasing ($i)", isIncreasingRule))
}

testKit.stop(simulatorActor)
Expand All @@ -77,7 +95,7 @@ class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationS
"report poll time metric greater than 0 ms" in {
assertAllStagesStopped {
val rule = Rule.create(PollTimeMetric, (actual: String) => actual.toDouble should be > 0d, clusterName)
eventually(scrapeAndAssert(exporterPort, "Assert poll time metric", rule))
eventually(scrapeAndAssert(exporterHostPort, "Assert poll time metric", rule))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,29 @@

package com.lightbend.kafkalagexporter.integration

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.unmarshalling.Unmarshal
import com.lightbend.kafkalagexporter.MetricsSink.GaugeDefinition
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.slf4j.Logger

import scala.concurrent.{ExecutionContext, Future}
import scala.util.matching.Regex

/**
* Test utilities to parse the Prometheus health endpoint to assert metrics in integration tests.
*/
trait PrometheusUtils { self: SpecBase =>
trait PrometheusUtils extends ScalaFutures with Matchers {
implicit val system: ActorSystem
val log: Logger

private val http = Http()

def scrape(port: Int, rules: Rule*)(implicit ec: ExecutionContext): Future[List[Result]] = {
val request = HttpRequest(uri = s"http://localhost:$port/metrics")
def scrape(hostPort: String, rules: Rule*)(implicit ec: ExecutionContext): Future[List[Result]] = {
val request = HttpRequest(uri = s"http://$hostPort/metrics")
for {
HttpResponse(StatusCodes.OK, _, entity, _) <- http.singleRequest(request)
body <- Unmarshal(entity).to[String]
Expand All @@ -34,18 +41,18 @@ trait PrometheusUtils { self: SpecBase =>
}
}

def scrapeAndAssert(port: Int, description: String, rules: Rule*)
def scrapeAndAssert(hostPort: String, description: String, rules: Rule*)
(implicit ec: ExecutionContext): Unit =
scrapeAndAssert(port, description, _.assert(), rules: _*)
scrapeAndAssert(hostPort, description, _.assert(), rules: _*)

def scrapeAndAssertDne(port: Int, description: String, rules: Rule*)
def scrapeAndAssertDne(hostPort: String, description: String, rules: Rule*)
(implicit ec: ExecutionContext): Unit =
scrapeAndAssert(port, description, _.assertDne(), rules: _*)
scrapeAndAssert(hostPort, description, _.assertDne(), rules: _*)


private def scrapeAndAssert(port: Int, description: String, resultF: Result => Unit, rules: Rule*)
private def scrapeAndAssert(hostPort: String, description: String, resultF: Result => Unit, rules: Rule*)
(implicit ec: ExecutionContext): Unit = {
val results = scrape(port, rules: _*).futureValue
val results = scrape(hostPort, rules: _*).futureValue
log.debug("Start: {}", description)
results.foreach(resultF)
log.debug("End (Successful): {}", description)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.lightbend.kafkalagexporter.integration.minikube

import com.lightbend.kafkalagexporter.integration.IntegrationSpec

import scala.concurrent.duration.DurationInt

// Runs the shared IntegrationSpec test suite against an exporter deployed in
// Minikube (environment provided by MinikubeSpecBase). Overrides the suite's
// patience with a longer 90s timeout — presumably to absorb pod scheduling
// and image-pull latency in the cluster; TODO confirm.
// NOTE(review): other files in this PR carry a copyright header (see the
// integration package sources); this new file appears to lack one — confirm.
class MiniKubeIntegrationSpec extends MinikubeSpecBase with IntegrationSpec {
override implicit val patience: PatienceConfig = PatienceConfig(90.seconds, 2.second)
}
Loading