Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
bwagner5 committed Mar 9, 2022
1 parent 368b106 commit b3eb44c
Show file tree
Hide file tree
Showing 17 changed files with 267 additions and 253 deletions.
100 changes: 0 additions & 100 deletions pkg/cloudprovider/aws/ami.go

This file was deleted.

5 changes: 4 additions & 1 deletion pkg/cloudprovider/aws/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ssm"
"github.com/patrickmn/go-cache"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/aws/karpenter/pkg/cloudprovider/aws/ltresolver"
"github.com/aws/karpenter/pkg/utils/functional"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/project"
Expand Down Expand Up @@ -87,7 +89,8 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud
NewLaunchTemplateProvider(
ctx,
ec2api,
NewAMIProvider(ssm.New(sess), options.ClientSet),
options.ClientSet,
ltresolver.New(ssm.New(sess), cache.New(CacheTTL, CacheCleanupInterval)),
NewSecurityGroupProvider(ec2api),
getCABundle(ctx),
),
Expand Down
18 changes: 12 additions & 6 deletions pkg/cloudprovider/aws/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ const (
CreationQPS = 2
// CreationBurst limits the additional burst requests.
// https://docs.aws.amazon.com/AWSEC2/latest/APIReference/throttling.html#throttling-limits
CreationBurst = 100
CreationBurst = 100
nvidiaGPUResourceName v1.ResourceName = "nvidia.com/gpu"
amdGPUResourceName v1.ResourceName = "amd.com/gpu"
awsNeuronResourceName v1.ResourceName = "aws.amazon.com/neuron"
)

type InstanceProvider struct {
Expand All @@ -55,11 +58,14 @@ type InstanceProvider struct {
launchTemplateProvider *LaunchTemplateProvider
}

const (
nvidiaGPUResourceName v1.ResourceName = "nvidia.com/gpu"
amdGPUResourceName v1.ResourceName = "amd.com/gpu"
awsNeuronResourceName v1.ResourceName = "aws.amazon.com/neuron"
)
func NewInstanceProvider(ec2api ec2iface.EC2API, instanceTypeProvider *InstanceTypeProvider, subnetProvider *SubnetProvider, launchTemplateProvider *LaunchTemplateProvider) *InstanceProvider {
return &InstanceProvider{
ec2api: ec2api,
instanceTypeProvider: instanceTypeProvider,
subnetProvider: subnetProvider,
launchTemplateProvider: launchTemplateProvider,
}
}

// Create an instance given the constraints.
// instanceTypes should be sorted by priority for spot capacity type.
Expand Down
8 changes: 4 additions & 4 deletions pkg/cloudprovider/aws/instancetypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (p *InstanceTypeProvider) createOfferings(instanceType *InstanceType, subne
}

func (p *InstanceTypeProvider) getInstanceTypeZones(ctx context.Context) (map[string]sets.String, error) {
if Cached, ok := p.cache.Get(InstanceTypeZonesCacheKey); ok {
return Cached.(map[string]sets.String), nil
if cached, ok := p.cache.Get(InstanceTypeZonesCacheKey); ok {
return cached.(map[string]sets.String), nil
}
zones := map[string]sets.String{}
if err := p.ec2api.DescribeInstanceTypeOfferingsPagesWithContext(ctx, &ec2.DescribeInstanceTypeOfferingsInput{LocationType: aws.String("availability-zone")},
Expand All @@ -133,8 +133,8 @@ func (p *InstanceTypeProvider) getInstanceTypeZones(ctx context.Context) (map[st

// getInstanceTypes retrieves all instance types from the ec2 DescribeInstanceTypes API using some opinionated filters
func (p *InstanceTypeProvider) getInstanceTypes(ctx context.Context) (map[string]*InstanceType, error) {
if Cached, ok := p.cache.Get(InstanceTypesCacheKey); ok {
return Cached.(map[string]*InstanceType), nil
if cached, ok := p.cache.Get(InstanceTypesCacheKey); ok {
return cached.(map[string]*InstanceType), nil
}
instanceTypes := map[string]*InstanceType{}
if err := p.ec2api.DescribeInstanceTypesPagesWithContext(ctx, &ec2.DescribeInstanceTypesInput{
Expand Down
67 changes: 43 additions & 24 deletions pkg/cloudprovider/aws/launchtemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -28,35 +29,39 @@ import (
"github.com/patrickmn/go-cache"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"

"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/aws/karpenter/pkg/cloudprovider/aws/launchtemplate"
"github.com/aws/karpenter/pkg/cloudprovider/aws/ltresolver"
"github.com/aws/karpenter/pkg/utils/functional"
"github.com/aws/karpenter/pkg/utils/injection"
)

const (
launchTemplateNameFormat = "Karpenter-%s-%s"
launchTemplateNameFormat = "Karpenter-%s-%s"
kubernetesVersionCacheKey = "kubernetesVersion"
)

type LaunchTemplateProvider struct {
sync.Mutex
ec2api ec2iface.EC2API
amiProvider *AMIProvider
clientSet *kubernetes.Clientset
ltResolver *ltresolver.Resolver
securityGroupProvider *SecurityGroupProvider
cache *cache.Cache
logger *zap.SugaredLogger
caBundle *string
}

func NewLaunchTemplateProvider(ctx context.Context, ec2api ec2iface.EC2API, amiProvider *AMIProvider, securityGroupProvider *SecurityGroupProvider, caBundle *string) *LaunchTemplateProvider {
func NewLaunchTemplateProvider(ctx context.Context, ec2api ec2iface.EC2API, clientSet *kubernetes.Clientset, ltResolver *ltresolver.Resolver, securityGroupProvider *SecurityGroupProvider, caBundle *string) *LaunchTemplateProvider {
l := &LaunchTemplateProvider{
ec2api: ec2api,
clientSet: clientSet,
logger: logging.FromContext(ctx).Named("launchtemplate"),
amiProvider: amiProvider,
ltResolver: ltResolver,
securityGroupProvider: securityGroupProvider,
cache: cache.New(CacheTTL, CacheCleanupInterval),
caBundle: caBundle,
Expand All @@ -66,7 +71,7 @@ func NewLaunchTemplateProvider(ctx context.Context, ec2api ec2iface.EC2API, amiP
return l
}

func launchTemplateName(options *launchtemplate.Resolved) string {
func launchTemplateName(options *ltresolver.ResolvedTemplate) string {
hash, err := hashstructure.Hash(options, hashstructure.FormatV2, nil)
if err != nil {
panic(fmt.Sprintf("hashing launch template, %s", err))
Expand All @@ -88,35 +93,35 @@ func (p *LaunchTemplateProvider) Get(ctx context.Context, constraints *v1alpha1.
if err != nil {
return nil, err
}
// Get constrained AMI ID
amis, err := p.amiProvider.Get(ctx, constraints, instanceTypes, launchtemplate.GetAMIFamily(constraints.AMIFamily, nil))
kubeServerVersion, err := p.kubeServerVersion(ctx)
if err != nil {
return nil, err
}
// Construct launch templates
resolvedLaunchTemplates := p.ltResolver.Resolve(constraints, instanceTypes, &ltresolver.Options{
ClusterName: injection.GetOptions(ctx).ClusterName,
ClusterEndpoint: injection.GetOptions(ctx).ClusterEndpoint,
AWSENILimitedPodDensity: injection.GetOptions(ctx).AWSENILimitedPodDensity,
InstanceProfile: instanceProfile,
SecurityGroupsIDs: securityGroupsIDs,
Tags: constraints.Tags,
Labels: functional.UnionStringMaps(constraints.Labels, additionalLabels),
CABundle: p.caBundle,
KubernetesVersion: kubeServerVersion,
})

launchTemplates := map[string][]cloudprovider.InstanceType{}
for amiID, instanceTypes := range amis {
for _, resolvedLaunchTemplate := range resolvedLaunchTemplates {
// Ensure the launch template exists, or create it
launchTemplate, err := p.ensureLaunchTemplate(ctx, launchtemplate.Get(constraints, instanceTypes, &launchtemplate.Options{
ClusterName: injection.GetOptions(ctx).ClusterName,
ClusterEndpoint: injection.GetOptions(ctx).ClusterEndpoint,
AWSENILimitedPodDensity: injection.GetOptions(ctx).AWSENILimitedPodDensity,
InstanceProfile: instanceProfile,
AMIID: amiID,
SecurityGroupsIDs: securityGroupsIDs,
Tags: constraints.Tags,
Labels: functional.UnionStringMaps(constraints.Labels, additionalLabels),
CABundle: p.caBundle,
}))
ec2LaunchTemplate, err := p.ensureLaunchTemplate(ctx, resolvedLaunchTemplate)
if err != nil {
return nil, err
}
launchTemplates[aws.StringValue(launchTemplate.LaunchTemplateName)] = instanceTypes
launchTemplates[*ec2LaunchTemplate.LaunchTemplateName] = resolvedLaunchTemplate.InstanceTypes
}
return launchTemplates, nil
}

func (p *LaunchTemplateProvider) ensureLaunchTemplate(ctx context.Context, options *launchtemplate.Resolved) (*ec2.LaunchTemplate, error) {
func (p *LaunchTemplateProvider) ensureLaunchTemplate(ctx context.Context, options *ltresolver.ResolvedTemplate) (*ec2.LaunchTemplate, error) {
// Ensure that multiple threads don't attempt to create the same launch template
p.Lock()
defer p.Unlock()
Expand Down Expand Up @@ -150,7 +155,7 @@ func (p *LaunchTemplateProvider) ensureLaunchTemplate(ctx context.Context, optio
return launchTemplate, nil
}

func (p *LaunchTemplateProvider) createLaunchTemplate(ctx context.Context, options *launchtemplate.Resolved) (*ec2.LaunchTemplate, error) {
func (p *LaunchTemplateProvider) createLaunchTemplate(ctx context.Context, options *ltresolver.ResolvedTemplate) (*ec2.LaunchTemplate, error) {
output, err := p.ec2api.CreateLaunchTemplateWithContext(ctx, &ec2.CreateLaunchTemplateInput{
LaunchTemplateName: aws.String(launchTemplateName(options)),
LaunchTemplateData: &ec2.RequestLaunchTemplateData{
Expand Down Expand Up @@ -241,3 +246,17 @@ func (p *LaunchTemplateProvider) getInstanceProfile(ctx context.Context, constra
}
return defaultProfile, nil
}

func (p *LaunchTemplateProvider) kubeServerVersion(ctx context.Context) (string, error) {
if version, ok := p.cache.Get(kubernetesVersionCacheKey); ok {
return version.(string), nil
}
serverVersion, err := p.clientSet.Discovery().ServerVersion()
if err != nil {
return "", err
}
version := fmt.Sprintf("%s.%s", serverVersion.Major, strings.TrimSuffix(serverVersion.Minor, "+"))
p.cache.SetDefault(kubernetesVersionCacheKey, version)
logging.FromContext(ctx).Debugf("Discovered kubernetes version %s", version)
return version, nil
}
Loading

0 comments on commit b3eb44c

Please sign in to comment.