Skip to content

Commit

Permalink
Place RabbitMQ configuration in confd (#548)
Browse files Browse the repository at this point in the history
* Place RabbitMQ configuration in confd

Files places in confd are loaded after 'rabbitmq.conf' is loaded,
allowing the user configuration to take precedence. The ConfigMap has
keys without any numeric prefix because file naming is done in
StatefulSet's pod template volume mounts. This allows flexibility in the
future, shall we decide we rename or change the numeric prefixes.

This commit also adds a condition to decide whether to include the
server ConfigMap as a volume. This happens if RabbitMQ env or advanced
config are set in the RabbitmqCluster Spec. The server conf is partially
projected into a volume that is always specified.

* Use a string builder to generate RMQ configuration

String builder is more efficient than a byte buffer for strings.

* Rename additionalConfig to userDefinedConfiguration

The previous name could set an expectation to be a 1-to-1 mapping
between the file and the contents in `.spec.rabbitmq.additionalConfig`.
This was not intended, as this file can contain other configuration,
generated based on the user input.

Reverted the change that used string concatenation instead of multi-line
string in test expectations.
  • Loading branch information
Zerpet authored Jan 11, 2021
1 parent bba233e commit 5191f32
Show file tree
Hide file tree
Showing 6 changed files with 368 additions and 310 deletions.
28 changes: 17 additions & 11 deletions controllers/rabbitmqcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,22 +886,28 @@ var _ = Describe("RabbitmqClusterController", func() {
},
},
},
{
ConfigMap: &corev1.ConfigMapProjection{
LocalObjectReference: corev1.LocalObjectReference{
Name: "rabbitmq-sts-override-server-conf",
},
Items: []corev1.KeyToPath{
{
Key: "operatorDefaults.conf",
Path: "operatorDefaults.conf",
},
{
Key: "userDefinedConfiguration.conf",
Path: "userDefinedConfiguration.conf",
},
},
},
},
},
DefaultMode: &defaultMode,
},
},
},
corev1.Volume{
Name: "server-conf",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
DefaultMode: &defaultMode,
LocalObjectReference: corev1.LocalObjectReference{
Name: "rabbitmq-sts-override-server-conf",
},
},
},
},
corev1.Volume{
Name: "plugins-conf",
VolumeSource: corev1.VolumeSource{
Expand Down
80 changes: 46 additions & 34 deletions internal/resource/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
package resource

import (
"bytes"
"fmt"
"strings"

"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -80,135 +81,146 @@ func (builder *ServerConfigMapBuilder) Update(object client.Object) error {
configMap := object.(*corev1.ConfigMap)

ini.PrettySection = false // Remove trailing new line because rabbitmq.conf has only a default section.
cfg, err := ini.Load([]byte(defaultRabbitmqConf))
operatorConfiguration, err := ini.Load([]byte(defaultRabbitmqConf))
if err != nil {
return err
}
defaultSection := cfg.Section("")
defaultSection := operatorConfiguration.Section("")

if _, err := defaultSection.NewKey("cluster_name", builder.Instance.Name); err != nil {
return err
}

userConfiguration := ini.Empty(ini.LoadOptions{})
userConfigurationSection := userConfiguration.Section("")

if builder.Instance.TLSEnabled() {
if err := cfg.Append([]byte(defaultTLSConf)); err != nil {
if err := userConfiguration.Append([]byte(defaultTLSConf)); err != nil {
return err
}
if builder.Instance.DisableNonTLSListeners() {
if _, err := defaultSection.NewKey("listeners.tcp", "none"); err != nil {
if _, err := userConfigurationSection.NewKey("listeners.tcp", "none"); err != nil {
return err
}
} else {
// management plugin does not have a *.listeners.tcp settings like other plugins
// management tcp listener can be disabled by setting management.ssl.port without setting management.tcp.port
// we set management tcp listener only if tls is enabled and disableNonTLSListeners is false
if _, err := defaultSection.NewKey("management.tcp.port", "15672"); err != nil {
if _, err := userConfigurationSection.NewKey("management.tcp.port", "15672"); err != nil {
return err
}

if _, err := defaultSection.NewKey("prometheus.tcp.port", "15692"); err != nil {
if _, err := userConfigurationSection.NewKey("prometheus.tcp.port", "15692"); err != nil {
return err
}
}
if builder.Instance.AdditionalPluginEnabled("rabbitmq_mqtt") {
if _, err := defaultSection.NewKey("mqtt.listeners.ssl.default", "8883"); err != nil {
if _, err := userConfigurationSection.NewKey("mqtt.listeners.ssl.default", "8883"); err != nil {
return err
}
if builder.Instance.DisableNonTLSListeners() {
if _, err := defaultSection.NewKey("mqtt.listeners.tcp", "none"); err != nil {
if _, err := userConfigurationSection.NewKey("mqtt.listeners.tcp", "none"); err != nil {
return err
}
}
}
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stomp") {
if _, err := defaultSection.NewKey("stomp.listeners.ssl.1", "61614"); err != nil {
if _, err := userConfigurationSection.NewKey("stomp.listeners.ssl.1", "61614"); err != nil {
return err
}
if builder.Instance.DisableNonTLSListeners() {
if _, err := defaultSection.NewKey("stomp.listeners.tcp", "none"); err != nil {
if _, err := userConfigurationSection.NewKey("stomp.listeners.tcp", "none"); err != nil {
return err
}
}
}
}

if builder.Instance.MutualTLSEnabled() {
if _, err := defaultSection.NewKey("ssl_options.cacertfile", caCertPath); err != nil {
if _, err := userConfigurationSection.NewKey("ssl_options.cacertfile", caCertPath); err != nil {
return err
}
if _, err := defaultSection.NewKey("ssl_options.verify", "verify_peer"); err != nil {
if _, err := userConfigurationSection.NewKey("ssl_options.verify", "verify_peer"); err != nil {
return err
}

if _, err := defaultSection.NewKey("management.ssl.cacertfile", caCertPath); err != nil {
if _, err := userConfigurationSection.NewKey("management.ssl.cacertfile", caCertPath); err != nil {
return err
}

if _, err := defaultSection.NewKey("prometheus.ssl.cacertfile", caCertPath); err != nil {
if _, err := userConfigurationSection.NewKey("prometheus.ssl.cacertfile", caCertPath); err != nil {
return err
}

if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_mqtt") {
if _, err := defaultSection.NewKey("web_mqtt.ssl.port", "15676"); err != nil {
if _, err := userConfigurationSection.NewKey("web_mqtt.ssl.port", "15676"); err != nil {
return err
}
if _, err := defaultSection.NewKey("web_mqtt.ssl.cacertfile", caCertPath); err != nil {
if _, err := userConfigurationSection.NewKey("web_mqtt.ssl.cacertfile", caCertPath); err != nil {
return err
}
if _, err := defaultSection.NewKey("web_mqtt.ssl.certfile", tlsCertPath); err != nil {
if _, err := userConfigurationSection.NewKey("web_mqtt.ssl.certfile", tlsCertPath); err != nil {
return err
}
if _, err := defaultSection.NewKey("web_mqtt.ssl.keyfile", tlsKeyPath); err != nil {
if _, err := userConfigurationSection.NewKey("web_mqtt.ssl.keyfile", tlsKeyPath); err != nil {
return err
}
if builder.Instance.DisableNonTLSListeners() {
if _, err := defaultSection.NewKey("web_mqtt.tcp.listener", "none"); err != nil {
if _, err := userConfigurationSection.NewKey("web_mqtt.tcp.listener", "none"); err != nil {
return err
}
}
}
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_stomp") {
if _, err := defaultSection.NewKey("web_stomp.ssl.port", "15673"); err != nil {
if _, err := userConfigurationSection.NewKey("web_stomp.ssl.port", "15673"); err != nil {
return err
}
if _, err := defaultSection.NewKey("web_stomp.ssl.cacertfile", caCertPath); err != nil {
if _, err := userConfigurationSection.NewKey("web_stomp.ssl.cacertfile", caCertPath); err != nil {
return err
}
if _, err := defaultSection.NewKey("web_stomp.ssl.certfile", tlsCertPath); err != nil {
if _, err := userConfigurationSection.NewKey("web_stomp.ssl.certfile", tlsCertPath); err != nil {
return err
}
if _, err := defaultSection.NewKey("web_stomp.ssl.keyfile", tlsKeyPath); err != nil {
if _, err := userConfigurationSection.NewKey("web_stomp.ssl.keyfile", tlsKeyPath); err != nil {
return err
}
if builder.Instance.DisableNonTLSListeners() {
if _, err := defaultSection.NewKey("web_stomp.tcp.listener", "none"); err != nil {
if _, err := userConfigurationSection.NewKey("web_stomp.tcp.listener", "none"); err != nil {
return err
}
}
}
}

if builder.Instance.MemoryLimited() {
if _, err := defaultSection.NewKey("total_memory_available_override_value", fmt.Sprintf("%d", removeHeadroom(builder.Instance.Spec.Resources.Limits.Memory().Value()))); err != nil {
if _, err := userConfigurationSection.NewKey("total_memory_available_override_value", fmt.Sprintf("%d", removeHeadroom(builder.Instance.Spec.Resources.Limits.Memory().Value()))); err != nil {
return err
}
}

rmqProperties := builder.Instance.Spec.Rabbitmq
if err := cfg.Append([]byte(rmqProperties.AdditionalConfig)); err != nil {
return fmt.Errorf("failed to append spec.rabbitmq.additionalConfig: %w", err)
}

var rmqConfBuffer bytes.Buffer
if _, err := cfg.WriteTo(&rmqConfBuffer); err != nil {
var rmqConfBuffer strings.Builder
if _, err := operatorConfiguration.WriteTo(&rmqConfBuffer); err != nil {
return err
}

if configMap.Data == nil {
configMap.Data = make(map[string]string)
}

configMap.Data["rabbitmq.conf"] = rmqConfBuffer.String()
configMap.Data["operatorDefaults.conf"] = rmqConfBuffer.String()

rmqConfBuffer.Reset()

rmqProperties := builder.Instance.Spec.Rabbitmq
if err := userConfiguration.Append([]byte(rmqProperties.AdditionalConfig)); err != nil {
return fmt.Errorf("failed to append spec.rabbitmq.additionalConfig: %w", err)
}

if _, err := userConfiguration.WriteTo(&rmqConfBuffer); err != nil {
return err
}

configMap.Data["userDefinedConfiguration.conf"] = rmqConfBuffer.String()

updateProperty(configMap.Data, "advanced.config", rmqProperties.AdvancedConfig)
updateProperty(configMap.Data, "rabbitmq-env.conf", rmqProperties.EnvConfig)
Expand Down
Loading

0 comments on commit 5191f32

Please sign in to comment.