Add affinity and anti-affinity support #1626

Merged · 43 commits · Apr 13, 2022

Commits (43)
1325162
implement pod affinity & anti-affinity
tzneal Mar 29, 2022
c449571
PR comments
tzneal Apr 1, 2022
d8f7a29
refactor
tzneal Apr 1, 2022
af97d62
PR comments
tzneal Apr 2, 2022
c9bb064
Collapsed NodeSet and Scheduler and removed some unnecessary checks
ellistarn Apr 3, 2022
6843cc4
Moved topology helpers to where they're used
ellistarn Apr 3, 2022
aeeb27b
rename Update -> Initialize
tzneal Apr 3, 2022
2f66d6f
fix benchmark after refactor
tzneal Apr 3, 2022
4bbf0c9
add some pod/node distribution stats
tzneal Apr 3, 2022
812e154
PR comments
tzneal Apr 3, 2022
48f4790
much faster if we don't use the constructor and provide an initial ca…
tzneal Apr 3, 2022
6af55e4
PR Comments: Fixed a bug with namespaces, logging touchups, naming to…
ellistarn Apr 3, 2022
469eb18
Rely on requirements instead of the full constraints
ellistarn Apr 3, 2022
15f73e0
Pair programming TODOs
ellistarn Apr 3, 2022
dedcd29
fix bug in namespace selector
tzneal Apr 3, 2022
63cfff2
implement TODOs
tzneal Apr 3, 2022
2a73baf
introduce scheduling queue
tzneal Apr 4, 2022
bb576ac
remove dead code
tzneal Apr 4, 2022
c3bb33a
PR comments
tzneal Apr 4, 2022
1b2c3b2
Simplified queue and moved public methods to the top
ellistarn Apr 4, 2022
e03123c
rework NextDomainMinimizeSkew
tzneal Apr 4, 2022
4ab0192
clarify why this is tricky
tzneal Apr 4, 2022
c24f44f
reorder preference relaxation
tzneal Apr 4, 2022
cd97cf0
Pair programming dedup TopologyGroup initialization
ellistarn Apr 4, 2022
6641166
fix scheduling bugs from refactoring
tzneal Apr 4, 2022
e7b93c2
cleanup
tzneal Apr 4, 2022
262ee8d
register hostnames upon node creation
tzneal Apr 4, 2022
424cc5a
update comments
tzneal Apr 4, 2022
3bd4c06
fix filtering of topology domains by node selector and required node …
tzneal Apr 5, 2022
fe7c767
Pair programming to fix combined topology and nodeaffinity
ellistarn Apr 5, 2022
fa44369
fix tests
tzneal Apr 5, 2022
6c51f8c
test fixes and cleanup
tzneal Apr 5, 2022
1a31769
more tests & cleanup
tzneal Apr 5, 2022
580aa2c
discover all domains across the cluster
tzneal Apr 5, 2022
9353a54
fix issue with not selecting from all viable domains during topology …
tzneal Apr 5, 2022
76bbd5a
update comments and fix a bug related to anti-affinities
tzneal Apr 6, 2022
b7af308
fix bug we've had for a long time w/topology spread
tzneal Apr 6, 2022
3e5578d
fix topology spread when max-skew is currently violated
tzneal Apr 6, 2022
1a33e18
fix topology domain counts for topology spread
tzneal Apr 6, 2022
a2226a4
check for provisioner existence prior to relaxing pods
tzneal Apr 7, 2022
c449938
speed up scheduling
tzneal Apr 7, 2022
89198d5
refactor
tzneal Apr 7, 2022
08ac9ec
fix typo
tzneal Apr 11, 2022
3 changes: 3 additions & 0 deletions pkg/apis/provisioning/v1alpha5/labels.go
@@ -27,6 +27,9 @@ var (
ArchitectureArm64 = "arm64"
OperatingSystemLinux = "linux"

// ValidTopologyKeys are the topology keys that Karpenter allows for topology spread and pod affinity/anti-affinity
ValidTopologyKeys = stringsets.NewString(v1.LabelHostname, v1.LabelTopologyZone, LabelCapacityType)

// Karpenter specific domains and labels
KarpenterLabelDomain = "karpenter.sh"
LabelCapacityType = KarpenterLabelDomain + "/capacity-type"
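
The new ValidTopologyKeys set constrains which topology keys Karpenter will honor in topologySpreadConstraints and pod (anti-)affinity terms. As a hedged illustration (not part of the PR), the snippet below builds both kinds of constraints with the Kubernetes Go types; the "app: inflate" selector is a made-up example.

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Example constraints that use two of the allowed topology keys.
var exampleSpread = v1.TopologySpreadConstraint{
	MaxSkew:           1,
	TopologyKey:       v1.LabelTopologyZone, // topology.kubernetes.io/zone
	WhenUnsatisfiable: v1.DoNotSchedule,
	LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"app": "inflate"}},
}

var exampleAntiAffinity = v1.PodAntiAffinity{
	RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{{
		TopologyKey:   v1.LabelHostname, // kubernetes.io/hostname
		LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "inflate"}},
	}},
}
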
4 changes: 4 additions & 0 deletions pkg/apis/provisioning/v1alpha5/provisioner_validation.go
@@ -123,6 +123,10 @@ func (s *ProvisionerSpec) validateRequirements() (errs *apis.FieldError) {
if e := IsRestrictedLabel(requirement.Key); e != nil {
err = multierr.Append(err, e)
}
// We don't support a 'NotExists' operator, but this turns into an empty set of values by re-building node selector requirements
if requirement.Operator == v1.NodeSelectorOpIn && len(requirement.Values) == 0 {
err = multierr.Append(err, fmt.Errorf("key %s is unsatisfiable due to unsupported operator or no values being provided", requirement.Key))
}
}
err = multierr.Append(err, s.Requirements.Validate())
if err != nil {
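
The added check rejects requirements that can never be satisfied. A minimal hedged example of a requirement the validator now flags (the instance-type key is only illustrative):

import v1 "k8s.io/api/core/v1"

// An In operator with no values matches nothing, so validateRequirements
// now returns an error instead of producing an unsatisfiable provisioner.
var unsatisfiable = v1.NodeSelectorRequirement{
	Key:      v1.LabelInstanceTypeStable, // "node.kubernetes.io/instance-type"
	Operator: v1.NodeSelectorOpIn,
	Values:   []string{},
}
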
47 changes: 38 additions & 9 deletions pkg/apis/provisioning/v1alpha5/requirements.go
@@ -44,7 +44,7 @@ func NewRequirements(requirements ...v1.NodeSelectorRequirement) Requirements {

// NewLabelRequirements constructs requirements from labels
func NewLabelRequirements(labels map[string]string) Requirements {
requirements := []v1.NodeSelectorRequirement{}
var requirements []v1.NodeSelectorRequirement
for key, value := range labels {
requirements = append(requirements, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
}
@@ -53,7 +53,7 @@ func NewLabelRequirements(labels map[string]string) Requirements {

// NewPodRequirements constructs requirements from a pod
func NewPodRequirements(pod *v1.Pod) Requirements {
requirements := []v1.NodeSelectorRequirement{}
var requirements []v1.NodeSelectorRequirement
for key, value := range pod.Spec.NodeSelector {
requirements = append(requirements, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
}
@@ -77,10 +77,10 @@ func NewPodRequirements(pod *v1.Pod) Requirements {
// Add function returns a new Requirements object with new requirements inserted.
func (r Requirements) Add(requirements ...v1.NodeSelectorRequirement) Requirements {
// Deep copy to avoid mutating existing requirements
r = *r.DeepCopy()
cp := *r.DeepCopy()
// This fail-safe measurement can be removed later when we implement test webhook.
if r.requirements == nil {
r.requirements = map[string]sets.Set{}
if cp.requirements == nil {
cp.requirements = map[string]sets.Set{}
}
for _, requirement := range requirements {
if normalized, ok := NormalizedLabels[requirement.Key]; ok {
@@ -89,7 +89,7 @@ func (r Requirements) Add(requirements ...v1.NodeSelectorRequirement) Requiremen
if IgnoredLabels.Has(requirement.Key) {
continue
}
r.Requirements = append(r.Requirements, requirement)
cp.Requirements = append(cp.Requirements, requirement)
var values sets.Set
switch requirement.Operator {
case v1.NodeSelectorOpIn:
@@ -101,12 +101,41 @@ func (r Requirements) Add(requirements ...v1.NodeSelectorRequirement) Requiremen
case v1.NodeSelectorOpDoesNotExist:
values = sets.NewSet()
}
if existing, ok := r.requirements[requirement.Key]; ok {
if existing, ok := cp.requirements[requirement.Key]; ok {
values = values.Intersection(existing)
}
r.requirements[requirement.Key] = values
cp.requirements[requirement.Key] = values
}
cp.rebuild()
return cp
}

// rebuild re-generates the node selector requirements based on the set based versions. This improves scheduling speed
// as it causes us to not carry around redundant requirements (e.g. 20x copies of instance-type in [it0, it1, ..., it400])
func (r *Requirements) rebuild() {
existing := r.Requirements
r.Requirements = nil
for key, values := range r.requirements {
req := v1.NodeSelectorRequirement{
Key: key,
}
if values.IsComplement() {
req.Operator = v1.NodeSelectorOpNotIn
req.Values = values.ComplementValues().UnsortedList()
} else {
req.Operator = v1.NodeSelectorOpIn
req.Values = values.Values().UnsortedList()
}
r.Requirements = append(r.Requirements, req)
}
// re-add any requirements that the set based versions don't handle so we can properly validate
for _, req := range existing {
switch req.Operator {
case v1.NodeSelectorOpIn, v1.NodeSelectorOpNotIn, v1.NodeSelectorOpExists, v1.NodeSelectorOpDoesNotExist:
default:
r.Requirements = append(r.Requirements, req)
}
}
return r
}

// Keys returns unique set of the label keys from the requirements
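
A rough sketch of what Add plus rebuild accomplish, assuming the zone values shown below and an import path inferred from the repository layout (both are assumptions, not taken from the diff): overlapping requirements on the same key are intersected into a single set-backed requirement instead of accumulating duplicates.

import (
	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
)

func exampleRebuild() v1alpha5.Requirements {
	reqs := v1alpha5.NewRequirements(v1.NodeSelectorRequirement{
		Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"us-west-2a", "us-west-2b"},
	})
	reqs = reqs.Add(v1.NodeSelectorRequirement{
		Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"us-west-2b", "us-west-2c"},
	})
	// After intersection and rebuild, reqs.Requirements carries roughly one entry:
	//   {Key: topology.kubernetes.io/zone, Operator: In, Values: ["us-west-2b"]}
	return reqs
}
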
2 changes: 1 addition & 1 deletion pkg/cloudprovider/requirements.go
@@ -53,7 +53,7 @@ func Compatible(it InstanceType, requirements v1alpha5.Requirements) bool {
if !requirements.Get(v1.LabelArchStable).Has(it.Architecture()) {
return false
}
if !requirements.Get(v1.LabelOSStable).HasAny(it.OperatingSystems().List()...) {
if !requirements.Get(v1.LabelOSStable).HasAny(it.OperatingSystems()) {
return false
}
// acceptable if we have any offering that is valid
2 changes: 1 addition & 1 deletion pkg/controllers/metrics/node/controller.go
@@ -262,7 +262,7 @@ func (c *Controller) record(ctx context.Context, node *v1.Node) error {
allocatableGaugeVec: allocatable,
} {
if err := c.set(resourceList, node, gaugeVec); err != nil {
logging.FromContext(ctx).Errorf("Failed to generate gauge: %w", err)
logging.FromContext(ctx).Errorf("Failed to generate gauge: %s", err)
}
}
return nil
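
The %w-to-%s change above follows from how Go's formatting verbs work: %w is only special-cased by fmt.Errorf for error wrapping, so a plain formatting call (such as a sugared logger's Errorf) renders it as a %!w(...) artifact. A small standalone illustration using only the standard library:

package main

import (
	"errors"
	"fmt"
)

func main() {
	err := errors.New("boom")
	// go vet also flags %w outside fmt.Errorf; at runtime it prints a formatting artifact.
	fmt.Println(fmt.Sprintf("failed: %w", err)) // failed: %!w(*errors.errorString=&{boom})
	fmt.Println(fmt.Sprintf("failed: %s", err)) // failed: boom
	wrapped := fmt.Errorf("failed: %w", err)    // %w is interpreted here
	fmt.Println(errors.Is(wrapped, err))        // true
}
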
16 changes: 9 additions & 7 deletions pkg/controllers/provisioning/provisioner.go
@@ -41,8 +41,8 @@ func NewProvisioner(ctx context.Context, provisioner *v1alpha5.Provisioner, kube
running, stop := context.WithCancel(ctx)
p := &Provisioner{
Provisioner: provisioner,
batcher: NewBatcher(running),
Stop: stop,
batcher: NewBatcher(running),
cloudProvider: cloudProvider,
kubeClient: kubeClient,
coreV1Client: coreV1Client,
@@ -83,9 +83,8 @@ func (p *Provisioner) provision(ctx context.Context) error {
logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
items, window := p.batcher.Wait()
defer p.batcher.Flush()
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(items), window)
// Filter pods
pods := []*v1.Pod{}
var pods []*v1.Pod
for _, item := range items {
provisionable, err := p.isProvisionable(ctx, item.(*v1.Pod))
if err != nil {
@@ -95,20 +94,23 @@ func (p *Provisioner) provision(ctx context.Context) error {
pods = append(pods, item.(*v1.Pod))
}
}
if len(pods) == 0 {
return nil
}
logging.FromContext(ctx).Infof("Batched %d pod(s) in %s", len(pods), window)

// Get instance type options
instanceTypes, err := p.cloudProvider.GetInstanceTypes(ctx, p.Spec.Provider)
if err != nil {
return fmt.Errorf("getting instance types, %w", err)
}

// Separate pods by scheduling constraints
nodes, err := p.scheduler.Solve(ctx, p.Provisioner, instanceTypes, pods)
nodes, err := p.scheduler.Solve(ctx, &p.Provisioner.Spec.Constraints, instanceTypes, pods)
if err != nil {
return fmt.Errorf("solving scheduling constraints, %w", err)
}
if err != nil {
return err
}

// Launch capacity and bind pods
workqueue.ParallelizeUntil(ctx, len(nodes), len(nodes), func(i int) {
if err := p.launch(ctx, nodes[i]); err != nil {
42 changes: 34 additions & 8 deletions pkg/controllers/provisioning/scheduling/node.go
@@ -17,6 +17,7 @@ package scheduling
import (
"fmt"
"strings"
"sync/atomic"

v1 "k8s.io/api/core/v1"

@@ -28,36 +29,61 @@ import (
// Node is a set of constraints, compatible pods, and possible instance types that could fulfill these constraints. This
// will be turned into one or more actual node instances within the cluster after bin packing.
type Node struct {
Hostname string
Constraints *v1alpha5.Constraints
InstanceTypeOptions []cloudprovider.InstanceType
Pods []*v1.Pod

topology *Topology
requests v1.ResourceList
}

func NewNode(constraints *v1alpha5.Constraints, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node {
return &Node{
var nodeID int64

func NewNode(constraints *v1alpha5.Constraints, topology *Topology, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node {
n := &Node{
Hostname: fmt.Sprintf("hostname-placeholder-%04d", atomic.AddInt64(&nodeID, 1)),
Constraints: constraints.DeepCopy(),
InstanceTypeOptions: instanceTypes,
topology: topology,
requests: daemonResources,
}

n.Constraints.Requirements = n.Constraints.Requirements.Add(v1.NodeSelectorRequirement{
Key: v1.LabelHostname,
Operator: v1.NodeSelectorOpIn,
Values: []string{n.Hostname},
})
return n
}

func (n *Node) Add(pod *v1.Pod) error {
podRequirements := v1alpha5.NewPodRequirements(pod)
// Check initial compatibility
if err := n.Constraints.Requirements.Compatible(podRequirements); err != nil {
return err
}
nodeRequirements := n.Constraints.Requirements.Add(podRequirements.Requirements...)

if len(n.Pods) != 0 {
// TODO: remove this check for n.Pods once we properly support hostname topology spread
if err := n.Constraints.Requirements.Compatible(podRequirements); err != nil {
return err
}
// Include topology requirements
requirements, err := n.topology.AddRequirements(podRequirements, nodeRequirements, pod)
if err != nil {
return err
}
// Check node compatibility
if err = n.Constraints.Requirements.Compatible(requirements); err != nil {
return err
}
requirements := n.Constraints.Requirements.Add(podRequirements.Requirements...)
// Tighten requirements
requirements = n.Constraints.Requirements.Add(requirements.Requirements...)
requests := resources.Merge(n.requests, resources.RequestsForPods(pod))

// Check instance type combinations
instanceTypes := cloudprovider.FilterInstanceTypes(n.InstanceTypeOptions, requirements, requests)
if len(instanceTypes) == 0 {
return fmt.Errorf("no instance type satisfied resources %s and requirements %s", resources.String(resources.RequestsForPods(pod)), n.Constraints.Requirements)
}
// Update node
n.Pods = append(n.Pods, pod)
n.InstanceTypeOptions = instanceTypes
n.requests = requests
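
A hedged sketch of how the scheduler is expected to drive this type. The package paths, the exported Topology type, and all variable names below are assumptions for illustration; only NewNode and Add come from the diff above.

import (
	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
	"github.com/aws/karpenter/pkg/cloudprovider"
	"github.com/aws/karpenter/pkg/controllers/provisioning/scheduling"
)

// packOntoNode builds one in-flight node and adds as many pending pods as the
// combined pod, topology, and instance-type requirements allow.
func packOntoNode(constraints *v1alpha5.Constraints, topology *scheduling.Topology,
	daemonOverhead v1.ResourceList, instanceTypes []cloudprovider.InstanceType, pending []*v1.Pod) *scheduling.Node {
	node := scheduling.NewNode(constraints, topology, daemonOverhead, instanceTypes)
	for _, pod := range pending {
		if err := node.Add(pod); err != nil {
			// Incompatible with this node; the real scheduler would try another
			// in-flight node or create a new one for the pod.
			continue
		}
	}
	return node
}
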
78 changes: 0 additions & 78 deletions pkg/controllers/provisioning/scheduling/nodeset.go

This file was deleted.
