Skip to content

Commit

Permalink
PR Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ellistarn committed Dec 30, 2021
1 parent cec6215 commit 889c5e1
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 32 deletions.
2 changes: 2 additions & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/aws/karpenter/pkg/controllers/provisioning"
"github.com/aws/karpenter/pkg/controllers/selection"
"github.com/aws/karpenter/pkg/controllers/termination"
"github.com/aws/karpenter/pkg/controllers/volume"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/options"
"github.com/go-logr/zapr"
Expand Down Expand Up @@ -91,6 +92,7 @@ func main() {
if err := manager.RegisterControllers(ctx,
provisioningController,
selection.NewController(manager.GetClient(), provisioningController),
volume.NewController(manager.GetClient()),
termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider),
node.NewController(manager.GetClient()),
metrics.NewController(manager.GetClient(), cloudProvider),
Expand Down
12 changes: 3 additions & 9 deletions pkg/controllers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ func NewManagerOrDie(ctx context.Context, config *rest.Config, options controlle
if err != nil {
panic(fmt.Sprintf("Failed to create controller newManager, %s", err.Error()))
}
if err := newManager.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.nodeName", podSchedulingIndex); err != nil {
if err := newManager.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.nodeName", func(o client.Object) []string {
return []string{o.(*v1.Pod).Spec.NodeName}
}); err != nil {
panic(fmt.Sprintf("Failed to setup pod indexer, %s", err.Error()))
}
return &GenericControllerManager{Manager: newManager}
Expand All @@ -57,11 +59,3 @@ func (m *GenericControllerManager) RegisterControllers(ctx context.Context, cont
}
return m
}

func podSchedulingIndex(object client.Object) []string {
pod, ok := object.(*v1.Pod)
if !ok {
return nil
}
return []string{pod.Spec.NodeName}
}
26 changes: 3 additions & 23 deletions pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constrai
return p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error {
node.Labels = functional.UnionStringMaps(node.Labels, constraints.Labels)
node.Spec.Taints = append(node.Spec.Taints, constraints.Taints...)
return p.create(ctx, node, <-pods)
return p.bind(ctx, node, <-pods)
})
}

func (p *Provisioner) create(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) {
func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) {
defer metrics.Measure(bindTimeHistogram.WithLabelValues(injection.GetNamespacedName(ctx).Name))()

// Add the Karpenter finalizer to the node to enable the termination workflow
Expand Down Expand Up @@ -237,7 +237,7 @@ func (p *Provisioner) create(ctx context.Context, node *v1.Node, pods []*v1.Pod)
// Bind pods
var bound int64
workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(i int) {
if err := p.bind(ctx, node, pods[i]); err != nil {
if err := p.coreV1Client.Pods(pods[i].Namespace).Bind(ctx, &v1.Binding{TypeMeta: pods[i].TypeMeta, ObjectMeta: pods[i].ObjectMeta, Target: v1.ObjectReference{Name: node.Name}}, metav1.CreateOptions{}); err != nil {
logging.FromContext(ctx).Errorf("Failed to bind %s/%s to %s, %s", pods[i].Namespace, pods[i].Name, node.Name, err.Error())
} else {
atomic.AddInt64(&bound, 1)
Expand All @@ -247,26 +247,6 @@ func (p *Provisioner) create(ctx context.Context, node *v1.Node, pods []*v1.Pod)
return nil
}

func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pod *v1.Pod) error {
if err := p.coreV1Client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}}, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("binding pod %s/%s, %w", pod.Name, pod.Namespace, err)
}
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}
pvc := &v1.PersistentVolumeClaim{}
if err := p.kubeClient.Get(ctx, types.NamespacedName{Name: volume.PersistentVolumeClaim.ClaimName, Namespace: pod.Namespace}, pvc); err != nil {
return fmt.Errorf("getting persistent volume claim %s, %w", volume.PersistentVolumeClaim.ClaimName, err)
}
pvc.Annotations["volume.kubernetes.io/selected-node"] = node.Name
if err := p.kubeClient.Update(ctx, pvc); err != nil {
return fmt.Errorf("binding persistent volume claim %s/%s to node %q, %w", pvc.Namespace, pvc.Name, node.Name, err)
}
}
return nil
}

var bindTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
Expand Down
128 changes: 128 additions & 0 deletions pkg/controllers/volume/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package volume

import (
"context"
"fmt"

"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/pod"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/logging"

controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

const (
	// controllerName is the name used to register this controller with the
	// manager and to tag its logger and metrics context.
	controllerName = "volume"
	// selectedNodeAnnotation is the upstream Kubernetes annotation read by the
	// volume binder to decide which node a pending PVC should be bound to.
	selectedNodeAnnotation = "volume.kubernetes.io/selected-node"
)

// Controller reconciles PersistentVolumeClaims, annotating unbound claims
// with the node selected for the pod that references them.
type Controller struct {
	// kubeClient is used to read PVCs/pods and to update PVC annotations.
	kubeClient client.Client
}

// NewController is a constructor
func NewController(kubeClient client.Client) *Controller {
return &Controller{kubeClient: kubeClient}
}

// Register wires this controller into the manager: it reconciles
// PersistentVolumeClaims and additionally enqueues the claims referenced by a
// pod whenever that pod changes.
func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
	builder := controllerruntime.NewControllerManagedBy(m).Named(controllerName)
	builder = builder.For(&v1.PersistentVolumeClaim{})
	// Pod events map to the PVCs the pod mounts, so newly-scheduled pods
	// trigger binding of their claims.
	builder = builder.Watches(&source.Kind{Type: &v1.Pod{}}, handler.EnqueueRequestsFromMapFunc(c.pvcForPod))
	return builder.Complete(c)
}

// Reconcile binds an unbound PersistentVolumeClaim to the node of the pod
// that mounts it, by setting the "volume.kubernetes.io/selected-node"
// annotation. Claims that are already annotated, have no bindable pod, or no
// pod at all are left untouched. Returns an error (for requeue) only on
// transient client failures.
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named(controllerName).With("resource", req.String()))
	ctx = injection.WithNamespacedName(ctx, req.NamespacedName)
	ctx = injection.WithControllerName(ctx, controllerName)

	// Note: the sugared Info(msg, arg) form concatenates args without a
	// separator; use an explicit format verb, and log at debug since this
	// fires on every reconcile.
	logging.FromContext(ctx).Debugf("Reconciling pvc %s", req.Name)
	pvc := &v1.PersistentVolumeClaim{}
	if err := c.kubeClient.Get(ctx, req.NamespacedName, pvc); err != nil {
		if errors.IsNotFound(err) {
			// Deleted out from under us; nothing to do.
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, err
	}
	// Already bound to a node; nothing to do.
	if _, ok := pvc.Annotations[selectedNodeAnnotation]; ok {
		return reconcile.Result{}, nil
	}
	pod, err := c.podForPvc(ctx, pvc)
	if err != nil {
		return reconcile.Result{}, err
	}
	if pod == nil {
		logging.FromContext(ctx).Debugf("Skipping bind, no pod found for persistent volume claim")
		return reconcile.Result{}, nil
	}
	if !c.isBindable(pod) {
		logging.FromContext(ctx).Debugf("Skipping bind, pod %s/%s is not pending", pod.Namespace, pod.Name)
		return reconcile.Result{}, nil
	}
	// Annotations may be nil on objects that never carried any; writing to a
	// nil map panics, so initialize it first.
	if pvc.Annotations == nil {
		pvc.Annotations = map[string]string{}
	}
	pvc.Annotations[selectedNodeAnnotation] = pod.Spec.NodeName
	if err := c.kubeClient.Update(ctx, pvc); err != nil {
		return reconcile.Result{}, fmt.Errorf("binding persistent volume claim to node %q, %w", pod.Spec.NodeName, err)
	}
	logging.FromContext(ctx).Infof("Bound persistent volume claim to node %s", pod.Spec.NodeName)
	return reconcile.Result{}, nil
}

// podForPvc returns the first pod in the claim's namespace whose volumes
// reference the claim, or nil if none does. Errors are only client failures.
func (c *Controller) podForPvc(ctx context.Context, pvc *v1.PersistentVolumeClaim) (*v1.Pod, error) {
	podList := &v1.PodList{}
	if err := c.kubeClient.List(ctx, podList, client.InNamespace(pvc.Namespace)); err != nil {
		return nil, err
	}
	// Index into the list so the returned pointer refers to the backing
	// slice element rather than a loop variable.
	for i := range podList.Items {
		for _, volume := range podList.Items[i].Spec.Volumes {
			if volume.PersistentVolumeClaim == nil {
				continue
			}
			if volume.PersistentVolumeClaim.ClaimName == pvc.Name {
				return &podList.Items[i], nil
			}
		}
	}
	return nil, nil
}

// pvcForPod maps a pod event to reconcile requests for every persistent
// volume claim the pod mounts. Pods that are not yet bindable produce no
// requests.
func (c *Controller) pvcForPod(o client.Object) (requests []reconcile.Request) {
	p := o.(*v1.Pod)
	if !c.isBindable(p) {
		return nil
	}
	for _, volume := range p.Spec.Volumes {
		pvcSource := volume.PersistentVolumeClaim
		if pvcSource == nil {
			continue
		}
		requests = append(requests, reconcile.Request{
			NamespacedName: types.NamespacedName{Namespace: p.GetNamespace(), Name: pvcSource.ClaimName},
		})
	}
	return requests
}

// isBindable reports whether p is scheduled onto a node and is neither
// terminal nor terminating, i.e. its volume claims are worth binding.
func (c *Controller) isBindable(p *v1.Pod) bool {
	if !pod.IsScheduled(p) {
		return false
	}
	if pod.IsTerminal(p) || pod.IsTerminating(p) {
		return false
	}
	return true
}

0 comments on commit 889c5e1

Please sign in to comment.