diff --git a/pkg/apis/provisioning/v1alpha5/provisioner.go b/pkg/apis/provisioning/v1alpha5/provisioner.go index 7c78a284f2a9..b650a493ba04 100644 --- a/pkg/apis/provisioning/v1alpha5/provisioner.go +++ b/pkg/apis/provisioning/v1alpha5/provisioner.go @@ -17,10 +17,10 @@ package v1alpha5 import ( "sort" - "github.com/awslabs/karpenter/pkg/utils/functional" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" ) // ProvisionerSpec is the top level provisioner specification. Provisioners @@ -95,105 +95,128 @@ type ProvisionerList struct { } // Zones for the constraints -func (c *Constraints) Zones() []string { - return c.Requirements.GetLabelValues(v1.LabelTopologyZone) +func (r Requirements) Zones() sets.String { + return r.Requirement(v1.LabelTopologyZone) } // InstanceTypes for the constraints -func (c *Constraints) InstanceTypes() []string { - return c.Requirements.GetLabelValues(v1.LabelInstanceTypeStable) +func (r Requirements) InstanceTypes() sets.String { + return r.Requirement(v1.LabelInstanceTypeStable) } // Architectures for the constraints -func (c *Constraints) Architectures() []string { - return c.Requirements.GetLabelValues(v1.LabelArchStable) +func (r Requirements) Architectures() sets.String { + return r.Requirement(v1.LabelArchStable) } // OperatingSystems for the constraints -func (c *Constraints) OperatingSystems() []string { - return c.Requirements.GetLabelValues(v1.LabelOSStable) +func (r Requirements) OperatingSystems() sets.String { + return r.Requirement(v1.LabelOSStable) } -// Consolidate and copy the constraints -func (c *Constraints) Consolidate() *Constraints { - // Combine labels and requirements - combined := append(Requirements{}, c.Requirements...) - for key, value := range c.Labels { - combined = append(combined, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}}) +func (r Requirements) WithProvisioner(provisioner Provisioner) Requirements { + return r. + With(provisioner.Spec.Requirements). + WithLabels(provisioner.Spec.Labels). + WithLabels(map[string]string{ProvisionerNameLabelKey: provisioner.Name}) +} + +func (r Requirements) With(requirements Requirements) Requirements { + return append(r, requirements...) +} + +func (r Requirements) WithLabels(labels map[string]string) Requirements { + for key, value := range labels { + r = append(r, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}}) } - // Simplify to a single OpIn per label - requirements := Requirements{} - for _, label := range combined.GetLabels() { + return r +} + +func (r Requirements) WithPod(pod *v1.Pod) Requirements { + for key, value := range pod.Spec.NodeSelector { + r = append(r, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}}) + } + if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil { + return r + } + // Select heaviest preference and treat as a requirement. An outer loop will iteratively unconstrain them if unsatisfiable. + if preferred := pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution; len(preferred) > 0 { + sort.Slice(preferred, func(i int, j int) bool { return preferred[i].Weight > preferred[j].Weight }) + r = append(r, preferred[0].Preference.MatchExpressions...) + } + // Select first requirement. 
An outer loop will iteratively remove OR requirements if unsatisfiable
+	if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil &&
+		len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) > 0 {
+		r = append(r, pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions...)
+	}
+	return r
+}
+
+// Consolidate combines In and NotIn requirements for each unique key, producing
+// an equivalent minimal representation of the requirements. This is useful as
+// requirements may be appended from a variety of sources and then consolidated.
+// Caution: If a key contains a `NotIn` operator without a corresponding
+// `In` operator, the requirement will permanently be [] after consolidation. To
+// avoid this, include the broadest `In` requirements before consolidating.
+func (r Requirements) Consolidate() (requirements Requirements) {
+	for _, key := range r.Keys() {
 		requirements = append(requirements, v1.NodeSelectorRequirement{
-			Key:      label,
+			Key:      key,
 			Operator: v1.NodeSelectorOpIn,
-			Values:   combined.GetLabelValues(label),
+			Values:   r.Requirement(key).UnsortedList(),
 		})
 	}
-	return &Constraints{
-		Labels:       c.Labels,
-		Taints:       c.Taints,
-		Requirements: requirements,
-		Provider:     c.Provider,
-	}
+	return requirements
 }
 
-// With adds additional requirements from the pods
-func (r Requirements) With(pods ...*v1.Pod) Requirements {
-	for _, pod := range pods {
-		for key, value := range pod.Spec.NodeSelector {
-			r = append(r, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}})
-		}
-		if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil {
-			continue
-		}
-		// Select heaviest preference and treat as a requirement. An outer loop will iteratively unconstrain them if unsatisfiable.
-		if preferred := pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution; len(preferred) > 0 {
-			sort.Slice(preferred, func(i int, j int) bool { return preferred[i].Weight > preferred[j].Weight })
-			r = append(r, preferred[0].Preference.MatchExpressions...)
-		}
-		// Select first requirement. An outer loop will iteratively remove OR requirements if unsatisfiable
-		if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil &&
-			len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) > 0 {
-			r = append(r, pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions...)
+func (r Requirements) CustomLabels() map[string]string { + labels := map[string]string{} + for _, key := range r.Keys() { + if !WellKnownLabels.Has(key) { + if requirement := r.Requirement(key); len(requirement) > 0 { + labels[key] = requirement.UnsortedList()[0] + } } } - return r + return labels } -// GetLabels returns unique set of the label keys from the requirements -func (r Requirements) GetLabels() []string { - keys := map[string]bool{} +func (r Requirements) WellKnown() (requirements Requirements) { for _, requirement := range r { - keys[requirement.Key] = true - } - result := []string{} - for key := range keys { - result = append(result, key) + if WellKnownLabels.Has(requirement.Key) { + requirements = append(requirements, requirement) + } } - return result + return requirements } -// GetLabelValues for the provided key constrained by the requirements -func (r Requirements) GetLabelValues(label string) []string { - var result []string - if known, ok := WellKnownLabels[label]; ok { - result = known +// Keys returns unique set of the label keys from the requirements +func (r Requirements) Keys() []string { + keys := sets.NewString() + for _, requirement := range r { + keys.Insert(requirement.Key) } + return keys.UnsortedList() +} + +// Requirements for the provided key, nil if unconstrained +func (r Requirements) Requirement(key string) sets.String { + var result sets.String // OpIn for _, requirement := range r { - if requirement.Key == label && requirement.Operator == v1.NodeSelectorOpIn { - result = functional.IntersectStringSlice(result, requirement.Values) + if requirement.Key == key && requirement.Operator == v1.NodeSelectorOpIn { + if result == nil { + result = sets.NewString(requirement.Values...) + } else { + result = result.Intersection(sets.NewString(requirement.Values...)) + } } } // OpNotIn for _, requirement := range r { - if requirement.Key == label && requirement.Operator == v1.NodeSelectorOpNotIn { - result = functional.StringSliceWithout(result, requirement.Values...) 
+ if requirement.Key == key && requirement.Operator == v1.NodeSelectorOpNotIn { + result = result.Difference(sets.NewString(requirement.Values...)) } } - if len(result) == 0 { - result = []string{} - } return result } diff --git a/pkg/apis/provisioning/v1alpha5/provisioner_validation.go b/pkg/apis/provisioning/v1alpha5/provisioner_validation.go index a1c816772363..ee653560798d 100644 --- a/pkg/apis/provisioning/v1alpha5/provisioner_validation.go +++ b/pkg/apis/provisioning/v1alpha5/provisioner_validation.go @@ -82,7 +82,6 @@ func (c *Constraints) Validate(ctx context.Context) (errs *apis.FieldError) { c.validateLabels(), c.validateTaints(), c.validateRequirements(), - c.Consolidate().Requirements.Validate(), ValidateHook(ctx, c), ) } @@ -95,11 +94,8 @@ func (c *Constraints) validateLabels() (errs *apis.FieldError) { for _, err := range validation.IsValidLabelValue(value) { errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s, %s", value, err), fmt.Sprintf("labels[%s]", key))) } - if known, ok := WellKnownLabels[key]; ok && !functional.ContainsString(known, value) { - errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s not in %s", value, known), fmt.Sprintf("labels[%s]", key))) - } if _, ok := WellKnownLabels[key]; !ok && IsRestrictedLabelDomain(key) { - errs = errs.Also(apis.ErrInvalidKeyName(key, "labels", "label prefix not supported")) + errs = errs.Also(apis.ErrInvalidKeyName(key, "labels", "label domain not allowed")) } } return errs @@ -157,8 +153,8 @@ func (c *Constraints) validateRequirements() (errs *apis.FieldError) { } func (r Requirements) Validate() (errs *apis.FieldError) { - for _, label := range r.GetLabels() { - if len(r.GetLabelValues(label)) == 0 { + for _, label := range r.Keys() { + if r.Requirement(label).Len() == 0 { errs = errs.Also(apis.ErrGeneric(fmt.Sprintf("%s is too constrained", label))) } } @@ -173,11 +169,6 @@ func validateRequirement(requirement v1.NodeSelectorRequirement) (errs *apis.Fie for _, err := range validation.IsValidLabelValue(value) { errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s, %s", value, err), "values", i)) } - if known, ok := WellKnownLabels[requirement.Key]; ok { - if !functional.ContainsString(known, value) { - errs = errs.Also(apis.ErrInvalidArrayValue(fmt.Sprintf("%s not in %s", value, known), "values", i)) - } - } } if !functional.ContainsString(SupportedNodeSelectorOps, string(requirement.Operator)) { errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s not in %s", requirement.Operator, SupportedNodeSelectorOps), "operator")) diff --git a/pkg/apis/provisioning/v1alpha5/provisioner_validation_test.go b/pkg/apis/provisioning/v1alpha5/provisioner_validation_test.go index 953181880f25..86832b7dd852 100644 --- a/pkg/apis/provisioning/v1alpha5/provisioner_validation_test.go +++ b/pkg/apis/provisioning/v1alpha5/provisioner_validation_test.go @@ -77,30 +77,12 @@ var _ = Describe("Validation", func() { Expect(provisioner.Validate(ctx)).ToNot(Succeed()) } }) - It("should fail for restricted prefixes when not well known labels", func() { + It("should fail for restricted label domains", func() { for _, label := range RestrictedLabelDomains { provisioner.Spec.Labels = map[string]string{label + "/unknown": randomdata.SillyName()} Expect(provisioner.Validate(ctx)).ToNot(Succeed()) } }) - It("should succeed for well known label values", func() { - WellKnownLabels[v1.LabelTopologyZone] = []string{"test-1", "test1"} - WellKnownLabels[v1.LabelInstanceTypeStable] = []string{"test-1", "test1"} - WellKnownLabels[v1.LabelArchStable] = 
[]string{"test-1", "test1"} - WellKnownLabels[v1.LabelOSStable] = []string{"test-1", "test1"} - for key, values := range WellKnownLabels { - for _, value := range values { - provisioner.Spec.Labels = map[string]string{key: value} - Expect(provisioner.Validate(ctx)).To(Succeed()) - } - } - }) - It("should fail for invalid well known label values", func() { - for key := range WellKnownLabels { - provisioner.Spec.Labels = map[string]string{key: "unknown"} - Expect(provisioner.Validate(ctx)).ToNot(Succeed()) - } - }) }) Context("Taints", func() { It("should succeed for valid taints", func() { @@ -143,38 +125,5 @@ var _ = Describe("Validation", func() { Expect(provisioner.Validate(ctx)).ToNot(Succeed()) } }) - It("should validate well known labels", func() { - WellKnownLabels[v1.LabelTopologyZone] = []string{"test"} - provisioner.Spec.Requirements = Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test"}}} - Expect(provisioner.Validate(ctx)).To(Succeed()) - provisioner.Spec.Labels = map[string]string{} - provisioner.Spec.Requirements = Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test"}}} - Expect(provisioner.Validate(ctx)).To(Succeed()) - provisioner.Spec.Labels = map[string]string{v1.LabelTopologyZone: "test"} - provisioner.Spec.Requirements = Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"test"}}} - Expect(provisioner.Validate(ctx)).To(Succeed()) - provisioner.Spec.Labels = map[string]string{v1.LabelTopologyZone: "test"} - provisioner.Spec.Requirements = Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{"test"}}} - Expect(provisioner.Validate(ctx)).ToNot(Succeed()) - provisioner.Spec.Labels = map[string]string{v1.LabelTopologyZone: "test"} - provisioner.Spec.Requirements = Requirements{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{"unknown"}}} - Expect(provisioner.Validate(ctx)).ToNot(Succeed()) - }) - It("should validate custom labels", func() { - provisioner.Spec.Requirements = Requirements{{Key: "test", Operator: v1.NodeSelectorOpIn, Values: []string{"test"}}} - Expect(provisioner.Validate(ctx)).To(Succeed()) - provisioner.Spec.Labels = map[string]string{} - provisioner.Spec.Requirements = Requirements{{Key: "test", Operator: v1.NodeSelectorOpIn, Values: []string{"test"}}} - Expect(provisioner.Validate(ctx)).To(Succeed()) - provisioner.Spec.Labels = map[string]string{"test": "test"} - provisioner.Spec.Requirements = Requirements{{Key: "test", Operator: v1.NodeSelectorOpIn, Values: []string{"test"}}} - Expect(provisioner.Validate(ctx)).To(Succeed()) - provisioner.Spec.Labels = map[string]string{"test": "test"} - provisioner.Spec.Requirements = Requirements{{Key: "test", Operator: v1.NodeSelectorOpNotIn, Values: []string{"test"}}} - Expect(provisioner.Validate(ctx)).ToNot(Succeed()) - provisioner.Spec.Labels = map[string]string{"test": "test"} - provisioner.Spec.Requirements = Requirements{{Key: "test", Operator: v1.NodeSelectorOpIn, Values: []string{"unknown"}}} - Expect(provisioner.Validate(ctx)).ToNot(Succeed()) - }) }) }) diff --git a/pkg/apis/provisioning/v1alpha5/register.go b/pkg/apis/provisioning/v1alpha5/register.go index fdc8185a6bc7..89f0f4842a72 100644 --- a/pkg/apis/provisioning/v1alpha5/register.go +++ b/pkg/apis/provisioning/v1alpha5/register.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + 
"k8s.io/apimachinery/pkg/util/sets" "knative.dev/pkg/apis" ) @@ -45,19 +46,19 @@ var ( EmptinessTimestampAnnotationKey, v1.LabelHostname, } - // WellKnownLabels supported by karpenter and their allowable values - WellKnownLabels = map[string][]string{ - v1.LabelTopologyZone: {}, - v1.LabelInstanceTypeStable: {}, - v1.LabelArchStable: {}, - v1.LabelOSStable: {}, - } // These are either prohibited by the kubelet or reserved by karpenter RestrictedLabelDomains = []string{ "kubernetes.io", "k8s.io", "karpenter.sh", } + // WellKnownLabels supported by karpenter + WellKnownLabels = sets.NewString( + v1.LabelTopologyZone, + v1.LabelInstanceTypeStable, + v1.LabelArchStable, + v1.LabelOSStable, + ) DefaultHook = func(ctx context.Context, constraints *Constraints) {} ValidateHook = func(ctx context.Context, constraints *Constraints) *apis.FieldError { return nil } ) diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/provider_defaults.go b/pkg/cloudprovider/aws/apis/v1alpha1/provider_defaults.go index 8cba3f4dcba1..be6fc7f97a17 100644 --- a/pkg/cloudprovider/aws/apis/v1alpha1/provider_defaults.go +++ b/pkg/cloudprovider/aws/apis/v1alpha1/provider_defaults.go @@ -34,7 +34,10 @@ func (c *Constraints) Default(ctx context.Context) { } func (c *Constraints) defaultCapacityTypes() { - if functional.ContainsString(c.Consolidate().Requirements.GetLabels(), CapacityTypeLabel) { + if _, ok := c.Labels[CapacityTypeLabel]; ok { + return + } + if functional.ContainsString(c.Requirements.Keys(), CapacityTypeLabel) { return } c.Requirements = append(c.Requirements, v1.NodeSelectorRequirement{ @@ -45,7 +48,10 @@ func (c *Constraints) defaultCapacityTypes() { } func (c *Constraints) defaultArchitecture() { - if functional.ContainsString(c.Consolidate().Requirements.GetLabels(), v1.LabelArchStable) { + if _, ok := c.Labels[v1.LabelArchStable]; ok { + return + } + if functional.ContainsString(c.Requirements.Keys(), v1.LabelArchStable) { return } c.Requirements = append(c.Requirements, v1.NodeSelectorRequirement{ diff --git a/pkg/cloudprovider/aws/apis/v1alpha1/register.go b/pkg/cloudprovider/aws/apis/v1alpha1/register.go index acb985a0e081..21c55fef5c16 100644 --- a/pkg/cloudprovider/aws/apis/v1alpha1/register.go +++ b/pkg/cloudprovider/aws/apis/v1alpha1/register.go @@ -44,6 +44,6 @@ var ( func init() { Scheme.AddKnownTypes(schema.GroupVersion{Group: v1alpha5.ExtensionsGroup, Version: "v1alpha1"}, &AWS{}) v1alpha5.RestrictedLabels = append(v1alpha5.RestrictedLabels, AWSLabelPrefix) - v1alpha5.WellKnownLabels[CapacityTypeLabel] = []string{CapacityTypeSpot, CapacityTypeOnDemand} v1alpha5.RestrictedLabelDomains = append(v1alpha5.RestrictedLabelDomains, AWSRestrictedLabelDomains...) 
+ v1alpha5.WellKnownLabels.Insert(CapacityTypeLabel) } diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index af55c131e243..63e1c5e8938c 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -29,8 +29,8 @@ import ( "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" "github.com/awslabs/karpenter/pkg/utils/parallel" - "github.com/awslabs/karpenter/pkg/utils/pretty" "github.com/awslabs/karpenter/pkg/utils/project" + "go.uber.org/multierr" v1 "k8s.io/api/core/v1" "knative.dev/pkg/apis" @@ -57,6 +57,7 @@ const ( type CloudProvider struct { instanceTypeProvider *InstanceTypeProvider + subnetProvider *SubnetProvider instanceProvider *InstanceProvider creationQueue *parallel.WorkQueue } @@ -74,16 +75,17 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) *Cloud } logging.FromContext(ctx).Debugf("Using AWS region %s", *sess.Config.Region) ec2api := ec2.New(sess) - instanceTypeProvider := NewInstanceTypeProvider(ec2api) + subnetProvider := NewSubnetProvider(ec2api) + instanceTypeProvider := NewInstanceTypeProvider(ec2api, subnetProvider) return &CloudProvider{ instanceTypeProvider: instanceTypeProvider, - instanceProvider: &InstanceProvider{ec2api, instanceTypeProvider, + subnetProvider: subnetProvider, + instanceProvider: &InstanceProvider{ec2api, instanceTypeProvider, subnetProvider, NewLaunchTemplateProvider( ec2api, NewAMIProvider(ssm.New(sess), options.ClientSet), NewSecurityGroupProvider(ec2api), ), - NewSubnetProvider(ec2api), }, creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), } @@ -121,20 +123,21 @@ func (c *CloudProvider) create(ctx context.Context, constraints *v1alpha5.Constr // Partial fulfillment will be logged nodes, err := c.instanceProvider.Create(ctx, vendorConstraints, instanceTypes, quantity) if err != nil { - return fmt.Errorf("launching %d instance(s) with constraints %s and instance types %s, %w", - quantity, pretty.Concise(vendorConstraints), pretty.Concise(instanceTypes), err) + return fmt.Errorf("launching instances, %w", err) } - + var errs error for _, node := range nodes { - if cErr := callback(node); err != nil { - err = multierr.Append(err, cErr) - } + errs = multierr.Append(errs, callback(node)) } - return err + return errs } -func (c *CloudProvider) GetInstanceTypes(ctx context.Context) ([]cloudprovider.InstanceType, error) { - return c.instanceTypeProvider.Get(ctx) +func (c *CloudProvider) GetInstanceTypes(ctx context.Context, constraints *v1alpha5.Constraints) ([]cloudprovider.InstanceType, error) { + vendorConstraints, err := v1alpha1.Deserialize(constraints) + if err != nil { + return nil, apis.ErrGeneric(err.Error()) + } + return c.instanceTypeProvider.Get(ctx, vendorConstraints) } func (c *CloudProvider) Delete(ctx context.Context, node *v1.Node) error { diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index eeda6fd9b24a..5dc454c4710d 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -38,8 +38,8 @@ import ( type InstanceProvider struct { ec2api ec2iface.EC2API instanceTypeProvider *InstanceTypeProvider - launchTemplateProvider *LaunchTemplateProvider subnetProvider *SubnetProvider + launchTemplateProvider *LaunchTemplateProvider } // Create an instance given the constraints. 
@@ -61,7 +61,7 @@ func (p *InstanceProvider) Create(ctx context.Context, constraints *v1alpha1.Con ); err != nil && len(instances) == 0 { return nil, err } else if err != nil { - logging.FromContext(ctx).Errorf("retrieving node name for %d instances out of %d", quantity-len(instances), quantity) + logging.FromContext(ctx).Errorf("retrieving node name for %d/%d instances", quantity-len(instances), quantity) } nodes := []*v1.Node{} @@ -110,10 +110,10 @@ func (p *InstanceProvider) launchInstances(ctx context.Context, constraints *v1a // by constraints.Constrain(). Spot may be selected by constraining the provisioner, // or using nodeSelectors, required node affinity, or preferred node affinity. capacityType := v1alpha1.CapacityTypeOnDemand - if capacityTypes := constraints.Requirements.GetLabelValues(v1alpha1.CapacityTypeLabel); len(capacityTypes) == 0 { + if capacityTypes := constraints.Requirements.Requirement(v1alpha1.CapacityTypeLabel); len(capacityTypes) == 0 { return nil, fmt.Errorf("invariant violated, must contain at least one capacity type") } else if len(capacityTypes) == 1 { - capacityType = capacityTypes[0] + capacityType = capacityTypes.UnsortedList()[0] } // Get Launch Template Configs, which may differ due to GPU or Architecture requirements launchTemplateConfigs, err := p.getLaunchTemplateConfigs(ctx, constraints, instanceTypes, capacityType) @@ -152,10 +152,8 @@ func (p *InstanceProvider) getLaunchTemplateConfigs(ctx context.Context, constra if err != nil { return nil, fmt.Errorf("getting subnets, %w", err) } - - additionalLabels := map[string]string{v1alpha1.CapacityTypeLabel: capacityType} var launchTemplateConfigs []*ec2.FleetLaunchTemplateConfigRequest - launchTemplates, err := p.launchTemplateProvider.Get(ctx, constraints, instanceTypes, additionalLabels) + launchTemplates, err := p.launchTemplateProvider.Get(ctx, constraints, instanceTypes, map[string]string{v1alpha1.CapacityTypeLabel: capacityType}) if err != nil { return nil, fmt.Errorf("getting launch templates, %w", err) } @@ -174,7 +172,7 @@ func (p *InstanceProvider) getLaunchTemplateConfigs(ctx context.Context, constra func (p *InstanceProvider) getOverrides(instanceTypeOptions []cloudprovider.InstanceType, subnets []*ec2.Subnet, capacityType string) []*ec2.FleetLaunchTemplateOverridesRequest { var overrides []*ec2.FleetLaunchTemplateOverridesRequest for i, instanceType := range instanceTypeOptions { - for _, zone := range instanceType.Zones() { + for zone := range instanceType.Zones() { for _, subnet := range subnets { if aws.StringValue(subnet.AvailabilityZone) == zone { override := &ec2.FleetLaunchTemplateOverridesRequest{ @@ -227,6 +225,8 @@ func (p *InstanceProvider) instanceToNode(instance *ec2.Instance, instanceTypes ObjectMeta: metav1.ObjectMeta{ Name: aws.StringValue(instance.PrivateDnsName), Labels: map[string]string{ + v1.LabelTopologyZone: aws.StringValue(instance.Placement.AvailabilityZone), + v1.LabelInstanceTypeStable: aws.StringValue(instance.InstanceType), v1alpha1.CapacityTypeLabel: getCapacityType(instance), }, }, @@ -240,7 +240,7 @@ func (p *InstanceProvider) instanceToNode(instance *ec2.Instance, instanceTypes v1.ResourceMemory: *instanceType.Memory(), }, NodeInfo: v1.NodeSystemInfo{ - Architecture: aws.StringValue(instance.Architecture), + Architecture: v1alpha1.AWSToKubeArchitectures[aws.StringValue(instance.Architecture)], OSImage: aws.StringValue(instance.ImageId), OperatingSystem: v1alpha5.OperatingSystemLinux, }, @@ -264,7 +264,7 @@ func combineFleetErrors(errors 
[]*ec2.CreateFleetError) (errs error) { for _, err := range errors { unique.Insert(fmt.Sprintf("%s: %s", aws.StringValue(err.ErrorCode), aws.StringValue(err.ErrorMessage))) } - for _, errorCode := range unique.List() { + for errorCode := range unique { errs = multierr.Append(errs, fmt.Errorf(errorCode)) } return fmt.Errorf("with fleet error(s), %w", errs) diff --git a/pkg/cloudprovider/aws/instancetype.go b/pkg/cloudprovider/aws/instancetype.go index b193853fe377..a351499aacb6 100644 --- a/pkg/cloudprovider/aws/instancetype.go +++ b/pkg/cloudprovider/aws/instancetype.go @@ -24,6 +24,7 @@ import ( "github.com/awslabs/karpenter/pkg/utils/resources" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" ) // EC2VMAvailableMemoryFactor assumes the EC2 VM will consume <7.25% of the memory of a given machine @@ -31,19 +32,19 @@ const EC2VMAvailableMemoryFactor = .925 type InstanceType struct { ec2.InstanceTypeInfo - ZoneOptions []string + ZoneOptions sets.String } func (i *InstanceType) Name() string { return aws.StringValue(i.InstanceType) } -func (i *InstanceType) Zones() []string { +func (i *InstanceType) Zones() sets.String { return i.ZoneOptions } -func (i *InstanceType) CapacityTypes() []string { - return aws.StringValueSlice(i.SupportedUsageClasses) +func (i *InstanceType) CapacityTypes() sets.String { + return sets.NewString(aws.StringValueSlice(i.SupportedUsageClasses)...) } func (i *InstanceType) Architecture() string { @@ -55,8 +56,8 @@ func (i *InstanceType) Architecture() string { return fmt.Sprint(aws.StringValueSlice(i.ProcessorInfo.SupportedArchitectures)) // Unrecognized, but used for error printing } -func (i *InstanceType) OperatingSystems() []string { - return []string{v1alpha5.OperatingSystemLinux} +func (i *InstanceType) OperatingSystems() sets.String { + return sets.NewString(v1alpha5.OperatingSystemLinux) } func (i *InstanceType) CPU() *resource.Quantity { diff --git a/pkg/cloudprovider/aws/instancetypes.go b/pkg/cloudprovider/aws/instancetypes.go index d3f8a8c87bd2..4fbd84e8857f 100644 --- a/pkg/cloudprovider/aws/instancetypes.go +++ b/pkg/cloudprovider/aws/instancetypes.go @@ -22,78 +22,91 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/awslabs/karpenter/pkg/cloudprovider" + "github.com/awslabs/karpenter/pkg/cloudprovider/aws/apis/v1alpha1" "github.com/awslabs/karpenter/pkg/utils/functional" "github.com/patrickmn/go-cache" + "k8s.io/apimachinery/pkg/util/sets" "knative.dev/pkg/logging" ) const ( - allInstanceTypesKey = "all" + instanceTypesCacheKey = "types" + instanceTypeZonesCacheKey = "zones" ) type InstanceTypeProvider struct { - ec2api ec2iface.EC2API - cache *cache.Cache + ec2api ec2iface.EC2API + subnetProvider *SubnetProvider + cache *cache.Cache } -func NewInstanceTypeProvider(ec2api ec2iface.EC2API) *InstanceTypeProvider { +func NewInstanceTypeProvider(ec2api ec2iface.EC2API, subnetProvider *SubnetProvider) *InstanceTypeProvider { return &InstanceTypeProvider{ - ec2api: ec2api, - cache: cache.New(CacheTTL, CacheCleanupInterval), + ec2api: ec2api, + subnetProvider: subnetProvider, + cache: cache.New(CacheTTL, CacheCleanupInterval), } } -// Get all instance types that are available per availability zone -func (p *InstanceTypeProvider) Get(ctx context.Context) ([]cloudprovider.InstanceType, error) { - var instanceTypes []cloudprovider.InstanceType - if cached, ok := p.cache.Get(allInstanceTypesKey); 
ok { - instanceTypes = cached.([]cloudprovider.InstanceType) - } else { - var err error - instanceTypes, err = p.get(ctx) - if err != nil { - return nil, err - } - p.cache.SetDefault(allInstanceTypesKey, instanceTypes) - logging.FromContext(ctx).Debugf("Discovered %d EC2 instance types", len(instanceTypes)) - } - return instanceTypes, nil -} - -func (p *InstanceTypeProvider) get(ctx context.Context) ([]cloudprovider.InstanceType, error) { - // 1. Get InstanceTypes from EC2 +// Get instance type options given the constraints +func (p *InstanceTypeProvider) Get(ctx context.Context, constraints *v1alpha1.Constraints) ([]cloudprovider.InstanceType, error) { + // Get InstanceTypes from EC2 instanceTypes, err := p.getInstanceTypes(ctx) if err != nil { return nil, fmt.Errorf("retrieving all instance types, %w", err) } - - err = p.ec2api.DescribeInstanceTypeOfferingsPagesWithContext(ctx, &ec2.DescribeInstanceTypeOfferingsInput{ - LocationType: aws.String("availability-zone"), - }, func(output *ec2.DescribeInstanceTypeOfferingsOutput, lastPage bool) bool { - for _, offering := range output.InstanceTypeOfferings { - for _, instanceType := range instanceTypes { - if instanceType.Name() == aws.StringValue(offering.InstanceType) { - instanceType.ZoneOptions = append(instanceType.ZoneOptions, aws.StringValue(offering.Location)) - } - } - } - return true - }) + // Get Viable AZs from subnets + subnets, err := p.subnetProvider.Get(ctx, constraints) if err != nil { - return nil, fmt.Errorf("describing instance type zone offerings, %w", err) + return nil, fmt.Errorf("getting subnets, %w", err) } - - // convert to cloudprovider.InstanceType + subnetZones := sets.NewString() + for _, subnet := range subnets { + subnetZones.Insert(aws.StringValue(subnet.AvailabilityZone)) + } + // Get Viable EC2 Purchase offerings + instanceTypeZones, err := p.getInstanceTypeZones(ctx) + if err != nil { + return nil, err + } + // Convert to cloudprovider.InstanceType result := []cloudprovider.InstanceType{} for _, instanceType := range instanceTypes { + instanceType.ZoneOptions = subnetZones.Intersection(instanceTypeZones[instanceType.Name()]) result = append(result, instanceType) } return result, nil } +func (p *InstanceTypeProvider) getInstanceTypeZones(ctx context.Context) (result map[string]sets.String, err error) { + if cached, ok := p.cache.Get(instanceTypeZonesCacheKey); ok { + return cached.(map[string]sets.String), nil + } + defer func() { p.cache.SetDefault(instanceTypeZonesCacheKey, result) }() + zones := map[string]sets.String{} + if err := p.ec2api.DescribeInstanceTypeOfferingsPagesWithContext(ctx, &ec2.DescribeInstanceTypeOfferingsInput{LocationType: aws.String("availability-zone")}, + func(output *ec2.DescribeInstanceTypeOfferingsOutput, lastPage bool) bool { + for _, offering := range output.InstanceTypeOfferings { + if _, ok := zones[aws.StringValue(offering.InstanceType)]; !ok { + zones[aws.StringValue(offering.InstanceType)] = sets.NewString() + } + zones[aws.StringValue(offering.InstanceType)].Insert(aws.StringValue(offering.Location)) + } + return true + }); err != nil { + return nil, fmt.Errorf("describing instance type zone offerings, %w", err) + } + logging.FromContext(ctx).Debugf("Discovered EC2 instance types zonal offerings") + return zones, nil +} + // getInstanceTypes retrieves all instance types from the ec2 DescribeInstanceTypes API using some opinionated filters -func (p *InstanceTypeProvider) getInstanceTypes(ctx context.Context) ([]*InstanceType, error) { - instanceTypes := []*InstanceType{} 
+func (p *InstanceTypeProvider) getInstanceTypes(ctx context.Context) (result map[string]*InstanceType, err error) { + if cached, ok := p.cache.Get(instanceTypesCacheKey); ok { + return cached.(map[string]*InstanceType), nil + } + defer func() { p.cache.SetDefault(instanceTypesCacheKey, result) }() + instanceTypes := map[string]*InstanceType{} if err := p.ec2api.DescribeInstanceTypesPagesWithContext(ctx, &ec2.DescribeInstanceTypesInput{ Filters: []*ec2.Filter{ { @@ -104,13 +117,14 @@ func (p *InstanceTypeProvider) getInstanceTypes(ctx context.Context) ([]*Instanc }, func(page *ec2.DescribeInstanceTypesOutput, lastPage bool) bool { for _, instanceType := range page.InstanceTypes { if p.filter(instanceType) { - instanceTypes = append(instanceTypes, &InstanceType{InstanceTypeInfo: *instanceType}) + instanceTypes[aws.StringValue(instanceType.InstanceType)] = &InstanceType{InstanceTypeInfo: *instanceType} } } return true }); err != nil { return nil, fmt.Errorf("fetching instance types using ec2.DescribeInstanceTypes, %w", err) } + logging.FromContext(ctx).Debugf("Discovered %d EC2 instance types", len(instanceTypes)) return instanceTypes, nil } diff --git a/pkg/cloudprovider/aws/subnets.go b/pkg/cloudprovider/aws/subnets.go index 6bfee0e6779c..495d2c31787a 100644 --- a/pkg/cloudprovider/aws/subnets.go +++ b/pkg/cloudprovider/aws/subnets.go @@ -56,10 +56,10 @@ func (s *SubnetProvider) Get(ctx context.Context, constraints *v1alpha1.Constrai func (s *SubnetProvider) getFilters(constraints *v1alpha1.Constraints) []*ec2.Filter { filters := []*ec2.Filter{} // Filter by zone - if zones := constraints.Zones(); zones != nil { + if zones := constraints.Requirements.Zones(); zones != nil { filters = append(filters, &ec2.Filter{ Name: aws.String("availability-zone"), - Values: aws.StringSlice(zones), + Values: aws.StringSlice(zones.UnsortedList()), }) } // Filter by selector @@ -92,14 +92,14 @@ func (s *SubnetProvider) getSubnets(ctx context.Context, filters []*ec2.Filter) return nil, fmt.Errorf("describing subnets %+v, %w", filters, err) } s.cache.Set(fmt.Sprint(hash), output.Subnets, CacheTTL) - logging.FromContext(ctx).Debugf("Discovered subnets: %s", s.subnetIds(output.Subnets)) + logging.FromContext(ctx).Debugf("Discovered subnets: %s", s.prettySubnets(output.Subnets)) return output.Subnets, nil } -func (s *SubnetProvider) subnetIds(subnets []*ec2.Subnet) []string { +func (s *SubnetProvider) prettySubnets(subnets []*ec2.Subnet) []string { names := []string{} for _, subnet := range subnets { - names = append(names, aws.StringValue(subnet.SubnetId)) + names = append(names, fmt.Sprintf("%s (%s)", aws.StringValue(subnet.SubnetId), aws.StringValue(subnet.AvailabilityZone))) } return names } diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index 240c68bfa177..aea350363a0a 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -63,30 +63,36 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { launchTemplateCache = cache.New(CacheTTL, CacheCleanupInterval) fakeEC2API = &fake.EC2API{} - instanceTypeProvider := NewInstanceTypeProvider(fakeEC2API) + subnetProvider := NewSubnetProvider(fakeEC2API) + instanceTypeProvider := NewInstanceTypeProvider(fakeEC2API, subnetProvider) env = test.NewEnvironment(ctx, func(e *test.Environment) { clientSet := kubernetes.NewForConfigOrDie(e.Config) cloudProvider := &CloudProvider{ + subnetProvider: subnetProvider, instanceTypeProvider: instanceTypeProvider, - instanceProvider: 
&InstanceProvider{fakeEC2API, instanceTypeProvider, &LaunchTemplateProvider{ - fakeEC2API, - NewAMIProvider(&fake.SSMAPI{}, clientSet), - NewSecurityGroupProvider(fakeEC2API), - launchTemplateCache, - }, - NewSubnetProvider(fakeEC2API), + instanceProvider: &InstanceProvider{ + fakeEC2API, instanceTypeProvider, subnetProvider, &LaunchTemplateProvider{ + fakeEC2API, + NewAMIProvider(&fake.SSMAPI{}, clientSet), + NewSecurityGroupProvider(fakeEC2API), + launchTemplateCache, + }, }, creationQueue: parallel.NewWorkQueue(CreationQPS, CreationBurst), } registry.RegisterOrDie(ctx, cloudProvider) controller = &allocation.Controller{ - Filter: &allocation.Filter{KubeClient: e.Client}, - Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: clientSet.CoreV1()}, - Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond), - Scheduler: scheduling.NewScheduler(e.Client), - Packer: binpacking.NewPacker(), - CloudProvider: cloudProvider, + Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond), + Filter: &allocation.Filter{KubeClient: e.Client}, + Scheduler: scheduling.NewScheduler(e.Client, cloudProvider), + Launcher: &allocation.Launcher{ + Packer: &binpacking.Packer{}, + KubeClient: e.Client, + CoreV1Client: clientSet.CoreV1(), + CloudProvider: cloudProvider, + }, KubeClient: e.Client, + CloudProvider: cloudProvider, } }) @@ -350,8 +356,8 @@ var _ = Describe("Allocation", func() { }) It("should default requirements", func() { provisioner.SetDefaults(ctx) - Expect(provisioner.Spec.Requirements.GetLabelValues(v1alpha1.CapacityTypeLabel)).To(ConsistOf(v1alpha1.CapacityTypeOnDemand)) - Expect(provisioner.Spec.Requirements.GetLabelValues(v1.LabelArchStable)).To(ConsistOf(v1alpha5.ArchitectureAmd64)) + Expect(provisioner.Spec.Requirements.Requirement(v1alpha1.CapacityTypeLabel).UnsortedList()).To(ConsistOf(v1alpha1.CapacityTypeOnDemand)) + Expect(provisioner.Spec.Requirements.Architectures().UnsortedList()).To(ConsistOf(v1alpha5.ArchitectureAmd64)) }) }) Context("Validation", func() { diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index 9ed700dab21a..96b4d905ad7d 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -22,12 +22,12 @@ import ( "github.com/Pallinder/go-randomdata" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider" - "github.com/awslabs/karpenter/pkg/utils/functional" "knative.dev/pkg/apis" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" ) type CloudProvider struct{} @@ -36,14 +36,9 @@ func (c *CloudProvider) Create(_ context.Context, constraints *v1alpha5.Constrai err := make(chan error) for i := 0; i < quantity; i++ { name := strings.ToLower(randomdata.SillyName()) - // Pick first instance type option instance := instanceTypes[0] - // Pick first zone - zones := instance.Zones() - if len(constraints.Zones()) != 0 { - zones = functional.IntersectStringSlice(constraints.Zones(), instance.Zones()) - } - zone := zones[0] + zone := instance.Zones().Intersection(constraints.Requirements.Zones()).UnsortedList()[0] + operatingSystem := instance.OperatingSystems().UnsortedList()[0] go func() { err <- bind(&v1.Node{ @@ -60,7 +55,7 @@ func (c *CloudProvider) Create(_ context.Context, constraints *v1alpha5.Constrai Status: v1.NodeStatus{ NodeInfo: v1.NodeSystemInfo{ Architecture: 
instance.Architecture(), - OperatingSystem: instance.OperatingSystems()[0], + OperatingSystem: operatingSystem, }, Allocatable: v1.ResourceList{ v1.ResourcePods: *instance.Pods(), @@ -74,7 +69,7 @@ func (c *CloudProvider) Create(_ context.Context, constraints *v1alpha5.Constrai return err } -func (c *CloudProvider) GetInstanceTypes(_ context.Context) ([]cloudprovider.InstanceType, error) { +func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Constraints) ([]cloudprovider.InstanceType, error) { return []cloudprovider.InstanceType{ NewInstanceType(InstanceTypeOptions{ name: "default-instance-type", @@ -93,7 +88,7 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context) ([]cloudprovider.Ins }), NewInstanceType(InstanceTypeOptions{ name: "windows-instance-type", - operatingSystems: []string{"windows"}, + operatingSystems: sets.NewString("windows"), }), NewInstanceType(InstanceTypeOptions{ name: "arm-instance-type", diff --git a/pkg/cloudprovider/fake/instancetype.go b/pkg/cloudprovider/fake/instancetype.go index 049e43c2fecd..61d360dd4fc8 100644 --- a/pkg/cloudprovider/fake/instancetype.go +++ b/pkg/cloudprovider/fake/instancetype.go @@ -17,20 +17,21 @@ package fake import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" ) func NewInstanceType(options InstanceTypeOptions) *InstanceType { if len(options.zones) == 0 { - options.zones = []string{"test-zone-1", "test-zone-2", "test-zone-3"} + options.zones = sets.NewString("test-zone-1", "test-zone-2", "test-zone-3") } if len(options.capacityTypes) == 0 { - options.capacityTypes = []string{"spot", "on-demand"} + options.capacityTypes = sets.NewString("spot", "on-demand") } if len(options.architecture) == 0 { options.architecture = "amd64" } if len(options.operatingSystems) == 0 { - options.operatingSystems = []string{"linux"} + options.operatingSystems = sets.NewString("linux") } if options.cpu.IsZero() { options.cpu = resource.MustParse("4") @@ -60,10 +61,10 @@ func NewInstanceType(options InstanceTypeOptions) *InstanceType { type InstanceTypeOptions struct { name string - zones []string - capacityTypes []string + zones sets.String + capacityTypes sets.String architecture string - operatingSystems []string + operatingSystems sets.String cpu resource.Quantity memory resource.Quantity pods resource.Quantity @@ -80,11 +81,11 @@ func (i *InstanceType) Name() string { return i.name } -func (i *InstanceType) CapacityTypes() []string { +func (i *InstanceType) CapacityTypes() sets.String { return i.capacityTypes } -func (i *InstanceType) Zones() []string { +func (i *InstanceType) Zones() sets.String { return i.zones } @@ -92,7 +93,7 @@ func (i *InstanceType) Architecture() string { return i.architecture } -func (i *InstanceType) OperatingSystems() []string { +func (i *InstanceType) OperatingSystems() sets.String { return i.operatingSystems } diff --git a/pkg/cloudprovider/registry/register.go b/pkg/cloudprovider/registry/register.go index e72901a4f452..81dbac1cf0c4 100644 --- a/pkg/cloudprovider/registry/register.go +++ b/pkg/cloudprovider/registry/register.go @@ -16,11 +16,9 @@ package registry import ( "context" - "fmt" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider" - v1 "k8s.io/api/core/v1" ) func NewCloudProvider(ctx context.Context, options cloudprovider.Options) cloudprovider.CloudProvider { @@ -34,34 +32,6 @@ func NewCloudProvider(ctx context.Context, options cloudprovider.Options) cloudp 
// once at startup time. Typically, this call is made by NewCloudProvider(), but // must be called if the cloud provider is constructed manually (e.g. tests). func RegisterOrDie(ctx context.Context, cloudProvider cloudprovider.CloudProvider) { - zones := map[string]bool{} - architectures := map[string]bool{} - operatingSystems := map[string]bool{} - - instanceTypes, err := cloudProvider.GetInstanceTypes(ctx) - if err != nil { - panic(fmt.Sprintf("Failed to retrieve instance types, %s", err.Error())) - } - for _, instanceType := range instanceTypes { - v1alpha5.WellKnownLabels[v1.LabelInstanceTypeStable] = append(v1alpha5.WellKnownLabels[v1.LabelInstanceTypeStable], instanceType.Name()) - architectures[instanceType.Architecture()] = true - for _, zone := range instanceType.Zones() { - zones[zone] = true - } - for _, operatingSystem := range instanceType.OperatingSystems() { - operatingSystems[operatingSystem] = true - } - } - for zone := range zones { - v1alpha5.WellKnownLabels[v1.LabelTopologyZone] = append(v1alpha5.WellKnownLabels[v1.LabelTopologyZone], zone) - } - for architecture := range architectures { - v1alpha5.WellKnownLabels[v1.LabelArchStable] = append(v1alpha5.WellKnownLabels[v1.LabelArchStable], architecture) - } - for operatingSystem := range operatingSystems { - v1alpha5.WellKnownLabels[v1.LabelOSStable] = append(v1alpha5.WellKnownLabels[v1.LabelOSStable], operatingSystem) - } - v1alpha5.ValidateHook = cloudProvider.Validate v1alpha5.DefaultHook = cloudProvider.Default } diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index 397e3d45fa7d..afb6f64e413c 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -20,6 +20,7 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" "knative.dev/pkg/apis" ) @@ -34,9 +35,9 @@ type CloudProvider interface { Create(context.Context, *v1alpha5.Constraints, []InstanceType, int, func(*v1.Node) error) chan error // Delete node in cloudprovider Delete(context.Context, *v1.Node) error - // GetInstanceTypes returns the instance types supported by the cloud - // provider limited by the provided constraints and daemons. - GetInstanceTypes(context.Context) ([]InstanceType, error) + // GetInstanceTypes returns instance types supported by the cloudprovider. + // Availability of types or zone may vary by provisioner or over time. + GetInstanceTypes(context.Context, *v1alpha5.Constraints) ([]InstanceType, error) // Default is a hook for additional defaulting logic at webhook time. Default(context.Context, *v1alpha5.Constraints) // Validate is a hook for additional validation logic at webhook time. 
@@ -52,10 +53,10 @@ type Options struct { // or supported options in the case of arrays) type InstanceType interface { Name() string - Zones() []string - CapacityTypes() []string + Zones() sets.String + CapacityTypes() sets.String Architecture() string - OperatingSystems() []string + OperatingSystems() sets.String CPU() *resource.Quantity Memory() *resource.Quantity Pods() *resource.Quantity diff --git a/pkg/controllers/allocation/binpacking/packable.go b/pkg/controllers/allocation/binpacking/packable.go index f578863d84c7..6826b45f664f 100644 --- a/pkg/controllers/allocation/binpacking/packable.go +++ b/pkg/controllers/allocation/binpacking/packable.go @@ -20,7 +20,6 @@ import ( "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" - "github.com/awslabs/karpenter/pkg/utils/functional" "github.com/awslabs/karpenter/pkg/utils/resources" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" @@ -152,29 +151,29 @@ func (p *Packable) reservePod(pod *v1.Pod) bool { } func (p *Packable) validateInstanceType(schedule *scheduling.Schedule) error { - if instanceTypes := schedule.InstanceTypes(); !functional.ContainsString(instanceTypes, p.Name()) { - return fmt.Errorf("instance type %s is not in %v", p.Name(), instanceTypes) + if !schedule.Requirements.InstanceTypes().Has(p.Name()) { + return fmt.Errorf("instance type %s is not in %v", p.Name(), schedule.Requirements.InstanceTypes().List()) } return nil } func (p *Packable) validateArchitecture(schedule *scheduling.Schedule) error { - if architectures := schedule.Architectures(); !functional.ContainsString(architectures, p.Architecture()) { - return fmt.Errorf("architecture %s is not in %v", p.Architecture(), architectures) + if !schedule.Requirements.Architectures().Has(p.Architecture()) { + return fmt.Errorf("architecture %s is not in %v", p.Architecture(), schedule.Requirements.Architectures().List()) } return nil } func (p *Packable) validateOperatingSystem(schedule *scheduling.Schedule) error { - if operatingSystems := schedule.OperatingSystems(); len(functional.IntersectStringSlice(p.OperatingSystems(), operatingSystems)) == 0 { - return fmt.Errorf("operating system %s is not in %v", operatingSystems, p.OperatingSystems()) + if schedule.Requirements.OperatingSystems().Intersection(p.OperatingSystems()).Len() == 0 { + return fmt.Errorf("operating system %s is not in %v", p.OperatingSystems(), schedule.Requirements.OperatingSystems().List()) } return nil } func (p *Packable) validateZones(schedule *scheduling.Schedule) error { - if zones := schedule.Zones(); len(functional.IntersectStringSlice(zones, p.Zones())) == 0 { - return fmt.Errorf("zones %v are not in %v", zones, p.Zones()) + if schedule.Requirements.Zones().Intersection(p.Zones()).Len() == 0 { + return fmt.Errorf("zones %v are not in %v", p.Zones(), schedule.Requirements.Zones().List()) } return nil } diff --git a/pkg/controllers/allocation/binpacking/packer.go b/pkg/controllers/allocation/binpacking/packer.go index 7d4245322e26..9af9d091b3fe 100644 --- a/pkg/controllers/allocation/binpacking/packer.go +++ b/pkg/controllers/allocation/binpacking/packer.go @@ -18,7 +18,6 @@ import ( "context" "math" "sort" - "time" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" "github.com/awslabs/karpenter/pkg/cloudprovider" @@ -39,7 +38,7 @@ var ( // MaxInstanceTypes defines the number of instance type options to return to the cloud provider MaxInstanceTypes = 20 - packTimeHistogram = 
prometheus.NewHistogram( + packDuration = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: metrics.KarpenterNamespace, Subsystem: "allocation_controller", @@ -51,19 +50,11 @@ var ( ) func init() { - crmetrics.Registry.MustRegister(packTimeHistogram) + crmetrics.Registry.MustRegister(packDuration) } -type packer struct{} - -// Packer helps pack the pods and calculates efficient placement on the instances. -type Packer interface { - Pack(context.Context, *scheduling.Schedule, []cloudprovider.InstanceType) []*Packing -} - -// NewPacker returns a Packer implementation -func NewPacker() Packer { - return &packer{} +// Packer packs pods and calculates efficient placement on the instances. +type Packer struct { } // Packing is a binpacking solution of equivalently schedulable pods to a set of @@ -82,11 +73,8 @@ type Packing struct { // Pods provided are all schedulable in the same zone as tightly as possible. // It follows the First Fit Decreasing bin packing technique, reference- // https://en.wikipedia.org/wiki/Bin_packing_problem#First_Fit_Decreasing_(FFD) -func (p *packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instances []cloudprovider.InstanceType) []*Packing { - startTime := time.Now() - defer func() { - packTimeHistogram.Observe(time.Since(startTime).Seconds()) - }() +func (p *Packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instances []cloudprovider.InstanceType) []*Packing { + defer metrics.Measure(packDuration)() // Sort pods in decreasing order by the amount of CPU requested, if // CPU requested is equal compare memory requested. @@ -109,7 +97,7 @@ func (p *packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instan if mainPack, ok := packs[key]; ok { mainPack.NodeQuantity++ mainPack.Pods = append(mainPack.Pods, packing.Pods...) - logging.FromContext(ctx).Infof("Incremented node count to %d on packing for %d pod(s) with instance type option(s) %v", mainPack.NodeQuantity, flattenedLen(packing.Pods...), instanceTypeNames(mainPack.InstanceTypeOptions)) + logging.FromContext(ctx).Debugf("Incremented node count to %d on packing for %d pod(s) with instance type option(s) %v", mainPack.NodeQuantity, flattenedLen(packing.Pods...), instanceTypeNames(mainPack.InstanceTypeOptions)) continue } else { packs[key] = packing @@ -124,7 +112,7 @@ func (p *packer) Pack(ctx context.Context, schedule *scheduling.Schedule, instan // packWithLargestPod will try to pack max number of pods with largest pod in // pods across all available node capacities. 
It returns Packing: max pod count // that fit; with their node capacities and list of leftover pods -func (p *packer) packWithLargestPod(constraints *v1alpha5.Constraints, unpackedPods []*v1.Pod, packables []*Packable) (*Packing, []*v1.Pod) { +func (p *Packer) packWithLargestPod(constraints *v1alpha5.Constraints, unpackedPods []*v1.Pod, packables []*Packable) (*Packing, []*v1.Pod) { bestPackedPods := []*v1.Pod{} bestInstances := []cloudprovider.InstanceType{} remainingPods := unpackedPods @@ -155,7 +143,7 @@ func (p *packer) packWithLargestPod(constraints *v1alpha5.Constraints, unpackedP return &Packing{Pods: [][]*v1.Pod{bestPackedPods}, Constraints: constraints, InstanceTypeOptions: bestInstances, NodeQuantity: 1}, remainingPods } -func (*packer) podsMatch(first, second []*v1.Pod) bool { +func (*Packer) podsMatch(first, second []*v1.Pod) bool { if len(first) != len(second) { return false } diff --git a/pkg/controllers/allocation/controller.go b/pkg/controllers/allocation/controller.go index e2c99f92a77c..0b446f73a438 100644 --- a/pkg/controllers/allocation/controller.go +++ b/pkg/controllers/allocation/controller.go @@ -19,12 +19,10 @@ import ( "fmt" "time" - "go.uber.org/multierr" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/util/workqueue" "knative.dev/pkg/logging" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -41,7 +39,6 @@ import ( "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking" "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" - "github.com/awslabs/karpenter/pkg/utils/functional" ) const ( @@ -53,23 +50,26 @@ const ( type Controller struct { Batcher *Batcher Filter *Filter - Binder *Binder Scheduler *scheduling.Scheduler - Packer binpacking.Packer - CloudProvider cloudprovider.CloudProvider + Launcher *Launcher KubeClient client.Client + CloudProvider cloudprovider.CloudProvider } // NewController constructs a controller instance func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Controller { return &Controller{ - Filter: &Filter{KubeClient: kubeClient}, - Binder: &Binder{KubeClient: kubeClient, CoreV1Client: coreV1Client}, - Batcher: NewBatcher(maxBatchWindow, batchIdleTimeout), - Scheduler: scheduling.NewScheduler(kubeClient), - Packer: binpacking.NewPacker(), - CloudProvider: cloudProvider, + Batcher: NewBatcher(maxBatchWindow, batchIdleTimeout), + Filter: &Filter{KubeClient: kubeClient}, + Scheduler: scheduling.NewScheduler(kubeClient, cloudProvider), + Launcher: &Launcher{ + Packer: &binpacking.Packer{}, + CloudProvider: cloudProvider, + KubeClient: kubeClient, + CoreV1Client: coreV1Client, + }, KubeClient: kubeClient, + CloudProvider: cloudProvider, } } @@ -90,7 +90,6 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco // Wait on a pod batch logging.FromContext(ctx).Infof("Waiting to batch additional pods") c.Batcher.Wait(provisioner) - // Filter pods pods, err := c.Filter.GetProvisionablePods(ctx, provisioner) if err != nil { @@ -101,40 +100,21 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco logging.FromContext(ctx).Infof("Watching for pod events") return reconcile.Result{}, nil } - // Group by constraints - schedules, err := c.Scheduler.Solve(ctx, 
provisioner, pods) - if err != nil { - return reconcile.Result{}, fmt.Errorf("solving scheduling constraints, %w", err) - } // Get Instance Types Options - instanceTypes, err := c.CloudProvider.GetInstanceTypes(ctx) + instanceTypes, err := c.CloudProvider.GetInstanceTypes(ctx, &provisioner.Spec.Constraints) if err != nil { return reconcile.Result{}, fmt.Errorf("getting instance types, %w", err) } - // Create capacity - errs := make([]error, len(schedules)) - workqueue.ParallelizeUntil(ctx, len(schedules), len(schedules), func(index int) { - for _, packing := range c.Packer.Pack(ctx, schedules[index], instanceTypes) { - // Create thread safe channel to pop off packed pod slices - packedPods := make(chan []*v1.Pod, len(packing.Pods)) - for _, pods := range packing.Pods { - packedPods <- pods - } - close(packedPods) - if err := <-c.CloudProvider.Create(ctx, packing.Constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error { - node.Labels = functional.UnionStringMaps( - node.Labels, - packing.Constraints.Labels, - map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, - ) - node.Spec.Taints = append(node.Spec.Taints, packing.Constraints.Taints...) - return c.Binder.Bind(ctx, node, <-packedPods) - }); err != nil { - errs[index] = multierr.Append(errs[index], err) - } - } - }) - return reconcile.Result{Requeue: true}, multierr.Combine(errs...) + // Separate pods by scheduling constraints + schedules, err := c.Scheduler.Solve(ctx, provisioner, instanceTypes, pods) + if err != nil { + return reconcile.Result{}, fmt.Errorf("solving scheduling constraints, %w", err) + } + // Launch capacity and bind pods + if err := c.Launcher.Launch(ctx, schedules, instanceTypes); err != nil { + return reconcile.Result{}, err + } + return reconcile.Result{Requeue: true}, nil } func (c *Controller) Register(ctx context.Context, m manager.Manager) error { diff --git a/pkg/controllers/allocation/bind.go b/pkg/controllers/allocation/launcher.go similarity index 54% rename from pkg/controllers/allocation/bind.go rename to pkg/controllers/allocation/launcher.go index 952dcec0fdf2..623f2b1ddb29 100644 --- a/pkg/controllers/allocation/bind.go +++ b/pkg/controllers/allocation/launcher.go @@ -17,10 +17,13 @@ package allocation import ( "context" "fmt" - "time" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" + "github.com/awslabs/karpenter/pkg/cloudprovider" + "github.com/awslabs/karpenter/pkg/controllers/allocation/binpacking" + "github.com/awslabs/karpenter/pkg/controllers/allocation/scheduling" "github.com/awslabs/karpenter/pkg/metrics" + "github.com/awslabs/karpenter/pkg/utils/functional" "github.com/prometheus/client_golang/prometheus" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" @@ -33,54 +36,40 @@ import ( crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" ) -var bindTimeHistogramVec = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: metrics.KarpenterNamespace, - Subsystem: "allocation_controller", - Name: "bind_duration_seconds", - Help: "Duration of bind process in seconds. 
Broken down by result.", - Buckets: metrics.DurationBuckets(), - }, - []string{metrics.ResultLabel}, -) - -func init() { - crmetrics.Registry.MustRegister(bindTimeHistogramVec) +type Launcher struct { + Packer *binpacking.Packer + KubeClient client.Client + CoreV1Client corev1.CoreV1Interface + CloudProvider cloudprovider.CloudProvider } -type Binder struct { - KubeClient client.Client - CoreV1Client corev1.CoreV1Interface +func (l *Launcher) Launch(ctx context.Context, schedules []*scheduling.Schedule, instanceTypes []cloudprovider.InstanceType) error { + // Pack and bind pods + errs := make([]error, len(schedules)) + workqueue.ParallelizeUntil(ctx, len(schedules), len(schedules), func(index int) { + for _, packing := range l.Packer.Pack(ctx, schedules[index], instanceTypes) { + // Create thread safe channel to pop off packed pod slices + packedPods := make(chan []*v1.Pod, len(packing.Pods)) + for _, pods := range packing.Pods { + packedPods <- pods + } + close(packedPods) + if err := <-l.CloudProvider.Create(ctx, packing.Constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error { + node.Labels = functional.UnionStringMaps(node.Labels, packing.Constraints.Labels) + node.Spec.Taints = append(node.Spec.Taints, packing.Constraints.Taints...) + return l.bind(ctx, node, <-packedPods) + }); err != nil { + errs[index] = multierr.Append(errs[index], err) + } + } + }) + return multierr.Combine(errs...) } -func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error { - startTime := time.Now() - bindErr := b.bind(ctx, node, pods) - durationSeconds := time.Since(startTime).Seconds() - - result := "success" - if bindErr != nil { - result = "error" - } - - labels := prometheus.Labels{metrics.ResultLabel: result} - observer, promErr := bindTimeHistogramVec.GetMetricWith(labels) - if promErr != nil { - logging.FromContext(ctx).Warnf( - "Failed to record bind duration metric [labels=%s, duration=%f]: error=%s", - labels, - durationSeconds, - promErr.Error(), - ) - } else { - observer.Observe(durationSeconds) - } - - return bindErr -} +func (l *Launcher) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error) { + defer metrics.Measure(bindTimeHistogram)() -func (b *Binder) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error { - // 1. Add the Karpenter finalizer to the node to enable the termination workflow + // Add the Karpenter finalizer to the node to enable the termination workflow node.Finalizers = append(node.Finalizers, v1alpha5.TerminationFinalizer) // 2. Taint karpenter.sh/not-ready=NoSchedule to prevent the kube scheduler // from scheduling pods before we're able to bind them ourselves. The kube @@ -95,34 +84,35 @@ func (b *Binder) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error Key: v1alpha5.NotReadyTaintKey, Effect: v1.TaintEffectNoSchedule, }) - // 3. Idempotently create a node. In rare cases, nodes can come online and + // Idempotently create a node. In rare cases, nodes can come online and // self register before the controller is able to register a node object // with the API server. In the common case, we create the node object // ourselves to enforce the binding decision and enable images to be pulled // before the node is fully Ready. 
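
An aside on the node create call that follows: the controller creates the Node object itself and tolerates AlreadyExists, so a kubelet that self-registers before the controller is harmless. A minimal standalone sketch of that pattern, assuming a plain client-go clientset (the package, function, and variable names here are illustrative, not part of the patch):

package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// createNodeIdempotently creates the Node object and treats AlreadyExists as
// success, since the kubelet may have self-registered before the controller.
func createNodeIdempotently(ctx context.Context, client kubernetes.Interface, node *v1.Node) error {
	if _, err := client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
		if errors.IsAlreadyExists(err) {
			return nil // the kubelet won the race; treat as success
		}
		return fmt.Errorf("creating node %s, %w", node.Name, err)
	}
	return nil
}
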
- if _, err := b.CoreV1Client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { + if _, err := l.CoreV1Client.Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { if !errors.IsAlreadyExists(err) { return fmt.Errorf("creating node %s, %w", node.Name, err) } } - - // 4. Bind pods + // Bind pods errs := make([]error, len(pods)) - workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(index int) { - errs[index] = b.bindPod(ctx, node, pods[index]) + workqueue.ParallelizeUntil(ctx, len(pods), len(pods), func(i int) { + l.CoreV1Client.Pods(pods[i].Namespace).Bind(ctx, &v1.Binding{TypeMeta: pods[i].TypeMeta, ObjectMeta: pods[i].ObjectMeta, Target: v1.ObjectReference{Name: node.Name}}, metav1.CreateOptions{}) }) - err := multierr.Combine(errs...) - logging.FromContext(ctx).Infof("Bound %d pod(s) to node %s", len(pods)-len(multierr.Errors(err)), node.Name) - return err + logging.FromContext(ctx).Infof("Bound %d pod(s) to node %s", len(pods)-len(multierr.Errors(multierr.Combine(errs...))), node.Name) + return multierr.Combine(errs...) } -func (b *Binder) bindPod(ctx context.Context, node *v1.Node, pod *v1.Pod) error { - if err := b.CoreV1Client.Pods(pod.Namespace).Bind(ctx, &v1.Binding{ - TypeMeta: pod.TypeMeta, - ObjectMeta: pod.ObjectMeta, - Target: v1.ObjectReference{Name: node.Name}, - }, metav1.CreateOptions{}); err != nil { - return fmt.Errorf("binding pod, %w", err) - } - return nil +var bindTimeHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: metrics.KarpenterNamespace, + Subsystem: "allocation_controller", + Name: "bind_duration_seconds", + Help: "Duration of bind process in seconds. Broken down by result.", + Buckets: metrics.DurationBuckets(), + }, +) + +func init() { + crmetrics.Registry.MustRegister(bindTimeHistogram) } diff --git a/pkg/controllers/allocation/scheduling/constraints.go b/pkg/controllers/allocation/scheduling/constraints.go index 4c9af9ab483f..10bf4c5c73b3 100644 --- a/pkg/controllers/allocation/scheduling/constraints.go +++ b/pkg/controllers/allocation/scheduling/constraints.go @@ -24,8 +24,7 @@ import ( v1 "k8s.io/api/core/v1" ) -// NewConstraints overrides the constraints with pod scheduling constraints -func NewConstraints(ctx context.Context, constraints *v1alpha5.Constraints, pod *v1.Pod) (*v1alpha5.Constraints, error) { +func NewConstraints(ctx context.Context, constraints *v1alpha5.Constraints, requirements v1alpha5.Requirements, pod *v1.Pod) (*v1alpha5.Constraints, error) { // Validate that the pod is viable if err := multierr.Combine( validateAffinity(pod), @@ -34,57 +33,18 @@ func NewConstraints(ctx context.Context, constraints *v1alpha5.Constraints, pod ); err != nil { return nil, err } - requirements := constraints.Requirements.With(pod) + requirements = requirements.WithPod(pod).Consolidate() if err := requirements.Validate(); err != nil { return nil, err } return &v1alpha5.Constraints{ - Requirements: requirements, - Labels: generateLabels(requirements), - Taints: generateTaints(constraints.Taints, pod.Spec.Tolerations), + Labels: requirements.CustomLabels(), + Requirements: requirements.WellKnown(), + Taints: Taints(constraints.Taints).WithPod(pod), Provider: constraints.Provider, }, nil } -func generateTaints(taints []v1.Taint, tolerations []v1.Toleration) []v1.Taint { - for _, toleration := range tolerations { - // Only OpEqual is supported. 
OpExists does not make sense for - // provisioning -- in theory we could create a taint on the node with a - // random string, but it's unclear use case this would accomplish. - if toleration.Operator != v1.TolerationOpEqual { - continue - } - var generated []v1.Taint - // Use effect if defined, otherwise taint all effects - if toleration.Effect != "" { - generated = []v1.Taint{{Key: toleration.Key, Value: toleration.Value, Effect: toleration.Effect}} - } else { - generated = []v1.Taint{ - {Key: toleration.Key, Value: toleration.Value, Effect: v1.TaintEffectNoSchedule}, - {Key: toleration.Key, Value: toleration.Value, Effect: v1.TaintEffectNoExecute}, - } - } - // Only add taints that do not already exist on constraints - for _, taint := range generated { - if !Taints(taints).Has(taint) { - taints = append(taints, taint) - } - } - } - return taints -} - -func generateLabels(requirements v1alpha5.Requirements) map[string]string { - labels := map[string]string{} - for _, label := range requirements.GetLabels() { - // Only include labels that aren't well known. Well known labels will be populated by the kubelet - if _, ok := v1alpha5.WellKnownLabels[label]; !ok { - labels[label] = requirements.GetLabelValues(label)[0] - } - } - return labels -} - func validateTopology(pod *v1.Pod) (errs error) { for _, constraint := range pod.Spec.TopologySpreadConstraints { if supported := []string{v1.LabelHostname, v1.LabelTopologyZone}; !functional.ContainsString(supported, constraint.TopologyKey) { diff --git a/pkg/controllers/allocation/scheduling/scheduler.go b/pkg/controllers/allocation/scheduling/scheduler.go index aaf67a46f079..5863d113fe29 100644 --- a/pkg/controllers/allocation/scheduling/scheduler.go +++ b/pkg/controllers/allocation/scheduling/scheduler.go @@ -17,39 +17,41 @@ package scheduling import ( "context" "fmt" - "time" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5" + "github.com/awslabs/karpenter/pkg/cloudprovider" "github.com/awslabs/karpenter/pkg/metrics" "github.com/mitchellh/hashstructure/v2" "github.com/prometheus/client_golang/prometheus" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" ) -var scheduleTimeHistogramVec = prometheus.NewHistogramVec( +var schedulingDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: metrics.KarpenterNamespace, Subsystem: "allocation_controller", Name: "scheduling_duration_seconds", - Help: "Duration of scheduling process in seconds. Broken down by provisioner and result.", + Help: "Duration of scheduling process in seconds. 
Broken down by provisioner and error.", Buckets: metrics.DurationBuckets(), }, - []string{metrics.ProvisionerLabel, metrics.ResultLabel}, + []string{metrics.ProvisionerLabel}, ) func init() { - crmetrics.Registry.MustRegister(scheduleTimeHistogramVec) + crmetrics.Registry.MustRegister(schedulingDuration) } type Scheduler struct { - KubeClient client.Client - Topology *Topology - Preferences *Preferences + CloudProvider cloudprovider.CloudProvider + KubeClient client.Client + Topology *Topology + Preferences *Preferences } type Schedule struct { @@ -60,61 +62,31 @@ type Schedule struct { Daemons []*v1.Pod } -func NewScheduler(kubeClient client.Client) *Scheduler { +func NewScheduler(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Scheduler { return &Scheduler{ - KubeClient: kubeClient, - Topology: &Topology{ - kubeClient: kubeClient, - }, - Preferences: NewPreferences(), + CloudProvider: cloudProvider, + KubeClient: kubeClient, + Topology: &Topology{kubeClient: kubeClient}, + Preferences: NewPreferences(), } } -func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha5.Provisioner, pods []*v1.Pod) ([]*Schedule, error) { - startTime := time.Now() - schedules, scheduleErr := s.solve(ctx, &provisioner.Spec.Constraints, pods) - durationSeconds := time.Since(startTime).Seconds() +func (s *Scheduler) Solve(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypes []cloudprovider.InstanceType, pods []*v1.Pod) (schedules []*Schedule, err error) { + defer metrics.Measure(schedulingDuration.WithLabelValues(provisioner.Name))() - result := "success" - if scheduleErr != nil { - result = "error" - } - - newLabels := prometheus.Labels{ - metrics.ProvisionerLabel: provisioner.ObjectMeta.Name, - metrics.ResultLabel: result, - } - observer, promErr := scheduleTimeHistogramVec.GetMetricWith(newLabels) - if promErr != nil { - logging.FromContext(ctx).Warnf( - "Failed to record scheduling duration metric [labels=%s, duration=%f]: error=%s", - newLabels, - durationSeconds, - promErr.Error(), - ) - } else { - observer.Observe(durationSeconds) - } - - return schedules, scheduleErr -} + requirements := globalRequirements(instanceTypes).WithProvisioner(*provisioner).Consolidate() -func (s *Scheduler) solve(ctx context.Context, constraints *v1alpha5.Constraints, pods []*v1.Pod) ([]*Schedule, error) { - // Consolidate requirements in memory before executing scheduling logic. - // This is a performance optimization to avoid executing requirement - // evaluation logic redundantly for every pod. - constraints = constraints.Consolidate() // Relax preferences if pods have previously failed to schedule. s.Preferences.Relax(ctx, pods) // Inject temporarily adds specific NodeSelectors to pods, which are then // used by scheduling logic. This isn't strictly necessary, but is a useful // trick to avoid passing topology decisions through the scheduling code. It // lets us to treat TopologySpreadConstraints as just-in-time NodeSelectors. - if err := s.Topology.Inject(ctx, constraints.Requirements, pods); err != nil { + if err := s.Topology.Inject(ctx, requirements, pods); err != nil { return nil, fmt.Errorf("injecting topology, %w", err) } // Separate pods into schedules of isomorphic scheduling constraints. 
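
For readers unfamiliar with how "isomorphic scheduling constraints" are detected in getSchedules below: each pod's derived constraints are reduced to a structural hash, and pods whose constraints hash to the same value share one Schedule. A rough sketch of that bucketing; the helper and its callback are hypothetical, only the hashstructure call mirrors the patch:

package example

import (
	"github.com/mitchellh/hashstructure/v2"
	v1 "k8s.io/api/core/v1"
)

// groupByConstraintHash buckets pods by the structural hash of their derived
// constraints; pods that land in the same bucket can share one Schedule.
func groupByConstraintHash(pods []*v1.Pod, constraintsFor func(*v1.Pod) interface{}) (map[uint64][]*v1.Pod, error) {
	groups := map[uint64][]*v1.Pod{}
	for _, pod := range pods {
		// SlicesAsSets ignores the ordering of slice-valued fields, so
		// logically equal constraints hash to the same key.
		key, err := hashstructure.Hash(constraintsFor(pod), hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
		if err != nil {
			return nil, err
		}
		groups[key] = append(groups[key], pod)
	}
	return groups, nil
}
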
- schedules, err := s.getSchedules(ctx, constraints, pods) + schedules, err = s.getSchedules(ctx, &provisioner.Spec.Constraints, requirements, pods) if err != nil { return nil, fmt.Errorf("getting schedules, %w", err) } @@ -125,16 +97,35 @@ func (s *Scheduler) solve(ctx context.Context, constraints *v1alpha5.Constraints return schedules, nil } +func globalRequirements(instanceTypes []cloudprovider.InstanceType) (requirements v1alpha5.Requirements) { + supported := map[string]sets.String{ + v1.LabelInstanceTypeStable: sets.NewString(), + v1.LabelTopologyZone: sets.NewString(), + v1.LabelArchStable: sets.NewString(), + v1.LabelOSStable: sets.NewString(), + } + for _, instanceType := range instanceTypes { + supported[v1.LabelInstanceTypeStable].Insert(instanceType.Name()) + supported[v1.LabelTopologyZone].Insert(instanceType.Zones().UnsortedList()...) + supported[v1.LabelArchStable].Insert(instanceType.Architecture()) + supported[v1.LabelOSStable].Insert(instanceType.OperatingSystems().UnsortedList()...) + } + for key, values := range supported { + requirements = append(requirements, v1.NodeSelectorRequirement{Key: key, Operator: v1.NodeSelectorOpIn, Values: values.UnsortedList()}) + } + return requirements +} + // getSchedules separates pods into a set of schedules. All pods in each group // contain isomorphic scheduling constraints and can be deployed together on the // same node, or multiple similar nodes if the pods exceed one node's capacity. -func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Constraints, pods []*v1.Pod) ([]*Schedule, error) { +func (s *Scheduler) getSchedules(ctx context.Context, constraints *v1alpha5.Constraints, requirements v1alpha5.Requirements, pods []*v1.Pod) ([]*Schedule, error) { // schedule uniqueness is tracked by hash(Constraints) schedules := map[uint64]*Schedule{} for _, pod := range pods { - constraints, err := NewConstraints(ctx, constraints, pod) + constraints, err := NewConstraints(ctx, constraints, requirements, pod) if err != nil { - logging.FromContext(ctx).Debugf("Ignored pod %s/%s due to invalid constraints, %s", pod.Name, pod.Namespace, err.Error()) + logging.FromContext(ctx).Infof("Ignored pod %s/%s, %s", pod.Name, pod.Namespace, err.Error()) continue } key, err := hashstructure.Hash(constraints, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true}) diff --git a/pkg/controllers/allocation/scheduling/suite_test.go b/pkg/controllers/allocation/scheduling/suite_test.go index 6682c1705fbb..b2b1a1cc045a 100644 --- a/pkg/controllers/allocation/scheduling/suite_test.go +++ b/pkg/controllers/allocation/scheduling/suite_test.go @@ -54,13 +54,17 @@ var _ = BeforeSuite(func() { cloudProvider := &fake.CloudProvider{} registry.RegisterOrDie(ctx, cloudProvider) controller = &allocation.Controller{ - Filter: &allocation.Filter{KubeClient: e.Client}, - Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: corev1.NewForConfigOrDie(e.Config)}, - Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond), - Scheduler: scheduling.NewScheduler(e.Client), - Packer: binpacking.NewPacker(), - CloudProvider: cloudProvider, + Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond), + Filter: &allocation.Filter{KubeClient: e.Client}, + Scheduler: scheduling.NewScheduler(e.Client, cloudProvider), + Launcher: &allocation.Launcher{ + Packer: &binpacking.Packer{}, + KubeClient: e.Client, + CoreV1Client: corev1.NewForConfigOrDie(e.Config), + CloudProvider: cloudProvider, + }, KubeClient: e.Client, + 
CloudProvider: cloudProvider, } }) Expect(env.Start()).To(Succeed(), "Failed to start environment") @@ -643,6 +647,67 @@ var _ = Describe("Topology", func() { ExpectSkew(env.Client, v1.LabelHostname).ToNot(ContainElements(BeNumerically(">", 3))) }) }) + + // https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/#interaction-with-node-affinity-and-node-selectors + Context("Combined Zonal Topology and Affinity", func() { + It("should limit spread options by nodeSelector", func() { + ExpectCreated(env.Client, provisioner) + topology := []v1.TopologySpreadConstraint{{ + TopologyKey: v1.LabelTopologyZone, + WhenUnsatisfiable: v1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MaxSkew: 1, + }} + ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + append( + MakePods(5, test.PodOptions{ + Labels: labels, + TopologySpreadConstraints: topology, + NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-1"}, + }), + MakePods(5, test.PodOptions{ + Labels: labels, + TopologySpreadConstraints: topology, + NodeSelector: map[string]string{v1.LabelTopologyZone: "test-zone-2"}, + })..., + )..., + ) + ExpectSkew(env.Client, v1.LabelTopologyZone).To(ConsistOf(5, 5)) + }) + It("should limit spread options by node affinity", func() { + ExpectCreated(env.Client, provisioner) + topology := []v1.TopologySpreadConstraint{{ + TopologyKey: v1.LabelTopologyZone, + WhenUnsatisfiable: v1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{MatchLabels: labels}, + MaxSkew: 1, + }} + ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, append( + MakePods(6, test.PodOptions{ + Labels: labels, + TopologySpreadConstraints: topology, + NodeRequirements: []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpIn, Values: []string{ + "test-zone-1", "test-zone-2", + }}}, + }), + MakePods(1, test.PodOptions{ + Labels: labels, + TopologySpreadConstraints: topology, + NodeRequirements: []v1.NodeSelectorRequirement{{Key: v1.LabelTopologyZone, Operator: v1.NodeSelectorOpNotIn, Values: []string{ + "test-zone-2", "test-zone-3", + }}}, + })..., + )...) + ExpectSkew(env.Client, v1.LabelTopologyZone).To(ConsistOf(4, 3)) + ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, + MakePods(5, test.PodOptions{ + Labels: labels, + TopologySpreadConstraints: topology, + })..., + ) + ExpectSkew(env.Client, v1.LabelTopologyZone).To(ConsistOf(4, 4, 4)) + }) + }) }) var _ = Describe("Taints", func() { diff --git a/pkg/controllers/allocation/scheduling/taints.go b/pkg/controllers/allocation/scheduling/taints.go index 0c7885448736..77ed35c40860 100644 --- a/pkg/controllers/allocation/scheduling/taints.go +++ b/pkg/controllers/allocation/scheduling/taints.go @@ -23,6 +23,34 @@ import ( type Taints []v1.Taint +func (ts Taints) WithPod(pod *v1.Pod) Taints { + for _, toleration := range pod.Spec.Tolerations { + // Only OpEqual is supported. OpExists does not make sense for + // provisioning -- in theory we could create a taint on the node with a + // random string, but it's unclear what use case this would accomplish. 
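
The loop below expands each OpEqual toleration into taints: the toleration's explicit effect when one is set, otherwise both NoSchedule and NoExecute, so a toleration for, say, dedicated=gpu with no effect produces two taints. A hypothetical helper capturing just that fan-out rule (not part of the patch):

package example

import v1 "k8s.io/api/core/v1"

// effectsFor returns the taint effects a toleration expands to: the explicit
// effect when one is set, otherwise both NoSchedule and NoExecute.
func effectsFor(toleration v1.Toleration) []v1.TaintEffect {
	if toleration.Effect != "" {
		return []v1.TaintEffect{toleration.Effect}
	}
	return []v1.TaintEffect{v1.TaintEffectNoSchedule, v1.TaintEffectNoExecute}
}
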
+ if toleration.Operator != v1.TolerationOpEqual { + continue + } + var generated []v1.Taint + // Use effect if defined, otherwise taint all effects + if toleration.Effect != "" { + generated = []v1.Taint{{Key: toleration.Key, Value: toleration.Value, Effect: toleration.Effect}} + } else { + generated = []v1.Taint{ + {Key: toleration.Key, Value: toleration.Value, Effect: v1.TaintEffectNoSchedule}, + {Key: toleration.Key, Value: toleration.Value, Effect: v1.TaintEffectNoExecute}, + } + } + // Only add taints that do not already exist on constraints + for _, taint := range generated { + if !ts.Has(taint) { + ts = append(ts, taint) + } + } + } + return ts +} + // Has returns true if taints has a taint for the given key func (ts Taints) Has(taint v1.Taint) bool { for _, t := range ts { diff --git a/pkg/controllers/allocation/scheduling/topology.go b/pkg/controllers/allocation/scheduling/topology.go index 5bf128ab5be5..8a6df17e0aac 100644 --- a/pkg/controllers/allocation/scheduling/topology.go +++ b/pkg/controllers/allocation/scheduling/topology.go @@ -44,10 +44,8 @@ func (t *Topology) Inject(ctx context.Context, requirements v1alpha5.Requirement return fmt.Errorf("computing topology, %w", err) } for _, pod := range topologyGroup.Pods { - pod.Spec.NodeSelector = functional.UnionStringMaps( - pod.Spec.NodeSelector, - map[string]string{topologyGroup.Constraint.TopologyKey: topologyGroup.NextDomain()}, - ) + domain := topologyGroup.NextDomain(requirements.WithPod(pod).Requirement(topologyGroup.Constraint.TopologyKey)) + pod.Spec.NodeSelector = functional.UnionStringMaps(pod.Spec.NodeSelector, map[string]string{topologyGroup.Constraint.TopologyKey: domain}) } } return nil @@ -106,7 +104,7 @@ func (t *Topology) computeHostnameTopology(topologyGroup *TopologyGroup) error { // selection. For example, if a cloud provider or provisioner changes the viable // set of nodes, topology calculations will rebalance the new set of zones. func (t *Topology) computeZonalTopology(ctx context.Context, requirements v1alpha5.Requirements, topologyGroup *TopologyGroup) error { - topologyGroup.Register(requirements.GetLabelValues(v1.LabelTopologyZone)...) + topologyGroup.Register(requirements.Zones().UnsortedList()...) 
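
The zone choice itself happens in the NextDomain change further down: among the registered domains that a pod's requirements still allow, pick the one with the lowest current count. A self-contained sketch of that selection, with made-up zone names and counts:

package main

import (
	"fmt"
	"math"

	"k8s.io/apimachinery/pkg/util/sets"
)

// nextDomain mirrors TopologyGroup.NextDomain: among registered domains that
// the pod's requirements still allow, choose the one with the smallest count.
func nextDomain(spread map[string]int, allowed sets.String) string {
	minDomain, minCount := "", math.MaxInt32
	for domain, count := range spread {
		if allowed != nil && !allowed.Has(domain) {
			continue
		}
		if count <= minCount {
			minDomain, minCount = domain, count
		}
	}
	return minDomain
}

func main() {
	spread := map[string]int{"test-zone-1": 3, "test-zone-2": 1, "test-zone-3": 2}
	// test-zone-2 is excluded by the requirement, so test-zone-3 wins.
	fmt.Println(nextDomain(spread, sets.NewString("test-zone-1", "test-zone-3")))
}
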
if err := t.countMatchingPods(ctx, topologyGroup); err != nil { return fmt.Errorf("getting matching pods, %w", err) } @@ -142,7 +140,7 @@ func topologyGroupKey(namespace string, constraint v1.TopologySpreadConstraint) hash, err := hashstructure.Hash(struct { Namespace string Constraint v1.TopologySpreadConstraint - }{Namespace: namespace, Constraint: constraint}, hashstructure.FormatV2, nil) + }{namespace, constraint}, hashstructure.FormatV2, nil) if err != nil { panic(fmt.Errorf("unexpected failure hashing topology, %w", err)) } diff --git a/pkg/controllers/allocation/scheduling/topologygroup.go b/pkg/controllers/allocation/scheduling/topologygroup.go index 3d81f295abec..3ab31ad4441a 100644 --- a/pkg/controllers/allocation/scheduling/topologygroup.go +++ b/pkg/controllers/allocation/scheduling/topologygroup.go @@ -18,6 +18,7 @@ import ( "math" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" ) func NewTopologyGroup(pod *v1.Pod, constraint v1.TopologySpreadConstraint) *TopologyGroup { @@ -44,17 +45,19 @@ func (t *TopologyGroup) Register(domains ...string) { // Increment increments the spread of a registered domain func (t *TopologyGroup) Increment(domain string) { - _, ok := t.spread[domain] - if ok { + if _, ok := t.spread[domain]; ok { t.spread[domain]++ } } -// NextDomain chooses a domain that minimizes skew and increments its count -func (t *TopologyGroup) NextDomain() string { +// NextDomain chooses a domain within the constraints that minimizes skew +func (t *TopologyGroup) NextDomain(requirement sets.String) string { minDomain := "" minCount := math.MaxInt32 for domain, count := range t.spread { + if requirement != nil && !requirement.Has(domain) { + continue + } if count <= minCount { minDomain = domain minCount = count diff --git a/pkg/controllers/allocation/suite_test.go b/pkg/controllers/allocation/suite_test.go index 6271450f3cfa..f723cde93f48 100644 --- a/pkg/controllers/allocation/suite_test.go +++ b/pkg/controllers/allocation/suite_test.go @@ -56,13 +56,17 @@ var _ = BeforeSuite(func() { cloudProvider := &fake.CloudProvider{} registry.RegisterOrDie(ctx, cloudProvider) controller = &allocation.Controller{ - Filter: &allocation.Filter{KubeClient: e.Client}, - Binder: &allocation.Binder{KubeClient: e.Client, CoreV1Client: corev1.NewForConfigOrDie(e.Config)}, - Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond), - Scheduler: scheduling.NewScheduler(e.Client), - Packer: binpacking.NewPacker(), CloudProvider: cloudProvider, - KubeClient: e.Client, + Batcher: allocation.NewBatcher(1*time.Millisecond, 1*time.Millisecond), + Filter: &allocation.Filter{KubeClient: e.Client}, + Scheduler: scheduling.NewScheduler(e.Client, cloudProvider), + Launcher: &allocation.Launcher{ + Packer: &binpacking.Packer{}, + KubeClient: e.Client, + CoreV1Client: corev1.NewForConfigOrDie(e.Config), + CloudProvider: cloudProvider, + }, + KubeClient: e.Client, } }) Expect(env.Start()).To(Succeed(), "Failed to start environment") @@ -207,19 +211,16 @@ var _ = Describe("Allocation", func() { }) Context("Labels", func() { - It("should label nodes with provisioner labels", func() { + It("should label nodes", func() { provisioner.Spec.Labels = map[string]string{"test-key": "test-value", "test-key-2": "test-value-2"} ExpectCreated(env.Client, provisioner) pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) Expect(node.Labels).To(HaveKeyWithValue("test-key", "test-value")) 
Expect(node.Labels).To(HaveKeyWithValue("test-key-2", "test-value-2")) - }) - It("should labels nodes with provisioner name", func() { - ExpectCreated(env.Client, provisioner) - pods := ExpectProvisioningSucceeded(ctx, env.Client, controller, provisioner, test.UnschedulablePod()) - node := ExpectNodeExists(env.Client, pods[0].Spec.NodeName) Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.ProvisionerNameLabelKey, provisioner.Name)) + Expect(node.Labels).To(HaveKey(v1.LabelTopologyZone)) + Expect(node.Labels).To(HaveKey(v1.LabelInstanceTypeStable)) }) }) Context("Taints", func() { diff --git a/pkg/controllers/metrics/node/counter.go b/pkg/controllers/metrics/node/counter.go index a41a15ce9897..5e779745adad 100644 --- a/pkg/controllers/metrics/node/counter.go +++ b/pkg/controllers/metrics/node/counter.go @@ -52,7 +52,7 @@ type ( var ( nodeLabelProvisioner = v1alpha5.ProvisionerNameLabelKey - knownValuesForNodeLabels = v1alpha5.WellKnownLabels + knownValuesForNodeLabels = map[string][]string{} nodeCountByProvisioner = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -205,7 +205,7 @@ func filterReadyNodes(consume nodeListConsumerFunc) nodeListConsumerFunc { } } -func metricLabelsFrom(nodeLabels client.MatchingLabels) prometheus.Labels { +func metricLabelsFrom(nodeLabels map[string]string) prometheus.Labels { metricLabels := prometheus.Labels{} // Exclude node label values that not present or are empty strings. if arch := nodeLabels[nodeLabelArch]; arch != "" { diff --git a/pkg/metrics/constants.go b/pkg/metrics/constants.go index d53880dce25f..82267cbfb1de 100644 --- a/pkg/metrics/constants.go +++ b/pkg/metrics/constants.go @@ -14,11 +14,17 @@ limitations under the License. package metrics +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + const ( // KarpenterNamespace is the common namespace for application metrics KarpenterNamespace = "karpenter" - ResultLabel = "result" + ErrorLabel = "error" ProvisionerLabel = "provisioner" ) @@ -30,3 +36,10 @@ func DurationBuckets() []float64 { return []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60} } + +// Measure returns a deferrable function that observes the duration between the +// defer statement and the end of the function. +func Measure(observer prometheus.Observer) func() { + start := time.Now() + return func() { observer.Observe(time.Since(start).Seconds()) } +}
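
Finally, a short usage note on the new metrics.Measure helper: the outer call runs when the defer statement executes and captures the start time, and the closure it returns is what defer invokes when the surrounding function exits. A sketch with an illustrative histogram and function, not one of the patch's metrics:

package example

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// measure mirrors metrics.Measure: record the start time now, observe the
// elapsed seconds when the returned closure is invoked.
func measure(observer prometheus.Observer) func() {
	start := time.Now()
	return func() { observer.Observe(time.Since(start).Seconds()) }
}

var exampleHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name: "example_duration_seconds",
	Help: "Duration of an example operation in seconds.",
})

func doWork() {
	// Note the trailing (): measure runs immediately to capture the start
	// time; defer invokes only the closure it returns.
	defer measure(exampleHistogram)()
	time.Sleep(10 * time.Millisecond)
}
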