refactor: More cleanup of server/start.go (#16238)
This commit is contained in:
parent
c8ab555174
commit
38f27e3d1a
@ -118,6 +118,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
* (store) [#16067](https://github.com/cosmos/cosmos-sdk/pull/16067) Add local snapshots management commands.
|
||||
* (server) [#16142](https://github.com/cosmos/cosmos-sdk/pull/16142) Remove JSON Indentation from the GRPC to REST gateway's responses. (Saving bandwidth)
|
||||
* (baseapp) [#16193](https://github.com/cosmos/cosmos-sdk/pull/16193) Add `Close` method to `BaseApp` for custom app to cleanup resource in graceful shutdown.
|
||||
* (server) [#16238](https://github.com/cosmos/cosmos-sdk/pull/16238) Don't setup p2p node keys if starting a node in GRPC only mode.
|
||||
|
||||
### State Machine Breaking
|
||||
|
||||
|
||||
168
server/start.go
168
server/start.go
@ -166,14 +166,10 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
|
||||
withCMT, _ := cmd.Flags().GetBool(flagWithComet)
|
||||
if !withCMT {
|
||||
serverCtx.Logger.Info("starting ABCI without CometBFT")
|
||||
|
||||
return wrapCPUProfile(serverCtx, func() error {
|
||||
return startStandAlone(serverCtx, appCreator, opts)
|
||||
})
|
||||
}
|
||||
|
||||
return wrapCPUProfile(serverCtx, func() error {
|
||||
return startInProcess(serverCtx, clientCtx, appCreator, opts)
|
||||
return start(serverCtx, clientCtx, appCreator, withCMT, opts)
|
||||
})
|
||||
},
|
||||
}
|
||||
@ -230,41 +226,35 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
|
||||
return cmd
|
||||
}
|
||||
|
||||
func startStandAlone(svrCtx *Context, appCreator types.AppCreator, opts StartCmdOptions) error {
|
||||
addr := svrCtx.Viper.GetString(flagAddress)
|
||||
transport := svrCtx.Viper.GetString(flagTransport)
|
||||
home := svrCtx.Viper.GetString(flags.FlagHome)
|
||||
|
||||
db, err := opts.DBOpener(home, GetAppDBBackend(svrCtx.Viper))
|
||||
func start(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator, withCmt bool, opts StartCmdOptions) error {
|
||||
svrCfg, err := getAndValidateConfig(svrCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: Should we be using startTraceServer, and defer closing the traceWriter?
|
||||
// right now its left unclosed
|
||||
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
|
||||
traceWriter, err := openTraceWriter(traceWriterFile)
|
||||
app, appCleanupFn, err := startApp(svrCtx, appCreator, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer appCleanupFn()
|
||||
|
||||
app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)
|
||||
|
||||
config, err := serverconfig.GetConfig(svrCtx.Viper)
|
||||
metrics, err := startTelemetry(svrCfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := config.ValidateBasic(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := startTelemetry(config); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
emitServerInfoMetrics()
|
||||
|
||||
if !withCmt {
|
||||
return startStandAlone(svrCtx, app, opts)
|
||||
}
|
||||
return startInProcess(svrCtx, svrCfg, clientCtx, app, metrics, opts)
|
||||
}
|
||||
|
||||
func startStandAlone(svrCtx *Context, app types.Application, opts StartCmdOptions) error {
|
||||
addr := svrCtx.Viper.GetString(flagAddress)
|
||||
transport := svrCtx.Viper.GetString(flagTransport)
|
||||
|
||||
svr, err := server.NewServer(addr, transport, app)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating listener: %v", err)
|
||||
@ -272,12 +262,9 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator, opts StartCmd
|
||||
|
||||
svr.SetLogger(servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger.With("module", "abci-server")})
|
||||
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
ctx := getCtx(svrCtx)
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
// listen for quit signals so the calling parent process can gracefully exit
|
||||
ListenForQuitSignals(cancelFn, svrCtx.Logger)
|
||||
|
||||
g.Go(func() error {
|
||||
if err := svr.Start(); err != nil {
|
||||
svrCtx.Logger.Error("failed to start out-of-process ABCI server", "err", err)
|
||||
@ -294,47 +281,25 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator, opts StartCmd
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator, opts StartCmdOptions) error {
|
||||
func startInProcess(svrCtx *Context, svrCfg serverconfig.Config, clientCtx client.Context, app types.Application,
|
||||
metrics *telemetry.Metrics, opts StartCmdOptions,
|
||||
) error {
|
||||
cmtCfg := svrCtx.Config
|
||||
home := cmtCfg.RootDir
|
||||
|
||||
db, err := opts.DBOpener(home, GetAppDBBackend(svrCtx.Viper))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
svrCfg, err := getAndValidateConfig(svrCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
traceWriter, traceWriterCleanup, err := setupTraceWriter(svrCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)
|
||||
|
||||
// TODO: Move this to only be done if were launching the node. (So not in GRPC-only mode)
|
||||
nodeKey, err := p2p.LoadOrGenNodeKey(cmtCfg.NodeKeyFile())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
tmNode *node.Node
|
||||
gRPCOnly = svrCtx.Viper.GetBool(flagGRPCOnly)
|
||||
)
|
||||
gRPCOnly := svrCtx.Viper.GetBool(flagGRPCOnly)
|
||||
|
||||
if gRPCOnly {
|
||||
// TODO: Generalize logic so that gRPC only is really in startStandAlone
|
||||
svrCtx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled")
|
||||
svrCfg.GRPC.Enable = true
|
||||
} else {
|
||||
svrCtx.Logger.Info("starting node with ABCI CometBFT in-process")
|
||||
tmNode, err = startCmtNode(cmtCfg, nodeKey, app, svrCtx)
|
||||
tmNode, cleanupFn, err := startCmtNode(cmtCfg, app, svrCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cleanupFn()
|
||||
|
||||
// Add the tx service to the gRPC router. We only need to register this
|
||||
// service if API or gRPC is enabled, and avoid doing so in the general
|
||||
@ -350,19 +315,9 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
|
||||
}
|
||||
}
|
||||
|
||||
metrics, err := startTelemetry(svrCfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
emitServerInfoMetrics()
|
||||
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
ctx := getCtx(svrCtx)
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
// listen for quit signals so the calling parent process can gracefully exit
|
||||
ListenForQuitSignals(cancelFn, svrCtx.Logger)
|
||||
|
||||
grpcSrv, clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -394,26 +349,46 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
|
||||
return nil
|
||||
})
|
||||
|
||||
// deferred cleanup function
|
||||
// TODO: Make a generic cleanup function that takes in several func(), and runs them all.
|
||||
// then we defer that.
|
||||
defer func() {
|
||||
if tmNode != nil && tmNode.IsRunning() {
|
||||
_ = tmNode.Stop()
|
||||
_ = app.Close()
|
||||
}
|
||||
|
||||
if traceWriterCleanup != nil {
|
||||
traceWriterCleanup()
|
||||
}
|
||||
}()
|
||||
|
||||
// wait for signal capture and gracefully return
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
// TODO: Move nodeKey into being created within the function.
|
||||
func startCmtNode(cfg *cmtcfg.Config, nodeKey *p2p.NodeKey, app types.Application, svrCtx *Context) (tmNode *node.Node, err error) {
|
||||
func startApp(svrCtx *Context, appCreator types.AppCreator, opts StartCmdOptions) (app types.Application, cleanupFn func(), err error) {
|
||||
traceWriter, traceCleanupFn, err := setupTraceWriter(svrCtx)
|
||||
if err != nil {
|
||||
return app, traceCleanupFn, err
|
||||
}
|
||||
|
||||
home := svrCtx.Config.RootDir
|
||||
db, err := opts.DBOpener(home, GetAppDBBackend(svrCtx.Viper))
|
||||
if err != nil {
|
||||
return app, traceCleanupFn, err
|
||||
}
|
||||
|
||||
app = appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)
|
||||
cleanupFn = func() {
|
||||
traceCleanupFn()
|
||||
if localErr := app.Close(); localErr != nil {
|
||||
svrCtx.Logger.Error(localErr.Error())
|
||||
}
|
||||
}
|
||||
return app, cleanupFn, nil
|
||||
}
|
||||
|
||||
func getCtx(svrCtx *Context) context.Context {
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
// listen for quit signals so the calling parent process can gracefully exit
|
||||
ListenForQuitSignals(cancelFn, svrCtx.Logger)
|
||||
return ctx
|
||||
}
|
||||
|
||||
func startCmtNode(cfg *cmtcfg.Config, app types.Application, svrCtx *Context) (tmNode *node.Node, cleanupFn func(), err error) {
|
||||
cleanupFn = func() {}
|
||||
nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile())
|
||||
if err != nil {
|
||||
return nil, cleanupFn, err
|
||||
}
|
||||
|
||||
tmNode, err = node.NewNode(
|
||||
cfg,
|
||||
pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()),
|
||||
@ -425,13 +400,21 @@ func startCmtNode(cfg *cmtcfg.Config, nodeKey *p2p.NodeKey, app types.Applicatio
|
||||
servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger},
|
||||
)
|
||||
if err != nil {
|
||||
return tmNode, err
|
||||
return tmNode, cleanupFn, err
|
||||
}
|
||||
|
||||
if err := tmNode.Start(); err != nil {
|
||||
return tmNode, err
|
||||
return tmNode, cleanupFn, err
|
||||
}
|
||||
return tmNode, nil
|
||||
|
||||
cleanupFn = func() {
|
||||
if tmNode != nil && tmNode.IsRunning() {
|
||||
_ = tmNode.Stop()
|
||||
_ = app.Close()
|
||||
}
|
||||
}
|
||||
|
||||
return tmNode, cleanupFn, nil
|
||||
}
|
||||
|
||||
func getAndValidateConfig(svrCtx *Context) (serverconfig.Config, error) {
|
||||
@ -459,15 +442,15 @@ func getGenDocProvider(cfg *cmtcfg.Config) func() (*cmttypes.GenesisDoc, error)
|
||||
}
|
||||
|
||||
func setupTraceWriter(svrCtx *Context) (traceWriter io.WriteCloser, cleanup func(), err error) {
|
||||
// clean up the traceWriter when the server is shutting down
|
||||
cleanup = func() {}
|
||||
|
||||
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
|
||||
traceWriter, err = openTraceWriter(traceWriterFile)
|
||||
if err != nil {
|
||||
return traceWriter, cleanup, err
|
||||
}
|
||||
|
||||
// clean up the traceWriter when the server is shutting down
|
||||
cleanup = func() {}
|
||||
|
||||
// if flagTraceStore is not used then traceWriter is nil
|
||||
if traceWriter != nil {
|
||||
cleanup = func() {
|
||||
@ -542,6 +525,7 @@ func startAPIServer(ctx context.Context, g *errgroup.Group, cmtCfg *cmtcfg.Confi
|
||||
}
|
||||
// TODO: Why do we reload and unmarshal the entire genesis doc in order to get the chain ID.
|
||||
// surely theres a better way. This is likely a serious node start time overhead.
|
||||
// Shouldn't it be in cmtCfg.ChainID() ?
|
||||
genDocProvider := getGenDocProvider(cmtCfg)
|
||||
genDoc, err := genDocProvider()
|
||||
if err != nil {
|
||||
|
||||
@ -60,6 +60,7 @@ type (
|
||||
SnapshotManager() *snapshots.Manager
|
||||
|
||||
// Close is called in start cmd to gracefully cleanup resources.
|
||||
// Must be safe to be called multiple times.
|
||||
Close() error
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user