diff --git a/hack/docs/metrics_gen_docs.go b/hack/docs/metrics_gen_docs.go
index fe5f461881ba..0eb3de01ae12 100644
--- a/hack/docs/metrics_gen_docs.go
+++ b/hack/docs/metrics_gen_docs.go
@@ -205,6 +205,12 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo {
 }
 
 func getFuncPackage(fun ast.Expr) string {
+	if pexpr, ok := fun.(*ast.ParenExpr); ok {
+		return getFuncPackage(pexpr.X)
+	}
+	if sexpr, ok := fun.(*ast.StarExpr); ok {
+		return getFuncPackage(sexpr.X)
+	}
 	if sel, ok := fun.(*ast.SelectorExpr); ok {
 		return fmt.Sprintf("%s", sel.X)
 	}
diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go b/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
index 710f18fdc50d..c37fa8ddd871 100644
--- a/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
+++ b/pkg/cloudprovider/aws/apis/v1alpha1/provider_validation.go
@@ -279,4 +279,4 @@ func (a *AWS) validateKubeletConfiguration(kubeletConfig *v1alpha5.KubeletConfig
 		}
 	}
 	return nil
-}
\ No newline at end of file
+}
diff --git a/pkg/cloudprovider/aws/instancetype.go b/pkg/cloudprovider/aws/instancetype.go
index 86ec90d8fe63..2722201ea7a3 100644
--- a/pkg/cloudprovider/aws/instancetype.go
+++ b/pkg/cloudprovider/aws/instancetype.go
@@ -38,6 +38,8 @@ import (
 // EC2VMAvailableMemoryFactor assumes the EC2 VM will consume <7.25% of the memory of a given machine
 const EC2VMAvailableMemoryFactor = .925
 
+var _ cloudprovider.InstanceType = (*InstanceType)(nil)
+
 type InstanceType struct {
 	*ec2.InstanceTypeInfo
 	offerings    []cloudprovider.Offering
@@ -46,6 +48,34 @@ type InstanceType struct {
 	resources    v1.ResourceList
 	provider     *v1alpha1.AWS
 	maxPods      *int32
+	maxVolumes   *int32
+}
+
+func (i *InstanceType) MaxVolumes() int {
+	if i.maxVolumes != nil {
+		return int(*i.maxVolumes)
+	}
+	// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html
+	switch i.Name() {
+	case "d3.8xlarge", "d3en.12xlarge":
+		return 3
+	case "inf1.xlarge", "inf1.2xlarge":
+		return 26
+	case "inf1.6xlarge":
+		return 23
+	case "inf1.24xlarge":
+		return 11
+	case "mac1.metal":
+		return 16
+	case "u-6tb1.metal", "u-9tb1.metal", "u-12tb1.metal":
+		return 19
+	}
+
+	if aws.BoolValue(i.BareMetal) {
+		return 31
+	}
+
+	return 27
 }
 
 func (i *InstanceType) Name() string {
diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go
index 9a9db125dd01..263b6c8c1059 100644
--- a/pkg/cloudprovider/aws/instancetypes.go
+++ b/pkg/cloudprovider/aws/instancetypes.go
@@ -89,13 +89,17 @@ func (p *InstanceTypeProvider) newInstanceType(ctx context.Context, info *ec2.In
 		provider:  provider,
 		offerings: offerings,
 	}
+	opts := injection.GetOptions(ctx)
 	// Precompute to minimize memory/compute overhead
-	instanceType.resources = instanceType.computeResources(injection.GetOptions(ctx).AWSEnablePodENI)
+	instanceType.resources = instanceType.computeResources(opts.AWSEnablePodENI)
 	instanceType.overhead = instanceType.computeOverhead()
 	instanceType.requirements = instanceType.computeRequirements()
-	if !injection.GetOptions(ctx).AWSENILimitedPodDensity {
+	if !opts.AWSENILimitedPodDensity {
 		instanceType.maxPods = ptr.Int32(110)
 	}
+	if opts.AWSVolumeAttachLimit >= 0 {
+		instanceType.maxVolumes = ptr.Int32(int32(opts.AWSVolumeAttachLimit))
+	}
 	return instanceType
 }
 
diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go
index ab30857f75ec..8356986beb51 100644
--- a/pkg/cloudprovider/aws/suite_test.go
+++ b/pkg/cloudprovider/aws/suite_test.go
@@ -85,6 +85,7 @@ var _ = BeforeSuite(func() {
 		AWSNodeNameConvention:     string(options.IPName),
 		AWSENILimitedPodDensity:   true,
 		AWSEnablePodENI:           true,
+		AWSVolumeAttachLimit:      -1,
 		AWSDefaultInstanceProfile: "test-instance-profile",
 	}
 	Expect(opts.Validate()).To(Succeed(), "Failed to validate options")
@@ -1308,6 +1309,40 @@ var _ = Describe("Allocation", func() {
 				ExpectScheduled(ctx, env.Client, pod)
 			})
 		})
+		Context("Volume Limits", func() {
+			It("should provide valid volume limits", func() {
+				instanceTypeCache.Flush()
+				// provide no max volume attach limit
+				optsCopy := opts
+				optsCopy.AWSVolumeAttachLimit = -1
+				its, err := cloudProvider.GetInstanceTypes(injection.WithOptions(ctx, optsCopy), provisioner.Spec.Provider)
+				Expect(err).To(BeNil())
+				for _, it := range its {
+					expected := 27
+					// pulled from https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html
+					switch it.Name() {
+					case "inf1.6xlarge":
+						expected = 23
+					case "m5.metal":
+						expected = 31
+					case "inf1.2xlarge":
+						expected = 26
+					}
+					Expect(it.MaxVolumes()).To(Equal(expected))
+				}
+			})
+			It("should respect the volume limits passed in the config", func() {
+				instanceTypeCache.Flush()
+				// specify a max volume attach limit
+				optsCopy := opts
+				optsCopy.AWSVolumeAttachLimit = 3
+				its, err := cloudProvider.GetInstanceTypes(injection.WithOptions(ctx, optsCopy), provisioner.Spec.Provider)
+				Expect(err).To(BeNil())
+				for _, it := range its {
+					Expect(it.MaxVolumes()).To(Equal(3))
+				}
+			})
+		})
 	})
 	Context("Defaulting", func() {
 		// Intent here is that if updates occur on the controller, the Provisioner doesn't need to be recreated
diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go
index 47104927a09b..03146a34d730 100644
--- a/pkg/cloudprovider/fake/cloudprovider.go
+++ b/pkg/cloudprovider/fake/cloudprovider.go
@@ -49,6 +49,9 @@ type CloudProvider struct {
 	CreateCalls []*cloudprovider.NodeRequest
 }
 
+var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
+var _ cloudprovider.InstanceType = (*InstanceType)(nil)
+
 func (c *CloudProvider) Create(ctx context.Context, nodeRequest *cloudprovider.NodeRequest) (*v1.Node, error) {
 	c.mu.Lock()
 	c.CreateCalls = append(c.CreateCalls, nodeRequest)
diff --git a/pkg/cloudprovider/fake/instancetype.go b/pkg/cloudprovider/fake/instancetype.go
index 18e5863b99ec..5070cebf574d 100644
--- a/pkg/cloudprovider/fake/instancetype.go
+++ b/pkg/cloudprovider/fake/instancetype.go
@@ -87,7 +87,9 @@ func NewInstanceType(options InstanceTypeOptions) *InstanceType {
 			OperatingSystems: options.OperatingSystems,
 			Resources:        options.Resources,
 			Overhead:         options.Overhead,
-			Price:            options.Price},
+			Price:            options.Price,
+			VolumeLimits:     options.VolumeLimits,
+		},
 	}
 }
 
@@ -153,12 +155,20 @@ type InstanceTypeOptions struct {
 	Overhead     v1.ResourceList
 	Resources    v1.ResourceList
 	Price        float64
+	VolumeLimits *int
 }
 
 type InstanceType struct {
 	options InstanceTypeOptions
 }
 
+func (i *InstanceType) MaxVolumes() int {
+	if i.options.VolumeLimits != nil {
+		return *i.options.VolumeLimits
+	}
+	return 27
+}
+
 func (i *InstanceType) Name() string {
 	return i.options.Name
 }
diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go
index 6a03a6ca01c3..6698bf8f9b01 100644
--- a/pkg/cloudprovider/types.go
+++ b/pkg/cloudprovider/types.go
@@ -74,6 +74,8 @@ type InstanceType interface {
 	// Overhead is the amount of resource overhead expected to be used by kubelet and any other system daemons outside
 	// of Kubernetes.
 	Overhead() v1.ResourceList
+	// MaxVolumes returns the maximum number of volumes that can be mounted to the node.
+	MaxVolumes() int
 	// Price is a metric that is used to optimize pod placement onto nodes. This can be an actual monetary price per hour
 	// for the instance type, or just a weighting where lower 'prices' are preferred.
 	Price() float64
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index d074bd746500..b27141f7361f 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -220,7 +220,7 @@ func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod) ([]*schedule
 		return nil, fmt.Errorf("getting daemon overhead, %w", err)
 	}
 
-	return scheduler.NewScheduler(nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
+	return scheduler.NewScheduler(ctx, nodeTemplates, provisionerList.Items, p.cluster, topology, instanceTypes, daemonOverhead, p.recorder).Solve(ctx, pods)
 }
 
 func (p *Provisioner) launch(ctx context.Context, node *scheduler.Node) error {
diff --git a/pkg/controllers/provisioning/scheduling/inflightnode.go b/pkg/controllers/provisioning/scheduling/inflightnode.go
index 839651994c53..626cc4b2796a 100644
--- a/pkg/controllers/provisioning/scheduling/inflightnode.go
+++ b/pkg/controllers/provisioning/scheduling/inflightnode.go
@@ -35,20 +35,23 @@ type InFlightNode struct {
 	requirements       scheduling.Requirements
 	available          v1.ResourceList
 	startupTolerations []v1.Toleration
-	hostPortUsage      *state.HostPortUsage
+	hostPortUsage      *scheduling.HostPortUsage
+	volumeUsage        *scheduling.VolumeLimits
+	maxVolumes         int
 }
 
 func NewInFlightNode(n *state.Node, topology *Topology, startupTaints []v1.Taint, daemonResources v1.ResourceList) *InFlightNode {
 	// the remaining daemonResources to schedule are the total daemonResources minus what has already scheduled
 	remainingDaemonResources := resources.Subtract(daemonResources, n.DaemonSetRequested)
-
 	node := &InFlightNode{
 		Node:          n.Node,
+		maxVolumes:    n.InstanceType.MaxVolumes(),
 		available:     n.Available,
 		topology:      topology,
 		requests:      remainingDaemonResources,
 		requirements:  scheduling.NewLabelRequirements(n.Node.Labels),
 		hostPortUsage: n.HostPortUsage.Copy(),
+		volumeUsage:   n.VolumeUsage.Copy(),
 	}
 
 	if n.Node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
@@ -87,9 +90,18 @@ func (n *InFlightNode) Add(pod *v1.Pod) error {
 		return err
 	}
 
-	if err := n.hostPortUsage.Add(pod); err != nil {
+	if err := n.hostPortUsage.Validate(pod); err != nil {
+		return err
+	}
+
+	// determine the number of volumes that will be mounted if the pod schedules
+	mountedVolumeCount, err := n.volumeUsage.Validate(pod)
+	if err != nil {
 		return err
 	}
+	if mountedVolumeCount > n.maxVolumes {
+		return fmt.Errorf("%d would exceed max volume count of %d", mountedVolumeCount, n.maxVolumes)
+	}
 
 	// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
 	// node, which at this point can't be increased in size
@@ -122,5 +134,7 @@ func (n *InFlightNode) Add(pod *v1.Pod) error {
 	n.requests = requests
 	n.requirements = nodeRequirements
 	n.topology.Record(pod, nodeRequirements)
+	n.hostPortUsage.Add(pod)
+	n.volumeUsage.Add(pod)
 	return nil
 }
diff --git a/pkg/controllers/provisioning/scheduling/node.go b/pkg/controllers/provisioning/scheduling/node.go
index bb67a1b8f7c6..c6c40914bbc1 100644
--- a/pkg/controllers/provisioning/scheduling/node.go
+++ b/pkg/controllers/provisioning/scheduling/node.go
@@ -15,6 +15,7 @@ limitations under the License.
 package scheduling
 
 import (
+	"context"
 	"fmt"
 	"strings"
 	"sync/atomic"
@@ -25,7 +26,6 @@ import (
 
 	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
 	"github.com/aws/karpenter/pkg/cloudprovider"
-	"github.com/aws/karpenter/pkg/controllers/state"
 	"github.com/aws/karpenter/pkg/scheduling"
 	"github.com/aws/karpenter/pkg/utils/resources"
 	"github.com/aws/karpenter/pkg/utils/sets"
@@ -40,12 +40,13 @@ type Node struct {
 
 	topology *Topology
 	requests v1.ResourceList
-	hostPortUsage *state.HostPortUsage
+	hostPortUsage *scheduling.HostPortUsage
+	volumeUsage   *scheduling.VolumeLimits
 }
 
 var nodeID int64
 
-func NewNode(nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node {
+func NewNode(ctx context.Context, nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonResources v1.ResourceList, instanceTypes []cloudprovider.InstanceType) *Node {
 	// Copy the template, and add hostname
 	hostname := fmt.Sprintf("hostname-placeholder-%04d", atomic.AddInt64(&nodeID, 1))
 	topology.Register(v1.LabelHostname, hostname)
@@ -54,7 +55,8 @@ func NewNode(nodeTemplate *scheduling.NodeTemplate, topology *Topology, daemonRe
 	return &Node{
 		NodeTemplate:        template,
 		InstanceTypeOptions: instanceTypes,
-		hostPortUsage:       state.NewHostPortUsage(),
+		hostPortUsage:       scheduling.NewHostPortUsage(ctx),
+		volumeUsage:         scheduling.NewVolumeLimits(ctx),
 		topology:            topology,
 		requests:            daemonResources,
 	}
@@ -66,7 +68,8 @@ func (n *Node) Add(pod *v1.Pod) error {
 		return err
 	}
 
-	if err := n.hostPortUsage.Add(pod); err != nil {
+	// check for conflicts with host ports already exposed on the node
+	if err := n.hostPortUsage.Validate(pod); err != nil {
 		return err
 	}
 
@@ -91,7 +94,12 @@ func (n *Node) Add(pod *v1.Pod) error {
 
 	// Check instance type combinations
 	requests := resources.Merge(n.requests, resources.RequestsForPods(pod))
-	instanceTypes := filterInstanceTypes(n.InstanceTypeOptions, nodeRequirements, requests)
+	// determine the number of volumes that will be mounted if the pod schedules
+	mountedVolumeCount, err := n.volumeUsage.Validate(pod)
+	if err != nil {
+		return err
+	}
+	instanceTypes := filterInstanceTypes(n.InstanceTypeOptions, nodeRequirements, requests, mountedVolumeCount)
 	if len(instanceTypes) == 0 {
 		return fmt.Errorf("no instance type satisfied resources %s and requirements %s", resources.String(resources.RequestsForPods(pod)), nodeRequirements)
 	}
@@ -102,6 +110,8 @@ func (n *Node) Add(pod *v1.Pod) error {
 	n.requests = requests
 	n.Requirements = nodeRequirements
 	n.topology.Record(pod, nodeRequirements)
+	n.hostPortUsage.Add(pod)
+	n.volumeUsage.Add(pod)
 	return nil
 }
 
@@ -120,9 +130,12 @@ func (n *Node) String() string {
 	return fmt.Sprintf("node with %d pods requesting %s from types %s", len(n.Pods), resources.String(n.requests), itSb.String())
 }
 
-func filterInstanceTypes(instanceTypes []cloudprovider.InstanceType, requirements scheduling.Requirements, requests v1.ResourceList) []cloudprovider.InstanceType {
+func filterInstanceTypes(instanceTypes []cloudprovider.InstanceType, requirements scheduling.Requirements, requests v1.ResourceList, volumesPerNode int) []cloudprovider.InstanceType {
 	return lo.Filter(instanceTypes, func(instanceType cloudprovider.InstanceType, _ int) bool {
-		return compatible(instanceType, requirements) && fits(instanceType, requests) && hasOffering(instanceType, requirements)
+		return compatible(instanceType, requirements) &&
+			fits(instanceType, requests) &&
+			hasOffering(instanceType, requirements) &&
+			instanceType.MaxVolumes() >= volumesPerNode
 	})
 }
 
diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go
index 1382962a60a7..c332332fa02d 100644
--- a/pkg/controllers/provisioning/scheduling/scheduler.go
+++ b/pkg/controllers/provisioning/scheduling/scheduler.go
@@ -33,13 +33,14 @@ import (
 	"github.com/aws/karpenter/pkg/utils/resources"
 )
 
-func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler {
+func NewScheduler(ctx context.Context, nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alpha5.Provisioner, cluster *state.Cluster, topology *Topology, instanceTypes map[string][]cloudprovider.InstanceType, daemonOverhead map[*scheduling.NodeTemplate]v1.ResourceList, recorder events.Recorder) *Scheduler {
 	for provisioner := range instanceTypes {
 		sort.Slice(instanceTypes[provisioner], func(i, j int) bool {
 			return instanceTypes[provisioner][i].Price() < instanceTypes[provisioner][j].Price()
 		})
 	}
 	s := &Scheduler{
+		ctx:            ctx,
 		nodeTemplates:  nodeTemplates,
 		topology:       topology,
 		cluster:        cluster,
@@ -84,6 +85,7 @@ func NewScheduler(nodeTemplates []*scheduling.NodeTemplate, provisioners []v1alp
 }
 
 type Scheduler struct {
+	ctx           context.Context
 	nodes         []*Node
 	inflight      []*InFlightNode
 	nodeTemplates []*scheduling.NodeTemplate
@@ -196,7 +198,7 @@ func (s *Scheduler) add(pod *v1.Pod) error {
 			}
 		}
 
-		node := NewNode(nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes)
+		node := NewNode(s.ctx, nodeTemplate, s.topology, s.daemonOverhead[nodeTemplate], instanceTypes)
 		if err := node.Add(pod); err != nil {
 			errs = multierr.Append(errs, fmt.Errorf("incompatible with provisioner %q, %w", nodeTemplate.ProvisionerName, err))
 			continue
diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go
index 283ceb51a479..aee40d92f555 100644
--- a/pkg/controllers/provisioning/scheduling/suite_test.go
+++ b/pkg/controllers/provisioning/scheduling/suite_test.go
@@ -16,6 +16,7 @@ package scheduling_test
 
 import (
 	"context"
+	"fmt"
 	"math"
 	"math/rand"
 	"strings"
@@ -2958,11 +2959,11 @@ var _ = Describe("Instance Type Compatibility", func() {
 			ExpectApplied(ctx, env.Client, provisioner)
 			pods := ExpectProvisioned(ctx, env.Client, controller,
 				test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{
-					fake.LabelInstanceSize: "large",
+					fake.LabelInstanceSize:     "large",
 					v1.LabelInstanceTypeStable: cloudProv.InstanceTypes[0].Name(),
 				}}),
 				test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{
-					fake.LabelInstanceSize: "small",
+					fake.LabelInstanceSize:     "small",
 					v1.LabelInstanceTypeStable: cloudProv.InstanceTypes[4].Name(),
 				}}),
 			)
@@ -3848,6 +3849,80 @@ var _ = Describe("No Pre-Binding", func() {
 	})
 })
 
+var _ = Describe("Volume Limits", func() {
+	It("should launch multiple nodes if required due to volume limits", func() {
+		cloudProv.InstanceTypes = []cloudprovider.InstanceType{
+			fake.NewInstanceType(
+				fake.InstanceTypeOptions{
+					Name:         "instance-type",
+					VolumeLimits: aws.Int(10),
+					Resources: map[v1.ResourceName]resource.Quantity{
+						v1.ResourceCPU:  resource.MustParse("1024"),
+						v1.ResourcePods: resource.MustParse("1024"),
+					},
+				}),
+		}
+
+		var pods []*v1.Pod
+		for i := 0; i < 100; i++ {
+			pvcA := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
+				ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-a-%d", i)},
+			})
+			pvcB := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
+				ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("my-claim-b-%d", i)},
+			})
+			ExpectApplied(ctx, env.Client, pvcA, pvcB)
+			pods = append(pods, test.UnschedulablePod(test.PodOptions{
+				PersistentVolumeClaims: []string{pvcA.Name, pvcB.Name},
+			}))
+		}
+		provisioner.Spec.Limits = nil
+		ExpectApplied(ctx, env.Client, provisioner)
+		ExpectProvisioned(ctx, env.Client, controller, pods...)
+		var nodeList v1.NodeList
+		Expect(env.Client.List(ctx, &nodeList)).To(Succeed())
+		// 200 volumes / 10 volumes per node = 20 nodes
+		Expect(nodeList.Items).To(HaveLen(20))
+	})
+	It("should launch a single node if all pods use the same PVC", func() {
+		cloudProv.InstanceTypes = []cloudprovider.InstanceType{
+			fake.NewInstanceType(
+				fake.InstanceTypeOptions{
+					Name:         "instance-type",
+					VolumeLimits: aws.Int(10),
+					Resources: map[v1.ResourceName]resource.Quantity{
+						v1.ResourceCPU:  resource.MustParse("1024"),
+						v1.ResourcePods: resource.MustParse("1024"),
+					},
+				}),
+		}
+
+		pv := test.PersistentVolume(test.PersistentVolumeOptions{
+			ObjectMeta: metav1.ObjectMeta{Name: "my-volume"},
+			Zones:      []string{"test-zone-1"}})
+
+		pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
+			ObjectMeta: metav1.ObjectMeta{Name: "my-claim"},
+			VolumeName: pv.Name,
+		})
+		ExpectApplied(ctx, env.Client, pv, pvc)
+
+		var pods []*v1.Pod
+		for i := 0; i < 100; i++ {
+			pods = append(pods, test.UnschedulablePod(test.PodOptions{
+				PersistentVolumeClaims: []string{pvc.Name, pvc.Name},
+			}))
+		}
+		provisioner.Spec.Limits = nil
+		ExpectApplied(ctx, env.Client, provisioner)
+		ExpectProvisioned(ctx, env.Client, controller, pods...)
+		var nodeList v1.NodeList
+		Expect(env.Client.List(ctx, &nodeList)).To(Succeed())
+		// 100 of the same PVC should all be schedulable on the same node
+		Expect(nodeList.Items).To(HaveLen(1))
+	})
+})
+
 func MakePods(count int, options test.PodOptions) (pods []*v1.Pod) {
 	for i := 0; i < count; i++ {
 		pods = append(pods, test.UnschedulablePod(options))
diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go
index 1d276fd631bb..a105e27367ab 100644
--- a/pkg/controllers/state/cluster.go
+++ b/pkg/controllers/state/cluster.go
@@ -20,6 +20,8 @@ import (
 	"sort"
 	"sync"
 
+	"github.com/aws/karpenter/pkg/scheduling"
+
 	"github.com/aws/karpenter/pkg/cloudprovider"
 
 	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
@@ -76,7 +78,8 @@ type Node struct {
 	// included in the calculation for Available.
 	DaemonSetRequested v1.ResourceList
 	// HostPort usage of all pods that are bound to the node
-	HostPortUsage *HostPortUsage
+	HostPortUsage *scheduling.HostPortUsage
+	VolumeUsage   *scheduling.VolumeLimits
 
 	// Provisioner is the provisioner used to create the node.
 	Provisioner *v1alpha5.Provisioner
@@ -129,7 +132,8 @@ func (c *Cluster) ForEachNode(f func(n *Node) bool) {
 func (c *Cluster) newNode(node *v1.Node) *Node {
 	n := &Node{
 		Node:          node,
-		HostPortUsage: NewHostPortUsage(),
+		HostPortUsage: scheduling.NewHostPortUsage(c.ctx),
+		VolumeUsage:   scheduling.NewVolumeLimits(c.ctx),
 		podRequests:   map[types.NamespacedName]v1.ResourceList{},
 	}
 
@@ -167,9 +171,8 @@ func (c *Cluster) newNode(node *v1.Node) *Node {
 			daemonsetRequested = append(daemonsetRequested, requests)
 		}
 		requested = append(requested, requests)
-		if err := n.HostPortUsage.Add(pod); err != nil {
-			logging.FromContext(c.ctx).Errorf("inconsistent state, error tracking host port usage on node %s, %s", n.Node.Name, err)
-		}
+		n.HostPortUsage.Add(pod)
+		n.VolumeUsage.Add(pod)
 	}
 
 	n.DaemonSetRequested = resources.Merge(daemonsetRequested...)
@@ -253,6 +256,7 @@ func (c *Cluster) updateNodeUsageFromPodDeletion(podKey types.NamespacedName) {
 	n.Available = resources.Merge(n.Available, n.podRequests[podKey])
 	delete(n.podRequests, podKey)
 	n.HostPortUsage.DeletePod(podKey)
+	n.VolumeUsage.DeletePod(podKey)
 
 	// We can't easily track the changes to the DaemonsetRequested here as we no longer have the pod. We could keep up
 	// with this separately, but if a daemonset pod is being deleted, it usually means the node is going down. In the
@@ -331,9 +335,8 @@ func (c *Cluster) updateNodeUsageFromPod(pod *v1.Pod) {
 	if podutils.IsOwnedByDaemonSet(pod) {
 		n.DaemonSetRequested = resources.Merge(n.DaemonSetRequested, podRequests)
 	}
-	if err := n.HostPortUsage.Add(pod); err != nil {
-		logging.FromContext(c.ctx).Errorf("inconsistent state, error tracking host port usage on node %s, %s", n.Node.Name, err)
-	}
+	n.HostPortUsage.Add(pod)
+	n.VolumeUsage.Add(pod)
 	n.podRequests[podKey] = podRequests
 	c.bindings[podKey] = n.Node.Name
 }
diff --git a/pkg/controllers/state/hostportusage.go b/pkg/scheduling/hostportusage.go
similarity index 80%
rename from pkg/controllers/state/hostportusage.go
rename to pkg/scheduling/hostportusage.go
index 3f01c75af489..7caa1d2c6669 100644
--- a/pkg/controllers/state/hostportusage.go
+++ b/pkg/scheduling/hostportusage.go
@@ -12,14 +12,16 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package state
+package scheduling
 
 import (
+	"context"
 	"fmt"
 	"net"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"knative.dev/pkg/logging"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
@@ -28,6 +30,7 @@ import (
 // together.
 type HostPortUsage struct {
 	reserved []entry
+	ctx      context.Context
 }
 type entry struct {
 	podName types.NamespacedName
@@ -56,22 +59,36 @@ func (e entry) matches(rhs entry) bool {
 	return true
 }
 
-func NewHostPortUsage() *HostPortUsage {
-	return &HostPortUsage{}
+func NewHostPortUsage(ctx context.Context) *HostPortUsage {
+	return &HostPortUsage{ctx: ctx}
 }
 
 // Add adds a port to the HostPortUsage, returning an error in the case of a conflict
-func (u *HostPortUsage) Add(pod *v1.Pod) error {
+func (u *HostPortUsage) Add(pod *v1.Pod) {
+	newUsage, err := u.validate(pod)
+	if err != nil {
+		logging.FromContext(u.ctx).Errorf("inconsistent state registering host port usage, %s", err)
+	}
+	u.reserved = append(u.reserved, newUsage...)
+}
+
+// Validate performs host port conflict validation to allow for determining if we can schedule the pod to the node
+// before doing so.
+func (u *HostPortUsage) Validate(pod *v1.Pod) error {
+	_, err := u.validate(pod)
+	return err
+}
+
+func (u *HostPortUsage) validate(pod *v1.Pod) ([]entry, error) {
 	newUsage := getHostPorts(pod)
 	for _, newEntry := range newUsage {
 		for _, existing := range u.reserved {
 			if newEntry.matches(existing) {
-				return fmt.Errorf("%s conflicts with existing HostPort configuration %s", newEntry, existing)
+				return nil, fmt.Errorf("%s conflicts with existing HostPort configuration %s", newEntry, existing)
 			}
 		}
 	}
-	u.reserved = append(u.reserved, newUsage...)
-	return nil
+	return newUsage, nil
 }
 
 // DeletePod deletes all host port usage from the HostPortUsage that were created by the pod with the given name.
diff --git a/pkg/scheduling/volumelimits.go b/pkg/scheduling/volumelimits.go
new file mode 100644
index 000000000000..11faa878949b
--- /dev/null
+++ b/pkg/scheduling/volumelimits.go
@@ -0,0 +1,94 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduling
+
+import (
+	"context"
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"knative.dev/pkg/logging"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// VolumeLimits tracks volume limits on a per-node basis. The number of volumes that can be mounted varies by instance
+// type. We need to track the volumes mounted on each node so that we know which pods can still be scheduled to
+// which nodes.
+type VolumeLimits struct {
+	ctx        context.Context
+	volumes    sets.String
+	podVolumes map[types.NamespacedName]sets.String
+}
+
+func NewVolumeLimits(ctx context.Context) *VolumeLimits {
+	return &VolumeLimits{
+		ctx:        ctx,
+		volumes:    sets.NewString(),
+		podVolumes: map[types.NamespacedName]sets.String{},
+	}
+}
+
+func (v *VolumeLimits) Add(pod *v1.Pod) {
+	podVolumes, err := v.validate(pod)
+	if err != nil {
+		logging.FromContext(v.ctx).Errorf("inconsistent state registering volume limit, %s", err)
+	}
+	v.podVolumes[client.ObjectKeyFromObject(pod)] = podVolumes
+	v.volumes = v.volumes.Union(podVolumes)
+}
+
+func (v *VolumeLimits) Validate(pod *v1.Pod) (int, error) {
+	podVolumes, err := v.validate(pod)
+	result := v.volumes.Union(podVolumes)
+	return len(result), err
+}
+
+func (v *VolumeLimits) validate(pod *v1.Pod) (sets.String, error) {
+	podPVCs := sets.String{}
+	for _, volume := range pod.Spec.Volumes {
+		if volume.PersistentVolumeClaim != nil {
+			podPVCs.Insert(fmt.Sprintf("%s/%s", pod.Namespace, volume.PersistentVolumeClaim.ClaimName))
+		} else if volume.Ephemeral != nil {
+			// generated name per https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/#persistentvolumeclaim-naming
+			pvcName := fmt.Sprintf("%s-%s", pod.Name, volume.Name)
+			podPVCs.Insert(fmt.Sprintf("%s/%s", pod.Namespace, pvcName))
+		}
+	}
+
+	return podPVCs, nil
+}
+
+func (v *VolumeLimits) DeletePod(key types.NamespacedName) {
+	delete(v.podVolumes, key)
+	// volume names could be duplicated, so we re-create our volumes
+	v.volumes = sets.NewString()
+	for _, c := range v.podVolumes {
+		v.volumes.Insert(c.List()...)
+	}
+}
+
+func (v *VolumeLimits) Copy() *VolumeLimits {
+	cp := &VolumeLimits{
+		ctx:        v.ctx,
+		volumes:    sets.NewString(v.volumes.List()...),
+		podVolumes: map[types.NamespacedName]sets.String{},
+	}
+	for k, v := range v.podVolumes {
+		cp.podVolumes[k] = sets.NewString(v.List()...)
+	}
+	return cp
+}
diff --git a/pkg/utils/options/options.go b/pkg/utils/options/options.go
index f2e98d4ab948..8103fdc18022 100644
--- a/pkg/utils/options/options.go
+++ b/pkg/utils/options/options.go
@@ -45,6 +45,7 @@ func MustParse() Options {
 	flag.BoolVar(&opts.AWSENILimitedPodDensity, "aws-eni-limited-pod-density", env.WithDefaultBool("AWS_ENI_LIMITED_POD_DENSITY", true), "Indicates whether new nodes should use ENI-based pod density")
 	flag.StringVar(&opts.AWSDefaultInstanceProfile, "aws-default-instance-profile", env.WithDefaultString("AWS_DEFAULT_INSTANCE_PROFILE", ""), "The default instance profile to use when provisioning nodes in AWS")
 	flag.BoolVar(&opts.AWSEnablePodENI, "aws-enable-pod-eni", env.WithDefaultBool("AWS_ENABLE_POD_ENI", false), "If true then instances that support pod ENI will report a vpc.amazonaws.com/pod-eni resource")
+	flag.IntVar(&opts.AWSVolumeAttachLimit, "aws-volume-attach-limit", env.WithDefaultInt("AWS_VOLUME_ATTACH_LIMIT", -1), "If set to a value >=0, this is the maximum number of volumes attachable per node and should match the value passed to the EBS CSI driver.")
 	flag.Parse()
 	if err := opts.Validate(); err != nil {
 		panic(err)
@@ -66,6 +67,8 @@ type Options struct {
 	AWSENILimitedPodDensity   bool
 	AWSDefaultInstanceProfile string
 	AWSEnablePodENI           bool
+	// AWSVolumeAttachLimit is the maximum number of volumes attachable per node; it should match the volume-attach-limit setting of the EBS CSI driver
+	AWSVolumeAttachLimit int
 }
 
 func (o Options) Validate() (err error) {