diff --git a/cmd/controller/main.go b/cmd/controller/main.go index cd024282da96..00a6509bbc7a 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -29,6 +29,7 @@ import ( "github.com/aws/karpenter/pkg/controllers/provisioning" "github.com/aws/karpenter/pkg/controllers/selection" "github.com/aws/karpenter/pkg/controllers/termination" + "github.com/aws/karpenter/pkg/controllers/volume" "github.com/aws/karpenter/pkg/utils/injection" "github.com/aws/karpenter/pkg/utils/options" "github.com/go-logr/zapr" @@ -91,6 +92,7 @@ func main() { if err := manager.RegisterControllers(ctx, provisioningController, selection.NewController(manager.GetClient(), provisioningController), + volume.NewController(manager.GetClient()), termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider), node.NewController(manager.GetClient()), metrics.NewController(manager.GetClient(), cloudProvider), diff --git a/pkg/controllers/manager.go b/pkg/controllers/manager.go index ffa0d1ad2a77..1040c227650c 100644 --- a/pkg/controllers/manager.go +++ b/pkg/controllers/manager.go @@ -36,7 +36,9 @@ func NewManagerOrDie(ctx context.Context, config *rest.Config, options controlle if err != nil { panic(fmt.Sprintf("Failed to create controller newManager, %s", err.Error())) } - if err := newManager.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.nodeName", podSchedulingIndex); err != nil { + if err := newManager.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.nodeName", func(o client.Object) []string { + return []string{o.(*v1.Pod).Spec.NodeName} + }); err != nil { panic(fmt.Sprintf("Failed to setup pod indexer, %s", err.Error())) } return &GenericControllerManager{Manager: newManager} @@ -57,11 +59,3 @@ func (m *GenericControllerManager) RegisterControllers(ctx context.Context, cont } return m } - -func podSchedulingIndex(object client.Object) []string { - pod, ok := object.(*v1.Pod) - if !ok { - return nil 
- } - return []string{pod.Spec.NodeName} -} diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 0e857b669f2b..4019616c8a7a 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -202,11 +202,11 @@ func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constrai return p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error { node.Labels = functional.UnionStringMaps(node.Labels, constraints.Labels) node.Spec.Taints = append(node.Spec.Taints, constraints.Taints...) - return p.create(ctx, node, <-pods) + return p.bind(ctx, node, <-pods) }) } -func (p *Provisioner) create(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) { +func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) { defer metrics.Measure(bindTimeHistogram.WithLabelValues(injection.GetNamespacedName(ctx).Name))() // Add the Karpenter finalizer to the node to enable the termination workflow @@ -237,7 +237,7 @@ func (p *Provisioner) create(ctx context.Context, node *v1.Node, pods []*v1.Pod) // Bind pods var bound int64 workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(i int) { - if err := p.bind(ctx, node, pods[i]); err != nil { + if err := p.coreV1Client.Pods(pods[i].Namespace).Bind(ctx, &v1.Binding{TypeMeta: pods[i].TypeMeta, ObjectMeta: pods[i].ObjectMeta, Target: v1.ObjectReference{Name: node.Name}}, metav1.CreateOptions{}); err != nil { logging.FromContext(ctx).Errorf("Failed to bind %s/%s to %s, %s", pods[i].Namespace, pods[i].Name, node.Name, err.Error()) } else { atomic.AddInt64(&bound, 1) @@ -247,26 +247,6 @@ func (p *Provisioner) create(ctx context.Context, node *v1.Node, pods []*v1.Pod) return nil } -func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pod *v1.Pod) error { - if err := p.coreV1Client.Pods(pod.Namespace).Bind(ctx, 
&v1.Binding{TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}}, metav1.CreateOptions{}); err != nil { - return fmt.Errorf("binding pod %s/%s, %w", pod.Name, pod.Namespace, err) - } - for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim == nil { - continue - } - pvc := &v1.PersistentVolumeClaim{} - if err := p.kubeClient.Get(ctx, types.NamespacedName{Name: volume.PersistentVolumeClaim.ClaimName, Namespace: pod.Namespace}, pvc); err != nil { - return fmt.Errorf("getting persistent volume claim %s, %w", volume.PersistentVolumeClaim.ClaimName, err) - } - pvc.Annotations["volume.kubernetes.io/selected-node"] = node.Name - if err := p.kubeClient.Update(ctx, pvc); err != nil { - return fmt.Errorf("binding persistent volume claim %s/%s to node %q, %w", pvc.Namespace, pvc.Name, node.Name, err) - } - } - return nil -} - var bindTimeHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: metrics.Namespace, diff --git a/pkg/controllers/volume/controller.go b/pkg/controllers/volume/controller.go new file mode 100644 index 000000000000..bd805ca3da6a --- /dev/null +++ b/pkg/controllers/volume/controller.go @@ -0,0 +1,128 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package volume + +import ( + "context" + "fmt" + + "github.com/aws/karpenter/pkg/utils/injection" + "github.com/aws/karpenter/pkg/utils/pod" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "knative.dev/pkg/logging" + + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const ( + controllerName = "volume" + selectedNodeAnnotation = "volume.kubernetes.io/selected-node" +) + +// Controller for the resource +type Controller struct { + kubeClient client.Client +} + +// NewController is a constructor +func NewController(kubeClient client.Client) *Controller { + return &Controller{kubeClient: kubeClient} +} + +// Register the controller to the manager +func (c *Controller) Register(ctx context.Context, m manager.Manager) error { + return controllerruntime. + NewControllerManagedBy(m). + Named(controllerName). + For(&v1.PersistentVolumeClaim{}). + Watches(&source.Kind{Type: &v1.Pod{}}, handler.EnqueueRequestsFromMapFunc(c.pvcForPod)). 
+ Complete(c) +} + +// Reconcile a control loop for the resource +func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named(controllerName).With("resource", req.String())) + ctx = injection.WithNamespacedName(ctx, req.NamespacedName) + ctx = injection.WithControllerName(ctx, controllerName) + + logging.FromContext(ctx).Infof("Reconciling pvc %s", req.Name) + pvc := &v1.PersistentVolumeClaim{} + if err := c.kubeClient.Get(ctx, req.NamespacedName, pvc); err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + if _, ok := pvc.Annotations[selectedNodeAnnotation]; ok { + return reconcile.Result{}, nil + } + pod, err := c.podForPvc(ctx, pvc) + if err != nil { + return reconcile.Result{}, err + } + if pod == nil { + logging.FromContext(ctx).Debugf("Skipping bind, no pod found for persistent volume claim") + return reconcile.Result{}, nil + } + if !c.isBindable(pod) { + logging.FromContext(ctx).Debugf("Skipping bind, pod %s/%s is not pending", pod.Namespace, pod.Name) + return reconcile.Result{}, nil + } + if pvc.Annotations == nil { pvc.Annotations = map[string]string{} }; pvc.Annotations[selectedNodeAnnotation] = pod.Spec.NodeName + if err := c.kubeClient.Update(ctx, pvc); err != nil { + return reconcile.Result{}, fmt.Errorf("binding persistent volume claim to node %q, %w", pod.Spec.NodeName, err) + } + logging.FromContext(ctx).Infof("Bound persistent volume claim to node %s", pod.Spec.NodeName) + return reconcile.Result{}, nil +} + +func (c *Controller) podForPvc(ctx context.Context, pvc *v1.PersistentVolumeClaim) (*v1.Pod, error) { + pods := &v1.PodList{} + if err := c.kubeClient.List(ctx, pods, client.InNamespace(pvc.Namespace)); err != nil { + return nil, err + } + for _, pod := range pods.Items { + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvc.Name { + return &pod, nil + } + } + } + 
return nil, nil +} + +func (c *Controller) pvcForPod(o client.Object) (requests []reconcile.Request) { + if !c.isBindable(o.(*v1.Pod)) { + return requests + } + for _, volume := range o.(*v1.Pod).Spec.Volumes { + if volume.PersistentVolumeClaim == nil { + continue + } + requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: o.GetNamespace(), Name: volume.PersistentVolumeClaim.ClaimName}}) + } + return requests +} + +func (c *Controller) isBindable(p *v1.Pod) bool { + return pod.IsScheduled(p) && !pod.IsTerminal(p) && !pod.IsTerminating(p) +}