Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented NodeAffinity #695

Merged
merged 1 commit into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion pkg/apis/provisioning/v1alpha4/provisioner_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ package v1alpha4

import (
"context"
"fmt"

"github.com/awslabs/karpenter/pkg/scheduling"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
)

// SetDefaults for the provisioner
Expand All @@ -25,5 +30,24 @@ func (p *Provisioner) SetDefaults(ctx context.Context) {

// Default the constraints
func (c *Constraints) Default(ctx context.Context) {
DefaultingHook(ctx, c)
DefaultHook(ctx, c)
}

// Constrain applies the pods' scheduling constraints to the constraints.
// Returns an error if the constraints cannot be applied.
func (c *Constraints) Constrain(ctx context.Context, pods ...*v1.Pod) (errs error) {
nodeAffinity := scheduling.NodeAffinityFor(pods...)
for label, constraint := range map[string]*[]string{
v1.LabelTopologyZone: &c.Zones,
v1.LabelInstanceTypeStable: &c.InstanceTypes,
v1.LabelArchStable: &c.Architectures,
v1.LabelOSStable: &c.OperatingSystems,
} {
values := nodeAffinity.GetLabelValues(label, *constraint, WellKnownLabels[label])
if len(values) == 0 {
errs = multierr.Append(errs, fmt.Errorf("label %s is too constrained", label))
}
*constraint = values
}
return multierr.Append(errs, ConstrainHook(ctx, c, pods...))
}
52 changes: 11 additions & 41 deletions pkg/apis/provisioning/v1alpha4/provisioner_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ func (c *Constraints) Validate(ctx context.Context) (errs *apis.FieldError) {
return errs.Also(
c.validateLabels(),
c.validateTaints(),
c.validateArchitecture(),
c.validateOperatingSystem(),
c.validateZones(),
c.validateInstanceTypes(),
ValidationHook(ctx, c),
ValidateWellKnown(v1.LabelTopologyZone, c.Zones, "zones"),
ValidateWellKnown(v1.LabelInstanceTypeStable, c.InstanceTypes, "instanceTypes"),
ValidateWellKnown(v1.LabelArchStable, c.Architectures, "architectures"),
ValidateWellKnown(v1.LabelOSStable, c.OperatingSystems, "operatingSystems"),
ValidateHook(ctx, c),
)
}

Expand Down Expand Up @@ -125,43 +125,13 @@ func (c *Constraints) validateTaints() (errs *apis.FieldError) {
return errs
}

func (c *Constraints) validateArchitecture() (errs *apis.FieldError) {
if c.Architectures == nil {
return nil
func ValidateWellKnown(key string, values []string, fieldName string) (errs *apis.FieldError) {
if values != nil && len(values) == 0 {
errs = errs.Also(apis.ErrMissingField(fieldName))
}
for _, architecture := range c.Architectures {
if !functional.ContainsString(SupportedArchitectures, architecture) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s not in %v", architecture, SupportedArchitectures), "architecture"))
}
}
return errs
}

func (c *Constraints) validateOperatingSystem() (errs *apis.FieldError) {
if c.OperatingSystems == nil {
return nil
}
for _, operatingSystem := range c.OperatingSystems {
if !functional.ContainsString(SupportedOperatingSystems, operatingSystem) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s not in %v", operatingSystem, SupportedOperatingSystems), "operatingSystem"))
}
}
return errs
}

func (c *Constraints) validateZones() (errs *apis.FieldError) {
for i, zone := range c.Zones {
if !functional.ContainsString(SupportedZones, zone) {
errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s not in %v", zone, SupportedZones), "zones", i))
}
}
return errs
}

func (c *Constraints) validateInstanceTypes() (errs *apis.FieldError) {
for i, instanceType := range c.InstanceTypes {
if !functional.ContainsString(SupportedInstanceTypes, instanceType) {
errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s not in %v", instanceType, SupportedInstanceTypes), "instanceTypes", i))
for i, value := range values {
if known := WellKnownLabels[key]; !functional.ContainsString(known, value) {
errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s not in %v", value, known), fieldName, i))
}
}
return errs
Expand Down
28 changes: 16 additions & 12 deletions pkg/apis/provisioning/v1alpha4/provisioner_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ var _ = Describe("Validation", func() {
})
})
Context("Zones", func() {
SupportedZones = append(SupportedZones, "test-zone-1")
It("should succeed if unspecified", func() {
Expect(provisioner.Validate(ctx)).To(Succeed())
WellKnownLabels[v1.LabelTopologyZone] = append(WellKnownLabels[v1.LabelTopologyZone], "test-zone-1")
It("should fail if empty", func() {
provisioner.Spec.Zones = []string{}
Expect(provisioner.Validate(ctx)).ToNot(Succeed())
})
It("should fail if not supported", func() {
provisioner.Spec.Zones = []string{"unknown"}
Expand All @@ -122,9 +123,10 @@ var _ = Describe("Validation", func() {
})

Context("InstanceTypes", func() {
SupportedInstanceTypes = append(SupportedInstanceTypes, "test-instance-type")
It("should succeed if unspecified", func() {
Expect(provisioner.Validate(ctx)).To(Succeed())
WellKnownLabels[v1.LabelInstanceTypeStable] = append(WellKnownLabels[v1.LabelInstanceTypeStable], "test-instance-type")
It("should fail if empty", func() {
provisioner.Spec.InstanceTypes = []string{}
Expect(provisioner.Validate(ctx)).ToNot(Succeed())
})
It("should fail if not supported", func() {
provisioner.Spec.InstanceTypes = []string{"unknown"}
Expand All @@ -139,9 +141,10 @@ var _ = Describe("Validation", func() {
})

Context("Architecture", func() {
SupportedArchitectures = append(SupportedArchitectures, "test-architecture")
It("should succeed if unspecified", func() {
Expect(provisioner.Validate(ctx)).To(Succeed())
WellKnownLabels[v1.LabelArchStable] = append(WellKnownLabels[v1.LabelArchStable], "test-architecture")
It("should fail if empty", func() {
provisioner.Spec.Architectures = []string{}
Expect(provisioner.Validate(ctx)).ToNot(Succeed())
})
It("should fail if not supported", func() {
provisioner.Spec.Architectures = []string{"unknown"}
Expand All @@ -154,9 +157,10 @@ var _ = Describe("Validation", func() {
})

Context("OperatingSystem", func() {
SupportedOperatingSystems = append(SupportedOperatingSystems, "test-operating-system")
It("should succeed if unspecified", func() {
Expect(provisioner.Validate(ctx)).To(Succeed())
WellKnownLabels[v1.LabelOSStable] = append(WellKnownLabels[v1.LabelOSStable], "test-operating-system")
It("should fail if empty", func() {
provisioner.Spec.OperatingSystems = []string{}
Expect(provisioner.Validate(ctx)).ToNot(Succeed())
})
It("should fail if not supported", func() {
provisioner.Spec.OperatingSystems = []string{"unknown"}
Expand Down
17 changes: 10 additions & 7 deletions pkg/apis/provisioning/v1alpha4/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ var (
v1.LabelTopologyZone,
v1.LabelInstanceTypeStable,
// Used internally by provisioning logic
ProvisionerNameLabelKey,
EmptinessTimestampAnnotationKey,
v1.LabelHostname,
}
SupportedArchitectures = []string{}
SupportedOperatingSystems = []string{}
SupportedZones = []string{}
SupportedInstanceTypes = []string{}
ValidationHook = func(ctx context.Context, constraints *Constraints) *apis.FieldError { return nil }
DefaultingHook = func(ctx context.Context, constraints *Constraints) {}
// WellKnownLabels supported by karpenter and their allowable values
WellKnownLabels = map[string][]string{
v1.LabelArchStable: {},
v1.LabelOSStable: {},
v1.LabelTopologyZone: {},
v1.LabelInstanceTypeStable: {},
}
DefaultHook = func(ctx context.Context, constraints *Constraints) {}
bwagner5 marked this conversation as resolved.
Show resolved Hide resolved
ValidateHook = func(ctx context.Context, constraints *Constraints) *apis.FieldError { return nil }
ConstrainHook = func(ctx context.Context, constraints *Constraints, pods ...*v1.Pod) error { return nil }
)

var (
Expand Down
11 changes: 4 additions & 7 deletions pkg/cloudprovider/aws/ami.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4"
"github.com/awslabs/karpenter/pkg/cloudprovider"
v1alpha1 "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/awslabs/karpenter/pkg/utils/functional"
"github.com/patrickmn/go-cache"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -52,17 +53,13 @@ func (p *AMIProvider) getSSMParameter(ctx context.Context, constraints *v1alpha1
return "", fmt.Errorf("kube server version, %w", err)
}
var amiNameSuffix string
if len(constraints.Architectures) > 0 {
// select the first one if multiple supported
if constraints.Architectures[0] == v1alpha4.ArchitectureArm64 {
amiNameSuffix = "-arm64"
}
}
if needsGPUAmi(instanceTypes) {
if amiNameSuffix != "" {
if !functional.ContainsString(constraints.Architectures, v1alpha4.ArchitectureAmd64) {
return "", fmt.Errorf("no amazon-linux-2 ami available for both nvidia/neuron gpus and arm64 cpus")
}
amiNameSuffix = "-gpu"
} else if functional.ContainsString(constraints.Architectures, v1alpha4.ArchitectureArm64) {
amiNameSuffix = "-arm64"
}
return fmt.Sprintf("/aws/service/eks/optimized-ami/%s/amazon-linux-2%s/recommended/image_id", version, amiNameSuffix), nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudprovider/aws/apis/v1alpha1/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type AWS struct {
// CapacityType for the node. If not specified, defaults to on-demand.
// May be overriden by pods.spec.nodeSelector["node.k8s.aws/capacityType"]
// +optional
CapacityType *string `json:"capacityType,omitempty"`
CapacityTypes []string `json:"capacityTypes,omitempty"`
// LaunchTemplate for the node. If not specified, a launch template will be generated.
// +optional
LaunchTemplate *string `json:"launchTemplate,omitempty"`
Expand Down
29 changes: 20 additions & 9 deletions pkg/cloudprovider/aws/apis/v1alpha1/constraints_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,25 @@ import (
"context"
"fmt"

"knative.dev/pkg/ptr"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4"
"github.com/awslabs/karpenter/pkg/scheduling"
v1 "k8s.io/api/core/v1"
)

var ClusterDiscoveryTagKeyFormat = "kubernetes.io/cluster/%s"

// Default the constraints
// Default the constraints.
func (c *Constraints) Default(ctx context.Context) {
c.defaultCapacityType(ctx)
c.defaultCapacityTypes(ctx)
c.defaultSubnets(ctx)
c.defaultSecurityGroups(ctx)
}

func (c *Constraints) defaultCapacityType(ctx context.Context) {
if capacityType, ok := c.Labels[CapacityTypeLabel]; ok {
c.CapacityType = &capacityType
}
if c.CapacityType != nil {
func (c *Constraints) defaultCapacityTypes(ctx context.Context) {
if len(c.CapacityTypes) != 0 {
return
}
c.CapacityType = ptr.String(CapacityTypeOnDemand)
c.CapacityTypes = []string{CapacityTypeOnDemand}
}

func (c *Constraints) defaultSubnets(ctx context.Context) {
Expand All @@ -53,3 +52,15 @@ func (c *Constraints) defaultSecurityGroups(ctx context.Context) {
}
c.SecurityGroupsSelector = map[string]string{fmt.Sprintf(ClusterDiscoveryTagKeyFormat, c.Cluster.Name): ""}
}

// Constrain applies the pod's scheduling constraints to the constraints.
// Returns an error if the constraints cannot be applied.
func (c *Constraints) Constrain(ctx context.Context, pods ...*v1.Pod) error {
nodeAffinity := scheduling.NodeAffinityFor(pods...)
capacityTypes := nodeAffinity.GetLabelValues(CapacityTypeLabel, c.CapacityTypes, v1alpha4.WellKnownLabels[CapacityTypeLabel])
if len(capacityTypes) == 0 {
return fmt.Errorf("no valid capacity types")
}
c.CapacityTypes = capacityTypes
return nil
}
41 changes: 18 additions & 23 deletions pkg/cloudprovider/aws/apis/v1alpha1/constraints_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,50 @@ import (
"fmt"
"net/url"

"github.com/aws/aws-sdk-go/aws"
"github.com/awslabs/karpenter/pkg/utils/functional"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4"
"knative.dev/pkg/apis"
)

func (c *Constraints) Validate(ctx context.Context) (errs *apis.FieldError) {
return c.AWS.validate(ctx).ViaField("provider")
return c.validate(ctx).ViaField("provider")
}

func (a *AWS) validate(ctx context.Context) (errs *apis.FieldError) {
func (c *Constraints) validate(ctx context.Context) (errs *apis.FieldError) {
return errs.Also(
a.validateInstanceProfile(ctx),
a.validateCapacityType(ctx),
a.validateLaunchTemplate(ctx),
a.validateSubnets(ctx),
a.validateSecurityGroups(ctx),
a.Cluster.Validate(ctx).ViaField("cluster"),
c.validateInstanceProfile(ctx),
c.validateCapacityTypes(ctx),
c.validateLaunchTemplate(ctx),
c.validateSubnets(ctx),
c.validateSecurityGroups(ctx),
c.Cluster.Validate(ctx).ViaField("cluster"),
)
}

func (a *AWS) validateCapacityType(ctx context.Context) (errs *apis.FieldError) {
capacityTypes := []string{CapacityTypeSpot, CapacityTypeOnDemand}
if !functional.ContainsString(capacityTypes, aws.StringValue(a.CapacityType)) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s not in %v", aws.StringValue(a.CapacityType), capacityTypes), "capacityType"))
}
return errs
func (c *Constraints) validateCapacityTypes(ctx context.Context) (errs *apis.FieldError) {
return v1alpha4.ValidateWellKnown(CapacityTypeLabel, c.CapacityTypes, "capacityTypes")
}

func (a *AWS) validateInstanceProfile(ctx context.Context) (errs *apis.FieldError) {
if a.InstanceProfile == "" {
func (c *Constraints) validateInstanceProfile(ctx context.Context) (errs *apis.FieldError) {
if c.InstanceProfile == "" {
errs = errs.Also(apis.ErrMissingField("instanceProfile"))
}
return errs
}

func (a *AWS) validateLaunchTemplate(ctx context.Context) (errs *apis.FieldError) {
func (c *Constraints) validateLaunchTemplate(ctx context.Context) (errs *apis.FieldError) {
// nothing to validate at the moment
return errs
}

func (a *AWS) validateSubnets(ctx context.Context) (errs *apis.FieldError) {
if a.SubnetSelector == nil {
func (c *Constraints) validateSubnets(ctx context.Context) (errs *apis.FieldError) {
if c.SubnetSelector == nil {
errs = errs.Also(apis.ErrMissingField("subnetSelector"))
}
return errs
}

func (a *AWS) validateSecurityGroups(ctx context.Context) (errs *apis.FieldError) {
if a.SecurityGroupsSelector == nil {
func (c *Constraints) validateSecurityGroups(ctx context.Context) (errs *apis.FieldError) {
if c.SecurityGroupsSelector == nil {
errs = errs.Also(apis.ErrMissingField("securityGroupSelector"))
}
return errs
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudprovider/aws/apis/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ var (
func init() {
Scheme.AddKnownTypes(schema.GroupVersion{Group: v1alpha4.ExtensionsGroup, Version: "v1alpha1"}, &AWS{})
v1alpha4.RestrictedLabels = append(v1alpha4.RestrictedLabels, AWSLabelPrefix)
v1alpha4.WellKnownLabels[CapacityTypeLabel] = []string{CapacityTypeSpot, CapacityTypeOnDemand}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading