Skip to content

Commit

Permalink
dynamically retrieve instance types (#287)
Browse files Browse the repository at this point in the history
* dynamically retrieve instance types

* support arm, remove instance type fallbacks, and fail on errors for dynamic instance type retrieval

* in-lined many things in packing

* change slice make call to curly brace init syntax

* update info log when kubelet overhead fails to reserve

* fix condition on architecture

* fix error message wording

* 0 out pods reserve and change err return to nil

* remove reserve resources when getting node capacities

* change provisioningv1alpha1 to just provisioning

* inline filters append
  • Loading branch information
bwagner5 authored Mar 16, 2021
1 parent cd774f5 commit 5d55935
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 184 deletions.
2 changes: 2 additions & 0 deletions docs/aws/karpenter.cloudformation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ Resources:
- "ec2:DescribeInstances"
- "ec2:DescribeSecurityGroups"
- "ec2:DescribeSubnets"
- "ec2:DescribeInstanceTypes"
- "ec2:DescribeInstanceTypeOfferings"
- "iam:GetInstanceProfile"
- "ssm:GetParameter"
KarpenterNodeInstanceProfile:
Expand Down
90 changes: 0 additions & 90 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/cloudprovider/aws/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/iam"
"github.com/aws/aws-sdk-go/service/ssm"
provisioningv1alpha1 "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
provisioning "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/cloudprovider/aws/packing"
"github.com/awslabs/karpenter/pkg/utils/log"
Expand Down Expand Up @@ -82,7 +82,7 @@ func NewFactory(options cloudprovider.Options) *Factory {
}
}

func (f *Factory) CapacityFor(spec *provisioningv1alpha1.ProvisionerSpec) cloudprovider.Capacity {
func (f *Factory) CapacityFor(spec *provisioning.ProvisionerSpec) cloudprovider.Capacity {
return &Capacity{
spec: spec,
nodeFactory: f.nodeFactory,
Expand Down
91 changes: 91 additions & 0 deletions pkg/cloudprovider/aws/fake/ec2api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type EC2API struct {
DescribeLaunchTemplatesOutput *ec2.DescribeLaunchTemplatesOutput
DescribeSubnetsOutput *ec2.DescribeSubnetsOutput
DescribeSecurityGroupsOutput *ec2.DescribeSecurityGroupsOutput
DescribeInstanceTypesOutput *ec2.DescribeInstanceTypesOutput
WantErr error

CalledWithCreateFleetInput []ec2.CreateFleetInput
Expand Down Expand Up @@ -97,3 +98,93 @@ func (a *EC2API) DescribeSecurityGroupsWithContext(context.Context, *ec2.Describ
}
return &ec2.DescribeSecurityGroupsOutput{SecurityGroups: []*ec2.SecurityGroup{{GroupId: aws.String("test-group")}}}, nil
}

// DescribeInstanceTypesPagesWithContext fakes the paginated EC2
// DescribeInstanceTypes call.
//
// It returns WantErr when that is set. Otherwise it invokes fn exactly once,
// with DescribeInstanceTypesOutput when that is set or with a default catalog
// of m5 instance types. Because only one page is ever delivered, it is
// reported to fn as the last page. ctx, input and opts are ignored, so any
// filters on the input are not applied, and fn's return value is not consulted.
func (a *EC2API) DescribeInstanceTypesPagesWithContext(ctx context.Context, input *ec2.DescribeInstanceTypesInput, fn func(*ec2.DescribeInstanceTypesOutput, bool) bool, opts ...request.Option) error {
	if a.WantErr != nil {
		return a.WantErr
	}
	if a.DescribeInstanceTypesOutput != nil {
		fn(a.DescribeInstanceTypesOutput, true)
		return nil
	}
	// m5 builds an on-demand, hvm instance type description. Memory is given
	// in GiB and converted to MiB, the unit of MemoryInfo.SizeInMiB.
	m5 := func(name string, vcpus, memoryGiB, enis, ipv4PerENI int64) *ec2.InstanceTypeInfo {
		return &ec2.InstanceTypeInfo{
			InstanceType:                 aws.String(name),
			SupportedUsageClasses:        []*string{aws.String("on-demand")},
			SupportedVirtualizationTypes: []*string{aws.String("hvm")},
			VCpuInfo: &ec2.VCpuInfo{
				DefaultVCpus: aws.Int64(vcpus),
			},
			MemoryInfo: &ec2.MemoryInfo{
				SizeInMiB: aws.Int64(memoryGiB * 1024),
			},
			NetworkInfo: &ec2.NetworkInfo{
				MaximumNetworkInterfaces:  aws.Int64(enis),
				Ipv4AddressesPerInterface: aws.Int64(ipv4PerENI),
			},
		}
	}
	// With enis*(ipv4PerENI-1)+2 these yield 29, 58, 58, 234 and 234 pods.
	fn(&ec2.DescribeInstanceTypesOutput{
		InstanceTypes: []*ec2.InstanceTypeInfo{
			m5("m5.large", 2, 8, 3, 10),
			m5("m5.xlarge", 4, 16, 4, 15),
			m5("m5.2xlarge", 8, 32, 4, 15),
			m5("m5.4xlarge", 16, 64, 8, 30),
			m5("m5.8xlarge", 32, 128, 8, 30),
		},
	}, true)
	return nil
}
85 changes: 18 additions & 67 deletions pkg/cloudprovider/aws/packing/nodecapacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,84 +15,35 @@ limitations under the License.
package packing

import (
"fmt"

"github.com/aws/aws-sdk-go/service/ec2"
resourcesUtil "github.com/awslabs/karpenter/pkg/utils/resources"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

// nodeCapacities is a hard-coded table of m5 instance types. Each entry lists
// the instance type name, its total CPU, memory and pod capacity, and
// zero-valued CPU and memory reservations.
// TODO get this information from node-instance-selector
var (
nodeCapacities = []*nodeCapacity{

{
instanceType: "m5.24xlarge",
total: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("96000m"),
v1.ResourceMemory: resource.MustParse("384Gi"),
v1.ResourcePods: resource.MustParse("737"),
},
reserved: v1.ResourceList{
v1.ResourceCPU: resource.Quantity{},
v1.ResourceMemory: resource.Quantity{},
},
},
{
instanceType: "m5.8xlarge",
total: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("32000m"),
v1.ResourceMemory: resource.MustParse("128Gi"),
v1.ResourcePods: resource.MustParse("234"),
},
reserved: v1.ResourceList{
v1.ResourceCPU: resource.Quantity{},
v1.ResourceMemory: resource.Quantity{},
},
},
{
instanceType: "m5.2xlarge",
total: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("8000m"),
v1.ResourceMemory: resource.MustParse("32Gi"),
v1.ResourcePods: resource.MustParse("58"),
},
reserved: v1.ResourceList{
v1.ResourceCPU: resource.Quantity{},
v1.ResourceMemory: resource.Quantity{},
},
},
{
instanceType: "m5.xlarge",
total: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4000m"),
v1.ResourceMemory: resource.MustParse("16Gi"),
v1.ResourcePods: resource.MustParse("58"),
},
reserved: v1.ResourceList{
v1.ResourceCPU: resource.Quantity{},
v1.ResourceMemory: resource.Quantity{},
},
},
{
instanceType: "m5.large",
total: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2000m"),
v1.ResourceMemory: resource.MustParse("8Gi"),
v1.ResourcePods: resource.MustParse("29"),
},
reserved: v1.ResourceList{
v1.ResourceCPU: resource.Quantity{},
v1.ResourceMemory: resource.Quantity{},
},
},
}
)

// nodeCapacity describes the resources of a single instance type.
type nodeCapacity struct {
// instanceType is the EC2 instance type name, e.g. "m5.large".
instanceType string
// reserved holds resource quantities already set aside on the node
// (presumably overhead and packed pods; confirm against reserve).
reserved v1.ResourceList
// total holds the full CPU, memory and pod capacity of the instance type.
total v1.ResourceList
}

// nodeCapacityFrom converts an EC2 instance type description into a
// nodeCapacity whose total holds CPU (default vCPU count), memory (in Mi) and
// the maximum pod count. The reserved list is left unset.
//
// The InstanceType, VCpuInfo, MemoryInfo and NetworkInfo fields are
// dereferenced without nil checks, so all of them must be populated.
func nodeCapacityFrom(instanceTypeInfo ec2.InstanceTypeInfo) *nodeCapacity {
	name := *instanceTypeInfo.InstanceType
	vcpus := *instanceTypeInfo.VCpuInfo.DefaultVCpus
	memoryMiB := *instanceTypeInfo.MemoryInfo.SizeInMiB

	// The number of pods per node is calculated using the formula:
	// max number of ENIs * (IPv4 Addresses per ENI -1) + 2
	// https://github.com/awslabs/amazon-eks-ami/blob/master/files/eni-max-pods.txt#L20
	network := instanceTypeInfo.NetworkInfo
	maxPods := *network.MaximumNetworkInterfaces*(*network.Ipv4AddressesPerInterface-1) + 2

	total := v1.ResourceList{}
	total[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%d", vcpus))
	total[v1.ResourceMemory] = resource.MustParse(fmt.Sprintf("%dMi", memoryMiB))
	total[v1.ResourcePods] = resource.MustParse(fmt.Sprintf("%d", maxPods))

	return &nodeCapacity{
		instanceType: name,
		total:        total,
	}
}

// Copy returns a new nodeCapacity with the same instance type and deep copies
// of the reserved and total resource lists, so the copy can be mutated
// without affecting the receiver.
func (nc *nodeCapacity) Copy() *nodeCapacity {
	clone := &nodeCapacity{instanceType: nc.instanceType}
	clone.reserved = nc.reserved.DeepCopy()
	clone.total = nc.total.DeepCopy()
	return clone
}
Expand Down
89 changes: 70 additions & 19 deletions pkg/cloudprovider/aws/packing/packing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ package packing

import (
"context"
"fmt"
"sort"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
provisioning "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/utils/binpacking"
"github.com/awslabs/karpenter/pkg/utils/resources"
Expand All @@ -27,10 +31,14 @@ import (
)

// podPacker packs pods onto instance types.
type podPacker struct {
// ec2 is the EC2 client used to list the available instance types.
ec2 ec2iface.EC2API
}

// packingResult is the outcome of trying to fit a list of pods onto a single
// node capacity.
type packingResult struct {
// packed holds the pods that fit on the capacity.
packed []*v1.Pod
// unpacked presumably holds the pods that did not fit (confirm against
// packPodsForCapacity).
unpacked []*v1.Pod
}

// Packer helps pack the pods and calculates efficient placement on the instances.
type Packer interface {
// TODO use ctx when calling ec2 API
Expand Down Expand Up @@ -60,11 +68,15 @@ func (p *podPacker) Pack(ctx context.Context, constraints *cloudprovider.Constra
// Sort pods in decreasing order by the amount of CPU requested, if
// CPU requested is equal compare memory requested.
sort.Sort(sort.Reverse(binpacking.ByResourcesRequested{SortablePods: constraints.Pods}))
packings := []*Packing{}
var packings []*Packing
var packing *Packing
var err error
remainingPods := constraints.Pods
for len(remainingPods) > 0 {
packing, remainingPods = p.packWithLargestPod(remainingPods, constraints)
packing, remainingPods, err = p.packWithLargestPod(remainingPods, constraints)
if err != nil {
return packings, fmt.Errorf("packing with largest pod, %w", err)
}
// checked all instance type and found no packing option
if len(packing.Pods) == 0 {
zap.S().Warnf("Failed to find instance type for pod %s/%s ", remainingPods[0].Namespace, remainingPods[0].Name)
Expand All @@ -77,33 +89,72 @@ func (p *podPacker) Pack(ctx context.Context, constraints *cloudprovider.Constra
return packings, nil
}

// TODO filter instance types based on node contraints like availability zones etc.
func (p *podPacker) getNodeCapacities(constraints *cloudprovider.Constraints) []*nodeCapacity {
result := make([]*nodeCapacity, 0)
for _, nc := range nodeCapacities {
ncc := nc.Copy()
kubeletOverhead := binpacking.CalculateKubeletOverhead(ncc.total)
if ok := ncc.reserve(resources.Merge(constraints.Overhead, kubeletOverhead)); !ok {
zap.S().Errorf("Failed to reserve kubelet overhead for node capacity type %v", ncc.instanceType)
func (p *podPacker) getNodeCapacities(constraints *cloudprovider.Constraints) ([]*nodeCapacity, error) {
result := []*nodeCapacity{}

describeInstanceTypesInput := &ec2.DescribeInstanceTypesInput{
Filters: describeInstanceTypesFiltersFrom(constraints),
}

err := p.ec2.DescribeInstanceTypesPagesWithContext(context.TODO(), describeInstanceTypesInput, func(page *ec2.DescribeInstanceTypesOutput, lastPage bool) bool {
for _, instanceTypeInfo := range page.InstanceTypes {
nc := nodeCapacityFrom(*instanceTypeInfo)
kubeletOverhead := binpacking.CalculateKubeletOverhead(nc.total)
if ok := nc.reserve(resources.Merge(constraints.Overhead, kubeletOverhead)); !ok {
zap.S().Infof("Excluding instance type %s because there are not enough resources for the kubelet overhead", nc.instanceType)
}
result = append(result, nc)
}
result = append(result, ncc)
return lastPage
})

if err != nil {
return nil, fmt.Errorf("fetching instance types using ec2.DescribeInstanceTypes, %w", err)
}
return result
return result, nil
}

type packingResult struct {
packed []*v1.Pod
unpacked []*v1.Pod
// describeInstanceTypesFiltersFrom builds the ec2.DescribeInstanceTypes
// filters for the given constraints. It always restricts results to one
// processor architecture (x86_64 unless arm64 is requested), the on-demand
// usage class and hvm virtualization. When the constraints name specific
// instance types, an instance-type filter is added as well.
func describeInstanceTypesFiltersFrom(constraints *cloudprovider.Constraints) []*ec2.Filter {
	arch := "x86_64"
	if requested := constraints.Architecture; requested != nil && *requested == provisioning.ArchitectureArm64 {
		arch = string(*requested)
	}

	// newFilter assembles a single named filter from plain string values.
	newFilter := func(name string, values ...string) *ec2.Filter {
		return &ec2.Filter{
			Name:   aws.String(name),
			Values: aws.StringSlice(values),
		}
	}

	filters := []*ec2.Filter{
		newFilter("processor-info.supported-architecture", arch),
		newFilter("supported-usage-class", "on-demand"),
		newFilter("supported-virtualization-type", "hvm"),
	}
	if len(constraints.InstanceTypes) == 0 {
		return filters
	}
	return append(filters, newFilter("instance-type", constraints.InstanceTypes...))
}

// packWithLargestPod will try to pack max number of pods with largest pod in
// pods across all available node capacities. It returns Packing: max pod count
// that fit; with their node capacities and list of leftover pods
func (p *podPacker) packWithLargestPod(unpackedPods []*v1.Pod, constraints *cloudprovider.Constraints) (*Packing, []*v1.Pod) {
func (p *podPacker) packWithLargestPod(unpackedPods []*v1.Pod, constraints *cloudprovider.Constraints) (*Packing, []*v1.Pod, error) {
bestPackedPods := []*v1.Pod{}
bestCapacities := []*nodeCapacity{}
remainingPods := unpackedPods
for _, nc := range p.getNodeCapacities(constraints) {
nodeCapacities, err := p.getNodeCapacities(constraints)
if err != nil {
return nil, nil, err
}
for _, nc := range nodeCapacities {
// check how many pods we can fit with the available capacity
result := p.packPodsForCapacity(nc, unpackedPods)
if len(result.packed) == 0 {
Expand All @@ -125,7 +176,7 @@ func (p *podPacker) packWithLargestPod(unpackedPods []*v1.Pod, constraints *clou
for _, capacity := range bestCapacities {
capacityNames = append(capacityNames, capacity.instanceType)
}
return &Packing{Pods: bestPackedPods, InstanceTypes: capacityNames}, remainingPods
return &Packing{Pods: bestPackedPods, InstanceTypes: capacityNames}, remainingPods, nil
}

func (p *podPacker) packPodsForCapacity(capacity *nodeCapacity, pods []*v1.Pod) *packingResult {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudprovider/fake/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package fake
import (
"fmt"

provisioningv1alpha1 "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
provisioning "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
)

Expand Down Expand Up @@ -47,6 +47,6 @@ func NewNotImplementedFactory() *Factory {
return &Factory{WantErr: NotImplementedError}
}

// CapacityFor returns a new, empty fake Capacity. The spec argument is not used.
func (f *Factory) CapacityFor(spec *provisioningv1alpha1.ProvisionerSpec) cloudprovider.Capacity {
func (f *Factory) CapacityFor(spec *provisioning.ProvisionerSpec) cloudprovider.Capacity {
return &Capacity{}
}
Loading

0 comments on commit 5d55935

Please sign in to comment.