Skip to content

Commit

Permalink
Implemented a well known label for subnets
Browse files Browse the repository at this point in the history
  • Loading branch information
ellistarn committed Jun 16, 2021
1 parent 7fcd1bb commit 4d50be5
Show file tree
Hide file tree
Showing 11 changed files with 575 additions and 269 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQ
github.com/aws/aws-sdk-go v1.23.20/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.31.12/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.38.11 h1:jmxKh557ZRc+Z8fALnGrL01Ctjks2aSUFLb7n/BZoEs=
github.com/aws/aws-sdk-go v1.38.11/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.38.62 h1:w7r48cTciWCJK//YH+oN8HhNXzPDdlucV3XT6KGDMjE=
github.com/aws/aws-sdk-go v1.38.62/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
Expand Down
18 changes: 8 additions & 10 deletions pkg/cloudprovider/aws/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/service/ec2"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/utils/functional"
)

// Capacity cloud provider implementation using AWS Fleet.
Expand All @@ -41,23 +39,23 @@ func (c *Capacity) Create(ctx context.Context, packings []*cloudprovider.Packing
for _, packing := range packings {
constraints := Constraints(*packing.Constraints)
// 1. Get Subnets and constrain by zones
zonalSubnets, err := c.subnetProvider.GetZonalSubnets(ctx, c.provisioner.Spec.Cluster.Name)
subnets, err := c.subnetProvider.Get(ctx, c.provisioner, &constraints)
if err != nil {
return nil, fmt.Errorf("getting zonal subnets, %w", err)
}
zonalSubnetOptions := map[string][]*ec2.Subnet{}
for zone, subnets := range zonalSubnets {
if len(constraints.Zones) == 0 || functional.ContainsString(constraints.Zones, zone) {
zonalSubnetOptions[zone] = subnets
}
}
// zonalSubnetOptions := map[string][]*ec2.Subnet{}
// for zone, subnets := range zonalSubnets {
// if len(constraints.Zones) == 0 || functional.ContainsString(constraints.Zones, zone) {
// zonalSubnetOptions[zone] = subnets
// }
// }
// 2. Get Launch Template
launchTemplate, err := c.launchTemplateProvider.Get(ctx, c.provisioner, &constraints)
if err != nil {
return nil, fmt.Errorf("getting launch template, %w", err)
}
// 3. Create instance
instanceID, err := c.instanceProvider.Create(ctx, launchTemplate, packing.InstanceTypeOptions, zonalSubnets, constraints.GetCapacityType())
instanceID, err := c.instanceProvider.Create(ctx, launchTemplate, packing.InstanceTypeOptions, subnets, constraints.GetCapacityType())
if err != nil {
// TODO Aggregate errors and continue
return nil, fmt.Errorf("creating capacity %w", err)
Expand Down
33 changes: 29 additions & 4 deletions pkg/cloudprovider/aws/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,32 @@ limitations under the License.
package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/utils/functional"
)

const (
CapacityTypeSpot = "spot"
CapacityTypeOnDemand = "on-demand"
defaultLaunchTemplateVersion = "$Default"
DefaultLaunchTemplateVersion = "$Default"
)

var (
AWSLabelPrefix = "node.k8s.aws/"
CapacityTypeLabel = AWSLabelPrefix + "capacity-type"
LaunchTemplateIdLabel = AWSLabelPrefix + "launch-template-id"
LaunchTemplateVersionLabel = AWSLabelPrefix + "launch-template-version"
AllowedLabels = []string{CapacityTypeLabel, LaunchTemplateIdLabel, LaunchTemplateVersionLabel}
AWSToKubeArchitectures = map[string]string{
SubnetNameLabel = AWSLabelPrefix + "subnet"
SubnetTagKeyLabel = AWSLabelPrefix + "subnet-tag-key"
AllowedLabels = []string{
CapacityTypeLabel,
LaunchTemplateIdLabel,
LaunchTemplateVersionLabel,
SubnetNameLabel,
SubnetTagKeyLabel,
}
AWSToKubeArchitectures = map[string]string{
"x86_64": v1alpha1.ArchitectureAmd64,
v1alpha1.ArchitectureArm64: v1alpha1.ArchitectureArm64,
}
Expand Down Expand Up @@ -61,10 +70,26 @@ func (c *Constraints) GetLaunchTemplate() *LaunchTemplate {
}
version, ok := c.Labels[LaunchTemplateVersionLabel]
if !ok {
version = defaultLaunchTemplateVersion
version = DefaultLaunchTemplateVersion
}
return &LaunchTemplate{
Id: &id,
Version: &version,
}
}

func (c *Constraints) GetSubnetName() *string {
subnetName, ok := c.Labels[SubnetNameLabel]
if !ok {
return nil
}
return aws.String(subnetName)
}

func (c *Constraints) GetSubnetTagKey() *string {
subnetTag, ok := c.Labels[SubnetTagKeyLabel]
if !ok {
return nil
}
return aws.String(subnetTag)
}
34 changes: 17 additions & 17 deletions pkg/cloudprovider/aws/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package aws
import (
"context"
"fmt"
"math/rand"
"strings"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -40,29 +39,30 @@ type InstanceProvider struct {
func (p *InstanceProvider) Create(ctx context.Context,
launchTemplate *LaunchTemplate,
instanceTypeOptions []cloudprovider.InstanceType,
zonalSubnetOptions map[string][]*ec2.Subnet,
subnets []*ec2.Subnet,
capacityType string,
) (*string, error) {
// 1. Construct override options.
var overrides []*ec2.FleetLaunchTemplateOverridesRequest
for i, instanceType := range instanceTypeOptions {
for _, zone := range instanceType.Zones() {
subnets := zonalSubnetOptions[zone]
if len(subnets) == 0 {
continue
for _, subnet := range subnets {
if aws.StringValue(subnet.AvailabilityZone) == zone {
override := &ec2.FleetLaunchTemplateOverridesRequest{
InstanceType: aws.String(instanceType.Name()),
SubnetId: subnet.SubnetId,
}
// Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy
// to reduce the likelihood of getting an excessively large instance type.
// instanceTypeOptions are sorted by vcpus and memory so this prioritizes smaller instance types.
if capacityType == CapacityTypeSpot {
override.Priority = aws.Float64(float64(i))
}
overrides = append(overrides, override)
// FleetAPI cannot span subnets from the same AZ, so break after the first one.
break
}
}
override := &ec2.FleetLaunchTemplateOverridesRequest{
InstanceType: aws.String(instanceType.Name()),
// FleetAPI cannot span subnets from the same AZ, so randomize.
SubnetId: aws.String(*subnets[rand.Intn(len(subnets))].SubnetId),
}
// Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy
// to reduce the likelihood of getting an excessively large instance type.
// instanceTypeOptions are sorted by vcpus and memory so this prioritizes smaller instance types.
if capacityType == CapacityTypeSpot {
override.Priority = aws.Float64(float64(i))
}
overrides = append(overrides, override)
}
}
// 2. Create fleet
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/launchtemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (p *LaunchTemplateProvider) Get(ctx context.Context, provisioner *v1alpha1.
return nil, fmt.Errorf("hashing launch template, %w", err)
}

result := &LaunchTemplate{Version: aws.String(defaultLaunchTemplateVersion)}
result := &LaunchTemplate{Version: aws.String(DefaultLaunchTemplateVersion)}
if cached, ok := p.cache.Get(fmt.Sprint(key)); ok {
result.Id = cached.(*ec2.LaunchTemplate).LaunchTemplateId
return result, nil
Expand Down
74 changes: 55 additions & 19 deletions pkg/cloudprovider/aws/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/patrickmn/go-cache"
"go.uber.org/zap"
)
Expand All @@ -37,32 +38,67 @@ func NewSubnetProvider(ec2api ec2iface.EC2API) *SubnetProvider {
}
}

func (s *SubnetProvider) GetZonalSubnets(ctx context.Context, clusterName string) (map[string][]*ec2.Subnet, error) {
if zonalSubnets, ok := s.cache.Get(clusterName); ok {
return zonalSubnets.(map[string][]*ec2.Subnet), nil
}
zonalSubnets, err := s.getZonalSubnets(ctx, clusterName)
func (s *SubnetProvider) Get(ctx context.Context, provisioner *v1alpha1.Provisioner, constraints *Constraints) ([]*ec2.Subnet, error) {
// 1. Get all viable subnets for this provisioner
subnets, err := s.getSubnets(ctx, provisioner)
if err != nil {
return nil, err
}
s.cache.Set(clusterName, zonalSubnets, CacheTTL)
zap.S().Debugf("Successfully discovered subnets in %d zones for cluster %s", len(zonalSubnets), clusterName)
return zonalSubnets, nil
// 2. Filter by subnet name if constrained
if name := constraints.GetSubnetName(); name != nil {
subnets = filter(byName(aws.StringValue(name)), subnets)
}
// 2. Filter by subnet tag key if constrained
if tagKey := constraints.GetSubnetTagKey(); tagKey != nil {
subnets = filter(byTagKey(*tagKey), subnets)
}
return subnets, nil
}

func (s *SubnetProvider) getZonalSubnets(ctx context.Context, clusterName string) (map[string][]*ec2.Subnet, error) {
describeSubnetOutput, err := s.ec2api.DescribeSubnetsWithContext(ctx, &ec2.DescribeSubnetsInput{
Filters: []*ec2.Filter{{
Name: aws.String("tag-key"),
Values: []*string{aws.String(fmt.Sprintf(ClusterTagKeyFormat, clusterName))},
}},
})
func (s *SubnetProvider) getSubnets(ctx context.Context, provisioner *v1alpha1.Provisioner) ([]*ec2.Subnet, error) {
if subnets, ok := s.cache.Get(provisioner.Spec.Cluster.Name); ok {
return subnets.([]*ec2.Subnet), nil
}
output, err := s.ec2api.DescribeSubnetsWithContext(ctx, &ec2.DescribeSubnetsInput{Filters: []*ec2.Filter{{
Name: aws.String("tag-key"), // Subnets must be tagged for the cluster
Values: []*string{aws.String(fmt.Sprintf(ClusterTagKeyFormat, provisioner.Spec.Cluster.Name))},
}}})
if err != nil {
return nil, fmt.Errorf("describing subnets, %w", err)
}
zonalSubnetMap := map[string][]*ec2.Subnet{}
for _, subnet := range describeSubnetOutput.Subnets {
zonalSubnetMap[*subnet.AvailabilityZone] = append(zonalSubnetMap[*subnet.AvailabilityZone], subnet)
zap.S().Debugf("Successfully discovered %d subnets for cluster %s", len(output.Subnets), provisioner.Spec.Cluster.Name)
s.cache.Set(provisioner.Spec.Cluster.Name, output.Subnets, CacheTTL)
return output.Subnets, nil
}

func filter(predicate func(*ec2.Subnet) bool, subnets []*ec2.Subnet) []*ec2.Subnet {
result := []*ec2.Subnet{}
for _, subnet := range subnets {
if predicate(subnet) {
result = append(result, subnet)
}
}
return result
}

func byName(name string) func(*ec2.Subnet) bool {
return func(subnet *ec2.Subnet) bool {
for _, tag := range subnet.Tags {
if aws.StringValue(tag.Key) == "Name" {
return aws.StringValue(tag.Value) == name
}
}
return false
}
}

func byTagKey(tagKey string) func(*ec2.Subnet) bool {
return func(subnet *ec2.Subnet) bool {
for _, tag := range subnet.Tags {
if aws.StringValue(tag.Key) == tagKey {
return true
}
}
return false
}
return zonalSubnetMap, nil
}
Loading

0 comments on commit 4d50be5

Please sign in to comment.