Skip to content

Commit

Permalink
Add dataplane controller and multi gRPC clients manager
Browse files Browse the repository at this point in the history
  • Loading branch information
levikobi committed Aug 17, 2023
1 parent 73eca32 commit 8d65029
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 87 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.19 as builder
FROM golang:1.20 as builder

LABEL org.opencontainers.image.source=https://github.com/kong/blixt
LABEL org.opencontainers.image.description="An experimental layer 4 load-balancer built using eBPF/XDP with ebpf-go \
Expand Down
154 changes: 154 additions & 0 deletions controllers/dataplane_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package controllers

import (
"context"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

dataplane "github.com/kong/blixt/internal/dataplane/client"
"github.com/kong/blixt/pkg/vars"
)

//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways/finalizers,verbs=update

//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services/status,verbs=get

//+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=endpoints/status,verbs=get

// DataplaneReconciler reconciles the dataplane pods.
type DataplaneReconciler struct {
client.Client
Scheme *runtime.Scheme

BackendsClientManager *dataplane.BackendsClientManager

updates chan event.GenericEvent
}

func NewDataplaneReconciler(client client.Client, schema *runtime.Scheme, manager *dataplane.BackendsClientManager) *DataplaneReconciler {
return &DataplaneReconciler{
Client: client,
Scheme: schema,
BackendsClientManager: manager,
updates: make(chan event.GenericEvent, 1000),
}
}

var (
podOwnerKey = ".metadata.controller"
apiGVStr = appsv1.SchemeGroupVersion.String()
)

// SetupWithManager loads the controller into the provided controller manager.
func (r *DataplaneReconciler) SetupWithManager(mgr ctrl.Manager) error {

if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, podOwnerKey, func(rawObj client.Object) []string {
// grab the pod object, extract the owner...
pod := rawObj.(*corev1.Pod)
owner := metav1.GetControllerOf(pod)
if owner == nil {
return nil
}
// ...make sure it's a DaemonSet...
if owner.APIVersion != apiGVStr || owner.Kind != "DaemonSet" {
return nil
}

// ...and if so, return it
return []string{owner.Name}
}); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&appsv1.DaemonSet{},
builder.WithPredicates(predicate.NewPredicateFuncs(r.daemonsetHasMatchingAnnotations)),
).
Complete(r)
}

func (r *DataplaneReconciler) daemonsetHasMatchingAnnotations(obj client.Object) bool {
daemonset, ok := obj.(*appsv1.DaemonSet)
if !ok {
return false
}

// determine if this is a blixt daemonset
matchLabels := daemonset.Spec.Selector.MatchLabels
app, ok := matchLabels["app"]
if !ok || app != vars.DefaultDataPlaneAppLabel {
return false
}

// verify that it's the dataplane daemonset
component, ok := matchLabels["component"]
if !ok || component != vars.DefaultDataPlaneComponentLabel {
return false
}

return true
}

// Reconcile provisions (and de-provisions) resources relevant to this controller.
func (r *DataplaneReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)

ds := new(appsv1.DaemonSet)
if err := r.Client.Get(ctx, req.NamespacedName, ds); err != nil {
if errors.IsNotFound(err) {
logger.Info("DataplaneReconciler", "reconcile status", "object enqueued no longer exists, skipping")
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

var childPods corev1.PodList
if err := r.List(ctx, &childPods, client.InNamespace(req.Namespace), client.MatchingFields{podOwnerKey: req.Name}); err != nil {
logger.Error(err, "DataplaneReconciler", "reconcile status", "unable to list child pods")
return ctrl.Result{}, err
}

readyPodByNN := make(map[types.NamespacedName]corev1.Pod)
for _, pod := range childPods.Items {
for _, container := range pod.Status.ContainerStatuses {
if container.Name == vars.DefaultDataPlaneComponentLabel && container.Ready {
key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
readyPodByNN[key] = pod
}
}
}

logger.Info("DataplaneReconciler", "reconcile status", "setting updated backends client list", "num ready pods", len(readyPodByNN))
updated, err := r.BackendsClientManager.SetClientsList(ctx, readyPodByNN)
if updated {
logger.Info("DataplaneReconciler", "reconcile status", "backends client list updated, sending generic event")
r.updates <- event.GenericEvent{Object: ds}
logger.Info("DataplaneReconciler", "reconcile status", "generic event sent")
}
if err != nil {
logger.Error(err, "DataplaneReconciler", "reconcile status", "partial failure for backends client list update")
return ctrl.Result{Requeue: true}, err
}

logger.Info("DataplaneReconciler", "reconcile status", "done")
return ctrl.Result{}, nil
}

func (r *DataplaneReconciler) GetUpdates() <-chan event.GenericEvent {
return r.updates
}
33 changes: 11 additions & 22 deletions controllers/tcproute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

Expand All @@ -35,7 +36,9 @@ type TCPRouteReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
log logr.Logger
ReconcileRequestChan <-chan event.GenericEvent
BackendsClientManager *dataplane.BackendsClientManager
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -44,8 +47,8 @@ func (r *TCPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
For(&gatewayv1alpha2.TCPRoute{}).
Watches(
&appsv1.DaemonSet{},
WatchesRawSource(
&source.Channel{Source: r.ReconcileRequestChan},
handler.EnqueueRequestsFromMapFunc(r.mapDataPlaneDaemonsetToTCPRoutes),
).
Watches(
Expand Down Expand Up @@ -185,18 +188,11 @@ func (r *TCPRouteReconciler) ensureTCPRouteConfiguredInDataPlane(ctx context.Con
return err
}

// TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
return err
}

confirmation, err := dataplaneClient.Update(context.Background(), targets)
if err != nil {
if _, err = r.BackendsClientManager.Update(ctx, targets); err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane UPDATE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane UPDATE")

return nil
}
Expand All @@ -208,19 +204,12 @@ func (r *TCPRouteReconciler) ensureTCPRouteDeletedInDataPlane(ctx context.Contex
return err
}

// TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
return err
}

// delete the target from the dataplane
confirmation, err := dataplaneClient.Delete(context.Background(), targets.Vip)
if err != nil {
if _, err = r.BackendsClientManager.Delete(ctx, targets.Vip); err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane DELETE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane DELETE")

oldFinalizers := tcproute.GetFinalizers()
newFinalizers := make([]string, 0, len(oldFinalizers)-1)
Expand Down
33 changes: 11 additions & 22 deletions controllers/udproute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

Expand All @@ -35,7 +36,9 @@ type UDPRouteReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
log logr.Logger
ReconcileRequestChan <-chan event.GenericEvent
BackendsClientManager *dataplane.BackendsClientManager
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -44,8 +47,8 @@ func (r *UDPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
For(&gatewayv1alpha2.UDPRoute{}).
Watches(
&appsv1.DaemonSet{},
WatchesRawSource(
&source.Channel{Source: r.ReconcileRequestChan},
handler.EnqueueRequestsFromMapFunc(r.mapDataPlaneDaemonsetToUDPRoutes),
).
Watches(
Expand Down Expand Up @@ -185,18 +188,11 @@ func (r *UDPRouteReconciler) ensureUDPRouteConfiguredInDataPlane(ctx context.Con
return err
}

// TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
return err
}

confirmation, err := dataplaneClient.Update(context.Background(), targets)
if err != nil {
if _, err = r.BackendsClientManager.Update(ctx, targets); err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane UPDATE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane UPDATE")

return nil
}
Expand All @@ -208,19 +204,12 @@ func (r *UDPRouteReconciler) ensureUDPRouteDeletedInDataPlane(ctx context.Contex
return err
}

// TODO: add multiple endpoint support https://github.com/Kong/blixt/issues/46
dataplaneClient, err := dataplane.NewDataPlaneClient(context.Background(), r.Client)
if err != nil {
return err
}

// delete the target from the dataplane
confirmation, err := dataplaneClient.Delete(context.Background(), targets.Vip)
if err != nil {
if _, err = r.BackendsClientManager.Delete(ctx, targets.Vip); err != nil {
return err
}

r.log.Info(fmt.Sprintf("successful data-plane DELETE, confirmation: %s", confirmation.String()))
r.log.Info("successful data-plane DELETE")

oldFinalizers := udproute.GetFinalizers()
newFinalizers := make([]string, 0, len(oldFinalizers)-1)
Expand Down
3 changes: 2 additions & 1 deletion controllers/udproute_controller_watch_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"fmt"
"reflect"

"github.com/kong/blixt/pkg/vars"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/kong/blixt/pkg/vars"
)

// mapDataPlaneDaemonsetToUDPRoutes is a mapping function to map dataplane
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/kong/blixt

go 1.19
go 1.20

require (
github.com/go-logr/logr v1.2.4
Expand Down
Loading

0 comments on commit 8d65029

Please sign in to comment.