diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index 36f0bb8e9c5b..0bfb8a265d9c 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -24,6 +24,7 @@ import ( "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ssm" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" @@ -79,7 +80,7 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud return &CloudProvider{ instanceTypeProvider: instanceTypeProvider, subnetProvider: subnetProvider, - instanceProvider: NewInstanceProvider(ec2api, instanceTypeProvider, subnetProvider, sess, options.ClientSet), + instanceProvider: NewInstanceProvider(ec2api, instanceTypeProvider, subnetProvider, ssm.New(sess), options.ClientSet), creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), } } @@ -125,18 +126,17 @@ func (c *CloudProvider) create(ctx context.Context, constraints *v1alpha5.Constr return errs } -// Despite accepting a Constraints struct, note that it does not utilize Requirements at all and instead -// returns all available InstanceTypes. 
+// GetInstanceTypes returns all available InstanceTypes despite accepting a Constraints struct (note that it does not utilize Requirements) func (c *CloudProvider) GetInstanceTypes(ctx context.Context, constraints *v1alpha5.Constraints) ([]cloudprovider.InstanceType, error) { vendorConstraints, err := v1alpha1.Deserialize(constraints) if err != nil { return nil, apis.ErrGeneric(err.Error()) } - allInstanceTypes, err := c.instanceTypeProvider.Get(ctx, vendorConstraints) + instanceTypes, err := c.instanceTypeProvider.Get(ctx, vendorConstraints) if err != nil { return nil, err } - return c.instanceProvider.DiscardICEdInstanceTypes(ctx, allInstanceTypes), nil + return c.instanceProvider.WithoutUnavailableOfferings(ctx, instanceTypes), nil } func (c *CloudProvider) Delete(ctx context.Context, node *v1.Node) error { @@ -156,11 +156,11 @@ func (c *CloudProvider) Validate(ctx context.Context, constraints *v1alpha5.Cons func (c *CloudProvider) Default(ctx context.Context, constraints *v1alpha5.Constraints) { vendorConstraints, err := v1alpha1.Deserialize(constraints) if err != nil { logging.FromContext(ctx).Errorf("Failed to deserialize provider, %s", err.Error()) return } vendorConstraints.Default(ctx) if err := vendorConstraints.Serialize(constraints); err != nil { logging.FromContext(ctx).Errorf("Failed to serialize provider, %s", err.Error()) } } diff --git a/pkg/cloudprovider/aws/errors.go b/pkg/cloudprovider/aws/errors.go index e00c4a6f455c..584440c462de 100644 --- a/pkg/cloudprovider/aws/errors.go +++ b/pkg/cloudprovider/aws/errors.go @@ -25,9 +25,12 @@ var ( "InvalidInstanceID.NotFound", "InvalidLaunchTemplateName.NotFoundException", } - iceErrorCode = "InsufficientInstanceCapacity" ) +// InsufficientCapacityErrorCode indicates that EC2 is temporarily lacking capacity for this +// instance type 
and availability zone combination +const InsufficientCapacityErrorCode = "InsufficientInstanceCapacity" + // isNotFound returns true if the err is an AWS error (even if it's // wrapped) and is a known to mean "not found" (as opposed to a more // serious or unexpected error) @@ -38,7 +41,3 @@ func isNotFound(err error) bool { } return false } - -func isInsufficientCapacityCode(errorCode string) bool { - return errorCode == iceErrorCode -} diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index 7b9380aec9a5..de3aa5ad06c4 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -22,7 +22,6 @@ import ( "github.com/avast/retry-go" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/aws/aws-sdk-go/service/ssm" @@ -37,32 +36,38 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" + "github.com/awslabs/karpenter/pkg/utils/injectabletime" "github.com/awslabs/karpenter/pkg/utils/options" ) const ( - iceCacheODTTL = 15 * time.Second - iceCacheSpotTTL = 45 * time.Second - iceCacheCleanupInterval = 5 * time.Minute + InsufficientCapacityErrorCacheOnDemandTTL = 15 * time.Second + InsufficientCapacityErrorCacheSpotTTL = 45 * time.Second + InsufficientCapacityErrorCacheCleanupInterval = 5 * time.Minute ) +type CachedZones struct { + zones sets.String + initialCacheTime time.Time +} + type InstanceProvider struct { ec2api ec2iface.EC2API instanceTypeProvider *InstanceTypeProvider subnetProvider *SubnetProvider launchTemplateProvider *LaunchTemplateProvider - iceCache *cache.Cache + unavailableOfferings *cache.Cache } -func NewInstanceProvider(ec2api ec2iface.EC2API, 
instanceTypeProvider *InstanceTypeProvider, subnetProvider *SubnetProvider, sess *session.Session, +func NewInstanceProvider(ec2api ec2iface.EC2API, instanceTypeProvider *InstanceTypeProvider, subnetProvider *SubnetProvider, ssm *ssm.SSM, clientSet *kubernetes.Clientset) *InstanceProvider { return &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider, NewLaunchTemplateProvider( ec2api, - NewAMIProvider(ssm.New(sess), clientSet), + NewAMIProvider(ssm, clientSet), NewSecurityGroupProvider(ec2api), ), - cache.New(iceCacheSpotTTL, iceCacheCleanupInterval), + cache.New(-1, InsufficientCapacityErrorCacheCleanupInterval), } } @@ -128,32 +133,35 @@ func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error { return nil } -// The intention is to remove Offerings from the provided possible InstanceTypes based on recently observed ICEs, which will -// indirectly impact the packer to keep it from spinning its wheels on packings that are doomed to fail due to EC2 capacity constraints. -func (p *InstanceProvider) DiscardICEdInstanceTypes(ctx context.Context, instanceTypes []InstanceType) []cloudprovider.InstanceType { +// WithoutUnavailableOfferings will remove Offerings from the provided possible InstanceTypes based on recently observed +// insufficient capacity errors, which will indirectly impact the packer to keep it from spending time on packings that are +// likely to fail due to short-term EC2 capacity constraints. 
+func (p *InstanceProvider) WithoutUnavailableOfferings(ctx context.Context, instanceTypes []InstanceType) []cloudprovider.InstanceType { var cleanedInstanceTypes = []cloudprovider.InstanceType{} - for index, instanceType := range instanceTypes { - odIceZones, hasODIces := p.iceCache.Get(createIceCacheKey(instanceType.Name(), v1alpha1.CapacityTypeOnDemand)) - spotIceZones, hasSpotIces := p.iceCache.Get(createIceCacheKey(instanceType.Name(), v1alpha1.CapacityTypeSpot)) + for _, instanceType := range instanceTypes { + currInstanceType := instanceType + cpInstanceType := cloudprovider.InstanceType(&currInstanceType) + odIceZones, hasODIces := p.unavailableOfferings.Get(unavailableOfferingsCacheKey(instanceType.Name(), v1alpha1.CapacityTypeOnDemand)) + spotIceZones, hasSpotIces := p.unavailableOfferings.Get(unavailableOfferingsCacheKey(instanceType.Name(), v1alpha1.CapacityTypeSpot)) if !hasODIces && !hasSpotIces { - cleanedInstanceTypes = append(cleanedInstanceTypes, &instanceTypes[index]) + cleanedInstanceTypes = append(cleanedInstanceTypes, cpInstanceType) continue } cleanedOfferings := []cloudprovider.Offering{} - for _, offering := range instanceType.Offerings() { - if hasODIces && offering.CapacityType == v1alpha1.CapacityTypeOnDemand { - if !odIceZones.(sets.String).Has(offering.Zone) { + for _, offering := range currInstanceType.Offerings() { + if offering.CapacityType == v1alpha1.CapacityTypeOnDemand { + if !hasODIces || !odIceZones.(CachedZones).zones.Has(offering.Zone) { cleanedOfferings = append(cleanedOfferings, offering) } - } else if hasSpotIces { - if !spotIceZones.(sets.String).Has(offering.Zone) { + } else { + if !hasSpotIces || !spotIceZones.(CachedZones).zones.Has(offering.Zone) { cleanedOfferings = append(cleanedOfferings, offering) } } } if len(cleanedOfferings) > 0 { - instanceType.AvailableOfferings = cleanedOfferings + currInstanceType.AvailableOfferings = cleanedOfferings - cleanedInstanceTypes = append(cleanedInstanceTypes, &instanceTypes[index]) + cleanedInstanceTypes = append(cleanedInstanceTypes, 
cpInstanceType) } } return cleanedInstanceTypes @@ -197,13 +205,13 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a if err != nil { return nil, fmt.Errorf("creating fleet %w", err) } - if len(createFleetOutput.Errors) > 0 { - for _, fleetError := range createFleetOutput.Errors { - if isInsufficientCapacityCode(*fleetError.ErrorCode) { - p.addToIceCache(fleetError.LaunchTemplateAndOverrides.Overrides, capacityType) - } + insufficientCapacityOfferings := map[string]sets.String{} + for _, err := range createFleetOutput.Errors { + if InsufficientCapacityErrorCode == aws.StringValue(err.ErrorCode) { + createOrAppendToMapValue(insufficientCapacityOfferings, aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.InstanceType), aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.AvailabilityZone)) } } + p.updateUnavailableOfferingsCache(ctx, insufficientCapacityOfferings, capacityType) instanceIds := combineFleetInstances(*createFleetOutput) if len(instanceIds) == 0 { return nil, combineFleetErrors(createFleetOutput.Errors) @@ -247,12 +255,12 @@ func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.Inst } for _, subnet := range subnets { subnetZone := aws.StringValue(subnet.AvailabilityZone) if subnetZone == offering.Zone && (constrainedZones == nil || constrainedZones.Has(subnetZone)) { override := &ec2.FleetLaunchTemplateOverridesRequest{ InstanceType: aws.String(instanceType.Name()), SubnetId: subnet.SubnetId, - // This is technically redundant, but is useful if we have to parse ICEs from CreateFleet - // in figuring out the zone rather than additional API calls to look up the subnet + // This is technically redundant, but is useful if we have to parse insufficient capacity errors from + // CreateFleet so that we can figure out the zone rather than additional API calls to look up the subnet AvailabilityZone: subnet.AvailabilityZone, } 
// Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy @@ -327,19 +335,30 @@ func (p *InstanceProvider) instanceToNode(instance *ec2.Instance, instanceTypes return nil, fmt.Errorf("unrecognized instance type %s", aws.StringValue(instance.InstanceType)) } -func (p *InstanceProvider) addToIceCache(overrides *ec2.FleetLaunchTemplateOverrides, capacityType string) { - instanceType := aws.StringValue(overrides.InstanceType) - zone := aws.StringValue(overrides.AvailabilityZone) - cacheTTL := iceCacheSpotTTL +func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context, offerings map[string]sets.String, capacityType string) { + cacheTTL := InsufficientCapacityErrorCacheSpotTTL if capacityType == v1alpha1.CapacityTypeOnDemand { - cacheTTL = iceCacheODTTL + cacheTTL = InsufficientCapacityErrorCacheOnDemandTTL } - cacheKey := createIceCacheKey(instanceType, capacityType) - if zones, exists := p.iceCache.Get(cacheKey); exists { - zones.(sets.String).Insert(zone) - p.iceCache.Set(cacheKey, zones, cacheTTL) - } else { - p.iceCache.Set(cacheKey, sets.String{zone: sets.Empty{}}, cacheTTL) + + for instanceType, zones := range offerings { + cacheKey := unavailableOfferingsCacheKey(instanceType, capacityType) + logging.FromContext(ctx).Debugf("Saw %s for offering { instanceType: %s, zones: %s, capacityType: %s }, avoiding it for %s", + InsufficientCapacityErrorCode, + instanceType, + zones, + capacityType, + cacheTTL) + // because we don't track a cache TTL for each zone and don't remove individual zones from the cache we need to mitigate the risk of a zone + // staying in the cache forever, so we will only add to an existing cached entry within a certain time period (after expiry, we just replace the cache + // entry and reset the clock on it) + if existingZones, exists := p.unavailableOfferings.Get(cacheKey); exists && 
existingZones.(CachedZones).initialCacheTime.UTC().Add(InsufficientCapacityErrorCacheCleanupInterval).After(injectabletime.Now()) { + existingZones.(CachedZones).zones.Insert(zones.UnsortedList()...) + // though we're updating existingZones already, we still need to call Set to update the cached entry's TTL + p.unavailableOfferings.Set(cacheKey, existingZones, cacheTTL) + } else { + p.unavailableOfferings.Set(cacheKey, CachedZones{zones: zones, initialCacheTime: injectabletime.Now()}, cacheTTL) + } } } @@ -386,6 +405,15 @@ func combineReservations(reservations []*ec2.Reservation) []*ec2.Instance { return instances } -func createIceCacheKey(instanceType string, capacityType string) string { - return fmt.Sprintf("%s-%s", instanceType, capacityType) +func unavailableOfferingsCacheKey(instanceType string, capacityType string) string { + return fmt.Sprintf("%s:%s", instanceType, capacityType) +} + +func createOrAppendToMapValue(mapToUpdate map[string]sets.String, key string, newValue string) { + existingValueSet, hasValue := mapToUpdate[key] + if hasValue { + existingValueSet.Insert(newValue) + } else { + mapToUpdate[key] = sets.String{newValue: sets.Empty{}} + } }