track volume mounts per node
Assume a node can support an unlimited number of volume mounts until it launches. Once the
node's CSI driver reports the actual number of mountable volumes, use that value instead,
which may require launching additional nodes.

Fixes #919 and #1888
tzneal committed Jun 15, 2022
1 parent 8030cdb commit fc34204
Showing 18 changed files with 476 additions and 60 deletions.
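
The commit message above summarizes the mechanism: a node's volume capacity is treated as unlimited until its CSI driver registers and publishes an allocatable count through the CSINode object. The sketch below is not code from this commit — the helper name and the MaxInt32 "unlimited" sentinel are assumptions — but it shows how such per-driver limits can be read with a controller-runtime client, which is the read access the ClusterRole change below grants for csinodes.

```go
// Sketch only: helper name and the MaxInt32 "unlimited" sentinel are assumptions,
// not code from this commit.
package volumelimits

import (
	"context"
	"fmt"
	"math"

	storagev1 "k8s.io/api/storage/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// driverVolumeLimits returns the mountable-volume count each CSI driver reports for a node.
// A driver that has not yet reported an allocatable count is treated as unlimited, matching
// the "assume unlimited volumes until the node launches" behavior described above.
func driverVolumeLimits(ctx context.Context, kubeClient client.Client, nodeName string) (map[string]int, error) {
	csiNode := &storagev1.CSINode{}
	// CSINode objects are cluster-scoped and share the node's name.
	if err := kubeClient.Get(ctx, types.NamespacedName{Name: nodeName}, csiNode); err != nil {
		return nil, fmt.Errorf("getting CSINode %s, %w", nodeName, err)
	}
	limits := map[string]int{}
	for _, driver := range csiNode.Spec.Drivers {
		if driver.Allocatable != nil && driver.Allocatable.Count != nil {
			limits[driver.Name] = int(*driver.Allocatable.Count)
		} else {
			limits[driver.Name] = math.MaxInt32 // limit not reported yet; treat as unlimited
		}
	}
	return limits, nil
}
```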
3 changes: 3 additions & 0 deletions charts/karpenter/templates/clusterrole.yaml
@@ -42,3 +42,6 @@ rules:
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["validatingwebhookconfigurations", "mutatingwebhookconfigurations"]
verbs: ["get", "watch", "list", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["csinodes"]
verbs: ["get", "watch", "list"]
6 changes: 6 additions & 0 deletions hack/docs/metrics_gen_docs.go
@@ -205,6 +205,12 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo {
}

func getFuncPackage(fun ast.Expr) string {
if pexpr, ok := fun.(*ast.ParenExpr); ok {
return getFuncPackage(pexpr.X)
}
if sexpr, ok := fun.(*ast.StarExpr); ok {
return getFuncPackage(sexpr.X)
}
if sel, ok := fun.(*ast.SelectorExpr); ok {
return fmt.Sprintf("%s", sel.X)
}
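
The two new cases let getFuncPackage see through conversion-style expressions of the form `(*SomeType)(nil)` — the same shape as the interface assertions added elsewhere in this commit. A self-contained sketch of that recursive unwrapping (package and type names here are illustrative):

```go
package main

import (
	"fmt"
	"go/ast"
	"go/parser"
	"log"
)

// pkgOf unwraps parenthesized and pointer expressions until it reaches the selector,
// so both prometheus.NewCounter(...) and (*prometheus.CounterVec)(nil) resolve to "prometheus".
func pkgOf(fun ast.Expr) string {
	switch e := fun.(type) {
	case *ast.ParenExpr:
		return pkgOf(e.X)
	case *ast.StarExpr:
		return pkgOf(e.X)
	case *ast.SelectorExpr:
		return fmt.Sprintf("%s", e.X)
	}
	return ""
}

func main() {
	// A conversion-style expression, the shape the added cases handle.
	expr, err := parser.ParseExpr("(*prometheus.CounterVec)(nil)")
	if err != nil {
		log.Fatal(err)
	}
	call := expr.(*ast.CallExpr)
	fmt.Println(pkgOf(call.Fun)) // prints: prometheus
}
```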
2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
@@ -279,4 +279,4 @@ func (a *AWS) validateKubeletConfiguration(kubeletConfig *v1alpha5.KubeletConfig
}
}
return nil
}
}
2 changes: 2 additions & 0 deletions pkg/cloudprovider/aws/instancetype.go
@@ -38,6 +38,8 @@ import (
// EC2VMAvailableMemoryFactor assumes the EC2 VM will consume <7.25% of the memory of a given machine
const EC2VMAvailableMemoryFactor = .925

var _ cloudprovider.InstanceType = (*InstanceType)(nil)

type InstanceType struct {
*ec2.InstanceTypeInfo
offerings []cloudprovider.Offering
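
The added `var _ cloudprovider.InstanceType = (*InstanceType)(nil)` line is a compile-time interface assertion: it produces nothing at runtime, but the build fails if the type ever stops satisfying the interface. A generic illustration of the idiom (the types here are made up):

```go
package main

import "fmt"

type Greeter interface {
	Greet() string
}

type EnglishGreeter struct{}

func (EnglishGreeter) Greet() string { return "hello" }

// Compile-time assertion: allocates nothing at runtime, but the build breaks
// if EnglishGreeter ever stops satisfying Greeter.
var _ Greeter = (*EnglishGreeter)(nil)

func main() {
	fmt.Println(EnglishGreeter{}.Greet())
}
```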
5 changes: 3 additions & 2 deletions pkg/cloudprovider/aws/instancetypes.go
@@ -89,11 +89,12 @@ func (p *InstanceTypeProvider) newInstanceType(ctx context.Context, info *ec2.In
provider: provider,
offerings: offerings,
}
opts := injection.GetOptions(ctx)
// Precompute to minimize memory/compute overhead
instanceType.resources = instanceType.computeResources(injection.GetOptions(ctx).AWSEnablePodENI)
instanceType.resources = instanceType.computeResources(opts.AWSEnablePodENI)
instanceType.overhead = instanceType.computeOverhead()
instanceType.requirements = instanceType.computeRequirements()
if !injection.GetOptions(ctx).AWSENILimitedPodDensity {
if !opts.AWSENILimitedPodDensity {
instanceType.maxPods = ptr.Int32(110)
}
return instanceType
3 changes: 3 additions & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -49,6 +49,9 @@ type CloudProvider struct {
CreateCalls []*cloudprovider.NodeRequest
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
var _ cloudprovider.InstanceType = (*InstanceType)(nil)

func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) {
c.mu.Lock()
c.CreateCalls = append(c.CreateCalls, nodeRequest)
8 changes: 1 addition & 7 deletions pkg/config/suite_test.go
@@ -124,13 +124,7 @@ var _ = Describe("Batch Parameter", func() {
var changed int64
cfg.OnChange(func(c config.Config) {
defer GinkgoRecover()
// we can't unregister this, so just check for the one case we care about
if atomic.LoadInt64(&changed) == 0 {
atomic.StoreInt64(&changed, 1)
Expect(cfg.BatchIdleDuration()).To(Equal(1 * time.Second))
// shouldn't be changed
Expect(cfg.BatchMaxDuration()).To(Equal(10 * time.Second))
}
atomic.StoreInt64(&changed, 1)
})

// simulate user updating the config map with a bad max duration
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -220,7 +220,7 @@ func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod) ([]*schedule
return nil, fmt.Errorf("getting daemon overhead, %w", err)
}

return scheduler.NewScheduler(nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
return scheduler.NewScheduler(ctx, p.kubeClient, nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
}

func (p *Provisioner) launch(ctx context.Context, node *scheduler.Node) error {
23 changes: 19 additions & 4 deletions pkg/controllers/provisioning/scheduling/inflightnode.go
@@ -15,6 +15,7 @@ limitations under the License.
package scheduling

import (
"context"
"fmt"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
@@ -35,20 +36,23 @@ type InFlightNode struct {
requirements scheduling.Requirements
available v1.ResourceList
startupTolerations []v1.Toleration
hostPortUsage *state.HostPortUsage
hostPortUsage *scheduling.HostPortUsage
volumeUsage *scheduling.VolumeLimits
volumeLimits scheduling.VolumeCount
}

func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint, daemonResources v1.ResourceList) *InFlightNode {
// the remaining daemonResources to schedule are the total daemonResources minus what has already scheduled
remainingDaemonResources := resources.Subtract(daemonResources, n.DaemonSetRequested)

node := &InFlightNode{
Node: n.Node,
available: n.Available,
topology: topology,
requests: remainingDaemonResources,
requirements: scheduling.NewLabelRequirements(n.Node.Labels),
hostPortUsage: n.HostPortUsage.Copy(),
volumeUsage: n.VolumeUsage.Copy(),
volumeLimits: n.VolumeLimits,
}

if n.Node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
@@ -81,15 +85,24 @@ return node
return node
}

func (n *InFlightNode) Add(pod *v1.Pod) error {
func (n *InFlightNode) Add(ctx context.Context, pod *v1.Pod) error {
// Check Taints
if err := scheduling.Taints(n.Node.Spec.Taints).Tolerates(pod, n.startupTolerations...); err != nil {
return err
}

if err := n.hostPortUsage.Add(pod); err != nil {
if err := n.hostPortUsage.Validate(pod); err != nil {
return err
}

// determine the number of volumes that will be mounted if the pod schedules
mountedVolumeCount, err := n.volumeUsage.Validate(ctx, pod)
if err != nil {
return err
}
if mountedVolumeCount.Exceeds(n.volumeLimits) {
return fmt.Errorf("would exceed node volume limits")
}

// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
// node, which at this point can't be increased in size
@@ -122,5 +135,7 @@ func (n *InFlightNode) Add(pod *v1.Pod) error {
n.requests = requests
n.requirements = nodeRequirements
n.topology.Record(pod, nodeRequirements)
n.hostPortUsage.Add(ctx, pod)
n.volumeUsage.Add(ctx, pod)
return nil
}
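
In the hunk above, Validate computes how many volumes the pod would bring to the node and Exceeds compares that against the per-driver limits captured from cluster state. A minimal sketch of the kind of comparison Exceeds implies (the real scheduling.VolumeCount may differ in detail):

```go
package main

import "fmt"

// VolumeCount tracks how many volumes each CSI driver would have mounted on a node.
// This is a sketch of the comparison implied by Exceeds, not the real scheduling.VolumeCount.
type VolumeCount map[string]int

// Exceeds reports whether any driver's count is above its limit.
// Drivers with no reported limit are treated as unlimited.
func (c VolumeCount) Exceeds(limits VolumeCount) bool {
	for driver, count := range c {
		if limit, ok := limits[driver]; ok && count > limit {
			return true
		}
	}
	return false
}

func main() {
	usage := VolumeCount{"ebs.csi.aws.com": 26}  // volumes mounted if the pod schedules
	limits := VolumeCount{"ebs.csi.aws.com": 25} // limit reported by the node's CSI driver
	fmt.Println(usage.Exceeds(limits))           // prints: true, so the pod cannot fit on this node
}
```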
15 changes: 8 additions & 7 deletions pkg/controllers/provisioning/scheduling/node.go
@@ -15,17 +15,16 @@ limitations under the License.
package scheduling

import (
"context"
"fmt"
"strings"
"sync/atomic"

v1 "k8s.io/api/core/v1"

"github.com/samber/lo"
v1 "k8s.io/api/core/v1"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/controllers/state"
"github.com/aws/karpenter/pkg/scheduling"
"github.com/aws/karpenter/pkg/utils/resources"
"github.com/aws/karpenter/pkg/utils/sets"
@@ -40,7 +39,7 @@ type Node struct {

topology *Topology
requests v1.ResourceList
hostPortUsage *state.HostPortUsage
hostPortUsage *scheduling.HostPortUsage
}

var nodeID int64
@@ -54,19 +53,20 @@ func NewNode(nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonRe
return &Node{
NodeTemplate: template,
InstanceTypeOptions: instanceTypes,
hostPortUsage: state.NewHostPortUsage(),
hostPortUsage: scheduling.NewHostPortUsage(),
topology: topology,
requests: daemonResources,
}
}

func (n *Node) Add(pod *v1.Pod) error {
func (n *Node) Add(ctx context.Context, pod *v1.Pod) error {
// Check Taints
if err := n.Taints.Tolerates(pod); err != nil {
return err
}

if err := n.hostPortUsage.Add(pod); err != nil {
// exposed host ports on the node
if err := n.hostPortUsage.Validate(pod); err != nil {
return err
}

@@ -102,6 +102,7 @@ func (n *Node) Add(pod *v1.Pod) error {
n.requests = requests
n.Requirements = nodeRequirements
n.topology.Record(pod, nodeRequirements)
n.hostPortUsage.Add(ctx, pod)
return nil
}

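
As in InFlightNode, Add now splits host-port handling into a read-only Validate up front and a commit via Add only after every other check has passed, so a rejected pod leaves no state behind. A simplified illustration of that validate-then-commit split, using bare ports rather than pods:

```go
package main

import "fmt"

// hostPortUsage illustrates the validate-then-commit split: Validate only checks whether
// ports are free, and Add records them once the pod has passed every other check.
type hostPortUsage struct {
	used map[int32]bool
}

func newHostPortUsage() *hostPortUsage { return &hostPortUsage{used: map[int32]bool{}} }

func (h *hostPortUsage) Validate(ports ...int32) error {
	for _, p := range ports {
		if h.used[p] {
			return fmt.Errorf("host port %d is already in use", p)
		}
	}
	return nil
}

func (h *hostPortUsage) Add(ports ...int32) {
	for _, p := range ports {
		h.used[p] = true
	}
}

func main() {
	usage := newHostPortUsage()
	if err := usage.Validate(8080); err == nil {
		usage.Add(8080) // commit only after all other scheduling checks succeed
	}
	fmt.Println(usage.Validate(8080)) // prints: host port 8080 is already in use
}
```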
16 changes: 10 additions & 6 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -33,13 +33,15 @@ import (
"github.com/aws/karpenter/pkg/utils/resources"
)

func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler {
func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler {
for provisioner := range instanceTypes {
sort.Slice(instanceTypes[provisioner], func(i, j int) bool {
return instanceTypes[provisioner][i].Price() < instanceTypes[provisioner][j].Price()
})
}
s := &Scheduler{
ctx: ctx,
kubeClient: kubeClient,
nodeTemplates: nodeTemplates,
topology: topology,
cluster: cluster,
@@ -84,6 +86,7 @@ func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alp
}

type Scheduler struct {
ctx context.Context
nodes []*Node
inflight []*InFlightNode
nodeTemplates []*scheduling.NodeTemplate
@@ -94,6 +97,7 @@ type Scheduler struct {
topology *Topology
cluster *state.Cluster
recorder events.Recorder
kubeClient client.Client
}

func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) ([]*Node, error) {
@@ -112,7 +116,7 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) ([]*Node, error)
}

// Schedule to existing nodes or create a new node
if errors[pod] = s.add(pod); errors[pod] == nil {
if errors[pod] = s.add(ctx, pod); errors[pod] == nil {
continue
}

@@ -165,10 +169,10 @@ func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod,
logging.FromContext(ctx).Infof("Computed %d unready node(s) will fit %d pod(s)", inflightCount, existingCount)
}

func (s *Scheduler) add(pod *v1.Pod) error {
func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error {
// first try to schedule against an in-flight real node
for _, node := range s.inflight {
if err := node.Add(pod); err == nil {
if err := node.Add(ctx, pod); err == nil {
return nil
}
}
@@ -178,7 +182,7 @@ func (s *Scheduler) add(pod *v1.Pod) error {

// Pick existing node that we are about to create
for _, node := range s.nodes {
if err := node.Add(pod); err == nil {
if err := node.Add(ctx, pod); err == nil {
return nil
}
}
@@ -197,7 +201,7 @@ func (s *Scheduler) add(pod *v1.Pod) error {
}

node := NewNode(nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes)
if err := node.Add(pod); err != nil {
if err := node.Add(ctx, pod); err != nil {
errs = multierr.Append(errs, fmt.Errorf("incompatible with provisioner %q, %w", nodeTemplate.ProvisionerName, err))
continue
}
@@ -112,7 +112,8 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {

instanceTypes := fake.InstanceTypes(instanceCount)
cloudProv := &fake.CloudProvider{InstanceTypes: instanceTypes}
scheduler := NewScheduler(
scheduler := NewScheduler(ctx,
nil,
[]*scheduling.NodeTemplate{scheduling.NewNodeTemplate(provisioner)},
nil,
state.NewCluster(ctx, nil, cloudProv),