Skip to content

Commit

Permalink
Support Management over TLS (#115)
Browse files Browse the repository at this point in the history
* Support Management over TLS

Note that this skips certificate validation, and only prevents a panic,
rather than performing a full TLS verification.

* Refactor certificate generation into testutils

* Allow x509.CertPool to be provided to client factory

* Add system tests for TLS enforced server

* Connect to RabbitmqCluster over FQDN instead of IP address

* Fix controller integration tests

* Use Go1.16+ for PR system tests

The ubuntu-latest CI image provided by GH doesn't yet include Go1.16, so
we have to manually fetch it first

* Bump test timeouts for system tests

* Add integration tests to CI
  • Loading branch information
coro authored Apr 22, 2021
1 parent 318332d commit 1044f0b
Show file tree
Hide file tree
Showing 38 changed files with 1,392 additions and 407 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ jobs:
uses: actions/checkout@v2
- name: Unit tests
run: make unit-tests
- name: Integration tests
run: make integration-tests

system_tests:
name: system tests
Expand All @@ -25,6 +27,9 @@ jobs:
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '^1.16.0' # Require Go 1.16 and above, but lower than Go 2.0.0
- name: System tests
env:
K8S_VERSION: ${{ matrix.k8s }}
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Messaging Topology Operator is tested with the latest release of RabbitMQ [Clust
It uses the generated default user secret from RabbitmqCluster (set in `rabbitmqcluster.status.binding`) to authenticate with RabbitMQ server.
If your RabbitmqCluster is deployed with import definitions or provided default user credentials,
the default user secret from `rabbitmqcluster.status.binding` may not be correct and Messaging Topology Operator will fail with authentication error.
If your RabbitmqCluster is configured to serve management traffic over TLS, you may need to configure the Messaging Topology Operator to trust the CA that signed the server's certificates. For more information, see [this doc](./docs/installation/management-over-https.md).

## Contributing

Expand Down
30 changes: 21 additions & 9 deletions controllers/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,30 @@ func (r *BindingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return reconcile.Result{}, client.IgnoreNotFound(err)
}

rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, binding.Spec.RabbitmqClusterReference, binding.Namespace)
systemCertPool, err := extractSystemCertPool(ctx, r.Recorder, binding)
if err != nil {
return ctrl.Result{}, err
}

if errors.Is(err, internal.NoSuchRabbitmqClusterError) && binding.ObjectMeta.DeletionTimestamp.IsZero() {
rmq, svc, secret, err := internal.ParseRabbitmqClusterReference(ctx, r.Client, binding.Spec.RabbitmqClusterReference, binding.Namespace)
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && !binding.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info(noSuchRabbitDeletion, "binding", binding.Name)
r.Recorder.Event(binding, corev1.EventTypeNormal, "SuccessfulDelete", "successfully deleted binding")
return reconcile.Result{}, r.removeFinalizer(ctx, binding)
}
if errors.Is(err, internal.NoSuchRabbitmqClusterError) {
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) {
}
if err != nil {
logger.Error(err, failedParseClusterRef)
return reconcile.Result{}, err
}

rabbitClient, err := r.RabbitmqClientFactory(rmq, svc, secret, serviceDNSAddress(svc), systemCertPool)
if err != nil {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -133,12 +151,6 @@ func (r *BindingReconciler) declareBinding(ctx context.Context, client internal.
func (r *BindingReconciler) deleteBinding(ctx context.Context, client internal.RabbitMQClient, binding *topology.Binding) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil || reflect.ValueOf(client).IsNil() {
logger.Info(noSuchRabbitDeletion, "binding", binding.Name)
r.Recorder.Event(binding, corev1.EventTypeNormal, "SuccessfulDelete", "successfully deleted binding")
return r.removeFinalizer(ctx, binding)
}

var info *rabbithole.BindingInfo
var err error
if binding.Spec.Arguments != nil {
Expand Down
19 changes: 0 additions & 19 deletions controllers/binding_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,6 @@ var _ = Describe("bindingController", func() {
})
})

When("the RabbitMQ cluster is nil", func() {
BeforeEach(func() {
bindingName = "delete-binding-client-not-found-error"
})

JustBeforeEach(func() {
prepareNoSuchClusterError()
})

It("successfully deletes the binding regardless", func() {
Expect(client.Delete(ctx, &binding)).To(Succeed())
Eventually(func() bool {
err := client.Get(ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &topology.Binding{})
return apierrors.IsNotFound(err)
}, 5).Should(BeTrue())
Expect(observedEvents()).To(ContainElement("Normal SuccessfulDelete successfully deleted binding"))
})
})

When("the RabbitMQ Client successfully deletes a binding", func() {
BeforeEach(func() {
bindingName = "delete-binding-success"
Expand Down
23 changes: 23 additions & 0 deletions controllers/common.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
package controllers

import (
"context"
"crypto/x509"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
)

// common error messages shared across controllers
const (
failedStatusUpdate = "failed to update object status"
failedMarshalSpec = "failed to marshal spec"
failedGenerateRabbitClient = "failed to generate http rabbitClient"
failedParseClusterRef = "failed to retrieve cluster from reference"
failedRetrieveSysCertPool = "failed to retrieve system trusted certs"
noSuchRabbitDeletion = "RabbitmqCluster is already gone: cannot find its connection secret"
)

Expand All @@ -19,3 +31,14 @@ const (
PermissionControllerName = "permission-controller"
SchemaReplicationControllerName = "schema-replication-controller"
)

func extractSystemCertPool(ctx context.Context, recorder record.EventRecorder, object runtime.Object) (*x509.CertPool, error) {
logger := ctrl.LoggerFrom(ctx)

systemCertPool, err := x509.SystemCertPool()
if err != nil {
recorder.Event(object, corev1.EventTypeWarning, "FailedUpdate", failedRetrieveSysCertPool)
logger.Error(err, failedRetrieveSysCertPool)
}
return systemCertPool, err
}
34 changes: 22 additions & 12 deletions controllers/exchange_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"context"
"encoding/json"
"errors"
"reflect"
"time"

"github.com/rabbitmq/messaging-topology-operator/internal"
Expand Down Expand Up @@ -53,13 +52,30 @@ func (r *ExchangeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return reconcile.Result{}, client.IgnoreNotFound(err)
}

rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, exchange.Spec.RabbitmqClusterReference, exchange.Namespace)
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && exchange.ObjectMeta.DeletionTimestamp.IsZero() {
systemCertPool, err := extractSystemCertPool(ctx, r.Recorder, exchange)
if err != nil {
return ctrl.Result{}, err
}

rmq, svc, secret, err := internal.ParseRabbitmqClusterReference(ctx, r.Client, exchange.Spec.RabbitmqClusterReference, exchange.Namespace)
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && !exchange.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info(noSuchRabbitDeletion, "exchange", exchange.Name)
r.Recorder.Event(exchange, corev1.EventTypeNormal, "SuccessfulDelete", "successfully deleted exchange")
return reconcile.Result{}, r.removeFinalizer(ctx, exchange)
}
if errors.Is(err, internal.NoSuchRabbitmqClusterError) {
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) {
}
if err != nil {
logger.Error(err, failedParseClusterRef)
return reconcile.Result{}, err
}

rabbitClient, err := r.RabbitmqClientFactory(rmq, svc, secret, serviceDNSAddress(svc), systemCertPool)
if err != nil {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -143,12 +159,6 @@ func (r *ExchangeReconciler) addFinalizerIfNeeded(ctx context.Context, e *topolo
func (r *ExchangeReconciler) deleteExchange(ctx context.Context, client internal.RabbitMQClient, exchange *topology.Exchange) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil || reflect.ValueOf(client).IsNil() {
logger.Info(noSuchRabbitDeletion, "exchange", exchange.Name)
r.Recorder.Event(exchange, corev1.EventTypeNormal, "SuccessfulDelete", "successfully deleted exchange")
return r.removeFinalizer(ctx, exchange)
}

err := validateResponseForDeletion(client.DeleteExchange(exchange.Spec.Vhost, exchange.Spec.Name))
if errors.Is(err, NotFound) {
logger.Info("cannot find exchange in rabbitmq server; already deleted", "exchange", exchange.Spec.Name)
Expand Down
19 changes: 0 additions & 19 deletions controllers/exchange_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,6 @@ var _ = Describe("exchange-controller", func() {
})
})

When("the RabbitMQ cluster is nil", func() {
BeforeEach(func() {
exchangeName = "delete-client-not-found-error"
})

JustBeforeEach(func() {
prepareNoSuchClusterError()
})

It("successfully deletes the exchange regardless", func() {
Expect(client.Delete(ctx, &exchange)).To(Succeed())
Eventually(func() bool {
err := client.Get(ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &topology.Exchange{})
return apierrors.IsNotFound(err)
}, 5).Should(BeTrue())
Expect(observedEvents()).To(ContainElement("Normal SuccessfulDelete successfully deleted exchange"))
})
})

When("the RabbitMQ Client successfully deletes a exchange", func() {
BeforeEach(func() {
exchangeName = "delete-exchange-success"
Expand Down
34 changes: 22 additions & 12 deletions controllers/permission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"reflect"
"time"

"github.com/rabbitmq/messaging-topology-operator/internal"
Expand Down Expand Up @@ -44,13 +43,30 @@ func (r *PermissionReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return reconcile.Result{}, client.IgnoreNotFound(err)
}

rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, permission.Spec.RabbitmqClusterReference, permission.Namespace)
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && permission.ObjectMeta.DeletionTimestamp.IsZero() {
systemCertPool, err := extractSystemCertPool(ctx, r.Recorder, permission)
if err != nil {
return ctrl.Result{}, err
}

rmq, svc, secret, err := internal.ParseRabbitmqClusterReference(ctx, r.Client, permission.Spec.RabbitmqClusterReference, permission.Namespace)
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && !permission.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info(noSuchRabbitDeletion, "permission", permission.Name)
r.Recorder.Event(permission, corev1.EventTypeNormal, "SuccessfulDelete", "successfully deleted permission")
return reconcile.Result{}, r.removeFinalizer(ctx, permission)
}
if errors.Is(err, internal.NoSuchRabbitmqClusterError) {
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) {
}
if err != nil {
logger.Error(err, failedParseClusterRef)
return reconcile.Result{}, err
}

rabbitClient, err := r.RabbitmqClientFactory(rmq, svc, secret, serviceDNSAddress(svc), systemCertPool)
if err != nil {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -123,12 +139,6 @@ func (r *PermissionReconciler) addFinalizerIfNeeded(ctx context.Context, permiss
func (r *PermissionReconciler) revokePermissions(ctx context.Context, client internal.RabbitMQClient, permission *topology.Permission) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil || reflect.ValueOf(client).IsNil() {
logger.Info(noSuchRabbitDeletion, "permission", permission.Name)
r.Recorder.Event(permission, corev1.EventTypeNormal, "SuccessfulDelete", "successfully deleted permission")
return r.removeFinalizer(ctx, permission)
}

err := validateResponseForDeletion(client.ClearPermissionsIn(permission.Spec.Vhost, permission.Spec.User))
if errors.Is(err, NotFound) {
logger.Info("cannot find user or vhost in rabbitmq server; no need to delete permission", "user", permission.Spec.User, "vhost", permission.Spec.Vhost)
Expand Down
19 changes: 0 additions & 19 deletions controllers/permission_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,6 @@ var _ = Describe("permission-controller", func() {
})
})

When("the RabbitMQ cluster is nil", func() {
BeforeEach(func() {
permissionName = "delete-client-not-found-error"
})

JustBeforeEach(func() {
prepareNoSuchClusterError()
})

It("successfully deletes the permission regardless", func() {
Expect(client.Delete(ctx, &permission)).To(Succeed())
Eventually(func() bool {
err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{})
return apierrors.IsNotFound(err)
}, 5).Should(BeTrue())
Expect(observedEvents()).To(ContainElement("Normal SuccessfulDelete successfully deleted permission"))
})
})

When("the RabbitMQ Client successfully deletes a permission", func() {
BeforeEach(func() {
permissionName = "delete-permission-success"
Expand Down
34 changes: 22 additions & 12 deletions controllers/policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"context"
"encoding/json"
"errors"
"reflect"
"time"

"github.com/rabbitmq/messaging-topology-operator/internal"
Expand Down Expand Up @@ -54,13 +53,30 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return reconcile.Result{}, client.IgnoreNotFound(err)
}

rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, policy.Spec.RabbitmqClusterReference, policy.Namespace)
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && policy.ObjectMeta.DeletionTimestamp.IsZero() {
systemCertPool, err := extractSystemCertPool(ctx, r.Recorder, policy)
if err != nil {
return ctrl.Result{}, err
}

rmq, svc, secret, err := internal.ParseRabbitmqClusterReference(ctx, r.Client, policy.Spec.RabbitmqClusterReference, policy.Namespace)
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && !policy.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info(noSuchRabbitDeletion, "policy", policy.Name)
r.Recorder.Event(policy, corev1.EventTypeNormal, "SuccessfulDelete", "successfully deleted policy")
return reconcile.Result{}, r.removeFinalizer(ctx, policy)
}
if errors.Is(err, internal.NoSuchRabbitmqClusterError) {
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) {
}
if err != nil {
logger.Error(err, failedParseClusterRef)
return reconcile.Result{}, err
}

rabbitClient, err := r.RabbitmqClientFactory(rmq, svc, secret, serviceDNSAddress(svc), systemCertPool)
if err != nil {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -143,12 +159,6 @@ func (r *PolicyReconciler) addFinalizerIfNeeded(ctx context.Context, policy *top
func (r *PolicyReconciler) deletePolicy(ctx context.Context, client internal.RabbitMQClient, policy *topology.Policy) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil || reflect.ValueOf(client).IsNil() {
logger.Info(noSuchRabbitDeletion, "policy", policy.Name)
r.Recorder.Event(policy, corev1.EventTypeNormal, "SuccessfulDelete", "successfully deleted policy")
return r.removeFinalizer(ctx, policy)
}

err := validateResponseForDeletion(client.DeletePolicy(policy.Spec.Vhost, policy.Spec.Name))
if errors.Is(err, NotFound) {
logger.Info("cannot find policy in rabbitmq server; already deleted", "policy", policy.Spec.Name)
Expand Down
22 changes: 2 additions & 20 deletions controllers/policy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"bytes"
"errors"
"io/ioutil"
"k8s.io/apimachinery/pkg/runtime"
"net/http"
"time"

"k8s.io/apimachinery/pkg/runtime"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
Expand Down Expand Up @@ -179,25 +180,6 @@ var _ = Describe("policy-controller", func() {
})
})

When("the RabbitMQ cluster is nil", func() {
BeforeEach(func() {
policyName = "delete-client-not-found-error"
})

JustBeforeEach(func() {
prepareNoSuchClusterError()
})

It("successfully deletes the policy regardless", func() {
Expect(client.Delete(ctx, &policy)).To(Succeed())
Eventually(func() bool {
err := client.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &topology.Policy{})
return apierrors.IsNotFound(err)
}, 5).Should(BeTrue())
Expect(observedEvents()).To(ContainElement("Normal SuccessfulDelete successfully deleted policy"))
})
})

When("the RabbitMQ Client successfully deletes a policy", func() {
BeforeEach(func() {
policyName = "delete-policy-success"
Expand Down
Loading

0 comments on commit 1044f0b

Please sign in to comment.