refactor(server/v2/cometbft): add codec.Codec and clean-up APIs (#22566)

This commit is contained in:
Julien Robert 2024-11-20 19:52:18 +04:00 committed by GitHub
parent d810b77c71
commit efc05e8122
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 174 additions and 147 deletions

View File

@ -6,14 +6,10 @@ import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
gogoproto "github.com/cosmos/gogoproto/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
@ -37,6 +33,11 @@ import (
"cosmossdk.io/server/v2/streaming"
"cosmossdk.io/store/v2/snapshots"
consensustypes "cosmossdk.io/x/consensus/types"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
)
const (
@ -45,22 +46,24 @@ const (
QueryPathStore = "store"
)
var _ abci.Application = (*Consensus[transaction.Tx])(nil)
var _ abci.Application = (*consensus[transaction.Tx])(nil)
type Consensus[T transaction.Tx] struct {
// consensus contains the implementation of the ABCI interface for CometBFT.
type consensus[T transaction.Tx] struct {
logger log.Logger
appName, version string
app appmanager.AppManager[T]
appCodec codec.Codec
txCodec transaction.Codec[T]
store types.Store
streaming streaming.Manager
listener *appdata.Listener
snapshotManager *snapshots.Manager
streamingManager streaming.Manager
mempool mempool.Mempool[T]
cfg Config
indexedEvents map[string]struct{}
chainID string
indexedEvents map[string]struct{}
initialHeight uint64
// this is only available after this node has committed a block (in FinalizeBlock),
@ -81,60 +84,9 @@ type Consensus[T transaction.Tx] struct {
getProtoRegistry func() (*protoregistry.Files, error)
}
func NewConsensus[T transaction.Tx](
logger log.Logger,
appName string,
app appmanager.AppManager[T],
mp mempool.Mempool[T],
indexedEvents map[string]struct{},
queryHandlersMap map[string]appmodulev2.Handler,
store types.Store,
cfg Config,
txCodec transaction.Codec[T],
chainId string,
) *Consensus[T] {
return &Consensus[T]{
appName: appName,
version: getCometBFTServerVersion(),
app: app,
cfg: cfg,
store: store,
logger: logger,
txCodec: txCodec,
streaming: streaming.Manager{},
snapshotManager: nil,
mempool: mp,
lastCommittedHeight: atomic.Int64{},
prepareProposalHandler: nil,
processProposalHandler: nil,
verifyVoteExt: nil,
extendVote: nil,
chainID: chainId,
indexedEvents: indexedEvents,
initialHeight: 0,
queryHandlersMap: queryHandlersMap,
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
}
}
// SetStreamingManager sets the streaming manager for the consensus module.
func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}
// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
if err := c.snapshotManager.RegisterExtensions(extensions...); err != nil {
return fmt.Errorf("failed to register snapshot extensions: %w", err)
}
return nil
}
// CheckTx implements types.Application.
// It is called by cometbft to verify transaction validity
func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
decodedTx, err := c.txCodec.Decode(req.Tx)
if err != nil {
return nil, err
@ -172,7 +124,7 @@ func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
}
// Info implements types.Application.
func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) {
func (c *consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) {
version, _, err := c.store.StateLatest()
if err != nil {
return nil, err
@ -212,7 +164,7 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc
// Query implements types.Application.
// It is called by cometbft to query application state.
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
func (c *consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
resp, isGRPC, err := c.maybeRunGRPCQuery(ctx, req)
if isGRPC {
return resp, err
@ -227,7 +179,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
switch path[0] {
case QueryPathApp:
resp, err = c.handlerQueryApp(ctx, path, req)
resp, err = c.handleQueryApp(ctx, path, req)
case QueryPathStore:
resp, err = c.handleQueryStore(path, req)
@ -246,7 +198,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
return resp, nil
}
func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
// if this fails then we cannot serve queries anymore
registry, err := c.getProtoRegistry()
if err != nil {
@ -288,7 +240,7 @@ func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq
txResult, _, err := c.app.Simulate(ctx, tx)
if err != nil {
return nil, true, fmt.Errorf("%v with gas used: '%d'", err, txResult.GasUsed)
return nil, true, fmt.Errorf("failed with gas used: '%d': %w", txResult.GasUsed, err)
}
msgResponses := make([]*codectypes.Any, 0, len(txResult.Resp))
@ -337,7 +289,7 @@ func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq
}
// InitChain implements types.Application.
func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId)
// store chainID to be used later on in execution
@ -421,7 +373,7 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe
// PrepareProposal implements types.Application.
// It is called by cometbft to prepare a proposal block.
func (c *Consensus[T]) PrepareProposal(
func (c *consensus[T]) PrepareProposal(
ctx context.Context,
req *abciproto.PrepareProposalRequest,
) (resp *abciproto.PrepareProposalResponse, err error) {
@ -457,7 +409,7 @@ func (c *Consensus[T]) PrepareProposal(
// ProcessProposal implements types.Application.
// It is called by cometbft to process/verify a proposal block.
func (c *Consensus[T]) ProcessProposal(
func (c *consensus[T]) ProcessProposal(
ctx context.Context,
req *abciproto.ProcessProposalRequest,
) (*abciproto.ProcessProposalResponse, error) {
@ -491,7 +443,7 @@ func (c *Consensus[T]) ProcessProposal(
// FinalizeBlock implements types.Application.
// It is called by cometbft to finalize a block.
func (c *Consensus[T]) FinalizeBlock(
func (c *consensus[T]) FinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*abciproto.FinalizeBlockResponse, error) {
@ -581,7 +533,7 @@ func (c *Consensus[T]) FinalizeBlock(
// Commit implements types.Application.
// It is called by cometbft to notify the application that a block was committed.
func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
lastCommittedHeight := c.lastCommittedHeight.Load()
c.snapshotManager.SnapshotIfApplicable(lastCommittedHeight)
@ -599,7 +551,7 @@ func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (
// Vote extensions
// VerifyVoteExtension implements types.Application.
func (c *Consensus[T]) VerifyVoteExtension(
func (c *consensus[T]) VerifyVoteExtension(
ctx context.Context,
req *abciproto.VerifyVoteExtensionRequest,
) (*abciproto.VerifyVoteExtensionResponse, error) {
@ -641,7 +593,7 @@ func (c *Consensus[T]) VerifyVoteExtension(
}
// ExtendVote implements types.Application.
func (c *Consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) {
func (c *consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) {
// If vote extensions are not enabled, as a safety precaution, we return an
// error.
cp, err := c.GetConsensusParams(ctx)

View File

@ -6,6 +6,7 @@ import (
"encoding/json"
"io"
"strings"
"sync"
"testing"
"time"
@ -637,7 +638,7 @@ func TestConsensus_Query(t *testing.T) {
require.Equal(t, res.Value, []byte(nil))
}
func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *Consensus[mock.Tx] {
func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *consensus[mock.Tx] {
t.Helper()
msgRouterBuilder := getMsgRouterBuilder(t, func(ctx context.Context, msg *gogotypes.BoolValue) (*gogotypes.BoolValue, error) {
@ -699,9 +700,17 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.
nil,
)
return NewConsensus[mock.Tx](log.NewNopLogger(), "testing-app", am,
mempool, map[string]struct{}{}, nil, mockStore,
Config{AppTomlConfig: DefaultAppTomlConfig()}, mock.TxCodec{}, "test")
return &consensus[mock.Tx]{
logger: log.NewNopLogger(),
appName: "testing-app",
app: am,
mempool: mempool,
store: mockStore,
cfg: Config{AppTomlConfig: DefaultAppTomlConfig()},
txCodec: mock.TxCodec{},
chainID: "test",
getProtoRegistry: sync.OnceValues(proto.MergedRegistry),
}
}
// Check target version same with store's latest version

View File

@ -388,7 +388,7 @@ func (s *CometBFTServer[T]) BootstrapStateCmd() *cobra.Command {
return err
}
if height == 0 {
height, err = s.Consensus.store.GetLatestVersion()
height, err = s.store.GetLatestVersion()
if err != nil {
return err
}

View File

@ -3,7 +3,8 @@ package cometbft
import (
"context"
v1 "github.com/cometbft/cometbft/api/cometbft/abci/v1"
abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/cosmos/gogoproto/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -12,8 +13,8 @@ import (
autocliv1 "cosmossdk.io/api/cosmos/autocli/v1"
cmtv1beta1 "cosmossdk.io/api/cosmos/base/tendermint/v1beta1"
"cosmossdk.io/core/server"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
errorsmod "cosmossdk.io/errors/v2"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/grpc/cmtservice"
@ -23,17 +24,25 @@ import (
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
)
// GRPCServiceRegistrar returns a function that registers the CometBFT gRPC service
type appSimulator[T transaction.Tx] interface {
Simulate(ctx context.Context, tx T) (server.TxResult, corestore.WriterMap, error)
}
// gRPCServiceRegistrar returns a function that registers the CometBFT gRPC service
// Those services are defined for backward compatibility.
// Eventually, they will be removed in favor of the new gRPC services.
func (c *Consensus[T]) GRPCServiceRegistrar(
func gRPCServiceRegistrar[T transaction.Tx](
clientCtx client.Context,
cfg server.ConfigMap,
cometBFTAppConfig *AppTomlConfig,
txCodec transaction.Codec[T],
consensus abci.Application,
app appSimulator[T],
) func(srv *grpc.Server) error {
return func(srv *grpc.Server) error {
cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, c.Query, clientCtx.ConsensusAddressCodec))
txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, c})
nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, c})
cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, consensus.Query, clientCtx.ConsensusAddressCodec))
txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, txCodec, app})
nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, cometBFTAppConfig, consensus})
return nil
}
@ -86,7 +95,8 @@ var CometBFTAutoCLIDescriptor = &autocliv1.ServiceCommandDescriptor{
type txServer[T transaction.Tx] struct {
clientCtx client.Context
consensus *Consensus[T]
txCodec transaction.Codec[T]
app appSimulator[T]
}
// BroadcastTx implements tx.ServiceServer.
@ -132,12 +142,12 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest)
return nil, status.Errorf(codes.InvalidArgument, "empty txBytes is not allowed")
}
tx, err := t.consensus.txCodec.Decode(txBytes)
tx, err := t.txCodec.Decode(txBytes)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to decode tx")
return nil, status.Errorf(codes.InvalidArgument, "failed to decode tx: %v", err)
}
txResult, _, err := t.consensus.app.Simulate(ctx, tx)
txResult, _, err := t.app.Simulate(ctx, tx)
if err != nil {
return nil, status.Errorf(codes.Unknown, "%v with gas used: '%d'", err, txResult.GasUsed)
}
@ -186,8 +196,9 @@ func (t txServer[T]) TxEncodeAmino(context.Context, *txtypes.TxEncodeAminoReques
var _ txtypes.ServiceServer = txServer[transaction.Tx]{}
type nodeServer[T transaction.Tx] struct {
cfg server.ConfigMap
consensus *Consensus[T]
cfg server.ConfigMap
cometBFTAppConfig *AppTomlConfig
consensus abci.Application
}
func (s nodeServer[T]) Config(ctx context.Context, _ *nodeservice.ConfigRequest) (*nodeservice.ConfigResponse, error) {
@ -201,12 +212,12 @@ func (s nodeServer[T]) Config(ctx context.Context, _ *nodeservice.ConfigRequest)
MinimumGasPrice: minGasPricesStr,
PruningKeepRecent: "ambiguous in v2",
PruningInterval: "ambiguous in v2",
HaltHeight: s.consensus.cfg.AppTomlConfig.HaltHeight,
HaltHeight: s.cometBFTAppConfig.HaltHeight,
}, nil
}
func (s nodeServer[T]) Status(ctx context.Context, _ *nodeservice.StatusRequest) (*nodeservice.StatusResponse, error) {
nodeInfo, err := s.consensus.Info(ctx, &v1.InfoRequest{})
nodeInfo, err := s.consensus.Info(ctx, &abciproto.InfoRequest{})
if err != nil {
return nil, err
}

View File

@ -8,6 +8,7 @@ import (
"cosmossdk.io/server/v2/cometbft/handlers"
"cosmossdk.io/server/v2/cometbft/mempool"
"cosmossdk.io/server/v2/cometbft/types"
"cosmossdk.io/server/v2/streaming"
"cosmossdk.io/store/v2/snapshots"
)
@ -23,8 +24,14 @@ type ServerOptions[T transaction.Tx] struct {
ExtendVoteHandler handlers.ExtendVoteHandler
KeygenF keyGenF
Mempool func(cfg map[string]any) mempool.Mempool[T]
// Set mempool for the consensus module.
Mempool func(cfg map[string]any) mempool.Mempool[T]
// Set streaming manager for the consensus module.
StreamingManager streaming.Manager
// Set snapshot options for the consensus module.
SnapshotOptions func(cfg map[string]any) snapshots.SnapshotOptions
// Allows additional snapshotter implementations to be used for creating and restoring snapshots.
SnapshotExtensions []snapshots.ExtensionSnapshotter
AddrPeerFilter types.PeerFilter // filter peers by address and port
IdPeerFilter types.PeerFilter // filter peers by node ID
@ -40,7 +47,9 @@ func DefaultServerOptions[T transaction.Tx]() ServerOptions[T] {
VerifyVoteExtensionHandler: handlers.NoOpVerifyVoteExtensionHandler(),
ExtendVoteHandler: handlers.NoOpExtendVote(),
Mempool: func(cfg map[string]any) mempool.Mempool[T] { return mempool.NoOpMempool[T]{} },
StreamingManager: streaming.Manager{},
SnapshotOptions: func(cfg map[string]any) snapshots.SnapshotOptions { return snapshots.NewSnapshotOptions(0, 0) },
SnapshotExtensions: []snapshots.ExtensionSnapshotter{},
AddrPeerFilter: nil,
IdPeerFilter: nil,
KeygenF: func() (cmtcrypto.PrivKey, error) { return cmted22519.GenPrivKey(), nil },

View File

@ -11,7 +11,7 @@ import (
cometerrors "cosmossdk.io/server/v2/cometbft/types/errors"
)
func (c *Consensus[T]) handleQueryP2P(path []string) (*abci.QueryResponse, error) {
func (c *consensus[T]) handleQueryP2P(path []string) (*abci.QueryResponse, error) {
// "/p2p" prefix for p2p queries
if len(path) < 4 {
return nil, errorsmod.Wrap(cometerrors.ErrUnknownRequest, "path should be p2p filter <addr|id> <parameter>")
@ -34,14 +34,14 @@ func (c *Consensus[T]) handleQueryP2P(path []string) (*abci.QueryResponse, error
return nil, errorsmod.Wrap(cometerrors.ErrUnknownRequest, "expected second parameter to be 'filter'")
}
// handlerQueryApp handles the query requests for the application.
// handleQueryApp handles the query requests for the application.
// It expects the path parameter to have at least two elements.
// The second element of the path can be either 'simulate' or 'version'.
// If the second element is 'simulate', it decodes the request data into a transaction,
// simulates the transaction using the application, and returns the simulation result.
// If the second element is 'version', it returns the version of the application.
// If the second element is neither 'simulate' nor 'version', it returns an error indicating an unknown query.
func (c *Consensus[T]) handlerQueryApp(ctx context.Context, path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) {
func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) {
if len(path) < 2 {
return nil, errorsmod.Wrap(
cometerrors.ErrUnknownRequest,
@ -83,7 +83,7 @@ func (c *Consensus[T]) handlerQueryApp(ctx context.Context, path []string, req *
return nil, errorsmod.Wrapf(cometerrors.ErrUnknownRequest, "unknown query: %s", path)
}
func (c *Consensus[T]) handleQueryStore(path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) {
func (c *consensus[T]) handleQueryStore(path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) {
req.Path = "/" + strings.Join(path[1:], "/")
if req.Height <= 1 && req.Prove {
return nil, errorsmod.Wrap(

View File

@ -7,21 +7,27 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
abciserver "github.com/cometbft/cometbft/abci/server"
abci "github.com/cometbft/cometbft/abci/types"
cmtcmd "github.com/cometbft/cometbft/cmd/cometbft/commands"
cmtcfg "github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/node"
"github.com/cometbft/cometbft/p2p"
pvm "github.com/cometbft/cometbft/privval"
"github.com/cometbft/cometbft/proxy"
gogoproto "github.com/cosmos/gogoproto/proto"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"google.golang.org/grpc"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/schema/decoding"
"cosmossdk.io/schema/indexer"
serverv2 "cosmossdk.io/server/v2"
@ -31,6 +37,8 @@ import (
"cosmossdk.io/server/v2/cometbft/types"
"cosmossdk.io/store/v2/snapshots"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types"
)
@ -44,32 +52,39 @@ var (
type CometBFTServer[T transaction.Tx] struct {
Node *node.Node
Consensus *Consensus[T]
Consensus abci.Application
initTxCodec transaction.Codec[T]
logger log.Logger
serverOptions ServerOptions[T]
config Config
cfgOptions []CfgOption
app appmanager.AppManager[T]
txCodec transaction.Codec[T]
store types.Store
}
func New[T transaction.Tx](
logger log.Logger,
appName string,
store types.Store,
appManager appmanager.AppManager[T],
app appmanager.AppManager[T],
appCodec codec.Codec,
txCodec transaction.Codec[T],
queryHandlers map[string]appmodulev2.Handler,
decoderResolver decoding.DecoderResolver,
txCodec transaction.Codec[T],
cfg server.ConfigMap,
serverOptions ServerOptions[T],
cfg server.ConfigMap,
cfgOptions ...CfgOption,
) (*CometBFTServer[T], error) {
srv := &CometBFTServer[T]{
initTxCodec: txCodec,
serverOptions: serverOptions,
cfgOptions: cfgOptions,
app: app,
txCodec: txCodec,
store: store,
}
srv.logger = logger.With(log.ModuleKey, srv.Name())
home, _ := cfg[serverv2.FlagHome].(string)
@ -111,27 +126,6 @@ func New[T transaction.Tx](
indexEvents[e] = struct{}{}
}
srv.logger = logger.With(log.ModuleKey, srv.Name())
consensus := NewConsensus(
logger,
appName,
appManager,
srv.serverOptions.Mempool(cfg),
indexEvents,
queryHandlers,
store,
srv.config,
srv.initTxCodec,
chainID,
)
consensus.prepareProposalHandler = srv.serverOptions.PrepareProposalHandler
consensus.processProposalHandler = srv.serverOptions.ProcessProposalHandler
consensus.checkTxHandler = srv.serverOptions.CheckTxHandler
consensus.verifyVoteExt = srv.serverOptions.VerifyVoteExtensionHandler
consensus.extendVote = srv.serverOptions.ExtendVoteHandler
consensus.addrPeerFilter = srv.serverOptions.AddrPeerFilter
consensus.idPeerFilter = srv.serverOptions.IdPeerFilter
ss := store.GetStateStorage().(snapshots.StorageSnapshotter)
sc := store.GetStateCommitment().(snapshots.CommitSnapshotter)
@ -139,14 +133,11 @@ func New[T transaction.Tx](
if err != nil {
return nil, err
}
consensus.snapshotManager = snapshots.NewManager(
snapshotStore, srv.serverOptions.SnapshotOptions(cfg), sc, ss, nil, logger)
srv.Consensus = consensus
// initialize the indexer
var listener *appdata.Listener
if indexerCfg := srv.config.AppTomlConfig.Indexer; len(indexerCfg.Target) > 0 {
listener, err := indexer.StartIndexing(indexer.IndexingOptions{
indexingTarget, err := indexer.StartIndexing(indexer.IndexingOptions{
Config: indexerCfg,
Resolver: decoderResolver,
Logger: logger.With(log.ModuleKey, "indexer"),
@ -154,7 +145,51 @@ func New[T transaction.Tx](
if err != nil {
return nil, fmt.Errorf("failed to start indexing: %w", err)
}
consensus.listener = &listener.Listener
listener = &indexingTarget.Listener
}
// snapshot manager
snapshotManager := snapshots.NewManager(
snapshotStore,
srv.serverOptions.SnapshotOptions(cfg),
sc,
ss,
nil, // extensions snapshotter registered below
logger,
)
if exts := serverOptions.SnapshotExtensions; len(exts) > 0 {
if err := snapshotManager.RegisterExtensions(serverOptions.SnapshotExtensions...); err != nil {
return nil, fmt.Errorf("failed to register snapshot extensions: %w", err)
}
}
srv.Consensus = &consensus[T]{
appName: appName,
version: getCometBFTServerVersion(),
app: app,
cfg: srv.config,
store: store,
logger: logger,
txCodec: txCodec,
appCodec: appCodec,
listener: listener,
snapshotManager: snapshotManager,
streamingManager: srv.serverOptions.StreamingManager,
mempool: srv.serverOptions.Mempool(cfg),
lastCommittedHeight: atomic.Int64{},
prepareProposalHandler: srv.serverOptions.PrepareProposalHandler,
processProposalHandler: srv.serverOptions.ProcessProposalHandler,
verifyVoteExt: srv.serverOptions.VerifyVoteExtensionHandler,
checkTxHandler: srv.serverOptions.CheckTxHandler,
extendVote: srv.serverOptions.ExtendVoteHandler,
chainID: chainID,
indexedEvents: indexEvents,
initialHeight: 0,
queryHandlersMap: queryHandlers,
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
addrPeerFilter: srv.serverOptions.AddrPeerFilter,
idPeerFilter: srv.serverOptions.IdPeerFilter,
}
return srv, nil
@ -333,3 +368,13 @@ func (s *CometBFTServer[T]) WriteCustomConfigAt(configPath string) error {
cmtcfg.WriteConfigFile(filepath.Join(configPath, "config.toml"), cfg.ConfigTomlConfig)
return nil
}
// gRPCServiceRegistrar returns a function that registers the CometBFT gRPC service
// Those services are defined for backward compatibility.
// Eventually, they will be removed in favor of the new gRPC services.
func (s *CometBFTServer[T]) GRPCServiceRegistrar(
clientCtx client.Context,
cfg server.ConfigMap,
) func(srv *grpc.Server) error {
return gRPCServiceRegistrar[T](clientCtx, cfg, s.Config().(*AppTomlConfig), s.txCodec, s.Consensus, s.app)
}

View File

@ -34,7 +34,7 @@ func GetSnapshotStore(rootDir string) (*snapshots.Store, error) {
}
// ApplySnapshotChunk implements types.Application.
func (c *Consensus[T]) ApplySnapshotChunk(_ context.Context, req *abci.ApplySnapshotChunkRequest) (*abci.ApplySnapshotChunkResponse, error) {
func (c *consensus[T]) ApplySnapshotChunk(_ context.Context, req *abci.ApplySnapshotChunkRequest) (*abci.ApplySnapshotChunkResponse, error) {
if c.snapshotManager == nil {
c.logger.Error("snapshot manager not configured")
return &abci.ApplySnapshotChunkResponse{Result: abci.APPLY_SNAPSHOT_CHUNK_RESULT_ABORT}, nil
@ -65,7 +65,7 @@ func (c *Consensus[T]) ApplySnapshotChunk(_ context.Context, req *abci.ApplySnap
}
// ListSnapshots implements types.Application.
func (c *Consensus[T]) ListSnapshots(_ context.Context, ctx *abci.ListSnapshotsRequest) (*abci.ListSnapshotsResponse, error) {
func (c *consensus[T]) ListSnapshots(_ context.Context, ctx *abci.ListSnapshotsRequest) (*abci.ListSnapshotsResponse, error) {
if c.snapshotManager == nil {
return nil, nil
}
@ -91,7 +91,7 @@ func (c *Consensus[T]) ListSnapshots(_ context.Context, ctx *abci.ListSnapshotsR
}
// LoadSnapshotChunk implements types.Application.
func (c *Consensus[T]) LoadSnapshotChunk(_ context.Context, req *abci.LoadSnapshotChunkRequest) (*abci.LoadSnapshotChunkResponse, error) {
func (c *consensus[T]) LoadSnapshotChunk(_ context.Context, req *abci.LoadSnapshotChunkRequest) (*abci.LoadSnapshotChunkResponse, error) {
if c.snapshotManager == nil {
return &abci.LoadSnapshotChunkResponse{}, nil
}
@ -112,7 +112,7 @@ func (c *Consensus[T]) LoadSnapshotChunk(_ context.Context, req *abci.LoadSnapsh
}
// OfferSnapshot implements types.Application.
func (c *Consensus[T]) OfferSnapshot(_ context.Context, req *abci.OfferSnapshotRequest) (*abci.OfferSnapshotResponse, error) {
func (c *consensus[T]) OfferSnapshot(_ context.Context, req *abci.OfferSnapshotRequest) (*abci.OfferSnapshotResponse, error) {
if c.snapshotManager == nil {
c.logger.Error("snapshot manager not configured")
return &abci.OfferSnapshotResponse{Result: abci.OFFER_SNAPSHOT_RESULT_ABORT}, nil

View File

@ -12,7 +12,7 @@ import (
)
// streamDeliverBlockChanges will stream all the changes happened during deliver block.
func (c *Consensus[T]) streamDeliverBlockChanges(
func (c *consensus[T]) streamDeliverBlockChanges(
ctx context.Context,
height int64,
txs [][]byte,
@ -40,7 +40,7 @@ func (c *Consensus[T]) streamDeliverBlockChanges(
}
}
for _, streamingListener := range c.streaming.Listeners {
for _, streamingListener := range c.streamingManager.Listeners {
events, err := streaming.IntoStreamingEvents(events)
if err != nil {
return err

View File

@ -19,7 +19,7 @@ import (
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
errorsmod "cosmossdk.io/errors/v2"
consensus "cosmossdk.io/x/consensus/types"
"cosmossdk.io/x/consensus/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)
@ -268,7 +268,7 @@ func QueryResult(err error, debug bool) *abci.QueryResponse {
}
}
func (c *Consensus[T]) validateFinalizeBlockHeight(req *abci.FinalizeBlockRequest) error {
func (c *consensus[T]) validateFinalizeBlockHeight(req *abci.FinalizeBlockRequest) error {
if req.Height < 1 {
return fmt.Errorf("invalid height: %d", req.Height)
}
@ -302,18 +302,18 @@ func (c *Consensus[T]) validateFinalizeBlockHeight(req *abci.FinalizeBlockReques
// GetConsensusParams makes a query to the consensus module in order to get the latest consensus
// parameters from committed state
func (c *Consensus[T]) GetConsensusParams(ctx context.Context) (*cmtproto.ConsensusParams, error) {
func (c *consensus[T]) GetConsensusParams(ctx context.Context) (*cmtproto.ConsensusParams, error) {
latestVersion, err := c.store.GetLatestVersion()
if err != nil {
return nil, err
}
res, err := c.app.Query(ctx, latestVersion, &consensus.QueryParamsRequest{})
res, err := c.app.Query(ctx, latestVersion, &types.QueryParamsRequest{})
if err != nil {
return nil, err
}
if r, ok := res.(*consensus.QueryParamsResponse); !ok {
if r, ok := res.(*types.QueryParamsResponse); !ok {
return nil, errors.New("failed to query consensus params")
} else {
// convert our params to cometbft params
@ -321,7 +321,7 @@ func (c *Consensus[T]) GetConsensusParams(ctx context.Context) (*cmtproto.Consen
}
}
func (c *Consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, commitHeight int64) int64 {
func (c *consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, commitHeight int64) int64 {
// pruning is disabled if minRetainBlocks is zero
if c.cfg.AppTomlConfig.MinRetainBlocks == 0 {
return 0
@ -376,7 +376,7 @@ func (c *Consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, com
}
// checkHalt checks if height or time exceeds halt-height or halt-time respectively.
func (c *Consensus[T]) checkHalt(height int64, time time.Time) error {
func (c *consensus[T]) checkHalt(height int64, time time.Time) error {
var halt bool
switch {
case c.cfg.AppTomlConfig.HaltHeight > 0 && uint64(height) >= c.cfg.AppTomlConfig.HaltHeight:

View File

@ -40,7 +40,7 @@ type CommandDependencies[T transaction.Tx] struct {
TxConfig client.TxConfig
ModuleManager *runtimev2.MM[T]
SimApp *simapp.SimApp[T]
// could be more generic with serverv2.ServerComponent[T]
// could generally be more generic with serverv2.ServerComponent[T]
// however, we want to register extra grpc handlers
ConsensusServer *cometbft.CometBFTServer[T]
ClientContext client.Context
@ -106,11 +106,12 @@ func InitRootCmd[T transaction.Tx](
simApp.Name(),
simApp.Store(),
simApp.App.AppManager,
simApp.AppCodec(),
&genericTxDecoder[T]{deps.TxConfig},
simApp.App.QueryHandlers(),
simApp.App.SchemaDecoderResolver(),
&genericTxDecoder[T]{deps.TxConfig},
deps.GlobalConfig,
initCometOptions[T](),
deps.GlobalConfig,
)
if err != nil {
return nil, err
@ -129,7 +130,7 @@ func InitRootCmd[T transaction.Tx](
simApp.Query,
deps.GlobalConfig,
grpcserver.WithExtraGRPCHandlers[T](
deps.ConsensusServer.Consensus.GRPCServiceRegistrar(
deps.ConsensusServer.GRPCServiceRegistrar(
deps.ClientContext,
deps.GlobalConfig,
),