Skip to content

Commit

Permalink
Support custom connect uri through annotation (#644)
Browse files Browse the repository at this point in the history
- allow connection uri to be overwritten by annotation in
rabbitmqcluster
  • Loading branch information
ChunyiLyu authored Jun 29, 2023
1 parent a9296f0 commit 685e56d
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 8 deletions.
37 changes: 29 additions & 8 deletions rabbitmqclient/cluster_reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const uriAnnotationKey = "rabbitmq.com/operator-connection-uri"

type ClusterCredentials struct {
data map[string][]byte
}
Expand Down Expand Up @@ -90,6 +92,18 @@ func ParseReference(ctx context.Context, c client.Client, rmq topology.RabbitmqC
}
}

if uriAnnotation, ok := cluster.Annotations[uriAnnotationKey]; ok {
uri, useTLSForConnection, err := extractURIandScheme(uriAnnotation)
if err != nil {
return nil, false, err
}
return map[string]string{
"username": user,
"password": pass,
"uri": uri,
}, useTLSForConnection, nil
}

svc := &corev1.Service{}
if err := c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cluster.Status.DefaultUser.ServiceReference.Name}, svc); err != nil {
return nil, false, err
Expand Down Expand Up @@ -140,15 +154,9 @@ func readCredentialsFromKubernetesSecret(secret *corev1.Secret) (map[string]stri
return nil, false, keyMissingErr("uri")
}

uri := string(uBytes)
if !strings.HasPrefix(uri, "http") {
uri = "http://" + uri // set scheme to http if not provided
}
var tlsEnabled bool
if parsed, err := url.Parse(uri); err != nil {
uri, tlsEnabled, err := extractURIandScheme(string(uBytes))
if err != nil {
return nil, false, err
} else if parsed.Scheme == "https" {
tlsEnabled = true
}

return map[string]string{
Expand All @@ -158,6 +166,19 @@ func readCredentialsFromKubernetesSecret(secret *corev1.Secret) (map[string]stri
}, tlsEnabled, nil
}

func extractURIandScheme(uri string) (string, bool, error) {
if !strings.HasPrefix(uri, "http") {
uri = "http://" + uri // set scheme to http if not provided
}

if parsed, err := url.Parse(uri); err != nil {
return "", false, err
} else if parsed.Scheme == "https" {
return uri, true, nil
}
return uri, false, nil
}

func readClusterAdditionalConfig(cluster *rabbitmqv1beta1.RabbitmqCluster) (additionalConfig map[string]string, err error) {
cfg, err := ini.Load([]byte(cluster.Spec.Rabbitmq.AdditionalConfig))
if err != nil {
Expand Down
154 changes: 154 additions & 0 deletions rabbitmqclient/cluster_reference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var _ = Describe("ParseReference", func() {
existingService *corev1.Service
ctx = context.Background()
namespace = "rabbitmq-system"
uriAnnotationKey = "rabbitmq.com/operator-connection-uri"
)

JustBeforeEach(func() {
Expand Down Expand Up @@ -765,6 +766,159 @@ var _ = Describe("ParseReference", func() {
})

})

When("the RabbitmqCluster is annotated with connection uri override", func() {
BeforeEach(func() {
existingRabbitMQCluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq",
Namespace: namespace,
Annotations: map[string]string{
uriAnnotationKey: "http://a-rabbitmq-test:2333",
},
},
Status: rabbitmqv1beta1.RabbitmqClusterStatus{
Binding: &corev1.LocalObjectReference{
Name: "rmq-default-user-credentials",
},
DefaultUser: &rabbitmqv1beta1.RabbitmqClusterDefaultUser{
ServiceReference: &rabbitmqv1beta1.RabbitmqClusterServiceReference{
Name: "rmq",
Namespace: namespace,
},
},
},
}
existingCredentialSecret = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq-default-user-credentials",
Namespace: namespace,
},
Data: map[string][]byte{
"username": []byte(existingRabbitMQUsername),
"password": []byte(existingRabbitMQPassword),
},
}
existingService = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq",
Namespace: namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: "1.2.3.4",
Ports: []corev1.ServicePort{
{
Name: "management",
Port: int32(15672),
},
},
},
}
objs = []runtime.Object{existingRabbitMQCluster, existingCredentialSecret, existingService}
})

It("returns correct credentials", func() {
creds, tlsOn, err := rabbitmqclient.ParseReference(ctx, fakeClient,
topology.RabbitmqClusterReference{Name: existingRabbitMQCluster.Name},
existingRabbitMQCluster.Namespace,
"",
false)
Expect(err).NotTo(HaveOccurred())
Expect(tlsOn).To(BeFalse())

usernameBytes, _ := creds["username"]
passwordBytes, _ := creds["password"]
uriBytes, _ := creds["uri"]
Expect(usernameBytes).To(Equal(existingRabbitMQUsername))
Expect(passwordBytes).To(Equal(existingRabbitMQPassword))
Expect(uriBytes).To(Equal("http://a-rabbitmq-test:2333"))
})

When("annotated URI has no scheme", func() {
BeforeEach(func() {
*existingRabbitMQCluster = rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq",
Namespace: namespace,
Annotations: map[string]string{
uriAnnotationKey: "a-rabbitmq-test:7890",
},
},
Status: rabbitmqv1beta1.RabbitmqClusterStatus{
Binding: &corev1.LocalObjectReference{
Name: "rmq-default-user-credentials",
},
DefaultUser: &rabbitmqv1beta1.RabbitmqClusterDefaultUser{
ServiceReference: &rabbitmqv1beta1.RabbitmqClusterServiceReference{
Name: "rmq",
Namespace: namespace,
},
},
},
}
})

It("sets http as the scheme", func() {
creds, tlsOn, err := rabbitmqclient.ParseReference(ctx, fakeClient,
topology.RabbitmqClusterReference{Name: existingRabbitMQCluster.Name},
existingRabbitMQCluster.Namespace,
"",
false)
Expect(err).NotTo(HaveOccurred())
Expect(tlsOn).To(BeFalse())

usernameBytes, _ := creds["username"]
passwordBytes, _ := creds["password"]
uriBytes, _ := creds["uri"]
Expect(usernameBytes).To(Equal(existingRabbitMQUsername))
Expect(passwordBytes).To(Equal(existingRabbitMQPassword))
Expect(uriBytes).To(Equal("http://a-rabbitmq-test:7890"))
})
})

When("annotated URI has https as scheme", func() {
BeforeEach(func() {
*existingRabbitMQCluster = rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq",
Namespace: namespace,
Annotations: map[string]string{
uriAnnotationKey: "https://a-rabbitmq-test:2333",
},
},
Status: rabbitmqv1beta1.RabbitmqClusterStatus{
Binding: &corev1.LocalObjectReference{
Name: "rmq-default-user-credentials",
},
DefaultUser: &rabbitmqv1beta1.RabbitmqClusterDefaultUser{
ServiceReference: &rabbitmqv1beta1.RabbitmqClusterServiceReference{
Name: "rmq",
Namespace: namespace,
},
},
},
}
})

It("returns correct credentials", func() {
creds, tlsOn, err := rabbitmqclient.ParseReference(ctx, fakeClient,
topology.RabbitmqClusterReference{Name: existingRabbitMQCluster.Name},
existingRabbitMQCluster.Namespace,
"",
false)
Expect(err).NotTo(HaveOccurred())
Expect(tlsOn).To(BeTrue())

usernameBytes, _ := creds["username"]
passwordBytes, _ := creds["password"]
uriBytes, _ := creds["uri"]
Expect(usernameBytes).To(Equal(existingRabbitMQUsername))
Expect(passwordBytes).To(Equal(existingRabbitMQPassword))
Expect(uriBytes).To(Equal("https://a-rabbitmq-test:2333"))
})
})

})
})

var _ = Describe("AllowedNamespace", func() {
Expand Down

0 comments on commit 685e56d

Please sign in to comment.