Skip to content

Commit

Permalink
Allow specification of multiple expanders
Browse files Browse the repository at this point in the history
Multiple expanders can now be specified, expanders now "filter to the
tied for best" instead of "selecting the best" so the output of one
expander is now fed to the input of the next. Each expander may only
be used once to disallow bad configuration. This should not be a change
in functionality as in the event of a tie the random expander is still
used.
  • Loading branch information
ryanmcnamara committed Jul 30, 2021
1 parent 1ecc8b4 commit 1ad750b
Show file tree
Hide file tree
Showing 16 changed files with 301 additions and 129 deletions.
6 changes: 6 additions & 0 deletions cluster-autoscaler/FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,12 @@ would match the cluster size. This expander is described in more details
* `priority` - selects the node group that has the highest priority assigned by the user. It's configuration is described in more details [here](expander/priority/readme.md)
Multiple expanders may be passed, i.e.
`.cluster-autoscaler --expader=priority,least-waste`
This will cause the `least-waste` expander to be used as a fallback in the event that the priority expander selects multiple node groups. In general, a list of expanders can be used, where the output of one is passed to the next and the final decision by randomly selecting one. An expander must not appear in the list more than once.
### Does CA respect node affinity when selecting node groups to scale up?
CA respects `nodeSelector` and `requiredDuringSchedulingIgnoredDuringExecution` in nodeAffinity given that you have labelled your node groups accordingly. If there is a pod that cannot be scheduled with either `nodeSelector` or `requiredDuringSchedulingIgnoredDuringExecution` specified, CA will only consider node groups that satisfy those requirements for expansion.
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ type AutoscalingOptions struct {
NodeGroupAutoDiscovery []string
// EstimatorName is the estimator used to estimate the number of needed nodes in scale up.
EstimatorName string
// ExpanderName sets the type of node group expander to be used in scale up
ExpanderName string
// ExpanderNames sets the chain of node group expanders to be used in scale up
ExpanderNames string
// IgnoreDaemonSetsUtilization is whether CA will ignore DaemonSet pods when calculating resource utilization for scaling down
IgnoreDaemonSetsUtilization bool
// IgnoreMirrorPodsUtilization is whether CA will ignore Mirror pods when calculating resource utilization for scaling down
Expand Down
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package core

import (
"strings"
"time"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
Expand Down Expand Up @@ -101,7 +102,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions)
}
if opts.ExpanderStrategy == nil {
expanderStrategy, err := factory.ExpanderStrategyFromString(opts.ExpanderName,
expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","),
opts.CloudProvider, opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace)
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions cluster-autoscaler/expander/expander.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ type Option struct {
type Strategy interface {
BestOption(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) *Option
}

// Filter describes an interface for filtering to equally good options according to some criteria
type Filter interface {
BestOptions(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) []Option
}
27 changes: 27 additions & 0 deletions cluster-autoscaler/expander/factory/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package factory

import (
"k8s.io/autoscaler/cluster-autoscaler/expander"

schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type chainStrategy struct {
filters []expander.Filter
fallback expander.Strategy
}

func newChainStrategy(filters []expander.Filter, fallback expander.Strategy) expander.Strategy {
return &chainStrategy{
filters: filters,
fallback: fallback,
}
}

func (c *chainStrategy) BestOption(options []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
filteredOptions := options
for _, filter := range c.filters {
filteredOptions = filter.BestOptions(filteredOptions, nodeInfo)
}
return c.fallback.BestOption(filteredOptions, nodeInfo)
}
106 changes: 106 additions & 0 deletions cluster-autoscaler/expander/factory/chain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package factory

import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"strings"
"testing"

"github.com/stretchr/testify/assert"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type substringTestFilterStrategy struct {
substring string
}

func newSubstringTestFilterStrategy(substring string) *substringTestFilterStrategy {
return &substringTestFilterStrategy{
substring: substring,
}
}

func (s *substringTestFilterStrategy) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
var ret []expander.Option
for _, option := range expansionOptions {
if strings.Contains(option.Debug, s.substring) {
ret = append(ret, option)
}
}
return ret

}

func (s *substringTestFilterStrategy) BestOption(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
ret := s.BestOptions(expansionOptions, nodeInfo)
if len(ret) == 0 {
return nil
}
return &ret[0]
}

func TestChainStrategy_BestOption(t *testing.T) {
for name, tc := range map[string]struct {
filters []expander.Filter
fallback expander.Strategy
options []expander.Option
expected *expander.Option
}{
"selects with no filters": {
filters: []expander.Filter{},
fallback: newSubstringTestFilterStrategy("a"),
options: []expander.Option{
*newOption("b"),
*newOption("a"),
},
expected: newOption("a"),
},
"filters with one filter": {
filters: []expander.Filter{
newSubstringTestFilterStrategy("a"),
},
fallback: newSubstringTestFilterStrategy("b"),
options: []expander.Option{
*newOption("ab"),
*newOption("b"),
},
expected: newOption("ab"),
},
"filters with multiple filters": {
filters: []expander.Filter{
newSubstringTestFilterStrategy("a"),
newSubstringTestFilterStrategy("b"),
},
fallback: newSubstringTestFilterStrategy("x"),
options: []expander.Option{
*newOption("xab"),
*newOption("xa"),
*newOption("x"),
},
expected: newOption("xab"),
},
"selects from multiple after filters": {
filters: []expander.Filter{
newSubstringTestFilterStrategy("x"),
},
fallback: newSubstringTestFilterStrategy("a"),
options: []expander.Option{
*newOption("xc"),
*newOption("xaa"),
*newOption("xab"),
},
expected: newOption("xaa"),
},
} {
t.Run(name, func(t *testing.T) {
subject := newChainStrategy(tc.filters, tc.fallback)
actual := subject.BestOption(tc.options, nil)
assert.Equal(t, tc.expected, actual)
})
}
}

func newOption(debug string) *expander.Option {
return &expander.Option{
Debug: debug,
}
}
55 changes: 33 additions & 22 deletions cluster-autoscaler/expander/factory/expander_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,41 @@ import (
kube_client "k8s.io/client-go/kubernetes"
)

// ExpanderStrategyFromString creates an expander.Strategy according to its name
func ExpanderStrategyFromString(expanderFlag string, cloudProvider cloudprovider.CloudProvider,
// ExpanderStrategyFromStrings creates an expander.Strategy according to the names of the expanders passed in
func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprovider.CloudProvider,
autoscalingKubeClients *context.AutoscalingKubeClients, kubeClient kube_client.Interface,
configNamespace string) (expander.Strategy, errors.AutoscalerError) {
switch expanderFlag {
case expander.RandomExpanderName:
return random.NewStrategy(), nil
case expander.MostPodsExpanderName:
return mostpods.NewStrategy(), nil
case expander.LeastWasteExpanderName:
return waste.NewStrategy(), nil
case expander.PriceBasedExpanderName:
if _, err := cloudProvider.Pricing(); err != nil {
return nil, err
var filters []expander.Filter
seenExpanders := map[string]struct{}{}
for _, expanderFlag := range expanderFlags {
if _, ok := seenExpanders[expanderFlag]; ok {
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s was specified multiple times, each expander must not be specified more than once", expanderFlag)
}
seenExpanders[expanderFlag] = struct{}{}

switch expanderFlag {
case expander.RandomExpanderName:
filters = append(filters, random.NewFilter())
case expander.MostPodsExpanderName:
filters = append(filters, mostpods.NewFilter())
case expander.LeastWasteExpanderName:
filters = append(filters, waste.NewFilter())
case expander.PriceBasedExpanderName:
if _, err := cloudProvider.Pricing(); err != nil {
return nil, err
}
filters = append(filters, price.NewFilter(cloudProvider,
price.NewSimplePreferredNodeProvider(autoscalingKubeClients.AllNodeLister()),
price.SimpleNodeUnfitness))
case expander.PriorityBasedExpanderName:
// It seems other listers do the same here - they never receive the termination msg on the ch.
// This should be currently OK.
stopChannel := make(chan struct{})
lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
filters = append(filters, priority.NewFilter(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder))
default:
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
}
return price.NewStrategy(cloudProvider,
price.NewSimplePreferredNodeProvider(autoscalingKubeClients.AllNodeLister()),
price.SimpleNodeUnfitness), nil
case expander.PriorityBasedExpanderName:
// It seems other listers do the same here - they never receive the termination msg on the ch.
// This should be currently OK.
stopChannel := make(chan struct{})
lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
return priority.NewStrategy(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder)
}
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
return newChainStrategy(filters, random.NewStrategy()), nil
}
12 changes: 5 additions & 7 deletions cluster-autoscaler/expander/mostpods/mostpods.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,19 @@ package mostpods

import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type mostpods struct {
fallbackStrategy expander.Strategy
}

// NewStrategy returns a scale up strategy (expander) that picks the node group that can schedule the most pods
func NewStrategy() expander.Strategy {
return &mostpods{random.NewStrategy()}
// NewFilter returns a scale up filter that picks the node group that can schedule the most pods
func NewFilter() expander.Filter {
return &mostpods{}
}

// BestOption Selects the expansion option that schedules the most pods
func (m *mostpods) BestOption(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
func (m *mostpods) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
var maxPods int
var maxOptions []expander.Option

Expand All @@ -51,5 +49,5 @@ func (m *mostpods) BestOption(expansionOptions []expander.Option, nodeInfo map[s
return nil
}

return m.fallbackStrategy.BestOption(maxOptions, nodeInfo)
return maxOptions
}
30 changes: 17 additions & 13 deletions cluster-autoscaler/expander/price/price.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ var (
gpuUnfitnessOverride = 1000.0
)

// NewStrategy returns an expansion strategy that picks nodes based on price and preferred node type.
func NewStrategy(cloudProvider cloudprovider.CloudProvider,
// NewFilter returns an expansion filter that picks nodes based on price and preferred node type.
func NewFilter(cloudProvider cloudprovider.CloudProvider,
preferredNodeProvider PreferredNodeProvider,
nodeUnfitness NodeUnfitness,
) expander.Strategy {
) expander.Filter {
return &priceBased{
cloudProvider: cloudProvider,
preferredNodeProvider: preferredNodeProvider,
Expand All @@ -87,8 +87,8 @@ func NewStrategy(cloudProvider cloudprovider.CloudProvider,
}

// BestOption selects option based on cost and preferred node type.
func (p *priceBased) BestOption(expansionOptions []expander.Option, nodeInfos map[string]*schedulerframework.NodeInfo) *expander.Option {
var bestOption *expander.Option
func (p *priceBased) BestOptions(expansionOptions []expander.Option, nodeInfos map[string]*schedulerframework.NodeInfo) []expander.Option {
var bestOptions []expander.Option
bestOptionScore := 0.0
now := time.Now()
then := now.Add(time.Hour)
Expand Down Expand Up @@ -169,17 +169,21 @@ nextoption:

klog.V(5).Infof("Price expander for %s: %s", option.NodeGroup.Id(), debug)

if bestOption == nil || bestOptionScore > optionScore {
bestOption = &expander.Option{
NodeGroup: option.NodeGroup,
NodeCount: option.NodeCount,
Debug: fmt.Sprintf("%s | price-expander: %s", option.Debug, debug),
Pods: option.Pods,
}
maybeBestOption := expander.Option{
NodeGroup: option.NodeGroup,
NodeCount: option.NodeCount,
Debug: fmt.Sprintf("%s | price-expander: %s", option.Debug, debug),
Pods: option.Pods,
}
if len(bestOptions) == 0 || bestOptionScore == optionScore {
bestOptions = append(bestOptions, maybeBestOption)
bestOptionScore = optionScore
} else if bestOptionScore > optionScore {
bestOptions = []expander.Option{maybeBestOption}
bestOptionScore = optionScore
}
}
return bestOption
return bestOptions
}

// buildPod creates a pod with specified resources.
Expand Down
Loading

0 comments on commit 1ad750b

Please sign in to comment.