Addressed first round of comments
Elton Pinto committed Nov 23, 2021
1 parent 9436849 commit 4fb6c8c
Showing 3 changed files with 81 additions and 54 deletions.
14 changes: 7 additions & 7 deletions pkg/cloudprovider/aws/cloudprovider.go
@@ -24,6 +24,7 @@ import (
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ssm"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
@@ -79,7 +80,7 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud
return &CloudProvider{
instanceTypeProvider: instanceTypeProvider,
subnetProvider: subnetProvider,
instanceProvider: NewInstanceProvider(ec2api, instanceTypeProvider, subnetProvider, sess, options.ClientSet),
instanceProvider: NewInstanceProvider(ec2api, instanceTypeProvider, subnetProvider, ssm.New(sess), options.ClientSet),
creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst),
}
}
@@ -125,18 +126,17 @@ func (c *CloudProvider) create(ctx context.Context, constraints *v1alpha5.Constr
return errs
}

// Despite accepting a Constraints struct, note that it does not utilize Requirements at all and instead
// returns all available InstanceTypes.
// GetInstanceTypes returns all available InstanceTypes despite accepting a Constraints struct (note that it does not utilize Requirements)
func (c *CloudProvider) GetInstanceTypes(ctx context.Context, constraints *v1alpha5.Constraints) ([]cloudprovider.InstanceType, error) {
vendorConstraints, err := v1alpha1.Deserialize(constraints)
if err != nil {
return nil, apis.ErrGeneric(err.Error())
}
allInstanceTypes, err := c.instanceTypeProvider.Get(ctx, vendorConstraints)
instanceTypes, err := c.instanceTypeProvider.Get(ctx, vendorConstraints)
if err != nil {
return nil, err
}
return c.instanceProvider.DiscardICEdInstanceTypes(ctx, allInstanceTypes), nil
return c.instanceProvider.WithoutUnavailableOfferings(ctx, instanceTypes), nil
}

func (c *CloudProvider) Delete(ctx context.Context, node *v1.Node) error {
@@ -156,11 +156,11 @@ func (c *CloudProvider) Validate(ctx context.Context, constraints *v1alpha5.Cons
func (c *CloudProvider) Default(ctx context.Context, constraints *v1alpha5.Constraints) {
vendorConstraints, err := v1alpha1.Deserialize(constraints)
if err != nil {
logging.FromContext(ctx).Errorf("Failed to deserialize provider, %s", err.Error())
logging.FromContext(ctx).Fatalf("Failed to deserialize provider, %s", err.Error())
return
}
vendorConstraints.Default(ctx)
if err := vendorConstraints.Serialize(constraints); err != nil {
logging.FromContext(ctx).Errorf("Failed to serialize provider, %s", err.Error())
logging.FromContext(ctx).Fatalf("Failed to serialize provider, %s", err.Error())
}
}
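
For context on the constructor change above: the SSM client is now built once from the shared session inside NewCloudProvider and handed to NewInstanceProvider, rather than passing the raw session down. Below is a minimal, hedged sketch of that wiring with aws-sdk-go v1; the region and variable names are illustrative, and the Karpenter-specific constructors are omitted.

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ec2"
	"github.com/aws/aws-sdk-go/service/ssm"
)

func main() {
	// One shared session; concrete service clients are constructed once at the
	// top level and passed down, mirroring ssm.New(sess) in NewCloudProvider.
	sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2")))
	ec2api := ec2.New(sess)
	ssmapi := ssm.New(sess)
	fmt.Println(aws.StringValue(sess.Config.Region), ec2api != nil, ssmapi != nil)
	// ec2api and ssmapi would then be handed to the instance and AMI providers,
	// which no longer need to know how the session was configured.
}
```

This keeps session configuration in one place and lets lower-level providers accept an already-constructed client.
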
9 changes: 4 additions & 5 deletions pkg/cloudprovider/aws/errors.go
@@ -25,9 +25,12 @@ var (
"InvalidInstanceID.NotFound",
"InvalidLaunchTemplateName.NotFoundException",
}
iceErrorCode = "InsufficientInstanceCapacity"
)

// InsufficientCapacityErrorCode indicates that EC2 is temporarily lacking capacity for this
// instance type and availability zone combination
const InsufficientCapacityErrorCode = "InsufficientInstanceCapacity"

// isNotFound returns true if the err is an AWS error (even if it's
// wrapped) and is known to mean "not found" (as opposed to a more
// serious or unexpected error)
@@ -38,7 +41,3 @@ func isNotFound(err error) bool {
}
return false
}

func isInsufficientCapacityCode(errorCode string) bool {
return errorCode == iceErrorCode
}
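
To make the exported constant's use concrete: a small, hedged sketch of matching a CreateFleet error entry against InsufficientCapacityErrorCode using aws-sdk-go v1. The fleetErrors slice is hypothetical sample data, not output from Karpenter.

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/ec2"
)

// InsufficientCapacityErrorCode mirrors the constant exported from errors.go.
const InsufficientCapacityErrorCode = "InsufficientInstanceCapacity"

func main() {
	// Hypothetical CreateFleet error entries; the real values come from
	// CreateFleetOutput.Errors in launchInstances.
	fleetErrors := []*ec2.CreateFleetError{
		{ErrorCode: aws.String("InsufficientInstanceCapacity")},
		{ErrorCode: aws.String("UnfulfillableCapacity")},
	}
	for _, fe := range fleetErrors {
		// aws.StringValue dereferences the *string safely, returning "" for nil.
		if aws.StringValue(fe.ErrorCode) == InsufficientCapacityErrorCode {
			fmt.Println("insufficient capacity observed; this offering would be cached as unavailable")
		}
	}
}
```
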
112 changes: 70 additions & 42 deletions pkg/cloudprovider/aws/instance.go
@@ -22,7 +22,6 @@ import (

"github.com/avast/retry-go"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/ssm"
@@ -37,32 +36,38 @@ import (
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/awslabs/karpenter/pkg/utils/injectabletime"
"github.com/awslabs/karpenter/pkg/utils/options"
)

const (
iceCacheODTTL = 15 * time.Second
iceCacheSpotTTL = 45 * time.Second
iceCacheCleanupInterval = 5 * time.Minute
InsufficientCapacityErrorCacheOnDemandTTL = 15 * time.Second
InsufficientCapacityErrorCacheSpotTTL = 45 * time.Second
InsufficientCapacityErrorCacheCleanupInterval = 5 * time.Minute
)

type CachedZones struct {
zones sets.String
initialCacheTime time.Time
}

type InstanceProvider struct {
ec2api ec2iface.EC2API
instanceTypeProvider *InstanceTypeProvider
subnetProvider *SubnetProvider
launchTemplateProvider *LaunchTemplateProvider
iceCache *cache.Cache
unavailableOfferings *cache.Cache
}

func NewInstanceProvider(ec2api ec2iface.EC2API, instanceTypeProvider *InstanceTypeProvider, subnetProvider *SubnetProvider, sess *session.Session,
func NewInstanceProvider(ec2api ec2iface.EC2API, instanceTypeProvider *InstanceTypeProvider, subnetProvider *SubnetProvider, ssm *ssm.SSM,
clientSet *kubernetes.Clientset) *InstanceProvider {
return &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider,
NewLaunchTemplateProvider(
ec2api,
NewAMIProvider(ssm.New(sess), clientSet),
NewAMIProvider(ssm, clientSet),
NewSecurityGroupProvider(ec2api),
),
cache.New(iceCacheSpotTTL, iceCacheCleanupInterval),
cache.New(-1, InsufficientCapacityErrorCacheCleanupInterval),
}
}

@@ -128,32 +133,35 @@ func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error {
return nil
}

// The intention is to remove Offerings from the provided possible InstanceTypes based on recently observed ICEs, which will
// indirectly impact the packer to keep it from spinning its wheels on packings that are doomed to fail due to EC2 capacity constraints.
func (p *InstanceProvider) DiscardICEdInstanceTypes(ctx context.Context, instanceTypes []InstanceType) []cloudprovider.InstanceType {
// WithoutUnavailableOfferings will remove Offerings from the provided possible InstanceTypes based on recently observed
// insufficient capacity errors, which will indirectly impact the packer to keep it from spending time on packings that are
// likely to fail due to short-term EC2 capacity constraints.
func (p *InstanceProvider) WithoutUnavailableOfferings(ctx context.Context, instanceTypes []InstanceType) []cloudprovider.InstanceType {
var cleanedInstanceTypes = []cloudprovider.InstanceType{}
for index, instanceType := range instanceTypes {
odIceZones, hasODIces := p.iceCache.Get(createIceCacheKey(instanceType.Name(), v1alpha1.CapacityTypeOnDemand))
spotIceZones, hasSpotIces := p.iceCache.Get(createIceCacheKey(instanceType.Name(), v1alpha1.CapacityTypeSpot))
for _, instanceType := range instanceTypes {
currInstanceType := instanceType
cpInstanceType := cloudprovider.InstanceType(&currInstanceType)
odIceZones, hasODIces := p.unavailableOfferings.Get(unavailableOfferingsCacheKey(instanceType.Name(), v1alpha1.CapacityTypeOnDemand))
spotIceZones, hasSpotIces := p.unavailableOfferings.Get(unavailableOfferingsCacheKey(instanceType.Name(), v1alpha1.CapacityTypeSpot))
if !hasODIces && !hasSpotIces {
cleanedInstanceTypes = append(cleanedInstanceTypes, &instanceTypes[index])
cleanedInstanceTypes = append(cleanedInstanceTypes, cpInstanceType)
continue
}
cleanedOfferings := []cloudprovider.Offering{}
for _, offering := range instanceType.Offerings() {
if hasODIces && offering.CapacityType == v1alpha1.CapacityTypeOnDemand {
if !odIceZones.(sets.String).Has(offering.Zone) {
for _, offering := range currInstanceType.Offerings() {
if offering.CapacityType == v1alpha1.CapacityTypeOnDemand {
if !hasODIces || !odIceZones.(CachedZones).zones.Has(offering.Zone) {
cleanedOfferings = append(cleanedOfferings, offering)
}
} else if hasSpotIces {
if !spotIceZones.(sets.String).Has(offering.Zone) {
} else {
if !hasSpotIces || !spotIceZones.(CachedZones).zones.Has(offering.Zone) {
cleanedOfferings = append(cleanedOfferings, offering)
}
}
}
if len(cleanedOfferings) > 0 {
currInstanceType.AvailableOfferings = cleanedOfferings
cleanedInstanceTypes = append(cleanedInstanceTypes, &instanceTypes[index])
cleanedInstanceTypes = append(cleanedInstanceTypes, cpInstanceType)
}
}
return cleanedInstanceTypes
@@ -197,13 +205,13 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a
if err != nil {
return nil, fmt.Errorf("creating fleet %w", err)
}
if len(createFleetOutput.Errors) > 0 {
for _, fleetError := range createFleetOutput.Errors {
if isInsufficientCapacityCode(*fleetError.ErrorCode) {
p.addToIceCache(fleetError.LaunchTemplateAndOverrides.Overrides, capacityType)
}
insufficientCapacityOfferings := map[string]sets.String{}
for _, err := range createFleetOutput.Errors {
if InsufficientCapacityErrorCode == aws.StringValue(err.ErrorCode) {
createOrAppendToMapValue(insufficientCapacityOfferings, aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.InstanceType), aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.AvailabilityZone))
}
}
p.updateUnavailableOfferingsCache(ctx, insufficientCapacityOfferings, capacityType)
instanceIds := combineFleetInstances(*createFleetOutput)
if len(instanceIds) == 0 {
return nil, combineFleetErrors(createFleetOutput.Errors)
@@ -247,12 +255,12 @@ func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.Inst
}
for _, subnet := range subnets {
subnetZone := aws.StringValue(subnet.AvailabilityZone)
if subnetZone == offering.Zone && (constrainedZones == nil || constrainedZones.Has(subnetZone)) {
if subnetZone == offering.Zone && constrainedZones.Has(subnetZone) {
override := &ec2.FleetLaunchTemplateOverridesRequest{
InstanceType: aws.String(instanceType.Name()),
SubnetId: subnet.SubnetId,
// This is technically redundant, but is useful if we have to parse ICEs from CreateFleet
// in figuring out the zone rather than additional API calls to look up the subnet
// This is technically redundant, but is useful if we have to parse insufficient capacity errors from
// CreateFleet so that we can figure out the zone rather than additional API calls to look up the subnet
AvailabilityZone: subnet.AvailabilityZone,
}
// Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy
@@ -327,19 +335,30 @@ func (p *InstanceProvider) instanceToNode(instance *ec2.Instance, instanceTypes
return nil, fmt.Errorf("unrecognized instance type %s", aws.StringValue(instance.InstanceType))
}

func (p *InstanceProvider) addToIceCache(overrides *ec2.FleetLaunchTemplateOverrides, capacityType string) {
instanceType := aws.StringValue(overrides.InstanceType)
zone := aws.StringValue(overrides.AvailabilityZone)
cacheTTL := iceCacheSpotTTL
func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context, offerings map[string]sets.String, capacityType string) {
cacheTTL := InsufficientCapacityErrorCacheSpotTTL
if capacityType == v1alpha1.CapacityTypeOnDemand {
cacheTTL = iceCacheODTTL
cacheTTL = InsufficientCapacityErrorCacheOnDemandTTL
}
cacheKey := createIceCacheKey(instanceType, capacityType)
if zones, exists := p.iceCache.Get(cacheKey); exists {
zones.(sets.String).Insert(zone)
p.iceCache.Set(cacheKey, zones, cacheTTL)
} else {
p.iceCache.Set(cacheKey, sets.String{zone: sets.Empty{}}, cacheTTL)

for instanceType, zones := range offerings {
cacheKey := unavailableOfferingsCacheKey(instanceType, capacityType)
logging.FromContext(ctx).Debugf("Saw %s for offering { instanceType: %s, zones: %s, capacityType: %s }, avoiding it for %s",
InsufficientCapacityErrorCode,
instanceType,
zones,
capacityType,
cacheTTL)
// Because we don't track a TTL for each zone and never remove individual zones from the cache, we need to mitigate the risk of a zone
// staying cached forever. We therefore only merge into an existing cached entry within a certain time window; after that window expires,
// we replace the cache entry and reset the clock on it.
if existingZones, exists := p.unavailableOfferings.Get(cacheKey); exists && existingZones.(CachedZones).initialCacheTime.UTC().Add(InsufficientCapacityErrorCacheCleanupInterval).After(injectabletime.Now()) {
existingZones.(CachedZones).zones.Insert(zones.UnsortedList()...)
// though we're updating existingZones already, we still need to call Set to update the cached entry's TTL
p.unavailableOfferings.Set(cacheKey, existingZones, cacheTTL)
} else {
p.unavailableOfferings.Set(cacheKey, CachedZones{zones: zones, initialCacheTime: injectabletime.Now()}, cacheTTL)
}
}
}

Expand Down Expand Up @@ -386,6 +405,15 @@ func combineReservations(reservations []*ec2.Reservation) []*ec2.Instance {
return instances
}

func createIceCacheKey(instanceType string, capacityType string) string {
return fmt.Sprintf("%s-%s", instanceType, capacityType)
func unavailableOfferingsCacheKey(instanceType string, capacityType string) string {
return fmt.Sprintf("%s:%s", instanceType, capacityType)
}

func createOrAppendToMapValue(mapToUpdate map[string]sets.String, key string, newValue string) {
existingValueSet, hasValue := mapToUpdate[key]
if hasValue {
existingValueSet.Insert(newValue)
} else {
mapToUpdate[key] = sets.String{newValue: sets.Empty{}}
}
}
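
The cache handling in updateUnavailableOfferingsCache is easiest to see in isolation. Below is a minimal, hedged sketch of the same pattern — keying recently unavailable zones by instance type and capacity type, with a shorter TTL for on-demand than for spot — using the same go-cache and apimachinery sets packages the diff imports. The helper names (markUnavailable, isUnavailable) and TTL values are illustrative assumptions, and the sketch drops the CachedZones/initialCacheTime bookkeeping the commit adds to bound how long an entry can keep accumulating zones.

```go
package main

import (
	"fmt"
	"time"

	"github.com/patrickmn/go-cache"
	"k8s.io/apimachinery/pkg/util/sets"
)

const (
	onDemandTTL = 15 * time.Second // mirrors InsufficientCapacityErrorCacheOnDemandTTL
	spotTTL     = 45 * time.Second // mirrors InsufficientCapacityErrorCacheSpotTTL
)

// No default TTL; every Set call supplies its own expiration.
var unavailable = cache.New(cache.NoExpiration, 5*time.Minute)

func cacheKey(instanceType, capacityType string) string {
	return fmt.Sprintf("%s:%s", instanceType, capacityType)
}

// markUnavailable records zones that returned InsufficientInstanceCapacity for an
// (instance type, capacity type) pair, merging into any existing cached entry.
func markUnavailable(instanceType, capacityType string, zones sets.String) {
	ttl := spotTTL
	if capacityType == "on-demand" {
		ttl = onDemandTTL
	}
	key := cacheKey(instanceType, capacityType)
	if existing, ok := unavailable.Get(key); ok {
		existing.(sets.String).Insert(zones.UnsortedList()...)
		unavailable.Set(key, existing, ttl) // Set again so the entry's TTL is refreshed
		return
	}
	unavailable.Set(key, zones, ttl)
}

// isUnavailable reports whether a specific offering should currently be skipped.
func isUnavailable(instanceType, capacityType, zone string) bool {
	if cached, ok := unavailable.Get(cacheKey(instanceType, capacityType)); ok {
		return cached.(sets.String).Has(zone)
	}
	return false
}

func main() {
	markUnavailable("p3.8xlarge", "spot", sets.NewString("us-west-2a"))
	fmt.Println(isUnavailable("p3.8xlarge", "spot", "us-west-2a")) // true
	fmt.Println(isUnavailable("p3.8xlarge", "spot", "us-west-2b")) // false
}
```
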
