Implemented volume topology aware scheduling
ellistarn committed Dec 17, 2021
1 parent b405f79 commit 1db0bda
Showing 3 changed files with 222 additions and 6 deletions.
23 changes: 17 additions & 6 deletions pkg/controllers/selection/controller.go
@@ -19,6 +19,7 @@ import (
"fmt"
"time"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/controllers/provisioning"
"github.com/aws/karpenter/pkg/utils/pod"
"github.com/go-logr/zapr"
@@ -37,17 +38,19 @@ import (

// Controller for the resource
type Controller struct {
kubeClient client.Client
provisioners *provisioning.Controller
preferences *Preferences
kubeClient client.Client
provisioners *provisioning.Controller
preferences *Preferences
volumeTopology *VolumeTopology
}

// NewController constructs a controller instance
func NewController(kubeClient client.Client, provisioners *provisioning.Controller) *Controller {
return &Controller{
kubeClient: kubeClient,
provisioners: provisioners,
preferences: NewPreferences(),
kubeClient: kubeClient,
provisioners: provisioners,
preferences: NewPreferences(),
volumeTopology: NewVolumeTopology(kubeClient),
}
}

@@ -80,6 +83,14 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs error) {
// Relax preferences if pod has previously failed to schedule.
c.preferences.Relax(ctx, pod)

requirements, err := c.volumeTopology.GetRequirements(ctx, pod)
if err != nil {
return fmt.Errorf("getting volume topology requirements")
}

requirements = requirements.With(v1alpha5.PodRequirements(pod))

// Pick provisioner
var provisioner *provisioning.Provisioner
provisioners := c.provisioners.List(ctx)
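For illustration only (not part of this diff): if one of the pod's claims is bound to a PersistentVolume pinned to a single zone, the volume topology step contributes a zone requirement that is merged with the pod's own requirements before a provisioner is picked. A minimal sketch, assuming an example zone value and a hypothetical helper name:

package selection

import (
	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
	v1 "k8s.io/api/core/v1"
)

// exampleMergedRequirements sketches the data flow added above: "derived"
// stands in for what VolumeTopology.GetRequirements would return for a PV
// pinned to "us-west-2a" (assumed example value), and is then merged with
// the pod's own requirements.
func exampleMergedRequirements(pod *v1.Pod) v1alpha5.Requirements {
	derived := v1alpha5.Requirements{
		{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"us-west-2a"}},
	}
	return derived.With(v1alpha5.PodRequirements(pod))
}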
110 changes: 110 additions & 0 deletions pkg/controllers/selection/suite_test.go
@@ -25,6 +25,7 @@ import (
"github.com/aws/karpenter/pkg/controllers/selection"
"github.com/aws/karpenter/pkg/test"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

@@ -72,6 +73,115 @@ var _ = AfterEach(func() {
ExpectProvisioningCleanedUp(ctx, env.Client, provisioners)
})

var _ = Describe("Volume Topology", func() {
})

var _ = Describe("Preferential Fallback", func() {
Context("Required", func() {
It("should not relax the final term", func() {
provisioner.Spec.Requirements = v1alpha5.Requirements{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}},
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"default-instance-type"}},
}
pod := test.UnschedulablePod()
pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{NodeSelectorTerms: []v1.NodeSelectorTerm{
{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, // Should not be relaxed
}},
}}}}
// Don't relax
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
ExpectNotScheduled(ctx, env.Client, pod)

// Don't relax
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
ExpectNotScheduled(ctx, env.Client, pod)
})
It("should relax multiple terms", func() {
pod := test.UnschedulablePod()
pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{NodeSelectorTerms: []v1.NodeSelectorTerm{
{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}},
}},
{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}},
}},
{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}},
}},
{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}, // OR operator, never get to this one
}},
}}}}
// Remove first term
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
ExpectNotScheduled(ctx, env.Client, pod)
// Remove second term
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
ExpectNotScheduled(ctx, env.Client, pod)
// Success
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-1"))
})
})
Context("Preferred", func() {
It("should relax all terms", func() {
pod := test.UnschedulablePod()
pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
{
Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}},
}},
},
{
Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}},
}},
},
}}}
// Remove first term
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
ExpectNotScheduled(ctx, env.Client, pod)
// Remove second term
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
ExpectNotScheduled(ctx, env.Client, pod)
// Success
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
ExpectScheduled(ctx, env.Client, pod)
})
It("should relax to use lighter weights", func() {
provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2"}}}
pod := test.UnschedulablePod()
pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
{
Weight: 100, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-3"}},
}},
},
{
Weight: 50, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}},
}},
},
{
Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ // Lightest weight, relaxation should never reach this term
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}},
}},
},
}}}
// Remove heaviest term
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
ExpectNotScheduled(ctx, env.Client, pod)
// Success
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2"))
})
})
})


var _ = Describe("Multiple Provisioners", func() {
It("should schedule to an explicitly selected provisioner", func() {
provisioner2 := provisioner.DeepCopy()
95 changes: 95 additions & 0 deletions pkg/controllers/selection/volumeTopology.go
@@ -0,0 +1,95 @@
package selection

import (
"context"
"fmt"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
)

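// NewVolumeTopology constructs a VolumeTopology that derives scheduling
// requirements from a pod's persistent volume claims.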
func NewVolumeTopology(kubeClient client.Client) *VolumeTopology {
return &VolumeTopology{kubeClient: kubeClient}
}

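// VolumeTopology resolves the node selection requirements implied by a pod's
// volumes, using either a bound PersistentVolume's node affinity or the
// claim's StorageClass allowed topologies.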
type VolumeTopology struct {
kubeClient client.Client
}

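// GetRequirements returns the requirements implied by every persistent volume
// claim referenced by the pod's volumes.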
func (v *VolumeTopology) GetRequirements(ctx context.Context, pod *v1.Pod) (v1alpha5.Requirements, error) {
var requirements v1alpha5.Requirements
for _, volume := range pod.Spec.Volumes {
req, err := v.getRequirements(ctx, pod, volume)
if err != nil {
return nil, err
}
requirements = append(requirements, req...)
}
return requirements, nil
}

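// getRequirements resolves a single volume: claims already bound to a
// PersistentVolume use the volume's node affinity; unbound claims fall back to
// the StorageClass allowed topologies.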
func (v *VolumeTopology) getRequirements(ctx context.Context, pod *v1.Pod, volume v1.Volume) (v1alpha5.Requirements, error) {
// Get PVC
if volume.PersistentVolumeClaim == nil {
return nil, nil
}
pvc := &v1.PersistentVolumeClaim{}
if err := v.kubeClient.Get(ctx, types.NamespacedName{Name: volume.PersistentVolumeClaim.ClaimName, Namespace: pod.Namespace}, pvc); err != nil {
return nil, fmt.Errorf("getting persistent volume claim %s, %w", volume.PersistentVolumeClaim.ClaimName, err)
}
// Persistent Volume Requirements
if pvc.Spec.VolumeName != "" {
requirements, err := v.getPersistentVolumeRequirements(ctx, pod, pvc)
if err != nil {
return nil, fmt.Errorf("getting existing requirements, %w", err)
}
return requirements, nil
}
// Storage Class Requirements
if ptr.StringValue(pvc.Spec.StorageClassName) != "" {
requirements, err := v.getStorageClassRequirements(ctx, pvc)
if err != nil {
return nil, err
}
return requirements, nil
}
return nil, nil
}

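// getStorageClassRequirements translates the first allowed topology term of
// the claim's StorageClass into requirements.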
func (v *VolumeTopology) getStorageClassRequirements(ctx context.Context, pvc *v1.PersistentVolumeClaim) (v1alpha5.Requirements, error) {
storageClass := &storagev1.StorageClass{}
if err := v.kubeClient.Get(ctx, types.NamespacedName{Name: ptr.StringValue(pvc.Spec.StorageClassName)}, storageClass); err != nil {
return nil, fmt.Errorf("getting storage class %q, %w", ptr.StringValue(pvc.Spec.StorageClassName), err)
}
var requirements v1alpha5.Requirements
if len(storageClass.AllowedTopologies) > 0 {
// Terms are ORed, only use the first term
for _, requirement := range storageClass.AllowedTopologies[0].MatchLabelExpressions {
requirements = append(requirements, v1.NodeSelectorRequirement{Key: requirement.Key, Operator: v1.NodeSelectorOpIn, Values: requirement.Values})
}
}
return requirements, nil
}

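// getPersistentVolumeRequirements translates the first required node affinity
// term of the bound PersistentVolume into requirements.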
func (v *VolumeTopology) getPersistentVolumeRequirements(ctx context.Context, pod *v1.Pod, pvc *v1.PersistentVolumeClaim) (v1alpha5.Requirements, error) {
pv := &v1.PersistentVolume{}
if err := v.kubeClient.Get(ctx, types.NamespacedName{Name: pvc.Spec.VolumeName, Namespace: pod.Namespace}, pv); err != nil {
return nil, fmt.Errorf("getting persistent volume %q, %w", pvc.Spec.VolumeName, err)
}
if pv.Spec.NodeAffinity == nil {
return nil, nil
}
if pv.Spec.NodeAffinity.Required == nil {
return nil, nil
}
var requirements v1alpha5.Requirements
if len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) > 0 {
// Terms are ORed, only use the first term
requirements = append(requirements, pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions...)
}
return requirements, nil
}
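For context, an illustrative example (not part of the commit): a StorageClass whose allowedTopologies restrict volumes to a single zone is translated by getStorageClassRequirements above into one In requirement on that zone. The StorageClass name, provisioner, and zone below are assumed example values.

package selection

import (
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleStorageClass is an assumed example input; its first allowed topology
// term would yield {Key: topology.kubernetes.io/zone, Operator: In, Values: ["us-west-2b"]}.
var exampleStorageClass = &storagev1.StorageClass{
	ObjectMeta:  metav1.ObjectMeta{Name: "topology-aware"},
	Provisioner: "ebs.csi.aws.com",
	AllowedTopologies: []v1.TopologySelectorTerm{{
		MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
			{Key: v1.LabelTopologyZone, Values: []string{"us-west-2b"}},
		},
	}},
}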
