From bef8a051e94fdfa3c8c4aae239dbea94bac9045b Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Wed, 25 Oct 2023 10:25:43 -0700 Subject: [PATCH] [CLOB-930] Add ctx.Done() support to shutdown, make start-up wait time configurable, and add support for domain sockets for gRPC server and gRPC web server. ctx.Done() support was added in 0.50 with https://github.com/cosmos/cosmos-sdk/pull/15041 server start-up time was removed in 0.50 with https://github.com/cosmos/cosmos-sdk/pull/15041 --- server/grpc/grpc_web.go | 22 ++++++-- server/grpc/server.go | 25 ++++++--- server/start.go | 107 +++++++++++++++++++++------------------ server/types/app.go | 7 ++- server/util.go | 11 ++-- testutil/network/util.go | 4 +- 6 files changed, 112 insertions(+), 64 deletions(-) diff --git a/server/grpc/grpc_web.go b/server/grpc/grpc_web.go index 99040ae26302..ea42ab094f6d 100644 --- a/server/grpc/grpc_web.go +++ b/server/grpc/grpc_web.go @@ -2,7 +2,9 @@ package grpc import ( "fmt" + "net" "net/http" + "strings" "time" "github.com/improbable-eng/grpc-web/go/grpcweb" @@ -23,16 +25,30 @@ func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, err ) } + var proto, addr string + parts := strings.SplitN(config.GRPCWeb.Address, "://", 2) + // Default to using 'tcp' to maintain backwards compatibility with configurations that don't specify + // the network to use. + if len(parts) != 2 { + proto = "tcp" + addr = config.GRPCWeb.Address + } else { + proto, addr = parts[0], parts[1] + } + listener, err := net.Listen(proto, addr) + if err != nil { + return nil, err + } + wrappedServer := grpcweb.WrapServer(grpcSrv, options...) 
grpcWebSrv := &http.Server{ - Addr: config.GRPCWeb.Address, Handler: wrappedServer, ReadHeaderTimeout: 500 * time.Millisecond, } errCh := make(chan error) go func() { - if err := grpcWebSrv.ListenAndServe(); err != nil { + if err := grpcWebSrv.Serve(listener); err != nil { errCh <- fmt.Errorf("[grpc] failed to serve: %w", err) } }() @@ -40,7 +56,7 @@ func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, err select { case err := <-errCh: return nil, err - case <-time.After(types.ServerStartTime): // assume server started successfully + case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully return grpcWebSrv, nil } } diff --git a/server/grpc/server.go b/server/grpc/server.go index 79a9be3dca24..0def44e1dba8 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -3,6 +3,7 @@ package grpc import ( "fmt" "net" + "strings" "time" "google.golang.org/grpc" @@ -18,7 +19,7 @@ import ( ) // StartGRPCServer starts a gRPC server on the given address. -func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) { +func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, net.Addr, error) { maxSendMsgSize := cfg.MaxSendMsgSize if maxSendMsgSize == 0 { maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize @@ -53,16 +54,26 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config InterfaceRegistry: clientCtx.InterfaceRegistry, }) if err != nil { - return nil, err + return nil, nil, err } // Reflection allows external clients to see what services and methods // the gRPC server exposes. gogoreflection.Register(grpcSrv) - listener, err := net.Listen("tcp", cfg.Address) + var proto, addr string + parts := strings.SplitN(cfg.Address, "://", 2) + // Default to using 'tcp' to maintain backwards compatibility with configurations that don't specify + // the network to use. 
+ if len(parts) != 2 { + proto = "tcp" + addr = cfg.Address + } else { + proto, addr = parts[0], parts[1] + } + listener, err := net.Listen(proto, addr) if err != nil { - return nil, err + return nil, nil, err } errCh := make(chan error) @@ -75,10 +86,10 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config select { case err := <-errCh: - return nil, err + return nil, nil, err - case <-time.After(types.ServerStartTime): + case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully - return grpcSrv, nil + return grpcSrv, listener.Addr(), nil } } diff --git a/server/start.go b/server/start.go index f3f4f657d674..806656906b60 100644 --- a/server/start.go +++ b/server/start.go @@ -3,6 +3,7 @@ package server // DONTCOVER import ( + "context" "errors" "fmt" "net" @@ -141,14 +142,14 @@ is performed. Note, when enabled, gRPC will also be automatically enabled. withTM, _ := cmd.Flags().GetBool(flagWithTendermint) if !withTM { serverCtx.Logger.Info("starting ABCI without Tendermint") - return wrapCPUProfile(serverCtx, func() error { - return startStandAlone(serverCtx, appCreator) + return wrapCPUProfile(cmd.Context(), serverCtx, func() error { + return startStandAlone(cmd.Context(), serverCtx, appCreator) }) } // amino is needed here for backwards compatibility of REST routes - err = wrapCPUProfile(serverCtx, func() error { - return startInProcess(serverCtx, clientCtx, appCreator) + err = wrapCPUProfile(cmd.Context(), serverCtx, func() error { + return startInProcess(cmd.Context(), serverCtx, clientCtx, appCreator) }) errCode, ok := err.(ErrorCode) if !ok { @@ -206,7 +207,7 @@ is performed. Note, when enabled, gRPC will also be automatically enabled. 
return cmd } -func startStandAlone(ctx *Context, appCreator types.AppCreator) error { +func startStandAlone(parentCtx context.Context, ctx *Context, appCreator types.AppCreator) error { addr := ctx.Viper.GetString(flagAddress) transport := ctx.Viper.GetString(flagTransport) home := ctx.Viper.GetString(flags.FlagHome) @@ -260,10 +261,10 @@ func startStandAlone(ctx *Context, appCreator types.AppCreator) error { }() // Wait for SIGINT or SIGTERM signal - return WaitForQuitSignals() + return WaitForQuitSignals(parentCtx) } -func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error { +func startInProcess(parentCtx context.Context, ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error { cfg := ctx.Config home := cfg.RootDir @@ -354,6 +355,32 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App return err } + var ( + grpcSrv *grpc.Server + grpcSrvAddr net.Addr + grpcWebSrv *http.Server + ) + + if config.GRPC.Enable { + grpcSrv, grpcSrvAddr, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC) + if err != nil { + return err + } + defer grpcSrv.Stop() + if config.GRPCWeb.Enable { + grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config) + if err != nil { + ctx.Logger.Error("failed to start grpc-web http server: ", err) + return err + } + defer func() { + if err := grpcWebSrv.Close(); err != nil { + ctx.Logger.Error("failed to close grpc-web http server: ", err) + } + }() + } + } + var apiSrv *api.Server if config.API.Enable { genDoc, err := genDocProvider() @@ -364,11 +391,6 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID) if config.GRPC.Enable { - _, port, err := net.SplitHostPort(config.GRPC.Address) - if err != nil { - return err - } - maxSendMsgSize := config.GRPC.MaxSendMsgSize if maxSendMsgSize == 0 { maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize @@ -379,11 
+401,10 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize } - grpcAddress := fmt.Sprintf("127.0.0.1:%s", port) - + grpcSrvAddrString := fmt.Sprintf("%s://%s", grpcSrvAddr.Network(), grpcSrvAddr.String()) // If grpc is enabled, configure grpc client for grpc gateway. grpcClient, err := grpc.Dial( - grpcAddress, + grpcSrvAddrString, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions( grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), @@ -396,7 +417,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App } clientCtx = clientCtx.WithGRPCClient(grpcClient) - ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress) + ctx.Logger.Debug("grpc client assigned to client context", "target", grpcSrvAddrString) } apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server")) @@ -416,40 +437,30 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App case err := <-errCh: return err - case <-time.After(types.ServerStartTime): // assume server started successfully - } - } - - var ( - grpcSrv *grpc.Server - grpcWebSrv *http.Server - ) - - if config.GRPC.Enable { - grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC) - if err != nil { - return err - } - defer grpcSrv.Stop() - if config.GRPCWeb.Enable { - grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config) - if err != nil { - ctx.Logger.Error("failed to start grpc-web http server: ", err) - return err - } - defer func() { - if err := grpcWebSrv.Close(); err != nil { - ctx.Logger.Error("failed to close grpc-web http server: ", err) - } - }() + case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully } } // At this point it is safe to block the process if we're in gRPC only mode as // we do not need to start Rosetta or handle any 
Tendermint related processes. if gRPCOnly { + // Fix application shutdown + defer func() { + _ = app.Close() + + if traceWriterCleanup != nil { + traceWriterCleanup() + } + + if apiSrv != nil { + _ = apiSrv.Close() + } + + ctx.Logger.Info("exiting...") + }() + // wait for signal capture and gracefully return - return WaitForQuitSignals() + return WaitForQuitSignals(parentCtx) } var rosettaSrv crgserver.Server @@ -498,7 +509,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App case err := <-errCh: return err - case <-time.After(types.ServerStartTime): // assume server started successfully + case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully } } @@ -520,7 +531,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App }() // wait for signal capture and gracefully return - return WaitForQuitSignals() + return WaitForQuitSignals(parentCtx) } func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) { @@ -531,7 +542,7 @@ func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) { } // wrapCPUProfile runs callback in a goroutine, then wait for quit signals. 
-func wrapCPUProfile(ctx *Context, callback func() error) error { +func wrapCPUProfile(parentCtx context.Context, ctx *Context, callback func() error) error { if cpuProfile := ctx.Viper.GetString(flagCPUProfile); cpuProfile != "" { f, err := os.Create(cpuProfile) if err != nil { @@ -561,8 +572,8 @@ func wrapCPUProfile(ctx *Context, callback func() error) error { case err := <-errCh: return err - case <-time.After(types.ServerStartTime): + case <-time.After(time.Duration(types.ServerStartTime.Load())): } - return WaitForQuitSignals() + return WaitForQuitSignals(parentCtx) } diff --git a/server/types/app.go b/server/types/app.go index 727f767fc35e..21da074564b9 100644 --- a/server/types/app.go +++ b/server/types/app.go @@ -3,6 +3,7 @@ package types import ( "encoding/json" "io" + "sync/atomic" "time" dbm "github.com/cometbft/cometbft-db" @@ -22,7 +23,11 @@ import ( // ServerStartTime defines the time duration that the server need to stay running after startup // for the startup be considered successful -const ServerStartTime = 5 * time.Second +var ServerStartTime = atomic.Int64{} + +func init() { + ServerStartTime.Add(int64(5 * time.Second)) +} type ( // AppOptions defines an interface that is passed into an application diff --git a/server/util.go b/server/util.go index 58323b0d2108..7e4e16983d01 100644 --- a/server/util.go +++ b/server/util.go @@ -1,6 +1,7 @@ package server import ( + "context" "errors" "fmt" "io" @@ -381,11 +382,15 @@ func TrapSignal(cleanupFunc func()) { } // WaitForQuitSignals waits for SIGINT and SIGTERM and returns. 
-func WaitForQuitSignals() ErrorCode { +func WaitForQuitSignals(ctx context.Context) error { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - sig := <-sigs - return ErrorCode{Code: int(sig.(syscall.Signal)) + 128} + select { + case sig := <-sigs: + return ErrorCode{Code: int(sig.(syscall.Signal)) + 128} + case <-ctx.Done(): + return nil + } } // GetAppDBBackend gets the backend type to use for the application DBs. diff --git a/testutil/network/util.go b/testutil/network/util.go index b7e7e7475acd..eab0c9af160a 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -89,14 +89,14 @@ func startInProcess(cfg Config, val *Validator) error { select { case err := <-errCh: return err - case <-time.After(srvtypes.ServerStartTime): // assume server started successfully + case <-time.After(time.Duration(srvtypes.ServerStartTime.Load())): // assume server started successfully } val.api = apiSrv } if val.AppConfig.GRPC.Enable { - grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC) + grpcSrv, _, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC) if err != nil { return err }