From fdd88e291810434775b024e9538628ad81a8bc41 Mon Sep 17 00:00:00 2001 From: Ellis Tarn Date: Thu, 16 Dec 2021 16:00:23 -0800 Subject: [PATCH] Implemented volume topology aware scheduling --- pkg/controllers/selection/controller.go | 18 ++-- pkg/controllers/selection/suite_test.go | 110 +++++++++++++++++++ pkg/controllers/selection/volumeTopology.go | 114 ++++++++++++++++++++ 3 files changed, 236 insertions(+), 6 deletions(-) create mode 100644 pkg/controllers/selection/volumeTopology.go diff --git a/pkg/controllers/selection/controller.go b/pkg/controllers/selection/controller.go index 3e40d223c153..c60eb446f73b 100644 --- a/pkg/controllers/selection/controller.go +++ b/pkg/controllers/selection/controller.go @@ -37,17 +37,19 @@ import ( // Controller for the resource type Controller struct { - kubeClient client.Client - provisioners *provisioning.Controller - preferences *Preferences + kubeClient client.Client + provisioners *provisioning.Controller + preferences *Preferences + volumeTopology *VolumeTopology } // NewController constructs a controller instance func NewController(kubeClient client.Client, provisioners *provisioning.Controller) *Controller { return &Controller{ - kubeClient: kubeClient, - provisioners: provisioners, - preferences: NewPreferences(), + kubeClient: kubeClient, + provisioners: provisioners, + preferences: NewPreferences(), + volumeTopology: NewVolumeTopology(kubeClient), } } @@ -80,6 +82,10 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs error) { // Relax preferences if pod has previously failed to schedule. c.preferences.Relax(ctx, pod) + // Inject volume topological requirements + if err := c.volumeTopology.Inject(ctx, pod); err != nil { + return fmt.Errorf("getting volume topology requirements, %w", err) + } // Pick provisioner var provisioner *provisioning.Provisioner provisioners := c.provisioners.List(ctx) diff --git a/pkg/controllers/selection/suite_test.go b/pkg/controllers/selection/suite_test.go index 1cb864896dc2..0df6c530e409 100644 --- a/pkg/controllers/selection/suite_test.go +++ b/pkg/controllers/selection/suite_test.go @@ -25,6 +25,7 @@ import ( "github.com/aws/karpenter/pkg/controllers/selection" "github.com/aws/karpenter/pkg/test" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -72,6 +73,115 @@ var _ = AfterEach(func() { ExpectProvisioningCleanedUp(ctx, env.Client, provisioners) }) +var _ = Describe("Volume Topology", func() { +}) + +var _ = Describe("Preferential Fallback", func() { + Context("Required", func() { + It("should not relax the final term", func() { + provisioner.Spec.Requirements = v1alpha5.Requirements{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}, + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"default-instance-type"}}, + } + pod := test.UnschedulablePod() + pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{NodeSelectorTerms: []v1.NodeSelectorTerm{ + {MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, // Should not be relaxed + }}, + }}}} + // Don't relax + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + + // Don't relax + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + }) + It("should relax multiple terms", func() { + pod := test.UnschedulablePod() + pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{NodeSelectorTerms: []v1.NodeSelectorTerm{ + {MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, + }}, + {MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, + }}, + {MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}, + }}, + {MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}, // OR operator, never get to this one + }}, + }}}} + // Remove first term + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Remove second term + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Success + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-1")) + }) + }) + Context("Preferred", func() { + It("should relax all terms", func() { + pod := test.UnschedulablePod() + pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ + { + Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, + }}, + }, + { + Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, + }}, + }, + }}} + // Remove first term + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Remove second term + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Success + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectScheduled(ctx, env.Client, pod) + }) + It("should relax to use lighter weights", func() { + provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2"}}} + pod := test.UnschedulablePod() + pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ + { + Weight: 100, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-3"}}, + }}, + }, + { + Weight: 50, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}, + }}, + }, + { + Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ // OR operator, never get to this one + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}, + }}, + }, + }}} + // Remove heaviest term + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Success + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2")) + }) + }) +}) + + var _ = Describe("Multiple Provisioners", func() { It("should schedule to an explicitly selected provisioner", func() { provisioner2 := provisioner.DeepCopy() diff --git a/pkg/controllers/selection/volumeTopology.go b/pkg/controllers/selection/volumeTopology.go new file mode 100644 index 000000000000..e68fe80ece22 --- /dev/null +++ b/pkg/controllers/selection/volumeTopology.go @@ -0,0 +1,114 @@ +package selection + +import ( + "context" + "fmt" + + "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" + v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/types" + "knative.dev/pkg/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func NewVolumeTopology(kubeClient client.Client) *VolumeTopology { + return &VolumeTopology{kubeClient: kubeClient} +} + +type VolumeTopology struct { + kubeClient client.Client +} + +func (v *VolumeTopology) Inject(ctx context.Context, pod *v1.Pod) error { + var requirements v1alpha5.Requirements + for _, volume := range pod.Spec.Volumes { + req, err := v.getRequirements(ctx, pod, volume) + if err != nil { + return err + } + requirements = append(requirements, req...) + } + if len(requirements) == 0 { + return nil + } + if pod.Spec.Affinity == nil { + pod.Spec.Affinity = &v1.Affinity{} + } + if pod.Spec.Affinity.NodeAffinity == nil { + pod.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{} + } + if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{} + } + if len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 { + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0] = v1.NodeSelectorTerm{} + } + for _, requirement := range requirements { + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions = append( + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, requirement) + } + return nil +} + +func (v *VolumeTopology) getRequirements(ctx context.Context, pod *v1.Pod, volume v1.Volume) (v1alpha5.Requirements, error) { + // Get PVC + if volume.PersistentVolumeClaim == nil { + return nil, nil + } + pvc := &v1.PersistentVolumeClaim{} + if err := v.kubeClient.Get(ctx, types.NamespacedName{Name: volume.PersistentVolumeClaim.ClaimName, Namespace: pod.Namespace}, pvc); err != nil { + return nil, fmt.Errorf("getting persistent volume claim %s, %w", volume.PersistentVolumeClaim.ClaimName, err) + } + // Persistent Volume Requirements + if pvc.Spec.VolumeName != "" { + requirements, err := v.getPersistentVolumeRequirements(ctx, pod, pvc) + if err != nil { + return nil, fmt.Errorf("getting existing requirements, %w", err) + } + return requirements, nil + } + // Storage Class Requirements + if ptr.StringValue(pvc.Spec.StorageClassName) != "" { + requirements, err := v.getStorageClassRequirements(ctx, pvc) + if err != nil { + return nil, err + } + return requirements, nil + } + return nil, nil +} + +func (v *VolumeTopology) getStorageClassRequirements(ctx context.Context, pvc *v1.PersistentVolumeClaim) (v1alpha5.Requirements, error) { + storageClass := &storagev1.StorageClass{} + if err := v.kubeClient.Get(ctx, types.NamespacedName{Name: ptr.StringValue(pvc.Spec.StorageClassName)}, storageClass); err != nil { + return nil, fmt.Errorf("getting storage class %q, %w", ptr.StringValue(pvc.Spec.StorageClassName), err) + } + var requirements v1alpha5.Requirements + if len(storageClass.AllowedTopologies) > 0 { + // Terms are ORed, only use the first term + for _, requirement := range storageClass.AllowedTopologies[0].MatchLabelExpressions { + requirements = append(requirements, v1.NodeSelectorRequirement{Key: requirement.Key, Operator: v1.NodeSelectorOpIn, Values: requirement.Values}) + } + } + return requirements, nil +} + +func (v *VolumeTopology) getPersistentVolumeRequirements(ctx context.Context, pod *v1.Pod, pvc *v1.PersistentVolumeClaim) (v1alpha5.Requirements, error) { + pv := &v1.PersistentVolume{} + if err := v.kubeClient.Get(ctx, types.NamespacedName{Name: pvc.Spec.VolumeName, Namespace: pod.Namespace}, pv); err != nil { + return nil, fmt.Errorf("getting persistent volume %q, %w", pvc.Spec.VolumeName, err) + } + if pv.Spec.NodeAffinity == nil { + return nil, nil + } + if pv.Spec.NodeAffinity.Required == nil { + return nil, nil + } + var requirements v1alpha5.Requirements + if len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) > 0 { + // Terms are ORed, only use the first term + requirements = append(requirements, pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions...) + } + return requirements, nil +}