Skip to content

Commit

Permalink
Add support for Stream over TLS (#703)
Browse files Browse the repository at this point in the history
* Add support for Stream over TLS

Automatically configure TLS for the Stream plugin when TLS is enabled.
  • Loading branch information
mkuratczyk authored May 27, 2021
1 parent fe1acaa commit 81b26b7
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 11 deletions.
10 changes: 10 additions & 0 deletions internal/resource/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ func (builder *ServerConfigMapBuilder) Update(object client.Object) error {
}
}
}
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stream") {
if _, err := userConfigurationSection.NewKey("stream.listeners.ssl.default", "5551"); err != nil {
return err
}
if builder.Instance.DisableNonTLSListeners() {
if _, err := userConfigurationSection.NewKey("stream.listeners.tcp", "none"); err != nil {
return err
}
}
}
}

if builder.Instance.MutualTLSEnabled() {
Expand Down
15 changes: 10 additions & 5 deletions internal/resource/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ CONSOLE_LOG=new`
Expect(configMap.Data).To(HaveKeyWithValue("userDefinedConfiguration.conf", expectedConfiguration))
})

When("MQTT, STOMP and AMQP 1.0 plugins are enabled", func() {
When("MQTT, STOMP, AMQP 1.0 and Stream plugins are enabled", func() {
It("adds TLS config for the additional plugins", func() {
additionalPlugins := []rabbitmqv1beta1.Plugin{"rabbitmq_mqtt", "rabbitmq_stomp", "rabbitmq_amqp_1_0"}
additionalPlugins := []rabbitmqv1beta1.Plugin{"rabbitmq_mqtt", "rabbitmq_stomp", "rabbitmq_amqp_1_0", "rabbitmq_stream"}

instance.ObjectMeta.Name = "rabbit-tls"
instance.Spec.TLS.SecretName = "tls-secret"
Expand All @@ -271,7 +271,8 @@ CONSOLE_LOG=new`
management.tcp.port = 15672
prometheus.tcp.port = 15692
mqtt.listeners.ssl.default = 8883
stomp.listeners.ssl.1 = 61614`)
stomp.listeners.ssl.1 = 61614
stream.listeners.ssl.default = 5551`)

Expect(configMapBuilder.Update(configMap)).To(Succeed())
Expect(configMap.Data).To(HaveKeyWithValue("userDefinedConfiguration.conf", expectedConfiguration))
Expand Down Expand Up @@ -403,7 +404,7 @@ CONSOLE_LOG=new`
Expect(configMap.Data).To(HaveKeyWithValue("userDefinedConfiguration.conf", expectedConfiguration))
})

It("disables non tls listeners for mqtt and stomp when enabled", func() {
It("disables non tls listeners for mqtt, stomp and stream plugins if enabled", func() {
instance = rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbit-tls",
Expand All @@ -417,6 +418,7 @@ CONSOLE_LOG=new`
AdditionalPlugins: []rabbitmqv1beta1.Plugin{
"rabbitmq_mqtt",
"rabbitmq_stomp",
"rabbitmq_stream",
},
},
},
Expand All @@ -440,7 +442,10 @@ CONSOLE_LOG=new`
mqtt.listeners.tcp = none
stomp.listeners.ssl.1 = 61614
stomp.listeners.tcp = none`)
stomp.listeners.tcp = none
stream.listeners.ssl.default = 5551
stream.listeners.tcp = none`)

Expect(configMapBuilder.Update(configMap)).To(Succeed())
Expect(configMap.Data).To(HaveKeyWithValue("userDefinedConfiguration.conf", expectedConfiguration))
Expand Down
17 changes: 17 additions & 0 deletions internal/resource/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ func (builder *ServiceBuilder) generateServicePortsMapOnlyTLSListeners() map[str
}
}

if builder.Instance.AdditionalPluginEnabled("rabbitmq_stream") {
servicePortsMap["streams"] = corev1.ServicePort{
Protocol: corev1.ProtocolTCP,
Port: 5551,
Name: "streams",
TargetPort: intstr.FromInt(5551),
}
}

if builder.Instance.MutualTLSEnabled() {
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_stomp") {
servicePortsMap["web-stomp-tls"] = corev1.ServicePort{
Expand Down Expand Up @@ -261,6 +270,14 @@ func (builder *ServiceBuilder) generateServicePortsMap() map[string]corev1.Servi
TargetPort: intstr.FromInt(8883),
}
}
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stream") {
servicePortsMap["streams"] = corev1.ServicePort{
Protocol: corev1.ProtocolTCP,
Port: 5551,
Name: "streams",
TargetPort: intstr.FromInt(5551),
}
}

// We expose either 15692 or 15691 in the Service, but not both.
// If we exposed both ports, a ServiceMonitor selecting all RabbitMQ pods and
Expand Down
11 changes: 9 additions & 2 deletions internal/resource/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ var _ = Context("Services", func() {
))
})

When("mqtt and stomp are enabled", func() {
When("mqtt, stomp and stream are enabled", func() {
It("opens ports for those plugins", func() {
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_mqtt", "rabbitmq_stomp"}
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_mqtt", "rabbitmq_stomp", "rabbitmq_stream"}
Expect(serviceBuilder.Update(svc)).To(Succeed())
Expect(svc.Spec.Ports).To(ContainElements([]corev1.ServicePort{
{
Expand All @@ -137,6 +137,12 @@ var _ = Context("Services", func() {
Port: 61614,
TargetPort: intstr.FromInt(61614),
},
{
Name: "streams",
Protocol: corev1.ProtocolTCP,
Port: 5551,
TargetPort: intstr.FromInt(5551),
},
}))
})
})
Expand Down Expand Up @@ -227,6 +233,7 @@ var _ = Context("Services", func() {
Entry("MQTT-over-WebSockets", "rabbitmq_web_mqtt", "web-mqtt-tls", 15676),
Entry("STOMP", "rabbitmq_stomp", "stomps", 61614),
Entry("STOMP-over-WebSockets", "rabbitmq_web_stomp", "web-stomp-tls", 15673),
Entry("Stream", "rabbitmq_stream", "streams", 5551),
)
})
})
Expand Down
14 changes: 14 additions & 0 deletions internal/resource/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,13 @@ func (builder *StatefulSetBuilder) updateContainerPorts() []corev1.ContainerPort
})
}

if builder.Instance.AdditionalPluginEnabled("rabbitmq_stream") {
ports = append(ports, corev1.ContainerPort{
Name: "streams",
ContainerPort: 5551,
})
}

if builder.Instance.MutualTLSEnabled() {
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_mqtt") {
ports = append(ports, corev1.ContainerPort{
Expand Down Expand Up @@ -837,6 +844,13 @@ func (builder *StatefulSetBuilder) updateContainerPortsOnlyTLSListeners() []core
})
}

if builder.Instance.AdditionalPluginEnabled("rabbitmq_stream") {
ports = append(ports, corev1.ContainerPort{
Name: "streams",
ContainerPort: 5551,
})
}

if builder.Instance.MutualTLSEnabled() {
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_mqtt") {
ports = append(ports, corev1.ContainerPort{
Expand Down
16 changes: 12 additions & 4 deletions internal/resource/statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,9 +728,9 @@ var _ = Describe("StatefulSet", func() {
}))
})

It("opens tls ports when mqtt and stomp are configured", func() {
It("opens tls ports when mqtt, stomp and stream are configured", func() {
instance.Spec.TLS.SecretName = "tls-secret"
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_mqtt", "rabbitmq_stomp"}
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_mqtt", "rabbitmq_stomp", "rabbitmq_stream"}
Expect(stsBuilder.Update(statefulSet)).To(Succeed())

rabbitmqContainerSpec := extractContainer(statefulSet.Spec.Template.Spec.Containers, "rabbitmq")
Expand All @@ -744,6 +744,10 @@ var _ = Describe("StatefulSet", func() {
Name: "stomps",
ContainerPort: 61614,
},
{
Name: "streams",
ContainerPort: 5551,
},
}))
})

Expand Down Expand Up @@ -832,8 +836,8 @@ var _ = Describe("StatefulSet", func() {
}))
})

It("disables non tls ports for mqtt and stomp when configured", func() {
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_mqtt", "rabbitmq_stomp"}
It("disables non tls ports for mqtt, stomp and stream if enabled", func() {
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_mqtt", "rabbitmq_stomp", "rabbitmq_stream"}
Expect(stsBuilder.Update(statefulSet)).To(Succeed())

rabbitmqContainerSpec := extractContainer(statefulSet.Spec.Template.Spec.Containers, "rabbitmq")
Expand Down Expand Up @@ -862,6 +866,10 @@ var _ = Describe("StatefulSet", func() {
Name: "stomps",
ContainerPort: 61614,
},
{
Name: "streams",
ContainerPort: 5551,
},
}))
})

Expand Down

0 comments on commit 81b26b7

Please sign in to comment.