Commit: PR comments
tzneal committed Mar 26, 2022
1 parent 368c4f3 commit 9434fab
Showing 3 changed files with 35 additions and 28 deletions.
5 changes: 1 addition & 4 deletions pkg/controllers/provisioning/provisioner.go
@@ -114,7 +114,7 @@ func (p *Provisioner) provision(ctx context.Context) error {
// Launch capacity and bind pods
workqueue.ParallelizeUntil(ctx, len(nodes), len(nodes), func(i int) {
if err := p.launch(ctx, nodes[i]); err != nil {
logging.FromContext(ctx).Errorf("Could not launch node, %s", err)
logging.FromContext(ctx).Errorf("Launching node, %s", err)
}
})
return nil
@@ -136,9 +136,6 @@ func (p *Provisioner) isProvisionable(ctx context.Context, candidate *v1.Pod) (b
}

func (p *Provisioner) launch(ctx context.Context, node *scheduling.Node) error {
if len(node.InstanceTypeOptions) == 0 {
return fmt.Errorf("no valid instance types")
}
// Check limits
latest := &v1alpha5.Provisioner{}
if err := p.kubeClient.Get(ctx, client.ObjectKeyFromObject(p.Provisioner), latest); err != nil {
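Context for the launch() change above: the empty-InstanceTypeOptions guard moves out of launch() because NewNode (see node.go below) now returns an error instead of building a node with no usable instance types. A minimal, self-contained sketch of that fail-fast shape, using illustrative toy types rather than Karpenter's actual ones:

```go
package main

import (
	"errors"
	"fmt"
)

type instanceType string

type node struct {
	instanceTypeOptions []instanceType
}

// newNode fails fast when nothing satisfies the requirements, mirroring the
// new NewNode contract in scheduling/node.go.
func newNode(options []instanceType) (*node, error) {
	if len(options) == 0 {
		return nil, errors.New("no instance type satisfied requirements")
	}
	return &node{instanceTypeOptions: options}, nil
}

// launch no longer needs its own "no valid instance types" check: any node it
// receives was already validated at construction time.
func launch(n *node) error {
	fmt.Printf("launching node with %d instance type options\n", len(n.instanceTypeOptions))
	return nil
}

func main() {
	if n, err := newNode([]instanceType{"m5.large"}); err == nil {
		_ = launch(n)
	}
	if _, err := newNode(nil); err != nil {
		fmt.Println("construction failed:", err)
	}
}
```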
51 changes: 28 additions & 23 deletions pkg/controllers/provisioning/scheduling/node.go
@@ -18,7 +18,6 @@ import (
"errors"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
@@ -34,13 +33,31 @@ type Node struct {

podResources v1.ResourceList
daemonResources v1.ResourceList
daemons []*v1.Pod
daemons []daemonInfo
}

func NewNode(constraints *v1alpha5.Constraints, daemons []*v1.Pod, instanceTypeOptions []cloudprovider.InstanceType, pods ...*v1.Pod) *Node {
type daemonInfo struct {
pod *v1.Pod
requirements v1alpha5.Requirements
resources v1.ResourceList
}

func NewNode(constraints *v1alpha5.Constraints, daemons []*v1.Pod, instanceTypeOptions []cloudprovider.InstanceType, pods ...*v1.Pod) (*Node, error) {
n := &Node{
Constraints: *constraints.DeepCopy(),
daemons: daemons,
}

for _, daemon := range daemons {
// skip any daemons that our provisioner configured taints would cause to not schedule
if err := n.Taints.Tolerates(daemon); err != nil {
continue
}
di := daemonInfo{
pod: daemon,
requirements: v1alpha5.NewPodRequirements(daemon),
resources: resources.RequestsForPods(daemon),
}
n.daemons = append(n.daemons, di)
}

// calculate daemon resource consumption so we can filter out instance types based on daemons + instance type overhead
@@ -59,13 +76,14 @@ func NewNode(constraints *v1alpha5.Constraints, daemons []*v1.Pod, instanceTypeO
}
n.InstanceTypeOptions = append(n.InstanceTypeOptions, it)
}
// technically we might create some invalid construct here if the pod can't be supported by any instance type
// but this is ok as we just won't have any valid instance types, this pod won't be schedulable and nothing else
// will be compatible with this node

for _, p := range pods {
n.Add(p)
}
return n
if len(n.InstanceTypeOptions) == 0 {
return nil, errors.New("no instance type satisfied requirements")
}
return n, nil
}

func (n Node) Compatible(pod *v1.Pod) error {
@@ -74,11 +92,6 @@ func (n Node) Compatible(pod *v1.Pod) error {
return err
}

// Taints should only arrive via the provisioner, so this should never occur but to be safe
if err := n.Taints.Tolerates(pod); err != nil {
return err
}

tightened := n.Requirements.Add(podRequirements.Requirements...)
// Ensure that at least one instance type of the instance types that we are already narrowed down to based on the
// existing pods can support the pod resources and combined pod + provider requirements
@@ -141,7 +154,6 @@ func (n *Node) Add(pod *v1.Pod) {
n.Pods = append(n.Pods, pod)
n.InstanceTypeOptions = instanceTypeOptions
n.podResources = resources.Merge(n.podResources, resources.RequestsForPods(pod))

}

// hasCompatibleResources tests if a given node selector and resource request list is compatible with an instance type
@@ -162,18 +174,11 @@ func (n Node) hasCompatibleResources(resourceList v1.ResourceList, it cloudprovi
func (n *Node) recalculateDaemonResources() {
n.daemonResources = nil
for _, daemon := range n.daemons {
podRequirements := v1alpha5.NewPodRequirements(daemon)
// we intentionally do not check if the DaemonSet pod will fit on the node here, we want large DS pods to cause
// us to choose larger instance types
if err := n.Requirements.Compatible(podRequirements); err != nil {
if err := n.Requirements.Compatible(daemon.requirements); err != nil {
continue
}
if err := n.Taints.Tolerates(daemon); err != nil {
continue
}

daemonsetResources := resources.RequestsForPods(daemon)
daemonsetResources[v1.ResourcePods] = resource.MustParse("1")
n.daemonResources = resources.Merge(n.daemonResources, daemonsetResources)
n.daemonResources = resources.Merge(n.daemonResources, daemon.resources)
}
}
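The daemonInfo change above caches, per daemon, the pod requirements and resource requests once at node construction and drops daemons the node's provisioner-configured taints would reject, so recalculateDaemonResources only re-checks requirement compatibility. A simplified, self-contained sketch of that caching pattern, assuming stand-in predicates in place of Karpenter's Taints.Tolerates and Requirements.Compatible and a stand-in for resources.RequestsForPods:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// daemonInfo mirrors the idea in node.go: cache what can be derived from the
// daemon pod once, at node construction time.
type daemonInfo struct {
	pod       *v1.Pod
	resources v1.ResourceList
}

type node struct {
	daemons         []daemonInfo
	daemonResources v1.ResourceList
}

// requestsForPod sums container requests; a stand-in for resources.RequestsForPods.
func requestsForPod(p *v1.Pod) v1.ResourceList {
	out := v1.ResourceList{}
	for _, c := range p.Spec.Containers {
		for name, qty := range c.Resources.Requests {
			sum := out[name]
			sum.Add(qty)
			out[name] = sum
		}
	}
	return out
}

// newNode filters out daemons the node's taints would reject and caches the
// requests of the rest; rejected daemons never contribute to node sizing.
func newNode(daemons []*v1.Pod, tolerates func(*v1.Pod) bool) *node {
	n := &node{}
	for _, d := range daemons {
		if !tolerates(d) {
			continue
		}
		n.daemons = append(n.daemons, daemonInfo{pod: d, resources: requestsForPod(d)})
	}
	n.recalculateDaemonResources(func(*v1.Pod) bool { return true })
	return n
}

// recalculateDaemonResources reuses the cached per-daemon requests instead of
// recomputing requirements and requests on every call.
func (n *node) recalculateDaemonResources(compatible func(*v1.Pod) bool) {
	n.daemonResources = v1.ResourceList{}
	for _, d := range n.daemons {
		if !compatible(d.pod) {
			continue
		}
		for name, qty := range d.resources {
			sum := n.daemonResources[name]
			sum.Add(qty)
			n.daemonResources[name] = sum
		}
	}
}

func main() {
	daemon := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "log-agent"},
		Spec: v1.PodSpec{Containers: []v1.Container{{
			Name: "agent",
			Resources: v1.ResourceRequirements{Requests: v1.ResourceList{
				v1.ResourceCPU:    resource.MustParse("100m"),
				v1.ResourceMemory: resource.MustParse("128Mi"),
			}},
		}}},
	}
	n := newNode([]*v1.Pod{daemon}, func(*v1.Pod) bool { return true })
	fmt.Println(n.daemonResources)
}
```

The point of caching is that the requirements and requests are derived from the daemon pods alone, so recomputing them every time the node's requirements tighten is wasted work.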
7 changes: 6 additions & 1 deletion pkg/controllers/provisioning/scheduling/scheduler.go
@@ -95,7 +95,12 @@ func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha5.Provisioner
}
}
if !isScheduled {
nodeSet.Add(NewNode(constraints, nodeSet.daemons, instanceTypes, pod))
n, err := NewNode(constraints, nodeSet.daemons, instanceTypes, pod)
if err != nil {
logging.FromContext(ctx).With("pod", client.ObjectKeyFromObject(pod)).Errorf("scheduling pod, %s", err)
} else {
nodeSet.Add(n)
}
}
}
logging.FromContext(ctx).Infof("scheduled %d pods onto %d nodes in %s", len(pods), len(nodeSet.nodes), time.Since(start))
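With the change above, Solve handles the error now returned by NewNode: the offending pod is logged with its object key and skipped, while the remaining pods continue to be scheduled. A rough sketch of that skip-and-continue behavior, using toy types in place of the scheduler's real ones:

```go
package main

import (
	"errors"
	"fmt"
)

type pod struct{ name string }

type node struct{ pods []pod }

// newNode mirrors the changed NewNode contract: it can fail instead of
// returning a node that has zero instance type options.
func newNode(p pod) (*node, error) {
	if p.name == "huge-pod" { // stand-in for a pod no instance type can satisfy
		return nil, errors.New("no instance type satisfied requirements")
	}
	return &node{pods: []pod{p}}, nil
}

func solve(pods []pod) []*node {
	var nodes []*node
	for _, p := range pods {
		n, err := newNode(p)
		if err != nil {
			// the error is attributed to the offending pod; other pods still schedule
			fmt.Printf("scheduling pod %s, %s\n", p.name, err)
			continue
		}
		nodes = append(nodes, n)
	}
	return nodes
}

func main() {
	nodes := solve([]pod{{"small-pod"}, {"huge-pod"}, {"medium-pod"}})
	fmt.Printf("scheduled onto %d nodes\n", len(nodes))
}
```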
