Skip to content

Commit

Permalink
- Add resourceMapper.XXXXToIAMAuthPolicy() and XXXXToAccessLogPolicy(…
Browse files Browse the repository at this point in the history
…) methods for event handling

- Extend GetAttachedPolicy method to make it be able to map to multiple IAMAuthPolicy and AccessLogPolicy attached policies
- Add IAMAuthPolicyController stub code
  • Loading branch information
Zijun Wang committed Oct 17, 2023
1 parent c3d0ab9 commit d6cd855
Show file tree
Hide file tree
Showing 16 changed files with 605 additions and 111 deletions.
11 changes: 7 additions & 4 deletions cmd/aws-application-networking-k8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,10 @@ func addOptionalCRDs(scheme *runtime.Scheme) {
Version: "v1alpha1",
}
scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &anv1alpha1.TargetGroupPolicy{}, &anv1alpha1.TargetGroupPolicyList{})
metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion)

scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &anv1alpha1.VpcAssociationPolicy{}, &anv1alpha1.VpcAssociationPolicyList{})
metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion)

scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &anv1alpha1.AccessLogPolicy{}, &anv1alpha1.AccessLogPolicyList{})
scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &anv1alpha1.IAMAuthPolicy{}, &anv1alpha1.IAMAuthPolicyList{})

metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion)
}

Expand Down Expand Up @@ -186,6 +184,11 @@ func main() {
setupLog.Fatalf("accesslogpolicy controller setup failed: %s", err)
}

err = controllers.RegisterIAMAuthPolicyController(ctrlLog.Named("iam-auth-policy"), cloud, latticeDataStore, finalizerManager, mgr)
if err != nil {
setupLog.Fatalf("iamauthpolicy controller setup failed: %s", err)
}

go latticestore.GetDefaultLatticeDataStore().ServeIntrospection()

//+kubebuilder:scaffold:builder
Expand Down
13 changes: 12 additions & 1 deletion controllers/accesslogpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ import (
pkg_builder "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

gwvv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/aws/aws-application-networking-k8s/controllers/eventhandlers"
anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/aws"
"github.com/aws/aws-application-networking-k8s/pkg/aws/services"
Expand Down Expand Up @@ -88,8 +92,15 @@ func RegisterAccessLogPolicyController(
stackMarshaller: stackMarshaller,
}

gatewayEventHandler := eventhandlers.NewGatewayEventHandler(log, mgrClient)
httpRouteEventHandler := eventhandlers.NewHTTPRouteEventHandler(log, mgrClient)
grpcRouteEventHandler := eventhandlers.NewGRPCRouteEventHandler(log, mgrClient)

builder := ctrl.NewControllerManagedBy(mgr).
For(&anv1alpha1.AccessLogPolicy{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{}))
For(&anv1alpha1.AccessLogPolicy{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwvv1beta1.Gateway{}}, gatewayEventHandler.MapToAccessLogPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwvv1beta1.HTTPRoute{}}, httpRouteEventHandler.MapToAccessLogPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwv1alpha2.GRPCRoute{}}, grpcRouteEventHandler.MapToAccessLogPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{}))

return builder.Complete(r)
}
Expand Down
41 changes: 41 additions & 0 deletions controllers/eventhandlers/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"

Expand All @@ -21,6 +22,7 @@ import (
"github.com/aws/aws-application-networking-k8s/pkg/config"
)

// TODO: Remove `enqueueRequestsForGatewayEvent`, and use `gatewayEventHandler` only
type enqueueRequestsForGatewayEvent struct {
log gwlog.Logger
client client.Client
Expand Down Expand Up @@ -119,3 +121,42 @@ func (h *enqueueRequestsForGatewayEvent) enqueueImpactedRoutes(queue workqueue.R
}
}
}

type gatewayEventHandler struct {
log gwlog.Logger
client client.Client
mapper *resourceMapper
}

func NewGatewayEventHandler(log gwlog.Logger, client client.Client) *gatewayEventHandler {
return &gatewayEventHandler{log: log, client: client,
mapper: &resourceMapper{log: log, client: client}}
}

func (h *gatewayEventHandler) MapToIAMAuthPolicies() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if gw, ok := obj.(*gateway_api.Gateway); ok {
policies := h.mapper.GatewayToIAMAuthPolicies(context.Background(), gw)
for _, p := range policies {
h.log.Infof("Gateway [%s/%s] resource change triggers IAMAuthPolicy [%s/%s] resource change", gw.Namespace, gw.Name, p.Namespace, p.Name)
requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}

func (h *gatewayEventHandler) MapToAccessLogPolicies() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if gw, ok := obj.(*gateway_api.Gateway); ok {
policies := h.mapper.GatewayToAccessLogPolicies(context.Background(), gw)
for _, p := range policies {
h.log.Infof("Gateway [%s/%s] resource change triggers AccessLogPolicy [%s/%s] resource change", gw.Namespace, gw.Name, p.Namespace, p.Name)
requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}
53 changes: 53 additions & 0 deletions controllers/eventhandlers/grpcroute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package eventhandlers

import (
"context"

"sigs.k8s.io/gateway-api/apis/v1alpha2"

"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type grpcRouteEventHandler struct {
log gwlog.Logger
client client.Client
mapper *resourceMapper
}

func NewGRPCRouteEventHandler(log gwlog.Logger, client client.Client) *grpcRouteEventHandler {
return &grpcRouteEventHandler{log: log, client: client,
mapper: &resourceMapper{log: log, client: client}}
}

func (h *grpcRouteEventHandler) MapToIAMAuthPolicies() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if route, ok := obj.(*v1alpha2.GRPCRoute); ok {
policies := h.mapper.GRPCRouteToIAMAuthPolicies(context.Background(), route)
for _, p := range policies {
h.log.Infof("GRPCRoute [%s/%s] resource change triggers IAMAuthPolicy [%s/%s] resource change", route.Namespace, route.Name, p.Namespace, p.Name)
requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}

func (h *grpcRouteEventHandler) MapToAccessLogPolicies() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if route, ok := obj.(*v1alpha2.GRPCRoute); ok {
policies := h.mapper.GRPCRouteToAccessLogPolicies(context.Background(), route)
for _, p := range policies {
h.log.Infof("GRPCRoute [%s/%s] resource change triggers AccessLogPolicy [%s/%s] resource change", route.Namespace, route.Name, p.Namespace, p.Name)
requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}
53 changes: 53 additions & 0 deletions controllers/eventhandlers/httproute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package eventhandlers

import (
"context"

"sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type httpRouteEventHandler struct {
log gwlog.Logger
client client.Client
mapper *resourceMapper
}

func NewHTTPRouteEventHandler(log gwlog.Logger, client client.Client) *httpRouteEventHandler {
return &httpRouteEventHandler{log: log, client: client,
mapper: &resourceMapper{log: log, client: client}}
}

func (h *httpRouteEventHandler) MapToIAMAuthPolicies() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if route, ok := obj.(*v1beta1.HTTPRoute); ok {
policies := h.mapper.HTTPRouteToIAMAuthPolicies(context.Background(), route)
for _, p := range policies {
h.log.Infof("HTTPRoute [%s/%s] resource change triggers IAMAuthPolicy [%s/%s] resource change", route.Namespace, route.Name, p.Namespace, p.Name)
requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}

func (h *httpRouteEventHandler) MapToAccessLogPolicies() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
var requests []reconcile.Request
if route, ok := obj.(*v1beta1.HTTPRoute); ok {
policies := h.mapper.HTTPRouteToAccessLogPolicies(context.Background(), route)
for _, p := range policies {
h.log.Infof("HTTPRoute [%s/%s] resource change triggers AccessLogPolicy [%s/%s] resource change", route.Namespace, route.Name, p.Namespace, p.Name)
requests = append(requests, reconcile.Request{NamespacedName: k8s.NamespacedName(p)})
}
}
return requests
})
}
33 changes: 33 additions & 0 deletions controllers/eventhandlers/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1"
mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/gateway"

"github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
Expand Down Expand Up @@ -73,6 +76,36 @@ func (r *resourceMapper) VpcAssociationPolicyToGateway(ctx context.Context, vap
return policyToTargetRefObj(r, ctx, vap, &gateway_api.Gateway{})
}

func (r *resourceMapper) GatewayToIAMAuthPolicies(ctx context.Context, gw *gateway_api.Gateway) []*anv1alpha1.IAMAuthPolicy {
policies, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(gw), &anv1alpha1.IAMAuthPolicy{})
return policies
}

func (r *resourceMapper) GatewayToAccessLogPolicies(ctx context.Context, gw *gateway_api.Gateway) []*anv1alpha1.AccessLogPolicy {
policies, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(gw), &anv1alpha1.AccessLogPolicy{})
return policies
}

func (r *resourceMapper) HTTPRouteToIAMAuthPolicies(ctx context.Context, route *gateway_api.HTTPRoute) []*anv1alpha1.IAMAuthPolicy {
policy, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(route), &anv1alpha1.IAMAuthPolicy{})
return policy
}

func (r *resourceMapper) HTTPRouteToAccessLogPolicies(ctx context.Context, route *gateway_api.HTTPRoute) []*anv1alpha1.AccessLogPolicy {
policies, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(route), &anv1alpha1.AccessLogPolicy{})
return policies
}

func (r *resourceMapper) GRPCRouteToIAMAuthPolicies(ctx context.Context, route *gateway_api_v1alpha2.GRPCRoute) []*anv1alpha1.IAMAuthPolicy {
policies, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(route), &anv1alpha1.IAMAuthPolicy{})
return policies
}

func (r *resourceMapper) GRPCRouteToAccessLogPolicies(ctx context.Context, route *gateway_api_v1alpha2.GRPCRoute) []*anv1alpha1.AccessLogPolicy {
policies, _ := gateway.GetAttachedPolicies(ctx, r.client, k8s.NamespacedName(route), &anv1alpha1.AccessLogPolicy{})
return policies
}

func policyToTargetRefObj[T client.Object](r *resourceMapper, ctx context.Context, policy core.Policy, retObj T) T {
null := *new(T)
if policy == nil {
Expand Down
110 changes: 110 additions & 0 deletions controllers/iamauthpolicy_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2021.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
pkg_builder "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwvv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/aws/aws-application-networking-k8s/controllers/eventhandlers"
anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/aws"
"github.com/aws/aws-application-networking-k8s/pkg/deploy"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/latticestore"
lattice_runtime "github.com/aws/aws-application-networking-k8s/pkg/runtime"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
)

const (
authPolicyFinalizer = "iamauthpolicy.k8s.aws/resources"
)

type authPolicyReconciler struct {
log gwlog.Logger
client client.Client
scheme *runtime.Scheme
finalizerManager k8s.FinalizerManager
eventRecorder record.EventRecorder
cloud aws.Cloud
dataStore *latticestore.LatticeDataStore
stackMarshaller deploy.StackMarshaller
}

func RegisterIAMAuthPolicyController(
log gwlog.Logger,
cloud aws.Cloud,
dataStore *latticestore.LatticeDataStore,
finalizerManager k8s.FinalizerManager,
mgr ctrl.Manager,
) error {
k8sClient := mgr.GetClient()
scheme := mgr.GetScheme()
evtRec := mgr.GetEventRecorderFor("iamauthpolicy")

stackMarshaller := deploy.NewDefaultStackMarshaller()

r := &authPolicyReconciler{
log: log,
client: k8sClient,
scheme: scheme,
finalizerManager: finalizerManager,
eventRecorder: evtRec,
cloud: cloud,
stackMarshaller: stackMarshaller,
dataStore: dataStore,
}

gatewayEventHandler := eventhandlers.NewGatewayEventHandler(log, k8sClient)
httpRouteEventHandler := eventhandlers.NewHTTPRouteEventHandler(log, k8sClient)
grpcRouteEventHandler := eventhandlers.NewGRPCRouteEventHandler(log, k8sClient)

builder := ctrl.NewControllerManagedBy(mgr).
For(&anv1alpha1.IAMAuthPolicy{}, pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwvv1beta1.Gateway{}}, gatewayEventHandler.MapToIAMAuthPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwvv1beta1.HTTPRoute{}}, httpRouteEventHandler.MapToIAMAuthPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &gwv1alpha2.GRPCRoute{}}, grpcRouteEventHandler.MapToIAMAuthPolicies(), pkg_builder.WithPredicates(predicate.GenerationChangedPredicate{}))
return builder.Complete(r)
}

func (r *authPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.log.Infow("reconcile", "name", req.Name)
recErr := r.reconcile(ctx, req)
res, retryErr := lattice_runtime.HandleReconcileError(recErr)
if res.RequeueAfter != 0 {
r.log.Infow("requeue request", "name", req.Name, "requeueAfter", res.RequeueAfter)
} else if res.Requeue {
r.log.Infow("requeue request", "name", req.Name)
} else if retryErr == nil {
r.log.Infow("reconciled", "name", req.Name)
}
return res, retryErr
}

func (r *authPolicyReconciler) reconcile(ctx context.Context, req ctrl.Request) error {
//TODO: implement reconcile
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func (p *AccessLogPolicy) GetNamespacedName() types.NamespacedName {

func (pl *AccessLogPolicyList) GetItems() []core.Policy {
items := make([]core.Policy, len(pl.Items))
for i, item := range pl.Items {
items[i] = &item
for i := range pl.Items {
items[i] = &pl.Items[i]
}
return items
}
Loading

0 comments on commit d6cd855

Please sign in to comment.