Add affinity and anti-affinity support (#1626)
 implement pod affinity & anti-affinity

- implement pod affinity/anti-affinity
- rework topology spread support

Fixes #942 and #985 

Co-authored-by: Ellis Tarn <[email protected]>
tzneal and ellistarn authored Apr 13, 2022 · 1 parent 4cc9681 · commit befc00c
Showing 24 changed files with 1,349 additions and 544 deletions.
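For context before the diffs: the constraints this change adds support for are the standard Kubernetes pod affinity/anti-affinity terms and topology spread constraints. Below is a minimal, self-contained sketch using only the upstream k8s.io/api types (not Karpenter's packages; the pod name and labels are invented) of a pod that exercises all three: a required pod affinity term and a required anti-affinity term keyed on zone, plus a hostname-keyed topology spread.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A pod spec exercising the constraints this commit teaches the provisioner to honor.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "web-0",
			Labels: map[string]string{"app": "web"},
		},
		Spec: v1.PodSpec{
			Affinity: &v1.Affinity{
				// Co-locate with "cache" pods in the same zone.
				PodAffinity: &v1.PodAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{{
						LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "cache"}},
						TopologyKey:   v1.LabelTopologyZone,
					}},
				},
				// Keep "web" replicas out of each other's zone.
				PodAntiAffinity: &v1.PodAntiAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{{
						LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
						TopologyKey:   v1.LabelTopologyZone,
					}},
				},
			},
			// Spread "web" replicas across hostnames.
			TopologySpreadConstraints: []v1.TopologySpreadConstraint{{
				MaxSkew:           1,
				TopologyKey:       v1.LabelHostname,
				WhenUnsatisfiable: v1.DoNotSchedule,
				LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
			}},
		},
	}
	fmt.Println(pod.Spec.Affinity.PodAntiAffinity != nil, len(pod.Spec.TopologySpreadConstraints))
}
```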
3 changes: 3 additions & 0 deletions pkg/apis/provisioning/v1alpha5/labels.go
@@ -27,6 +27,9 @@ var (
ArchitectureArm64 = "arm64"
OperatingSystemLinux = "linux"

// ValidTopologyKeys are the topology keys that Karpenter allows for topology spread and pod affinity/anti-affinity
ValidTopologyKeys = stringsets.NewString(v1.LabelHostname, v1.LabelTopologyZone, LabelCapacityType)

// Karpenter specific domains and labels
KarpenterLabelDomain = "karpenter.sh"
LabelCapacityType = KarpenterLabelDomain + "/capacity-type"
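To make the new allow-list concrete, here is a small self-contained sketch of how a topology key might be checked against it. validateTopologyKey is a hypothetical helper (not code from this commit), and LabelCapacityType is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	stringsets "k8s.io/apimachinery/pkg/util/sets"
)

// LabelCapacityType mirrors Karpenter's "karpenter.sh/capacity-type" label, redeclared here
// so the sketch is self-contained.
const LabelCapacityType = "karpenter.sh/capacity-type"

// ValidTopologyKeys mirrors the new allow-list added in labels.go.
var ValidTopologyKeys = stringsets.NewString(v1.LabelHostname, v1.LabelTopologyZone, LabelCapacityType)

// validateTopologyKey is a hypothetical helper showing how such an allow-list is typically applied.
func validateTopologyKey(key string) error {
	if !ValidTopologyKeys.Has(key) {
		return fmt.Errorf("unsupported topology key %q, expected one of %v", key, ValidTopologyKeys.List())
	}
	return nil
}

func main() {
	fmt.Println(validateTopologyKey(v1.LabelTopologyZone)) // <nil>
	fmt.Println(validateTopologyKey("example.com/rack"))   // unsupported topology key ...
}
```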
4 changes: 4 additions & 0 deletions pkg/apis/provisioning/v1alpha5/provisioner_validation.go
@@ -123,6 +123,10 @@ func (s *ProvisionerSpec) validateRequirements() (errs *apis.FieldError) {
if e := IsRestrictedLabel(requirement.Key); e != nil {
err = multierr.Append(err, e)
}
// We don't support a 'NotExists' operator, but this turns into an empty set of values by re-building node selector requirements
if requirement.Operator == v1.NodeSelectorOpIn && len(requirement.Values) == 0 {
err = multierr.Append(err, fmt.Errorf("key %s is unsatisfiable due to unsupported operator or no values being provided", requirement.Key))
}
}
err = multierr.Append(err, s.Requirements.Validate())
if err != nil {
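The new validation branch rejects requirements that can never match a node: an In requirement with no values is unsatisfiable. A minimal standalone illustration of the same check, with checkSatisfiable as an invented stand-in for the provisioner validation:

```go
package main

import (
	"fmt"

	"go.uber.org/multierr"
	v1 "k8s.io/api/core/v1"
)

// checkSatisfiable flags In requirements with an empty value list; such a requirement can
// never be satisfied by any node, so it is rejected up front rather than at scheduling time.
func checkSatisfiable(reqs []v1.NodeSelectorRequirement) error {
	var err error
	for _, r := range reqs {
		if r.Operator == v1.NodeSelectorOpIn && len(r.Values) == 0 {
			err = multierr.Append(err, fmt.Errorf("key %s is unsatisfiable due to unsupported operator or no values being provided", r.Key))
		}
	}
	return err
}

func main() {
	bad := []v1.NodeSelectorRequirement{{Key: "node.kubernetes.io/instance-type", Operator: v1.NodeSelectorOpIn}}
	fmt.Println(checkSatisfiable(bad)) // key node.kubernetes.io/instance-type is unsatisfiable ...
}
```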
47 changes: 38 additions & 9 deletions pkg/apis/provisioning/v1alpha5/requirements.go
@@ -44,7 +44,7 @@ func NewRequirements(requirements ...v1.NodeSelectorRequirement) Requirements {

// NewLabelRequirements constructs requirements from labels
func NewLabelRequirements(labels map[string]string) Requirements {
requirements := []v1.NodeSelectorRequirement{}
var requirements []v1.NodeSelectorRequirement
for key, value := range labels {
requirements = append(requirements, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
}
@@ -53,7 +53,7 @@ func NewLabelRequirements(labels map[string]string) Requirements {

// NewPodRequirements constructs requirements from a pod
func NewPodRequirements(pod *v1.Pod) Requirements {
requirements := []v1.NodeSelectorRequirement{}
var requirements []v1.NodeSelectorRequirement
for key, value := range pod.Spec.NodeSelector {
requirements = append(requirements, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
}
@@ -77,10 +77,10 @@ func NewPodRequirements(pod *v1.Pod) Requirements {
// Add function returns a new Requirements object with new requirements inserted.
func (r Requirements) Add(requirements ...v1.NodeSelectorRequirement) Requirements {
// Deep copy to avoid mutating existing requirements
r = *r.DeepCopy()
cp := *r.DeepCopy()
// This fail-safe measurement can be removed later when we implement test webhook.
if r.requirements == nil {
r.requirements = map[string]sets.Set{}
if cp.requirements == nil {
cp.requirements = map[string]sets.Set{}
}
for _, requirement := range requirements {
if normalized, ok := NormalizedLabels[requirement.Key]; ok {
@@ -89,7 +89,7 @@ func (r Requirements) Add(requirements ...v1.NodeSelectorRequirement) Requiremen
if IgnoredLabels.Has(requirement.Key) {
continue
}
r.Requirements = append(r.Requirements, requirement)
cp.Requirements = append(cp.Requirements, requirement)
var values sets.Set
switch requirement.Operator {
case v1.NodeSelectorOpIn:
@@ -101,12 +101,41 @@ func (r Requirements) Add(requirements ...v1.NodeSelectorRequirement) Requiremen
case v1.NodeSelectorOpDoesNotExist:
values = sets.NewSet()
}
if existing, ok := r.requirements[requirement.Key]; ok {
if existing, ok := cp.requirements[requirement.Key]; ok {
values = values.Intersection(existing)
}
r.requirements[requirement.Key] = values
cp.requirements[requirement.Key] = values
}
cp.rebuild()
return cp
}

// rebuild re-generates the node selector requirements based on the set based versions. This improves scheduling speed
// as it causes us to not carry around redundant requirements (e.g. 20x copies of instance-type in [it0, it1, ..., it400])
func (r *Requirements) rebuild() {
existing := r.Requirements
r.Requirements = nil
for key, values := range r.requirements {
req := v1.NodeSelectorRequirement{
Key: key,
}
if values.IsComplement() {
req.Operator = v1.NodeSelectorOpNotIn
req.Values = values.ComplementValues().UnsortedList()
} else {
req.Operator = v1.NodeSelectorOpIn
req.Values = values.Values().UnsortedList()
}
r.Requirements = append(r.Requirements, req)
}
// re-add any requirements that the set based versions don't handle so we can properly validate
for _, req := range existing {
switch req.Operator {
case v1.NodeSelectorOpIn, v1.NodeSelectorOpNotIn, v1.NodeSelectorOpExists, v1.NodeSelectorOpDoesNotExist:
default:
r.Requirements = append(r.Requirements, req)
}
}
return r
}

// Keys returns unique set of the label keys from the requirements
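The new rebuild step relies on requirements being representable as per-key value sets, so many requirements on the same key collapse into one. The sketch below models only the In operator using the upstream apimachinery string set (Karpenter's internal sets.Set additionally tracks NotIn as a complement set, per IsComplement/ComplementValues above); intersectIn and the zone values are illustrative, not repository code.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
)

// intersectIn shows the core idea: two In requirements on the same key constrain each other,
// and the merged requirement can be rebuilt from the resulting set instead of carrying
// around both originals.
func intersectIn(key string, a, b v1.NodeSelectorRequirement) v1.NodeSelectorRequirement {
	values := sets.NewString(a.Values...).Intersection(sets.NewString(b.Values...))
	return v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: values.UnsortedList()}
}

func main() {
	zone := v1.LabelTopologyZone
	provisioner := v1.NodeSelectorRequirement{Key: zone, Operator: v1.NodeSelectorOpIn, Values: []string{"us-west-2a", "us-west-2b"}}
	pod := v1.NodeSelectorRequirement{Key: zone, Operator: v1.NodeSelectorOpIn, Values: []string{"us-west-2b", "us-west-2c"}}

	merged := intersectIn(zone, provisioner, pod)
	fmt.Println(merged.Values) // [us-west-2b]
}
```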
2 changes: 1 addition & 1 deletion pkg/cloudprovider/requirements.go
@@ -53,7 +53,7 @@ func Compatible(it InstanceType, requirements v1alpha5.Requirements) bool {
if !requirements.Get(v1.LabelArchStable).Has(it.Architecture()) {
return false
}
if !requirements.Get(v1.LabelOSStable).HasAny(it.OperatingSystems().List()...) {
if !requirements.Get(v1.LabelOSStable).HasAny(it.OperatingSystems()) {
return false
}
// acceptable if we have any offering that is valid
2 changes: 1 addition & 1 deletion pkg/controllers/metrics/node/controller.go
@@ -262,7 +262,7 @@ func (c *Controller) record(ctx context.Context, node *v1.Node) error {
allocatableGaugeVec: allocatable,
} {
if err := c.set(resourceList, node, gaugeVec); err != nil {
logging.FromContext(ctx).Errorf("Failed to generate gauge: %w", err)
logging.FromContext(ctx).Errorf("Failed to generate gauge: %s", err)
}
}
return nil
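The change in this file is a formatting fix rather than affinity work: %w is only interpreted by fmt.Errorf for error wrapping, while the logger's Errorf formats with Sprintf semantics, where %w renders as a bad-verb artifact (go vet's printf check also flags it). A short standalone demonstration, not repository code:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	err := errors.New("boom")

	// Sprintf-style formatting does not understand %w; it prints something like
	// "Failed to generate gauge: %!w(*errors.errorString=&{boom})".
	fmt.Println(fmt.Sprintf("Failed to generate gauge: %w", err))
	// %s (or %v) is the right verb for a log message.
	fmt.Println(fmt.Sprintf("Failed to generate gauge: %s", err)) // Failed to generate gauge: boom

	// fmt.Errorf with %w actually wraps, so errors.Is can see through it.
	wrapped := fmt.Errorf("failed to generate gauge: %w", err)
	fmt.Println(errors.Is(wrapped, err)) // true
}
```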
16 changes: 9 additions & 7 deletions pkg/controllers/provisioning/provisioner.go
@@ -41,8 +41,8 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
running, stop := context.WithCancel(ctx)
p := &Provisioner{
Provisioner: provisioner,
batcher: NewBatcher(running),
Stop: stop,
batcher: NewBatcher(running),
cloudProvider: cloudProvider,
kubeClient: kubeClient,
coreV1Client: coreV1Client,
@@ -83,9 +83,8 @@ func (p *Provisioner) provision(ctx context.Context) error {
logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
items, window := p.batcher.Wait()
defer p.batcher.Flush()
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(items), window)
// Filter pods
pods := []*v1.Pod{}
var pods []*v1.Pod
for _, item := range items {
provisionable, err := p.isProvisionable(ctx, item.(*v1.Pod))
if err != nil {
@@ -95,20 +94,23 @@ func (p *Provisioner) provision(ctx context.Context) error {
pods = append(pods, item.(*v1.Pod))
}
}
if len(pods) == 0 {
return nil
}
logging.FromContext(ctx).Infof("Batched %d pod(s) in %s", len(pods), window)

// Get instance type options
instanceTypes, err := p.cloudProvider.GetInstanceTypes(ctx, p.Spec.Provider)
if err != nil {
return fmt.Errorf("getting instance types, %w", err)
}

// Separate pods by scheduling constraints
nodes, err := p.scheduler.Solve(ctx, p.Provisioner, instanceTypes, pods)
nodes, err := p.scheduler.Solve(ctx, &p.Provisioner.Spec.Constraints, instanceTypes, pods)
if err != nil {
return fmt.Errorf("solving scheduling constraints, %w", err)
}
if err != nil {
return err
}

// Launch capacity and bind pods
workqueue.ParallelizeUntil(ctx, len(nodes), len(nodes), func(i int) {
if err := p.launch(ctx, nodes[i]); err != nil {
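After scheduling, the provisioner still launches the resulting nodes concurrently via workqueue.ParallelizeUntil, handling each node's launch error individually rather than aborting the whole batch. A stripped-down, self-contained model of that launch step (the node names and the launch stub are invented; in the real code launch creates cloud capacity and binds pods):

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	nodes := []string{"node-a", "node-b", "node-c"}
	var launched int64

	// Stand-in for Provisioner.launch: record the call and report any error.
	launch := func(ctx context.Context, name string) error {
		atomic.AddInt64(&launched, 1)
		return nil
	}

	ctx := context.Background()
	// One worker per node; each piece launches one scheduled node.
	workqueue.ParallelizeUntil(ctx, len(nodes), len(nodes), func(i int) {
		if err := launch(ctx, nodes[i]); err != nil {
			fmt.Printf("Could not launch node, %s\n", err)
		}
	})
	fmt.Println("launched", atomic.LoadInt64(&launched), "nodes")
}
```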
42 changes: 34 additions & 8 deletions pkg/controllers/provisioning/scheduling/node.go
@@ -17,6 +17,7 @@ package scheduling
import (
"fmt"
"strings"
"sync/atomic"

v1 "k8s.io/api/core/v1"

@@ -28,36 +29,61 @@ import (
// Node is a set of constraints, compatible pods, and possible instance types that could fulfill these constraints. This
// will be turned into one or more actual node instances within the cluster after bin packing.
type Node struct {
Hostname string
Constraints *v1alpha5.Constraints
InstanceTypeOptions []cloudprovider.InstanceType
Pods []*v1.Pod

topology *Topology
requests v1.ResourceList
}

func NewNode(constraints *v1alpha5.Constraints, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node {
return &Node{
var nodeID int64

func NewNode(constraints *v1alpha5.Constraints, topology *Topology, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node {
n := &Node{
Hostname: fmt.Sprintf("hostname-placeholder-%04d", atomic.AddInt64(&nodeID, 1)),
Constraints: constraints.DeepCopy(),
InstanceTypeOptions: instanceTypes,
topology: topology,
requests: daemonResources,
}

n.Constraints.Requirements = n.Constraints.Requirements.Add(v1.NodeSelectorRequirement{
Key: v1.LabelHostname,
Operator: v1.NodeSelectorOpIn,
Values: []string{n.Hostname},
})
return n
}

func (n *Node) Add(pod *v1.Pod) error {
podRequirements := v1alpha5.NewPodRequirements(pod)
// Check initial compatibility
if err := n.Constraints.Requirements.Compatible(podRequirements); err != nil {
return err
}
nodeRequirements := n.Constraints.Requirements.Add(podRequirements.Requirements...)

if len(n.Pods) != 0 {
// TODO: remove this check for n.Pods once we properly support hostname topology spread
if err := n.Constraints.Requirements.Compatible(podRequirements); err != nil {
return err
}
// Include topology requirements
requirements, err := n.topology.AddRequirements(podRequirements, nodeRequirements, pod)
if err != nil {
return err
}
// Check node compatibility
if err = n.Constraints.Requirements.Compatible(requirements); err != nil {
return err
}
requirements := n.Constraints.Requirements.Add(podRequirements.Requirements...)
// Tighten requirements
requirements = n.Constraints.Requirements.Add(requirements.Requirements...)
requests := resources.Merge(n.requests, resources.RequestsForPods(pod))

// Check instance type combinations
instanceTypes := cloudprovider.FilterInstanceTypes(n.InstanceTypeOptions, requirements, requests)
if len(instanceTypes) == 0 {
return fmt.Errorf("no instance type satisfied resources %s and requirements %s", resources.String(resources.RequestsForPods(pod)), n.Constraints.Requirements)
}
// Update node
n.Pods = append(n.Pods, pod)
n.InstanceTypeOptions = instanceTypes
n.requests = requests
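A key piece of the node.go change: each in-flight Node now gets a unique placeholder hostname from an atomic counter, pinned into its requirements as an In term on kubernetes.io/hostname, so hostname-keyed topology spread and anti-affinity can treat the not-yet-created node as its own topology domain. A self-contained sketch of just that mechanism (hostnameRequirement is an invented helper, not the commit's code; the counter and format string mirror the diff above):

```go
package main

import (
	"fmt"
	"sync/atomic"

	v1 "k8s.io/api/core/v1"
)

// nodeID mirrors the package-level counter added in this commit.
var nodeID int64

// hostnameRequirement returns a unique placeholder hostname and the requirement that pins
// the in-flight node to it, mirroring what NewNode now adds to the node's constraints.
func hostnameRequirement() (string, v1.NodeSelectorRequirement) {
	hostname := fmt.Sprintf("hostname-placeholder-%04d", atomic.AddInt64(&nodeID, 1))
	return hostname, v1.NodeSelectorRequirement{
		Key:      v1.LabelHostname,
		Operator: v1.NodeSelectorOpIn,
		Values:   []string{hostname},
	}
}

func main() {
	for i := 0; i < 2; i++ {
		// First iteration prints hostname-placeholder-0001, then -0002, and so on.
		hostname, req := hostnameRequirement()
		fmt.Println(hostname, req.Key, req.Values)
	}
}
```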
78 changes: 0 additions & 78 deletions pkg/controllers/provisioning/scheduling/nodeset.go

This file was deleted.

(Diffs for the remaining changed files are not shown.)
