Skip to content

Commit

Permalink
refactor: rate limit policy enforced status controller
Browse files Browse the repository at this point in the history
  • Loading branch information
KevFan committed May 2, 2024
1 parent 77ecda4 commit 4bf6588
Show file tree
Hide file tree
Showing 9 changed files with 407 additions and 186 deletions.
4 changes: 0 additions & 4 deletions controllers/ratelimitpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"

"github.com/go-logr/logr"
limitadorv1alpha1 "github.com/kuadrant/limitador-operator/api/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -43,8 +42,6 @@ const rateLimitPolicyFinalizer = "ratelimitpolicy.kuadrant.io/finalizer"
type RateLimitPolicyReconciler struct {
*reconcilers.BaseReconciler
TargetRefReconciler reconcilers.TargetRefReconciler
// AffectedPolicyMap tracks the affected policies to report their status.
AffectedPolicyMap *kuadrant.AffectedPolicyMap
}

//+kubebuilder:rbac:groups=kuadrant.io,resources=ratelimitpolicies,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -232,7 +229,6 @@ func (r *RateLimitPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
For(&kuadrantv1beta2.RateLimitPolicy{}).
Watches(&limitadorv1alpha1.Limitador{}, limitadorStatusEventHandler{Client: r.Client(), Logger: r.Logger().WithName("limitadorStatusToRLPsEventHandler")}).
Watches(
&gatewayapiv1.HTTPRoute{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) []reconcile.Request {
Expand Down
172 changes: 102 additions & 70 deletions controllers/ratelimitpolicy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
}
}

limitadorContainsLimit := func(ctx context.Context, limit limitadorv1alpha1.RateLimit) func(g Gomega) {
return func(g Gomega) {
limitadorKey := client.ObjectKey{Name: common.LimitadorName, Namespace: kuadrantInstallationNS}
existingLimitador := &limitadorv1alpha1.Limitador{}
g.Expect(k8sClient.Get(ctx, limitadorKey, existingLimitador)).To(Succeed())
g.Expect(existingLimitador.Spec.Limits).To(ContainElements(limit))
}
}

beforeEachCallback := func(ctx SpecContext) {
testNamespace = CreateNamespaceWithContext(ctx)
gateway = testBuildBasicGateway(gwName, testNamespace)
Expand Down Expand Up @@ -303,71 +312,92 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
})

Context("RLP Defaults", func() {
It("HTTPRoute atomic default taking precedence over Gateway defaults", func(ctx SpecContext) {
// create httproute
httpRoute := testBuildBasicHttpRoute(routeName, gwName, testNamespace, []string{"*.example.com"})
Expect(k8sClient.Create(ctx, httpRoute)).To(Succeed())
Eventually(testRouteIsAccepted(client.ObjectKeyFromObject(httpRoute))).WithContext(ctx).Should(BeTrue())

// create GW RLP
gwRLP := policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
policy.Spec.TargetRef.Kind = "Gateway"
policy.Spec.TargetRef.Name = gatewayapiv1.ObjectName(gwName)
})
Expect(k8sClient.Create(ctx, gwRLP)).To(Succeed())
rlpKey := client.ObjectKey{Name: gwRLP.Name, Namespace: testNamespace}
Eventually(assertPolicyIsAcceptedAndEnforced(ctx, rlpKey)).WithContext(ctx).Should(BeTrue())

// Create HTTPRoute RLP with new default limits
routeRLP := policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
policy.Name = "httproute-rlp"
policy.Spec.CommonSpec().Limits = map[string]kuadrantv1beta2.Limit{
"l1": {
Rates: []kuadrantv1beta2.Rate{
{
Limit: 10, Duration: 5, Unit: "second",
Describe("Route policy defaults taking precedence over Gateway policy defaults", func() {
var (
gwRLP *kuadrantv1beta2.RateLimitPolicy
routeRLP *kuadrantv1beta2.RateLimitPolicy
)

BeforeEach(func(ctx SpecContext) {
// Common setup
// GW policy defaults are overridden and not enforced when Route has their own policy attached

// create httproute
httpRoute := testBuildBasicHttpRoute(routeName, gwName, testNamespace, []string{"*.example.com"})
Expect(k8sClient.Create(ctx, httpRoute)).To(Succeed())
Eventually(testRouteIsAccepted(client.ObjectKeyFromObject(httpRoute))).WithContext(ctx).Should(BeTrue())

// create GW RLP
gwRLP = policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
policy.Spec.TargetRef.Kind = "Gateway"
policy.Spec.TargetRef.Name = gatewayapiv1.ObjectName(gwName)
})
Expect(k8sClient.Create(ctx, gwRLP)).To(Succeed())
gwRLPKey := client.ObjectKey{Name: gwRLP.Name, Namespace: testNamespace}
Eventually(assertPolicyIsAcceptedAndEnforced(ctx, gwRLPKey)).WithContext(ctx).Should(BeTrue())

// Create HTTPRoute RLP with new default limits
routeRLP = policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
policy.Name = "httproute-rlp"
policy.Spec.CommonSpec().Limits = map[string]kuadrantv1beta2.Limit{
"l1": {
Rates: []kuadrantv1beta2.Rate{
{
Limit: 10, Duration: 5, Unit: "second",
},
},
},
},
}
})
Expect(k8sClient.Create(ctx, routeRLP)).To(Succeed())
rlpKey = client.ObjectKey{Name: routeRLP.Name, Namespace: testNamespace}
Eventually(assertPolicyIsAcceptedAndEnforced(ctx, rlpKey)).WithContext(ctx).Should(BeTrue())

// Check Gateway direct back reference
gwKey := client.ObjectKeyFromObject(gateway)
existingGateway := &gatewayapiv1.Gateway{}
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, gwKey, existingGateway)).To(Succeed())
g.Expect(existingGateway.GetAnnotations()).To(HaveKeyWithValue(
gwRLP.DirectReferenceAnnotationName(), client.ObjectKeyFromObject(gwRLP).String()))
}).WithContext(ctx).Should(Succeed())

// check limits
Eventually(func(g Gomega) {
limitadorKey := client.ObjectKey{Name: common.LimitadorName, Namespace: kuadrantInstallationNS}
existingLimitador := &limitadorv1alpha1.Limitador{}
g.Expect(k8sClient.Get(ctx, limitadorKey, existingLimitador)).To(Succeed())
g.Expect(existingLimitador.Spec.Limits).To(ContainElements(limitadorv1alpha1.RateLimit{
}
})
Expect(k8sClient.Create(ctx, routeRLP)).To(Succeed())
routeRLPKey := client.ObjectKey{Name: routeRLP.Name, Namespace: testNamespace}
Eventually(assertPolicyIsAcceptedAndEnforced(ctx, routeRLPKey)).WithContext(ctx).Should(BeTrue())
Eventually(testRLPIsEnforced(ctx, gwRLPKey)).WithContext(ctx).Should(BeFalse())

// Check Gateway direct back reference
gwKey := client.ObjectKeyFromObject(gateway)
existingGateway := &gatewayapiv1.Gateway{}
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, gwKey, existingGateway)).To(Succeed())
g.Expect(existingGateway.GetAnnotations()).To(HaveKeyWithValue(
gwRLP.DirectReferenceAnnotationName(), client.ObjectKeyFromObject(gwRLP).String()))
}).WithContext(ctx).Should(Succeed())

// check limits
Eventually(limitadorContainsLimit(ctx, limitadorv1alpha1.RateLimit{
MaxValue: 10,
Seconds: 5,
Namespace: rlptools.LimitsNamespaceFromRLP(routeRLP),
Conditions: []string{`limit.l1__2804bad6 == "1"`},
Variables: []string{},
Name: rlptools.LimitsNameFromRLP(routeRLP),
}))
}).WithContext(ctx).Should(Succeed())
})).WithContext(ctx).Should(Succeed())

// Gateway should contain HTTPRoute RLP in backreference
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, gwKey, existingGateway)).To(Succeed())
serialized, err := json.Marshal(routeRLPKey)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(existingGateway.GetAnnotations()).To(HaveKey(routeRLP.BackReferenceAnnotationName()))
g.Expect(existingGateway.GetAnnotations()[routeRLP.BackReferenceAnnotationName()]).To(ContainSubstring(string(serialized)))
}).WithContext(ctx).Should(Succeed())
})

// Gateway should contain HTTPRoute RLP in backreference
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, gwKey, existingGateway)).To(Succeed())
serialized, err := json.Marshal(rlpKey)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(existingGateway.GetAnnotations()).To(HaveKey(routeRLP.BackReferenceAnnotationName()))
g.Expect(existingGateway.GetAnnotations()[routeRLP.BackReferenceAnnotationName()]).To(ContainSubstring(string(serialized)))
}).WithContext(ctx).Should(Succeed())
}, testTimeOut)
When("Free route is created", func() {
It("Gateway policy should now be enforced", func(ctx SpecContext) {
route2 := testBuildBasicHttpRoute("route2", gwName, testNamespace, []string{"*.car.com"})
Expect(k8sClient.Create(ctx, route2)).To(Succeed())
Eventually(testRLPIsEnforced(ctx, client.ObjectKeyFromObject(gwRLP))).WithContext(ctx).Should(BeTrue())
}, testTimeOut)
})

When("Route policy is deleted", func() {
It("Gateway policy should now be enforced", func(ctx SpecContext) {
Expect(k8sClient.Delete(ctx, routeRLP)).To(Succeed())
Eventually(testRLPIsEnforced(ctx, client.ObjectKeyFromObject(gwRLP))).WithContext(ctx).Should(BeTrue())
}, testTimeOut)
})
})

It("Explicit defaults - no underlying routes to enforce policy", func(ctx SpecContext) {
gwRLP := policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
Expand Down Expand Up @@ -400,16 +430,6 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
var gwRLP *kuadrantv1beta2.RateLimitPolicy
var routeRLP *kuadrantv1beta2.RateLimitPolicy

limitadorContainsLimit := func(ctx context.Context, limit limitadorv1alpha1.RateLimit) func(g Gomega) {
return func(g Gomega) {
// check limits - should contain HTTPRoute RLP values
limitadorKey := client.ObjectKey{Name: common.LimitadorName, Namespace: kuadrantInstallationNS}
existingLimitador := &limitadorv1alpha1.Limitador{}
g.Expect(k8sClient.Get(ctx, limitadorKey, existingLimitador)).To(Succeed())
g.Expect(existingLimitador.Spec.Limits).To(ContainElements(limit))
}
}

BeforeEach(func(ctx SpecContext) {
// create httproute
httpRoute := testBuildBasicHttpRoute(routeName, gwName, testNamespace, []string{"*.example.com"})
Expand All @@ -426,7 +446,7 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
routeRLP = policyFactory(func(policy *kuadrantv1beta2.RateLimitPolicy) {
policy.Name = "httproute-rlp"
policy.Spec.CommonSpec().Limits = map[string]kuadrantv1beta2.Limit{
"l1": {
"route": {
Rates: []kuadrantv1beta2.Rate{
{
Limit: 10, Duration: 5, Unit: "second",
Expand All @@ -435,7 +455,6 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
},
}
})

})

It("Gateway atomic override - gateway overrides exist and then route policy created", func(ctx SpecContext) {
Expand Down Expand Up @@ -477,6 +496,19 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
g.Expect(existingGateway.GetAnnotations()).To(HaveKey(routeRLP.BackReferenceAnnotationName()))
g.Expect(existingGateway.GetAnnotations()[routeRLP.BackReferenceAnnotationName()]).To(ContainSubstring(string(serialized)))
}).WithContext(ctx).Should(Succeed())

// Delete GW RLP -> Route RLP should be enforced
Expect(k8sClient.Delete(ctx, gwRLP)).To(Succeed())
Eventually(testRLPIsEnforced(ctx, routeRLPKey)).WithContext(ctx).Should(BeTrue())
// check limits - should be route RLP values
Eventually(limitadorContainsLimit(ctx, limitadorv1alpha1.RateLimit{
MaxValue: 10,
Seconds: 5,
Namespace: rlptools.LimitsNamespaceFromRLP(routeRLP),
Conditions: []string{`limit.route__8a84e406 == "1"`},
Variables: []string{},
Name: rlptools.LimitsNameFromRLP(routeRLP),
})).WithContext(ctx).Should(Succeed())
}, testTimeOut)

It("Gateway atomic override - route policy exits and then gateway policy created", func(ctx SpecContext) {
Expand Down Expand Up @@ -547,7 +579,7 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
MaxValue: 10,
Seconds: 5,
Namespace: rlptools.LimitsNamespaceFromRLP(routeRLP),
Conditions: []string{`limit.l1__2804bad6 == "1"`},
Conditions: []string{`limit.route__8a84e406 == "1"`},
Variables: []string{},
Name: rlptools.LimitsNameFromRLP(routeRLP),
})).WithContext(ctx).Should(Succeed())
Expand Down Expand Up @@ -619,7 +651,7 @@ var _ = Describe("RateLimitPolicy controller", Ordered, func() {
MaxValue: 10,
Seconds: 5,
Namespace: rlptools.LimitsNamespaceFromRLP(routeRLP),
Conditions: []string{`limit.l1__2804bad6 == "1"`},
Conditions: []string{`limit.route__8a84e406 == "1"`},
Variables: []string{},
Name: rlptools.LimitsNameFromRLP(routeRLP),
})).WithContext(ctx).Should(Succeed())
Expand Down
10 changes: 5 additions & 5 deletions controllers/ratelimitpolicy_limitador_status_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
limitadorv1alpha1 "github.com/kuadrant/limitador-operator/api/v1alpha1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"

kuadrantv1beta1 "github.com/kuadrant/kuadrant-operator/api/v1beta1"
kuadrantv1beta2 "github.com/kuadrant/kuadrant-operator/api/v1beta2"
"github.com/kuadrant/kuadrant-operator/pkg/common"
"github.com/kuadrant/kuadrant-operator/pkg/library/mappers"
)

var _ handler.EventHandler = &limitadorStatusEventHandler{}
Expand Down Expand Up @@ -76,9 +76,9 @@ func (eh limitadorStatusEventHandler) enqueue(ctx context.Context, limitingInter
eh.Logger.V(1).Error(err, "failed to list RLPs")
}
for idx := range rlpList.Items {
eh.Logger.V(1).Info("queueing rate limiting policy", "policy", rlpList.Items[idx].Name)
limitingInterface.Add(ctrl.Request{
NamespacedName: client.ObjectKeyFromObject(&rlpList.Items[idx]),
})
gwRequests := mappers.NewPolicyToParentGatewaysEventMapper(mappers.WithClient(eh.Client), mappers.WithLogger(eh.Logger)).Map(ctx, &rlpList.Items[idx])
for _, gwRequest := range gwRequests {
limitingInterface.Add(gwRequest)
}
}
}
Loading

0 comments on commit 4bf6588

Please sign in to comment.