refactor: cleanup server logic (#15041)
This commit is contained in:
parent
df6f25dfee
commit
7f99ad5fe7
@ -80,6 +80,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
|
||||
### Improvements
|
||||
|
||||
* (server) [#15041](https://github.com/cosmos/cosmos-sdk/pull/15041) Remove unnecessary sleeps from gRPC and API server initiation. The servers will start and accept requests as soon as they're ready.
|
||||
* (x/staking) [#14864](https://github.com/cosmos/cosmos-sdk/pull/14864) `create-validator` CLI command now takes a json file as an arg instead of having a bunch of required flags to it.
|
||||
* (cli) [#14659](https://github.com/cosmos/cosmos-sdk/pull/14659) Added ability to query blocks by either height/hash `simd q block --type=height|hash <height|hash>`.
|
||||
* (store) [#14410](https://github.com/cosmos/cosmos-sdk/pull/14410) `rootmulti.Store.loadVersion` has validation to check if all the module stores' height is correct, it will error if any module store has incorrect height.
|
||||
@ -179,6 +180,11 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
|
||||
### API Breaking Changes
|
||||
|
||||
* (server) [#15041](https://github.com/cosmos/cosmos-sdk/pull/15041) Refactor how gRPC and API servers are started to remove unnecessary sleeps:
|
||||
* Remove `ServerStartTime` constant.
|
||||
* Rename `WaitForQuitSignals` to `ListenForQuitSignals`. Note, this function is no longer blocking. Thus the caller is expected to provide a `context.CancelFunc` which indicates that when a signal is caught, that any spawned processes can gracefully exit.
|
||||
* `api.Server#Start` now accepts a `context.Context`. The caller is responsible for ensuring that the context is canceled such that the API server can gracefully exit. The caller does not need to stop the server.
|
||||
* To start the gRPC server you must first create the server via `NewGRPCServer`, after which you can start the gRPC server via `StartGRPCServer` which accepts a `context.Context`. The caller is responsible for ensuring that the context is canceled such that the gRPC server can gracefully exit. The caller does not need to stop the server.
|
||||
* (types) [#15067](https://github.com/cosmos/cosmos-sdk/pull/15067) Remove deprecated alias from `types/errors`. Use `cosmossdk.io/errors` instead.
|
||||
* (testutil) [#14991](https://github.com/cosmos/cosmos-sdk/pull/14991) The `testutil/testdata_pulsar` package has moved to `testutil/testdata/testpb`.
|
||||
* (simapp) [#14977](https://github.com/cosmos/cosmos-sdk/pull/14977) Move simulation helpers functions (`AppStateFn` and `AppStateRandomizedFn`) to `testutil/sims`. These takes an extra genesisState argument which is the default state of the app.
|
||||
|
||||
1
go.mod
1
go.mod
@ -54,6 +54,7 @@ require (
|
||||
github.com/tendermint/go-amino v0.16.0
|
||||
golang.org/x/crypto v0.6.0
|
||||
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
|
||||
golang.org/x/sync v0.1.0
|
||||
google.golang.org/genproto v0.0.0-20230202175211-008b39050e57
|
||||
google.golang.org/grpc v1.53.0
|
||||
google.golang.org/protobuf v1.28.2-0.20230208135220-49eaa78c6c9c
|
||||
|
||||
1
go.sum
1
go.sum
@ -991,6 +991,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
@ -29,11 +30,10 @@ type Server struct {
|
||||
Router *mux.Router
|
||||
GRPCGatewayRouter *runtime.ServeMux
|
||||
ClientCtx client.Context
|
||||
GRPCSrv *grpc.Server
|
||||
logger log.Logger
|
||||
metrics *telemetry.Metrics
|
||||
|
||||
GRPCSrv *grpc.Server
|
||||
|
||||
logger log.Logger
|
||||
metrics *telemetry.Metrics
|
||||
// Start() is blocking and generally called from a separate goroutine.
|
||||
// Close() can be called asynchronously and access shared memory
|
||||
// via the listener. Therefore, we sync access to Start and Close with
|
||||
@ -51,6 +51,7 @@ func CustomGRPCHeaderMatcher(key string) (string, bool) {
|
||||
switch strings.ToLower(key) {
|
||||
case grpctypes.GRPCBlockHeightHeader:
|
||||
return grpctypes.GRPCBlockHeightHeader, true
|
||||
|
||||
default:
|
||||
return runtime.DefaultHeaderMatcher(key)
|
||||
}
|
||||
@ -88,9 +89,12 @@ func New(clientCtx client.Context, logger log.Logger, grpcSrv *grpc.Server) *Ser
|
||||
|
||||
// Start starts the API server. Internally, the API server leverages CometBFT's
|
||||
// JSON RPC server. Configuration options are provided via config.APIConfig
|
||||
// and are delegated to the CometBFT JSON RPC server. The process is
|
||||
// non-blocking, so an external signal handler must be used.
|
||||
func (s *Server) Start(cfg config.Config) error {
|
||||
// and are delegated to the CometBFT JSON RPC server.
|
||||
//
|
||||
// Note, this creates a blocking process if the server is started successfully.
|
||||
// Otherwise, an error is returned. The caller is expected to provide a Context
|
||||
// that is properly canceled or closed to indicate the server should be stopped.
|
||||
func (s *Server) Start(ctx context.Context, cfg config.Config) error {
|
||||
s.mtx.Lock()
|
||||
|
||||
cmtCfg := tmrpcserver.DefaultConfig()
|
||||
@ -134,13 +138,35 @@ func (s *Server) Start(cfg config.Config) error {
|
||||
// register grpc-gateway routes (after grpc-web server as the first match is used)
|
||||
s.Router.PathPrefix("/").Handler(s.GRPCGatewayRouter)
|
||||
|
||||
s.logger.Info("starting API server...")
|
||||
if cfg.API.EnableUnsafeCORS {
|
||||
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
|
||||
return tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, cmtCfg)
|
||||
}
|
||||
errCh := make(chan error)
|
||||
|
||||
return tmrpcserver.Serve(s.listener, s.Router, s.logger, cmtCfg)
|
||||
// Start the API in an external goroutine as Serve is blocking and will return
|
||||
// an error upon failure, which we'll send on the error channel that will be
|
||||
// consumed by the for block below.
|
||||
go func(enableUnsafeCORS bool) {
|
||||
s.logger.Info("starting API server...", "address", cfg.API.Address)
|
||||
|
||||
if enableUnsafeCORS {
|
||||
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
|
||||
errCh <- tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, cmtCfg)
|
||||
} else {
|
||||
errCh <- tmrpcserver.Serve(s.listener, s.Router, s.logger, cmtCfg)
|
||||
}
|
||||
}(cfg.API.EnableUnsafeCORS)
|
||||
|
||||
// Start a blocking select to wait for an indication to stop the server or that
|
||||
// the server failed to start properly.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// The calling process cancelled or closed the provided context, so we must
|
||||
// gracefully stop the API server.
|
||||
s.logger.Info("stopping API server...", "address", cfg.API.Address)
|
||||
return s.Close()
|
||||
|
||||
case err := <-errCh:
|
||||
s.logger.Error("failed to start API server", "err", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the API server.
|
||||
|
||||
@ -1,10 +1,11 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/client"
|
||||
@ -17,8 +18,9 @@ import (
|
||||
_ "github.com/cosmos/cosmos-sdk/types/tx/amino" // Import amino.proto file for reflection
|
||||
)
|
||||
|
||||
// StartGRPCServer starts a gRPC server on the given address.
|
||||
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
|
||||
// NewGRPCServer returns a correctly configured and initialized gRPC server.
|
||||
// Note, the caller is responsible for starting the server. See StartGRPCServer.
|
||||
func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
|
||||
maxSendMsgSize := cfg.MaxSendMsgSize
|
||||
if maxSendMsgSize == 0 {
|
||||
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
|
||||
@ -46,6 +48,7 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
|
||||
for _, m := range clientCtx.TxConfig.SignModeHandler().Modes() {
|
||||
modes[m.String()] = (int32)(m)
|
||||
}
|
||||
|
||||
return modes
|
||||
}(),
|
||||
ChainID: clientCtx.ChainID,
|
||||
@ -53,32 +56,50 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
|
||||
InterfaceRegistry: clientCtx.InterfaceRegistry,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to register reflection service: %w", err)
|
||||
}
|
||||
|
||||
// Reflection allows external clients to see what services and methods
|
||||
// the gRPC server exposes.
|
||||
gogoreflection.Register(grpcSrv)
|
||||
|
||||
return grpcSrv, nil
|
||||
}
|
||||
|
||||
// StartGRPCServer starts the provided gRPC server on the address specified in cfg.
|
||||
//
|
||||
// Note, this creates a blocking process if the server is started successfully.
|
||||
// Otherwise, an error is returned. The caller is expected to provide a Context
|
||||
// that is properly canceled or closed to indicate the server should be stopped.
|
||||
func StartGRPCServer(ctx context.Context, logger log.Logger, cfg config.GRPCConfig, grpcSrv *grpc.Server) error {
|
||||
listener, err := net.Listen("tcp", cfg.Address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return fmt.Errorf("failed to listen on address %s: %w", cfg.Address, err)
|
||||
}
|
||||
|
||||
errCh := make(chan error)
|
||||
|
||||
// Start the gRPC in an external goroutine as Serve is blocking and will return
|
||||
// an error upon failure, which we'll send on the error channel that will be
|
||||
// consumed by the for block below.
|
||||
go func() {
|
||||
err = grpcSrv.Serve(listener)
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("failed to serve: %w", err)
|
||||
}
|
||||
logger.Info("starting gRPC server...", "address", cfg.Address)
|
||||
errCh <- grpcSrv.Serve(listener)
|
||||
}()
|
||||
|
||||
// Start a blocking select to wait for an indication to stop the server or that
|
||||
// the server failed to start properly.
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return nil, err
|
||||
case <-ctx.Done():
|
||||
// The calling process cancelled or closed the provided context, so we must
|
||||
// gracefully stop the gRPC server.
|
||||
logger.Info("stopping gRPC server...", "address", cfg.Address)
|
||||
grpcSrv.GracefulStop()
|
||||
|
||||
case <-time.After(types.ServerStartTime):
|
||||
// assume server started successfully
|
||||
return grpcSrv, nil
|
||||
return nil
|
||||
|
||||
case err := <-errCh:
|
||||
logger.Error("failed to start gRPC server", "err", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
281
server/start.go
281
server/start.go
@ -1,12 +1,13 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"runtime/pprof"
|
||||
"time"
|
||||
|
||||
pruningtypes "cosmossdk.io/store/pruning/types"
|
||||
"github.com/cometbft/cometbft/abci/server"
|
||||
cmtcmd "github.com/cometbft/cometbft/cmd/cometbft/commands"
|
||||
"github.com/cometbft/cometbft/node"
|
||||
@ -17,11 +18,10 @@ import (
|
||||
cmttypes "github.com/cometbft/cometbft/types"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
pruningtypes "cosmossdk.io/store/pruning/types"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/client"
|
||||
"github.com/cosmos/cosmos-sdk/client/flags"
|
||||
"github.com/cosmos/cosmos-sdk/codec"
|
||||
@ -137,22 +137,15 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
|
||||
withCMT, _ := cmd.Flags().GetBool(flagWithComet)
|
||||
if !withCMT {
|
||||
serverCtx.Logger.Info("starting ABCI without CometBFT")
|
||||
|
||||
return wrapCPUProfile(serverCtx, func() error {
|
||||
return startStandAlone(serverCtx, appCreator)
|
||||
})
|
||||
}
|
||||
|
||||
// amino is needed here for backwards compatibility of REST routes
|
||||
err = wrapCPUProfile(serverCtx, func() error {
|
||||
return wrapCPUProfile(serverCtx, func() error {
|
||||
return startInProcess(serverCtx, clientCtx, appCreator)
|
||||
})
|
||||
errCode, ok := err.(ErrorCode)
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
serverCtx.Logger.Debug(fmt.Sprintf("received quit signal: %d", errCode.Code))
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
@ -173,7 +166,6 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
|
||||
cmd.Flags().Uint64(FlagPruningInterval, 0, "Height interval at which pruned heights are removed from disk (ignored if pruning is not 'custom')")
|
||||
cmd.Flags().Uint(FlagInvCheckPeriod, 0, "Assert registered invariants every N blocks")
|
||||
cmd.Flags().Uint64(FlagMinRetainBlocks, 0, "Minimum block height offset during ABCI commit to prune CometBFT blocks")
|
||||
|
||||
cmd.Flags().Bool(FlagAPIEnable, false, "Define if the API server should be enabled")
|
||||
cmd.Flags().Bool(FlagAPISwagger, false, "Define if swagger documentation should automatically be registered (Note: the API must also be enabled)")
|
||||
cmd.Flags().String(FlagAPIAddress, serverconfig.DefaultAPIAddress, "the API server address to listen on")
|
||||
@ -182,18 +174,13 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
|
||||
cmd.Flags().Uint(FlagRPCWriteTimeout, 0, "Define the CometBFT RPC write timeout (in seconds)")
|
||||
cmd.Flags().Uint(FlagRPCMaxBodyBytes, 1000000, "Define the CometBFT maximum request body (in bytes)")
|
||||
cmd.Flags().Bool(FlagAPIEnableUnsafeCORS, false, "Define if CORS should be enabled (unsafe - use it at your own risk)")
|
||||
|
||||
cmd.Flags().Bool(flagGRPCOnly, false, "Start the node in gRPC query only mode (no CometBFT process is started)")
|
||||
cmd.Flags().Bool(flagGRPCEnable, true, "Define if the gRPC server should be enabled")
|
||||
cmd.Flags().String(flagGRPCAddress, serverconfig.DefaultGRPCAddress, "the gRPC server address to listen on")
|
||||
|
||||
cmd.Flags().Bool(flagGRPCWebEnable, true, "Define if the gRPC-Web server should be enabled. (Note: gRPC must also be enabled)")
|
||||
|
||||
cmd.Flags().Uint64(FlagStateSyncSnapshotInterval, 0, "State sync snapshot interval")
|
||||
cmd.Flags().Uint32(FlagStateSyncSnapshotKeepRecent, 2, "State sync snapshot to keep")
|
||||
|
||||
cmd.Flags().Bool(FlagDisableIAVLFastNode, false, "Disable fast node for IAVL tree")
|
||||
|
||||
cmd.Flags().Int(FlagMempoolMaxTxs, mempool.DefaultMaxTx, "Sets MaxTx value for the app-side mempool")
|
||||
|
||||
// support old flags name for backwards compatibility
|
||||
@ -210,31 +197,30 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
|
||||
return cmd
|
||||
}
|
||||
|
||||
func startStandAlone(ctx *Context, appCreator types.AppCreator) error {
|
||||
addr := ctx.Viper.GetString(flagAddress)
|
||||
transport := ctx.Viper.GetString(flagTransport)
|
||||
home := ctx.Viper.GetString(flags.FlagHome)
|
||||
func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error {
|
||||
addr := svrCtx.Viper.GetString(flagAddress)
|
||||
transport := svrCtx.Viper.GetString(flagTransport)
|
||||
home := svrCtx.Viper.GetString(flags.FlagHome)
|
||||
|
||||
db, err := openDB(home, GetAppDBBackend(ctx.Viper))
|
||||
db, err := openDB(home, GetAppDBBackend(svrCtx.Viper))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
traceWriterFile := ctx.Viper.GetString(flagTraceStore)
|
||||
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
|
||||
traceWriter, err := openTraceWriter(traceWriterFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
app := appCreator(ctx.Logger, db, traceWriter, ctx.Viper)
|
||||
app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)
|
||||
|
||||
config, err := serverconfig.GetConfig(ctx.Viper)
|
||||
config, err := serverconfig.GetConfig(svrCtx.Viper)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = startTelemetry(config)
|
||||
if err != nil {
|
||||
if _, err := startTelemetry(config); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -243,52 +229,58 @@ func startStandAlone(ctx *Context, appCreator types.AppCreator) error {
|
||||
return fmt.Errorf("error creating listener: %v", err)
|
||||
}
|
||||
|
||||
svr.SetLogger(ctx.Logger.With("module", "abci-server"))
|
||||
svr.SetLogger(svrCtx.Logger.With("module", "abci-server"))
|
||||
|
||||
err = svr.Start()
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
defer func() {
|
||||
if err = svr.Stop(); err != nil {
|
||||
fmt.Println(err.Error())
|
||||
os.Exit(1)
|
||||
// listen for quit signals so the calling parent process can gracefully exit
|
||||
ListenForQuitSignals(cancelFn, svrCtx.Logger)
|
||||
|
||||
g.Go(func() error {
|
||||
if err := svr.Start(); err != nil {
|
||||
svrCtx.Logger.Error("failed to start out-of-process ABCI server", "err", err)
|
||||
return err
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for SIGINT or SIGTERM signal
|
||||
return WaitForQuitSignals()
|
||||
// Wait for the calling process to be cancelled or close the provided context,
|
||||
// so we can gracefully stop the ABCI server.
|
||||
<-ctx.Done()
|
||||
svrCtx.Logger.Info("stopping the ABCI server...")
|
||||
return svr.Stop()
|
||||
})
|
||||
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
|
||||
cfg := ctx.Config
|
||||
func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
|
||||
cfg := svrCtx.Config
|
||||
home := cfg.RootDir
|
||||
|
||||
db, err := openDB(home, GetAppDBBackend(ctx.Viper))
|
||||
db, err := openDB(home, GetAppDBBackend(svrCtx.Viper))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
traceWriterFile := ctx.Viper.GetString(flagTraceStore)
|
||||
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
|
||||
traceWriter, err := openTraceWriter(traceWriterFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Clean up the traceWriter when the server is shutting down.
|
||||
// clean up the traceWriter when the server is shutting down
|
||||
var traceWriterCleanup func()
|
||||
|
||||
// if flagTraceStore is not used then traceWriter is nil
|
||||
if traceWriter != nil {
|
||||
traceWriterCleanup = func() {
|
||||
if err = traceWriter.Close(); err != nil {
|
||||
ctx.Logger.Error("failed to close trace writer", "err", err)
|
||||
svrCtx.Logger.Error("failed to close trace writer", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
config, err := serverconfig.GetConfig(ctx.Viper)
|
||||
config, err := serverconfig.GetConfig(svrCtx.Viper)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -297,12 +289,13 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
|
||||
return err
|
||||
}
|
||||
|
||||
app := appCreator(ctx.Logger, db, traceWriter, ctx.Viper)
|
||||
app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)
|
||||
|
||||
nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
genDocProvider := func() (*cmttypes.GenesisDoc, error) {
|
||||
appGenesis, err := genutiltypes.AppGenesisFromFile(cfg.GenesisFile())
|
||||
if err != nil {
|
||||
@ -314,14 +307,14 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
|
||||
|
||||
var (
|
||||
tmNode *node.Node
|
||||
gRPCOnly = ctx.Viper.GetBool(flagGRPCOnly)
|
||||
gRPCOnly = svrCtx.Viper.GetBool(flagGRPCOnly)
|
||||
)
|
||||
|
||||
if gRPCOnly {
|
||||
ctx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled")
|
||||
svrCtx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled")
|
||||
config.GRPC.Enable = true
|
||||
} else {
|
||||
ctx.Logger.Info("starting node with ABCI CometBFT in-process")
|
||||
svrCtx.Logger.Info("starting node with ABCI CometBFT in-process")
|
||||
|
||||
tmNode, err = node.NewNode(
|
||||
cfg,
|
||||
@ -331,7 +324,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
|
||||
genDocProvider,
|
||||
node.DefaultDBProvider,
|
||||
node.DefaultMetricsProvider(cfg.Instrumentation),
|
||||
ctx.Logger,
|
||||
svrCtx.Logger,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -346,8 +339,8 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
|
||||
// service if API or gRPC is enabled, and avoid doing so in the general
|
||||
// case, because it spawns a new local CometBFT RPC client.
|
||||
if (config.API.Enable || config.GRPC.Enable) && tmNode != nil {
|
||||
// re-assign for making the client available below
|
||||
// do not use := to avoid shadowing clientCtx
|
||||
// Re-assign for making the client available below do not use := to avoid
|
||||
// shadowing the clientCtx variable.
|
||||
clientCtx = clientCtx.WithClient(local.New(tmNode))
|
||||
|
||||
app.RegisterTxService(clientCtx)
|
||||
@ -365,6 +358,59 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
|
||||
grpcSrv *grpc.Server
|
||||
)
|
||||
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
// listen for quit signals so the calling parent process can gracefully exit
|
||||
ListenForQuitSignals(cancelFn, svrCtx.Logger)
|
||||
|
||||
if config.GRPC.Enable {
|
||||
_, port, err := net.SplitHostPort(config.GRPC.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxSendMsgSize := config.GRPC.MaxSendMsgSize
|
||||
if maxSendMsgSize == 0 {
|
||||
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
|
||||
}
|
||||
|
||||
maxRecvMsgSize := config.GRPC.MaxRecvMsgSize
|
||||
if maxRecvMsgSize == 0 {
|
||||
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
|
||||
}
|
||||
|
||||
grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)
|
||||
|
||||
// if gRPC is enabled, configure gRPC client for gRPC gateway
|
||||
grpcClient, err := grpc.Dial(
|
||||
grpcAddress,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithDefaultCallOptions(
|
||||
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
|
||||
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
|
||||
grpc.MaxCallSendMsgSize(maxSendMsgSize),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
clientCtx = clientCtx.WithGRPCClient(grpcClient)
|
||||
svrCtx.Logger.Debug("gRPC client assigned to client context", "target", grpcAddress)
|
||||
|
||||
grpcSrv, err = servergrpc.NewGRPCServer(clientCtx, app, config.GRPC)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start the gRPC server in a goroutine. Note, the provided ctx will ensure
|
||||
// that the server is gracefully shut down.
|
||||
g.Go(func() error {
|
||||
return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config.GRPC, grpcSrv)
|
||||
})
|
||||
}
|
||||
|
||||
if config.API.Enable {
|
||||
genDoc, err := genDocProvider()
|
||||
if err != nil {
|
||||
@ -373,89 +419,34 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
|
||||
|
||||
clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID)
|
||||
|
||||
if config.GRPC.Enable {
|
||||
_, port, err := net.SplitHostPort(config.GRPC.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxSendMsgSize := config.GRPC.MaxSendMsgSize
|
||||
if maxSendMsgSize == 0 {
|
||||
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
|
||||
}
|
||||
|
||||
maxRecvMsgSize := config.GRPC.MaxRecvMsgSize
|
||||
if maxRecvMsgSize == 0 {
|
||||
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
|
||||
}
|
||||
|
||||
grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)
|
||||
|
||||
// If grpc is enabled, configure grpc client for grpc gateway.
|
||||
grpcClient, err := grpc.Dial(
|
||||
grpcAddress,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithDefaultCallOptions(
|
||||
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
|
||||
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
|
||||
grpc.MaxCallSendMsgSize(maxSendMsgSize),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
clientCtx = clientCtx.WithGRPCClient(grpcClient)
|
||||
ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress)
|
||||
|
||||
// start grpc server
|
||||
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer grpcSrv.Stop()
|
||||
}
|
||||
|
||||
// configure api server
|
||||
apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server"), grpcSrv)
|
||||
apiSrv = api.New(clientCtx, svrCtx.Logger.With("module", "api-server"), grpcSrv)
|
||||
app.RegisterAPIRoutes(apiSrv, config.API)
|
||||
|
||||
if config.Telemetry.Enabled {
|
||||
apiSrv.SetTelemetry(metrics)
|
||||
}
|
||||
errCh := make(chan error)
|
||||
|
||||
go func() {
|
||||
if err := apiSrv.Start(config); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return err
|
||||
|
||||
case <-time.After(types.ServerStartTime): // assume server started successfully
|
||||
}
|
||||
g.Go(func() error {
|
||||
return apiSrv.Start(ctx, config)
|
||||
})
|
||||
}
|
||||
|
||||
// If gRPC is enabled but API is not, we need to start the gRPC server
|
||||
// without the API server. If the API server is enabled, we've already
|
||||
// started the grpc server.
|
||||
if config.GRPC.Enable && !config.API.Enable {
|
||||
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer grpcSrv.Stop()
|
||||
}
|
||||
|
||||
// At this point it is safe to block the process if we're in gRPC only mode as
|
||||
// At this point it is safe to block the process if we're in gRPC-only mode as
|
||||
// we do not need to handle any CometBFT related processes.
|
||||
if gRPCOnly {
|
||||
// wait for signal capture and gracefully return
|
||||
return WaitForQuitSignals()
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
// In case the operator has both gRPC and API servers disabled, there is
|
||||
// nothing blocking this root process, so we need to block manually, so we'll
|
||||
// create an empty blocking loop.
|
||||
g.Go(func() error {
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
})
|
||||
|
||||
// deferred cleanup function
|
||||
defer func() {
|
||||
if tmNode != nil && tmNode.IsRunning() {
|
||||
_ = tmNode.Stop()
|
||||
@ -464,58 +455,52 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
|
||||
if traceWriterCleanup != nil {
|
||||
traceWriterCleanup()
|
||||
}
|
||||
|
||||
if apiSrv != nil {
|
||||
_ = apiSrv.Close()
|
||||
}
|
||||
|
||||
ctx.Logger.Info("exiting...")
|
||||
}()
|
||||
|
||||
// wait for signal capture and gracefully return
|
||||
return WaitForQuitSignals()
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
|
||||
if !cfg.Telemetry.Enabled {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return telemetry.New(cfg.Telemetry)
|
||||
}
|
||||
|
||||
// wrapCPUProfile runs callback in a goroutine, then wait for quit signals.
|
||||
func wrapCPUProfile(ctx *Context, callback func() error) error {
|
||||
if cpuProfile := ctx.Viper.GetString(flagCPUProfile); cpuProfile != "" {
|
||||
// wrapCPUProfile starts CPU profiling, if enabled, and executes the provided
|
||||
// callbackFn in a separate goroutine, then will wait for that callback to
|
||||
// return.
|
||||
//
|
||||
// NOTE: We expect the caller to handle graceful shutdown and signal handling.
|
||||
func wrapCPUProfile(svrCtx *Context, callbackFn func() error) error {
|
||||
if cpuProfile := svrCtx.Viper.GetString(flagCPUProfile); cpuProfile != "" {
|
||||
f, err := os.Create(cpuProfile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx.Logger.Info("starting CPU profiler", "profile", cpuProfile)
|
||||
svrCtx.Logger.Info("starting CPU profiler", "profile", cpuProfile)
|
||||
|
||||
if err := pprof.StartCPUProfile(f); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
ctx.Logger.Info("stopping CPU profiler", "profile", cpuProfile)
|
||||
svrCtx.Logger.Info("stopping CPU profiler", "profile", cpuProfile)
|
||||
pprof.StopCPUProfile()
|
||||
|
||||
if err := f.Close(); err != nil {
|
||||
ctx.Logger.Info("failed to close cpu-profile file", "profile", cpuProfile, "err", err.Error())
|
||||
svrCtx.Logger.Info("failed to close cpu-profile file", "profile", cpuProfile, "err", err.Error())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
errCh <- callback()
|
||||
errCh <- callbackFn()
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return err
|
||||
|
||||
case <-time.After(types.ServerStartTime):
|
||||
}
|
||||
|
||||
return WaitForQuitSignals()
|
||||
return <-errCh
|
||||
}
|
||||
|
||||
@ -3,28 +3,21 @@ package types
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
dbm "github.com/cosmos/cosmos-db"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
storetypes "cosmossdk.io/store/types"
|
||||
abci "github.com/cometbft/cometbft/abci/types"
|
||||
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
|
||||
cmttypes "github.com/cometbft/cometbft/types"
|
||||
dbm "github.com/cosmos/cosmos-db"
|
||||
"github.com/cosmos/gogoproto/grpc"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
storetypes "cosmossdk.io/store/types"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/client"
|
||||
"github.com/cosmos/cosmos-sdk/server/api"
|
||||
"github.com/cosmos/cosmos-sdk/server/config"
|
||||
)
|
||||
|
||||
// ServerStartTime defines the time duration that the server need to stay running after startup
|
||||
// for the startup be considered successful
|
||||
const ServerStartTime = 5 * time.Second
|
||||
|
||||
type (
|
||||
// AppOptions defines an interface that is passed into an application
|
||||
// constructor, typically used to set BaseApp options that are either supplied
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@ -14,23 +15,22 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
dbm "github.com/cosmos/cosmos-db"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
"cosmossdk.io/store"
|
||||
"cosmossdk.io/store/snapshots"
|
||||
snapshottypes "cosmossdk.io/store/snapshots/types"
|
||||
storetypes "cosmossdk.io/store/types"
|
||||
cmtcmd "github.com/cometbft/cometbft/cmd/cometbft/commands"
|
||||
cmtcfg "github.com/cometbft/cometbft/config"
|
||||
cmtcli "github.com/cometbft/cometbft/libs/cli"
|
||||
cmtflags "github.com/cometbft/cometbft/libs/cli/flags"
|
||||
cmtlog "github.com/cometbft/cometbft/libs/log"
|
||||
dbm "github.com/cosmos/cosmos-db"
|
||||
"github.com/spf13/cast"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/spf13/viper"
|
||||
|
||||
"cosmossdk.io/store"
|
||||
"cosmossdk.io/store/snapshots"
|
||||
snapshottypes "cosmossdk.io/store/snapshots/types"
|
||||
storetypes "cosmossdk.io/store/types"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp"
|
||||
"github.com/cosmos/cosmos-sdk/client/flags"
|
||||
"github.com/cosmos/cosmos-sdk/server/config"
|
||||
@ -363,12 +363,22 @@ func TrapSignal(cleanupFunc func()) {
|
||||
}()
|
||||
}
|
||||
|
||||
// WaitForQuitSignals waits for SIGINT and SIGTERM and returns.
|
||||
func WaitForQuitSignals() ErrorCode {
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
sig := <-sigs
|
||||
return ErrorCode{Code: int(sig.(syscall.Signal)) + 128}
|
||||
// ListenForQuitSignals listens for SIGINT and SIGTERM. When a signal is received,
|
||||
// the cleanup function is called, indicating the caller can gracefully exit or
|
||||
// return.
|
||||
//
|
||||
// Note, this performs a non-blocking process so the caller must ensure the
|
||||
// corresponding context derived from the cancelFn is used correctly.
|
||||
func ListenForQuitSignals(cancelFn context.CancelFunc, logger log.Logger) {
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
sig := <-sigCh
|
||||
cancelFn()
|
||||
|
||||
logger.Info("caught signal", "signal", sig.String())
|
||||
}()
|
||||
}
|
||||
|
||||
// GetAppDBBackend gets the backend type to use for the application DBs.
|
||||
|
||||
@ -173,6 +173,7 @@ require (
|
||||
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
|
||||
golang.org/x/net v0.7.0 // indirect
|
||||
golang.org/x/oauth2 v0.5.0 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
golang.org/x/sys v0.5.0 // indirect
|
||||
golang.org/x/term v0.5.0 // indirect
|
||||
golang.org/x/text v0.7.0 // indirect
|
||||
|
||||
@ -1247,6 +1247,7 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
||||
@ -170,6 +170,7 @@ require (
|
||||
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
|
||||
golang.org/x/net v0.7.0 // indirect
|
||||
golang.org/x/oauth2 v0.5.0 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
golang.org/x/sys v0.5.0 // indirect
|
||||
golang.org/x/term v0.5.0 // indirect
|
||||
golang.org/x/text v0.7.0 // indirect
|
||||
|
||||
@ -1248,6 +1248,7 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
||||
@ -19,15 +19,15 @@ import (
|
||||
"cosmossdk.io/log"
|
||||
sdkmath "cosmossdk.io/math"
|
||||
"cosmossdk.io/math/unsafe"
|
||||
pruningtypes "cosmossdk.io/store/pruning/types"
|
||||
cmtlog "github.com/cometbft/cometbft/libs/log"
|
||||
"github.com/cometbft/cometbft/node"
|
||||
cmtclient "github.com/cometbft/cometbft/rpc/client"
|
||||
dbm "github.com/cosmos/cosmos-db"
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
pruningtypes "cosmossdk.io/store/pruning/types"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp"
|
||||
"github.com/cosmos/cosmos-sdk/client"
|
||||
"github.com/cosmos/cosmos-sdk/client/grpc/cmtservice"
|
||||
@ -261,10 +261,12 @@ type (
|
||||
ValAddress sdk.ValAddress
|
||||
RPCClient cmtclient.Client
|
||||
|
||||
tmNode *node.Node
|
||||
api *api.Server
|
||||
grpc *grpc.Server
|
||||
grpcWeb *http.Server
|
||||
tmNode *node.Node
|
||||
api *api.Server
|
||||
grpc *grpc.Server
|
||||
grpcWeb *http.Server
|
||||
errGroup *errgroup.Group
|
||||
cancelFn context.CancelFunc
|
||||
}
|
||||
|
||||
// ValidatorI expose a validator's context and configuration
|
||||
@ -734,24 +736,25 @@ func (n *Network) Cleanup() {
|
||||
n.Logger.Log("cleaning up test network...")
|
||||
|
||||
for _, v := range n.Validators {
|
||||
// cancel the validator's context which will signal to the gRPC and API
|
||||
// goroutines that they should gracefully exit.
|
||||
v.cancelFn()
|
||||
|
||||
if err := v.errGroup.Wait(); err != nil {
|
||||
n.Logger.Log("unexpected error waiting for validator gRPC and API processes to exit", "err", err)
|
||||
}
|
||||
|
||||
if v.tmNode != nil && v.tmNode.IsRunning() {
|
||||
_ = v.tmNode.Stop()
|
||||
}
|
||||
|
||||
if v.api != nil {
|
||||
_ = v.api.Close()
|
||||
}
|
||||
|
||||
if v.grpc != nil {
|
||||
v.grpc.Stop()
|
||||
if v.grpcWeb != nil {
|
||||
_ = v.grpcWeb.Close()
|
||||
if err := v.tmNode.Stop(); err != nil {
|
||||
n.Logger.Log("failed to stop validator CometBFT node", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if v.grpcWeb != nil {
|
||||
_ = v.grpcWeb.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Give a brief pause for things to finish closing in other processes. Hopefully this helps with the address-in-use errors.
|
||||
// 100ms chosen randomly.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
if n.Config.CleanupDir {
|
||||
|
||||
@ -1,12 +1,12 @@
|
||||
package network
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/cometbft/cometbft/node"
|
||||
"github.com/cometbft/cometbft/p2p"
|
||||
@ -15,10 +15,10 @@ import (
|
||||
"github.com/cometbft/cometbft/rpc/client/local"
|
||||
cmttypes "github.com/cometbft/cometbft/types"
|
||||
cmttime "github.com/cometbft/cometbft/types/time"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/server/api"
|
||||
servergrpc "github.com/cosmos/cosmos-sdk/server/grpc"
|
||||
srvtypes "github.com/cosmos/cosmos-sdk/server/types"
|
||||
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
|
||||
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
|
||||
"github.com/cosmos/cosmos-sdk/x/genutil"
|
||||
@ -82,12 +82,24 @@ func startInProcess(cfg Config, val *Validator) error {
|
||||
app.RegisterNodeService(val.ClientCtx)
|
||||
}
|
||||
|
||||
if val.AppConfig.GRPC.Enable {
|
||||
grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC)
|
||||
ctx := context.Background()
|
||||
ctx, val.cancelFn = context.WithCancel(ctx)
|
||||
val.errGroup, ctx = errgroup.WithContext(ctx)
|
||||
|
||||
grpcCfg := val.AppConfig.GRPC
|
||||
|
||||
if grpcCfg.Enable {
|
||||
grpcSrv, err := servergrpc.NewGRPCServer(val.ClientCtx, app, grpcCfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start the gRPC server in a goroutine. Note, the provided ctx will ensure
|
||||
// that the server is gracefully shut down.
|
||||
val.errGroup.Go(func() error {
|
||||
return servergrpc.StartGRPCServer(ctx, logger.With("module", "grpc-server"), grpcCfg, grpcSrv)
|
||||
})
|
||||
|
||||
val.grpc = grpcSrv
|
||||
}
|
||||
|
||||
@ -95,19 +107,9 @@ func startInProcess(cfg Config, val *Validator) error {
|
||||
apiSrv := api.New(val.ClientCtx, logger.With("module", "api-server"), val.grpc)
|
||||
app.RegisterAPIRoutes(apiSrv, val.AppConfig.API)
|
||||
|
||||
errCh := make(chan error)
|
||||
|
||||
go func() {
|
||||
if err := apiSrv.Start(*val.AppConfig); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return err
|
||||
case <-time.After(srvtypes.ServerStartTime): // assume server started successfully
|
||||
}
|
||||
val.errGroup.Go(func() error {
|
||||
return apiSrv.Start(ctx, *val.AppConfig)
|
||||
})
|
||||
|
||||
val.api = apiSrv
|
||||
}
|
||||
|
||||
@ -139,6 +139,7 @@ require (
|
||||
go.etcd.io/bbolt v1.3.6 // indirect
|
||||
golang.org/x/crypto v0.6.0 // indirect
|
||||
golang.org/x/net v0.7.0 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
golang.org/x/sys v0.5.0 // indirect
|
||||
golang.org/x/term v0.5.0 // indirect
|
||||
golang.org/x/text v0.7.0 // indirect
|
||||
|
||||
@ -995,6 +995,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
||||
@ -139,6 +139,7 @@ require (
|
||||
golang.org/x/crypto v0.6.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
|
||||
golang.org/x/net v0.7.0 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
golang.org/x/sys v0.5.0 // indirect
|
||||
golang.org/x/term v0.5.0 // indirect
|
||||
golang.org/x/text v0.7.0 // indirect
|
||||
|
||||
@ -991,6 +991,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
||||
@ -141,6 +141,7 @@ require (
|
||||
golang.org/x/crypto v0.6.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
|
||||
golang.org/x/net v0.7.0 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
golang.org/x/sys v0.5.0 // indirect
|
||||
golang.org/x/term v0.5.0 // indirect
|
||||
golang.org/x/text v0.7.0 // indirect
|
||||
|
||||
@ -994,6 +994,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
||||
@ -138,6 +138,7 @@ require (
|
||||
golang.org/x/crypto v0.6.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
|
||||
golang.org/x/net v0.7.0 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
golang.org/x/sys v0.5.0 // indirect
|
||||
golang.org/x/term v0.5.0 // indirect
|
||||
golang.org/x/text v0.7.0 // indirect
|
||||
|
||||
@ -991,6 +991,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
||||
@ -162,6 +162,7 @@ require (
|
||||
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
|
||||
golang.org/x/net v0.7.0 // indirect
|
||||
golang.org/x/oauth2 v0.5.0 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
golang.org/x/sys v0.5.0 // indirect
|
||||
golang.org/x/term v0.5.0 // indirect
|
||||
golang.org/x/text v0.7.0 // indirect
|
||||
|
||||
@ -1247,6 +1247,7 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
||||
Loading…
Reference in New Issue
Block a user