feat(server/v2/cometbft): config (#20989)
This commit is contained in:
parent
4b3a0b0afe
commit
8484dc50e2
@ -51,6 +51,11 @@ type App[T transaction.Tx] struct {
|
||||
GRPCQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)
|
||||
}
|
||||
|
||||
// Name returns the app name.
|
||||
func (a *App[T]) Name() string {
|
||||
return a.config.AppName
|
||||
}
|
||||
|
||||
// Logger returns the app logger.
|
||||
func (a *App[T]) Logger() log.Logger {
|
||||
return a.logger
|
||||
|
||||
@ -117,7 +117,6 @@ func (a *AppBuilder[T]) Build(opts ...AppBuilderOption[T]) (*App[T], error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create STF: %w", err)
|
||||
}
|
||||
|
||||
a.app.stf = stf
|
||||
|
||||
rs, err := rootstore.CreateRootStore(a.storeOptions)
|
||||
|
||||
@ -34,7 +34,7 @@ func New[T transaction.Tx](cfgOptions ...CfgOption) *GRPCServer[T] {
|
||||
func (s *GRPCServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error {
|
||||
cfg := s.Config().(*Config)
|
||||
if v != nil {
|
||||
if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil {
|
||||
if err := serverv2.UnmarshalSubConfig(v, s.Name(), &cfg); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -84,7 +84,7 @@ func (s *GRPCGatewayServer[T]) Config() any {
|
||||
func (s *GRPCGatewayServer[T]) Init(appI serverv2.AppI[transaction.Tx], v *viper.Viper, logger log.Logger) error {
|
||||
cfg := s.Config().(*Config)
|
||||
if v != nil {
|
||||
if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil {
|
||||
if err := serverv2.UnmarshalSubConfig(v, s.Name(), &cfg); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -32,18 +32,22 @@ import (
|
||||
var _ abci.Application = (*Consensus[transaction.Tx])(nil)
|
||||
|
||||
type Consensus[T transaction.Tx] struct {
|
||||
// legacy support for gRPC
|
||||
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)
|
||||
logger log.Logger
|
||||
appName, version string
|
||||
consensusAuthority string // Set by the application to grant authority to the consensus engine to send messages to the consensus module
|
||||
app *appmanager.AppManager[T]
|
||||
txCodec transaction.Codec[T]
|
||||
store types.Store
|
||||
streaming streaming.Manager
|
||||
snapshotManager *snapshots.Manager
|
||||
mempool mempool.Mempool[T]
|
||||
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error) // legacy support for gRPC
|
||||
|
||||
app *appmanager.AppManager[T]
|
||||
cfg Config
|
||||
store types.Store
|
||||
logger log.Logger
|
||||
txCodec transaction.Codec[T]
|
||||
streaming streaming.Manager
|
||||
snapshotManager *snapshots.Manager
|
||||
mempool mempool.Mempool[T]
|
||||
cfg Config
|
||||
indexedEvents map[string]struct{}
|
||||
chainID string
|
||||
|
||||
initialHeight uint64
|
||||
// this is only available after this node has committed a block (in FinalizeBlock),
|
||||
// otherwise it will be empty and we will need to query the app for the last
|
||||
// committed block.
|
||||
@ -54,19 +58,26 @@ type Consensus[T transaction.Tx] struct {
|
||||
verifyVoteExt handlers.VerifyVoteExtensionhandler
|
||||
extendVote handlers.ExtendVoteHandler
|
||||
|
||||
chainID string
|
||||
addrPeerFilter types.PeerFilter // filter peers by address and port
|
||||
idPeerFilter types.PeerFilter // filter peers by node ID
|
||||
}
|
||||
|
||||
func NewConsensus[T transaction.Tx](
|
||||
logger log.Logger,
|
||||
appName string,
|
||||
consensusAuthority string,
|
||||
app *appmanager.AppManager[T],
|
||||
mp mempool.Mempool[T],
|
||||
indexedEvents map[string]struct{},
|
||||
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error),
|
||||
store types.Store,
|
||||
cfg Config,
|
||||
txCodec transaction.Codec[T],
|
||||
logger log.Logger,
|
||||
) *Consensus[T] {
|
||||
return &Consensus[T]{
|
||||
appName: appName,
|
||||
version: getCometBFTServerVersion(),
|
||||
consensusAuthority: consensusAuthority,
|
||||
grpcQueryDecoders: grpcQueryDecoders,
|
||||
app: app,
|
||||
cfg: cfg,
|
||||
@ -82,27 +93,24 @@ func NewConsensus[T transaction.Tx](
|
||||
verifyVoteExt: nil,
|
||||
extendVote: nil,
|
||||
chainID: "",
|
||||
indexedEvents: indexedEvents,
|
||||
initialHeight: 0,
|
||||
}
|
||||
}
|
||||
|
||||
// SetStreamingManager sets the streaming manager for the consensus module.
|
||||
func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
|
||||
c.streaming = sm
|
||||
}
|
||||
|
||||
// SetSnapshotManager sets the snapshot manager for the Consensus.
|
||||
// The snapshot manager is responsible for managing snapshots of the Consensus state.
|
||||
// It allows for creating, storing, and restoring snapshots of the Consensus state.
|
||||
// The provided snapshot manager will be used by the Consensus to handle snapshots.
|
||||
func (c *Consensus[T]) SetSnapshotManager(sm *snapshots.Manager) {
|
||||
c.snapshotManager = sm
|
||||
}
|
||||
|
||||
// RegisterExtensions registers the given extensions with the consensus module's snapshot manager.
|
||||
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
|
||||
func (c *Consensus[T]) RegisterExtensions(extensions ...snapshots.ExtensionSnapshotter) {
|
||||
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
|
||||
if err := c.snapshotManager.RegisterExtensions(extensions...); err != nil {
|
||||
panic(fmt.Errorf("failed to register snapshot extensions: %w", err))
|
||||
return fmt.Errorf("failed to register snapshot extensions: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CheckTx implements types.Application.
|
||||
@ -122,7 +130,7 @@ func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
|
||||
Code: resp.Code,
|
||||
GasWanted: uint64ToInt64(resp.GasWanted),
|
||||
GasUsed: uint64ToInt64(resp.GasUsed),
|
||||
Events: intoABCIEvents(resp.Events, c.cfg.IndexEvents),
|
||||
Events: intoABCIEvents(resp.Events, c.indexedEvents),
|
||||
Info: resp.Info,
|
||||
Data: resp.Data,
|
||||
Log: resp.Log,
|
||||
@ -144,7 +152,7 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc
|
||||
|
||||
// cp, err := c.GetConsensusParams(ctx)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
cid, err := c.store.LastCommitID()
|
||||
@ -153,10 +161,9 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc
|
||||
}
|
||||
|
||||
return &abciproto.InfoResponse{
|
||||
Data: c.cfg.Name,
|
||||
Version: c.cfg.Version,
|
||||
// AppVersion: cp.GetVersion().App,
|
||||
AppVersion: 0, // TODO fetch from store?
|
||||
Data: c.appName,
|
||||
Version: c.version,
|
||||
AppVersion: 0, // TODO fetch consensus params?
|
||||
LastBlockHeight: int64(version),
|
||||
LastBlockAppHash: cid.Hash,
|
||||
}, nil
|
||||
@ -173,7 +180,6 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
|
||||
return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
|
||||
}
|
||||
res, err := c.app.Query(ctx, uint64(req.Height), protoRequest)
|
||||
|
||||
if err != nil {
|
||||
resp := queryResult(err)
|
||||
resp.Height = req.Height
|
||||
@ -188,7 +194,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
|
||||
// it must be an app/p2p/store query
|
||||
path := splitABCIQueryPath(req.Path)
|
||||
if len(path) == 0 {
|
||||
return QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "no query path provided"), c.cfg.Trace), nil
|
||||
return QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "no query path provided"), c.cfg.AppTomlConfig.Trace), nil
|
||||
}
|
||||
|
||||
switch path[0] {
|
||||
@ -202,11 +208,11 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
|
||||
resp, err = c.handleQueryP2P(path)
|
||||
|
||||
default:
|
||||
resp = QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "unknown query path"), c.cfg.Trace)
|
||||
resp = QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "unknown query path"), c.cfg.AppTomlConfig.Trace)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return QueryResult(err, c.cfg.Trace), nil
|
||||
return QueryResult(err, c.cfg.AppTomlConfig.Trace), nil
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
@ -218,17 +224,17 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe
|
||||
|
||||
// store chainID to be used later on in execution
|
||||
c.chainID = req.ChainId
|
||||
// TODO: check if we need to load the config from genesis.json or config.toml
|
||||
c.cfg.InitialHeight = uint64(req.InitialHeight)
|
||||
|
||||
// On a new chain, we consider the init chain block height as 0, even though
|
||||
// req.InitialHeight is 1 by default.
|
||||
// TODO
|
||||
// TODO: check if we need to load the config from genesis.json or config.toml
|
||||
c.initialHeight = uint64(req.InitialHeight)
|
||||
if c.initialHeight == 0 { // If initial height is 0, set it to 1
|
||||
c.initialHeight = 1
|
||||
}
|
||||
|
||||
var consMessages []transaction.Msg
|
||||
if req.ConsensusParams != nil {
|
||||
consMessages = append(consMessages, &consensustypes.MsgUpdateParams{
|
||||
Authority: c.cfg.ConsensusAuthority,
|
||||
Authority: c.consensusAuthority,
|
||||
Block: req.ConsensusParams.Block,
|
||||
Evidence: req.ConsensusParams.Evidence,
|
||||
Validator: req.ConsensusParams.Validator,
|
||||
@ -394,7 +400,7 @@ func (c *Consensus[T]) FinalizeBlock(
|
||||
|
||||
// TODO evaluate this approach vs. service using context.
|
||||
// cometInfo := &consensustypes.MsgUpdateCometInfo{
|
||||
// Authority: c.cfg.ConsensusAuthority,
|
||||
// Authority: c.consensusAuthority,
|
||||
// CometInfo: &consensustypes.CometInfo{
|
||||
// Evidence: req.Misbehavior,
|
||||
// ValidatorsHash: req.NextValidatorsHash,
|
||||
@ -411,7 +417,7 @@ func (c *Consensus[T]) FinalizeBlock(
|
||||
// })
|
||||
|
||||
// we don't need to deliver the block in the genesis block
|
||||
if req.Height == int64(c.cfg.InitialHeight) {
|
||||
if req.Height == int64(c.initialHeight) {
|
||||
appHash, err := c.store.Commit(store.NewChangeset())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to commit the changeset: %w", err)
|
||||
@ -495,7 +501,7 @@ func (c *Consensus[T]) FinalizeBlock(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return finalizeBlockResponse(resp, cp, appHash, c.cfg.IndexEvents)
|
||||
return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents)
|
||||
}
|
||||
|
||||
// Commit implements types.Application.
|
||||
|
||||
@ -19,7 +19,6 @@ import (
|
||||
"sigs.k8s.io/yaml"
|
||||
|
||||
"cosmossdk.io/server/v2/cometbft/client/rpc"
|
||||
auth "cosmossdk.io/x/auth/client/cli"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/client"
|
||||
cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
|
||||
@ -29,7 +28,7 @@ import (
|
||||
)
|
||||
|
||||
func (s *CometBFTServer[T]) rpcClient(cmd *cobra.Command) (rpc.CometRPC, error) {
|
||||
if s.config.Standalone {
|
||||
if s.config.AppTomlConfig.Standalone {
|
||||
client, err := rpchttp.New(client.GetConfigFromCmd(cmd).RPC.ListenAddress)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -201,10 +200,10 @@ for. Each module documents its respective events under 'xx_events.md'.
|
||||
return err
|
||||
}
|
||||
|
||||
query, _ := cmd.Flags().GetString(auth.FlagQuery)
|
||||
query, _ := cmd.Flags().GetString(FlagQuery)
|
||||
page, _ := cmd.Flags().GetInt(FlagPage)
|
||||
limit, _ := cmd.Flags().GetInt(FlagLimit)
|
||||
orderBy, _ := cmd.Flags().GetString(auth.FlagOrderBy)
|
||||
orderBy, _ := cmd.Flags().GetString(FlagOrderBy)
|
||||
|
||||
blocks, err := rpc.QueryBlocks(cmd.Context(), rpcclient, page, limit, query, orderBy)
|
||||
if err != nil {
|
||||
@ -223,9 +222,9 @@ for. Each module documents its respective events under 'xx_events.md'.
|
||||
AddQueryFlagsToCmd(cmd)
|
||||
cmd.Flags().Int(FlagPage, query.DefaultPage, "Query a specific page of paginated results")
|
||||
cmd.Flags().Int(FlagLimit, query.DefaultLimit, "Query number of transactions results per page returned")
|
||||
cmd.Flags().String(auth.FlagQuery, "", "The blocks events query per CometBFT's query semantics")
|
||||
cmd.Flags().String(auth.FlagOrderBy, "", "The ordering semantics (asc|dsc)")
|
||||
_ = cmd.MarkFlagRequired(auth.FlagQuery)
|
||||
cmd.Flags().String(FlagQuery, "", "The blocks events query per CometBFT's query semantics")
|
||||
cmd.Flags().String(FlagOrderBy, "", "The ordering semantics (asc|dsc)")
|
||||
_ = cmd.MarkFlagRequired(FlagQuery)
|
||||
|
||||
return cmd
|
||||
}
|
||||
@ -240,11 +239,11 @@ func (s *CometBFTServer[T]) QueryBlockCmd() *cobra.Command {
|
||||
$ %s query block --%s=%s <height>
|
||||
$ %s query block --%s=%s <hash>
|
||||
`,
|
||||
version.AppName, auth.FlagType, auth.TypeHeight,
|
||||
version.AppName, auth.FlagType, auth.TypeHash)),
|
||||
version.AppName, FlagType, TypeHeight,
|
||||
version.AppName, FlagType, TypeHash)),
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
typ, _ := cmd.Flags().GetString(auth.FlagType)
|
||||
typ, _ := cmd.Flags().GetString(FlagType)
|
||||
|
||||
rpcclient, err := s.rpcClient(cmd)
|
||||
if err != nil {
|
||||
@ -252,7 +251,7 @@ $ %s query block --%s=%s <hash>
|
||||
}
|
||||
|
||||
switch typ {
|
||||
case auth.TypeHeight:
|
||||
case TypeHeight:
|
||||
if args[0] == "" {
|
||||
return fmt.Errorf("argument should be a block height")
|
||||
}
|
||||
@ -282,7 +281,7 @@ $ %s query block --%s=%s <hash>
|
||||
|
||||
return printOutput(cmd, bz)
|
||||
|
||||
case auth.TypeHash:
|
||||
case TypeHash:
|
||||
|
||||
if args[0] == "" {
|
||||
return fmt.Errorf("argument should be a tx hash")
|
||||
@ -306,13 +305,13 @@ $ %s query block --%s=%s <hash>
|
||||
return printOutput(cmd, bz)
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown --%s value %s", auth.FlagType, typ)
|
||||
return fmt.Errorf("unknown --%s value %s", FlagType, typ)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
AddQueryFlagsToCmd(cmd)
|
||||
cmd.Flags().String(auth.FlagType, auth.TypeHash, fmt.Sprintf("The type to be used when querying tx, can be one of \"%s\", \"%s\"", auth.TypeHeight, auth.TypeHash))
|
||||
cmd.Flags().String(FlagType, TypeHash, fmt.Sprintf("The type to be used when querying tx, can be one of \"%s\", \"%s\"", TypeHeight, TypeHash))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
@ -5,54 +5,62 @@ import (
|
||||
"github.com/spf13/viper"
|
||||
|
||||
serverv2 "cosmossdk.io/server/v2"
|
||||
"cosmossdk.io/server/v2/cometbft/types"
|
||||
)
|
||||
|
||||
// TODO REDO/VERIFY THIS
|
||||
// Config is the configuration for the CometBFT application
|
||||
type Config struct {
|
||||
AppTomlConfig *AppTomlConfig
|
||||
ConfigTomlConfig *cmtcfg.Config
|
||||
}
|
||||
|
||||
func GetConfigFromViper(v *viper.Viper) *cmtcfg.Config {
|
||||
conf := cmtcfg.DefaultConfig()
|
||||
err := v.Unmarshal(conf)
|
||||
func DefaultAppTomlConfig() *AppTomlConfig {
|
||||
return &AppTomlConfig{
|
||||
MinRetainBlocks: 0,
|
||||
IndexEvents: make([]string, 0),
|
||||
HaltHeight: 0,
|
||||
HaltTime: 0,
|
||||
Address: "tcp://127.0.0.1:26658",
|
||||
Transport: "socket",
|
||||
Trace: false,
|
||||
Standalone: false,
|
||||
}
|
||||
}
|
||||
|
||||
type AppTomlConfig struct {
|
||||
MinRetainBlocks uint64 `mapstructure:"min_retain_blocks" toml:"min_retain_blocks" comment:"min_retain_blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
|
||||
IndexEvents []string `mapstructure:"index_events" toml:"index_events" comment:"index_events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
|
||||
HaltHeight uint64 `mapstructure:"halt_height" toml:"halt_height" comment:"halt_height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
|
||||
HaltTime uint64 `mapstructure:"halt_time" toml:"halt_time" comment:"halt_time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
|
||||
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
|
||||
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
|
||||
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
|
||||
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`
|
||||
}
|
||||
|
||||
// CfgOption is a function that allows to overwrite the default server configuration.
|
||||
type CfgOption func(*Config)
|
||||
|
||||
// OverwriteDefaultConfigTomlConfig overwrites the default comet config with the new config.
|
||||
func OverwriteDefaultConfigTomlConfig(newCfg *cmtcfg.Config) CfgOption {
|
||||
return func(cfg *Config) {
|
||||
cfg.ConfigTomlConfig = newCfg // nolint:ineffassign,staticcheck // We want to overwrite everything
|
||||
}
|
||||
}
|
||||
|
||||
// OverwriteDefaultAppTomlConfig overwrites the default comet config with the new config.
|
||||
func OverwriteDefaultAppTomlConfig(newCfg *AppTomlConfig) CfgOption {
|
||||
return func(cfg *Config) {
|
||||
cfg.AppTomlConfig = newCfg // nolint:ineffassign,staticcheck // We want to overwrite everything
|
||||
}
|
||||
}
|
||||
|
||||
func getConfigTomlFromViper(v *viper.Viper) *cmtcfg.Config {
|
||||
rootDir := v.GetString(serverv2.FlagHome)
|
||||
if err != nil {
|
||||
|
||||
conf := cmtcfg.DefaultConfig()
|
||||
if err := v.Unmarshal(conf); err != nil {
|
||||
return cmtcfg.DefaultConfig().SetRoot(rootDir)
|
||||
}
|
||||
|
||||
return conf.SetRoot(rootDir)
|
||||
}
|
||||
|
||||
// Config is the configuration for the CometBFT application
|
||||
type Config struct {
|
||||
// app.toml config options
|
||||
Name string `mapstructure:"name" toml:"name"`
|
||||
Version string `mapstructure:"version" toml:"version"`
|
||||
InitialHeight uint64 `mapstructure:"initial_height" toml:"initial_height"`
|
||||
MinRetainBlocks uint64 `mapstructure:"min_retain_blocks" toml:"min_retain_blocks"`
|
||||
IndexEvents map[string]struct{} `mapstructure:"index_events" toml:"index_events"`
|
||||
HaltHeight uint64 `mapstructure:"halt_height" toml:"halt_height"`
|
||||
HaltTime uint64 `mapstructure:"halt_time" toml:"halt_time"`
|
||||
// end of app.toml config options
|
||||
|
||||
AddrPeerFilter types.PeerFilter // filter peers by address and port
|
||||
IdPeerFilter types.PeerFilter // filter peers by node ID
|
||||
|
||||
Transport string `mapstructure:"transport" toml:"transport"`
|
||||
Addr string `mapstructure:"addr" toml:"addr"`
|
||||
Standalone bool `mapstructure:"standalone" toml:"standalone"`
|
||||
Trace bool `mapstructure:"trace" toml:"trace"`
|
||||
// Must be set by the application to grant authority to the consensus engine to send messages to the consensus module
|
||||
ConsensusAuthority string
|
||||
|
||||
// config.toml
|
||||
CmtConfig *cmtcfg.Config
|
||||
}
|
||||
|
||||
// CmtCfgOption is a function that allows to overwrite the default server configuration.
|
||||
type CmtCfgOption func(*cmtcfg.Config)
|
||||
|
||||
// OverwriteDefaultCometConfig overwrites the default comet config with the new config.
|
||||
func OverwriteDefaultCometConfig(newCfg *cmtcfg.Config) CmtCfgOption {
|
||||
return func(cfg *cmtcfg.Config) { // nolint:staticcheck // We want to overwrite everything
|
||||
cfg = newCfg // nolint:ineffassign,staticcheck // We want to overwrite everything
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,26 +2,11 @@ package cometbft
|
||||
|
||||
import "github.com/spf13/cobra"
|
||||
|
||||
// Query flags
|
||||
const (
|
||||
FlagQuery = "query"
|
||||
FlagType = "type"
|
||||
FlagOrderBy = "order_by"
|
||||
)
|
||||
|
||||
const (
|
||||
FlagWithComet = "with-comet"
|
||||
FlagAddress = "address"
|
||||
FlagTransport = "transport"
|
||||
FlagTraceStore = "trace-store"
|
||||
FlagCPUProfile = "cpu-profile"
|
||||
FlagMinGasPrices = "minimum-gas-prices"
|
||||
FlagQueryGasLimit = "query-gas-limit"
|
||||
FlagHaltHeight = "halt-height"
|
||||
FlagHaltTime = "halt-time"
|
||||
FlagTrace = "trace"
|
||||
)
|
||||
|
||||
const (
|
||||
FlagQuery = "query"
|
||||
FlagType = "type"
|
||||
FlagOrderBy = "order_by"
|
||||
FlagChainID = "chain-id"
|
||||
FlagNode = "node"
|
||||
FlagGRPC = "grpc-addr"
|
||||
@ -30,6 +15,8 @@ const (
|
||||
FlagPage = "page"
|
||||
FlagLimit = "limit"
|
||||
FlagOutput = "output"
|
||||
TypeHash = "hash"
|
||||
TypeHeight = "height"
|
||||
)
|
||||
|
||||
// List of supported output formats
|
||||
@ -50,3 +37,13 @@ func AddQueryFlagsToCmd(cmd *cobra.Command) {
|
||||
// hence the flag should not be required for those commands
|
||||
_ = cmd.MarkFlagRequired(FlagChainID)
|
||||
}
|
||||
|
||||
// Server flags
|
||||
const (
|
||||
Standalone = "standalone"
|
||||
FlagAddress = "address"
|
||||
FlagTransport = "transport"
|
||||
FlagHaltHeight = "halt-height"
|
||||
FlagHaltTime = "halt-time"
|
||||
FlagTrace = "trace"
|
||||
)
|
||||
|
||||
@ -29,7 +29,6 @@ require (
|
||||
cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000
|
||||
cosmossdk.io/server/v2/appmanager v0.0.0-00010101000000-000000000000
|
||||
cosmossdk.io/store/v2 v2.0.0-00010101000000-000000000000
|
||||
cosmossdk.io/x/auth v0.0.0-00010101000000-000000000000
|
||||
cosmossdk.io/x/consensus v0.0.0-00010101000000-000000000000
|
||||
github.com/cometbft/cometbft v1.0.0-rc1
|
||||
github.com/cometbft/cometbft/api v1.0.0-rc.1
|
||||
@ -56,6 +55,7 @@ require (
|
||||
cosmossdk.io/math v1.3.0 // indirect
|
||||
cosmossdk.io/schema v0.1.1 // indirect
|
||||
cosmossdk.io/store v1.1.1-0.20240418092142-896cdf1971bc // indirect
|
||||
cosmossdk.io/x/auth v0.0.0-00010101000000-000000000000 // indirect
|
||||
cosmossdk.io/x/bank v0.0.0-20240226161501-23359a0b6d91 // indirect
|
||||
cosmossdk.io/x/staking v0.0.0-00010101000000-000000000000 // indirect
|
||||
cosmossdk.io/x/tx v0.13.3 // indirect
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"cosmossdk.io/core/transaction"
|
||||
"cosmossdk.io/server/v2/cometbft/handlers"
|
||||
"cosmossdk.io/server/v2/cometbft/mempool"
|
||||
"cosmossdk.io/server/v2/cometbft/types"
|
||||
"cosmossdk.io/store/v2/snapshots"
|
||||
)
|
||||
|
||||
@ -16,6 +17,9 @@ type ServerOptions[T transaction.Tx] struct {
|
||||
ExtendVoteHandler handlers.ExtendVoteHandler
|
||||
|
||||
SnapshotOptions snapshots.SnapshotOptions
|
||||
|
||||
AddrPeerFilter types.PeerFilter // filter peers by address and port
|
||||
IdPeerFilter types.PeerFilter // filter peers by node ID
|
||||
}
|
||||
|
||||
// DefaultServerOptions returns the default server options.
|
||||
@ -28,5 +32,7 @@ func DefaultServerOptions[T transaction.Tx]() ServerOptions[T] {
|
||||
VerifyVoteExtensionHandler: handlers.NoOpVerifyVoteExtensionHandler(),
|
||||
ExtendVoteHandler: handlers.NoOpExtendVote(),
|
||||
SnapshotOptions: snapshots.NewSnapshotOptions(0, 0),
|
||||
AddrPeerFilter: nil,
|
||||
IdPeerFilter: nil,
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,13 +20,14 @@ func (c *Consensus[T]) handleQueryP2P(path []string) (*abci.QueryResponse, error
|
||||
|
||||
cmd, typ, arg := path[1], path[2], path[3]
|
||||
if cmd == "filter" {
|
||||
if typ == "addr" {
|
||||
if c.cfg.AddrPeerFilter != nil {
|
||||
return c.cfg.AddrPeerFilter(arg)
|
||||
switch typ {
|
||||
case "addr":
|
||||
if c.addrPeerFilter != nil {
|
||||
return c.addrPeerFilter(arg)
|
||||
}
|
||||
} else if typ == "id" {
|
||||
if c.cfg.IdPeerFilter != nil {
|
||||
return c.cfg.IdPeerFilter(arg)
|
||||
case "id":
|
||||
if c.idPeerFilter != nil {
|
||||
return c.idPeerFilter(arg)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -61,7 +62,7 @@ func (c *Consensus[T]) handlerQueryApp(ctx context.Context, path []string, req *
|
||||
return nil, errorsmod.Wrap(err, "failed to simulate tx")
|
||||
}
|
||||
|
||||
bz, err := intoABCISimulationResponse(txResult, c.cfg.IndexEvents)
|
||||
bz, err := intoABCISimulationResponse(txResult, c.indexedEvents)
|
||||
if err != nil {
|
||||
return nil, errorsmod.Wrap(err, "failed to marshal txResult")
|
||||
}
|
||||
@ -75,7 +76,7 @@ func (c *Consensus[T]) handlerQueryApp(ctx context.Context, path []string, req *
|
||||
case "version":
|
||||
return &abci.QueryResponse{
|
||||
Codespace: cometerrors.RootCodespace,
|
||||
Value: []byte(c.cfg.Version),
|
||||
Value: []byte(c.version),
|
||||
Height: req.Height,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -18,7 +18,6 @@ import (
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/spf13/viper"
|
||||
|
||||
corectx "cosmossdk.io/core/context"
|
||||
"cosmossdk.io/core/log"
|
||||
"cosmossdk.io/core/transaction"
|
||||
serverv2 "cosmossdk.io/server/v2"
|
||||
@ -39,47 +38,71 @@ type CometBFTServer[T transaction.Tx] struct {
|
||||
Node *node.Node
|
||||
Consensus *Consensus[T]
|
||||
|
||||
initTxCodec transaction.Codec[T]
|
||||
logger log.Logger
|
||||
config Config
|
||||
options ServerOptions[T]
|
||||
cmtConfigOptions []CmtCfgOption
|
||||
initTxCodec transaction.Codec[T]
|
||||
logger log.Logger
|
||||
serverOptions ServerOptions[T]
|
||||
config Config
|
||||
cfgOptions []CfgOption
|
||||
}
|
||||
|
||||
func New[T transaction.Tx](txCodec transaction.Codec[T], options ServerOptions[T], cfgOptions ...CmtCfgOption) *CometBFTServer[T] {
|
||||
func New[T transaction.Tx](txCodec transaction.Codec[T], serverOptions ServerOptions[T], cfgOptions ...CfgOption) *CometBFTServer[T] {
|
||||
return &CometBFTServer[T]{
|
||||
initTxCodec: txCodec,
|
||||
options: options,
|
||||
cmtConfigOptions: cfgOptions,
|
||||
initTxCodec: txCodec,
|
||||
serverOptions: serverOptions,
|
||||
cfgOptions: cfgOptions,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error {
|
||||
s.config = Config{CmtConfig: GetConfigFromViper(v), ConsensusAuthority: appI.GetConsensusAuthority()}
|
||||
// get configs (app.toml + config.toml) from viper
|
||||
appTomlConfig := s.Config().(*AppTomlConfig)
|
||||
if v != nil {
|
||||
if err := serverv2.UnmarshalSubConfig(v, s.Name(), &appTomlConfig); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal config: %w", err)
|
||||
}
|
||||
}
|
||||
s.config = Config{
|
||||
ConfigTomlConfig: getConfigTomlFromViper(v),
|
||||
AppTomlConfig: appTomlConfig,
|
||||
}
|
||||
|
||||
indexEvents := make(map[string]struct{}, len(s.config.AppTomlConfig.IndexEvents))
|
||||
for _, e := range s.config.AppTomlConfig.IndexEvents {
|
||||
indexEvents[e] = struct{}{}
|
||||
}
|
||||
|
||||
s.logger = logger.With(log.ModuleKey, s.Name())
|
||||
|
||||
// create consensus
|
||||
store := appI.GetStore().(types.Store)
|
||||
consensus := NewConsensus[T](appI.GetAppManager(), s.options.Mempool, appI.GetGRPCQueryDecoders(), store, s.config, s.initTxCodec, s.logger)
|
||||
|
||||
consensus.prepareProposalHandler = s.options.PrepareProposalHandler
|
||||
consensus.processProposalHandler = s.options.ProcessProposalHandler
|
||||
consensus.verifyVoteExt = s.options.VerifyVoteExtensionHandler
|
||||
consensus.extendVote = s.options.ExtendVoteHandler
|
||||
consensus := NewConsensus(
|
||||
s.logger,
|
||||
appI.Name(),
|
||||
appI.GetConsensusAuthority(),
|
||||
appI.GetAppManager(),
|
||||
s.serverOptions.Mempool,
|
||||
indexEvents,
|
||||
appI.GetGRPCQueryDecoders(),
|
||||
appI.GetStore().(types.Store),
|
||||
s.config,
|
||||
s.initTxCodec,
|
||||
)
|
||||
consensus.prepareProposalHandler = s.serverOptions.PrepareProposalHandler
|
||||
consensus.processProposalHandler = s.serverOptions.ProcessProposalHandler
|
||||
consensus.verifyVoteExt = s.serverOptions.VerifyVoteExtensionHandler
|
||||
consensus.extendVote = s.serverOptions.ExtendVoteHandler
|
||||
consensus.addrPeerFilter = s.serverOptions.AddrPeerFilter
|
||||
consensus.idPeerFilter = s.serverOptions.IdPeerFilter
|
||||
|
||||
// TODO: set these; what is the appropriate presence of the Store interface here?
|
||||
var ss snapshots.StorageSnapshotter
|
||||
var sc snapshots.CommitSnapshotter
|
||||
|
||||
snapshotStore, err := GetSnapshotStore(s.config.CmtConfig.RootDir)
|
||||
snapshotStore, err := GetSnapshotStore(s.config.ConfigTomlConfig.RootDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sm := snapshots.NewManager(snapshotStore, s.options.SnapshotOptions, sc, ss, nil, s.logger)
|
||||
consensus.SetSnapshotManager(sm)
|
||||
consensus.snapshotManager = snapshots.NewManager(snapshotStore, s.serverOptions.SnapshotOptions, sc, ss, nil, s.logger)
|
||||
|
||||
s.Consensus = consensus
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -88,12 +111,9 @@ func (s *CometBFTServer[T]) Name() string {
|
||||
}
|
||||
|
||||
func (s *CometBFTServer[T]) Start(ctx context.Context) error {
|
||||
viper := ctx.Value(corectx.ViperContextKey).(*viper.Viper)
|
||||
cometConfig := GetConfigFromViper(viper)
|
||||
|
||||
wrappedLogger := cometlog.CometLoggerWrapper{Logger: s.logger}
|
||||
if s.config.Standalone {
|
||||
svr, err := abciserver.NewServer(s.config.Addr, s.config.Transport, s.Consensus)
|
||||
if s.config.AppTomlConfig.Standalone {
|
||||
svr, err := abciserver.NewServer(s.config.AppTomlConfig.Address, s.config.AppTomlConfig.Transport, s.Consensus)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating listener: %w", err)
|
||||
}
|
||||
@ -103,20 +123,20 @@ func (s *CometBFTServer[T]) Start(ctx context.Context) error {
|
||||
return svr.Start()
|
||||
}
|
||||
|
||||
nodeKey, err := p2p.LoadOrGenNodeKey(cometConfig.NodeKeyFile())
|
||||
nodeKey, err := p2p.LoadOrGenNodeKey(s.config.ConfigTomlConfig.NodeKeyFile())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.Node, err = node.NewNode(
|
||||
ctx,
|
||||
cometConfig,
|
||||
pvm.LoadOrGenFilePV(cometConfig.PrivValidatorKeyFile(), cometConfig.PrivValidatorStateFile()),
|
||||
s.config.ConfigTomlConfig,
|
||||
pvm.LoadOrGenFilePV(s.config.ConfigTomlConfig.PrivValidatorKeyFile(), s.config.ConfigTomlConfig.PrivValidatorStateFile()),
|
||||
nodeKey,
|
||||
proxy.NewConsensusSyncLocalClientCreator(s.Consensus),
|
||||
getGenDocProvider(cometConfig),
|
||||
getGenDocProvider(s.config.ConfigTomlConfig),
|
||||
cmtcfg.DefaultDBProvider,
|
||||
node.DefaultMetricsProvider(cometConfig.Instrumentation),
|
||||
node.DefaultMetricsProvider(s.config.ConfigTomlConfig.Instrumentation),
|
||||
wrappedLogger,
|
||||
)
|
||||
if err != nil {
|
||||
@ -174,16 +194,20 @@ func getGenDocProvider(cfg *cmtcfg.Config) func() (node.ChecksummedGenesisDoc, e
|
||||
|
||||
func (s *CometBFTServer[T]) StartCmdFlags() *pflag.FlagSet {
|
||||
flags := pflag.NewFlagSet("cometbft", pflag.ExitOnError)
|
||||
flags.Bool(FlagWithComet, true, "Run abci app embedded in-process with CometBFT")
|
||||
flags.String(FlagAddress, "tcp://127.0.0.1:26658", "Listen address")
|
||||
flags.String(FlagTransport, "socket", "Transport protocol: socket, grpc")
|
||||
flags.String(FlagTraceStore, "", "Enable KVStore tracing to an output file")
|
||||
flags.String(FlagMinGasPrices, "", "Minimum gas prices to accept for transactions; Any fee in a tx must meet this minimum (e.g. 0.01photino;0.0001stake)")
|
||||
flags.Uint64(FlagQueryGasLimit, 0, "Maximum gas a Rest/Grpc query can consume. Blank and 0 imply unbounded.")
|
||||
flags.Uint64(FlagHaltHeight, 0, "Block height at which to gracefully halt the chain and shutdown the node")
|
||||
flags.Uint64(FlagHaltTime, 0, "Minimum block time (in Unix seconds) at which to gracefully halt the chain and shutdown the node")
|
||||
flags.String(FlagCPUProfile, "", "Enable CPU profiling and write to the provided file")
|
||||
flags.Bool(FlagTrace, false, "Provide full stack traces for errors in ABCI Log")
|
||||
|
||||
// start flags are prefixed with the server name
|
||||
// as the config in prefixed with the server name
|
||||
// this allows viper to properly bind the flags
|
||||
prefix := func(f string) string {
|
||||
return fmt.Sprintf("%s.%s", s.Name(), f)
|
||||
}
|
||||
|
||||
flags.String(prefix(FlagAddress), "tcp://127.0.0.1:26658", "Listen address")
|
||||
flags.String(prefix(FlagTransport), "socket", "Transport protocol: socket, grpc")
|
||||
flags.Uint64(prefix(FlagHaltHeight), 0, "Block height at which to gracefully halt the chain and shutdown the node")
|
||||
flags.Uint64(prefix(FlagHaltTime), 0, "Minimum block time (in Unix seconds) at which to gracefully halt the chain and shutdown the node")
|
||||
flags.Bool(prefix(FlagTrace), false, "Provide full stack traces for errors in ABCI Log")
|
||||
flags.Bool(prefix(Standalone), false, "Run app without CometBFT")
|
||||
return flags
|
||||
}
|
||||
|
||||
@ -206,12 +230,30 @@ func (s *CometBFTServer[T]) CLICommands() serverv2.CLIConfig {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *CometBFTServer[T]) WriteDefaultConfigAt(configPath string) error {
|
||||
cometConfig := cmtcfg.DefaultConfig()
|
||||
for _, opt := range s.cmtConfigOptions {
|
||||
opt(cometConfig)
|
||||
// CometBFT is a special server, it has config in config.toml and app.toml
|
||||
|
||||
// Config returns the (app.toml) server configuration.
|
||||
func (s *CometBFTServer[T]) Config() any {
|
||||
if s.config.AppTomlConfig == nil || s.config.AppTomlConfig == (&AppTomlConfig{}) {
|
||||
cfg := &Config{AppTomlConfig: DefaultAppTomlConfig()}
|
||||
// overwrite the default config with the provided options
|
||||
for _, opt := range s.cfgOptions {
|
||||
opt(cfg)
|
||||
}
|
||||
|
||||
return cfg.AppTomlConfig
|
||||
}
|
||||
|
||||
cmtcfg.WriteConfigFile(filepath.Join(configPath, "config.toml"), cometConfig)
|
||||
return s.config.AppTomlConfig
|
||||
}
|
||||
|
||||
// WriteCustomConfigAt writes the default cometbft config.toml
|
||||
func (s *CometBFTServer[T]) WriteCustomConfigAt(configPath string) error {
|
||||
cfg := &Config{ConfigTomlConfig: cmtcfg.DefaultConfig()}
|
||||
for _, opt := range s.cfgOptions {
|
||||
opt(cfg)
|
||||
}
|
||||
|
||||
cmtcfg.WriteConfigFile(filepath.Join(configPath, "config.toml"), cfg.ConfigTomlConfig)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -262,10 +262,10 @@ func (c *Consensus[T]) validateFinalizeBlockHeight(req *abci.FinalizeBlockReques
|
||||
|
||||
// expectedHeight holds the expected height to validate
|
||||
var expectedHeight uint64
|
||||
if lastBlockHeight == 0 && c.cfg.InitialHeight > 1 {
|
||||
if lastBlockHeight == 0 && c.initialHeight > 1 {
|
||||
// In this case, we're validating the first block of the chain, i.e no
|
||||
// previous commit. The height we're expecting is the initial height.
|
||||
expectedHeight = c.cfg.InitialHeight
|
||||
expectedHeight = c.initialHeight
|
||||
} else {
|
||||
// This case can mean two things:
|
||||
//
|
||||
@ -327,7 +327,7 @@ func (c *Consensus[T]) GetConsensusParams(ctx context.Context) (*cmtproto.Consen
|
||||
|
||||
func (c *Consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, commitHeight int64) int64 {
|
||||
// pruning is disabled if minRetainBlocks is zero
|
||||
if c.cfg.MinRetainBlocks == 0 {
|
||||
if c.cfg.AppTomlConfig.MinRetainBlocks == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
@ -368,7 +368,7 @@ func (c *Consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, com
|
||||
}
|
||||
}
|
||||
|
||||
v := commitHeight - int64(c.cfg.MinRetainBlocks)
|
||||
v := commitHeight - int64(c.cfg.AppTomlConfig.MinRetainBlocks)
|
||||
retentionHeight = minNonZero(retentionHeight, v)
|
||||
|
||||
if retentionHeight <= 0 {
|
||||
@ -383,15 +383,15 @@ func (c *Consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, com
|
||||
func (c *Consensus[T]) checkHalt(height int64, time time.Time) error {
|
||||
var halt bool
|
||||
switch {
|
||||
case c.cfg.HaltHeight > 0 && uint64(height) > c.cfg.HaltHeight:
|
||||
case c.cfg.AppTomlConfig.HaltHeight > 0 && uint64(height) > c.cfg.AppTomlConfig.HaltHeight:
|
||||
halt = true
|
||||
|
||||
case c.cfg.HaltTime > 0 && time.Unix() > int64(c.cfg.HaltTime):
|
||||
case c.cfg.AppTomlConfig.HaltTime > 0 && time.Unix() > int64(c.cfg.AppTomlConfig.HaltTime):
|
||||
halt = true
|
||||
}
|
||||
|
||||
if halt {
|
||||
return fmt.Errorf("halt per configuration height %d time %d", c.cfg.HaltHeight, c.cfg.HaltTime)
|
||||
return fmt.Errorf("halt per configuration height %d time %d", c.cfg.AppTomlConfig.HaltHeight, c.cfg.AppTomlConfig.HaltTime)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
26
server/v2/cometbft/version.go
Normal file
26
server/v2/cometbft/version.go
Normal file
@ -0,0 +1,26 @@
|
||||
package cometbft
|
||||
|
||||
import "runtime/debug"
|
||||
|
||||
var Version = ""
|
||||
|
||||
func getCometBFTServerVersion() string {
|
||||
deps, ok := debug.ReadBuildInfo()
|
||||
if !ok {
|
||||
return Version
|
||||
}
|
||||
|
||||
var serverVersion string
|
||||
for _, dep := range deps.Deps {
|
||||
if dep.Path == "cosmossdk.io/server/v2/cometbft" {
|
||||
if dep.Replace != nil && dep.Replace.Version != "(devel)" {
|
||||
serverVersion = dep.Replace.Version
|
||||
} else {
|
||||
serverVersion = dep.Version
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Version = serverVersion
|
||||
return serverVersion
|
||||
}
|
||||
@ -3,7 +3,6 @@ package serverv2
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
@ -102,19 +101,12 @@ func createStartCommand[T transaction.Tx](
|
||||
) *cobra.Command {
|
||||
flags := server.StartFlags()
|
||||
|
||||
return &cobra.Command{
|
||||
cmd := &cobra.Command{
|
||||
Use: "start",
|
||||
Short: "Run the application",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
v := GetViperFromCmd(cmd)
|
||||
l := GetLoggerFromCmd(cmd)
|
||||
|
||||
for _, startFlags := range flags {
|
||||
if err := v.BindPFlags(startFlags); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := v.BindPFlags(cmd.Flags()); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -137,12 +129,19 @@ func createStartCommand[T transaction.Tx](
|
||||
}()
|
||||
|
||||
if err := server.Start(ctx); err != nil {
|
||||
return fmt.Errorf("failed to start servers: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
// add the start flags to the command
|
||||
for _, startFlags := range flags {
|
||||
cmd.Flags().AddFlagSet(startFlags)
|
||||
}
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
// configHandle writes the default config to the home directory if it does not exist and sets the server context
|
||||
|
||||
@ -1,15 +1,61 @@
|
||||
package serverv2
|
||||
|
||||
import "github.com/spf13/cobra"
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
// CLIConfig defines the CLI configuration for a module server.
|
||||
type CLIConfig struct {
|
||||
// Commands defines the main command of a module server.
|
||||
Commands []*cobra.Command
|
||||
// Queries defines the query commands of a module server.
|
||||
// Those commands are meant to be added in the root query command.
|
||||
Queries []*cobra.Command
|
||||
// Txs defines the tx commands of a module server.
|
||||
// Those commands are meant to be added in the root tx command.
|
||||
Txs []*cobra.Command
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
// ReadConfig returns a viper instance of the config file
|
||||
func ReadConfig(configPath string) (*viper.Viper, error) {
|
||||
v := viper.New()
|
||||
v.SetConfigType("toml")
|
||||
v.SetConfigName("config")
|
||||
v.AddConfigPath(configPath)
|
||||
if err := v.ReadInConfig(); err != nil {
|
||||
return nil, fmt.Errorf("failed to read config: %s: %w", configPath, err)
|
||||
}
|
||||
|
||||
v.SetConfigName("app")
|
||||
if err := v.MergeInConfig(); err != nil {
|
||||
return nil, fmt.Errorf("failed to merge configuration: %w", err)
|
||||
}
|
||||
|
||||
v.WatchConfig()
|
||||
|
||||
return v, nil
|
||||
}
|
||||
|
||||
// UnmarshalSubconfig unmarshals the given subconfig from the viper instance.
|
||||
// It unmarshals the config, env, flags into the target struct.
|
||||
// Use this instead of viper.Sub because viper does not unmarshal flags.
|
||||
func UnmarshalSubConfig(v *viper.Viper, subName string, target any) error {
|
||||
var sub any
|
||||
for k, val := range v.AllSettings() {
|
||||
if strings.HasPrefix(k, subName) {
|
||||
sub = val
|
||||
}
|
||||
}
|
||||
|
||||
// Create a new decoder with custom decoding options
|
||||
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
|
||||
DecodeHook: mapstructure.ComposeDecodeHookFunc(
|
||||
mapstructure.StringToTimeDurationHookFunc(),
|
||||
mapstructure.StringToSliceHookFunc(","),
|
||||
),
|
||||
Result: target,
|
||||
WeaklyTypedInput: true,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create decoder: %w", err)
|
||||
}
|
||||
|
||||
// Decode the sub-configuration
|
||||
if err := decoder.Decode(sub); err != nil {
|
||||
return fmt.Errorf("failed to decode sub-configuration: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
39
server/v2/config_test.go
Normal file
39
server/v2/config_test.go
Normal file
@ -0,0 +1,39 @@
|
||||
package serverv2_test
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
serverv2 "cosmossdk.io/server/v2"
|
||||
grpc "cosmossdk.io/server/v2/api/grpc"
|
||||
)
|
||||
|
||||
func TestReadConfig(t *testing.T) {
|
||||
currentDir, err := os.Getwd()
|
||||
require.NoError(t, err)
|
||||
configPath := filepath.Join(currentDir, "testdata")
|
||||
|
||||
v, err := serverv2.ReadConfig(configPath)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, v.GetString("grpc.address"), grpc.DefaultConfig().Address)
|
||||
}
|
||||
|
||||
func TestUnmarshalSubConfig(t *testing.T) {
|
||||
currentDir, err := os.Getwd()
|
||||
require.NoError(t, err)
|
||||
configPath := filepath.Join(currentDir, "testdata")
|
||||
|
||||
v, err := serverv2.ReadConfig(configPath)
|
||||
require.NoError(t, err)
|
||||
|
||||
grpcConfig := grpc.DefaultConfig()
|
||||
err = serverv2.UnmarshalSubConfig(v, "grpc", &grpcConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.True(t, grpc.DefaultConfig().Enable)
|
||||
require.False(t, grpcConfig.Enable)
|
||||
}
|
||||
@ -28,6 +28,7 @@ require (
|
||||
github.com/hashicorp/go-hclog v1.6.2
|
||||
github.com/hashicorp/go-metrics v0.5.3
|
||||
github.com/hashicorp/go-plugin v1.6.0
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/pelletier/go-toml/v2 v2.2.2
|
||||
github.com/prometheus/client_golang v1.19.1
|
||||
github.com/prometheus/common v0.55.0
|
||||
@ -62,7 +63,6 @@ require (
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/oklog/run v1.1.0 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
|
||||
@ -26,9 +26,12 @@ type ServerComponent[T transaction.Tx] interface {
|
||||
Init(AppI[T], *viper.Viper, log.Logger) error
|
||||
}
|
||||
|
||||
// HasCLICommands is a server module that has CLI commands.
|
||||
type HasCLICommands interface {
|
||||
CLICommands() CLIConfig
|
||||
// HasStartFlags is a server module that has start flags.
|
||||
type HasStartFlags interface {
|
||||
// StartCmdFlags returns server start flags.
|
||||
// Those flags should be prefixed with the server name.
|
||||
// They are then merged with the server config in one viper instance.
|
||||
StartCmdFlags() *pflag.FlagSet
|
||||
}
|
||||
|
||||
// HasConfig is a server module that has a config.
|
||||
@ -36,33 +39,25 @@ type HasConfig interface {
|
||||
Config() any
|
||||
}
|
||||
|
||||
// HasStartFlags is a server module that has start flags.
|
||||
type HasStartFlags interface {
|
||||
StartCmdFlags() *pflag.FlagSet
|
||||
// HasCLICommands is a server module that has CLI commands.
|
||||
type HasCLICommands interface {
|
||||
CLICommands() CLIConfig
|
||||
}
|
||||
|
||||
// CLIConfig defines the CLI configuration for a module server.
|
||||
type CLIConfig struct {
|
||||
// Commands defines the main command of a module server.
|
||||
Commands []*cobra.Command
|
||||
// Queries defines the query commands of a module server.
|
||||
// Those commands are meant to be added in the root query command.
|
||||
Queries []*cobra.Command
|
||||
// Txs defines the tx commands of a module server.
|
||||
// Those commands are meant to be added in the root tx command.
|
||||
Txs []*cobra.Command
|
||||
}
|
||||
|
||||
var _ ServerComponent[transaction.Tx] = (*Server[transaction.Tx])(nil)
|
||||
|
||||
// ReadConfig returns a viper instance of the config file
|
||||
func ReadConfig(configPath string) (*viper.Viper, error) {
|
||||
v := viper.New()
|
||||
v.SetConfigType("toml")
|
||||
v.SetConfigName("config")
|
||||
v.AddConfigPath(configPath)
|
||||
if err := v.ReadInConfig(); err != nil {
|
||||
return nil, fmt.Errorf("failed to read config: %s: %w", configPath, err)
|
||||
}
|
||||
|
||||
v.SetConfigName("app")
|
||||
if err := v.MergeInConfig(); err != nil {
|
||||
return nil, fmt.Errorf("failed to merge configuration: %w", err)
|
||||
}
|
||||
|
||||
v.WatchConfig()
|
||||
|
||||
return v, nil
|
||||
}
|
||||
|
||||
type Server[T transaction.Tx] struct {
|
||||
logger log.Logger
|
||||
components []ServerComponent[T]
|
||||
@ -209,8 +204,8 @@ func (s *Server[T]) WriteConfig(configPath string) error {
|
||||
// undocumented interface to write the component default config in another file than app.toml
|
||||
// it is used by cometbft for backward compatibility
|
||||
// it should not be used by other components
|
||||
if mod, ok := component.(interface{ WriteDefaultConfigAt(string) error }); ok {
|
||||
if err := mod.WriteDefaultConfigAt(configPath); err != nil {
|
||||
if mod, ok := component.(interface{ WriteCustomConfigAt(string) error }); ok {
|
||||
if err := mod.WriteCustomConfigAt(configPath); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,19 +37,9 @@ func (*mockApp[T]) InterfaceRegistry() coreapp.InterfaceRegistry {
|
||||
return &mockInterfaceRegistry{}
|
||||
}
|
||||
|
||||
// TODO split this test into multiple tests
|
||||
// test read config
|
||||
// test write config
|
||||
// test server configs
|
||||
// test start empty
|
||||
// test start config exists
|
||||
// test stop
|
||||
func TestServer(t *testing.T) {
|
||||
currentDir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
t.Fail()
|
||||
}
|
||||
require.NoError(t, err)
|
||||
configPath := filepath.Join(currentDir, "testdata")
|
||||
|
||||
v, err := serverv2.ReadConfig(configPath)
|
||||
@ -59,10 +49,8 @@ func TestServer(t *testing.T) {
|
||||
|
||||
logger := log.NewLogger(os.Stdout)
|
||||
grpcServer := grpc.New[transaction.Tx]()
|
||||
if err := grpcServer.Init(&mockApp[transaction.Tx]{}, v, logger); err != nil {
|
||||
t.Log(err)
|
||||
t.Fail()
|
||||
}
|
||||
err = grpcServer.Init(&mockApp[transaction.Tx]{}, v, logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
mockServer := &mockServer{name: "mock-server-1", ch: make(chan string, 100)}
|
||||
|
||||
@ -73,30 +61,17 @@ func TestServer(t *testing.T) {
|
||||
)
|
||||
|
||||
serverCfgs := server.Configs()
|
||||
if serverCfgs[grpcServer.Name()].(*grpc.Config).Address != grpc.DefaultConfig().Address {
|
||||
t.Logf("config is not equal: %v", serverCfgs[grpcServer.Name()])
|
||||
t.Fail()
|
||||
}
|
||||
if serverCfgs[mockServer.Name()].(*mockServerConfig).MockFieldOne != MockServerDefaultConfig().MockFieldOne {
|
||||
t.Logf("config is not equal: %v", serverCfgs[mockServer.Name()])
|
||||
t.Fail()
|
||||
}
|
||||
require.Equal(t, serverCfgs[grpcServer.Name()].(*grpc.Config).Address, grpc.DefaultConfig().Address)
|
||||
require.Equal(t, serverCfgs[mockServer.Name()].(*mockServerConfig).MockFieldOne, MockServerDefaultConfig().MockFieldOne)
|
||||
|
||||
// write config
|
||||
if err := server.WriteConfig(configPath); err != nil {
|
||||
t.Log(err)
|
||||
t.Fail()
|
||||
}
|
||||
err = server.WriteConfig(configPath)
|
||||
require.NoError(t, err)
|
||||
|
||||
v, err = serverv2.ReadConfig(configPath)
|
||||
if err != nil {
|
||||
t.Log(err) // config should be created by WriteConfig
|
||||
t.FailNow()
|
||||
}
|
||||
if v.GetString(grpcServer.Name()+".address") != grpc.DefaultConfig().Address {
|
||||
t.Logf("config is not equal: %v", v)
|
||||
t.Fail()
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, v.GetString(grpcServer.Name()+".address"), grpc.DefaultConfig().Address)
|
||||
|
||||
// start empty
|
||||
ctx, cancelFn := context.WithCancel(context.TODO())
|
||||
@ -105,30 +80,10 @@ func TestServer(t *testing.T) {
|
||||
<-time.After(5 * time.Second)
|
||||
cancelFn()
|
||||
|
||||
if err := server.Stop(ctx); err != nil {
|
||||
t.Logf("failed to stop servers: %s", err)
|
||||
t.Fail()
|
||||
}
|
||||
err = server.Stop(ctx)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
if err := server.Start(ctx); err != nil {
|
||||
t.Log(err)
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadConfig(t *testing.T) {
|
||||
currentDir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
t.Fail()
|
||||
}
|
||||
configPath := filepath.Join(currentDir, "testdata")
|
||||
|
||||
v, err := serverv2.ReadConfig(configPath)
|
||||
require.NoError(t, err)
|
||||
|
||||
grpcConfig := grpc.DefaultConfig()
|
||||
err = v.Sub("grpc").Unmarshal(&grpcConfig)
|
||||
err = server.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
2
server/v2/testdata/app.toml
vendored
2
server/v2/testdata/app.toml
vendored
@ -1,6 +1,6 @@
|
||||
[grpc]
|
||||
# Enable defines if the gRPC server should be enabled.
|
||||
enable = true
|
||||
enable = false
|
||||
# Address defines the gRPC server address to bind to.
|
||||
address = 'localhost:9090'
|
||||
# MaxRecvMsgSize defines the max message size in bytes the server can receive.
|
||||
|
||||
@ -13,9 +13,10 @@ import (
|
||||
type AppCreator[T transaction.Tx] func(log.Logger, *viper.Viper) AppI[T]
|
||||
|
||||
type AppI[T transaction.Tx] interface {
|
||||
Name() string
|
||||
InterfaceRegistry() coreapp.InterfaceRegistry
|
||||
GetAppManager() *appmanager.AppManager[T]
|
||||
GetConsensusAuthority() string
|
||||
InterfaceRegistry() coreapp.InterfaceRegistry
|
||||
GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error)
|
||||
GetStore() any
|
||||
}
|
||||
|
||||
@ -32,46 +32,6 @@ import (
|
||||
genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types"
|
||||
)
|
||||
|
||||
var _ transaction.Codec[transaction.Tx] = &temporaryTxDecoder[transaction.Tx]{}
|
||||
|
||||
type temporaryTxDecoder[T transaction.Tx] struct {
|
||||
txConfig client.TxConfig
|
||||
}
|
||||
|
||||
// Decode implements transaction.Codec.
|
||||
func (t *temporaryTxDecoder[T]) Decode(bz []byte) (T, error) {
|
||||
var out T
|
||||
tx, err := t.txConfig.TxDecoder()(bz)
|
||||
if err != nil {
|
||||
return out, err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
out, ok = tx.(T)
|
||||
if !ok {
|
||||
return out, errors.New("unexpected Tx type")
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// DecodeJSON implements transaction.Codec.
|
||||
func (t *temporaryTxDecoder[T]) DecodeJSON(bz []byte) (T, error) {
|
||||
var out T
|
||||
tx, err := t.txConfig.TxJSONDecoder()(bz)
|
||||
if err != nil {
|
||||
return out, err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
out, ok = tx.(T)
|
||||
if !ok {
|
||||
return out, errors.New("unexpected Tx type")
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func newApp[T transaction.Tx](
|
||||
logger log.Logger, viper *viper.Viper,
|
||||
) serverv2.AppI[T] {
|
||||
@ -102,19 +62,19 @@ func initRootCmd[T transaction.Tx](
|
||||
|
||||
// add keybase, auxiliary RPC, query, genesis, and tx child commands
|
||||
rootCmd.AddCommand(
|
||||
genesisCommand[T](moduleManager, appExport[T]),
|
||||
genesisCommand(moduleManager, appExport[T]),
|
||||
queryCommand(),
|
||||
txCommand(),
|
||||
keys.Commands(),
|
||||
offchain.OffChain(),
|
||||
)
|
||||
|
||||
// Add empty server struct here for writing default config
|
||||
// wire server commands
|
||||
if err = serverv2.AddCommands(
|
||||
rootCmd,
|
||||
newApp,
|
||||
logger,
|
||||
cometbft.New[T](&temporaryTxDecoder[T]{txConfig}, cometbft.DefaultServerOptions[T]()),
|
||||
cometbft.New(&genericTxDecoder[T]{txConfig}, cometbft.DefaultServerOptions[T]()),
|
||||
grpc.New[T](),
|
||||
); err != nil {
|
||||
panic(err)
|
||||
@ -220,3 +180,43 @@ func appExport[T transaction.Tx](
|
||||
|
||||
return simApp.ExportAppStateAndValidators(forZeroHeight, jailAllowedAddrs, modulesToExport)
|
||||
}
|
||||
|
||||
var _ transaction.Codec[transaction.Tx] = &genericTxDecoder[transaction.Tx]{}
|
||||
|
||||
type genericTxDecoder[T transaction.Tx] struct {
|
||||
txConfig client.TxConfig
|
||||
}
|
||||
|
||||
// Decode implements transaction.Codec.
|
||||
func (t *genericTxDecoder[T]) Decode(bz []byte) (T, error) {
|
||||
var out T
|
||||
tx, err := t.txConfig.TxDecoder()(bz)
|
||||
if err != nil {
|
||||
return out, err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
out, ok = tx.(T)
|
||||
if !ok {
|
||||
return out, errors.New("unexpected Tx type")
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// DecodeJSON implements transaction.Codec.
|
||||
func (t *genericTxDecoder[T]) DecodeJSON(bz []byte) (T, error) {
|
||||
var out T
|
||||
tx, err := t.txConfig.TxJSONDecoder()(bz)
|
||||
if err != nil {
|
||||
return out, err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
out, ok = tx.(T)
|
||||
if !ok {
|
||||
return out, errors.New("unexpected Tx type")
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
@ -40,6 +40,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
flagMinGasPrices = "min-gas-prices"
|
||||
flagNodeDirPrefix = "node-dir-prefix"
|
||||
flagNumValidators = "validator-count"
|
||||
flagOutputDir = "output-dir"
|
||||
@ -70,7 +71,7 @@ func addTestnetFlagsToCmd(cmd *cobra.Command) {
|
||||
cmd.Flags().IntP(flagNumValidators, "n", 4, "Number of validators to initialize the testnet with")
|
||||
cmd.Flags().StringP(flagOutputDir, "o", "./.testnets", "Directory to store initialization data for the testnet")
|
||||
cmd.Flags().String(flags.FlagChainID, "", "genesis file chain-id, if left blank will be randomly created")
|
||||
cmd.Flags().String(cometbft.FlagMinGasPrices, fmt.Sprintf("0.000006%s", sdk.DefaultBondDenom), "Minimum gas prices to accept for transactions; All fees in a tx must meet this minimum (e.g. 0.01photino,0.001stake)")
|
||||
cmd.Flags().String(flagMinGasPrices, fmt.Sprintf("0.000006%s", sdk.DefaultBondDenom), "Minimum gas prices to accept for transactions; All fees in a tx must meet this minimum (e.g. 0.01photino,0.001stake)")
|
||||
cmd.Flags().String(flags.FlagKeyType, string(hd.Secp256k1Type), "Key signing algorithm to generate keys for")
|
||||
|
||||
// support old flags name for backwards compatibility
|
||||
@ -127,7 +128,7 @@ Example:
|
||||
args.outputDir, _ = cmd.Flags().GetString(flagOutputDir)
|
||||
args.keyringBackend, _ = cmd.Flags().GetString(flags.FlagKeyringBackend)
|
||||
args.chainID, _ = cmd.Flags().GetString(flags.FlagChainID)
|
||||
args.minGasPrices, _ = cmd.Flags().GetString(cometbft.FlagMinGasPrices)
|
||||
args.minGasPrices, _ = cmd.Flags().GetString(flagMinGasPrices)
|
||||
args.nodeDirPrefix, _ = cmd.Flags().GetString(flagNodeDirPrefix)
|
||||
args.nodeDaemonHome, _ = cmd.Flags().GetString(flagNodeDaemonHome)
|
||||
args.startingIPAddress, _ = cmd.Flags().GetString(flagStartingIPAddress)
|
||||
@ -336,7 +337,11 @@ func initTestnetFiles[T transaction.Tx](
|
||||
}
|
||||
|
||||
// Write server config
|
||||
cometServer := cometbft.New[T](&temporaryTxDecoder[T]{clientCtx.TxConfig}, cometbft.ServerOptions[T]{}, cometbft.OverwriteDefaultCometConfig(nodeConfig))
|
||||
cometServer := cometbft.New[T](
|
||||
&genericTxDecoder[T]{clientCtx.TxConfig},
|
||||
cometbft.ServerOptions[T]{},
|
||||
cometbft.OverwriteDefaultConfigTomlConfig(nodeConfig),
|
||||
)
|
||||
grpcServer := grpc.New[T](grpc.OverwriteDefaultConfig(grpcConfig))
|
||||
server := serverv2.NewServer(coretesting.NewNopLogger(), cometServer, grpcServer)
|
||||
err = server.WriteConfig(filepath.Join(nodeDir, "config"))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user