topology.go
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduling

import (
	"context"
	"fmt"
	"math"
	"strings"

	"github.com/Pallinder/go-randomdata"
	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
	"github.com/aws/karpenter/pkg/utils/apiobject"
	"github.com/aws/karpenter/pkg/utils/functional"
	"github.com/mitchellh/hashstructure/v2"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)
// Topology computes topology spread domains for pods and injects them as node
// selectors, using the kube client to inspect current cluster state.
type Topology struct {
	kubeClient client.Client
}
// Inject injects topology rules into pods using supported NodeSelectors
func (t *Topology) Inject(ctx context.Context, constraints *v1alpha5.Constraints, pods []*v1.Pod) error {
	// Group pods by equivalent topology spread constraints
	topologyGroups := t.getTopologyGroups(pods)
	// Compute spread
	for _, topologyGroup := range topologyGroups {
		if err := t.computeCurrentTopology(ctx, constraints, topologyGroup); err != nil {
			return fmt.Errorf("computing topology, %w", err)
		}
		for _, pod := range topologyGroup.Pods {
			domain := topologyGroup.NextDomain(constraints.Requirements.With(v1alpha5.PodRequirements(pod)).Requirement(topologyGroup.Constraint.TopologyKey))
			pod.Spec.NodeSelector = functional.UnionStringMaps(pod.Spec.NodeSelector, map[string]string{topologyGroup.Constraint.TopologyKey: domain})
		}
	}
	return nil
}
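// Usage sketch (illustrative only; the caller, the provisioner variable, and
// the source of pods are assumptions, not part of this file):
//
//	topology := &Topology{kubeClient: kubeClient}
//	if err := topology.Inject(ctx, &provisioner.Spec.Constraints, pods); err != nil {
//		return fmt.Errorf("injecting topology, %w", err)
//	}
//	// Each pod with a topology spread constraint now carries a NodeSelector
//	// entry for that constraint's TopologyKey, e.g.
//	// {"topology.kubernetes.io/zone": "us-west-2a"}.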
// getTopologyGroups separates pods with equivalent topology rules
func (t *Topology) getTopologyGroups(pods []*v1.Pod) []*TopologyGroup {
	topologyGroupMap := map[uint64]*TopologyGroup{}
	for _, pod := range pods {
		for _, constraint := range pod.Spec.TopologySpreadConstraints {
			// Add to existing group if exists, using a hash for efficient collision detection
			key := topologyGroupKey(pod.Namespace, constraint)
			if topologyGroup, ok := topologyGroupMap[key]; ok {
				topologyGroup.Pods = append(topologyGroup.Pods, pod)
			} else {
				topologyGroupMap[key] = NewTopologyGroup(pod, constraint)
			}
		}
	}
	topologyGroups := []*TopologyGroup{}
	for _, topologyGroup := range topologyGroupMap {
		topologyGroups = append(topologyGroups, topologyGroup)
	}
	return topologyGroups
}
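// For example, two pods in the same namespace that declare identical
// TopologySpreadConstraints hash to the same key and share one TopologyGroup,
// while a single pod declaring two different constraints becomes a member of
// two groups, one per constraint.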
// computeCurrentTopology dispatches to the appropriate computation for the
// group's topology key; unsupported keys are ignored.
func (t *Topology) computeCurrentTopology(ctx context.Context, constraints *v1alpha5.Constraints, topologyGroup *TopologyGroup) error {
	switch topologyGroup.Constraint.TopologyKey {
	case v1.LabelHostname:
		return t.computeHostnameTopology(topologyGroup, constraints)
	case v1.LabelTopologyZone:
		return t.computeZonalTopology(ctx, constraints.Requirements, topologyGroup)
	default:
		return nil
	}
}
// computeHostnameTopology for the topology group. Hostnames are guaranteed to
// be unique when new nodes join the cluster. Nodes that join the cluster do not
// contain any pods, so we can assume that the global minimum domain count for
// `hostname` is 0. Thus, we can always improve topology skew (computed against
// the global minimum) by adding pods to the cluster. We generate
// ceil(len(pods)/MaxSkew) domains to ensure that skew is not violated for new
// instances.
func (t *Topology) computeHostnameTopology(topologyGroup *TopologyGroup, constraints *v1alpha5.Constraints) error {
	domains := []string{}
	for i := 0; i < int(math.Ceil(float64(len(topologyGroup.Pods))/float64(topologyGroup.Constraint.MaxSkew))); i++ {
		domains = append(domains, strings.ToLower(randomdata.Alphanumeric(8)))
	}
	topologyGroup.Register(domains...)
	// This is a bit of a hack that allows the constraints to recognize viable hostname topologies
	constraints.Requirements = append(constraints.Requirements,
		v1.NodeSelectorRequirement{Key: topologyGroup.Constraint.TopologyKey, Operator: v1.NodeSelectorOpIn, Values: domains})
	return nil
}
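// Worked example (illustrative numbers, not from this file): a group of 5 pods
// with MaxSkew=2 yields ceil(5/2) = 3 generated hostname domains, so spreading
// the 5 pods across 3 fresh hostnames keeps the skew against the global
// minimum of 0 at or below 2.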
// computeZonalTopology for the topology group. Zones include viable zones for
// the { cloudprovider, provisioner, pod }. If these zones change over time,
// topology skew calculations will only include the current viable zone
// selection. For example, if a cloud provider or provisioner changes the viable
// set of nodes, topology calculations will rebalance the new set of zones.
func (t *Topology) computeZonalTopology(ctx context.Context, requirements v1alpha5.Requirements, topologyGroup *TopologyGroup) error {
	topologyGroup.Register(requirements.Zones().UnsortedList()...)
	if err := t.countMatchingPods(ctx, topologyGroup); err != nil {
		return fmt.Errorf("getting matching pods, %w", err)
	}
	return nil
}
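// Illustrative scenario (assumed; NextDomain is defined elsewhere in this
// package): if the viable zones are {us-west-2a, us-west-2b, us-west-2c} and
// countMatchingPods observes existing counts of {2, 1, 0}, the least-populated
// zone us-west-2c would be the natural pick for the next pod's domain.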
// countMatchingPods lists scheduled pods that match the group's label selector
// and increments the count for the topology domain of each pod's node.
func (t *Topology) countMatchingPods(ctx context.Context, topologyGroup *TopologyGroup) error {
	podList := &v1.PodList{}
	if err := t.kubeClient.List(ctx, podList,
		client.InNamespace(topologyGroup.Pods[0].Namespace),
		apiobject.MatchingLabelsSelector(topologyGroup.Constraint.LabelSelector),
	); err != nil {
		return fmt.Errorf("listing pods, %w", err)
	}
	for _, pod := range podList.Items {
		if len(pod.Spec.NodeName) == 0 {
			continue // Don't include pods that aren't scheduled
		}
		node := &v1.Node{}
		if err := t.kubeClient.Get(ctx, types.NamespacedName{Name: pod.Spec.NodeName}, node); err != nil {
			return fmt.Errorf("getting node %s, %w", pod.Spec.NodeName, err)
		}
		domain, ok := node.Labels[topologyGroup.Constraint.TopologyKey]
		if !ok {
			continue // Don't include pods whose node doesn't carry the domain label, see https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/#conventions
		}
		topologyGroup.Increment(domain)
	}
	return nil
}
// topologyGroupKey hashes a (namespace, constraint) pair so that pods with
// equivalent topology spread constraints can be grouped together.
func topologyGroupKey(namespace string, constraint v1.TopologySpreadConstraint) uint64 {
	hash, err := hashstructure.Hash(struct {
		Namespace  string
		Constraint v1.TopologySpreadConstraint
	}{namespace, constraint}, hashstructure.FormatV2, nil)
	if err != nil {
		panic(fmt.Errorf("unexpected failure hashing topology, %w", err))
	}
	return hash
}
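// The grouping in getTopologyGroups relies on this key being deterministic. A
// sanity-check sketch (hypothetical test code, not part of this file) would
// assert that hashing the same namespace and constraint twice returns equal
// keys, e.g.
//
//	c := v1.TopologySpreadConstraint{TopologyKey: v1.LabelTopologyZone, MaxSkew: 1}
//	if topologyGroupKey("default", c) != topologyGroupKey("default", c) {
//		panic("expected identical inputs to hash to the same key")
//	}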