From afc6759d3f9309412d3243233af11aa14f074373 Mon Sep 17 00:00:00 2001
From: Todd
Date: Tue, 15 Mar 2022 14:17:14 -0500
Subject: [PATCH] introduce theoretical node (#1515)

- fix issue related to invalid instance types
- start moving instance type compatibility from packer to scheduler
---
 pkg/cloudprovider/aws/fake/ec2api.go          |   6 +-
 pkg/cloudprovider/aws/suite_test.go           |  35 +++-
 .../provisioning/binpacking/packable.go       |  99 +---------
 .../provisioning/binpacking/packer.go         |   2 +-
 .../provisioning/binpacking/packer_test.go    |   4 +-
 pkg/controllers/provisioning/provisioner.go   |   8 +-
 .../provisioning/scheduling/node.go           | 173 ++++++++++++++++++
 .../provisioning/scheduling/scheduler.go      |  84 +++------
 .../provisioning/scheduling/suite_test.go     |  98 ++++++++++
 pkg/utils/resources/resources.go              |  13 --
 10 files changed, 343 insertions(+), 179 deletions(-)
 create mode 100644 pkg/controllers/provisioning/scheduling/node.go

diff --git a/pkg/cloudprovider/aws/fake/ec2api.go b/pkg/cloudprovider/aws/fake/ec2api.go
index 97179ad890ed..b247ea33036f 100644
--- a/pkg/cloudprovider/aws/fake/ec2api.go
+++ b/pkg/cloudprovider/aws/fake/ec2api.go
@@ -323,7 +323,7 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2
 				DefaultVCpus: aws.Int64(2),
 			},
 			MemoryInfo: &ec2.MemoryInfo{
-				SizeInMiB: aws.Int64(2 * 1024),
+				SizeInMiB: aws.Int64(4 * 1024),
 			},
 			NetworkInfo: &ec2.NetworkInfo{
 				MaximumNetworkInterfaces: aws.Int64(4),
@@ -450,6 +450,10 @@ func (e *EC2API) DescribeInstanceTypeOfferingsPagesWithContext(_ context.Context
 				InstanceType: aws.String("inf1.6xlarge"),
 				Location:     aws.String("test-zone-1a"),
 			},
+			{
+				InstanceType: aws.String("c6g.large"),
+				Location:     aws.String("test-zone-1a"),
+			},
 		},
 	}, false)
 	return nil
diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go
index a4d9e1686a12..c625ab3e93de 100644
--- a/pkg/cloudprovider/aws/suite_test.go
+++ b/pkg/cloudprovider/aws/suite_test.go
@@ -184,6 +184,7 @@ var _ = Describe("Allocation", func() {
 				Expect(supportsPodENI()).To(Equal(true))
 			}
 		})
+		// TODO(todd): this set of tests should move to scheduler once resource handling is made more generic
 		It("should launch instances for Nvidia GPU resource requests", func() {
 			nodeNames := sets.NewString()
 			for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
@@ -214,6 +215,37 @@ var _ = Describe("Allocation", func() {
 			}
 			Expect(nodeNames.Len()).To(Equal(2))
 		})
+		It("should launch pods with Nvidia and Neuron resources on different instances", func() {
+			nodeNames := sets.NewString()
+			for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
+				test.UnschedulablePod(test.PodOptions{
+					ResourceRequirements: v1.ResourceRequirements{
+						Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")},
+						Limits:   v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")},
+					},
+				}),
+				// Should pack onto a different instance since no type has both Nvidia and Neuron
+				test.UnschedulablePod(test.PodOptions{
+					ResourceRequirements: v1.ResourceRequirements{
+						Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")},
+						Limits:   v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")},
+					},
+				})) {
+				node := ExpectScheduled(ctx, env.Client, pod)
+				nodeNames.Insert(node.Name)
+			}
+			Expect(nodeNames.Len()).To(Equal(2))
+		})
+		It("should fail to schedule a pod with both Nvidia and Neuron resources requests", func() {
+			pods := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
+				test.UnschedulablePod(test.PodOptions{
+					ResourceRequirements: v1.ResourceRequirements{
+						Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1"), resources.AWSNeuron: resource.MustParse("1")},
+						Limits:   v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1"), resources.AWSNeuron: resource.MustParse("1")},
+					},
+				}))
+			ExpectNotScheduled(ctx, env.Client, pods[0])
+		})
 		It("should not schedule a non-GPU workload on a node w/GPU", func() {
 			Skip("enable after scheduling and binpacking are merged into the same process")
 			nodeNames := sets.NewString()
@@ -536,7 +568,8 @@ var _ = Describe("Allocation", func() {
 		})
 		Context("Subnets", func() {
 			It("should default to the cluster's subnets", func() {
-				pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
+				pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod(
+					test.PodOptions{NodeSelector: map[string]string{v1.LabelArchStable: v1alpha5.ArchitectureAmd64}}))[0]
 				ExpectScheduled(ctx, env.Client, pod)
 				Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
 				input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput)
diff --git a/pkg/controllers/provisioning/binpacking/packable.go b/pkg/controllers/provisioning/binpacking/packable.go
index d2b355acc145..83c9f6125d5d 100644
--- a/pkg/controllers/provisioning/binpacking/packable.go
+++ b/pkg/controllers/provisioning/binpacking/packable.go
@@ -16,15 +16,12 @@ package binpacking
 
 import (
 	"context"
-	"fmt"
 	"sort"
 
-	"go.uber.org/multierr"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"knative.dev/pkg/logging"
 
-	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
 	"github.com/aws/karpenter/pkg/cloudprovider"
 	"github.com/aws/karpenter/pkg/utils/resources"
 )
@@ -40,26 +37,12 @@ type Result struct {
 	unpacked []*v1.Pod
 }
 
-// PackablesFor creates viable packables for the provided constraints, excluding
-// those that can't fit resources or violate constraints.
-func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceType, constraints *v1alpha5.Constraints, pods []*v1.Pod, daemons []*v1.Pod) []*Packable {
-	packables := []*Packable{}
+// PackablesFor creates viable packables for the provided constraints, excluding those that can't fit the kubelet
+// or daemonsets. This assumes that instanceTypes has already been pre-filtered for pod compatibility.
+func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceType, daemons []*v1.Pod) []*Packable {
+	var packables []*Packable
 	for _, instanceType := range instanceTypes {
 		packable := PackableFor(instanceType)
-		// First pass at filtering down to viable instance types;
-		// additional filtering will be done by later steps (such as
-		// removing instance types that obviously lack resources, such
-		// as GPUs, for the workload being presented).
-		if err := multierr.Combine(
-			packable.validateOfferings(constraints),
-			packable.validateInstanceType(constraints),
-			packable.validateArchitecture(constraints),
-			packable.validateOperatingSystems(constraints),
-			packable.validateAWSPodENI(pods),
-			packable.validateGPUs(pods),
-		); err != nil {
-			continue
-		}
 		// Calculate Kubelet Overhead
 		if ok := packable.reserve(instanceType.Overhead()); !ok {
 			logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for kubelet and system overhead", packable.Name())
@@ -173,80 +156,6 @@ func (p *Packable) reservePod(pod *v1.Pod) bool {
 	return p.reserve(requests)
 }
 
-func (p *Packable) validateInstanceType(constraints *v1alpha5.Constraints) error {
-	if !constraints.Requirements.InstanceTypes().Has(p.Name()) {
-		return fmt.Errorf("instance type %s not in %s", p.Name(), constraints.Requirements.InstanceTypes())
-	}
-	return nil
-}
-
-func (p *Packable) validateArchitecture(constraints *v1alpha5.Constraints) error {
-	if !constraints.Requirements.Architectures().Has(p.Architecture()) {
-		return fmt.Errorf("architecture %s not in %s", p.Name(), constraints.Requirements.Architectures())
-	}
-	return nil
-}
-
-func (p *Packable) validateOperatingSystems(constraints *v1alpha5.Constraints) error {
-	if constraints.Requirements.OperatingSystems().Intersection(p.OperatingSystems()).Len() == 0 {
-		return fmt.Errorf("operating systems %s not in %s", p.Name(), constraints.Requirements.OperatingSystems())
-	}
-	return nil
-}
-
-func (p *Packable) validateOfferings(constraints *v1alpha5.Constraints) error {
-	for _, offering := range p.Offerings() {
-		if constraints.Requirements.CapacityTypes().Has(offering.CapacityType) && constraints.Requirements.Zones().Has(offering.Zone) {
-			return nil
-		}
-	}
-	return fmt.Errorf("offerings %v are not available for capacity types %s and zones %s", p.Offerings(), constraints.Requirements.CapacityTypes(), constraints.Requirements.Zones())
-}
-
-func (p *Packable) validateGPUs(pods []*v1.Pod) error {
-	gpuResources := map[v1.ResourceName]*resource.Quantity{
-		resources.NvidiaGPU: p.InstanceType.NvidiaGPUs(),
-		resources.AMDGPU:    p.InstanceType.AMDGPUs(),
-		resources.AWSNeuron: p.InstanceType.AWSNeurons(),
-	}
-	for resourceName, instanceTypeResourceQuantity := range gpuResources {
-		if p.requiresResource(pods, resourceName) && instanceTypeResourceQuantity.IsZero() {
-			return fmt.Errorf("%s is required", resourceName)
-		} else if !p.requiresResource(pods, resourceName) && !instanceTypeResourceQuantity.IsZero() {
-			return fmt.Errorf("%s is not required", resourceName)
-		}
-	}
-	return nil
-}
-
-func (p *Packable) requiresResource(pods []*v1.Pod, resource v1.ResourceName) bool {
-	for _, pod := range pods {
-		for _, container := range pod.Spec.Containers {
-			if _, ok := container.Resources.Requests[resource]; ok {
-				return true
-			}
-			if _, ok := container.Resources.Limits[resource]; ok {
-				return true
-			}
-		}
-	}
-	return false
-}
-
-func (p *Packable) validateAWSPodENI(pods []*v1.Pod) error {
-	for _, pod := range pods {
-		for _, container := range pod.Spec.Containers {
-			if _, ok := container.Resources.Requests[resources.AWSPodENI]; ok {
-				if p.InstanceType.AWSPodENI().IsZero() {
-					return fmt.Errorf("aws pod eni is required")
-				}
-				return nil
-			}
-		}
-	}
-	return nil
-}
-
 func packableNames(instanceTypes []*Packable) []string {
 	names := []string{}
 	for _, instanceType := range instanceTypes {
diff --git a/pkg/controllers/provisioning/binpacking/packer.go b/pkg/controllers/provisioning/binpacking/packer.go
index 25a171c4c0dd..c9967847fb68 100644
--- a/pkg/controllers/provisioning/binpacking/packer.go
+++ b/pkg/controllers/provisioning/binpacking/packer.go
@@ -105,7 +105,7 @@ func (p *Packer) Pack(ctx context.Context, constraints *v1alpha5.Constraints, po
 	var packings []*Packing
 	var packing *Packing
 	remainingPods := pods
-	emptyPackables := PackablesFor(ctx, instanceTypes, constraints, pods, daemons)
+	emptyPackables := PackablesFor(ctx, instanceTypes, daemons)
 	for len(remainingPods) > 0 {
 		packables := []*Packable{}
 		for _, packable := range emptyPackables {
diff --git a/pkg/controllers/provisioning/binpacking/packer_test.go b/pkg/controllers/provisioning/binpacking/packer_test.go
index 899791849ab2..2c3836f9eeea 100644
--- a/pkg/controllers/provisioning/binpacking/packer_test.go
+++ b/pkg/controllers/provisioning/binpacking/packer_test.go
@@ -52,7 +52,7 @@ func BenchmarkPacker(b *testing.B) {
 		},
 	})
 
-	schedule := &scheduling.Schedule{
+	node := &scheduling.Node{
 		Constraints: &v1alpha5.Constraints{
 			Requirements: v1alpha5.NewRequirements([]v1.NodeSelectorRequirement{
 				{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3"}},
@@ -67,7 +67,7 @@
 
 	// Pack benchmark
 	for i := 0; i < b.N; i++ {
-		if packings, err := packer.Pack(ctx, schedule.Constraints, pods, instanceTypes); err != nil || len(packings) == 0 {
+		if packings, err := packer.Pack(ctx, node.Constraints, pods, instanceTypes); err != nil || len(packings) == 0 {
 			b.FailNow()
 		}
 	}
diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go
index 13e7c22b4a4a..822a4a26e85d 100644
--- a/pkg/controllers/provisioning/provisioner.go
+++ b/pkg/controllers/provisioning/provisioner.go
@@ -105,19 +105,19 @@ func (p *Provisioner) provision(ctx context.Context) error {
 		return fmt.Errorf("getting instance types, %w", err)
 	}
 	// Separate pods by scheduling constraints
-	schedules, err := p.scheduler.Solve(ctx, p.Provisioner, instanceTypes, pods)
+	nodes, err := p.scheduler.Solve(ctx, p.Provisioner, instanceTypes, pods)
 	if err != nil {
 		return fmt.Errorf("solving scheduling constraints, %w", err)
 	}
 	// Launch capacity and bind pods
-	workqueue.ParallelizeUntil(ctx, len(schedules), len(schedules), func(i int) {
-		packings, err := p.packer.Pack(ctx, schedules[i].Constraints, schedules[i].Pods, instanceTypes)
+	workqueue.ParallelizeUntil(ctx, len(nodes), len(nodes), func(i int) {
+		packings, err := p.packer.Pack(ctx, nodes[i].Constraints, nodes[i].Pods, nodes[i].InstanceTypeOptions)
 		if err != nil {
 			logging.FromContext(ctx).Errorf("Could not pack pods, %s", err)
 			return
 		}
 		workqueue.ParallelizeUntil(ctx, len(packings), len(packings), func(j int) {
-			if err := p.launch(ctx, schedules[i].Constraints, packings[j]); err != nil {
+			if err := p.launch(ctx, nodes[i].Constraints, packings[j]); err != nil {
 				logging.FromContext(ctx).Errorf("Could not launch node, %s", err)
 				return
 			}
diff --git a/pkg/controllers/provisioning/scheduling/node.go b/pkg/controllers/provisioning/scheduling/node.go
new file mode 100644
index 000000000000..dd5fc6889055
--- /dev/null
+++ b/pkg/controllers/provisioning/scheduling/node.go
@@ -0,0 +1,173 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduling
+
+import (
+	"errors"
+
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
+	"github.com/aws/karpenter/pkg/cloudprovider"
+	"github.com/aws/karpenter/pkg/utils/resources"
+)
+
+// Node is a set of constraints, compatible pods, and possible instance types that could fulfill these constraints. This
+// will be turned into one or more actual node instances within the cluster after bin packing.
+type Node struct {
+	Constraints         *v1alpha5.Constraints
+	InstanceTypeOptions []cloudprovider.InstanceType
+	Pods                []*v1.Pod
+}
+
+func NewNode(constraints *v1alpha5.Constraints, instanceTypeOptions []cloudprovider.InstanceType, pods ...*v1.Pod) *Node {
+	n := &Node{Constraints: constraints.DeepCopy()}
+
+	instanceTypes := constraints.Requirements.InstanceTypes()
+	for _, it := range instanceTypeOptions {
+		// provisioner must list the instance type by name (defaults to all instance types)
+		if !instanceTypes.Has(it.Name()) {
+			continue
+		}
+		included := false
+		// and the instance type must have some valid offering combination per the provisioner constraints
+		for _, off := range it.Offerings() {
+			if constraints.Requirements.Zones().Has(off.Zone) && constraints.Requirements.CapacityTypes().Has(off.CapacityType) {
+				included = true
+				break
+			}
+		}
+		if included {
+			n.InstanceTypeOptions = append(n.InstanceTypeOptions, it)
+		}
+	}
+	// technically we might create some invalid construct here if the pod can't be supported by any instance type
+	// but this is ok as we just won't have any valid instance types, this pod won't be schedulable and nothing else
+	// will be compatible with this node
+	for _, p := range pods {
+		n.Add(p)
+	}
+	return n
+}
+
+func (n Node) Compatible(pod *v1.Pod) error {
+	podRequirements := v1alpha5.NewPodRequirements(pod)
+	if err := n.Constraints.Requirements.Compatible(podRequirements); err != nil {
+		return err
+	}
+
+	// Ensure that at least one instance type of the instance types that we are already narrowed down to based on the
+	// existing pods can support the pod
+	for _, it := range n.InstanceTypeOptions {
+		if n.isPodCompatible(pod, it) {
+			return nil
+		}
+	}
+
+	return errors.New("no matching instance type found")
+}
+
+func (n *Node) Add(pod *v1.Pod) {
+	n.Pods = append(n.Pods, pod)
+	n.Constraints = n.Constraints.Tighten(pod)
+	var instanceTypeOptions []cloudprovider.InstanceType
+	for _, it := range n.InstanceTypeOptions {
+		if n.isPodCompatible(pod, it) {
+			instanceTypeOptions = append(instanceTypeOptions, it)
+		}
+	}
+	n.InstanceTypeOptions = instanceTypeOptions
+}
+
+// hasCompatibleNodeSelector returns true if a given set of node selectors match the instance type
+//gocyclo:ignore
+func (n Node) hasCompatibleNodeSelector(nodeSelector map[string]string, it cloudprovider.InstanceType) bool {
+	if ns, ok := nodeSelector[v1.LabelInstanceTypeStable]; ok {
+		if ns != it.Name() {
+			return false
+		}
+	}
+
+	provReqs := n.Constraints.Requirements
+	hasOffering := func(pred func(off cloudprovider.Offering) bool) bool {
+		for _, off := range it.Offerings() {
+			// to be valid offering, the offering must be both supported by the pod and the provisioner
+			if pred(off) && provReqs.Zones().Has(off.Zone) && provReqs.CapacityTypes().Has(off.CapacityType) {
+				return true
+			}
+		}
+		return false
+	}
+
+	ct, hasCt := nodeSelector[v1alpha5.LabelCapacityType]
+	zone, hasZone := nodeSelector[v1.LabelTopologyZone]
+	if hasCt && hasZone {
+		if !hasOffering(func(off cloudprovider.Offering) bool { return off.CapacityType == ct && off.Zone == zone }) {
+			return false
+		}
+	} else if hasCt {
+		if !hasOffering(func(off cloudprovider.Offering) bool { return off.CapacityType == ct }) {
+			return false
+		}
+	} else if hasZone {
+		if !hasOffering(func(off cloudprovider.Offering) bool { return off.Zone == zone }) {
+			return false
+		}
+	}
+
+	if arch, ok := nodeSelector[v1.LabelArchStable]; ok {
+		if it.Architecture() != arch {
+			return false
+		}
+	}
+
+	if os, ok := nodeSelector[v1.LabelOSStable]; ok {
+		if !it.OperatingSystems().Has(os) {
+			return false
+		}
+	}
+	return true
+}
+
+func (n Node) isPodCompatible(pod *v1.Pod, it cloudprovider.InstanceType) bool {
+	return n.hasCompatibleResources(resources.RequestsForPods(pod), it) &&
+		n.hasCompatibleNodeSelector(pod.Spec.NodeSelector, it)
+}
+
+// hasCompatibleResources tests if a given node selector and resource request list is compatible with an instance type
+func (n Node) hasCompatibleResources(resourceList v1.ResourceList, it cloudprovider.InstanceType) bool {
+	for name, quantity := range resourceList {
+		switch name {
+		case resources.NvidiaGPU:
+			if it.NvidiaGPUs().Cmp(quantity) < 0 {
+				return false
+			}
+		case resources.AWSNeuron:
+			if it.AWSNeurons().Cmp(quantity) < 0 {
+				return false
+			}
+		case resources.AMDGPU:
+			if it.AMDGPUs().Cmp(quantity) < 0 {
+				return false
+			}
+		case resources.AWSPodENI:
+			if it.AWSPodENI().Cmp(quantity) < 0 {
+				return false
+			}
+		default:
+			continue
+		}
+	}
+	return true
+}
diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go
index 5c79f5b17e14..34a850830db6 100644
--- a/pkg/controllers/provisioning/scheduling/scheduler.go
+++ b/pkg/controllers/provisioning/scheduling/scheduler.go
@@ -45,87 +45,47 @@ func init() {
 }
 
 type Scheduler struct {
-	KubeClient client.Client
-	Topology   *Topology
-}
-
-type Schedule struct {
-	*v1alpha5.Constraints
-	// Pods is a set of pods that may schedule to the node; used for binpacking.
-	Pods []*v1.Pod
+	kubeClient client.Client
+	topology   *Topology
 }
 
 func NewScheduler(kubeClient client.Client) *Scheduler {
 	return &Scheduler{
-		KubeClient: kubeClient,
-		Topology:   &Topology{kubeClient: kubeClient},
+		kubeClient: kubeClient,
+		topology:   &Topology{kubeClient: kubeClient},
 	}
 }
 
-func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypes []cloudprovider.InstanceType, pods []*v1.Pod) ([]*Schedule, error) {
+func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypes []cloudprovider.InstanceType, pods []*v1.Pod) ([]*Node, error) {
 	defer metrics.Measure(schedulingDuration.WithLabelValues(injection.GetNamespacedName(ctx).Name))()
 	constraints := provisioner.Spec.Constraints.DeepCopy()
 	// Inject temporarily adds specific NodeSelectors to pods, which are then
 	// used by scheduling logic. This isn't strictly necessary, but is a useful
 	// trick to avoid passing topology decisions through the scheduling code. It
-	// lets us to treat TopologySpreadConstraints as just-in-time NodeSelectors.
-	if err := s.Topology.Inject(ctx, constraints, pods); err != nil {
+	// lets us treat TopologySpreadConstraints as just-in-time NodeSelectors.
+	if err := s.topology.Inject(ctx, constraints, pods); err != nil {
 		return nil, fmt.Errorf("injecting topology, %w", err)
 	}
-	// Separate pods into schedules of isomorphic scheduling constraints.
-	return s.getSchedules(constraints, instanceTypes, pods), nil
+	return s.schedule(constraints, instanceTypes, pods), nil
 }
 
-// getSchedules separates pods into a set of schedules. All pods in each group
-// contain isomorphic scheduling constraints and can be deployed together on the
-// same node, or multiple similar nodes if the pods exceed one node's capacity.
-func (s *Scheduler) getSchedules(constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, pods []*v1.Pod) []*Schedule {
-	schedules := []*Schedule{}
+// schedule separates pods into a list of TheoreticalNodes that contain the pods. All pods in each theoretical node
+// contain isomorphic scheduling constraints and can be deployed together on the same node, or multiple similar nodes if
+// the pods exceed one node's capacity.
+func (s *Scheduler) schedule(constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, pods []*v1.Pod) []*Node {
+	var nodes []*Node
 	for _, pod := range pods {
-		isCompatible := false
-		for _, schedule := range schedules {
-			if err := schedule.Requirements.Compatible(v1alpha5.NewPodRequirements(pod)); err == nil {
-				// Test if there is any instance type that can support the combined constraints
-				// TODO: Implement a virtual node approach solution that combine scheduling and node selection to solve this problem.
-				c := schedule.Tighten(pod)
-				for _, instanceType := range instanceTypes {
-					if support(instanceType, c) {
-						schedule.Constraints = c
-						schedule.Pods = append(schedule.Pods, pod)
-						isCompatible = true
-						break
-					}
-				}
+		isScheduled := false
+		for _, node := range nodes {
+			if err := node.Compatible(pod); err == nil {
+				node.Add(pod)
+				isScheduled = true
+				break
 			}
 		}
-		if !isCompatible {
-			schedules = append(schedules, &Schedule{Constraints: constraints.Tighten(pod), Pods: []*v1.Pod{pod}})
-		}
-	}
-	return schedules
-}
-
-func support(instanceType cloudprovider.InstanceType, constraints *v1alpha5.Constraints) bool {
-	req := []v1.NodeSelectorRequirement{}
-	for key := range constraints.Requirements.Keys() {
-		req = append(req, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpExists})
-	}
-	for _, offering := range instanceType.Offerings() {
-		supported := map[string][]string{}
-		supported[v1.LabelTopologyZone] = []string{offering.Zone}
-		supported[v1alpha5.LabelCapacityType] = []string{offering.CapacityType}
-		supported[v1.LabelInstanceTypeStable] = []string{instanceType.Name()}
-		supported[v1.LabelArchStable] = []string{instanceType.Architecture()}
-		supported[v1.LabelOSStable] = instanceType.OperatingSystems().UnsortedList()
-		r := make([]v1.NodeSelectorRequirement, len(req))
-		copy(r, req)
-		for key, values := range supported {
-			r = append(r, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: values})
-		}
-		requirements := v1alpha5.NewRequirements(r...)
-		if err := requirements.Compatible(constraints.Requirements); err == nil {
-			return true
+		if !isScheduled {
+			nodes = append(nodes, NewNode(constraints, instanceTypes, pod))
 		}
 	}
-	return false
+	return nodes
 }
diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go
index d22af89ad285..676f594a37b5 100644
--- a/pkg/controllers/provisioning/scheduling/suite_test.go
+++ b/pkg/controllers/provisioning/scheduling/suite_test.go
@@ -16,6 +16,7 @@ package scheduling_test
 
 import (
 	"context"
+	"k8s.io/apimachinery/pkg/api/resource"
 	"strings"
 	"testing"
 	"time"
@@ -1552,6 +1553,103 @@ var _ = Describe("Taints", func() {
 	})
 })
 
+var _ = Describe("Instance Type Compatibility", func() {
+	It("should not schedule if requesting more resources than any instance type has", func() {
+		pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
+			test.UnschedulablePod(test.PodOptions{
+				ResourceRequirements: v1.ResourceRequirements{
+					Requests: map[v1.ResourceName]resource.Quantity{
+						v1.ResourceCPU: resource.MustParse("512"),
+					}},
+			}))
+		ExpectNotScheduled(ctx, env.Client, pod[0])
+	})
+	It("should launch pods with different archs on different instances", func() {
+		provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{
+			{
+				Key:      v1.LabelArchStable,
+				Operator: v1.NodeSelectorOpIn,
+				Values:   []string{v1alpha5.ArchitectureArm64, v1alpha5.ArchitectureAmd64},
+			},
+		}
+		nodeNames := sets.NewString()
+		for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
+			test.UnschedulablePod(test.PodOptions{
+				NodeSelector: map[string]string{v1.LabelArchStable: v1alpha5.ArchitectureAmd64},
+			}),
+			test.UnschedulablePod(test.PodOptions{
+				NodeSelector: map[string]string{v1.LabelArchStable: v1alpha5.ArchitectureArm64},
+			})) {
+			node := ExpectScheduled(ctx, env.Client, pod)
+			nodeNames.Insert(node.Name)
+		}
+		Expect(nodeNames.Len()).To(Equal(2))
+	})
+	It("should launch pods with different operating systems on different instances", func() {
+		provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{
+			{
+				Key:      v1.LabelArchStable,
+				Operator: v1.NodeSelectorOpIn,
+				Values:   []string{v1alpha5.ArchitectureArm64, v1alpha5.ArchitectureAmd64},
+			},
+		}
+		nodeNames := sets.NewString()
+		for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
+			test.UnschedulablePod(test.PodOptions{
+				NodeSelector: map[string]string{v1.LabelOSStable: "linux"},
+			}),
+			test.UnschedulablePod(test.PodOptions{
+				NodeSelector: map[string]string{v1.LabelOSStable: "windows"},
+			})) {
+			node := ExpectScheduled(ctx, env.Client, pod)
+			nodeNames.Insert(node.Name)
+		}
+		Expect(nodeNames.Len()).To(Equal(2))
+	})
+	It("should launch pods with different instance type node selectors on different instances", func() {
+		provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{
+			{
+				Key:      v1.LabelArchStable,
+				Operator: v1.NodeSelectorOpIn,
+				Values:   []string{v1alpha5.ArchitectureArm64, v1alpha5.ArchitectureAmd64},
+			},
+		}
+		nodeNames := sets.NewString()
+		for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
+			test.UnschedulablePod(test.PodOptions{
+				NodeSelector: map[string]string{v1.LabelInstanceType: "small-instance-type"},
+			}),
+			test.UnschedulablePod(test.PodOptions{
+				NodeSelector: map[string]string{v1.LabelInstanceTypeStable: "default-instance-type"},
+			})) {
+			node := ExpectScheduled(ctx, env.Client, pod)
+			nodeNames.Insert(node.Name)
+		}
+		Expect(nodeNames.Len()).To(Equal(2))
+	})
+	It("should launch pods with different zone selectors on different instances", func() {
+		provisioner.Spec.Requirements.Requirements = []v1.NodeSelectorRequirement{
+			{
+				Key:      v1.LabelArchStable,
+				Operator: v1.NodeSelectorOpIn,
+				Values:   []string{v1alpha5.ArchitectureArm64, v1alpha5.ArchitectureAmd64},
+			},
+		}
+		nodeNames := sets.NewString()
+		for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
+			test.UnschedulablePod(test.PodOptions{
+				NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1"},
+			}),
+			test.UnschedulablePod(test.PodOptions{
+				NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-2"},
+			})) {
+			node := ExpectScheduled(ctx, env.Client, pod)
+			nodeNames.Insert(node.Name)
+		}
+		Expect(nodeNames.Len()).To(Equal(2))
+	})
+})
+
 var _ = Describe("Networking constraints", func() {
 	Context("HostPort", func() {
 		It("shouldn't co-locate pods that use the same HostPort and protocol", func() {
diff --git a/pkg/utils/resources/resources.go b/pkg/utils/resources/resources.go
index 67324ad8e619..d0f7d1297ebb 100644
--- a/pkg/utils/resources/resources.go
+++ b/pkg/utils/resources/resources.go
@@ -48,19 +48,6 @@ func LimitsForPods(pods ...*v1.Pod) v1.ResourceList {
 	return Merge(resources...)
 }
 
-// GPULimitsFor returns a resource list of GPU limits from a pod
-// GPUs must be specified in the Limits section of the pod resources per
-// https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/
-func GPULimitsFor(pod *v1.Pod) v1.ResourceList {
-	resources := v1.ResourceList{}
-	for key, value := range LimitsForPods(pod) {
-		if key == AMDGPU || key == AWSNeuron || key == NvidiaGPU {
-			resources[key] = value
-		}
-	}
-	return resources
-}
-
 // Merge the resources from the variadic into a single v1.ResourceList
 func Merge(resources ...v1.ResourceList) v1.ResourceList {
 	result := v1.ResourceList{}