Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

discipline equals freedom #44

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
tests/suites/**/*-junit.xml
images/kafka-utils/**/*-junit.xml
images/kafka-utils/**/*-junit.xml
tests/.idea/
31 changes: 31 additions & 0 deletions tests/.golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
linters:
auto-fix: false
enable:
- errcheck
- goimports
- golint
- gosec
- misspell
- scopelint
- unconvert
- unparam
- interfacer
- nakedret
- gocyclo
- dupl
- goconst
- lll
run:
skip-dirs:
# autogenerated clientset by client-gen
- pkg/client
linters-settings:
errcheck:
check-type-assertions: true
lll:
line-length: 250
dupl:
threshold: 400
goimports:
# Don't use 'github.com/kudobuilder/kudo', it'll result in unreliable output!
local-prefixes: github.com/mesosphere
4 changes: 4 additions & 0 deletions tests/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

.PHONY: lint
lint:
golangci-lint run
25 changes: 15 additions & 10 deletions tests/suites/kafka_auto_tls/kafka_auto_tls_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package kafka_auto_tls
package kafkaautotls

import (
"fmt"
"testing"

"github.com/mesosphere/kudo-kafka-operator/tests/suites"

"github.com/mesosphere/kudo-kafka-operator/tests/utils"
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/reporters"
. "github.com/onsi/gomega"

"github.com/mesosphere/kudo-kafka-operator/tests/utils"
)

var (
Expand All @@ -23,9 +24,9 @@ var _ = Describe("KafkaAutoTLS", func() {
Namespace: utils.String(customNamespace),
})
It("statefulset should have 1 replica with status READY", func() {
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds)
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
Expect(utils.KClient.GetStatefulSetCount(suites.DefaultKafkaStatefulSetName, customNamespace)).To(Equal(1))
})
Expand Down Expand Up @@ -57,14 +58,16 @@ var _ = Describe("KafkaAutoTLS", func() {
var _ = BeforeSuite(func() {
utils.TearDown(customNamespace)
Expect(utils.DeletePVCs("data-dir")).To(BeNil())
utils.KClient.CreateNamespace(customNamespace, false)
utils.InstallKudoOperator(customNamespace, utils.ZK_INSTANCE, utils.ZK_FRAMEWORK_DIR_ENV, map[string]string{
_, err := utils.KClient.CreateNamespace(customNamespace, false)
Expect(err).To(BeNil())
utils.InstallKudoOperator(customNamespace, utils.ZkInstance, utils.ZkFrameworkDirEnv, map[string]string{
"MEMORY": "256Mi",
"CPUS": "0.25",
"NODE_COUNT": "1",
})
utils.KClient.WaitForStatefulSetCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds)
utils.InstallKudoOperator(customNamespace, utils.KAFKA_INSTANCE, utils.KAFKA_FRAMEWORK_DIR_ENV, map[string]string{
err = utils.KClient.WaitForStatefulSetCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
utils.InstallKudoOperator(customNamespace, utils.KafkaInstance, utils.KafkaFrameworkDirEnv, map[string]string{
"BROKER_MEM": "512Mi",
"BROKER_CPUS": "0.25",
"BROKER_COUNT": "1",
Expand All @@ -73,13 +76,15 @@ var _ = BeforeSuite(func() {
"USE_AUTO_TLS_CERTIFICATE": "true",
"OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
})
utils.KClient.WaitForStatefulSetCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
})

var _ = AfterSuite(func() {
utils.TearDown(customNamespace)
Expect(utils.DeletePVCs("data-dir")).To(BeNil())
utils.KClient.DeleteNamespace(customNamespace)
err := utils.KClient.DeleteNamespace(customNamespace)
Expect(err).To(BeNil())
})

func TestService(t *testing.T) {
Expand Down
18 changes: 11 additions & 7 deletions tests/suites/kafka_kerberos/kafka_kerberos_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package kafka_kerberos
package kafkakerberos

import (
"fmt"
"testing"

. "github.com/mesosphere/kudo-kafka-operator/tests/suites"

"github.com/mesosphere/kudo-kafka-operator/tests/utils"
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/reporters"
. "github.com/onsi/gomega"

"github.com/mesosphere/kudo-kafka-operator/tests/utils"
)

var (
Expand All @@ -30,9 +31,9 @@ var _ = Describe("KafkaTest", func() {
Expect(krb5Client.CreateKeytabSecret(utils.GetKafkaKeyTabs(1, customNamespace), "kafka", "base64-kafka-keytab-secret")).To(BeNil())
})
It("Kafka and Zookeeper statefulset should have 1 replicas with status READY", func() {
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds)
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(1))
})
Expand All @@ -46,7 +47,8 @@ var _ = Describe("KafkaTest", func() {
out, err := kafkaClient.CreateTopic(GetBrokerPodName(0), DefaultContainerName, topicName, "1")
Expect(err).To(BeNil())
Expect(out).To(ContainSubstring("Created topic"))
kafkaClient.DescribeTopic(GetBrokerPodName(0), DefaultContainerName, topicName)
_, err = kafkaClient.DescribeTopic(GetBrokerPodName(0), DefaultContainerName, topicName)
Expect(err).To(BeNil())
messageToTest := "KerberosMessage"
_, err = kafkaClient.WriteInTopic(GetBrokerPodName(0), DefaultContainerName, topicName, messageToTest)
Expect(err).To(BeNil())
Expand All @@ -61,7 +63,8 @@ var _ = Describe("KafkaTest", func() {
var _ = BeforeSuite(func() {
utils.TearDown(customNamespace)
Expect(utils.DeletePVCs("data-dir")).To(BeNil())
utils.KClient.CreateNamespace(customNamespace, false)
_, err := utils.KClient.CreateNamespace(customNamespace, false)
Expect(err).To(BeNil())
Expect(krb5Client.Deploy()).To(BeNil())
utils.SetupWithKerberos(customNamespace, false)
})
Expand All @@ -70,7 +73,8 @@ var _ = AfterSuite(func() {
utils.TearDown(customNamespace)
Expect(krb5Client.TearDown()).To(BeNil())
Expect(utils.DeletePVCs("data-dir")).To(BeNil())
utils.KClient.DeleteNamespace(customNamespace)
err := utils.KClient.DeleteNamespace(customNamespace)
Expect(err).To(BeNil())
})

func TestService(t *testing.T) {
Expand Down
18 changes: 11 additions & 7 deletions tests/suites/kafka_kerberos_tls/kafka_kerberos_tls_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package kafka_kerberos
package kafkakerberos

import (
"fmt"
"testing"

. "github.com/mesosphere/kudo-kafka-operator/tests/suites"

"github.com/mesosphere/kudo-kafka-operator/tests/utils"
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/reporters"
. "github.com/onsi/gomega"

"github.com/mesosphere/kudo-kafka-operator/tests/utils"
)

var (
Expand Down Expand Up @@ -37,9 +38,9 @@ var _ = Describe("KafkaTest", func() {
Expect(krb5Client.CreateKeytabSecret(utils.GetKafkaKeyTabs(1, customNamespace), "kafka", "base64-kafka-keytab-secret")).To(BeNil())
})
It("Kafka and Zookeeper statefulset should have 1 replica each with status READY", func() {
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, zkNodeCount, utils.DefaultStatefulReadyWaitSeconds)
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, zkNodeCount, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, kafkaBrokerCount, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, kafkaBrokerCount, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
})
It("verify the SSL listener", func() {
Expand Down Expand Up @@ -68,8 +69,10 @@ var _ = Describe("KafkaTest", func() {
var _ = BeforeSuite(func() {
utils.TearDown(customNamespace)
Expect(utils.DeletePVCs("data-dir")).To(BeNil())
utils.KClient.CreateNamespace(customNamespace, false)
utils.KClient.CreateTLSCertSecret(customNamespace, "kafka-tls", "Kafka")
_, err := utils.KClient.CreateNamespace(customNamespace, false)
Expect(err).To(BeNil())
_, err = utils.KClient.CreateTLSCertSecret(customNamespace, "kafka-tls", "Kafka")
Expect(err).To(BeNil())
Expect(krb5Client.Deploy()).To(BeNil())
utils.SetupWithKerberos(customNamespace, true)
})
Expand All @@ -78,7 +81,8 @@ var _ = AfterSuite(func() {
utils.TearDown(customNamespace)
Expect(krb5Client.TearDown()).To(BeNil())
Expect(utils.DeletePVCs("data-dir")).To(BeNil())
utils.KClient.DeleteNamespace(customNamespace)
err := utils.KClient.DeleteNamespace(customNamespace)
Expect(err).To(BeNil())
})

func TestService(t *testing.T) {
Expand Down
55 changes: 32 additions & 23 deletions tests/suites/kafka_sanity/kafka_broker_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kafka_sanity
package kafkasanity

import (
"fmt"
Expand Down Expand Up @@ -40,24 +40,27 @@ var _ = Describe("KafkaTest", func() {
Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3))
})
It("statefulset should have 3 replicas with status READY", func() {
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3))
})
It("Create a topic and read and write a message", func() {
kafkaClient := utils.NewKafkaClient(utils.KClient, &utils.KafkaClientConfiguration{
Namespace: utils.String(customNamespace),
})
kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(0), DefaultContainerName, 100)
kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(1), DefaultContainerName, 100)
kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(2), DefaultContainerName, 100)
err := kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(0), DefaultContainerName, 100)
Expect(err).To(BeNil())
err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(1), DefaultContainerName, 100)
Expect(err).To(BeNil())
err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(2), DefaultContainerName, 100)
Expect(err).To(BeNil())
topicSuffix, _ := utils.GetRandString(6)
topicName = fmt.Sprintf("test-topic-%s", topicSuffix)
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
out, err := kafkaClient.CreateTopic(GetBrokerPodName(0), DefaultContainerName, topicName, "0")
Expect(err).To(BeNil())
Expand All @@ -76,7 +79,7 @@ var _ = Describe("KafkaTest", func() {
Namespace: utils.String(customNamespace),
})
It("should have dns resolution using service", func() {
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
output, err := kafkaClient.ExecInPod(customNamespace, utilsContainer, utilsContainer,
[]string{"nslookup", fmt.Sprintf("kafka-svc.%s.svc.cluster.local", customNamespace)})
Expand Down Expand Up @@ -111,7 +114,7 @@ var _ = Describe("KafkaTest", func() {
Namespace: utils.String(customNamespace),
})
It("verify that access mode is ReadWriteOnce", func() {
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
sts, err := utils.KClient.AppsV1().StatefulSets(customNamespace).Get(DefaultKafkaStatefulSetName, metav1.GetOptions{})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -148,9 +151,9 @@ var _ = Describe("KafkaTest", func() {

Context("scale and write/read in topics", func() {
It("statefulset should have 3 replicas with status READY", func() {
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3))
})
Expand All @@ -162,9 +165,10 @@ var _ = Describe("KafkaTest", func() {
topicName := fmt.Sprintf("test-topic-%s", topicSuffix)
err := utils.KClient.UpdateInstancesCount(DefaultKudoKafkaInstance, customNamespace, 4)
Expect(err).To(BeNil())
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 4, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 4, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(3), DefaultContainerName, 100)
Expect(err).To(BeNil())
kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(3), DefaultContainerName, 100)
out, err := kafkaClient.CreateTopic(GetBrokerPodName(3), DefaultContainerName, topicName, "0:1:2")
Expect(err).To(BeNil())
Expect(out).To(ContainSubstring("Created topic"))
Expand All @@ -177,10 +181,14 @@ var _ = Describe("KafkaTest", func() {
err = utils.KClient.UpdateInstancesCount(DefaultKudoKafkaInstance, customNamespace, 3)
Expect(err).To(BeNil())
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, 240)
Expect(err).To(BeNil())
// in case the broker with id 3 was the active controller we should wait for the new active controller
kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(0), DefaultContainerName, 100)
kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(1), DefaultContainerName, 100)
kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(2), DefaultContainerName, 100)
err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(0), DefaultContainerName, 100)
Expect(err).To(BeNil())
err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(1), DefaultContainerName, 100)
Expect(err).To(BeNil())
err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(2), DefaultContainerName, 100)
Expect(err).To(BeNil())
out, err = kafkaClient.ReadFromTopic(GetBrokerPodName(0), DefaultContainerName, topicName, messageToTest)
Expect(err).To(BeNil())
Expect(out).To(ContainSubstring(messageToTest))
Expand All @@ -191,9 +199,9 @@ var _ = Describe("KafkaTest", func() {
It("should have 3 replicas", func() {
err := utils.KClient.UpdateInstancesCount(DefaultKudoKafkaInstance, customNamespace, 1)
Expect(err).To(BeNil())
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
err = utils.KClient.UpdateInstancesCount(DefaultKudoKafkaInstance, customNamespace, 3)
Expect(err).To(BeNil())
Expand All @@ -202,7 +210,7 @@ var _ = Describe("KafkaTest", func() {
Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3))
})
It("should have 3 replicas with status READY", func() {
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3))
})
Expand Down Expand Up @@ -233,9 +241,9 @@ var _ = Describe("KafkaTest", func() {
Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3))
})
It("should have 3 replicas with status READY", func() {
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds)
err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait)
Expect(err).To(BeNil())
Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3))
})
Expand All @@ -253,7 +261,8 @@ var _ = Describe("KafkaTest", func() {
var _ = BeforeSuite(func() {
utils.TearDown(customNamespace)
Expect(utils.DeletePVCs("data-dir")).To(BeNil())
utils.KClient.CreateNamespace(customNamespace, false)
_, err := utils.KClient.CreateNamespace(customNamespace, false)
Expect(err).To(BeNil())
utils.Apply(fmt.Sprintf("%s/%s", repoRoot, resources), customNamespace)
utils.Setup(customNamespace)
})
Expand Down
Loading