Skip to content

Commit

Permalink
Short Drift Detection
Browse files Browse the repository at this point in the history
  • Loading branch information
engedaam committed Apr 3, 2024
1 parent de0dcd4 commit 347747f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 9 deletions.
28 changes: 20 additions & 8 deletions pkg/controllers/nodeclaim/disruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
Expand All @@ -48,7 +49,8 @@ type nodeClaimReconciler interface {
// Controller is a disruption controller that adds StatusConditions to nodeclaims when they meet certain disruption conditions
// e.g. When the NodeClaim has surpassed its owning provisioner's expirationTTL, then it is marked as "Expired" in the StatusConditions
type Controller struct {
kubeClient client.Client
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider

drift *Drift
expiration *Expiration
Expand All @@ -58,10 +60,11 @@ type Controller struct {
// NewController constructs a nodeclaim disruption controller
func NewController(clk clock.Clock, kubeClient client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider) operatorcontroller.Controller {
return operatorcontroller.Typed[*v1beta1.NodeClaim](kubeClient, &Controller{
kubeClient: kubeClient,
drift: &Drift{cloudProvider: cloudProvider},
expiration: &Expiration{kubeClient: kubeClient, clock: clk},
emptiness: &Emptiness{kubeClient: kubeClient, cluster: cluster, clock: clk},
kubeClient: kubeClient,
cloudProvider: cloudProvider,
drift: &Drift{cloudProvider: cloudProvider},
expiration: &Expiration{kubeClient: kubeClient, clock: clk},
emptiness: &Emptiness{kubeClient: kubeClient, cluster: cluster, clock: clk},
})
}

Expand Down Expand Up @@ -114,7 +117,7 @@ func (c *Controller) Name() string {
}

func (c *Controller) Builder(_ context.Context, m manager.Manager) operatorcontroller.Builder {
return operatorcontroller.Adapt(controllerruntime.
builder := controllerruntime.
NewControllerManagedBy(m).
For(&v1beta1.NodeClaim{}).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
Expand All @@ -125,6 +128,15 @@ func (c *Controller) Builder(_ context.Context, m manager.Manager) operatorcontr
Watches(
&v1.Pod{},
nodeclaimutil.PodEventHandler(c.kubeClient),
),
)
)
for _, ncGVK := range c.cloudProvider.GetSupportedNodeClasses() {
nodeclass := &unstructured.Unstructured{}
nodeclass.SetGroupVersionKind(ncGVK)
builder.Watches(
nodeclass,
nodeclaimutil.NodeClassEventHandler(c.kubeClient),
)
}

return operatorcontroller.Adapt(builder)
}
9 changes: 9 additions & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,15 @@ func NewOperator() (context.Context, *Operator) {
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "status.providerID", func(o client.Object) []string {
return []string{o.(*v1beta1.NodeClaim).Status.ProviderID}
}), "failed to setup nodeclaim provider id indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "spec.nodeClassRef.apiVersion", func(o client.Object) []string {
return []string{o.(*v1beta1.NodeClaim).Spec.NodeClassRef.APIVersion}
}), "failed to setup nodeclaim nodeclassref apiversion indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "spec.nodeClassRef.kind", func(o client.Object) []string {
return []string{o.(*v1beta1.NodeClaim).Spec.NodeClassRef.Kind}
}), "failed to setup nodeclaim nodeclassref kind indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "spec.nodeClassRef.name", func(o client.Object) []string {
return []string{o.(*v1beta1.NodeClaim).Spec.NodeClassRef.Name}
}), "failed to setup nodeclaim nodeclassref name indexer")

lo.Must0(mgr.AddReadyzCheck("manager", func(req *http.Request) error {
return lo.Ternary(mgr.GetCache().WaitForCacheSync(req.Context()), nil, fmt.Errorf("failed to sync caches"))
Expand Down
22 changes: 21 additions & 1 deletion pkg/utils/nodeclaim/nodeclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NodeEventHandler(c client.Client) handler.EventHandler {
})
}

// NodePoolEventHandler is a watcher on v1beta1.NodeClaim that maps Provisioner to NodeClaims based
// NodePoolEventHandler is a watcher on v1beta1.NodeClaim that maps NodePool to NodeClaims based
// on the v1beta1.NodePoolLabelKey and enqueues reconcile.Requests for the NodeClaim
func NodePoolEventHandler(c client.Client) handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) (requests []reconcile.Request) {
Expand All @@ -89,6 +89,26 @@ func NodePoolEventHandler(c client.Client) handler.EventHandler {
})
}

// NodeClassEventHandler is a watcher on v1beta1.NodeClaim that maps NodeClass to NodeClaims based
// on the nodeClassRef and enqueues reconcile.Requests for the NodeClaim
func NodeClassEventHandler(c client.Client) handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) (requests []reconcile.Request) {
nodeClaimList := &v1beta1.NodeClaimList{}
if err := c.List(ctx, nodeClaimList, client.MatchingFields{
"spec.nodeClassRef.apiVersion": o.GetObjectKind().GroupVersionKind().GroupVersion().String(),
"spec.nodeClassRef.kind": o.GetObjectKind().GroupVersionKind().Kind,
"spec.nodeClassRef.name": o.GetName(),
}); err != nil {
return requests
}
return lo.Map(nodeClaimList.Items, func(n v1beta1.NodeClaim, _ int) reconcile.Request {
return reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(&n),
}
})
})
}

// NodeNotFoundError is an error returned when no v1.Nodes are found matching the passed providerID
type NodeNotFoundError struct {
ProviderID string
Expand Down

0 comments on commit 347747f

Please sign in to comment.