Skip to content

Commit

Permalink
refactor: cleanup server logic (#15041)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderbez authored Feb 22, 2023
1 parent df6f25d commit 7f99ad5
Show file tree
Hide file tree
Showing 24 changed files with 289 additions and 227 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Improvements

* (server) [#15041](https://github.com/cosmos/cosmos-sdk/pull/15041) Remove unnecessary sleeps from gRPC and API server initiation. The servers will start and accept requests as soon as they're ready.
* (x/staking) [#14864](https://github.com/cosmos/cosmos-sdk/pull/14864) `create-validator` CLI command now takes a json file as an arg instead of having a bunch of required flags to it.
* (cli) [#14659](https://github.com/cosmos/cosmos-sdk/pull/14659) Added ability to query blocks by either height/hash `simd q block --type=height|hash <height|hash>`.
* (store) [#14410](https://github.com/cosmos/cosmos-sdk/pull/14410) `rootmulti.Store.loadVersion` has validation to check if all the module stores' height is correct, it will error if any module store has incorrect height.
Expand Down Expand Up @@ -179,6 +180,11 @@ Ref: https://keepachangelog.com/en/1.0.0/

### API Breaking Changes

* (server) [#15041](https://github.com/cosmos/cosmos-sdk/pull/15041) Refactor how gRPC and API servers are started to remove unnecessary sleeps:
* Remove `ServerStartTime` constant.
* Rename `WaitForQuitSignals` to `ListenForQuitSignals`. Note, this function is no longer blocking. Thus the caller is expected to provide a `context.CancelFunc` which indicates that when a signal is caught, that any spawned processes can gracefully exit.
* `api.Server#Start` now accepts a `context.Context`. The caller is responsible for ensuring that the context is canceled such that the API server can gracefully exit. The caller does not need to stop the server.
* To start the gRPC server you must first create the server via `NewGRPCServer`, after which you can start the gRPC server via `StartGRPCServer` which accepts a `context.Context`. The caller is responsible for ensuring that the context is canceled such that the gRPC server can gracefully exit. The caller does not need to stop the server.
* (types) [#15067](https://github.com/cosmos/cosmos-sdk/pull/15067) Remove deprecated alias from `types/errors`. Use `cosmossdk.io/errors` instead.
* (testutil) [#14991](https://github.com/cosmos/cosmos-sdk/pull/14991) The `testutil/testdata_pulsar` package has moved to `testutil/testdata/testpb`.
* (simapp) [#14977](https://github.com/cosmos/cosmos-sdk/pull/14977) Move simulation helpers functions (`AppStateFn` and `AppStateRandomizedFn`) to `testutil/sims`. These takes an extra genesisState argument which is the default state of the app.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ require (
github.com/tendermint/go-amino v0.16.0
golang.org/x/crypto v0.6.0
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
golang.org/x/sync v0.1.0
google.golang.org/genproto v0.0.0-20230202175211-008b39050e57
google.golang.org/grpc v1.53.0
google.golang.org/protobuf v1.28.2-0.20230208135220-49eaa78c6c9c
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
52 changes: 39 additions & 13 deletions server/api/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -29,11 +30,10 @@ type Server struct {
Router *mux.Router
GRPCGatewayRouter *runtime.ServeMux
ClientCtx client.Context
GRPCSrv *grpc.Server
logger log.Logger
metrics *telemetry.Metrics

GRPCSrv *grpc.Server

logger log.Logger
metrics *telemetry.Metrics
// Start() is blocking and generally called from a separate goroutine.
// Close() can be called asynchronously and access shared memory
// via the listener. Therefore, we sync access to Start and Close with
Expand All @@ -51,6 +51,7 @@ func CustomGRPCHeaderMatcher(key string) (string, bool) {
switch strings.ToLower(key) {
case grpctypes.GRPCBlockHeightHeader:
return grpctypes.GRPCBlockHeightHeader, true

default:
return runtime.DefaultHeaderMatcher(key)
}
Expand Down Expand Up @@ -88,9 +89,12 @@ func New(clientCtx client.Context, logger log.Logger, grpcSrv *grpc.Server) *Ser

// Start starts the API server. Internally, the API server leverages CometBFT's
// JSON RPC server. Configuration options are provided via config.APIConfig
// and are delegated to the CometBFT JSON RPC server. The process is
// non-blocking, so an external signal handler must be used.
func (s *Server) Start(cfg config.Config) error {
// and are delegated to the CometBFT JSON RPC server.
//
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func (s *Server) Start(ctx context.Context, cfg config.Config) error {
s.mtx.Lock()

cmtCfg := tmrpcserver.DefaultConfig()
Expand Down Expand Up @@ -134,13 +138,35 @@ func (s *Server) Start(cfg config.Config) error {
// register grpc-gateway routes (after grpc-web server as the first match is used)
s.Router.PathPrefix("/").Handler(s.GRPCGatewayRouter)

s.logger.Info("starting API server...")
if cfg.API.EnableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
return tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, cmtCfg)
}
errCh := make(chan error)

// Start the API in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func(enableUnsafeCORS bool) {
s.logger.Info("starting API server...", "address", cfg.API.Address)

return tmrpcserver.Serve(s.listener, s.Router, s.logger, cmtCfg)
if enableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
errCh <- tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, cmtCfg)
} else {
errCh <- tmrpcserver.Serve(s.listener, s.Router, s.logger, cmtCfg)
}
}(cfg.API.EnableUnsafeCORS)

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case <-ctx.Done():
// The calling process cancelled or closed the provided context, so we must
// gracefully stop the API server.
s.logger.Info("stopping API server...", "address", cfg.API.Address)
return s.Close()

case err := <-errCh:
s.logger.Error("failed to start API server", "err", err)
return err
}
}

// Close closes the API server.
Expand Down
49 changes: 35 additions & 14 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package grpc

import (
"context"
"fmt"
"net"
"time"

"cosmossdk.io/log"
"google.golang.org/grpc"

"github.com/cosmos/cosmos-sdk/client"
Expand All @@ -17,8 +18,9 @@ import (
_ "github.com/cosmos/cosmos-sdk/types/tx/amino" // Import amino.proto file for reflection
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
// NewGRPCServer returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server. See StartGRPCServer.
func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
maxSendMsgSize := cfg.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
Expand Down Expand Up @@ -46,39 +48,58 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
for _, m := range clientCtx.TxConfig.SignModeHandler().Modes() {
modes[m.String()] = (int32)(m)
}

return modes
}(),
ChainID: clientCtx.ChainID,
SdkConfig: sdk.GetConfig(),
InterfaceRegistry: clientCtx.InterfaceRegistry,
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to register reflection service: %w", err)
}

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
gogoreflection.Register(grpcSrv)

return grpcSrv, nil
}

// StartGRPCServer starts the provided gRPC server on the address specified in cfg.
//
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func StartGRPCServer(ctx context.Context, logger log.Logger, cfg config.GRPCConfig, grpcSrv *grpc.Server) error {
listener, err := net.Listen("tcp", cfg.Address)
if err != nil {
return nil, err
return fmt.Errorf("failed to listen on address %s: %w", cfg.Address, err)
}

errCh := make(chan error)

// Start the gRPC in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func() {
err = grpcSrv.Serve(listener)
if err != nil {
errCh <- fmt.Errorf("failed to serve: %w", err)
}
logger.Info("starting gRPC server...", "address", cfg.Address)
errCh <- grpcSrv.Serve(listener)
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case err := <-errCh:
return nil, err
case <-ctx.Done():
// The calling process cancelled or closed the provided context, so we must
// gracefully stop the gRPC server.
logger.Info("stopping gRPC server...", "address", cfg.Address)
grpcSrv.GracefulStop()

return nil

case <-time.After(types.ServerStartTime):
// assume server started successfully
return grpcSrv, nil
case err := <-errCh:
logger.Error("failed to start gRPC server", "err", err)
return err
}
}
Loading

0 comments on commit 7f99ad5

Please sign in to comment.