Skip to content

Commit

Permalink
Merge pull request #6373 from tedli/grpc-timeout
Browse files Browse the repository at this point in the history
allow specifying grpc timeout rather than hardcoded 5 seconds
  • Loading branch information
k8s-ci-robot authored Dec 15, 2023
2 parents 2afb968 + ac01132 commit 5bf33b2
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 99 deletions.
1 change: 1 addition & 0 deletions cluster-autoscaler/cloudprovider/externalgrpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ For the cluster autoscaler parameters, use the `--cloud-provider=externalgrpc` f
| key | path to file containing the tls key, if using mTLS | no | none |
| cert | path to file containing the tls certificate, if using mTLS | no | none |
| cacert | path to file containing the CA certificate, if using mTLS | no | none |
| grpc_timeout | timeout applied to each gRPC call made to the external provider, given as a duration string (e.g. `30s`) | no | 5s |

The use of mTLS is recommended, since simple, non-authenticated calls to the external gRPC cloud provider service will result in the creation / deletion of nodes.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"gopkg.in/yaml.v2"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -40,16 +39,18 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
klog "k8s.io/klog/v2"
"sigs.k8s.io/yaml"
)

const (
grpcTimeout = 5 * time.Second
defaultGRPCTimeout = 5 * time.Second
)

// externalGrpcCloudProvider implements CloudProvider interface.
type externalGrpcCloudProvider struct {
resourceLimiter *cloudprovider.ResourceLimiter
client protos.CloudProviderClient
grpcTimeout time.Duration

mutex sync.Mutex
nodeGroupForNodeCache map[string]cloudprovider.NodeGroup // used to cache NodeGroupForNode grpc calls. Discarded at each Refresh()
Expand All @@ -73,7 +74,7 @@ func (e *externalGrpcCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
return e.nodeGroupsCache
}
nodeGroups := make([]cloudprovider.NodeGroup, 0)
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
ctx, cancel := context.WithTimeout(context.Background(), e.grpcTimeout)
defer cancel()
klog.V(5).Info("Performing gRPC call NodeGroups")
res, err := e.client.NodeGroups(ctx, &protos.NodeGroupsRequest{})
Expand All @@ -83,11 +84,12 @@ func (e *externalGrpcCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
}
for _, pbNg := range res.GetNodeGroups() {
ng := &NodeGroup{
id: pbNg.Id,
minSize: int(pbNg.MinSize),
maxSize: int(pbNg.MaxSize),
debug: pbNg.Debug,
client: e.client,
id: pbNg.Id,
minSize: int(pbNg.MinSize),
maxSize: int(pbNg.MaxSize),
debug: pbNg.Debug,
client: e.client,
grpcTimeout: e.grpcTimeout,
}
nodeGroups = append(nodeGroups, ng)
}
Expand All @@ -112,7 +114,7 @@ func (e *externalGrpcCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudpro
return ng, nil
}
// perform grpc call
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
ctx, cancel := context.WithTimeout(context.Background(), e.grpcTimeout)
defer cancel()
klog.V(5).Infof("Performing gRPC call NodeGroupForNode for node %v - %v", node.Name, node.Spec.ProviderID)
res, err := e.client.NodeGroupForNode(ctx, &protos.NodeGroupForNodeRequest{
Expand All @@ -127,11 +129,12 @@ func (e *externalGrpcCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudpro
return nil, nil
}
ng := &NodeGroup{
id: pbNg.GetId(),
maxSize: int(pbNg.GetMaxSize()),
minSize: int(pbNg.GetMinSize()),
debug: pbNg.GetDebug(),
client: e.client,
id: pbNg.GetId(),
maxSize: int(pbNg.GetMaxSize()),
minSize: int(pbNg.GetMinSize()),
debug: pbNg.GetDebug(),
client: e.client,
grpcTimeout: e.grpcTimeout,
}
e.nodeGroupForNodeCache[nodeID] = ng
return ng, nil
Expand All @@ -144,12 +147,13 @@ func (e *externalGrpcCloudProvider) HasInstance(node *apiv1.Node) (bool, error)

// pricingModel implements cloudprovider.PricingModel interface.
type pricingModel struct {
client protos.CloudProviderClient
client protos.CloudProviderClient
grpcTimeout time.Duration
}

// NodePrice returns a price of running the given node for a given period of time.
func (m *pricingModel) NodePrice(node *apiv1.Node, startTime time.Time, endTime time.Time) (float64, error) {
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
ctx, cancel := context.WithTimeout(context.Background(), m.grpcTimeout)
defer cancel()
klog.V(5).Infof("Performing gRPC call PricingNodePrice for node %v", node.Name)
start := metav1.NewTime(startTime)
Expand All @@ -173,7 +177,7 @@ func (m *pricingModel) NodePrice(node *apiv1.Node, startTime time.Time, endTime
// PodPrice returns a theoretical minimum price of running a pod for a given
// period of time on a perfectly matching machine.
func (m *pricingModel) PodPrice(pod *apiv1.Pod, startTime time.Time, endTime time.Time) (float64, error) {
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
ctx, cancel := context.WithTimeout(context.Background(), m.grpcTimeout)
defer cancel()
klog.V(5).Infof("Performing gRPC call PricingPodPrice for pod %v", pod.Name)
start := metav1.NewTime(startTime)
Expand Down Expand Up @@ -202,7 +206,8 @@ func (m *pricingModel) PodPrice(pod *apiv1.Pod, startTime time.Time, endTime tim
// by subsequent calls to the pricing model if this is the case.
func (e *externalGrpcCloudProvider) Pricing() (cloudprovider.PricingModel, errors.AutoscalerError) {
return &pricingModel{
client: e.client,
client: e.client,
grpcTimeout: e.grpcTimeout,
}, nil
}

Expand Down Expand Up @@ -234,7 +239,7 @@ func (e *externalGrpcCloudProvider) GPULabel() string {
klog.V(5).Info("Returning cached GPULabel")
return *e.gpuLabelCache
}
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
ctx, cancel := context.WithTimeout(context.Background(), e.grpcTimeout)
defer cancel()
klog.V(5).Info("Performing gRPC call GPULabel")
res, err := e.client.GPULabel(ctx, &protos.GPULabelRequest{})
Expand All @@ -256,7 +261,7 @@ func (e *externalGrpcCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
klog.V(5).Info("Returning cached GetAvailableGPUTypes")
return e.gpuTypesCache
}
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
ctx, cancel := context.WithTimeout(context.Background(), e.grpcTimeout)
defer cancel()
klog.V(5).Info("Performing gRPC call GetAvailableGPUTypes")
res, err := e.client.GetAvailableGPUTypes(ctx, &protos.GetAvailableGPUTypesRequest{})
Expand All @@ -281,7 +286,7 @@ func (e *externalGrpcCloudProvider) GetNodeGpuConfig(node *apiv1.Node) *cloudpro

// Cleanup cleans up open resources before the cloud provider is destroyed, i.e. go routines etc.
func (e *externalGrpcCloudProvider) Cleanup() error {
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
ctx, cancel := context.WithTimeout(context.Background(), e.grpcTimeout)
defer cancel()
klog.V(5).Info("Performing gRPC call Cleanup")
_, err := e.client.Cleanup(ctx, &protos.CleanupRequest{})
Expand All @@ -300,7 +305,7 @@ func (e *externalGrpcCloudProvider) Refresh() error {
e.nodeGroupForNodeCache = make(map[string]cloudprovider.NodeGroup)
e.nodeGroupsCache = nil
e.mutex.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
ctx, cancel := context.WithTimeout(context.Background(), e.grpcTimeout)
defer cancel()
klog.V(5).Info("Performing gRPC call Refresh")
_, err := e.client.Refresh(ctx, &protos.RefreshRequest{})
Expand All @@ -324,30 +329,32 @@ func BuildExternalGrpc(
if err != nil {
klog.Fatalf("Could not open cloud provider configuration file %q: %v", opts.CloudConfig, err)
}
client, err := newExternalGrpcCloudProviderClient(config)
client, grpcTimeout, err := newExternalGrpcCloudProviderClient(config)
if err != nil {
klog.Fatalf("Could not create gRPC client: %v", err)
}
return newExternalGrpcCloudProvider(client, rl)
return newExternalGrpcCloudProvider(client, grpcTimeout, rl)
}

// cloudConfig is the struct holding the configs to connect to the external cluster autoscaler provider service.
// sigs.k8s.io/yaml converts YAML to JSON before unmarshalling, so fields are matched by their json tags.
type cloudConfig struct {
Address string `yaml:"address"` // external cluster autoscaler provider address of the form "host:port", "host%zone:port", "[host]:port" or "[host%zone]:port"
Key string `yaml:"key"` // path to file containing the tls key
Cert string `yaml:"cert"` // path to file containing the tls certificate
Cacert string `yaml:"cacert"` // path to file containing the CA certificate
Address string `json:"address"` // external cluster autoscaler provider address of the form "host:port", "host%zone:port", "[host]:port" or "[host%zone]:port"
Key string `json:"key"` // path to file containing the tls key
Cert string `json:"cert"` // path to file containing the tls certificate
Cacert string `json:"cacert"` // path to file containing the CA certificate
GRPCTimeout *metav1.Duration `json:"grpc_timeout,omitempty"` // timeout of invoking a grpc call
}

func newExternalGrpcCloudProviderClient(config []byte) (protos.CloudProviderClient, error) {
func newExternalGrpcCloudProviderClient(config []byte) (protos.CloudProviderClient, time.Duration, error) {
var yamlConfig cloudConfig
err := yaml.Unmarshal([]byte(config), &yamlConfig)
if err != nil {
return nil, fmt.Errorf("can't parse YAML: %v", err)
return nil, 0, fmt.Errorf("can't parse YAML: %v", err)
}
host, _, err := net.SplitHostPort(yamlConfig.Address)
if err != nil {
return nil, fmt.Errorf("failed to parse address: %v", err)
return nil, 0, fmt.Errorf("failed to parse address: %v", err)
}
var dialOpt grpc.DialOption
if len(yamlConfig.Cert) == 0 {
Expand All @@ -356,24 +363,24 @@ func newExternalGrpcCloudProviderClient(config []byte) (protos.CloudProviderClie
} else {
certFile, err := ioutil.ReadFile(yamlConfig.Cert)
if err != nil {
return nil, fmt.Errorf("could not open Cert configuration file %q: %v", yamlConfig.Cert, err)
return nil, 0, fmt.Errorf("could not open Cert configuration file %q: %v", yamlConfig.Cert, err)
}
keyFile, err := ioutil.ReadFile(yamlConfig.Key)
if err != nil {
return nil, fmt.Errorf("could not open Key configuration file %q: %v", yamlConfig.Key, err)
return nil, 0, fmt.Errorf("could not open Key configuration file %q: %v", yamlConfig.Key, err)
}
cacertFile, err := ioutil.ReadFile(yamlConfig.Cacert)
if err != nil {
return nil, fmt.Errorf("could not open Cacert configuration file %q: %v", yamlConfig.Cacert, err)
return nil, 0, fmt.Errorf("could not open Cacert configuration file %q: %v", yamlConfig.Cacert, err)
}
cert, err := tls.X509KeyPair(certFile, keyFile)
if err != nil {
return nil, fmt.Errorf("failed to parse cert key pair: %v", err)
return nil, 0, fmt.Errorf("failed to parse cert key pair: %v", err)
}
certPool := x509.NewCertPool()
ok := certPool.AppendCertsFromPEM(cacertFile)
if !ok {
return nil, fmt.Errorf("failed to parse ca: %v", err)
return nil, 0, fmt.Errorf("failed to parse ca: %v", err)
}
transportCreds := credentials.NewTLS(&tls.Config{
ServerName: host,
Expand All @@ -384,15 +391,22 @@ func newExternalGrpcCloudProviderClient(config []byte) (protos.CloudProviderClie
}
conn, err := grpc.Dial(yamlConfig.Address, dialOpt)
if err != nil {
return nil, fmt.Errorf("failed to dial server: %v", err)
return nil, 0, fmt.Errorf("failed to dial server: %v", err)
}
return protos.NewCloudProviderClient(conn), nil
var timeout time.Duration
if gt := yamlConfig.GRPCTimeout; gt != nil {
timeout = gt.Duration
} else {
timeout = defaultGRPCTimeout
}
return protos.NewCloudProviderClient(conn), timeout, nil
}

func newExternalGrpcCloudProvider(client protos.CloudProviderClient, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
func newExternalGrpcCloudProvider(client protos.CloudProviderClient, grpcTimeout time.Duration, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
return &externalGrpcCloudProvider{
resourceLimiter: rl,
client: client,
grpcTimeout: grpcTimeout,
nodeGroupForNodeCache: make(map[string]cloudprovider.NodeGroup),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
func TestCloudProvider_NodeGroups(t *testing.T) {
client, m, teardown := setupTest(t)
defer teardown()
c := newExternalGrpcCloudProvider(client, nil)
c := newExternalGrpcCloudProvider(client, defaultGRPCTimeout, nil)

m.On("Refresh", mock.Anything, mock.Anything).Return(&protos.RefreshResponse{}, nil)

Expand Down Expand Up @@ -117,7 +117,7 @@ func TestCloudProvider_NodeGroups(t *testing.T) {
func TestCloudProvider_NodeGroupForNode(t *testing.T) {
client, m, teardown := setupTest(t)
defer teardown()
c := newExternalGrpcCloudProvider(client, nil)
c := newExternalGrpcCloudProvider(client, defaultGRPCTimeout, nil)

m.On("Refresh", mock.Anything, mock.Anything).Return(&protos.RefreshResponse{}, nil)

Expand Down Expand Up @@ -235,7 +235,7 @@ func TestCloudProvider_NodeGroupForNode(t *testing.T) {
func TestCloudProvider_Pricing(t *testing.T) {
client, m, teardown := setupTest(t)
defer teardown()
c := newExternalGrpcCloudProvider(client, nil)
c := newExternalGrpcCloudProvider(client, defaultGRPCTimeout, nil)

model, errPricing := c.Pricing()
assert.NoError(t, errPricing)
Expand Down Expand Up @@ -376,7 +376,7 @@ func TestCloudProvider_Pricing(t *testing.T) {
func TestCloudProvider_GPULabel(t *testing.T) {
client, m, teardown := setupTest(t)
defer teardown()
c := newExternalGrpcCloudProvider(client, nil)
c := newExternalGrpcCloudProvider(client, defaultGRPCTimeout, nil)

m.On("Refresh", mock.Anything, mock.Anything).Return(&protos.RefreshResponse{}, nil)

Expand All @@ -399,7 +399,7 @@ func TestCloudProvider_GPULabel(t *testing.T) {
// test grpc error
client2, m2, teardown2 := setupTest(t)
defer teardown2()
c2 := newExternalGrpcCloudProvider(client2, nil)
c2 := newExternalGrpcCloudProvider(client2, defaultGRPCTimeout, nil)

m2.On("Refresh", mock.Anything, mock.Anything).Return(&protos.RefreshResponse{}, nil)

Expand All @@ -422,7 +422,7 @@ func TestCloudProvider_GPULabel(t *testing.T) {
func TestCloudProvider_GetAvailableGPUTypes(t *testing.T) {
client, m, teardown := setupTest(t)
defer teardown()
c := newExternalGrpcCloudProvider(client, nil)
c := newExternalGrpcCloudProvider(client, defaultGRPCTimeout, nil)

m.On("Refresh", mock.Anything, mock.Anything).Return(&protos.RefreshResponse{}, nil)

Expand Down Expand Up @@ -453,7 +453,7 @@ func TestCloudProvider_GetAvailableGPUTypes(t *testing.T) {
// test no gpu types
client2, m2, teardown2 := setupTest(t)
defer teardown2()
c2 := newExternalGrpcCloudProvider(client2, nil)
c2 := newExternalGrpcCloudProvider(client2, defaultGRPCTimeout, nil)

m2.On(
"GetAvailableGPUTypes", mock.Anything, mock.Anything,
Expand All @@ -469,7 +469,7 @@ func TestCloudProvider_GetAvailableGPUTypes(t *testing.T) {
// test grpc error
client3, m3, teardown3 := setupTest(t)
defer teardown3()
c3 := newExternalGrpcCloudProvider(client3, nil)
c3 := newExternalGrpcCloudProvider(client3, defaultGRPCTimeout, nil)

m3.On(
"GetAvailableGPUTypes", mock.Anything, mock.Anything,
Expand All @@ -491,7 +491,7 @@ func TestCloudProvider_GetAvailableGPUTypes(t *testing.T) {
func TestCloudProvider_Cleanup(t *testing.T) {
client, m, teardown := setupTest(t)
defer teardown()
c := newExternalGrpcCloudProvider(client, nil)
c := newExternalGrpcCloudProvider(client, defaultGRPCTimeout, nil)

// test correct call
m.On(
Expand Down Expand Up @@ -519,7 +519,7 @@ func TestCloudProvider_Cleanup(t *testing.T) {
func TestCloudProvider_Refresh(t *testing.T) {
client, m, teardown := setupTest(t)
defer teardown()
c := newExternalGrpcCloudProvider(client, nil)
c := newExternalGrpcCloudProvider(client, defaultGRPCTimeout, nil)

// test correct call
m.On(
Expand Down
Loading

0 comments on commit 5bf33b2

Please sign in to comment.