Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
tzneal committed Apr 8, 2022
1 parent 61b7641 commit dcf64d1
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 131 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/provisioning/v1alpha5/requirements.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewRequirements(requirements ...v1.NodeSelectorRequirement) Requirements {

// NewLabelRequirements constructs requirements from labels
func NewLabelRequirements(labels map[string]string) Requirements {
requirements := []v1.NodeSelectorRequirement{}
var requirements []v1.NodeSelectorRequirement
for key, value := range labels {
requirements = append(requirements, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/provisioning/scheduling/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (t *Topology) Update(ctx context.Context, p *v1.Pod) error {
func (t *Topology) Record(p *v1.Pod, requirements v1alpha5.Requirements) {
// once we've committed to a domain, we record the usage in every topology that cares about it
for _, tc := range t.topologies {
if tc.CountsPod(p, requirements) {
if tc.Counts(p, requirements) {
domains := requirements.Get(tc.Key)
if tc.Type == TopologyTypePodAntiAffinity {
// for anti-affinity topologies we need to block out all possible domains that the pod could land in
Expand Down Expand Up @@ -265,7 +265,7 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
}
// nodes may or may not be considered for counting purposes for topology spread constraints depending on if they
// are selected by the pod's node selectors and required node affinities. If these are unset, the node always counts.
if !tg.nodeSelector.Matches(node) {
if !tg.nodeFilter.Matches(node) {
continue
}
tg.Record(domain)
Expand Down Expand Up @@ -352,7 +352,7 @@ func (t *Topology) getMatchingTopologies(p *v1.Pod, requirements v1alpha5.Requir
}
}
for _, tc := range t.inverseTopologies {
if tc.CountsPod(p, requirements) {
if tc.Counts(p, requirements) {
matchingTopologies = append(matchingTopologies, tc)
}
}
Expand Down
82 changes: 41 additions & 41 deletions pkg/controllers/provisioning/scheduling/topologygroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ func (t TopologyType) String() string {
// TopologyGroup is used to track pod counts that match a selector by the topology domain (e.g. SELECT COUNT(*) FROM pods GROUP BY(topology_ke
type TopologyGroup struct {
// Hashed Fields
Key string
Type TopologyType
maxSkew int32
namespaces utilsets.String
selector *metav1.LabelSelector
nodeSelector *TopologyNodeFilter
Key string
Type TopologyType
maxSkew int32
namespaces utilsets.String
selector *metav1.LabelSelector
nodeFilter TopologyNodeFilter
// Index
owners map[types.UID]struct{} // Pods that have this topology as a scheduling rule
domains map[string]int32 // TODO(ellistarn) explore replacing with a minheap
Expand All @@ -70,19 +70,19 @@ func NewTopologyGroup(topologyType TopologyType, topologyKey string, pod *v1.Pod
domainCounts[domain] = 0
}
// the nil *TopologyNodeFilter always passes which is what we need for affinity/anti-affinity
var nodeSelector *TopologyNodeFilter
var nodeSelector TopologyNodeFilter
if topologyType == TopologyTypeSpread {
nodeSelector = NewTopologyNodeFilter(pod)
nodeSelector = MakeTopologyNodeFilter(pod)
}
return &TopologyGroup{
Type: topologyType,
Key: topologyKey,
namespaces: namespaces,
selector: labelSelector,
nodeSelector: nodeSelector,
maxSkew: maxSkew,
domains: domainCounts,
owners: map[types.UID]struct{}{},
Type: topologyType,
Key: topologyKey,
namespaces: namespaces,
selector: labelSelector,
nodeFilter: nodeSelector,
maxSkew: maxSkew,
domains: domainCounts,
owners: map[types.UID]struct{}{},
}
}

Expand All @@ -105,18 +105,10 @@ func (t *TopologyGroup) Record(domains ...string) {
}
}

// Selects returns true if the given pod is selected by this topology
func (t *TopologyGroup) Selects(pod *v1.Pod) bool {
selector, err := metav1.LabelSelectorAsSelector(t.selector)
runtime.Must(err)
return t.namespaces.Has(pod.Namespace) &&
selector.Matches(labels.Set(pod.Labels))
}

// CountsPod returns true if the pod would count for the topology, given that it schedule to a node with the provided
// Counts returns true if the pod would count for the topology, given that it schedule to a node with the provided
// requirements
func (t *TopologyGroup) CountsPod(pod *v1.Pod, requirements v1alpha5.Requirements) bool {
return t.Selects(pod) && t.nodeSelector.MatchesRequirements(requirements)
func (t *TopologyGroup) Counts(pod *v1.Pod, requirements v1alpha5.Requirements) bool {
return t.selects(pod) && t.nodeFilter.MatchesRequirements(requirements)
}

// Register ensures that the topology is aware of the given domain names.
Expand All @@ -143,19 +135,19 @@ func (t *TopologyGroup) IsOwnedBy(key types.UID) bool {
// with self anti-affinity, we track that as a single topology with 100 owners instead of 100x topologies.
func (t *TopologyGroup) Hash() uint64 {
hash, err := hashstructure.Hash(struct {
TopologyKey string
Type TopologyType
Namespaces utilsets.String
LabelSelector *metav1.LabelSelector
MaxSkew int32
NodeSelectorHash uint64
TopologyKey string
Type TopologyType
Namespaces utilsets.String
LabelSelector *metav1.LabelSelector
MaxSkew int32
NodeFilter TopologyNodeFilter
}{
TopologyKey: t.Key,
Type: t.Type,
Namespaces: t.namespaces,
LabelSelector: t.selector,
MaxSkew: t.maxSkew,
NodeSelectorHash: t.nodeSelector.Hash(),
TopologyKey: t.Key,
Type: t.Type,
Namespaces: t.namespaces,
LabelSelector: t.selector,
MaxSkew: t.maxSkew,
NodeFilter: t.nodeFilter,
}, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
runtime.Must(err)
return hash
Expand All @@ -164,7 +156,7 @@ func (t *TopologyGroup) Hash() uint64 {
func (t *TopologyGroup) nextDomainTopologySpread(pod *v1.Pod, podDomains, nodeDomains sets.Set) sets.Set {
// min count is calculated across all domains
min := t.domainMinCount(podDomains)
selfSelecting := t.Selects(pod)
selfSelecting := t.selects(pod)

minDomain := ""
minCount := int32(math.MaxInt32)
Expand Down Expand Up @@ -216,7 +208,7 @@ func (t *TopologyGroup) nextDomainAffinity(pod *v1.Pod, domains sets.Set) sets.S
}
}
// If pod is self selecting and no pod has been scheduled yet, pick a domain at random to bootstrap scheduling
if options.Len() == 0 && t.Selects(pod) {
if options.Len() == 0 && t.selects(pod) {
for domain := range t.domains {
if domains.Has(domain) {
options.Insert(domain)
Expand All @@ -236,3 +228,11 @@ func (t *TopologyGroup) nextDomainAntiAffinity(domains sets.Set) sets.Set {
}
return options
}

// selects returns true if the given pod is selected by this topology
func (t *TopologyGroup) selects(pod *v1.Pod) bool {
selector, err := metav1.LabelSelectorAsSelector(t.selector)
runtime.Must(err)
return t.namespaces.Has(pod.Namespace) &&
selector.Matches(labels.Set(pod.Labels))
}
112 changes: 26 additions & 86 deletions pkg/controllers/provisioning/scheduling/topologynodefilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,114 +15,54 @@ limitations under the License.
package scheduling

import (
"github.com/mitchellh/hashstructure/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
v1 "k8s.io/api/core/v1"
)

// TopologyNodeFilter is used to determine if a given actual node or scheduling node matches the pod's node selectors
// and required node affinity terms. This is used with topology spread constraints to determine if the node should be
// included for topology counting purposes. This is only used with topology spread constraints as affinities/anti-affinities
// always count across all nodes. A nil or zero-value TopologyNodeFilter behaves well and the filter returns true for
// all nodes.
type TopologyNodeFilter struct {
nodeSelector map[string]string
nodeSelectorTerms []v1.NodeSelectorTerm
}
type TopologyNodeFilter []v1alpha5.Requirements

func NewTopologyNodeFilter(p *v1.Pod) *TopologyNodeFilter {
selector := &TopologyNodeFilter{
nodeSelector: p.Spec.NodeSelector,
}
if p.Spec.Affinity != nil && p.Spec.Affinity.NodeAffinity != nil && p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
selector.nodeSelectorTerms = p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
func MakeTopologyNodeFilter(p *v1.Pod) TopologyNodeFilter {
nodeSelectorRequirements := v1alpha5.NewLabelRequirements(p.Spec.NodeSelector)
// if we only have a label selector, that's the only requirement that must match
if p.Spec.Affinity == nil || p.Spec.Affinity.NodeAffinity == nil || p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
return TopologyNodeFilter{nodeSelectorRequirements}
}
return selector
}

// Matches returns true if the TopologyNodeFilter doesn't prohibit node from the participating in the topology
func (t *TopologyNodeFilter) Matches(node *v1.Node) bool {
if t == nil {
return true
}
// if a node selector term is provided, it must match the node
for k, v := range t.nodeSelector {
if node.Labels[k] != v {
return false
}
}
if len(t.nodeSelectorTerms) == 0 {
return true
// otherwise, we need to match the combination of label selector and any term of the required node affinities since
// those terms are OR'd together
var filter TopologyNodeFilter
for _, term := range p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms {
requirements := nodeSelectorRequirements.Add(v1alpha5.NewRequirements(term.MatchExpressions...).Requirements...)
filter = append(filter, requirements)
}

for _, term := range t.nodeSelectorTerms {
requirement := v1alpha5.NewRequirements(term.MatchExpressions...)
if t.matchesRequirements(requirement, node) {
return true
}
}
return filter
}

// at this point we have node selector terms, but didn't match all of the requirements for any individual term
return false
// Matches returns true if the TopologyNodeFilter doesn't prohibit node from the participating in the topology
func (t TopologyNodeFilter) Matches(node *v1.Node) bool {
nodeLabels := v1alpha5.NewLabelRequirements(node.Labels)
return t.MatchesRequirements(nodeLabels)
}

// MatchesRequirements returns true if the TopologyNodeFilter doesn't prohibit a node with the requirements from
// participating in the topology. This method allows checking the requirements from a scheduling.Node to see if the
// node we will soon create participates in this topology.
func (t *TopologyNodeFilter) MatchesRequirements(nodeRequirements v1alpha5.Requirements) bool {
if t == nil {
return true
}
for k, v := range t.nodeSelector {
if !nodeRequirements.Get(k).Has(v) {
return false
}
}
if len(t.nodeSelectorTerms) == 0 {
func (t TopologyNodeFilter) MatchesRequirements(requirements v1alpha5.Requirements) bool {
// no requirements, so it always matches
if len(t) == 0 {
return true
}

for _, term := range t.nodeSelectorTerms {
requirement := v1alpha5.NewRequirements(term.MatchExpressions...)
matchesAllReqs := true
for key := range requirement.Keys() {
if requirement.Get(key).Intersection(nodeRequirements.Get(key)).Len() == 0 {
matchesAllReqs = false
break
}
}
// these terms are OR'd together, so if we match one full set of requirements, the filter passes
if matchesAllReqs {
// these are an OR, so if any passes the filter passes
for _, req := range t {
if err := requirements.Compatible(req); err == nil {
return true
}
}
return true
}

func (t *TopologyNodeFilter) Hash() uint64 {
if t == nil || (len(t.nodeSelector) == 0 && len(t.nodeSelectorTerms) == 0) {
return 0
}
hash, err := hashstructure.Hash(struct {
NodeSelector map[string]string
NodeSelectorTerms []v1.NodeSelectorTerm
}{
NodeSelector: t.nodeSelector,
NodeSelectorTerms: t.nodeSelectorTerms,
}, hashstructure.FormatV2,
&hashstructure.HashOptions{SlicesAsSets: true})
runtime.Must(err)
return hash

}

func (t *TopologyNodeFilter) matchesRequirements(requirement v1alpha5.Requirements, node *v1.Node) bool {
for key := range requirement.Keys() {
if !requirement.Get(key).Has(node.Labels[key]) {
return false
}
}
return true
return false
}

0 comments on commit dcf64d1

Please sign in to comment.