fix topology domain counts for topology spread
We only count nodes that match the pod's node selectors and required node affinities.
tzneal committed Apr 7, 2022
1 parent d830dd5 commit ce87b8c
Showing 5 changed files with 312 additions and 88 deletions.
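In short: when Karpenter tallies existing pods per topology domain for a pod's topology spread constraint, it now skips nodes that the pending pod's node selector and required node affinity could never select, and it seeds the domain universe from the provisioner spec rather than from every node in the cluster. The sketch below illustrates the counting rule only; the names (domainCounts, nodeMatchesPodNodeSelector) are illustrative rather than Karpenter's actual API, and the full node-selector/required-node-affinity match is reduced to a plain nodeSelector check to keep it short.

```go
// Minimal sketch of the counting rule this commit adopts. Names are illustrative,
// not Karpenter's actual API; a simple nodeSelector check stands in for the full
// node-selector/required-node-affinity match.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// nodeMatchesPodNodeSelector reports whether the node satisfies the pending
// pod's nodeSelector labels.
func nodeMatchesPodNodeSelector(pod *v1.Pod, node *v1.Node) bool {
	for key, want := range pod.Spec.NodeSelector {
		if node.Labels[key] != want {
			return false
		}
	}
	return true
}

// domainCounts tallies already-scheduled pods per topology domain (e.g. per
// zone or capacity type), but only on nodes the pending pod could land on.
func domainCounts(pending *v1.Pod, topologyKey string, scheduled []*v1.Pod, nodes map[string]*v1.Node) map[string]int {
	counts := map[string]int{}
	for _, p := range scheduled {
		node, ok := nodes[p.Spec.NodeName]
		if !ok {
			continue
		}
		domain, ok := node.Labels[topologyKey]
		if !ok {
			continue // nodes without the topology key are not counted
		}
		if !nodeMatchesPodNodeSelector(pending, node) {
			continue // the fix: a node the pod can never use should not skew its spread
		}
		counts[domain]++
	}
	return counts
}

func main() {
	onDemandNode := &v1.Node{}
	onDemandNode.Name = "node-a"
	onDemandNode.Labels = map[string]string{"karpenter.sh/capacity-type": "on-demand", "topology.kubernetes.io/zone": "test-zone-1"}

	existing := &v1.Pod{}
	existing.Spec.NodeName = "node-a"

	pending := &v1.Pod{}
	pending.Spec.NodeSelector = map[string]string{"topology.kubernetes.io/zone": "test-zone-2"}

	// The on-demand pod sits on a node the pending pod can never use, so it no
	// longer counts toward the pending pod's capacity-type spread.
	fmt.Println(domainCounts(pending, "karpenter.sh/capacity-type", []*v1.Pod{existing}, map[string]*v1.Node{"node-a": onDemandNode}))
	// Output: map[]
}
```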
2 changes: 1 addition & 1 deletion pkg/apis/provisioning/v1alpha5/requirements.go
@@ -53,7 +53,7 @@ func NewLabelRequirements(labels map[string]string) Requirements {

// NewPodRequirements constructs requirements from a pod
func NewPodRequirements(pod *v1.Pod) Requirements {
requirements := []v1.NodeSelectorRequirement{}
var requirements []v1.NodeSelectorRequirement
for key, value := range pod.Spec.NodeSelector {
requirements = append(requirements, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
}
144 changes: 109 additions & 35 deletions pkg/controllers/provisioning/scheduling/suite_test.go
@@ -876,10 +876,10 @@ var _ = Describe("Topology", func() {
LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
MaxSkew: 1,
}}
// force this pod onto zone-1 so that the cluster is aware zone-1 exists
// force this pod onto zone-1
provisioner.Spec.Requirements = v1alpha5.NewRequirements(
v1.NodeSelectorRequirement{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}})
ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())
ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: labels}}))

// now only allow scheduling pods on zone-2 and zone-3
provisioner.Spec.Requirements = v1alpha5.NewRequirements(
@@ -888,9 +888,9 @@ var _ = Describe("Topology", func() {
MakePods(10, test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: labels}, TopologySpreadConstraints: topology})...,
)

// max skew of 1, so test-zone-2/3 will have 1 nodes each and the rest of the pods will fail to schedule since
// test-zone-1 has zero pods in it.
ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 1))
// max skew of 1, so test-zone-2/3 will have 2 nodes each and the rest of the pods will fail to schedule since
// test-zone-1 has 1 pod in it.
ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 2, 2))
})
It("should only count running/scheduled pods with matching labels scheduled to nodes with a corresponding domain", func() {
wrongNamespace := strings.ToLower(randomdata.SillyName())
@@ -1211,6 +1211,85 @@ var _ = Describe("Topology", func() {
}
Expect(nodeNames).To(HaveLen(1))
})
It("should balance pods across capacity-types (node required affinity constrained)", func() {
// launch this on-demand pod in zone-1
provisioner.Spec.Requirements = v1alpha5.NewRequirements(
v1.NodeSelectorRequirement{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1"}})
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, MakePods(1, test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels},
NodeRequirements: []v1.NodeSelectorRequirement{
{
Key: v1alpha5.LabelCapacityType,
Operator: v1.NodeSelectorOpIn,
Values: []string{"on-demand"},
},
},
})...)
ExpectScheduled(ctx, env.Client, pod[0])

topology := []v1.TopologySpreadConstraint{{
TopologyKey: v1alpha5.LabelCapacityType,
WhenUnsatisfiable: v1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
MaxSkew: 1,
}}

// limit our provisioner to only creating spot nodes
provisioner.Spec.Requirements = v1alpha5.NewRequirements(
v1.NodeSelectorRequirement{Key: v1alpha5.LabelCapacityType, Operator: v1.NodeSelectorOpIn, Values: []string{"spot"}})

// Try to run 5 pods with a node selector restricted to test-zone-2; they should all schedule on the same
// spot node. This doesn't violate the max-skew of 1, as the node selector requirement here excludes the
// existing on-demand pod from counting within this topology.
ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
MakePods(5, test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels},
NodeRequirements: []v1.NodeSelectorRequirement{
{
Key: v1.LabelTopologyZone,
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-zone-2"},
},
},
TopologySpreadConstraints: topology,
})...,
)
ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 5))
})
It("should balance pods across capacity-types (no constraints)", func() {
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels},
NodeRequirements: []v1.NodeSelectorRequirement{
{
Key: v1alpha5.LabelCapacityType,
Operator: v1.NodeSelectorOpIn,
Values: []string{"on-demand"},
},
},
}))
ExpectScheduled(ctx, env.Client, pod[0])

topology := []v1.TopologySpreadConstraint{{
TopologyKey: v1alpha5.LabelCapacityType,
WhenUnsatisfiable: v1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
MaxSkew: 1,
}}

// limit our provisioner to only creating spot nodes
provisioner.Spec.Requirements = v1alpha5.NewRequirements(
v1.NodeSelectorRequirement{Key: v1alpha5.LabelCapacityType, Operator: v1.NodeSelectorOpIn, Values: []string{"spot"}})

// since there is no node selector on these pods, the topology can see the single on-demand node that already
// exists, and that limits us to scheduling 2 more spot pods before we would violate max-skew
ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
MakePods(5, test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels},
TopologySpreadConstraints: topology,
})...,
)
ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 2))
})
})

Context("Combined Hostname and Zonal Topology", func() {
@@ -1353,7 +1432,7 @@ var _ = Describe("Topology", func() {

// add varying numbers of pods, checking after each scheduling to ensure that our max required max skew
// has not been violated for each constraint
for i := 1; i < 25; i++ {
for i := 1; i < 15; i++ {
pods := MakePods(i, test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: labels}, TopologySpreadConstraints: topology})
ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pods...)
ExpectMaxSkew(ctx, env.Client, "default", &topology[0]).To(BeNumerically("<=", 1))
@@ -1619,18 +1698,20 @@ var _ = Describe("Topology", func() {
})
It("should respect self pod affinity for first empty topology domain only (hostname)", func() {
affLabels := map[string]string{"security": "s2"}
pods := MakePods(10, test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: affLabels,
},
PodRequirements: []v1.PodAffinityTerm{{
LabelSelector: &metav1.LabelSelector{
MatchLabels: affLabels,
createPods := func() []*v1.Pod {
return MakePods(10, test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: affLabels,
},
TopologyKey: v1.LabelHostname,
}},
})
pods = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pods...)
PodRequirements: []v1.PodAffinityTerm{{
LabelSelector: &metav1.LabelSelector{
MatchLabels: affLabels,
},
TopologyKey: v1.LabelHostname,
}},
})
}
pods := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, createPods()...)
nodeNames := map[string]struct{}{}
unscheduledCount := 0
scheduledCount := 0
@@ -1643,16 +1724,21 @@ var _ = Describe("Topology", func() {
scheduledCount++
}
}
// the node can only hold 5 pods, so we should get a single node with 5 pods and 5 unschedulable pods
// the node can only hold 5 pods, so we should get a single node with 5 pods and 5 unschedulable pods from that batch
Expect(len(nodeNames)).To(Equal(1))
Expect(scheduledCount).To(BeNumerically("==", 5))
Expect(unscheduledCount).To(BeNumerically("==", 5))

// and pods in a different batch should also not schedule, even though the node is not ready yet
pods = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, createPods()...)
for _, p := range pods {
ExpectNotScheduled(ctx, env.Client, p)
}
})
It("should respect self pod affinity for first empty topology domain only (hostname/constrained zones)", func() {
affLabels := map[string]string{"security": "s2"}

// put one pod in test-zone-1, this shouldn't affect the remainder of the test as the node selectors are
// disjoint between this pod and the other 10 pods
// put one pod in test-zone-1; this does affect pod affinity even though we have different node selectors.
// The node selector and required node affinity restrictions on topology counting only apply to topology spread.
ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: affLabels,
Expand Down Expand Up @@ -1687,22 +1773,10 @@ var _ = Describe("Topology", func() {
}},
})
pods = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pods...)
nodeNames := map[string]struct{}{}
unscheduledCount := 0
scheduledCount := 0
for _, p := range pods {
p = ExpectPodExists(ctx, env.Client, p.Name, p.Namespace)
if p.Spec.NodeName == "" {
unscheduledCount++
} else {
nodeNames[p.Spec.NodeName] = struct{}{}
scheduledCount++
}
// none of this should schedule
ExpectNotScheduled(ctx, env.Client, p)
}
// the node can only hold 5 pods, so we should get a single node with 5 pods and 5 unschedulable pods
Expect(len(nodeNames)).To(Equal(1))
Expect(scheduledCount).To(BeNumerically("==", 5))
Expect(unscheduledCount).To(BeNumerically("==", 5))
})
It("should respect self pod affinity (zone)", func() {
affLabels := map[string]string{"security": "s2"}
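To make the expected skews in the new capacity-type tests above concrete: in the constrained test the five spot pods carry a required zone selector, so the existing on-demand node is filtered out of the count and the scheduler only sees the spot domain, while in the unconstrained test the on-demand node still counts, leaving room for only two more spot pods under a max skew of 1. A toy check, treating skew simply as max minus min over the counted domains:

```go
package main

import "fmt"

// skew treats topology skew as max(count) - min(count) over the domains that
// were actually counted for the pending pods.
func skew(counts map[string]int) int {
	first := true
	var lo, hi int
	for _, c := range counts {
		if first {
			lo, hi = c, c
			first = false
			continue
		}
		if c < lo {
			lo = c
		}
		if c > hi {
			hi = c
		}
	}
	return hi - lo
}

func main() {
	// Constrained test: the on-demand node is excluded, only "spot" is counted.
	fmt.Println(skew(map[string]int{"spot": 5})) // 0, well within maxSkew: 1
	// Unconstrained test: both capacity types count, so only 2 spot pods fit.
	fmt.Println(skew(map[string]int{"on-demand": 1, "spot": 2})) // 1, at the maxSkew limit
}
```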
53 changes: 28 additions & 25 deletions pkg/controllers/provisioning/scheduling/topology.go
@@ -57,23 +57,13 @@ func NewTopology(ctx context.Context, kubeClient client.Client, requirements *v1
topologies: map[uint64]*TopologyGroup{},
inverseTopologies: map[uint64]*TopologyGroup{},
}
var nodeList v1.NodeList
if err := kubeClient.List(ctx, &nodeList); err != nil {
return nil, fmt.Errorf("listing nodes, %w", err)
}

// generate the universe domains which is the union of the ones specified in the provisioner spec and the ones that
// exist
for topologyKey := range requirements.Keys() {
domains := utilsets.NewString()
for _, node := range nodeList.Items {
if domain, ok := node.Labels[topologyKey]; ok {
domains.Insert(domain)
}
}
domains = domains.Union(requirements.Get(topologyKey).Values())
t.domains[topologyKey] = domains
// Update the universe of valid domains per the provisioner spec. We can't pull all the domains from all of the nodes
// here, as these domains are passed on to topology spreads, which can be limited by node selectors/required node affinities.
for topologyKey := range v1alpha5.ValidTopologyKeys {
t.domains[topologyKey] = requirements.Get(topologyKey).Values()
}

errs := t.updateInverseAffinities(ctx)
for i := range pods {
errs = multierr.Append(errs, t.Update(ctx, pods[i]))
@@ -125,7 +115,7 @@ func (t *Topology) Update(ctx context.Context, p *v1.Pod) error {
func (t *Topology) Record(p *v1.Pod, requirements v1alpha5.Requirements) {
// once we've committed to a domain, we record the usage in every topology that cares about it
for _, tc := range t.topologies {
if tc.Matches(p.Namespace, p.Labels) {
if tc.CountsPod(p, requirements) {
domains := requirements.Get(tc.Key)
if tc.Type == TopologyTypePodAntiAffinity {
// for anti-affinity topologies we need to block out all possible domains that the pod could land in
@@ -153,17 +143,17 @@
// cannot be satisfied.
func (t *Topology) AddRequirements(podRequirements, nodeRequirements v1alpha5.Requirements, p *v1.Pod) (v1alpha5.Requirements, error) {
requirements := nodeRequirements
for _, topology := range t.getMatchingTopologies(p) {
domains := sets.NewComplementSet()
for _, topology := range t.getMatchingTopologies(p, nodeRequirements) {
podDomains := sets.NewComplementSet()
if podRequirements.Has(topology.Key) {
domains = podRequirements.Get(topology.Key)
podDomains = podRequirements.Get(topology.Key)
}
nodeDomains := sets.NewComplementSet()
if nodeRequirements.Has(topology.Key) {
nodeDomains = nodeRequirements.Get(topology.Key)
}

domains = topology.Get(p, domains, nodeDomains)
domains := topology.Get(p, podDomains, nodeDomains)
if domains.Len() == 0 {
return v1alpha5.Requirements{}, fmt.Errorf("unsatisfiable topology constraint for key %s", topology.Key)
}
@@ -222,7 +212,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *v1.Pod, d
return err
}

tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, namespaces, term.LabelSelector, math.MaxInt32, t.domains[term.TopologyKey])
tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, t.domains[term.TopologyKey])

hash := tg.Hash()
if existing, ok := t.inverseTopologies[hash]; !ok {
@@ -262,9 +252,22 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
return fmt.Errorf("getting node %s, %w", p.Spec.NodeName, err)
}
domain, ok := node.Labels[tg.Key]
// Kubelet sets the hostname label, but the node may not be ready yet, in which case the label is missing. We fall back and just
// treat the node name as the label. It probably matches the hostname in most cases, but even if not, we at least count the existence
// of the pods in some domain, even if not in the correct one. This is needed to handle the case of pods with
// self-affinity, which only fulfill that affinity if all domains are empty.
if !ok && tg.Key == v1.LabelHostname {
domain = node.Name
ok = true
}
if !ok {
continue // Don't include pods if node doesn't contain domain https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/#conventions
}
// nodes may or may not be considered for counting purposes for topology spread constraints, depending on whether they
// are selected by the pod's node selectors and required node affinities. If these are unset, the node always counts.
if !tg.nodeSelector.Matches(node) {
continue
}
tg.Record(domain)
}
return nil
@@ -273,7 +276,7 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
func (t *Topology) newForTopologies(p *v1.Pod) []*TopologyGroup {
var topologyGroups []*TopologyGroup
for _, cs := range p.Spec.TopologySpreadConstraints {
topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, utilsets.NewString(p.Namespace), cs.LabelSelector, cs.MaxSkew, t.domains[cs.TopologyKey]))
topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, utilsets.NewString(p.Namespace), cs.LabelSelector, cs.MaxSkew, t.domains[cs.TopologyKey]))
}
return topologyGroups
}
@@ -310,7 +313,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *v1.Pod) ([]*Topology
if err != nil {
return nil, err
}
topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, namespaces, term.LabelSelector, math.MaxInt32, t.domains[term.TopologyKey]))
topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, t.domains[term.TopologyKey]))
}
}
return topologyGroups, nil
@@ -341,15 +344,15 @@ func (t *Topology) buildNamespaceList(ctx context.Context, namespace string, nam

// getMatchingTopologies returns a sorted list of topologies that either control the scheduling of pod p, or for which
// the topology selects pod p and the scheduling of p affects the count per topology domain
func (t *Topology) getMatchingTopologies(p *v1.Pod) []*TopologyGroup {
func (t *Topology) getMatchingTopologies(p *v1.Pod, requirements v1alpha5.Requirements) []*TopologyGroup {
var matchingTopologies []*TopologyGroup
for _, tc := range t.topologies {
if tc.IsOwnedBy(p.UID) {
matchingTopologies = append(matchingTopologies, tc)
}
}
for _, tc := range t.inverseTopologies {
if tc.Matches(p.Namespace, p.Labels) {
if tc.CountsPod(p, requirements) {
matchingTopologies = append(matchingTopologies, tc)
}
}
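The `tg.nodeSelector.Matches(node)` call above and the extra pod argument to `NewTopologyGroup` come from TopologyGroup changes in files that are not shown on this page. As a rough sketch only, one way to build such a matcher from the pending pod's scheduling constraints is the upstream component-helpers node-affinity helper (an assumption on my part; Karpenter's actual implementation may differ):

```go
// Hedged sketch: builds a node filter from a pod's nodeSelector and required
// node affinity using k8s.io/component-helpers. This is an assumed approach,
// not necessarily what Karpenter's TopologyGroup does internally.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
)

// nodeFilter captures the pending pod's node selector and required node
// affinity once, so each candidate node can be checked cheaply while counting
// topology domains.
type nodeFilter struct {
	required nodeaffinity.RequiredNodeAffinity
}

func newNodeFilter(pod *v1.Pod) nodeFilter {
	// GetRequiredNodeAffinity folds pod.Spec.NodeSelector and the pod's
	// requiredDuringSchedulingIgnoredDuringExecution terms into one matcher.
	return nodeFilter{required: nodeaffinity.GetRequiredNodeAffinity(pod)}
}

// Matches reports whether the pod could ever be scheduled onto this node; if
// not, pods already running there should not constrain the spread calculation.
func (f nodeFilter) Matches(node *v1.Node) bool {
	ok, err := f.required.Match(node)
	return err == nil && ok
}

func main() {
	pod := &v1.Pod{}
	pod.Spec.NodeSelector = map[string]string{"topology.kubernetes.io/zone": "test-zone-2"}

	node := &v1.Node{}
	node.Labels = map[string]string{"topology.kubernetes.io/zone": "test-zone-1"}

	fmt.Println(newNodeFilter(pod).Matches(node)) // false — this node's pods are not counted
}
```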