Skip to content

Commit

Permalink
Merge pull request #113 from rabbitmq/schema-sync-parameter
Browse files Browse the repository at this point in the history
Support for setting schema replication parameters
  • Loading branch information
ChunyiLyu authored Apr 15, 2021
2 parents 17614f8 + c1e913d commit 65bc9ed
Show file tree
Hide file tree
Showing 28 changed files with 1,423 additions and 19 deletions.
3 changes: 3 additions & 0 deletions PROJECT
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ resources:
- group: rabbitmq.com
kind: Permission
version: v1alpha2
- group: rabbitmq.com
kind: SchemaReplication
version: v1alpha2
version: "2"
70 changes: 70 additions & 0 deletions api/v1alpha2/schemareplication_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
RabbitMQ Messaging Topology Kubernetes Operator
Copyright 2021 VMware, Inc.
This product is licensed to you under the Mozilla Public License 2.0 license (the "License"). You may not use this product except in compliance with the Mozilla 2.0 License.
This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file.
*/

package v1alpha2

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// SchemaReplicationSpec defines the desired state of SchemaReplication
type SchemaReplicationSpec struct {
// Reference to the RabbitmqCluster that schema replication would be set for. Must be an existing cluster.
// +kubebuilder:validation:Required
RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"`
// Defines a Secret which contains credentials to be used for schema replication.
// The Secret must contain the keys `endpoints`, `username` and `password` in its Data field, or operator will error.
// `endpoints` should be one or multiple endpoints separated by ','.
// +kubebuilder:validation:Required
UpstreamSecret *corev1.LocalObjectReference `json:"upstreamSecret,omitempty"`
}

// SchemaReplicationStatus defines the observed state of SchemaReplication
type SchemaReplicationStatus struct {
// observedGeneration is the most recent successful generation observed for this Queue. It corresponds to the
// Queue's generation, which is updated on mutation by the API Server.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
Conditions []Condition `json:"conditions,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// SchemaReplication is the Schema for the schemareplications API
// This feature requires Tanzu RabbitMQ with schema replication plugin.
// For more information, see: https://tanzu.vmware.com/rabbitmq and https://www.rabbitmq.com/definitions-standby.html.
type SchemaReplication struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec SchemaReplicationSpec `json:"spec,omitempty"`
Status SchemaReplicationStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// SchemaReplicationList contains a list of SchemaReplication
type SchemaReplicationList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []SchemaReplication `json:"items"`
}

func init() {
SchemeBuilder.Register(&SchemaReplication{}, &SchemaReplicationList{})
}

func (s *SchemaReplication) GroupResource() schema.GroupResource {
return schema.GroupResource{
Group: s.GroupVersionKind().Group,
Resource: s.GroupVersionKind().Kind,
}
}
39 changes: 39 additions & 0 deletions api/v1alpha2/schemareplication_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package v1alpha2

import (
"context"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

var _ = Describe("schemaReplication spec", func() {
It("creates a schemaReplication", func() {
replication := SchemaReplication{
ObjectMeta: metav1.ObjectMeta{
Name: "replication",
Namespace: "default",
},
Spec: SchemaReplicationSpec{
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
UpstreamSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
}}
Expect(k8sClient.Create(context.Background(), &replication)).To(Succeed())

fetched := &SchemaReplication{}
Expect(k8sClient.Get(context.Background(), types.NamespacedName{
Name: replication.Name,
Namespace: replication.Namespace,
}, fetched)).To(Succeed())
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "some-cluster",
}))
Expect(fetched.Spec.UpstreamSecret.Name).To(Equal("a-secret"))
})
})
44 changes: 44 additions & 0 deletions api/v1alpha2/schemareplication_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package v1alpha2

import (
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)

func (s *SchemaReplication) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(s).
Complete()
}

// +kubebuilder:webhook:verbs=create;update,path=/validate-rabbitmq-com-v1alpha2-schemareplication,mutating=false,failurePolicy=fail,groups=rabbitmq.com,resources=schemareplications,versions=v1alpha2,name=vschemareplication.kb.io,sideEffects=none,admissionReviewVersions=v1

var _ webhook.Validator = &SchemaReplication{}

// no validation on create
func (s *SchemaReplication) ValidateCreate() error {
return nil
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (s *SchemaReplication) ValidateUpdate(old runtime.Object) error {
oldReplication, ok := old.(*SchemaReplication)
if !ok {
return apierrors.NewBadRequest(fmt.Sprintf("expected a schema replication type but got a %T", old))
}

if s.Spec.RabbitmqClusterReference != oldReplication.Spec.RabbitmqClusterReference {
return apierrors.NewForbidden(s.GroupResource(), s.Name,
field.Forbidden(field.NewPath("spec", "rabbitmqClusterReference"), "update on rabbitmqClusterReference is forbidden"))
}
return nil
}

// no validation on delete
func (s *SchemaReplication) ValidateDelete() error {
return nil
}
41 changes: 41 additions & 0 deletions api/v1alpha2/schemareplication_webhook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package v1alpha2

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("schema-replication webhook", func() {
var replication = SchemaReplication{
ObjectMeta: metav1.ObjectMeta{
Name: "test-replication",
},
Spec: SchemaReplicationSpec{
UpstreamSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "a-cluster",
},
},
}

It("does not allow updates on RabbitmqClusterReference", func() {
updated := replication.DeepCopy()
updated.Spec.RabbitmqClusterReference = RabbitmqClusterReference{
Name: "different-cluster",
}
Expect(apierrors.IsForbidden(updated.ValidateUpdate(&replication))).To(BeTrue())
})

It("allows updates on spec.upstreamSecret", func() {
updated := replication.DeepCopy()
updated.Spec.UpstreamSecret = &corev1.LocalObjectReference{
Name: "a-different-secret",
}
Expect(updated.ValidateUpdate(&replication)).To(Succeed())
})
})
102 changes: 102 additions & 0 deletions api/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 65bc9ed

Please sign in to comment.