Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamically retrieve instance types #287

Merged
merged 11 commits into from
Mar 16, 2021
2 changes: 2 additions & 0 deletions docs/aws/karpenter.cloudformation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ Resources:
- "ec2:DescribeInstances"
- "ec2:DescribeSecurityGroups"
- "ec2:DescribeSubnets"
- "ec2:DescribeInstanceTypes"
- "ec2:DescribeInstanceTypeOfferings"
- "iam:GetInstanceProfile"
- "ssm:GetParameter"
KarpenterNodeInstanceProfile:
Expand Down
90 changes: 0 additions & 90 deletions go.sum

Large diffs are not rendered by default.

91 changes: 91 additions & 0 deletions pkg/cloudprovider/aws/fake/ec2api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type EC2API struct {
DescribeLaunchTemplatesOutput *ec2.DescribeLaunchTemplatesOutput
DescribeSubnetsOutput *ec2.DescribeSubnetsOutput
DescribeSecurityGroupsOutput *ec2.DescribeSecurityGroupsOutput
DescribeInstanceTypesOutput *ec2.DescribeInstanceTypesOutput
WantErr error

CalledWithCreateFleetInput []ec2.CreateFleetInput
Expand Down Expand Up @@ -97,3 +98,93 @@ func (a *EC2API) DescribeSecurityGroupsWithContext(context.Context, *ec2.Describ
}
return &ec2.DescribeSecurityGroupsOutput{SecurityGroups: []*ec2.SecurityGroup{{GroupId: aws.String("test-group")}}}, nil
}

func (a *EC2API) DescribeInstanceTypesPagesWithContext(ctx context.Context, input *ec2.DescribeInstanceTypesInput, fn func(*ec2.DescribeInstanceTypesOutput, bool) bool, opts ...request.Option) error {
ellistarn marked this conversation as resolved.
Show resolved Hide resolved
if a.WantErr != nil {
return a.WantErr
}
if a.DescribeInstanceTypesOutput != nil {
fn(a.DescribeInstanceTypesOutput, false)
return nil
}
fn(&ec2.DescribeInstanceTypesOutput{
InstanceTypes: []*ec2.InstanceTypeInfo{
{
InstanceType: aws.String("m5.large"),
SupportedUsageClasses: []*string{aws.String("on-demand")},
SupportedVirtualizationTypes: []*string{aws.String("hvm")},
VCpuInfo: &ec2.VCpuInfo{
DefaultVCpus: aws.Int64(2),
},
MemoryInfo: &ec2.MemoryInfo{
SizeInMiB: aws.Int64(8),
},
NetworkInfo: &ec2.NetworkInfo{
MaximumNetworkInterfaces: aws.Int64(3),
Ipv4AddressesPerInterface: aws.Int64(30),
},
},
{
InstanceType: aws.String("m5.xlarge"),
SupportedUsageClasses: []*string{aws.String("on-demand")},
SupportedVirtualizationTypes: []*string{aws.String("hvm")},
VCpuInfo: &ec2.VCpuInfo{
DefaultVCpus: aws.Int64(4),
},
MemoryInfo: &ec2.MemoryInfo{
SizeInMiB: aws.Int64(16),
},
NetworkInfo: &ec2.NetworkInfo{
MaximumNetworkInterfaces: aws.Int64(4),
Ipv4AddressesPerInterface: aws.Int64(60),
},
},
{
InstanceType: aws.String("m5.2xlarge"),
SupportedUsageClasses: []*string{aws.String("on-demand")},
SupportedVirtualizationTypes: []*string{aws.String("hvm")},
VCpuInfo: &ec2.VCpuInfo{
DefaultVCpus: aws.Int64(8),
},
MemoryInfo: &ec2.MemoryInfo{
SizeInMiB: aws.Int64(32),
},
NetworkInfo: &ec2.NetworkInfo{
MaximumNetworkInterfaces: aws.Int64(4),
Ipv4AddressesPerInterface: aws.Int64(60),
},
},
{
InstanceType: aws.String("m5.4xlarge"),
SupportedUsageClasses: []*string{aws.String("on-demand")},
SupportedVirtualizationTypes: []*string{aws.String("hvm")},
VCpuInfo: &ec2.VCpuInfo{
DefaultVCpus: aws.Int64(16),
},
MemoryInfo: &ec2.MemoryInfo{
SizeInMiB: aws.Int64(64),
},
NetworkInfo: &ec2.NetworkInfo{
MaximumNetworkInterfaces: aws.Int64(8),
Ipv4AddressesPerInterface: aws.Int64(240),
},
},
{
InstanceType: aws.String("m5.8xlarge"),
SupportedUsageClasses: []*string{aws.String("on-demand")},
SupportedVirtualizationTypes: []*string{aws.String("hvm")},
VCpuInfo: &ec2.VCpuInfo{
DefaultVCpus: aws.Int64(32),
},
MemoryInfo: &ec2.MemoryInfo{
SizeInMiB: aws.Int64(128),
},
NetworkInfo: &ec2.NetworkInfo{
MaximumNetworkInterfaces: aws.Int64(8),
Ipv4AddressesPerInterface: aws.Int64(240),
},
},
},
}, false)
return nil
}
90 changes: 23 additions & 67 deletions pkg/cloudprovider/aws/packing/nodecapacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,84 +15,40 @@ limitations under the License.
package packing

import (
"fmt"

"github.com/aws/aws-sdk-go/service/ec2"
resourcesUtil "github.com/awslabs/karpenter/pkg/utils/resources"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

// TODO get this information from node-instance-selector
var (
nodeCapacities = []*nodeCapacity{

{
instanceType: "m5.24xlarge",
total: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("96000m"),
v1.ResourceMemory: resource.MustParse("384Gi"),
v1.ResourcePods: resource.MustParse("737"),
},
reserved: v1.ResourceList{
v1.ResourceCPU: resource.Quantity{},
v1.ResourceMemory: resource.Quantity{},
},
},
{
instanceType: "m5.8xlarge",
total: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("32000m"),
v1.ResourceMemory: resource.MustParse("128Gi"),
v1.ResourcePods: resource.MustParse("234"),
},
reserved: v1.ResourceList{
v1.ResourceCPU: resource.Quantity{},
v1.ResourceMemory: resource.Quantity{},
},
},
{
instanceType: "m5.2xlarge",
total: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("8000m"),
v1.ResourceMemory: resource.MustParse("32Gi"),
v1.ResourcePods: resource.MustParse("58"),
},
reserved: v1.ResourceList{
v1.ResourceCPU: resource.Quantity{},
v1.ResourceMemory: resource.Quantity{},
},
},
{
instanceType: "m5.xlarge",
total: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("4000m"),
v1.ResourceMemory: resource.MustParse("16Gi"),
v1.ResourcePods: resource.MustParse("58"),
},
reserved: v1.ResourceList{
v1.ResourceCPU: resource.Quantity{},
v1.ResourceMemory: resource.Quantity{},
},
},
{
instanceType: "m5.large",
total: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2000m"),
v1.ResourceMemory: resource.MustParse("8Gi"),
v1.ResourcePods: resource.MustParse("29"),
},
reserved: v1.ResourceList{
v1.ResourceCPU: resource.Quantity{},
v1.ResourceMemory: resource.Quantity{},
},
},
}
)

// nodeCapacity describes one instance type by name together with two resource
// lists: the total resources it offers and the portion already reserved.
type nodeCapacity struct {
	instanceType    string
	reserved, total v1.ResourceList
}

func nodeCapacityFrom(instanceTypeInfo ec2.InstanceTypeInfo) *nodeCapacity {
return &nodeCapacity{
instanceType: *instanceTypeInfo.InstanceType,
total: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(fmt.Sprint(*instanceTypeInfo.VCpuInfo.DefaultVCpus)),
v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%dMi", *instanceTypeInfo.MemoryInfo.SizeInMiB)),
// The number of pods per node is calculated using the formula:
// max number of ENIs * (IPv4 Addresses per ENI -1) + 2
// https://github.com/awslabs/amazon-eks-ami/blob/master/files/eni-max-pods.txt#L20
v1.ResourcePods: resource.MustParse(fmt.Sprint(
*instanceTypeInfo.NetworkInfo.MaximumNetworkInterfaces*(*instanceTypeInfo.NetworkInfo.Ipv4AddressesPerInterface-1) + 2)),
},
reserved: v1.ResourceList{
bwagner5 marked this conversation as resolved.
Show resolved Hide resolved
v1.ResourceCPU: resource.Quantity{},
v1.ResourceMemory: resource.Quantity{},
v1.ResourcePods: resource.Quantity{},
},
}
}

// Copy returns a new nodeCapacity for the same instance type whose reserved
// and total resource lists are deep copies of the receiver's.
func (nc *nodeCapacity) Copy() *nodeCapacity {
	return &nodeCapacity{
		instanceType: nc.instanceType,
		reserved:     nc.reserved.DeepCopy(),
		total:        nc.total.DeepCopy(),
	}
}
Expand Down
90 changes: 71 additions & 19 deletions pkg/cloudprovider/aws/packing/packing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ package packing

import (
"context"
"fmt"
"sort"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
provisioningv1alpha1 "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
bwagner5 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/utils/binpacking"
"github.com/awslabs/karpenter/pkg/utils/resources"
Expand All @@ -27,10 +31,14 @@ import (
)

// podPacker carries the dependencies used by the Pack method and its helpers.
type podPacker struct {
// ec2 is the EC2 client; getNodeCapacities calls it to list instance types.
ec2 ec2iface.EC2API
}

type packingResult struct {
JacobGabrielson marked this conversation as resolved.
Show resolved Hide resolved
packed []*v1.Pod
unpacked []*v1.Pod
}

// Packer helps pack the pods and calculates efficient placement on the instances.
type Packer interface {
// TODO use ctx when calling ec2 API
Expand Down Expand Up @@ -60,11 +68,15 @@ func (p *podPacker) Pack(ctx context.Context, constraints *cloudprovider.Constra
// Sort pods in decreasing order by the amount of CPU requested, if
// CPU requested is equal compare memory requested.
sort.Sort(sort.Reverse(binpacking.ByResourcesRequested{SortablePods: constraints.Pods}))
packings := []*Packing{}
var packings []*Packing
var packing *Packing
var err error
remainingPods := constraints.Pods
ellistarn marked this conversation as resolved.
Show resolved Hide resolved
for len(remainingPods) > 0 {
packing, remainingPods = p.packWithLargestPod(remainingPods, constraints)
packing, remainingPods, err = p.packWithLargestPod(remainingPods, constraints)
if err != nil {
return packings, err
bwagner5 marked this conversation as resolved.
Show resolved Hide resolved
}
// checked all instance types and found no packing option
if len(packing.Pods) == 0 {
zap.S().Warnf("Failed to find instance type for pod %s/%s ", remainingPods[0].Namespace, remainingPods[0].Name)
Expand All @@ -77,33 +89,73 @@ func (p *podPacker) Pack(ctx context.Context, constraints *cloudprovider.Constra
return packings, nil
}

// TODO filter instance types based on node contraints like availability zones etc.
func (p *podPacker) getNodeCapacities(constraints *cloudprovider.Constraints) []*nodeCapacity {
result := make([]*nodeCapacity, 0)
for _, nc := range nodeCapacities {
ncc := nc.Copy()
kubeletOverhead := binpacking.CalculateKubeletOverhead(ncc.total)
if ok := ncc.reserve(resources.Merge(constraints.Overhead, kubeletOverhead)); !ok {
zap.S().Errorf("Failed to reserve kubelet overhead for node capacity type %v", ncc.instanceType)
func (p *podPacker) getNodeCapacities(constraints *cloudprovider.Constraints) ([]*nodeCapacity, error) {
result := []*nodeCapacity{}

describeInstanceTypesInput := &ec2.DescribeInstanceTypesInput{
Filters: describeInstanceTypesFiltersFrom(constraints),
}

err := p.ec2.DescribeInstanceTypesPagesWithContext(context.TODO(), describeInstanceTypesInput, func(page *ec2.DescribeInstanceTypesOutput, lastPage bool) bool {
JacobGabrielson marked this conversation as resolved.
Show resolved Hide resolved
for _, instanceTypeInfo := range page.InstanceTypes {
nc := nodeCapacityFrom(*instanceTypeInfo)
kubeletOverhead := binpacking.CalculateKubeletOverhead(nc.total)
if ok := nc.reserve(resources.Merge(constraints.Overhead, kubeletOverhead)); !ok {
zap.S().Infof("Excluding instance type %s because there are not enough resources for the kubelet overhead", nc.instanceType)
}
result = append(result, nc)
}
result = append(result, ncc)
return lastPage
})

if err != nil {
return nil, fmt.Errorf("fetching instance types using ec2.DescribeInstanceTypes, %w", err)
}
return result
return result, nil
}

type packingResult struct {
packed []*v1.Pod
unpacked []*v1.Pod
func describeInstanceTypesFiltersFrom(constraints *cloudprovider.Constraints) []*ec2.Filter {
architecture := "x86_64"
if constraints.Architecture != nil && *constraints.Architecture == provisioningv1alpha1.ArchitectureArm64 {
architecture = string(*constraints.Architecture)
bwagner5 marked this conversation as resolved.
Show resolved Hide resolved
}

filters := []*ec2.Filter{
{
Name: aws.String("processor-info.supported-architecture"),
Values: []*string{&architecture},
},
{
Name: aws.String("supported-usage-class"),
Values: []*string{aws.String("on-demand")},
},
{
Name: aws.String("supported-virtualization-type"),
Values: []*string{aws.String("hvm")},
},
}
if len(constraints.InstanceTypes) != 0 {
instanceTypesFilter := &ec2.Filter{
Name: aws.String("instance-type"),
Values: aws.StringSlice(constraints.InstanceTypes),
}
filters = append(filters, instanceTypesFilter)
bwagner5 marked this conversation as resolved.
Show resolved Hide resolved
}
return filters
}

// packWithLargestPod tries to pack the maximum number of pods together with
// the largest pod in pods across all available node capacities. It returns a
// Packing holding the pods that fit and the instance types they fit on, the
// list of leftover pods, and any error from fetching the node capacities.
func (p *podPacker) packWithLargestPod(unpackedPods []*v1.Pod, constraints *cloudprovider.Constraints) (*Packing, []*v1.Pod) {
func (p *podPacker) packWithLargestPod(unpackedPods []*v1.Pod, constraints *cloudprovider.Constraints) (*Packing, []*v1.Pod, error) {
bestPackedPods := []*v1.Pod{}
bestCapacities := []*nodeCapacity{}
remainingPods := unpackedPods
for _, nc := range p.getNodeCapacities(constraints) {
nodeCapacities, err := p.getNodeCapacities(constraints)
if err != nil {
return nil, nil, err
}
for _, nc := range nodeCapacities {
// check how many pods we can fit with the available capacity
result := p.packPodsForCapacity(nc, unpackedPods)
if len(result.packed) == 0 {
Expand All @@ -125,7 +177,7 @@ func (p *podPacker) packWithLargestPod(unpackedPods []*v1.Pod, constraints *clou
for _, capacity := range bestCapacities {
capacityNames = append(capacityNames, capacity.instanceType)
}
return &Packing{Pods: bestPackedPods, InstanceTypes: capacityNames}, remainingPods
return &Packing{Pods: bestPackedPods, InstanceTypes: capacityNames}, remainingPods, nil
}

func (p *podPacker) packPodsForCapacity(capacity *nodeCapacity, pods []*v1.Pod) *packingResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (c *Constraints) getNodeOverhead(ctx context.Context, node *v1.Node) (v1.Re
}

// 2. filter DaemonSets to include those that will schedule on this node
podSpecs := make([]*v1.PodSpec, 0)
podSpecs := []*v1.PodSpec{}
for _, daemonSet := range daemonSetList.Items {
if scheduling.IsSchedulable(&daemonSet.Spec.Template.Spec, node) {
podSpecs = append(podSpecs, &daemonSet.Spec.Template.Spec)
Expand Down