diff --git a/PROJECT b/PROJECT index 943d94fa..be243df5 100644 --- a/PROJECT +++ b/PROJECT @@ -22,4 +22,7 @@ resources: - group: rabbitmq.com kind: Permission version: v1alpha2 +- group: rabbitmq.com + kind: SchemaReplication + version: v1alpha2 version: "2" diff --git a/api/v1alpha2/schemareplication_types.go b/api/v1alpha2/schemareplication_types.go new file mode 100644 index 00000000..1bd36b24 --- /dev/null +++ b/api/v1alpha2/schemareplication_types.go @@ -0,0 +1,61 @@ +/* +RabbitMQ Messaging Topology Kubernetes Operator +Copyright 2021 VMware, Inc. + +This product is licensed to you under the Mozilla Public License 2.0 license (the "License"). You may not use this product except in compliance with the Mozilla 2.0 License. + +This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file. +*/ + +package v1alpha2 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// SchemaReplicationSpec defines the desired state of SchemaReplication +type SchemaReplicationSpec struct { + // Reference to the RabbitmqCluster that schema replication would be set for. Must be an existing cluster. + // +kubebuilder:validation:Required + RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"` + // Defines a Secret which contains credentials to be used for schema replication. + // The Secret must contain the keys `endpoints`, `username` and `password` in its Data field, or the controller errors. + // +kubebuilder:validation:Required + UpstreamSecret *corev1.LocalObjectReference `json:"upstreamSecret,omitempty"` +} + +// SchemaReplicationStatus defines the observed state of SchemaReplication +type SchemaReplicationStatus struct { + // observedGeneration is the most recent successful generation observed for this Queue. It corresponds to the + // Queue's generation, which is updated on mutation by the API Server. + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + Conditions []Condition `json:"conditions,omitempty"` +} + +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status + +// SchemaReplication is the Schema for the schemareplications API +// This feature requires Tanzu RabbitMQ with schema replication plugin. +// For more information, see: https://tanzu.vmware.com/rabbitmq and https://www.rabbitmq.com/definitions-standby.html. +type SchemaReplication struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec SchemaReplicationSpec `json:"spec,omitempty"` + Status SchemaReplicationStatus `json:"status,omitempty"` +} + +// +kubebuilder:object:root=true + +// SchemaReplicationList contains a list of SchemaReplication +type SchemaReplicationList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []SchemaReplication `json:"items"` +} + +func init() { + SchemeBuilder.Register(&SchemaReplication{}, &SchemaReplicationList{}) +} diff --git a/api/v1alpha2/schemareplication_types_test.go b/api/v1alpha2/schemareplication_types_test.go new file mode 100644 index 00000000..495d6866 --- /dev/null +++ b/api/v1alpha2/schemareplication_types_test.go @@ -0,0 +1,39 @@ +package v1alpha2 + +import ( + "context" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("schemaReplication spec", func() { + It("creates a schemaReplication", func() { + replication := SchemaReplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "replication", + Namespace: "default", + }, + Spec: SchemaReplicationSpec{ + RabbitmqClusterReference: RabbitmqClusterReference{ + Name: "some-cluster", + }, + UpstreamSecret: &corev1.LocalObjectReference{ + Name: "a-secret", + }, + }} + Expect(k8sClient.Create(context.Background(), &replication)).To(Succeed()) + + fetched := &SchemaReplication{} + Expect(k8sClient.Get(context.Background(), types.NamespacedName{ + Name: replication.Name, + Namespace: replication.Namespace, + }, fetched)).To(Succeed()) + Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{ + Name: "some-cluster", + })) + Expect(fetched.Spec.UpstreamSecret.Name).To(Equal("a-secret")) + }) +}) diff --git a/api/v1alpha2/zz_generated.deepcopy.go b/api/v1alpha2/zz_generated.deepcopy.go index 9c57622e..f8edaa55 100644 --- a/api/v1alpha2/zz_generated.deepcopy.go +++ b/api/v1alpha2/zz_generated.deepcopy.go @@ -555,6 +555,108 @@ func (in *RabbitmqClusterReference) DeepCopy() *RabbitmqClusterReference { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchemaReplication) DeepCopyInto(out *SchemaReplication) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchemaReplication. +func (in *SchemaReplication) DeepCopy() *SchemaReplication { + if in == nil { + return nil + } + out := new(SchemaReplication) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SchemaReplication) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchemaReplicationList) DeepCopyInto(out *SchemaReplicationList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]SchemaReplication, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchemaReplicationList. +func (in *SchemaReplicationList) DeepCopy() *SchemaReplicationList { + if in == nil { + return nil + } + out := new(SchemaReplicationList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SchemaReplicationList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchemaReplicationSpec) DeepCopyInto(out *SchemaReplicationSpec) { + *out = *in + out.RabbitmqClusterReference = in.RabbitmqClusterReference + if in.UpstreamSecret != nil { + in, out := &in.UpstreamSecret, &out.UpstreamSecret + *out = new(v1.LocalObjectReference) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchemaReplicationSpec. +func (in *SchemaReplicationSpec) DeepCopy() *SchemaReplicationSpec { + if in == nil { + return nil + } + out := new(SchemaReplicationSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchemaReplicationStatus) DeepCopyInto(out *SchemaReplicationStatus) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchemaReplicationStatus. +func (in *SchemaReplicationStatus) DeepCopy() *SchemaReplicationStatus { + if in == nil { + return nil + } + out := new(SchemaReplicationStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *User) DeepCopyInto(out *User) { *out = *in diff --git a/config/crd/bases/rabbitmq.com_schemareplications.yaml b/config/crd/bases/rabbitmq.com_schemareplications.yaml new file mode 100644 index 00000000..4d61c30a --- /dev/null +++ b/config/crd/bases/rabbitmq.com_schemareplications.yaml @@ -0,0 +1,109 @@ + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.5.0 + creationTimestamp: null + name: schemareplications.rabbitmq.com +spec: + group: rabbitmq.com + names: + kind: SchemaReplication + listKind: SchemaReplicationList + plural: schemareplications + singular: schemareplication + scope: Namespaced + versions: + - name: v1alpha2 + schema: + openAPIV3Schema: + description: 'SchemaReplication is the Schema for the schemareplications API + This feature requires Tanzu RabbitMQ with schema replication plugin. For + more information, see: https://tanzu.vmware.com/rabbitmq and https://www.rabbitmq.com/definitions-standby.html.' + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: SchemaReplicationSpec defines the desired state of SchemaReplication + properties: + rabbitmqClusterReference: + description: Reference to the RabbitmqCluster that schema replication + would be set for. Must be an existing cluster. + properties: + name: + type: string + required: + - name + type: object + upstreamSecret: + description: Defines a Secret which contains credentials to be used + for schema replication. The Secret must contain the keys `endpoints`, + `username` and `password` in its Data field, or the controller errors. + properties: + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid?' + type: string + type: object + required: + - rabbitmqClusterReference + type: object + status: + description: SchemaReplicationStatus defines the observed state of SchemaReplication + properties: + conditions: + items: + properties: + lastTransitionTime: + description: The last time this Condition type changed. + format: date-time + type: string + message: + description: Full text reason for current status of the condition. + type: string + reason: + description: One word, camel-case reason for current status + of the condition. + type: string + status: + description: True, False, or Unknown + type: string + type: + description: Type indicates the scope of RabbitmqCluster status + addressed by the condition. + type: string + required: + - status + - type + type: object + type: array + observedGeneration: + description: observedGeneration is the most recent successful generation + observed for this Queue. It corresponds to the Queue's generation, + which is updated on mutation by the API Server. + format: int64 + type: integer + type: object + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 4b9594e2..e3a06b20 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -9,6 +9,7 @@ resources: - bases/rabbitmq.com_vhosts.yaml - bases/rabbitmq.com_policies.yaml - bases/rabbitmq.com_permissions.yaml +- bases/rabbitmq.com_schemareplications.yaml # +kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: @@ -19,6 +20,7 @@ patchesStrategicMerge: - patches/webhook_in_policies.yaml - patches/webhook_in_users.yaml - patches/webhook_in_permissions.yaml +#- patches/webhook_in_schemareplications.yaml # +kubebuilder:scaffold:crdkustomizewebhookpatch - patches/cainjection_in_bindings.yaml @@ -28,6 +30,7 @@ patchesStrategicMerge: - patches/cainjection_in_policies.yaml - patches/cainjection_in_users.yaml - patches/cainjection_in_permissions.yaml +#- patches/cainjection_in_schemareplications.yaml # +kubebuilder:scaffold:crdkustomizecainjectionpatch configurations: diff --git a/config/crd/patches/cainjection_in_schemareplications.yaml b/config/crd/patches/cainjection_in_schemareplications.yaml new file mode 100644 index 00000000..c1ec28aa --- /dev/null +++ b/config/crd/patches/cainjection_in_schemareplications.yaml @@ -0,0 +1,8 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +# CRD conversion requires k8s 1.13 or later. +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) + name: schemareplications.rabbitmq.com diff --git a/config/crd/patches/webhook_in_schemareplications.yaml b/config/crd/patches/webhook_in_schemareplications.yaml new file mode 100644 index 00000000..7c5883c5 --- /dev/null +++ b/config/crd/patches/webhook_in_schemareplications.yaml @@ -0,0 +1,17 @@ +# The following patch enables conversion webhook for CRD +# CRD conversion requires k8s 1.13 or later. +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: schemareplications.rabbitmq.com +spec: + conversion: + strategy: Webhook + webhook: + conversionReviewVersions: ["v1", "v1beta1"] + clientConfig: + caBundle: Cg== + service: + namespace: system + name: webhook-service + path: /convert diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index e5d57eff..c6f7a931 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -151,6 +151,26 @@ rules: verbs: - get - update +- apiGroups: + - rabbitmq.com + resources: + - schemareplications + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - rabbitmq.com + resources: + - schemareplications/status + verbs: + - get + - patch + - update - apiGroups: - rabbitmq.com resources: diff --git a/config/rbac/schemareplication_editor_role.yaml b/config/rbac/schemareplication_editor_role.yaml new file mode 100644 index 00000000..55e6225b --- /dev/null +++ b/config/rbac/schemareplication_editor_role.yaml @@ -0,0 +1,24 @@ +# permissions for end users to edit schemareplications. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: schemareplication-editor-role +rules: +- apiGroups: + - rabbitmq.com + resources: + - schemareplications + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - rabbitmq.com + resources: + - schemareplications/status + verbs: + - get diff --git a/config/rbac/schemareplication_viewer_role.yaml b/config/rbac/schemareplication_viewer_role.yaml new file mode 100644 index 00000000..7bb08025 --- /dev/null +++ b/config/rbac/schemareplication_viewer_role.yaml @@ -0,0 +1,20 @@ +# permissions for end users to view schemareplications. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: schemareplication-viewer-role +rules: +- apiGroups: + - rabbitmq.com + resources: + - schemareplications + verbs: + - get + - list + - watch +- apiGroups: + - rabbitmq.com + resources: + - schemareplications/status + verbs: + - get diff --git a/config/samples/rabbitmq.com_v1alpha2_schemareplication.yaml b/config/samples/rabbitmq.com_v1alpha2_schemareplication.yaml new file mode 100644 index 00000000..8fc1119a --- /dev/null +++ b/config/samples/rabbitmq.com_v1alpha2_schemareplication.yaml @@ -0,0 +1,7 @@ +apiVersion: rabbitmq.com/v1alpha2 +kind: SchemaReplication +metadata: + name: schemareplication-sample +spec: + # Add fields here + foo: bar diff --git a/controllers/common.go b/controllers/common.go index f980fe3c..439c2f3a 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -10,11 +10,12 @@ const ( // names for each of the controllers const ( - VhostControllerName = "vhost-controller" - QueueControllerName = "queue-controller" - ExchangeControllerName = "exchange-controller" - BindingControllerName = "binding-controller" - UserControllerName = "user-controller" - PolicyControllerName = "policy-controller" - PermissionControllerName = "permission-controller" + VhostControllerName = "vhost-controller" + QueueControllerName = "queue-controller" + ExchangeControllerName = "exchange-controller" + BindingControllerName = "binding-controller" + UserControllerName = "user-controller" + PolicyControllerName = "policy-controller" + PermissionControllerName = "permission-controller" + SchemaReplicationControllerName = "schema-replication-controller" ) diff --git a/controllers/schemareplication_controller.go b/controllers/schemareplication_controller.go new file mode 100644 index 00000000..2d50fd88 --- /dev/null +++ b/controllers/schemareplication_controller.go @@ -0,0 +1,188 @@ +package controllers + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/rabbitmq/messaging-topology-operator/internal" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + clientretry "k8s.io/client-go/util/retry" + "reflect" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "time" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + topology "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" +) + +const replicationFinalizer = "deletion.finalizers.schemareplications.rabbitmq.com" +const schemaReplicationParameterName = "schema_definition_sync_upstream" + +// SchemaReplicationReconciler reconciles a SchemaReplication object +type SchemaReplicationReconciler struct { + client.Client + Log logr.Logger + Scheme *runtime.Scheme + Recorder record.EventRecorder + RabbitmqClientFactory internal.RabbitMQClientFactory +} + +// +kubebuilder:rbac:groups=rabbitmq.com,resources=schemareplications,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=rabbitmq.com,resources=schemareplications/status,verbs=get;update;patch + +func (r *SchemaReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := ctrl.LoggerFrom(ctx) + + replication := &topology.SchemaReplication{} + if err := r.Get(ctx, req.NamespacedName, replication); err != nil { + return reconcile.Result{}, client.IgnoreNotFound(err) + } + + rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, replication.Spec.RabbitmqClusterReference, replication.Namespace) + // If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that + // the Cluster is temporarily down. Requeue until it comes back up. + if errors.Is(err, internal.NoSuchRabbitmqClusterError) && replication.ObjectMeta.DeletionTimestamp.IsZero() { + logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error()) + return reconcile.Result{RequeueAfter: 10 * time.Second}, err + } else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) { + logger.Error(err, failedGenerateRabbitClient) + return reconcile.Result{}, err + } + + if !replication.ObjectMeta.DeletionTimestamp.IsZero() { + logger.Info("Deleting") + return ctrl.Result{}, r.deleteSchemaReplicationParameters(ctx, rabbitClient, replication) + } + + if err := r.addFinalizerIfNeeded(ctx, replication); err != nil { + return ctrl.Result{}, err + } + + spec, err := json.Marshal(replication.Spec) + if err != nil { + logger.Error(err, failedMarshalSpec) + } + + logger.Info("Start reconciling", + "spec", string(spec)) + + if err := r.setSchemaReplicationUpstream(ctx, rabbitClient, replication); err != nil { + // Set Condition 'Ready' to false with message + replication.Status.Conditions = []topology.Condition{topology.NotReady(err.Error())} + if writerErr := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error { + return r.Status().Update(ctx, replication) + }); writerErr != nil { + logger.Error(writerErr, failedStatusUpdate) + } + return ctrl.Result{}, err + } + + replication.Status.Conditions = []topology.Condition{topology.Ready()} + replication.Status.ObservedGeneration = replication.GetGeneration() + if writerErr := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error { + return r.Status().Update(ctx, replication) + }); writerErr != nil { + logger.Error(writerErr, failedStatusUpdate) + } + logger.Info("Finished reconciling") + + return ctrl.Result{}, nil +} + +func (r *SchemaReplicationReconciler) setSchemaReplicationUpstream(ctx context.Context, client internal.RabbitMQClient, replication *topology.SchemaReplication) error { + logger := ctrl.LoggerFrom(ctx) + + endpoints, err := r.getUpstreamEndpoints(ctx, replication) + if err != nil { + msg := "failed to generate upstream endpoints" + r.Recorder.Event(replication, corev1.EventTypeWarning, "FailedUpdate", msg) + logger.Error(err, msg, "upstream secret", replication.Spec.UpstreamSecret) + return err + } + + if err := validateResponse(client.PutGlobalParameter(schemaReplicationParameterName, endpoints)); err != nil { + msg := fmt.Sprintf("failed to set '%s' global parameter", schemaReplicationParameterName) + r.Recorder.Event(replication, corev1.EventTypeWarning, "FailedUpdate", msg) + logger.Error(err, msg, "upstream secret", replication.Spec.UpstreamSecret) + return err + } + + msg := fmt.Sprintf("successfully set '%s' global parameter", schemaReplicationParameterName) + logger.Info(msg) + r.Recorder.Event(replication, corev1.EventTypeNormal, "SuccessfulUpdate", msg) + return nil +} + +func (r *SchemaReplicationReconciler) addFinalizerIfNeeded(ctx context.Context, replication *topology.SchemaReplication) error { + if replication.ObjectMeta.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(replication, replicationFinalizer) { + controllerutil.AddFinalizer(replication, replicationFinalizer) + if err := r.Client.Update(ctx, replication); err != nil { + return err + } + } + return nil +} + +func (r *SchemaReplicationReconciler) deleteSchemaReplicationParameters(ctx context.Context, client internal.RabbitMQClient, replication *topology.SchemaReplication) error { + logger := ctrl.LoggerFrom(ctx) + + if client == nil || reflect.ValueOf(client).IsNil() { + logger.Info(noSuchRabbitDeletion, "schemaReplication", replication.Name) + r.Recorder.Event(replication, corev1.EventTypeNormal, "SuccessfulDelete", "successfully deleted schemaReplication") + return r.removeFinalizer(ctx, replication) + } + + err := validateResponseForDeletion(client.DeleteGlobalParameter(schemaReplicationParameterName)) + if errors.Is(err, NotFound) { + logger.Info("cannot find global parameter; no need to delete it", "parameter", schemaReplicationParameterName) + } else if err != nil { + msg := fmt.Sprintf("failed to delete global parameter '%s'", schemaReplicationParameterName) + r.Recorder.Event(replication, corev1.EventTypeWarning, "FailedDelete", msg) + logger.Error(err, msg) + return err + } + + msg := fmt.Sprintf("successfully delete '%s' global parameter", schemaReplicationParameterName) + logger.Info(msg) + r.Recorder.Event(replication, corev1.EventTypeNormal, "SuccessfulDelete", msg) + return r.removeFinalizer(ctx, replication) +} + +func (r *SchemaReplicationReconciler) getUpstreamEndpoints(ctx context.Context, replication *topology.SchemaReplication) (internal.UpstreamEndpoints, error) { + if replication.Spec.UpstreamSecret == nil { + return internal.UpstreamEndpoints{}, fmt.Errorf("no upstream secret provided") + } + secret := &corev1.Secret{} + if err := r.Get(ctx, types.NamespacedName{Name: replication.Spec.UpstreamSecret.Name, Namespace: replication.Namespace}, secret); err != nil { + return internal.UpstreamEndpoints{}, err + } + + endpoints, err := internal.GenerateSchemaReplicationParameters(secret) + if err != nil { + return internal.UpstreamEndpoints{}, err + } + + return endpoints, nil +} + +func (r *SchemaReplicationReconciler) removeFinalizer(ctx context.Context, replication *topology.SchemaReplication) error { + controllerutil.RemoveFinalizer(replication, replicationFinalizer) + if err := r.Client.Update(ctx, replication); err != nil { + return err + } + return nil +} + +func (r *SchemaReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&topology.SchemaReplication{}). + Complete(r) +} diff --git a/controllers/schemareplication_controller_test.go b/controllers/schemareplication_controller_test.go new file mode 100644 index 00000000..e6ded7af --- /dev/null +++ b/controllers/schemareplication_controller_test.go @@ -0,0 +1,202 @@ +package controllers_test + +import ( + "bytes" + "errors" + "io/ioutil" + "net/http" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + . "github.com/onsi/gomega/gstruct" + topology "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("schema-replication-controller", func() { + var replication topology.SchemaReplication + var name string + + JustBeforeEach(func() { + replication = topology.SchemaReplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Spec: topology.SchemaReplicationSpec{ + UpstreamSecret: &corev1.LocalObjectReference{ + Name: "endpoints-secret", // created in 'BeforeSuite' + }, + RabbitmqClusterReference: topology.RabbitmqClusterReference{ + Name: "example-rabbit", + }, + }, + } + }) + + When("creation", func() { + When("the RabbitMQ Client returns a HTTP error response", func() { + BeforeEach(func() { + name = "test-replication-http-error" + fakeRabbitMQClient.PutGlobalParameterReturns(&http.Response{ + Status: "418 I'm a teapot", + StatusCode: 418, + }, errors.New("Some HTTP error")) + }) + + It("sets the status condition to indicate a failure to reconcile", func() { + Expect(client.Create(ctx, &replication)).To(Succeed()) + EventuallyWithOffset(1, func() []topology.Condition { + _ = client.Get( + ctx, + types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, + &replication, + ) + + return replication.Status.Conditions + }, 10*time.Second, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("Some HTTP error"), + }))) + }) + }) + + When("the RabbitMQ Client returns a Go error response", func() { + BeforeEach(func() { + name = "test-replication-go-error" + fakeRabbitMQClient.PutGlobalParameterReturns(nil, errors.New("some go failure here")) + }) + + It("sets the status condition to indicate a failure to reconcile", func() { + Expect(client.Create(ctx, &replication)).To(Succeed()) + EventuallyWithOffset(1, func() []topology.Condition { + _ = client.Get( + ctx, + types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, + &replication, + ) + + return replication.Status.Conditions + }, 10*time.Second, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("some go failure here"), + }))) + }) + }) + + Context("success", func() { + BeforeEach(func() { + name = "test-replication-success" + fakeRabbitMQClient.PutGlobalParameterReturns(&http.Response{ + Status: "201 Created", + StatusCode: http.StatusCreated, + }, nil) + }) + + It("sets the status condition 'Ready' to 'true'", func() { + Expect(client.Create(ctx, &replication)).To(Succeed()) + EventuallyWithOffset(1, func() []topology.Condition { + _ = client.Get( + ctx, + types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, + &replication, + ) + + return replication.Status.Conditions + }, 10*time.Second, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) + }) + }) + }) + + When("deletion", func() { + JustBeforeEach(func() { + fakeRabbitMQClient.PutGlobalParameterReturns(&http.Response{ + Status: "201 Created", + StatusCode: http.StatusCreated, + }, nil) + Expect(client.Create(ctx, &replication)).To(Succeed()) + EventuallyWithOffset(1, func() []topology.Condition { + _ = client.Get( + ctx, + types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, + &replication, + ) + + return replication.Status.Conditions + }, 10*time.Second, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) + }) + + When("the RabbitMQ Client returns a HTTP error response", func() { + BeforeEach(func() { + name = "delete-replication-http-error" + fakeRabbitMQClient.DeleteGlobalParameterReturns(&http.Response{ + Status: "502 Bad Gateway", + StatusCode: http.StatusBadGateway, + Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + }, nil) + }) + + It("raises an event to indicate a failure to delete", func() { + Expect(client.Delete(ctx, &replication)).To(Succeed()) + Consistently(func() bool { + err := client.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) + return apierrors.IsNotFound(err) + }, 5).Should(BeFalse()) + Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete global parameter 'schema_definition_sync_upstream'")) + }) + }) + + When("the RabbitMQ Client returns a Go error response", func() { + BeforeEach(func() { + name = "delete-replication-go-error" + fakeRabbitMQClient.DeleteGlobalParameterReturns(nil, errors.New("some error")) + }) + + It("publishes a 'warning' event", func() { + Expect(client.Delete(ctx, &replication)).To(Succeed()) + Consistently(func() bool { + err := client.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) + return apierrors.IsNotFound(err) + }, 5).Should(BeFalse()) + Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete global parameter 'schema_definition_sync_upstream'")) + }) + }) + + Context("success", func() { + BeforeEach(func() { + name = "delete-replication-success" + fakeRabbitMQClient.DeleteGlobalParameterReturns(&http.Response{ + Status: "204 No Content", + StatusCode: http.StatusNoContent, + }, nil) + }) + + It("publishes a normal event", func() { + Expect(client.Delete(ctx, &replication)).To(Succeed()) + Eventually(func() bool { + err := client.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) + return apierrors.IsNotFound(err) + }, 5).Should(BeTrue()) + observedEvents := observedEvents() + Expect(observedEvents).NotTo(ContainElement("Warning FailedDelete failed to delete global parameter 'schema_definition_sync_upstream'")) + Expect(observedEvents).To(ContainElement("Normal SuccessfulDelete successfully delete 'schema_definition_sync_upstream' global parameter")) + }) + }) + }) +}) diff --git a/controllers/suite_test.go b/controllers/suite_test.go index e133f83e..bffb8f3c 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -12,6 +12,8 @@ package controllers_test import ( "context" "errors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "path/filepath" "testing" @@ -124,6 +126,13 @@ var _ = BeforeSuite(func(done Done) { RabbitmqClientFactory: fakeRabbitMQClientFactory, }).SetupWithManager(mgr) Expect(err).ToNot(HaveOccurred()) + err = (&controllers.SchemaReplicationReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + }).SetupWithManager(mgr) + Expect(err).ToNot(HaveOccurred()) go func() { err = mgr.Start(ctrl.SetupSignalHandler()) @@ -133,6 +142,21 @@ var _ = BeforeSuite(func(done Done) { client = mgr.GetClient() Expect(client).ToNot(BeNil()) + // used in schema-replication-controller test + secret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoints-secret", + Namespace: "default", + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "username": []byte("a-random-user"), + "password": []byte("a-random-password"), + "endpoints": []byte("a.endpoints.local:5672,b.endpoints.local:5672,c.endpoints.local:5672"), + }, + } + Expect(client.Create(ctx, &secret)).To(Succeed()) + close(done) }, 60) diff --git a/docs/api/rabbitmq.com.ref.asciidoc b/docs/api/rabbitmq.com.ref.asciidoc index 1eea0d0d..5002585d 100644 --- a/docs/api/rabbitmq.com.ref.asciidoc +++ b/docs/api/rabbitmq.com.ref.asciidoc @@ -24,6 +24,8 @@ Package v1alpha2 contains API Schema definitions for the rabbitmq.com v1alpha2 A - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-policylist[$$PolicyList$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-queue[$$Queue$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-queuelist[$$QueueList$$] +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplication[$$SchemaReplication$$] +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationlist[$$SchemaReplicationList$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-user[$$User$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-userlist[$$UserList$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-vhost[$$Vhost$$] @@ -127,6 +129,7 @@ BindingStatus defines the observed state of Binding - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-permissionstatus[$$PermissionStatus$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-policystatus[$$PolicyStatus$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-queuestatus[$$QueueStatus$$] +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationstatus[$$SchemaReplicationStatus$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-userstatus[$$UserStatus$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-vhoststatus[$$VhostStatus$$] **** @@ -495,6 +498,7 @@ QueueStatus defines the observed state of Queue - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-permissionspec[$$PermissionSpec$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-policyspec[$$PolicySpec$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-queuespec[$$QueueSpec$$] +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationspec[$$SchemaReplicationSpec$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-userspec[$$UserSpec$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-vhostspec[$$VhostSpec$$] **** @@ -506,6 +510,84 @@ QueueStatus defines the observed state of Queue |=== +[id="{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplication"] +==== SchemaReplication + +SchemaReplication is the Schema for the schemareplications API This feature requires Tanzu RabbitMQ with schema replication plugin. For more information, see: https://tanzu.vmware.com/rabbitmq and https://www.rabbitmq.com/definitions-standby.html. + +.Appears In: +**** +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationlist[$$SchemaReplicationList$$] +**** + +[cols="25a,75a", options="header"] +|=== +| Field | Description +| *`apiVersion`* __string__ | `rabbitmq.com/v1alpha2` +| *`kind`* __string__ | `SchemaReplication` +| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. + +| *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationspec[$$SchemaReplicationSpec$$]__ | +| *`status`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationstatus[$$SchemaReplicationStatus$$]__ | +|=== + + +[id="{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationlist"] +==== SchemaReplicationList + +SchemaReplicationList contains a list of SchemaReplication + + + +[cols="25a,75a", options="header"] +|=== +| Field | Description +| *`apiVersion`* __string__ | `rabbitmq.com/v1alpha2` +| *`kind`* __string__ | `SchemaReplicationList` +| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. + +| *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplication[$$SchemaReplication$$]__ | +|=== + + +[id="{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationspec"] +==== SchemaReplicationSpec + +SchemaReplicationSpec defines the desired state of SchemaReplication + +.Appears In: +**** +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplication[$$SchemaReplication$$] +**** + +[cols="25a,75a", options="header"] +|=== +| Field | Description +| *`rabbitmqClusterReference`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-rabbitmqclusterreference[$$RabbitmqClusterReference$$]__ | Reference to the RabbitmqCluster that schema replication would be set for. Must be an existing cluster. +| *`upstreamSecret`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#localobjectreference-v1-core[$$LocalObjectReference$$]__ | Defines a Secret which contains credentials to be used for schema replication. The Secret must contain the keys `endpoints`, `username` and `password` in its Data field, or the controller errors. +|=== + + +[id="{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationstatus"] +==== SchemaReplicationStatus + +SchemaReplicationStatus defines the observed state of SchemaReplication + +.Appears In: +**** +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplication[$$SchemaReplication$$] +**** + +[cols="25a,75a", options="header"] +|=== +| Field | Description +| *`observedGeneration`* __integer__ | observedGeneration is the most recent successful generation observed for this Queue. It corresponds to the Queue's generation, which is updated on mutation by the API Server. +| *`conditions`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-condition[$$Condition$$]__ | +|=== + + [id="{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-user"] ==== User diff --git a/internal/internalfakes/fake_rabbit_mqclient.go b/internal/internalfakes/fake_rabbit_mqclient.go index 97e6eb02..295bc853 100644 --- a/internal/internalfakes/fake_rabbit_mqclient.go +++ b/internal/internalfakes/fake_rabbit_mqclient.go @@ -96,6 +96,19 @@ type FakeRabbitMQClient struct { result1 *http.Response result2 error } + DeleteGlobalParameterStub func(string) (*http.Response, error) + deleteGlobalParameterMutex sync.RWMutex + deleteGlobalParameterArgsForCall []struct { + arg1 string + } + deleteGlobalParameterReturns struct { + result1 *http.Response + result2 error + } + deleteGlobalParameterReturnsOnCall map[int]struct { + result1 *http.Response + result2 error + } DeletePolicyStub func(string, string) (*http.Response, error) deletePolicyMutex sync.RWMutex deletePolicyArgsForCall []struct { @@ -181,6 +194,20 @@ type FakeRabbitMQClient struct { result1 []rabbithole.BindingInfo result2 error } + PutGlobalParameterStub func(string, interface{}) (*http.Response, error) + putGlobalParameterMutex sync.RWMutex + putGlobalParameterArgsForCall []struct { + arg1 string + arg2 interface{} + } + putGlobalParameterReturns struct { + result1 *http.Response + result2 error + } + putGlobalParameterReturnsOnCall map[int]struct { + result1 *http.Response + result2 error + } PutPolicyStub func(string, string, rabbithole.Policy) (*http.Response, error) putPolicyMutex sync.RWMutex putPolicyArgsForCall []struct { @@ -635,6 +662,70 @@ func (fake *FakeRabbitMQClient) DeleteExchangeReturnsOnCall(i int, result1 *http }{result1, result2} } +func (fake *FakeRabbitMQClient) DeleteGlobalParameter(arg1 string) (*http.Response, error) { + fake.deleteGlobalParameterMutex.Lock() + ret, specificReturn := fake.deleteGlobalParameterReturnsOnCall[len(fake.deleteGlobalParameterArgsForCall)] + fake.deleteGlobalParameterArgsForCall = append(fake.deleteGlobalParameterArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.DeleteGlobalParameterStub + fakeReturns := fake.deleteGlobalParameterReturns + fake.recordInvocation("DeleteGlobalParameter", []interface{}{arg1}) + fake.deleteGlobalParameterMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeRabbitMQClient) DeleteGlobalParameterCallCount() int { + fake.deleteGlobalParameterMutex.RLock() + defer fake.deleteGlobalParameterMutex.RUnlock() + return len(fake.deleteGlobalParameterArgsForCall) +} + +func (fake *FakeRabbitMQClient) DeleteGlobalParameterCalls(stub func(string) (*http.Response, error)) { + fake.deleteGlobalParameterMutex.Lock() + defer fake.deleteGlobalParameterMutex.Unlock() + fake.DeleteGlobalParameterStub = stub +} + +func (fake *FakeRabbitMQClient) DeleteGlobalParameterArgsForCall(i int) string { + fake.deleteGlobalParameterMutex.RLock() + defer fake.deleteGlobalParameterMutex.RUnlock() + argsForCall := fake.deleteGlobalParameterArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeRabbitMQClient) DeleteGlobalParameterReturns(result1 *http.Response, result2 error) { + fake.deleteGlobalParameterMutex.Lock() + defer fake.deleteGlobalParameterMutex.Unlock() + fake.DeleteGlobalParameterStub = nil + fake.deleteGlobalParameterReturns = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + +func (fake *FakeRabbitMQClient) DeleteGlobalParameterReturnsOnCall(i int, result1 *http.Response, result2 error) { + fake.deleteGlobalParameterMutex.Lock() + defer fake.deleteGlobalParameterMutex.Unlock() + fake.DeleteGlobalParameterStub = nil + if fake.deleteGlobalParameterReturnsOnCall == nil { + fake.deleteGlobalParameterReturnsOnCall = make(map[int]struct { + result1 *http.Response + result2 error + }) + } + fake.deleteGlobalParameterReturnsOnCall[i] = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + func (fake *FakeRabbitMQClient) DeletePolicy(arg1 string, arg2 string) (*http.Response, error) { fake.deletePolicyMutex.Lock() ret, specificReturn := fake.deletePolicyReturnsOnCall[len(fake.deletePolicyArgsForCall)] @@ -1026,6 +1117,71 @@ func (fake *FakeRabbitMQClient) ListQueueBindingsBetweenReturnsOnCall(i int, res }{result1, result2} } +func (fake *FakeRabbitMQClient) PutGlobalParameter(arg1 string, arg2 interface{}) (*http.Response, error) { + fake.putGlobalParameterMutex.Lock() + ret, specificReturn := fake.putGlobalParameterReturnsOnCall[len(fake.putGlobalParameterArgsForCall)] + fake.putGlobalParameterArgsForCall = append(fake.putGlobalParameterArgsForCall, struct { + arg1 string + arg2 interface{} + }{arg1, arg2}) + stub := fake.PutGlobalParameterStub + fakeReturns := fake.putGlobalParameterReturns + fake.recordInvocation("PutGlobalParameter", []interface{}{arg1, arg2}) + fake.putGlobalParameterMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeRabbitMQClient) PutGlobalParameterCallCount() int { + fake.putGlobalParameterMutex.RLock() + defer fake.putGlobalParameterMutex.RUnlock() + return len(fake.putGlobalParameterArgsForCall) +} + +func (fake *FakeRabbitMQClient) PutGlobalParameterCalls(stub func(string, interface{}) (*http.Response, error)) { + fake.putGlobalParameterMutex.Lock() + defer fake.putGlobalParameterMutex.Unlock() + fake.PutGlobalParameterStub = stub +} + +func (fake *FakeRabbitMQClient) PutGlobalParameterArgsForCall(i int) (string, interface{}) { + fake.putGlobalParameterMutex.RLock() + defer fake.putGlobalParameterMutex.RUnlock() + argsForCall := fake.putGlobalParameterArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeRabbitMQClient) PutGlobalParameterReturns(result1 *http.Response, result2 error) { + fake.putGlobalParameterMutex.Lock() + defer fake.putGlobalParameterMutex.Unlock() + fake.PutGlobalParameterStub = nil + fake.putGlobalParameterReturns = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + +func (fake *FakeRabbitMQClient) PutGlobalParameterReturnsOnCall(i int, result1 *http.Response, result2 error) { + fake.putGlobalParameterMutex.Lock() + defer fake.putGlobalParameterMutex.Unlock() + fake.PutGlobalParameterStub = nil + if fake.putGlobalParameterReturnsOnCall == nil { + fake.putGlobalParameterReturnsOnCall = make(map[int]struct { + result1 *http.Response + result2 error + }) + } + fake.putGlobalParameterReturnsOnCall[i] = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + func (fake *FakeRabbitMQClient) PutPolicy(arg1 string, arg2 string, arg3 rabbithole.Policy) (*http.Response, error) { fake.putPolicyMutex.Lock() ret, specificReturn := fake.putPolicyReturnsOnCall[len(fake.putPolicyArgsForCall)] @@ -1303,6 +1459,8 @@ func (fake *FakeRabbitMQClient) Invocations() map[string][][]interface{} { defer fake.deleteBindingMutex.RUnlock() fake.deleteExchangeMutex.RLock() defer fake.deleteExchangeMutex.RUnlock() + fake.deleteGlobalParameterMutex.RLock() + defer fake.deleteGlobalParameterMutex.RUnlock() fake.deletePolicyMutex.RLock() defer fake.deletePolicyMutex.RUnlock() fake.deleteQueueMutex.RLock() @@ -1315,6 +1473,8 @@ func (fake *FakeRabbitMQClient) Invocations() map[string][][]interface{} { defer fake.listExchangeBindingsBetweenMutex.RUnlock() fake.listQueueBindingsBetweenMutex.RLock() defer fake.listQueueBindingsBetweenMutex.RUnlock() + fake.putGlobalParameterMutex.RLock() + defer fake.putGlobalParameterMutex.RUnlock() fake.putPolicyMutex.RLock() defer fake.putPolicyMutex.RUnlock() fake.putUserMutex.RLock() diff --git a/internal/rabbitmq_client_factory.go b/internal/rabbitmq_client_factory.go index bef4c9d8..bd44381a 100644 --- a/internal/rabbitmq_client_factory.go +++ b/internal/rabbitmq_client_factory.go @@ -43,6 +43,8 @@ type RabbitMQClient interface { DeleteExchange(string, string) (*http.Response, error) PutVhost(string, rabbithole.VhostSettings) (*http.Response, error) DeleteVhost(string) (*http.Response, error) + PutGlobalParameter(name string, value interface{}) (*http.Response, error) + DeleteGlobalParameter(name string) (*http.Response, error) } type RabbitMQClientFactory func(ctx context.Context, c client.Client, rmq topology.RabbitmqClusterReference, namespace string) (RabbitMQClient, error) diff --git a/internal/schema_replication.go b/internal/schema_replication.go new file mode 100644 index 00000000..ce65fd2b --- /dev/null +++ b/internal/schema_replication.go @@ -0,0 +1,47 @@ +/* +RabbitMQ Messaging Topology Kubernetes Operator +Copyright 2021 VMware, Inc. + +This product is licensed to you under the Mozilla Public License 2.0 license (the "License"). You may not use this product except in compliance with the Mozilla 2.0 License. + +This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file. +*/ + +package internal + +import ( + "fmt" + corev1 "k8s.io/api/core/v1" + "strings" +) + +type UpstreamEndpoints struct { + Username string `json:"username"` + Password string `json:"password"` + Endpoints []string `json:"endpoints"` +} + +func GenerateSchemaReplicationParameters(secret *corev1.Secret) (UpstreamEndpoints, error) { + username, ok := secret.Data["username"] + if !ok { + return UpstreamEndpoints{}, fmt.Errorf("could not find username in secret %s", secret.Name) + } + password, ok := secret.Data["password"] + if !ok { + return UpstreamEndpoints{}, fmt.Errorf("could not find password in secret %s", secret.Name) + } + + endpoints, ok := secret.Data["endpoints"] + if !ok { + return UpstreamEndpoints{}, fmt.Errorf("could not find endpoints in secret %s", secret.Name) + } + + endpointsList := strings.Split(string(endpoints), ",") + + return UpstreamEndpoints{ + Username: string(username), + Password: string(password), + Endpoints: endpointsList, + }, nil + +} diff --git a/internal/schema_replication_test.go b/internal/schema_replication_test.go new file mode 100644 index 00000000..be55a376 --- /dev/null +++ b/internal/schema_replication_test.go @@ -0,0 +1,35 @@ +package internal_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/rabbitmq/messaging-topology-operator/internal" + corev1 "k8s.io/api/core/v1" +) + +var _ = Describe("GenerateSchemaReplicationParameters", func() { + var secret corev1.Secret + + BeforeEach(func() { + secret = corev1.Secret{ + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "username": []byte("a-random-user"), + "password": []byte("a-random-password"), + "endpoints": []byte("a.endpoints.local:5672,b.endpoints.local:5672,c.endpoints.local:5672"), + }, + } + }) + + It("generates expected replication parameters", func() { + parameters, err := internal.GenerateSchemaReplicationParameters(&secret) + Expect(err).NotTo(HaveOccurred()) + Expect(parameters.Username).To(Equal("a-random-user")) + Expect(parameters.Password).To(Equal("a-random-password")) + Expect(parameters.Endpoints).To(Equal([]string{ + "a.endpoints.local:5672", + "b.endpoints.local:5672", + "c.endpoints.local:5672", + })) + }) +}) diff --git a/main.go b/main.go index cd8c6f0b..659470ea 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ import ( rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1" + rabbitmqcomv1alpha2 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" topology "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" "github.com/rabbitmq/messaging-topology-operator/controllers" "github.com/rabbitmq/messaging-topology-operator/internal" @@ -37,6 +38,7 @@ func init() { _ = rabbitmqv1beta1.AddToScheme(scheme) _ = topology.AddToScheme(scheme) + _ = rabbitmqcomv1alpha2.AddToScheme(scheme) // +kubebuilder:scaffold:scheme } @@ -128,6 +130,16 @@ func main() { log.Error(err, "unable to create controller", "controller", controllers.PermissionControllerName) os.Exit(1) } + if err = (&controllers.SchemaReplicationReconciler{ + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("SchemaReplication"), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor(controllers.SchemaReplicationControllerName), + RabbitmqClientFactory: internal.RabbitholeClientFactory, + }).SetupWithManager(mgr); err != nil { + log.Error(err, "unable to create controller", "controller", controllers.SchemaReplicationControllerName) + os.Exit(1) + } if err = (&topology.Binding{}).SetupWebhookWithManager(mgr); err != nil { log.Error(err, "unable to create webhook", "webhook", "Binding") diff --git a/system_tests/schema_replication_system_test.go b/system_tests/schema_replication_system_test.go new file mode 100644 index 00000000..315c83d0 --- /dev/null +++ b/system_tests/schema_replication_system_test.go @@ -0,0 +1,103 @@ +package system_tests + +import ( + "context" + rabbithole "github.com/michaelklishin/rabbit-hole/v2" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + topology "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" +) + +var _ = Describe("schema replication", func() { + + var ( + endpointsSecret corev1.Secret + namespace = MustHaveEnv("NAMESPACE") + ctx = context.Background() + replication = &topology.SchemaReplication{} + ) + + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &endpointsSecret, &client.DeleteOptions{})).To(Succeed()) + }) + + BeforeEach(func() { + endpointsSecret = corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoints-secret", + Namespace: namespace, + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "endpoints": []byte("abc.endpoints.local:5672,efg.endpoints.local:1234"), + "username": []byte("some-username"), + "password": []byte("some-password"), + }, + } + Expect(k8sClient.Create(ctx, &endpointsSecret, &client.CreateOptions{})).To(Succeed()) + replication = &topology.SchemaReplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "replication", + Namespace: namespace, + }, + Spec: topology.SchemaReplicationSpec{ + UpstreamSecret: &corev1.LocalObjectReference{ + Name: "endpoints-secret", + }, + RabbitmqClusterReference: topology.RabbitmqClusterReference{ + Name: rmq.Name, + }, + }, + } + }) + + It("works", func() { + By("setting schema replication upstream global parameters successfully") + Expect(k8sClient.Create(ctx, replication, &client.CreateOptions{})).To(Succeed()) + var allGlobalParams []rabbithole.GlobalRuntimeParameter + Eventually(func() []rabbithole.GlobalRuntimeParameter { + var err error + allGlobalParams, err = rabbitClient.ListGlobalParameters() + Expect(err).NotTo(HaveOccurred()) + return allGlobalParams + }, 5, 2).Should(HaveLen(3)) // cluster_name and internal_cluster_id are set by default by RabbitMQ + + Expect(allGlobalParams).To(ContainElement( + rabbithole.GlobalRuntimeParameter{ + Name: "schema_definition_sync_upstream", + Value: map[string]interface{}{ + "endpoints": []interface{}{"abc.endpoints.local:5672", "efg.endpoints.local:1234"}, + "username": "some-username", + "password": "some-password", + }, + })) + + By("updating status condition 'Ready'") + updatedReplication := topology.SchemaReplication{} + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &updatedReplication)).To(Succeed()) + + Expect(updatedReplication.Status.Conditions).To(HaveLen(1)) + readyCondition := updatedReplication.Status.Conditions[0] + Expect(string(readyCondition.Type)).To(Equal("Ready")) + Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue)) + Expect(readyCondition.Reason).To(Equal("SuccessfulCreateOrUpdate")) + Expect(readyCondition.LastTransitionTime).NotTo(Equal(metav1.Time{})) + + By("setting status.observedGeneration") + Expect(updatedReplication.Status.ObservedGeneration).To(Equal(updatedReplication.GetGeneration())) + + By("unsetting schema replication upstream global parameters on deletion") + Expect(k8sClient.Delete(ctx, replication)).To(Succeed()) + Eventually(func() []rabbithole.GlobalRuntimeParameter { + var err error + allGlobalParams, err = rabbitClient.ListGlobalParameters() + Expect(err).NotTo(HaveOccurred()) + return allGlobalParams + }, 5, 2).Should(HaveLen(2)) // cluster_name and internal_cluster_id are set by default by RabbitMQ + }) +})