Skip to content

Commit

Permalink
Implemented NodeAffinity
Browse files Browse the repository at this point in the history
  • Loading branch information
ellistarn committed Sep 24, 2021
1 parent bcc0bbe commit 28ab8de
Show file tree
Hide file tree
Showing 31 changed files with 911 additions and 391 deletions.
26 changes: 25 additions & 1 deletion pkg/apis/provisioning/v1alpha4/provisioner_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ package v1alpha4

import (
"context"
"fmt"

"github.com/awslabs/karpenter/pkg/scheduling"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
)

// SetDefaults for the provisioner
Expand All @@ -25,5 +30,24 @@ func (p *Provisioner) SetDefaults(ctx context.Context) {

// Default the constraints
func (c *Constraints) Default(ctx context.Context) {
DefaultingHook(ctx, c)
DefaultHook(ctx, c)
}

// Constrain applies the pods' scheduling constraints to the constraints.
// Returns an error if the constraints cannot be applied.
func (c *Constraints) Constrain(ctx context.Context, pods ...*v1.Pod) (errs error) {
nodeAffinity := scheduling.NodeAffinityFor(pods...)
for label, constraint := range map[string]*[]string{
v1.LabelTopologyZone: &c.Zones,
v1.LabelInstanceTypeStable: &c.InstanceTypes,
v1.LabelArchStable: &c.Architectures,
v1.LabelOSStable: &c.OperatingSystems,
} {
values := nodeAffinity.GetLabelValues(label, *constraint, WellKnownLabels[label])
if len(values) == 0 {
errs = multierr.Append(errs, fmt.Errorf("%s is too constrained", label))
}
*constraint = values
}
return multierr.Append(errs, ConstrainHook(ctx, c, pods...))
}
52 changes: 11 additions & 41 deletions pkg/apis/provisioning/v1alpha4/provisioner_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ func (c *Constraints) Validate(ctx context.Context) (errs *apis.FieldError) {
return errs.Also(
c.validateLabels(),
c.validateTaints(),
c.validateArchitecture(),
c.validateOperatingSystem(),
c.validateZones(),
c.validateInstanceTypes(),
ValidationHook(ctx, c),
ValidateWellKnown(v1.LabelTopologyZone, c.Zones, "zones"),
ValidateWellKnown(v1.LabelInstanceTypeStable, c.InstanceTypes, "instanceTypes"),
ValidateWellKnown(v1.LabelArchStable, c.Architectures, "architectures"),
ValidateWellKnown(v1.LabelOSStable, c.OperatingSystems, "operatingSystems"),
ValidateHook(ctx, c),
)
}

Expand Down Expand Up @@ -125,43 +125,13 @@ func (c *Constraints) validateTaints() (errs *apis.FieldError) {
return errs
}

func (c *Constraints) validateArchitecture() (errs *apis.FieldError) {
if c.Architectures == nil {
return nil
func ValidateWellKnown(key string, values []string, fieldName string) (errs *apis.FieldError) {
if values != nil && len(values) == 0 {
errs = errs.Also(apis.ErrMissingField(fieldName))
}
for _, architecture := range c.Architectures {
if !functional.ContainsString(SupportedArchitectures, architecture) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s not in %v", architecture, SupportedArchitectures), "architecture"))
}
}
return errs
}

func (c *Constraints) validateOperatingSystem() (errs *apis.FieldError) {
if c.OperatingSystems == nil {
return nil
}
for _, operatingSystem := range c.OperatingSystems {
if !functional.ContainsString(SupportedOperatingSystems, operatingSystem) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s not in %v", operatingSystem, SupportedOperatingSystems), "operatingSystem"))
}
}
return errs
}

func (c *Constraints) validateZones() (errs *apis.FieldError) {
for i, zone := range c.Zones {
if !functional.ContainsString(SupportedZones, zone) {
errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s not in %v", zone, SupportedZones), "zones", i))
}
}
return errs
}

func (c *Constraints) validateInstanceTypes() (errs *apis.FieldError) {
for i, instanceType := range c.InstanceTypes {
if !functional.ContainsString(SupportedInstanceTypes, instanceType) {
errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s not in %v", instanceType, SupportedInstanceTypes), "instanceTypes", i))
for i, value := range values {
if known := WellKnownLabels[key]; !functional.ContainsString(known, value) {
errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s not in %v", value, known), fieldName, i))
}
}
return errs
Expand Down
28 changes: 16 additions & 12 deletions pkg/apis/provisioning/v1alpha4/provisioner_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ var _ = Describe("Validation", func() {
})
})
Context("Zones", func() {
SupportedZones = append(SupportedZones, "test-zone-1")
It("should succeed if unspecified", func() {
Expect(provisioner.Validate(ctx)).To(Succeed())
WellKnownLabels[v1.LabelTopologyZone] = append(WellKnownLabels[v1.LabelTopologyZone], "test-zone-1")
It("should fail if empty", func() {
provisioner.Spec.Zones = []string{}
Expect(provisioner.Validate(ctx)).ToNot(Succeed())
})
It("should fail if not supported", func() {
provisioner.Spec.Zones = []string{"unknown"}
Expand All @@ -122,9 +123,10 @@ var _ = Describe("Validation", func() {
})

Context("InstanceTypes", func() {
SupportedInstanceTypes = append(SupportedInstanceTypes, "test-instance-type")
It("should succeed if unspecified", func() {
Expect(provisioner.Validate(ctx)).To(Succeed())
WellKnownLabels[v1.LabelInstanceTypeStable] = append(WellKnownLabels[v1.LabelInstanceTypeStable], "test-instance-type")
It("should fail if empty", func() {
provisioner.Spec.InstanceTypes = []string{}
Expect(provisioner.Validate(ctx)).ToNot(Succeed())
})
It("should fail if not supported", func() {
provisioner.Spec.InstanceTypes = []string{"unknown"}
Expand All @@ -139,9 +141,10 @@ var _ = Describe("Validation", func() {
})

Context("Architecture", func() {
SupportedArchitectures = append(SupportedArchitectures, "test-architecture")
It("should succeed if unspecified", func() {
Expect(provisioner.Validate(ctx)).To(Succeed())
WellKnownLabels[v1.LabelArchStable] = append(WellKnownLabels[v1.LabelArchStable], "test-architecture")
It("should fail if empty", func() {
provisioner.Spec.Architectures = []string{}
Expect(provisioner.Validate(ctx)).ToNot(Succeed())
})
It("should fail if not supported", func() {
provisioner.Spec.Architectures = []string{"unknown"}
Expand All @@ -154,9 +157,10 @@ var _ = Describe("Validation", func() {
})

Context("OperatingSystem", func() {
SupportedOperatingSystems = append(SupportedOperatingSystems, "test-operating-system")
It("should succeed if unspecified", func() {
Expect(provisioner.Validate(ctx)).To(Succeed())
WellKnownLabels[v1.LabelOSStable] = append(WellKnownLabels[v1.LabelOSStable], "test-operating-system")
It("should fail if empty", func() {
provisioner.Spec.OperatingSystems = []string{}
Expect(provisioner.Validate(ctx)).ToNot(Succeed())
})
It("should fail if not supported", func() {
provisioner.Spec.OperatingSystems = []string{"unknown"}
Expand Down
17 changes: 10 additions & 7 deletions pkg/apis/provisioning/v1alpha4/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ var (
v1.LabelTopologyZone,
v1.LabelInstanceTypeStable,
// Used internally by provisioning logic
ProvisionerNameLabelKey,
EmptinessTimestampAnnotationKey,
v1.LabelHostname,
}
SupportedArchitectures = []string{}
SupportedOperatingSystems = []string{}
SupportedZones = []string{}
SupportedInstanceTypes = []string{}
ValidationHook = func(ctx context.Context, constraints *Constraints) *apis.FieldError { return nil }
DefaultingHook = func(ctx context.Context, constraints *Constraints) {}
// WellKnownLabels supported by karpenter and their allowable values
WellKnownLabels = map[string][]string{
v1.LabelArchStable: {},
v1.LabelOSStable: {},
v1.LabelTopologyZone: {},
v1.LabelInstanceTypeStable: {},
}
DefaultHook = func(ctx context.Context, constraints *Constraints) {}
ValidateHook = func(ctx context.Context, constraints *Constraints) *apis.FieldError { return nil }
ConstrainHook = func(ctx context.Context, constraints *Constraints, pods ...*v1.Pod) error { return nil }
)

var (
Expand Down
11 changes: 4 additions & 7 deletions pkg/cloudprovider/aws/ami.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4"
"github.com/awslabs/karpenter/pkg/cloudprovider"
v1alpha1 "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/awslabs/karpenter/pkg/utils/functional"
"github.com/patrickmn/go-cache"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -52,17 +53,13 @@ func (p *AMIProvider) getSSMParameter(ctx context.Context, constraints *v1alpha1
return "", fmt.Errorf("kube server version, %w", err)
}
var amiNameSuffix string
if len(constraints.Architectures) > 0 {
// select the first one if multiple supported
if constraints.Architectures[0] == v1alpha4.ArchitectureArm64 {
amiNameSuffix = "-arm64"
}
}
if needsGPUAmi(instanceTypes) {
if amiNameSuffix != "" {
if !functional.ContainsString(constraints.Architectures, v1alpha4.ArchitectureAmd64) {
return "", fmt.Errorf("no amazon-linux-2 ami available for both nvidia/neuron gpus and arm64 cpus")
}
amiNameSuffix = "-gpu"
} else if functional.ContainsString(constraints.Architectures, v1alpha4.ArchitectureArm64) {
amiNameSuffix = "-arm64"
}
return fmt.Sprintf("/aws/service/eks/optimized-ami/%s/amazon-linux-2%s/recommended/image_id", version, amiNameSuffix), nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/apis/v1alpha1/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type AWS struct {
// CapacityType for the node. If not specified, defaults to on-demand.
// May be overriden by pods.spec.nodeSelector["node.k8s.aws/capacityType"]
// +optional
CapacityType *string `json:"capacityType,omitempty"`
CapacityTypes []string `json:"capacityTypes,omitempty"`
// LaunchTemplate for the node. If not specified, a launch template will be generated.
// +optional
LaunchTemplate *string `json:"launchTemplate,omitempty"`
Expand Down
29 changes: 20 additions & 9 deletions pkg/cloudprovider/aws/apis/v1alpha1/constraints_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,25 @@ import (
"context"
"fmt"

"knative.dev/pkg/ptr"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4"
"github.com/awslabs/karpenter/pkg/scheduling"
v1 "k8s.io/api/core/v1"
)

var ClusterDiscoveryTagKeyFormat = "kubernetes.io/cluster/%s"

// Default the constraints
// Default the constraints.
func (c *Constraints) Default(ctx context.Context) {
c.defaultCapacityType(ctx)
c.defaultCapacityTypes(ctx)
c.defaultSubnets(ctx)
c.defaultSecurityGroups(ctx)
}

func (c *Constraints) defaultCapacityType(ctx context.Context) {
if capacityType, ok := c.Labels[CapacityTypeLabel]; ok {
c.CapacityType = &capacityType
}
if c.CapacityType != nil {
func (c *Constraints) defaultCapacityTypes(ctx context.Context) {
if len(c.CapacityTypes) != 0 {
return
}
c.CapacityType = ptr.String(CapacityTypeOnDemand)
c.CapacityTypes = []string{CapacityTypeOnDemand}
}

func (c *Constraints) defaultSubnets(ctx context.Context) {
Expand All @@ -53,3 +52,15 @@ func (c *Constraints) defaultSecurityGroups(ctx context.Context) {
}
c.SecurityGroupsSelector = map[string]string{fmt.Sprintf(ClusterDiscoveryTagKeyFormat, c.Cluster.Name): ""}
}

// Constrain applies the pod's scheduling constraints to the constraints.
// Returns an error if the constraints cannot be applied.
func (c *Constraints) Constrain(ctx context.Context, pods ...*v1.Pod) error {
nodeAffinity := scheduling.NodeAffinityFor(pods...)
capacityTypes := nodeAffinity.GetLabelValues(CapacityTypeLabel, c.CapacityTypes, v1alpha4.WellKnownLabels[CapacityTypeLabel])
if len(capacityTypes) == 0 {
return fmt.Errorf("no valid capacity types")
}
c.CapacityTypes = capacityTypes
return nil
}
41 changes: 18 additions & 23 deletions pkg/cloudprovider/aws/apis/v1alpha1/constraints_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,50 @@ import (
"fmt"
"net/url"

"github.com/aws/aws-sdk-go/aws"
"github.com/awslabs/karpenter/pkg/utils/functional"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4"
"knative.dev/pkg/apis"
)

func (c *Constraints) Validate(ctx context.Context) (errs *apis.FieldError) {
return c.AWS.validate(ctx).ViaField("provider")
return c.validate(ctx).ViaField("provider")
}

func (a *AWS) validate(ctx context.Context) (errs *apis.FieldError) {
func (c *Constraints) validate(ctx context.Context) (errs *apis.FieldError) {
return errs.Also(
a.validateInstanceProfile(ctx),
a.validateCapacityType(ctx),
a.validateLaunchTemplate(ctx),
a.validateSubnets(ctx),
a.validateSecurityGroups(ctx),
a.Cluster.Validate(ctx).ViaField("cluster"),
c.validateInstanceProfile(ctx),
c.validateCapacityType(ctx),
c.validateLaunchTemplate(ctx),
c.validateSubnets(ctx),
c.validateSecurityGroups(ctx),
c.Cluster.Validate(ctx).ViaField("cluster"),
)
}

func (a *AWS) validateCapacityType(ctx context.Context) (errs *apis.FieldError) {
capacityTypes := []string{CapacityTypeSpot, CapacityTypeOnDemand}
if !functional.ContainsString(capacityTypes, aws.StringValue(a.CapacityType)) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s not in %v", aws.StringValue(a.CapacityType), capacityTypes), "capacityType"))
}
return errs
func (c *Constraints) validateCapacityType(ctx context.Context) (errs *apis.FieldError) {
return v1alpha4.ValidateWellKnown(CapacityTypeLabel, c.CapacityTypes, "capacityTypes")
}

func (a *AWS) validateInstanceProfile(ctx context.Context) (errs *apis.FieldError) {
if a.InstanceProfile == "" {
func (c *Constraints) validateInstanceProfile(ctx context.Context) (errs *apis.FieldError) {
if c.InstanceProfile == "" {
errs = errs.Also(apis.ErrMissingField("instanceProfile"))
}
return errs
}

func (a *AWS) validateLaunchTemplate(ctx context.Context) (errs *apis.FieldError) {
func (c *Constraints) validateLaunchTemplate(ctx context.Context) (errs *apis.FieldError) {
// nothing to validate at the moment
return errs
}

func (a *AWS) validateSubnets(ctx context.Context) (errs *apis.FieldError) {
if a.SubnetSelector == nil {
func (c *Constraints) validateSubnets(ctx context.Context) (errs *apis.FieldError) {
if c.SubnetSelector == nil {
errs = errs.Also(apis.ErrMissingField("subnetSelector"))
}
return errs
}

func (a *AWS) validateSecurityGroups(ctx context.Context) (errs *apis.FieldError) {
if a.SecurityGroupsSelector == nil {
func (c *Constraints) validateSecurityGroups(ctx context.Context) (errs *apis.FieldError) {
if c.SecurityGroupsSelector == nil {
errs = errs.Also(apis.ErrMissingField("securityGroupSelector"))
}
return errs
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudprovider/aws/apis/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ var (
func init() {
Scheme.AddKnownTypes(schema.GroupVersion{Group: v1alpha4.ExtensionsGroup, Version: "v1alpha1"}, &AWS{})
v1alpha4.RestrictedLabels = append(v1alpha4.RestrictedLabels, AWSLabelPrefix)
v1alpha4.WellKnownLabels[CapacityTypeLabel] = []string{CapacityTypeSpot, CapacityTypeOnDemand}
}
8 changes: 4 additions & 4 deletions pkg/cloudprovider/aws/apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 28ab8de

Please sign in to comment.