Skip to content

Commit

Permalink
refactor: More cleanup of server/start.go (#16238)
Browse files Browse the repository at this point in the history
  • Loading branch information
ValarDragon authored May 22, 2023
1 parent c8ab555 commit 38f27e3
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 89 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (store) [#16067](https://github.com/cosmos/cosmos-sdk/pull/16067) Add local snapshots management commands.
* (server) [#16142](https://github.com/cosmos/cosmos-sdk/pull/16142) Remove JSON Indentation from the GRPC to REST gateway's responses. (Saving bandwidth)
* (baseapp) [#16193](https://github.com/cosmos/cosmos-sdk/pull/16193) Add `Close` method to `BaseApp` for custom app to cleanup resource in graceful shutdown.
* (server) [#16238](https://github.com/cosmos/cosmos-sdk/pull/16238) Don't set up p2p node keys if starting a node in GRPC only mode.

### State Machine Breaking

Expand Down
162 changes: 73 additions & 89 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,10 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
withCMT, _ := cmd.Flags().GetBool(flagWithComet)
if !withCMT {
serverCtx.Logger.Info("starting ABCI without CometBFT")

return wrapCPUProfile(serverCtx, func() error {
return startStandAlone(serverCtx, appCreator, opts)
})
}

return wrapCPUProfile(serverCtx, func() error {
return startInProcess(serverCtx, clientCtx, appCreator, opts)
return start(serverCtx, clientCtx, appCreator, withCMT, opts)
})
},
}
Expand Down Expand Up @@ -230,40 +226,34 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
return cmd
}

func startStandAlone(svrCtx *Context, appCreator types.AppCreator, opts StartCmdOptions) error {
addr := svrCtx.Viper.GetString(flagAddress)
transport := svrCtx.Viper.GetString(flagTransport)
home := svrCtx.Viper.GetString(flags.FlagHome)

db, err := opts.DBOpener(home, GetAppDBBackend(svrCtx.Viper))
func start(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator, withCmt bool, opts StartCmdOptions) error {
svrCfg, err := getAndValidateConfig(svrCtx)
if err != nil {
return err
}

// TODO: Should we be using startTraceServer, and defer closing the traceWriter?
// right now its left unclosed
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err := openTraceWriter(traceWriterFile)
app, appCleanupFn, err := startApp(svrCtx, appCreator, opts)
if err != nil {
return err
}
defer appCleanupFn()

app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)

config, err := serverconfig.GetConfig(svrCtx.Viper)
metrics, err := startTelemetry(svrCfg)
if err != nil {
return err
}

if err := config.ValidateBasic(); err != nil {
return err
}
emitServerInfoMetrics()

if _, err := startTelemetry(config); err != nil {
return err
if !withCmt {
return startStandAlone(svrCtx, app, opts)
}
return startInProcess(svrCtx, svrCfg, clientCtx, app, metrics, opts)
}

emitServerInfoMetrics()
func startStandAlone(svrCtx *Context, app types.Application, opts StartCmdOptions) error {
addr := svrCtx.Viper.GetString(flagAddress)
transport := svrCtx.Viper.GetString(flagTransport)

svr, err := server.NewServer(addr, transport, app)
if err != nil {
Expand All @@ -272,12 +262,9 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator, opts StartCmd

svr.SetLogger(servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger.With("module", "abci-server")})

ctx, cancelFn := context.WithCancel(context.Background())
ctx := getCtx(svrCtx)
g, ctx := errgroup.WithContext(ctx)

// listen for quit signals so the calling parent process can gracefully exit
ListenForQuitSignals(cancelFn, svrCtx.Logger)

g.Go(func() error {
if err := svr.Start(); err != nil {
svrCtx.Logger.Error("failed to start out-of-process ABCI server", "err", err)
Expand All @@ -294,47 +281,25 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator, opts StartCmd
return g.Wait()
}

func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator, opts StartCmdOptions) error {
func startInProcess(svrCtx *Context, svrCfg serverconfig.Config, clientCtx client.Context, app types.Application,
metrics *telemetry.Metrics, opts StartCmdOptions,
) error {
cmtCfg := svrCtx.Config
home := cmtCfg.RootDir

db, err := opts.DBOpener(home, GetAppDBBackend(svrCtx.Viper))
if err != nil {
return err
}

svrCfg, err := getAndValidateConfig(svrCtx)
if err != nil {
return err
}

traceWriter, traceWriterCleanup, err := setupTraceWriter(svrCtx)
if err != nil {
return err
}

app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)

// TODO: Move this to only be done if were launching the node. (So not in GRPC-only mode)
nodeKey, err := p2p.LoadOrGenNodeKey(cmtCfg.NodeKeyFile())
if err != nil {
return err
}

var (
tmNode *node.Node
gRPCOnly = svrCtx.Viper.GetBool(flagGRPCOnly)
)
gRPCOnly := svrCtx.Viper.GetBool(flagGRPCOnly)

if gRPCOnly {
// TODO: Generalize logic so that gRPC only is really in startStandAlone
svrCtx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled")
svrCfg.GRPC.Enable = true
} else {
svrCtx.Logger.Info("starting node with ABCI CometBFT in-process")
tmNode, err = startCmtNode(cmtCfg, nodeKey, app, svrCtx)
tmNode, cleanupFn, err := startCmtNode(cmtCfg, app, svrCtx)
if err != nil {
return err
}
defer cleanupFn()

// Add the tx service to the gRPC router. We only need to register this
// service if API or gRPC is enabled, and avoid doing so in the general
Expand All @@ -350,19 +315,9 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
}
}

metrics, err := startTelemetry(svrCfg)
if err != nil {
return err
}

emitServerInfoMetrics()

ctx, cancelFn := context.WithCancel(context.Background())
ctx := getCtx(svrCtx)
g, ctx := errgroup.WithContext(ctx)

// listen for quit signals so the calling parent process can gracefully exit
ListenForQuitSignals(cancelFn, svrCtx.Logger)

grpcSrv, clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app)
if err != nil {
return err
Expand Down Expand Up @@ -394,26 +349,46 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
return nil
})

// deferred cleanup function
// TODO: Make a generic cleanup function that takes in several func(), and runs them all.
// then we defer that.
defer func() {
if tmNode != nil && tmNode.IsRunning() {
_ = tmNode.Stop()
_ = app.Close()
}
// wait for signal capture and gracefully return
return g.Wait()
}

func startApp(svrCtx *Context, appCreator types.AppCreator, opts StartCmdOptions) (app types.Application, cleanupFn func(), err error) {
traceWriter, traceCleanupFn, err := setupTraceWriter(svrCtx)
if err != nil {
return app, traceCleanupFn, err
}

if traceWriterCleanup != nil {
traceWriterCleanup()
home := svrCtx.Config.RootDir
db, err := opts.DBOpener(home, GetAppDBBackend(svrCtx.Viper))
if err != nil {
return app, traceCleanupFn, err
}

app = appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)
cleanupFn = func() {
traceCleanupFn()
if localErr := app.Close(); localErr != nil {
svrCtx.Logger.Error(localErr.Error())
}
}()
}
return app, cleanupFn, nil
}

// wait for signal capture and gracefully return
return g.Wait()
// getCtx returns a cancellable context derived from context.Background.
//
// The matching cancel function is not returned to the caller; it is handed to
// ListenForQuitSignals together with the server logger, so that quit signals
// let the calling parent process exit gracefully.
// NOTE(review): presumably ListenForQuitSignals invokes the cancel function
// when a signal arrives — confirm against its definition.
func getCtx(svrCtx *Context) context.Context {
	rootCtx, stop := context.WithCancel(context.Background())

	// Ownership of stop moves to the signal listener; nothing else cancels rootCtx here.
	ListenForQuitSignals(stop, svrCtx.Logger)

	return rootCtx
}

// TODO: Move nodeKey into being created within the function.
func startCmtNode(cfg *cmtcfg.Config, nodeKey *p2p.NodeKey, app types.Application, svrCtx *Context) (tmNode *node.Node, err error) {
func startCmtNode(cfg *cmtcfg.Config, app types.Application, svrCtx *Context) (tmNode *node.Node, cleanupFn func(), err error) {
cleanupFn = func() {}
nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile())
if err != nil {
return nil, cleanupFn, err
}

tmNode, err = node.NewNode(
cfg,
pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()),
Expand All @@ -425,13 +400,21 @@ func startCmtNode(cfg *cmtcfg.Config, nodeKey *p2p.NodeKey, app types.Applicatio
servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger},
)
if err != nil {
return tmNode, err
return tmNode, cleanupFn, err
}

if err := tmNode.Start(); err != nil {
return tmNode, err
return tmNode, cleanupFn, err
}
return tmNode, nil

cleanupFn = func() {
if tmNode != nil && tmNode.IsRunning() {
_ = tmNode.Stop()
_ = app.Close()
}
}

return tmNode, cleanupFn, nil
}

func getAndValidateConfig(svrCtx *Context) (serverconfig.Config, error) {
Expand Down Expand Up @@ -459,15 +442,15 @@ func getGenDocProvider(cfg *cmtcfg.Config) func() (*cmttypes.GenesisDoc, error)
}

func setupTraceWriter(svrCtx *Context) (traceWriter io.WriteCloser, cleanup func(), err error) {
// clean up the traceWriter when the server is shutting down
cleanup = func() {}

traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err = openTraceWriter(traceWriterFile)
if err != nil {
return traceWriter, cleanup, err
}

// clean up the traceWriter when the server is shutting down
cleanup = func() {}

// if flagTraceStore is not used then traceWriter is nil
if traceWriter != nil {
cleanup = func() {
Expand Down Expand Up @@ -542,6 +525,7 @@ func startAPIServer(ctx context.Context, g *errgroup.Group, cmtCfg *cmtcfg.Confi
}
// TODO: Why do we reload and unmarshal the entire genesis doc in order to get the chain ID?
// Surely there's a better way. This is likely a serious node start time overhead.
// Shouldn't it be in cmtCfg.ChainID()?
genDocProvider := getGenDocProvider(cmtCfg)
genDoc, err := genDocProvider()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions server/types/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type (
SnapshotManager() *snapshots.Manager

// Close is called in start cmd to gracefully cleanup resources.
// Must be safe to be called multiple times.
Close() error
}

Expand Down

0 comments on commit 38f27e3

Please sign in to comment.