From 81e210aca095f008b991859e430a919b97757431 Mon Sep 17 00:00:00 2001 From: Ellis Tarn Date: Tue, 21 Sep 2021 16:52:23 -0700 Subject: [PATCH] Implemented NodeAffinity --- .../v1alpha4/provisioner_defaults.go | 26 +- .../v1alpha4/provisioner_validation.go | 52 +--- .../v1alpha4/provisioner_validation_test.go | 28 +- pkg/apis/provisioning/v1alpha4/register.go | 17 +- pkg/cloudprovider/aws/ami.go | 11 +- .../aws/apis/v1alpha1/constraints.go | 2 +- .../aws/apis/v1alpha1/constraints_defaults.go | 29 +- .../apis/v1alpha1/constraints_validation.go | 41 ++- .../aws/apis/v1alpha1/register.go | 1 + .../apis/v1alpha1/zz_generated.deepcopy.go | 8 +- pkg/cloudprovider/aws/cloudprovider.go | 38 +-- pkg/cloudprovider/aws/instance.go | 66 ++-- pkg/cloudprovider/aws/instancetype.go | 4 +- pkg/cloudprovider/aws/suite_test.go | 67 ++-- pkg/cloudprovider/fake/cloudprovider.go | 17 +- pkg/cloudprovider/registry/register.go | 15 +- pkg/cloudprovider/types.go | 13 +- .../allocation/binpacking/packable.go | 29 +- pkg/controllers/allocation/controller.go | 4 +- pkg/controllers/allocation/filter.go | 23 +- .../allocation/scheduling/constraints.go | 144 +++++++-- .../allocation/scheduling/scheduler.go | 50 ++- .../allocation/scheduling/suite_test.go | 294 +++++++++++++++++- .../allocation/scheduling/topology.go | 15 +- pkg/controllers/allocation/suite_test.go | 73 ++--- pkg/controllers/termination/terminate.go | 4 +- pkg/scheduling/nodeaffinity.go | 77 +++++ .../allocation => }/scheduling/taints.go | 33 +- pkg/test/pods.go | 23 ++ pkg/utils/functional/functional.go | 60 ++-- pkg/utils/functional/suite_test.go | 38 +++ 31 files changed, 911 insertions(+), 391 deletions(-) create mode 100644 pkg/scheduling/nodeaffinity.go rename pkg/{controllers/allocation => }/scheduling/taints.go (62%) diff --git a/pkg/apis/provisioning/v1alpha4/provisioner_defaults.go b/pkg/apis/provisioning/v1alpha4/provisioner_defaults.go index 3b359d215213..7cf4bae242b2 100644 --- a/pkg/apis/provisioning/v1alpha4/provisioner_defaults.go +++ b/pkg/apis/provisioning/v1alpha4/provisioner_defaults.go @@ -16,6 +16,11 @@ package v1alpha4 import ( "context" + "fmt" + + "github.com/awslabs/karpenter/pkg/scheduling" + "go.uber.org/multierr" + v1 "k8s.io/api/core/v1" ) // SetDefaults for the provisioner @@ -25,5 +30,24 @@ func (p *Provisioner) SetDefaults(ctx context.Context) { // Default the constraints func (c *Constraints) Default(ctx context.Context) { - DefaultingHook(ctx, c) + DefaultHook(ctx, c) +} + +// Constrain applies the pods' scheduling constraints to the constraints. +// Returns an error if the constraints cannot be applied. +func (c *Constraints) Constrain(ctx context.Context, pods ...*v1.Pod) (errs error) { + nodeAffinity := scheduling.NodeAffinityFor(pods...) 
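+	// For each well-known label, narrow the provisioner's allowed values to those also
+	// permitted by the pods' node affinity; an empty result means the pods cannot be
+	// satisfied by this provisioner.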
+ for label, constraint := range map[string]*[]string{ + v1.LabelTopologyZone: &c.Zones, + v1.LabelInstanceTypeStable: &c.InstanceTypes, + v1.LabelArchStable: &c.Architectures, + v1.LabelOSStable: &c.OperatingSystems, + } { + values := nodeAffinity.GetLabelValues(label, *constraint, WellKnownLabels[label]) + if len(values) == 0 { + errs = multierr.Append(errs, fmt.Errorf("label %s is too constrained", label)) + } + *constraint = values + } + return multierr.Append(errs, ConstrainHook(ctx, c, pods...)) } diff --git a/pkg/apis/provisioning/v1alpha4/provisioner_validation.go b/pkg/apis/provisioning/v1alpha4/provisioner_validation.go index 68b44c116a66..38ce142e2b61 100644 --- a/pkg/apis/provisioning/v1alpha4/provisioner_validation.go +++ b/pkg/apis/provisioning/v1alpha4/provisioner_validation.go @@ -80,11 +80,11 @@ func (c *Constraints) Validate(ctx context.Context) (errs *apis.FieldError) { return errs.Also( c.validateLabels(), c.validateTaints(), - c.validateArchitecture(), - c.validateOperatingSystem(), - c.validateZones(), - c.validateInstanceTypes(), - ValidationHook(ctx, c), + ValidateWellKnown(v1.LabelTopologyZone, c.Zones, "zones"), + ValidateWellKnown(v1.LabelInstanceTypeStable, c.InstanceTypes, "instanceTypes"), + ValidateWellKnown(v1.LabelArchStable, c.Architectures, "architectures"), + ValidateWellKnown(v1.LabelOSStable, c.OperatingSystems, "operatingSystems"), + ValidateHook(ctx, c), ) } @@ -125,43 +125,13 @@ func (c *Constraints) validateTaints() (errs *apis.FieldError) { return errs } -func (c *Constraints) validateArchitecture() (errs *apis.FieldError) { - if c.Architectures == nil { - return nil +func ValidateWellKnown(key string, values []string, fieldName string) (errs *apis.FieldError) { + if values != nil && len(values) == 0 { + errs = errs.Also(apis.ErrMissingField(fieldName)) } - for _, architecture := range c.Architectures { - if !functional.ContainsString(SupportedArchitectures, architecture) { - errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s not in %v", architecture, SupportedArchitectures), "architecture")) - } - } - return errs -} - -func (c *Constraints) validateOperatingSystem() (errs *apis.FieldError) { - if c.OperatingSystems == nil { - return nil - } - for _, operatingSystem := range c.OperatingSystems { - if !functional.ContainsString(SupportedOperatingSystems, operatingSystem) { - errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s not in %v", operatingSystem, SupportedOperatingSystems), "operatingSystem")) - } - } - return errs -} - -func (c *Constraints) validateZones() (errs *apis.FieldError) { - for i, zone := range c.Zones { - if !functional.ContainsString(SupportedZones, zone) { - errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s not in %v", zone, SupportedZones), "zones", i)) - } - } - return errs -} - -func (c *Constraints) validateInstanceTypes() (errs *apis.FieldError) { - for i, instanceType := range c.InstanceTypes { - if !functional.ContainsString(SupportedInstanceTypes, instanceType) { - errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s not in %v", instanceType, SupportedInstanceTypes), "instanceTypes", i)) + for i, value := range values { + if known := WellKnownLabels[key]; !functional.ContainsString(known, value) { + errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s not in %v", value, known), fieldName, i)) } } return errs diff --git a/pkg/apis/provisioning/v1alpha4/provisioner_validation_test.go b/pkg/apis/provisioning/v1alpha4/provisioner_validation_test.go index 1910a85361bc..e363043ce123 100644 --- 
a/pkg/apis/provisioning/v1alpha4/provisioner_validation_test.go +++ b/pkg/apis/provisioning/v1alpha4/provisioner_validation_test.go @@ -107,9 +107,10 @@ var _ = Describe("Validation", func() { }) }) Context("Zones", func() { - SupportedZones = append(SupportedZones, "test-zone-1") - It("should succeed if unspecified", func() { - Expect(provisioner.Validate(ctx)).To(Succeed()) + WellKnownLabels[v1.LabelTopologyZone] = append(WellKnownLabels[v1.LabelTopologyZone], "test-zone-1") + It("should fail if empty", func() { + provisioner.Spec.Zones = []string{} + Expect(provisioner.Validate(ctx)).ToNot(Succeed()) }) It("should fail if not supported", func() { provisioner.Spec.Zones = []string{"unknown"} @@ -122,9 +123,10 @@ var _ = Describe("Validation", func() { }) Context("InstanceTypes", func() { - SupportedInstanceTypes = append(SupportedInstanceTypes, "test-instance-type") - It("should succeed if unspecified", func() { - Expect(provisioner.Validate(ctx)).To(Succeed()) + WellKnownLabels[v1.LabelInstanceTypeStable] = append(WellKnownLabels[v1.LabelInstanceTypeStable], "test-instance-type") + It("should fail if empty", func() { + provisioner.Spec.InstanceTypes = []string{} + Expect(provisioner.Validate(ctx)).ToNot(Succeed()) }) It("should fail if not supported", func() { provisioner.Spec.InstanceTypes = []string{"unknown"} @@ -139,9 +141,10 @@ var _ = Describe("Validation", func() { }) Context("Architecture", func() { - SupportedArchitectures = append(SupportedArchitectures, "test-architecture") - It("should succeed if unspecified", func() { - Expect(provisioner.Validate(ctx)).To(Succeed()) + WellKnownLabels[v1.LabelArchStable] = append(WellKnownLabels[v1.LabelArchStable], "test-architecture") + It("should fail if empty", func() { + provisioner.Spec.Architectures = []string{} + Expect(provisioner.Validate(ctx)).ToNot(Succeed()) }) It("should fail if not supported", func() { provisioner.Spec.Architectures = []string{"unknown"} @@ -154,9 +157,10 @@ var _ = Describe("Validation", func() { }) Context("OperatingSystem", func() { - SupportedOperatingSystems = append(SupportedOperatingSystems, "test-operating-system") - It("should succeed if unspecified", func() { - Expect(provisioner.Validate(ctx)).To(Succeed()) + WellKnownLabels[v1.LabelOSStable] = append(WellKnownLabels[v1.LabelOSStable], "test-operating-system") + It("should fail if empty", func() { + provisioner.Spec.OperatingSystems = []string{} + Expect(provisioner.Validate(ctx)).ToNot(Succeed()) }) It("should fail if not supported", func() { provisioner.Spec.OperatingSystems = []string{"unknown"} diff --git a/pkg/apis/provisioning/v1alpha4/register.go b/pkg/apis/provisioning/v1alpha4/register.go index aa456565de40..e50e8001f09b 100644 --- a/pkg/apis/provisioning/v1alpha4/register.go +++ b/pkg/apis/provisioning/v1alpha4/register.go @@ -52,16 +52,19 @@ var ( v1.LabelTopologyZone, v1.LabelInstanceTypeStable, // Used internally by provisioning logic - ProvisionerNameLabelKey, EmptinessTimestampAnnotationKey, v1.LabelHostname, } - SupportedArchitectures = []string{} - SupportedOperatingSystems = []string{} - SupportedZones = []string{} - SupportedInstanceTypes = []string{} - ValidationHook = func(ctx context.Context, constraints *Constraints) *apis.FieldError { return nil } - DefaultingHook = func(ctx context.Context, constraints *Constraints) {} + // WellKnownLabels supported by karpenter and their allowable values + WellKnownLabels = map[string][]string{ + v1.LabelArchStable: {}, + v1.LabelOSStable: {}, + v1.LabelTopologyZone: {}, + 
v1.LabelInstanceTypeStable: {}, + } + DefaultHook = func(ctx context.Context, constraints *Constraints) {} + ValidateHook = func(ctx context.Context, constraints *Constraints) *apis.FieldError { return nil } + ConstrainHook = func(ctx context.Context, constraints *Constraints, pods ...*v1.Pod) error { return nil } ) var ( diff --git a/pkg/cloudprovider/aws/ami.go b/pkg/cloudprovider/aws/ami.go index 232c7095c82b..4b0f6bd174b5 100644 --- a/pkg/cloudprovider/aws/ami.go +++ b/pkg/cloudprovider/aws/ami.go @@ -25,6 +25,7 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4" "github.com/awslabs/karpenter/pkg/cloudprovider" v1alpha1 "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" + "github.com/awslabs/karpenter/pkg/utils/functional" "github.com/patrickmn/go-cache" "k8s.io/client-go/kubernetes" "knative.dev/pkg/logging" @@ -52,17 +53,13 @@ func (p *AMIProvider) getSSMParameter(ctx context.Context, constraints *v1alpha1 return "", fmt.Errorf("kube server version, %w", err) } var amiNameSuffix string - if len(constraints.Architectures) > 0 { - // select the first one if multiple supported - if constraints.Architectures[0] == v1alpha4.ArchitectureArm64 { - amiNameSuffix = "-arm64" - } - } if needsGPUAmi(instanceTypes) { - if amiNameSuffix != "" { + if !functional.ContainsString(constraints.Architectures, v1alpha4.ArchitectureAmd64) { return "", fmt.Errorf("no amazon-linux-2 ami available for both nvidia/neuron gpus and arm64 cpus") } amiNameSuffix = "-gpu" + } else if functional.ContainsString(constraints.Architectures, v1alpha4.ArchitectureArm64) { + amiNameSuffix = "-arm64" } return fmt.Sprintf("/aws/service/eks/optimized-ami/%s/amazon-linux-2%s/recommended/image_id", version, amiNameSuffix), nil } diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/constraints.go b/pkg/cloudprovider/aws/apis/v1alpha1/constraints.go index da1d1c0ce3a4..13ccce788557 100644 --- a/pkg/cloudprovider/aws/apis/v1alpha1/constraints.go +++ b/pkg/cloudprovider/aws/apis/v1alpha1/constraints.go @@ -55,7 +55,7 @@ type AWS struct { // CapacityType for the node. If not specified, defaults to on-demand. // May be overriden by pods.spec.nodeSelector["node.k8s.aws/capacityType"] // +optional - CapacityType *string `json:"capacityType,omitempty"` + CapacityTypes []string `json:"capacityTypes,omitempty"` // LaunchTemplate for the node. If not specified, a launch template will be generated. // +optional LaunchTemplate *string `json:"launchTemplate,omitempty"` diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/constraints_defaults.go b/pkg/cloudprovider/aws/apis/v1alpha1/constraints_defaults.go index 9fca5c9584c1..99b4d2478420 100644 --- a/pkg/cloudprovider/aws/apis/v1alpha1/constraints_defaults.go +++ b/pkg/cloudprovider/aws/apis/v1alpha1/constraints_defaults.go @@ -18,26 +18,25 @@ import ( "context" "fmt" - "knative.dev/pkg/ptr" + "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4" + "github.com/awslabs/karpenter/pkg/scheduling" + v1 "k8s.io/api/core/v1" ) var ClusterDiscoveryTagKeyFormat = "kubernetes.io/cluster/%s" -// Default the constraints +// Default the constraints. 
func (c *Constraints) Default(ctx context.Context) { - c.defaultCapacityType(ctx) + c.defaultCapacityTypes(ctx) c.defaultSubnets(ctx) c.defaultSecurityGroups(ctx) } -func (c *Constraints) defaultCapacityType(ctx context.Context) { - if capacityType, ok := c.Labels[CapacityTypeLabel]; ok { - c.CapacityType = &capacityType - } - if c.CapacityType != nil { +func (c *Constraints) defaultCapacityTypes(ctx context.Context) { + if len(c.CapacityTypes) != 0 { return } - c.CapacityType = ptr.String(CapacityTypeOnDemand) + c.CapacityTypes = []string{CapacityTypeOnDemand} } func (c *Constraints) defaultSubnets(ctx context.Context) { @@ -53,3 +52,15 @@ func (c *Constraints) defaultSecurityGroups(ctx context.Context) { } c.SecurityGroupsSelector = map[string]string{fmt.Sprintf(ClusterDiscoveryTagKeyFormat, c.Cluster.Name): ""} } + +// Constrain applies the pod's scheduling constraints to the constraints. +// Returns an error if the constraints cannot be applied. +func (c *Constraints) Constrain(ctx context.Context, pods ...*v1.Pod) error { + nodeAffinity := scheduling.NodeAffinityFor(pods...) + capacityTypes := nodeAffinity.GetLabelValues(CapacityTypeLabel, c.CapacityTypes, v1alpha4.WellKnownLabels[CapacityTypeLabel]) + if len(capacityTypes) == 0 { + return fmt.Errorf("no valid capacity types") + } + c.CapacityTypes = capacityTypes + return nil +} diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/constraints_validation.go b/pkg/cloudprovider/aws/apis/v1alpha1/constraints_validation.go index 821a881d33d2..09e83f5925ff 100644 --- a/pkg/cloudprovider/aws/apis/v1alpha1/constraints_validation.go +++ b/pkg/cloudprovider/aws/apis/v1alpha1/constraints_validation.go @@ -19,55 +19,50 @@ import ( "fmt" "net/url" - "github.com/aws/aws-sdk-go/aws" - "github.com/awslabs/karpenter/pkg/utils/functional" + "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4" "knative.dev/pkg/apis" ) func (c *Constraints) Validate(ctx context.Context) (errs *apis.FieldError) { - return c.AWS.validate(ctx).ViaField("provider") + return c.validate(ctx).ViaField("provider") } -func (a *AWS) validate(ctx context.Context) (errs *apis.FieldError) { +func (c *Constraints) validate(ctx context.Context) (errs *apis.FieldError) { return errs.Also( - a.validateInstanceProfile(ctx), - a.validateCapacityType(ctx), - a.validateLaunchTemplate(ctx), - a.validateSubnets(ctx), - a.validateSecurityGroups(ctx), - a.Cluster.Validate(ctx).ViaField("cluster"), + c.validateInstanceProfile(ctx), + c.validateCapacityTypes(ctx), + c.validateLaunchTemplate(ctx), + c.validateSubnets(ctx), + c.validateSecurityGroups(ctx), + c.Cluster.Validate(ctx).ViaField("cluster"), ) } -func (a *AWS) validateCapacityType(ctx context.Context) (errs *apis.FieldError) { - capacityTypes := []string{CapacityTypeSpot, CapacityTypeOnDemand} - if !functional.ContainsString(capacityTypes, aws.StringValue(a.CapacityType)) { - errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s not in %v", aws.StringValue(a.CapacityType), capacityTypes), "capacityType")) - } - return errs +func (c *Constraints) validateCapacityTypes(ctx context.Context) (errs *apis.FieldError) { + return v1alpha4.ValidateWellKnown(CapacityTypeLabel, c.CapacityTypes, "capacityTypes") } -func (a *AWS) validateInstanceProfile(ctx context.Context) (errs *apis.FieldError) { - if a.InstanceProfile == "" { +func (c *Constraints) validateInstanceProfile(ctx context.Context) (errs *apis.FieldError) { + if c.InstanceProfile == "" { errs = errs.Also(apis.ErrMissingField("instanceProfile")) } return 
errs } -func (a *AWS) validateLaunchTemplate(ctx context.Context) (errs *apis.FieldError) { +func (c *Constraints) validateLaunchTemplate(ctx context.Context) (errs *apis.FieldError) { // nothing to validate at the moment return errs } -func (a *AWS) validateSubnets(ctx context.Context) (errs *apis.FieldError) { - if a.SubnetSelector == nil { +func (c *Constraints) validateSubnets(ctx context.Context) (errs *apis.FieldError) { + if c.SubnetSelector == nil { errs = errs.Also(apis.ErrMissingField("subnetSelector")) } return errs } -func (a *AWS) validateSecurityGroups(ctx context.Context) (errs *apis.FieldError) { - if a.SecurityGroupsSelector == nil { +func (c *Constraints) validateSecurityGroups(ctx context.Context) (errs *apis.FieldError) { + if c.SecurityGroupsSelector == nil { errs = errs.Also(apis.ErrMissingField("securityGroupSelector")) } return errs diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/register.go b/pkg/cloudprovider/aws/apis/v1alpha1/register.go index 5c14bd634965..84ed0256f5d9 100644 --- a/pkg/cloudprovider/aws/apis/v1alpha1/register.go +++ b/pkg/cloudprovider/aws/apis/v1alpha1/register.go @@ -46,4 +46,5 @@ var ( func init() { Scheme.AddKnownTypes(schema.GroupVersion{Group: v1alpha4.ExtensionsGroup, Version: "v1alpha1"}, &AWS{}) v1alpha4.RestrictedLabels = append(v1alpha4.RestrictedLabels, AWSLabelPrefix) + v1alpha4.WellKnownLabels[CapacityTypeLabel] = []string{CapacityTypeSpot, CapacityTypeOnDemand} } diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/zz_generated.deepcopy.go b/pkg/cloudprovider/aws/apis/v1alpha1/zz_generated.deepcopy.go index 08e17a20f344..a054cfb60a22 100644 --- a/pkg/cloudprovider/aws/apis/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/cloudprovider/aws/apis/v1alpha1/zz_generated.deepcopy.go @@ -28,10 +28,10 @@ func (in *AWS) DeepCopyInto(out *AWS) { *out = *in out.TypeMeta = in.TypeMeta out.Cluster = in.Cluster - if in.CapacityType != nil { - in, out := &in.CapacityType, &out.CapacityType - *out = new(string) - **out = **in + if in.CapacityTypes != nil { + in, out := &in.CapacityTypes, &out.CapacityTypes + *out = make([]string, len(*in)) + copy(*out, *in) } if in.LaunchTemplate != nil { in, out := &in.LaunchTemplate, &out.LaunchTemplate diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index 2321fe1aad93..8e6122a85fac 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -29,7 +29,6 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4" "github.com/awslabs/karpenter/pkg/cloudprovider" v1alpha1 "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" - "github.com/awslabs/karpenter/pkg/utils/functional" "github.com/awslabs/karpenter/pkg/utils/parallel" "github.com/awslabs/karpenter/pkg/utils/project" v1 "k8s.io/api/core/v1" @@ -143,7 +142,7 @@ func (c *CloudProvider) create(ctx context.Context, v1alpha4constraints *v1alpha return fmt.Errorf("getting launch template, %w", err) } // 3. 
Create instance - node, err := c.instanceProvider.Create(ctx, launchTemplate, instanceTypes, subnets, aws.StringValue(constraints.CapacityType)) + node, err := c.instanceProvider.Create(ctx, launchTemplate, instanceTypes, subnets, constraints.CapacityTypes) if err != nil { return fmt.Errorf("launching instance, %w", err) } @@ -154,28 +153,12 @@ func (c *CloudProvider) GetInstanceTypes(ctx context.Context) ([]cloudprovider.I return c.instanceTypeProvider.Get(ctx) } -func (c *CloudProvider) GetZones(ctx context.Context, v1alpha4constraints *v1alpha4.Constraints) ([]string, error) { - constraints, err := v1alpha1.NewConstraints(v1alpha4constraints) - if err != nil { - return nil, err - } - subnets, err := c.subnetProvider.Get(ctx, constraints) - if err != nil { - return nil, fmt.Errorf("getting subnets, %w", err) - } - zones := []string{} - for _, subnet := range subnets { - zones = append(zones, aws.StringValue(subnet.AvailabilityZone)) - } - return functional.UniqueStrings(zones), nil -} - func (c *CloudProvider) Delete(ctx context.Context, node *v1.Node) error { return c.instanceProvider.Terminate(ctx, node) } // Validate the constraints -func (c *CloudProvider) Validate(ctx context.Context, v1alpha4constraints *v1alpha4.Constraints) (errs *apis.FieldError) { +func (c *CloudProvider) Validate(ctx context.Context, v1alpha4constraints *v1alpha4.Constraints) *apis.FieldError { constraints, err := v1alpha1.NewConstraints(v1alpha4constraints) if err != nil { return apis.ErrGeneric(err.Error()) @@ -196,3 +179,20 @@ func (c *CloudProvider) Default(ctx context.Context, v1alpha4constraints *v1alph logging.FromContext(context.Background()).Errorf("failed to serialize provider, %s", err.Error()) } } + +// Constrain applies the pod's scheduling constraints to the constraints. +// Returns an error if the constraints cannot be applied. +func (c *CloudProvider) Constrain(ctx context.Context, constraints *v1alpha4.Constraints, pods ...*v1.Pod) error { + vendorConstraints, err := v1alpha1.NewConstraints(constraints) + if err != nil { + return fmt.Errorf("failed to deserialize provider, %w", err) + } + if err:= vendorConstraints.Constrain(ctx, pods...); err != nil { + return err + } + constraints.Provider.Raw, err = json.Marshal(vendorConstraints.AWS) + if err != nil { + return fmt.Errorf("failed to serialize provider, %w", err) + } + return nil +} diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index 7fa39ba0ab1d..4e3deb70ff4c 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -28,6 +28,7 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4" "github.com/awslabs/karpenter/pkg/cloudprovider" v1alpha1 "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" + "github.com/awslabs/karpenter/pkg/utils/functional" "knative.dev/pkg/logging" "go.uber.org/multierr" @@ -53,10 +54,10 @@ func (p *InstanceProvider) Create(ctx context.Context, launchTemplate string, instanceTypes []cloudprovider.InstanceType, subnets []*ec2.Subnet, - capacityType string, + capacityTypes []string, ) (*v1.Node, error) { // 1. 
Launch Instance
-	id, err := p.launchInstance(ctx, launchTemplate, instanceTypes, subnets, capacityType)
+	id, err := p.launchInstance(ctx, launchTemplate, instanceTypes, subnets, capacityTypes)
 	if err != nil {
 		return nil, err
 	}
@@ -103,30 +104,19 @@ func (p *InstanceProvider) launchInstance(ctx context.Context,
 	launchTemplateName string,
 	instanceTypeOptions []cloudprovider.InstanceType,
 	subnets []*ec2.Subnet,
-	capacityType string) (*string, error) {
-	// 1. Construct override options.
-	var overrides []*ec2.FleetLaunchTemplateOverridesRequest
-	for i, instanceType := range instanceTypeOptions {
-		for _, zone := range instanceType.Zones() {
-			for _, subnet := range subnets {
-				if aws.StringValue(subnet.AvailabilityZone) == zone {
-					override := &ec2.FleetLaunchTemplateOverridesRequest{
-						InstanceType: aws.String(instanceType.Name()),
-						SubnetId:     subnet.SubnetId,
-					}
-					// Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy
-					// to reduce the likelihood of getting an excessively large instance type.
-					// instanceTypeOptions are sorted by vcpus and memory so this prioritizes smaller instance types.
-					if capacityType == v1alpha1.CapacityTypeSpot {
-						override.Priority = aws.Float64(float64(i))
-					}
-					overrides = append(overrides, override)
-					// FleetAPI cannot span subnets from the same AZ, so break after the first one.
-					break
-				}
-			}
-		}
+	capacityTypes []string) (*string, error) {
+
+	// If unconstrained, default to spot to save on cost, otherwise use on
+	// demand. Constraint solving logic will guarantee that capacityTypes are
+	// either unconstrained (nil), or a valid WellKnownLabel. Provisioner
+	// defaulting logic will currently default to [on-demand] if unspecified.
+	capacityType := v1alpha1.CapacityTypeSpot
+	if capacityTypes != nil && !functional.ContainsString(capacityTypes, v1alpha1.CapacityTypeSpot) {
+		capacityType = v1alpha1.CapacityTypeOnDemand
 	}
+
+	// 1. Construct override options.
+	overrides := p.getOverrides(instanceTypeOptions, subnets, capacityType)
 	if len(overrides) == 0 {
 		return nil, fmt.Errorf("no viable {subnet, instanceType} combination")
 	}
@@ -163,6 +153,32 @@ func (p *InstanceProvider) launchInstance(ctx context.Context,
 	return createFleetOutput.Instances[0].InstanceIds[0], nil
 }

+func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.InstanceType, subnets []*ec2.Subnet, capacityType string) []*ec2.FleetLaunchTemplateOverridesRequest {
+	var overrides []*ec2.FleetLaunchTemplateOverridesRequest
+	for i, instanceType := range instanceTypeOptions {
+		for _, zone := range instanceType.Zones() {
+			for _, subnet := range subnets {
+				if aws.StringValue(subnet.AvailabilityZone) == zone {
+					override := &ec2.FleetLaunchTemplateOverridesRequest{
+						InstanceType: aws.String(instanceType.Name()),
+						SubnetId:     subnet.SubnetId,
+					}
+					// Add a priority for spot requests since we are using the capacity-optimized-prioritized spot allocation strategy
+					// to reduce the likelihood of getting an excessively large instance type.
+					// instanceTypeOptions are sorted by vcpus and memory so this prioritizes smaller instance types.
+					if capacityType == v1alpha1.CapacityTypeSpot {
+						override.Priority = aws.Float64(float64(i))
+					}
+					overrides = append(overrides, override)
+					// FleetAPI cannot span subnets from the same AZ, so break after the first one.
+ break + } + } + } + } + return overrides +} + func (p *InstanceProvider) getInstance(ctx context.Context, id *string, instance *ec2.Instance) error { describeInstancesOutput, err := p.ec2api.DescribeInstancesWithContext(ctx, &ec2.DescribeInstancesInput{InstanceIds: []*string{id}}) if aerr, ok := err.(awserr.Error); ok && aerr.Code() == EC2InstanceIDNotFoundErrCode { diff --git a/pkg/cloudprovider/aws/instancetype.go b/pkg/cloudprovider/aws/instancetype.go index 278288e2f97b..23c74e289924 100644 --- a/pkg/cloudprovider/aws/instancetype.go +++ b/pkg/cloudprovider/aws/instancetype.go @@ -44,7 +44,9 @@ func (i *InstanceType) Zones() []string { func (i *InstanceType) Architectures() []string { architectures := []string{} for _, architecture := range i.ProcessorInfo.SupportedArchitectures { - architectures = append(architectures, v1alpha1.AWSToKubeArchitectures[aws.StringValue(architecture)]) + if value, ok := v1alpha1.AWSToKubeArchitectures[aws.StringValue(architecture)]; ok { + architectures = append(architectures, value) + } } return architectures } diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index f39b08216451..63d0cd3ea2c8 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -217,31 +217,54 @@ var _ = Describe("Allocation", func() { }) It("should default to a provisioner's specified capacity type", func() { // Setup - provisioner.Spec.Labels = map[string]string{v1alpha1.CapacityTypeLabel: v1alpha1.CapacityTypeSpot} - ExpectCreated(env.Client, provisioner) + provider.CapacityTypes = []string{v1alpha1.CapacityTypeSpot} + ExpectCreated(env.Client, ProvisionerWithProvider(provisioner, provider)) pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) // Assertions - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) - Expect(node.Labels).To(HaveKeyWithValue(v1alpha1.CapacityTypeLabel, v1alpha1.CapacityTypeSpot)) + ExpectNodeExists(env.Client, pods[0].Spec.NodeName) Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) Expect(input.LaunchTemplateConfigs).To(HaveLen(1)) Expect(*input.TargetCapacitySpecification.DefaultTargetCapacityType).To(Equal(v1alpha1.CapacityTypeSpot)) }) - It("should allow a pod to override the capacity type", func() { + It("should launch spot capacity if flexible to both spot and on demand", func() { // Setup - ExpectCreated(env.Client, provisioner) + provider.CapacityTypes = []string{v1alpha1.CapacityTypeSpot, v1alpha1.CapacityTypeOnDemand} + ExpectCreated(env.Client, ProvisionerWithProvider(provisioner, provider)) pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1alpha1.CapacityTypeLabel: v1alpha1.CapacityTypeSpot}}), ) // Assertions - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) - Expect(node.Labels).To(HaveKeyWithValue(v1alpha1.CapacityTypeLabel, v1alpha1.CapacityTypeSpot)) + ExpectNodeExists(env.Client, pods[0].Spec.NodeName) Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) Expect(input.LaunchTemplateConfigs).To(HaveLen(1)) Expect(*input.TargetCapacitySpecification.DefaultTargetCapacityType).To(Equal(v1alpha1.CapacityTypeSpot)) }) + It("should allow a pod to constrain the capacity type", func() { + // Setup + 
provider.CapacityTypes = []string{v1alpha1.CapacityTypeSpot, v1alpha1.CapacityTypeOnDemand} + ExpectCreated(env.Client, ProvisionerWithProvider(provisioner, provider)) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1alpha1.CapacityTypeLabel: v1alpha1.CapacityTypeOnDemand}}), + ) + // Assertions + ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(fakeEC2API.CalledWithCreateFleetInput.Cardinality()).To(Equal(1)) + input := fakeEC2API.CalledWithCreateFleetInput.Pop().(*ec2.CreateFleetInput) + Expect(input.LaunchTemplateConfigs).To(HaveLen(1)) + Expect(*input.TargetCapacitySpecification.DefaultTargetCapacityType).To(Equal(v1alpha1.CapacityTypeOnDemand)) + }) + It("should not schedule a pod if outside of provisioner constraints", func() { + // Setup + provider.CapacityTypes = []string{v1alpha1.CapacityTypeOnDemand} + ExpectCreated(env.Client, ProvisionerWithProvider(provisioner, provider)) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1alpha1.CapacityTypeLabel: v1alpha1.CapacityTypeSpot}}), + ) + // Assertions + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) It("should not schedule a pod with an invalid capacityType", func() { // Setup ExpectCreated(env.Client, provisioner) @@ -333,7 +356,7 @@ var _ = Describe("Allocation", func() { provisioner.SetDefaults(ctx) constraints, err := v1alpha1.NewConstraints(&provisioner.Spec.Constraints) Expect(err).ToNot(HaveOccurred()) - Expect(aws.StringValue(constraints.CapacityType)).To(Equal(v1alpha1.CapacityTypeOnDemand)) + Expect(constraints.CapacityTypes).To(ConsistOf(v1alpha1.CapacityTypeOnDemand)) }) }) Context("Validation", func() { @@ -372,9 +395,6 @@ var _ = Describe("Allocation", func() { }) Context("Zones", func() { - It("should succeed if unspecified", func() { - Expect(provisioner.Validate(ctx)).To(Succeed()) - }) It("should fail if not supported", func() { provisioner.Spec.Zones = []string{"unknown"} Expect(provisioner.Validate(ctx)).ToNot(Succeed()) @@ -394,9 +414,6 @@ var _ = Describe("Allocation", func() { }) }) Context("InstanceTypes", func() { - It("should succeed if unspecified", func() { - Expect(provisioner.Validate(ctx)).To(Succeed()) - }) It("should fail if not supported", func() { provisioner.Spec.InstanceTypes = []string{"unknown"} Expect(provisioner.Validate(ctx)).ToNot(Succeed()) @@ -409,9 +426,6 @@ var _ = Describe("Allocation", func() { }) }) Context("Architecture", func() { - It("should succeed if unspecified", func() { - Expect(provisioner.Validate(ctx)).To(Succeed()) - }) It("should fail if not supported", func() { provisioner.Spec.Architectures = []string{"unknown"} Expect(provisioner.Validate(ctx)).ToNot(Succeed()) @@ -426,9 +440,6 @@ var _ = Describe("Allocation", func() { }) }) Context("OperatingSystem", func() { - It("should succeed if unspecified", func() { - Expect(provisioner.Validate(ctx)).To(Succeed()) - }) It("should fail if not supported", func() { provisioner.Spec.OperatingSystems = []string{"unknown"} Expect(provisioner.Validate(ctx)).ToNot(Succeed()) @@ -438,6 +449,20 @@ var _ = Describe("Allocation", func() { Expect(provisioner.Validate(ctx)).To(Succeed()) }) }) + Context("CapacityType", func() { + It("should fail if not supported", func() { + provider.CapacityTypes = []string{"unknown"} + Expect(ProvisionerWithProvider(provisioner, provider).Validate(ctx)).ToNot(Succeed()) + }) + 
It("should support spot", func() {
+				provider.CapacityTypes = []string{"spot"}
+				Expect(ProvisionerWithProvider(provisioner, provider).Validate(ctx)).To(Succeed())
+			})
+			It("should support on demand", func() {
+				provider.CapacityTypes = []string{"on-demand"}
+				Expect(ProvisionerWithProvider(provisioner, provider).Validate(ctx)).To(Succeed())
+			})
+		})
 	})
 })
diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go
index fe9dcab42c83..aa9396892aeb 100644
--- a/pkg/cloudprovider/fake/cloudprovider.go
+++ b/pkg/cloudprovider/fake/cloudprovider.go
@@ -23,11 +23,11 @@ import (
 	"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4"
 	"github.com/awslabs/karpenter/pkg/cloudprovider"
 	"github.com/awslabs/karpenter/pkg/utils/functional"
+	"knative.dev/pkg/apis"

 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"knative.dev/pkg/apis"
 )

 type CloudProvider struct{}
@@ -48,7 +48,10 @@ func (c *CloudProvider) Create(ctx context.Context, constraints *v1alpha4.Constr
 		err <- bind(&v1.Node{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:   name,
-				Labels: constraints.Labels,
+				Labels: map[string]string{
+					v1.LabelTopologyZone:       zone,
+					v1.LabelInstanceTypeStable: instance.Name(),
+				},
 			},
 			Spec: v1.NodeSpec{
 				ProviderID: fmt.Sprintf("fake:///%s/%s", name, zone),
@@ -69,10 +72,6 @@ func (c *CloudProvider) Create(ctx context.Context, constraints *v1alpha4.Constr
 	return err
 }

-func (c *CloudProvider) GetZones(context context.Context, constraints *v1alpha4.Constraints) ([]string, error) {
-	return []string{"test-zone-1", "test-zone-2", "test-zone-3"}, nil
-}
-
 func (c *CloudProvider) GetInstanceTypes(ctx context.Context) ([]cloudprovider.InstanceType, error) {
 	return []cloudprovider.InstanceType{
 		NewInstanceType(InstanceTypeOptions{
@@ -105,9 +104,13 @@ func (c *CloudProvider) Delete(context.Context, *v1.Node) error {
 	return nil
 }

+func (c *CloudProvider) Default(context.Context, *v1alpha4.Constraints) {
+}
+
 func (c *CloudProvider) Validate(context.Context, *v1alpha4.Constraints) *apis.FieldError {
 	return nil
 }

-func (c *CloudProvider) Default(context.Context, *v1alpha4.Constraints) {
+func (c *CloudProvider) Constrain(context.Context, *v1alpha4.Constraints, ...*v1.Pod) error {
+	return nil
 }
diff --git a/pkg/cloudprovider/registry/register.go b/pkg/cloudprovider/registry/register.go
index 093ad91ace98..2e8d9d750372 100644
--- a/pkg/cloudprovider/registry/register.go
+++ b/pkg/cloudprovider/registry/register.go
@@ -20,6 +20,7 @@ import (

 	"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4"
 	"github.com/awslabs/karpenter/pkg/cloudprovider"
+	v1 "k8s.io/api/core/v1"
 )

 func NewCloudProvider(ctx context.Context, options cloudprovider.Options) cloudprovider.CloudProvider {
@@ -42,7 +43,7 @@ func RegisterOrDie(cloudProvider cloudprovider.CloudProvider) {
 		panic(fmt.Sprintf("Failed to retrieve instance types, %s", err.Error()))
 	}
 	for _, instanceType := range instanceTypes {
-		v1alpha4.SupportedInstanceTypes = append(v1alpha4.SupportedInstanceTypes, instanceType.Name())
+		v1alpha4.WellKnownLabels[v1.LabelInstanceTypeStable] = append(v1alpha4.WellKnownLabels[v1.LabelInstanceTypeStable], instanceType.Name())
 		for _, zone := range instanceType.Zones() {
 			zones[zone] = true
 		}
@@ -54,14 +55,16 @@ func RegisterOrDie(cloudProvider cloudprovider.CloudProvider) {
 		}
 	}
 	for zone := range zones {
-		v1alpha4.SupportedZones = append(v1alpha4.SupportedZones, zone)
+
v1alpha4.WellKnownLabels[v1.LabelTopologyZone] = append(v1alpha4.WellKnownLabels[v1.LabelTopologyZone], zone) } for architecture := range architectures { - v1alpha4.SupportedArchitectures = append(v1alpha4.SupportedArchitectures, architecture) + v1alpha4.WellKnownLabels[v1.LabelArchStable] = append(v1alpha4.WellKnownLabels[v1.LabelArchStable], architecture) } for operatingSystem := range operatingSystems { - v1alpha4.SupportedOperatingSystems = append(v1alpha4.SupportedOperatingSystems, operatingSystem) + v1alpha4.WellKnownLabels[v1.LabelOSStable] = append(v1alpha4.WellKnownLabels[v1.LabelOSStable], operatingSystem) } - v1alpha4.ValidationHook = cloudProvider.Validate - v1alpha4.DefaultingHook = cloudProvider.Default + + v1alpha4.ValidateHook = cloudProvider.Validate + v1alpha4.DefaultHook = cloudProvider.Default + v1alpha4.ConstrainHook = cloudProvider.Constrain } diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index 14927bbef0ae..27eab376e68b 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -37,14 +37,13 @@ type CloudProvider interface { // GetInstanceTypes returns the instance types supported by the cloud // provider limited by the provided constraints and daemons. GetInstanceTypes(context.Context) ([]InstanceType, error) - // GetZones returns the zones supported by the cloud provider and the specified provisioner - GetZones(context.Context, *v1alpha4.Constraints) ([]string, error) - // Validate is a hook for additional validation logic. This method is not - // only called during Provisioner CRD validation, it is also used at - // provisioning time to ensure that pods are provisionable. - Validate(context.Context, *v1alpha4.Constraints) *apis.FieldError - // Default is a hook for additional defaulting logic specific. + // Default is a hook for additional defaulting logic at webhook time. Default(context.Context, *v1alpha4.Constraints) + // Validate is a hook for additional validation logic at webhook time. + Validate(context.Context, *v1alpha4.Constraints) *apis.FieldError + // Constrain is a hook for additional constraint logic at runtime. + // Returns an error if the constraints cannot be applied. + Constrain(context.Context, *v1alpha4.Constraints, ...*v1.Pod) error } // Options are injected into cloud providers' factories diff --git a/pkg/controllers/allocation/binpacking/packable.go b/pkg/controllers/allocation/binpacking/packable.go index 647316cede9a..7059f94f5d43 100644 --- a/pkg/controllers/allocation/binpacking/packable.go +++ b/pkg/controllers/allocation/binpacking/packable.go @@ -22,6 +22,7 @@ import ( "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" "github.com/awslabs/karpenter/pkg/utils/functional" "github.com/awslabs/karpenter/pkg/utils/resources" + "go.uber.org/multierr" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "knative.dev/pkg/logging" @@ -48,17 +49,17 @@ func PackablesFor(ctx context.Context, instanceTypes []cloudprovider.InstanceTyp // additional filtering will be done by later steps (such as // removing instance types that obviously lack resources, such // as GPUs, for the workload being presented). 
- if err := functional.ValidateAll( - func() error { return packable.validateZones(schedule) }, - func() error { return packable.validateInstanceType(schedule) }, - func() error { return packable.validateArchitecture(schedule) }, - func() error { return packable.validateOperatingSystem(schedule) }, + if err := multierr.Combine( + packable.validateZones(schedule), + packable.validateInstanceType(schedule), + packable.validateArchitecture(schedule), + packable.validateOperatingSystem(schedule), // Although this will remove instances that have GPUs when // not required, removal of instance types that *lack* // GPUs will be done later. - func() error { return packable.validateNvidiaGpus(schedule) }, - func() error { return packable.validateAMDGpus(schedule) }, - func() error { return packable.validateAWSNeurons(schedule) }, + packable.validateNvidiaGpus(schedule), + packable.validateAMDGpus(schedule), + packable.validateAWSNeurons(schedule), ); err != nil { continue } @@ -151,9 +152,6 @@ func (p *Packable) reservePod(pod *v1.Pod) bool { } func (p *Packable) validateInstanceType(schedule *scheduling.Schedule) error { - if len(schedule.InstanceTypes) == 0 { - return nil - } if !functional.ContainsString(schedule.InstanceTypes, p.Name()) { return fmt.Errorf("instance type %s is not in %v", p.Name(), schedule.InstanceTypes) } @@ -161,9 +159,6 @@ func (p *Packable) validateInstanceType(schedule *scheduling.Schedule) error { } func (p *Packable) validateArchitecture(schedule *scheduling.Schedule) error { - if schedule.Architectures == nil { - return nil - } if len(functional.IntersectStringSlice(p.Architectures(), schedule.Architectures)) == 0 { return fmt.Errorf("architecture %s is not in %v", schedule.Architectures, p.Architectures()) } @@ -171,9 +166,6 @@ func (p *Packable) validateArchitecture(schedule *scheduling.Schedule) error { } func (p *Packable) validateOperatingSystem(schedule *scheduling.Schedule) error { - if schedule.OperatingSystems == nil { - return nil - } if len(functional.IntersectStringSlice(p.OperatingSystems(), schedule.OperatingSystems)) == 0 { return fmt.Errorf("operating system %s is not in %v", schedule.OperatingSystems, p.OperatingSystems()) } @@ -181,9 +173,6 @@ func (p *Packable) validateOperatingSystem(schedule *scheduling.Schedule) error } func (p *Packable) validateZones(schedule *scheduling.Schedule) error { - if len(schedule.Zones) == 0 { - return nil - } if len(functional.IntersectStringSlice(schedule.Zones, p.Zones())) == 0 { return fmt.Errorf("zones %v are not in %v", schedule.Zones, p.Zones()) } diff --git a/pkg/controllers/allocation/controller.go b/pkg/controllers/allocation/controller.go index bf338ea9bd47..8d8b3e1a70fd 100644 --- a/pkg/controllers/allocation/controller.go +++ b/pkg/controllers/allocation/controller.go @@ -112,7 +112,6 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco if err != nil { return result.RetryIfError(ctx, fmt.Errorf("solving scheduling constraints, %w", err)) } - // 5. 
Get Instance Types Options instanceTypes, err := c.CloudProvider.GetInstanceTypes(ctx) if err != nil { @@ -131,8 +130,9 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco packing := packings[index] errs[index] = <-c.CloudProvider.Create(ctx, packing.Constraints, packing.InstanceTypeOptions, func(node *v1.Node) error { node.Labels = functional.UnionStringMaps( - map[string]string{v1alpha4.ProvisionerNameLabelKey: provisioner.Name}, + node.Labels, packing.Constraints.Labels, + map[string]string{v1alpha4.ProvisionerNameLabelKey: provisioner.Name}, ) node.Spec.Taints = append(node.Spec.Taints, packing.Constraints.Taints...) return c.Binder.Bind(ctx, node, packing.Pods) diff --git a/pkg/controllers/allocation/filter.go b/pkg/controllers/allocation/filter.go index 6e03856b2ec6..b554cbaee51c 100644 --- a/pkg/controllers/allocation/filter.go +++ b/pkg/controllers/allocation/filter.go @@ -19,10 +19,9 @@ import ( "fmt" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4" - "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" - "github.com/awslabs/karpenter/pkg/utils/functional" "github.com/awslabs/karpenter/pkg/utils/pod" "github.com/awslabs/karpenter/pkg/utils/ptr" + "go.uber.org/multierr" v1 "k8s.io/api/core/v1" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" @@ -53,11 +52,9 @@ func (f *Filter) GetProvisionablePods(ctx context.Context, provisioner *v1alpha4 } func (f *Filter) isProvisionable(ctx context.Context, pod *v1.Pod, provisioner *v1alpha4.Provisioner) error { - return functional.ValidateAll( - func() error { return f.isUnschedulable(pod) }, - func() error { return f.matchesProvisioner(pod, provisioner) }, - func() error { return f.hasSupportedSchedulingConstraints(pod) }, - func() error { return scheduling.Tolerates(pod, provisioner.Spec.Taints...) }, + return multierr.Combine( + f.isUnschedulable(pod), + f.matchesProvisioner(pod, provisioner), ) } @@ -71,18 +68,6 @@ func (f *Filter) isUnschedulable(p *v1.Pod) error { return nil } -func (f *Filter) hasSupportedSchedulingConstraints(pod *v1.Pod) error { - if pod.Spec.Affinity != nil { - return fmt.Errorf("affinity is not supported") - } - for _, constraint := range pod.Spec.TopologySpreadConstraints { - if supported := []string{v1.LabelHostname, v1.LabelTopologyZone}; !functional.ContainsString(supported, constraint.TopologyKey) { - return fmt.Errorf("unsupported topology key, %s not in %s", constraint.TopologyKey, supported) - } - } - return nil -} - func (f *Filter) matchesProvisioner(pod *v1.Pod, provisioner *v1alpha4.Provisioner) error { name, ok := pod.Spec.NodeSelector[v1alpha4.ProvisionerNameLabelKey] if ok && provisioner.Name == name { diff --git a/pkg/controllers/allocation/scheduling/constraints.go b/pkg/controllers/allocation/scheduling/constraints.go index 3b7015aa7ffe..0a5302c38b9e 100644 --- a/pkg/controllers/allocation/scheduling/constraints.go +++ b/pkg/controllers/allocation/scheduling/constraints.go @@ -15,56 +15,138 @@ limitations under the License. 
package scheduling import ( + "context" + "fmt" + "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4" + "github.com/awslabs/karpenter/pkg/scheduling" "github.com/awslabs/karpenter/pkg/utils/functional" + "go.uber.org/multierr" v1 "k8s.io/api/core/v1" ) -func NewConstraintsWithOverrides(constraints *v1alpha4.Constraints, pod *v1.Pod) *v1alpha4.Constraints { - return &v1alpha4.Constraints{ - Provider: constraints.Provider, - Labels: functional.UnionStringMaps(constraints.Labels, pod.Spec.NodeSelector), - Taints: overrideTaints(constraints.Taints, pod), - Zones: GetOrDefault(v1.LabelTopologyZone, pod.Spec.NodeSelector, constraints.Zones), - InstanceTypes: GetOrDefault(v1.LabelInstanceTypeStable, pod.Spec.NodeSelector, constraints.InstanceTypes), - Architectures: GetOrDefault(v1.LabelArchStable, pod.Spec.NodeSelector, constraints.Architectures), - OperatingSystems: GetOrDefault(v1.LabelOSStable, pod.Spec.NodeSelector, constraints.OperatingSystems), +// NewConstraints overrides the constraints with pod scheduling constraints +func NewConstraints(ctx context.Context, constraints *v1alpha4.Constraints, pod *v1.Pod) (*v1alpha4.Constraints, error) { + // Validate that the pod is viable + if err := multierr.Combine( + validateAffinity(pod), + validateTopology(pod), + scheduling.Taints(constraints.Taints).Tolerates(pod), + ); err != nil { + return nil, err } -} -// GetOrDefault uses a nodeSelector's value if exists, otherwise defaults -func GetOrDefault(key string, nodeSelector map[string]string, defaults []string) []string { - // Use override if set - if nodeSelector != nil && len(nodeSelector[key]) > 0 { - return []string{nodeSelector[key]} + // Copy constraints and apply pod scheduling constraints + constraints = constraints.DeepCopy() + if err := constraints.Constrain(ctx, pod); err != nil { + return nil, err + } + if err := generateLabels(constraints, pod); err != nil { + return nil, err + } + if err := generateTaints(constraints, pod); err != nil { + return nil, err } - // Otherwise use defaults - return defaults + return constraints, nil } -func overrideTaints(taints []v1.Taint, pod *v1.Pod) []v1.Taint { - overrides := []v1.Taint{} - // Generate taints from pod tolerations +func generateTaints(constraints *v1alpha4.Constraints, pod *v1.Pod) error { + taints := scheduling.Taints(constraints.Taints) for _, toleration := range pod.Spec.Tolerations { - // Only OpEqual is supported + // Only OpEqual is supported. OpExists does not make sense for + // provisioning -- in theory we could create a taint on the node with a + // random string, but it's unclear use case this would accomplish. 
if toleration.Operator != v1.TolerationOpEqual { continue } + var generated []v1.Taint // Use effect if defined, otherwise taint all effects if toleration.Effect != "" { - overrides = append(overrides, v1.Taint{Key: toleration.Key, Value: toleration.Value, Effect: toleration.Effect}) + generated = []v1.Taint{{Key: toleration.Key, Value: toleration.Value, Effect: toleration.Effect}} } else { - overrides = append(overrides, - v1.Taint{Key: toleration.Key, Value: toleration.Value, Effect: v1.TaintEffectNoSchedule}, - v1.Taint{Key: toleration.Key, Value: toleration.Value, Effect: v1.TaintEffectNoExecute}, - ) + generated = []v1.Taint{ + {Key: toleration.Key, Value: toleration.Value, Effect: v1.TaintEffectNoSchedule}, + {Key: toleration.Key, Value: toleration.Value, Effect: v1.TaintEffectNoExecute}, + } } + // Only add taints that do not already exist on constraints + for _, taint := range generated { + if !taints.Has(taint) { + taints = append(taints, taint) + } + } + } + constraints.Taints = taints + return nil +} + +func generateLabels(constraints *v1alpha4.Constraints, pod *v1.Pod) error { + labels := map[string]string{} + // Default to constraint labels + for key, value := range constraints.Labels { + labels[key] = value + } + // Override with pod labels + nodeAffinity := scheduling.NodeAffinityFor(pod) + for _, key := range nodeAffinity.GetLabels() { + if _, ok := v1alpha4.WellKnownLabels[key]; !ok { + var labelConstraints []string + if value, ok := constraints.Labels[key]; ok { + labelConstraints = append(labelConstraints, value) + } + values := nodeAffinity.GetLabelValues(key, labelConstraints) + if len(values) == 0 { + return fmt.Errorf("label %s is too constrained", key) + } + labels[key] = values[0] + } + } + constraints.Labels = labels + return nil +} + +func validateTopology(pod *v1.Pod) (errs error) { + for _, constraint := range pod.Spec.TopologySpreadConstraints { + if supported := []string{v1.LabelHostname, v1.LabelTopologyZone}; !functional.ContainsString(supported, constraint.TopologyKey) { + errs = multierr.Append(errs, fmt.Errorf("unsupported topology key, %s not in %s", constraint.TopologyKey, supported)) + } + } + return errs +} + +func validateAffinity(pod *v1.Pod) (errs error) { + if pod.Spec.Affinity == nil { + return nil + } + if pod.Spec.Affinity.PodAffinity != nil { + errs = multierr.Append(errs, fmt.Errorf("pod affinity is not supported")) + } + if pod.Spec.Affinity.PodAntiAffinity != nil { + errs = multierr.Append(errs, fmt.Errorf("pod anti-affinity is not supported")) + } + if pod.Spec.Affinity.NodeAffinity != nil { + for _, term := range pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution { + errs = multierr.Append(errs, validateNodeSelectorTerm(term.Preference, pod.Spec.NodeSelector)) + } + if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { + for _, term := range pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { + errs = multierr.Append(errs, validateNodeSelectorTerm(term, pod.Spec.NodeSelector)) + } + } + } + return errs +} + +func validateNodeSelectorTerm(term v1.NodeSelectorTerm, nodeSelector map[string]string) (errs error) { + if term.MatchFields != nil { + errs = multierr.Append(errs, fmt.Errorf("matchFields is not supported")) } - // Add default taints if not overriden by pod above - for _, taint := range taints { - if !HasTaint(overrides, taint.Key) { - overrides = append(overrides, taint) + if term.MatchExpressions != nil { + for _, 
requirement := range term.MatchExpressions { + if !functional.ContainsString([]string{string(v1.NodeSelectorOpIn), string(v1.NodeSelectorOpNotIn)}, string(requirement.Operator)) { + errs = multierr.Append(errs, fmt.Errorf("unsupported operator, %s", requirement.Operator)) + } } } - return overrides + return errs } diff --git a/pkg/controllers/allocation/scheduling/scheduler.go b/pkg/controllers/allocation/scheduling/scheduler.go index 159d8c209be8..3d4d22c82029 100644 --- a/pkg/controllers/allocation/scheduling/scheduler.go +++ b/pkg/controllers/allocation/scheduling/scheduler.go @@ -22,11 +22,11 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4" "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/metrics" + "github.com/awslabs/karpenter/pkg/scheduling" "github.com/mitchellh/hashstructure/v2" "github.com/prometheus/client_golang/prometheus" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" @@ -65,7 +65,6 @@ func NewScheduler(cloudProvider cloudprovider.CloudProvider, kubeClient client.C return &Scheduler{ KubeClient: kubeClient, Topology: &Topology{ - cloudProvider: cloudProvider, kubeClient: kubeClient, }, } @@ -101,54 +100,46 @@ func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha4.Provisioner } func (s *Scheduler) solve(ctx context.Context, constraints *v1alpha4.Constraints, pods []*v1.Pod) ([]*Schedule, error) { - // 1. Inject temporarily adds specific NodeSelectors to pods, which are then + // Apply runtime constraints + constraints = constraints.DeepCopy() + if err := constraints.Constrain(ctx); err != nil { + return nil, fmt.Errorf("applying constraints, %w", err) + } + // Inject temporarily adds specific NodeSelectors to pods, which are then // used by scheduling logic. This isn't strictly necessary, but is a useful // trick to avoid passing topology decisions through the scheduling code. It // lets us to treat TopologySpreadConstraints as just-in-time NodeSelectors. if err := s.Topology.Inject(ctx, constraints, pods); err != nil { return nil, fmt.Errorf("injecting topology, %w", err) } - - // 2. Separate pods into schedules of compatible scheduling constraints. + // Separate pods into schedules of isomorphic scheduling constraints. schedules, err := s.getSchedules(ctx, constraints, pods) if err != nil { return nil, fmt.Errorf("getting schedules, %w", err) } - - // 3. Remove labels injected by TopologySpreadConstraints. - for _, schedule := range schedules { - delete(schedule.Labels, v1.LabelHostname) - } - for _, pod := range pods { - delete(pod.Labels, v1.LabelHostname) - } return schedules, nil } // getSchedules separates pods into a set of schedules. All pods in each group -// contain compatible scheduling constarints and can be deployed together on the +// contain isomorphic scheduling constraints and can be deployed together on the // same node, or multiple similar nodes if the pods exceed one node's capacity. 
-func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha4.Constraints, pods []*v1.Pod) ([]*Schedule, error) { +func (s *Scheduler) getSchedules(ctx context.Context, v1alpha4constraints *v1alpha4.Constraints, pods []*v1.Pod) ([]*Schedule, error) { // schedule uniqueness is tracked by hash(Constraints) schedules := map[uint64]*Schedule{} for _, pod := range pods { - constraints := NewConstraintsWithOverrides(constraints, pod) - constraints.Default(ctx) - if err := constraints.Validate(ctx); err != nil { + constraints, err := NewConstraints(ctx, v1alpha4constraints, pod) + if err != nil { logging.FromContext(ctx).Debugf("Ignored pod %s/%s due to invalid constraints, %s", pod.Name, pod.Namespace, err.Error()) continue } - key, err := hashstructure.Hash(constraints, hashstructure.FormatV2, nil) + key, err := hashstructure.Hash(constraints, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) if err != nil { return nil, fmt.Errorf("hashing constraints, %w", err) } // Create new schedule if one doesn't exist if _, ok := schedules[key]; !ok { // Uses a theoretical node object to compute schedulablility of daemonset overhead. - daemons, err := s.getDaemons(ctx, &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Labels: constraints.Labels}, - Spec: v1.NodeSpec{Taints: constraints.Taints}, - }) + daemons, err := s.getDaemons(ctx, constraints) if err != nil { return nil, fmt.Errorf("computing node overhead, %w", err) } @@ -169,7 +160,7 @@ func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha4.Cons return result, nil } -func (s *Scheduler) getDaemons(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) { +func (s *Scheduler) getDaemons(ctx context.Context, constraints *v1alpha4.Constraints) ([]*v1.Pod, error) { // 1. 
Get DaemonSets daemonSetList := &appsv1.DaemonSetList{} if err := s.KubeClient.List(ctx, daemonSetList); err != nil { @@ -180,23 +171,24 @@ func (s *Scheduler) getDaemons(ctx context.Context, node *v1.Node) ([]*v1.Pod, e pods := []*v1.Pod{} for _, daemonSet := range daemonSetList.Items { pod := &v1.Pod{Spec: daemonSet.Spec.Template.Spec} - if IsSchedulable(pod, node) { + if DaemonWillSchedule(constraints, pod) { pods = append(pods, pod) } } return pods, nil } + -// IsSchedulable returns true if the pod can schedule to the node +// DaemonWillSchedule returns true if the daemonset pod can schedule to a node provisioned with the given constraints -func IsSchedulable(pod *v1.Pod, node *v1.Node) bool { +func DaemonWillSchedule(constraints *v1alpha4.Constraints, pod *v1.Pod) bool { // Tolerate Taints - if err := Tolerates(pod, node.Spec.Taints...); err != nil { + if err := scheduling.Taints(constraints.Taints).Tolerates(pod); err != nil { return false } // Match Node Selector labels - if !labels.SelectorFromSet(pod.Spec.NodeSelector).Matches(labels.Set(node.Labels)) { + if !labels.SelectorFromSet(pod.Spec.NodeSelector).Matches(labels.Set(constraints.Labels)) { return false } - // TODO, support node affinity + // TODO support node affinity for daemonset return true } diff --git a/pkg/controllers/allocation/scheduling/suite_test.go b/pkg/controllers/allocation/scheduling/suite_test.go index 6da36190a02e..a3059d875b21 100644 --- a/pkg/controllers/allocation/scheduling/suite_test.go +++ b/pkg/controllers/allocation/scheduling/suite_test.go @@ -81,6 +81,293 @@ var _ = AfterEach(func() { ExpectCleanedUp(env.Client) }) +var _ = Describe("NodeAffinity", func() { + Context("Custom Labels", func() { + It("should schedule pods that have matching node selectors", func() { + provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) + }) + It("should generate custom labels for node selectors", func() { + provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod(test.PodOptions{ + NodeSelector: map[string]string{"another-key": "another-value"}, + })) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue("another-key", "another-value")) + Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) + }) + It("should not schedule pods that have conflicting node selectors", func() { + provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod(test.PodOptions{ + NodeSelector: map[string]string{"test-key": "different-value"}, + })) + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) + It("should schedule pods that have matching requirements", func() { + provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: "test-key", Operator: v1.NodeSelectorOpIn, Values: []string{"test-value", "another-value"}}, + }}, + )) + node := ExpectNodeExists(env.Client,
pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) + }) + It("should not schedule pods that have conflicting requirements", func() { + provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: "test-key", Operator: v1.NodeSelectorOpIn, Values: []string{"another-value"}}, + }}, + )) + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) + It("should generate custom labels for requirements", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: "test-key", Operator: v1.NodeSelectorOpIn, Values: []string{"test-value", "another-value"}}, + }}, + )) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) + }) + It("should schedule pods that have matching preferences", func() { + provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodePreferences: []v1.NodeSelectorRequirement{ + {Key: "test-key", Operator: v1.NodeSelectorOpIn, Values: []string{"test-value", "another-value"}}, + }}, + )) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) + }) + It("should not schedule pods that have conflicting preferences", func() { + provisioner.Spec.Labels = map[string]string{"test-key": "test-value"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodePreferences: []v1.NodeSelectorRequirement{ + {Key: "test-key", Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-value"}}, + }}, + )) + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) + It("should generate custom labels for preferences", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod( + test.PodOptions{NodePreferences: []v1.NodeSelectorRequirement{ + {Key: "test-key", Operator: v1.NodeSelectorOpIn, Values: []string{"test-value", "another-value"}}, + }}, + )) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) + }) + It("should generate custom labels with both requirements and preferences", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod( + test.PodOptions{ + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: "test-key", Operator: v1.NodeSelectorOpIn, Values: []string{"test-value", "another-value"}}, + }, + NodePreferences: []v1.NodeSelectorRequirement{ + {Key: "test-key", Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-value"}}, + }, + }, + )) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue("test-key", "another-value")) + }) + }) + Context("Well Known Labels", func() { + It("should use provisioner constraints", func() +
provisioner.Spec.Zones = []string{"test-zone-2"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2")) + }) + It("should use node selectors", func() { + provisioner.Spec.Zones = []string{"test-zone-1", "test-zone-2"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-2"}})) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2")) + }) + It("should not schedule the pod if nodeselector unknown", func() { + provisioner.Spec.Zones = []string{"test-zone-1"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "unknown"}})) + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) + It("should not schedule if node selector outside of provisioner constraints", func() { + provisioner.Spec.Zones = []string{"test-zone-1"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-2"}})) + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) + It("should schedule compatible requirements with Operator=In", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-3"}}}, + })) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-3")) + }) + It("should not schedule incompatible preferences and requirements with Operator=In", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"unknown"}}}, + })) + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) + It("should schedule compatible requirements with Operator=NotIn", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-1", "test-zone-2", "unknown"}}}, + })) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-3")) + }) + It("should not schedule incompatible preferences and requirements with Operator=NotIn", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, 
Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3", "unknown"}}}, + })) + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) + It("should schedule compatible preferences and requirements with Operator=In", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3", "unknown"}}}, + NodePreferences: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-2", "unknown"}}}, + })) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2")) + }) + It("should not schedule incompatible preferences and requirements with Operator=In", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3", "unknown"}}}, + NodePreferences: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"unknown"}}}, + })) + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) + It("should schedule compatible preferences and requirements with Operator=NotIn", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3", "unknown"}}}, + NodePreferences: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-1", "test-zone-3"}}}, + })) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-2")) + }) + It("should not schedule incompatible preferences and requirements with Operator=NotIn", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3", "unknown"}}}, + NodePreferences: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3"}}}, + })) + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) + It("should schedule compatible node selectors, preferences and requirements", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-3"}, + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3"}}}, + NodePreferences: []v1.NodeSelectorRequirement{ + {Key: 
v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-2", "test-zone-3"}}}, + })) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-3")) + }) + It("should not schedule incompatible node selectors, preferences and requirements", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-3"}, + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-3"}}}, + NodePreferences: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-2", "test-zone-3"}}}, + })) + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) + It("should combine multidimensional node selectors, preferences and requirements", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeSelector: map[string]string{ + v1.LabelTopologyZone: "test-zone-3", + v1.LabelInstanceTypeStable: "arm-instance-type", + }, + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-3"}}, + {Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpIn, Values: []string{"default-instance-type", "arm-instance-type"}}, + }, + NodePreferences: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"unnknown"}}, + {Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpNotIn, Values: []string{"unknown"}}, + }, + })) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-3")) + Expect(node.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "arm-instance-type")) + }) + It("should not combine incompatible multidimensional node selectors, preferences and requirements", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{ + NodeSelector: map[string]string{ + v1.LabelTopologyZone: "test-zone-3", + v1.LabelInstanceTypeStable: "arm-instance-type", + }, + NodeRequirements: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test-zone-1", "test-zone-3"}}, + {Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpIn, Values: []string{"default-instance-type", "arm-instance-type"}}, + }, + NodePreferences: []v1.NodeSelectorRequirement{ + {Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test-zone-3"}}, + {Key: v1.LabelInstanceTypeStable, Operator: v1.NodeSelectorOpNotIn, Values: []string{"arm-instance-type"}}, + }, + })) + Expect(pods[0].Spec.NodeName).To(BeEmpty()) + }) + }) +}) + var _ = Describe("Topology", func() { labels := map[string]string{"test": "test"} @@ -97,7 +384,7 @@ var _ = Describe("Topology", func() { Expect(pods[0].Spec.NodeName).To(BeEmpty()) }) - Context("Zone", func() { + Context("Zonal", func() { It("should balance pods across zones", func() { ExpectCreated(env.Client, provisioner) topology := 
[]v1.TopologySpreadConstraint{{ @@ -193,7 +480,7 @@ var _ = Describe("Topology", func() { }) }) - Context("Combined Hostname and Topology", func() { + Context("Combined Hostname and Zonal Topology", func() { It("should spread pods while respecting both constraints", func() { ExpectCreated(env.Client, provisioner) topology := []v1.TopologySpreadConstraint{{ @@ -248,6 +535,8 @@ var _ = Describe("Taints", func() { test.UnschedulablePod(), // key mismatch with OpExists test.UnschedulablePod(test.PodOptions{Tolerations: []v1.Toleration{{Key: "invalid", Operator: v1.TolerationOpExists}}}), + // value mismatch + test.UnschedulablePod(test.PodOptions{Tolerations: []v1.Toleration{{Key: "test-key", Operator: v1.TolerationOpEqual, Effect: v1.TaintEffectNoSchedule}}}), } ExpectCreated(env.Client, provisioner) ExpectCreatedWithStatus(env.Client, schedulable...) @@ -257,6 +546,7 @@ var _ = Describe("Taints", func() { nodes := &v1.NodeList{} Expect(env.Client.List(ctx, nodes)).To(Succeed()) Expect(len(nodes.Items)).To(Equal(1)) + Expect(nodes.Items[0].Spec.Taints[0]).To(Equal(provisioner.Spec.Taints[0])) for _, pod := range schedulable { scheduled := ExpectPodExists(env.Client, pod.GetName(), pod.GetNamespace()) ExpectNodeExists(env.Client, scheduled.Spec.NodeName) diff --git a/pkg/controllers/allocation/scheduling/topology.go b/pkg/controllers/allocation/scheduling/topology.go index 47f80797f0bc..8f1ed8d469e2 100644 --- a/pkg/controllers/allocation/scheduling/topology.go +++ b/pkg/controllers/allocation/scheduling/topology.go @@ -22,7 +22,7 @@ import ( "github.com/Pallinder/go-randomdata" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4" - "github.com/awslabs/karpenter/pkg/cloudprovider" + "github.com/awslabs/karpenter/pkg/scheduling" "github.com/awslabs/karpenter/pkg/utils/apiobject" "github.com/awslabs/karpenter/pkg/utils/functional" "github.com/mitchellh/hashstructure/v2" @@ -32,8 +32,7 @@ import ( ) type Topology struct { - cloudProvider cloudprovider.CloudProvider - kubeClient client.Client + kubeClient client.Client } // Inject injects topology rules into pods using supported NodeSelectors @@ -111,15 +110,7 @@ func (t *Topology) computeHostnameTopology(ctx context.Context, topologyGroup *T // selection. For example, if a cloud provider or provisioner changes the viable // set of nodes, topology calculations will rebalance the new set of zones. func (t *Topology) computeZonalTopology(ctx context.Context, constraints *v1alpha4.Constraints, topologyGroup *TopologyGroup) error { - // 1. Get viable domains - zones, err := t.cloudProvider.GetZones(ctx, constraints) - if err != nil { - return fmt.Errorf("getting zones, %w", err) - } - if constrained := NewConstraintsWithOverrides(constraints, topologyGroup.Pods[0]).Zones; len(constrained) != 0 { - zones = functional.IntersectStringSlice(zones, constrained) - } - topologyGroup.Register(zones...) + topologyGroup.Register(scheduling.NodeAffinityFor(topologyGroup.Pods[0]).GetLabelValues(v1.LabelTopologyZone, constraints.Zones)...) // 2. 
Increment domains for matching pods if err := t.countMatchingPods(ctx, topologyGroup); err != nil { return fmt.Errorf("getting matching pods, %w", err) diff --git a/pkg/controllers/allocation/suite_test.go b/pkg/controllers/allocation/suite_test.go index 2f9d6787791e..ae4cbd7256b5 100644 --- a/pkg/controllers/allocation/suite_test.go +++ b/pkg/controllers/allocation/suite_test.go @@ -88,37 +88,6 @@ var _ = Describe("Allocation", func() { }) Context("Reconcilation", func() { - Context("Zones", func() { - It("should default to a cluster zone", func() { - // Setup - ExpectCreated(env.Client, provisioner) - ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner)) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - // Assertions - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) - Expect(node.Spec.ProviderID).To(ContainSubstring("test-zone-1")) - }) - It("should default to a provisioner's zone", func() { - // Setup - provisioner.Spec.Zones = []string{"test-zone-2"} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - // Assertions - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) - Expect(node.Spec.ProviderID).To(ContainSubstring("test-zone-2")) - }) - It("should allow a pod to override the zone", func() { - // Setup - provisioner.Spec.Zones = []string{"test-zone-1"} - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, - test.UnschedulablePod(test.PodOptions{NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-2"}}), - ) - // Assertions - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) - Expect(node.Spec.ProviderID).To(ContainSubstring("test-zone-2")) - }) - }) It("should provision nodes for unconstrained pods", func() { ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, @@ -168,10 +137,7 @@ var _ = Describe("Allocation", func() { Expect(len(nodes.Items)).To(Equal(6)) // 5 schedulable -> 5 node, 2 coschedulable -> 1 node for _, pod := range schedulable { scheduled := ExpectPodExists(env.Client, pod.GetName(), pod.GetNamespace()) - node := ExpectNodeExists(env.Client, scheduled.Spec.NodeName) - for key, value := range scheduled.Spec.NodeSelector { - Expect(node.Labels[key]).To(Equal(value)) - } + ExpectNodeExists(env.Client, scheduled.Spec.NodeName) } for _, pod := range unschedulable { unscheduled := ExpectPodExists(env.Client, pod.GetName(), pod.GetNamespace()) @@ -239,13 +205,38 @@ var _ = Describe("Allocation", func() { Expect(*nodes.Items[0].Status.Allocatable.Cpu()).To(Equal(resource.MustParse("4"))) Expect(*nodes.Items[0].Status.Allocatable.Memory()).To(Equal(resource.MustParse("4Gi"))) }) - It("should labels nodes with provisioner name", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod(test.PodOptions{})) - for _, pod := range pods { - node := ExpectNodeExists(env.Client, pod.Spec.NodeName) + + Context("Labels", func() { + It("should label nodes with provisioner labels", func() { + provisioner.Spec.Labels = map[string]string{"test-key": "test-value", "test-key-2": "test-value-2"} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) + node := 
ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) + Expect(node.Labels).To(HaveKeyWithValue("test-key-2", "test-value-2")) + }) + It("should label nodes with provisioner name", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) Expect(node.Labels).To(HaveKeyWithValue(v1alpha4.ProvisionerNameLabelKey, provisioner.Name)) - } + }) + }) + Context("Taints", func() { + It("should apply unready taints", func() { + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Spec.Taints).To(ContainElement(v1.Taint{Key: v1alpha4.NotReadyTaintKey, Effect: v1.TaintEffectNoSchedule})) + }) + It("should taint nodes with provisioner taints", func() { + provisioner.Spec.Taints = []v1.Taint{{Key: "test", Value: "bar", Effect: v1.TaintEffectNoSchedule}} + ExpectCreated(env.Client, provisioner) + pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + test.UnschedulablePod(test.PodOptions{Tolerations: []v1.Toleration{{Effect: v1.TaintEffectNoSchedule, Operator: v1.TolerationOpExists}}})) + node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) + Expect(node.Spec.Taints).To(ContainElement(provisioner.Spec.Taints[0])) + }) }) }) }) diff --git a/pkg/controllers/termination/terminate.go b/pkg/controllers/termination/terminate.go index 4c8e1521fac1..77191cb50264 100644 --- a/pkg/controllers/termination/terminate.go +++ b/pkg/controllers/termination/terminate.go @@ -20,7 +20,7 @@ import ( provisioning "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha4" "github.com/awslabs/karpenter/pkg/cloudprovider" - "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" + "github.com/awslabs/karpenter/pkg/scheduling" "github.com/awslabs/karpenter/pkg/utils/functional" "github.com/awslabs/karpenter/pkg/utils/injectabletime" "github.com/awslabs/karpenter/pkg/utils/ptr" @@ -114,7 +114,7 @@ func (t *Terminator) getEvictablePods(pods []*v1.Pod) []*v1.Pod { evictable := []*v1.Pod{} for _, pod := range pods { // Ignore if unschedulable is tolerated, since they will reschedule - if scheduling.Tolerates(pod, v1.Taint{Key: v1.TaintNodeUnschedulable, Effect: v1.TaintEffectNoSchedule}) == nil { + if scheduling.Taints(scheduling.Taints{{Key: v1.TaintNodeUnschedulable, Effect: v1.TaintEffectNoSchedule}}).Tolerates(pod) == nil { continue } // Ignore if kubelet is partitioned and pods are beyond graceful termination window diff --git a/pkg/scheduling/nodeaffinity.go b/pkg/scheduling/nodeaffinity.go new file mode 100644 index 000000000000..019f966d9d31 --- /dev/null +++ b/pkg/scheduling/nodeaffinity.go @@ -0,0 +1,77 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package scheduling + +import ( + "github.com/awslabs/karpenter/pkg/utils/functional" + v1 "k8s.io/api/core/v1" +) + +type NodeAffinity []v1.NodeSelectorRequirement + +// NodeAffinityFor constructs a set of node selector requirements from the given pods' node selectors and node affinities +func NodeAffinityFor(pods ...*v1.Pod) (nodeAffinity NodeAffinity) { + for _, pod := range pods { + // Convert node selectors to requirements + for key, value := range pod.Spec.NodeSelector { + nodeAffinity = append(nodeAffinity, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}}) + } + if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil { + continue + } + // Preferences are treated as requirements. An outer loop will iteratively unconstrain them if unsatisfiable + for _, term := range pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution { + nodeAffinity = append(nodeAffinity, term.Preference.MatchExpressions...) + } + if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { + for _, term := range pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { + nodeAffinity = append(nodeAffinity, term.MatchExpressions...) + } + } + } + return nodeAffinity +} + +// GetLabels returns the label keys specified by the scheduling rules +func (n NodeAffinity) GetLabels() []string { + keys := map[string]bool{} + for _, requirement := range n { + keys[requirement.Key] = true + } + result := []string{} + for key := range keys { + result = append(result, key) + } + return result +} + +// GetLabelValues returns the allowed values for the provided label key. The external constraints are intersected to form the initial set; In requirements further intersect it and NotIn requirements subtract from it. +func (n NodeAffinity) GetLabelValues(label string, constraints ...[]string) []string { + // Intersect external constraints + result := functional.IntersectStringSlice(constraints...) + // OpIn + for _, requirement := range n { + if requirement.Key == label && requirement.Operator == v1.NodeSelectorOpIn { + result = functional.IntersectStringSlice(result, requirement.Values) + } + } + // OpNotIn + for _, requirement := range n { + if requirement.Key == label && requirement.Operator == v1.NodeSelectorOpNotIn { + result = functional.StringSliceWithout(result, requirement.Values...)
+ } + } + return result +} diff --git a/pkg/controllers/allocation/scheduling/taints.go b/pkg/scheduling/taints.go similarity index 62% rename from pkg/controllers/allocation/scheduling/taints.go rename to pkg/scheduling/taints.go index e83680262286..08972bf9e298 100644 --- a/pkg/controllers/allocation/scheduling/taints.go +++ b/pkg/scheduling/taints.go @@ -21,27 +21,28 @@ import ( v1 "k8s.io/api/core/v1" ) -// Tolerates returns an error if the pod does not tolerate the taints -// https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/#concepts -func Tolerates(pod *v1.Pod, taints ...v1.Taint) error { - var err error - for _, taint := range taints { +type Taints []v1.Taint + +// Has returns true if the taints contain a taint with the given key and effect +func (ts Taints) Has(taint v1.Taint) bool { + for _, t := range ts { + if t.Key == taint.Key && t.Effect == taint.Effect { + return true + } + } + return false +} + +// Tolerates returns nil if the pod tolerates all taints, otherwise an error aggregating each taint that is not tolerated +func (ts Taints) Tolerates(pod *v1.Pod) (errs error) { + for _, taint := range ts { tolerates := false for _, t := range pod.Spec.Tolerations { tolerates = tolerates || t.ToleratesTaint(&taint) } if !tolerates { - err = multierr.Append(err, fmt.Errorf("did not tolerate %s=%s:%s", taint.Key, taint.Value, taint.Effect)) + errs = multierr.Append(errs, fmt.Errorf("did not tolerate %s=%s:%s", taint.Key, taint.Value, taint.Effect)) } } - return err -} - -func HasTaint(taints []v1.Taint, key string) bool { - for _, taint := range taints { - if taint.Key == key { - return true - } - } - return false + return errs } diff --git a/pkg/test/pods.go b/pkg/test/pods.go index 56a2295854e9..4613d89eadc7 100644 --- a/pkg/test/pods.go +++ b/pkg/test/pods.go @@ -35,6 +35,8 @@ type PodOptions struct { NodeName string ResourceRequirements v1.ResourceRequirements NodeSelector map[string]string + NodeRequirements []v1.NodeSelectorRequirement + NodePreferences []v1.NodeSelectorRequirement TopologySpreadConstraints []v1.TopologySpreadConstraint Tolerations []v1.Toleration Conditions []v1.PodCondition @@ -80,6 +82,7 @@ func Pod(overrides ...PodOptions) *v1.Pod { }, Spec: v1.PodSpec{ NodeSelector: options.NodeSelector, + Affinity: buildAffinity(options.NodeRequirements, options.NodePreferences), TopologySpreadConstraints: options.TopologySpreadConstraints, Tolerations: options.Tolerations, Containers: []v1.Container{{ @@ -127,3 +130,23 @@ func PodDisruptionBudget(overrides ...PDBOptions) *v1beta1.PodDisruptionBudget { }, } } + +func buildAffinity(nodeRequirements []v1.NodeSelectorRequirement, nodePreferences []v1.NodeSelectorRequirement) *v1.Affinity { + var affinity *v1.Affinity + if nodeRequirements == nil && nodePreferences == nil { + return affinity + } + affinity = &v1.Affinity{NodeAffinity: &v1.NodeAffinity{}} + + if nodeRequirements != nil { + affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{{MatchExpressions: nodeRequirements}}, + } + } + if nodePreferences != nil { + affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = []v1.PreferredSchedulingTerm{ + {Weight: 1, Preference: v1.NodeSelectorTerm{MatchExpressions: nodePreferences}}, + } + } + return affinity +} diff --git a/pkg/utils/functional/functional.go b/pkg/utils/functional/functional.go index 9e68ec3395b3..aad0882145fb 100644 --- a/pkg/utils/functional/functional.go +++ b/pkg/utils/functional/functional.go @@ -16,8 +16,6 @@ package functional import ( "strings" - -
"go.uber.org/multierr" ) // UnionStringMaps merges all key value pairs into a single map, last write wins. @@ -32,7 +30,10 @@ func UnionStringMaps(maps ...map[string]string) map[string]string { } func StringSliceWithout(vals []string, remove ...string) []string { - without := []string{} + if vals == nil { + return nil + } + var without []string for _, val := range vals { if ContainsString(remove, val) { continue @@ -42,31 +43,47 @@ func StringSliceWithout(vals []string, remove ...string) []string { return without } -// IntersectStringSlice takes the intersection of all string slices +// IntersectStringSlice takes the intersection of the slices. +// Semantically: +// 1. [],[a,b] -> []: Empty set will always result in [] +// 2. nil,[a,b] -> [a,b]: Nil is the universal set and does not constrain +// 3. ([a,b],[b]) -> [b]: Takes the intersection of the two sets func IntersectStringSlice(slices ...[]string) []string { - // count occurrences - counts := map[string]int{} - for _, strings := range slices { - for _, s := range UniqueStrings(strings) { - counts[s] = counts[s] + 1 - } + if len(slices) == 0 { + return nil + } + if len(slices) == 1 { + return UniqueStrings(slices[0]) + } + if slices[0] == nil { + return IntersectStringSlice(slices[1:]...) + } + if slices[1] == nil { + sliced := append(slices[:1], slices[2:]...) + return IntersectStringSlice(sliced...) } - // select if occurred in all - var intersection []string - for key, count := range counts { - if count == len(slices) { - intersection = append(intersection, key) + counts := map[string]bool{} + for _, s := range slices[0] { + counts[s] = true + } + intersection := []string{} + for _, s := range slices[1] { + if _, ok := counts[s]; ok { + intersection = append(intersection, s) } } - return intersection + return IntersectStringSlice(append(slices[2:], intersection)...) } func UniqueStrings(strings []string) []string { + if strings == nil { + return nil + } exists := map[string]bool{} for _, s := range strings { exists[s] = true } - var unique []string + unique := []string{} for s := range exists { unique = append(unique, s) } @@ -82,15 +99,6 @@ func ContainsString(strings []string, candidate string) bool { return false } -// ValidateAll returns nil if all errorables return nil, otherwise returns the concatenated failure messages. 
-func ValidateAll(errorables ...func() error) error { - var err error - for _, errorable := range errorables { - err = multierr.Append(err, errorable()) - } - return err -} - // HasAnyPrefix returns true if any of the provided prefixes match the given string s func HasAnyPrefix(s string, prefixes ...string) bool { for _, prefix := range prefixes { diff --git a/pkg/utils/functional/suite_test.go b/pkg/utils/functional/suite_test.go index e2fb16da625a..dfca77e81101 100644 --- a/pkg/utils/functional/suite_test.go +++ b/pkg/utils/functional/suite_test.go @@ -82,4 +82,42 @@ var _ = Describe("Functional", func() { Expect(UnionStringMaps(original, disjoiner, empty, uberwriter)).To(Equal(expected)) }) }) + Context("IntersectStringSlice", func() { + var nilset []string + empty := []string{} + universe := []string{"a", "b", "c"} + subset := []string{"a", "b"} + overlap := []string{"a", "b", "d"} + disjoint := []string{"d", "e"} + duplicates := []string{"a", "a"} + Specify("nil set", func() { + Expect(IntersectStringSlice()).To(BeNil()) + Expect(IntersectStringSlice(nilset)).To(BeNil()) + Expect(IntersectStringSlice(nilset, nilset)).To(BeNil()) + Expect(IntersectStringSlice(nilset, universe)).To(ConsistOf(universe)) + Expect(IntersectStringSlice(universe, nilset)).To(ConsistOf(universe)) + Expect(IntersectStringSlice(universe, nilset, nilset)).To(ConsistOf(universe)) + }) + Specify("empty set", func() { + Expect(IntersectStringSlice(empty, nilset)).To(And(BeEmpty(), Not(BeNil()))) + Expect(IntersectStringSlice(nilset, empty)).To(And(BeEmpty(), Not(BeNil()))) + Expect(IntersectStringSlice(universe, empty)).To(And(BeEmpty(), Not(BeNil()))) + Expect(IntersectStringSlice(universe, universe, empty)).To(And(BeEmpty(), Not(BeNil()))) + }) + Specify("intersect", func() { + Expect(IntersectStringSlice(universe, subset)).To(ConsistOf(subset)) + Expect(IntersectStringSlice(subset, universe)).To(ConsistOf(subset)) + Expect(IntersectStringSlice(universe, overlap)).To(ConsistOf(subset)) + Expect(IntersectStringSlice(overlap, universe)).To(ConsistOf(subset)) + Expect(IntersectStringSlice(universe, disjoint)).To(And(BeEmpty(), Not(BeNil()))) + Expect(IntersectStringSlice(disjoint, universe)).To(And(BeEmpty(), Not(BeNil()))) + Expect(IntersectStringSlice(overlap, disjoint, universe)).To(And(BeEmpty(), Not(BeNil()))) + }) + Specify("duplicates", func() { + Expect(IntersectStringSlice(duplicates)).To(ConsistOf("a")) + Expect(IntersectStringSlice(duplicates, nilset)).To(ConsistOf("a")) + Expect(IntersectStringSlice(duplicates, universe)).To(ConsistOf("a")) + Expect(IntersectStringSlice(duplicates, universe, subset)).To(ConsistOf("a")) + }) + }) })