From a4b4cfe08ec18de3dc6da7fde5f6273595890128 Mon Sep 17 00:00:00 2001 From: Brandon Date: Thu, 27 Jan 2022 17:18:47 -0600 Subject: [PATCH] add region to instance types --- pkg/cloudprovider/aws/cloudprovider.go | 8 ++++---- pkg/cloudprovider/aws/instance.go | 10 ++++++++-- pkg/cloudprovider/aws/instancetypes.go | 19 +++++++++++-------- pkg/cloudprovider/aws/suite_test.go | 10 ++++++---- pkg/cloudprovider/types.go | 1 + .../provisioning/binpacking/packable.go | 6 ++++-- pkg/controllers/provisioning/controller.go | 2 ++ 7 files changed, 36 insertions(+), 20 deletions(-) diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index bd32b052b4b1..26df8e69bb17 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -79,11 +79,11 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud logging.FromContext(ctx).Debugf("Using AWS region %s", *sess.Config.Region) ec2api := ec2.New(sess) subnetProvider := NewSubnetProvider(ec2api) - instanceTypeProvider := NewInstanceTypeProvider(ec2api, subnetProvider) + instanceTypeProvider := NewInstanceTypeProvider(ec2api, *sess.Config.Region, subnetProvider) return &CloudProvider{ instanceTypeProvider: instanceTypeProvider, subnetProvider: subnetProvider, - instanceProvider: &InstanceProvider{ec2api, *sess.Config.Region, instanceTypeProvider, subnetProvider, + instanceProvider: &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider, NewLaunchTemplateProvider( ec2api, NewAMIProvider(ssm.New(sess), options.ClientSet), @@ -111,8 +111,8 @@ func withUserAgent(sess *session.Session) *session.Session { // Create a node given the constraints. func (c *CloudProvider) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) error { - if constraints.Requirements.Regions().Len() != 0 && !constraints.Requirements.Regions().Has(c.instanceProvider.region) { - return fmt.Errorf("the current region \"%s\" is not in the region requirements %s", c.instanceProvider.region, constraints.Requirements.Regions().List()) + if constraints.Requirements.Regions().Len() != 0 && !constraints.Requirements.Regions().Has(c.instanceTypeProvider.region) { + return fmt.Errorf("the current region \"%s\" is not in the region requirements %s", c.instanceTypeProvider.region, constraints.Requirements.Regions()) } vendorConstraints, err := v1alpha1.Deserialize(constraints) if err != nil { diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index 89468f407fd2..ba90fcc940e8 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -39,7 +39,6 @@ import ( type InstanceProvider struct { ec2api ec2iface.EC2API - region string instanceTypeProvider *InstanceTypeProvider subnetProvider *SubnetProvider launchTemplateProvider *LaunchTemplateProvider @@ -251,12 +250,19 @@ func (p *InstanceProvider) instanceToNode(ctx context.Context, instance *ec2.Ins nodeName = aws.StringValue(instance.InstanceId) } zone := aws.StringValue(instance.Placement.AvailabilityZone) + region := "" + for _, offering := range instanceType.Offerings() { + if offering.Zone == zone { + region = offering.Region + break + } + } return &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: nodeName, Labels: map[string]string{ v1.LabelTopologyZone: zone, - v1.LabelTopologyRegion: p.region, + v1.LabelTopologyRegion: region, v1.LabelInstanceTypeStable: aws.StringValue(instance.InstanceType), v1alpha5.LabelCapacityType: getCapacityType(instance), }, diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go index 05a0c534f359..0e99e8972308 100644 --- a/pkg/cloudprovider/aws/instancetypes.go +++ b/pkg/cloudprovider/aws/instancetypes.go @@ -43,17 +43,19 @@ const ( type InstanceTypeProvider struct { ec2api ec2iface.EC2API + region string subnetProvider *SubnetProvider // Has two entries: one for all the instance types and one for all zones; values cached *before* considering insufficient capacity errors // from the unavailableOfferings cache cache *cache.Cache - // key: ::, value: struct{}{} + // key: :::, value: struct{}{} unavailableOfferings *cache.Cache } -func NewInstanceTypeProvider(ec2api ec2iface.EC2API, subnetProvider *SubnetProvider) *InstanceTypeProvider { +func NewInstanceTypeProvider(ec2api ec2iface.EC2API, region string, subnetProvider *SubnetProvider) *InstanceTypeProvider { return &InstanceTypeProvider{ ec2api: ec2api, + region: region, subnetProvider: subnetProvider, cache: cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval), unavailableOfferings: cache.New(InsufficientCapacityErrorCacheTTL, InsufficientCapacityErrorCacheCleanupInterval), @@ -101,8 +103,8 @@ func (p *InstanceTypeProvider) createOfferings(instanceType *InstanceType, subne // while usage classes should be a distinct set, there's no guarantee of that for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) { // exclude any offerings that have recently seen an insufficient capacity error from EC2 - if _, isUnavailable := p.unavailableOfferings.Get(UnavailableOfferingsCacheKey(capacityType, instanceType.Name(), zone)); !isUnavailable { - offerings = append(offerings, cloudprovider.Offering{Zone: zone, CapacityType: capacityType}) + if _, isUnavailable := p.unavailableOfferings.Get(UnavailableOfferingsCacheKey(capacityType, instanceType.Name(), p.region, zone)); !isUnavailable { + offerings = append(offerings, cloudprovider.Offering{Region: p.region, Zone: zone, CapacityType: capacityType}) } } } @@ -179,16 +181,17 @@ func (p *InstanceTypeProvider) filter(instanceType *ec2.InstanceTypeInfo) bool { // CacheUnavailable allows the InstanceProvider to communicate recently observed temporary capacity shortages in // the provided offerings func (p *InstanceTypeProvider) CacheUnavailable(ctx context.Context, instanceType string, zone string, capacityType string) { - logging.FromContext(ctx).Debugf("%s for offering { instanceType: %s, zone: %s, capacityType: %s }, avoiding for %s", + logging.FromContext(ctx).Debugf("%s for offering { instanceType: %s, region: %s, zone: %s, capacityType: %s }, avoiding for %s", InsufficientCapacityErrorCode, instanceType, + p.region, zone, capacityType, InsufficientCapacityErrorCacheTTL) // even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL - p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, zone), struct{}{}) + p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, p.region, zone), struct{}{}) } -func UnavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string { - return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, zone) +func UnavailableOfferingsCacheKey(capacityType string, instanceType string, region string, zone string) string { + return fmt.Sprintf("%s:%s:%s:%s", capacityType, instanceType, region, zone) } diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index e0cc305eaf4b..3153d2c02a0f 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -86,6 +86,7 @@ var _ = BeforeSuite(func() { subnetProvider := NewSubnetProvider(fakeEC2API) instanceTypeProvider := &InstanceTypeProvider{ ec2api: fakeEC2API, + region: region, subnetProvider: subnetProvider, cache: cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval), unavailableOfferings: unavailableOfferingsCache, @@ -99,7 +100,7 @@ var _ = BeforeSuite(func() { subnetProvider: subnetProvider, instanceTypeProvider: instanceTypeProvider, instanceProvider: &InstanceProvider{ - fakeEC2API, region, instanceTypeProvider, subnetProvider, &LaunchTemplateProvider{ + fakeEC2API, instanceTypeProvider, subnetProvider, &LaunchTemplateProvider{ ec2api: fakeEC2API, amiProvider: NewAMIProvider(&fake.SSMAPI{}, clientSet), securityGroupProvider: securityGroupProvider, @@ -334,7 +335,7 @@ var _ = Describe("Allocation", func() { ExpectNotScheduled(ctx, env.Client, pod) // capacity shortage is over - expire the item from the cache and try again fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{} - unavailableOfferingsCache.Delete(UnavailableOfferingsCacheKey(v1alpha1.CapacityTypeOnDemand, "inf1.6xlarge", "test-zone-1a")) + unavailableOfferingsCache.Delete(UnavailableOfferingsCacheKey(v1alpha1.CapacityTypeOnDemand, "inf1.6xlarge", region, "test-zone-1a")) pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0] node := ExpectScheduled(ctx, env.Client, pod) Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge")) @@ -732,12 +733,13 @@ var _ = Describe("Allocation", func() { }) Context("Region", func() { It("should launch capacity if region is allowed", func() { - provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyRegion, Operator: v1.NodeSelectorOpIn, Values: []string{region}}} + + provisioner.Spec.Requirements = v1alpha5.NewRequirements(v1.NodeSelectorRequirement{Key: v1.LabelTopologyRegion, Operator: v1.NodeSelectorOpIn, Values: []string{region}}) pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())[0] ExpectScheduled(ctx, env.Client, pod) }) It("should not launch capacity if region is not allowed", func() { - provisioner.Spec.Requirements = v1alpha5.Requirements{{Key: v1.LabelTopologyRegion, Operator: v1.NodeSelectorOpIn, Values: []string{"bad-region"}}} + provisioner.Spec.Requirements = v1alpha5.NewRequirements(v1.NodeSelectorRequirement{Key: v1.LabelTopologyRegion, Operator: v1.NodeSelectorOpIn, Values: []string{"bad-region"}}) pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())[0] ExpectNotScheduled(ctx, env.Client, pod) }) diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index af23ea61714c..1b6d8ec63918 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -73,4 +73,5 @@ type InstanceType interface { type Offering struct { CapacityType string Zone string + Region string } diff --git a/pkg/controllers/provisioning/binpacking/packable.go b/pkg/controllers/provisioning/binpacking/packable.go index cf8a7f9b2510..dd7f0c8a2368 100644 --- a/pkg/controllers/provisioning/binpacking/packable.go +++ b/pkg/controllers/provisioning/binpacking/packable.go @@ -195,11 +195,13 @@ func (p *Packable) validateOperatingSystems(constraints *v1alpha5.Constraints) e func (p *Packable) validateOfferings(constraints *v1alpha5.Constraints) error { for _, offering := range p.Offerings() { - if constraints.Requirements.CapacityTypes().Has(offering.CapacityType) && constraints.Requirements.Zones().Has(offering.Zone) { + if constraints.Requirements.CapacityTypes().Has(offering.CapacityType) && constraints.Requirements.Zones().Has(offering.Zone) && + constraints.Requirements.Regions().Has(offering.Region) { return nil } } - return fmt.Errorf("offerings %v are not available for capacity types %v and zones %v", p.Offerings(), constraints.Requirements.CapacityTypes().List(), constraints.Requirements.Zones().List()) + return fmt.Errorf("offerings %v are not available for capacity types %v, zones %v, and regions %v", + p.Offerings(), constraints.Requirements.CapacityTypes().List(), constraints.Requirements.Zones().List(), constraints.Requirements.Regions().List()) } func (p *Packable) validateGPUs(pods []*v1.Pod) error { diff --git a/pkg/controllers/provisioning/controller.go b/pkg/controllers/provisioning/controller.go index c7d3e77358bd..f5c1edab2f92 100644 --- a/pkg/controllers/provisioning/controller.go +++ b/pkg/controllers/provisioning/controller.go @@ -141,6 +141,7 @@ func (c *Controller) List(ctx context.Context) []*Provisioner { func requirements(instanceTypes []cloudprovider.InstanceType) (requirements v1alpha5.Requirements) { supported := map[string]sets.String{ v1.LabelInstanceTypeStable: sets.NewString(), + v1.LabelTopologyRegion: sets.NewString(), v1.LabelTopologyZone: sets.NewString(), v1.LabelArchStable: sets.NewString(), v1.LabelOSStable: sets.NewString(), @@ -149,6 +150,7 @@ func requirements(instanceTypes []cloudprovider.InstanceType) (requirements v1al for _, instanceType := range instanceTypes { for _, offering := range instanceType.Offerings() { supported[v1.LabelTopologyZone].Insert(offering.Zone) + supported[v1.LabelTopologyRegion].Insert(offering.Region) supported[v1alpha5.LabelCapacityType].Insert(offering.CapacityType) } supported[v1.LabelInstanceTypeStable].Insert(instanceType.Name())