diff --git a/.gitignore b/.gitignore index 0dd6da1..fc2fb18 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ tests/suites/**/*-junit.xml -images/kafka-utils/**/*-junit.xml \ No newline at end of file +images/kafka-utils/**/*-junit.xml +tests/.idea/ diff --git a/tests/.golangci.yml b/tests/.golangci.yml new file mode 100644 index 0000000..6572881 --- /dev/null +++ b/tests/.golangci.yml @@ -0,0 +1,31 @@ +linters: + auto-fix: false + enable: + - errcheck + - goimports + - golint + - gosec + - misspell + - scopelint + - unconvert + - unparam + - interfacer + - nakedret + - gocyclo + - dupl + - goconst + - lll +run: + skip-dirs: + # autogenerated clientset by client-gen + - pkg/client +linters-settings: + errcheck: + check-type-assertions: true + lll: + line-length: 250 + dupl: + threshold: 400 + goimports: + # Don't use 'github.com/kudobuilder/kudo', it'll result in unreliable output! + local-prefixes: github.com/mesosphere diff --git a/tests/Makefile b/tests/Makefile new file mode 100644 index 0000000..af1debe --- /dev/null +++ b/tests/Makefile @@ -0,0 +1,4 @@ + +.PHONY: lint +lint: + golangci-lint run \ No newline at end of file diff --git a/tests/suites/kafka_auto_tls/kafka_auto_tls_test.go b/tests/suites/kafka_auto_tls/kafka_auto_tls_test.go index 038473a..40457f6 100644 --- a/tests/suites/kafka_auto_tls/kafka_auto_tls_test.go +++ b/tests/suites/kafka_auto_tls/kafka_auto_tls_test.go @@ -1,4 +1,4 @@ -package kafka_auto_tls +package kafkaautotls import ( "fmt" @@ -6,10 +6,11 @@ import ( "github.com/mesosphere/kudo-kafka-operator/tests/suites" - "github.com/mesosphere/kudo-kafka-operator/tests/utils" . "github.com/onsi/ginkgo" "github.com/onsi/ginkgo/reporters" . "github.com/onsi/gomega" + + "github.com/mesosphere/kudo-kafka-operator/tests/utils" ) var ( @@ -23,9 +24,9 @@ var _ = Describe("KafkaAutoTLS", func() { Namespace: utils.String(customNamespace), }) It("statefulset should have 1 replica with status READY", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) Expect(utils.KClient.GetStatefulSetCount(suites.DefaultKafkaStatefulSetName, customNamespace)).To(Equal(1)) }) @@ -57,14 +58,16 @@ var _ = Describe("KafkaAutoTLS", func() { var _ = BeforeSuite(func() { utils.TearDown(customNamespace) Expect(utils.DeletePVCs("data-dir")).To(BeNil()) - utils.KClient.CreateNamespace(customNamespace, false) - utils.InstallKudoOperator(customNamespace, utils.ZK_INSTANCE, utils.ZK_FRAMEWORK_DIR_ENV, map[string]string{ + _, err := utils.KClient.CreateNamespace(customNamespace, false) + Expect(err).To(BeNil()) + utils.InstallKudoOperator(customNamespace, utils.ZkInstance, utils.ZkFrameworkDirEnv, map[string]string{ "MEMORY": "256Mi", "CPUS": "0.25", "NODE_COUNT": "1", }) - utils.KClient.WaitForStatefulSetCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) - utils.InstallKudoOperator(customNamespace, utils.KAFKA_INSTANCE, utils.KAFKA_FRAMEWORK_DIR_ENV, map[string]string{ + err = utils.KClient.WaitForStatefulSetCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) + Expect(err).To(BeNil()) + utils.InstallKudoOperator(customNamespace, utils.KafkaInstance, utils.KafkaFrameworkDirEnv, map[string]string{ "BROKER_MEM": "512Mi", "BROKER_CPUS": "0.25", "BROKER_COUNT": "1", @@ -73,13 +76,15 @@ var _ = BeforeSuite(func() { "USE_AUTO_TLS_CERTIFICATE": "true", "OFFSETS_TOPIC_REPLICATION_FACTOR": "1", }) - utils.KClient.WaitForStatefulSetCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) + Expect(err).To(BeNil()) }) var _ = AfterSuite(func() { utils.TearDown(customNamespace) Expect(utils.DeletePVCs("data-dir")).To(BeNil()) - utils.KClient.DeleteNamespace(customNamespace) + err := utils.KClient.DeleteNamespace(customNamespace) + Expect(err).To(BeNil()) }) func TestService(t *testing.T) { diff --git a/tests/suites/kafka_kerberos/kafka_kerberos_test.go b/tests/suites/kafka_kerberos/kafka_kerberos_test.go index f6fe134..1a8ba16 100644 --- a/tests/suites/kafka_kerberos/kafka_kerberos_test.go +++ b/tests/suites/kafka_kerberos/kafka_kerberos_test.go @@ -1,4 +1,4 @@ -package kafka_kerberos +package kafkakerberos import ( "fmt" @@ -6,10 +6,11 @@ import ( . "github.com/mesosphere/kudo-kafka-operator/tests/suites" - "github.com/mesosphere/kudo-kafka-operator/tests/utils" . "github.com/onsi/ginkgo" "github.com/onsi/ginkgo/reporters" . "github.com/onsi/gomega" + + "github.com/mesosphere/kudo-kafka-operator/tests/utils" ) var ( @@ -30,9 +31,9 @@ var _ = Describe("KafkaTest", func() { Expect(krb5Client.CreateKeytabSecret(utils.GetKafkaKeyTabs(1, customNamespace), "kafka", "base64-kafka-keytab-secret")).To(BeNil()) }) It("Kafka and Zookeeper statefulset should have 1 replicas with status READY", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(1)) }) @@ -46,7 +47,8 @@ var _ = Describe("KafkaTest", func() { out, err := kafkaClient.CreateTopic(GetBrokerPodName(0), DefaultContainerName, topicName, "1") Expect(err).To(BeNil()) Expect(out).To(ContainSubstring("Created topic")) - kafkaClient.DescribeTopic(GetBrokerPodName(0), DefaultContainerName, topicName) + _, err = kafkaClient.DescribeTopic(GetBrokerPodName(0), DefaultContainerName, topicName) + Expect(err).To(BeNil()) messageToTest := "KerberosMessage" _, err = kafkaClient.WriteInTopic(GetBrokerPodName(0), DefaultContainerName, topicName, messageToTest) Expect(err).To(BeNil()) @@ -61,7 +63,8 @@ var _ = Describe("KafkaTest", func() { var _ = BeforeSuite(func() { utils.TearDown(customNamespace) Expect(utils.DeletePVCs("data-dir")).To(BeNil()) - utils.KClient.CreateNamespace(customNamespace, false) + _, err := utils.KClient.CreateNamespace(customNamespace, false) + Expect(err).To(BeNil()) Expect(krb5Client.Deploy()).To(BeNil()) utils.SetupWithKerberos(customNamespace, false) }) @@ -70,7 +73,8 @@ var _ = AfterSuite(func() { utils.TearDown(customNamespace) Expect(krb5Client.TearDown()).To(BeNil()) Expect(utils.DeletePVCs("data-dir")).To(BeNil()) - utils.KClient.DeleteNamespace(customNamespace) + err := utils.KClient.DeleteNamespace(customNamespace) + Expect(err).To(BeNil()) }) func TestService(t *testing.T) { diff --git a/tests/suites/kafka_kerberos_tls/kafka_kerberos_tls_test.go b/tests/suites/kafka_kerberos_tls/kafka_kerberos_tls_test.go index 5ae9d3d..d7f2120 100644 --- a/tests/suites/kafka_kerberos_tls/kafka_kerberos_tls_test.go +++ b/tests/suites/kafka_kerberos_tls/kafka_kerberos_tls_test.go @@ -1,4 +1,4 @@ -package kafka_kerberos +package kafkakerberos import ( "fmt" @@ -6,10 +6,11 @@ import ( . "github.com/mesosphere/kudo-kafka-operator/tests/suites" - "github.com/mesosphere/kudo-kafka-operator/tests/utils" . "github.com/onsi/ginkgo" "github.com/onsi/ginkgo/reporters" . "github.com/onsi/gomega" + + "github.com/mesosphere/kudo-kafka-operator/tests/utils" ) var ( @@ -37,9 +38,9 @@ var _ = Describe("KafkaTest", func() { Expect(krb5Client.CreateKeytabSecret(utils.GetKafkaKeyTabs(1, customNamespace), "kafka", "base64-kafka-keytab-secret")).To(BeNil()) }) It("Kafka and Zookeeper statefulset should have 1 replica each with status READY", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, zkNodeCount, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, zkNodeCount, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, kafkaBrokerCount, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, kafkaBrokerCount, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) }) It("verify the SSL listener", func() { @@ -68,8 +69,10 @@ var _ = Describe("KafkaTest", func() { var _ = BeforeSuite(func() { utils.TearDown(customNamespace) Expect(utils.DeletePVCs("data-dir")).To(BeNil()) - utils.KClient.CreateNamespace(customNamespace, false) - utils.KClient.CreateTLSCertSecret(customNamespace, "kafka-tls", "Kafka") + _, err := utils.KClient.CreateNamespace(customNamespace, false) + Expect(err).To(BeNil()) + _, err = utils.KClient.CreateTLSCertSecret(customNamespace, "kafka-tls", "Kafka") + Expect(err).To(BeNil()) Expect(krb5Client.Deploy()).To(BeNil()) utils.SetupWithKerberos(customNamespace, true) }) @@ -78,7 +81,8 @@ var _ = AfterSuite(func() { utils.TearDown(customNamespace) Expect(krb5Client.TearDown()).To(BeNil()) Expect(utils.DeletePVCs("data-dir")).To(BeNil()) - utils.KClient.DeleteNamespace(customNamespace) + err := utils.KClient.DeleteNamespace(customNamespace) + Expect(err).To(BeNil()) }) func TestService(t *testing.T) { diff --git a/tests/suites/kafka_sanity/kafka_broker_test.go b/tests/suites/kafka_sanity/kafka_broker_test.go index 9da7819..7f6055a 100644 --- a/tests/suites/kafka_sanity/kafka_broker_test.go +++ b/tests/suites/kafka_sanity/kafka_broker_test.go @@ -1,4 +1,4 @@ -package kafka_sanity +package kafkasanity import ( "fmt" @@ -40,9 +40,9 @@ var _ = Describe("KafkaTest", func() { Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3)) }) It("statefulset should have 3 replicas with status READY", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3)) }) @@ -50,14 +50,17 @@ var _ = Describe("KafkaTest", func() { kafkaClient := utils.NewKafkaClient(utils.KClient, &utils.KafkaClientConfiguration{ Namespace: utils.String(customNamespace), }) - kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(0), DefaultContainerName, 100) - kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(1), DefaultContainerName, 100) - kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(2), DefaultContainerName, 100) + err := kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(0), DefaultContainerName, 100) + Expect(err).To(BeNil()) + err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(1), DefaultContainerName, 100) + Expect(err).To(BeNil()) + err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(2), DefaultContainerName, 100) + Expect(err).To(BeNil()) topicSuffix, _ := utils.GetRandString(6) topicName = fmt.Sprintf("test-topic-%s", topicSuffix) - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) out, err := kafkaClient.CreateTopic(GetBrokerPodName(0), DefaultContainerName, topicName, "0") Expect(err).To(BeNil()) @@ -76,7 +79,7 @@ var _ = Describe("KafkaTest", func() { Namespace: utils.String(customNamespace), }) It("should have dns resolution using service", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) output, err := kafkaClient.ExecInPod(customNamespace, utilsContainer, utilsContainer, []string{"nslookup", fmt.Sprintf("kafka-svc.%s.svc.cluster.local", customNamespace)}) @@ -111,7 +114,7 @@ var _ = Describe("KafkaTest", func() { Namespace: utils.String(customNamespace), }) It("verify that access mode is ReadWriteOnce", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) sts, err := utils.KClient.AppsV1().StatefulSets(customNamespace).Get(DefaultKafkaStatefulSetName, metav1.GetOptions{}) Expect(err).To(BeNil()) @@ -148,9 +151,9 @@ var _ = Describe("KafkaTest", func() { Context("scale and write/read in topics", func() { It("statefulset should have 3 replicas with status READY", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3)) }) @@ -162,9 +165,10 @@ var _ = Describe("KafkaTest", func() { topicName := fmt.Sprintf("test-topic-%s", topicSuffix) err := utils.KClient.UpdateInstancesCount(DefaultKudoKafkaInstance, customNamespace, 4) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 4, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 4, utils.DefaultStatefulReadyWait) + Expect(err).To(BeNil()) + err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(3), DefaultContainerName, 100) Expect(err).To(BeNil()) - kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(3), DefaultContainerName, 100) out, err := kafkaClient.CreateTopic(GetBrokerPodName(3), DefaultContainerName, topicName, "0:1:2") Expect(err).To(BeNil()) Expect(out).To(ContainSubstring("Created topic")) @@ -177,10 +181,14 @@ var _ = Describe("KafkaTest", func() { err = utils.KClient.UpdateInstancesCount(DefaultKudoKafkaInstance, customNamespace, 3) Expect(err).To(BeNil()) err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, 240) + Expect(err).To(BeNil()) // in case the broker with id 3 was the active controller we should wait for the new active controller - kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(0), DefaultContainerName, 100) - kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(1), DefaultContainerName, 100) - kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(2), DefaultContainerName, 100) + err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(0), DefaultContainerName, 100) + Expect(err).To(BeNil()) + err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(1), DefaultContainerName, 100) + Expect(err).To(BeNil()) + err = kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(2), DefaultContainerName, 100) + Expect(err).To(BeNil()) out, err = kafkaClient.ReadFromTopic(GetBrokerPodName(0), DefaultContainerName, topicName, messageToTest) Expect(err).To(BeNil()) Expect(out).To(ContainSubstring(messageToTest)) @@ -191,9 +199,9 @@ var _ = Describe("KafkaTest", func() { It("should have 3 replicas", func() { err := utils.KClient.UpdateInstancesCount(DefaultKudoKafkaInstance, customNamespace, 1) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) err = utils.KClient.UpdateInstancesCount(DefaultKudoKafkaInstance, customNamespace, 3) Expect(err).To(BeNil()) @@ -202,7 +210,7 @@ var _ = Describe("KafkaTest", func() { Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3)) }) It("should have 3 replicas with status READY", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3)) }) @@ -233,9 +241,9 @@ var _ = Describe("KafkaTest", func() { Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3)) }) It("should have 3 replicas with status READY", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 3, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(3)) }) @@ -253,7 +261,8 @@ var _ = Describe("KafkaTest", func() { var _ = BeforeSuite(func() { utils.TearDown(customNamespace) Expect(utils.DeletePVCs("data-dir")).To(BeNil()) - utils.KClient.CreateNamespace(customNamespace, false) + _, err := utils.KClient.CreateNamespace(customNamespace, false) + Expect(err).To(BeNil()) utils.Apply(fmt.Sprintf("%s/%s", repoRoot, resources), customNamespace) utils.Setup(customNamespace) }) diff --git a/tests/suites/kafka_tls/kafka_tls_test.go b/tests/suites/kafka_tls/kafka_tls_test.go index b76df64..19d4e8a 100644 --- a/tests/suites/kafka_tls/kafka_tls_test.go +++ b/tests/suites/kafka_tls/kafka_tls_test.go @@ -1,4 +1,4 @@ -package kafka_tls +package kafkatls import ( "fmt" @@ -6,10 +6,11 @@ import ( "github.com/mesosphere/kudo-kafka-operator/tests/suites" - "github.com/mesosphere/kudo-kafka-operator/tests/utils" . "github.com/onsi/ginkgo" "github.com/onsi/ginkgo/reporters" . "github.com/onsi/gomega" + + "github.com/mesosphere/kudo-kafka-operator/tests/utils" ) var ( @@ -23,9 +24,9 @@ var _ = Describe("KafkaTLS", func() { Namespace: utils.String(customNamespace), }) It("statefulset should have 1 replica with status READY", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) Expect(utils.KClient.GetStatefulSetCount(suites.DefaultKafkaStatefulSetName, customNamespace)).To(Equal(1)) }) @@ -57,15 +58,18 @@ var _ = Describe("KafkaTLS", func() { var _ = BeforeSuite(func() { utils.TearDown(customNamespace) Expect(utils.DeletePVCs("data-dir")).To(BeNil()) - utils.KClient.CreateNamespace(customNamespace, false) - utils.KClient.CreateTLSCertSecret(customNamespace, "kafka-tls", "Kafka") - utils.InstallKudoOperator(customNamespace, utils.ZK_INSTANCE, utils.ZK_FRAMEWORK_DIR_ENV, map[string]string{ + _, err := utils.KClient.CreateNamespace(customNamespace, false) + Expect(err).To(BeNil()) + _, err = utils.KClient.CreateTLSCertSecret(customNamespace, "kafka-tls", "Kafka") + Expect(err).To(BeNil()) + utils.InstallKudoOperator(customNamespace, utils.ZkInstance, utils.ZkFrameworkDirEnv, map[string]string{ "MEMORY": "256Mi", "CPUS": "0.25", "NODE_COUNT": "1", }) - utils.KClient.WaitForStatefulSetCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) - utils.InstallKudoOperator(customNamespace, utils.KAFKA_INSTANCE, utils.KAFKA_FRAMEWORK_DIR_ENV, map[string]string{ + err = utils.KClient.WaitForStatefulSetCount(suites.DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) + Expect(err).To(BeNil()) + utils.InstallKudoOperator(customNamespace, utils.KafkaInstance, utils.KafkaFrameworkDirEnv, map[string]string{ "BROKER_MEM": "1Gi", "BROKER_CPUS": "0.25", "BROKER_COUNT": "1", @@ -74,13 +78,15 @@ var _ = BeforeSuite(func() { "ZOOKEEPER_URI": "zookeeper-instance-zookeeper-0.zookeeper-instance-hs:2181", "OFFSETS_TOPIC_REPLICATION_FACTOR": "1", }) - utils.KClient.WaitForStatefulSetCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetCount(suites.DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) + Expect(err).To(BeNil()) }) var _ = AfterSuite(func() { utils.TearDown(customNamespace) Expect(utils.DeletePVCs("data-dir")).To(BeNil()) - utils.KClient.DeleteNamespace(customNamespace) + err := utils.KClient.DeleteNamespace(customNamespace) + Expect(err).To(BeNil()) }) func TestService(t *testing.T) { diff --git a/tests/suites/kafka_upgrade/kafka_upgrade_test.go b/tests/suites/kafka_upgrade/kafka_upgrade_test.go index 8fab279..2053411 100644 --- a/tests/suites/kafka_upgrade/kafka_upgrade_test.go +++ b/tests/suites/kafka_upgrade/kafka_upgrade_test.go @@ -1,4 +1,4 @@ -package kafka_upgrade +package kafkaupgrade import ( "fmt" @@ -9,11 +9,12 @@ import ( . "github.com/mesosphere/kudo-kafka-operator/tests/suites" - "github.com/mesosphere/kudo-kafka-operator/tests/utils" . "github.com/onsi/ginkgo" "github.com/onsi/ginkgo/reporters" . "github.com/onsi/gomega" log "github.com/sirupsen/logrus" + + "github.com/mesosphere/kudo-kafka-operator/tests/utils" ) var ( @@ -29,7 +30,8 @@ var _ = Describe("KafkaTest", func() { Describe("[Kafka Upgrade Checks]", func() { Context("default installation", func() { It("service should have count 1", func() { - utils.KClient.WaitForStatus(utils.KAFKA_INSTANCE, customNamespace, v1beta1.ExecutionComplete, 300) + err := utils.KClient.WaitForStatus(utils.KafkaInstance, customNamespace, v1beta1.ExecutionComplete, 300) + Expect(err).To(BeNil()) kudoInstances, _ := utils.KClient.GetInstancesInNamespace(customNamespace) for _, instance := range kudoInstances.Items { log.Printf("%s kudo instance in namespace %s ", instance.Name, instance.Namespace) @@ -41,19 +43,20 @@ var _ = Describe("KafkaTest", func() { Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(1)) }) It("statefulset should have 1 replicas with status READY", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(1)) }) It("Create a topic and write a message", func() { - kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(0), DefaultContainerName, 100) + err := kafkaClient.WaitForBrokersToBeRegisteredWithService(GetBrokerPodName(0), DefaultContainerName, 100) + Expect(err).To(BeNil()) topicSuffix, _ := utils.GetRandString(6) topicName = fmt.Sprintf("test-topic-%s", topicSuffix) - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) out, err := kafkaClient.CreateTopic(GetBrokerPodName(0), DefaultContainerName, topicName, "") Expect(err).To(BeNil()) @@ -65,10 +68,11 @@ var _ = Describe("KafkaTest", func() { }) Context("upgrade operator", func() { It("operator version should change", func() { - currentOperatorVersion, _ := utils.KClient.GetOperatorVersionForKudoInstance(utils.KAFKA_INSTANCE, customNamespace) - utils.KClient.UpgardeInstanceFromPath(os.Getenv(utils.KAFKA_FRAMEWORK_DIR_ENV), customNamespace, utils.KAFKA_INSTANCE, map[string]string{}) - utils.KClient.WaitForStatus(utils.KAFKA_INSTANCE, customNamespace, v1beta1.ExecutionComplete, 300) - newOperatorVersion, _ := utils.KClient.GetOperatorVersionForKudoInstance(utils.KAFKA_INSTANCE, customNamespace) + currentOperatorVersion, _ := utils.KClient.GetOperatorVersionForKudoInstance(utils.KafkaInstance, customNamespace) + utils.KClient.UpgardeInstanceFromPath(os.Getenv(utils.KafkaFrameworkDirEnv), customNamespace, utils.KafkaInstance, map[string]string{}) + err := utils.KClient.WaitForStatus(utils.KafkaInstance, customNamespace, v1beta1.ExecutionComplete, 300) + Expect(err).To(BeNil()) + newOperatorVersion, _ := utils.KClient.GetOperatorVersionForKudoInstance(utils.KafkaInstance, customNamespace) Expect(newOperatorVersion).NotTo(BeNil()) Expect(newOperatorVersion).NotTo(Equal(currentOperatorVersion)) }) @@ -76,9 +80,9 @@ var _ = Describe("KafkaTest", func() { Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(1)) }) It("statefulset should have 1 replicas with status READY", func() { - err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err := utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) - err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetReadyReplicasCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) Expect(err).To(BeNil()) Expect(utils.KClient.GetStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace)).To(Equal(1)) }) @@ -95,21 +99,24 @@ var _ = Describe("KafkaTest", func() { var _ = BeforeSuite(func() { utils.TearDown(customNamespace) Expect(utils.DeletePVCs("data-dir")).To(BeNil()) - utils.KClient.CreateNamespace(customNamespace, false) - utils.InstallKudoOperator(customNamespace, utils.ZK_INSTANCE, utils.ZK_FRAMEWORK_DIR_ENV, map[string]string{ + _, err := utils.KClient.CreateNamespace(customNamespace, false) + Expect(err).To(BeNil()) + utils.InstallKudoOperator(customNamespace, utils.ZkInstance, utils.ZkFrameworkDirEnv, map[string]string{ "MEMORY": "256Mi", "CPUS": "0.25", "NODE_COUNT": "1", }) - utils.KClient.WaitForStatefulSetCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) - utils.KClient.InstallOperatorFromRepository(customNamespace, "kafka", utils.KAFKA_INSTANCE, defaultOperatorVersion, map[string]string{ + err = utils.KClient.WaitForStatefulSetCount(DefaultZkStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) + Expect(err).To(BeNil()) + utils.KClient.InstallOperatorFromRepository(customNamespace, "kafka", utils.KafkaInstance, defaultOperatorVersion, map[string]string{ "BROKER_MEM": "512Mi", "BROKER_CPUS": "0.25", "ZOOKEEPER_URI": "zookeeper-instance-zookeeper-0.zookeeper-instance-hs:2181", "BROKER_COUNT": "1", "OFFSETS_TOPIC_REPLICATION_FACTOR": "1", }) - utils.KClient.WaitForStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWaitSeconds) + err = utils.KClient.WaitForStatefulSetCount(DefaultKafkaStatefulSetName, customNamespace, 1, utils.DefaultStatefulReadyWait) + Expect(err).To(BeNil()) utils.KClient.LogObjectsOfKinds(customNamespace, []string{"svc", "pdb", "operatorversions", "operators", "instance"}) }) diff --git a/tests/utils/client.go b/tests/utils/client.go index fd464f3..2040001 100644 --- a/tests/utils/client.go +++ b/tests/utils/client.go @@ -27,9 +27,9 @@ import ( ) const ( - EMPTY_CONDITION = "" - KAFKA_INSTANCE = "kafka" - ZK_INSTANCE = "zookeeper-instance" + EmptyCondition = "" + KafkaInstance = "kafka" + ZkInstance = "zookeeper-instance" ) var ( @@ -68,7 +68,7 @@ func GetKubernetesClient() (*kubernetes.Clientset, error) { } func (c *KubernetesTestClient) createSecret(name string, data []string, namespace string) { - c.CoreV1().Secrets(namespace).Create(&v1.Secret{ + _, err := c.CoreV1().Secrets(namespace).Create(&v1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, @@ -76,7 +76,11 @@ func (c *KubernetesTestClient) createSecret(name string, data []string, namespac data[0]: data[1], }, }) - _, err := c.CoreV1().Secrets(namespace).List(metav1.ListOptions{}) + if err != nil { + log.Warningf("%v", err) + return + } + _, err = c.CoreV1().Secrets(namespace).List(metav1.ListOptions{}) if err != nil { log.Warningf("%v", err) } @@ -84,7 +88,7 @@ func (c *KubernetesTestClient) createSecret(name string, data []string, namespac func (c *KubernetesTestClient) WaitForPod(name, namespace string, timeoutSeconds time.Duration) error { timeout := time.After(timeoutSeconds * time.Second) - tick := time.Tick(500 * time.Millisecond) + tick := time.Tick(500 * time.Millisecond) //nolint for { select { case <-timeout: @@ -100,7 +104,7 @@ func (c *KubernetesTestClient) WaitForPod(name, namespace string, timeoutSeconds func (c *KubernetesTestClient) WaitForContainerToBeReady(containerName, podName, namespace string, timeoutSeconds time.Duration) error { timeout := time.After(timeoutSeconds * time.Second) - tick := time.Tick(500 * time.Millisecond) + tick := time.Tick(500 * time.Millisecond) //nolint for { select { case <-timeout: @@ -130,12 +134,12 @@ func (c *KubernetesTestClient) WaitForContainerToBeReady(containerName, podName, func (c *KubernetesTestClient) WaitForStatefulSetCount(name, namespace string, count int, timeoutSeconds time.Duration) error { timeout := time.After(timeoutSeconds * time.Second) - tick := time.Tick(2 * time.Second) + tick := time.Tick(2 * time.Second) //nolint for { select { case <-timeout: c.PrintLogsOfNamespace(namespace) - return errors.New(fmt.Sprintf("Timeout while waiting for statefulset [%s/%s] count to be %d", namespace, name, count)) + return fmt.Errorf("Timeout while waiting for statefulset [%s/%s] count to be %d", namespace, name, count) case <-tick: if count == KClient.GetStatefulSetCount(name, namespace) { return nil @@ -146,12 +150,12 @@ func (c *KubernetesTestClient) WaitForStatefulSetCount(name, namespace string, c func (c *KubernetesTestClient) WaitForStatefulSetReadyReplicasCount(name, namespace string, count int, timeoutSeconds time.Duration) error { timeout := time.After(timeoutSeconds * time.Second) - tick := time.Tick(2 * time.Second) + tick := time.Tick(2 * time.Second) //nolint for { select { case <-timeout: c.PrintLogsOfNamespace(namespace) - return errors.New(fmt.Sprintf("Timeout while waiting for statefulset [%s/%s] ready replicas count to be %d", namespace, name, count)) + return fmt.Errorf("Timeout while waiting for statefulset [%s/%s] ready replicas count to be %d", namespace, name, count) case <-tick: if count == KClient.GetStatefulSetReadyReplicasCount(name, namespace) { return nil @@ -234,27 +238,40 @@ func (c *KubernetesTestClient) GetServicesCount(name string, namespace string) i } func Setup(namespace string) { - InstallKudoOperator(namespace, ZK_INSTANCE, ZK_FRAMEWORK_DIR_ENV, map[string]string{ + InstallKudoOperator(namespace, ZkInstance, ZkFrameworkDirEnv, map[string]string{ "MEMORY": "256Mi", "CPUS": "0.25", }) - KClient.WaitForStatefulSetCount(suites.DefaultZkStatefulSetName, namespace, 3, 300) - InstallKudoOperator(namespace, KAFKA_INSTANCE, KAFKA_FRAMEWORK_DIR_ENV, map[string]string{ + err := KClient.WaitForStatefulSetCount(suites.DefaultZkStatefulSetName, namespace, 3, 300) + if err != nil { + log.Errorf("error waiting for install of operator %s error: %v", ZkInstance, err) + return + } + InstallKudoOperator(namespace, KafkaInstance, KafkaFrameworkDirEnv, map[string]string{ "BROKER_MEM": "512Mi", "BROKER_CPUS": "0.25", "METRICS_ENABLED": "true", }) - KClient.WaitForStatefulSetCount(suites.DefaultKafkaStatefulSetName, namespace, 3, 300) + err = KClient.WaitForStatefulSetCount(suites.DefaultKafkaStatefulSetName, namespace, 3, 300) + if err != nil { + log.Errorf("error waiting for install of operator %s error: %v", suites.DefaultKafkaStatefulSetName, err) + return + } } func SetupWithKerberos(namespace string, tlsEnabled bool) { - InstallKudoOperator(namespace, ZK_INSTANCE, ZK_FRAMEWORK_DIR_ENV, map[string]string{ + InstallKudoOperator(namespace, ZkInstance, ZkFrameworkDirEnv, map[string]string{ "MEMORY": "256Mi", "CPUS": "0.25", "NODE_COUNT": "1", }) - KClient.WaitForStatefulSetCount(suites.DefaultZkStatefulSetName, namespace, 1, 30) - InstallKudoOperator(namespace, KAFKA_INSTANCE, KAFKA_FRAMEWORK_DIR_ENV, map[string]string{ + err := KClient.WaitForStatefulSetCount(suites.DefaultZkStatefulSetName, namespace, 1, 30) + if err != nil { + log.Errorf("error waiting for install of operator %s error: %v", suites.DefaultZkStatefulSetName, err) + return + } + + InstallKudoOperator(namespace, KafkaInstance, KafkaFrameworkDirEnv, map[string]string{ "KERBEROS_ENABLED": "true", "KERBEROS_KDC_HOSTNAME": "kdc-service", "KERBEROS_KDC_PORT": "2500", @@ -267,14 +284,25 @@ func SetupWithKerberos(namespace string, tlsEnabled bool) { "OFFSETS_TOPIC_REPLICATION_FACTOR": "1", "USE_AUTO_TLS_CERTIFICATE": "true", }) - KClient.WaitForStatefulSetCount(suites.DefaultKafkaStatefulSetName, namespace, 1, 30) + err = KClient.WaitForStatefulSetCount(suites.DefaultKafkaStatefulSetName, namespace, 1, 30) + if err != nil { + log.Errorf("error waiting for install of operator %s error: %v", suites.DefaultKafkaStatefulSetName, err) + return + } } func TearDown(namespace string) { - DeleteInstances(namespace, ZK_INSTANCE) - KClient.WaitForStatefulSetCount(fmt.Sprintf("%s-%s", ZK_INSTANCE, ZK_INSTANCE), namespace, 0, 30) - DeleteInstances(namespace, KAFKA_INSTANCE) - KClient.WaitForStatefulSetCount(fmt.Sprintf("%s-%s", KAFKA_INSTANCE, KAFKA_INSTANCE), namespace, 0, 30) + DeleteInstances(namespace, ZkInstance) + err := KClient.WaitForStatefulSetCount(fmt.Sprintf("%s-%s", ZkInstance, ZkInstance), namespace, 0, 30) + if err != nil { + log.Errorf("error tearing down operator %s error: %v", ZkInstance, err) + } + + DeleteInstances(namespace, KafkaInstance) + err = KClient.WaitForStatefulSetCount(fmt.Sprintf("%s-%s", KafkaInstance, KafkaInstance), namespace, 0, 30) + if err != nil { + log.Errorf("error tearing down operator %s error: %v", KafkaInstance, err) + } } func Retry(attempts int, sleep time.Duration, condition string, f func() (string, error)) (resp string, err error) { @@ -343,6 +371,9 @@ func (c *KubernetesTestClient) ExecInPod(namespace string, podName string, conta Tty: true, TerminalSizeQueue: nil, }) + if err != nil { + return "", err + } if stdErr.Len() > 0 { stdErrString := stdErr.String() diff --git a/tests/utils/kafka.go b/tests/utils/kafka.go index f765dd9..93fa3b0 100644 --- a/tests/utils/kafka.go +++ b/tests/utils/kafka.go @@ -17,11 +17,11 @@ import ( ) var ( - defaultKafkaRetry = 3 - defaultKafkaRetryInterval = 1 * time.Second - defaultNamespace = "default" - defaultInstanceName = "kafka" - DefaultStatefulReadyWaitSeconds = 300 * time.Second + defaultKafkaRetry = 3 + defaultKafkaRetryInterval = 1 * time.Second + defaultNamespace = "default" + defaultInstanceName = "kafka" + DefaultStatefulReadyWait = 300 * time.Second ) type KafkaClient struct { @@ -61,13 +61,16 @@ func (c *KafkaClient) getClientConfigurationCommand() string { conf := "" securityProtocol := "" if c.conf.TLSEnabled { - conf = "KAFKA_PRODUCER_CONFIG_OPTIONS=\"--producer-property security.protocol=$SECURITY_PROTOCOL --producer-property ssl.keystore.location=/home/kafka/tls/kafka.server.keystore.jks --producer-property ssl.keystore.password=changeit --producer-property ssl.key.password=changeit --producer-property ssl.truststore.location=/home/kafka/tls/kafka.server.truststore.jks --producer-property ssl.truststore.password=changeit\";" + - "KAFKA_CONSUMER_CONFIG_OPTIONS=\"--consumer-property security.protocol=$SECURITY_PROTOCOL --consumer-property ssl.keystore.location=/home/kafka/tls/kafka.server.keystore.jks --consumer-property ssl.keystore.password=changeit --consumer-property ssl.key.password=changeit --consumer-property ssl.truststore.location=/home/kafka/tls/kafka.server.truststore.jks --consumer-property ssl.truststore.password=changeit\";" + conf = "KAFKA_PRODUCER_CONFIG_OPTIONS=\"--producer-property security.protocol=$SECURITY_PROTOCOL --producer-property ssl.keystore.location=/home/kafka/tls/kafka.server.keystore.jks " + + "--producer-property ssl.keystore.password=changeit --producer-property ssl.key.password=changeit --producer-property ssl.truststore.location=/home/kafka/tls/kafka.server.truststore.jks " + + "--producer-property ssl.truststore.password=changeit\"; KAFKA_CONSUMER_CONFIG_OPTIONS=\"--consumer-property security.protocol=$SECURITY_PROTOCOL " + + "--consumer-property ssl.keystore.location=/home/kafka/tls/kafka.server.keystore.jks --consumer-property ssl.keystore.password=changeit --consumer-property ssl.key.password=changeit " + + "--consumer-property ssl.truststore.location=/home/kafka/tls/kafka.server.truststore.jks --consumer-property ssl.truststore.password=changeit\";" securityProtocol = "SSL" } if c.conf.KerberosEnabled { - conf = conf + "printf 'KafkaClient {\ncom.sun.security.auth.module.Krb5LoginModule required\nuseKeyTab=true\nstoreKey=true\nuseTicketCache=false\nkeyTab=\"kafka.keytab\"\nprincipal=\"kafka/%s@LOCAL\";\n};' $(hostname -f) > /tmp/kafka_client_jaas.conf;" + - "export KAFKA_OPTS=\"-Djava.security.auth.login.config=/tmp/kafka_client_jaas.conf -Djava.security.krb5.conf=${KAFKA_HOME}/config/krb5.conf\";" + + conf = conf + "printf 'KafkaClient {\ncom.sun.security.auth.module.Krb5LoginModule required\nuseKeyTab=true\nstoreKey=true\nuseTicketCache=false\nkeyTab=\"kafka.keytab\"\nprincipal=\"kafka/%s@LOCAL\";\n};' " + + "$(hostname -f) > /tmp/kafka_client_jaas.conf; export KAFKA_OPTS=\"-Djava.security.auth.login.config=/tmp/kafka_client_jaas.conf -Djava.security.krb5.conf=${KAFKA_HOME}/config/krb5.conf\";" + "KAFKA_PRODUCER_CONFIG_OPTIONS=\"$KAFKA_PRODUCER_CONFIG_OPTIONS --producer-property sasl.mechanism=GSSAPI --producer-property security.protocol=$SECURITY_PROTOCOL --producer-property sasl.kerberos.service.name=kafka\";" + "KAFKA_CONSUMER_CONFIG_OPTIONS=\"$KAFKA_CONSUMER_CONFIG_OPTIONS --consumer-property sasl.mechanism=GSSAPI --consumer-property security.protocol=$SECURITY_PROTOCOL --consumer-property sasl.kerberos.service.name=kafka\";" securityProtocol = "SASL_PLAINTEXT" @@ -124,7 +127,7 @@ func (c *KafkaClient) readFromTopic(podName, container, topicName string) (strin logrus.Error(fmt.Sprintf("Error getting BROKER_PORT for kafka: %v\n", err)) return "", err } - command := []string{} + var command []string if c.conf.KerberosEnabled { command = []string{ "bash", "-c", c.getClientConfigurationCommand() + fmt.Sprintf("/opt/kafka/bin/kafka-console-consumer.sh $KAFKA_CONSUMER_CONFIG_OPTIONS --bootstrap-server "+ @@ -175,11 +178,11 @@ func (c *KafkaClient) describeTopic(podName, container, topicName string) (strin func (c *KafkaClient) WaitForBrokersToBeRegisteredWithService(podName, container string, timeoutSeconds time.Duration) error { timeout := time.After(timeoutSeconds * time.Second) - tick := time.Tick(3 * time.Second) + tick := time.Tick(3 * time.Second) //nolint for { select { case <-timeout: - return errors.New(fmt.Sprintf("Timeout while waiting for broker %s to be registered", podName)) + return fmt.Errorf("Timeout while waiting for broker %s to be registered", podName) case <-tick: if c.BrokerAddressIsRegistered(podName, container) { return nil @@ -200,10 +203,7 @@ func (c *KafkaClient) BrokerAddressIsRegistered(podName, container string) bool logrus.Println(command) output, _ := c.ExecInPod(*c.conf.Namespace, podName, container, command) logrus.Println(output) - if strings.Contains(output, "Error connecting to node") { - return false - } - return true + return !strings.Contains(output, "Error connecting to node") } func (c *KafkaClient) ExecInPod(namespace, name, container string, commands []string) (string, error) { @@ -236,6 +236,9 @@ func (c *KafkaClient) ExecInPod(namespace, name, container string, commands []st Tty: true, TerminalSizeQueue: nil, }) + if err != nil { + return "", err + } if stdErr.Len() > 0 { stdErrString := stdErr.String() @@ -250,9 +253,8 @@ func (c *KafkaClient) ExecInPod(namespace, name, container string, commands []st func (c *KafkaClient) GetPortForInstance() (string, error) { if c.conf.TLSEnabled { return c.kClient.GetParamForKudoInstance(*c.conf.InstanceName, *c.conf.Namespace, "BROKER_PORT_TLS") - } else { - return c.kClient.GetParamForKudoInstance(*c.conf.InstanceName, *c.conf.Namespace, "BROKER_PORT") } + return c.kClient.GetParamForKudoInstance(*c.conf.InstanceName, *c.conf.Namespace, "BROKER_PORT") } func (c *KafkaClient) CheckForValueInFile(expectedValue, filePath, ns, pod, containerName string) (string, error) { return Retry(*c.conf.Retry, *c.conf.RetryInterval, expectedValue, func() (string, error) { diff --git a/tests/utils/kdc.go b/tests/utils/kdc.go index cfc5f23..95515cc 100644 --- a/tests/utils/kdc.go +++ b/tests/utils/kdc.go @@ -1,22 +1,19 @@ package utils import ( - "fmt" + "errors" "os" "strings" - - v1 "k8s.io/api/core/v1" ) // KDCClient Struct defining the KDC Client type KDCClient struct { - pod *v1.Pod Namespace string } const ( - POD_NAME = "kdc" - CONTAINER_NAME = "kdc" + PodName = "kdc" + ContainerName = "kdc" ) // setNamespace Set Namespace @@ -33,7 +30,7 @@ func (k *KDCClient) Deploy() error { return KClient.WaitForPod("kdc", k.Namespace, 240) } - return fmt.Errorf("environment variable REPO_ROOT is not set!") + return errors.New("environment variable REPO_ROOT is not set") } // TearDown Use it to destroy the kdc server @@ -45,7 +42,7 @@ func (k *KDCClient) TearDown() error { return nil } - return fmt.Errorf("environment variable REPO_ROOT is not set!") + return errors.New("environment variable REPO_ROOT is not set") } // CreateKeytabSecret Pass it string array of principals and it will create a keytab secret @@ -56,11 +53,11 @@ func (k *KDCClient) CreateKeytabSecret(principals []string, serviceName string, "rm /kdc/" + serviceName + ".keytab;" + "cat /kdc/" + serviceName + "-principals.txt | while read line; do /usr/sbin/kadmin -l ext -k /kdc/" + serviceName + ".keytab $line; done;" - stdout, err := KClient.ExecInPod(k.Namespace, POD_NAME, CONTAINER_NAME, []string{"/bin/sh", "-c", command}) + _, err := KClient.ExecInPod(k.Namespace, PodName, ContainerName, []string{"/bin/sh", "-c", command}) if err != nil { return err } - stdout, err = KClient.ExecInPod(k.Namespace, POD_NAME, CONTAINER_NAME, []string{"/bin/sh", "-c", "cat /kdc/" + serviceName + ".keytab | base64 -w 0"}) + stdout, err := KClient.ExecInPod(k.Namespace, PodName, ContainerName, []string{"/bin/sh", "-c", "cat /kdc/" + serviceName + ".keytab | base64 -w 0"}) if err != nil { return err } diff --git a/tests/utils/kubectl.go b/tests/utils/kubectl.go index 65f4028..28407e6 100644 --- a/tests/utils/kubectl.go +++ b/tests/utils/kubectl.go @@ -10,8 +10,8 @@ import ( ) const ( - KAFKA_FRAMEWORK_DIR_ENV = "KAFKA_FRAMEWORK_DIR" - ZK_FRAMEWORK_DIR_ENV = "ZK_FRAMEWORK_DIR" + KafkaFrameworkDirEnv = "KAFKA_FRAMEWORK_DIR" + ZkFrameworkDirEnv = "ZK_FRAMEWORK_DIR" ) type environment struct { @@ -32,10 +32,14 @@ func applyManifests(resourcesAbsoluteDirectoryPath, action, namespace string) { log.Info(fmt.Sprintf("Using kubectl from path: %s", kubectlPath)) log.Info(fmt.Sprintf("Applying templates in directory: %s", resourcesAbsoluteDirectoryPath)) env := environment{kubectlPath, namespace} + var err error if action == "apply" { - filepath.Walk(resourcesAbsoluteDirectoryPath, env.applyManifest) + err = filepath.Walk(resourcesAbsoluteDirectoryPath, env.applyManifest) } else if action == "delete" { - filepath.Walk(resourcesAbsoluteDirectoryPath, env.deleteManifest) + err = filepath.Walk(resourcesAbsoluteDirectoryPath, env.deleteManifest) + } + if err != nil { + log.Errorf("error applying manifests with error: %v", err) } } @@ -55,7 +59,7 @@ func (env *environment) apply(filePath string, info os.FileInfo, err error, dele } if !info.IsDir() { log.Info(fmt.Sprintf("%s Template: %q\n", action, filePath)) - cmd := exec.Command(env.kubectlPath, action, "-f", filePath, "--namespace", env.namespace) + cmd := exec.Command(env.kubectlPath, action, "-f", filePath, "--namespace", env.namespace) //#nosec G204 out, err := cmd.Output() if err != nil { log.Error(string(err.(*exec.ExitError).Stderr)) diff --git a/tests/utils/kudo.go b/tests/utils/kudo.go index e890d25..6cfbe03 100644 --- a/tests/utils/kudo.go +++ b/tests/utils/kudo.go @@ -81,7 +81,7 @@ func (c *KubernetesTestClient) GetOperatorVersionForKudoInstance(name, namespace } func (c *KubernetesTestClient) UpdateInstancesCount(name, namespace string, count int) error { - _, err := Retry(3, 0*time.Second, EMPTY_CONDITION, func() (string, error) { + _, err := Retry(3, 0*time.Second, EmptyCondition, func() (string, error) { return updateInstancesCount(name, namespace, count) }) return err @@ -114,7 +114,7 @@ func updateInstancesCount(name, namespace string, count int) (string, error) { } func (c *KubernetesTestClient) UpdateInstanceParams(name, namespace string, mapParam map[string]string) error { - _, err := Retry(3, 0*time.Second, EMPTY_CONDITION, func() (string, error) { + _, err := Retry(3, 0*time.Second, EmptyCondition, func() (string, error) { return updateInstanceParams(name, namespace, mapParam) }) return err @@ -173,7 +173,7 @@ func (c *KubernetesTestClient) installOrUpgradeOperator(operation, namespace, op kubectlPath := getKubectlPath() log.Info(fmt.Sprintf("Using kubectl from path: %s", kubectlPath)) - install_cmd := []string{ + installCmd := []string{ "kudo", operation, operatorNameOrPath, @@ -182,14 +182,14 @@ func (c *KubernetesTestClient) installOrUpgradeOperator(operation, namespace, op } if version != "" { - install_cmd = append(install_cmd, fmt.Sprintf("--operator-version=%s", version)) + installCmd = append(installCmd, fmt.Sprintf("--operator-version=%s", version)) } for key, val := range params { - install_cmd = append(install_cmd, "-p", fmt.Sprintf("%s=%s", key, val)) + installCmd = append(installCmd, "-p", fmt.Sprintf("%s=%s", key, val)) } - cmd := exec.Command(kubectlPath, install_cmd...) + cmd := exec.Command(kubectlPath, installCmd...) log.Infoln(cmd.Args) out, err := cmd.Output() if err != nil { @@ -212,7 +212,7 @@ func (c *KubernetesTestClient) DeleteInstance(namespace, name string) { func (c *KubernetesTestClient) WaitForStatus(name, namespace string, expectedStatus v1beta1.ExecutionStatus, timeoutSeconds time.Duration) error { timeout := time.After(timeoutSeconds * time.Second) - tick := time.Tick(2 * time.Second) + tick := time.Tick(2 * time.Second) //nolint for { select { case <-timeout: @@ -248,7 +248,7 @@ func (c *KubernetesTestClient) LogObjectsOfKinds(namespace string, components [] if err != nil { log.Error(string(err.(*exec.ExitError).Stderr)) } - log.Info(fmt.Sprintf(string(out))) + log.Info(string(out)) } } @@ -275,7 +275,7 @@ func (c *KubernetesTestClient) PrintLogsOfNamespace(namespace string) { if err != nil { log.Error(string(err.(*exec.ExitError).Stderr)) } - log.Info(fmt.Sprintf(string(out))) + log.Info(string(out)) }