feat: support new topologySpread scheduling constraints #852

Merged
Changes from 7 commits
40 changes: 2 additions & 38 deletions pkg/controllers/provisioning/provisioner.go
@@ -31,7 +31,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
@@ -234,8 +233,8 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
nodepoolutils.OrderByWeight(nodePools)

instanceTypes := map[string][]*cloudprovider.InstanceType{}
domains := map[string]sets.Set[string]{}
for _, np := range nodePools {
// Get instance type options
its, err := p.cloudProvider.GetInstanceTypes(ctx, np)
if err != nil {
log.FromContext(ctx).WithValues("NodePool", klog.KRef("", np.Name)).Error(err, "skipping, unable to resolve instance types")
@@ -245,49 +244,14 @@
log.FromContext(ctx).WithValues("NodePool", klog.KRef("", np.Name)).Info("skipping, no resolved instance types found")
continue
}

instanceTypes[np.Name] = its

// Construct Topology Domains
for _, it := range its {
// We need to intersect the instance type requirements with the current nodePool requirements. This
// ensures that something like zones from an instance type don't expand the universe of valid domains.
requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(np.Spec.Template.Spec.Requirements...)
requirements.Add(scheduling.NewLabelRequirements(np.Spec.Template.Labels).Values()...)
requirements.Add(it.Requirements.Values()...)

for key, requirement := range requirements {
// This code used to execute a Union between domains[key] and requirement.Values().
// The downside of this is that Union is immutable and takes a copy of the set it is executed upon.
// This resulted in a lot of memory pressure on the heap and poor performance
// https://github.com/aws/karpenter/issues/3565
if domains[key] == nil {
domains[key] = sets.New(requirement.Values()...)
} else {
domains[key].Insert(requirement.Values()...)
}
}
}

requirements := scheduling.NewNodeSelectorRequirementsWithMinValues(np.Spec.Template.Spec.Requirements...)
requirements.Add(scheduling.NewLabelRequirements(np.Spec.Template.Labels).Values()...)
for key, requirement := range requirements {
if requirement.Operator() == corev1.NodeSelectorOpIn {
// The following is a performance optimisation, for the explanation see the comment above
if domains[key] == nil {
domains[key] = sets.New(requirement.Values()...)
} else {
domains[key].Insert(requirement.Values()...)
}
}
}
}

// inject topology constraints
pods = p.injectVolumeTopologyRequirements(ctx, pods)

// Calculate cluster topology
topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domains, pods)
topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, stateNodes, nodePools, instanceTypes, pods)
if err != nil {
return nil, fmt.Errorf("tracking topology counts, %w", err)
}
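
The block removed above carried the explanation of why topology domains were accumulated with Insert rather than Union: Union copies the receiver set on every call, which caused heap pressure at scale (aws/karpenter#3565). With this PR that domain construction moves behind scheduler.NewTopology, which now receives the state nodes, NodePools, and resolved instance types directly instead of a precomputed domains map. For reference, here is a minimal, self-contained sketch of the Insert-over-Union pattern the removed comments describe; addDomain and the zone values are illustrative only, not code from this PR.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// addDomain accumulates values for a topology key in place. Insert mutates the
// existing set, whereas Union would allocate and copy a new set on every call.
func addDomain(domains map[string]sets.Set[string], key string, values ...string) {
	if domains[key] == nil {
		domains[key] = sets.New(values...)
		return
	}
	domains[key].Insert(values...)
}

func main() {
	domains := map[string]sets.Set[string]{}
	addDomain(domains, "topology.kubernetes.io/zone", "us-west-2a", "us-west-2b")
	addDomain(domains, "topology.kubernetes.io/zone", "us-west-2b", "us-west-2c")
	fmt.Println(sets.List(domains["topology.kubernetes.io/zone"])) // [us-west-2a us-west-2b us-west-2c]
}
```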
6 changes: 3 additions & 3 deletions pkg/controllers/provisioning/scheduling/existingnode.go
@@ -67,7 +67,7 @@ func NewExistingNode(n *state.StateNode, topology *Topology, taints []v1.Taint,

func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podData *PodData) error {
// Check Taints
if err := scheduling.Taints(n.cachedTaints).Tolerates(pod); err != nil {
if err := scheduling.Taints(n.cachedTaints).ToleratesPod(pod); err != nil {
return err
}
// determine the volumes that will be mounted if the pod schedules
@@ -100,7 +100,7 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v
nodeRequirements.Add(podData.Requirements.Values()...)

// Check Topology Requirements
topologyRequirements, err := n.topology.AddRequirements(podData.StrictRequirements, nodeRequirements, pod)
topologyRequirements, err := n.topology.AddRequirements(pod, n.cachedTaints, podData.StrictRequirements, nodeRequirements)
if err != nil {
return err
}
@@ -113,7 +113,7 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v
n.Pods = append(n.Pods, pod)
n.requests = requests
n.requirements = nodeRequirements
n.topology.Record(pod, nodeRequirements)
n.topology.Record(pod, n.cachedTaints, nodeRequirements)
n.HostPortUsage().Add(pod, hostPorts)
n.VolumeUsage().Add(pod, volumes)
return nil
6 changes: 3 additions & 3 deletions pkg/controllers/provisioning/scheduling/nodeclaim.go
@@ -66,7 +66,7 @@ func NewNodeClaim(nodeClaimTemplate *NodeClaimTemplate, topology *Topology, daem

func (n *NodeClaim) Add(pod *v1.Pod, podData *PodData) error {
// Check Taints
if err := scheduling.Taints(n.Spec.Taints).Tolerates(pod); err != nil {
if err := scheduling.Taints(n.Spec.Taints).ToleratesPod(pod); err != nil {
return err
}

@@ -84,7 +84,7 @@ func (n *NodeClaim) Add(pod *v1.Pod, podData *PodData) error {
nodeClaimRequirements.Add(podData.Requirements.Values()...)

// Check Topology Requirements
topologyRequirements, err := n.topology.AddRequirements(podData.StrictRequirements, nodeClaimRequirements, pod, scheduling.AllowUndefinedWellKnownLabels)
topologyRequirements, err := n.topology.AddRequirements(pod, n.NodeClaimTemplate.Spec.Taints, podData.StrictRequirements, nodeClaimRequirements, scheduling.AllowUndefinedWellKnownLabels)
if err != nil {
return err
}
@@ -108,7 +108,7 @@ func (n *NodeClaim) Add(pod *v1.Pod, podData *PodData) error {
n.InstanceTypeOptions = remaining
n.Spec.Resources.Requests = requests
n.Requirements = nodeClaimRequirements
n.topology.Record(pod, nodeClaimRequirements, scheduling.AllowUndefinedWellKnownLabels)
n.topology.Record(pod, n.NodeClaim.Spec.Taints, nodeClaimRequirements, scheduling.AllowUndefinedWellKnownLabels)
n.hostPortUsage.Add(pod, hostPorts)
return nil
}
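
In both existingnode.go and nodeclaim.go the node's taints are now threaded into Topology.AddRequirements and Topology.Record alongside the requirements. That lines up with the newer topologySpreadConstraints fields the PR title refers to: with nodeTaintsPolicy: Honor, nodes whose taints a pod does not tolerate are excluded from domain and skew counting, so node requirements alone are no longer enough. Below is a minimal sketch, using only upstream k8s.io/api/core/v1 types, of a constraint that exercises those fields; the selector, zone key, and minDomains value are illustrative, and how Karpenter interprets each field is defined by the topology changes in this PR rather than by this snippet.

```go
package main

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

func main() {
	honor := corev1.NodeInclusionPolicyHonor

	// A topologySpreadConstraint using the newer fields: minDomains,
	// nodeAffinityPolicy, nodeTaintsPolicy, and matchLabelKeys. With
	// nodeTaintsPolicy: Honor, nodes whose taints the pod does not tolerate
	// are excluded when counting domains and skew, so the scheduler needs
	// the candidate node's taints, not just its labels.
	constraint := corev1.TopologySpreadConstraint{
		MaxSkew:            1,
		TopologyKey:        corev1.LabelTopologyZone,
		WhenUnsatisfiable:  corev1.DoNotSchedule,
		LabelSelector:      &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
		MinDomains:         ptr.To[int32](3),
		NodeAffinityPolicy: &honor,
		NodeTaintsPolicy:   &honor,
		MatchLabelKeys:     []string{"pod-template-hash"},
	}

	pod := corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "web"}},
		Spec: corev1.PodSpec{
			TopologySpreadConstraints: []corev1.TopologySpreadConstraint{constraint},
		},
	}
	_ = pod
}
```

Pods carrying a constraint like this are what make the new taint parameter load-bearing for both existing nodes (cachedTaints) and in-flight NodeClaims (Spec.Taints).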
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -345,7 +345,7 @@ func (s *Scheduler) calculateExistingNodeClaims(stateNodes []*state.StateNode, d
taints := node.Taints()
var daemons []*corev1.Pod
for _, p := range daemonSetPods {
if err := scheduling.Taints(taints).Tolerates(p); err != nil {
if err := scheduling.Taints(taints).ToleratesPod(p); err != nil {
continue
}
if err := scheduling.NewLabelRequirements(node.Labels()).Compatible(scheduling.NewPodRequirements(p)); err != nil {
@@ -388,7 +388,7 @@ func isDaemonPodCompatible(nodeClaimTemplate *NodeClaimTemplate, pod *corev1.Pod
preferences := &Preferences{}
// Add a toleration for PreferNoSchedule since a daemon pod shouldn't respect the preference
_ = preferences.toleratePreferNoScheduleTaints(pod)
if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).Tolerates(pod); err != nil {
if err := scheduling.Taints(nodeClaimTemplate.Spec.Taints).ToleratesPod(pod); err != nil {
return false
}
for {
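
The isDaemonPodCompatible hunk keeps the existing behavior of tolerating PreferNoSchedule taints before the compatibility check, since a soft preference should not rule out a daemon pod. Here is a minimal, self-contained sketch of the kind of blanket toleration that implies, using only corev1 types; the taint key is made up, and Karpenter's own toleratePreferNoScheduleTaints helper is not shown in this diff and may differ in detail.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	// A soft-preference taint a node might carry; only its effect matters here.
	taint := corev1.Taint{Key: "example.com/some-preference", Effect: corev1.TaintEffectPreferNoSchedule}

	// Blanket toleration for every PreferNoSchedule taint: Operator Exists with
	// no key matches any key, and the effect restricts it to soft preferences.
	toleration := corev1.Toleration{
		Operator: corev1.TolerationOpExists,
		Effect:   corev1.TaintEffectPreferNoSchedule,
	}

	fmt.Println(toleration.ToleratesTaint(&taint)) // true
}
```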
@@ -33,7 +33,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
@@ -156,8 +155,9 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
pods := makeDiversePods(podCount)
clock := &clock.RealClock{}
cluster = state.NewCluster(clock, client, cloudProvider)

topology, err := scheduling.NewTopology(ctx, client, cluster, getDomains(instanceTypes), pods)
topology, err := scheduling.NewTopology(ctx, client, cluster, nil, []*v1.NodePool{nodePool}, map[string][]*cloudprovider.InstanceType{
nodePool.Name: instanceTypes,
}, pods)
if err != nil {
b.Fatalf("creating topology, %s", err)
}
@@ -221,24 +221,6 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
}
}

func getDomains(instanceTypes []*cloudprovider.InstanceType) map[string]sets.Set[string] {
domains := map[string]sets.Set[string]{}
for _, it := range instanceTypes {
for key, requirement := range it.Requirements {
// This code used to execute a Union between domains[key] and requirement.Values().
// The downside of this is that Union is immutable and takes a copy of the set it is executed upon.
// This resulted in a lot of memory pressure on the heap and poor performance
// https://github.com/aws/karpenter/issues/3565
if domains[key] == nil {
domains[key] = sets.New(requirement.Values()...)
} else {
domains[key].Insert(requirement.Values()...)
}
}
}
return domains
}

func makeDiversePods(count int) []*corev1.Pod {
var pods []*corev1.Pod
numTypes := 5