Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor koperator reserved label keys #841

Merged
merged 2 commits into from
Jul 29, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
@@ -47,6 +47,13 @@ const (
DefaultEnvoyAdminPort = 8081
// DefaultBrokerTerminationGracePeriod default kafka pod termination grace period
DefaultBrokerTerminationGracePeriod = 120

// AppLabelKey is used to represent the reserved operator label, "app"
AppLabelKey = "app"
// KafkaCRLabelKey is used to represent the reserved operator label, "kafka_cr"
KafkaCRLabelKey = "kafka_cr"
// BrokerIdLabelKey is used to represent the reserved operator label, "brokerId"
BrokerIdLabelKey = "brokerId"
)

// KafkaClusterSpec defines the desired state of KafkaCluster
@@ -801,7 +808,7 @@ func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId i
return util.MergeLabels(
bConfig.BrokerLabels,
util.LabelsForKafka(kafkaClusterName),
map[string]string{"brokerId": fmt.Sprintf("%d", brokerId)},
map[string]string{BrokerIdLabelKey: fmt.Sprintf("%d", brokerId)},
)
}

26 changes: 13 additions & 13 deletions api/v1beta1/kafkacluster_types_test.go
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ func TestGetBrokerConfigAffinityMergeBrokerNodeAffinityWithGroupsAntiAffinity(t
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "kafka", "kafka_cr": "kafka_broker"}},
LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{AppLabelKey: "kafka", KafkaCRLabelKey: "kafka_broker"}},
Namespaces: nil,
TopologyKey: "kubernetes.io/hostname",
},
@@ -87,7 +87,7 @@ func TestGetBrokerConfigAffinityMergeBrokerNodeAffinityWithGroupsAntiAffinity(t
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "kafka", "kafka_cr": "kafka_broker"},
MatchLabels: map[string]string{AppLabelKey: "kafka", KafkaCRLabelKey: "kafka_broker"},
},
TopologyKey: "kubernetes.io/hostname",
},
@@ -122,7 +122,7 @@ func TestGetBrokerConfigAffinityMergeEmptyBrokerConfigWithDefaultConfig(t *testi
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "kafka", "kafka_cr": "kafka_config_group"}},
LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{AppLabelKey: "kafka", KafkaCRLabelKey: "kafka_config_group"}},
Namespaces: nil,
TopologyKey: "kubernetes.io/hostname",
},
@@ -139,7 +139,7 @@ func TestGetBrokerConfigAffinityMergeEmptyBrokerConfigWithDefaultConfig(t *testi
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "kafka", "kafka_cr": "kafka_config_group"},
MatchLabels: map[string]string{AppLabelKey: "kafka", KafkaCRLabelKey: "kafka_config_group"},
},
TopologyKey: "kubernetes.io/hostname",
},
@@ -188,7 +188,7 @@ func TestGetBrokerConfigAffinityMergeEqualPodAntiAffinity(t *testing.T) {
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "kafka", "kafka_cr": "kafka_broker"},
MatchLabels: map[string]string{AppLabelKey: "kafka", KafkaCRLabelKey: "kafka_broker"},
},
TopologyKey: "kubernetes.io/hostname",
},
@@ -206,7 +206,7 @@ func TestGetBrokerConfigAffinityMergeEqualPodAntiAffinity(t *testing.T) {
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "kafka", "kafka_cr": "kafka_broker"},
MatchLabels: map[string]string{AppLabelKey: "kafka", KafkaCRLabelKey: "kafka_broker"},
},
TopologyKey: "kubernetes.io/hostname",
},
@@ -224,7 +224,7 @@ func TestGetBrokerConfigAffinityMergeEqualPodAntiAffinity(t *testing.T) {
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "kafka", "kafka_cr": "kafka_config_group"},
MatchLabels: map[string]string{AppLabelKey: "kafka", KafkaCRLabelKey: "kafka_config_group"},
},
TopologyKey: "kubernetes.io/hostname",
},
@@ -437,17 +437,17 @@ func TestGetBrokerLabels(t *testing.T) {
)

expected := map[string]string{
"app": expectedDefaultLabelApp,
"brokerId": strconv.Itoa(expectedBrokerId),
"kafka_cr": expectedKafkaCRName,
AppLabelKey: expectedDefaultLabelApp,
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
KafkaCRLabelKey: expectedKafkaCRName,
"test_label_key": "test_label_value",
}

brokerConfig := &BrokerConfig{
BrokerLabels: map[string]string{
"app": "test_app",
"brokerId": "test_id",
"kafka_cr": "test_cr_name",
AppLabelKey: "test_app",
BrokerIdLabelKey: "test_id",
KafkaCRLabelKey: "test_cr_name",
"test_label_key": "test_label_value",
},
}
22 changes: 11 additions & 11 deletions controllers/tests/kafkacluster_controller_cruisecontrol_test.go
Original file line number Diff line number Diff line change
@@ -56,7 +56,7 @@ func expectCruiseControlTopic(kafkaCluster *v1beta1.KafkaCluster) {
}).Should(Succeed())

Expect(topic).NotTo(BeNil())
Expect(topic.Labels).To(HaveKeyWithValue("app", "kafka"))
Expect(topic.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(topic.Labels).To(HaveKeyWithValue("clusterName", kafkaCluster.Name))
Expect(topic.Labels).To(HaveKeyWithValue("clusterNamespace", kafkaCluster.Namespace))

@@ -80,8 +80,8 @@ func expectCruiseControlService(kafkaCluster *v1beta1.KafkaCluster) {
}, service)
}).Should(Succeed())

Expect(service.Labels).To(HaveKeyWithValue("app", "cruisecontrol"))
Expect(service.Labels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(service.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "cruisecontrol"))
Expect(service.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(service.Spec.Ports).To(ConsistOf(
corev1.ServicePort{
Name: "cc",
@@ -96,8 +96,8 @@ func expectCruiseControlService(kafkaCluster *v1beta1.KafkaCluster) {
TargetPort: intstr.FromInt(9020),
},
))
Expect(service.Spec.Selector).To(HaveKeyWithValue("kafka_cr", "kafkacluster-1"))
Expect(service.Spec.Selector).To(HaveKeyWithValue("app", "cruisecontrol"))
Expect(service.Spec.Selector).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, "kafkacluster-1"))
Expect(service.Spec.Selector).To(HaveKeyWithValue(v1beta1.AppLabelKey, "cruisecontrol"))
}

func expectCruiseControlConfigMap(kafkaCluster *v1beta1.KafkaCluster) {
@@ -109,8 +109,8 @@ func expectCruiseControlConfigMap(kafkaCluster *v1beta1.KafkaCluster) {
}, configMap)
}).Should(Succeed())

Expect(configMap.Labels).To(HaveKeyWithValue("app", "cruisecontrol"))
Expect(configMap.Labels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(configMap.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "cruisecontrol"))
Expect(configMap.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))

Expect(configMap.Data).To(HaveKeyWithValue("cruisecontrol.properties", fmt.Sprintf(`bootstrap.servers=%s-all-broker.%s.%s:29092
some.config=value
@@ -233,12 +233,12 @@ func expectCruiseControlDeployment(kafkaCluster *v1beta1.KafkaCluster) {
}, deployment)
}).Should(Succeed())

Expect(deployment.Labels).To(HaveKeyWithValue("app", "cruisecontrol"))
Expect(deployment.Labels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(deployment.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "cruisecontrol"))
Expect(deployment.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))

Expect(deployment.Spec.Selector).NotTo(BeNil())
Expect(deployment.Spec.Selector.MatchLabels).To(HaveKeyWithValue("app", "cruisecontrol"))
Expect(deployment.Spec.Selector.MatchLabels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(deployment.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "cruisecontrol"))
Expect(deployment.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))

Expect(deployment.Spec.Template.Annotations).To(HaveKey("cruiseControlCapacity.json"))
Expect(deployment.Spec.Template.Annotations).To(HaveKey("cruiseControlClusterConfig.json"))
10 changes: 5 additions & 5 deletions controllers/tests/kafkacluster_controller_envoy_test.go
Original file line number Diff line number Diff line change
@@ -29,9 +29,9 @@ import (
)

func expectEnvoyIngressLabels(labels map[string]string, eListenerName, crName string) {
Expect(labels).To(HaveKeyWithValue("app", "envoyingress"))
Expect(labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "envoyingress"))
Expect(labels).To(HaveKeyWithValue("eListenerName", eListenerName))
Expect(labels).To(HaveKeyWithValue("kafka_cr", crName))
Expect(labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, crName))
}

func expectEnvoyIngressAnnotations(annotations map[string]string) {
@@ -49,9 +49,9 @@ func expectEnvoyLoadBalancer(kafkaCluster *v1beta1.KafkaCluster, eListenerTempla
expectEnvoyIngressLabels(loadBalancer.Labels, eListenerTemplate, kafkaCluster.Name)
Expect(loadBalancer.Spec.Type).To(Equal(corev1.ServiceTypeLoadBalancer))
Expect(loadBalancer.Spec.Selector).To(Equal(map[string]string{
"app": "envoyingress",
"eListenerName": eListenerTemplate,
"kafka_cr": kafkaCluster.Name,
v1beta1.AppLabelKey: "envoyingress",
"eListenerName": eListenerTemplate,
v1beta1.KafkaCRLabelKey: kafkaCluster.Name,
}))
Expect(loadBalancer.Spec.Ports).To(HaveLen(6))
for i, port := range loadBalancer.Spec.Ports {
Original file line number Diff line number Diff line change
@@ -41,9 +41,9 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster *v1beta1
}, &configMap)
}).Should(Succeed())

Expect(configMap.Labels).To(HaveKeyWithValue("app", "kafka"))
Expect(configMap.Labels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(configMap.Labels).To(HaveKeyWithValue("brokerId", strconv.Itoa(int(broker.Id))))
Expect(configMap.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(configMap.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(configMap.Labels).To(HaveKeyWithValue(v1beta1.BrokerIdLabelKey, strconv.Itoa(int(broker.Id))))

brokerConfig, err := properties.NewFromString(configMap.Data["broker-config"])
Expect(err).NotTo(HaveOccurred())
@@ -66,9 +66,9 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster *v1beta1
}, &service)
}).Should(Succeed())

Expect(service.Labels).To(HaveKeyWithValue("app", "kafka"))
Expect(service.Labels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(service.Labels).To(HaveKeyWithValue("brokerId", strconv.Itoa(int(broker.Id))))
Expect(service.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(service.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(service.Labels).To(HaveKeyWithValue(v1beta1.BrokerIdLabelKey, strconv.Itoa(int(broker.Id))))

Expect(service.Spec.Ports).To(ConsistOf(
corev1.ServicePort{
Original file line number Diff line number Diff line change
@@ -154,16 +154,16 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", func() {
}).Should(Succeed())

Expect(svc.Labels).To(Equal(map[string]string{
"app": "kafka",
"brokerId": "0",
"kafka_cr": kafkaCluster.Name,
v1beta1.AppLabelKey: "kafka",
v1beta1.BrokerIdLabelKey: "0",
v1beta1.KafkaCRLabelKey: kafkaCluster.Name,
}))

Expect(svc.Spec.Type).To(Equal(corev1.ServiceTypeNodePort))
Expect(svc.Spec.Selector).To(Equal(map[string]string{
"app": "kafka",
"brokerId": "0",
"kafka_cr": kafkaCluster.Name,
v1beta1.AppLabelKey: "kafka",
v1beta1.BrokerIdLabelKey: "0",
v1beta1.KafkaCRLabelKey: kafkaCluster.Name,
}))

Expect(svc.Spec.Ports).To(HaveLen(1))
Original file line number Diff line number Diff line change
@@ -47,9 +47,9 @@ var _ = Describe("KafkaClusterIstioIngressController", func() {
)

ExpectIstioIngressLabels := func(labels map[string]string, eListenerName, crName string) {
Expect(labels).To(HaveKeyWithValue("app", "istioingress"))
Expect(labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "istioingress"))
Expect(labels).To(HaveKeyWithValue("eListenerName", eListenerName))
Expect(labels).To(HaveKeyWithValue("kafka_cr", crName))
Expect(labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, crName))
}

BeforeEach(func() {
@@ -397,9 +397,9 @@ var _ = Describe("KafkaClusterIstioIngressControllerWithBrokerIdBindings", func(
)

ExpectIstioIngressLabels := func(labels map[string]string, eListenerName, crName string) {
Expect(labels).To(HaveKeyWithValue("app", "istioingress"))
Expect(labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "istioingress"))
Expect(labels).To(HaveKeyWithValue("eListenerName", eListenerName))
Expect(labels).To(HaveKeyWithValue("kafka_cr", crName))
Expect(labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, crName))
}

BeforeEach(func() {
46 changes: 23 additions & 23 deletions controllers/tests/kafkacluster_controller_kafka_test.go
Original file line number Diff line number Diff line change
@@ -59,13 +59,13 @@ func expectKafkaAllBrokerService(kafkaCluster *v1beta1.KafkaCluster) {
}, service)
}).Should(Succeed())

Expect(service.Labels).To(HaveKeyWithValue("app", "kafka"))
Expect(service.Labels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(service.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(service.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))

Expect(service.Spec.Type).To(Equal(corev1.ServiceTypeClusterIP))
Expect(service.Spec.SessionAffinity).To(Equal(corev1.ServiceAffinityNone))
Expect(service.Spec.Selector).To(HaveKeyWithValue("app", "kafka"))
Expect(service.Spec.Selector).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(service.Spec.Selector).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(service.Spec.Selector).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(service.Spec.Ports).To(ConsistOf(
corev1.ServicePort{
Name: "tcp-internal",
@@ -116,12 +116,12 @@ func expectKafkaPDB(kafkaCluster *v1beta1.KafkaCluster) {
}).Should(Succeed())

// make assertions
Expect(pdb.Labels).To(HaveKeyWithValue("app", "kafka"))
Expect(pdb.Labels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(pdb.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(pdb.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(pdb.Spec.MinAvailable).To(Equal(util.IntstrPointer(3)))
Expect(pdb.Spec.Selector).NotTo(BeNil())
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue("app", "kafka"))
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
}

func expectKafkaPVC(kafkaCluster *v1beta1.KafkaCluster) {
@@ -130,15 +130,15 @@ func expectKafkaPVC(kafkaCluster *v1beta1.KafkaCluster) {
Eventually(func() error {
return k8sClient.List(context.Background(), &pvcs,
client.ListOption(client.InNamespace(kafkaCluster.Namespace)),
client.ListOption(client.MatchingLabels(map[string]string{"app": "kafka", "kafka_cr": kafkaCluster.Name})))
client.ListOption(client.MatchingLabels(map[string]string{v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.Name})))
}).Should(Succeed())

Expect(pvcs.Items).To(HaveLen(3))
for i, pvc := range pvcs.Items {
Expect(pvc.GenerateName).To(Equal(fmt.Sprintf("%s-%d-storage-0-", kafkaCluster.Name, i)))
Expect(pvc.Labels).To(HaveKeyWithValue("app", "kafka"))
Expect(pvc.Labels).To(HaveKeyWithValue("brokerId", strconv.Itoa(i)))
Expect(pvc.Labels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(pvc.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(pvc.Labels).To(HaveKeyWithValue(v1beta1.BrokerIdLabelKey, strconv.Itoa(i)))
Expect(pvc.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(pvc.Annotations).To(HaveKeyWithValue("mountPath", "/kafka-logs"))
Expect(pvc.Spec.AccessModes).To(ConsistOf(corev1.ReadWriteOnce))
Expect(pvc.Spec.Resources).To(Equal(corev1.ResourceRequirements{
@@ -158,9 +158,9 @@ func expectKafkaBrokerConfigmap(kafkaCluster *v1beta1.KafkaCluster, broker v1bet
}, &configMap)
}).Should(Succeed())

Expect(configMap.Labels).To(HaveKeyWithValue("app", "kafka"))
Expect(configMap.Labels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(configMap.Labels).To(HaveKeyWithValue("brokerId", strconv.Itoa(int(broker.Id))))
Expect(configMap.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(configMap.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(configMap.Labels).To(HaveKeyWithValue(v1beta1.BrokerIdLabelKey, strconv.Itoa(int(broker.Id))))

Expect(configMap.Data).To(HaveKeyWithValue("broker-config", fmt.Sprintf(`advertised.listeners=CONTROLLER://kafkacluster-%d-%d.kafka-%d.svc.cluster.local:29093,INTERNAL://kafkacluster-%d-%d.kafka-%d.svc.cluster.local:29092,TEST://test.host.com:%d
broker.id=%d
@@ -187,9 +187,9 @@ func expectKafkaBrokerService(kafkaCluster *v1beta1.KafkaCluster, broker v1beta1
}, &service)
}).Should(Succeed())

Expect(service.Labels).To(HaveKeyWithValue("app", "kafka"))
Expect(service.Labels).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(service.Labels).To(HaveKeyWithValue("brokerId", strconv.Itoa(int(broker.Id))))
Expect(service.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(service.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(service.Labels).To(HaveKeyWithValue(v1beta1.BrokerIdLabelKey, strconv.Itoa(int(broker.Id))))

Expect(service.Spec.Ports).To(ConsistOf(
corev1.ServicePort{
@@ -217,9 +217,9 @@ func expectKafkaBrokerService(kafkaCluster *v1beta1.KafkaCluster, broker v1beta1
TargetPort: intstr.FromInt(9020),
}))

Expect(service.Spec.Selector).To(HaveKeyWithValue("app", "kafka"))
Expect(service.Spec.Selector).To(HaveKeyWithValue("kafka_cr", kafkaCluster.Name))
Expect(service.Spec.Selector).To(HaveKeyWithValue("brokerId", strconv.Itoa(int(broker.Id))))
Expect(service.Spec.Selector).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(service.Spec.Selector).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(service.Spec.Selector).To(HaveKeyWithValue(v1beta1.BrokerIdLabelKey, strconv.Itoa(int(broker.Id))))
Expect(service.Spec.Type).To(Equal(corev1.ServiceTypeClusterIP))
}

@@ -228,14 +228,14 @@ func expectKafkaBrokerPod(kafkaCluster *v1beta1.KafkaCluster, broker v1beta1.Bro
Eventually(func() ([]corev1.Pod, error) {
err := k8sClient.List(context.Background(), &podList,
client.ListOption(client.InNamespace(kafkaCluster.Namespace)),
client.ListOption(client.MatchingLabels(map[string]string{"app": "kafka", "kafka_cr": kafkaCluster.Name, "brokerId": strconv.Itoa(int(broker.Id))})))
client.ListOption(client.MatchingLabels(map[string]string{v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.Name, v1beta1.BrokerIdLabelKey: strconv.Itoa(int(broker.Id))})))
return podList.Items, err
}).Should(HaveLen(1))

pod := podList.Items[0]

Expect(pod.GenerateName).To(Equal(fmt.Sprintf("%s-%d-", kafkaCluster.Name, broker.Id)))
Expect(pod.Labels).To(HaveKeyWithValue("brokerId", strconv.Itoa(int(broker.Id))))
Expect(pod.Labels).To(HaveKeyWithValue(v1beta1.BrokerIdLabelKey, strconv.Itoa(int(broker.Id))))
getContainerName := func(c corev1.Container) string { return c.Name }
// test exact order, because if the slice reorders, it triggers another reconcile cycle
Expect(pod.Spec.InitContainers).To(HaveLen(4))
Loading