From fd1394ef32b78a82b08069d6843df53e667b0cfa Mon Sep 17 00:00:00 2001 From: Zbynek Roubalik Date: Wed, 30 Nov 2022 13:31:49 +0100 Subject: [PATCH] grpc client: wait properly for establishing a connection Signed-off-by: Zbynek Roubalik --- .../{indentifier.go => identifier.go} | 0 pkg/metricsservice/client.go | 31 +++++++++++++++-- pkg/provider/provider.go | 33 +++++++++++-------- 3 files changed, 49 insertions(+), 15 deletions(-) rename apis/keda/v1alpha1/{indentifier.go => identifier.go} (100%) diff --git a/apis/keda/v1alpha1/indentifier.go b/apis/keda/v1alpha1/identifier.go similarity index 100% rename from apis/keda/v1alpha1/indentifier.go rename to apis/keda/v1alpha1/identifier.go diff --git a/pkg/metricsservice/client.go b/pkg/metricsservice/client.go index 61d75bc538e..b6d20d4f478 100644 --- a/pkg/metricsservice/client.go +++ b/pkg/metricsservice/client.go @@ -19,8 +19,11 @@ package metricsservice import ( "context" "fmt" + "time" + "github.com/go-logr/logr" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "k8s.io/metrics/pkg/apis/external_metrics" "k8s.io/metrics/pkg/apis/external_metrics/v1beta1" @@ -29,7 +32,8 @@ import ( ) type GrpcClient struct { - client api.MetricsServiceClient + client api.MetricsServiceClient + connection *grpc.ClientConn } func NewGrpcClient(url string) (*GrpcClient, error) { @@ -51,7 +55,7 @@ func NewGrpcClient(url string) (*GrpcClient, error) { return nil, err } - return &GrpcClient{client: api.NewMetricsServiceClient(conn)}, nil + return &GrpcClient{client: api.NewMetricsServiceClient(conn), connection: conn}, nil } func (c *GrpcClient) GetMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, *api.PromMetricsMsg, error) { @@ -68,3 +72,26 @@ func (c *GrpcClient) GetMetrics(ctx context.Context, scaledObjectName, scaledObj return extMetrics, response.GetPromMetrics(), nil } + +// WaitForConnectionReady waits for gRPC connection to be ready +// returns true if the connection was successful, false if we hit a timeut from context +func (c *GrpcClient) WaitForConnectionReady(ctx context.Context, logger logr.Logger) bool { + currentState := c.connection.GetState() + if currentState != connectivity.Ready { + logger.Info("Waiting for establishing a gRPC connection to KEDA Metrics Server") + for { + select { + case <-ctx.Done(): + return false + default: + c.connection.Connect() + time.Sleep(500 * time.Millisecond) + currentState := c.connection.GetState() + if currentState == connectivity.Ready { + return true + } + } + } + } + return true +} diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index e0e71340509..c4d283284fe 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -92,6 +92,11 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, // Get Metrics from Metrics Service gRPC Server if p.useMetricsServiceGrpc { + if !p.grpcClient.WaitForConnectionReady(ctx, logger) { + logger.Error(fmt.Errorf("timeout while waiting to establish gRPC connection to KEDA Metrics Server"), "timeout") + return nil, err + } + // selector is in form: `scaledobject.keda.sh/name: scaledobject-name` scaledObjectName := selector.Get("scaledobject.keda.sh/name") @@ -99,20 +104,22 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, logger.V(1).WithValues("scaledObjectName", scaledObjectName, "scaledObjectNamespace", namespace, "metrics", metrics).Info("Receiving metrics") // [DEPRECATED] handle exporting Prometheus metrics from Operator to Metrics Server - var scaledObjectErr error - if promMetrics.ScaledObjectErr { - scaledObjectErr = fmt.Errorf("scaledObject error") - } - promMetricsServer.RecordScaledObjectError(namespace, scaledObjectName, scaledObjectErr) - for _, scalerMetric := range promMetrics.ScalerMetric { - promMetricsServer.RecordHPAScalerMetric(namespace, scaledObjectName, scalerMetric.ScalerName, int(scalerMetric.ScalerIndex), scalerMetric.MetricName, float64(scalerMetric.MetricValue)) - } - for _, scalerError := range promMetrics.ScalerError { - var scalerErr error - if scalerError.Error { - scalerErr = fmt.Errorf("scaler error") + if promMetrics != nil { + var scaledObjectErr error + if promMetrics.ScaledObjectErr { + scaledObjectErr = fmt.Errorf("scaledObject error") + } + promMetricsServer.RecordScaledObjectError(namespace, scaledObjectName, scaledObjectErr) + for _, scalerMetric := range promMetrics.ScalerMetric { + promMetricsServer.RecordHPAScalerMetric(namespace, scaledObjectName, scalerMetric.ScalerName, int(scalerMetric.ScalerIndex), scalerMetric.MetricName, float64(scalerMetric.MetricValue)) + } + for _, scalerError := range promMetrics.ScalerError { + var scalerErr error + if scalerError.Error { + scalerErr = fmt.Errorf("scaler error") + } + promMetricsServer.RecordHPAScalerError(namespace, scaledObjectName, scalerError.ScalerName, int(scalerError.ScalerIndex), scalerError.MetricName, scalerErr) } - promMetricsServer.RecordHPAScalerError(namespace, scaledObjectName, scalerError.ScalerName, int(scalerError.ScalerIndex), scalerError.MetricName, scalerErr) } return metrics, err