Skip to content

Commit

Permalink
controllers: add first version of rate limit controller
Browse files Browse the repository at this point in the history
  • Loading branch information
davidor committed Jan 14, 2021
1 parent a311867 commit e8b8693
Show file tree
Hide file tree
Showing 5 changed files with 360 additions and 10 deletions.
152 changes: 147 additions & 5 deletions controllers/ratelimit_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,178 @@ package controllers

import (
"context"

"github.com/3scale/limitador-operator/pkg/helpers"
"github.com/3scale/limitador-operator/pkg/limitador"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"net/url"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"strconv"

limitadorv1alpha1 "github.com/3scale/limitador-operator/api/v1alpha1"
)

const rateLimitFinalizer = "finalizer.ratelimit.limitador.3scale.net"

// Assumes that there's only one Limitador per namespace. We might want to
// change this in the future.
type LimitadorServiceDiscovery interface {
URL(namespace string) (*url.URL, error)
}

type defaultLimitadorServiceDiscovery struct{}

func (LimitadorServiceDiscovery *defaultLimitadorServiceDiscovery) URL(namespace string) (*url.URL, error) {
serviceUrl := "http://" + limitador.ServiceName + "." + namespace + ".svc.cluster.local:" +
strconv.Itoa(limitador.ServiceHTTPPort)

return url.Parse(serviceUrl)
}

// RateLimitReconciler reconciles a RateLimit object
type RateLimitReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Log logr.Logger
Scheme *runtime.Scheme
limitadorDiscovery LimitadorServiceDiscovery
}

func NewRateLimitReconciler(kubeClient client.Client, logger logr.Logger, scheme *runtime.Scheme) RateLimitReconciler {
limitadorServiceDiscovery := defaultLimitadorServiceDiscovery{}

return RateLimitReconciler{
Client: kubeClient,
Log: logger,
Scheme: scheme,
limitadorDiscovery: &limitadorServiceDiscovery,
}
}

// +kubebuilder:rbac:groups=limitador.3scale.net,resources=ratelimits,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=limitador.3scale.net,resources=ratelimits/status,verbs=get;update;patch

func (r *RateLimitReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
_ = context.Background()
_ = r.Log.WithValues("ratelimit", req.NamespacedName)
reqLogger := r.Log.WithValues("ratelimit", req.NamespacedName)

limit := &limitadorv1alpha1.RateLimit{}
if err := r.Get(context.TODO(), req.NamespacedName, limit); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}

reqLogger.Error(err, "Failed to get RateLimit object.")
return ctrl.Result{}, err
}

isLimitMarkedToBeDeleted := limit.GetDeletionTimestamp() != nil
if isLimitMarkedToBeDeleted {
if helpers.Contains(limit.GetFinalizers(), rateLimitFinalizer) {
if err := r.finalizeRateLimit(limit); err != nil {
return ctrl.Result{}, err
}

// Remove finalizer. Once all finalizers have been removed, the
// object will be deleted.
controllerutil.RemoveFinalizer(limit, rateLimitFinalizer)
if err := r.Update(context.TODO(), limit); err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

if err := r.ensureFinalizerIsAdded(limit, reqLogger); err != nil {
return ctrl.Result{}, err
}

// your logic here
if err := r.createLimitInLimitador(limit); err != nil {
reqLogger.Error(err, "Failed to create rate limit in Limitador.")
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

func (r *RateLimitReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&limitadorv1alpha1.RateLimit{}).
WithEventFilter(r.updateLimitPredicate()).
Complete(r)
}

// This should be temporary. This is not how a filter should be used. However,
// with the current Limitador API, when updating a limit, we need both the
// current and the previous version. After updating the Limitador API to work
// with IDs, this won't be needed.
func (r *RateLimitReconciler) updateLimitPredicate() predicate.Predicate {
return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
oldVersion := e.ObjectOld.(*limitadorv1alpha1.RateLimit)
newVersion := e.ObjectNew.(*limitadorv1alpha1.RateLimit)

if oldVersion.ObjectMeta.Generation == newVersion.ObjectMeta.Generation {
return false
}

// The namespace should be the same in the old and the new version,
// so we can use either.
limitadorUrl, err := r.limitadorDiscovery.URL(newVersion.Namespace)
if err != nil {
return false
}

limitadorClient := limitador.NewClient(*limitadorUrl)

// Try to create the new version even if the old one can't be
// deleted. This might leave in Limitador limits that should no
// longer be there. As this function should only be temporary this
// should be fine for a first version of the controller.
_ = limitadorClient.DeleteLimit(&oldVersion.Spec)

return true
},
}
}

func (r *RateLimitReconciler) createLimitInLimitador(limit *limitadorv1alpha1.RateLimit) error {
limitadorUrl, err := r.limitadorDiscovery.URL(limit.Namespace)
if err != nil {
return err
}

limitadorClient := limitador.NewClient(*limitadorUrl)
return limitadorClient.CreateLimit(&limit.Spec)
}

func (r *RateLimitReconciler) ensureFinalizerIsAdded(limit *limitadorv1alpha1.RateLimit, reqLogger logr.Logger) error {
numberOfFinalizers := len(limit.GetFinalizers())
controllerutil.AddFinalizer(limit, rateLimitFinalizer)
if numberOfFinalizers == len(limit.GetFinalizers()) {
// The finalizer was already there, no need to update
return nil
}

if err := r.Update(context.TODO(), limit); err != nil {
reqLogger.Error(err, "Failed to update the rate limit with finalizer")
return err
}

return nil
}

func (r *RateLimitReconciler) finalizeRateLimit(rateLimit *limitadorv1alpha1.RateLimit) error {
limitadorUrl, err := r.limitadorDiscovery.URL(rateLimit.Namespace)
if err != nil {
return err
}

limitadorClient := limitador.NewClient(*limitadorUrl)
return limitadorClient.DeleteLimit(&rateLimit.Spec)
}
169 changes: 169 additions & 0 deletions controllers/ratelimit_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package controllers

import (
"context"
limitadorv1alpha1 "github.com/3scale/limitador-operator/api/v1alpha1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/ghttp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/uuid"
"sigs.k8s.io/controller-runtime/pkg/client"
"sync"
"time"
)

var _ = Describe("RateLimit controller", func() {
const (
timeout = time.Second * 10
interval = time.Millisecond * 250
)

// Used to generate a different limit on every test so they don't collide.
var newRateLimit = func() limitadorv1alpha1.RateLimit {
// The name can't start with a number.
name := "a" + string(uuid.NewUUID())

return limitadorv1alpha1.RateLimit{
TypeMeta: metav1.TypeMeta{
Kind: "RateLimit",
APIVersion: "limitador.3scale.net/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
},
Spec: limitadorv1alpha1.RateLimitSpec{
Conditions: []string{"req.method == GET"},
MaxValue: 10,
Namespace: "test-namespace",
Seconds: 60,
Variables: []string{"user_id"},
},
}
}

// The next couple of functions are useful to verify that an HTTP request is
// made after a call to the kubernetesClient.
// The functions are wrappers for k8sClient.Create and k8sClient.Delete, so
// the signature is the same.
// We know that after creating, deleting, etc. a RateLimit CR, an HTTP
// request is made to create, delete, etc. the limit in Limitador. These
// functions are useful for waiting until the state is synchronized.

// Wraps a function with the same signature as k8sClient.Create and waits
// for an HTTP request.
var runCreateAndWaitHTTPReq = func(f func(ctx context.Context,
object runtime.Object,
opts ...client.CreateOption,
) error) func(ctx context.Context, object runtime.Object, opts ...client.CreateOption) error {
return func(ctx context.Context, object runtime.Object, opts ...client.CreateOption) error {
reqsAtStart := len(mockedHTTPServer.ReceivedRequests())

err := f(ctx, object, opts...)
if err != nil {
return err
}

Eventually(func() bool {
return len(mockedHTTPServer.ReceivedRequests()) > reqsAtStart
}, timeout, interval).Should(BeTrue())

return nil
}
}

// Wraps a function with the same signature as k8sClient.Delete and waits
// for an HTTP request.
var runDeleteAndWaitHTTPReq = func(f func(ctx context.Context,
object runtime.Object,
opts ...client.DeleteOption,
) error) func(ctx context.Context, object runtime.Object, opts ...client.DeleteOption) error {
return func(ctx context.Context, object runtime.Object, opts ...client.DeleteOption) error {
reqsAtStart := len(mockedHTTPServer.ReceivedRequests())

err := f(ctx, object, opts...)
if err != nil {
return err
}

Eventually(func() bool {
return len(mockedHTTPServer.ReceivedRequests()) > reqsAtStart
}, timeout, interval).Should(BeTrue())

return nil
}
}

var addHandlerForLimitCreation = func(limitSpecJson string) {
mockedHTTPServer.AppendHandlers(
ghttp.CombineHandlers(
ghttp.VerifyRequest("POST", "/limits"),
ghttp.VerifyJSON(limitSpecJson),
),
)
}

var addHandlerForLimitDeletion = func(limitSpecJson string) {
mockedHTTPServer.AppendHandlers(
ghttp.CombineHandlers(
ghttp.VerifyRequest("DELETE", "/limits"),
ghttp.VerifyJSON(limitSpecJson),
),
)
}

// These tests make HTTP requests to the same mocked server. Running them in
// parallel makes it difficult to reason about them.
var sequentialTestLock sync.Mutex

BeforeEach(func() {
sequentialTestLock.Lock()
defer sequentialTestLock.Unlock()
mockedHTTPServer.Reset()
})

Context("Creating a new RateLimit object", func() {
testLimit := newRateLimit()
testLimitSpecJson, _ := json.Marshal(testLimit.Spec)

BeforeEach(func() {
addHandlerForLimitCreation(string(testLimitSpecJson))
})

AfterEach(func() {
Expect(runDeleteAndWaitHTTPReq(k8sClient.Delete)(
context.TODO(), &testLimit,
)).Should(Succeed())
})

It("Should create a limit in Limitador", func() {
Expect(runCreateAndWaitHTTPReq(k8sClient.Create)(
context.TODO(), &testLimit,
)).Should(Succeed())
})
})

Context("Deleting a RateLimit object", func() {
testLimit := newRateLimit()
testLimitSpecJson, _ := json.Marshal(testLimit.Spec)

BeforeEach(func() {
addHandlerForLimitCreation(string(testLimitSpecJson))

Expect(runCreateAndWaitHTTPReq(k8sClient.Create)(
context.TODO(), &testLimit,
)).Should(Succeed())

addHandlerForLimitDeletion(string(testLimitSpecJson))
})

It("Should delete the limit in Limitador", func() {
Expect(runDeleteAndWaitHTTPReq(k8sClient.Delete)(
context.TODO(), &testLimit,
)).Should(Succeed())
})
})
})
Loading

0 comments on commit e8b8693

Please sign in to comment.