From bc286e7e0900f137b5871e7a9d34256c3c43ba2e Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Wed, 8 Jan 2025 17:12:45 -0800 Subject: [PATCH] [ENH] Add flags to the go binary for concurrency and stream processing. By default, most grpc implementations allow up to 100 concurrent streams. We have 200 concurrent requests, basically guaranteeing queueing. Expose the two gRPC options in Go that will allow us to tune. --- go/pkg/grpcutils/config.go | 3 +++ go/pkg/grpcutils/service.go | 2 ++ 2 files changed, 5 insertions(+) diff --git a/go/pkg/grpcutils/config.go b/go/pkg/grpcutils/config.go index 15ed30dbd32..beb0d7b3376 100644 --- a/go/pkg/grpcutils/config.go +++ b/go/pkg/grpcutils/config.go @@ -4,6 +4,9 @@ type GrpcConfig struct { // BindAddress is the address to bind the GRPC server to. BindAddress string + MaxConcurrentStreams uint32 + NumStreamWorkers uint32 + // GRPC mTLS config CertPath string KeyPath string diff --git a/go/pkg/grpcutils/service.go b/go/pkg/grpcutils/service.go index 3fd15919649..43f6bf68a2d 100644 --- a/go/pkg/grpcutils/service.go +++ b/go/pkg/grpcutils/service.go @@ -49,6 +49,8 @@ type defaultGrpcServer struct { func newDefaultGrpcProvider(name string, grpcConfig *GrpcConfig, registerFunc func(grpc.ServiceRegistrar)) (GrpcServer, error) { var opts []grpc.ServerOption opts = append(opts, grpc.MaxRecvMsgSize(maxGrpcFrameSize)) + opts = append(opts, grpc.NumStreamWorkers(grpcConfig.NumStreamWorkers)) + opts = append(opts, grpc.MaxConcurrentStreams(grpcConfig.MaxConcurrentStreams)) if grpcConfig.MTLSEnabled() { cert, err := tls.LoadX509KeyPair(grpcConfig.CertPath, grpcConfig.KeyPath) if err != nil {