fix issue with not selecting from all viable domains during topology spread
tzneal committed Apr 5, 2022
1 parent e341c90 commit f102009
Showing 5 changed files with 146 additions and 58 deletions.
15 changes: 12 additions & 3 deletions pkg/controllers/provisioning/scheduling/node.go
@@ -59,17 +59,26 @@ func NewNode(constraints *v1alpha5.Constraints, topology *Topology, daemonResour

 func (n *Node) Add(pod *v1.Pod) error {
     // Include topology requirements
-    requirements, err := n.topology.AddRequirements(v1alpha5.NewPodRequirements(pod), pod, n.Hostname)
+    results, err := n.topology.AddRequirements(v1alpha5.NewPodRequirements(pod), pod)
     if err != nil {
         return err
     }
     // Check node compatibility
-    if err = n.Constraints.Requirements.Compatible(requirements); err != nil {
+    if err = n.Constraints.Requirements.Compatible(results.requirements); err != nil {
         return err
     }
     // Tighten requirements
-    requirements = n.Constraints.Requirements.Add(requirements.Requirements...)
+    requirements := n.Constraints.Requirements.Add(results.requirements.Requirements...)
     requests := resources.Merge(n.requests, resources.RequestsForPods(pod))
 
+    // Collapse any topology spreads which have now been limited to the intersection of the domains that minimize
+    // skew and the domains that the provisioner can provision for. If we just picked a minimum domain from all of the
+    // minimum domains, it could potentially not be one that the provisioner could provision.
+    requirements, err = results.Collapse(requirements)
+    if err != nil {
+        return err
+    }
+
     // Check instance type combinations
     instanceTypes := cloudprovider.FilterInstanceTypes(n.InstanceTypeOptions, requirements, requests)
     if len(instanceTypes) == 0 {
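The collapse comment above is the core of the fix. A minimal standalone sketch of the idea (hypothetical zone names and counts, not Karpenter code): when several domains satisfy max-skew, the scheduler must intersect all of them with the provisioner's domains before pinning one, rather than pinning the single minimum-skew domain up front.

package main

import "fmt"

func main() {
    // Spread counts for a constraint with maxSkew=2: test-zone-1 currently
    // minimizes skew, but every zone is viable because adding a pod anywhere
    // keeps skew <= 2.
    counts := map[string]int{"test-zone-1": 0, "test-zone-2": 1, "test-zone-3": 1}
    viable := []string{"test-zone-1", "test-zone-2", "test-zone-3"}

    // The provisioner can only create capacity in test-zone-3.
    provisionable := map[string]bool{"test-zone-3": true}

    // Picking the single minimum-skew domain (test-zone-1) up front would fail.
    // Intersecting first leaves test-zone-3, which is still within max skew.
    for _, zone := range viable {
        if provisionable[zone] {
            fmt.Printf("can schedule to %s (current count %d)\n", zone, counts[zone])
        }
    }
}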
39 changes: 34 additions & 5 deletions pkg/controllers/provisioning/scheduling/suite_test.go
@@ -748,6 +748,37 @@ var _ = Describe("Topology", func()
 // pod in zone-3 it can put a max of two per zone before it would violate max skew
 ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 2, 2))
 })
+It("should schedule to the non-minimum domain if it's all that's available", func() {
+    topology := []v1.TopologySpreadConstraint{{
+        TopologyKey:       v1.LabelTopologyZone,
+        WhenUnsatisfiable: v1.DoNotSchedule,
+        LabelSelector:     &metav1.LabelSelector{MatchLabels: labels},
+        MaxSkew:           5,
+    }}
+    // force this pod onto zone-1
+    provisioner.Spec.Requirements = v1alpha5.NewRequirements(
+        v1.NodeSelectorRequirement{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}})
+    ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
+        test.UnschedulablePod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: labels}, TopologySpreadConstraints: topology}))
+    ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1))
+
+    // force this pod onto zone-2
+    provisioner.Spec.Requirements = v1alpha5.NewRequirements(
+        v1.NodeSelectorRequirement{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2"}})
+    ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
+        test.UnschedulablePod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: labels}, TopologySpreadConstraints: topology}))
+    ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 1))
+
+    // now only allow scheduling pods on zone-3
+    provisioner.Spec.Requirements = v1alpha5.NewRequirements(
+        v1.NodeSelectorRequirement{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-3"}})
+    ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
+        MakePods(10, test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: labels}, TopologySpreadConstraints: topology})...,
+    )
+
+    // max skew of 5, so test-zone-1/2 will have 1 pod each, test-zone-3 will have 6, and the rest will fail to schedule
+    ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 1, 6))
+})
 It("should not violate max-skew when unsat = do not schedule", func() {
     topology := []v1.TopologySpreadConstraint{{
         TopologyKey: v1.LabelTopologyZone,
@@ -1002,7 +1033,9 @@ var _ = Describe("Topology", func()
 )
 ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(2, 2))
 })
-It("should not violate max-skew when unsat = do not schedule", func() {
+It("should not violate max-skew when unsat = do not schedule (capacity type)", func() {
+    // this test can pass in a flaky manner if we don't restrict our min domain selection to valid choices
+    // per the provisioner spec
     topology := []v1.TopologySpreadConstraint{{
         TopologyKey:       v1alpha5.LabelCapacityType,
         WhenUnsatisfiable: v1.DoNotSchedule,
@@ -1323,10 +1356,6 @@ var _ = Describe("Topology", func()
     MaxSkew: 1,
 }}
 
-// need to limit the provisioner to only zone-1, zone-2 or else it will know that test-zone-3 has 0 pods and won't violate
-// the max-skew
-provisioner.Spec.Requirements = v1alpha5.NewRequirements(
-    v1.NodeSelectorRequirement{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2"}})
 ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
     MakePods(6, test.PodOptions{
         ObjectMeta: metav1.ObjectMeta{Labels: labels},
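A back-of-the-envelope check of the 1/1/6 expectation in the new test above (a simplified standalone simulation, not the suite's helpers; it simply rejects any placement that would push skew past the limit, which matches this scenario):

package main

import (
    "fmt"
    "math"
)

func skew(counts map[string]int) int {
    min, max := math.MaxInt, 0
    for _, c := range counts {
        if c < min {
            min = c
        }
        if c > max {
            max = c
        }
    }
    return max - min
}

func main() {
    const maxSkew = 5
    // one pod already forced into each of zone-1 and zone-2
    counts := map[string]int{"test-zone-1": 1, "test-zone-2": 1, "test-zone-3": 0}

    scheduled := 0
    for i := 0; i < 10; i++ {
        counts["test-zone-3"]++ // the provisioner now only allows zone-3
        if skew(counts) > maxSkew {
            counts["test-zone-3"]-- // would violate max skew; pod stays pending
            continue
        }
        scheduled++
    }
    // zone-3 stops at 6: skew = 6 - 1 = 5, and a 7th pod would make it 6
    fmt.Println(counts, "scheduled:", scheduled)
}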
66 changes: 40 additions & 26 deletions pkg/controllers/provisioning/scheduling/topology.go
@@ -140,23 +140,53 @@ func (t *Topology) Record(p *v1.Pod, requirements v1alpha5.Requirements) {
     }
 }
 
+type TopologyResult struct {
+    requirements v1alpha5.Requirements
+    collapse     []string
+}
+
+func (r *TopologyResult) Collapse(requirements v1alpha5.Requirements) (v1alpha5.Requirements, error) {
+    var collapsedRequirements []v1.NodeSelectorRequirement
+    for _, topologyKey := range r.collapse {
+        domain, ok := requirements.Get(topologyKey).Any()
+        if ok {
+            collapsedRequirements = append(collapsedRequirements, v1.NodeSelectorRequirement{
+                Key:      topologyKey,
+                Operator: v1.NodeSelectorOpIn,
+                Values:   []string{domain},
+            })
+        }
+    }
+    if len(collapsedRequirements) > 0 {
+        if err := requirements.Compatible(v1alpha5.NewRequirements(collapsedRequirements...)); err != nil {
+            return v1alpha5.Requirements{}, err
+        }
+        requirements = requirements.Add(collapsedRequirements...)
+    }
+    return requirements, nil
+}
+
 // AddRequirements tightens the input requirements by adding additional requirements that are being enforced by topology spreads,
-// affinities, anti-affinities or inverse anti-affinities. The nodeHostname is the hostname that we are currently considering
-// placing the pod on. It returns these newly tightened requirements, or an error in the case of a set of requirements that
-// cannot be satisfied.
-func (t *Topology) AddRequirements(requirements v1alpha5.Requirements, p *v1.Pod, nodeHostname string) (v1alpha5.Requirements, error) {
+// affinities, anti-affinities or inverse anti-affinities. It returns these newly tightened requirements, or an error in the
+// case of a set of requirements that cannot be satisfied.
+func (t *Topology) AddRequirements(requirements v1alpha5.Requirements, p *v1.Pod) (TopologyResult, error) {
+    tr := TopologyResult{requirements: requirements}
     for _, topology := range t.getMatchingTopologies(p) {
         domains := sets.NewComplementSet()
-        if requirements.Has(topology.Key) {
-            domains = requirements.Get(topology.Key)
+        if tr.requirements.Has(topology.Key) {
+            domains = tr.requirements.Get(topology.Key)
         }
-        domains = topology.Next(p, nodeHostname, domains)
+        domains = topology.Next(p, domains)
         if domains.Len() == 0 {
-            return v1alpha5.Requirements{}, fmt.Errorf("unsatisfiable topology constraint for key %s", topology.Key)
+            return tr, fmt.Errorf("unsatisfiable topology constraint for key %s", topology.Key)
         }
-        requirements = requirements.Add(v1.NodeSelectorRequirement{Key: topology.Key, Operator: v1.NodeSelectorOpIn, Values: domains.Values().List()})
+        tr.requirements = tr.requirements.Add(v1.NodeSelectorRequirement{Key: topology.Key, Operator: v1.NodeSelectorOpIn, Values: domains.Values().List()})
+        if topology.Type == TopologyTypeSpread {
+            tr.collapse = append(tr.collapse, topology.Key)
+        }
     }
-    return requirements, nil
+    return tr, nil
 }

// Register is used to register a domain as available across topologies for the given topology key.
@@ -209,7 +239,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *v1.Pod, d
         return err
     }
 
-    tg := NewTopologyGroup(pod, TopologyTypePodAntiAffinity, term.TopologyKey, namespaces, term.LabelSelector, math.MaxInt32, t.domains[term.TopologyKey])
+    tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, namespaces, term.LabelSelector, math.MaxInt32, t.domains[term.TopologyKey])
 
     hash := tg.Hash()
     if existing, ok := t.inverseTopologies[hash]; !ok {
@@ -260,15 +290,7 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
 func (t *Topology) newForTopologies(p *v1.Pod) []*TopologyGroup {
     var topologyGroups []*TopologyGroup
     for _, cs := range p.Spec.TopologySpreadConstraints {
-        topologyGroups = append(topologyGroups, NewTopologyGroup(
-            p,
-            TopologyTypeSpread,
-            cs.TopologyKey,
-            utilsets.NewString(p.Namespace),
-            cs.LabelSelector,
-            cs.MaxSkew,
-            t.domains[cs.TopologyKey]),
-        )
+        topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, utilsets.NewString(p.Namespace), cs.LabelSelector, cs.MaxSkew, t.domains[cs.TopologyKey]))
     }
     return topologyGroups
 }
@@ -305,15 +327,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *v1.Pod) ([]*Topology
             if err != nil {
                 return nil, err
             }
-            topologyGroups = append(topologyGroups, NewTopologyGroup(
-                p,
-                topologyType,
-                term.TopologyKey,
-                namespaces,
-                term.LabelSelector,
-                math.MaxInt32,
-                t.domains[term.TopologyKey]),
-            )
+            topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, namespaces, term.LabelSelector, math.MaxInt32, t.domains[term.TopologyKey]))
         }
     }
     return topologyGroups, nil
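Note the ordering: Collapse runs only after the node and provisioner requirements have been folded in (see Node.Add above), so the single domain pinned for each spread key is one the provisioner can actually provision. A simplified standalone sketch of the pinning step, using plain maps in place of v1alpha5.Requirements:

package main

import (
    "errors"
    "fmt"
)

// collapse pins each topology-spread key to one domain chosen from the
// already-tightened requirements, mirroring what TopologyResult.Collapse does.
func collapse(requirements map[string][]string, spreadKeys []string) (map[string][]string, error) {
    for _, key := range spreadKeys {
        domains := requirements[key]
        if len(domains) == 0 {
            return nil, errors.New("unsatisfiable: no viable domain for " + key)
        }
        requirements[key] = domains[:1] // any remaining domain is a valid choice
    }
    return requirements, nil
}

func main() {
    // viable-by-skew domains already intersected with the provisioner's zones
    reqs := map[string][]string{"topology.kubernetes.io/zone": {"test-zone-3"}}
    out, err := collapse(reqs, []string{"topology.kubernetes.io/zone"})
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(out) // map[topology.kubernetes.io/zone:[test-zone-3]]
}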
73 changes: 49 additions & 24 deletions pkg/controllers/provisioning/scheduling/topologygroup.go
@@ -61,7 +61,7 @@ type TopologyGroup struct {
     domains map[string]int32 // TODO(ellistarn) explore replacing with a minheap
 }
 
-func NewTopologyGroup(pod *v1.Pod, topologyType TopologyType, topologyKey string, namespaces utilsets.String, labelSelector *metav1.LabelSelector, maxSkew int32, domains utilsets.String) *TopologyGroup {
+func NewTopologyGroup(topologyType TopologyType, topologyKey string, namespaces utilsets.String, labelSelector *metav1.LabelSelector, maxSkew int32, domains utilsets.String) *TopologyGroup {
     domainCounts := map[string]int32{}
     for domain := range domains {
         domainCounts[domain] = 0
@@ -77,21 +77,9 @@ func NewTopologyGroup(pod *v1.Pod, topologyType TopologyType, topologyKey string
     }
 }
 
-func (t *TopologyGroup) Next(pod *v1.Pod, nodeHostname string, domains sets.Set) sets.Set {
+func (t *TopologyGroup) Next(pod *v1.Pod, domains sets.Set) sets.Set {
     switch t.Type {
     case TopologyTypeSpread:
-        // We are only considering putting the pod on a single node. Intersecting the list of viable domains with the node's
-        // hostname solves a problem where we have multiple nodes that the pod could land on because of a min-domain tie, but
-        // some of them are not viable due to arch/os. Since we look at each node one at a time and this returns a min
-        // domain at random, we potentially fail to schedule as it may return a node hostname that doesn't correspond to the
-        // node that we are considering. We can't add a node selector upstream to limit our available domains to just the node
-        // under consideration as this as would break pod self-affinity since it needs to consider the universe of valid
-        // domains to ensure that it only satisfies pod self-affinity the first time. With the additional node selector, the
-        // universe of domains would always be a single hostname and it would repeatedly allow pods to provide self affinity
-        // across different nodes
-        if t.Key == v1.LabelHostname {
-            domains = domains.Intersection(sets.NewSet(nodeHostname))
-        }
         return t.nextDomainTopologySpread(domains)
     case TopologyTypePodAffinity:
         return t.nextDomainAffinity(pod, domains)
@@ -155,21 +143,58 @@ func (t *TopologyGroup) Hash() uint64 {
 }
 
 func (t *TopologyGroup) nextDomainTopologySpread(domains sets.Set) sets.Set {
-    // Pick the domain that minimizes skew.
-    min := int32(math.MaxInt32)
-    minDomain := ""
+    // Return all domains that don't violate max-skew. This is necessary as the provisioner may or may not be
+    // able to schedule to the domain that has the minimum skew, but can schedule to any that don't violate the
+    // max-skew.
+    min, max := t.domainMinMaxCounts(domains)
 
-    // Need to count skew for hostname
+    options := sets.NewSet()
+    currentSkew := max - min
     for domain := range t.domains {
-        if domains.Has(domain) && t.domains[domain] < min {
-            min = t.domains[domain]
-            minDomain = domain
+        if domains.Has(domain) {
+            count := t.domains[domain]
+            // calculate what the skew will be if we choose this domain
+            nextSkew := currentSkew
+            decreasing := false
+            if count == min {
+                // adding to the min domain, so we're decreasing skew
+                nextSkew = currentSkew - 1
+                decreasing = true
+            } else if count == max {
+                // adding to the max domain, so we're increasing skew
+                nextSkew = currentSkew + 1
+            }
+
+            // if choosing it leaves us under the max-skew, or over it but still decreasing, it's a valid choice
+            if nextSkew <= t.maxSkew || decreasing {
+                options.Insert(domain)
+            }
         }
     }
-    if t.Key == v1.LabelHostname && min+1 > t.maxSkew {
-        return sets.NewSet()
-    }
-    return sets.NewSet(minDomain)
+    return options
+}
+
+func (t *TopologyGroup) domainMinMaxCounts(domains sets.Set) (min int32, max int32) {
+    max = int32(0)
+    min = int32(math.MaxInt32)
+
+    // determine our current skew
+    for domain, count := range t.domains {
+        if domains.Has(domain) {
+            if count > max {
+                max = count
+            }
+            if count < min {
+                min = count
+            }
+        }
+    }
+
+    // hostname based topologies always have a min pod count of zero since we can create one
+    if t.Key == v1.LabelHostname {
+        min = 0
+    }
+    return
 }

func (t *TopologyGroup) nextDomainAffinity(pod *v1.Pod, domains sets.Set) sets.Set {
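The new nextDomainTopologySpread rule in a runnable standalone form (hardcoded counts rather than the TopologyGroup type): a domain is a valid target if placing the pod there keeps skew at or under maxSkew, or strictly decreases it.

package main

import (
    "fmt"
    "math"
)

func validDomains(counts map[string]int32, maxSkew int32) []string {
    min, max := int32(math.MaxInt32), int32(0)
    for _, c := range counts {
        if c < min {
            min = c
        }
        if c > max {
            max = c
        }
    }
    currentSkew := max - min

    var options []string
    for domain, count := range counts {
        nextSkew, decreasing := currentSkew, false
        if count == min {
            nextSkew, decreasing = currentSkew-1, true
        } else if count == max {
            nextSkew = currentSkew + 1
        }
        if nextSkew <= maxSkew || decreasing {
            options = append(options, domain)
        }
    }
    return options
}

func main() {
    counts := map[string]int32{"test-zone-1": 1, "test-zone-2": 1, "test-zone-3": 5}
    // with maxSkew=5 every zone is still a valid target; with maxSkew=3 only
    // the two minimum zones remain (order varies with map iteration)
    fmt.Println(validDomains(counts, 5))
    fmt.Println(validDomains(counts, 3))
}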
11 changes: 11 additions & 0 deletions pkg/utils/sets/sets.go
@@ -100,6 +100,17 @@ func (s Set) Values() sets.String {
     return s.values
 }
 
+func (s Set) Any() (string, bool) {
+    if s.complement || len(s.values) == 0 {
+        return "", false
+    }
+    for k := range s.values {
+        return k, true
+    }
+    // unreachable
+    return "", false
+}

// ComplementValues returns the values of the complement set.
// If the set is not a complement set, it will panic
func (s Set) ComplementValues() sets.String {
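A usage sketch for the new Any helper (the import path is assumed for illustration; NewSet and NewComplementSet are used as they appear elsewhere in this diff): it returns an arbitrary member of a finite set, and reports false for complement sets, whose membership is unbounded.

package main

import (
    "fmt"

    "github.com/aws/karpenter/pkg/utils/sets" // import path assumed for illustration
)

func main() {
    finite := sets.NewSet("test-zone-1", "test-zone-2")
    if domain, ok := finite.Any(); ok {
        fmt.Println("picked:", domain) // either zone; map iteration order is random
    }

    // Any refuses complement sets: their membership is unbounded, so there is
    // no concrete element to hand back.
    complement := sets.NewComplementSet()
    if _, ok := complement.Any(); !ok {
        fmt.Println("complement set: no concrete element")
    }
}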
