Skip to content

Commit

Permalink
Address last round of comments, mostly stylistic changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Elton Pinto committed Nov 25, 2021
1 parent cf8dc0e commit b056e59
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 72 deletions.
12 changes: 9 additions & 3 deletions pkg/cloudprovider/aws/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,14 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud
return &CloudProvider{
instanceTypeProvider: instanceTypeProvider,
subnetProvider: subnetProvider,
instanceProvider: NewInstanceProvider(ec2api, instanceTypeProvider, subnetProvider, ssm.New(sess), options.ClientSet),
creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst),
instanceProvider: &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider,
NewLaunchTemplateProvider(
ec2api,
NewAMIProvider(ssm.New(sess), options.ClientSet),
NewSecurityGroupProvider(ec2api),
),
},
creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst),
}
}

Expand Down Expand Up @@ -132,7 +138,7 @@ func (c *CloudProvider) GetInstanceTypes(ctx context.Context, constraints *v1alp
if err != nil {
return nil, apis.ErrGeneric(err.Error())
}
return c.instanceTypeProvider.Get(ctx, vendorConstraints)
return c.instanceTypeProvider.Get(ctx, vendorConstraints.AWS)
}

func (c *CloudProvider) Delete(ctx context.Context, node *v1.Node) error {
Expand Down
51 changes: 34 additions & 17 deletions pkg/cloudprovider/aws/fake/ec2api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ import (
set "github.com/deckarep/golang-set"
)

type CapacityPool struct {
InstanceType string
Zone string
}

// EC2Behavior must be reset between tests otherwise tests will
// pollute each other.
type EC2Behavior struct {
Expand All @@ -44,7 +49,7 @@ type EC2Behavior struct {
CalledWithCreateLaunchTemplateInput set.Set
Instances sync.Map
LaunchTemplates sync.Map
ShouldTriggerInsufficientCapacity bool
InsufficientCapacityPools []CapacityPool
}

type EC2API struct {
Expand All @@ -55,8 +60,6 @@ type EC2API struct {
// DefaultSupportedUsageClasses is a var because []*string can't be a const
var DefaultSupportedUsageClasses = aws.StringSlice([]string{"on-demand", "spot"})

const InsufficientCapacityInstanceType = "inf1.6xlarge"

// Reset must be called between tests otherwise tests will pollute
// each other.
func (e *EC2API) Reset() {
Expand All @@ -65,7 +68,7 @@ func (e *EC2API) Reset() {
CalledWithCreateLaunchTemplateInput: set.NewSet(),
Instances: sync.Map{},
LaunchTemplates: sync.Map{},
ShouldTriggerInsufficientCapacity: false,
InsufficientCapacityPools: []CapacityPool{},
}
}

Expand All @@ -76,8 +79,18 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee
}
instances := []*ec2.Instance{}
instanceIds := []*string{}
skippedPools := []CapacityPool{}
for i := 0; i < int(*input.TargetCapacitySpecification.TotalTargetCapacity); i++ {
if e.ShouldTriggerInsufficientCapacity && aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].InstanceType) == InsufficientCapacityInstanceType {
skipInstance := false
for _, pool := range e.InsufficientCapacityPools {
if pool.InstanceType == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].InstanceType) &&
pool.Zone == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone) {
skippedPools = append(skippedPools, pool)
skipInstance = true
break
}
}
if skipInstance {
continue
}
instances = append(instances, &ec2.Instance{
Expand All @@ -92,18 +105,22 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee

result := &ec2.CreateFleetOutput{
Instances: []*ec2.CreateFleetInstance{{InstanceIds: instanceIds}}}
if e.ShouldTriggerInsufficientCapacity {
result.Errors = []*ec2.CreateFleetError{{
ErrorCode: aws.String("InsufficientInstanceCapacity"),
LaunchTemplateAndOverrides: &ec2.LaunchTemplateAndOverridesResponse{
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecification{
LaunchTemplateId: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateId,
LaunchTemplateName: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName},
Overrides: &ec2.FleetLaunchTemplateOverrides{
InstanceType: aws.String(InsufficientCapacityInstanceType),
AvailabilityZone: aws.String("test-zone-1a"),
SubnetId: aws.String("test-subnet-1")},
}}}
if len(skippedPools) > 0 {
for _, pool := range skippedPools {
result.Errors = append(result.Errors, &ec2.CreateFleetError{
ErrorCode: aws.String("InsufficientInstanceCapacity"),
LaunchTemplateAndOverrides: &ec2.LaunchTemplateAndOverridesResponse{
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecification{
LaunchTemplateId: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateId,
LaunchTemplateName: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName,
},
Overrides: &ec2.FleetLaunchTemplateOverrides{
InstanceType: aws.String(pool.InstanceType),
AvailabilityZone: aws.String(pool.Zone),
},
},
})
}
}
return result, nil
}
Expand Down
36 changes: 6 additions & 30 deletions pkg/cloudprovider/aws/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/ssm"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/logging"

"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
Expand All @@ -45,17 +43,6 @@ type InstanceProvider struct {
launchTemplateProvider *LaunchTemplateProvider
}

func NewInstanceProvider(ec2api ec2iface.EC2API, instanceTypeProvider *InstanceTypeProvider, subnetProvider *SubnetProvider, ssm *ssm.SSM,
clientSet *kubernetes.Clientset) *InstanceProvider {
return &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider,
NewLaunchTemplateProvider(
ec2api,
NewAMIProvider(ssm, clientSet),
NewSecurityGroupProvider(ec2api),
),
}
}

// Create an instance given the constraints.
// instanceTypes should be sorted by priority for spot capacity type.
// If spot is not used, the instanceTypes are not required to be sorted
Expand Down Expand Up @@ -197,12 +184,14 @@ func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.Inst
for i, instanceType := range instanceTypeOptions {
for _, offering := range instanceType.Offerings() {
// we can't assume that all zones will be available for all capacity types, hence this check
if offering.CapacityType != capacityType {
if capacityType != offering.CapacityType {
continue
}
if !zones.Has(offering.Zone) {
continue
}
for _, subnet := range subnets {
subnetZone := aws.StringValue(subnet.AvailabilityZone)
if subnetZone != offering.Zone || !zones.Has(offering.Zone) {
if aws.StringValue(subnet.AvailabilityZone) != offering.Zone {
continue
}
override := &ec2.FleetLaunchTemplateOverridesRequest{
Expand Down Expand Up @@ -289,15 +278,11 @@ func (p *InstanceProvider) instanceToNode(instance *ec2.Instance, instanceTypes
}

func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context, errors []*ec2.CreateFleetError, capacityType string) {
insufficientCapacityOfferings := map[string]sets.String{}
for _, err := range errors {
if InsufficientCapacityErrorCode == aws.StringValue(err.ErrorCode) {
createOrAppendToMapValue(insufficientCapacityOfferings, aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.InstanceType), aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.AvailabilityZone))
p.instanceTypeProvider.CacheUnavailable(ctx, aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.InstanceType), aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.AvailabilityZone), capacityType)
}
}
if len(insufficientCapacityOfferings) > 0 {
p.instanceTypeProvider.TrackUnavailableOfferings(ctx, insufficientCapacityOfferings, capacityType)
}
}

func getInstanceID(node *v1.Node) (*string, error) {
Expand Down Expand Up @@ -342,12 +327,3 @@ func combineReservations(reservations []*ec2.Reservation) []*ec2.Instance {
}
return instances
}

func createOrAppendToMapValue(mapToUpdate map[string]sets.String, key string, newValue string) {
existingValueSet, hasValue := mapToUpdate[key]
if hasValue {
existingValueSet.Insert(newValue)
} else {
mapToUpdate[key] = sets.NewString(newValue)
}
}
34 changes: 15 additions & 19 deletions pkg/cloudprovider/aws/instancetypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ func NewInstanceTypeProvider(ec2api ec2iface.EC2API, subnetProvider *SubnetProvi
}

// Get all instance type options (the constraints are only used for tag filtering on subnets, not for Requirements filtering)
func (p *InstanceTypeProvider) Get(ctx context.Context, constraints *v1alpha1.Constraints) ([]cloudprovider.InstanceType, error) {
func (p *InstanceTypeProvider) Get(ctx context.Context, provider *v1alpha1.AWS) ([]cloudprovider.InstanceType, error) {
// Get InstanceTypes from EC2
instanceTypes, err := p.getInstanceTypes(ctx)
if err != nil {
return nil, err
}
// Get Viable AZs from subnets
subnets, err := p.subnetProvider.Get(ctx, constraints.AWS)
subnets, err := p.subnetProvider.Get(ctx, provider)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func (p *InstanceTypeProvider) createOfferings(instanceType *InstanceType, subne
// while usage classes should be a distinct set, there's no guarantee of that
for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) {
// exclude any offerings that have recently seen an insufficient capacity error from EC2
if _, isUnavailable := p.unavailableOfferings.Get(unavailableOfferingsCacheKey(capacityType, instanceType.Name(), zone)); !isUnavailable {
if _, isUnavailable := p.unavailableOfferings.Get(UnavailableOfferingsCacheKey(capacityType, instanceType.Name(), zone)); !isUnavailable {
offerings = append(offerings, cloudprovider.Offering{Zone: zone, CapacityType: capacityType})
}
}
Expand Down Expand Up @@ -169,24 +169,20 @@ func (p *InstanceTypeProvider) filter(instanceType *ec2.InstanceTypeInfo) bool {
)
}

// TrackUnavailableOfferings allows the InstanceProvider to communicate recently observed temporary capacity shortages in
// CacheUnavailable allows the InstanceProvider to communicate recently observed temporary capacity shortages in
// the provided offerings
func (p *InstanceTypeProvider) TrackUnavailableOfferings(ctx context.Context, offerings map[string]sets.String, capacityType string) {
for instanceType, zones := range offerings {
for zone := range zones {
cacheKey := unavailableOfferingsCacheKey(capacityType, instanceType, zone)
logging.FromContext(ctx).Debugf("Saw %s for offering { instanceType: %s, zone: %s, capacityType: %s }, avoiding for %s",
InsufficientCapacityErrorCode,
instanceType,
zone,
capacityType,
InsufficientCapacityErrorCacheTTL)
// even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL
p.unavailableOfferings.SetDefault(cacheKey, struct{}{})
}
}
func (p *InstanceTypeProvider) CacheUnavailable(ctx context.Context, instanceType string, zone string, capacityType string) {
cacheKey := UnavailableOfferingsCacheKey(capacityType, instanceType, zone)
logging.FromContext(ctx).Debugf("Saw %s for offering { instanceType: %s, zone: %s, capacityType: %s }, avoiding for %s",
InsufficientCapacityErrorCode,
instanceType,
zone,
capacityType,
InsufficientCapacityErrorCacheTTL)
// even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL
p.unavailableOfferings.SetDefault(cacheKey, struct{}{})
}

func unavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string {
func UnavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string {
return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, zone)
}
6 changes: 3 additions & 3 deletions pkg/cloudprovider/aws/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ var _ = Describe("Allocation", func() {
})
Context("Insufficient Capacity Error Cache", func() {
It("should launch instances on second recon attempt with Insufficient Capacity Error Cache fallback", func() {
fakeEC2API.ShouldTriggerInsufficientCapacity = true
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}}
pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Expand All @@ -203,7 +203,7 @@ var _ = Describe("Allocation", func() {
Expect(nodeNames.Len()).To(Equal(2))
})
It("should launch instances on later recon attempt with Insufficient Capacity Error Cache expiry", func() {
fakeEC2API.ShouldTriggerInsufficientCapacity = true
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{{InstanceType: "inf1.6xlarge", Zone: "test-zone-1a"}}
pods := ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner,
test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Expand All @@ -223,7 +223,7 @@ var _ = Describe("Allocation", func() {
ExpectNotScheduled(ctx, env.Client, pod)
}
// capacity shortage is over - wait for expiry (N.B. the Karpenter logging will not show the overridden cache expiry in this test context)
fakeEC2API.ShouldTriggerInsufficientCapacity = false
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{}
Eventually(func(g Gomega) int {
nodeNames := sets.NewString()
for _, pod := range ExpectProvisioned(ctx, env.Client, scheduler, provisioners, provisioner, pods...) {
Expand Down

0 comments on commit b056e59

Please sign in to comment.