Skip to content

Commit

Permalink
CIAINFRA-890 operator v1: remove obsolete additional container ports
Browse files Browse the repository at this point in the history
prior to this patch, a container pod was added to pod spec for every
broker that exists. the initial idea was to do this for AWS PrivateLink
or GCP PrivateServiceConnect. however, this was never used, and is not
required: the original container port - that is the same for every pod -
is used.

the problem with these ports: they are part of Pod Spec, and therefore
added to every pod. Scaling up adds a pod, so adds another container
port, and therefore rolls all Pods. This is undesired behavior, and
causes unwanted restarts.

in addition, advertised port and kafka/proxy port had same names in
additionalProperties (pl-proxy for example), and did override each
other. the port actually used in AWS PL @ redpanda cloud was never added,
only the advertised one (which is obsolete).
it did not matter really, because ports work, even if not added
to container ports.
  • Loading branch information
birdayz committed Dec 18, 2024
1 parent 7d7ea08 commit 94a735e
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 55 deletions.
66 changes: 30 additions & 36 deletions operator/pkg/resources/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/go-logr/logr"
"gopkg.in/yaml.v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -1024,58 +1025,41 @@ func (r *StatefulSetResource) AdditionalListenersEnvVars() []corev1.EnvVar {
// - pandaproxy.pandaproxy_api
// - pandaproxy.advertised_pandaproxy_api
// example: redpanda.kafka_api: "[{'name':'private-link','address':'0.0.0.0','port':39002}]"
// example: redpanda.advertised_kafka_api: "[{'name':'private-link','address':'{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 7 }}.cluster123.fmc.prd.cloud.redpanda.com','port': 'port': {{30092 | add .Index}}}]"
func (r *StatefulSetResource) GetPortsForListenersInAdditionalConfig() []corev1.ContainerPort {
ports := []corev1.ContainerPort{}

if len(r.pandaCluster.Spec.AdditionalConfiguration) == 0 {
return ports
}

additionalNode0Config := config.ProdDefault()
for _, k := range additionalListenerCfgNames {
if v, found := r.pandaCluster.Spec.AdditionalConfiguration[k]; found {
res, err := utils.Compute(v, utils.NewEndpointTemplateData(0, "dummy", 0), false)
if err != nil {
r.logger.Error(err, "failed to evaluate template", "template", v)
continue
}
err = config.Set(additionalNode0Config, k, res)
getPorts := func(key string) []corev1.ContainerPort {
if v, found := r.pandaCluster.Spec.AdditionalConfiguration[key]; found {
var ports []corev1.ContainerPort
res, err := utils.Compute(v, utils.NewEndpointTemplateData(0, "dummy", r.nodePool.HostIndexOffset), false)
if err != nil {
r.logger.Error(err, "failed to set node config", k, v)
continue
return nil
}
}
}

for i := 0; i < int(ptr.Deref(r.nodePool.Replicas, 0)); i++ {
for _, n := range additionalNode0Config.Redpanda.AdvertisedKafkaAPI {
ports = append(ports, corev1.ContainerPort{
Name: getAdditionalListenerPortName(n.Name, i),
ContainerPort: int32(n.Port + i),
})
}
if additionalNode0Config.Pandaproxy != nil {
for _, n := range additionalNode0Config.Pandaproxy.AdvertisedPandaproxyAPI {
var kafkaAPI []config.NamedAuthNSocketAddress
if err := yaml.Unmarshal([]byte(res), &kafkaAPI); err != nil {
r.logger.Error(err, "failed to unmarshal additionalProperty %s into []config.NamedAuthNSocketAddress", "additionalProperty", key)
return nil
}
for _, v := range kafkaAPI {
ports = append(ports, corev1.ContainerPort{
Name: getAdditionalListenerPortName(n.Name, i),
ContainerPort: int32(n.Port + i),
Name: getAdditionalListenerPortName(v.Name),
ContainerPort: int32(v.Port),
})
}
return ports
}
return nil
}
return ports
}

// getAdditionalListenerPortName returns the name of container port for additional listener.
// Container port name must be unique and its length can not exceed 15.
func getAdditionalListenerPortName(listenerName string, podOrdinal int) string {
portName := fmt.Sprintf("%s%d", listenerName, podOrdinal)
s := 0
if len(portName) > 15 {
s = len(portName) - 15
}
return portName[s:]
ports = append(ports, getPorts("redpanda.kafka_api")...)
ports = append(ports, getPorts("pandaproxy.pandaproxy_api")...)

return ports
}

func (r *StatefulSetResource) fullConfiguratorImage() string {
Expand All @@ -1093,6 +1077,16 @@ func (r *StatefulSetResource) Version() string {
return ""
}

// getAdditionalListenerPortName returns the name of container port for additional listener.
// Container port name must be unique and its length can not exceed 15.
func getAdditionalListenerPortName(listenerName string) string {
s := 0
if len(listenerName) > 15 {
s = len(listenerName) - 15
}
return listenerName[s:]
}

func redpandaContainerVersion(containers []corev1.Container) string {
for i := range containers {
c := containers[i]
Expand Down
48 changes: 29 additions & 19 deletions operator/pkg/resources/statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,18 +955,16 @@ func TestStatefulSetPorts_AdditionalListeners(t *testing.T) {
},
},
AdditionalConfiguration: map[string]string{
"redpanda.advertised_kafka_api": "[{'name':'pl-kafka','address':'0.0.0.0','port': {{30092 | add .Index}}}]",
"redpanda.kafka_api": "[{'name':'pl-kafka','address':'0.0.0.0','port': {{30092 | add .Index}}}]",
},
},
},
expectedContainerPorts: []corev1.ContainerPort{
{Name: "pl-kafka0", ContainerPort: 30092},
{Name: "pl-kafka1", ContainerPort: 30093},
{Name: "pl-kafka2", ContainerPort: 30094},
{Name: "pl-kafka", ContainerPort: 30092},
},
},
{
name: "additional kafka and panda proxy listeners",
name: "do not add advertised listeners",
pandaCluster: &vectorizedv1alpha1.Cluster{
Spec: vectorizedv1alpha1.ClusterSpec{
NodePools: []vectorizedv1alpha1.NodePoolSpec{
Expand All @@ -981,13 +979,28 @@ func TestStatefulSetPorts_AdditionalListeners(t *testing.T) {
},
},
},
expectedContainerPorts: []corev1.ContainerPort{},
},

{
name: "additional kafka and panda proxy listeners",
pandaCluster: &vectorizedv1alpha1.Cluster{
Spec: vectorizedv1alpha1.ClusterSpec{
NodePools: []vectorizedv1alpha1.NodePoolSpec{
{
Name: "test",
Replicas: &replicas,
},
},
AdditionalConfiguration: map[string]string{
"redpanda.kafka_api": "[{'name':'pl-kafka','address':'0.0.0.0', 'port': {{30092 | add .Index}}}]",
"pandaproxy.pandaproxy_api": "[{'name':'pl-proxy','address':'0.0.0.0', 'port': {{39282 | add .Index}}}]",
},
},
},
expectedContainerPorts: []corev1.ContainerPort{
{Name: "pl-kafka0", ContainerPort: 30092},
{Name: "pl-kafka1", ContainerPort: 30093},
{Name: "pl-kafka2", ContainerPort: 30094},
{Name: "pl-proxy0", ContainerPort: 39282},
{Name: "pl-proxy1", ContainerPort: 39283},
{Name: "pl-proxy2", ContainerPort: 39284},
{Name: "pl-kafka", ContainerPort: 30092},
{Name: "pl-proxy", ContainerPort: 39282},
},
},
{
Expand All @@ -1001,18 +1014,14 @@ func TestStatefulSetPorts_AdditionalListeners(t *testing.T) {
},
},
AdditionalConfiguration: map[string]string{
"redpanda.advertised_kafka_api": "[{'name':'private-link-kafka','address':'0.0.0.0', 'port': {{30092 | add .Index}}}]",
"pandaproxy.advertised_pandaproxy_api": "[{'name':'private-link-proxy','address':'0.0.0.0', 'port': {{39282 | add .Index}}}]",
"redpanda.kafka_api": "[{'name':'private-link-kafka','address':'0.0.0.0', 'port': {{30092 | add .Index}}}]",
"pandaproxy.pandaproxy_api": "[{'name':'private-link-proxy','address':'0.0.0.0', 'port': {{39282 | add .Index}}}]",
},
},
},
expectedContainerPorts: []corev1.ContainerPort{
{Name: "ate-link-kafka0", ContainerPort: 30092},
{Name: "ate-link-kafka1", ContainerPort: 30093},
{Name: "ate-link-kafka2", ContainerPort: 30094},
{Name: "ate-link-proxy0", ContainerPort: 39282},
{Name: "ate-link-proxy1", ContainerPort: 39283},
{Name: "ate-link-proxy2", ContainerPort: 39284},
{Name: "vate-link-kafka", ContainerPort: 30092},
{Name: "vate-link-proxy", ContainerPort: 39282},
},
},
}
Expand All @@ -1027,6 +1036,7 @@ func TestStatefulSetPorts_AdditionalListeners(t *testing.T) {
true)
containerPorts := r.GetPortsForListenersInAdditionalConfig()
assert.Equal(t, len(tt.expectedContainerPorts), len(containerPorts))

for _, cp := range containerPorts {
found := false
for _, p := range tt.expectedContainerPorts {
Expand Down

0 comments on commit 94a735e

Please sign in to comment.