Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CLOB-930] Add ctx.Done() support to shutdown, make start-up wait time configurable, and add support for domain sockets for gRPC server and gRPC web server. #29

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions server/grpc/grpc_web.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package grpc

import (
"fmt"
"net"
"net/http"
"strings"
"time"

"github.com/improbable-eng/grpc-web/go/grpcweb"
Expand All @@ -23,24 +25,38 @@ func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, err
)
}

var proto, addr string
parts := strings.SplitN(config.GRPCWeb.Address, "://", 2)
// Default to using 'tcp' to maintain backwards compatibility with configurations that don't specify
// the network to use.
if len(parts) != 2 {
proto = "tcp"
addr = config.GRPCWeb.Address
} else {
proto, addr = parts[0], parts[1]
}
listener, err := net.Listen(proto, addr)
if err != nil {
return nil, err
}

wrappedServer := grpcweb.WrapServer(grpcSrv, options...)
grpcWebSrv := &http.Server{
Addr: config.GRPCWeb.Address,
Handler: wrappedServer,
ReadHeaderTimeout: 500 * time.Millisecond,
}

errCh := make(chan error)
go func() {
if err := grpcWebSrv.ListenAndServe(); err != nil {
if err := grpcWebSrv.Serve(listener); err != nil {
errCh <- fmt.Errorf("[grpc] failed to serve: %w", err)
}
}()

select {
case err := <-errCh:
return nil, err
case <-time.After(types.ServerStartTime): // assume server started successfully
case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully
return grpcWebSrv, nil
}
}
25 changes: 18 additions & 7 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc
import (
"fmt"
"net"
"strings"
"time"

"google.golang.org/grpc"
Expand All @@ -18,7 +19,7 @@ import (
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, net.Addr, error) {
maxSendMsgSize := cfg.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
Expand Down Expand Up @@ -53,16 +54,26 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
InterfaceRegistry: clientCtx.InterfaceRegistry,
})
if err != nil {
return nil, err
return nil, nil, err
}

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
gogoreflection.Register(grpcSrv)

listener, err := net.Listen("tcp", cfg.Address)
var proto, addr string
parts := strings.SplitN(cfg.Address, "://", 2)
// Default to using 'tcp' to maintain backwards compatibility with configurations that don't specify
// the network to use.
if len(parts) != 2 {
proto = "tcp"
addr = cfg.Address
} else {
proto, addr = parts[0], parts[1]
}
listener, err := net.Listen(proto, addr)
if err != nil {
return nil, err
return nil, nil, err
}

errCh := make(chan error)
Expand All @@ -75,10 +86,10 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config

select {
case err := <-errCh:
return nil, err
return nil, nil, err

case <-time.After(types.ServerStartTime):
case <-time.After(time.Duration(types.ServerStartTime.Load())):
// assume server started successfully
return grpcSrv, nil
return grpcSrv, listener.Addr(), nil
}
}
107 changes: 59 additions & 48 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
// DONTCOVER

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -141,14 +142,14 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
withTM, _ := cmd.Flags().GetBool(flagWithTendermint)
if !withTM {
serverCtx.Logger.Info("starting ABCI without Tendermint")
return wrapCPUProfile(serverCtx, func() error {
return startStandAlone(serverCtx, appCreator)
return wrapCPUProfile(cmd.Context(), serverCtx, func() error {
return startStandAlone(cmd.Context(), serverCtx, appCreator)
})
}

// amino is needed here for backwards compatibility of REST routes
err = wrapCPUProfile(serverCtx, func() error {
return startInProcess(serverCtx, clientCtx, appCreator)
err = wrapCPUProfile(cmd.Context(), serverCtx, func() error {
return startInProcess(cmd.Context(), serverCtx, clientCtx, appCreator)
})
errCode, ok := err.(ErrorCode)
if !ok {
Expand Down Expand Up @@ -206,7 +207,7 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
return cmd
}

func startStandAlone(ctx *Context, appCreator types.AppCreator) error {
func startStandAlone(parentCtx context.Context, ctx *Context, appCreator types.AppCreator) error {
addr := ctx.Viper.GetString(flagAddress)
transport := ctx.Viper.GetString(flagTransport)
home := ctx.Viper.GetString(flags.FlagHome)
Expand Down Expand Up @@ -260,10 +261,10 @@ func startStandAlone(ctx *Context, appCreator types.AppCreator) error {
}()

// Wait for SIGINT or SIGTERM signal
return WaitForQuitSignals()
return WaitForQuitSignals(parentCtx)
}

func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
func startInProcess(parentCtx context.Context, ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
cfg := ctx.Config
home := cfg.RootDir

Expand Down Expand Up @@ -354,6 +355,32 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
return err
}

var (
grpcSrv *grpc.Server
grpcSrvAddr net.Addr
grpcWebSrv *http.Server
)

if config.GRPC.Enable {
grpcSrv, grpcSrvAddr, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC)
if err != nil {
return err
}
defer grpcSrv.Stop()
if config.GRPCWeb.Enable {
grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config)
if err != nil {
ctx.Logger.Error("failed to start grpc-web http server: ", err)
return err
}
defer func() {
if err := grpcWebSrv.Close(); err != nil {
ctx.Logger.Error("failed to close grpc-web http server: ", err)
}
}()
}
}

var apiSrv *api.Server
if config.API.Enable {
genDoc, err := genDocProvider()
Expand All @@ -364,11 +391,6 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID)

if config.GRPC.Enable {
_, port, err := net.SplitHostPort(config.GRPC.Address)
if err != nil {
return err
}

maxSendMsgSize := config.GRPC.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
Expand All @@ -379,11 +401,10 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
}

grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)

grpcSrvAddrString := fmt.Sprintf("%s://%s", grpcSrvAddr.Network(), grpcSrvAddr.String())
// If grpc is enabled, configure grpc client for grpc gateway.
grpcClient, err := grpc.Dial(
grpcAddress,
grpcSrvAddrString,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
Expand All @@ -396,7 +417,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress)
ctx.Logger.Debug("grpc client assigned to client context", "target", grpcSrvAddrString)
}

apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server"))
Expand All @@ -416,40 +437,30 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
case err := <-errCh:
return err

case <-time.After(types.ServerStartTime): // assume server started successfully
}
}

var (
grpcSrv *grpc.Server
grpcWebSrv *http.Server
)

if config.GRPC.Enable {
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC)
if err != nil {
return err
}
defer grpcSrv.Stop()
if config.GRPCWeb.Enable {
grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config)
if err != nil {
ctx.Logger.Error("failed to start grpc-web http server: ", err)
return err
}
defer func() {
if err := grpcWebSrv.Close(); err != nil {
ctx.Logger.Error("failed to close grpc-web http server: ", err)
}
}()
case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully
}
}

// At this point it is safe to block the process if we're in gRPC only mode as
// we do not need to start Rosetta or handle any Tendermint related processes.
if gRPCOnly {
// Fix application shutdown
defer func() {
_ = app.Close()

if traceWriterCleanup != nil {
traceWriterCleanup()
}

if apiSrv != nil {
_ = apiSrv.Close()
}

ctx.Logger.Info("exiting...")
}()

// wait for signal capture and gracefully return
return WaitForQuitSignals()
return WaitForQuitSignals(parentCtx)
}

var rosettaSrv crgserver.Server
Expand Down Expand Up @@ -498,7 +509,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
case err := <-errCh:
return err

case <-time.After(types.ServerStartTime): // assume server started successfully
case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully
}
}

Expand All @@ -520,7 +531,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
}()

// wait for signal capture and gracefully return
return WaitForQuitSignals()
return WaitForQuitSignals(parentCtx)
}

func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
Expand All @@ -531,7 +542,7 @@ func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
}

// wrapCPUProfile runs callback in a goroutine, then wait for quit signals.
func wrapCPUProfile(ctx *Context, callback func() error) error {
func wrapCPUProfile(parentCtx context.Context, ctx *Context, callback func() error) error {
if cpuProfile := ctx.Viper.GetString(flagCPUProfile); cpuProfile != "" {
f, err := os.Create(cpuProfile)
if err != nil {
Expand Down Expand Up @@ -561,8 +572,8 @@ func wrapCPUProfile(ctx *Context, callback func() error) error {
case err := <-errCh:
return err

case <-time.After(types.ServerStartTime):
case <-time.After(time.Duration(types.ServerStartTime.Load())):
}

return WaitForQuitSignals()
return WaitForQuitSignals(parentCtx)
}
7 changes: 6 additions & 1 deletion server/types/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"encoding/json"
"io"
"sync/atomic"
"time"

dbm "github.com/cometbft/cometbft-db"
Expand All @@ -22,7 +23,11 @@ import (

// ServerStartTime defines the time duration that the server need to stay running after startup
// for the startup be considered successful
const ServerStartTime = 5 * time.Second
var ServerStartTime = atomic.Int64{}

func init() {
ServerStartTime.Add(int64(5 * time.Second))
}

type (
// AppOptions defines an interface that is passed into an application
Expand Down
11 changes: 8 additions & 3 deletions server/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -381,11 +382,15 @@ func TrapSignal(cleanupFunc func()) {
}

// WaitForQuitSignals waits for SIGINT and SIGTERM and returns.
func WaitForQuitSignals() ErrorCode {
func WaitForQuitSignals(ctx context.Context) error {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigs
return ErrorCode{Code: int(sig.(syscall.Signal)) + 128}
select {
case sig := <-sigs:
return ErrorCode{Code: int(sig.(syscall.Signal)) + 128}
case <-ctx.Done():
return nil
}
}

// GetAppDBBackend gets the backend type to use for the application DBs.
Expand Down
4 changes: 2 additions & 2 deletions testutil/network/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ func startInProcess(cfg Config, val *Validator) error {
select {
case err := <-errCh:
return err
case <-time.After(srvtypes.ServerStartTime): // assume server started successfully
case <-time.After(time.Duration(srvtypes.ServerStartTime.Load())): // assume server started successfully
}

val.api = apiSrv
}

if val.AppConfig.GRPC.Enable {
grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC)
grpcSrv, _, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC)
if err != nil {
return err
}
Expand Down
Loading