introduce theoretical node (#1515)
- fix issue related to invalid instance types
- start moving instance type compatibility from packer to scheduler
tzneal authored Mar 15, 2022
1 parent 8c17f26 commit afc6759
Showing 10 changed files with 343 additions and 179 deletions.
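In short, this commit introduces a theoretical node produced by the scheduler and starts moving pod/instance-type compatibility checks (instance type, architecture, operating system, zone and capacity-type offerings, and accelerator resources such as NVIDIA GPUs and AWS Neuron devices) out of the bin-packer's Packable filtering and into the scheduler, which hands each node a pre-filtered list of instance types (visible below as nodes[i].InstanceTypeOptions in provisioner.go). The sketch below illustrates the kind of resource-compatibility check being relocated; InstanceTypeInfo, Compatible, and the literal resource-name strings are illustrative stand-ins, not Karpenter's actual API.

```go
// Hypothetical sketch, not Karpenter code: an instance type is compatible with a pod
// only if it offers every extended resource the pod requests.
package main

import "fmt"

type InstanceTypeInfo struct {
	Name      string
	Resources map[string]int64 // extended resources offered, e.g. an accelerator count
}

// Compatible reports whether the instance type offers every requested extended resource.
// A pod asking for both an NVIDIA GPU and an AWS Neuron device matches no type,
// which is what the new "should fail to schedule" test below expects.
func Compatible(it InstanceTypeInfo, podRequests map[string]int64) bool {
	for name, qty := range podRequests {
		if qty > 0 && it.Resources[name] == 0 {
			return false
		}
	}
	return true
}

func main() {
	p3 := InstanceTypeInfo{Name: "p3.8xlarge", Resources: map[string]int64{"nvidia.com/gpu": 4}}
	inf1 := InstanceTypeInfo{Name: "inf1.6xlarge", Resources: map[string]int64{"aws.amazon.com/neuron": 4}}

	gpuPod := map[string]int64{"nvidia.com/gpu": 1}
	bothPod := map[string]int64{"nvidia.com/gpu": 1, "aws.amazon.com/neuron": 1}

	fmt.Println(Compatible(p3, gpuPod))   // true
	fmt.Println(Compatible(inf1, gpuPod)) // false
	fmt.Println(Compatible(p3, bothPod))  // false: no single type offers both accelerators
}
```

With a filter like this applied per theoretical node, the bin-packer can assume every instance type it receives is already viable for the pods being packed.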
6 changes: 5 additions & 1 deletion pkg/cloudprovider/aws/fake/ec2api.go
@@ -323,7 +323,7 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2
DefaultVCpus: aws.Int64(2),
},
MemoryInfo: &ec2.MemoryInfo{
- SizeInMiB: aws.Int64(2 * 1024),
+ SizeInMiB: aws.Int64(4 * 1024),
},
NetworkInfo: &ec2.NetworkInfo{
MaximumNetworkInterfaces: aws.Int64(4),
@@ -450,6 +450,10 @@ func (e *EC2API) DescribeInstanceTypeOfferingsPagesWithContext(_ context.Context
InstanceType: aws.String("inf1.6xlarge"),
Location: aws.String("test-zone-1a"),
},
+ {
+ InstanceType: aws.String("c6g.large"),
+ Location: aws.String("test-zone-1a"),
+ },
},
}, false)
return nil
35 changes: 34 additions & 1 deletion pkg/cloudprovider/aws/suite_test.go
@@ -184,6 +184,7 @@ var _ = Describe("Allocation", func() {
Expect(supportsPodENI()).To(Equal(true))
}
})
+ // TODO(todd): this set of tests should move to scheduler once resource handling is made more generic
It("should launch instances for Nvidia GPU resource requests", func() {
nodeNames := sets.NewString()
for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
@@ -214,6 +215,37 @@ var _ = Describe("Allocation", func() {
}
Expect(nodeNames.Len()).To(Equal(2))
})
It("should launch pods with Nvidia and Neuron resources on different instances", func() {
nodeNames := sets.NewString()
for _, pod := range ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")},
Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")},
},
}),
// Should pack onto a different instance since no type has both Nvidia and Neuron
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")},
Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")},
},
})) {
node := ExpectScheduled(ctx, env.Client, pod)
nodeNames.Insert(node.Name)
}
Expect(nodeNames.Len()).To(Equal(2))
})
It("should fail to schedule a pod with both Nvidia and Neuron resources requests", func() {
pods := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1"), resources.AWSNeuron: resource.MustParse("1")},
Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1"), resources.AWSNeuron: resource.MustParse("1")},
},
}))
ExpectNotScheduled(ctx, env.Client, pods[0])
})
It("should not schedule a non-GPU workload on a node w/GPU", func() {
Skip("enable after scheduling and binpacking are merged into the same process")
nodeNames := sets.NewString()
@@ -536,7 +568,8 @@ var _ = Describe("Allocation", func() {
})
Context("Subnets", func() {
It("should default to the cluster's subnets", func() {
- pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod())[0]
+ pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, ProvisionerWithProvider(provisioner, provider), test.UnschedulablePod(
+ test.PodOptions{NodeSelector: map[string]string{v1.LabelArchStable: v1alpha5.ArchitectureAmd64}}))[0]
ExpectScheduled(ctx, env.Client, pod)
Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1))
input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput)
99 changes: 4 additions & 95 deletions pkg/controllers/provisioning/binpacking/packable.go
@@ -16,15 +16,12 @@ package binpacking

import (
"context"
"fmt"
"sort"

"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"knative.dev/pkg/logging"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/utils/resources"
)
@@ -40,26 +37,12 @@ type Result struct {
unpacked []*v1.Pod
}

- // PackablesFor creates viable packables for the provided constraints, excluding
- // those that can't fit resources or violate constraints.
- func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceType, constraints *v1alpha5.Constraints, pods []*v1.Pod, daemons []*v1.Pod) []*Packable {
- packables := []*Packable{}
+ // PackablesFor creates viable packables for the provided constraints, excluding those that can't fit the kubelet
+ // or daemonsets. This assumes that instanceTypes has already been pre-filtered for pod compatibility.
+ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceType, daemons []*v1.Pod) []*Packable {
+ var packables []*Packable
for _, instanceType := range instanceTypes {
packable := PackableFor(instanceType)
- // First pass at filtering down to viable instance types;
- // additional filtering will be done by later steps (such as
- // removing instance types that obviously lack resources, such
- // as GPUs, for the workload being presented).
- if err := multierr.Combine(
- packable.validateOfferings(constraints),
- packable.validateInstanceType(constraints),
- packable.validateArchitecture(constraints),
- packable.validateOperatingSystems(constraints),
- packable.validateAWSPodENI(pods),
- packable.validateGPUs(pods),
- ); err != nil {
- continue
- }
// Calculate Kubelet Overhead
if ok := packable.reserve(instanceType.Overhead()); !ok {
logging.FromContext(ctx).Debugf("Excluding instance type %s because there are not enough resources for kubelet and system overhead", packable.Name())
@@ -173,80 +156,6 @@ func (p *Packable) reservePod(pod *v1.Pod) bool {
return p.reserve(requests)
}

func (p *Packable) validateInstanceType(constraints *v1alpha5.Constraints) error {
if !constraints.Requirements.InstanceTypes().Has(p.Name()) {
return fmt.Errorf("instance type %s not in %s", p.Name(), constraints.Requirements.InstanceTypes())
}
return nil
}

func (p *Packable) validateArchitecture(constraints *v1alpha5.Constraints) error {
if !constraints.Requirements.Architectures().Has(p.Architecture()) {
return fmt.Errorf("architecture %s not in %s", p.Name(), constraints.Requirements.Architectures())
}
return nil
}

func (p *Packable) validateOperatingSystems(constraints *v1alpha5.Constraints) error {
if constraints.Requirements.OperatingSystems().Intersection(p.OperatingSystems()).Len() == 0 {
return fmt.Errorf("operating systems %s not in %s", p.Name(), constraints.Requirements.OperatingSystems())
}
return nil
}

func (p *Packable) validateOfferings(constraints *v1alpha5.Constraints) error {
for _, offering := range p.Offerings() {
if constraints.Requirements.CapacityTypes().Has(offering.CapacityType) && constraints.Requirements.Zones().Has(offering.Zone) {
return nil
}
}
return fmt.Errorf("offerings %v are not available for capacity types %s and zones %s", p.Offerings(), constraints.Requirements.CapacityTypes(), constraints.Requirements.Zones())
}

func (p *Packable) validateGPUs(pods []*v1.Pod) error {
gpuResources := map[v1.ResourceName]*resource.Quantity{
resources.NvidiaGPU: p.InstanceType.NvidiaGPUs(),
resources.AMDGPU: p.InstanceType.AMDGPUs(),
resources.AWSNeuron: p.InstanceType.AWSNeurons(),
}
for resourceName, instanceTypeResourceQuantity := range gpuResources {
if p.requiresResource(pods, resourceName) && instanceTypeResourceQuantity.IsZero() {
return fmt.Errorf("%s is required", resourceName)
} else if !p.requiresResource(pods, resourceName) && !instanceTypeResourceQuantity.IsZero() {
return fmt.Errorf("%s is not required", resourceName)
}
}
return nil
}

func (p *Packable) requiresResource(pods []*v1.Pod, resource v1.ResourceName) bool {
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
if _, ok := container.Resources.Requests[resource]; ok {
return true
}
if _, ok := container.Resources.Limits[resource]; ok {
return true
}
}
}
return false
}

func (p *Packable) validateAWSPodENI(pods []*v1.Pod) error {
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
if _, ok := container.Resources.Requests[resources.AWSPodENI]; ok {
if p.InstanceType.AWSPodENI().IsZero() {
return fmt.Errorf("aws pod eni is required")
}
return nil
}
}
}
return nil
}

func packableNames(instanceTypes []*Packable) []string {
names := []string{}
for _, instanceType := range instanceTypes {
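With the constraint and resource validation removed above, PackablesFor is left doing capacity bookkeeping only: it reserves kubelet/system overhead and daemonset requests against each instance type and excludes the types that cannot hold them (logging the exclusion, as the surviving Debugf call shows). A minimal sketch of that remaining behavior, with simplified integer quantities and an invented packablesFor helper rather than the real cloudprovider.InstanceType plumbing:

```go
// Illustrative only: reserve fixed overheads on each candidate instance type and
// keep just the types with room left over for pods.
package main

import "fmt"

type Packable struct {
	Name     string
	Capacity map[string]int64 // e.g. CPU millicores, memory MiB
	reserved map[string]int64
}

// reserve commits the request only if every resource fits; otherwise it changes nothing.
func (p *Packable) reserve(requests map[string]int64) bool {
	if p.reserved == nil {
		p.reserved = map[string]int64{}
	}
	for name, qty := range requests {
		if p.reserved[name]+qty > p.Capacity[name] {
			return false
		}
	}
	for name, qty := range requests {
		p.reserved[name] += qty
	}
	return true
}

// packablesFor mirrors the new PackablesFor shape: no constraint checks, just overhead math.
func packablesFor(instanceTypes []*Packable, kubeletOverhead, daemonRequests map[string]int64) []*Packable {
	var packables []*Packable
	for _, p := range instanceTypes {
		if !p.reserve(kubeletOverhead) || !p.reserve(daemonRequests) {
			fmt.Printf("excluding %s: not enough room for overhead or daemons\n", p.Name)
			continue
		}
		packables = append(packables, p)
	}
	return packables
}

func main() {
	types := []*Packable{
		{Name: "t3.micro", Capacity: map[string]int64{"cpu": 1000, "memory": 1024}},
		{Name: "m5.large", Capacity: map[string]int64{"cpu": 2000, "memory": 8192}},
	}
	overhead := map[string]int64{"cpu": 255, "memory": 1000}
	daemons := map[string]int64{"cpu": 200, "memory": 512}
	for _, p := range packablesFor(types, overhead, daemons) {
		fmt.Println("viable:", p.Name) // only m5.large survives this example
	}
}
```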
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/binpacking/packer.go
@@ -105,7 +105,7 @@ func (p *Packer) Pack(ctx context.Context, constraints *v1alpha5.Constraints, po
var packings []*Packing
var packing *Packing
remainingPods := pods
- emptyPackables := PackablesFor(ctx, instanceTypes, constraints, pods, daemons)
+ emptyPackables := PackablesFor(ctx, instanceTypes, daemons)
for len(remainingPods) > 0 {
packables := []*Packable{}
for _, packable := range emptyPackables {
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/binpacking/packer_test.go
@@ -52,7 +52,7 @@ func BenchmarkPacker(b *testing.B) {
},
})

- schedule := &scheduling.Schedule{
+ node := &scheduling.Node{
Constraints: &v1alpha5.Constraints{
Requirements: v1alpha5.NewRequirements([]v1.NodeSelectorRequirement{
{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3"}},
@@ -67,7 +67,7 @@

// Pack benchmark
for i := 0; i < b.N; i++ {
- if packings, err := packer.Pack(ctx, schedule.Constraints, pods, instanceTypes); err != nil || len(packings) == 0 {
+ if packings, err := packer.Pack(ctx, node.Constraints, pods, instanceTypes); err != nil || len(packings) == 0 {
b.FailNow()
}
}
8 changes: 4 additions & 4 deletions pkg/controllers/provisioning/provisioner.go
@@ -105,19 +105,19 @@ func (p *Provisioner) provision(ctx context.Context) error {
return fmt.Errorf("getting instance types, %w", err)
}
// Separate pods by scheduling constraints
- schedules, err := p.scheduler.Solve(ctx, p.Provisioner, instanceTypes, pods)
+ nodes, err := p.scheduler.Solve(ctx, p.Provisioner, instanceTypes, pods)
if err != nil {
return fmt.Errorf("solving scheduling constraints, %w", err)
}
// Launch capacity and bind pods
- workqueue.ParallelizeUntil(ctx, len(schedules), len(schedules), func(i int) {
- packings, err := p.packer.Pack(ctx, schedules[i].Constraints, schedules[i].Pods, instanceTypes)
+ workqueue.ParallelizeUntil(ctx, len(nodes), len(nodes), func(i int) {
+ packings, err := p.packer.Pack(ctx, nodes[i].Constraints, nodes[i].Pods, nodes[i].InstanceTypeOptions)
if err != nil {
logging.FromContext(ctx).Errorf("Could not pack pods, %s", err)
return
}
workqueue.ParallelizeUntil(ctx, len(packings), len(packings), func(j int) {
- if err := p.launch(ctx, schedules[i].Constraints, packings[j]); err != nil {
+ if err := p.launch(ctx, nodes[i].Constraints, packings[j]); err != nil {
logging.FromContext(ctx).Errorf("Could not launch node, %s", err)
return
}
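The provisioner loop above now fans out over the scheduler's theoretical nodes instead of schedules, and each node carries its own pre-filtered InstanceTypeOptions into Pack before launch. A hedged sketch of that flow using plain goroutines in place of workqueue.ParallelizeUntil; the Node fields shown are an approximation of scheduling.Node, not its exact definition:

```go
// Rough sketch of the fan-out: pack and launch each theoretical node concurrently,
// using only the instance types the scheduler already deemed compatible.
package main

import (
	"fmt"
	"sync"
)

type Node struct {
	Pods                []string // pods assigned to this theoretical node
	InstanceTypeOptions []string // instance types pre-filtered by the scheduler
}

func provision(nodes []Node) {
	var wg sync.WaitGroup
	for _, n := range nodes {
		wg.Add(1)
		go func(n Node) {
			defer wg.Done()
			// the real code bin-packs n.Pods onto n.InstanceTypeOptions, then launches capacity
			fmt.Printf("packing %d pod(s) onto one of %v\n", len(n.Pods), n.InstanceTypeOptions)
		}(n)
	}
	wg.Wait()
}

func main() {
	provision([]Node{
		{Pods: []string{"gpu-workload"}, InstanceTypeOptions: []string{"p3.8xlarge"}},
		{Pods: []string{"neuron-workload"}, InstanceTypeOptions: []string{"inf1.6xlarge"}},
	})
}
```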
