diff --git a/cmd/bio-rd/main.go b/cmd/bio-rd/main.go index 0918f701..31de7c69 100644 --- a/cmd/bio-rd/main.go +++ b/cmd/bio-rd/main.go @@ -16,16 +16,18 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) var ( - configFilePath = flag.String("config.file", "bio-rd.yml", "bio-rd config file") - apiPort = flag.Uint("api_port", 5566, "API server port") - metricsPort = flag.Uint("metrics_port", 55667, "Metrics HTTP server port") - sigHUP = make(chan os.Signal) - vrfReg = vrf.NewVRFRegistry() - bgpSrv bgpserver.BGPServer - runCfg *config.Config + configFilePath = flag.String("config.file", "bio-rd.yml", "bio-rd config file") + grpcPort = flag.Uint("grpc_port", 5566, "GRPC API server port") + grpcKeepaliveMinTime = flag.Uint("grpc_keepalive_min_time", 1, "Minimum time (seconds) for a client to wait between GRPC keepalive pings") + metricsPort = flag.Uint("metrics_port", 55667, "Metrics HTTP server port") + sigHUP = make(chan os.Signal) + vrfReg = vrf.NewVRFRegistry() + bgpSrv bgpserver.BGPServer + runCfg *config.Config ) func main() { @@ -60,10 +62,14 @@ func main() { unaryInterceptors := []grpc.UnaryServerInterceptor{} streamInterceptors := []grpc.StreamServerInterceptor{} srv, err := servicewrapper.New( - uint16(*apiPort), + uint16(*grpcPort), servicewrapper.HTTP(uint16(*metricsPort)), unaryInterceptors, streamInterceptors, + keepalive.EnforcementPolicy{ + MinTime: time.Duration(*grpcKeepaliveMinTime) * time.Second, + PermitWithoutStream: true, + }, ) if err != nil { log.Errorf("failed to listen: %v", err) diff --git a/cmd/ris/main.go b/cmd/ris/main.go index 69d9d9a2..75b1f903 100644 --- a/cmd/ris/main.go +++ b/cmd/ris/main.go @@ -4,6 +4,7 @@ import ( "flag" "net" "os" + "time" "google.golang.org/grpc" @@ -12,6 +13,7 @@ import ( "github.com/bio-routing/bio-rd/protocols/bgp/server" "github.com/bio-routing/bio-rd/util/servicewrapper" "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc/keepalive" pb "github.com/bio-routing/bio-rd/cmd/ris/api" prom_bmp "github.com/bio-routing/bio-rd/metrics/bmp/adapter/prom" @@ -19,9 +21,10 @@ import ( ) var ( - grpcPort = flag.Uint("grpc_port", 4321, "gRPC server port") - httpPort = flag.Uint("http_port", 4320, "HTTP server port") - configFilePath = flag.String("config.file", "ris_config.yml", "Configuration file") + grpcPort = flag.Uint("grpc_port", 4321, "gRPC server port") + httpPort = flag.Uint("http_port", 4320, "HTTP server port") + grpcKeepaliveMinTime = flag.Uint("grpc_keepalive_min_time", 1, "Minimum time (seconds) for a client to wait between GRPC keepalive pings") + configFilePath = flag.String("config.file", "ris_config.yml", "Configuration file") ) func main() { @@ -53,6 +56,10 @@ func main() { servicewrapper.HTTP(uint16(*httpPort)), unaryInterceptors, streamInterceptors, + keepalive.EnforcementPolicy{ + MinTime: time.Duration(*grpcKeepaliveMinTime) * time.Second, + PermitWithoutStream: true, + }, ) if err != nil { log.Errorf("failed to listen: %v", err) diff --git a/util/servicewrapper/grpc.go b/util/servicewrapper/grpc.go index d435c0d3..8eefc4a8 100644 --- a/util/servicewrapper/grpc.go +++ b/util/servicewrapper/grpc.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/reflection" log "github.com/sirupsen/logrus" @@ -57,7 +58,7 @@ type grpcSrv struct { } // New creates a new exarpc server wrapper -func New(grpcPort uint16, h *http.Server, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor) (*Server, error) { +func New(grpcPort uint16, h *http.Server, unaryInterceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, keepalivePol keepalive.EnforcementPolicy) (*Server, error) { s := &Server{ grpcSrv: &grpcSrv{port: grpcPort}, httpSrv: h, @@ -79,10 +80,13 @@ func New(grpcPort uint16, h *http.Server, unaryInterceptors []grpc.UnaryServerIn grpc_recovery.StreamServerInterceptor(), grpc_logrus.StreamServerInterceptor(logrusEntry, levelOpt), ) - unaryOpts := grpc_middleware.WithUnaryServerChain(unaryInterceptors...) - streamOpts := grpc_middleware.WithStreamServerChain(streamInterceptors...) - s.grpcSrv.srv = grpc.NewServer(unaryOpts, streamOpts) + opts := make([]grpc.ServerOption, 0) + opts = append(opts, grpc_middleware.WithUnaryServerChain(unaryInterceptors...)) + opts = append(opts, grpc_middleware.WithStreamServerChain(streamInterceptors...)) + opts = append(opts, grpc.KeepaliveEnforcementPolicy(keepalivePol)) + + s.grpcSrv.srv = grpc.NewServer(opts...) reflection.Register(s.grpcSrv.srv) grpc_prometheus.Register(s.grpcSrv.srv) grpc_prometheus.EnableClientHandlingTimeHistogram()