From 5414947a97acb8244b07103becd9cb14fb05dae5 Mon Sep 17 00:00:00 2001 From: Elton Pinto Date: Fri, 19 Nov 2021 08:04:40 +0000 Subject: [PATCH 01/11] Implement ICE caching for AWS cloud provider and fix bug in filtering for SubnetProvider --- pkg/cloudprovider/aws/cloudprovider.go | 19 +++--- pkg/cloudprovider/aws/errors.go | 5 ++ pkg/cloudprovider/aws/instance.go | 93 ++++++++++++++++++++++++-- pkg/cloudprovider/aws/instancetypes.go | 12 ++-- pkg/cloudprovider/aws/subnets.go | 12 +--- pkg/cloudprovider/aws/suite_test.go | 8 ++- 6 files changed, 116 insertions(+), 33 deletions(-) diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index 76d176fe035a..32cc0a3224e6 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -24,7 +24,6 @@ import ( "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" - "github.com/aws/aws-sdk-go/service/ssm" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" "github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" @@ -80,14 +79,8 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud return &CloudProvider{ instanceTypeProvider: instanceTypeProvider, subnetProvider: subnetProvider, - instanceProvider: &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider, - NewLaunchTemplateProvider( - ec2api, - NewAMIProvider(ssm.New(sess), options.ClientSet), - NewSecurityGroupProvider(ec2api), - ), - }, - creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), + instanceProvider: NewInstanceProvider(ec2api, instanceTypeProvider, subnetProvider, sess, options.ClientSet), + creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), } } @@ -132,12 +125,18 @@ func (c *CloudProvider) create(ctx context.Context, constraints *v1alpha5.Constr return errs } +// Despite accepting a Constraints struct, note that it does not utilize Requirements at all and instead +// returns all available InstanceTypes. 
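// A minimal caller-side sketch of what that contract implies (illustrative only,
// and it assumes v1alpha5.Requirements exposes an InstanceTypes() set the same
// way it exposes Zones()): the scheduler, not the cloud provider, is expected to
// narrow the returned list to its Requirements, e.g.:
//
//	instanceTypes, _ := cloudProvider.GetInstanceTypes(ctx, constraints)
//	for _, it := range instanceTypes {
//		if constraints.Requirements.InstanceTypes().Has(it.Name()) {
//			// it remains a candidate for binpacking
//		}
//	}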
func (c *CloudProvider) GetInstanceTypes(ctx context.Context, constraints *v1alpha5.Constraints) ([]cloudprovider.InstanceType, error) { vendorConstraints, err := v1alpha1.Deserialize(constraints) if err != nil { return nil, apis.ErrGeneric(err.Error()) } - return c.instanceTypeProvider.Get(ctx, vendorConstraints) + allInstanceTypes, err := c.instanceTypeProvider.Get(ctx, vendorConstraints) + if err != nil { + return nil, err + } + return c.instanceProvider.DiscardICEdInstanceTypes(ctx, allInstanceTypes), nil } func (c *CloudProvider) Delete(ctx context.Context, node *v1.Node) error { diff --git a/pkg/cloudprovider/aws/errors.go b/pkg/cloudprovider/aws/errors.go index 50c1f3e91528..77425047923d 100644 --- a/pkg/cloudprovider/aws/errors.go +++ b/pkg/cloudprovider/aws/errors.go @@ -25,6 +25,7 @@ var ( "InvalidInstanceID.NotFound", "InvalidLaunchTemplateName.NotFoundException", } + iceErrorCode = "InsufficientInstanceCapacity" ) // isNotFound returns true if the err is an AWS error (even if it's @@ -37,3 +38,7 @@ func isNotFound(err error) bool { } return false } + +func isInsufficientCapacityCode(errorCode string) bool { + return errorCode == iceErrorCode +} diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index 582583403b3a..9be88529df91 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -22,12 +22,16 @@ import ( "github.com/avast/retry-go" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" + "github.com/aws/aws-sdk-go/service/ssm" + "github.com/patrickmn/go-cache" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/kubernetes" "knative.dev/pkg/logging" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" @@ -36,11 +40,30 @@ import ( "github.com/aws/karpenter/pkg/utils/injection" ) +const ( + iceCacheODTTL = 15 * time.Second + iceCacheSpotTTL = 45 * time.Second + iceCacheCleanupInterval = 5 * time.Minute +) + type InstanceProvider struct { ec2api ec2iface.EC2API instanceTypeProvider *InstanceTypeProvider subnetProvider *SubnetProvider launchTemplateProvider *LaunchTemplateProvider + iceCache *cache.Cache +} + +func NewInstanceProvider(ec2api ec2iface.EC2API, instanceTypeProvider *InstanceTypeProvider, subnetProvider *SubnetProvider, sess *session.Session, + clientSet *kubernetes.Clientset) *InstanceProvider { + return &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider, + NewLaunchTemplateProvider( + ec2api, + NewAMIProvider(ssm.New(sess), clientSet), + NewSecurityGroupProvider(ec2api), + ), + cache.New(iceCacheSpotTTL, iceCacheCleanupInterval), + } } // Create an instance given the constraints. @@ -105,6 +128,37 @@ func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error { return nil } +// The intention is to remove Offerings from the provided possible InstanceTypes based on recently observed ICEs, which will +// indirectly impact the packer to keep it from spinning its wheels on packings that are doomed to fail due to EC2 capacity constraints. 
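// For reference, the lookups below lean on the patrickmn/go-cache API: Get returns
// (interface{}, bool) and does not extend an entry's TTL, so cached values must be
// type-asserted back to sets.String. A standalone sketch of that pattern, using the
// instanceType-capacityType key format produced by createIceCacheKey:
//
//	iceCache := cache.New(iceCacheSpotTTL, iceCacheCleanupInterval)
//	iceCache.Set("m5.large-spot", sets.NewString("us-east-1a"), cache.DefaultExpiration)
//	if cached, found := iceCache.Get("m5.large-spot"); found {
//		icedZones := cached.(sets.String)
//		_ = icedZones.Has("us-east-1a") // true until the entry expires
//	}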
+func (p *InstanceProvider) DiscardICEdInstanceTypes(ctx context.Context, instanceTypes []InstanceType) []cloudprovider.InstanceType { + var cleanedInstanceTypes = []cloudprovider.InstanceType{} + for index, instanceType := range instanceTypes { + odIceZones, hasODIces := p.iceCache.Get(createIceCacheKey(instanceType.Name(), v1alpha1.CapacityTypeOnDemand)) + spotIceZones, hasSpotIces := p.iceCache.Get(createIceCacheKey(instanceType.Name(), v1alpha1.CapacityTypeSpot)) + if !hasODIces && !hasSpotIces { + cleanedInstanceTypes = append(cleanedInstanceTypes, &instanceTypes[index]) + continue + } + cleanedOfferings := []cloudprovider.Offering{} + for _, offering := range instanceType.Offerings() { + if hasODIces && offering.CapacityType == v1alpha1.CapacityTypeOnDemand { + if !odIceZones.(sets.String).Has(offering.Zone) { + cleanedOfferings = append(cleanedOfferings, offering) + } + } else if hasSpotIces { + if !spotIceZones.(sets.String).Has(offering.Zone) { + cleanedOfferings = append(cleanedOfferings, offering) + } + } + } + if len(cleanedOfferings) > 0 { + instanceType.AvailableOfferings = cleanedOfferings + cleanedInstanceTypes = append(cleanedInstanceTypes, &instanceTypes[index]) + } + } + return cleanedInstanceTypes +} + func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int) ([]*string, error) { // Default to on-demand unless constrained otherwise or if flexible to spot and // on-demand. This code assumes two options: {spot, on-demand}, which is enforced @@ -143,6 +197,13 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a if err != nil { return nil, fmt.Errorf("creating fleet %w", err) } + if len(createFleetOutput.Errors) > 0 { + for _, fleetError := range createFleetOutput.Errors { + if isInsufficientCapacityCode(*fleetError.ErrorCode) { + p.addToIceCache(fleetError.LaunchTemplateAndOverrides.Overrides, capacityType) + } + } + } instanceIds := combineFleetInstances(*createFleetOutput) if len(instanceIds) == 0 { return nil, combineFleetErrors(createFleetOutput.Errors) @@ -155,7 +216,7 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a func (p *InstanceProvider) getLaunchTemplateConfigs(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType, capacityType string) ([]*ec2.FleetLaunchTemplateConfigRequest, error) { // Get subnets given the constraints - subnets, err := p.subnetProvider.Get(ctx, constraints) + subnets, err := p.subnetProvider.Get(ctx, constraints.AWS) if err != nil { return nil, fmt.Errorf("getting subnets, %w", err) } @@ -166,7 +227,7 @@ func (p *InstanceProvider) getLaunchTemplateConfigs(ctx context.Context, constra } for launchTemplateName, instanceTypes := range launchTemplates { launchTemplateConfigs = append(launchTemplateConfigs, &ec2.FleetLaunchTemplateConfigRequest{ - Overrides: p.getOverrides(instanceTypes, subnets, capacityType), + Overrides: p.getOverrides(instanceTypes, subnets, constraints.Requirements.Zones(), capacityType), LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{ LaunchTemplateName: aws.String(launchTemplateName), Version: aws.String("$Default"), @@ -176,7 +237,7 @@ func (p *InstanceProvider) getLaunchTemplateConfigs(ctx context.Context, constra return launchTemplateConfigs, nil } -func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.InstanceType, subnets []*ec2.Subnet, capacityType string) 
[]*ec2.FleetLaunchTemplateOverridesRequest { +func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.InstanceType, subnets []*ec2.Subnet, constrainedZones sets.String, capacityType string) []*ec2.FleetLaunchTemplateOverridesRequest { var overrides []*ec2.FleetLaunchTemplateOverridesRequest for i, instanceType := range instanceTypeOptions { for _, offering := range instanceType.Offerings() { @@ -185,10 +246,14 @@ func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.Inst continue } for _, subnet := range subnets { - if aws.StringValue(subnet.AvailabilityZone) == offering.Zone { + subnetZone := aws.StringValue(subnet.AvailabilityZone) + if subnetZone == offering.Zone && (constrainedZones == nil || constrainedZones.Has(subnetZone)) { override := &ec2.FleetLaunchTemplateOverridesRequest{ InstanceType: aws.String(instanceType.Name()), SubnetId: subnet.SubnetId, + // This is technically redundant, but is useful if we have to parse ICEs from CreateFleet + // in figuring out the zone rather than additional API calls to look up the subnet + AvailabilityZone: subnet.AvailabilityZone, } // Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy // to reduce the likelihood of getting an excessively large instance type. @@ -267,6 +332,22 @@ func (p *InstanceProvider) instanceToNode(instance *ec2.Instance, instanceTypes return nil, fmt.Errorf("unrecognized instance type %s", aws.StringValue(instance.InstanceType)) } +func (p *InstanceProvider) addToIceCache(overrides *ec2.FleetLaunchTemplateOverrides, capacityType string) { + instanceType := aws.StringValue(overrides.InstanceType) + zone := aws.StringValue(overrides.AvailabilityZone) + cacheTTL := iceCacheSpotTTL + if capacityType == v1alpha1.CapacityTypeOnDemand { + cacheTTL = iceCacheODTTL + } + cacheKey := createIceCacheKey(instanceType, capacityType) + if zones, exists := p.iceCache.Get(cacheKey); exists { + zones.(sets.String).Insert(zone) + p.iceCache.Set(cacheKey, zones, cacheTTL) + } else { + p.iceCache.Set(cacheKey, sets.String{zone: sets.Empty{}}, cacheTTL) + } +} + func getInstanceID(node *v1.Node) (*string, error) { id := strings.Split(node.Spec.ProviderID, "/") if len(id) < 5 { @@ -309,3 +390,7 @@ func combineReservations(reservations []*ec2.Reservation) []*ec2.Instance { } return instances } + +func createIceCacheKey(instanceType string, capacityType string) string { + return fmt.Sprintf("%s-%s", instanceType, capacityType) +} diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go index 95f70883a8da..f58f36076c73 100644 --- a/pkg/cloudprovider/aws/instancetypes.go +++ b/pkg/cloudprovider/aws/instancetypes.go @@ -50,15 +50,15 @@ func NewInstanceTypeProvider(ec2api ec2iface.EC2API, subnetProvider *SubnetProvi } } -// Get instance type options given the constraints -func (p *InstanceTypeProvider) Get(ctx context.Context, constraints *v1alpha1.Constraints) ([]cloudprovider.InstanceType, error) { +// Get all instance type options (the constraints are only used for tag filtering on subnets, not for Requirements filtering) +func (p *InstanceTypeProvider) Get(ctx context.Context, constraints *v1alpha1.Constraints) ([]InstanceType, error) { // Get InstanceTypes from EC2 instanceTypes, err := p.getInstanceTypes(ctx) if err != nil { return nil, err } // Get Viable AZs from subnets - subnets, err := p.subnetProvider.Get(ctx, constraints) + subnets, err := p.subnetProvider.Get(ctx, constraints.AWS) if err != nil { 
return nil, err } @@ -71,10 +71,8 @@ func (p *InstanceTypeProvider) Get(ctx context.Context, constraints *v1alpha1.Co if err != nil { return nil, err } - // Convert to cloudprovider.InstanceType - result := []cloudprovider.InstanceType{} + result := []InstanceType{} for _, instanceType := range instanceTypes { - //TODO filter out possible zones and capacity types using an ICE cache https://github.com/aws/karpenter/issues/371 offerings := []cloudprovider.Offering{} for zone := range subnetZones.Intersection(instanceTypeZones[instanceType.Name()]) { // while usage classes should be a distinct set, there's no guarantee of that @@ -83,7 +81,7 @@ func (p *InstanceTypeProvider) Get(ctx context.Context, constraints *v1alpha1.Co } } instanceType.AvailableOfferings = offerings - result = append(result, instanceType) + result = append(result, *instanceType) } return result, nil } diff --git a/pkg/cloudprovider/aws/subnets.go b/pkg/cloudprovider/aws/subnets.go index 4f3b2ce63b94..5f9fecd8effa 100644 --- a/pkg/cloudprovider/aws/subnets.go +++ b/pkg/cloudprovider/aws/subnets.go @@ -40,7 +40,7 @@ func NewSubnetProvider(ec2api ec2iface.EC2API) *SubnetProvider { } } -func (s *SubnetProvider) Get(ctx context.Context, constraints *v1alpha1.Constraints) ([]*ec2.Subnet, error) { +func (s *SubnetProvider) Get(ctx context.Context, constraints *v1alpha1.AWS) ([]*ec2.Subnet, error) { filters := getFilters(constraints) hash, err := hashstructure.Hash(filters, hashstructure.FormatV2, nil) if err != nil { @@ -61,15 +61,9 @@ func (s *SubnetProvider) Get(ctx context.Context, constraints *v1alpha1.Constrai return output.Subnets, nil } -func getFilters(constraints *v1alpha1.Constraints) []*ec2.Filter { +func getFilters(constraints *v1alpha1.AWS) []*ec2.Filter { filters := []*ec2.Filter{} - // Filter by zone - if zones := constraints.Requirements.Zones(); zones != nil { - filters = append(filters, &ec2.Filter{ - Name: aws.String("availability-zone"), - Values: aws.StringSlice(zones.UnsortedList()), - }) - } + // Filter by subnet for key, value := range constraints.SubnetSelector { if value == "*" { diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 6cf8dfc218a1..11b1748a11d9 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "testing" + "time" "github.com/Pallinder/go-randomdata" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" @@ -77,6 +78,7 @@ var _ = BeforeSuite(func() { securityGroupProvider: NewSecurityGroupProvider(fakeEC2API), cache: launchTemplateCache, }, + cache.New(5*time.Second, 30*time.Second), }, creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), } @@ -269,9 +271,9 @@ var _ = Describe("Allocation", func() { input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) Expect(input.LaunchTemplateConfigs).To(HaveLen(1)) Expect(input.LaunchTemplateConfigs[0].Overrides).To(ContainElements( - &ec2.FleetLaunchTemplateOverridesRequest{SubnetId: aws.String("test-subnet-1"), InstanceType: aws.String("m5.large")}, - &ec2.FleetLaunchTemplateOverridesRequest{SubnetId: aws.String("test-subnet-2"), InstanceType: aws.String("m5.large")}, - &ec2.FleetLaunchTemplateOverridesRequest{SubnetId: aws.String("test-subnet-3"), InstanceType: aws.String("m5.large")}, + &ec2.FleetLaunchTemplateOverridesRequest{SubnetId: aws.String("test-subnet-1"), InstanceType: aws.String("m5.large"), AvailabilityZone: aws.String("test-zone-1a")}, + 
&ec2.FleetLaunchTemplateOverridesRequest{SubnetId: aws.String("test-subnet-2"), InstanceType: aws.String("m5.large"), AvailabilityZone: aws.String("test-zone-1b")}, + &ec2.FleetLaunchTemplateOverridesRequest{SubnetId: aws.String("test-subnet-3"), InstanceType: aws.String("m5.large"), AvailabilityZone: aws.String("test-zone-1c")}, )) }) }) From fa3ebfab4f3c9b98de6e3d9156c623775c701140 Mon Sep 17 00:00:00 2001 From: Elton Pinto Date: Tue, 23 Nov 2021 00:59:50 +0000 Subject: [PATCH 02/11] Addressed first round of comments - rebase --- pkg/cloudprovider/aws/cloudprovider.go | 14 ++-- pkg/cloudprovider/aws/errors.go | 9 +- pkg/cloudprovider/aws/instance.go | 112 +++++++++++++++---------- 3 files changed, 81 insertions(+), 54 deletions(-) diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index 32cc0a3224e6..4a47df44809f 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -24,6 +24,7 @@ import ( "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ssm" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" "github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" @@ -79,7 +80,7 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud return &CloudProvider{ instanceTypeProvider: instanceTypeProvider, subnetProvider: subnetProvider, - instanceProvider: NewInstanceProvider(ec2api, instanceTypeProvider, subnetProvider, sess, options.ClientSet), + instanceProvider: NewInstanceProvider(ec2api, instanceTypeProvider, subnetProvider, ssm.New(sess), options.ClientSet), creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), } } @@ -125,18 +126,17 @@ func (c *CloudProvider) create(ctx context.Context, constraints *v1alpha5.Constr return errs } -// Despite accepting a Constraints struct, note that it does not utilize Requirements at all and instead -// returns all available InstanceTypes. 
+// GetInstanceTypes returns all available InstanceTypes despite accepting a Constraints struct (note that it does not utilize Requirements) func (c *CloudProvider) GetInstanceTypes(ctx context.Context, constraints *v1alpha5.Constraints) ([]cloudprovider.InstanceType, error) { vendorConstraints, err := v1alpha1.Deserialize(constraints) if err != nil { return nil, apis.ErrGeneric(err.Error()) } - allInstanceTypes, err := c.instanceTypeProvider.Get(ctx, vendorConstraints) + instanceTypes, err := c.instanceTypeProvider.Get(ctx, vendorConstraints) if err != nil { return nil, err } - return c.instanceProvider.DiscardICEdInstanceTypes(ctx, allInstanceTypes), nil + return c.instanceProvider.WithoutUnavailableOfferings(ctx, instanceTypes), nil } func (c *CloudProvider) Delete(ctx context.Context, node *v1.Node) error { @@ -156,11 +156,11 @@ func (c *CloudProvider) Validate(ctx context.Context, constraints *v1alpha5.Cons func (c *CloudProvider) Default(ctx context.Context, constraints *v1alpha5.Constraints) { vendorConstraints, err := v1alpha1.Deserialize(constraints) if err != nil { - logging.FromContext(ctx).Errorf("Failed to deserialize provider, %s", err.Error()) + logging.FromContext(ctx).Fatalf("Failed to deserialize provider, %s", err.Error()) return } vendorConstraints.Default(ctx) if err := vendorConstraints.Serialize(constraints); err != nil { - logging.FromContext(ctx).Errorf("Failed to serialize provider, %s", err.Error()) + logging.FromContext(ctx).Fatalf("Failed to serialize provider, %s", err.Error()) } } diff --git a/pkg/cloudprovider/aws/errors.go b/pkg/cloudprovider/aws/errors.go index 77425047923d..cab42c1c2365 100644 --- a/pkg/cloudprovider/aws/errors.go +++ b/pkg/cloudprovider/aws/errors.go @@ -25,9 +25,12 @@ var ( "InvalidInstanceID.NotFound", "InvalidLaunchTemplateName.NotFoundException", } - iceErrorCode = "InsufficientInstanceCapacity" ) +// InsufficientCapacityErrorCode indicates that EC2 is temporarily lacking capacity for this +// instance type and availability zone combination +const InsufficientCapacityErrorCode = "InsufficientInstanceCapacity" + // isNotFound returns true if the err is an AWS error (even if it's // wrapped) and is a known to mean "not found" (as opposed to a more // serious or unexpected error) @@ -38,7 +41,3 @@ func isNotFound(err error) bool { } return false } - -func isInsufficientCapacityCode(errorCode string) bool { - return errorCode == iceErrorCode -} diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index 9be88529df91..7a080901f204 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -22,7 +22,6 @@ import ( "github.com/avast/retry-go" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/aws/aws-sdk-go/service/ssm" @@ -37,32 +36,38 @@ import ( "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" "github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" + "github.com/aws/karpenter/pkg/utils/injectabletime" "github.com/aws/karpenter/pkg/utils/injection" ) const ( - iceCacheODTTL = 15 * time.Second - iceCacheSpotTTL = 45 * time.Second - iceCacheCleanupInterval = 5 * time.Minute + InsufficientCapacityErrorCacheOnDemandTTL = 15 * time.Second + InsufficientCapacityErrorCacheSpotTTL = 45 * time.Second + 
InsufficientCapacityErrorCacheCleanupInterval = 5 * time.Minute ) +type CachedZones struct { + zones sets.String + initialCacheTime time.Time +} + type InstanceProvider struct { ec2api ec2iface.EC2API instanceTypeProvider *InstanceTypeProvider subnetProvider *SubnetProvider launchTemplateProvider *LaunchTemplateProvider - iceCache *cache.Cache + unavailableOfferings *cache.Cache } -func NewInstanceProvider(ec2api ec2iface.EC2API, instanceTypeProvider *InstanceTypeProvider, subnetProvider *SubnetProvider, sess *session.Session, +func NewInstanceProvider(ec2api ec2iface.EC2API, instanceTypeProvider *InstanceTypeProvider, subnetProvider *SubnetProvider, ssm *ssm.SSM, clientSet *kubernetes.Clientset) *InstanceProvider { return &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider, NewLaunchTemplateProvider( ec2api, - NewAMIProvider(ssm.New(sess), clientSet), + NewAMIProvider(ssm, clientSet), NewSecurityGroupProvider(ec2api), ), - cache.New(iceCacheSpotTTL, iceCacheCleanupInterval), + cache.New(-1, InsufficientCapacityErrorCacheCleanupInterval), } } @@ -128,32 +133,35 @@ func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error { return nil } -// The intention is to remove Offerings from the provided possible InstanceTypes based on recently observed ICEs, which will -// indirectly impact the packer to keep it from spinning its wheels on packings that are doomed to fail due to EC2 capacity constraints. -func (p *InstanceProvider) DiscardICEdInstanceTypes(ctx context.Context, instanceTypes []InstanceType) []cloudprovider.InstanceType { +// WithoutUnavailableOfferings will remove Offerings from the provided possible InstanceTypes based on recently observed +// insufficient capacity errors, which will indirectly impact the packer to keep it from spending time on packings that are +// likely to fail due to short-term EC2 capacity constraints. 
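// A worked example of the intended filtering (the offerings and cache contents are
// hypothetical): given m5.large with offerings {(1a, spot), (1b, spot), (1a, on-demand)}
// and a cached spot insufficient-capacity entry covering zone 1a, the result keeps
// (1b, spot) and (1a, on-demand); an instance type whose offerings are all filtered
// away is dropped from the returned slice entirely.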
+func (p *InstanceProvider) WithoutUnavailableOfferings(ctx context.Context, instanceTypes []InstanceType) []cloudprovider.InstanceType { var cleanedInstanceTypes = []cloudprovider.InstanceType{} - for index, instanceType := range instanceTypes { - odIceZones, hasODIces := p.iceCache.Get(createIceCacheKey(instanceType.Name(), v1alpha1.CapacityTypeOnDemand)) - spotIceZones, hasSpotIces := p.iceCache.Get(createIceCacheKey(instanceType.Name(), v1alpha1.CapacityTypeSpot)) + for _, instanceType := range instanceTypes { + currInstanceType := instanceType + cpInstanceType := cloudprovider.InstanceType(&currInstanceType) + odIceZones, hasODIces := p.unavailableOfferings.Get(unavailableOfferingsCacheKey(instanceType.Name(), v1alpha1.CapacityTypeOnDemand)) + spotIceZones, hasSpotIces := p.unavailableOfferings.Get(unavailableOfferingsCacheKey(instanceType.Name(), v1alpha1.CapacityTypeSpot)) if !hasODIces && !hasSpotIces { - cleanedInstanceTypes = append(cleanedInstanceTypes, &instanceTypes[index]) + cleanedInstanceTypes = append(cleanedInstanceTypes, cpInstanceType) continue } cleanedOfferings := []cloudprovider.Offering{} - for _, offering := range instanceType.Offerings() { - if hasODIces && offering.CapacityType == v1alpha1.CapacityTypeOnDemand { - if !odIceZones.(sets.String).Has(offering.Zone) { + for _, offering := range currInstanceType.Offerings() { + if offering.CapacityType == v1alpha1.CapacityTypeOnDemand { + if !hasODIces || !odIceZones.(CachedZones).zones.Has(offering.Zone) { cleanedOfferings = append(cleanedOfferings, offering) } - } else if hasSpotIces { - if !spotIceZones.(sets.String).Has(offering.Zone) { + } else { + if !hasSpotIces || !spotIceZones.(CachedZones).zones.Has(offering.Zone) { cleanedOfferings = append(cleanedOfferings, offering) } } } if len(cleanedOfferings) > 0 { instanceType.AvailableOfferings = cleanedOfferings - cleanedInstanceTypes = append(cleanedInstanceTypes, &instanceTypes[index]) + cleanedInstanceTypes = append(cleanedInstanceTypes, cpInstanceType) } } return cleanedInstanceTypes @@ -197,13 +205,13 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a if err != nil { return nil, fmt.Errorf("creating fleet %w", err) } - if len(createFleetOutput.Errors) > 0 { - for _, fleetError := range createFleetOutput.Errors { - if isInsufficientCapacityCode(*fleetError.ErrorCode) { - p.addToIceCache(fleetError.LaunchTemplateAndOverrides.Overrides, capacityType) - } + insufficientCapacityOfferings := map[string]sets.String{} + for _, err := range createFleetOutput.Errors { + if InsufficientCapacityErrorCode == aws.StringValue(err.ErrorCode) { + createOrAppendToMapValue(insufficientCapacityOfferings, aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.InstanceType), aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.AvailabilityZone)) } } + p.updateUnavailableOfferingsCache(ctx, insufficientCapacityOfferings, capacityType) instanceIds := combineFleetInstances(*createFleetOutput) if len(instanceIds) == 0 { return nil, combineFleetErrors(createFleetOutput.Errors) @@ -247,12 +255,12 @@ func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.Inst } for _, subnet := range subnets { subnetZone := aws.StringValue(subnet.AvailabilityZone) - if subnetZone == offering.Zone && (constrainedZones == nil || constrainedZones.Has(subnetZone)) { + if subnetZone == offering.Zone && constrainedZones.Has(subnetZone) { override := &ec2.FleetLaunchTemplateOverridesRequest{ InstanceType: aws.String(instanceType.Name()), SubnetId: 
subnet.SubnetId, - // This is technically redundant, but is useful if we have to parse ICEs from CreateFleet - // in figuring out the zone rather than additional API calls to look up the subnet + // This is technically redundant, but is useful if we have to parse insufficient capacity errors from + // CreateFleet so that we can figure out the zone rather than additional API calls to look up the subnet AvailabilityZone: subnet.AvailabilityZone, } // Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy @@ -332,19 +340,30 @@ func (p *InstanceProvider) instanceToNode(instance *ec2.Instance, instanceTypes return nil, fmt.Errorf("unrecognized instance type %s", aws.StringValue(instance.InstanceType)) } -func (p *InstanceProvider) addToIceCache(overrides *ec2.FleetLaunchTemplateOverrides, capacityType string) { - instanceType := aws.StringValue(overrides.InstanceType) - zone := aws.StringValue(overrides.AvailabilityZone) - cacheTTL := iceCacheSpotTTL +func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context, offerings map[string]sets.String, capacityType string) { + cacheTTL := InsufficientCapacityErrorCacheSpotTTL if capacityType == v1alpha1.CapacityTypeOnDemand { - cacheTTL = iceCacheODTTL + cacheTTL = InsufficientCapacityErrorCacheOnDemandTTL } - cacheKey := createIceCacheKey(instanceType, capacityType) - if zones, exists := p.iceCache.Get(cacheKey); exists { - zones.(sets.String).Insert(zone) - p.iceCache.Set(cacheKey, zones, cacheTTL) - } else { - p.iceCache.Set(cacheKey, sets.String{zone: sets.Empty{}}, cacheTTL) + + for instanceType, zones := range offerings { + cacheKey := unavailableOfferingsCacheKey(instanceType, capacityType) + logging.FromContext(ctx).Debugf("Saw %s for offering { instanceType: %s, zones: %s, capacityType: %s }, avoiding it for %s", + InsufficientCapacityErrorCode, + instanceType, + zones, + capacityType, + cacheTTL) + // because we don't track a cache TTL for each zone and don't remove individual zones from the cache we need to mitigate the risk of a zone + // staying in the cache forever, so we will only add to an existing cached entry within a certain time period (after expiry, we just replace the cache + // entry and reset the clock on it) + if existingZones, exists := p.unavailableOfferings.Get(cacheKey); exists && existingZones.(CachedZones).initialCacheTime.UTC().Add(InsufficientCapacityErrorCacheCleanupInterval).Before(injectabletime.Now()) { + existingZones.(CachedZones).zones.Insert(zones.UnsortedList()...) 
+ // though we're updating existingZones already, we still need to call Set to update the cached entry's TTL + p.unavailableOfferings.Set(cacheKey, existingZones, cacheTTL) + } else { + p.unavailableOfferings.Set(cacheKey, CachedZones{zones: zones, initialCacheTime: injectabletime.Now()}, cacheTTL) + } } } @@ -391,6 +410,15 @@ func combineReservations(reservations []*ec2.Reservation) []*ec2.Instance { return instances } -func createIceCacheKey(instanceType string, capacityType string) string { - return fmt.Sprintf("%s-%s", instanceType, capacityType) +func unavailableOfferingsCacheKey(instanceType string, capacityType string) string { + return fmt.Sprintf("%s:%s", instanceType, capacityType) +} + +func createOrAppendToMapValue(mapToUpdate map[string]sets.String, key string, newValue string) { + existingValueSet, hasValue := mapToUpdate[key] + if hasValue { + existingValueSet.Insert(newValue) + } else { + mapToUpdate[key] = sets.String{newValue: sets.Empty{}} + } } From f13ee96babb846e33679848b692c22e69b00db9b Mon Sep 17 00:00:00 2001 From: Elton Pinto Date: Tue, 23 Nov 2021 01:23:55 +0000 Subject: [PATCH 03/11] Minor logging update --- pkg/cloudprovider/aws/instance.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index 7a080901f204..2d0b1e8c61fe 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -348,18 +348,18 @@ func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context, for instanceType, zones := range offerings { cacheKey := unavailableOfferingsCacheKey(instanceType, capacityType) - logging.FromContext(ctx).Debugf("Saw %s for offering { instanceType: %s, zones: %s, capacityType: %s }, avoiding it for %s", + logging.FromContext(ctx).Debugf("Saw %s for offering(s) { instanceType: %s, zone(s): %s, capacityType: %s }, avoiding for %s", InsufficientCapacityErrorCode, instanceType, zones, capacityType, cacheTTL) // because we don't track a cache TTL for each zone and don't remove individual zones from the cache we need to mitigate the risk of a zone - // staying in the cache forever, so we will only add to an existing cached entry within a certain time period (after expiry, we just replace the cache - // entry and reset the clock on it) + // staying in the cache indefinitely, so we will only append a zone to an existing cached entry within a certain time period (after expiry, + // we replace the whole cache entry and reset the clock on it) if existingZones, exists := p.unavailableOfferings.Get(cacheKey); exists && existingZones.(CachedZones).initialCacheTime.UTC().Add(InsufficientCapacityErrorCacheCleanupInterval).Before(injectabletime.Now()) { existingZones.(CachedZones).zones.Insert(zones.UnsortedList()...) 
- // though we're updating existingZones already, we still need to call Set to update the cached entry's TTL + // though we're updating existingZones already, we still need to call Set to extend the cached entry's TTL p.unavailableOfferings.Set(cacheKey, existingZones, cacheTTL) } else { p.unavailableOfferings.Set(cacheKey, CachedZones{zones: zones, initialCacheTime: injectabletime.Now()}, cacheTTL) From 5a9cb81eed7ce4a22dd64e2dc8009fe764e9aa2d Mon Sep 17 00:00:00 2001 From: Elton Pinto Date: Tue, 23 Nov 2021 20:41:28 +0000 Subject: [PATCH 04/11] Move ICE caching logic to InstanceTypeProvider --- pkg/cloudprovider/aws/cloudprovider.go | 10 +- pkg/cloudprovider/aws/instance.go | 133 ++++++------------------- pkg/cloudprovider/aws/instancetypes.go | 77 +++++++++++--- pkg/cloudprovider/aws/suite_test.go | 2 - 4 files changed, 95 insertions(+), 127 deletions(-) diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index 4a47df44809f..f34e8c37e0e2 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -132,11 +132,7 @@ func (c *CloudProvider) GetInstanceTypes(ctx context.Context, constraints *v1alp if err != nil { return nil, apis.ErrGeneric(err.Error()) } - instanceTypes, err := c.instanceTypeProvider.Get(ctx, vendorConstraints) - if err != nil { - return nil, err - } - return c.instanceProvider.WithoutUnavailableOfferings(ctx, instanceTypes), nil + return c.instanceTypeProvider.Get(ctx, vendorConstraints) } func (c *CloudProvider) Delete(ctx context.Context, node *v1.Node) error { @@ -156,11 +152,11 @@ func (c *CloudProvider) Validate(ctx context.Context, constraints *v1alpha5.Cons func (c *CloudProvider) Default(ctx context.Context, constraints *v1alpha5.Constraints) { vendorConstraints, err := v1alpha1.Deserialize(constraints) if err != nil { - logging.FromContext(ctx).Fatalf("Failed to deserialize provider, %s", err.Error()) + logging.FromContext(ctx).Errorf("Failed to deserialize provider, %s", err.Error()) return } vendorConstraints.Default(ctx) if err := vendorConstraints.Serialize(constraints); err != nil { - logging.FromContext(ctx).Fatalf("Failed to serialize provider, %s", err.Error()) + logging.FromContext(ctx).Errorf("Failed to serialize provider, %s", err.Error()) } } diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index 2d0b1e8c61fe..006515e2040f 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -25,7 +25,6 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/aws/aws-sdk-go/service/ssm" - "github.com/patrickmn/go-cache" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,27 +35,14 @@ import ( "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/cloudprovider" "github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" - "github.com/aws/karpenter/pkg/utils/injectabletime" "github.com/aws/karpenter/pkg/utils/injection" ) -const ( - InsufficientCapacityErrorCacheOnDemandTTL = 15 * time.Second - InsufficientCapacityErrorCacheSpotTTL = 45 * time.Second - InsufficientCapacityErrorCacheCleanupInterval = 5 * time.Minute -) - -type CachedZones struct { - zones sets.String - initialCacheTime time.Time -} - type InstanceProvider struct { ec2api ec2iface.EC2API instanceTypeProvider *InstanceTypeProvider subnetProvider 
*SubnetProvider launchTemplateProvider *LaunchTemplateProvider - unavailableOfferings *cache.Cache } func NewInstanceProvider(ec2api ec2iface.EC2API, instanceTypeProvider *InstanceTypeProvider, subnetProvider *SubnetProvider, ssm *ssm.SSM, @@ -67,7 +53,6 @@ func NewInstanceProvider(ec2api ec2iface.EC2API, instanceTypeProvider *InstanceT NewAMIProvider(ssm, clientSet), NewSecurityGroupProvider(ec2api), ), - cache.New(-1, InsufficientCapacityErrorCacheCleanupInterval), } } @@ -133,40 +118,6 @@ func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error { return nil } -// WithoutUnavailableOfferings will remove Offerings from the provided possible InstanceTypes based on recently observed -// insufficient capacity errors, which will indirectly impact the packer to keep it from spending time on packings that are -// likely to fail due to short-term EC2 capacity constraints. -func (p *InstanceProvider) WithoutUnavailableOfferings(ctx context.Context, instanceTypes []InstanceType) []cloudprovider.InstanceType { - var cleanedInstanceTypes = []cloudprovider.InstanceType{} - for _, instanceType := range instanceTypes { - currInstanceType := instanceType - cpInstanceType := cloudprovider.InstanceType(&currInstanceType) - odIceZones, hasODIces := p.unavailableOfferings.Get(unavailableOfferingsCacheKey(instanceType.Name(), v1alpha1.CapacityTypeOnDemand)) - spotIceZones, hasSpotIces := p.unavailableOfferings.Get(unavailableOfferingsCacheKey(instanceType.Name(), v1alpha1.CapacityTypeSpot)) - if !hasODIces && !hasSpotIces { - cleanedInstanceTypes = append(cleanedInstanceTypes, cpInstanceType) - continue - } - cleanedOfferings := []cloudprovider.Offering{} - for _, offering := range currInstanceType.Offerings() { - if offering.CapacityType == v1alpha1.CapacityTypeOnDemand { - if !hasODIces || !odIceZones.(CachedZones).zones.Has(offering.Zone) { - cleanedOfferings = append(cleanedOfferings, offering) - } - } else { - if !hasSpotIces || !spotIceZones.(CachedZones).zones.Has(offering.Zone) { - cleanedOfferings = append(cleanedOfferings, offering) - } - } - } - if len(cleanedOfferings) > 0 { - instanceType.AvailableOfferings = cleanedOfferings - cleanedInstanceTypes = append(cleanedInstanceTypes, cpInstanceType) - } - } - return cleanedInstanceTypes -} - func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int) ([]*string, error) { // Default to on-demand unless constrained otherwise or if flexible to spot and // on-demand. 
This code assumes two options: {spot, on-demand}, which is enforced @@ -205,13 +156,7 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a if err != nil { return nil, fmt.Errorf("creating fleet %w", err) } - insufficientCapacityOfferings := map[string]sets.String{} - for _, err := range createFleetOutput.Errors { - if InsufficientCapacityErrorCode == aws.StringValue(err.ErrorCode) { - createOrAppendToMapValue(insufficientCapacityOfferings, aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.InstanceType), aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.AvailabilityZone)) - } - } - p.updateUnavailableOfferingsCache(ctx, insufficientCapacityOfferings, capacityType) + p.updateUnavailableOfferingsCache(ctx, createFleetOutput.Errors, capacityType) instanceIds := combineFleetInstances(*createFleetOutput) if len(instanceIds) == 0 { return nil, combineFleetErrors(createFleetOutput.Errors) @@ -245,7 +190,9 @@ func (p *InstanceProvider) getLaunchTemplateConfigs(ctx context.Context, constra return launchTemplateConfigs, nil } -func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.InstanceType, subnets []*ec2.Subnet, constrainedZones sets.String, capacityType string) []*ec2.FleetLaunchTemplateOverridesRequest { +// getOverrides creates and returns launch template overrides for the cross product of instanceTypeOptions and subnets (with subnets being constrained by +// zones and the offerings in instanceTypeOptions) +func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.InstanceType, subnets []*ec2.Subnet, zones sets.String, capacityType string) []*ec2.FleetLaunchTemplateOverridesRequest { var overrides []*ec2.FleetLaunchTemplateOverridesRequest for i, instanceType := range instanceTypeOptions { for _, offering := range instanceType.Offerings() { @@ -255,24 +202,25 @@ func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.Inst } for _, subnet := range subnets { subnetZone := aws.StringValue(subnet.AvailabilityZone) - if subnetZone == offering.Zone && constrainedZones.Has(subnetZone) { - override := &ec2.FleetLaunchTemplateOverridesRequest{ - InstanceType: aws.String(instanceType.Name()), - SubnetId: subnet.SubnetId, - // This is technically redundant, but is useful if we have to parse insufficient capacity errors from - // CreateFleet so that we can figure out the zone rather than additional API calls to look up the subnet - AvailabilityZone: subnet.AvailabilityZone, - } - // Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy - // to reduce the likelihood of getting an excessively large instance type. - // instanceTypeOptions are sorted by vcpus and memory so this prioritizes smaller instance types. - if capacityType == v1alpha1.CapacityTypeSpot { - override.Priority = aws.Float64(float64(i)) - } - overrides = append(overrides, override) - // FleetAPI cannot span subnets from the same AZ, so break after the first one. 
- break + if subnetZone != offering.Zone || !zones.Has(offering.Zone) { + continue } + override := &ec2.FleetLaunchTemplateOverridesRequest{ + InstanceType: aws.String(instanceType.Name()), + SubnetId: subnet.SubnetId, + // This is technically redundant, but is useful if we have to parse insufficient capacity errors from + // CreateFleet so that we can figure out the zone rather than additional API calls to look up the subnet + AvailabilityZone: subnet.AvailabilityZone, + } + // Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy + // to reduce the likelihood of getting an excessively large instance type. + // instanceTypeOptions are sorted by vcpus and memory so this prioritizes smaller instance types. + if capacityType == v1alpha1.CapacityTypeSpot { + override.Priority = aws.Float64(float64(i)) + } + overrides = append(overrides, override) + // FleetAPI cannot span subnets from the same AZ, so break after the first one. + break } } } @@ -340,31 +288,16 @@ func (p *InstanceProvider) instanceToNode(instance *ec2.Instance, instanceTypes return nil, fmt.Errorf("unrecognized instance type %s", aws.StringValue(instance.InstanceType)) } -func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context, offerings map[string]sets.String, capacityType string) { - cacheTTL := InsufficientCapacityErrorCacheSpotTTL - if capacityType == v1alpha1.CapacityTypeOnDemand { - cacheTTL = InsufficientCapacityErrorCacheOnDemandTTL - } - - for instanceType, zones := range offerings { - cacheKey := unavailableOfferingsCacheKey(instanceType, capacityType) - logging.FromContext(ctx).Debugf("Saw %s for offering(s) { instanceType: %s, zone(s): %s, capacityType: %s }, avoiding for %s", - InsufficientCapacityErrorCode, - instanceType, - zones, - capacityType, - cacheTTL) - // because we don't track a cache TTL for each zone and don't remove individual zones from the cache we need to mitigate the risk of a zone - // staying in the cache indefinitely, so we will only append a zone to an existing cached entry within a certain time period (after expiry, - // we replace the whole cache entry and reset the clock on it) - if existingZones, exists := p.unavailableOfferings.Get(cacheKey); exists && existingZones.(CachedZones).initialCacheTime.UTC().Add(InsufficientCapacityErrorCacheCleanupInterval).Before(injectabletime.Now()) { - existingZones.(CachedZones).zones.Insert(zones.UnsortedList()...) 
- // though we're updating existingZones already, we still need to call Set to extend the cached entry's TTL - p.unavailableOfferings.Set(cacheKey, existingZones, cacheTTL) - } else { - p.unavailableOfferings.Set(cacheKey, CachedZones{zones: zones, initialCacheTime: injectabletime.Now()}, cacheTTL) +func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context, errors []*ec2.CreateFleetError, capacityType string) { + insufficientCapacityOfferings := map[string]sets.String{} + for _, err := range errors { + if InsufficientCapacityErrorCode == aws.StringValue(err.ErrorCode) { + createOrAppendToMapValue(insufficientCapacityOfferings, aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.InstanceType), aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.AvailabilityZone)) } } + if len(insufficientCapacityOfferings) > 0 { + p.instanceTypeProvider.TrackUnavailableOfferings(ctx, insufficientCapacityOfferings, capacityType) + } } func getInstanceID(node *v1.Node) (*string, error) { @@ -410,15 +343,11 @@ func combineReservations(reservations []*ec2.Reservation) []*ec2.Instance { return instances } -func unavailableOfferingsCacheKey(instanceType string, capacityType string) string { - return fmt.Sprintf("%s:%s", instanceType, capacityType) -} - func createOrAppendToMapValue(mapToUpdate map[string]sets.String, key string, newValue string) { existingValueSet, hasValue := mapToUpdate[key] if hasValue { existingValueSet.Insert(newValue) } else { - mapToUpdate[key] = sets.String{newValue: sets.Empty{}} + mapToUpdate[key] = sets.NewString(newValue) } } diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go index f58f36076c73..e4715f760cff 100644 --- a/pkg/cloudprovider/aws/instancetypes.go +++ b/pkg/cloudprovider/aws/instancetypes.go @@ -31,27 +31,35 @@ import ( ) const ( - instanceTypesCacheKey = "types" - instanceTypeZonesCacheKey = "zones" - instanceTypesAndZonesCacheTTL = 5 * time.Minute + instanceTypesCacheKey = "types" + instanceTypeZonesCacheKey = "zones" + instanceTypesAndZonesCacheTTL = 5 * time.Minute + InsufficientCapacityErrorCacheOnDemandTTL = 15 * time.Second + InsufficientCapacityErrorCacheSpotTTL = 45 * time.Second + InsufficientCapacityErrorCacheCleanupInterval = 5 * time.Minute ) type InstanceTypeProvider struct { ec2api ec2iface.EC2API subnetProvider *SubnetProvider - cache *cache.Cache + // Has two entries: one for all the instance types and one for all zones; values cached *before* considering insufficient capacity errors + // from the unavailableOfferings cache + cache *cache.Cache + // key: ::, value: struct{}{} + unavailableOfferings *cache.Cache } func NewInstanceTypeProvider(ec2api ec2iface.EC2API, subnetProvider *SubnetProvider) *InstanceTypeProvider { return &InstanceTypeProvider{ - ec2api: ec2api, - subnetProvider: subnetProvider, - cache: cache.New(instanceTypesAndZonesCacheTTL, CacheCleanupInterval), + ec2api: ec2api, + subnetProvider: subnetProvider, + cache: cache.New(instanceTypesAndZonesCacheTTL, CacheCleanupInterval), + unavailableOfferings: cache.New(-1, InsufficientCapacityErrorCacheCleanupInterval), } } // Get all instance type options (the constraints are only used for tag filtering on subnets, not for Requirements filtering) -func (p *InstanceTypeProvider) Get(ctx context.Context, constraints *v1alpha1.Constraints) ([]InstanceType, error) { +func (p *InstanceTypeProvider) Get(ctx context.Context, constraints *v1alpha1.Constraints) ([]cloudprovider.InstanceType, error) { // Get InstanceTypes from EC2 
instanceTypes, err := p.getInstanceTypes(ctx) if err != nil { @@ -71,19 +79,29 @@ func (p *InstanceTypeProvider) Get(ctx context.Context, constraints *v1alpha1.Co if err != nil { return nil, err } - result := []InstanceType{} + result := []cloudprovider.InstanceType{} for _, instanceType := range instanceTypes { - offerings := []cloudprovider.Offering{} - for zone := range subnetZones.Intersection(instanceTypeZones[instanceType.Name()]) { - // while usage classes should be a distinct set, there's no guarantee of that - for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) { + offerings := p.createOfferings(instanceType, subnetZones, instanceTypeZones[instanceType.Name()]) + if len(offerings) > 0 { + instanceType.AvailableOfferings = offerings + result = append(result, instanceType) + } + } + return result, nil +} + +func (p *InstanceTypeProvider) createOfferings(instanceType *InstanceType, subnetZones sets.String, availableZones sets.String) []cloudprovider.Offering { + offerings := []cloudprovider.Offering{} + for zone := range subnetZones.Intersection(availableZones) { + // while usage classes should be a distinct set, there's no guarantee of that + for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) { + // exclude any offerings that have recently seen an insufficient capacity error from EC2 + if _, isUnavailable := p.unavailableOfferings.Get(unavailableOfferingsCacheKey(capacityType, instanceType.Name(), zone)); !isUnavailable { offerings = append(offerings, cloudprovider.Offering{Zone: zone, CapacityType: capacityType}) } } - instanceType.AvailableOfferings = offerings - result = append(result, *instanceType) } - return result, nil + return offerings } func (p *InstanceTypeProvider) getInstanceTypeZones(ctx context.Context) (map[string]sets.String, error) { @@ -151,3 +169,30 @@ func (p *InstanceTypeProvider) filter(instanceType *ec2.InstanceTypeInfo) bool { "p", "inf", "g", // Accelerators ) } + +// TrackUnavailableOfferings allows the InstanceProvider to communicate recently observed temporary capacity shortages in +// the provided offerings +func (p *InstanceTypeProvider) TrackUnavailableOfferings(ctx context.Context, offerings map[string]sets.String, capacityType string) { + cacheTTL := InsufficientCapacityErrorCacheSpotTTL + if capacityType == v1alpha1.CapacityTypeOnDemand { + cacheTTL = InsufficientCapacityErrorCacheOnDemandTTL + } + + for instanceType, zones := range offerings { + for zone := range zones { + cacheKey := unavailableOfferingsCacheKey(capacityType, instanceType, zone) + logging.FromContext(ctx).Debugf("Saw %s for offering { instanceType: %s, zone: %s, capacityType: %s }, avoiding for %s", + InsufficientCapacityErrorCode, + instanceType, + zone, + capacityType, + cacheTTL) + // even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL + p.unavailableOfferings.Set(cacheKey, struct{}{}, cacheTTL) + } + } +} + +func unavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string { + return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, zone) +} diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 11b1748a11d9..7aa35d1ce43b 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "testing" - "time" "github.com/Pallinder/go-randomdata" 
"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" @@ -78,7 +77,6 @@ var _ = BeforeSuite(func() { securityGroupProvider: NewSecurityGroupProvider(fakeEC2API), cache: launchTemplateCache, }, - cache.New(5*time.Second, 30*time.Second), }, creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), } From cf8dc0e9f9714525addac01592522757ed953f06 Mon Sep 17 00:00:00 2001 From: Elton Pinto Date: Wed, 24 Nov 2021 09:31:37 +0000 Subject: [PATCH 05/11] Add unit tests, merge spot and OD ICE cache TTLs --- go.mod | 2 +- go.sum | 3 +- pkg/cloudprovider/aws/fake/ec2api.go | 68 ++++++++++++++++++-- pkg/cloudprovider/aws/instancetypes.go | 30 ++++----- pkg/cloudprovider/aws/suite_test.go | 88 +++++++++++++++++++++----- pkg/test/expectations/expectations.go | 34 +++++++++- 6 files changed, 180 insertions(+), 45 deletions(-) diff --git a/go.mod b/go.mod index 31ca434c2621..3b73d7e3d0d7 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/imdario/mergo v0.3.12 github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/onsi/ginkgo v1.16.5 - github.com/onsi/gomega v1.16.0 + github.com/onsi/gomega v1.17.0 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/prometheus/client_golang v1.11.0 go.uber.org/multierr v1.7.0 diff --git a/go.sum b/go.sum index eda50add2320..ec599af32725 100644 --- a/go.sum +++ b/go.sum @@ -472,8 +472,9 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= -github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE= +github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.3.0/go.mod h1:4c3sLeE8xjNqehmF5RpAFLPLJxXscc0R4l6Zg0P1tTQ= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= diff --git a/pkg/cloudprovider/aws/fake/ec2api.go b/pkg/cloudprovider/aws/fake/ec2api.go index 8075968e2368..60361e402beb 100644 --- a/pkg/cloudprovider/aws/fake/ec2api.go +++ b/pkg/cloudprovider/aws/fake/ec2api.go @@ -44,6 +44,7 @@ type EC2Behavior struct { CalledWithCreateLaunchTemplateInput set.Set Instances sync.Map LaunchTemplates sync.Map + ShouldTriggerInsufficientCapacity bool } type EC2API struct { @@ -51,12 +52,20 @@ type EC2API struct { EC2Behavior } +// DefaultSupportedUsageClasses is a var because []*string can't be a const +var DefaultSupportedUsageClasses = aws.StringSlice([]string{"on-demand", "spot"}) + +const InsufficientCapacityInstanceType = "inf1.6xlarge" + // Reset must be called between tests otherwise tests will pollute // each other. 
func (e *EC2API) Reset() { e.EC2Behavior = EC2Behavior{ CalledWithCreateFleetInput: set.NewSet(), CalledWithCreateLaunchTemplateInput: set.NewSet(), + Instances: sync.Map{}, + LaunchTemplates: sync.Map{}, + ShouldTriggerInsufficientCapacity: false, } } @@ -68,6 +77,9 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee instances := []*ec2.Instance{} instanceIds := []*string{} for i := 0; i < int(*input.TargetCapacitySpecification.TotalTargetCapacity); i++ { + if e.ShouldTriggerInsufficientCapacity && aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].InstanceType) == InsufficientCapacityInstanceType { + continue + } instances = append(instances, &ec2.Instance{ InstanceId: aws.String(randomdata.SillyName()), Placement: &ec2.Placement{AvailabilityZone: aws.String("test-zone-1a")}, @@ -78,7 +90,22 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee instanceIds = append(instanceIds, instances[i].InstanceId) } - return &ec2.CreateFleetOutput{Instances: []*ec2.CreateFleetInstance{{InstanceIds: instanceIds}}}, nil + result := &ec2.CreateFleetOutput{ + Instances: []*ec2.CreateFleetInstance{{InstanceIds: instanceIds}}} + if e.ShouldTriggerInsufficientCapacity { + result.Errors = []*ec2.CreateFleetError{{ + ErrorCode: aws.String("InsufficientInstanceCapacity"), + LaunchTemplateAndOverrides: &ec2.LaunchTemplateAndOverridesResponse{ + LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecification{ + LaunchTemplateId: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateId, + LaunchTemplateName: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName}, + Overrides: &ec2.FleetLaunchTemplateOverrides{ + InstanceType: aws.String(InsufficientCapacityInstanceType), + AvailabilityZone: aws.String("test-zone-1a"), + SubnetId: aws.String("test-subnet-1")}, + }}} + } + return result, nil } func (e *EC2API) CreateLaunchTemplateWithContext(_ context.Context, input *ec2.CreateLaunchTemplateInput, _ ...request.Option) (*ec2.CreateLaunchTemplateOutput, error) { @@ -166,7 +193,7 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2 InstanceTypes: []*ec2.InstanceTypeInfo{ { InstanceType: aws.String("m5.large"), - SupportedUsageClasses: []*string{aws.String("on-demand"), aws.String("spot")}, + SupportedUsageClasses: DefaultSupportedUsageClasses, SupportedVirtualizationTypes: []*string{aws.String("hvm")}, BurstablePerformanceSupported: aws.Bool(false), BareMetal: aws.Bool(false), @@ -186,7 +213,7 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2 }, { InstanceType: aws.String("m5.xlarge"), - SupportedUsageClasses: []*string{aws.String("on-demand"), aws.String("spot")}, + SupportedUsageClasses: DefaultSupportedUsageClasses, SupportedVirtualizationTypes: []*string{aws.String("hvm")}, BurstablePerformanceSupported: aws.Bool(false), BareMetal: aws.Bool(false), @@ -206,7 +233,7 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2 }, { InstanceType: aws.String("p3.8xlarge"), - SupportedUsageClasses: []*string{aws.String("on-demand"), aws.String("spot")}, + SupportedUsageClasses: DefaultSupportedUsageClasses, SupportedVirtualizationTypes: []*string{aws.String("hvm")}, BurstablePerformanceSupported: aws.Bool(false), BareMetal: aws.Bool(false), @@ -232,7 +259,7 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2 }, { InstanceType: aws.String("c6g.large"), - SupportedUsageClasses: 
[]*string{aws.String("on-demand"), aws.String("spot")}, + SupportedUsageClasses: DefaultSupportedUsageClasses, SupportedVirtualizationTypes: []*string{aws.String("hvm")}, BurstablePerformanceSupported: aws.Bool(false), BareMetal: aws.Bool(false), @@ -250,9 +277,34 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2 Ipv4AddressesPerInterface: aws.Int64(60), }, }, + { + InstanceType: aws.String("inf1.2xlarge"), + SupportedUsageClasses: DefaultSupportedUsageClasses, + SupportedVirtualizationTypes: []*string{aws.String("hvm")}, + BurstablePerformanceSupported: aws.Bool(false), + BareMetal: aws.Bool(false), + ProcessorInfo: &ec2.ProcessorInfo{ + SupportedArchitectures: aws.StringSlice([]string{"x86_64"}), + }, + VCpuInfo: &ec2.VCpuInfo{ + DefaultVCpus: aws.Int64(8), + }, + MemoryInfo: &ec2.MemoryInfo{ + SizeInMiB: aws.Int64(16384), + }, + InferenceAcceleratorInfo: &ec2.InferenceAcceleratorInfo{ + Accelerators: []*ec2.InferenceDeviceInfo{{ + Manufacturer: aws.String("AWS"), + Count: aws.Int64(1), + }}}, + NetworkInfo: &ec2.NetworkInfo{ + MaximumNetworkInterfaces: aws.Int64(4), + Ipv4AddressesPerInterface: aws.Int64(60), + }, + }, { InstanceType: aws.String("inf1.6xlarge"), - SupportedUsageClasses: []*string{aws.String("on-demand"), aws.String("spot")}, + SupportedUsageClasses: DefaultSupportedUsageClasses, SupportedVirtualizationTypes: []*string{aws.String("hvm")}, BurstablePerformanceSupported: aws.Bool(false), BareMetal: aws.Bool(false), @@ -319,6 +371,10 @@ func (e *EC2API) DescribeInstanceTypeOfferingsPagesWithContext(_ context.Context InstanceType: aws.String("p3.8xlarge"), Location: aws.String("test-zone-1a"), }, + { + InstanceType: aws.String("inf1.2xlarge"), + Location: aws.String("test-zone-1a"), + }, { InstanceType: aws.String("inf1.6xlarge"), Location: aws.String("test-zone-1a"), diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go index e4715f760cff..e71e7f1b96c4 100644 --- a/pkg/cloudprovider/aws/instancetypes.go +++ b/pkg/cloudprovider/aws/instancetypes.go @@ -31,11 +31,10 @@ import ( ) const ( - instanceTypesCacheKey = "types" - instanceTypeZonesCacheKey = "zones" - instanceTypesAndZonesCacheTTL = 5 * time.Minute - InsufficientCapacityErrorCacheOnDemandTTL = 15 * time.Second - InsufficientCapacityErrorCacheSpotTTL = 45 * time.Second + InstanceTypesCacheKey = "types" + InstanceTypeZonesCacheKey = "zones" + InstanceTypesAndZonesCacheTTL = 5 * time.Minute + InsufficientCapacityErrorCacheTTL = 45 * time.Second InsufficientCapacityErrorCacheCleanupInterval = 5 * time.Minute ) @@ -53,8 +52,8 @@ func NewInstanceTypeProvider(ec2api ec2iface.EC2API, subnetProvider *SubnetProvi return &InstanceTypeProvider{ ec2api: ec2api, subnetProvider: subnetProvider, - cache: cache.New(instanceTypesAndZonesCacheTTL, CacheCleanupInterval), - unavailableOfferings: cache.New(-1, InsufficientCapacityErrorCacheCleanupInterval), + cache: cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval), + unavailableOfferings: cache.New(InsufficientCapacityErrorCacheTTL, InsufficientCapacityErrorCacheCleanupInterval), } } @@ -105,7 +104,7 @@ func (p *InstanceTypeProvider) createOfferings(instanceType *InstanceType, subne } func (p *InstanceTypeProvider) getInstanceTypeZones(ctx context.Context) (map[string]sets.String, error) { - if cached, ok := p.cache.Get(instanceTypeZonesCacheKey); ok { + if cached, ok := p.cache.Get(InstanceTypeZonesCacheKey); ok { return cached.(map[string]sets.String), nil } zones := map[string]sets.String{} @@ 
-122,13 +121,13 @@ func (p *InstanceTypeProvider) getInstanceTypeZones(ctx context.Context) (map[st return nil, fmt.Errorf("describing instance type zone offerings, %w", err) } logging.FromContext(ctx).Debugf("Discovered EC2 instance types zonal offerings") - p.cache.SetDefault(instanceTypeZonesCacheKey, zones) + p.cache.SetDefault(InstanceTypeZonesCacheKey, zones) return zones, nil } // getInstanceTypes retrieves all instance types from the ec2 DescribeInstanceTypes API using some opinionated filters func (p *InstanceTypeProvider) getInstanceTypes(ctx context.Context) (map[string]*InstanceType, error) { - if cached, ok := p.cache.Get(instanceTypesCacheKey); ok { + if cached, ok := p.cache.Get(InstanceTypesCacheKey); ok { return cached.(map[string]*InstanceType), nil } instanceTypes := map[string]*InstanceType{} @@ -150,7 +149,7 @@ func (p *InstanceTypeProvider) getInstanceTypes(ctx context.Context) (map[string return nil, fmt.Errorf("fetching instance types using ec2.DescribeInstanceTypes, %w", err) } logging.FromContext(ctx).Debugf("Discovered %d EC2 instance types", len(instanceTypes)) - p.cache.SetDefault(instanceTypesCacheKey, instanceTypes) + p.cache.SetDefault(InstanceTypesCacheKey, instanceTypes) return instanceTypes, nil } @@ -173,11 +172,6 @@ func (p *InstanceTypeProvider) filter(instanceType *ec2.InstanceTypeInfo) bool { // TrackUnavailableOfferings allows the InstanceProvider to communicate recently observed temporary capacity shortages in // the provided offerings func (p *InstanceTypeProvider) TrackUnavailableOfferings(ctx context.Context, offerings map[string]sets.String, capacityType string) { - cacheTTL := InsufficientCapacityErrorCacheSpotTTL - if capacityType == v1alpha1.CapacityTypeOnDemand { - cacheTTL = InsufficientCapacityErrorCacheOnDemandTTL - } - for instanceType, zones := range offerings { for zone := range zones { cacheKey := unavailableOfferingsCacheKey(capacityType, instanceType, zone) @@ -186,9 +180,9 @@ func (p *InstanceTypeProvider) TrackUnavailableOfferings(ctx context.Context, of instanceType, zone, capacityType, - cacheTTL) + InsufficientCapacityErrorCacheTTL) // even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL - p.unavailableOfferings.Set(cacheKey, struct{}{}, cacheTTL) + p.unavailableOfferings.SetDefault(cacheKey, struct{}{}) } } } diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 7aa35d1ce43b..1d15ce9aaca0 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "testing" + "time" "github.com/Pallinder/go-randomdata" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" @@ -42,6 +43,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" . 
"knative.dev/pkg/logging/testing" ) @@ -53,6 +55,8 @@ var fakeEC2API *fake.EC2API var provisioners *provisioning.Controller var scheduler *scheduling.Controller +const shortenedUnavailableOfferingsTTL = 2 * time.Second + func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) RegisterFailHandler(Fail) @@ -65,7 +69,12 @@ var _ = BeforeSuite(func() { launchTemplateCache = cache.New(CacheTTL, CacheCleanupInterval) fakeEC2API = &fake.EC2API{} subnetProvider := NewSubnetProvider(fakeEC2API) - instanceTypeProvider := NewInstanceTypeProvider(fakeEC2API, subnetProvider) + instanceTypeProvider := &InstanceTypeProvider{ + ec2api: fakeEC2API, + subnetProvider: subnetProvider, + cache: cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval), + unavailableOfferings: cache.New(shortenedUnavailableOfferingsTTL, InsufficientCapacityErrorCacheCleanupInterval), + } clientSet := kubernetes.NewForConfigOrDie(e.Config) cloudProvider := &CloudProvider{ subnetProvider: subnetProvider, @@ -110,6 +119,7 @@ var _ = Describe("Allocation", func() { Context("Reconciliation", func() { Context("Specialized Hardware", func() { It("should launch instances for Nvidia GPU resource requests", func() { + nodeNames := sets.NewString() for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ ResourceRequirements: v1.ResourceRequirements{ @@ -132,17 +142,12 @@ var _ = Describe("Allocation", func() { }, })) { ExpectScheduled(ctx, env.Client, pod) + nodeNames.Insert(ExpectScheduledWithInstanceType(ctx, env.Client, pod, "p3.8xlarge").Name) } - Expect(InstancesLaunchedFrom(fakeEC2API.CalledWithCreateFleetInput.Iter())).To(Equal(2)) - overrides := []*ec2.FleetLaunchTemplateOverridesRequest{} - for i := range fakeEC2API.CalledWithCreateFleetInput.Iter() { - overrides = append(overrides, i.(*ec2.CreateFleetInput).LaunchTemplateConfigs[0].Overrides...) 
- } - for _, override := range overrides { - Expect(*override.InstanceType).To(Equal("p3.8xlarge")) - } + Expect(nodeNames.Len()).To(Equal(2)) }) It("should launch instances for AWS Neuron resource requests", func() { + nodeNames := sets.NewString() for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ ResourceRequirements: v1.ResourceRequirements{ @@ -165,16 +170,67 @@ var _ = Describe("Allocation", func() { }, }), ) { - ExpectScheduled(ctx, env.Client, pod) + nodeNames.Insert(ExpectScheduledWithInstanceType(ctx, env.Client, pod, "inf1.6xlarge").Name) + } + Expect(nodeNames.Len()).To(Equal(2)) + }) + }) + Context("Insufficient Capacity Error Cache", func() { + It("should launch instances on second recon attempt with Insufficient Capacity Error Cache fallback", func() { + fakeEC2API.ShouldTriggerInsufficientCapacity = true + pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, + Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, + }, + }), + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, + Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, + }, + }), + ) + // it should've tried to pack them on a single inf1.6xlarge then hit an insufficient capacity error + for _, pod := range pods { + ExpectNotScheduled(ctx, env.Client, pod) } - Expect(InstancesLaunchedFrom(fakeEC2API.CalledWithCreateFleetInput.Iter())).To(Equal(2)) - overrides := []*ec2.FleetLaunchTemplateOverridesRequest{} - for input := range fakeEC2API.CalledWithCreateFleetInput.Iter() { - overrides = append(overrides, input.(*ec2.CreateFleetInput).LaunchTemplateConfigs[0].Overrides...) + nodeNames := sets.NewString() + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) { + nodeNames.Insert(ExpectScheduledWithInstanceType(ctx, env.Client, pod, "inf1.2xlarge").Name) } - for _, override := range overrides { - Expect(*override.InstanceType).To(Equal("inf1.6xlarge")) + Expect(nodeNames.Len()).To(Equal(2)) + }) + It("should launch instances on later recon attempt with Insufficient Capacity Error Cache expiry", func() { + fakeEC2API.ShouldTriggerInsufficientCapacity = true + pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, + Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, + }, + }), + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, + Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, + }, + }), + ) + // it should've tried to pack them on a single inf1.6xlarge then hit an insufficient capacity error + for _, pod := range pods { + ExpectNotScheduled(ctx, env.Client, pod) } + // capacity shortage is over - wait for expiry (N.B. 
the Karpenter logging will not show the overridden cache expiry in this test context) + fakeEC2API.ShouldTriggerInsufficientCapacity = false + Eventually(func(g Gomega) int { + nodeNames := sets.NewString() + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) { + nodeNames = nodeNames.Insert(ExpectScheduledWithInstanceTypeAndGomega(ctx, env.Client, pod, "inf1.6xlarge", g).Name) + } + return len(nodeNames) + }, shortenedUnavailableOfferingsTTL*2, RequestInterval).Should(Equal(1)) }) }) Context("CapacityType", func() { diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index fd1ed1475eca..6f02739b6343 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -41,15 +41,30 @@ const ( RequestInterval = 1 * time.Second ) +func defaultGomegaIfNil(g Gomega) Gomega { + if g != nil { + return g + } + return Default +} + func ExpectPodExists(c client.Client, name string, namespace string) *v1.Pod { + return ExpectPodExistsWithGomega(c, name, namespace, nil) +} + +func ExpectPodExistsWithGomega(c client.Client, name string, namespace string, g Gomega) *v1.Pod { pod := &v1.Pod{} - Expect(c.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, pod)).To(Succeed()) + defaultGomegaIfNil(g).Expect(c.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, pod)).To(Succeed()) return pod } func ExpectNodeExists(c client.Client, name string) *v1.Node { + return ExpectNodeExistsWithGomega(c, name, nil) +} + +func ExpectNodeExistsWithGomega(c client.Client, name string, g Gomega) *v1.Node { node := &v1.Node{} - Expect(c.Get(context.Background(), client.ObjectKey{Name: name}, node)).To(Succeed()) + defaultGomegaIfNil(g).Expect(c.Get(context.Background(), client.ObjectKey{Name: name}, node)).To(Succeed()) return node } @@ -65,10 +80,23 @@ func ExpectNotFound(c client.Client, objects ...client.Object) { func ExpectScheduled(ctx context.Context, c client.Client, pod *v1.Pod) *v1.Node { p := ExpectPodExists(c, pod.Name, pod.Namespace) - Expect(p.Spec.NodeName).ToNot(BeEmpty(), fmt.Sprintf("expected %s/%s to scheduled", pod.Namespace, pod.Name)) + Expect(p.Spec.NodeName).ToNot(BeEmpty(), fmt.Sprintf("expected %s/%s to be scheduled", pod.Namespace, pod.Name)) return ExpectNodeExists(c, p.Spec.NodeName) } +func ExpectScheduledWithInstanceType(ctx context.Context, c client.Client, pod *v1.Pod, instanceType string) *v1.Node { + return ExpectScheduledWithInstanceTypeAndGomega(ctx, c, pod, instanceType, nil) +} + +func ExpectScheduledWithInstanceTypeAndGomega(ctx context.Context, c client.Client, pod *v1.Pod, instanceType string, g Gomega) *v1.Node { + p := ExpectPodExistsWithGomega(c, pod.Name, pod.Namespace, defaultGomegaIfNil(g)) + defaultGomegaIfNil(g).Expect(p.Spec.NodeName).ToNot(BeEmpty(), fmt.Sprintf("expected %s/%s to be scheduled", pod.Namespace, pod.Name)) + node := ExpectNodeExistsWithGomega(c, p.Spec.NodeName, defaultGomegaIfNil(g)) + defaultGomegaIfNil(g).Expect(node.Labels["node.kubernetes.io/instance-type"]).To(Equal(instanceType), + fmt.Sprintf("expected %s/%s to be scheduled to a node with instance type %s", pod.Namespace, pod.Name, instanceType)) + return node +} + func ExpectNotScheduled(ctx context.Context, c client.Client, pod *v1.Pod) { p := ExpectPodExists(c, pod.Name, pod.Namespace) Eventually(p.Spec.NodeName).Should(BeEmpty(), fmt.Sprintf("expected %s/%s to not be scheduled", pod.Namespace, pod.Name)) From 
b056e59c8cdc9fadd7abf91e71458879362d2b40 Mon Sep 17 00:00:00 2001 From: Elton Pinto Date: Thu, 25 Nov 2021 01:06:42 +0000 Subject: [PATCH 06/11] Address last round of comments, mostly stylistic changes --- pkg/cloudprovider/aws/cloudprovider.go | 12 ++++-- pkg/cloudprovider/aws/fake/ec2api.go | 51 +++++++++++++++++--------- pkg/cloudprovider/aws/instance.go | 36 +++--------------- pkg/cloudprovider/aws/instancetypes.go | 34 ++++++++--------- pkg/cloudprovider/aws/suite_test.go | 6 +-- 5 files changed, 67 insertions(+), 72 deletions(-) diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index f34e8c37e0e2..bbea07b56f22 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -80,8 +80,14 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud return &CloudProvider{ instanceTypeProvider: instanceTypeProvider, subnetProvider: subnetProvider, - instanceProvider: NewInstanceProvider(ec2api, instanceTypeProvider, subnetProvider, ssm.New(sess), options.ClientSet), - creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), + instanceProvider: &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider, + NewLaunchTemplateProvider( + ec2api, + NewAMIProvider(ssm.New(sess), options.ClientSet), + NewSecurityGroupProvider(ec2api), + ), + }, + creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), } } @@ -132,7 +138,7 @@ func (c *CloudProvider) GetInstanceTypes(ctx context.Context, constraints *v1alp if err != nil { return nil, apis.ErrGeneric(err.Error()) } - return c.instanceTypeProvider.Get(ctx, vendorConstraints) + return c.instanceTypeProvider.Get(ctx, vendorConstraints.AWS) } func (c *CloudProvider) Delete(ctx context.Context, node *v1.Node) error { diff --git a/pkg/cloudprovider/aws/fake/ec2api.go b/pkg/cloudprovider/aws/fake/ec2api.go index 60361e402beb..3b1e6ce0fc01 100644 --- a/pkg/cloudprovider/aws/fake/ec2api.go +++ b/pkg/cloudprovider/aws/fake/ec2api.go @@ -30,6 +30,11 @@ import ( set "github.com/deckarep/golang-set" ) +type CapacityPool struct { + InstanceType string + Zone string +} + // EC2Behavior must be reset between tests otherwise tests will // pollute each other. type EC2Behavior struct { @@ -44,7 +49,7 @@ type EC2Behavior struct { CalledWithCreateLaunchTemplateInput set.Set Instances sync.Map LaunchTemplates sync.Map - ShouldTriggerInsufficientCapacity bool + InsufficientCapacityPools []CapacityPool } type EC2API struct { @@ -55,8 +60,6 @@ type EC2API struct { // DefaultSupportedUsageClasses is a var because []*string can't be a const var DefaultSupportedUsageClasses = aws.StringSlice([]string{"on-demand", "spot"}) -const InsufficientCapacityInstanceType = "inf1.6xlarge" - // Reset must be called between tests otherwise tests will pollute // each other. 
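The change from a single ShouldTriggerInsufficientCapacity boolean to a slice of CapacityPool values in this patch lets a test exhaust individual (instance type, zone) pools instead of one hard-coded instance type, and Reset() empties the slice between specs. Typical usage from a test body, mirroring the suite changes later in this patch:

// Mark exactly one capacity pool as exhausted; CreateFleet will then report
// InsufficientInstanceCapacity for this pool while still launching instances
// for every other requested pool.
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{
	{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"},
}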
func (e *EC2API) Reset() { @@ -65,7 +68,7 @@ func (e *EC2API) Reset() { CalledWithCreateLaunchTemplateInput: set.NewSet(), Instances: sync.Map{}, LaunchTemplates: sync.Map{}, - ShouldTriggerInsufficientCapacity: false, + InsufficientCapacityPools: []CapacityPool{}, } } @@ -76,8 +79,18 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee } instances := []*ec2.Instance{} instanceIds := []*string{} + skippedPools := []CapacityPool{} for i := 0; i < int(*input.TargetCapacitySpecification.TotalTargetCapacity); i++ { - if e.ShouldTriggerInsufficientCapacity && aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].InstanceType) == InsufficientCapacityInstanceType { + skipInstance := false + for _, pool := range e.InsufficientCapacityPools { + if pool.InstanceType == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].InstanceType) && + pool.Zone == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone) { + skippedPools = append(skippedPools, pool) + skipInstance = true + break + } + } + if skipInstance { continue } instances = append(instances, &ec2.Instance{ @@ -92,18 +105,22 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee result := &ec2.CreateFleetOutput{ Instances: []*ec2.CreateFleetInstance{{InstanceIds: instanceIds}}} - if e.ShouldTriggerInsufficientCapacity { - result.Errors = []*ec2.CreateFleetError{{ - ErrorCode: aws.String("InsufficientInstanceCapacity"), - LaunchTemplateAndOverrides: &ec2.LaunchTemplateAndOverridesResponse{ - LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecification{ - LaunchTemplateId: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateId, - LaunchTemplateName: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName}, - Overrides: &ec2.FleetLaunchTemplateOverrides{ - InstanceType: aws.String(InsufficientCapacityInstanceType), - AvailabilityZone: aws.String("test-zone-1a"), - SubnetId: aws.String("test-subnet-1")}, - }}} + if len(skippedPools) > 0 { + for _, pool := range skippedPools { + result.Errors = append(result.Errors, &ec2.CreateFleetError{ + ErrorCode: aws.String("InsufficientInstanceCapacity"), + LaunchTemplateAndOverrides: &ec2.LaunchTemplateAndOverridesResponse{ + LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecification{ + LaunchTemplateId: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateId, + LaunchTemplateName: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName, + }, + Overrides: &ec2.FleetLaunchTemplateOverrides{ + InstanceType: aws.String(pool.InstanceType), + AvailabilityZone: aws.String(pool.Zone), + }, + }, + }) + } } return result, nil } diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index 006515e2040f..e14418ec5670 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -24,12 +24,10 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" - "github.com/aws/aws-sdk-go/service/ssm" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/kubernetes" "knative.dev/pkg/logging" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" @@ -45,17 +43,6 @@ type InstanceProvider struct { launchTemplateProvider *LaunchTemplateProvider } -func NewInstanceProvider(ec2api 
ec2iface.EC2API, instanceTypeProvider *InstanceTypeProvider, subnetProvider *SubnetProvider, ssm *ssm.SSM, - clientSet *kubernetes.Clientset) *InstanceProvider { - return &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider, - NewLaunchTemplateProvider( - ec2api, - NewAMIProvider(ssm, clientSet), - NewSecurityGroupProvider(ec2api), - ), - } -} - // Create an instance given the constraints. // instanceTypes should be sorted by priority for spot capacity type. // If spot is not used, the instanceTypes are not required to be sorted @@ -197,12 +184,14 @@ func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.Inst for i, instanceType := range instanceTypeOptions { for _, offering := range instanceType.Offerings() { // we can't assume that all zones will be available for all capacity types, hence this check - if offering.CapacityType != capacityType { + if capacityType != offering.CapacityType { + continue + } + if !zones.Has(offering.Zone) { continue } for _, subnet := range subnets { - subnetZone := aws.StringValue(subnet.AvailabilityZone) - if subnetZone != offering.Zone || !zones.Has(offering.Zone) { + if aws.StringValue(subnet.AvailabilityZone) != offering.Zone { continue } override := &ec2.FleetLaunchTemplateOverridesRequest{ @@ -289,15 +278,11 @@ func (p *InstanceProvider) instanceToNode(instance *ec2.Instance, instanceTypes } func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context, errors []*ec2.CreateFleetError, capacityType string) { - insufficientCapacityOfferings := map[string]sets.String{} for _, err := range errors { if InsufficientCapacityErrorCode == aws.StringValue(err.ErrorCode) { - createOrAppendToMapValue(insufficientCapacityOfferings, aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.InstanceType), aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.AvailabilityZone)) + p.instanceTypeProvider.CacheUnavailable(ctx, aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.InstanceType), aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.AvailabilityZone), capacityType) } } - if len(insufficientCapacityOfferings) > 0 { - p.instanceTypeProvider.TrackUnavailableOfferings(ctx, insufficientCapacityOfferings, capacityType) - } } func getInstanceID(node *v1.Node) (*string, error) { @@ -342,12 +327,3 @@ func combineReservations(reservations []*ec2.Reservation) []*ec2.Instance { } return instances } - -func createOrAppendToMapValue(mapToUpdate map[string]sets.String, key string, newValue string) { - existingValueSet, hasValue := mapToUpdate[key] - if hasValue { - existingValueSet.Insert(newValue) - } else { - mapToUpdate[key] = sets.NewString(newValue) - } -} diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go index e71e7f1b96c4..e635fadcd3d0 100644 --- a/pkg/cloudprovider/aws/instancetypes.go +++ b/pkg/cloudprovider/aws/instancetypes.go @@ -58,14 +58,14 @@ func NewInstanceTypeProvider(ec2api ec2iface.EC2API, subnetProvider *SubnetProvi } // Get all instance type options (the constraints are only used for tag filtering on subnets, not for Requirements filtering) -func (p *InstanceTypeProvider) Get(ctx context.Context, constraints *v1alpha1.Constraints) ([]cloudprovider.InstanceType, error) { +func (p *InstanceTypeProvider) Get(ctx context.Context, provider *v1alpha1.AWS) ([]cloudprovider.InstanceType, error) { // Get InstanceTypes from EC2 instanceTypes, err := p.getInstanceTypes(ctx) if err != nil { return nil, err } // Get Viable AZs from subnets - subnets, err := 
p.subnetProvider.Get(ctx, constraints.AWS) + subnets, err := p.subnetProvider.Get(ctx, provider) if err != nil { return nil, err } @@ -95,7 +95,7 @@ func (p *InstanceTypeProvider) createOfferings(instanceType *InstanceType, subne // while usage classes should be a distinct set, there's no guarantee of that for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) { // exclude any offerings that have recently seen an insufficient capacity error from EC2 - if _, isUnavailable := p.unavailableOfferings.Get(unavailableOfferingsCacheKey(capacityType, instanceType.Name(), zone)); !isUnavailable { + if _, isUnavailable := p.unavailableOfferings.Get(UnavailableOfferingsCacheKey(capacityType, instanceType.Name(), zone)); !isUnavailable { offerings = append(offerings, cloudprovider.Offering{Zone: zone, CapacityType: capacityType}) } } @@ -169,24 +169,20 @@ func (p *InstanceTypeProvider) filter(instanceType *ec2.InstanceTypeInfo) bool { ) } -// TrackUnavailableOfferings allows the InstanceProvider to communicate recently observed temporary capacity shortages in +// CacheUnavailable allows the InstanceProvider to communicate recently observed temporary capacity shortages in // the provided offerings -func (p *InstanceTypeProvider) TrackUnavailableOfferings(ctx context.Context, offerings map[string]sets.String, capacityType string) { - for instanceType, zones := range offerings { - for zone := range zones { - cacheKey := unavailableOfferingsCacheKey(capacityType, instanceType, zone) - logging.FromContext(ctx).Debugf("Saw %s for offering { instanceType: %s, zone: %s, capacityType: %s }, avoiding for %s", - InsufficientCapacityErrorCode, - instanceType, - zone, - capacityType, - InsufficientCapacityErrorCacheTTL) - // even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL - p.unavailableOfferings.SetDefault(cacheKey, struct{}{}) - } - } +func (p *InstanceTypeProvider) CacheUnavailable(ctx context.Context, instanceType string, zone string, capacityType string) { + cacheKey := UnavailableOfferingsCacheKey(capacityType, instanceType, zone) + logging.FromContext(ctx).Debugf("Saw %s for offering { instanceType: %s, zone: %s, capacityType: %s }, avoiding for %s", + InsufficientCapacityErrorCode, + instanceType, + zone, + capacityType, + InsufficientCapacityErrorCacheTTL) + // even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL + p.unavailableOfferings.SetDefault(cacheKey, struct{}{}) } -func unavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string { +func UnavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string { return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, zone) } diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 1d15ce9aaca0..ed9afc4a6e3e 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -177,7 +177,7 @@ var _ = Describe("Allocation", func() { }) Context("Insufficient Capacity Error Cache", func() { It("should launch instances on second recon attempt with Insufficient Capacity Error Cache fallback", func() { - fakeEC2API.ShouldTriggerInsufficientCapacity = true + fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}} pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ 
ResourceRequirements: v1.ResourceRequirements{ @@ -203,7 +203,7 @@ var _ = Describe("Allocation", func() { Expect(nodeNames.Len()).To(Equal(2)) }) It("should launch instances on later recon attempt with Insufficient Capacity Error Cache expiry", func() { - fakeEC2API.ShouldTriggerInsufficientCapacity = true + fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}} pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ ResourceRequirements: v1.ResourceRequirements{ @@ -223,7 +223,7 @@ var _ = Describe("Allocation", func() { ExpectNotScheduled(ctx, env.Client, pod) } // capacity shortage is over - wait for expiry (N.B. the Karpenter logging will not show the overridden cache expiry in this test context) - fakeEC2API.ShouldTriggerInsufficientCapacity = false + fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{} Eventually(func(g Gomega) int { nodeNames := sets.NewString() for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) { From 0bccea1fb21615630bc315a525375f1b159cecdc Mon Sep 17 00:00:00 2001 From: Elton Pinto Date: Thu, 25 Nov 2021 01:28:07 +0000 Subject: [PATCH 07/11] Simplify cache expiry test for unavailable offerings caching --- pkg/cloudprovider/aws/suite_test.go | 22 +++++++++---------- pkg/test/expectations/expectations.go | 31 ++++++--------------------- 2 files changed, 16 insertions(+), 37 deletions(-) diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index ed9afc4a6e3e..63059c7d79fc 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "testing" - "time" "github.com/Pallinder/go-randomdata" "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5" @@ -51,12 +50,11 @@ import ( var ctx context.Context var env *test.Environment var launchTemplateCache *cache.Cache +var unavailableOfferingsCache *cache.Cache var fakeEC2API *fake.EC2API var provisioners *provisioning.Controller var scheduler *scheduling.Controller -const shortenedUnavailableOfferingsTTL = 2 * time.Second - func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) RegisterFailHandler(Fail) @@ -67,13 +65,14 @@ var _ = BeforeSuite(func() { env = test.NewEnvironment(ctx, func(e *test.Environment) { ctx = injection.WithOptions(ctx, options.Options{ClusterName: "test-cluster", ClusterEndpoint: "https://test-cluster"}) launchTemplateCache = cache.New(CacheTTL, CacheCleanupInterval) + unavailableOfferingsCache = cache.New(InsufficientCapacityErrorCacheTTL, InsufficientCapacityErrorCacheCleanupInterval) fakeEC2API = &fake.EC2API{} subnetProvider := NewSubnetProvider(fakeEC2API) instanceTypeProvider := &InstanceTypeProvider{ ec2api: fakeEC2API, subnetProvider: subnetProvider, cache: cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval), - unavailableOfferings: cache.New(shortenedUnavailableOfferingsTTL, InsufficientCapacityErrorCacheCleanupInterval), + unavailableOfferings: unavailableOfferingsCache, } clientSet := kubernetes.NewForConfigOrDie(e.Config) cloudProvider := &CloudProvider{ @@ -222,15 +221,14 @@ var _ = Describe("Allocation", func() { for _, pod := range pods { ExpectNotScheduled(ctx, env.Client, pod) } - // capacity shortage is over - wait for expiry (N.B. 
the Karpenter logging will not show the overridden cache expiry in this test context) + // capacity shortage is over - expire the item from the cache and try again fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{} - Eventually(func(g Gomega) int { - nodeNames := sets.NewString() - for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) { - nodeNames = nodeNames.Insert(ExpectScheduledWithInstanceTypeAndGomega(ctx, env.Client, pod, "inf1.6xlarge", g).Name) - } - return len(nodeNames) - }, shortenedUnavailableOfferingsTTL*2, RequestInterval).Should(Equal(1)) + unavailableOfferingsCache.Delete(UnavailableOfferingsCacheKey(v1alpha1.CapacityTypeOnDemand, "inf1.6xlarge", "test-zone-1a")) + nodeNames := sets.NewString() + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) { + nodeNames = nodeNames.Insert(ExpectScheduledWithInstanceType(ctx, env.Client, pod, "inf1.6xlarge").Name) + } + Expect(len(nodeNames)).To(Equal(1)) }) }) Context("CapacityType", func() { diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index 6f02739b6343..c193ca7c743c 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -41,30 +41,15 @@ const ( RequestInterval = 1 * time.Second ) -func defaultGomegaIfNil(g Gomega) Gomega { - if g != nil { - return g - } - return Default -} - func ExpectPodExists(c client.Client, name string, namespace string) *v1.Pod { - return ExpectPodExistsWithGomega(c, name, namespace, nil) -} - -func ExpectPodExistsWithGomega(c client.Client, name string, namespace string, g Gomega) *v1.Pod { pod := &v1.Pod{} - defaultGomegaIfNil(g).Expect(c.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, pod)).To(Succeed()) + Expect(c.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, pod)).To(Succeed()) return pod } func ExpectNodeExists(c client.Client, name string) *v1.Node { - return ExpectNodeExistsWithGomega(c, name, nil) -} - -func ExpectNodeExistsWithGomega(c client.Client, name string, g Gomega) *v1.Node { node := &v1.Node{} - defaultGomegaIfNil(g).Expect(c.Get(context.Background(), client.ObjectKey{Name: name}, node)).To(Succeed()) + Expect(c.Get(context.Background(), client.ObjectKey{Name: name}, node)).To(Succeed()) return node } @@ -85,14 +70,10 @@ func ExpectScheduled(ctx context.Context, c client.Client, pod *v1.Pod) *v1.Node } func ExpectScheduledWithInstanceType(ctx context.Context, c client.Client, pod *v1.Pod, instanceType string) *v1.Node { - return ExpectScheduledWithInstanceTypeAndGomega(ctx, c, pod, instanceType, nil) -} - -func ExpectScheduledWithInstanceTypeAndGomega(ctx context.Context, c client.Client, pod *v1.Pod, instanceType string, g Gomega) *v1.Node { - p := ExpectPodExistsWithGomega(c, pod.Name, pod.Namespace, defaultGomegaIfNil(g)) - defaultGomegaIfNil(g).Expect(p.Spec.NodeName).ToNot(BeEmpty(), fmt.Sprintf("expected %s/%s to be scheduled", pod.Namespace, pod.Name)) - node := ExpectNodeExistsWithGomega(c, p.Spec.NodeName, defaultGomegaIfNil(g)) - defaultGomegaIfNil(g).Expect(node.Labels["node.kubernetes.io/instance-type"]).To(Equal(instanceType), + p := ExpectPodExists(c, pod.Name, pod.Namespace) + Expect(p.Spec.NodeName).ToNot(BeEmpty(), fmt.Sprintf("expected %s/%s to be scheduled", pod.Namespace, pod.Name)) + node := ExpectNodeExists(c, p.Spec.NodeName) + Expect(node.Labels["node.kubernetes.io/instance-type"]).To(Equal(instanceType), 
fmt.Sprintf("expected %s/%s to be scheduled to a node with instance type %s", pod.Namespace, pod.Name, instanceType)) return node } From 6dc13d19bd225ae5ae0128d2bd63de80b4331d8f Mon Sep 17 00:00:00 2001 From: Elton Pinto Date: Thu, 25 Nov 2021 01:33:12 +0000 Subject: [PATCH 08/11] Addressed 1 comment --- pkg/cloudprovider/aws/instancetypes.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go index e635fadcd3d0..1601b3f7cc41 100644 --- a/pkg/cloudprovider/aws/instancetypes.go +++ b/pkg/cloudprovider/aws/instancetypes.go @@ -172,7 +172,6 @@ func (p *InstanceTypeProvider) filter(instanceType *ec2.InstanceTypeInfo) bool { // CacheUnavailable allows the InstanceProvider to communicate recently observed temporary capacity shortages in // the provided offerings func (p *InstanceTypeProvider) CacheUnavailable(ctx context.Context, instanceType string, zone string, capacityType string) { - cacheKey := UnavailableOfferingsCacheKey(capacityType, instanceType, zone) logging.FromContext(ctx).Debugf("Saw %s for offering { instanceType: %s, zone: %s, capacityType: %s }, avoiding for %s", InsufficientCapacityErrorCode, instanceType, @@ -180,7 +179,7 @@ func (p *InstanceTypeProvider) CacheUnavailable(ctx context.Context, instanceTyp capacityType, InsufficientCapacityErrorCacheTTL) // even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL - p.unavailableOfferings.SetDefault(cacheKey, struct{}{}) + p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, zone), struct{}{}) } func UnavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string { From 7e42f051e193e2d0136b4cc4a9eca8afee7371d5 Mon Sep 17 00:00:00 2001 From: Elton Pinto Date: Thu, 25 Nov 2021 02:18:58 +0000 Subject: [PATCH 09/11] Add test for zonal fallback for insufficient capacity cache tests, remove unnecessary expectation method --- pkg/cloudprovider/aws/fake/ec2api.go | 6 ++- pkg/cloudprovider/aws/instancetypes.go | 2 +- pkg/cloudprovider/aws/suite_test.go | 61 +++++++++++++++++++++++--- pkg/test/expectations/expectations.go | 9 ---- 4 files changed, 61 insertions(+), 17 deletions(-) diff --git a/pkg/cloudprovider/aws/fake/ec2api.go b/pkg/cloudprovider/aws/fake/ec2api.go index 3b1e6ce0fc01..7bbfe09a8da5 100644 --- a/pkg/cloudprovider/aws/fake/ec2api.go +++ b/pkg/cloudprovider/aws/fake/ec2api.go @@ -95,7 +95,7 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee } instances = append(instances, &ec2.Instance{ InstanceId: aws.String(randomdata.SillyName()), - Placement: &ec2.Placement{AvailabilityZone: aws.String("test-zone-1a")}, + Placement: &ec2.Placement{AvailabilityZone: input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone}, PrivateDnsName: aws.String(randomdata.IpV4Address()), InstanceType: input.LaunchTemplateConfigs[0].Overrides[0].InstanceType, }) @@ -388,6 +388,10 @@ func (e *EC2API) DescribeInstanceTypeOfferingsPagesWithContext(_ context.Context InstanceType: aws.String("p3.8xlarge"), Location: aws.String("test-zone-1a"), }, + { + InstanceType: aws.String("p3.8xlarge"), + Location: aws.String("test-zone-1b"), + }, { InstanceType: aws.String("inf1.2xlarge"), Location: aws.String("test-zone-1a"), diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go index 1601b3f7cc41..f5bc1e6f4fc0 100644 --- a/pkg/cloudprovider/aws/instancetypes.go +++ 
b/pkg/cloudprovider/aws/instancetypes.go @@ -179,7 +179,7 @@ func (p *InstanceTypeProvider) CacheUnavailable(ctx context.Context, instanceTyp capacityType, InsufficientCapacityErrorCacheTTL) // even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL - p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, zone), struct{}{}) + p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, zone), sets.Empty{}) } func UnavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string { diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 63059c7d79fc..b08f633c8653 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -55,6 +55,9 @@ var fakeEC2API *fake.EC2API var provisioners *provisioning.Controller var scheduler *scheduling.Controller +const InstanceTypeLabel = "node.kubernetes.io/instance-type" +const ZoneLabel = "topology.kubernetes.io/zone" + func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) RegisterFailHandler(Fail) @@ -140,8 +143,9 @@ var _ = Describe("Allocation", func() { Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("4")}, }, })) { - ExpectScheduled(ctx, env.Client, pod) - nodeNames.Insert(ExpectScheduledWithInstanceType(ctx, env.Client, pod, "p3.8xlarge").Name) + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(InstanceTypeLabel, "p3.8xlarge")) + nodeNames.Insert(node.Name) } Expect(nodeNames.Len()).To(Equal(2)) }) @@ -169,13 +173,15 @@ var _ = Describe("Allocation", func() { }, }), ) { - nodeNames.Insert(ExpectScheduledWithInstanceType(ctx, env.Client, pod, "inf1.6xlarge").Name) + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(InstanceTypeLabel, "inf1.6xlarge")) + nodeNames.Insert(node.Name) } Expect(nodeNames.Len()).To(Equal(2)) }) }) Context("Insufficient Capacity Error Cache", func() { - It("should launch instances on second recon attempt with Insufficient Capacity Error Cache fallback", func() { + It("should launch instances of different type on second recon attempt with Insufficient Capacity Error Cache fallback", func() { fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}} pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ @@ -197,7 +203,48 @@ var _ = Describe("Allocation", func() { } nodeNames := sets.NewString() for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) 
{ - nodeNames.Insert(ExpectScheduledWithInstanceType(ctx, env.Client, pod, "inf1.2xlarge").Name) + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(InstanceTypeLabel, "inf1.2xlarge")) + nodeNames.Insert(node.Name) + } + Expect(nodeNames.Len()).To(Equal(2)) + }) + It("should launch instances of different type on second recon attempt with Insufficient Capacity Error Cache fallback", func() { + fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "p3.8xlarge", Zone: "test-zone-1a"}} + pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}, + Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}, + }, + }), + // Should pack onto same instance + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("2")}, + Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("2")}, + }, + }), + // Should pack onto a separate instance + test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("4")}, + Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("4")}, + }, + }), + ) + // it should've tried to pack them in test-zone-1a on p3.8xlarge instances then hit ICEs, so the next attempt will avoid that pool and try test-zone-1b + for _, pod := range pods { + ExpectNotScheduled(ctx, env.Client, pod) + } + + nodeNames := sets.NewString() + for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...)
{ - nodeNames = nodeNames.Insert(ExpectScheduledWithInstanceType(ctx, env.Client, pod, "inf1.6xlarge").Name) + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(InstanceTypeLabel, "inf1.6xlarge")) + nodeNames.Insert(node.Name) } Expect(len(nodeNames)).To(Equal(1)) }) diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index c193ca7c743c..1e59cc7b0e4f 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -69,15 +69,6 @@ func ExpectScheduled(ctx context.Context, c client.Client, pod *v1.Pod) *v1.Node return ExpectNodeExists(c, p.Spec.NodeName) } -func ExpectScheduledWithInstanceType(ctx context.Context, c client.Client, pod *v1.Pod, instanceType string) *v1.Node { - p := ExpectPodExists(c, pod.Name, pod.Namespace) - Expect(p.Spec.NodeName).ToNot(BeEmpty(), fmt.Sprintf("expected %s/%s to be scheduled", pod.Namespace, pod.Name)) - node := ExpectNodeExists(c, p.Spec.NodeName) - Expect(node.Labels["node.kubernetes.io/instance-type"]).To(Equal(instanceType), - fmt.Sprintf("expected %s/%s to be scheduled to a node with instance type %s", pod.Namespace, pod.Name, instanceType)) - return node -} - func ExpectNotScheduled(ctx context.Context, c client.Client, pod *v1.Pod) { p := ExpectPodExists(c, pod.Name, pod.Namespace) Eventually(p.Spec.NodeName).Should(BeEmpty(), fmt.Sprintf("expected %s/%s to not be scheduled", pod.Namespace, pod.Name)) From 38e07a27bfd29f52cf70d83a4114a188bdc86929 Mon Sep 17 00:00:00 2001 From: Elton Pinto Date: Thu, 25 Nov 2021 02:54:12 +0000 Subject: [PATCH 10/11] Addressed minor comments, fixed test race condition --- pkg/cloudprovider/aws/subnets.go | 1 - pkg/cloudprovider/aws/suite_test.go | 16 +++++++--------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/pkg/cloudprovider/aws/subnets.go b/pkg/cloudprovider/aws/subnets.go index 5f9fecd8effa..5999c430b8eb 100644 --- a/pkg/cloudprovider/aws/subnets.go +++ b/pkg/cloudprovider/aws/subnets.go @@ -63,7 +63,6 @@ func (s *SubnetProvider) Get(ctx context.Context, constraints *v1alpha1.AWS) ([] func getFilters(constraints *v1alpha1.AWS) []*ec2.Filter { filters := []*ec2.Filter{} - // Filter by subnet for key, value := range constraints.SubnetSelector { if value == "*" { diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index b08f633c8653..ce58cb1672ca 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -55,9 +55,6 @@ var fakeEC2API *fake.EC2API var provisioners *provisioning.Controller var scheduler *scheduling.Controller -const InstanceTypeLabel = "node.kubernetes.io/instance-type" -const ZoneLabel = "topology.kubernetes.io/zone" - func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) RegisterFailHandler(Fail) @@ -116,6 +113,7 @@ var _ = Describe("Allocation", func() { fakeEC2API.Reset() ExpectCleanedUp(env.Client) launchTemplateCache.Flush() + unavailableOfferingsCache.Flush() }) Context("Reconciliation", func() { @@ -144,7 +142,7 @@ var _ = Describe("Allocation", func() { }, })) { node := ExpectScheduled(ctx, env.Client, pod) - Expect(node.Labels).To(HaveKeyWithValue(InstanceTypeLabel, "p3.8xlarge")) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "p3.8xlarge")) nodeNames.Insert(node.Name) } Expect(nodeNames.Len()).To(Equal(2)) @@ -174,7 +172,7 @@ var _ = Describe("Allocation", func() { }), ) { node := ExpectScheduled(ctx, env.Client, pod) - 
Expect(node.Labels).To(HaveKeyWithValue(InstanceTypeLabel, "inf1.6xlarge")) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge")) nodeNames.Insert(node.Name) } Expect(nodeNames.Len()).To(Equal(2)) @@ -204,7 +202,7 @@ var _ = Describe("Allocation", func() { nodeNames := sets.NewString() for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) { node := ExpectScheduled(ctx, env.Client, pod) - Expect(node.Labels).To(HaveKeyWithValue(InstanceTypeLabel, "inf1.2xlarge")) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.2xlarge")) nodeNames.Insert(node.Name) } Expect(nodeNames.Len()).To(Equal(2)) @@ -242,8 +240,8 @@ var _ = Describe("Allocation", func() { for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) { node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(SatisfyAll( - HaveKeyWithValue(InstanceTypeLabel, "p3.8xlarge"), - HaveKeyWithValue(ZoneLabel, "test-zone-1b"))) + HaveKeyWithValue(v1.LabelInstanceTypeStable, "p3.8xlarge"), + HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-1b"))) nodeNames.Insert(node.Name) } Expect(nodeNames.Len()).To(Equal(2)) @@ -274,7 +272,7 @@ var _ = Describe("Allocation", func() { nodeNames := sets.NewString() for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) { node := ExpectScheduled(ctx, env.Client, pod) - Expect(node.Labels).To(HaveKeyWithValue(InstanceTypeLabel, "inf1.6xlarge")) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge")) nodeNames.Insert(node.Name) } Expect(len(nodeNames)).To(Equal(1)) From 408a81a19bd1093e13b49fdfa72896db91afc92a Mon Sep 17 00:00:00 2001 From: Elton Pinto Date: Thu, 25 Nov 2021 03:33:14 +0000 Subject: [PATCH 11/11] Fixed insufficient capacity cache zonal fallback test, simplified a couple of other tests --- pkg/cloudprovider/aws/suite_test.go | 90 +++++++++++------------------ 1 file changed, 33 insertions(+), 57 deletions(-) diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index ce58cb1672ca..3a53dc60bf6b 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -179,16 +179,18 @@ var _ = Describe("Allocation", func() { }) }) Context("Insufficient Capacity Error Cache", func() { - It("should launch instances of different type on second recon attempt with Insufficient Capacity Error Cache fallback", func() { + It("should launch instances of different type on second reconciliation attempt with Insufficient Capacity Error Cache fallback", func() { fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}} pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ + NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"}, ResourceRequirements: v1.ResourceRequirements{ Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, }, }), test.UnschedulablePod(test.PodOptions{ + NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1a"}, ResourceRequirements: v1.ResourceRequirements{ Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("1")}, @@ -207,75 +209,49 @@ var _ = Describe("Allocation", func() { } 
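// A note on the NodeSelector pinning added above: constraining both pods to
// test-zone-1a appears intended to make the fallback path explicit. With the
// inf1.6xlarge/test-zone-1a pool ICE'd and the zone fixed, the only way to
// schedule is a different instance type in that zone, so the test exercises
// instance-type fallback and nothing else.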
Expect(nodeNames.Len()).To(Equal(2)) }) - It("should launch instances of different type on second recon attempt with Insufficient Capacity Error Cache fallback", func() { + It("should launch instances in a different zone on second reconciliation attempt with Insufficient Capacity Error Cache fallback", func() { fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "p3.8xlarge", Zone: "test-zone-1a"}} - pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, - test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{ - Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}, - Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}, - }, - }), + pod := test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}, + Limits: v1.ResourceList{resources.NvidiaGPU: resource.MustParse("1")}, + }, + }) + pod.Spec.Affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ + { + Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1a"}}, + }}, + }, + }}} + pod = ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pod)[0] + // it should've tried to schedule the pod in test-zone-1a on a p3.8xlarge, then hit insufficient capacity; the next attempt will try test-zone-1b + ExpectNotScheduled(ctx, env.Client, pod) - nodeNames := sets.NewString() - for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...)
{ - node := ExpectScheduled(ctx, env.Client, pod) - Expect(node.Labels).To(SatisfyAll( - HaveKeyWithValue(v1.LabelInstanceTypeStable, "p3.8xlarge"), - HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-1b"))) - nodeNames.Insert(node.Name) - } - Expect(nodeNames.Len()).To(Equal(2)) + pod = ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pod)[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(SatisfyAll( + HaveKeyWithValue(v1.LabelInstanceTypeStable, "p3.8xlarge"), + HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-1b"))) }) - It("should launch instances on later recon attempt with Insufficient Capacity Error Cache expiry", func() { + It("should launch instances on later reconciliation attempt with Insufficient Capacity Error Cache expiry", func() { fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}} - pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, - test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{ - Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, - Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, - }, - }), + pod := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, test.UnschedulablePod(test.PodOptions{ + NodeSelector: map[string]string{v1.LabelInstanceTypeStable: "inf1.6xlarge"}, ResourceRequirements: v1.ResourceRequirements{ Requests: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, Limits: v1.ResourceList{resources.AWSNeuron: resource.MustParse("2")}, }, }), - ) - // it should've tried to pack them on a single inf1.6xlarge then hit an insufficient capacity error - for _, pod := range pods { - ExpectNotScheduled(ctx, env.Client, pod) - } + )[0] + ExpectNotScheduled(ctx, env.Client, pod) // capacity shortage is over - expire the item from the cache and try again fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{} unavailableOfferingsCache.Delete(UnavailableOfferingsCacheKey(v1alpha1.CapacityTypeOnDemand, "inf1.6xlarge", "test-zone-1a")) - nodeNames := sets.NewString() - for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) { - node := ExpectScheduled(ctx, env.Client, pod) - Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge")) - nodeNames.Insert(node.Name) - } - Expect(len(nodeNames)).To(Equal(1)) + pod = ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pod)[0] + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge")) }) }) Context("CapacityType", func() {
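Stepping back, the shape that survives this patch series is a single string-keyed TTL cache shared by the InstanceProvider (writer) and the InstanceTypeProvider (reader). A compressed sketch of the round trip, reusing identifiers from the patches above; the literal key shown assumes v1alpha1.CapacityTypeOnDemand is the string "on-demand", matching the fake's usage classes:

// 1. CreateFleet reports an ICE for a pool and the InstanceProvider records it;
//    CacheUnavailable stamps the entry with the 45s default TTL via SetDefault.
instanceTypeProvider.CacheUnavailable(ctx, "inf1.6xlarge", "test-zone-1a", v1alpha1.CapacityTypeOnDemand)

// 2. While the entry lives, createOfferings drops the offering, getOverrides never
//    emits the pool, and the scheduler packs pods onto some other pool instead.
key := UnavailableOfferingsCacheKey(v1alpha1.CapacityTypeOnDemand, "inf1.6xlarge", "test-zone-1a")
// key == "on-demand:inf1.6xlarge:test-zone-1a"
if _, unavailable := unavailableOfferingsCache.Get(key); !unavailable {
	offerings = append(offerings, cloudprovider.Offering{Zone: "test-zone-1a", CapacityType: v1alpha1.CapacityTypeOnDemand})
}

// 3. Once the TTL lapses, or a test deletes the key as above, the offering
//    reappears and the original pool becomes eligible again.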