From af94c355a3ee9613c6d3198ec41a8946349f4af7 Mon Sep 17 00:00:00 2001
From: Todd Neal
Date: Mon, 13 Jun 2022 09:10:44 -0500
Subject: [PATCH] track volume mounts per node

- allow instance types to report volume limits
- for AWS, read the AWS_EBS_VOLUME_ATTACH_LIMIT environment variable to
  override the computed volume limits

Fixes #919 and #1888
---
 hack/docs/metrics_gen_docs.go                 |   6 +
 .../aws/apis/v1alpha1/provider_validation.go  |   2 +-
 pkg/cloudprovider/aws/instancetype.go         |  37 ++++
 pkg/cloudprovider/aws/instancetypes.go        |   8 +-
 pkg/cloudprovider/aws/suite_test.go           |  35 +++
 pkg/cloudprovider/fake/cloudprovider.go       |   3 +
 pkg/cloudprovider/fake/instancetype.go        |   9 +-
 pkg/cloudprovider/types.go                    |   3 +
 pkg/config/suite_test.go                      |   8 +-
 pkg/controllers/provisioning/provisioner.go   |   2 +-
 .../provisioning/scheduling/inflightnode.go   |  23 +-
 .../provisioning/scheduling/node.go           |  36 +++-
 .../provisioning/scheduling/scheduler.go      |  18 +-
 .../scheduling/scheduling_benchmark_test.go   |   3 +-
 .../provisioning/scheduling/suite_test.go     | 100 ++++++++-
 pkg/controllers/state/cluster.go              |  19 +-
 .../state => scheduling}/hostportusage.go     |  26 ++-
 pkg/scheduling/suite_test.go                  |   3 +-
 pkg/scheduling/volumelimits.go                | 199 ++++++++++++++++++
 pkg/test/storage.go                           |  10 +-
 pkg/utils/options/options.go                  |   3 +
 21 files changed, 499 insertions(+), 54 deletions(-)
 rename pkg/{controllers/state => scheduling}/hostportusage.go (82%)
 create mode 100644 pkg/scheduling/volumelimits.go

diff --git a/hack/docs/metrics_gen_docs.go b/hack/docs/metrics_gen_docs.go
index fe5f461881ba..0eb3de01ae12 100644
--- a/hack/docs/metrics_gen_docs.go
+++ b/hack/docs/metrics_gen_docs.go
@@ -205,6 +205,12 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo {
 }
 
 func getFuncPackage(fun ast.Expr) string {
+	if pexpr, ok := fun.(*ast.ParenExpr); ok {
+		return getFuncPackage(pexpr.X)
+	}
+	if sexpr, ok := fun.(*ast.StarExpr); ok {
+		return getFuncPackage(sexpr.X)
+	}
 	if sel, ok := fun.(*ast.SelectorExpr); ok {
 		return fmt.Sprintf("%s", sel.X)
 	}
diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go b/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
index 710f18fdc50d..c37fa8ddd871 100644
--- a/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
+++ b/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
@@ -279,4 +279,4 @@ func (a *AWS) validateKubeletConfiguration(kubeletConfig *v1alpha5.KubeletConfig
 		}
 	}
 	return nil
-}
\ No newline at end of file
+}
diff --git a/pkg/cloudprovider/aws/instancetype.go b/pkg/cloudprovider/aws/instancetype.go
index 86ec90d8fe63..f50209ec60e1 100644
--- a/pkg/cloudprovider/aws/instancetype.go
+++ b/pkg/cloudprovider/aws/instancetype.go
@@ -38,6 +38,8 @@ import (
 // EC2VMAvailableMemoryFactor assumes the EC2 VM will consume <7.25% of the memory of a given machine
 const EC2VMAvailableMemoryFactor = .925
 
+var _ cloudprovider.InstanceType = (*InstanceType)(nil)
+
 type InstanceType struct {
 	*ec2.InstanceTypeInfo
 	offerings    []cloudprovider.Offering
@@ -46,6 +48,11 @@ type InstanceType struct {
 	resources    v1.ResourceList
 	provider     *v1alpha1.AWS
 	maxPods      *int32
+	volumeLimits map[string]int
+}
+
+func (i *InstanceType) VolumeLimits() map[string]int {
+	return i.volumeLimits
 }
 
 func (i *InstanceType) Name() string {
@@ -293,6 +300,36 @@ func (i *InstanceType) eniLimitedPods() int64 {
 	return *i.NetworkInfo.MaximumNetworkInterfaces*(*i.NetworkInfo.Ipv4AddressesPerInterface-1) + 2
 }
 
+func (i *InstanceType) computeVolumeLimits(ebsVolumeAttachLimit int) map[string]int {
+	result := map[string]int{
+		
ebsCsiProvisioner: 27, + } + // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html + switch i.Name() { + case "d3.8xlarge", "d3en.12xlarge": + result[ebsCsiProvisioner] = 3 + case "inf1.xlarge", "inf1.2xlarge": + result[ebsCsiProvisioner] = 26 + case "inf1.6xlarge": + result[ebsCsiProvisioner] = 23 + case "inf1.24xlarge": + result[ebsCsiProvisioner] = 11 + case "mac1.metal": + result[ebsCsiProvisioner] = 16 + case "u-6tb1.metal", "u-9tb1.metal", "u-12tb1.metal": + result[ebsCsiProvisioner] = 19 + } + if aws.BoolValue(i.BareMetal) { + result[ebsCsiProvisioner] = 31 + } + + // override EBS CSI volume limit + if ebsVolumeAttachLimit >= 0 { + result[ebsCsiProvisioner] = ebsVolumeAttachLimit + } + return result +} + func lowerKabobCase(s string) string { return strings.ToLower(strings.ReplaceAll(s, " ", "-")) } diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go index 9a9db125dd01..9dfa832cb5f1 100644 --- a/pkg/cloudprovider/aws/instancetypes.go +++ b/pkg/cloudprovider/aws/instancetypes.go @@ -40,6 +40,8 @@ const ( InstanceTypeZonesCacheKey = "zones" InstanceTypesAndZonesCacheTTL = 5 * time.Minute UnfulfillableCapacityErrorCacheTTL = 3 * time.Minute + + ebsCsiProvisioner = "ebs.csi.aws.com" ) type InstanceTypeProvider struct { @@ -89,13 +91,15 @@ func (p *InstanceTypeProvider) newInstanceType(ctx context.Context, info *ec2.In provider: provider, offerings: offerings, } + opts := injection.GetOptions(ctx) // Precompute to minimize memory/compute overhead - instanceType.resources = instanceType.computeResources(injection.GetOptions(ctx).AWSEnablePodENI) + instanceType.resources = instanceType.computeResources(opts.AWSEnablePodENI) instanceType.overhead = instanceType.computeOverhead() instanceType.requirements = instanceType.computeRequirements() - if !injection.GetOptions(ctx).AWSENILimitedPodDensity { + if !opts.AWSENILimitedPodDensity { instanceType.maxPods = ptr.Int32(110) } + instanceType.volumeLimits = instanceType.computeVolumeLimits(opts.AWSEBSVolumeAttachLimit) return instanceType } diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index ab30857f75ec..da9207c4ee88 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -85,6 +85,7 @@ var _ = BeforeSuite(func() { AWSNodeNameConvention: string(options.IPName), AWSENILimitedPodDensity: true, AWSEnablePodENI: true, + AWSEBSVolumeAttachLimit: -1, AWSDefaultInstanceProfile: "test-instance-profile", } Expect(opts.Validate()).To(Succeed(), "Failed to validate options") @@ -1308,6 +1309,40 @@ var _ = Describe("Allocation", func() { ExpectScheduled(ctx, env.Client, pod) }) }) + Context("Volume Limits", func() { + It("should provide valid EBS volume limits", func() { + instanceTypeCache.Flush() + // provide no max volume attach limit + optsCopy := opts + optsCopy.AWSEBSVolumeAttachLimit = -1 + its, err := cloudProvider.GetInstanceTypes(injection.WithOptions(ctx, optsCopy), provisioner.Spec.Provider) + Expect(err).To(BeNil()) + for _, it := range its { + expected := 27 + // pulled from https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html + switch it.Name() { + case "inf1.6xlarge": + expected = 23 + case "m5.metal": + expected = 31 + case "inf1.2xlarge": + expected = 26 + } + Expect(it.VolumeLimits()).To(Equal(map[string]int{"ebs.csi.aws.com": expected})) + } + }) + It("should respect the volume limits passed in the config", func() { + instanceTypeCache.Flush() + // specify a max volume attach limit 
+ optsCopy := opts + optsCopy.AWSEBSVolumeAttachLimit = 3 + its, err := cloudProvider.GetInstanceTypes(injection.WithOptions(ctx, optsCopy), provisioner.Spec.Provider) + Expect(err).To(BeNil()) + for _, it := range its { + Expect(it.VolumeLimits()).To(Equal(map[string]int{"ebs.csi.aws.com": 3})) + } + }) + }) }) Context("Defaulting", func() { // Intent here is that if updates occur on the controller, the Provisioner doesn't need to be recreated diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index 47104927a09b..03146a34d730 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -49,6 +49,9 @@ type CloudProvider struct { CreateCalls []*cloudprovider.NodeRequest } +var _ cloudprovider.CloudProvider = (*CloudProvider)(nil) +var _ cloudprovider.InstanceType = (*InstanceType)(nil) + func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) { c.mu.Lock() c.CreateCalls = append(c.CreateCalls, nodeRequest) diff --git a/pkg/cloudprovider/fake/instancetype.go b/pkg/cloudprovider/fake/instancetype.go index 18e5863b99ec..ae711356fd47 100644 --- a/pkg/cloudprovider/fake/instancetype.go +++ b/pkg/cloudprovider/fake/instancetype.go @@ -87,7 +87,9 @@ func NewInstanceType(options InstanceTypeOptions) *InstanceType { OperatingSystems: options.OperatingSystems, Resources: options.Resources, Overhead: options.Overhead, - Price: options.Price}, + Price: options.Price, + VolumeLimits: options.VolumeLimits, + }, } } @@ -153,12 +155,17 @@ type InstanceTypeOptions struct { Overhead v1.ResourceList Resources v1.ResourceList Price float64 + VolumeLimits map[string]int } type InstanceType struct { options InstanceTypeOptions } +func (i *InstanceType) VolumeLimits() map[string]int { + return i.options.VolumeLimits +} + func (i *InstanceType) Name() string { return i.options.Name } diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index 6a03a6ca01c3..399bc9a45dbb 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -74,6 +74,9 @@ type InstanceType interface { // Overhead is the amount of resource overhead expected to be used by kubelet and any other system daemons outside // of Kubernetes. Overhead() v1.ResourceList + // VolumeLimits returns the maximum number of volumes that can be mounted to the node per CSI provisioner. See + // https://kubernetes.io/docs/concepts/storage/storage-limits/ for more details. + VolumeLimits() map[string]int // Price is a metric that is used to optimize pod placement onto nodes. This can be an actual monetary price per hour // for the instance type, or just a weighting where lower 'prices' are preferred. 
Price() float64 diff --git a/pkg/config/suite_test.go b/pkg/config/suite_test.go index bd5edb0701b3..d1a12f4680f3 100644 --- a/pkg/config/suite_test.go +++ b/pkg/config/suite_test.go @@ -124,13 +124,7 @@ var _ = Describe("Batch Parameter", func() { var changed int64 cfg.OnChange(func(c config.Config) { defer GinkgoRecover() - // we can't unregister this, so just check for the one case we care about - if atomic.LoadInt64(&changed) == 0 { - atomic.StoreInt64(&changed, 1) - Expect(cfg.BatchIdleDuration()).To(Equal(1 * time.Second)) - // shouldn't be changed - Expect(cfg.BatchMaxDuration()).To(Equal(10 * time.Second)) - } + atomic.StoreInt64(&changed, 1) }) // simulate user updating the config map with a bad max duration diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index d074bd746500..11c5c596a9ab 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -220,7 +220,7 @@ func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod) ([]*schedule return nil, fmt.Errorf("getting daemon overhead, %w", err) } - return scheduler.NewScheduler(nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods) + return scheduler.NewScheduler(ctx, p.kubeClient, nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods) } func (p *Provisioner) launch(ctx context.Context, node *scheduler.Node) error { diff --git a/pkg/controllers/provisioning/scheduling/inflightnode.go b/pkg/controllers/provisioning/scheduling/inflightnode.go index 839651994c53..23c1aae759f6 100644 --- a/pkg/controllers/provisioning/scheduling/inflightnode.go +++ b/pkg/controllers/provisioning/scheduling/inflightnode.go @@ -15,6 +15,7 @@ limitations under the License. 
package scheduling import ( + "context" "fmt" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" @@ -35,20 +36,23 @@ type InFlightNode struct { requirements scheduling.Requirements available v1.ResourceList startupTolerations []v1.Toleration - hostPortUsage *state.HostPortUsage + hostPortUsage *scheduling.HostPortUsage + volumeUsage *scheduling.VolumeLimits + volumeLimits scheduling.VolumeCount } func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint, daemonResources v1.ResourceList) *InFlightNode { // the remaining daemonResources to schedule are the total daemonResources minus what has already scheduled remainingDaemonResources := resources.Subtract(daemonResources, n.DaemonSetRequested) - node := &InFlightNode{ Node: n.Node, + volumeLimits: n.InstanceType.VolumeLimits(), available: n.Available, topology: topology, requests: remainingDaemonResources, requirements: scheduling.NewLabelRequirements(n.Node.Labels), hostPortUsage: n.HostPortUsage.Copy(), + volumeUsage: n.VolumeUsage.Copy(), } if n.Node.Labels[v1alpha5.LabelNodeInitialized] != "true" { @@ -81,15 +85,24 @@ func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint return node } -func (n *InFlightNode) Add(pod *v1.Pod) error { +func (n *InFlightNode) Add(ctx context.Context, pod *v1.Pod) error { // Check Taints if err := scheduling.Taints(n.Node.Spec.Taints).Tolerates(pod, n.startupTolerations...); err != nil { return err } - if err := n.hostPortUsage.Add(pod); err != nil { + if err := n.hostPortUsage.Validate(pod); err != nil { + return err + } + + // determine the number of volumes that will be mounted if the pod schedules + mountedVolumeCount, err := n.volumeUsage.Validate(ctx, pod) + if err != nil { return err } + if mountedVolumeCount.Exceeds(n.volumeLimits) { + return fmt.Errorf("would exceed node volume limits") + } // check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight // node, which at this point can't be increased in size @@ -122,5 +135,7 @@ func (n *InFlightNode) Add(pod *v1.Pod) error { n.requests = requests n.requirements = nodeRequirements n.topology.Record(pod, nodeRequirements) + n.hostPortUsage.Add(ctx, pod) + n.volumeUsage.Add(ctx, pod) return nil } diff --git a/pkg/controllers/provisioning/scheduling/node.go b/pkg/controllers/provisioning/scheduling/node.go index bb67a1b8f7c6..f7e89f700c8e 100644 --- a/pkg/controllers/provisioning/scheduling/node.go +++ b/pkg/controllers/provisioning/scheduling/node.go @@ -15,17 +15,17 @@ limitations under the License. 
package scheduling import ( + "context" "fmt" "strings" "sync/atomic" - v1 "k8s.io/api/core/v1" - "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" - "github.com/aws/karpenter/pkg/controllers/state" "github.com/aws/karpenter/pkg/scheduling" "github.com/aws/karpenter/pkg/utils/resources" "github.com/aws/karpenter/pkg/utils/sets" @@ -40,12 +40,13 @@ type Node struct { topology *Topology requests v1.ResourceList - hostPortUsage *state.HostPortUsage + hostPortUsage *scheduling.HostPortUsage + volumeUsage *scheduling.VolumeLimits } var nodeID int64 -func NewNode(nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node { +func NewNode(kubeClient client.Client, nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node { // Copy the template, and add hostname hostname := fmt.Sprintf("hostname-placeholder-%04d", atomic.AddInt64(&nodeID, 1)) topology.Register(v1.LabelHostname, hostname) @@ -54,19 +55,21 @@ func NewNode(nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonRe return &Node{ NodeTemplate: template, InstanceTypeOptions: instanceTypes, - hostPortUsage: state.NewHostPortUsage(), + hostPortUsage: scheduling.NewHostPortUsage(), + volumeUsage: scheduling.NewVolumeLimits(kubeClient), topology: topology, requests: daemonResources, } } -func (n *Node) Add(pod *v1.Pod) error { +func (n *Node) Add(ctx context.Context, pod *v1.Pod) error { // Check Taints if err := n.Taints.Tolerates(pod); err != nil { return err } - if err := n.hostPortUsage.Add(pod); err != nil { + // exposed host ports on the node + if err := n.hostPortUsage.Validate(pod); err != nil { return err } @@ -89,9 +92,15 @@ func (n *Node) Add(pod *v1.Pod) error { } nodeRequirements.Add(topologyRequirements) + // determine the number of volumes that will be mounted if the pod schedules + mountedVolumeCount, err := n.volumeUsage.Validate(ctx, pod) + if err != nil { + return err + } + // Check instance type combinations requests := resources.Merge(n.requests, resources.RequestsForPods(pod)) - instanceTypes := filterInstanceTypes(n.InstanceTypeOptions, nodeRequirements, requests) + instanceTypes := filterInstanceTypes(n.InstanceTypeOptions, nodeRequirements, requests, mountedVolumeCount) if len(instanceTypes) == 0 { return fmt.Errorf("no instance type satisfied resources %s and requirements %s", resources.String(resources.RequestsForPods(pod)), nodeRequirements) } @@ -102,6 +111,8 @@ func (n *Node) Add(pod *v1.Pod) error { n.requests = requests n.Requirements = nodeRequirements n.topology.Record(pod, nodeRequirements) + n.hostPortUsage.Add(ctx, pod) + n.volumeUsage.Add(ctx, pod) return nil } @@ -120,9 +131,12 @@ func (n *Node) String() string { return fmt.Sprintf("node with %d pods requesting %s from types %s", len(n.Pods), resources.String(n.requests), itSb.String()) } -func filterInstanceTypes(instanceTypes []cloudprovider.InstanceType, requirements scheduling.Requirements, requests v1.ResourceList) []cloudprovider.InstanceType { +func filterInstanceTypes(instanceTypes []cloudprovider.InstanceType, requirements scheduling.Requirements, requests v1.ResourceList, mountedVolumeCount scheduling.VolumeCount) []cloudprovider.InstanceType { return 
lo.Filter(instanceTypes, func(instanceType cloudprovider.InstanceType, _ int) bool { - return compatible(instanceType, requirements) && fits(instanceType, requests) && hasOffering(instanceType, requirements) + return compatible(instanceType, requirements) && + fits(instanceType, requests) && + hasOffering(instanceType, requirements) && + scheduling.VolumeCount(instanceType.VolumeLimits()).Fits(mountedVolumeCount) }) } diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index 1382962a60a7..64bea371b908 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -33,13 +33,15 @@ import ( "github.com/aws/karpenter/pkg/utils/resources" ) -func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler { +func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler { for provisioner := range instanceTypes { sort.Slice(instanceTypes[provisioner], func(i, j int) bool { return instanceTypes[provisioner][i].Price() < instanceTypes[provisioner][j].Price() }) } s := &Scheduler{ + ctx: ctx, + kubeClient: kubeClient, nodeTemplates: nodeTemplates, topology: topology, cluster: cluster, @@ -84,6 +86,7 @@ func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alp } type Scheduler struct { + ctx context.Context nodes []*Node inflight []*InFlightNode nodeTemplates []*scheduling.NodeTemplate @@ -94,6 +97,7 @@ type Scheduler struct { topology *Topology cluster *state.Cluster recorder events.Recorder + kubeClient client.Client } func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) ([]*Node, error) { @@ -112,7 +116,7 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) ([]*Node, error) } // Schedule to existing nodes or create a new node - if errors[pod] = s.add(pod); errors[pod] == nil { + if errors[pod] = s.add(ctx, pod); errors[pod] == nil { continue } @@ -165,10 +169,10 @@ func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod, logging.FromContext(ctx).Infof("Computed %d unready node(s) will fit %d pod(s)", inflightCount, existingCount) } -func (s *Scheduler) add(pod *v1.Pod) error { +func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error { // first try to schedule against an in-flight real node for _, node := range s.inflight { - if err := node.Add(pod); err == nil { + if err := node.Add(ctx, pod); err == nil { return nil } } @@ -178,7 +182,7 @@ func (s *Scheduler) add(pod *v1.Pod) error { // Pick existing node that we are about to create for _, node := range s.nodes { - if err := node.Add(pod); err == nil { + if err := node.Add(ctx, pod); err == nil { return nil } } @@ -196,8 +200,8 @@ func (s *Scheduler) add(pod *v1.Pod) error { } } - node := NewNode(nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes) - if err := node.Add(pod); err != nil { + node := NewNode(s.kubeClient, nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes) + if err := 
node.Add(ctx, pod); err != nil { errs = multierr.Append(errs, fmt.Errorf("incompatible with provisioner %q, %w", nodeTemplate.ProvisionerName, err)) continue } diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go index 9349f0e9f12e..cb079cf6d7ea 100644 --- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go +++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go @@ -112,7 +112,8 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) { instanceTypes := fake.InstanceTypes(instanceCount) cloudProv := &fake.CloudProvider{InstanceTypes: instanceTypes} - scheduler := NewScheduler( + scheduler := NewScheduler(ctx, + nil, []*scheduling.NodeTemplate{scheduling.NewNodeTemplate(provisioner)}, nil, state.NewCluster(ctx, nil, cloudProv), diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 283ceb51a479..b945a5ecf613 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -16,6 +16,7 @@ package scheduling_test import ( "context" + "fmt" "math" "math/rand" "strings" @@ -2958,11 +2959,11 @@ var _ = Describe("Instance Type Compatibility", func() { ExpectApplied(ctx, env.Client, provisioner) pods := ExpectProvisioned(ctx, env.Client, controller, test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{ - fake.LabelInstanceSize: "large", + fake.LabelInstanceSize: "large", v1.LabelInstanceTypeStable: cloudProv.InstanceTypes[0].Name(), }}), test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{ - fake.LabelInstanceSize: "small", + fake.LabelInstanceSize: "small", v1.LabelInstanceTypeStable: cloudProv.InstanceTypes[4].Name(), }}), ) @@ -3848,6 +3849,101 @@ var _ = Describe("No Pre-Binding", func() { }) }) +var _ = Describe("Volume Limits", func() { + It("should launch multiple nodes if required due to volume limits", func() { + const csiProvider = "fake.csi.provider" + cloudProv.InstanceTypes = []cloudprovider.InstanceType{ + fake.NewInstanceType( + fake.InstanceTypeOptions{ + Name: "instance-type", + VolumeLimits: map[string]int{ + csiProvider: 10, + }, + Resources: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1024"), + v1.ResourcePods: resource.MustParse("1024"), + }, + }), + } + + sc := test.StorageClass(test.StorageClassOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class"}, + Provisioner: aws.String(csiProvider), + Zones: []string{"test-zone-1"}}) + ExpectApplied(ctx, env.Client, sc) + + var pods []*v1.Pod + for i := 0; i < 100; i++ { + pvcA := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + StorageClassName: aws.String("my-storage-class"), + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-a-%d", i)}, + }) + pvcB := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + StorageClassName: aws.String("my-storage-class"), + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-b-%d", i)}, + }) + ExpectApplied(ctx, env.Client, pvcA, pvcB) + pods = append(pods, test.UnschedulablePod(test.PodOptions{ + PersistentVolumeClaims: []string{pvcA.Name, pvcB.Name}, + })) + } + provisioner.Spec.Limits = nil + ExpectApplied(ctx, env.Client, provisioner) + ExpectProvisioned(ctx, env.Client, controller, pods...) 
+ var nodeList v1.NodeList + Expect(env.Client.List(ctx, &nodeList)).To(Succeed()) + // 200 volumes / 10 volumes per node = 20 nodes + Expect(nodeList.Items).To(HaveLen(20)) + }) + It("should launch a single node if all pods use the same PVC", func() { + const csiProvider = "fake.csi.provider" + cloudProv.InstanceTypes = []cloudprovider.InstanceType{ + fake.NewInstanceType( + fake.InstanceTypeOptions{ + Name: "instance-type", + VolumeLimits: map[string]int{ + csiProvider: 10, + }, + Resources: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1024"), + v1.ResourcePods: resource.MustParse("1024"), + }, + }), + } + + sc := test.StorageClass(test.StorageClassOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-storage-class"}, + Provisioner: aws.String(csiProvider), + Zones: []string{"test-zone-1"}}) + ExpectApplied(ctx, env.Client, sc) + + pv := test.PersistentVolume(test.PersistentVolumeOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-volume"}, + Zones: []string{"test-zone-1"}}) + + pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + ObjectMeta: metav1.ObjectMeta{Name: "my-claim"}, + StorageClassName: aws.String("my-storage-class"), + VolumeName: pv.Name, + }) + ExpectApplied(ctx, env.Client, pv, pvc) + + var pods []*v1.Pod + for i := 0; i < 100; i++ { + pods = append(pods, test.UnschedulablePod(test.PodOptions{ + PersistentVolumeClaims: []string{pvc.Name, pvc.Name}, + })) + } + provisioner.Spec.Limits = nil + ExpectApplied(ctx, env.Client, provisioner) + ExpectProvisioned(ctx, env.Client, controller, pods...) + var nodeList v1.NodeList + Expect(env.Client.List(ctx, &nodeList)).To(Succeed()) + // 100 of the same PVC should all be schedulable on the same node + Expect(nodeList.Items).To(HaveLen(1)) + }) +}) + func MakePods(count int, options test.PodOptions) (pods []*v1.Pod) { for i := 0; i < count; i++ { pods = append(pods, test.UnschedulablePod(options)) diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index 1d276fd631bb..3d3136d96147 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -20,6 +20,8 @@ import ( "sort" "sync" + "github.com/aws/karpenter/pkg/scheduling" + "github.com/aws/karpenter/pkg/cloudprovider" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" @@ -76,7 +78,8 @@ type Node struct { // included in the calculation for Available. DaemonSetRequested v1.ResourceList // HostPort usage of all pods that are bound to the node - HostPortUsage *HostPortUsage + HostPortUsage *scheduling.HostPortUsage + VolumeUsage *scheduling.VolumeLimits // Provisioner is the provisioner used to create the node. Provisioner *v1alpha5.Provisioner @@ -129,7 +132,8 @@ func (c *Cluster) ForEachNode(f func(n *Node) bool) { func (c *Cluster) newNode(node *v1.Node) *Node { n := &Node{ Node: node, - HostPortUsage: NewHostPortUsage(), + HostPortUsage: scheduling.NewHostPortUsage(), + VolumeUsage: scheduling.NewVolumeLimits(c.kubeClient), podRequests: map[types.NamespacedName]v1.ResourceList{}, } @@ -167,9 +171,8 @@ func (c *Cluster) newNode(node *v1.Node) *Node { daemonsetRequested = append(daemonsetRequested, requests) } requested = append(requested, requests) - if err := n.HostPortUsage.Add(pod); err != nil { - logging.FromContext(c.ctx).Errorf("inconsistent state, error tracking host port usage on node %s, %s", n.Node.Name, err) - } + n.HostPortUsage.Add(c.ctx, pod) + n.VolumeUsage.Add(c.ctx, pod) } n.DaemonSetRequested = resources.Merge(daemonsetRequested...) 
@@ -253,6 +256,7 @@ func (c *Cluster) updateNodeUsageFromPodDeletion(podKey types.NamespacedName) { n.Available = resources.Merge(n.Available, n.podRequests[podKey]) delete(n.podRequests, podKey) n.HostPortUsage.DeletePod(podKey) + n.VolumeUsage.DeletePod(podKey) // We can't easily track the changes to the DaemonsetRequested here as we no longer have the pod. We could keep up // with this separately, but if a daemonset pod is being deleted, it usually means the node is going down. In the @@ -331,9 +335,8 @@ func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) { if podutils.IsOwnedByDaemonSet(pod) { n.DaemonSetRequested = resources.Merge(n.DaemonSetRequested, podRequests) } - if err := n.HostPortUsage.Add(pod); err != nil { - logging.FromContext(c.ctx).Errorf("inconsistent state, error tracking host port usage on node %s, %s", n.Node.Name, err) - } + n.HostPortUsage.Add(c.ctx, pod) + n.VolumeUsage.Add(c.ctx, pod) n.podRequests[podKey] = podRequests c.bindings[podKey] = n.Node.Name } diff --git a/pkg/controllers/state/hostportusage.go b/pkg/scheduling/hostportusage.go similarity index 82% rename from pkg/controllers/state/hostportusage.go rename to pkg/scheduling/hostportusage.go index 3f01c75af489..8f38e79fd4d1 100644 --- a/pkg/controllers/state/hostportusage.go +++ b/pkg/scheduling/hostportusage.go @@ -12,14 +12,16 @@ See the License for the specific language governing permissions and limitations under the License. */ -package state +package scheduling import ( + "context" "fmt" "net" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -61,17 +63,31 @@ func NewHostPortUsage() *HostPortUsage { } // Add adds a port to the HostPortUsage, returning an error in the case of a conflict -func (u *HostPortUsage) Add(pod *v1.Pod) error { +func (u *HostPortUsage) Add(ctx context.Context, pod *v1.Pod) { + newUsage, err := u.validate(pod) + if err != nil { + logging.FromContext(ctx).Errorf("invariant violated registering host port usage, %s, please file an issue", err) + } + u.reserved = append(u.reserved, newUsage...) +} + +// Validate performs host port conflict validation to allow for determining if we can schedule the pod to the node +// before doing so. +func (u *HostPortUsage) Validate(pod *v1.Pod) error { + _, err := u.validate(pod) + return err +} + +func (u *HostPortUsage) validate(pod *v1.Pod) ([]entry, error) { newUsage := getHostPorts(pod) for _, newEntry := range newUsage { for _, existing := range u.reserved { if newEntry.matches(existing) { - return fmt.Errorf("%s conflicts with existing HostPort configuration %s", newEntry, existing) + return nil, fmt.Errorf("%s conflicts with existing HostPort configuration %s", newEntry, existing) } } } - u.reserved = append(u.reserved, newUsage...) - return nil + return newUsage, nil } // DeletePod deletes all host port usage from the HostPortUsage that were created by the pod with the given name. diff --git a/pkg/scheduling/suite_test.go b/pkg/scheduling/suite_test.go index ef5c1792870b..91b0bcbb5ef6 100644 --- a/pkg/scheduling/suite_test.go +++ b/pkg/scheduling/suite_test.go @@ -15,12 +15,11 @@ limitations under the License. package scheduling_test import ( - v1 "k8s.io/api/core/v1" - "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/scheduling" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" ) var _ = Describe("Scheduling", func() { diff --git a/pkg/scheduling/volumelimits.go b/pkg/scheduling/volumelimits.go new file mode 100644 index 000000000000..1e04760c3a26 --- /dev/null +++ b/pkg/scheduling/volumelimits.go @@ -0,0 +1,199 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduling + +import ( + "context" + "fmt" + + "knative.dev/pkg/logging" + + v1 "k8s.io/api/core/v1" + sv1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// VolumeLimits tracks volume limits on a per node basis. The number of volumes that can be mounted varies by instance +// type. We need to be aware and track the mounted volume usage to inform our awareness of which pods can schedule to +// which nodes. +type VolumeLimits struct { + volumes volumeUsage + podVolumes map[types.NamespacedName]volumeUsage + kubeClient client.Client +} + +type volumeUsage map[string]sets.String + +func (u volumeUsage) Add(provisioner string, pvcID string) { + existing, ok := u[provisioner] + if !ok { + existing = sets.NewString() + u[provisioner] = existing + } + existing.Insert(pvcID) +} + +func (u volumeUsage) union(volumes volumeUsage) volumeUsage { + cp := volumeUsage{} + for k, v := range u { + cp[k] = sets.NewString(v.List()...) + } + for k, v := range volumes { + existing, ok := cp[k] + if !ok { + existing = sets.NewString() + cp[k] = existing + } + existing.Insert(v.List()...) + } + return cp +} + +func (u volumeUsage) insert(volumes volumeUsage) { + for k, v := range volumes { + existing, ok := u[k] + if !ok { + existing = sets.NewString() + u[k] = existing + } + existing.Insert(v.List()...) + } +} + +func (u volumeUsage) copy() volumeUsage { + cp := volumeUsage{} + for k, v := range u { + cp[k] = sets.NewString(v.List()...) + } + return cp +} + +func NewVolumeLimits(kubeClient client.Client) *VolumeLimits { + return &VolumeLimits{ + kubeClient: kubeClient, + volumes: volumeUsage{}, + podVolumes: map[types.NamespacedName]volumeUsage{}, + } +} + +func (v *VolumeLimits) Add(ctx context.Context, pod *v1.Pod) { + podVolumes, err := v.validate(ctx, pod) + if err != nil { + logging.FromContext(ctx).Errorf("inconsistent state error adding volume, %s, please file an issue", err) + } + v.podVolumes[client.ObjectKeyFromObject(pod)] = podVolumes + v.volumes = v.volumes.union(podVolumes) +} + +type VolumeCount map[string]int + +// Exceeds returns true if the volume count exceeds the limits provided. If there is no value for a storage provider, it +// is treated as unlimited. +func (c VolumeCount) Exceeds(limits VolumeCount) bool { + for k, v := range c { + limit, hasLimit := limits[k] + if !hasLimit { + continue + } + if v > limit { + return true + } + } + return false +} + +// Fits returns true if the rhs 'fits' within the volume count. 
+func (c VolumeCount) Fits(rhs VolumeCount) bool { + for k, v := range rhs { + limit, hasLimit := c[k] + if !hasLimit { + continue + } + if v > limit { + return false + } + } + return true +} + +func (v *VolumeLimits) Validate(ctx context.Context, pod *v1.Pod) (VolumeCount, error) { + podVolumes, err := v.validate(ctx, pod) + if err != nil { + return nil, err + } + result := VolumeCount{} + for k, v := range v.volumes.union(podVolumes) { + result[k] += len(v) + } + return result, nil +} + +func (v *VolumeLimits) validate(ctx context.Context, pod *v1.Pod) (volumeUsage, error) { + podPVCs := volumeUsage{} + + for _, volume := range pod.Spec.Volumes { + var pvcID string + var storageClassName *string + if volume.PersistentVolumeClaim != nil { + var pvc v1.PersistentVolumeClaim + if err := v.kubeClient.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.PersistentVolumeClaim.ClaimName}, &pvc); err != nil { + return nil, err + } + + pvcID = fmt.Sprintf("%s/%s", pod.Namespace, volume.PersistentVolumeClaim.ClaimName) + storageClassName = pvc.Spec.StorageClassName + } else if volume.Ephemeral != nil { + // generated name per https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/#persistentvolumeclaim-naming + pvcID = fmt.Sprintf("%s/%s-%s", pod.Namespace, pod.Name, volume.Name) + storageClassName = volume.Ephemeral.VolumeClaimTemplate.Spec.StorageClassName + } else { + continue + } + + provisioner := "unspecified" + if storageClassName != nil { + var sc sv1.StorageClass + if err := v.kubeClient.Get(ctx, client.ObjectKey{Name: *storageClassName}, &sc); err != nil { + return nil, err + } + provisioner = sc.Provisioner + } + podPVCs.Add(provisioner, pvcID) + } + return podPVCs, nil +} + +func (v *VolumeLimits) DeletePod(key types.NamespacedName) { + delete(v.podVolumes, key) + // volume names could be duplicated, so we re-create our volumes + v.volumes = volumeUsage{} + for _, c := range v.podVolumes { + v.volumes.insert(c) + } +} + +func (v *VolumeLimits) Copy() *VolumeLimits { + cp := &VolumeLimits{ + kubeClient: v.kubeClient, + volumes: v.volumes.copy(), + podVolumes: map[types.NamespacedName]volumeUsage{}, + } + for k, v := range v.podVolumes { + cp.podVolumes[k] = v.copy() + } + return cp +} diff --git a/pkg/test/storage.go b/pkg/test/storage.go index 8386cb81636b..ab9b507dade9 100644 --- a/pkg/test/storage.go +++ b/pkg/test/storage.go @@ -17,6 +17,8 @@ package test import ( "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/imdario/mergo" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" @@ -77,7 +79,8 @@ func PersistentVolumeClaim(overrides ...PersistentVolumeClaimOptions) *v1.Persis type StorageClassOptions struct { metav1.ObjectMeta - Zones []string + Zones []string + Provisioner *string } func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass { @@ -92,10 +95,13 @@ func StorageClass(overrides ...StorageClassOptions) *storagev1.StorageClass { if options.Zones != nil { allowedTopologies = []v1.TopologySelectorTerm{{MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{{Key: v1.LabelTopologyZone, Values: options.Zones}}}} } + if options.Provisioner == nil { + options.Provisioner = aws.String("test-provisioner") + } return &storagev1.StorageClass{ ObjectMeta: ObjectMeta(options.ObjectMeta), - Provisioner: "test-provisioner", + Provisioner: *options.Provisioner, AllowedTopologies: allowedTopologies, } } diff --git a/pkg/utils/options/options.go b/pkg/utils/options/options.go index f2e98d4ab948..75d20977c90d 100644 --- 
a/pkg/utils/options/options.go +++ b/pkg/utils/options/options.go @@ -45,6 +45,8 @@ func MustParse() Options { flag.BoolVar(&opts.AWSENILimitedPodDensity, "aws-eni-limited-pod-density", env.WithDefaultBool("AWS_ENI_LIMITED_POD_DENSITY", true), "Indicates whether new nodes should use ENI-based pod density") flag.StringVar(&opts.AWSDefaultInstanceProfile, "aws-default-instance-profile", env.WithDefaultString("AWS_DEFAULT_INSTANCE_PROFILE", ""), "The default instance profile to use when provisioning nodes in AWS") flag.BoolVar(&opts.AWSEnablePodENI, "aws-enable-pod-eni", env.WithDefaultBool("AWS_ENABLE_POD_ENI", false), "If true then instances that support pod ENI will report a vpc.amazonaws.com/pod-eni resource") + // See https://github.com/kubernetes-sigs/aws-ebs-csi-driver/blob/master/docs/options.md for the EBS CSI volume-attach-limit docs + flag.IntVar(&opts.AWSEBSVolumeAttachLimit, "aws-ebs-volume-attach-limit", env.WithDefaultInt("AWS_EBS_VOLUME_ATTACH_LIMIT", -1), "If set to a value >=0, this is the maximum number of EBS volumes attachable per node and should match the value passed to the EBS CSI driver.") flag.Parse() if err := opts.Validate(); err != nil { panic(err) @@ -66,6 +68,7 @@ type Options struct { AWSENILimitedPodDensity bool AWSDefaultInstanceProfile string AWSEnablePodENI bool + AWSEBSVolumeAttachLimit int } func (o Options) Validate() (err error) {
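
Reviewer note (not part of the patch): below is a small, self-contained sketch of the semantics of the
new VolumeCount helpers introduced in pkg/scheduling/volumelimits.go. The Fits and Exceeds bodies are
copied from this patch; the package main wrapper, example limit values, and printed results are
illustrative only. The key behavior: a provisioner with no entry in the limits map is treated as unlimited.

package main

import "fmt"

// VolumeCount tracks the number of mounted volumes per CSI provisioner.
type VolumeCount map[string]int

// Exceeds returns true if the volume count exceeds the limits provided. A
// provisioner with no entry in limits is treated as unlimited.
func (c VolumeCount) Exceeds(limits VolumeCount) bool {
	for k, v := range c {
		limit, hasLimit := limits[k]
		if !hasLimit {
			continue
		}
		if v > limit {
			return true
		}
	}
	return false
}

// Fits returns true if the rhs 'fits' within the volume count.
func (c VolumeCount) Fits(rhs VolumeCount) bool {
	for k, v := range rhs {
		limit, hasLimit := c[k]
		if !hasLimit {
			continue
		}
		if v > limit {
			return false
		}
	}
	return true
}

func main() {
	// The default EBS CSI limit for most instance types in this patch is 27.
	limits := VolumeCount{"ebs.csi.aws.com": 27}

	fmt.Println(limits.Fits(VolumeCount{"ebs.csi.aws.com": 27}))  // true: exactly at the limit
	fmt.Println(limits.Fits(VolumeCount{"ebs.csi.aws.com": 28}))  // false: one over the limit
	fmt.Println(limits.Fits(VolumeCount{"other.csi.driver": 99})) // true: no limit tracked for this provisioner

	usage := VolumeCount{"ebs.csi.aws.com": 30}
	fmt.Println(usage.Exceeds(limits)) // true
}

This mirrors how the helpers are used in the patch: node.go filters instance types with
scheduling.VolumeCount(instanceType.VolumeLimits()).Fits(mountedVolumeCount), while inflightnode.go
rejects pods for existing nodes when mountedVolumeCount.Exceeds(n.volumeLimits).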
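
A second reviewer note (also not part of the patch): a compact sketch of how the new
AWS_EBS_VOLUME_ATTACH_LIMIT override interacts with the per-instance-type defaults in
(*InstanceType).computeVolumeLimits. The limit values and instance-type special cases are taken from
this patch; the standalone ebsAttachLimit function and the example calls are illustrative only.

package main

import "fmt"

// ebsAttachLimit mirrors the decision order in computeVolumeLimits: start from the
// default of 27, apply the instance-type special cases, then the bare-metal value,
// and finally let a non-negative override win unconditionally.
func ebsAttachLimit(instanceType string, bareMetal bool, override int) int {
	limit := 27
	switch instanceType {
	case "d3.8xlarge", "d3en.12xlarge":
		limit = 3
	case "inf1.xlarge", "inf1.2xlarge":
		limit = 26
	case "inf1.6xlarge":
		limit = 23
	case "inf1.24xlarge":
		limit = 11
	case "mac1.metal":
		limit = 16
	case "u-6tb1.metal", "u-9tb1.metal", "u-12tb1.metal":
		limit = 19
	}
	if bareMetal {
		limit = 31
	}
	if override >= 0 {
		limit = override
	}
	return limit
}

func main() {
	fmt.Println(ebsAttachLimit("m5.large", false, -1))     // 27: default, no override
	fmt.Println(ebsAttachLimit("inf1.6xlarge", false, -1)) // 23: instance-type special case
	fmt.Println(ebsAttachLimit("m5.metal", true, -1))      // 31: bare metal, as asserted in suite_test.go
	fmt.Println(ebsAttachLimit("inf1.6xlarge", false, 3))  // 3: AWS_EBS_VOLUME_ATTACH_LIMIT=3 takes precedence
}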