Skip to content

Commit

Permalink
Merge pull request #3 from airbnb/es--grpc-expander-plugin
Browse files Browse the repository at this point in the history
Add grpc expander plugin
  • Loading branch information
evansheng authored Dec 4, 2021
2 parents ee91390 + d821603 commit c147ba9
Show file tree
Hide file tree
Showing 47 changed files with 2,217 additions and 666 deletions.
4 changes: 4 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type AutoscalingOptions struct {
EstimatorName string
// ExpanderNames sets the chain of node group expanders to be used in scale up
ExpanderNames string
// GRPCExpanderCert is the location of the cert passed to the gRPC server for TLS when using the gRPC expander
GRPCExpanderCert string
// GRPCExpanderURL is the url of the gRPC server when using the gRPC expander
GRPCExpanderURL string
// IgnoreDaemonSetsUtilization is whether CA will ignore DaemonSet pods when calculating resource utilization for scaling down
IgnoreDaemonSetsUtilization bool
// IgnoreMirrorPodsUtilization is whether CA will ignore Mirror pods when calculating resource utilization for scaling down
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions)
}
if opts.ExpanderStrategy == nil {
expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","),
opts.CloudProvider, opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace)
expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","), opts.CloudProvider,
opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace, opts.GRPCExpanderCert, opts.GRPCExpanderURL)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion cluster-autoscaler/expander/expander.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

var (
// AvailableExpanders is a list of available expander options
AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName, PriceBasedExpanderName, PriorityBasedExpanderName}
AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName, PriceBasedExpanderName, PriorityBasedExpanderName, GRPCExpanderName}
// RandomExpanderName selects a node group at random
RandomExpanderName = "random"
// MostPodsExpanderName selects a node group that fits the most pods
Expand All @@ -36,6 +36,8 @@ var (
PriceBasedExpanderName = "price"
// PriorityBasedExpanderName selects a node group based on a user-configured priorities assigned to group names
PriorityBasedExpanderName = "priority"
// GRPCExpanderName uses the gRPC client expander to call to an external gRPC server to select a node group for scale up
GRPCExpanderName = "grpc"
)

// Option describes an option to expand the cluster.
Expand Down
8 changes: 7 additions & 1 deletion cluster-autoscaler/expander/factory/expander_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,24 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin"
"k8s.io/autoscaler/cluster-autoscaler/expander/mostpods"
"k8s.io/autoscaler/cluster-autoscaler/expander/price"
"k8s.io/autoscaler/cluster-autoscaler/expander/priority"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
"k8s.io/autoscaler/cluster-autoscaler/expander/waste"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/klog/v2"

kube_client "k8s.io/client-go/kubernetes"
)

// ExpanderStrategyFromStrings creates an expander.Strategy according to the names of the expanders passed in
// take in whole opts and access stuff here
func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprovider.CloudProvider,
autoscalingKubeClients *context.AutoscalingKubeClients, kubeClient kube_client.Interface,
configNamespace string) (expander.Strategy, errors.AutoscalerError) {
configNamespace string, GRPCExpanderCert string, GRPCExpanderURL string) (expander.Strategy, errors.AutoscalerError) {
var filters []expander.Filter
seenExpanders := map[string]struct{}{}
strategySeen := false
Expand Down Expand Up @@ -67,6 +70,9 @@ func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprov
stopChannel := make(chan struct{})
lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
filters = append(filters, priority.NewFilter(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder))
case expander.GRPCExpanderName:
klog.V(1).Info("GRPC expander chosen")
filters = append(filters, grpcplugin.NewFilter(GRPCExpanderCert, GRPCExpanderURL))
default:
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
}
Expand Down
140 changes: 140 additions & 0 deletions cluster-autoscaler/expander/grpcplugin/grpc_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package grpcplugin

import (
"context"
"log"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

type grpcclientstrategy struct {
grpcClient protos.ExpanderClient
}

// NewFilter returns an expansion filter that creates a gRPC client, and calls out to a gRPC server
func NewFilter(expanderCert string, expanderUrl string) expander.Filter {
client := createGRPCClient(expanderCert, expanderUrl)
if client == nil {
return &grpcclientstrategy{grpcClient: nil}
}
return &grpcclientstrategy{grpcClient: client}
}

func createGRPCClient(expanderCert string, expanderUrl string) protos.ExpanderClient {
var dialOpt grpc.DialOption

// if no Cert file specified, use insecure
if expanderCert == "" {
dialOpt = grpc.WithInsecure()
} else {
creds, err := credentials.NewClientTLSFromFile(expanderCert, "")
if err != nil {
log.Fatalf("Failed to create TLS credentials %v", err)
return nil
}
dialOpt = grpc.WithTransportCredentials(creds)
}
klog.V(2).Info("Dialing ", expanderUrl, " dialopt: ", dialOpt)
conn, err := grpc.Dial(expanderUrl, dialOpt)
if err != nil {
log.Fatalf("fail to dial server: %v", err)
return nil
}
return protos.NewExpanderClient(conn)
}

func (g *grpcclientstrategy) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
if g.grpcClient == nil {
log.Fatalf("Incorrect gRPC client config, filtering no options")
return expansionOptions
}

// Transform inputs to gRPC inputs
nodeGroupIDOptionMap := make(map[string]expander.Option)
grpcOptionsSlice := []*protos.Option{}
populateOptionsForGRPC(expansionOptions, nodeGroupIDOptionMap, &grpcOptionsSlice)
grpcNodeInfoMap := make(map[string]*v1.Node)
populateNodeInfoForGRPC(nodeInfo, grpcNodeInfoMap)

// call gRPC server to get BestOption
klog.V(2).Info("GPRC call of best options to server with ", len(nodeGroupIDOptionMap), " options")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
bestOptionsResponse, err := g.grpcClient.BestOptions(ctx, &protos.BestOptionsRequest{Options: grpcOptionsSlice, NodeInfoMap: grpcNodeInfoMap})
if err != nil {
klog.V(2).Info("GRPC call timed out, no options filtered")
return expansionOptions
}

if bestOptionsResponse == nil || bestOptionsResponse.Options == nil {
klog.V(2).Info("GRPC returned nil bestOptions, no options filtered")
return expansionOptions
}
// Transform back options slice
options := transformAndSanitizeOptionsFromGRPC(bestOptionsResponse.Options, nodeGroupIDOptionMap)
if options == nil {
klog.V(2).Info("Unable to sanitize GPRC returned bestOptions, no options filtered")
return expansionOptions
}
return options
}

// populateOptionsForGRPC creates a map of nodegroup ID and options, as well as a slice of Options objects for the gRPC call
func populateOptionsForGRPC(expansionOptions []expander.Option, nodeGroupIDOptionMap map[string]expander.Option, grpcOptionsSlice *[]*protos.Option) {
for _, option := range expansionOptions {
nodeGroupIDOptionMap[option.NodeGroup.Id()] = option
*grpcOptionsSlice = append(*grpcOptionsSlice, newOptionMessage(option.NodeGroup.Id(), int32(option.NodeCount), option.Debug, option.Pods))
}
}

// populateNodeInfoForGRPC modifies the nodeInfo object, and replaces it with the v1.Node to pass through grpc
func populateNodeInfoForGRPC(nodeInfos map[string]*schedulerframework.NodeInfo, grpcNodeInfoMap map[string]*v1.Node) {
for nodeId, nodeInfo := range nodeInfos {
grpcNodeInfoMap[nodeId] = nodeInfo.Node()
}
}

func transformAndSanitizeOptionsFromGRPC(bestOptionsResponseOptions []*protos.Option, nodeGroupIDOptionMap map[string]expander.Option) []expander.Option {
var options []expander.Option
for _, option := range bestOptionsResponseOptions {
if option == nil {
klog.Errorf("gRPC server returned nil Option")
return nil
}
if _, ok := nodeGroupIDOptionMap[option.NodeGroupId]; ok {
options = append(options, nodeGroupIDOptionMap[option.NodeGroupId])
} else {
klog.Errorf("gRPC server returned invalid nodeGroup ID: ", option.NodeGroupId)
return nil
}
}
return options
}

func newOptionMessage(nodeGroupId string, nodeCount int32, debug string, pods []*v1.Pod) *protos.Option {
return &protos.Option{NodeGroupId: nodeGroupId, NodeCount: nodeCount, Debug: debug, Pod: pods}
}
Loading

0 comments on commit c147ba9

Please sign in to comment.