diff --git a/pkg/apis/provisioning/v1alpha5/requirements.go b/pkg/apis/provisioning/v1alpha5/requirements.go
index eb4a7baf1045..db4a0f112e87 100644
--- a/pkg/apis/provisioning/v1alpha5/requirements.go
+++ b/pkg/apis/provisioning/v1alpha5/requirements.go
@@ -53,7 +53,7 @@ func NewLabelRequirements(labels map[string]string) Requirements {
 // NewPodRequirements constructs requirements from a pod
 func NewPodRequirements(pod *v1.Pod) Requirements {
-	requirements := []v1.NodeSelectorRequirement{}
+	var requirements []v1.NodeSelectorRequirement
 	for key, value := range pod.Spec.NodeSelector {
 		requirements = append(requirements, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
 	}
diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go
index 4c3269b5021e..dcdbe492815f 100644
--- a/pkg/controllers/provisioning/scheduling/suite_test.go
+++ b/pkg/controllers/provisioning/scheduling/suite_test.go
@@ -876,10 +876,10 @@ var _ = Describe("Topology", func() {
 				LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
 				MaxSkew: 1,
 			}}
-			// force this pod onto zone-1 so that the cluster is aware zone-1 exists
+			// force this pod onto zone-1
 			provisioner.Spec.Requirements = v1alpha5.NewRequirements(
 				v1.NodeSelectorRequirement{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}})
-			ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())
+			ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: labels}}))

 			// now only allow scheduling pods on zone-2 and zone-3
 			provisioner.Spec.Requirements = v1alpha5.NewRequirements(
@@ -888,9 +888,9 @@ var _ = Describe("Topology", func() {
 				MakePods(10, test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: labels}, TopologySpreadConstraints: topology})...,
 			)
-			// max skew of 1, so test-zone-2/3 will have 1 nodes each and the rest of the pods will fail to schedule since
-			// test-zone-1 has zero pods in it.
-			ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 1))
+			// max skew of 1, so test-zone-2/3 will have 2 pods each and the rest of the pods will fail to schedule since
+			// test-zone-1 has 1 pod in it.
+			ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 2, 2))
 		})
 		It("should only count running/scheduled pods with matching labels scheduled to nodes with a corresponding domain", func() {
 			wrongNamespace := strings.ToLower(randomdata.SillyName())
@@ -1211,6 +1211,85 @@ var _ = Describe("Topology", func() {
 			}
 			Expect(nodeNames).To(HaveLen(1))
 		})
+		It("should balance pods across capacity-types (node required affinity constrained)", func() {
+			// launch this on-demand pod in zone-1
+			provisioner.Spec.Requirements = v1alpha5.NewRequirements(
+				v1.NodeSelectorRequirement{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}})
+			pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, MakePods(1, test.PodOptions{
+				ObjectMeta: metav1.ObjectMeta{Labels: labels},
+				NodeRequirements: []v1.NodeSelectorRequirement{
+					{
+						Key:      v1alpha5.LabelCapacityType,
+						Operator: v1.NodeSelectorOpIn,
+						Values:   []string{"on-demand"},
+					},
+				},
+			})...)
+ ExpectScheduled(ctx, env.Client, pod[0]) + + topology := []v1.TopologySpreadConstraint{{ + TopologyKey: v1alpha5.LabelCapacityType, + WhenUnsatisfiable: v1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MaxSkew: 1, + }} + + // limit our provisioner to only creating spot nodes + provisioner.Spec.Requirements = v1alpha5.NewRequirements( + v1.NodeSelectorRequirement{Key: v1alpha5.LabelCapacityType, Operator: v1.NodeSelectorOpIn, Values: []string{"spot"}}) + + // Try to run 5 pods, with a node selector restricted to test-zone-2, they should all schedule on the same + // spot node. This doesn't violate the max-skew of 1 as the node selector requirement here excludes the + // existing on-demand pod from counting within this topology. + ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, + MakePods(5, test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + NodeRequirements: []v1.NodeSelectorRequirement{ + { + Key: v1.LabelTopologyZone, + Operator: v1.NodeSelectorOpIn, + Values: []string{"test-zone-2"}, + }, + }, + TopologySpreadConstraints: topology, + })..., + ) + ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 5)) + }) + It("should balance pods across capacity-types (no constraints)", func() { + pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + NodeRequirements: []v1.NodeSelectorRequirement{ + { + Key: v1alpha5.LabelCapacityType, + Operator: v1.NodeSelectorOpIn, + Values: []string{"on-demand"}, + }, + }, + })) + ExpectScheduled(ctx, env.Client, pod[0]) + + topology := []v1.TopologySpreadConstraint{{ + TopologyKey: v1alpha5.LabelCapacityType, + WhenUnsatisfiable: v1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MaxSkew: 1, + }} + + // limit our provisioner to only creating spot nodes + provisioner.Spec.Requirements = v1alpha5.NewRequirements( + v1.NodeSelectorRequirement{Key: v1alpha5.LabelCapacityType, Operator: v1.NodeSelectorOpIn, Values: []string{"spot"}}) + + // since there is no node selector on this pod, the topology can see the single on-demand node that already + // exists and that limits us to scheduling 2 more spot pods before we would violate max-skew + ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, + MakePods(5, test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + TopologySpreadConstraints: topology, + })..., + ) + ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 2)) + }) }) Context("Combined Hostname and Zonal Topology", func() { @@ -1353,7 +1432,7 @@ var _ = Describe("Topology", func() { // add varying numbers of pods, checking after each scheduling to ensure that our max required max skew // has not been violated for each constraint - for i := 1; i < 25; i++ { + for i := 1; i < 15; i++ { pods := MakePods(i, test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: labels}, TopologySpreadConstraints: topology}) ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pods...) 
ExpectMaxSkew(ctx, env.Client, "default", &topology[0]).To(BeNumerically("<=", 1)) @@ -1619,18 +1698,20 @@ var _ = Describe("Topology", func() { }) It("should respect self pod affinity for first empty topology domain only (hostname)", func() { affLabels := map[string]string{"security": "s2"} - pods := MakePods(10, test.PodOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: affLabels, - }, - PodRequirements: []v1.PodAffinityTerm{{ - LabelSelector: &metav1.LabelSelector{ - MatchLabels: affLabels, + createPods := func() []*v1.Pod { + return MakePods(10, test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: affLabels, }, - TopologyKey: v1.LabelHostname, - }}, - }) - pods = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pods...) + PodRequirements: []v1.PodAffinityTerm{{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: affLabels, + }, + TopologyKey: v1.LabelHostname, + }}, + }) + } + pods := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, createPods()...) nodeNames := map[string]struct{}{} unscheduledCount := 0 scheduledCount := 0 @@ -1643,16 +1724,21 @@ var _ = Describe("Topology", func() { scheduledCount++ } } - // the node can only hold 5 pods, so we should get a single node with 5 pods and 5 unschedulable pods + // the node can only hold 5 pods, so we should get a single node with 5 pods and 5 unschedulable pods from that batch Expect(len(nodeNames)).To(Equal(1)) Expect(scheduledCount).To(BeNumerically("==", 5)) Expect(unscheduledCount).To(BeNumerically("==", 5)) + + // and pods in a different batch should not schedule as well even if the node is not ready yet + pods = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, createPods()...) + for _, p := range pods { + ExpectNotScheduled(ctx, env.Client, p) + } }) It("should respect self pod affinity for first empty topology domain only (hostname/constrained zones)", func() { affLabels := map[string]string{"security": "s2"} - - // put one pod in test-zone-1, this shouldn't affect the remainder of the test as the node selectors are - // disjoint between this pod and the other 10 pods + // put one pod in test-zone-1, this does affect pod affinity even though we have different node selectors. + // The node selector and required node affinity restrictions to topology counting only apply to topology spread. ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ ObjectMeta: metav1.ObjectMeta{ Labels: affLabels, @@ -1687,22 +1773,10 @@ var _ = Describe("Topology", func() { }}, }) pods = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pods...) 
- nodeNames := map[string]struct{}{} - unscheduledCount := 0 - scheduledCount := 0 for _, p := range pods { - p = ExpectPodExists(ctx, env.Client, p.Name, p.Namespace) - if p.Spec.NodeName == "" { - unscheduledCount++ - } else { - nodeNames[p.Spec.NodeName] = struct{}{} - scheduledCount++ - } + // none of this should schedule + ExpectNotScheduled(ctx, env.Client, p) } - // the node can only hold 5 pods, so we should get a single node with 5 pods and 5 unschedulable pods - Expect(len(nodeNames)).To(Equal(1)) - Expect(scheduledCount).To(BeNumerically("==", 5)) - Expect(unscheduledCount).To(BeNumerically("==", 5)) }) It("should respect self pod affinity (zone)", func() { affLabels := map[string]string{"security": "s2"} diff --git a/pkg/controllers/provisioning/scheduling/topology.go b/pkg/controllers/provisioning/scheduling/topology.go index ff3e709facef..223e52194ed7 100644 --- a/pkg/controllers/provisioning/scheduling/topology.go +++ b/pkg/controllers/provisioning/scheduling/topology.go @@ -57,23 +57,13 @@ func NewTopology(ctx context.Context, kubeClient client.Client, requirements *v1 topologies: map[uint64]*TopologyGroup{}, inverseTopologies: map[uint64]*TopologyGroup{}, } - var nodeList v1.NodeList - if err := kubeClient.List(ctx, &nodeList); err != nil { - return nil, fmt.Errorf("listing nodes, %w", err) - } - // generate the universe domains which is the union of the ones specified in the provisioner spec and the ones that - // exist - for topologyKey := range requirements.Keys() { - domains := utilsets.NewString() - for _, node := range nodeList.Items { - if domain, ok := node.Labels[topologyKey]; ok { - domains.Insert(domain) - } - } - domains = domains.Union(requirements.Get(topologyKey).Values()) - t.domains[topologyKey] = domains + // Update the universe of valid domains per the provisioner spec. We can't pull all the domains from all of the nodes + // here as these are passed on to topology spreads which can be limited by node selector/required node affinities. + for topologyKey := range v1alpha5.ValidTopologyKeys { + t.domains[topologyKey] = requirements.Get(topologyKey).Values() } + errs := t.updateInverseAffinities(ctx) for i := range pods { errs = multierr.Append(errs, t.Update(ctx, pods[i])) @@ -125,7 +115,7 @@ func (t *Topology) Update(ctx context.Context, p *v1.Pod) error { func (t *Topology) Record(p *v1.Pod, requirements v1alpha5.Requirements) { // once we've committed to a domain, we record the usage in every topology that cares about it for _, tc := range t.topologies { - if tc.Matches(p.Namespace, p.Labels) { + if tc.CountsPod(p, requirements) { domains := requirements.Get(tc.Key) if tc.Type == TopologyTypePodAntiAffinity { // for anti-affinity topologies we need to block out all possible domains that the pod could land in @@ -153,17 +143,17 @@ func (t *Topology) Record(p *v1.Pod, requirements v1alpha5.Requirements) { // cannot be satisfied. 
 func (t *Topology) AddRequirements(podRequirements, nodeRequirements v1alpha5.Requirements, p *v1.Pod) (v1alpha5.Requirements, error) {
 	requirements := nodeRequirements
-	for _, topology := range t.getMatchingTopologies(p) {
-		domains := sets.NewComplementSet()
+	for _, topology := range t.getMatchingTopologies(p, nodeRequirements) {
+		podDomains := sets.NewComplementSet()
 		if podRequirements.Has(topology.Key) {
-			domains = podRequirements.Get(topology.Key)
+			podDomains = podRequirements.Get(topology.Key)
 		}
 		nodeDomains := sets.NewComplementSet()
 		if nodeRequirements.Has(topology.Key) {
 			nodeDomains = nodeRequirements.Get(topology.Key)
 		}
-		domains = topology.Get(p, domains, nodeDomains)
+		domains := topology.Get(p, podDomains, nodeDomains)
 		if domains.Len() == 0 {
 			return v1alpha5.Requirements{}, fmt.Errorf("unsatisfiable topology constraint for key %s", topology.Key)
 		}
@@ -222,7 +212,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *v1.Pod, d
 		return err
 	}

-	tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, namespaces, term.LabelSelector, math.MaxInt32, t.domains[term.TopologyKey])
+	tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, t.domains[term.TopologyKey])
 	hash := tg.Hash()

 	if existing, ok := t.inverseTopologies[hash]; !ok {
@@ -262,9 +252,22 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
 			return fmt.Errorf("getting node %s, %w", p.Spec.NodeName, err)
 		}
 		domain, ok := node.Labels[tg.Key]
+		// Kubelet sets the hostname label, but the node may not be ready yet, so the label can be missing. We fall back
+		// and just treat the node name as the label. The node name usually is the hostname, but even if not, we at least
+		// count the existence of the pods in some domain, even if not in the correct one. This is needed to handle the
+		// case of pods with self-affinity only fulfilling that affinity if all domains are empty.
+		if !ok && tg.Key == v1.LabelHostname {
+			domain = node.Name
+			ok = true
+		}
 		if !ok {
 			continue // Don't include pods if node doesn't contain domain https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/#conventions
 		}
+		// nodes may or may not be considered for counting purposes for topology spread constraints depending on whether
+		// they are selected by the pod's node selectors and required node affinities. If these are unset, the node always counts.
+ if !tg.nodeSelector.Matches(node) { + continue + } tg.Record(domain) } return nil @@ -273,7 +276,7 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error { func (t *Topology) newForTopologies(p *v1.Pod) []*TopologyGroup { var topologyGroups []*TopologyGroup for _, cs := range p.Spec.TopologySpreadConstraints { - topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, utilsets.NewString(p.Namespace), cs.LabelSelector, cs.MaxSkew, t.domains[cs.TopologyKey])) + topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, utilsets.NewString(p.Namespace), cs.LabelSelector, cs.MaxSkew, t.domains[cs.TopologyKey])) } return topologyGroups } @@ -310,7 +313,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *v1.Pod) ([]*Topology if err != nil { return nil, err } - topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, namespaces, term.LabelSelector, math.MaxInt32, t.domains[term.TopologyKey])) + topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, t.domains[term.TopologyKey])) } } return topologyGroups, nil @@ -341,7 +344,7 @@ func (t *Topology) buildNamespaceList(ctx context.Context, namespace string, nam // getMatchingTopologies returns a sorted list of topologies that either control the scheduling of pod p, or for which // the topology selects pod p and the scheduling of p affects the count per topology domain -func (t *Topology) getMatchingTopologies(p *v1.Pod) []*TopologyGroup { +func (t *Topology) getMatchingTopologies(p *v1.Pod, requirements v1alpha5.Requirements) []*TopologyGroup { var matchingTopologies []*TopologyGroup for _, tc := range t.topologies { if tc.IsOwnedBy(p.UID) { @@ -349,7 +352,7 @@ func (t *Topology) getMatchingTopologies(p *v1.Pod) []*TopologyGroup { } } for _, tc := range t.inverseTopologies { - if tc.Matches(p.Namespace, p.Labels) { + if tc.CountsPod(p, requirements) { matchingTopologies = append(matchingTopologies, tc) } } diff --git a/pkg/controllers/provisioning/scheduling/topologygroup.go b/pkg/controllers/provisioning/scheduling/topologygroup.go index 56df42883701..ea750ff14f35 100644 --- a/pkg/controllers/provisioning/scheduling/topologygroup.go +++ b/pkg/controllers/provisioning/scheduling/topologygroup.go @@ -17,6 +17,8 @@ package scheduling import ( "math" + "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" + "github.com/mitchellh/hashstructure/v2" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -51,29 +53,36 @@ func (t TopologyType) String() string { // TopologyGroup is used to track pod counts that match a selector by the topology domain (e.g. 
SELECT COUNT(*) FROM pods GROUP BY(topology_ke
 type TopologyGroup struct {
 	// Hashed Fields
-	Key        string
-	Type       TopologyType
-	maxSkew    int32
-	namespaces utilsets.String
-	selector   *metav1.LabelSelector
+	Key          string
+	Type         TopologyType
+	maxSkew      int32
+	namespaces   utilsets.String
+	selector     *metav1.LabelSelector
+	nodeSelector *TopologyNodeFilter
 	// Index
 	owners  map[types.UID]struct{} // Pods that have this topology as a scheduling rule
 	domains map[string]int32       // TODO(ellistarn) explore replacing with a minheap
 }

-func NewTopologyGroup(topologyType TopologyType, topologyKey string, namespaces utilsets.String, labelSelector *metav1.LabelSelector, maxSkew int32, domains utilsets.String) *TopologyGroup {
+func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod, namespaces utilsets.String, labelSelector *metav1.LabelSelector, maxSkew int32, domains utilsets.String) *TopologyGroup {
 	domainCounts := map[string]int32{}
 	for domain := range domains {
 		domainCounts[domain] = 0
 	}
+	// the nil *TopologyNodeFilter always passes, which is what we need for affinity/anti-affinity
+	var nodeSelector *TopologyNodeFilter
+	if topologyType == TopologyTypeSpread {
+		nodeSelector = NewTopologyNodeFilter(pod)
+	}
 	return &TopologyGroup{
-		Type:       topologyType,
-		Key:        topologyKey,
-		namespaces: namespaces,
-		selector:   labelSelector,
-		maxSkew:    maxSkew,
-		domains:    domainCounts,
-		owners:     map[types.UID]struct{}{},
+		Type:         topologyType,
+		Key:          topologyKey,
+		namespaces:   namespaces,
+		selector:     labelSelector,
+		nodeSelector: nodeSelector,
+		maxSkew:      maxSkew,
+		domains:      domainCounts,
+		owners:       map[types.UID]struct{}{},
 	}
 }

@@ -96,10 +105,18 @@ func (t *TopologyGroup) Record(domains ...string) {
 	}
 }

-func (t *TopologyGroup) Matches(namespace string, podLabels labels.Set) bool {
+// Selects returns true if the given pod is selected by this topology
+func (t *TopologyGroup) Selects(pod *v1.Pod) bool {
 	selector, err := metav1.LabelSelectorAsSelector(t.selector)
 	runtime.Must(err)
-	return t.namespaces.Has(namespace) && selector.Matches(podLabels)
+	return t.namespaces.Has(pod.Namespace) &&
+		selector.Matches(labels.Set(pod.Labels))
+}
+
+// CountsPod returns true if the pod would count for the topology, given that it schedules to a node with the provided
+// requirements
+func (t *TopologyGroup) CountsPod(pod *v1.Pod, requirements v1alpha5.Requirements) bool {
+	return t.Selects(pod) && t.nodeSelector.MatchesRequirements(requirements)
 }

 // Register ensures that the topology is aware of the given domain names.
@@ -126,17 +143,19 @@ func (t *TopologyGroup) IsOwnedBy(key types.UID) bool {
 // with self anti-affinity, we track that as a single topology with 100 owners instead of 100x topologies.
func (t *TopologyGroup) Hash() uint64 { hash, err := hashstructure.Hash(struct { - TopologyKey string - Type TopologyType - Namespaces utilsets.String - LabelSelector *metav1.LabelSelector - MaxSkew int32 + TopologyKey string + Type TopologyType + Namespaces utilsets.String + LabelSelector *metav1.LabelSelector + MaxSkew int32 + NodeSelectorHash uint64 }{ - TopologyKey: t.Key, - Type: t.Type, - Namespaces: t.namespaces, - LabelSelector: t.selector, - MaxSkew: t.maxSkew, + TopologyKey: t.Key, + Type: t.Type, + Namespaces: t.namespaces, + LabelSelector: t.selector, + MaxSkew: t.maxSkew, + NodeSelectorHash: t.nodeSelector.Hash(), }, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) runtime.Must(err) return hash @@ -145,7 +164,7 @@ func (t *TopologyGroup) Hash() uint64 { func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains sets.Set) sets.Set { // min count is calculated across all domains min := t.domainMinCount(podDomains) - selfSelecting := t.Matches(pod.Namespace, pod.Labels) + selfSelecting := t.Selects(pod) minDomain := "" minCount := int32(math.MaxInt32) @@ -197,7 +216,7 @@ func (t *TopologyGroup) nextDomainAffinity(pod *v1.Pod, domains sets.Set) sets.S } } // If pod is self selecting and no pod has been scheduled yet, pick a domain at random to bootstrap scheduling - if options.Len() == 0 && t.Matches(pod.Namespace, pod.Labels) { + if options.Len() == 0 && t.Selects(pod) { for domain := range t.domains { if domains.Has(domain) { options.Insert(domain) diff --git a/pkg/controllers/provisioning/scheduling/topologynodefilter.go b/pkg/controllers/provisioning/scheduling/topologynodefilter.go new file mode 100644 index 000000000000..9383dd869b7c --- /dev/null +++ b/pkg/controllers/provisioning/scheduling/topologynodefilter.go @@ -0,0 +1,128 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduling + +import ( + "github.com/mitchellh/hashstructure/v2" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/runtime" + + "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" +) + +// TopologyNodeFilter is used to determine if a given actual node or scheduling node matches the pod's node selectors +// and required node affinity terms. This is used with topology spread constraints to determine if the node should be +// included for topology counting purposes. This is only used with topology spread constraints as affinities/anti-affinities +// always count across all nodes. A nil or zero-value TopologyNodeFilter behaves well and the filter returns true for +// all nodes. 
+type TopologyNodeFilter struct {
+	nodeSelector      map[string]string
+	nodeSelectorTerms []v1.NodeSelectorTerm
+}
+
+func NewTopologyNodeFilter(p *v1.Pod) *TopologyNodeFilter {
+	selector := &TopologyNodeFilter{
+		nodeSelector: p.Spec.NodeSelector,
+	}
+	if p.Spec.Affinity != nil && p.Spec.Affinity.NodeAffinity != nil && p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
+		selector.nodeSelectorTerms = p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
+	}
+	return selector
+}
+
+// Matches returns true if the TopologyNodeFilter doesn't prohibit the node from participating in the topology
+func (t *TopologyNodeFilter) Matches(node *v1.Node) bool {
+	if t == nil {
+		return true
+	}
+	// every node selector entry must match the node's labels
+	for k, v := range t.nodeSelector {
+		if node.Labels[k] != v {
+			return false
+		}
+	}
+	if len(t.nodeSelectorTerms) == 0 {
+		return true
+	}
+
+	for _, term := range t.nodeSelectorTerms {
+		requirement := v1alpha5.NewRequirements(term.MatchExpressions...)
+		if t.matchesRequirements(requirement, node) {
+			return true
+		}
+	}
+
+	// at this point we have node selector terms, but didn't match all of the requirements for any individual term
+	return false
+}
+
+// MatchesRequirements returns true if the TopologyNodeFilter doesn't prohibit a node with the requirements from
+// participating in the topology. This method allows checking the requirements from a scheduling.Node to see if the
+// node we will soon create participates in this topology.
+func (t *TopologyNodeFilter) MatchesRequirements(nodeRequirements v1alpha5.Requirements) bool {
+	if t == nil {
+		return true
+	}
+	for k, v := range t.nodeSelector {
+		if !nodeRequirements.Get(k).Has(v) {
+			return false
+		}
+	}
+	if len(t.nodeSelectorTerms) == 0 {
+		return true
+	}
+
+	for _, term := range t.nodeSelectorTerms {
+		requirement := v1alpha5.NewRequirements(term.MatchExpressions...)
+		matchesAllReqs := true
+		for key := range requirement.Keys() {
+			if requirement.Get(key).Intersection(nodeRequirements.Get(key)).Len() == 0 {
+				matchesAllReqs = false
+				break
+			}
+		}
+		// these terms are OR'd together, so if we match one full set of requirements, the filter passes
+		if matchesAllReqs {
+			return true
+		}
+	}
+	// no term matched all of its requirements, so the node requirements don't pass the filter
+	return false
+}
+
+func (t *TopologyNodeFilter) Hash() uint64 {
+	if t == nil || (len(t.nodeSelector) == 0 && len(t.nodeSelectorTerms) == 0) {
+		return 0
+	}
+	hash, err := hashstructure.Hash(struct {
+		NodeSelector      map[string]string
+		NodeSelectorTerms []v1.NodeSelectorTerm
+	}{
+		NodeSelector:      t.nodeSelector,
+		NodeSelectorTerms: t.nodeSelectorTerms,
+	}, hashstructure.FormatV2,
+		&hashstructure.HashOptions{SlicesAsSets: true})
+	runtime.Must(err)
+	return hash
+}
+
+func (t *TopologyNodeFilter) matchesRequirements(requirement v1alpha5.Requirements, node *v1.Node) bool {
+	for key := range requirement.Keys() {
+		if !requirement.Get(key).Has(node.Labels[key]) {
+			return false
+		}
+	}
+	return true
+}
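
The TopologyNodeFilter above is the core behavioral change: for topology spread constraints, an existing or soon-to-be-created node only counts toward a domain if the pod's node selector and at least one required node-affinity term admit it. The following self-contained sketch (not code from this branch) mirrors that rule using only upstream k8s.io/api types; the helper name matchesNode, the In-only operator handling, and the example zones are illustrative assumptions.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// matchesNode reports whether the pod's node selector and required node affinity
// terms admit the node; only admitted nodes would be counted for spread domains.
func matchesNode(pod *v1.Pod, node *v1.Node) bool {
	// every nodeSelector entry must match the node's labels
	for k, v := range pod.Spec.NodeSelector {
		if node.Labels[k] != v {
			return false
		}
	}
	// required node affinity terms are OR'd; a term's expressions are AND'd
	if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil ||
		pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
		return true
	}
	for _, term := range pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms {
		matched := true
		for _, req := range term.MatchExpressions {
			if req.Operator != v1.NodeSelectorOpIn {
				continue // sketch only handles the In operator
			}
			found := false
			for _, val := range req.Values {
				if node.Labels[req.Key] == val {
					found = true
				}
			}
			if !found {
				matched = false
				break
			}
		}
		if matched {
			return true
		}
	}
	return false
}

func main() {
	// pod requires topology.kubernetes.io/zone In [test-zone-2]
	pod := &v1.Pod{Spec: v1.PodSpec{
		Affinity: &v1.Affinity{NodeAffinity: &v1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{{
					MatchExpressions: []v1.NodeSelectorRequirement{{
						Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"},
					}},
				}},
			},
		}},
	}}
	zone1 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1.LabelTopologyZone: "test-zone-1"}}}
	zone2 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1.LabelTopologyZone: "test-zone-2"}}}

	// the zone-1 node is excluded from the pod's spread domains, the zone-2 node is counted
	fmt.Println(matchesNode(pod, zone1), matchesNode(pod, zone2)) // false true
}

This is why the capacity-type tests above differ: with a required zone affinity, the pre-existing on-demand node in another zone is filtered out of the spread counts, while the unconstrained pods still see it and hit max-skew sooner.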