Skip to content

Commit

Permalink
refactor: rate limit policy enforced status controller
Browse files Browse the repository at this point in the history
  • Loading branch information
KevFan committed May 3, 2024
1 parent 77ecda4 commit 037a9d0
Show file tree
Hide file tree
Showing 10 changed files with 391 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,27 @@ import (
limitadorv1alpha1 "github.com/kuadrant/limitador-operator/api/v1alpha1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"

kuadrantv1beta1 "github.com/kuadrant/kuadrant-operator/api/v1beta1"
kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2"
"github.com/kuadrant/kuadrant-operator/pkg/common"
"github.com/kuadrant/kuadrant-operator/pkg/library/mappers"
)

var _ handler.EventHandler = &limitadorStatusEventHandler{}
var _ handler.EventHandler = &limitadorStatusRLPGatewayEventHandler{}

type limitadorStatusEventHandler struct {
type limitadorStatusRLPGatewayEventHandler struct {
Client client.Client
Logger logr.Logger
}

func (eh limitadorStatusEventHandler) Create(_ context.Context, _ event.CreateEvent, _ workqueue.RateLimitingInterface) {
func (eh limitadorStatusRLPGatewayEventHandler) Create(_ context.Context, _ event.CreateEvent, _ workqueue.RateLimitingInterface) {
}

func (eh limitadorStatusEventHandler) Update(ctx context.Context, e event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) {
func (eh limitadorStatusRLPGatewayEventHandler) Update(ctx context.Context, e event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) {
oldL := e.ObjectOld.(*limitadorv1alpha1.Limitador)
newL := e.ObjectNew.(*limitadorv1alpha1.Limitador)

Expand All @@ -44,17 +44,17 @@ func (eh limitadorStatusEventHandler) Update(ctx context.Context, e event.Update
}
}

func (eh limitadorStatusEventHandler) Delete(ctx context.Context, e event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) {
func (eh limitadorStatusRLPGatewayEventHandler) Delete(ctx context.Context, e event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) {
eh.Logger.V(1).Info("Limitador delete event detected")
if !eh.IsKuadrantInstalled(ctx, e.Object) || e.Object.GetName() == common.LimitadorName {
eh.enqueue(ctx, limitingInterface)
}
}

func (eh limitadorStatusEventHandler) Generic(_ context.Context, _ event.GenericEvent, _ workqueue.RateLimitingInterface) {
func (eh limitadorStatusRLPGatewayEventHandler) Generic(_ context.Context, _ event.GenericEvent, _ workqueue.RateLimitingInterface) {
}

func (eh limitadorStatusEventHandler) IsKuadrantInstalled(ctx context.Context, obj client.Object) bool {
func (eh limitadorStatusRLPGatewayEventHandler) IsKuadrantInstalled(ctx context.Context, obj client.Object) bool {
kuadrantList := &kuadrantv1beta1.KuadrantList{}
if err := eh.Client.List(ctx, kuadrantList, &client.ListOptions{Namespace: obj.GetNamespace()}); err != nil {
eh.Logger.V(1).Error(err, "failed to list kuadrant in namespace", "namespace", obj.GetNamespace())
Expand All @@ -69,16 +69,16 @@ func (eh limitadorStatusEventHandler) IsKuadrantInstalled(ctx context.Context, o

return true
}
func (eh limitadorStatusEventHandler) enqueue(ctx context.Context, limitingInterface workqueue.RateLimitingInterface) {
func (eh limitadorStatusRLPGatewayEventHandler) enqueue(ctx context.Context, limitingInterface workqueue.RateLimitingInterface) {
// List all RLPs as there's been an event from Limitador which may affect RLP status
rlpList := &kuadrantv1beta2.RateLimitPolicyList{}
if err := eh.Client.List(ctx, rlpList); err != nil {
eh.Logger.V(1).Error(err, "failed to list RLPs")
}
for idx := range rlpList.Items {
eh.Logger.V(1).Info("queueing rate limiting policy", "policy", rlpList.Items[idx].Name)
limitingInterface.Add(ctrl.Request{
NamespacedName: client.ObjectKeyFromObject(&rlpList.Items[idx]),
})
gwRequests := mappers.NewPolicyToParentGatewaysEventMapper(mappers.WithClient(eh.Client), mappers.WithLogger(eh.Logger)).Map(ctx, &rlpList.Items[idx])
for _, gwRequest := range gwRequests {
limitingInterface.Add(gwRequest)
}
}
}
4 changes: 0 additions & 4 deletions controllers/ratelimitpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"

"github.com/go-logr/logr"
limitadorv1alpha1 "github.com/kuadrant/limitador-operator/api/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -43,8 +42,6 @@ const rateLimitPolicyFinalizer = "ratelimitpolicy.kuadrant.io/finalizer"
type RateLimitPolicyReconciler struct {
*reconcilers.BaseReconciler
TargetRefReconciler reconcilers.TargetRefReconciler
// AffectedPolicyMap tracks the affected policies to report their status.
AffectedPolicyMap *kuadrant.AffectedPolicyMap
}

//+kubebuilder:rbac:groups=kuadrant.io,resources=ratelimitpolicies,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -232,7 +229,6 @@ func (r *RateLimitPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
For(&kuadrantv1beta2.RateLimitPolicy{}).
Watches(&limitadorv1alpha1.Limitador{}, limitadorStatusEventHandler{Client: r.Client(), Logger: r.Logger().WithName("limitadorStatusToRLPsEventHandler")}).
Watches(
&gatewayapiv1.HTTPRoute{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) []reconcile.Request {
Expand Down
172 changes: 102 additions & 70 deletions controllers/ratelimitpolicy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
}
}

limitadorContainsLimit := func(ctx context.Context, limit limitadorv1alpha1.RateLimit) func(g Gomega) {
return func(g Gomega) {
limitadorKey := client.ObjectKey{Name: common.LimitadorName, Namespace: kuadrantInstallationNS}
existingLimitador := &limitadorv1alpha1.Limitador{}
g.Expect(k8sClient.Get(ctx, limitadorKey, existingLimitador)).To(Succeed())
g.Expect(existingLimitador.Spec.Limits).To(ContainElements(limit))
}
}

beforeEachCallback := func(ctx SpecContext) {
testNamespace = CreateNamespaceWithContext(ctx)
gateway = testBuildBasicGateway(gwName, testNamespace)
Expand Down Expand Up @@ -303,71 +312,92 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
})

Context("RLP Defaults", func() {
It("HTTPRoute atomic default taking precedence over Gateway defaults", func(ctx SpecContext) {
// create httproute
httpRoute := testBuildBasicHttpRoute(routeName, gwName, testNamespace, []string{"*.example.com"})
Expect(k8sClient.Create(ctx, httpRoute)).To(Succeed())
Eventually(testRouteIsAccepted(client.ObjectKeyFromObject(httpRoute))).WithContext(ctx).Should(BeTrue())

// create GW RLP
gwRLP := policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
policy.Spec.TargetRef.Kind = "Gateway"
policy.Spec.TargetRef.Name = gatewayapiv1.ObjectName(gwName)
})
Expect(k8sClient.Create(ctx, gwRLP)).To(Succeed())
rlpKey := client.ObjectKey{Name: gwRLP.Name, Namespace: testNamespace}
Eventually(assertPolicyIsAcceptedAndEnforced(ctx, rlpKey)).WithContext(ctx).Should(BeTrue())

// Create HTTPRoute RLP with new default limits
routeRLP := policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
policy.Name = "httproute-rlp"
policy.Spec.CommonSpec().Limits = map[string]kuadrantv1beta2.Limit{
"l1": {
Rates: []kuadrantv1beta2.Rate{
{
Limit: 10, Duration: 5, Unit: "second",
Describe("Route policy defaults taking precedence over Gateway policy defaults", func() {
var (
gwRLP *kuadrantv1beta2.RateLimitPolicy
routeRLP *kuadrantv1beta2.RateLimitPolicy
)

BeforeEach(func(ctx SpecContext) {
// Common setup
// GW policy defaults are overridden and not enforced when Route has their own policy attached

// create httproute
httpRoute := testBuildBasicHttpRoute(routeName, gwName, testNamespace, []string{"*.example.com"})
Expect(k8sClient.Create(ctx, httpRoute)).To(Succeed())
Eventually(testRouteIsAccepted(client.ObjectKeyFromObject(httpRoute))).WithContext(ctx).Should(BeTrue())

// create GW RLP
gwRLP = policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
policy.Spec.TargetRef.Kind = "Gateway"
policy.Spec.TargetRef.Name = gatewayapiv1.ObjectName(gwName)
})
Expect(k8sClient.Create(ctx, gwRLP)).To(Succeed())
gwRLPKey := client.ObjectKey{Name: gwRLP.Name, Namespace: testNamespace}
Eventually(assertPolicyIsAcceptedAndEnforced(ctx, gwRLPKey)).WithContext(ctx).Should(BeTrue())

// Create HTTPRoute RLP with new default limits
routeRLP = policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
policy.Name = "httproute-rlp"
policy.Spec.CommonSpec().Limits = map[string]kuadrantv1beta2.Limit{
"l1": {
Rates: []kuadrantv1beta2.Rate{
{
Limit: 10, Duration: 5, Unit: "second",
},
},
},
},
}
})
Expect(k8sClient.Create(ctx, routeRLP)).To(Succeed())
rlpKey = client.ObjectKey{Name: routeRLP.Name, Namespace: testNamespace}
Eventually(assertPolicyIsAcceptedAndEnforced(ctx, rlpKey)).WithContext(ctx).Should(BeTrue())

// Check Gateway direct back reference
gwKey := client.ObjectKeyFromObject(gateway)
existingGateway := &gatewayapiv1.Gateway{}
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, gwKey, existingGateway)).To(Succeed())
g.Expect(existingGateway.GetAnnotations()).To(HaveKeyWithValue(
gwRLP.DirectReferenceAnnotationName(), client.ObjectKeyFromObject(gwRLP).String()))
}).WithContext(ctx).Should(Succeed())

// check limits
Eventually(func(g Gomega) {
limitadorKey := client.ObjectKey{Name: common.LimitadorName, Namespace: kuadrantInstallationNS}
existingLimitador := &limitadorv1alpha1.Limitador{}
g.Expect(k8sClient.Get(ctx, limitadorKey, existingLimitador)).To(Succeed())
g.Expect(existingLimitador.Spec.Limits).To(ContainElements(limitadorv1alpha1.RateLimit{
}
})
Expect(k8sClient.Create(ctx, routeRLP)).To(Succeed())
routeRLPKey := client.ObjectKey{Name: routeRLP.Name, Namespace: testNamespace}
Eventually(assertPolicyIsAcceptedAndEnforced(ctx, routeRLPKey)).WithContext(ctx).Should(BeTrue())
Eventually(testRLPIsEnforced(ctx, gwRLPKey)).WithContext(ctx).Should(BeFalse())

// Check Gateway direct back reference
gwKey := client.ObjectKeyFromObject(gateway)
existingGateway := &gatewayapiv1.Gateway{}
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, gwKey, existingGateway)).To(Succeed())
g.Expect(existingGateway.GetAnnotations()).To(HaveKeyWithValue(
gwRLP.DirectReferenceAnnotationName(), client.ObjectKeyFromObject(gwRLP).String()))
}).WithContext(ctx).Should(Succeed())

// check limits
Eventually(limitadorContainsLimit(ctx, limitadorv1alpha1.RateLimit{
MaxValue: 10,
Seconds: 5,
Namespace: rlptools.LimitsNamespaceFromRLP(routeRLP),
Conditions: []string{`limit.l1__2804bad6 == "1"`},
Variables: []string{},
Name: rlptools.LimitsNameFromRLP(routeRLP),
}))
}).WithContext(ctx).Should(Succeed())
})).WithContext(ctx).Should(Succeed())

// Gateway should contain HTTPRoute RLP in backreference
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, gwKey, existingGateway)).To(Succeed())
serialized, err := json.Marshal(routeRLPKey)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(existingGateway.GetAnnotations()).To(HaveKey(routeRLP.BackReferenceAnnotationName()))
g.Expect(existingGateway.GetAnnotations()[routeRLP.BackReferenceAnnotationName()]).To(ContainSubstring(string(serialized)))
}).WithContext(ctx).Should(Succeed())
})

// Gateway should contain HTTPRoute RLP in backreference
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, gwKey, existingGateway)).To(Succeed())
serialized, err := json.Marshal(rlpKey)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(existingGateway.GetAnnotations()).To(HaveKey(routeRLP.BackReferenceAnnotationName()))
g.Expect(existingGateway.GetAnnotations()[routeRLP.BackReferenceAnnotationName()]).To(ContainSubstring(string(serialized)))
}).WithContext(ctx).Should(Succeed())
}, testTimeOut)
When("Free route is created", func() {
It("Gateway policy should now be enforced", func(ctx SpecContext) {
route2 := testBuildBasicHttpRoute("route2", gwName, testNamespace, []string{"*.car.com"})
Expect(k8sClient.Create(ctx, route2)).To(Succeed())
Eventually(testRLPIsEnforced(ctx, client.ObjectKeyFromObject(gwRLP))).WithContext(ctx).Should(BeTrue())
}, testTimeOut)
})

When("Route policy is deleted", func() {
It("Gateway policy should now be enforced", func(ctx SpecContext) {
Expect(k8sClient.Delete(ctx, routeRLP)).To(Succeed())
Eventually(testRLPIsEnforced(ctx, client.ObjectKeyFromObject(gwRLP))).WithContext(ctx).Should(BeTrue())
}, testTimeOut)
})
})

It("Explicit defaults - no underlying routes to enforce policy", func(ctx SpecContext) {
gwRLP := policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
Expand Down Expand Up @@ -400,16 +430,6 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
var gwRLP *kuadrantv1beta2.RateLimitPolicy
var routeRLP *kuadrantv1beta2.RateLimitPolicy

limitadorContainsLimit := func(ctx context.Context, limit limitadorv1alpha1.RateLimit) func(g Gomega) {
return func(g Gomega) {
// check limits - should contain HTTPRoute RLP values
limitadorKey := client.ObjectKey{Name: common.LimitadorName, Namespace: kuadrantInstallationNS}
existingLimitador := &limitadorv1alpha1.Limitador{}
g.Expect(k8sClient.Get(ctx, limitadorKey, existingLimitador)).To(Succeed())
g.Expect(existingLimitador.Spec.Limits).To(ContainElements(limit))
}
}

BeforeEach(func(ctx SpecContext) {
// create httproute
httpRoute := testBuildBasicHttpRoute(routeName, gwName, testNamespace, []string{"*.example.com"})
Expand All @@ -426,7 +446,7 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
routeRLP = policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
policy.Name = "httproute-rlp"
policy.Spec.CommonSpec().Limits = map[string]kuadrantv1beta2.Limit{
"l1": {
"route": {
Rates: []kuadrantv1beta2.Rate{
{
Limit: 10, Duration: 5, Unit: "second",
Expand All @@ -435,7 +455,6 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
},
}
})

})

It("Gateway atomic override - gateway overrides exist and then route policy created", func(ctx SpecContext) {
Expand Down Expand Up @@ -477,6 +496,19 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
g.Expect(existingGateway.GetAnnotations()).To(HaveKey(routeRLP.BackReferenceAnnotationName()))
g.Expect(existingGateway.GetAnnotations()[routeRLP.BackReferenceAnnotationName()]).To(ContainSubstring(string(serialized)))
}).WithContext(ctx).Should(Succeed())

// Delete GW RLP -> Route RLP should be enforced
Expect(k8sClient.Delete(ctx, gwRLP)).To(Succeed())
Eventually(testRLPIsEnforced(ctx, routeRLPKey)).WithContext(ctx).Should(BeTrue())
// check limits - should be route RLP values
Eventually(limitadorContainsLimit(ctx, limitadorv1alpha1.RateLimit{
MaxValue: 10,
Seconds: 5,
Namespace: rlptools.LimitsNamespaceFromRLP(routeRLP),
Conditions: []string{`limit.route__8a84e406 == "1"`},
Variables: []string{},
Name: rlptools.LimitsNameFromRLP(routeRLP),
})).WithContext(ctx).Should(Succeed())
}, testTimeOut)

It("Gateway atomic override - route policy exits and then gateway policy created", func(ctx SpecContext) {
Expand Down Expand Up @@ -547,7 +579,7 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
MaxValue: 10,
Seconds: 5,
Namespace: rlptools.LimitsNamespaceFromRLP(routeRLP),
Conditions: []string{`limit.l1__2804bad6 == "1"`},
Conditions: []string{`limit.route__8a84e406 == "1"`},
Variables: []string{},
Name: rlptools.LimitsNameFromRLP(routeRLP),
})).WithContext(ctx).Should(Succeed())
Expand Down Expand Up @@ -619,7 +651,7 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
MaxValue: 10,
Seconds: 5,
Namespace: rlptools.LimitsNamespaceFromRLP(routeRLP),
Conditions: []string{`limit.l1__2804bad6 == "1"`},
Conditions: []string{`limit.route__8a84e406 == "1"`},
Variables: []string{},
Name: rlptools.LimitsNameFromRLP(routeRLP),
})).WithContext(ctx).Should(Succeed())
Expand Down
Loading

0 comments on commit 037a9d0

Please sign in to comment.