Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support region wellknown label #1228

Merged
merged 9 commits into from
Jan 29, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions pkg/apis/provisioning/v1alpha5/requirements.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
// WellKnownLabels supported by karpenter
WellKnownLabels = sets.NewString(
v1.LabelTopologyZone,
v1.LabelTopologyRegion,
v1.LabelInstanceTypeStable,
v1.LabelArchStable,
v1.LabelOSStable,
Expand All @@ -63,10 +64,11 @@ var (
// however, Provisioner labels are still restricted to WellKnownLabels.
// Additional labels may be injected by cloud providers.
NormalizedLabels = map[string]string{
v1.LabelFailureDomainBetaZone: v1.LabelTopologyZone,
"beta.kubernetes.io/arch": v1.LabelArchStable,
"beta.kubernetes.io/os": v1.LabelOSStable,
v1.LabelInstanceType: v1.LabelInstanceTypeStable,
v1.LabelFailureDomainBetaZone: v1.LabelTopologyZone,
v1.LabelFailureDomainBetaRegion: v1.LabelTopologyRegion,
"beta.kubernetes.io/arch": v1.LabelArchStable,
"beta.kubernetes.io/os": v1.LabelOSStable,
v1.LabelInstanceType: v1.LabelInstanceTypeStable,
}
)

Expand All @@ -77,6 +79,10 @@ func (r Requirements) Zones() sets.String {
return r.Requirement(v1.LabelTopologyZone)
}

func (r Requirements) Regions() sets.String {
return r.Requirement(v1.LabelTopologyRegion)
bwagner5 marked this conversation as resolved.
Show resolved Hide resolved
}

func (r Requirements) InstanceTypes() sets.String {
return r.Requirement(v1.LabelInstanceTypeStable)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/cloudprovider/aws/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud
logging.FromContext(ctx).Debugf("Using AWS region %s", *sess.Config.Region)
ec2api := ec2.New(sess)
subnetProvider := NewSubnetProvider(ec2api)
instanceTypeProvider := NewInstanceTypeProvider(ec2api, subnetProvider)
instanceTypeProvider := NewInstanceTypeProvider(ec2api, *sess.Config.Region, subnetProvider)
return &CloudProvider{
instanceTypeProvider: instanceTypeProvider,
subnetProvider: subnetProvider,
Expand Down Expand Up @@ -111,6 +111,9 @@ func withUserAgent(sess *session.Session) *session.Session {

// Create a node given the constraints.
func (c *CloudProvider) Create(ctx context.Context, constraints *v1alpha5.Constraints, instanceTypes []cloudprovider.InstanceType, quantity int, callback func(*v1.Node) error) error {
if constraints.Requirements.Regions().Len() != 0 && !constraints.Requirements.Regions().Has(c.instanceTypeProvider.region) {
bwagner5 marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("the current region \"%s\" is not in the region requirements %s", c.instanceTypeProvider.region, constraints.Requirements.Regions())
}
vendorConstraints, err := v1alpha1.Deserialize(constraints)
if err != nil {
return err
Expand Down
13 changes: 11 additions & 2 deletions pkg/cloudprovider/aws/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,17 +249,26 @@ func (p *InstanceProvider) instanceToNode(ctx context.Context, instance *ec2.Ins
if injection.GetOptions(ctx).GetAWSNodeNameConvention() == options.ResourceName {
nodeName = aws.StringValue(instance.InstanceId)
}
zone := aws.StringValue(instance.Placement.AvailabilityZone)
region := ""
for _, offering := range instanceType.Offerings() {
if offering.Zone == zone {
region = offering.Region
break
}
}
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
Labels: map[string]string{
v1.LabelTopologyZone: aws.StringValue(instance.Placement.AvailabilityZone),
v1.LabelTopologyZone: zone,
v1.LabelTopologyRegion: region,
bwagner5 marked this conversation as resolved.
Show resolved Hide resolved
v1.LabelInstanceTypeStable: aws.StringValue(instance.InstanceType),
v1alpha5.LabelCapacityType: getCapacityType(instance),
},
},
Spec: v1.NodeSpec{
ProviderID: fmt.Sprintf("aws:///%s/%s", aws.StringValue(instance.Placement.AvailabilityZone), aws.StringValue(instance.InstanceId)),
ProviderID: fmt.Sprintf("aws:///%s/%s", zone, aws.StringValue(instance.InstanceId)),
},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
Expand Down
22 changes: 13 additions & 9 deletions pkg/cloudprovider/aws/instancetypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ package aws
import (
"context"
"fmt"
"time"

"github.com/aws/karpenter/pkg/utils/injection"
"knative.dev/pkg/ptr"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
Expand All @@ -42,17 +43,19 @@ const (

type InstanceTypeProvider struct {
ec2api ec2iface.EC2API
region string
subnetProvider *SubnetProvider
// Has two entries: one for all the instance types and one for all zones; values cached *before* considering insufficient capacity errors
// from the unavailableOfferings cache
cache *cache.Cache
// key: <capacityType>:<instanceType>:<zone>, value: struct{}{}
// key: <capacityType>:<instanceType>:<region>:<zone>, value: struct{}{}
unavailableOfferings *cache.Cache
}

func NewInstanceTypeProvider(ec2api ec2iface.EC2API, subnetProvider *SubnetProvider) *InstanceTypeProvider {
func NewInstanceTypeProvider(ec2api ec2iface.EC2API, region string, subnetProvider *SubnetProvider) *InstanceTypeProvider {
return &InstanceTypeProvider{
ec2api: ec2api,
region: region,
subnetProvider: subnetProvider,
cache: cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval),
unavailableOfferings: cache.New(InsufficientCapacityErrorCacheTTL, InsufficientCapacityErrorCacheCleanupInterval),
Expand Down Expand Up @@ -100,8 +103,8 @@ func (p *InstanceTypeProvider) createOfferings(instanceType *InstanceType, subne
// while usage classes should be a distinct set, there's no guarantee of that
for capacityType := range sets.NewString(aws.StringValueSlice(instanceType.SupportedUsageClasses)...) {
// exclude any offerings that have recently seen an insufficient capacity error from EC2
if _, isUnavailable := p.unavailableOfferings.Get(UnavailableOfferingsCacheKey(capacityType, instanceType.Name(), zone)); !isUnavailable {
offerings = append(offerings, cloudprovider.Offering{Zone: zone, CapacityType: capacityType})
if _, isUnavailable := p.unavailableOfferings.Get(UnavailableOfferingsCacheKey(capacityType, instanceType.Name(), p.region, zone)); !isUnavailable {
offerings = append(offerings, cloudprovider.Offering{Region: p.region, Zone: zone, CapacityType: capacityType})
}
}
}
Expand Down Expand Up @@ -178,16 +181,17 @@ func (p *InstanceTypeProvider) filter(instanceType *ec2.InstanceTypeInfo) bool {
// CacheUnavailable allows the InstanceProvider to communicate recently observed temporary capacity shortages in
// the provided offerings
func (p *InstanceTypeProvider) CacheUnavailable(ctx context.Context, instanceType string, zone string, capacityType string) {
logging.FromContext(ctx).Debugf("%s for offering { instanceType: %s, zone: %s, capacityType: %s }, avoiding for %s",
logging.FromContext(ctx).Debugf("%s for offering { instanceType: %s, region: %s, zone: %s, capacityType: %s }, avoiding for %s",
InsufficientCapacityErrorCode,
instanceType,
p.region,
zone,
capacityType,
InsufficientCapacityErrorCacheTTL)
// even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL
p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, zone), struct{}{})
p.unavailableOfferings.SetDefault(UnavailableOfferingsCacheKey(capacityType, instanceType, p.region, zone), struct{}{})
}

func UnavailableOfferingsCacheKey(capacityType string, instanceType string, zone string) string {
return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, zone)
func UnavailableOfferingsCacheKey(capacityType string, instanceType string, region string, zone string) string {
return fmt.Sprintf("%s:%s:%s:%s", capacityType, instanceType, region, zone)
}
18 changes: 17 additions & 1 deletion pkg/cloudprovider/aws/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
var ctx context.Context
var opts options.Options
var env *test.Environment
var region string
var launchTemplateCache *cache.Cache
var securityGroupCache *cache.Cache
var unavailableOfferingsCache *cache.Cache
Expand All @@ -77,13 +78,15 @@ var _ = BeforeSuite(func() {
}
Expect(opts.Validate()).To(Succeed(), "Failed to validate options")
ctx = injection.WithOptions(ctx, opts)
region = "test-region-1"
launchTemplateCache = cache.New(CacheTTL, CacheCleanupInterval)
unavailableOfferingsCache = cache.New(InsufficientCapacityErrorCacheTTL, InsufficientCapacityErrorCacheCleanupInterval)
securityGroupCache = cache.New(CacheTTL, CacheCleanupInterval)
fakeEC2API = &fake.EC2API{}
subnetProvider := NewSubnetProvider(fakeEC2API)
instanceTypeProvider := &InstanceTypeProvider{
ec2api: fakeEC2API,
region: region,
subnetProvider: subnetProvider,
cache: cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval),
unavailableOfferings: unavailableOfferingsCache,
Expand Down Expand Up @@ -332,7 +335,7 @@ var _ = Describe("Allocation", func() {
ExpectNotScheduled(ctx, env.Client, pod)
// capacity shortage is over - expire the item from the cache and try again
fakeEC2API.InsufficientCapacityPools = []fake.CapacityPool{}
unavailableOfferingsCache.Delete(UnavailableOfferingsCacheKey(v1alpha1.CapacityTypeOnDemand, "inf1.6xlarge", "test-zone-1a"))
unavailableOfferingsCache.Delete(UnavailableOfferingsCacheKey(v1alpha1.CapacityTypeOnDemand, "inf1.6xlarge", region, "test-zone-1a"))
pod = ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, pod)[0]
node := ExpectScheduled(ctx, env.Client, pod)
Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "inf1.6xlarge"))
Expand Down Expand Up @@ -728,6 +731,19 @@ var _ = Describe("Allocation", func() {
Expect(provisioner.Validate(ctx)).ToNot(Succeed())
})
})
Context("Region", func() {
It("should launch capacity if region is allowed", func() {

provisioner.Spec.Requirements = v1alpha5.Requirements{v1.NodeSelectorRequirement{Key: v1.LabelTopologyRegion, Operator: v1.NodeSelectorOpIn, Values: []string{region}}}
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())[0]
ExpectScheduled(ctx, env.Client, pod)
})
It("should not launch capacity if region is not allowed", func() {
provisioner.Spec.Requirements = v1alpha5.Requirements{v1.NodeSelectorRequirement{Key: v1.LabelTopologyRegion, Operator: v1.NodeSelectorOpIn, Values: []string{"bad-region"}}}
pod := ExpectProvisioned(ctx, env.Client, selectionController, provisioners, provisioner, test.UnschedulablePod())[0]
ExpectNotScheduled(ctx, env.Client, pod)
})
})
})
})
})
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,5 @@ type InstanceType interface {
type Offering struct {
CapacityType string
Zone string
Region string
}
6 changes: 4 additions & 2 deletions pkg/controllers/provisioning/binpacking/packable.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,13 @@ func (p *Packable) validateOperatingSystems(constraints *v1alpha5.Constraints) e

func (p *Packable) validateOfferings(constraints *v1alpha5.Constraints) error {
for _, offering := range p.Offerings() {
if constraints.Requirements.CapacityTypes().Has(offering.CapacityType) && constraints.Requirements.Zones().Has(offering.Zone) {
if constraints.Requirements.CapacityTypes().Has(offering.CapacityType) && constraints.Requirements.Zones().Has(offering.Zone) &&
constraints.Requirements.Regions().Has(offering.Region) {
return nil
}
}
return fmt.Errorf("offerings %v are not available for capacity types %v and zones %v", p.Offerings(), constraints.Requirements.CapacityTypes().List(), constraints.Requirements.Zones().List())
return fmt.Errorf("offerings %v are not available for capacity types %v, zones %v, and regions %v",
p.Offerings(), constraints.Requirements.CapacityTypes().List(), constraints.Requirements.Zones().List(), constraints.Requirements.Regions().List())
}

func (p *Packable) validateGPUs(pods []*v1.Pod) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (c *Controller) List(ctx context.Context) []*Provisioner {
func requirements(instanceTypes []cloudprovider.InstanceType) (requirements v1alpha5.Requirements) {
supported := map[string]sets.String{
v1.LabelInstanceTypeStable: sets.NewString(),
v1.LabelTopologyRegion: sets.NewString(),
v1.LabelTopologyZone: sets.NewString(),
v1.LabelArchStable: sets.NewString(),
v1.LabelOSStable: sets.NewString(),
Expand All @@ -149,6 +150,7 @@ func requirements(instanceTypes []cloudprovider.InstanceType) (requirements v1al
for _, instanceType := range instanceTypes {
for _, offering := range instanceType.Offerings() {
supported[v1.LabelTopologyZone].Insert(offering.Zone)
supported[v1.LabelTopologyRegion].Insert(offering.Region)
supported[v1alpha5.LabelCapacityType].Insert(offering.CapacityType)
}
supported[v1.LabelInstanceTypeStable].Insert(instanceType.Name())
Expand Down