From 00b1004c17c3e50e2da393cc286b2c18d4b54035 Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Wed, 14 Apr 2021 15:06:16 +0100 Subject: [PATCH 1/3] Bump to rabbithole v2.8 --- go.mod | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 8e2e9fe3..091f9f97 100644 --- a/go.mod +++ b/go.mod @@ -6,15 +6,15 @@ require ( github.com/elastic/crd-ref-docs v0.0.7 github.com/go-logr/logr v0.4.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.4.1 - github.com/michaelklishin/rabbit-hole/v2 v2.7.0 + github.com/michaelklishin/rabbit-hole/v2 v2.8.0 github.com/onsi/ginkgo v1.16.1 github.com/onsi/gomega v1.11.0 github.com/rabbitmq/cluster-operator v1.6.0 k8s.io/api v0.20.5 k8s.io/apimachinery v0.20.5 k8s.io/client-go v0.20.5 - k8s.io/code-generator v0.20.5 - k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd + k8s.io/code-generator v0.21.0 + k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 k8s.io/utils v0.0.0-20210111153108-fddb29f9d009 sigs.k8s.io/controller-runtime v0.8.3 sigs.k8s.io/controller-tools v0.5.0 From a7fd85a41e70dc848b0d3f3c2f31c9ca65b88e38 Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Thu, 15 Apr 2021 14:56:14 +0100 Subject: [PATCH 2/3] Add schemareplications.rabbitmq.com - sets global parameter 'schema_definition_sync_upstream' with value 'username' 'password' and 'endpoints' --- PROJECT | 3 + api/v1alpha2/schemareplication_types.go | 61 ++++++ api/v1alpha2/schemareplication_types_test.go | 39 ++++ api/v1alpha2/zz_generated.deepcopy.go | 102 +++++++++ .../rabbitmq.com_schemareplications.yaml | 109 ++++++++++ config/crd/kustomization.yaml | 3 + .../cainjection_in_schemareplications.yaml | 8 + .../webhook_in_schemareplications.yaml | 17 ++ config/rbac/role.yaml | 20 ++ .../rbac/schemareplication_editor_role.yaml | 24 +++ .../rbac/schemareplication_viewer_role.yaml | 20 ++ ...bbitmq.com_v1alpha2_schemareplication.yaml | 7 + controllers/common.go | 15 +- controllers/schemareplication_controller.go | 188 ++++++++++++++++ .../schemareplication_controller_test.go | 202 ++++++++++++++++++ controllers/suite_test.go | 24 +++ docs/api/rabbitmq.com.ref.asciidoc | 82 +++++++ .../internalfakes/fake_rabbit_mqclient.go | 160 ++++++++++++++ internal/rabbitmq_client_factory.go | 2 + internal/schema_replication.go | 47 ++++ internal/schema_replication_test.go | 35 +++ main.go | 12 ++ .../schema_replication_system_test.go | 103 +++++++++ 23 files changed, 1276 insertions(+), 7 deletions(-) create mode 100644 api/v1alpha2/schemareplication_types.go create mode 100644 api/v1alpha2/schemareplication_types_test.go create mode 100644 config/crd/bases/rabbitmq.com_schemareplications.yaml create mode 100644 config/crd/patches/cainjection_in_schemareplications.yaml create mode 100644 config/crd/patches/webhook_in_schemareplications.yaml create mode 100644 config/rbac/schemareplication_editor_role.yaml create mode 100644 config/rbac/schemareplication_viewer_role.yaml create mode 100644 config/samples/rabbitmq.com_v1alpha2_schemareplication.yaml create mode 100644 controllers/schemareplication_controller.go create mode 100644 controllers/schemareplication_controller_test.go create mode 100644 internal/schema_replication.go create mode 100644 internal/schema_replication_test.go create mode 100644 system_tests/schema_replication_system_test.go diff --git a/PROJECT b/PROJECT index 943d94fa..be243df5 100644 --- a/PROJECT +++ b/PROJECT @@ -22,4 +22,7 @@ resources: - group: rabbitmq.com kind: Permission version: v1alpha2 +- group: rabbitmq.com + kind: SchemaReplication + version: v1alpha2 version: "2" diff --git a/api/v1alpha2/schemareplication_types.go b/api/v1alpha2/schemareplication_types.go new file mode 100644 index 00000000..1bd36b24 --- /dev/null +++ b/api/v1alpha2/schemareplication_types.go @@ -0,0 +1,61 @@ +/* +RabbitMQ Messaging Topology Kubernetes Operator +Copyright 2021 VMware, Inc. + +This product is licensed to you under the Mozilla Public License 2.0 license (the "License"). You may not use this product except in compliance with the Mozilla 2.0 License. + +This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file. +*/ + +package v1alpha2 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// SchemaReplicationSpec defines the desired state of SchemaReplication +type SchemaReplicationSpec struct { + // Reference to the RabbitmqCluster that schema replication would be set for. Must be an existing cluster. + // +kubebuilder:validation:Required + RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"` + // Defines a Secret which contains credentials to be used for schema replication. + // The Secret must contain the keys `endpoints`, `username` and `password` in its Data field, or the controller errors. + // +kubebuilder:validation:Required + UpstreamSecret *corev1.LocalObjectReference `json:"upstreamSecret,omitempty"` +} + +// SchemaReplicationStatus defines the observed state of SchemaReplication +type SchemaReplicationStatus struct { + // observedGeneration is the most recent successful generation observed for this Queue. It corresponds to the + // Queue's generation, which is updated on mutation by the API Server. + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + Conditions []Condition `json:"conditions,omitempty"` +} + +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status + +// SchemaReplication is the Schema for the schemareplications API +// This feature requires Tanzu RabbitMQ with schema replication plugin. +// For more information, see: https://tanzu.vmware.com/rabbitmq and https://www.rabbitmq.com/definitions-standby.html. +type SchemaReplication struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec SchemaReplicationSpec `json:"spec,omitempty"` + Status SchemaReplicationStatus `json:"status,omitempty"` +} + +// +kubebuilder:object:root=true + +// SchemaReplicationList contains a list of SchemaReplication +type SchemaReplicationList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []SchemaReplication `json:"items"` +} + +func init() { + SchemeBuilder.Register(&SchemaReplication{}, &SchemaReplicationList{}) +} diff --git a/api/v1alpha2/schemareplication_types_test.go b/api/v1alpha2/schemareplication_types_test.go new file mode 100644 index 00000000..495d6866 --- /dev/null +++ b/api/v1alpha2/schemareplication_types_test.go @@ -0,0 +1,39 @@ +package v1alpha2 + +import ( + "context" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("schemaReplication spec", func() { + It("creates a schemaReplication", func() { + replication := SchemaReplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "replication", + Namespace: "default", + }, + Spec: SchemaReplicationSpec{ + RabbitmqClusterReference: RabbitmqClusterReference{ + Name: "some-cluster", + }, + UpstreamSecret: &corev1.LocalObjectReference{ + Name: "a-secret", + }, + }} + Expect(k8sClient.Create(context.Background(), &replication)).To(Succeed()) + + fetched := &SchemaReplication{} + Expect(k8sClient.Get(context.Background(), types.NamespacedName{ + Name: replication.Name, + Namespace: replication.Namespace, + }, fetched)).To(Succeed()) + Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{ + Name: "some-cluster", + })) + Expect(fetched.Spec.UpstreamSecret.Name).To(Equal("a-secret")) + }) +}) diff --git a/api/v1alpha2/zz_generated.deepcopy.go b/api/v1alpha2/zz_generated.deepcopy.go index 9c57622e..f8edaa55 100644 --- a/api/v1alpha2/zz_generated.deepcopy.go +++ b/api/v1alpha2/zz_generated.deepcopy.go @@ -555,6 +555,108 @@ func (in *RabbitmqClusterReference) DeepCopy() *RabbitmqClusterReference { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchemaReplication) DeepCopyInto(out *SchemaReplication) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchemaReplication. +func (in *SchemaReplication) DeepCopy() *SchemaReplication { + if in == nil { + return nil + } + out := new(SchemaReplication) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SchemaReplication) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchemaReplicationList) DeepCopyInto(out *SchemaReplicationList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]SchemaReplication, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchemaReplicationList. +func (in *SchemaReplicationList) DeepCopy() *SchemaReplicationList { + if in == nil { + return nil + } + out := new(SchemaReplicationList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SchemaReplicationList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchemaReplicationSpec) DeepCopyInto(out *SchemaReplicationSpec) { + *out = *in + out.RabbitmqClusterReference = in.RabbitmqClusterReference + if in.UpstreamSecret != nil { + in, out := &in.UpstreamSecret, &out.UpstreamSecret + *out = new(v1.LocalObjectReference) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchemaReplicationSpec. +func (in *SchemaReplicationSpec) DeepCopy() *SchemaReplicationSpec { + if in == nil { + return nil + } + out := new(SchemaReplicationSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SchemaReplicationStatus) DeepCopyInto(out *SchemaReplicationStatus) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchemaReplicationStatus. +func (in *SchemaReplicationStatus) DeepCopy() *SchemaReplicationStatus { + if in == nil { + return nil + } + out := new(SchemaReplicationStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *User) DeepCopyInto(out *User) { *out = *in diff --git a/config/crd/bases/rabbitmq.com_schemareplications.yaml b/config/crd/bases/rabbitmq.com_schemareplications.yaml new file mode 100644 index 00000000..4d61c30a --- /dev/null +++ b/config/crd/bases/rabbitmq.com_schemareplications.yaml @@ -0,0 +1,109 @@ + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.5.0 + creationTimestamp: null + name: schemareplications.rabbitmq.com +spec: + group: rabbitmq.com + names: + kind: SchemaReplication + listKind: SchemaReplicationList + plural: schemareplications + singular: schemareplication + scope: Namespaced + versions: + - name: v1alpha2 + schema: + openAPIV3Schema: + description: 'SchemaReplication is the Schema for the schemareplications API + This feature requires Tanzu RabbitMQ with schema replication plugin. For + more information, see: https://tanzu.vmware.com/rabbitmq and https://www.rabbitmq.com/definitions-standby.html.' + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: SchemaReplicationSpec defines the desired state of SchemaReplication + properties: + rabbitmqClusterReference: + description: Reference to the RabbitmqCluster that schema replication + would be set for. Must be an existing cluster. + properties: + name: + type: string + required: + - name + type: object + upstreamSecret: + description: Defines a Secret which contains credentials to be used + for schema replication. The Secret must contain the keys `endpoints`, + `username` and `password` in its Data field, or the controller errors. + properties: + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid?' + type: string + type: object + required: + - rabbitmqClusterReference + type: object + status: + description: SchemaReplicationStatus defines the observed state of SchemaReplication + properties: + conditions: + items: + properties: + lastTransitionTime: + description: The last time this Condition type changed. + format: date-time + type: string + message: + description: Full text reason for current status of the condition. + type: string + reason: + description: One word, camel-case reason for current status + of the condition. + type: string + status: + description: True, False, or Unknown + type: string + type: + description: Type indicates the scope of RabbitmqCluster status + addressed by the condition. + type: string + required: + - status + - type + type: object + type: array + observedGeneration: + description: observedGeneration is the most recent successful generation + observed for this Queue. It corresponds to the Queue's generation, + which is updated on mutation by the API Server. + format: int64 + type: integer + type: object + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 4b9594e2..e3a06b20 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -9,6 +9,7 @@ resources: - bases/rabbitmq.com_vhosts.yaml - bases/rabbitmq.com_policies.yaml - bases/rabbitmq.com_permissions.yaml +- bases/rabbitmq.com_schemareplications.yaml # +kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: @@ -19,6 +20,7 @@ patchesStrategicMerge: - patches/webhook_in_policies.yaml - patches/webhook_in_users.yaml - patches/webhook_in_permissions.yaml +#- patches/webhook_in_schemareplications.yaml # +kubebuilder:scaffold:crdkustomizewebhookpatch - patches/cainjection_in_bindings.yaml @@ -28,6 +30,7 @@ patchesStrategicMerge: - patches/cainjection_in_policies.yaml - patches/cainjection_in_users.yaml - patches/cainjection_in_permissions.yaml +#- patches/cainjection_in_schemareplications.yaml # +kubebuilder:scaffold:crdkustomizecainjectionpatch configurations: diff --git a/config/crd/patches/cainjection_in_schemareplications.yaml b/config/crd/patches/cainjection_in_schemareplications.yaml new file mode 100644 index 00000000..c1ec28aa --- /dev/null +++ b/config/crd/patches/cainjection_in_schemareplications.yaml @@ -0,0 +1,8 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +# CRD conversion requires k8s 1.13 or later. +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) + name: schemareplications.rabbitmq.com diff --git a/config/crd/patches/webhook_in_schemareplications.yaml b/config/crd/patches/webhook_in_schemareplications.yaml new file mode 100644 index 00000000..7c5883c5 --- /dev/null +++ b/config/crd/patches/webhook_in_schemareplications.yaml @@ -0,0 +1,17 @@ +# The following patch enables conversion webhook for CRD +# CRD conversion requires k8s 1.13 or later. +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: schemareplications.rabbitmq.com +spec: + conversion: + strategy: Webhook + webhook: + conversionReviewVersions: ["v1", "v1beta1"] + clientConfig: + caBundle: Cg== + service: + namespace: system + name: webhook-service + path: /convert diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index e5d57eff..c6f7a931 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -151,6 +151,26 @@ rules: verbs: - get - update +- apiGroups: + - rabbitmq.com + resources: + - schemareplications + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - rabbitmq.com + resources: + - schemareplications/status + verbs: + - get + - patch + - update - apiGroups: - rabbitmq.com resources: diff --git a/config/rbac/schemareplication_editor_role.yaml b/config/rbac/schemareplication_editor_role.yaml new file mode 100644 index 00000000..55e6225b --- /dev/null +++ b/config/rbac/schemareplication_editor_role.yaml @@ -0,0 +1,24 @@ +# permissions for end users to edit schemareplications. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: schemareplication-editor-role +rules: +- apiGroups: + - rabbitmq.com + resources: + - schemareplications + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - rabbitmq.com + resources: + - schemareplications/status + verbs: + - get diff --git a/config/rbac/schemareplication_viewer_role.yaml b/config/rbac/schemareplication_viewer_role.yaml new file mode 100644 index 00000000..7bb08025 --- /dev/null +++ b/config/rbac/schemareplication_viewer_role.yaml @@ -0,0 +1,20 @@ +# permissions for end users to view schemareplications. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: schemareplication-viewer-role +rules: +- apiGroups: + - rabbitmq.com + resources: + - schemareplications + verbs: + - get + - list + - watch +- apiGroups: + - rabbitmq.com + resources: + - schemareplications/status + verbs: + - get diff --git a/config/samples/rabbitmq.com_v1alpha2_schemareplication.yaml b/config/samples/rabbitmq.com_v1alpha2_schemareplication.yaml new file mode 100644 index 00000000..8fc1119a --- /dev/null +++ b/config/samples/rabbitmq.com_v1alpha2_schemareplication.yaml @@ -0,0 +1,7 @@ +apiVersion: rabbitmq.com/v1alpha2 +kind: SchemaReplication +metadata: + name: schemareplication-sample +spec: + # Add fields here + foo: bar diff --git a/controllers/common.go b/controllers/common.go index f980fe3c..439c2f3a 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -10,11 +10,12 @@ const ( // names for each of the controllers const ( - VhostControllerName = "vhost-controller" - QueueControllerName = "queue-controller" - ExchangeControllerName = "exchange-controller" - BindingControllerName = "binding-controller" - UserControllerName = "user-controller" - PolicyControllerName = "policy-controller" - PermissionControllerName = "permission-controller" + VhostControllerName = "vhost-controller" + QueueControllerName = "queue-controller" + ExchangeControllerName = "exchange-controller" + BindingControllerName = "binding-controller" + UserControllerName = "user-controller" + PolicyControllerName = "policy-controller" + PermissionControllerName = "permission-controller" + SchemaReplicationControllerName = "schema-replication-controller" ) diff --git a/controllers/schemareplication_controller.go b/controllers/schemareplication_controller.go new file mode 100644 index 00000000..2d50fd88 --- /dev/null +++ b/controllers/schemareplication_controller.go @@ -0,0 +1,188 @@ +package controllers + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/rabbitmq/messaging-topology-operator/internal" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + clientretry "k8s.io/client-go/util/retry" + "reflect" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "time" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + topology "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" +) + +const replicationFinalizer = "deletion.finalizers.schemareplications.rabbitmq.com" +const schemaReplicationParameterName = "schema_definition_sync_upstream" + +// SchemaReplicationReconciler reconciles a SchemaReplication object +type SchemaReplicationReconciler struct { + client.Client + Log logr.Logger + Scheme *runtime.Scheme + Recorder record.EventRecorder + RabbitmqClientFactory internal.RabbitMQClientFactory +} + +// +kubebuilder:rbac:groups=rabbitmq.com,resources=schemareplications,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=rabbitmq.com,resources=schemareplications/status,verbs=get;update;patch + +func (r *SchemaReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := ctrl.LoggerFrom(ctx) + + replication := &topology.SchemaReplication{} + if err := r.Get(ctx, req.NamespacedName, replication); err != nil { + return reconcile.Result{}, client.IgnoreNotFound(err) + } + + rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, replication.Spec.RabbitmqClusterReference, replication.Namespace) + // If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that + // the Cluster is temporarily down. Requeue until it comes back up. + if errors.Is(err, internal.NoSuchRabbitmqClusterError) && replication.ObjectMeta.DeletionTimestamp.IsZero() { + logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error()) + return reconcile.Result{RequeueAfter: 10 * time.Second}, err + } else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) { + logger.Error(err, failedGenerateRabbitClient) + return reconcile.Result{}, err + } + + if !replication.ObjectMeta.DeletionTimestamp.IsZero() { + logger.Info("Deleting") + return ctrl.Result{}, r.deleteSchemaReplicationParameters(ctx, rabbitClient, replication) + } + + if err := r.addFinalizerIfNeeded(ctx, replication); err != nil { + return ctrl.Result{}, err + } + + spec, err := json.Marshal(replication.Spec) + if err != nil { + logger.Error(err, failedMarshalSpec) + } + + logger.Info("Start reconciling", + "spec", string(spec)) + + if err := r.setSchemaReplicationUpstream(ctx, rabbitClient, replication); err != nil { + // Set Condition 'Ready' to false with message + replication.Status.Conditions = []topology.Condition{topology.NotReady(err.Error())} + if writerErr := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error { + return r.Status().Update(ctx, replication) + }); writerErr != nil { + logger.Error(writerErr, failedStatusUpdate) + } + return ctrl.Result{}, err + } + + replication.Status.Conditions = []topology.Condition{topology.Ready()} + replication.Status.ObservedGeneration = replication.GetGeneration() + if writerErr := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error { + return r.Status().Update(ctx, replication) + }); writerErr != nil { + logger.Error(writerErr, failedStatusUpdate) + } + logger.Info("Finished reconciling") + + return ctrl.Result{}, nil +} + +func (r *SchemaReplicationReconciler) setSchemaReplicationUpstream(ctx context.Context, client internal.RabbitMQClient, replication *topology.SchemaReplication) error { + logger := ctrl.LoggerFrom(ctx) + + endpoints, err := r.getUpstreamEndpoints(ctx, replication) + if err != nil { + msg := "failed to generate upstream endpoints" + r.Recorder.Event(replication, corev1.EventTypeWarning, "FailedUpdate", msg) + logger.Error(err, msg, "upstream secret", replication.Spec.UpstreamSecret) + return err + } + + if err := validateResponse(client.PutGlobalParameter(schemaReplicationParameterName, endpoints)); err != nil { + msg := fmt.Sprintf("failed to set '%s' global parameter", schemaReplicationParameterName) + r.Recorder.Event(replication, corev1.EventTypeWarning, "FailedUpdate", msg) + logger.Error(err, msg, "upstream secret", replication.Spec.UpstreamSecret) + return err + } + + msg := fmt.Sprintf("successfully set '%s' global parameter", schemaReplicationParameterName) + logger.Info(msg) + r.Recorder.Event(replication, corev1.EventTypeNormal, "SuccessfulUpdate", msg) + return nil +} + +func (r *SchemaReplicationReconciler) addFinalizerIfNeeded(ctx context.Context, replication *topology.SchemaReplication) error { + if replication.ObjectMeta.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(replication, replicationFinalizer) { + controllerutil.AddFinalizer(replication, replicationFinalizer) + if err := r.Client.Update(ctx, replication); err != nil { + return err + } + } + return nil +} + +func (r *SchemaReplicationReconciler) deleteSchemaReplicationParameters(ctx context.Context, client internal.RabbitMQClient, replication *topology.SchemaReplication) error { + logger := ctrl.LoggerFrom(ctx) + + if client == nil || reflect.ValueOf(client).IsNil() { + logger.Info(noSuchRabbitDeletion, "schemaReplication", replication.Name) + r.Recorder.Event(replication, corev1.EventTypeNormal, "SuccessfulDelete", "successfully deleted schemaReplication") + return r.removeFinalizer(ctx, replication) + } + + err := validateResponseForDeletion(client.DeleteGlobalParameter(schemaReplicationParameterName)) + if errors.Is(err, NotFound) { + logger.Info("cannot find global parameter; no need to delete it", "parameter", schemaReplicationParameterName) + } else if err != nil { + msg := fmt.Sprintf("failed to delete global parameter '%s'", schemaReplicationParameterName) + r.Recorder.Event(replication, corev1.EventTypeWarning, "FailedDelete", msg) + logger.Error(err, msg) + return err + } + + msg := fmt.Sprintf("successfully delete '%s' global parameter", schemaReplicationParameterName) + logger.Info(msg) + r.Recorder.Event(replication, corev1.EventTypeNormal, "SuccessfulDelete", msg) + return r.removeFinalizer(ctx, replication) +} + +func (r *SchemaReplicationReconciler) getUpstreamEndpoints(ctx context.Context, replication *topology.SchemaReplication) (internal.UpstreamEndpoints, error) { + if replication.Spec.UpstreamSecret == nil { + return internal.UpstreamEndpoints{}, fmt.Errorf("no upstream secret provided") + } + secret := &corev1.Secret{} + if err := r.Get(ctx, types.NamespacedName{Name: replication.Spec.UpstreamSecret.Name, Namespace: replication.Namespace}, secret); err != nil { + return internal.UpstreamEndpoints{}, err + } + + endpoints, err := internal.GenerateSchemaReplicationParameters(secret) + if err != nil { + return internal.UpstreamEndpoints{}, err + } + + return endpoints, nil +} + +func (r *SchemaReplicationReconciler) removeFinalizer(ctx context.Context, replication *topology.SchemaReplication) error { + controllerutil.RemoveFinalizer(replication, replicationFinalizer) + if err := r.Client.Update(ctx, replication); err != nil { + return err + } + return nil +} + +func (r *SchemaReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&topology.SchemaReplication{}). + Complete(r) +} diff --git a/controllers/schemareplication_controller_test.go b/controllers/schemareplication_controller_test.go new file mode 100644 index 00000000..e6ded7af --- /dev/null +++ b/controllers/schemareplication_controller_test.go @@ -0,0 +1,202 @@ +package controllers_test + +import ( + "bytes" + "errors" + "io/ioutil" + "net/http" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + . "github.com/onsi/gomega/gstruct" + topology "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("schema-replication-controller", func() { + var replication topology.SchemaReplication + var name string + + JustBeforeEach(func() { + replication = topology.SchemaReplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Spec: topology.SchemaReplicationSpec{ + UpstreamSecret: &corev1.LocalObjectReference{ + Name: "endpoints-secret", // created in 'BeforeSuite' + }, + RabbitmqClusterReference: topology.RabbitmqClusterReference{ + Name: "example-rabbit", + }, + }, + } + }) + + When("creation", func() { + When("the RabbitMQ Client returns a HTTP error response", func() { + BeforeEach(func() { + name = "test-replication-http-error" + fakeRabbitMQClient.PutGlobalParameterReturns(&http.Response{ + Status: "418 I'm a teapot", + StatusCode: 418, + }, errors.New("Some HTTP error")) + }) + + It("sets the status condition to indicate a failure to reconcile", func() { + Expect(client.Create(ctx, &replication)).To(Succeed()) + EventuallyWithOffset(1, func() []topology.Condition { + _ = client.Get( + ctx, + types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, + &replication, + ) + + return replication.Status.Conditions + }, 10*time.Second, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("Some HTTP error"), + }))) + }) + }) + + When("the RabbitMQ Client returns a Go error response", func() { + BeforeEach(func() { + name = "test-replication-go-error" + fakeRabbitMQClient.PutGlobalParameterReturns(nil, errors.New("some go failure here")) + }) + + It("sets the status condition to indicate a failure to reconcile", func() { + Expect(client.Create(ctx, &replication)).To(Succeed()) + EventuallyWithOffset(1, func() []topology.Condition { + _ = client.Get( + ctx, + types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, + &replication, + ) + + return replication.Status.Conditions + }, 10*time.Second, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("some go failure here"), + }))) + }) + }) + + Context("success", func() { + BeforeEach(func() { + name = "test-replication-success" + fakeRabbitMQClient.PutGlobalParameterReturns(&http.Response{ + Status: "201 Created", + StatusCode: http.StatusCreated, + }, nil) + }) + + It("sets the status condition 'Ready' to 'true'", func() { + Expect(client.Create(ctx, &replication)).To(Succeed()) + EventuallyWithOffset(1, func() []topology.Condition { + _ = client.Get( + ctx, + types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, + &replication, + ) + + return replication.Status.Conditions + }, 10*time.Second, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) + }) + }) + }) + + When("deletion", func() { + JustBeforeEach(func() { + fakeRabbitMQClient.PutGlobalParameterReturns(&http.Response{ + Status: "201 Created", + StatusCode: http.StatusCreated, + }, nil) + Expect(client.Create(ctx, &replication)).To(Succeed()) + EventuallyWithOffset(1, func() []topology.Condition { + _ = client.Get( + ctx, + types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, + &replication, + ) + + return replication.Status.Conditions + }, 10*time.Second, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) + }) + + When("the RabbitMQ Client returns a HTTP error response", func() { + BeforeEach(func() { + name = "delete-replication-http-error" + fakeRabbitMQClient.DeleteGlobalParameterReturns(&http.Response{ + Status: "502 Bad Gateway", + StatusCode: http.StatusBadGateway, + Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + }, nil) + }) + + It("raises an event to indicate a failure to delete", func() { + Expect(client.Delete(ctx, &replication)).To(Succeed()) + Consistently(func() bool { + err := client.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) + return apierrors.IsNotFound(err) + }, 5).Should(BeFalse()) + Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete global parameter 'schema_definition_sync_upstream'")) + }) + }) + + When("the RabbitMQ Client returns a Go error response", func() { + BeforeEach(func() { + name = "delete-replication-go-error" + fakeRabbitMQClient.DeleteGlobalParameterReturns(nil, errors.New("some error")) + }) + + It("publishes a 'warning' event", func() { + Expect(client.Delete(ctx, &replication)).To(Succeed()) + Consistently(func() bool { + err := client.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) + return apierrors.IsNotFound(err) + }, 5).Should(BeFalse()) + Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete global parameter 'schema_definition_sync_upstream'")) + }) + }) + + Context("success", func() { + BeforeEach(func() { + name = "delete-replication-success" + fakeRabbitMQClient.DeleteGlobalParameterReturns(&http.Response{ + Status: "204 No Content", + StatusCode: http.StatusNoContent, + }, nil) + }) + + It("publishes a normal event", func() { + Expect(client.Delete(ctx, &replication)).To(Succeed()) + Eventually(func() bool { + err := client.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) + return apierrors.IsNotFound(err) + }, 5).Should(BeTrue()) + observedEvents := observedEvents() + Expect(observedEvents).NotTo(ContainElement("Warning FailedDelete failed to delete global parameter 'schema_definition_sync_upstream'")) + Expect(observedEvents).To(ContainElement("Normal SuccessfulDelete successfully delete 'schema_definition_sync_upstream' global parameter")) + }) + }) + }) +}) diff --git a/controllers/suite_test.go b/controllers/suite_test.go index e133f83e..bffb8f3c 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -12,6 +12,8 @@ package controllers_test import ( "context" "errors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "path/filepath" "testing" @@ -124,6 +126,13 @@ var _ = BeforeSuite(func(done Done) { RabbitmqClientFactory: fakeRabbitMQClientFactory, }).SetupWithManager(mgr) Expect(err).ToNot(HaveOccurred()) + err = (&controllers.SchemaReplicationReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + }).SetupWithManager(mgr) + Expect(err).ToNot(HaveOccurred()) go func() { err = mgr.Start(ctrl.SetupSignalHandler()) @@ -133,6 +142,21 @@ var _ = BeforeSuite(func(done Done) { client = mgr.GetClient() Expect(client).ToNot(BeNil()) + // used in schema-replication-controller test + secret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoints-secret", + Namespace: "default", + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "username": []byte("a-random-user"), + "password": []byte("a-random-password"), + "endpoints": []byte("a.endpoints.local:5672,b.endpoints.local:5672,c.endpoints.local:5672"), + }, + } + Expect(client.Create(ctx, &secret)).To(Succeed()) + close(done) }, 60) diff --git a/docs/api/rabbitmq.com.ref.asciidoc b/docs/api/rabbitmq.com.ref.asciidoc index 1eea0d0d..5002585d 100644 --- a/docs/api/rabbitmq.com.ref.asciidoc +++ b/docs/api/rabbitmq.com.ref.asciidoc @@ -24,6 +24,8 @@ Package v1alpha2 contains API Schema definitions for the rabbitmq.com v1alpha2 A - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-policylist[$$PolicyList$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-queue[$$Queue$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-queuelist[$$QueueList$$] +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplication[$$SchemaReplication$$] +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationlist[$$SchemaReplicationList$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-user[$$User$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-userlist[$$UserList$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-vhost[$$Vhost$$] @@ -127,6 +129,7 @@ BindingStatus defines the observed state of Binding - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-permissionstatus[$$PermissionStatus$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-policystatus[$$PolicyStatus$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-queuestatus[$$QueueStatus$$] +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationstatus[$$SchemaReplicationStatus$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-userstatus[$$UserStatus$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-vhoststatus[$$VhostStatus$$] **** @@ -495,6 +498,7 @@ QueueStatus defines the observed state of Queue - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-permissionspec[$$PermissionSpec$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-policyspec[$$PolicySpec$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-queuespec[$$QueueSpec$$] +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationspec[$$SchemaReplicationSpec$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-userspec[$$UserSpec$$] - xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-vhostspec[$$VhostSpec$$] **** @@ -506,6 +510,84 @@ QueueStatus defines the observed state of Queue |=== +[id="{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplication"] +==== SchemaReplication + +SchemaReplication is the Schema for the schemareplications API This feature requires Tanzu RabbitMQ with schema replication plugin. For more information, see: https://tanzu.vmware.com/rabbitmq and https://www.rabbitmq.com/definitions-standby.html. + +.Appears In: +**** +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationlist[$$SchemaReplicationList$$] +**** + +[cols="25a,75a", options="header"] +|=== +| Field | Description +| *`apiVersion`* __string__ | `rabbitmq.com/v1alpha2` +| *`kind`* __string__ | `SchemaReplication` +| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. + +| *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationspec[$$SchemaReplicationSpec$$]__ | +| *`status`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationstatus[$$SchemaReplicationStatus$$]__ | +|=== + + +[id="{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationlist"] +==== SchemaReplicationList + +SchemaReplicationList contains a list of SchemaReplication + + + +[cols="25a,75a", options="header"] +|=== +| Field | Description +| *`apiVersion`* __string__ | `rabbitmq.com/v1alpha2` +| *`kind`* __string__ | `SchemaReplicationList` +| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. + +| *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplication[$$SchemaReplication$$]__ | +|=== + + +[id="{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationspec"] +==== SchemaReplicationSpec + +SchemaReplicationSpec defines the desired state of SchemaReplication + +.Appears In: +**** +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplication[$$SchemaReplication$$] +**** + +[cols="25a,75a", options="header"] +|=== +| Field | Description +| *`rabbitmqClusterReference`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-rabbitmqclusterreference[$$RabbitmqClusterReference$$]__ | Reference to the RabbitmqCluster that schema replication would be set for. Must be an existing cluster. +| *`upstreamSecret`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#localobjectreference-v1-core[$$LocalObjectReference$$]__ | Defines a Secret which contains credentials to be used for schema replication. The Secret must contain the keys `endpoints`, `username` and `password` in its Data field, or the controller errors. +|=== + + +[id="{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplicationstatus"] +==== SchemaReplicationStatus + +SchemaReplicationStatus defines the observed state of SchemaReplication + +.Appears In: +**** +- xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-schemareplication[$$SchemaReplication$$] +**** + +[cols="25a,75a", options="header"] +|=== +| Field | Description +| *`observedGeneration`* __integer__ | observedGeneration is the most recent successful generation observed for this Queue. It corresponds to the Queue's generation, which is updated on mutation by the API Server. +| *`conditions`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-condition[$$Condition$$]__ | +|=== + + [id="{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-user"] ==== User diff --git a/internal/internalfakes/fake_rabbit_mqclient.go b/internal/internalfakes/fake_rabbit_mqclient.go index 97e6eb02..295bc853 100644 --- a/internal/internalfakes/fake_rabbit_mqclient.go +++ b/internal/internalfakes/fake_rabbit_mqclient.go @@ -96,6 +96,19 @@ type FakeRabbitMQClient struct { result1 *http.Response result2 error } + DeleteGlobalParameterStub func(string) (*http.Response, error) + deleteGlobalParameterMutex sync.RWMutex + deleteGlobalParameterArgsForCall []struct { + arg1 string + } + deleteGlobalParameterReturns struct { + result1 *http.Response + result2 error + } + deleteGlobalParameterReturnsOnCall map[int]struct { + result1 *http.Response + result2 error + } DeletePolicyStub func(string, string) (*http.Response, error) deletePolicyMutex sync.RWMutex deletePolicyArgsForCall []struct { @@ -181,6 +194,20 @@ type FakeRabbitMQClient struct { result1 []rabbithole.BindingInfo result2 error } + PutGlobalParameterStub func(string, interface{}) (*http.Response, error) + putGlobalParameterMutex sync.RWMutex + putGlobalParameterArgsForCall []struct { + arg1 string + arg2 interface{} + } + putGlobalParameterReturns struct { + result1 *http.Response + result2 error + } + putGlobalParameterReturnsOnCall map[int]struct { + result1 *http.Response + result2 error + } PutPolicyStub func(string, string, rabbithole.Policy) (*http.Response, error) putPolicyMutex sync.RWMutex putPolicyArgsForCall []struct { @@ -635,6 +662,70 @@ func (fake *FakeRabbitMQClient) DeleteExchangeReturnsOnCall(i int, result1 *http }{result1, result2} } +func (fake *FakeRabbitMQClient) DeleteGlobalParameter(arg1 string) (*http.Response, error) { + fake.deleteGlobalParameterMutex.Lock() + ret, specificReturn := fake.deleteGlobalParameterReturnsOnCall[len(fake.deleteGlobalParameterArgsForCall)] + fake.deleteGlobalParameterArgsForCall = append(fake.deleteGlobalParameterArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.DeleteGlobalParameterStub + fakeReturns := fake.deleteGlobalParameterReturns + fake.recordInvocation("DeleteGlobalParameter", []interface{}{arg1}) + fake.deleteGlobalParameterMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeRabbitMQClient) DeleteGlobalParameterCallCount() int { + fake.deleteGlobalParameterMutex.RLock() + defer fake.deleteGlobalParameterMutex.RUnlock() + return len(fake.deleteGlobalParameterArgsForCall) +} + +func (fake *FakeRabbitMQClient) DeleteGlobalParameterCalls(stub func(string) (*http.Response, error)) { + fake.deleteGlobalParameterMutex.Lock() + defer fake.deleteGlobalParameterMutex.Unlock() + fake.DeleteGlobalParameterStub = stub +} + +func (fake *FakeRabbitMQClient) DeleteGlobalParameterArgsForCall(i int) string { + fake.deleteGlobalParameterMutex.RLock() + defer fake.deleteGlobalParameterMutex.RUnlock() + argsForCall := fake.deleteGlobalParameterArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeRabbitMQClient) DeleteGlobalParameterReturns(result1 *http.Response, result2 error) { + fake.deleteGlobalParameterMutex.Lock() + defer fake.deleteGlobalParameterMutex.Unlock() + fake.DeleteGlobalParameterStub = nil + fake.deleteGlobalParameterReturns = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + +func (fake *FakeRabbitMQClient) DeleteGlobalParameterReturnsOnCall(i int, result1 *http.Response, result2 error) { + fake.deleteGlobalParameterMutex.Lock() + defer fake.deleteGlobalParameterMutex.Unlock() + fake.DeleteGlobalParameterStub = nil + if fake.deleteGlobalParameterReturnsOnCall == nil { + fake.deleteGlobalParameterReturnsOnCall = make(map[int]struct { + result1 *http.Response + result2 error + }) + } + fake.deleteGlobalParameterReturnsOnCall[i] = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + func (fake *FakeRabbitMQClient) DeletePolicy(arg1 string, arg2 string) (*http.Response, error) { fake.deletePolicyMutex.Lock() ret, specificReturn := fake.deletePolicyReturnsOnCall[len(fake.deletePolicyArgsForCall)] @@ -1026,6 +1117,71 @@ func (fake *FakeRabbitMQClient) ListQueueBindingsBetweenReturnsOnCall(i int, res }{result1, result2} } +func (fake *FakeRabbitMQClient) PutGlobalParameter(arg1 string, arg2 interface{}) (*http.Response, error) { + fake.putGlobalParameterMutex.Lock() + ret, specificReturn := fake.putGlobalParameterReturnsOnCall[len(fake.putGlobalParameterArgsForCall)] + fake.putGlobalParameterArgsForCall = append(fake.putGlobalParameterArgsForCall, struct { + arg1 string + arg2 interface{} + }{arg1, arg2}) + stub := fake.PutGlobalParameterStub + fakeReturns := fake.putGlobalParameterReturns + fake.recordInvocation("PutGlobalParameter", []interface{}{arg1, arg2}) + fake.putGlobalParameterMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeRabbitMQClient) PutGlobalParameterCallCount() int { + fake.putGlobalParameterMutex.RLock() + defer fake.putGlobalParameterMutex.RUnlock() + return len(fake.putGlobalParameterArgsForCall) +} + +func (fake *FakeRabbitMQClient) PutGlobalParameterCalls(stub func(string, interface{}) (*http.Response, error)) { + fake.putGlobalParameterMutex.Lock() + defer fake.putGlobalParameterMutex.Unlock() + fake.PutGlobalParameterStub = stub +} + +func (fake *FakeRabbitMQClient) PutGlobalParameterArgsForCall(i int) (string, interface{}) { + fake.putGlobalParameterMutex.RLock() + defer fake.putGlobalParameterMutex.RUnlock() + argsForCall := fake.putGlobalParameterArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeRabbitMQClient) PutGlobalParameterReturns(result1 *http.Response, result2 error) { + fake.putGlobalParameterMutex.Lock() + defer fake.putGlobalParameterMutex.Unlock() + fake.PutGlobalParameterStub = nil + fake.putGlobalParameterReturns = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + +func (fake *FakeRabbitMQClient) PutGlobalParameterReturnsOnCall(i int, result1 *http.Response, result2 error) { + fake.putGlobalParameterMutex.Lock() + defer fake.putGlobalParameterMutex.Unlock() + fake.PutGlobalParameterStub = nil + if fake.putGlobalParameterReturnsOnCall == nil { + fake.putGlobalParameterReturnsOnCall = make(map[int]struct { + result1 *http.Response + result2 error + }) + } + fake.putGlobalParameterReturnsOnCall[i] = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + func (fake *FakeRabbitMQClient) PutPolicy(arg1 string, arg2 string, arg3 rabbithole.Policy) (*http.Response, error) { fake.putPolicyMutex.Lock() ret, specificReturn := fake.putPolicyReturnsOnCall[len(fake.putPolicyArgsForCall)] @@ -1303,6 +1459,8 @@ func (fake *FakeRabbitMQClient) Invocations() map[string][][]interface{} { defer fake.deleteBindingMutex.RUnlock() fake.deleteExchangeMutex.RLock() defer fake.deleteExchangeMutex.RUnlock() + fake.deleteGlobalParameterMutex.RLock() + defer fake.deleteGlobalParameterMutex.RUnlock() fake.deletePolicyMutex.RLock() defer fake.deletePolicyMutex.RUnlock() fake.deleteQueueMutex.RLock() @@ -1315,6 +1473,8 @@ func (fake *FakeRabbitMQClient) Invocations() map[string][][]interface{} { defer fake.listExchangeBindingsBetweenMutex.RUnlock() fake.listQueueBindingsBetweenMutex.RLock() defer fake.listQueueBindingsBetweenMutex.RUnlock() + fake.putGlobalParameterMutex.RLock() + defer fake.putGlobalParameterMutex.RUnlock() fake.putPolicyMutex.RLock() defer fake.putPolicyMutex.RUnlock() fake.putUserMutex.RLock() diff --git a/internal/rabbitmq_client_factory.go b/internal/rabbitmq_client_factory.go index bef4c9d8..bd44381a 100644 --- a/internal/rabbitmq_client_factory.go +++ b/internal/rabbitmq_client_factory.go @@ -43,6 +43,8 @@ type RabbitMQClient interface { DeleteExchange(string, string) (*http.Response, error) PutVhost(string, rabbithole.VhostSettings) (*http.Response, error) DeleteVhost(string) (*http.Response, error) + PutGlobalParameter(name string, value interface{}) (*http.Response, error) + DeleteGlobalParameter(name string) (*http.Response, error) } type RabbitMQClientFactory func(ctx context.Context, c client.Client, rmq topology.RabbitmqClusterReference, namespace string) (RabbitMQClient, error) diff --git a/internal/schema_replication.go b/internal/schema_replication.go new file mode 100644 index 00000000..ce65fd2b --- /dev/null +++ b/internal/schema_replication.go @@ -0,0 +1,47 @@ +/* +RabbitMQ Messaging Topology Kubernetes Operator +Copyright 2021 VMware, Inc. + +This product is licensed to you under the Mozilla Public License 2.0 license (the "License"). You may not use this product except in compliance with the Mozilla 2.0 License. + +This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file. +*/ + +package internal + +import ( + "fmt" + corev1 "k8s.io/api/core/v1" + "strings" +) + +type UpstreamEndpoints struct { + Username string `json:"username"` + Password string `json:"password"` + Endpoints []string `json:"endpoints"` +} + +func GenerateSchemaReplicationParameters(secret *corev1.Secret) (UpstreamEndpoints, error) { + username, ok := secret.Data["username"] + if !ok { + return UpstreamEndpoints{}, fmt.Errorf("could not find username in secret %s", secret.Name) + } + password, ok := secret.Data["password"] + if !ok { + return UpstreamEndpoints{}, fmt.Errorf("could not find password in secret %s", secret.Name) + } + + endpoints, ok := secret.Data["endpoints"] + if !ok { + return UpstreamEndpoints{}, fmt.Errorf("could not find endpoints in secret %s", secret.Name) + } + + endpointsList := strings.Split(string(endpoints), ",") + + return UpstreamEndpoints{ + Username: string(username), + Password: string(password), + Endpoints: endpointsList, + }, nil + +} diff --git a/internal/schema_replication_test.go b/internal/schema_replication_test.go new file mode 100644 index 00000000..be55a376 --- /dev/null +++ b/internal/schema_replication_test.go @@ -0,0 +1,35 @@ +package internal_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/rabbitmq/messaging-topology-operator/internal" + corev1 "k8s.io/api/core/v1" +) + +var _ = Describe("GenerateSchemaReplicationParameters", func() { + var secret corev1.Secret + + BeforeEach(func() { + secret = corev1.Secret{ + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "username": []byte("a-random-user"), + "password": []byte("a-random-password"), + "endpoints": []byte("a.endpoints.local:5672,b.endpoints.local:5672,c.endpoints.local:5672"), + }, + } + }) + + It("generates expected replication parameters", func() { + parameters, err := internal.GenerateSchemaReplicationParameters(&secret) + Expect(err).NotTo(HaveOccurred()) + Expect(parameters.Username).To(Equal("a-random-user")) + Expect(parameters.Password).To(Equal("a-random-password")) + Expect(parameters.Endpoints).To(Equal([]string{ + "a.endpoints.local:5672", + "b.endpoints.local:5672", + "c.endpoints.local:5672", + })) + }) +}) diff --git a/main.go b/main.go index cd8c6f0b..659470ea 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ import ( rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1" + rabbitmqcomv1alpha2 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" topology "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" "github.com/rabbitmq/messaging-topology-operator/controllers" "github.com/rabbitmq/messaging-topology-operator/internal" @@ -37,6 +38,7 @@ func init() { _ = rabbitmqv1beta1.AddToScheme(scheme) _ = topology.AddToScheme(scheme) + _ = rabbitmqcomv1alpha2.AddToScheme(scheme) // +kubebuilder:scaffold:scheme } @@ -128,6 +130,16 @@ func main() { log.Error(err, "unable to create controller", "controller", controllers.PermissionControllerName) os.Exit(1) } + if err = (&controllers.SchemaReplicationReconciler{ + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("SchemaReplication"), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor(controllers.SchemaReplicationControllerName), + RabbitmqClientFactory: internal.RabbitholeClientFactory, + }).SetupWithManager(mgr); err != nil { + log.Error(err, "unable to create controller", "controller", controllers.SchemaReplicationControllerName) + os.Exit(1) + } if err = (&topology.Binding{}).SetupWebhookWithManager(mgr); err != nil { log.Error(err, "unable to create webhook", "webhook", "Binding") diff --git a/system_tests/schema_replication_system_test.go b/system_tests/schema_replication_system_test.go new file mode 100644 index 00000000..315c83d0 --- /dev/null +++ b/system_tests/schema_replication_system_test.go @@ -0,0 +1,103 @@ +package system_tests + +import ( + "context" + rabbithole "github.com/michaelklishin/rabbit-hole/v2" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + topology "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" +) + +var _ = Describe("schema replication", func() { + + var ( + endpointsSecret corev1.Secret + namespace = MustHaveEnv("NAMESPACE") + ctx = context.Background() + replication = &topology.SchemaReplication{} + ) + + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &endpointsSecret, &client.DeleteOptions{})).To(Succeed()) + }) + + BeforeEach(func() { + endpointsSecret = corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoints-secret", + Namespace: namespace, + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "endpoints": []byte("abc.endpoints.local:5672,efg.endpoints.local:1234"), + "username": []byte("some-username"), + "password": []byte("some-password"), + }, + } + Expect(k8sClient.Create(ctx, &endpointsSecret, &client.CreateOptions{})).To(Succeed()) + replication = &topology.SchemaReplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "replication", + Namespace: namespace, + }, + Spec: topology.SchemaReplicationSpec{ + UpstreamSecret: &corev1.LocalObjectReference{ + Name: "endpoints-secret", + }, + RabbitmqClusterReference: topology.RabbitmqClusterReference{ + Name: rmq.Name, + }, + }, + } + }) + + It("works", func() { + By("setting schema replication upstream global parameters successfully") + Expect(k8sClient.Create(ctx, replication, &client.CreateOptions{})).To(Succeed()) + var allGlobalParams []rabbithole.GlobalRuntimeParameter + Eventually(func() []rabbithole.GlobalRuntimeParameter { + var err error + allGlobalParams, err = rabbitClient.ListGlobalParameters() + Expect(err).NotTo(HaveOccurred()) + return allGlobalParams + }, 5, 2).Should(HaveLen(3)) // cluster_name and internal_cluster_id are set by default by RabbitMQ + + Expect(allGlobalParams).To(ContainElement( + rabbithole.GlobalRuntimeParameter{ + Name: "schema_definition_sync_upstream", + Value: map[string]interface{}{ + "endpoints": []interface{}{"abc.endpoints.local:5672", "efg.endpoints.local:1234"}, + "username": "some-username", + "password": "some-password", + }, + })) + + By("updating status condition 'Ready'") + updatedReplication := topology.SchemaReplication{} + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &updatedReplication)).To(Succeed()) + + Expect(updatedReplication.Status.Conditions).To(HaveLen(1)) + readyCondition := updatedReplication.Status.Conditions[0] + Expect(string(readyCondition.Type)).To(Equal("Ready")) + Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue)) + Expect(readyCondition.Reason).To(Equal("SuccessfulCreateOrUpdate")) + Expect(readyCondition.LastTransitionTime).NotTo(Equal(metav1.Time{})) + + By("setting status.observedGeneration") + Expect(updatedReplication.Status.ObservedGeneration).To(Equal(updatedReplication.GetGeneration())) + + By("unsetting schema replication upstream global parameters on deletion") + Expect(k8sClient.Delete(ctx, replication)).To(Succeed()) + Eventually(func() []rabbithole.GlobalRuntimeParameter { + var err error + allGlobalParams, err = rabbitClient.ListGlobalParameters() + Expect(err).NotTo(HaveOccurred()) + return allGlobalParams + }, 5, 2).Should(HaveLen(2)) // cluster_name and internal_cluster_id are set by default by RabbitMQ + }) +}) From c1e913d2ad41959597a6d254ddaf85c90f8dc72c Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Thu, 15 Apr 2021 15:57:39 +0100 Subject: [PATCH 3/3] Add webhook for schemareplication to prevent updates on rabbitmqClusterReference --- api/v1alpha2/schemareplication_types.go | 11 ++++- api/v1alpha2/schemareplication_webhook.go | 44 +++++++++++++++++++ .../schemareplication_webhook_test.go | 41 +++++++++++++++++ .../rabbitmq.com_schemareplications.yaml | 3 +- config/webhook/manifests.yaml | 20 +++++++++ docs/api/rabbitmq.com.ref.asciidoc | 2 +- go.sum | 30 +++++++++---- main.go | 6 ++- .../schema_replication_system_test.go | 6 +++ 9 files changed, 149 insertions(+), 14 deletions(-) create mode 100644 api/v1alpha2/schemareplication_webhook.go create mode 100644 api/v1alpha2/schemareplication_webhook_test.go diff --git a/api/v1alpha2/schemareplication_types.go b/api/v1alpha2/schemareplication_types.go index 1bd36b24..ab31bf82 100644 --- a/api/v1alpha2/schemareplication_types.go +++ b/api/v1alpha2/schemareplication_types.go @@ -12,6 +12,7 @@ package v1alpha2 import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" ) // SchemaReplicationSpec defines the desired state of SchemaReplication @@ -20,7 +21,8 @@ type SchemaReplicationSpec struct { // +kubebuilder:validation:Required RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"` // Defines a Secret which contains credentials to be used for schema replication. - // The Secret must contain the keys `endpoints`, `username` and `password` in its Data field, or the controller errors. + // The Secret must contain the keys `endpoints`, `username` and `password` in its Data field, or operator will error. + // `endpoints` should be one or multiple endpoints separated by ','. // +kubebuilder:validation:Required UpstreamSecret *corev1.LocalObjectReference `json:"upstreamSecret,omitempty"` } @@ -59,3 +61,10 @@ type SchemaReplicationList struct { func init() { SchemeBuilder.Register(&SchemaReplication{}, &SchemaReplicationList{}) } + +func (s *SchemaReplication) GroupResource() schema.GroupResource { + return schema.GroupResource{ + Group: s.GroupVersionKind().Group, + Resource: s.GroupVersionKind().Kind, + } +} diff --git a/api/v1alpha2/schemareplication_webhook.go b/api/v1alpha2/schemareplication_webhook.go new file mode 100644 index 00000000..fa71eceb --- /dev/null +++ b/api/v1alpha2/schemareplication_webhook.go @@ -0,0 +1,44 @@ +package v1alpha2 + +import ( + "fmt" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/validation/field" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +func (s *SchemaReplication) SetupWebhookWithManager(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(s). + Complete() +} + +// +kubebuilder:webhook:verbs=create;update,path=/validate-rabbitmq-com-v1alpha2-schemareplication,mutating=false,failurePolicy=fail,groups=rabbitmq.com,resources=schemareplications,versions=v1alpha2,name=vschemareplication.kb.io,sideEffects=none,admissionReviewVersions=v1 + +var _ webhook.Validator = &SchemaReplication{} + +// no validation on create +func (s *SchemaReplication) ValidateCreate() error { + return nil +} + +// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type +func (s *SchemaReplication) ValidateUpdate(old runtime.Object) error { + oldReplication, ok := old.(*SchemaReplication) + if !ok { + return apierrors.NewBadRequest(fmt.Sprintf("expected a schema replication type but got a %T", old)) + } + + if s.Spec.RabbitmqClusterReference != oldReplication.Spec.RabbitmqClusterReference { + return apierrors.NewForbidden(s.GroupResource(), s.Name, + field.Forbidden(field.NewPath("spec", "rabbitmqClusterReference"), "update on rabbitmqClusterReference is forbidden")) + } + return nil +} + +// no validation on delete +func (s *SchemaReplication) ValidateDelete() error { + return nil +} diff --git a/api/v1alpha2/schemareplication_webhook_test.go b/api/v1alpha2/schemareplication_webhook_test.go new file mode 100644 index 00000000..8b9e3a4c --- /dev/null +++ b/api/v1alpha2/schemareplication_webhook_test.go @@ -0,0 +1,41 @@ +package v1alpha2 + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("schema-replication webhook", func() { + var replication = SchemaReplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-replication", + }, + Spec: SchemaReplicationSpec{ + UpstreamSecret: &corev1.LocalObjectReference{ + Name: "a-secret", + }, + RabbitmqClusterReference: RabbitmqClusterReference{ + Name: "a-cluster", + }, + }, + } + + It("does not allow updates on RabbitmqClusterReference", func() { + updated := replication.DeepCopy() + updated.Spec.RabbitmqClusterReference = RabbitmqClusterReference{ + Name: "different-cluster", + } + Expect(apierrors.IsForbidden(updated.ValidateUpdate(&replication))).To(BeTrue()) + }) + + It("allows updates on spec.upstreamSecret", func() { + updated := replication.DeepCopy() + updated.Spec.UpstreamSecret = &corev1.LocalObjectReference{ + Name: "a-different-secret", + } + Expect(updated.ValidateUpdate(&replication)).To(Succeed()) + }) +}) diff --git a/config/crd/bases/rabbitmq.com_schemareplications.yaml b/config/crd/bases/rabbitmq.com_schemareplications.yaml index 4d61c30a..5e1e45ba 100644 --- a/config/crd/bases/rabbitmq.com_schemareplications.yaml +++ b/config/crd/bases/rabbitmq.com_schemareplications.yaml @@ -50,7 +50,8 @@ spec: upstreamSecret: description: Defines a Secret which contains credentials to be used for schema replication. The Secret must contain the keys `endpoints`, - `username` and `password` in its Data field, or the controller errors. + `username` and `password` in its Data field, or operator will error. + `endpoints` should be one or multiple endpoints separated by ','. properties: name: description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index 7bf8b410..4017475d 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -106,6 +106,26 @@ webhooks: resources: - queues sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-rabbitmq-com-v1alpha2-schemareplication + failurePolicy: Fail + name: vschemareplication.kb.io + rules: + - apiGroups: + - rabbitmq.com + apiVersions: + - v1alpha2 + operations: + - CREATE + - UPDATE + resources: + - schemareplications + sideEffects: None - admissionReviewVersions: - v1 clientConfig: diff --git a/docs/api/rabbitmq.com.ref.asciidoc b/docs/api/rabbitmq.com.ref.asciidoc index 5002585d..9f9e06f5 100644 --- a/docs/api/rabbitmq.com.ref.asciidoc +++ b/docs/api/rabbitmq.com.ref.asciidoc @@ -566,7 +566,7 @@ SchemaReplicationSpec defines the desired state of SchemaReplication |=== | Field | Description | *`rabbitmqClusterReference`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha2-rabbitmqclusterreference[$$RabbitmqClusterReference$$]__ | Reference to the RabbitmqCluster that schema replication would be set for. Must be an existing cluster. -| *`upstreamSecret`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#localobjectreference-v1-core[$$LocalObjectReference$$]__ | Defines a Secret which contains credentials to be used for schema replication. The Secret must contain the keys `endpoints`, `username` and `password` in its Data field, or the controller errors. +| *`upstreamSecret`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#localobjectreference-v1-core[$$LocalObjectReference$$]__ | Defines a Secret which contains credentials to be used for schema replication. The Secret must contain the keys `endpoints`, `username` and `password` in its Data field, or operator will error. `endpoints` should be one or multiple endpoints separated by ','. |=== diff --git a/go.sum b/go.sum index b30cd127..15a702eb 100644 --- a/go.sum +++ b/go.sum @@ -285,8 +285,9 @@ github.com/gofrs/flock v0.0.0-20190320160742-5135e617513b/go.mod h1:F1TvTiK9OcQq github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= -github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -433,6 +434,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/sqlstruct v0.0.0-20150923205031-648daed35d49/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/kisom/goutils v1.1.0/go.mod h1:+UBTfd78habUYWFbNWTJNG+jNG/i/lGURakr4A/yNRw= @@ -493,8 +495,9 @@ github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182aff github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/maxbrunsfeld/counterfeiter/v6 v6.4.1 h1:hZD/8vBuw7x1WqRXD/WGjVjipbbo/HcDBgySYYbrUSk= github.com/maxbrunsfeld/counterfeiter/v6 v6.4.1/go.mod h1:DK1Cjkc0E49ShgRVs5jy5ASrM15svSnem3K/hiSGD8o= -github.com/michaelklishin/rabbit-hole/v2 v2.7.0 h1:y+M7/XG4KYGku90mjBP3RdvNbnlM7US2fEHlMogFUJY= github.com/michaelklishin/rabbit-hole/v2 v2.7.0/go.mod h1:VZQTDutXFmoyrLvlRjM79MEPb0+xCLLhV5yBTjwMWkM= +github.com/michaelklishin/rabbit-hole/v2 v2.8.0 h1:5tehiLwdVtCeDcrxOlvoveRqU/AJMOcMeQntSf63fdc= +github.com/michaelklishin/rabbit-hole/v2 v2.8.0/go.mod h1:VZQTDutXFmoyrLvlRjM79MEPb0+xCLLhV5yBTjwMWkM= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ= @@ -798,8 +801,9 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449 h1:xUIPaMhvROX9dhPvRCenIJtU78+lbEenGbgqB5hfHCQ= +golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -843,6 +847,7 @@ golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210224082022-3d97a244fca7/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c h1:KHUzaHIpjWVlVVNh65G3hhuj3KB1HnjY6Cq5cTvRQT8= golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -912,6 +917,7 @@ golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -987,7 +993,9 @@ golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw= golang.org/x/tools v0.0.0-20200505023115-26f46d2f7ef8/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200616133436-c1934b75d054/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1128,8 +1136,8 @@ k8s.io/client-go v0.20.5/go.mod h1:Ee5OOMMYvlH8FCZhDsacjMlCBwetbGZETwo1OA+e6Zw= k8s.io/code-generator v0.0.0-20190912054826-cd179ad6a269/go.mod h1:V5BD6M4CyaN5m+VthcclXWsVcT1Hu+glwa1bi3MIsyE= k8s.io/code-generator v0.20.1/go.mod h1:UsqdF+VX4PU2g46NC2JRs4gc+IfrctnwHb76RNbWHJg= k8s.io/code-generator v0.20.2/go.mod h1:UsqdF+VX4PU2g46NC2JRs4gc+IfrctnwHb76RNbWHJg= -k8s.io/code-generator v0.20.5 h1:qQp2F2ZnosUqeV7ZqKE6bQnf7x9Ps9RFfKZxw1r5HsM= -k8s.io/code-generator v0.20.5/go.mod h1:UsqdF+VX4PU2g46NC2JRs4gc+IfrctnwHb76RNbWHJg= +k8s.io/code-generator v0.21.0 h1:LGWJOvkbBNpuRBqBRXUjzfvymUh7F/iR2KDpwLnqCM4= +k8s.io/code-generator v0.21.0/go.mod h1:hUlps5+9QaTrKx+jiM4rmq7YmH8wPOIko64uZCHDh6Q= k8s.io/component-base v0.0.0-20190918160511-547f6c5d7090/go.mod h1:933PBGtQFJky3TEwYx4aEPZ4IxqhWh3R6DCmzqIn1hA= k8s.io/component-base v0.20.1/go.mod h1:guxkoJnNoh8LNrbtiQOlyp2Y2XFCZQmrcg2n/DeYNLk= k8s.io/component-base v0.20.2 h1:LMmu5I0pLtwjpp5009KLuMGFqSc2S2isGw8t1hpYKLE= @@ -1137,20 +1145,23 @@ k8s.io/component-base v0.20.2/go.mod h1:pzFtCiwe/ASD0iV7ySMu8SYVJjCapNM9bjvk7ptp k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20190822140433-26a664648505/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= -k8s.io/gengo v0.0.0-20201113003025-83324d819ded h1:JApXBKYyB7l9xx+DK7/+mFjC7A9Bt5A93FPvFD0HIFE= k8s.io/gengo v0.0.0-20201113003025-83324d819ded/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= +k8s.io/gengo v0.0.0-20201214224949-b6c5ce23f027 h1:Uusb3oh8XcdzDF/ndlI4ToKTYVlkCSJP39SRY2mfRAw= +k8s.io/gengo v0.0.0-20201214224949-b6c5ce23f027/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v0.4.0 h1:lCJCxf/LIowc2IGS9TPjWDyXY4nOmdGdfcwwDQCOURQ= k8s.io/klog v0.4.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= -k8s.io/klog/v2 v2.4.0 h1:7+X0fUguPyrKEC4WjH8iGDg3laWgMo5tMnRTIGTTxGQ= k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= +k8s.io/klog/v2 v2.8.0 h1:Q3gmuM9hKEjefWFFYF0Mat+YyFJvsUyYuwyNNJ5C9Ts= +k8s.io/klog/v2 v2.8.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec= k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E= k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6/go.mod h1:UuqjUnNftUyPE5H64/qeyjQoUZhGpeFDVdxjTeEVN2o= -k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd h1:sOHNzJIkytDF6qadMNKhhDRpc6ODik8lVC6nOur7B2c= k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAGcJo0Tvi+dK12EcqSLqcWsryKMpfM= +k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 h1:vEx13qjvaZ4yfObSSXW7BrMc/KQBBT/Jyee8XtLf4x0= +k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE= k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20210111153108-fddb29f9d009 h1:0T5IaWHO3sJTEmCP6mUlBvMukxPKUQWqiI/YuiBNMiQ= @@ -1186,8 +1197,9 @@ sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:w sigs.k8s.io/structured-merge-diff v0.0.0-20190817042607-6149e4549fca h1:6dsH6AYQWbyZmtttJNe8Gq1cXOeS1BdV3eW37zHilAQ= sigs.k8s.io/structured-merge-diff v0.0.0-20190817042607-6149e4549fca/go.mod h1:IIgPezJWb76P0hotTxzDbWsMYB8APh18qZnxkomBpxA= sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= -sigs.k8s.io/structured-merge-diff/v4 v4.0.2 h1:YHQV7Dajm86OuqnIR6zAelnDWBRjo+YhYV9PmGrh1s8= sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= +sigs.k8s.io/structured-merge-diff/v4 v4.1.0 h1:C4r9BgJ98vrKnnVCjwCSXcWjWe0NKcUQkmzDXZXGwH8= +sigs.k8s.io/structured-merge-diff/v4 v4.1.0/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= diff --git a/main.go b/main.go index 659470ea..afddab76 100644 --- a/main.go +++ b/main.go @@ -21,7 +21,6 @@ import ( rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1" - rabbitmqcomv1alpha2 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" topology "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2" "github.com/rabbitmq/messaging-topology-operator/controllers" "github.com/rabbitmq/messaging-topology-operator/internal" @@ -38,7 +37,6 @@ func init() { _ = rabbitmqv1beta1.AddToScheme(scheme) _ = topology.AddToScheme(scheme) - _ = rabbitmqcomv1alpha2.AddToScheme(scheme) // +kubebuilder:scaffold:scheme } @@ -169,6 +167,10 @@ func main() { log.Error(err, "unable to create webhook", "webhook", "Permission") os.Exit(1) } + if err = (&topology.SchemaReplication{}).SetupWebhookWithManager(mgr); err != nil { + log.Error(err, "unable to create webhook", "webhook", "SchemaReplication") + os.Exit(1) + } // +kubebuilder:scaffold:builder log.Info("starting manager") diff --git a/system_tests/schema_replication_system_test.go b/system_tests/schema_replication_system_test.go index 315c83d0..36e65615 100644 --- a/system_tests/schema_replication_system_test.go +++ b/system_tests/schema_replication_system_test.go @@ -91,6 +91,12 @@ var _ = Describe("schema replication", func() { By("setting status.observedGeneration") Expect(updatedReplication.Status.ObservedGeneration).To(Equal(updatedReplication.GetGeneration())) + By("not allowing updates on rabbitmqClusterReference") + updateTest := topology.SchemaReplication{} + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &updateTest)).To(Succeed()) + updateTest.Spec.RabbitmqClusterReference.Name = "new-cluster" + Expect(k8sClient.Update(ctx, &updateTest).Error()).To(ContainSubstring("spec.rabbitmqClusterReference: Forbidden: update on rabbitmqClusterReference is forbidden")) + By("unsetting schema replication upstream global parameters on deletion") Expect(k8sClient.Delete(ctx, replication)).To(Succeed()) Eventually(func() []rabbithole.GlobalRuntimeParameter {