track volume mounts per node
- allow instance types to report volume limits
- for AWS, read the AWS_VOLUME_ATTACH_LIMIT environment variable to
  override the computed volume limits


Fixes #919 and #1888
tzneal committed Jun 13, 2022
1 parent 2873859 commit d75a5a2
Showing 17 changed files with 346 additions and 35 deletions.
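
Before the per-file diffs: the AWS_VOLUME_ATTACH_LIMIT override surfaces in the code below as opts.AWSVolumeAttachLimit, where a negative value means "no override, use the computed limits". The option parsing itself is in a file this view did not load, so the following standalone sketch of how such an environment variable might be read is an assumption, not the commit's code:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// volumeAttachLimit is a hypothetical helper: it returns the operator-supplied
// AWS_VOLUME_ATTACH_LIMIT, or -1 when the variable is unset or invalid, so that
// the computed per-instance-type limits apply.
func volumeAttachLimit() int {
	v, ok := os.LookupEnv("AWS_VOLUME_ATTACH_LIMIT")
	if !ok {
		return -1
	}
	n, err := strconv.Atoi(v)
	if err != nil || n < 0 {
		return -1
	}
	return n
}

func main() {
	fmt.Println(volumeAttachLimit())
}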
6 changes: 6 additions & 0 deletions hack/docs/metrics_gen_docs.go
@@ -205,6 +205,12 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo {
 }
 
 func getFuncPackage(fun ast.Expr) string {
+	if pexpr, ok := fun.(*ast.ParenExpr); ok {
+		return getFuncPackage(pexpr.X)
+	}
+	if sexpr, ok := fun.(*ast.StarExpr); ok {
+		return getFuncPackage(sexpr.X)
+	}
 	if sel, ok := fun.(*ast.SelectorExpr); ok {
 		return fmt.Sprintf("%s", sel.X)
 	}
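An aside on why the two new cases are needed: interface-compliance assertions added elsewhere in this commit, such as var _ cloudprovider.InstanceType = (*InstanceType)(nil), parse as a call whose Fun is a ParenExpr wrapping a StarExpr, which getFuncPackage previously could not descend through. A standalone demonstration:

package main

import (
	"fmt"
	"go/ast"
	"go/parser"
)

func main() {
	// The value in `var _ cloudprovider.InstanceType = (*InstanceType)(nil)`
	// has this form; the function position of the conversion is parenthesized.
	expr, err := parser.ParseExpr("(*cloudprovider.InstanceType)(nil)")
	if err != nil {
		panic(err)
	}
	call := expr.(*ast.CallExpr)
	paren := call.Fun.(*ast.ParenExpr) // (*cloudprovider.InstanceType)
	star := paren.X.(*ast.StarExpr)    // *cloudprovider.InstanceType
	sel := star.X.(*ast.SelectorExpr)  // cloudprovider.InstanceType
	fmt.Println(sel.X)                 // cloudprovider
}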
2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
@@ -279,4 +279,4 @@ func (a *AWS) validateKubeletConfiguration(kubeletConfig *v1alpha5.KubeletConfig
 		}
 	}
 	return nil
-}
\ No newline at end of file
+}
30 changes: 30 additions & 0 deletions pkg/cloudprovider/aws/instancetype.go
@@ -38,6 +38,8 @@ import (
 // EC2VMAvailableMemoryFactor assumes the EC2 VM will consume <7.25% of the memory of a given machine
 const EC2VMAvailableMemoryFactor = .925
 
+var _ cloudprovider.InstanceType = (*InstanceType)(nil)
+
 type InstanceType struct {
 	*ec2.InstanceTypeInfo
 	offerings []cloudprovider.Offering
@@ -46,6 +48,34 @@ type InstanceType struct {
 	resources    v1.ResourceList
 	provider     *v1alpha1.AWS
 	maxPods      *int32
+	maxVolumes   *int32
 }
 
+func (i *InstanceType) MaxVolumes() int {
+	if i.maxVolumes != nil {
+		return int(*i.maxVolumes)
+	}
+	// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html
+	switch i.Name() {
+	case "d3.8xlarge", "d3en.12xlarge":
+		return 3
+	case "inf1.xlarge", "inf1.2xlarge":
+		return 26
+	case "inf1.6xlarge":
+		return 23
+	case "inf1.24xlarge":
+		return 11
+	case "mac1.metal":
+		return 16
+	case "u-6tb1.metal", "u-9tb1.metal", "u-12tb1.metal":
+		return 19
+	}
+
+	if aws.BoolValue(i.BareMetal) {
+		return 31
+	}
+
+	return 27
+}
+
 func (i *InstanceType) Name() string {
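The defaults above track the EBS volume limits page linked in the comment: 27 for most virtualized instance types, 31 for bare metal, and lower special-cased values where accelerator or instance-store devices consume attachment slots. A standalone sketch of the decision order, with the operator override applied first (illustrative only; volumeLimit is not a function in this commit):

package main

import "fmt"

// volumeLimit mirrors the decision order of MaxVolumes above: explicit
// override first, then per-type special cases, then bare metal, then the
// general default of 27.
func volumeLimit(override *int32, name string, bareMetal bool) int {
	if override != nil {
		return int(*override) // AWS_VOLUME_ATTACH_LIMIT wins unconditionally
	}
	special := map[string]int{
		"d3.8xlarge": 3, "d3en.12xlarge": 3,
		"inf1.xlarge": 26, "inf1.2xlarge": 26,
		"inf1.6xlarge": 23, "inf1.24xlarge": 11,
		"mac1.metal":   16,
		"u-6tb1.metal": 19, "u-9tb1.metal": 19, "u-12tb1.metal": 19,
	}
	if v, ok := special[name]; ok {
		return v
	}
	if bareMetal {
		return 31
	}
	return 27
}

func main() {
	three := int32(3)
	fmt.Println(volumeLimit(nil, "m5.large", false))   // 27
	fmt.Println(volumeLimit(nil, "m5.metal", true))    // 31
	fmt.Println(volumeLimit(&three, "m5.metal", true)) // 3 (override)
}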
8 changes: 6 additions & 2 deletions pkg/cloudprovider/aws/instancetypes.go
@@ -89,13 +89,17 @@ func (p *InstanceTypeProvider) newInstanceType(ctx context.Context, info *ec2.In
 		provider:  provider,
 		offerings: offerings,
 	}
+	opts := injection.GetOptions(ctx)
 	// Precompute to minimize memory/compute overhead
-	instanceType.resources = instanceType.computeResources(injection.GetOptions(ctx).AWSEnablePodENI)
+	instanceType.resources = instanceType.computeResources(opts.AWSEnablePodENI)
 	instanceType.overhead = instanceType.computeOverhead()
 	instanceType.requirements = instanceType.computeRequirements()
-	if !injection.GetOptions(ctx).AWSENILimitedPodDensity {
+	if !opts.AWSENILimitedPodDensity {
 		instanceType.maxPods = ptr.Int32(110)
 	}
+	if opts.AWSVolumeAttachLimit >= 0 {
+		instanceType.maxVolumes = ptr.Int32(int32(opts.AWSVolumeAttachLimit))
+	}
 	return instanceType
 }

35 changes: 35 additions & 0 deletions pkg/cloudprovider/aws/suite_test.go
@@ -85,6 +85,7 @@ var _ = BeforeSuite(func() {
 		AWSNodeNameConvention:     string(options.IPName),
 		AWSENILimitedPodDensity:   true,
 		AWSEnablePodENI:           true,
+		AWSVolumeAttachLimit:      -1,
 		AWSDefaultInstanceProfile: "test-instance-profile",
 	}
 	Expect(opts.Validate()).To(Succeed(), "Failed to validate options")
@@ -1308,6 +1309,40 @@ var _ = Describe("Allocation", func() {
 			ExpectScheduled(ctx, env.Client, pod)
 		})
 	})
+	Context("Volume Limits", func() {
+		It("should provide valid volume limits", func() {
+			instanceTypeCache.Flush()
+			// provide no max volume attach limit
+			optsCopy := opts
+			optsCopy.AWSVolumeAttachLimit = -1
+			its, err := cloudProvider.GetInstanceTypes(injection.WithOptions(ctx, optsCopy), provisioner.Spec.Provider)
+			Expect(err).To(BeNil())
+			for _, it := range its {
+				expected := 27
+				// pulled from https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html
+				switch it.Name() {
+				case "inf1.6xlarge":
+					expected = 23
+				case "m5.metal":
+					expected = 31
+				case "inf1.2xlarge":
+					expected = 26
+				}
+				Expect(it.MaxVolumes()).To(Equal(expected))
+			}
+		})
+		It("should respect the volume limits passed in the config", func() {
+			instanceTypeCache.Flush()
+			// specify a max volume attach limit
+			optsCopy := opts
+			optsCopy.AWSVolumeAttachLimit = 3
+			its, err := cloudProvider.GetInstanceTypes(injection.WithOptions(ctx, optsCopy), provisioner.Spec.Provider)
+			Expect(err).To(BeNil())
+			for _, it := range its {
+				Expect(it.MaxVolumes()).To(Equal(3))
+			}
+		})
+	})
 })
 Context("Defaulting", func() {
 	// Intent here is that if updates occur on the controller, the Provisioner doesn't need to be recreated
3 changes: 3 additions & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -49,6 +49,9 @@ type CloudProvider struct {
 	CreateCalls []*cloudprovider.NodeRequest
 }
 
+var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
+var _ cloudprovider.InstanceType = (*InstanceType)(nil)
+
 func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) {
 	c.mu.Lock()
 	c.CreateCalls = append(c.CreateCalls, nodeRequest)
12 changes: 11 additions & 1 deletion pkg/cloudprovider/fake/instancetype.go
@@ -87,7 +87,9 @@ func NewInstanceType(options InstanceTypeOptions) *InstanceType {
 			OperatingSystems: options.OperatingSystems,
 			Resources:        options.Resources,
 			Overhead:         options.Overhead,
-			Price:            options.Price},
+			Price:            options.Price,
+			VolumeLimits:     options.VolumeLimits,
+		},
 	}
 }

@@ -153,12 +155,20 @@ type InstanceTypeOptions struct {
 	Overhead     v1.ResourceList
 	Resources    v1.ResourceList
 	Price        float64
+	VolumeLimits *int
 }
 
 type InstanceType struct {
 	options InstanceTypeOptions
 }
 
+func (i *InstanceType) MaxVolumes() int {
+	if i.options.VolumeLimits != nil {
+		return *i.options.VolumeLimits
+	}
+	return 27
+}
+
 func (i *InstanceType) Name() string {
 	return i.options.Name
 }
2 changes: 2 additions & 0 deletions pkg/cloudprovider/types.go
@@ -74,6 +74,8 @@ type InstanceType interface {
 	// Overhead is the amount of resource overhead expected to be used by kubelet and any other system daemons outside
 	// of Kubernetes.
 	Overhead() v1.ResourceList
+	// MaxVolumes returns the maximum number of volumes that can be mounted to the node.
+	MaxVolumes() int
 	// Price is a metric that is used to optimize pod placement onto nodes. This can be an actual monetary price per hour
 	// for the instance type, or just a weighting where lower 'prices' are preferred.
 	Price() float64
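Since MaxVolumes() is now part of the cloudprovider.InstanceType contract, every implementation must report something. A hypothetical sketch of how a minimal third-party provider might satisfy just this method (the package, type, and constant here are invented; the remaining interface methods are elided):

package mycloud

// staticVolumeLimit is an assumed provider-wide constant, not a Karpenter API.
// A provider that cannot introspect real attachment limits can return a
// conservative value, since the scheduler only uses it as an upper bound
// when packing pods onto a node.
const staticVolumeLimit = 24

type InstanceType struct {
	name string
	// ... other fields backing the rest of the cloudprovider.InstanceType methods
}

// MaxVolumes reports how many volumes the scheduler may assume can be
// mounted on a node of this instance type.
func (i InstanceType) MaxVolumes() int {
	return staticVolumeLimit
}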
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -220,7 +220,7 @@ func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod) ([]*schedule
 		return nil, fmt.Errorf("getting daemon overhead, %w", err)
 	}
 
-	return scheduler.NewScheduler(nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
+	return scheduler.NewScheduler(ctx, nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
 }
 
 func (p *Provisioner) launch(ctx context.Context, node *scheduler.Node) error {
20 changes: 17 additions & 3 deletions pkg/controllers/provisioning/scheduling/inflightnode.go
@@ -35,20 +35,23 @@ type InFlightNode struct {
 	requirements       scheduling.Requirements
 	available          v1.ResourceList
 	startupTolerations []v1.Toleration
-	hostPortUsage      *state.HostPortUsage
+	hostPortUsage      *scheduling.HostPortUsage
+	volumeUsage        *scheduling.VolumeLimits
+	maxVolumes         int
 }
 
 func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint, daemonResources v1.ResourceList) *InFlightNode {
 	// the remaining daemonResources to schedule are the total daemonResources minus what has already scheduled
 	remainingDaemonResources := resources.Subtract(daemonResources, n.DaemonSetRequested)
 
 	node := &InFlightNode{
 		Node:          n.Node,
+		maxVolumes:    n.InstanceType.MaxVolumes(),
 		available:     n.Available,
 		topology:      topology,
 		requests:      remainingDaemonResources,
 		requirements:  scheduling.NewLabelRequirements(n.Node.Labels),
 		hostPortUsage: n.HostPortUsage.Copy(),
+		volumeUsage:   n.VolumeUsage.Copy(),
 	}
 
 	if n.Node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
@@ -87,9 +90,18 @@ func (n *InFlightNode) Add(pod *v1.Pod) error {
 		return err
 	}
 
-	if err := n.hostPortUsage.Add(pod); err != nil {
+	if err := n.hostPortUsage.Validate(pod); err != nil {
 		return err
 	}
+
+	// determine the number of volumes that will be mounted if the pod schedules
+	mountedVolumeCount, err := n.volumeUsage.Validate(pod)
+	if err != nil {
+		return err
+	}
+	if mountedVolumeCount > n.maxVolumes {
+		return fmt.Errorf("%d would exceed max volume count of %d", mountedVolumeCount, n.maxVolumes)
+	}
 
 	// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
 	// node, which at this point can't be increased in size
@@ -122,5 +134,7 @@ func (n *InFlightNode) Add(pod *v1.Pod) error {
 	n.requests = requests
 	n.requirements = nodeRequirements
 	n.topology.Record(pod, nodeRequirements)
+	n.hostPortUsage.Add(pod)
+	n.volumeUsage.Add(pod)
 	return nil
 }
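
The scheduling.HostPortUsage and scheduling.VolumeLimits implementations referenced above live in files this diff view did not load, so only their call sites (Validate, Add, Copy) are visible here. A minimal sketch of a volume tracker consistent with those call sites follows; the type shape, the namespace/claim key, the always-nil error, and the PVC-only counting are assumptions (the real constructor also takes a context):

package scheduling

import (
	v1 "k8s.io/api/core/v1"
)

// VolumeLimits tracks the set of unique volumes that will be mounted on a
// node. Validate is side-effect free so the scheduler can ask "what if this
// pod landed here?" before committing the pod with Add.
type VolumeLimits struct {
	volumes map[string]struct{}
}

func NewVolumeLimits() *VolumeLimits {
	return &VolumeLimits{volumes: map[string]struct{}{}}
}

// Validate returns the number of unique volumes the node would have if the
// pod were scheduled, without mutating state. Only PVC-backed volumes are
// counted in this sketch; the real implementation may count more.
func (v *VolumeLimits) Validate(pod *v1.Pod) (int, error) {
	count := len(v.volumes)
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim == nil {
			continue
		}
		key := pod.Namespace + "/" + vol.PersistentVolumeClaim.ClaimName
		if _, ok := v.volumes[key]; !ok {
			count++ // a PVC shared with an already-tracked pod is not double counted
		}
	}
	return count, nil
}

// Add commits the pod's volumes after a successful Validate.
func (v *VolumeLimits) Add(pod *v1.Pod) {
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim == nil {
			continue
		}
		v.volumes[pod.Namespace+"/"+vol.PersistentVolumeClaim.ClaimName] = struct{}{}
	}
}

// Copy returns an independent snapshot, used when building in-flight nodes.
func (v *VolumeLimits) Copy() *VolumeLimits {
	c := NewVolumeLimits()
	for k := range v.volumes {
		c.volumes[k] = struct{}{}
	}
	return c
}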
29 changes: 21 additions & 8 deletions pkg/controllers/provisioning/scheduling/node.go
@@ -15,6 +15,7 @@ limitations under the License.
 package scheduling
 
 import (
+	"context"
 	"fmt"
 	"strings"
 	"sync/atomic"
@@ -25,7 +26,6 @@ import (
 
 	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
 	"github.com/aws/karpenter/pkg/cloudprovider"
-	"github.com/aws/karpenter/pkg/controllers/state"
 	"github.com/aws/karpenter/pkg/scheduling"
 	"github.com/aws/karpenter/pkg/utils/resources"
 	"github.com/aws/karpenter/pkg/utils/sets"
@@ -40,12 +40,13 @@ type Node struct {
 
 	topology      *Topology
 	requests      v1.ResourceList
-	hostPortUsage *state.HostPortUsage
+	hostPortUsage *scheduling.HostPortUsage
+	volumeUsage   *scheduling.VolumeLimits
 }
 
 var nodeID int64
 
-func NewNode(nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node {
+func NewNode(ctx context.Context, nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node {
 	// Copy the template, and add hostname
 	hostname := fmt.Sprintf("hostname-placeholder-%04d", atomic.AddInt64(&nodeID, 1))
 	topology.Register(v1.LabelHostname, hostname)
@@ -54,7 +55,8 @@ func NewNode(nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonRe
 	return &Node{
 		NodeTemplate:        template,
 		InstanceTypeOptions: instanceTypes,
-		hostPortUsage:       state.NewHostPortUsage(),
+		hostPortUsage:       scheduling.NewHostPortUsage(ctx),
+		volumeUsage:         scheduling.NewVolumeLimits(ctx),
 		topology:            topology,
 		requests:            daemonResources,
 	}
@@ -66,7 +68,8 @@ func (n *Node) Add(pod *v1.Pod) error {
 		return err
 	}
 
-	if err := n.hostPortUsage.Add(pod); err != nil {
+	// exposed host ports on the node
+	if err := n.hostPortUsage.Validate(pod); err != nil {
 		return err
 	}

@@ -91,7 +94,12 @@
 
 	// Check instance type combinations
 	requests := resources.Merge(n.requests, resources.RequestsForPods(pod))
-	instanceTypes := filterInstanceTypes(n.InstanceTypeOptions, nodeRequirements, requests)
+	// determine the number of volumes that will be mounted if the pod schedules
+	mountedVolumeCount, err := n.volumeUsage.Validate(pod)
+	if err != nil {
+		return err
+	}
+	instanceTypes := filterInstanceTypes(n.InstanceTypeOptions, nodeRequirements, requests, mountedVolumeCount)
 	if len(instanceTypes) == 0 {
 		return fmt.Errorf("no instance type satisfied resources %s and requirements %s", resources.String(resources.RequestsForPods(pod)), nodeRequirements)
 	}
@@ -102,6 +110,8 @@
 	n.requests = requests
 	n.Requirements = nodeRequirements
 	n.topology.Record(pod, nodeRequirements)
+	n.hostPortUsage.Add(pod)
+	n.volumeUsage.Add(pod)
 	return nil
 }

@@ -120,9 +130,12 @@
 	return fmt.Sprintf("node with %d pods requesting %s from types %s", len(n.Pods), resources.String(n.requests), itSb.String())
 }
 
-func filterInstanceTypes(instanceTypes []cloudprovider.InstanceType, requirements scheduling.Requirements, requests v1.ResourceList) []cloudprovider.InstanceType {
+func filterInstanceTypes(instanceTypes []cloudprovider.InstanceType, requirements scheduling.Requirements, requests v1.ResourceList, volumesPerNode int) []cloudprovider.InstanceType {
 	return lo.Filter(instanceTypes, func(instanceType cloudprovider.InstanceType, _ int) bool {
-		return compatible(instanceType, requirements) && fits(instanceType, requests) && hasOffering(instanceType, requirements)
+		return compatible(instanceType, requirements) &&
+			fits(instanceType, requests) &&
+			hasOffering(instanceType, requirements) &&
+			instanceType.MaxVolumes() >= volumesPerNode
 	})
 }
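
For readers unfamiliar with samber/lo (imported by this package as lo): Filter keeps the elements for which the predicate returns true, so an instance type survives only if it can mount every volume the node would hold. A standalone toy version of the volume check above, with invented instance data:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

type fakeInstanceType struct {
	name       string
	maxVolumes int
}

func main() {
	volumesPerNode := 28
	candidates := []fakeInstanceType{
		{"inf1.6xlarge", 23}, // filtered out: 23 < 28
		{"m5.large", 27},     // filtered out: 27 < 28
		{"m5.metal", 31},     // kept
	}
	fits := lo.Filter(candidates, func(it fakeInstanceType, _ int) bool {
		return it.maxVolumes >= volumesPerNode
	})
	fmt.Println(fits) // [{m5.metal 31}]
}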

6 changes: 4 additions & 2 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -33,13 +33,14 @@ import (
 	"github.com/aws/karpenter/pkg/utils/resources"
 )
 
-func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler {
+func NewScheduler(ctx context.Context, nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler {
 	for provisioner := range instanceTypes {
 		sort.Slice(instanceTypes[provisioner], func(i, j int) bool {
 			return instanceTypes[provisioner][i].Price() < instanceTypes[provisioner][j].Price()
 		})
 	}
 	s := &Scheduler{
+		ctx:           ctx,
 		nodeTemplates: nodeTemplates,
 		topology:      topology,
 		cluster:       cluster,
@@ -84,6 +85,7 @@
 }
 
 type Scheduler struct {
+	ctx           context.Context
 	nodes         []*Node
 	inflight      []*InFlightNode
 	nodeTemplates []*scheduling.NodeTemplate
@@ -196,7 +198,7 @@ func (s *Scheduler) add(pod *v1.Pod) error {
 		}
 	}
 
-	node := NewNode(nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes)
+	node := NewNode(s.ctx, nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes)
 	if err := node.Add(pod); err != nil {
 		errs = multierr.Append(errs, fmt.Errorf("incompatible with provisioner %q, %w", nodeTemplate.ProvisionerName, err))
 		continue
(Diff view truncated: the remaining changed files were not loaded by the viewer.)