Skip to content

Commit

Permalink
Merge pull request Kuadrant#621 from sergioifg94/Kuadrantgh-430
Browse files Browse the repository at this point in the history
Watch policies configured by Gateway params
  • Loading branch information
openshift-ci[bot] authored Oct 12, 2023
2 parents 600759e + 64b5a1a commit 5194972
Show file tree
Hide file tree
Showing 15 changed files with 676 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
annotations:
alm-examples: '[]'
capabilities: Basic Install
createdAt: "2023-09-21T16:23:26Z"
createdAt: "2023-10-10T15:03:19Z"
operators.operatorframework.io/builder: operator-sdk-v1.28.0
operators.operatorframework.io/project_layout: go.kubebuilder.io/v3
name: multicluster-gateway-controller.v0.0.0
Expand Down Expand Up @@ -292,6 +292,15 @@ spec:
- get
- patch
- update
- apiGroups:
- kuadrant.io
resources:
- authpolicies
- ratelimitpolicies
verbs:
- get
- list
- watch
- apiGroups:
- kuadrant.io
resources:
Expand Down
29 changes: 26 additions & 3 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ import (
clusterv1beta2 "open-cluster-management.io/api/cluster/v1beta1"
workv1 "open-cluster-management.io/api/work/v1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes/scheme"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -48,6 +53,7 @@ import (
"github.com/Kuadrant/multicluster-gateway-controller/pkg/dns/dnsprovider"
"github.com/Kuadrant/multicluster-gateway-controller/pkg/health"
"github.com/Kuadrant/multicluster-gateway-controller/pkg/placement"
"github.com/Kuadrant/multicluster-gateway-controller/pkg/policysync"
//+kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -175,10 +181,27 @@ func main() {
os.Exit(1)
}

dynamicClient := dynamic.NewForConfigOrDie(mgr.GetConfig())
dynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
dynamicClient,
0,
corev1.NamespaceAll,
nil,
)

policyInformersManager := policysync.NewPolicyInformersManager(dynamicInformerFactory)
if err := policyInformersManager.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to start policy informers manager")
os.Exit(1)
}

if err = (&gateway.GatewayReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Placement: placer,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Placement: placer,
PolicyInformersManager: policyInformersManager,
DynamicClient: dynamicClient,
WatchedPolicies: map[schema.GroupVersionResource]cache.ResourceEventHandlerRegistration{},
}).SetupWithManager(mgr, ctx); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Gateway")
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,9 @@ metadata:
name: gateway-params
namespace: multi-cluster-gateways
data:
downstreamClass: "istio"
params: |
{
"policiesToSync": [
{ "group": "kuadrant.io", "version": "v1beta1", "resource": "authpolicies" }
]
}
9 changes: 9 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ rules:
- get
- patch
- update
- apiGroups:
- kuadrant.io
resources:
- authpolicies
- ratelimitpolicies
verbs:
- get
- list
- watch
- apiGroups:
- kuadrant.io
resources:
Expand Down
7 changes: 7 additions & 0 deletions pkg/_internal/slice/predicates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package slice

func EqualsTo[T comparable](x T) func(T) bool {
return func(y T) bool {
return x == y
}
}
10 changes: 10 additions & 0 deletions pkg/_internal/slice/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ func Filter[T any](slice []T, predicate func(T) bool) []T {
return result
}

func Map[T, R any](slice []T, f func(T) R) []R {
result := make([]R, len(slice))

for i, elem := range slice {
result[i] = f(elem)
}

return result
}

func MapErr[T, R any](slice []T, f func(T) (R, error)) ([]R, error) {
result := make([]R, len(slice))

Expand Down
70 changes: 67 additions & 3 deletions pkg/controllers/gateway/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -51,6 +53,7 @@ import (
"github.com/Kuadrant/multicluster-gateway-controller/pkg/_internal/slice"
"github.com/Kuadrant/multicluster-gateway-controller/pkg/apis/v1alpha1"
"github.com/Kuadrant/multicluster-gateway-controller/pkg/dns"
"github.com/Kuadrant/multicluster-gateway-controller/pkg/policysync"
)

const (
Expand Down Expand Up @@ -86,11 +89,16 @@ type GatewayPlacer interface {
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;delete
// +kubebuilder:rbac:groups="cert-manager.io",resources=certificates,verbs=get;list;watch;create;update;patch;delete

// +kubebuilder:rbac:groups="kuadrant.io",resources=authpolicies;ratelimitpolicies,verbs=get;list;watch

// GatewayReconciler reconciles a Gateway object
type GatewayReconciler struct {
client.Client
Scheme *runtime.Scheme
Placement GatewayPlacer
Scheme *runtime.Scheme
Placement GatewayPlacer
PolicyInformersManager *policysync.PolicyInformersManager
DynamicClient dynamic.Interface
WatchedPolicies map[schema.GroupVersionResource]cache.ResourceEventHandlerRegistration
}

func isDeleting(g *gatewayv1beta1.Gateway) bool {
Expand Down Expand Up @@ -367,14 +375,70 @@ func (r *GatewayReconciler) getTLSSecrets(ctx context.Context, upstreamGateway *
return tlsSecrets, listenerTLSErr
}

func (r *GatewayReconciler) reconcileParams(_ context.Context, gateway *gatewayv1beta1.Gateway, params *Params) error {
func (r *GatewayReconciler) reconcileParams(ctx context.Context, gateway *gatewayv1beta1.Gateway, params *Params) error {
log := crlog.FromContext(ctx)

downstreamClass := params.GetDownstreamClass()

// Set the annotations to sync the class name from the parameters

gateway.Spec.GatewayClassName = gatewayv1beta1.ObjectName(downstreamClass)

policiesToSync := slice.Map(params.PoliciesToSync, ParamsGroupVersionResource.ToGroupVersionResource)

for _, gvr := range policiesToSync {
// If it's already watched skip it
_, ok := r.WatchedPolicies[gvr]
if ok {
continue
}

log.Info("Creating event handler for policy", "gvr", gvr)

// Add the event handler for the policy
eventHandler := &policysync.ResourceEventHandler{
Log: log,
GVR: gvr,
Client: r.Client,
DynamicClient: r.DynamicClient,
Gateway: gateway,
Syncer: &policysync.FakeSyncer{},
}
informer := r.PolicyInformersManager.InformerFactory.ForResource(gvr).Informer()
reg, err := informer.AddEventHandler(eventHandler)
if err != nil {
return err
}

// Start the informer
if err := r.PolicyInformersManager.AddInformer(informer); err != nil {
return err
}

// Keep track of the watched policy
r.WatchedPolicies[gvr] = reg
}

// Stop watching policies if they're removed from the params
policiesToUnwatch := []schema.GroupVersionResource{}
for gvr, reg := range r.WatchedPolicies {
if slice.Contains(policiesToSync, slice.EqualsTo(gvr)) {
continue
}

log.Info("Stopping watch for policy", "gvr", gvr)

if err := r.PolicyInformersManager.InformerFactory.ForResource(gvr).Informer().RemoveEventHandler(reg); err != nil {
return err
}

policiesToUnwatch = append(policiesToUnwatch, gvr)
}

for _, gvr := range policiesToUnwatch {
delete(r.WatchedPolicies, gvr)
}

return nil
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/controllers/gateway/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@ type Params struct {
// DownstreamClass specifies what GatewayClassName to set in the
// downstream clusters. For example:
DownstreamClass string `json:"downstreamClass,omitempty"`

// PoliciesToSync specifies a listof Policy GVRs that will be watched
// in the hub and synced to the spokes
PoliciesToSync []ParamsGroupVersionResource `json:"policiesToSync,omitempty"`
}

type ParamsGroupVersionResource struct {
Group string `json:"group"`
Version string `json:"version"`
Resource string `json:"resource"`
}

func (gvr ParamsGroupVersionResource) ToGroupVersionResource() schema.GroupVersionResource {
return schema.GroupVersionResource{
Group: gvr.Group,
Version: gvr.Version,
Resource: gvr.Resource,
}
}

func (p *Params) GetDownstreamClass() string {
Expand Down
82 changes: 82 additions & 0 deletions pkg/policysync/eventhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package policysync

import (
"context"
"fmt"

"github.com/go-logr/logr"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

type ResourceEventHandler struct {
Log logr.Logger
GVR schema.GroupVersionResource
Client client.Client
DynamicClient dynamic.Interface
Gateway *gatewayv1beta1.Gateway

Syncer Syncer
}

var _ cache.ResourceEventHandler = &ResourceEventHandler{}

func (h *ResourceEventHandler) OnAdd(reqObj interface{}) {
h.Log.Info("Got watch event for policy", "obj", reqObj)

ctx := context.Background()

obj, ok := reqObj.(client.Object)
if !ok {
h.Log.Error(fmt.Errorf("object %v does not inplement client.Object", reqObj), "")
return
}

if err := h.Client.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
h.Log.Error(err, "failed to get object", "object", obj)
}

policy, err := NewPolicyFor(obj)
if err != nil {
h.Log.Error(err, "failed to build policy from watched object", "object", obj)
return
}

if err := h.Syncer.SyncPolicy(ctx, h.Client, policy); err != nil {
h.Log.Error(err, "failed to sync policy", "policy", policy)
}
}

func (h *ResourceEventHandler) OnDelete(obj interface{}) {
h.Log.Info("Got watch event for policy", "obj", obj)
}

func (h *ResourceEventHandler) OnUpdate(_ interface{}, reqObj interface{}) {
h.Log.Info("Got watch event for policy", "obj", reqObj)

ctx := context.Background()

obj, ok := reqObj.(client.Object)
if !ok {
h.Log.Error(fmt.Errorf("object %v does not inplement client.Object", reqObj), "")
return
}

if err := h.Client.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
h.Log.Error(err, "failed to get object", "object", obj)
}

policy, err := NewPolicyFor(obj)
if err != nil {
h.Log.Error(err, "failed to build policy from watched object", "object", obj)
return
}

if err := h.Syncer.SyncPolicy(ctx, h.Client, policy); err != nil {
h.Log.Error(err, "failed to sync policy", "policy", policy)
}
}
Loading

0 comments on commit 5194972

Please sign in to comment.