Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: More cleanup of server/start.go #16238

Merged
merged 9 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (store) [#16067](https://github.com/cosmos/cosmos-sdk/pull/16067) Add local snapshots management commands.
* (server) [#16142](https://github.com/cosmos/cosmos-sdk/pull/16142) Remove JSON Indentation from the GRPC to REST gateway's responses. (Saving bandwidth)
* (baseapp) [#16193](https://github.com/cosmos/cosmos-sdk/pull/16193) Add `Close` method to `BaseApp` for custom app to cleanup resource in graceful shutdown.
* (server) [#16238](https://github.com/cosmos/cosmos-sdk/pull/16238) Don't set up p2p node keys if starting a node in GRPC only mode.

### State Machine Breaking

Expand Down
158 changes: 68 additions & 90 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,10 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
withCMT, _ := cmd.Flags().GetBool(flagWithComet)
if !withCMT {
serverCtx.Logger.Info("starting ABCI without CometBFT")

return wrapCPUProfile(serverCtx, func() error {
return startStandAlone(serverCtx, appCreator, opts)
})
}

return wrapCPUProfile(serverCtx, func() error {
return startInProcess(serverCtx, clientCtx, appCreator, opts)
return start(serverCtx, clientCtx, appCreator, withCMT, opts)
})
},
}
Expand Down Expand Up @@ -230,40 +226,34 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
return cmd
}

func startStandAlone(svrCtx *Context, appCreator types.AppCreator, opts StartCmdOptions) error {
addr := svrCtx.Viper.GetString(flagAddress)
transport := svrCtx.Viper.GetString(flagTransport)
home := svrCtx.Viper.GetString(flags.FlagHome)

db, err := opts.DBOpener(home, GetAppDBBackend(svrCtx.Viper))
func start(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator, withCmt bool, opts StartCmdOptions) error {
svrCfg, err := getAndValidateConfig(svrCtx)
if err != nil {
return err
}

// TODO: Should we be using startTraceServer, and defer closing the traceWriter?
// right now its left unclosed
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err := openTraceWriter(traceWriterFile)
app, appCleanupFn, err := startApp(svrCtx, appCreator, opts)
if err != nil {
return err
}
defer appCleanupFn()
Copy link
Contributor Author

@ValarDragon ValarDragon May 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trace store cleanup defer is moved up here. This is equivalent to the previous behavior of startInProcess, where the trace store was cleaned up after the tmNode.

(It wasn't cleaned up for startStandAlone at all before)


app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)

config, err := serverconfig.GetConfig(svrCtx.Viper)
metrics, err := startTelemetry(svrCfg)
if err != nil {
return err
}

if err := config.ValidateBasic(); err != nil {
return err
}
emitServerInfoMetrics()

if _, err := startTelemetry(config); err != nil {
return err
if !withCmt {
return startStandAlone(svrCtx, app, opts)
}
return startInProcess(svrCtx, svrCfg, clientCtx, app, metrics, opts)
}

emitServerInfoMetrics()
func startStandAlone(svrCtx *Context, app types.Application, opts StartCmdOptions) error {
addr := svrCtx.Viper.GetString(flagAddress)
transport := svrCtx.Viper.GetString(flagTransport)

svr, err := server.NewServer(addr, transport, app)
if err != nil {
Expand All @@ -272,12 +262,9 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator, opts StartCmd

svr.SetLogger(servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger.With("module", "abci-server")})

ctx, cancelFn := context.WithCancel(context.Background())
ctx := getCtx(svrCtx)
g, ctx := errgroup.WithContext(ctx)

// listen for quit signals so the calling parent process can gracefully exit
ListenForQuitSignals(cancelFn, svrCtx.Logger)

g.Go(func() error {
if err := svr.Start(); err != nil {
svrCtx.Logger.Error("failed to start out-of-process ABCI server", "err", err)
Expand All @@ -294,47 +281,25 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator, opts StartCmd
return g.Wait()
}

func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator, opts StartCmdOptions) error {
func startInProcess(svrCtx *Context, svrCfg serverconfig.Config, clientCtx client.Context, app types.Application,
metrics *telemetry.Metrics, opts StartCmdOptions,
) error {
cmtCfg := svrCtx.Config
home := cmtCfg.RootDir

db, err := opts.DBOpener(home, GetAppDBBackend(svrCtx.Viper))
if err != nil {
return err
}

svrCfg, err := getAndValidateConfig(svrCtx)
if err != nil {
return err
}

traceWriter, traceWriterCleanup, err := setupTraceWriter(svrCtx)
if err != nil {
return err
}

app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)

// TODO: Move this to only be done if were launching the node. (So not in GRPC-only mode)
nodeKey, err := p2p.LoadOrGenNodeKey(cmtCfg.NodeKeyFile())
if err != nil {
return err
}

var (
tmNode *node.Node
gRPCOnly = svrCtx.Viper.GetBool(flagGRPCOnly)
)
gRPCOnly := svrCtx.Viper.GetBool(flagGRPCOnly)

if gRPCOnly {
// TODO: Generalize logic so that gRPC only is really in startStandAlone
svrCtx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled")
svrCfg.GRPC.Enable = true
} else {
svrCtx.Logger.Info("starting node with ABCI CometBFT in-process")
tmNode, err = startCmtNode(cmtCfg, nodeKey, app, svrCtx)
tmNode, cleanupFn, err := startCmtNode(cmtCfg, app, svrCtx)
if err != nil {
return err
}
defer cleanupFn()

// Add the tx service to the gRPC router. We only need to register this
// service if API or gRPC is enabled, and avoid doing so in the general
Expand All @@ -350,19 +315,9 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
}
}

metrics, err := startTelemetry(svrCfg)
if err != nil {
return err
}

emitServerInfoMetrics()

ctx, cancelFn := context.WithCancel(context.Background())
ctx := getCtx(svrCtx)
g, ctx := errgroup.WithContext(ctx)

// listen for quit signals so the calling parent process can gracefully exit
ListenForQuitSignals(cancelFn, svrCtx.Logger)

grpcSrv, clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app)
if err != nil {
return err
Expand Down Expand Up @@ -394,26 +349,40 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
return nil
})

// deferred cleanup function
// TODO: Make a generic cleanup function that takes in several func(), and runs them all.
// then we defer that.
defer func() {
if tmNode != nil && tmNode.IsRunning() {
_ = tmNode.Stop()
_ = app.Close()
}

if traceWriterCleanup != nil {
traceWriterCleanup()
}
}()

// wait for signal capture and gracefully return
return g.Wait()
}

// TODO: Move nodeKey into being created within the function.
func startCmtNode(cfg *cmtcfg.Config, nodeKey *p2p.NodeKey, app types.Application, svrCtx *Context) (tmNode *node.Node, err error) {
// startApp sets up the trace writer, opens the application database and then
// builds the application through appCreator.
//
// On success it returns the application together with the trace-writer cleanup
// function, which the caller is expected to defer. If setting up the trace
// writer fails, the cleanup function returned by setupTraceWriter is passed
// through unchanged. If opening the database fails, the trace writer is
// released here and a non-nil no-op cleanup function is returned, so a caller
// that simply returns the error does not leak the trace writer.
func startApp(svrCtx *Context, appCreator types.AppCreator, opts StartCmdOptions) (app types.Application, cleanupFn func(), err error) {
	traceWriter, cleanupFn, err := setupTraceWriter(svrCtx)
	if err != nil {
		return nil, cleanupFn, err
	}

	home := svrCtx.Config.RootDir
	db, err := opts.DBOpener(home, GetAppDBBackend(svrCtx.Viper))
	if err != nil {
		// start only defers the cleanup function after a successful return,
		// so release the trace writer now and hand back a no-op to keep any
		// later call harmless.
		if cleanupFn != nil {
			cleanupFn()
		}
		return nil, func() {}, err
	}

	app = appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)
	return app, cleanupFn, nil
}

// getCtx builds a cancellable context rooted at context.Background and hands
// its cancel function to ListenForQuitSignals, so that quit signals let the
// calling parent process exit gracefully.
func getCtx(svrCtx *Context) context.Context {
	root := context.Background()
	quitCtx, stop := context.WithCancel(root)

	// stop is owned by the quit-signal listener from here on.
	ListenForQuitSignals(stop, svrCtx.Logger)

	return quitCtx
}

func startCmtNode(cfg *cmtcfg.Config, app types.Application, svrCtx *Context) (tmNode *node.Node, cleanupFn func(), err error) {
cleanupFn = func() {}
nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile())
if err != nil {
return nil, cleanupFn, err
}

tmNode, err = node.NewNode(
cfg,
pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()),
Expand All @@ -425,13 +394,21 @@ func startCmtNode(cfg *cmtcfg.Config, nodeKey *p2p.NodeKey, app types.Applicatio
servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger},
)
if err != nil {
return tmNode, err
return tmNode, cleanupFn, err
}

if err := tmNode.Start(); err != nil {
return tmNode, err
return tmNode, cleanupFn, err
}
return tmNode, nil

cleanupFn = func() {
if tmNode != nil && tmNode.IsRunning() {
_ = tmNode.Stop()
_ = app.Close()
}
}

return tmNode, cleanupFn, nil
}

func getAndValidateConfig(svrCtx *Context) (serverconfig.Config, error) {
Expand Down Expand Up @@ -459,15 +436,15 @@ func getGenDocProvider(cfg *cmtcfg.Config) func() (*cmttypes.GenesisDoc, error)
}

func setupTraceWriter(svrCtx *Context) (traceWriter io.WriteCloser, cleanup func(), err error) {
// clean up the traceWriter when the server is shutting down
cleanup = func() {}

traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err = openTraceWriter(traceWriterFile)
if err != nil {
return traceWriter, cleanup, err
}

// clean up the traceWriter when the server is shutting down
cleanup = func() {}

// if flagTraceStore is not used then traceWriter is nil
if traceWriter != nil {
cleanup = func() {
Expand Down Expand Up @@ -542,6 +519,7 @@ func startAPIServer(ctx context.Context, g *errgroup.Group, cmtCfg *cmtcfg.Confi
}
// TODO: Why do we reload and unmarshal the entire genesis doc in order to get the chain ID?
// Surely there's a better way. This is likely a serious node start time overhead.
// Shouldn't it be in cmtCfg.ChainID() ?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The genesis and config can have incorrect values if a user is syncing from a snapshot. I'm not sure if this is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, RIP

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move this into a follow-up issue for StartAPIServer: how to get chain-id ?

genDocProvider := getGenDocProvider(cmtCfg)
genDoc, err := genDocProvider()
if err != nil {
Expand Down