Skip to content

Commit

Permalink
Merge pull request kubernetes#4233 from ryanmcnamara/rm/expander-chain
Browse files Browse the repository at this point in the history
Allow specification of multiple expanders
  • Loading branch information
k8s-ci-robot authored and Jian Cheung committed Jul 29, 2022
1 parent 30a0fe5 commit 40e01a6
Show file tree
Hide file tree
Showing 16 changed files with 354 additions and 129 deletions.
6 changes: 6 additions & 0 deletions cluster-autoscaler/FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,12 @@ would match the cluster size. This expander is described in more details

* `priority` - selects the node group that has the highest priority assigned by the user. It's configuration is described in more details [here](expander/priority/readme.md)


Multiple expanders may be passed, i.e.
`.cluster-autoscaler --expander=priority,least-waste`

This will cause the `least-waste` expander to be used as a fallback in the event that the priority expander selects multiple node groups. In general, a list of expanders can be used, where the output of one is passed to the next and the final decision by randomly selecting one. An expander must not appear in the list more than once.

### Does CA respect node affinity when selecting node groups to scale up?

CA respects `nodeSelector` and `requiredDuringSchedulingIgnoredDuringExecution` in nodeAffinity given that you have labelled your node groups accordingly. If there is a pod that cannot be scheduled with either `nodeSelector` or `requiredDuringSchedulingIgnoredDuringExecution` specified, CA will only consider node groups that satisfy those requirements for expansion.
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type AutoscalingOptions struct {
NodeGroupAutoDiscovery []string
// EstimatorName is the estimator used to estimate the number of needed nodes in scale up.
EstimatorName string
// ExpanderName sets the type of node group expander to be used in scale up
ExpanderName string
// ExpanderNames sets the chain of node group expanders to be used in scale up
ExpanderNames string
// IgnoreDaemonSetsUtilization is whether CA will ignore DaemonSet pods when calculating resource utilization for scaling down
IgnoreDaemonSetsUtilization bool
// IgnoreMirrorPodsUtilization is whether CA will ignore Mirror pods when calculating resource utilization for scaling down
Expand Down
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package core

import (
"strings"
"time"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
Expand Down Expand Up @@ -101,7 +102,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions)
}
if opts.ExpanderStrategy == nil {
expanderStrategy, err := factory.ExpanderStrategyFromString(opts.ExpanderName,
expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","),
opts.CloudProvider, opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace)
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions cluster-autoscaler/expander/expander.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ type Option struct {
type Strategy interface {
BestOption(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) *Option
}

// Filter describes an interface for filtering to equally good options according to some criteria
type Filter interface {
BestOptions(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) []Option
}
46 changes: 46 additions & 0 deletions cluster-autoscaler/expander/factory/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package factory

import (
"k8s.io/autoscaler/cluster-autoscaler/expander"

schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type chainStrategy struct {
filters []expander.Filter
fallback expander.Strategy
}

func newChainStrategy(filters []expander.Filter, fallback expander.Strategy) expander.Strategy {
return &chainStrategy{
filters: filters,
fallback: fallback,
}
}

func (c *chainStrategy) BestOption(options []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
filteredOptions := options
for _, filter := range c.filters {
filteredOptions = filter.BestOptions(filteredOptions, nodeInfo)
if len(filteredOptions) == 1 {
return &filteredOptions[0]
}
}
return c.fallback.BestOption(filteredOptions, nodeInfo)
}
133 changes: 133 additions & 0 deletions cluster-autoscaler/expander/factory/chain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package factory

import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"strings"
"testing"

"github.com/stretchr/testify/assert"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type substringTestFilterStrategy struct {
substring string
}

func newSubstringTestFilterStrategy(substring string) *substringTestFilterStrategy {
return &substringTestFilterStrategy{
substring: substring,
}
}

func (s *substringTestFilterStrategy) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
var ret []expander.Option
for _, option := range expansionOptions {
if strings.Contains(option.Debug, s.substring) {
ret = append(ret, option)
}
}
return ret

}

func (s *substringTestFilterStrategy) BestOption(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
ret := s.BestOptions(expansionOptions, nodeInfo)
if len(ret) == 0 {
return nil
}
return &ret[0]
}

func TestChainStrategy_BestOption(t *testing.T) {
for name, tc := range map[string]struct {
filters []expander.Filter
fallback expander.Strategy
options []expander.Option
expected *expander.Option
}{
"selects with no filters": {
filters: []expander.Filter{},
fallback: newSubstringTestFilterStrategy("a"),
options: []expander.Option{
*newOption("b"),
*newOption("a"),
},
expected: newOption("a"),
},
"filters with one filter": {
filters: []expander.Filter{
newSubstringTestFilterStrategy("a"),
},
fallback: newSubstringTestFilterStrategy("b"),
options: []expander.Option{
*newOption("ab"),
*newOption("b"),
},
expected: newOption("ab"),
},
"filters with multiple filters": {
filters: []expander.Filter{
newSubstringTestFilterStrategy("a"),
newSubstringTestFilterStrategy("b"),
},
fallback: newSubstringTestFilterStrategy("x"),
options: []expander.Option{
*newOption("xab"),
*newOption("xa"),
*newOption("x"),
},
expected: newOption("xab"),
},
"selects from multiple after filters": {
filters: []expander.Filter{
newSubstringTestFilterStrategy("x"),
},
fallback: newSubstringTestFilterStrategy("a"),
options: []expander.Option{
*newOption("xc"),
*newOption("xaa"),
*newOption("xab"),
},
expected: newOption("xaa"),
},
"short circuits": {
filters: []expander.Filter{
newSubstringTestFilterStrategy("a"),
newSubstringTestFilterStrategy("b"),
},
fallback: newSubstringTestFilterStrategy("x"),
options: []expander.Option{
*newOption("a"),
},
expected: newOption("a"),
},
} {
t.Run(name, func(t *testing.T) {
subject := newChainStrategy(tc.filters, tc.fallback)
actual := subject.BestOption(tc.options, nil)
assert.Equal(t, tc.expected, actual)
})
}
}

func newOption(debug string) *expander.Option {
return &expander.Option{
Debug: debug,
}
}
62 changes: 40 additions & 22 deletions cluster-autoscaler/expander/factory/expander_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,48 @@ import (
kube_client "k8s.io/client-go/kubernetes"
)

// ExpanderStrategyFromString creates an expander.Strategy according to its name
func ExpanderStrategyFromString(expanderFlag string, cloudProvider cloudprovider.CloudProvider,
// ExpanderStrategyFromStrings creates an expander.Strategy according to the names of the expanders passed in
func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprovider.CloudProvider,
autoscalingKubeClients *context.AutoscalingKubeClients, kubeClient kube_client.Interface,
configNamespace string) (expander.Strategy, errors.AutoscalerError) {
switch expanderFlag {
case expander.RandomExpanderName:
return random.NewStrategy(), nil
case expander.MostPodsExpanderName:
return mostpods.NewStrategy(), nil
case expander.LeastWasteExpanderName:
return waste.NewStrategy(), nil
case expander.PriceBasedExpanderName:
if _, err := cloudProvider.Pricing(); err != nil {
return nil, err
var filters []expander.Filter
seenExpanders := map[string]struct{}{}
strategySeen := false
for i, expanderFlag := range expanderFlags {
if _, ok := seenExpanders[expanderFlag]; ok {
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s was specified multiple times, each expander must not be specified more than once", expanderFlag)
}
if strategySeen {
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s came after an expander %s that will always return only one result, this is not allowed since %s will never be used", expanderFlag, expanderFlags[i-1], expanderFlag)
}
seenExpanders[expanderFlag] = struct{}{}

switch expanderFlag {
case expander.RandomExpanderName:
filters = append(filters, random.NewFilter())
case expander.MostPodsExpanderName:
filters = append(filters, mostpods.NewFilter())
case expander.LeastWasteExpanderName:
filters = append(filters, waste.NewFilter())
case expander.PriceBasedExpanderName:
if _, err := cloudProvider.Pricing(); err != nil {
return nil, err
}
filters = append(filters, price.NewFilter(cloudProvider,
price.NewSimplePreferredNodeProvider(autoscalingKubeClients.AllNodeLister()),
price.SimpleNodeUnfitness))
case expander.PriorityBasedExpanderName:
// It seems other listers do the same here - they never receive the termination msg on the ch.
// This should be currently OK.
stopChannel := make(chan struct{})
lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
filters = append(filters, priority.NewFilter(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder))
default:
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
}
if _, ok := filters[len(filters)-1].(expander.Strategy); ok {
strategySeen = true
}
return price.NewStrategy(cloudProvider,
price.NewSimplePreferredNodeProvider(autoscalingKubeClients.AllNodeLister()),
price.SimpleNodeUnfitness), nil
case expander.PriorityBasedExpanderName:
// It seems other listers do the same here - they never receive the termination msg on the ch.
// This should be currently OK.
stopChannel := make(chan struct{})
lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
return priority.NewStrategy(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder)
}
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
return newChainStrategy(filters, random.NewStrategy()), nil
}
12 changes: 5 additions & 7 deletions cluster-autoscaler/expander/mostpods/mostpods.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,19 @@ package mostpods

import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type mostpods struct {
fallbackStrategy expander.Strategy
}

// NewStrategy returns a scale up strategy (expander) that picks the node group that can schedule the most pods
func NewStrategy() expander.Strategy {
return &mostpods{random.NewStrategy()}
// NewFilter returns a scale up filter that picks the node group that can schedule the most pods
func NewFilter() expander.Filter {
return &mostpods{}
}

// BestOption Selects the expansion option that schedules the most pods
func (m *mostpods) BestOption(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
func (m *mostpods) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
var maxPods int
var maxOptions []expander.Option

Expand All @@ -51,5 +49,5 @@ func (m *mostpods) BestOption(expansionOptions []expander.Option, nodeInfo map[s
return nil
}

return m.fallbackStrategy.BestOption(maxOptions, nodeInfo)
return maxOptions
}
30 changes: 17 additions & 13 deletions cluster-autoscaler/expander/price/price.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ var (
gpuUnfitnessOverride = 1000.0
)

// NewStrategy returns an expansion strategy that picks nodes based on price and preferred node type.
func NewStrategy(cloudProvider cloudprovider.CloudProvider,
// NewFilter returns an expansion filter that picks nodes based on price and preferred node type.
func NewFilter(cloudProvider cloudprovider.CloudProvider,
preferredNodeProvider PreferredNodeProvider,
nodeUnfitness NodeUnfitness,
) expander.Strategy {
) expander.Filter {
return &priceBased{
cloudProvider: cloudProvider,
preferredNodeProvider: preferredNodeProvider,
Expand All @@ -87,8 +87,8 @@ func NewStrategy(cloudProvider cloudprovider.CloudProvider,
}

// BestOption selects option based on cost and preferred node type.
func (p *priceBased) BestOption(expansionOptions []expander.Option, nodeInfos map[string]*schedulerframework.NodeInfo) *expander.Option {
var bestOption *expander.Option
func (p *priceBased) BestOptions(expansionOptions []expander.Option, nodeInfos map[string]*schedulerframework.NodeInfo) []expander.Option {
var bestOptions []expander.Option
bestOptionScore := 0.0
now := time.Now()
then := now.Add(time.Hour)
Expand Down Expand Up @@ -169,17 +169,21 @@ nextoption:

klog.V(5).Infof("Price expander for %s: %s", option.NodeGroup.Id(), debug)

if bestOption == nil || bestOptionScore > optionScore {
bestOption = &expander.Option{
NodeGroup: option.NodeGroup,
NodeCount: option.NodeCount,
Debug: fmt.Sprintf("%s | price-expander: %s", option.Debug, debug),
Pods: option.Pods,
}
maybeBestOption := expander.Option{
NodeGroup: option.NodeGroup,
NodeCount: option.NodeCount,
Debug: fmt.Sprintf("%s | price-expander: %s", option.Debug, debug),
Pods: option.Pods,
}
if len(bestOptions) == 0 || bestOptionScore == optionScore {
bestOptions = append(bestOptions, maybeBestOption)
bestOptionScore = optionScore
} else if bestOptionScore > optionScore {
bestOptions = []expander.Option{maybeBestOption}
bestOptionScore = optionScore
}
}
return bestOption
return bestOptions
}

// buildPod creates a pod with specified resources.
Expand Down
Loading

0 comments on commit 40e01a6

Please sign in to comment.