Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ICE caching for AWS cloud provider and fix bug in filtering for SubnetProvider #816

Merged
merged 11 commits into from
Nov 25, 2021
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/imdario/mergo v0.3.12
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.16.0
github.com/onsi/gomega v1.17.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/client_golang v1.11.0
go.uber.org/multierr v1.7.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,9 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c=
github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
github.com/openzipkin/zipkin-go v0.3.0/go.mod h1:4c3sLeE8xjNqehmF5RpAFLPLJxXscc0R4l6Zg0P1tTQ=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
Expand Down
3 changes: 2 additions & 1 deletion pkg/cloudprovider/aws/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,13 @@ func (c *CloudProvider) create(ctx context.Context, constraints *v1alpha5.Constr
return errs
}

// GetInstanceTypes returns all available InstanceTypes despite accepting a Constraints struct (note that it does not utilize Requirements)
func (c *CloudProvider) GetInstanceTypes(ctx context.Context, constraints *v1alpha5.Constraints) ([]cloudprovider.InstanceType, error) {
vendorConstraints, err := v1alpha1.Deserialize(constraints)
if err != nil {
return nil, apis.ErrGeneric(err.Error())
}
return c.instanceTypeProvider.Get(ctx, vendorConstraints)
return c.instanceTypeProvider.Get(ctx, vendorConstraints.AWS)
}

func (c *CloudProvider) Delete(ctx context.Context, node *v1.Node) error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/cloudprovider/aws/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ var (
}
)

// InsufficientCapacityErrorCode indicates that EC2 is temporarily lacking capacity for this
// instance type and availability zone combination
const InsufficientCapacityErrorCode = "InsufficientInstanceCapacity"

// isNotFound returns true if the err is an AWS error (even if it's
// wrapped) and is a known to mean "not found" (as opposed to a more
// serious or unexpected error)
Expand Down
91 changes: 84 additions & 7 deletions pkg/cloudprovider/aws/fake/ec2api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ import (
set "github.com/deckarep/golang-set"
)

type CapacityPool struct {
InstanceType string
Zone string
}

// EC2Behavior must be reset between tests otherwise tests will
// pollute each other.
type EC2Behavior struct {
Expand All @@ -44,19 +49,26 @@ type EC2Behavior struct {
CalledWithCreateLaunchTemplateInput set.Set
Instances sync.Map
LaunchTemplates sync.Map
InsufficientCapacityPools []CapacityPool
}

type EC2API struct {
ec2iface.EC2API
EC2Behavior
}

// DefaultSupportedUsageClasses is a var because []*string can't be a const
var DefaultSupportedUsageClasses = aws.StringSlice([]string{"on-demand", "spot"})

// Reset must be called between tests otherwise tests will pollute
// each other.
func (e *EC2API) Reset() {
e.EC2Behavior = EC2Behavior{
CalledWithCreateFleetInput: set.NewSet(),
CalledWithCreateLaunchTemplateInput: set.NewSet(),
Instances: sync.Map{},
LaunchTemplates: sync.Map{},
InsufficientCapacityPools: []CapacityPool{},
}
}

Expand All @@ -67,18 +79,50 @@ func (e *EC2API) CreateFleetWithContext(_ context.Context, input *ec2.CreateFlee
}
instances := []*ec2.Instance{}
instanceIds := []*string{}
skippedPools := []CapacityPool{}
for i := 0; i < int(*input.TargetCapacitySpecification.TotalTargetCapacity); i++ {
skipInstance := false
for _, pool := range e.InsufficientCapacityPools {
if pool.InstanceType == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].InstanceType) &&
pool.Zone == aws.StringValue(input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone) {
skippedPools = append(skippedPools, pool)
skipInstance = true
break
}
}
if skipInstance {
continue
}
instances = append(instances, &ec2.Instance{
InstanceId: aws.String(randomdata.SillyName()),
Placement: &ec2.Placement{AvailabilityZone: aws.String("test-zone-1a")},
Placement: &ec2.Placement{AvailabilityZone: input.LaunchTemplateConfigs[0].Overrides[0].AvailabilityZone},
PrivateDnsName: aws.String(randomdata.IpV4Address()),
InstanceType: input.LaunchTemplateConfigs[0].Overrides[0].InstanceType,
})
e.Instances.Store(*instances[i].InstanceId, instances[i])
instanceIds = append(instanceIds, instances[i].InstanceId)
}

return &ec2.CreateFleetOutput{Instances: []*ec2.CreateFleetInstance{{InstanceIds: instanceIds}}}, nil
result := &ec2.CreateFleetOutput{
Instances: []*ec2.CreateFleetInstance{{InstanceIds: instanceIds}}}
if len(skippedPools) > 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, meant to remove this. Sorry :)

for _, pool := range skippedPools {
result.Errors = append(result.Errors, &ec2.CreateFleetError{
ErrorCode: aws.String("InsufficientInstanceCapacity"),
LaunchTemplateAndOverrides: &ec2.LaunchTemplateAndOverridesResponse{
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecification{
LaunchTemplateId: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateId,
LaunchTemplateName: input.LaunchTemplateConfigs[0].LaunchTemplateSpecification.LaunchTemplateName,
},
Overrides: &ec2.FleetLaunchTemplateOverrides{
InstanceType: aws.String(pool.InstanceType),
AvailabilityZone: aws.String(pool.Zone),
},
},
})
}
}
return result, nil
}

func (e *EC2API) CreateLaunchTemplateWithContext(_ context.Context, input *ec2.CreateLaunchTemplateInput, _ ...request.Option) (*ec2.CreateLaunchTemplateOutput, error) {
Expand Down Expand Up @@ -166,7 +210,7 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2
InstanceTypes: []*ec2.InstanceTypeInfo{
{
InstanceType: aws.String("m5.large"),
SupportedUsageClasses: []*string{aws.String("on-demand"), aws.String("spot")},
SupportedUsageClasses: DefaultSupportedUsageClasses,
SupportedVirtualizationTypes: []*string{aws.String("hvm")},
BurstablePerformanceSupported: aws.Bool(false),
BareMetal: aws.Bool(false),
Expand All @@ -186,7 +230,7 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2
},
{
InstanceType: aws.String("m5.xlarge"),
SupportedUsageClasses: []*string{aws.String("on-demand"), aws.String("spot")},
SupportedUsageClasses: DefaultSupportedUsageClasses,
SupportedVirtualizationTypes: []*string{aws.String("hvm")},
BurstablePerformanceSupported: aws.Bool(false),
BareMetal: aws.Bool(false),
Expand All @@ -206,7 +250,7 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2
},
{
InstanceType: aws.String("p3.8xlarge"),
SupportedUsageClasses: []*string{aws.String("on-demand"), aws.String("spot")},
SupportedUsageClasses: DefaultSupportedUsageClasses,
SupportedVirtualizationTypes: []*string{aws.String("hvm")},
BurstablePerformanceSupported: aws.Bool(false),
BareMetal: aws.Bool(false),
Expand All @@ -232,7 +276,7 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2
},
{
InstanceType: aws.String("c6g.large"),
SupportedUsageClasses: []*string{aws.String("on-demand"), aws.String("spot")},
SupportedUsageClasses: DefaultSupportedUsageClasses,
SupportedVirtualizationTypes: []*string{aws.String("hvm")},
BurstablePerformanceSupported: aws.Bool(false),
BareMetal: aws.Bool(false),
Expand All @@ -250,9 +294,34 @@ func (e *EC2API) DescribeInstanceTypesPagesWithContext(_ context.Context, _ *ec2
Ipv4AddressesPerInterface: aws.Int64(60),
},
},
{
InstanceType: aws.String("inf1.2xlarge"),
SupportedUsageClasses: DefaultSupportedUsageClasses,
SupportedVirtualizationTypes: []*string{aws.String("hvm")},
BurstablePerformanceSupported: aws.Bool(false),
BareMetal: aws.Bool(false),
ProcessorInfo: &ec2.ProcessorInfo{
SupportedArchitectures: aws.StringSlice([]string{"x86_64"}),
},
VCpuInfo: &ec2.VCpuInfo{
DefaultVCpus: aws.Int64(8),
},
MemoryInfo: &ec2.MemoryInfo{
SizeInMiB: aws.Int64(16384),
},
InferenceAcceleratorInfo: &ec2.InferenceAcceleratorInfo{
Accelerators: []*ec2.InferenceDeviceInfo{{
Manufacturer: aws.String("AWS"),
Count: aws.Int64(1),
}}},
NetworkInfo: &ec2.NetworkInfo{
MaximumNetworkInterfaces: aws.Int64(4),
Ipv4AddressesPerInterface: aws.Int64(60),
},
},
{
InstanceType: aws.String("inf1.6xlarge"),
SupportedUsageClasses: []*string{aws.String("on-demand"), aws.String("spot")},
SupportedUsageClasses: DefaultSupportedUsageClasses,
SupportedVirtualizationTypes: []*string{aws.String("hvm")},
BurstablePerformanceSupported: aws.Bool(false),
BareMetal: aws.Bool(false),
Expand Down Expand Up @@ -319,6 +388,14 @@ func (e *EC2API) DescribeInstanceTypeOfferingsPagesWithContext(_ context.Context
InstanceType: aws.String("p3.8xlarge"),
Location: aws.String("test-zone-1a"),
},
{
InstanceType: aws.String("p3.8xlarge"),
Location: aws.String("test-zone-1b"),
},
{
InstanceType: aws.String("inf1.2xlarge"),
Location: aws.String("test-zone-1a"),
},
{
InstanceType: aws.String("inf1.6xlarge"),
Location: aws.String("test-zone-1a"),
Expand Down
54 changes: 36 additions & 18 deletions pkg/cloudprovider/aws/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a
if err != nil {
return nil, fmt.Errorf("creating fleet %w", err)
}
p.updateUnavailableOfferingsCache(ctx, createFleetOutput.Errors, capacityType)
instanceIds := combineFleetInstances(*createFleetOutput)
if len(instanceIds) == 0 {
return nil, combineFleetErrors(createFleetOutput.Errors)
Expand All @@ -155,7 +156,7 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a

func (p *InstanceProvider) getLaunchTemplateConfigs(ctx context.Context, constraints *v1alpha1.Constraints, instanceTypes []cloudprovider.InstanceType, capacityType string) ([]*ec2.FleetLaunchTemplateConfigRequest, error) {
// Get subnets given the constraints
subnets, err := p.subnetProvider.Get(ctx, constraints)
subnets, err := p.subnetProvider.Get(ctx, constraints.AWS)
if err != nil {
return nil, fmt.Errorf("getting subnets, %w", err)
}
Expand All @@ -166,7 +167,7 @@ func (p *InstanceProvider) getLaunchTemplateConfigs(ctx context.Context, constra
}
for launchTemplateName, instanceTypes := range launchTemplates {
launchTemplateConfigs = append(launchTemplateConfigs, &ec2.FleetLaunchTemplateConfigRequest{
Overrides: p.getOverrides(instanceTypes, subnets, capacityType),
Overrides: p.getOverrides(instanceTypes, subnets, constraints.Requirements.Zones(), capacityType),
LaunchTemplateSpecification: &ec2.FleetLaunchTemplateSpecificationRequest{
LaunchTemplateName: aws.String(launchTemplateName),
Version: aws.String("$Default"),
Expand All @@ -176,30 +177,39 @@ func (p *InstanceProvider) getLaunchTemplateConfigs(ctx context.Context, constra
return launchTemplateConfigs, nil
}

func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.InstanceType, subnets []*ec2.Subnet, capacityType string) []*ec2.FleetLaunchTemplateOverridesRequest {
// getOverrides creates and returns launch template overrides for the cross product of instanceTypeOptions and subnets (with subnets being constrained by
// zones and the offerings in instanceTypeOptions)
func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.InstanceType, subnets []*ec2.Subnet, zones sets.String, capacityType string) []*ec2.FleetLaunchTemplateOverridesRequest {
var overrides []*ec2.FleetLaunchTemplateOverridesRequest
for i, instanceType := range instanceTypeOptions {
for _, offering := range instanceType.Offerings() {
// we can't assume that all zones will be available for all capacity types, hence this check
if offering.CapacityType != capacityType {
if capacityType != offering.CapacityType {
continue
}
if !zones.Has(offering.Zone) {
continue
}
for _, subnet := range subnets {
if aws.StringValue(subnet.AvailabilityZone) == offering.Zone {
override := &ec2.FleetLaunchTemplateOverridesRequest{
InstanceType: aws.String(instanceType.Name()),
SubnetId: subnet.SubnetId,
}
// Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy
// to reduce the likelihood of getting an excessively large instance type.
// instanceTypeOptions are sorted by vcpus and memory so this prioritizes smaller instance types.
if capacityType == v1alpha1.CapacityTypeSpot {
override.Priority = aws.Float64(float64(i))
}
overrides = append(overrides, override)
// FleetAPI cannot span subnets from the same AZ, so break after the first one.
break
if aws.StringValue(subnet.AvailabilityZone) != offering.Zone {
continue
}
override := &ec2.FleetLaunchTemplateOverridesRequest{
InstanceType: aws.String(instanceType.Name()),
SubnetId: subnet.SubnetId,
// This is technically redundant, but is useful if we have to parse insufficient capacity errors from
// CreateFleet so that we can figure out the zone rather than additional API calls to look up the subnet
AvailabilityZone: subnet.AvailabilityZone,
}
// Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy
// to reduce the likelihood of getting an excessively large instance type.
// instanceTypeOptions are sorted by vcpus and memory so this prioritizes smaller instance types.
if capacityType == v1alpha1.CapacityTypeSpot {
override.Priority = aws.Float64(float64(i))
}
overrides = append(overrides, override)
// FleetAPI cannot span subnets from the same AZ, so break after the first one.
break
}
}
}
Expand Down Expand Up @@ -267,6 +277,14 @@ func (p *InstanceProvider) instanceToNode(instance *ec2.Instance, instanceTypes
return nil, fmt.Errorf("unrecognized instance type %s", aws.StringValue(instance.InstanceType))
}

func (p *InstanceProvider) updateUnavailableOfferingsCache(ctx context.Context, errors []*ec2.CreateFleetError, capacityType string) {
for _, err := range errors {
if InsufficientCapacityErrorCode == aws.StringValue(err.ErrorCode) {
p.instanceTypeProvider.CacheUnavailable(ctx, aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.InstanceType), aws.StringValue(err.LaunchTemplateAndOverrides.Overrides.AvailabilityZone), capacityType)
}
}
}

func getInstanceID(node *v1.Node) (*string, error) {
id := strings.Split(node.Spec.ProviderID, "/")
if len(id) < 5 {
Expand Down
Loading