Skip to content

Commit

Permalink
add region to instance types
Browse files Browse the repository at this point in the history
  • Loading branch information
bwagner5 committed Jan 27, 2022
1 parent e6f4d5c commit a4b4cfe
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 20 deletions.
8 changes: 4 additions & 4 deletions pkg/cloudprovider/aws/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud
logging.FromContext(ctx).Debugf("Using AWS region %s", *sess.Config.Region)
ec2api := ec2.New(sess)
subnetProvider := NewSubnetProvider(ec2api)
instanceTypeProvider := NewInstanceTypeProvider(ec2api, subnetProvider)
instanceTypeProvider := NewInstanceTypeProvider(ec2api, *sess.Config.Region, subnetProvider)
return &CloudProvider{
instanceTypeProvider: instanceTypeProvider,
subnetProvider: subnetProvider,
instanceProvider: &InstanceProvider{ec2api, *sess.Config.Region, instanceTypeProvider, subnetProvider,
instanceProvider: &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider,
NewLaunchTemplateProvider(
ec2api,
NewAMIProvider(ssm.New(sess), options.ClientSet),
Expand Down Expand Up @@ -111,8 +111,8 @@ func withUserAgent(sess *session.Session) *session.Session {

// Create a node given the constraints.
func (c *CloudProvider) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) error {
if constraints.Requirements.Regions().Len() != 0 && !constraints.Requirements.Regions().Has(c.instanceProvider.region) {
return fmt.Errorf("the current region \"%s\" is not in the region requirements %s", c.instanceProvider.region, constraints.Requirements.Regions().List())
if constraints.Requirements.Regions().Len() != 0 && !constraints.Requirements.Regions().Has(c.instanceTypeProvider.region) {
return fmt.Errorf("the current region \"%s\" is not in the region requirements %s", c.instanceTypeProvider.region, constraints.Requirements.Regions())
}
vendorConstraints, err := v1alpha1.Deserialize(constraints)
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions pkg/cloudprovider/aws/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (

type InstanceProvider struct {
ec2api ec2iface.EC2API
region string
instanceTypeProvider *InstanceTypeProvider
subnetProvider *SubnetProvider
launchTemplateProvider *LaunchTemplateProvider
Expand Down Expand Up @@ -251,12 +250,19 @@ func (p *InstanceProvider) instanceToNode(ctx context.Context, instance *ec2.Ins
nodeName = aws.StringValue(instance.InstanceId)
}
zone := aws.StringValue(instance.Placement.AvailabilityZone)
region := ""
for _, offering := range instanceType.Offerings() {
if offering.Zone == zone {
region = offering.Region
break
}
}
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
Labels: map[string]string{
v1.LabelTopologyZone: zone,
v1.LabelTopologyRegion: p.region,
v1.LabelTopologyRegion: region,
v1.LabelInstanceTypeStable: aws.StringValue(instance.InstanceType),
v1alpha5.LabelCapacityType: getCapacityType(instance),
},
Expand Down
19 changes: 11 additions & 8 deletions pkg/cloudprovider/aws/instancetypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,19 @@ const (

type InstanceTypeProvider struct {
ec2api ec2iface.EC2API
region string
subnetProvider *SubnetProvider
// Has two entries: one for all the instance types and one for all zones; values cached *before* considering insufficient capacity errors
// from the unavailableOfferings cache
cache *cache.Cache
// key: <capacityType>:<instanceType>:<zone>, value: struct{}{}
// key: <capacityType>:<instanceType>:<region>:<zone>, value: struct{}{}
unavailableOfferings *cache.Cache
}

func NewInstanceTypeProvider(ec2api ec2iface.EC2API, subnetProvider *SubnetProvider) *InstanceTypeProvider {
func NewInstanceTypeProvider(ec2api ec2iface.EC2API, region string, subnetProvider *SubnetProvider) *InstanceTypeProvider {
return &InstanceTypeProvider{
ec2api: ec2api,
region: region,
subnetProvider: subnetProvider,
cache: cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval),
unavailableOfferings: cache.New(InsufficientCapacityErrorCacheTTL, InsufficientCapacityErrorCacheCleanupInterval),
Expand Down Expand Up @@ -101,8 +103,8 @@ func (p *InstanceTypeProvider) createOfferings(instanceType *InstanceType, subne
// while usage classes should be a distinct set, there's no guarantee of that
for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) {
// exclude any offerings that have recently seen an insufficient capacity error from EC2
if _, isUnavailable := p.unavailableOfferings.Get(UnavailableOfferingsCacheKey(capacityType, instanceType.Name(), zone)); !isUnavailable {
offerings = append(offerings, cloudprovider.Offering{Zone: zone, CapacityType: capacityType})
if _, isUnavailable := p.unavailableOfferings.Get(UnavailableOfferingsCacheKey(capacityType, instanceType.Name(), p.region, zone)); !isUnavailable {
offerings = append(offerings, cloudprovider.Offering{Region: p.region, Zone: zone, CapacityType: capacityType})
}
}
}
Expand Down Expand Up @@ -179,16 +181,17 @@ func (p *InstanceTypeProvider) filter(instanceType *ec2.InstanceTypeInfo) bool {
// CacheUnavailable allows the InstanceProvider to communicate recently observed temporary capacity shortages in
// the provided offerings
func (p *InstanceTypeProvider) CacheUnavailable(ctx context.Context, instanceType string, zone string, capacityType string) {
logging.FromContext(ctx).Debugf("%s for offering { instanceType: %s, zone: %s, capacityType: %s }, avoiding for %s",
logging.FromContext(ctx).Debugf("%s for offering { instanceType: %s, region: %s, zone: %s, capacityType: %s }, avoiding for %s",
InsufficientCapacityErrorCode,
instanceType,
p.region,
zone,
capacityType,
InsufficientCapacityErrorCacheTTL)
// even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL
p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, zone), struct{}{})
p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, p.region, zone), struct{}{})
}

func UnavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string {
return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, zone)
func UnavailableOfferingsCacheKey(capacityType string, instanceType string, region string, zone string) string {
return fmt.Sprintf("%s:%s:%s:%s", capacityType, instanceType, region, zone)
}
10 changes: 6 additions & 4 deletions pkg/cloudprovider/aws/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ var _ = BeforeSuite(func() {
subnetProvider := NewSubnetProvider(fakeEC2API)
instanceTypeProvider := &InstanceTypeProvider{
ec2api: fakeEC2API,
region: region,
subnetProvider: subnetProvider,
cache: cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval),
unavailableOfferings: unavailableOfferingsCache,
Expand All @@ -99,7 +100,7 @@ var _ = BeforeSuite(func() {
subnetProvider: subnetProvider,
instanceTypeProvider: instanceTypeProvider,
instanceProvider: &InstanceProvider{
fakeEC2API, region, instanceTypeProvider, subnetProvider, &LaunchTemplateProvider{
fakeEC2API, instanceTypeProvider, subnetProvider, &LaunchTemplateProvider{
ec2api: fakeEC2API,
amiProvider: NewAMIProvider(&fake.SSMAPI{}, clientSet),
securityGroupProvider: securityGroupProvider,
Expand Down Expand Up @@ -334,7 +335,7 @@ var _ = Describe("Allocation", func() {
ExpectNotScheduled(ctx, env.Client, pod)
// capacity shortage is over - expire the item from the cache and try again
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{}
unavailableOfferingsCache.Delete(UnavailableOfferingsCacheKey(v1alpha1.CapacityTypeOnDemand, "inf1.6xlarge", "test-zone-1a"))
unavailableOfferingsCache.Delete(UnavailableOfferingsCacheKey(v1alpha1.CapacityTypeOnDemand, "inf1.6xlarge", region, "test-zone-1a"))
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge"))
Expand Down Expand Up @@ -732,12 +733,13 @@ var _ = Describe("Allocation", func() {
})
Context("Region", func() {
It("should launch capacity if region is allowed", func() {
provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyRegion, Operator: v1.NodeSelectorOpIn, Values: []string{region}}}

provisioner.Spec.Requirements = v1alpha5.NewRequirements(v1.NodeSelectorRequirement{Key: v1.LabelTopologyRegion, Operator: v1.NodeSelectorOpIn, Values: []string{region}})
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
})
It("should not launch capacity if region is not allowed", func() {
provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyRegion, Operator: v1.NodeSelectorOpIn, Values: []string{"bad-region"}}}
provisioner.Spec.Requirements = v1alpha5.NewRequirements(v1.NodeSelectorRequirement{Key: v1.LabelTopologyRegion, Operator: v1.NodeSelectorOpIn, Values: []string{"bad-region"}})
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())[0]
ExpectNotScheduled(ctx, env.Client, pod)
})
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,5 @@ type InstanceType interface {
type Offering struct {
CapacityType string
Zone string
Region string
}
6 changes: 4 additions & 2 deletions pkg/controllers/provisioning/binpacking/packable.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,13 @@ func (p *Packable) validateOperatingSystems(constraints *v1alpha5.Constraints) e

func (p *Packable) validateOfferings(constraints *v1alpha5.Constraints) error {
for _, offering := range p.Offerings() {
if constraints.Requirements.CapacityTypes().Has(offering.CapacityType) && constraints.Requirements.Zones().Has(offering.Zone) {
if constraints.Requirements.CapacityTypes().Has(offering.CapacityType) && constraints.Requirements.Zones().Has(offering.Zone) &&
constraints.Requirements.Regions().Has(offering.Region) {
return nil
}
}
return fmt.Errorf("offerings %v are not available for capacity types %v and zones %v", p.Offerings(), constraints.Requirements.CapacityTypes().List(), constraints.Requirements.Zones().List())
return fmt.Errorf("offerings %v are not available for capacity types %v, zones %v, and regions %v",
p.Offerings(), constraints.Requirements.CapacityTypes().List(), constraints.Requirements.Zones().List(), constraints.Requirements.Regions().List())
}

func (p *Packable) validateGPUs(pods []*v1.Pod) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (c *Controller) List(ctx context.Context) []*Provisioner {
func requirements(instanceTypes []cloudprovider.InstanceType) (requirements v1alpha5.Requirements) {
supported := map[string]sets.String{
v1.LabelInstanceTypeStable: sets.NewString(),
v1.LabelTopologyRegion: sets.NewString(),
v1.LabelTopologyZone: sets.NewString(),
v1.LabelArchStable: sets.NewString(),
v1.LabelOSStable: sets.NewString(),
Expand All @@ -149,6 +150,7 @@ func requirements(instanceTypes []cloudprovider.InstanceType) (requirements v1al
for _, instanceType := range instanceTypes {
for _, offering := range instanceType.Offerings() {
supported[v1.LabelTopologyZone].Insert(offering.Zone)
supported[v1.LabelTopologyRegion].Insert(offering.Region)
supported[v1alpha5.LabelCapacityType].Insert(offering.CapacityType)
}
supported[v1.LabelInstanceTypeStable].Insert(instanceType.Name())
Expand Down

0 comments on commit a4b4cfe

Please sign in to comment.