From ffd50ce76882c2e64f2aa31af473b3cd4658ceee Mon Sep 17 00:00:00 2001 From: Ellis Tarn Date: Thu, 16 Dec 2021 16:00:23 -0800 Subject: [PATCH] Experimental support for volume topology aware scheduling --- aws-ebs-csi-driver | 1 + .../karpenter/templates/controller/rbac.yaml | 8 +- pkg/apis/provisioning/v1alpha5/constraints.go | 4 +- pkg/apis/provisioning/v1alpha5/register.go | 25 ---- .../provisioning/v1alpha5/requirements.go | 98 +++++++++++---- pkg/cloudprovider/aws/cloudprovider.go | 5 + pkg/controllers/provisioning/controller.go | 4 +- pkg/controllers/provisioning/provisioner.go | 33 ++++- .../provisioning/scheduling/topology.go | 2 +- pkg/controllers/selection/controller.go | 18 ++- pkg/controllers/selection/suite_test.go | 110 +++++++++++++++++ pkg/controllers/selection/volumeTopology.go | 114 ++++++++++++++++++ 12 files changed, 355 insertions(+), 67 deletions(-) create mode 160000 aws-ebs-csi-driver create mode 100644 pkg/controllers/selection/volumeTopology.go diff --git a/aws-ebs-csi-driver b/aws-ebs-csi-driver new file mode 160000 index 000000000000..e7b739f766f9 --- /dev/null +++ b/aws-ebs-csi-driver @@ -0,0 +1 @@ +Subproject commit e7b739f766f933eade4bc71a1e23dad8f9673c88 diff --git a/charts/karpenter/templates/controller/rbac.yaml b/charts/karpenter/templates/controller/rbac.yaml index 399e2e03db61..e35d747c897b 100644 --- a/charts/karpenter/templates/controller/rbac.yaml +++ b/charts/karpenter/templates/controller/rbac.yaml @@ -49,6 +49,12 @@ rules: - apiGroups: ["karpenter.sh"] resources: ["provisioners", "provisioners/status"] verbs: ["create", "delete", "patch", "get", "list", "watch"] +- apiGroups: [""] + resources: ["persistentvolumes", "persistentvolumeclaims"] + verbs: ["get", "list", "watch", "update"] +- apiGroups: ["storage.k8s.io"] + resources: ["storageclasses"] + verbs: ["get", "list", "watch"] - apiGroups: ["coordination.k8s.io"] resources: ["leases"] verbs: ["create", "get", "patch", "update", "watch"] @@ -67,4 +73,4 @@ rules: - apiGroups: ["apps"] resources: ["daemonsets"] verbs: ["list", "watch"] ---- \ No newline at end of file +--- diff --git a/pkg/apis/provisioning/v1alpha5/constraints.go b/pkg/apis/provisioning/v1alpha5/constraints.go index c295446e3904..aa809bb9e33a 100644 --- a/pkg/apis/provisioning/v1alpha5/constraints.go +++ b/pkg/apis/provisioning/v1alpha5/constraints.go @@ -53,7 +53,7 @@ func (c *Constraints) ValidatePod(pod *v1.Pod) error { } } // The combined requirements are not compatible - combined := c.Requirements.With(podRequirements) + combined := c.Requirements.Append(podRequirements...) 
for _, key := range podRequirements.Keys() { if combined.Requirement(key).Len() == 0 { return fmt.Errorf("invalid nodeSelector %q, %v not in %v", key, podRequirements.Requirement(key).UnsortedList(), c.Requirements.Requirement(key).UnsortedList()) @@ -65,7 +65,7 @@ func (c *Constraints) ValidatePod(pod *v1.Pod) error { func (c *Constraints) Tighten(pod *v1.Pod) *Constraints { return &Constraints{ Labels: c.Labels, - Requirements: c.Requirements.With(PodRequirements(pod)).Consolidate().WellKnown(), + Requirements: c.Requirements.Append(PodRequirements(pod)...).Consolidate().WellKnown(), Taints: c.Taints, Provider: c.Provider, } diff --git a/pkg/apis/provisioning/v1alpha5/register.go b/pkg/apis/provisioning/v1alpha5/register.go index fd4981a9e370..300916965f52 100644 --- a/pkg/apis/provisioning/v1alpha5/register.go +++ b/pkg/apis/provisioning/v1alpha5/register.go @@ -17,12 +17,10 @@ package v1alpha5 import ( "context" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "knative.dev/pkg/apis" ) @@ -40,29 +38,6 @@ var ( ) var ( - // RestrictedLabels are injected by Cloud Providers - RestrictedLabels = sets.NewString( - // Used internally by provisioning logic - EmptinessTimestampAnnotationKey, - v1.LabelHostname, - ) - // These are either prohibited by the kubelet or reserved by karpenter - KarpenterLabelDomain = "karpenter.sh" - RestrictedLabelDomains = sets.NewString( - "kubernetes.io", - "k8s.io", - KarpenterLabelDomain, - ) - LabelCapacityType = KarpenterLabelDomain + "/capacity-type" - // WellKnownLabels supported by karpenter - WellKnownLabels = sets.NewString( - v1.LabelTopologyZone, - v1.LabelInstanceTypeStable, - v1.LabelArchStable, - v1.LabelOSStable, - LabelCapacityType, - v1.LabelHostname, // Used internally for hostname topology spread - ) DefaultHook = func(ctx context.Context, constraints *Constraints) {} ValidateHook = func(ctx context.Context, constraints *Constraints) *apis.FieldError { return nil } ) diff --git a/pkg/apis/provisioning/v1alpha5/requirements.go b/pkg/apis/provisioning/v1alpha5/requirements.go index 7c48b47c86ad..bbc7c5c4201a 100644 --- a/pkg/apis/provisioning/v1alpha5/requirements.go +++ b/pkg/apis/provisioning/v1alpha5/requirements.go @@ -21,6 +21,42 @@ import ( "k8s.io/apimachinery/pkg/util/sets" ) +var ( + // RestrictedLabels are injected by Cloud Providers + RestrictedLabels = sets.NewString( + // Used internally by provisioning logic + EmptinessTimestampAnnotationKey, + v1.LabelHostname, + ) + // These are either prohibited by the kubelet or reserved by karpenter + KarpenterLabelDomain = "karpenter.sh" + RestrictedLabelDomains = sets.NewString( + "kubernetes.io", + "k8s.io", + KarpenterLabelDomain, + ) + LabelCapacityType = KarpenterLabelDomain + "/capacity-type" + // WellKnownLabels supported by the controller + WellKnownLabels = sets.NewString( + v1.LabelTopologyZone, + v1.LabelInstanceTypeStable, + v1.LabelArchStable, + v1.LabelOSStable, + LabelCapacityType, + v1.LabelHostname, // Used internally for hostname topology spread + ) + // NormalizedLabels translate aliased concepts into the controller's + // WellKnownLabels. Pod requirements are translated for compatibility, + // however, Provisioner labels are still restricted to WellKnownLabels. + // Additional labels may be injected by cloud providers. 
+ NormalizedLabels = map[string]string{ + v1.LabelFailureDomainBetaZone: v1.LabelTopologyZone, + "beta.kubernetes.io/arch": v1.LabelArchStable, + "beta.kubernetes.io/os": v1.LabelOSStable, + v1.LabelInstanceType: v1.LabelInstanceTypeStable, + } +) + // Requirements is a decorated alias type for []v1.NodeSelectorRequirements type Requirements []v1.NodeSelectorRequirement @@ -44,20 +80,50 @@ func (r Requirements) CapacityTypes() sets.String { return r.Requirement(LabelCapacityType) } -func (r Requirements) With(requirements Requirements) Requirements { - return append(r, requirements...) +func (r Requirements) Append(requirements ...v1.NodeSelectorRequirement) Requirements { + return append(r, Requirements(requirements).Normalize()...) +} + +// Normalize the requirements to use WellKnownLabels +func (r Requirements) Normalize() Requirements { + normalized := Requirements{} + for _, requirement := range r { + label := requirement.Key + if normalized, ok := NormalizedLabels[requirement.Key]; ok { + label = normalized + } + normalized = append(normalized, v1.NodeSelectorRequirement{Key: label, Operator: requirement.Operator, Values: requirement.Values}) + } + return normalized +} + +// Consolidate combines In and NotIn requirements for each unique key, producing +// an equivalent minimal representation of the requirements. This is useful as +// requirements may be appended from a variety of sources and then consolidated. +// Caution: If a key has contains a `NotIn` operator without a corresponding +// `In` operator, the requirement will permanently be [] after consolidation. To +// avoid this, include the broadest `In` requirements before consolidating. +func (r Requirements) Consolidate() (requirements Requirements) { + for _, key := range r.Keys() { + requirements = requirements.Append(v1.NodeSelectorRequirement{ + Key: key, + Operator: v1.NodeSelectorOpIn, + Values: r.Requirement(key).UnsortedList(), + }) + } + return requirements } func LabelRequirements(labels map[string]string) (r Requirements) { for key, value := range labels { - r = append(r, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}}) + r = r.Append(v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}}) } return r } func PodRequirements(pod *v1.Pod) (r Requirements) { for key, value := range pod.Spec.NodeSelector { - r = append(r, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}}) + r = r.Append(v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}}) } if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil { return r @@ -65,37 +131,21 @@ func PodRequirements(pod *v1.Pod) (r Requirements) { // Select heaviest preference and treat as a requirement. An outer loop will iteratively unconstrain them if unsatisfiable. if preferred := pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution; len(preferred) > 0 { sort.Slice(preferred, func(i int, j int) bool { return preferred[i].Weight > preferred[j].Weight }) - r = append(r, preferred[0].Preference.MatchExpressions...) + r = r.Append(preferred[0].Preference.MatchExpressions...) } // Select first requirement. 
An outer loop will iteratively remove OR requirements if unsatisfiable if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil && len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) > 0 { - r = append(r, pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions...) + r = r.Append(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions...) } return r } -// Consolidate combines In and NotIn requirements for each unique key, producing -// an equivalent minimal representation of the requirements. This is useful as -// requirements may be appended from a variety of sources and then consolidated. -// Caution: If a key has contains a `NotIn` operator without a corresponding -// `In` operator, the requirement will permanently be [] after consolidation. To -// avoid this, include the broadest `In` requirements before consolidating. -func (r Requirements) Consolidate() (requirements Requirements) { - for _, key := range r.Keys() { - requirements = append(requirements, v1.NodeSelectorRequirement{ - Key: key, - Operator: v1.NodeSelectorOpIn, - Values: r.Requirement(key).UnsortedList(), - }) - } - return requirements -} - func (r Requirements) WellKnown() (requirements Requirements) { for _, requirement := range r { if WellKnownLabels.Has(requirement.Key) { - requirements = append(requirements, requirement) + requirements = requirements.Append(requirement) + } } return requirements diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index 277748f00266..ba9d956712aa 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -28,6 +28,7 @@ import ( "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" "github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" + "github.com/aws/karpenter/pkg/utils/functional" "github.com/aws/karpenter/pkg/utils/parallel" "github.com/aws/karpenter/pkg/utils/project" @@ -55,6 +56,10 @@ const ( CacheCleanupInterval = 10 * time.Minute ) +func init() { + v1alpha5.NormalizedLabels = functional.UnionStringMaps(v1alpha5.NormalizedLabels, map[string]string{"topology.ebs.csi.aws.com/zone": v1.LabelTopologyZone}) +} + type CloudProvider struct { instanceTypeProvider *InstanceTypeProvider subnetProvider *SubnetProvider diff --git a/pkg/controllers/provisioning/controller.go b/pkg/controllers/provisioning/controller.go index b8c1d3663fbc..2f5560ad35e2 100644 --- a/pkg/controllers/provisioning/controller.go +++ b/pkg/controllers/provisioning/controller.go @@ -96,8 +96,8 @@ func (c *Controller) Apply(ctx context.Context, provisioner *v1alpha5.Provisione } provisioner.Spec.Labels = functional.UnionStringMaps(provisioner.Spec.Labels, map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}) provisioner.Spec.Requirements = provisioner.Spec.Requirements. - With(requirements(instanceTypes)). - With(v1alpha5.LabelRequirements(provisioner.Spec.Labels)). + Append(requirements(instanceTypes)...). + Append(v1alpha5.LabelRequirements(provisioner.Spec.Labels)...). 
Consolidate() // Update the provisioner if anything has changed if c.hasChanged(ctx, provisioner) { diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index a237f76e46e7..589a3ea7264f 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -202,11 +202,11 @@ func (p *Provisioner) launch(ctx context.Context, constraints *v1alpha5.Constrai return <-p.cloudProvider.Create(ctx, constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error { node.Labels = functional.UnionStringMaps(node.Labels, constraints.Labels) node.Spec.Taints = append(node.Spec.Taints, constraints.Taints...) - return p.bind(ctx, node, <-pods) + return p.create(ctx, node, <-pods) }) } -func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) { +func (p *Provisioner) create(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) { defer metrics.Measure(bindTimeHistogram.WithLabelValues(injection.GetNamespacedName(ctx).Name))() // Add the Karpenter finalizer to the node to enable the termination workflow @@ -237,10 +237,8 @@ func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) ( // Bind pods var bound int64 workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(i int) { - pod := pods[i] - binding := &v1.Binding{TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}} - if err := p.coreV1Client.Pods(pods[i].Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil { - logging.FromContext(ctx).Errorf("Failed to bind %s/%s to %s, %s", pod.Namespace, pod.Name, node.Name, err.Error()) + if err := p.bind(ctx, node, pods[i]); err != nil { + logging.FromContext(ctx).Errorf("Failed to bind %s/%s to %s, %s", pods[i].Namespace, pods[i].Name, node.Name, err.Error()) } else { atomic.AddInt64(&bound, 1) } @@ -249,6 +247,29 @@ func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) ( return nil } +func (p *Provisioner) bind(ctx context.Context, node *v1.Node, pod *v1.Pod) error { + if err := p.coreV1Client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{TypeMeta: pod.TypeMeta, ObjectMeta: pod.ObjectMeta, Target: v1.ObjectReference{Name: node.Name}}, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("binding pod %s/%s, %w", pod.Name, pod.Namespace, err) + } + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim == nil { + continue + } + pvc := &v1.PersistentVolumeClaim{} + if err := p.kubeClient.Get(ctx, types.NamespacedName{Name: volume.PersistentVolumeClaim.ClaimName, Namespace: pod.Namespace}, pvc); err != nil { + return fmt.Errorf("getting persistent volume claim %s, %w", volume.PersistentVolumeClaim.ClaimName, err) + } + // if _, ok := pvc.Annotations["volume.kubernetes.io/selected-node"]; ok { + // continue + // } + pvc.Annotations["volume.kubernetes.io/selected-node"] = node.Name + if err := p.kubeClient.Update(ctx, pvc); err != nil { + return fmt.Errorf("binding persistent volume claim %s/%s to node %q, %w", pvc.Namespace, pvc.Name, node.Name, err) + } + } + return nil +} + var bindTimeHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: metrics.Namespace, diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go index 01fd1bae0c04..875b42380231 100644 --- a/pkg/controllers/provisioning/scheduling/topology.go +++ 
b/pkg/controllers/provisioning/scheduling/topology.go @@ -46,7 +46,7 @@ func (t *Topology) Inject(ctx context.Context, constraints *v1alpha5.Constraints return fmt.Errorf("computing topology, %w", err) } for _, pod := range topologyGroup.Pods { - domain := topologyGroup.NextDomain(constraints.Requirements.With(v1alpha5.PodRequirements(pod)).Requirement(topologyGroup.Constraint.TopologyKey)) + domain := topologyGroup.NextDomain(constraints.Requirements.Append(v1alpha5.PodRequirements(pod)...).Requirement(topologyGroup.Constraint.TopologyKey)) pod.Spec.NodeSelector = functional.UnionStringMaps(pod.Spec.NodeSelector, map[string]string{topologyGroup.Constraint.TopologyKey: domain}) } } diff --git a/pkg/controllers/selection/controller.go b/pkg/controllers/selection/controller.go index 3e40d223c153..c60eb446f73b 100644 --- a/pkg/controllers/selection/controller.go +++ b/pkg/controllers/selection/controller.go @@ -37,17 +37,19 @@ import ( // Controller for the resource type Controller struct { - kubeClient client.Client - provisioners *provisioning.Controller - preferences *Preferences + kubeClient client.Client + provisioners *provisioning.Controller + preferences *Preferences + volumeTopology *VolumeTopology } // NewController constructs a controller instance func NewController(kubeClient client.Client, provisioners *provisioning.Controller) *Controller { return &Controller{ - kubeClient: kubeClient, - provisioners: provisioners, - preferences: NewPreferences(), + kubeClient: kubeClient, + provisioners: provisioners, + preferences: NewPreferences(), + volumeTopology: NewVolumeTopology(kubeClient), } } @@ -80,6 +82,10 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco func (c *Controller) selectProvisioner(ctx context.Context, pod *v1.Pod) (errs error) { // Relax preferences if pod has previously failed to schedule. 
c.preferences.Relax(ctx, pod) + // Inject volume topological requirements + if err := c.volumeTopology.Inject(ctx, pod); err != nil { + return fmt.Errorf("getting volume topology requirements, %w", err) + } // Pick provisioner var provisioner *provisioning.Provisioner provisioners := c.provisioners.List(ctx) diff --git a/pkg/controllers/selection/suite_test.go b/pkg/controllers/selection/suite_test.go index 1cb864896dc2..0df6c530e409 100644 --- a/pkg/controllers/selection/suite_test.go +++ b/pkg/controllers/selection/suite_test.go @@ -25,6 +25,7 @@ import ( "github.com/aws/karpenter/pkg/controllers/selection" "github.com/aws/karpenter/pkg/test" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -72,6 +73,115 @@ var _ = AfterEach(func() { ExpectProvisioningCleanedUp(ctx, env.Client, provisioners) }) +var _ = Describe("Volume Topology", func() { +}) + +var _ = Describe("Preferential Fallback", func() { + Context("Required", func() { + It("should not relax the final term", func() { + provisioner.Spec.Requirements = v1alpha5.Requirements{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}, + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"default-instance-type"}}, + } + pod := test.UnschedulablePod() + pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{NodeSelectorTerms: []v1.NodeSelectorTerm{ + {MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, // Should not be relaxed + }}, + }}}} + // Don't relax + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + + // Don't relax + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + }) + It("should relax multiple terms", func() { + pod := test.UnschedulablePod() + pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{NodeSelectorTerms: []v1.NodeSelectorTerm{ + {MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, + }}, + {MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, + }}, + {MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}, + }}, + {MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}, // OR operator, never get to this one + }}, + }}}} + // Remove first term + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Remove second term + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Success + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-1")) + }) + }) + 
Context("Preferred", func() { + It("should relax all terms", func() { + pod := test.UnschedulablePod() + pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ + { + Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, + }}, + }, + { + Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpIn, Values: []string{"invalid"}}, + }}, + }, + }}} + // Remove first term + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Remove second term + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Success + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectScheduled(ctx, env.Client, pod) + }) + It("should relax to use lighter weights", func() { + provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2"}}} + pod := test.UnschedulablePod() + pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ + { + Weight: 100, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-3"}}, + }}, + }, + { + Weight: 50, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}}, + }}, + }, + { + Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ // OR operator, never get to this one + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}}, + }}, + }, + }}} + // Remove heaviest term + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + ExpectNotScheduled(ctx, env.Client, pod) + // Success + pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2")) + }) + }) +}) + + var _ = Describe("Multiple Provisioners", func() { It("should schedule to an explicitly selected provisioner", func() { provisioner2 := provisioner.DeepCopy() diff --git a/pkg/controllers/selection/volumeTopology.go b/pkg/controllers/selection/volumeTopology.go new file mode 100644 index 000000000000..ade0f7746aab --- /dev/null +++ b/pkg/controllers/selection/volumeTopology.go @@ -0,0 +1,114 @@ +package selection + +import ( + "context" + "fmt" + + "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" + v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/types" + "knative.dev/pkg/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func NewVolumeTopology(kubeClient client.Client) *VolumeTopology { + return &VolumeTopology{kubeClient: kubeClient} +} + +type VolumeTopology struct { + kubeClient client.Client +} + +func (v *VolumeTopology) Inject(ctx 
context.Context, pod *v1.Pod) error { + var requirements v1alpha5.Requirements + for _, volume := range pod.Spec.Volumes { + req, err := v.getRequirements(ctx, pod, volume) + if err != nil { + return err + } + requirements = append(requirements, req...) + } + if len(requirements) == 0 { + return nil + } + if pod.Spec.Affinity == nil { + pod.Spec.Affinity = &v1.Affinity{} + } + if pod.Spec.Affinity.NodeAffinity == nil { + pod.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{} + } + if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{} + } + if len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 { + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = []v1.NodeSelectorTerm{{}} + } + for _, requirement := range requirements { + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions = append( + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions, requirement) + } + return nil +} + +func (v *VolumeTopology) getRequirements(ctx context.Context, pod *v1.Pod, volume v1.Volume) (v1alpha5.Requirements, error) { + // Get PVC + if volume.PersistentVolumeClaim == nil { + return nil, nil + } + pvc := &v1.PersistentVolumeClaim{} + if err := v.kubeClient.Get(ctx, types.NamespacedName{Name: volume.PersistentVolumeClaim.ClaimName, Namespace: pod.Namespace}, pvc); err != nil { + return nil, fmt.Errorf("getting persistent volume claim %s, %w", volume.PersistentVolumeClaim.ClaimName, err) + } + // Persistent Volume Requirements + if pvc.Spec.VolumeName != "" { + requirements, err := v.getPersistentVolumeRequirements(ctx, pod, pvc) + if err != nil { + return nil, fmt.Errorf("getting existing requirements, %w", err) + } + return requirements, nil + } + // Storage Class Requirements + if ptr.StringValue(pvc.Spec.StorageClassName) != "" { + requirements, err := v.getStorageClassRequirements(ctx, pvc) + if err != nil { + return nil, err + } + return requirements, nil + } + return nil, nil +} + +func (v *VolumeTopology) getStorageClassRequirements(ctx context.Context, pvc *v1.PersistentVolumeClaim) (v1alpha5.Requirements, error) { + storageClass := &storagev1.StorageClass{} + if err := v.kubeClient.Get(ctx, types.NamespacedName{Name: ptr.StringValue(pvc.Spec.StorageClassName)}, storageClass); err != nil { + return nil, fmt.Errorf("getting storage class %q, %w", ptr.StringValue(pvc.Spec.StorageClassName), err) + } + var requirements v1alpha5.Requirements + if len(storageClass.AllowedTopologies) > 0 { + // Terms are ORed, only use the first term + for _, requirement := range storageClass.AllowedTopologies[0].MatchLabelExpressions { + requirements = append(requirements, v1.NodeSelectorRequirement{Key: requirement.Key, Operator: v1.NodeSelectorOpIn, Values: requirement.Values}) + } + } + return requirements, nil +} + +func (v *VolumeTopology) getPersistentVolumeRequirements(ctx context.Context, pod *v1.Pod, pvc *v1.PersistentVolumeClaim) (v1alpha5.Requirements, error) { + pv := &v1.PersistentVolume{} + if err := v.kubeClient.Get(ctx, types.NamespacedName{Name: pvc.Spec.VolumeName, Namespace: pod.Namespace}, pv); err != nil { + return nil, fmt.Errorf("getting persistent volume %q, %w", pvc.Spec.VolumeName, err) + } + if pv.Spec.NodeAffinity == nil { + return nil, nil + 
} + if pv.Spec.NodeAffinity.Required == nil { + return nil, nil + } + var requirements v1alpha5.Requirements + if len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) > 0 { + // Terms are ORed, only use the first term + requirements = append(requirements, pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions...) + } + return requirements, nil +}
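
Illustrative usage sketch (not part of the patch): the new selection.VolumeTopology walks a pod's persistent volume claims and turns a StorageClass's AllowedTopologies (or an already-bound PV's node affinity) into required node affinity terms on the pod before a provisioner is selected. The snippet below exercises that path against a controller-runtime fake client. The object names, the "topology-aware" StorageClass, and the "us-west-2a" zone are made up for illustration; topology.ebs.csi.aws.com/zone is the CSI zone key that the AWS cloud provider's init() in this patch normalizes to topology.kubernetes.io/zone.

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	"github.com/aws/karpenter/pkg/controllers/selection"
)

func main() {
	storageClassName := "topology-aware" // hypothetical StorageClass name
	storageClass := &storagev1.StorageClass{
		ObjectMeta:  metav1.ObjectMeta{Name: storageClassName},
		Provisioner: "ebs.csi.aws.com",
		AllowedTopologies: []v1.TopologySelectorTerm{{
			MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
				{Key: "topology.ebs.csi.aws.com/zone", Values: []string{"us-west-2a"}},
			},
		}},
	}
	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Name: "data", Namespace: "default"},
		Spec:       v1.PersistentVolumeClaimSpec{StorageClassName: &storageClassName},
	}
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "workload", Namespace: "default"},
		Spec: v1.PodSpec{Volumes: []v1.Volume{{
			Name: "data",
			VolumeSource: v1.VolumeSource{
				PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "data"},
			},
		}}},
	}

	// Fake client seeded with the StorageClass and PVC the injector will look up.
	kubeClient := fake.NewClientBuilder().WithScheme(clientgoscheme.Scheme).WithObjects(storageClass, pvc).Build()
	if err := selection.NewVolumeTopology(kubeClient).Inject(context.Background(), pod); err != nil {
		panic(err)
	}
	// The StorageClass's allowed topology is now a required node affinity term on the pod,
	// which the selection controller folds into the chosen provisioner's constraints.
	fmt.Println(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions)
}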
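
A second sketch of the label normalization added to requirements.go: Requirements.Append now routes every incoming requirement through Normalize, so aliased keys (the deprecated beta zone/arch/os labels, the beta instance type label, and any keys cloud providers register such as topology.ebs.csi.aws.com/zone) are stored under their stable WellKnownLabels equivalents. The zone value below is illustrative.

package main

import (
	"fmt"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
	v1 "k8s.io/api/core/v1"
)

func main() {
	reqs := v1alpha5.Requirements{}.Append(v1.NodeSelectorRequirement{
		Key:      v1.LabelFailureDomainBetaZone, // "failure-domain.beta.kubernetes.io/zone"
		Operator: v1.NodeSelectorOpIn,
		Values:   []string{"us-west-2a"},
	})
	// The requirement is stored under the stable key, so lookups against
	// topology.kubernetes.io/zone see the value.
	fmt.Println(reqs.Requirement(v1.LabelTopologyZone).UnsortedList()) // [us-west-2a]
}

Per the comment on NormalizedLabels in the patch, only pod-side requirements are translated this way; Provisioner labels remain restricted to WellKnownLabels.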