Skip to content

Commit

Permalink
Factor minValues requirements checking into the InstanceTypes methods
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed May 16, 2024
1 parent f26918e commit 4c2ce1d
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 145 deletions.
77 changes: 77 additions & 0 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/scheduling"
Expand Down Expand Up @@ -123,6 +124,82 @@ func (its InstanceTypes) Compatible(requirements scheduling.Requirements) Instan
return filteredInstanceTypes
}

// SatisfiesMinValues validates whether the InstanceTypes satisfy the minValues requirements.
// It walks the instance types in order, accumulating the distinct values seen for every requirement key
// that carries a minValues, and stops at the first prefix of the list that satisfies all of them.
//
// It returns the size of that prefix (the minimum number of needed instance types) and a nil error on success.
// If even the full list cannot satisfy every minValues requirement, it returns len(its) and an error naming
// one unsatisfied requirement key. When several keys are unsatisfied, the lexicographically smallest key is
// reported so that the error message is stable across calls.
//
// The returned minNeededInstanceTypes value is dependent on the ordering of instance types, so relying on this
// value in a deterministic way implies that the instance types are sorted ahead of using this method.
//
// NOTE: an empty InstanceTypes list never enters the loop and therefore returns (0, nil), even when the
// requirements contain minValues. Callers that need to reject an empty list must check its length themselves.
//
// For example:
// Requirements:
//   - key: node.kubernetes.io/instance-type
//     operator: In
//     values: ["c4.large","c4.xlarge","c5.large","c5.xlarge","m4.large","m4.xlarge"]
//     minValues: 3
//   - key: karpenter.kwok.sh/instance-family
//     operator: In
//     values: ["c4","c5","m4"]
//     minValues: 3
//
// InstanceTypes: ["c4.large","c5.xlarge","m4.xlarge"], it PASSES the requirements
//
//	we get the map as : {
//		node.kubernetes.io/instance-type:  ["c4.large","c5.xlarge","m4.xlarge"],
//		karpenter.kwok.sh/instance-family: ["c4","c5","m4"]
//	}
//	so, it returns 3 and a nil error.
//
// And if InstanceTypes: ["c4.large","c4.xlarge","c5.xlarge"], it FAILS the requirements
//
//	we get the map as : {
//		node.kubernetes.io/instance-type:  ["c4.large","c4.xlarge","c5.xlarge"],
//		karpenter.kwok.sh/instance-family: ["c4","c5"] // minimum requirement failed for this.
//	}
//	so, it returns 3 and an error naming "karpenter.kwok.sh/instance-family".
func (its InstanceTypes) SatisfiesMinValues(requirements scheduling.Requirements) (minNeededInstanceTypes int, err error) {
	// Distinct values observed so far, per requirement key that has minValues set.
	valuesForKey := map[string]sets.Set[string]{}
	// Tracks an unsatisfied key for the prefix examined in the most recent iteration; empty means "all satisfied".
	var incompatibleKey string
	for i, it := range its {
		for _, req := range requirements {
			if req.MinValues == nil {
				continue
			}
			if _, ok := valuesForKey[req.Key]; !ok {
				valuesForKey[req.Key] = sets.New[string]()
			}
			// sets.Set is map-backed, so Insert mutates the stored set in place.
			valuesForKey[req.Key].Insert(it.Requirements.Get(req.Key).Values()...)
		}
		// Re-evaluate every tracked key against the prefix its[:i+1].
		// Map iteration order is random, so pick the smallest failing key to keep the reported key deterministic.
		incompatibleKey = ""
		for k, v := range valuesForKey {
			if len(v) >= lo.FromPtr(requirements.Get(k).MinValues) {
				continue
			}
			if incompatibleKey == "" || k < incompatibleKey {
				incompatibleKey = k
			}
		}
		if incompatibleKey == "" {
			// The first i+1 instance types are enough to honor every minValues requirement.
			return i + 1, nil
		}
	}
	if incompatibleKey != "" {
		return len(its), fmt.Errorf("minValues requirement is not met for %q", incompatibleKey)
	}
	// Only reachable when its is empty: nothing was examined, so nothing was found to be violated.
	return len(its), nil
}

// Truncate orders the InstanceTypes by price for the passed-in requirements and keeps at most maxItems of them.
// When the requirements carry minValues, the truncated list is re-validated; if the cheapest maxItems
// instance types can no longer satisfy minValues, the original (untruncated) InstanceTypes are returned
// together with a wrapped error.
func (its InstanceTypes) Truncate(requirements scheduling.Requirements, maxItems int) (InstanceTypes, error) {
	byPrice := its.OrderByPrice(requirements)
	cheapest := InstanceTypes(lo.Slice(byPrice, 0, maxItems))
	// Without minValues there is nothing that truncation could violate.
	if !requirements.HasMinValues() {
		return cheapest, nil
	}
	_, err := cheapest.SatisfiesMinValues(requirements)
	if err != nil {
		return its, fmt.Errorf("validating minValues, %w", err)
	}
	return cheapest, nil
}

type InstanceTypeOverhead struct {
// KubeReserved returns the default resources allocated to kubernetes system daemons by default
KubeReserved v1.ResourceList
Expand Down
49 changes: 23 additions & 26 deletions pkg/controllers/disruption/consolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,21 +174,20 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
return c.computeSpotToSpotConsolidation(ctx, candidates, results, candidatePrice)
}

var incompatibleMinReqKey string
// filterByPriceWithMinValues returns the instanceTypes that are lower priced than the current candidate and the requirement for the NodeClaim that does not meet minValues.
// filterByPrice returns the instanceTypes that are lower priced than the current candidate and any error that indicates the input couldn't be filtered.
// If we use this directly for spot-to-spot consolidation, we are bound to get repeated consolidations because the strategy that chooses to launch the spot instance from the list does
// it based on availability and price which could result in selection/launch of non-lowest priced instance in the list. So, we would keep repeating this loop till we get to lowest priced instance
// causing churns and landing onto lower available spot instance ultimately resulting in higher interruptions.
results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions, incompatibleMinReqKey, _ =
filterByPriceWithMinValues(results.NewNodeClaims[0].InstanceTypeOptions, results.NewNodeClaims[0].Requirements, candidatePrice)

results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions, err = filterByPrice(results.NewNodeClaims[0].InstanceTypeOptions, results.NewNodeClaims[0].Requirements, candidatePrice)
if err != nil {
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Filtering by price: %v", err))...)
}
return Command{}, pscheduling.Results{}, nil
}
if len(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions) == 0 {
if len(candidates) == 1 {
if len(incompatibleMinReqKey) > 0 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("minValues requirement is not met for %s", incompatibleMinReqKey))...)
} else {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...)
}
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...)
}
return Command{}, pscheduling.Results{}, nil
}
Expand Down Expand Up @@ -227,24 +226,21 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
// Since we are sure that the replacement nodeclaim considered for the spot candidates are spot, we will enforce it through the requirements.
results.NewNodeClaims[0].Requirements.Add(scheduling.NewRequirement(v1beta1.CapacityTypeLabelKey, v1.NodeSelectorOpIn, v1beta1.CapacityTypeSpot))
// All possible replacements for the current candidate compatible with spot offerings
instanceTypeOptionsWithSpotOfferings :=
results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions.Compatible(results.NewNodeClaims[0].Requirements)

var incompatibleMinReqKey string
var numInstanceTypes int
// Possible replacements that are lower priced than the current candidate and the requirement that is not compatible with minValues
results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions, incompatibleMinReqKey, numInstanceTypes =
filterByPriceWithMinValues(instanceTypeOptionsWithSpotOfferings, results.NewNodeClaims[0].Requirements, candidatePrice)
instanceTypeOptionsWithSpotOfferings := results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions.Compatible(results.NewNodeClaims[0].Requirements)

// filterByPrice returns the instanceTypes that are lower priced than the current candidate and any error that indicates the input couldn't be filtered.
var err error
results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions, err = filterByPrice(instanceTypeOptionsWithSpotOfferings, results.NewNodeClaims[0].Requirements, candidatePrice)
if err != nil {
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Filtering by price: %v", err))...)
}
return Command{}, pscheduling.Results{}, nil
}
if len(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions) == 0 {
if len(candidates) == 1 {
if len(incompatibleMinReqKey) > 0 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("minValues requirement is not met for %s", incompatibleMinReqKey))...)
} else {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace spot node with a cheaper spot node")...)
}
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...)
}
// no instance types remain after filtering by price
return Command{}, pscheduling.Results{}, nil
}

Expand Down Expand Up @@ -278,8 +274,9 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
// If we had restricted instance types to min flexibility at launch at step (1) i.e CreateInstanceFromTypes(A,B,C), we would have received the instance type part of the list preventing immediate consolidation.
// Taking this to 15 types, we need to only send the 15 cheapest types in the CreateInstanceFromTypes call so that the resulting instance is always in that set of 15 and we won’t immediately consolidate.
if results.NewNodeClaims[0].Requirements.HasMinValues() {
// Here we are trying to get the max of the minimum instances required to satify the minimum requirement and the default 15 to cap the instances for spot-to-spot consolidation.
results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions = lo.Slice(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions, 0, lo.Max([]int{MinInstanceTypesForSpotToSpotConsolidation, numInstanceTypes}))
// Here we are trying to get the max of the minimum instances required to satisfy the minimum requirement and the default 15 to cap the instances for spot-to-spot consolidation.
minInstanceTypes, _ := results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions.SatisfiesMinValues(results.NewNodeClaims[0].Requirements)
results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions = lo.Slice(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions, 0, lo.Max([]int{MinInstanceTypesForSpotToSpotConsolidation, minInstanceTypes}))
} else {
results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions = lo.Slice(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions, 0, MinInstanceTypesForSpotToSpotConsolidation)
}
Expand Down
25 changes: 11 additions & 14 deletions pkg/controllers/disruption/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,28 +167,25 @@ func GetPodEvictionCost(ctx context.Context, p *v1.Pod) float64 {
return clamp(-10.0, cost, 10.0)
}

// filterByPriceWithMinValues returns the instanceTypes that are lower priced than the current candidate and iterates over the cumulative minimum requirement of the InstanceTypeOptions to see if it meets the minValues of requirements.
// The minValues requirement is checked again after filterByPrice as it may result in more constrained InstanceTypeOptions for a NodeClaim
func filterByPriceWithMinValues(options []*cloudprovider.InstanceType, reqs scheduling.Requirements, price float64) ([]*cloudprovider.InstanceType, string, int) {
var result []*cloudprovider.InstanceType

// filterByPrice returns the instanceTypes that are lower priced than the current candidate.
// The minValues requirement is checked again after filterByPrice as it may result in more constrained InstanceTypeOptions for a NodeClaim.
// If the result of the filtering means that minValues can't be satisfied, we return an error.
func filterByPrice(options []*cloudprovider.InstanceType, reqs scheduling.Requirements, price float64) ([]*cloudprovider.InstanceType, error) {
var res cloudprovider.InstanceTypes
for _, it := range options {
launchPrice := worstLaunchPrice(it.Offerings.Available(), reqs)
if launchPrice < price {
result = append(result, it)
res = append(res, it)
}
}
var incompatibleReqKey string
var numInstanceTypes int
// Only try to find the incompatible minValue requirement key if requirements have minValues.
if reqs.HasMinValues() {
// We would have already filtered the invalid nodeclaim not meeting the minimum requirements in simulated scheduling results.
// We would have already filtered the invalid NodeClaim not meeting the minimum requirements in simulated scheduling results.
// Here the instanceTypeOptions changed again based on the price and requires re-validation.
incompatibleReqKey, numInstanceTypes = pscheduling.IncompatibleReqAcrossInstanceTypes(reqs, lo.Slice(result, 0, pscheduling.MaxInstanceTypes))
if _, err := res.SatisfiesMinValues(reqs); err != nil {
return nil, fmt.Errorf("validating minValues, %w", err)
}
}
// If minValues is NOT met for any of the requirement across InstanceTypes, then return empty InstanceTypeOptions as we cannot launch with the remaining InstanceTypes.
result = lo.Ternary(len(incompatibleReqKey) > 0, []*cloudprovider.InstanceType{}, result)
return result, incompatibleReqKey, numInstanceTypes
return res, nil
}

func disruptionCost(ctx context.Context, pods []*v1.Pod) float64 {
Expand Down
13 changes: 4 additions & 9 deletions pkg/controllers/disruption/multinodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
// required
replacementHasValidInstanceTypes := false
if cmd.Action() == ReplaceAction {
cmd.replacements[0].InstanceTypeOptions = filterOutSameType(cmd.replacements[0], candidatesToConsolidate)
replacementHasValidInstanceTypes = len(cmd.replacements[0].InstanceTypeOptions) > 0
cmd.replacements[0].InstanceTypeOptions, err = filterOutSameType(cmd.replacements[0], candidatesToConsolidate)
replacementHasValidInstanceTypes = len(cmd.replacements[0].InstanceTypeOptions) > 0 && err == nil
}

// replacementHasValidInstanceTypes will be false if the replacement action has no valid instance types remaining after filtering (or if filtering returned an error).
Expand Down Expand Up @@ -169,7 +169,7 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
// This code sees that t3a.small is the cheapest type in both lists and filters it and anything more expensive out
// leaving the valid consolidation:
// NodeClaims=[t3a.2xlarge, t3a.2xlarge, t3a.small] -> 1 of t3a.nano
func filterOutSameType(newNodeClaim *scheduling.NodeClaim, consolidate []*Candidate) []*cloudprovider.InstanceType {
func filterOutSameType(newNodeClaim *scheduling.NodeClaim, consolidate []*Candidate) ([]*cloudprovider.InstanceType, error) {
existingInstanceTypes := sets.New[string]()
pricesByInstanceType := map[string]float64{}

Expand Down Expand Up @@ -200,12 +200,7 @@ func filterOutSameType(newNodeClaim *scheduling.NodeClaim, consolidate []*Candid
}
}
}
// computeConsolidation would have thrown error related to `incompatibleRequirementKey` if minValues from requirements is not satisfied by the InstanceTypeOptions before we get to `filterOutSameType`.
// And we re-run the check here. The instanceTypeOptions we pass here should already have been sorted by price during computeConsolidation and if the minValues is not satisfied after `filterOutSameType`, we return empty InstanceTypeOptions thereby preventing consolidation.
// We do not use the incompatiblekey, numInstanceTypes from here as part of the messaging since this runs multiple times depending on the number of nodes in the cluster and we
// already have one messaging from the computeConsolidation if minValues is not met from the requirements.
filterByPrice, _, _ := filterByPriceWithMinValues(newNodeClaim.InstanceTypeOptions, newNodeClaim.Requirements, maxPrice)
return filterByPrice
return filterByPrice(newNodeClaim.InstanceTypeOptions, newNodeClaim.Requirements, maxPrice)
}

func (m *MultiNodeConsolidation) Type() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ var _ = Describe("Instance Type Selection", func() {
ExpectApplied(ctx, env.Client, pod2)
results, _ := prov.Schedule(ctx)
for _, v := range results.PodErrors {
Expect(v.Error()).To(ContainSubstring("minValues requirement is not met for karpenter/numerical-value"))
Expect(v.Error()).To(ContainSubstring(`minValues requirement is not met for "karpenter/numerical-value"`))
}
ExpectNotScheduled(ctx, env.Client, pod1)
ExpectNotScheduled(ctx, env.Client, pod2)
Expand Down
Loading

0 comments on commit 4c2ce1d

Please sign in to comment.