track volume mounts per node
- allow instance types to report volume limits
- for AWS, read the AWS_EBS_VOLUME_ATTACH_LIMIT environment variable to override the computed volume limits

Fixes #919 and #1888
tzneal committed Jun 14, 2022
1 parent 8030cdb commit 4bde825
Showing 21 changed files with 494 additions and 47 deletions.
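
The wiring that reads AWS_EBS_VOLUME_ATTACH_LIMIT into the AWSEBSVolumeAttachLimit option is in files not shown on this page. A minimal sketch of how such an override could be parsed — a hypothetical helper, assuming the -1 "no override" sentinel that the diff and tests below use:

package main

import (
    "fmt"
    "os"
    "strconv"
)

// ebsVolumeAttachLimit returns the operator-configured EBS attach limit,
// or -1 ("no override") when AWS_EBS_VOLUME_ATTACH_LIMIT is unset or not
// a non-negative integer. The -1 sentinel matches the default used in the
// test suite below.
func ebsVolumeAttachLimit() int {
    v, ok := os.LookupEnv("AWS_EBS_VOLUME_ATTACH_LIMIT")
    if !ok {
        return -1
    }
    n, err := strconv.Atoi(v)
    if err != nil || n < 0 {
        return -1
    }
    return n
}

func main() {
    fmt.Println(ebsVolumeAttachLimit())
}
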
6 changes: 6 additions & 0 deletions hack/docs/metrics_gen_docs.go
@@ -205,6 +205,12 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo {
}

func getFuncPackage(fun ast.Expr) string {
if pexpr, ok := fun.(*ast.ParenExpr); ok {
return getFuncPackage(pexpr.X)
}
if sexpr, ok := fun.(*ast.StarExpr); ok {
return getFuncPackage(sexpr.X)
}
if sel, ok := fun.(*ast.SelectorExpr); ok {
return fmt.Sprintf("%s", sel.X)
}
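These two new cases exist because of the interface-assertion vars this commit adds elsewhere (for example `var _ cloudprovider.InstanceType = (*InstanceType)(nil)` below): the conversion wraps the selector in parentheses and a pointer star, which the docs generator previously could not see through. A standalone sketch of the same unwrapping, with hypothetical names:

package main

import (
    "fmt"
    "go/ast"
    "go/parser"
)

// pkgOf peels parentheses and stars until it reaches the selector that
// carries the package qualifier, mirroring getFuncPackage above.
func pkgOf(e ast.Expr) string {
    switch v := e.(type) {
    case *ast.ParenExpr:
        return pkgOf(v.X)
    case *ast.StarExpr:
        return pkgOf(v.X)
    case *ast.SelectorExpr:
        return fmt.Sprintf("%s", v.X)
    }
    return ""
}

func main() {
    // the Fun of the call expression in: var _ cp.InstanceType = (*cp.InstanceType)(nil)
    e, _ := parser.ParseExpr("(*cp.InstanceType)(nil)")
    fmt.Println(pkgOf(e.(*ast.CallExpr).Fun)) // cp
}
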
2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
@@ -279,4 +279,4 @@ func (a *AWS) validateKubeletConfiguration(kubeletConfig *v1alpha5.KubeletConfig
}
}
return nil
}
}
37 changes: 37 additions & 0 deletions pkg/cloudprovider/aws/instancetype.go
@@ -38,6 +38,8 @@ import (
// EC2VMAvailableMemoryFactor assumes the EC2 VM will consume <7.5% of the memory of a given machine
const EC2VMAvailableMemoryFactor = .925

var _ cloudprovider.InstanceType = (*InstanceType)(nil)

type InstanceType struct {
*ec2.InstanceTypeInfo
offerings []cloudprovider.Offering
@@ -46,6 +48,11 @@ type InstanceType struct {
resources v1.ResourceList
provider *v1alpha1.AWS
maxPods *int32
volumeLimits map[string]int
}

func (i *InstanceType) VolumeLimits() map[string]int {
return i.volumeLimits
}

func (i *InstanceType) Name() string {
@@ -293,6 +300,36 @@ func (i *InstanceType) eniLimitedPods() int64 {
return *i.NetworkInfo.MaximumNetworkInterfaces*(*i.NetworkInfo.Ipv4AddressesPerInterface-1) + 2
}

func (i *InstanceType) computeVolumeLimits(ebsVolumeAttachLimit int) map[string]int {
    result := map[string]int{
        ebsCsiProvisioner: 27,
    }
    // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html
    // most bare metal instance types support up to 31 EBS volumes
    if aws.BoolValue(i.BareMetal) {
        result[ebsCsiProvisioner] = 31
    }
    // per-type exceptions take precedence over the generic bare-metal rule,
    // since several metal types (mac1, u-*tb1) support fewer volumes
    switch i.Name() {
    case "d3.8xlarge", "d3en.12xlarge":
        result[ebsCsiProvisioner] = 3
    case "inf1.xlarge", "inf1.2xlarge":
        result[ebsCsiProvisioner] = 26
    case "inf1.6xlarge":
        result[ebsCsiProvisioner] = 23
    case "inf1.24xlarge":
        result[ebsCsiProvisioner] = 11
    case "mac1.metal":
        result[ebsCsiProvisioner] = 16
    case "u-6tb1.metal", "u-9tb1.metal", "u-12tb1.metal":
        result[ebsCsiProvisioner] = 19
    }

    // an explicitly configured limit overrides the computed EBS CSI limit
    if ebsVolumeAttachLimit >= 0 {
        result[ebsCsiProvisioner] = ebsVolumeAttachLimit
    }
    return result
}

func lowerKabobCase(s string) string {
return strings.ToLower(strings.ReplaceAll(s, " ", "-"))
}
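
The precedence above is: the default of 27, then the generic bare-metal rule, then the per-type exceptions, with an explicit operator override trumping everything. A hypothetical standalone sketch that mirrors (rather than imports) that logic, with expected values taken from the AWS volume-limits page:

package main

import "fmt"

// volumeLimit mirrors computeVolumeLimits above for the EBS CSI driver only.
func volumeLimit(instanceType string, bareMetal bool, override int) int {
    limit := 27 // the commit's default
    if bareMetal {
        limit = 31 // most bare metal types support up to 31 EBS volumes
    }
    // exceptional types, several of which are bare metal with lower limits
    exceptions := map[string]int{"d3.8xlarge": 3, "inf1.6xlarge": 23, "mac1.metal": 16}
    if v, ok := exceptions[instanceType]; ok {
        limit = v
    }
    if override >= 0 {
        limit = override // AWS_EBS_VOLUME_ATTACH_LIMIT wins over everything
    }
    return limit
}

func main() {
    fmt.Println(volumeLimit("m5.large", false, -1))    // 27
    fmt.Println(volumeLimit("m5.metal", true, -1))     // 31
    fmt.Println(volumeLimit("mac1.metal", true, -1))   // 16
    fmt.Println(volumeLimit("inf1.6xlarge", false, 3)) // 3
}
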
8 changes: 6 additions & 2 deletions pkg/cloudprovider/aws/instancetypes.go
@@ -40,6 +40,8 @@ const (
InstanceTypeZonesCacheKey = "zones"
InstanceTypesAndZonesCacheTTL = 5 * time.Minute
UnfulfillableCapacityErrorCacheTTL = 3 * time.Minute

ebsCsiProvisioner = "ebs.csi.aws.com"
)

type InstanceTypeProvider struct {
@@ -89,13 +91,15 @@ func (p *InstanceTypeProvider) newInstanceType(ctx context.Context, info *ec2.In
provider: provider,
offerings: offerings,
}
opts := injection.GetOptions(ctx)
// Precompute to minimize memory/compute overhead
instanceType.resources = instanceType.computeResources(injection.GetOptions(ctx).AWSEnablePodENI)
instanceType.resources = instanceType.computeResources(opts.AWSEnablePodENI)
instanceType.overhead = instanceType.computeOverhead()
instanceType.requirements = instanceType.computeRequirements()
if !injection.GetOptions(ctx).AWSENILimitedPodDensity {
if !opts.AWSENILimitedPodDensity {
instanceType.maxPods = ptr.Int32(110)
}
instanceType.volumeLimits = instanceType.computeVolumeLimits(opts.AWSEBSVolumeAttachLimit)
return instanceType
}

35 changes: 35 additions & 0 deletions pkg/cloudprovider/aws/suite_test.go
@@ -85,6 +85,7 @@ var _ = BeforeSuite(func() {
AWSNodeNameConvention: string(options.IPName),
AWSENILimitedPodDensity: true,
AWSEnablePodENI: true,
AWSEBSVolumeAttachLimit: -1,
AWSDefaultInstanceProfile: "test-instance-profile",
}
Expect(opts.Validate()).To(Succeed(), "Failed to validate options")
@@ -1308,6 +1309,40 @@ var _ = Describe("Allocation", func() {
ExpectScheduled(ctx, env.Client, pod)
})
})
Context("Volume Limits", func() {
It("should provide valid EBS volume limits", func() {
instanceTypeCache.Flush()
// provide no max volume attach limit
optsCopy := opts
optsCopy.AWSEBSVolumeAttachLimit = -1
its, err := cloudProvider.GetInstanceTypes(injection.WithOptions(ctx, optsCopy), provisioner.Spec.Provider)
Expect(err).To(BeNil())
for _, it := range its {
expected := 27
// pulled from https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html
switch it.Name() {
case "inf1.6xlarge":
expected = 23
case "m5.metal":
expected = 31
case "inf1.2xlarge":
expected = 26
}
Expect(it.VolumeLimits()).To(Equal(map[string]int{"ebs.csi.aws.com": expected}))
}
})
It("should respect the volume limits passed in the config", func() {
instanceTypeCache.Flush()
// specify a max volume attach limit
optsCopy := opts
optsCopy.AWSEBSVolumeAttachLimit = 3
its, err := cloudProvider.GetInstanceTypes(injection.WithOptions(ctx, optsCopy), provisioner.Spec.Provider)
Expect(err).To(BeNil())
for _, it := range its {
Expect(it.VolumeLimits()).To(Equal(map[string]int{"ebs.csi.aws.com": 3}))
}
})
})
})
Context("Defaulting", func() {
// Intent here is that if updates occur on the controller, the Provisioner doesn't need to be recreated
3 changes: 3 additions & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -49,6 +49,9 @@ type CloudProvider struct {
CreateCalls []*cloudprovider.NodeRequest
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
var _ cloudprovider.InstanceType = (*InstanceType)(nil)

func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) {
c.mu.Lock()
c.CreateCalls = append(c.CreateCalls, nodeRequest)
9 changes: 8 additions & 1 deletion pkg/cloudprovider/fake/instancetype.go
@@ -87,7 +87,9 @@ func NewInstanceType(options InstanceTypeOptions) *InstanceType {
OperatingSystems: options.OperatingSystems,
Resources: options.Resources,
Overhead: options.Overhead,
Price: options.Price},
Price: options.Price,
VolumeLimits: options.VolumeLimits,
},
}
}

@@ -153,12 +155,17 @@ type InstanceTypeOptions struct {
Overhead v1.ResourceList
Resources v1.ResourceList
Price float64
VolumeLimits map[string]int
}

type InstanceType struct {
options InstanceTypeOptions
}

func (i *InstanceType) VolumeLimits() map[string]int {
return i.options.VolumeLimits
}

func (i *InstanceType) Name() string {
return i.options.Name
}
3 changes: 3 additions & 0 deletions pkg/cloudprovider/types.go
@@ -74,6 +74,9 @@ type InstanceType interface {
// Overhead is the amount of resource overhead expected to be used by kubelet and any other system daemons outside
// of Kubernetes.
Overhead() v1.ResourceList
// VolumeLimits returns the maximum number of volumes that can be mounted to the node per CSI provisioner. See
// https://kubernetes.io/docs/concepts/storage/storage-limits/ for more details.
VolumeLimits() map[string]int
// Price is a metric that is used to optimize pod placement onto nodes. This can be an actual monetary price per hour
// for the instance type, or just a weighting where lower 'prices' are preferred.
Price() float64
8 changes: 1 addition & 7 deletions pkg/config/suite_test.go
@@ -124,13 +124,7 @@ var _ = Describe("Batch Parameter", func() {
var changed int64
cfg.OnChange(func(c config.Config) {
defer GinkgoRecover()
// we can't unregister this, so just check for the one case we care about
if atomic.LoadInt64(&changed) == 0 {
atomic.StoreInt64(&changed, 1)
Expect(cfg.BatchIdleDuration()).To(Equal(1 * time.Second))
// shouldn't be changed
Expect(cfg.BatchMaxDuration()).To(Equal(10 * time.Second))
}
atomic.StoreInt64(&changed, 1)
})

// simulate user updating the config map with a bad max duration
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -220,7 +220,7 @@ func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod) ([]*schedule
return nil, fmt.Errorf("getting daemon overhead, %w", err)
}

return scheduler.NewScheduler(nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
return scheduler.NewScheduler(ctx, p.kubeClient, nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
}

func (p *Provisioner) launch(ctx context.Context, node *scheduler.Node) error {
20 changes: 17 additions & 3 deletions pkg/controllers/provisioning/scheduling/inflightnode.go
@@ -35,20 +35,23 @@ type InFlightNode struct {
requirements scheduling.Requirements
available v1.ResourceList
startupTolerations []v1.Toleration
hostPortUsage *state.HostPortUsage
hostPortUsage *scheduling.HostPortUsage
volumeUsage *scheduling.VolumeLimits
maxVolumes scheduling.VolumeCount
}

func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint, daemonResources v1.ResourceList) *InFlightNode {
// the remaining daemonResources to schedule are the total daemonResources minus what has already scheduled
remainingDaemonResources := resources.Subtract(daemonResources, n.DaemonSetRequested)

node := &InFlightNode{
Node: n.Node,
maxVolumes: n.InstanceType.VolumeLimits(),
available: n.Available,
topology: topology,
requests: remainingDaemonResources,
requirements: scheduling.NewLabelRequirements(n.Node.Labels),
hostPortUsage: n.HostPortUsage.Copy(),
volumeUsage: n.VolumeUsage.Copy(),
}

if n.Node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
@@ -87,9 +90,18 @@ func (n *InFlightNode) Add(pod *v1.Pod) error {
return err
}

if err := n.hostPortUsage.Add(pod); err != nil {
if err := n.hostPortUsage.Validate(pod); err != nil {
return err
}

// determine the number of volumes that will be mounted if the pod schedules
mountedVolumeCount, err := n.volumeUsage.Validate(pod)
if err != nil {
return err
}
if mountedVolumeCount.Exceeds(n.maxVolumes) {
return fmt.Errorf("would exceed max volume")
}

// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
// node, which at this point can't be increased in size
@@ -122,5 +134,7 @@ func (n *InFlightNode) Add(pod *v1.Pod) error {
n.requests = requests
n.requirements = nodeRequirements
n.topology.Record(pod, nodeRequirements)
n.hostPortUsage.Add(pod)
n.volumeUsage.Add(pod)
return nil
}
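
Note the two-phase shape of Add: the Validate calls are read-only feasibility checks made while the pod can still be rejected, and the mutating Add calls at the bottom run only once the pod is definitely placed. A rough sketch of that pattern under an assumed uniform tracker interface (the commit's real Validate signatures differ — volumeUsage.Validate also returns the would-be mount count):

package main

import v1 "k8s.io/api/core/v1"

// tracker abstracts the validate-then-commit pattern used above.
type tracker interface {
    Validate(pod *v1.Pod) error // read-only feasibility check
    Add(pod *v1.Pod)            // commit; only after all Validates pass
}

func admit(pod *v1.Pod, trackers ...tracker) error {
    for _, t := range trackers {
        if err := t.Validate(pod); err != nil {
            return err // nothing has been mutated yet
        }
    }
    for _, t := range trackers {
        t.Add(pod)
    }
    return nil
}

func main() {}
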
31 changes: 23 additions & 8 deletions pkg/controllers/provisioning/scheduling/node.go
@@ -15,7 +15,9 @@ limitations under the License.
package scheduling

import (
"context"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/client"
"strings"
"sync/atomic"

@@ -25,7 +27,6 @@ import (

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/controllers/state"
"github.com/aws/karpenter/pkg/scheduling"
"github.com/aws/karpenter/pkg/utils/resources"
"github.com/aws/karpenter/pkg/utils/sets"
@@ -40,12 +41,13 @@ type Node struct {

topology *Topology
requests v1.ResourceList
hostPortUsage *state.HostPortUsage
hostPortUsage *scheduling.HostPortUsage
volumeUsage *scheduling.VolumeLimits
}

var nodeID int64

func NewNode(nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node {
func NewNode(ctx context.Context, kubeClient client.Client, nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node {
// Copy the template, and add hostname
hostname := fmt.Sprintf("hostname-placeholder-%04d", atomic.AddInt64(&nodeID, 1))
topology.Register(v1.LabelHostname, hostname)
@@ -54,7 +56,8 @@
return &Node{
NodeTemplate: template,
InstanceTypeOptions: instanceTypes,
hostPortUsage: state.NewHostPortUsage(),
hostPortUsage: scheduling.NewHostPortUsage(ctx),
volumeUsage: scheduling.NewVolumeLimits(ctx, kubeClient),
topology: topology,
requests: daemonResources,
}
@@ -66,7 +69,8 @@
return err
}

if err := n.hostPortUsage.Add(pod); err != nil {
// exposed host ports on the node
if err := n.hostPortUsage.Validate(pod); err != nil {
return err
}

@@ -91,7 +95,13 @@

// Check instance type combinations
requests := resources.Merge(n.requests, resources.RequestsForPods(pod))
instanceTypes := filterInstanceTypes(n.InstanceTypeOptions, nodeRequirements, requests)
// determine the number of volumes that will be mounted if the pod schedules
mountedVolumeCount, err := n.volumeUsage.Validate(pod)
if err != nil {
return err
}

instanceTypes := filterInstanceTypes(n.InstanceTypeOptions, nodeRequirements, requests, mountedVolumeCount)
if len(instanceTypes) == 0 {
return fmt.Errorf("no instance type satisfied resources %s and requirements %s", resources.String(resources.RequestsForPods(pod)), nodeRequirements)
}
@@ -102,6 +112,8 @@
n.requests = requests
n.Requirements = nodeRequirements
n.topology.Record(pod, nodeRequirements)
n.hostPortUsage.Add(pod)
n.volumeUsage.Add(pod)
return nil
}

@@ -120,9 +132,12 @@ func (n *Node) String() string {
return fmt.Sprintf("node with %d pods requesting %s from types %s", len(n.Pods), resources.String(n.requests), itSb.String())
}

func filterInstanceTypes(instanceTypes []cloudprovider.InstanceType, requirements scheduling.Requirements, requests v1.ResourceList) []cloudprovider.InstanceType {
func filterInstanceTypes(instanceTypes []cloudprovider.InstanceType, requirements scheduling.Requirements, requests v1.ResourceList, volumesPerNode scheduling.VolumeCount) []cloudprovider.InstanceType {
return lo.Filter(instanceTypes, func(instanceType cloudprovider.InstanceType, _ int) bool {
return compatible(instanceType, requirements) && fits(instanceType, requests) && hasOffering(instanceType, requirements)
return compatible(instanceType, requirements) &&
fits(instanceType, requests) &&
hasOffering(instanceType, requirements) &&
scheduling.VolumeCount(instanceType.VolumeLimits()).Fits(volumesPerNode)
})
}

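scheduling.VolumeCount and its Fits helper live in one of the new files truncated from this page. A plausible sketch consistent with the two call sites in this diff — limits.Fits(usage) here, usage.Exceeds(limits) in the in-flight path — offered as an assumption, not the commit's actual code:

package main

import "fmt"

// VolumeCount is a per-CSI-provisioner tally, e.g. {"ebs.csi.aws.com": 12}.
// The same map shape serves as both a usage count and a limit table.
type VolumeCount map[string]int

// Fits reports whether usage stays within the receiver's limits; provisioners
// absent from the limit table are treated as unlimited here.
func (limits VolumeCount) Fits(usage VolumeCount) bool {
    for provisioner, used := range usage {
        if limit, ok := limits[provisioner]; ok && used > limit {
            return false
        }
    }
    return true
}

// Exceeds is the inverse convenience used by InFlightNode.Add.
func (usage VolumeCount) Exceeds(limits VolumeCount) bool {
    return !limits.Fits(usage)
}

func main() {
    limits := VolumeCount{"ebs.csi.aws.com": 27}
    fmt.Println(limits.Fits(VolumeCount{"ebs.csi.aws.com": 12})) // true
    fmt.Println(limits.Fits(VolumeCount{"ebs.csi.aws.com": 30})) // false
}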