diff --git a/Dockerfile b/Dockerfile index cefa1ca8..397edb5a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM golang:1.19 as builder +FROM golang:1.20 as builder LABEL org.opencontainers.image.source=https://github.com/kong/blixt LABEL org.opencontainers.image.description="An experimental layer 4 load-balancer built using eBPF/XDP with ebpf-go \ diff --git a/config/extras/dataplane.yaml b/config/extras/dataplane.yaml new file mode 100644 index 00000000..faa0266b --- /dev/null +++ b/config/extras/dataplane.yaml @@ -0,0 +1,30 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: extra-dataplane + namespace: system + labels: + app: blixt + component: dataplane +spec: + selector: + matchLabels: + app: blixt + component: dataplane + template: + metadata: + labels: + app: blixt + component: dataplane + spec: + hostNetwork: true + containers: + - name: dataplane + image: ghcr.io/kong/blixt-dataplane:latest + securityContext: + privileged: true + args: ["-i", "eth0"] + env: + - name: RUST_LOG + value: debug + imagePullPolicy: IfNotPresent diff --git a/controllers/dataplane_controller.go b/controllers/dataplane_controller.go new file mode 100644 index 00000000..b30eda30 --- /dev/null +++ b/controllers/dataplane_controller.go @@ -0,0 +1,154 @@ +package controllers + +import ( + "context" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + dataplane "github.com/kong/blixt/internal/dataplane/client" + "github.com/kong/blixt/pkg/vars" +) + 
+//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/finalizers,verbs=update + +//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=core,resources=services/status,verbs=get + +//+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=core,resources=endpoints/status,verbs=get + +// DataplaneReconciler reconciles the dataplane pods. +type DataplaneReconciler struct { + client.Client + Scheme *runtime.Scheme + + BackendsClientManager *dataplane.BackendsClientManager + + updates chan event.GenericEvent +} + +func NewDataplaneReconciler(client client.Client, schema *runtime.Scheme, manager *dataplane.BackendsClientManager) *DataplaneReconciler { + return &DataplaneReconciler{ + Client: client, + Scheme: schema, + BackendsClientManager: manager, + updates: make(chan event.GenericEvent, 1000), + } +} + +var ( + podOwnerKey = ".metadata.controller" + apiGVStr = appsv1.SchemeGroupVersion.String() +) + +// SetupWithManager loads the controller into the provided controller manager. +func (r *DataplaneReconciler) SetupWithManager(mgr ctrl.Manager) error { + + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, podOwnerKey, func(rawObj client.Object) []string { + // grab the pod object, extract the owner... + pod := rawObj.(*corev1.Pod) + owner := metav1.GetControllerOf(pod) + if owner == nil { + return nil + } + // ...make sure it's a DaemonSet... + if owner.APIVersion != apiGVStr || owner.Kind != "DaemonSet" { + return nil + } + + // ...and if so, return it + return []string{owner.Name} + }); err != nil { + return err + } + + return ctrl.NewControllerManagedBy(mgr). 
+ For(&appsv1.DaemonSet{}, + builder.WithPredicates(predicate.NewPredicateFuncs(r.daemonsetHasMatchingAnnotations)), + ). + Complete(r) +} + +func (r *DataplaneReconciler) daemonsetHasMatchingAnnotations(obj client.Object) bool { + daemonset, ok := obj.(*appsv1.DaemonSet) + if !ok { + return false + } + + // determine if this is a blixt daemonset + matchLabels := daemonset.Spec.Selector.MatchLabels + app, ok := matchLabels["app"] + if !ok || app != vars.DefaultDataPlaneAppLabel { + return false + } + + // verify that it's the dataplane daemonset + component, ok := matchLabels["component"] + if !ok || component != vars.DefaultDataPlaneComponentLabel { + return false + } + + return true +} + +// Reconcile provisions (and de-provisions) resources relevant to this controller. +func (r *DataplaneReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + ds := new(appsv1.DaemonSet) + if err := r.Client.Get(ctx, req.NamespacedName, ds); err != nil { + if errors.IsNotFound(err) { + logger.Info("DataplaneReconciler", "reconcile status", "object enqueued no longer exists, skipping") + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + var childPods corev1.PodList + if err := r.List(ctx, &childPods, client.InNamespace(req.Namespace), client.MatchingFields{podOwnerKey: req.Name}); err != nil { + logger.Error(err, "DataplaneReconciler", "reconcile status", "unable to list child pods") + return ctrl.Result{}, err + } + + readyPodByNN := make(map[types.NamespacedName]corev1.Pod) + for _, pod := range childPods.Items { + for _, container := range pod.Status.ContainerStatuses { + if container.Name == vars.DefaultDataPlaneComponentLabel && container.Ready { + key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + readyPodByNN[key] = pod + } + } + } + + logger.Info("DataplaneReconciler", "reconcile status", "setting updated backends client list", "num ready pods", len(readyPodByNN)) + 
updated, err := r.BackendsClientManager.SetClientsList(ctx, readyPodByNN) + if updated { + logger.Info("DataplaneReconciler", "reconcile status", "backends client list updated, sending generic event") + r.updates <- event.GenericEvent{Object: ds} + logger.Info("DataplaneReconciler", "reconcile status", "generic event sent") + } + if err != nil { + logger.Error(err, "DataplaneReconciler", "reconcile status", "partial failure for backends client list update") + return ctrl.Result{Requeue: true}, err + } + + logger.Info("DataplaneReconciler", "reconcile status", "done") + return ctrl.Result{}, nil +} + +func (r *DataplaneReconciler) GetUpdates() <-chan event.GenericEvent { + return r.updates +} diff --git a/controllers/tcproute_controller.go b/controllers/tcproute_controller.go index 6f015ece..1a0d4c26 100644 --- a/controllers/tcproute_controller.go +++ b/controllers/tcproute_controller.go @@ -6,15 +6,16 @@ import ( "time" "github.com/go-logr/logr" - appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/source" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" @@ -35,7 +36,9 @@ type TCPRouteReconciler struct { client.Client Scheme *runtime.Scheme - log logr.Logger + log logr.Logger + ReconcileRequestChan <-chan event.GenericEvent + BackendsClientManager *dataplane.BackendsClientManager } // SetupWithManager sets up the controller with the Manager. @@ -44,8 +47,8 @@ func (r *TCPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&gatewayv1alpha2.TCPRoute{}). 
- Watches( - &appsv1.DaemonSet{}, + WatchesRawSource( + &source.Channel{Source: r.ReconcileRequestChan}, handler.EnqueueRequestsFromMapFunc(r.mapDataPlaneDaemonsetToTCPRoutes), ). Watches( @@ -185,18 +188,11 @@ func (r *TCPRouteReconciler) ensureTCPRouteConfiguredInDataPlane(ctx context.Con return err } - // TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46 - dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client) - if err != nil { - return err - } - - confirmation, err := dataplaneClient.Update(context.Background(), targets) - if err != nil { + if _, err = r.BackendsClientManager.Update(ctx, targets); err != nil { return err } - r.log.Info(fmt.Sprintf("successful data-plane UPDATE, confirmation: %s", confirmation.String())) + r.log.Info("successful data-plane UPDATE") return nil } @@ -208,19 +204,12 @@ func (r *TCPRouteReconciler) ensureTCPRouteDeletedInDataPlane(ctx context.Contex return err } - // TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46 - dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client) - if err != nil { - return err - } - // delete the target from the dataplane - confirmation, err := dataplaneClient.Delete(context.Background(), targets.Vip) - if err != nil { + if _, err = r.BackendsClientManager.Delete(ctx, targets.Vip); err != nil { return err } - r.log.Info(fmt.Sprintf("successful data-plane DELETE, confirmation: %s", confirmation.String())) + r.log.Info("successful data-plane DELETE") oldFinalizers := tcproute.GetFinalizers() newFinalizers := make([]string, 0, len(oldFinalizers)-1) diff --git a/controllers/udproute_controller.go b/controllers/udproute_controller.go index f2f7b02e..c6dde3c3 100644 --- a/controllers/udproute_controller.go +++ b/controllers/udproute_controller.go @@ -6,15 +6,16 @@ import ( "time" "github.com/go-logr/logr" - appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/errors" 
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/source" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" @@ -35,7 +36,9 @@ type UDPRouteReconciler struct { client.Client Scheme *runtime.Scheme - log logr.Logger + log logr.Logger + ReconcileRequestChan <-chan event.GenericEvent + BackendsClientManager *dataplane.BackendsClientManager } // SetupWithManager sets up the controller with the Manager. @@ -44,8 +47,8 @@ func (r *UDPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&gatewayv1alpha2.UDPRoute{}). - Watches( - &appsv1.DaemonSet{}, + WatchesRawSource( + &source.Channel{Source: r.ReconcileRequestChan}, handler.EnqueueRequestsFromMapFunc(r.mapDataPlaneDaemonsetToUDPRoutes), ). 
Watches( @@ -185,18 +188,11 @@ func (r *UDPRouteReconciler) ensureUDPRouteConfiguredInDataPlane(ctx context.Con return err } - // TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46 - dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client) - if err != nil { - return err - } - - confirmation, err := dataplaneClient.Update(context.Background(), targets) - if err != nil { + if _, err = r.BackendsClientManager.Update(ctx, targets); err != nil { return err } - r.log.Info(fmt.Sprintf("successful data-plane UPDATE, confirmation: %s", confirmation.String())) + r.log.Info("successful data-plane UPDATE") return nil } @@ -208,19 +204,12 @@ func (r *UDPRouteReconciler) ensureUDPRouteDeletedInDataPlane(ctx context.Contex return err } - // TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46 - dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client) - if err != nil { - return err - } - // delete the target from the dataplane - confirmation, err := dataplaneClient.Delete(context.Background(), targets.Vip) - if err != nil { + if _, err = r.BackendsClientManager.Delete(ctx, targets.Vip); err != nil { return err } - r.log.Info(fmt.Sprintf("successful data-plane DELETE, confirmation: %s", confirmation.String())) + r.log.Info("successful data-plane DELETE") oldFinalizers := udproute.GetFinalizers() newFinalizers := make([]string, 0, len(oldFinalizers)-1) diff --git a/controllers/udproute_controller_watch_utils.go b/controllers/udproute_controller_watch_utils.go index f42a77a7..9c8fc53a 100644 --- a/controllers/udproute_controller_watch_utils.go +++ b/controllers/udproute_controller_watch_utils.go @@ -5,13 +5,14 @@ import ( "fmt" "reflect" - "github.com/kong/blixt/pkg/vars" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" gatewayv1alpha2 
"sigs.k8s.io/gateway-api/apis/v1alpha2" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/kong/blixt/pkg/vars" ) // mapDataPlaneDaemonsetToUDPRoutes is a mapping function to map dataplane diff --git a/go.mod b/go.mod index 583f34bc..65c1651e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/kong/blixt -go 1.19 +go 1.20 require ( github.com/go-logr/logr v1.2.4 diff --git a/internal/dataplane/client/client.go b/internal/dataplane/client/client.go index c7986085..1e87e294 100644 --- a/internal/dataplane/client/client.go +++ b/internal/dataplane/client/client.go @@ -2,64 +2,212 @@ package client import ( "context" + "errors" "fmt" + "sync" + "github.com/go-logr/logr" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/log" "github.com/kong/blixt/pkg/vars" ) -// NewDataPlaneClient provides a new client for communicating with the grpc API -// of the data-plane given a function which can provide the API endpoint. -func NewDataPlaneClient(ctx context.Context, c client.Client) (BackendsClient, error) { - endpoints, err := GetDataPlaneEndpointsFromDefaultPods(ctx, c) +// clientInfo encapsulates the gathered information about a BackendsClient +// along with the gRPC client connection. +type clientInfo struct { + conn *grpc.ClientConn + client BackendsClient + name string +} + +// BackendsClientManager is managing the connections and interactions with +// the available BackendsClient servers. +type BackendsClientManager struct { + log logr.Logger + clientset *kubernetes.Clientset + + mu sync.RWMutex + clients map[types.NamespacedName]clientInfo +} + +// NewBackendsClientManager returns an initialized instance of BackendsClientManager. 
+func NewBackendsClientManager(config *rest.Config) (*BackendsClientManager, error) { + clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, err } - if len(endpoints) < 1 { - return nil, fmt.Errorf("no endpoints could be found for the dataplane API") + return &BackendsClientManager{ + log: log.FromContext(context.Background()), + clientset: clientset, + mu: sync.RWMutex{}, + clients: map[types.NamespacedName]clientInfo{}, + }, nil +} + +func (c *BackendsClientManager) SetClientsList(ctx context.Context, readyPods map[types.NamespacedName]corev1.Pod) (bool, error) { + // TODO: close and connect to the different clients concurrently. + + clientListUpdated := false + var err error + + // Remove old clients + for nn, backendInfo := range c.clients { + if _, ok := readyPods[nn]; !ok { + c.mu.Lock() + delete(c.clients, nn) + c.mu.Unlock() + + if closeErr := backendInfo.conn.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + continue + } + clientListUpdated = true + } } - if len(endpoints) > 1 { - return nil, fmt.Errorf("TODO: multiple endpoints not currently supported") + // Add new clients + for _, pod := range readyPods { + key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + if _, ok := c.clients[key]; !ok { + + if pod.Status.PodIP == "" { + continue + } + + endpoint := fmt.Sprintf("%s:%d", pod.Status.PodIP, vars.DefaultDataPlaneAPIPort) + c.log.Info("BackendsClientManager", "status", "connecting", "pod", pod.GetName(), "endpoint", endpoint) + + conn, dialErr := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + if dialErr != nil { + c.log.Error(dialErr, "BackendsClientManager", "status", "connection failure", "pod", pod.GetName()) + err = errors.Join(err, dialErr) + continue + } + + c.mu.Lock() + c.clients[key] = clientInfo{ + conn: conn, + client: NewBackendsClient(conn), + name: pod.Name, + } + c.mu.Unlock() + + c.log.Info("BackendsClientManager", 
"status", "connected", "pod", pod.GetName()) + + clientListUpdated = true + } } - endpoint := endpoints[0] - // TODO: mTLS https://github.com/Kong/blixt/issues/50 - conn, err := grpc.Dial(endpoint, grpc.WithInsecure(), grpc.WithBlock()) //nolint:staticcheck - if err != nil { - return nil, err + return clientListUpdated, err +} + +func (c *BackendsClientManager) Close() { + c.log.Info("BackendsClientManager", "status", "shutting down") + + c.mu.Lock() + defer c.mu.Unlock() + + var wg sync.WaitGroup + wg.Add(len(c.clients)) + + for key, cc := range c.clients { + go func(cc clientInfo) { + defer wg.Done() + cc.conn.Close() + }(cc) + + delete(c.clients, key) } - client := NewBackendsClient(conn) + wg.Wait() - return client, nil + c.log.Info("BackendsClientManager", "status", "shutdown completed") } -// GetDataPlaneEndpointsFromDefaultPods provides a list of endpoints for the -// dataplane API assuming all the default deployment settings (e.g., namespace, -// API port, e.t.c.). -func GetDataPlaneEndpointsFromDefaultPods(ctx context.Context, c client.Client) (endpoints []string, err error) { - pods := new(corev1.PodList) - if err = c.List(context.Background(), pods, client.MatchingLabels{ - "app": vars.DefaultDataPlaneAppLabel, - "component": vars.DefaultDataPlaneComponentLabel, - }, client.InNamespace(vars.DefaultNamespace)); err != nil { - return +func (c *BackendsClientManager) getClientsInfo() []clientInfo { + c.mu.RLock() + defer c.mu.RUnlock() + + backends := make([]clientInfo, 0, len(c.clients)) + for _, backendClient := range c.clients { + backends = append(backends, backendClient) } - for _, pod := range pods.Items { - if pod.Status.PodIP == "" { - err = fmt.Errorf("pod %s/%s doesn't have an IP yet", pod.Namespace, pod.Name) - return - } + return backends +} + +// Update sends an update request to all available BackendsClient servers concurrently. 
+func (c *BackendsClientManager) Update(ctx context.Context, in *Targets, opts ...grpc.CallOption) (*Confirmation, error) {
+	clientsInfo := c.getClientsInfo()
+
+	var wg sync.WaitGroup
+	wg.Add(len(clientsInfo))
+
+	errs := make(chan error, len(clientsInfo))
+
+	for _, ci := range clientsInfo {
+		go func(ci clientInfo) {
+			defer wg.Done()
+
+			conf, err := ci.client.Update(ctx, in, opts...)
+			if err != nil {
+				c.log.Error(err, "BackendsClientManager", "operation", "update", "pod", ci.name)
+				errs <- err
+				return
+			}
+			c.log.Info("BackendsClientManager", "operation", "update", "pod", ci.name, "confirmation", conf.Confirmation)
+		}(ci)
+	}
+
+	wg.Wait()
+	close(errs)
+
+	var err error
+	for e := range errs {
+		err = errors.Join(err, e)
+	}
+
+	return nil, err
+}
+
+// Delete sends a delete request to all available BackendsClient servers concurrently.
+func (c *BackendsClientManager) Delete(ctx context.Context, in *Vip, opts ...grpc.CallOption) (*Confirmation, error) {
+	clientsInfo := c.getClientsInfo()
+
+	var wg sync.WaitGroup
+	wg.Add(len(clientsInfo))
+
+	errs := make(chan error, len(clientsInfo))
+
+	for _, ci := range clientsInfo {
+		go func(ci clientInfo) {
+			defer wg.Done()
+
+			conf, err := ci.client.Delete(ctx, in, opts...)
+ if err != nil { + c.log.Error(err, "BackendsClientManager", "operation", "delete", "pod", ci.name) + errs <- err + return + } + c.log.Info("BackendsClientManager", "operation", "delete", "pod", ci.name, "confirmation", conf.Confirmation) + + }(ci) + } + + wg.Wait() + close(errs) - newEndpoint := fmt.Sprintf("%s:%d", pod.Status.PodIP, vars.DefaultDataPlaneAPIPort) - endpoints = append(endpoints, newEndpoint) + var err error + for e := range errs { + err = errors.Join(err, e) } - return + return nil, err } diff --git a/main.go b/main.go index c8cbb6a9..2175bc83 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "os" @@ -18,6 +19,7 @@ import ( gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" "github.com/kong/blixt/controllers" + "github.com/kong/blixt/internal/dataplane/client" //+kubebuilder:scaffold:imports ) @@ -51,7 +53,8 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + cfg := ctrl.GetConfigOrDie() + mgr, err := ctrl.NewManager(cfg, ctrl.Options{ Scheme: scheme, MetricsBindAddress: metricsAddr, Port: 9443, @@ -75,6 +78,21 @@ func main() { os.Exit(1) } + clientsManager, err := client.NewBackendsClientManager(cfg) + if err != nil { + setupLog.Error(err, "unable to create backends client manager") + } + defer clientsManager.Close() + + dataplaneReconciler := controllers.NewDataplaneReconciler(mgr.GetClient(), mgr.GetScheme(), clientsManager) + if err = dataplaneReconciler.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Dataplane") + os.Exit(1) + } + + ctx := ctrl.SetupSignalHandler() + udpReconcileRequestChan, tcpReconcileRequestChan := Tee(ctx, dataplaneReconciler.GetUpdates()) + if err = (&controllers.GatewayReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -90,15 +108,19 @@ func main() { os.Exit(1) } if err = (&controllers.UDPRouteReconciler{ - 
Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ReconcileRequestChan: udpReconcileRequestChan, + BackendsClientManager: clientsManager, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "UDPRoute") os.Exit(1) } if err = (&controllers.TCPRouteReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ReconcileRequestChan: tcpReconcileRequestChan, + BackendsClientManager: clientsManager, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "TCPRoute") os.Exit(1) @@ -115,8 +137,60 @@ func main() { } setupLog.Info("starting manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } } + +// Tee consumes the received channel and mirrors the messages into 2 new channels. +func Tee[T any](ctx context.Context, in <-chan T) (_, _ <-chan T) { + out1, out2 := make(chan T), make(chan T) + + OrDone := func(ctx context.Context, in <-chan T) <-chan T { + out := make(chan T) + go func() { + defer close(out) + + for { + select { + case <-ctx.Done(): + return + case i, ok := <-in: + if !ok { + return + } + select { + case out <- i: + case <-ctx.Done(): + } + } + } + }() + return out + } + + go func() { + defer close(out1) + defer close(out2) + + for val := range OrDone(ctx, in) { + select { + case <-ctx.Done(): + + case out1 <- val: + select { + case <-ctx.Done(): + case out2 <- val: + } + + case out2 <- val: + select { + case <-ctx.Done(): + case out1 <- val: + } + } + } + }() + return out1, out2 +}