refactor(server/v2/cometbft): Handle non-module service queries (backport #22803) (#22834)

Co-authored-by: Hieu Vu <72878483+hieuvubk@users.noreply.github.com>
Co-authored-by: Julien Robert <julien@rbrt.fr>
This commit is contained in:
mergify[bot] 2024-12-11 17:15:21 +01:00 committed by GitHub
parent 95e940670d
commit d6d177771d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 312 additions and 58 deletions

View File

@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/reflect/protoregistry"
"cosmossdk.io/collections"
addresscodec "cosmossdk.io/core/address"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/comet"
corecontext "cosmossdk.io/core/context"
@ -36,9 +37,6 @@ import (
consensustypes "cosmossdk.io/x/consensus/types"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
)
const (
@ -86,8 +84,10 @@ type consensus[T transaction.Tx] struct {
addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID
queryHandlersMap map[string]appmodulev2.Handler
getProtoRegistry func() (*protoregistry.Files, error)
queryHandlersMap map[string]appmodulev2.Handler
getProtoRegistry func() (*protoregistry.Files, error)
consensusAddressCodec addresscodec.Codec
cfgMap server.ConfigMap
}
// CheckTx implements types.Application.
@ -184,6 +184,16 @@ func (c *consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
return resp, err
}
// when a client did not provide a query height, manually inject the latest
// for modules queries, AppManager does it automatically
if req.Height == 0 {
latestVersion, err := c.store.GetLatestVersion()
if err != nil {
return nil, err
}
req.Height = int64(latestVersion)
}
// this error most probably means that we can't handle it with a proto message, so
// it must be an app/p2p/store query
path := splitABCIQueryPath(req.Path)
@ -238,48 +248,15 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq
handlerFullName = string(md.Input().FullName())
}
// special case for simulation as it is an external gRPC registered on the grpc server component
// special case for non-module services as they are external gRPC registered on the grpc server component
// and not on the app itself, so it won't pass the router afterwards.
if req.Path == "/cosmos.tx.v1beta1.Service/Simulate" {
simulateRequest := &txtypes.SimulateRequest{}
err = gogoproto.Unmarshal(req.Data, simulateRequest)
if err != nil {
return nil, true, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
}
tx, err := c.txCodec.Decode(simulateRequest.TxBytes)
if err != nil {
return nil, true, fmt.Errorf("failed to decode tx: %w", err)
}
txResult, _, err := c.app.Simulate(ctx, tx)
if err != nil {
return nil, true, fmt.Errorf("failed with gas used: '%d': %w", txResult.GasUsed, err)
}
msgResponses := make([]*codectypes.Any, 0, len(txResult.Resp))
// pack the messages into Any
for _, msg := range txResult.Resp {
anyMsg, err := codectypes.NewAnyWithValue(msg)
if err != nil {
return nil, true, fmt.Errorf("failed to pack message response: %w", err)
}
msgResponses = append(msgResponses, anyMsg)
}
resp := &txtypes.SimulateResponse{
GasInfo: &sdk.GasInfo{
GasUsed: txResult.GasUsed,
GasWanted: txResult.GasWanted,
},
Result: &sdk.Result{
MsgResponses: msgResponses,
},
}
res, err := queryResponse(resp, req.Height)
return res, true, err
externalResp, err := c.maybeHandleExternalServices(ctx, req)
if err != nil {
return nil, true, err
} else if externalResp != nil {
resp, err = queryResponse(externalResp, req.Height)
return resp, true, err
}
handler, found := c.queryHandlersMap[handlerFullName]

View File

@ -21,7 +21,7 @@ require (
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5
cosmossdk.io/log v1.5.0
cosmossdk.io/schema v0.4.0 //main
cosmossdk.io/server/v2 v2.0.0-20241209145349-34f407d6367a // main
cosmossdk.io/server/v2 v2.0.0-20241211154953-a38a6a2c8bc8 // main
cosmossdk.io/server/v2/appmanager v0.0.0-20241203212527-7d117425d880 // main
cosmossdk.io/server/v2/stf v0.0.0-20241204101618-7fa2356c07aa // main
cosmossdk.io/store/v2 v2.0.0-20241209145349-34f407d6367a // main

View File

@ -24,8 +24,8 @@ cosmossdk.io/math v1.4.0 h1:XbgExXFnXmF/CccPPEto40gOO7FpWu9yWNAZPN3nkNQ=
cosmossdk.io/math v1.4.0/go.mod h1:O5PkD4apz2jZs4zqFdTr16e1dcaQCc5z6lkEnrrppuk=
cosmossdk.io/schema v0.4.0 h1:TrBs5BUnGqniAwEBVsjiisrAk3h3DK/zHLU1O8fRnO0=
cosmossdk.io/schema v0.4.0/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/server/v2 v2.0.0-20241209145349-34f407d6367a h1:qkO+rB9yD6+bTGgQpaf+oyvgEdkPs5TUaFK3OEYh3AI=
cosmossdk.io/server/v2 v2.0.0-20241209145349-34f407d6367a/go.mod h1:sb6WEIMHAT+8z7iM6IbBeSf+62wSkss2q+coDxmOi/o=
cosmossdk.io/server/v2 v2.0.0-20241211154953-a38a6a2c8bc8 h1:Z1tRewzCemRc4iwKPFGhS+FG4+Xqq8zm/6UBzeEGjXs=
cosmossdk.io/server/v2 v2.0.0-20241211154953-a38a6a2c8bc8/go.mod h1:RAectNg/rAaq0AHOuxbxY2YVTYTVBJCTVg5wHpCIZhE=
cosmossdk.io/server/v2/appmanager v0.0.0-20241203212527-7d117425d880 h1:0mtB8fSvDjD835WwWF4rGk9qy5TjVjk2jsW14L37v0E=
cosmossdk.io/server/v2/appmanager v0.0.0-20241203212527-7d117425d880/go.mod h1:elhlrldWtm+9U4PxE0G3wjz83yQwVVGVAOncXJPY1Xc=
cosmossdk.io/server/v2/stf v0.0.0-20241204101618-7fa2356c07aa h1:2V9nqgL50nw45HcQw1UBRQ/y0QBzrgfLIStPSxFnMtY=

View File

@ -2,6 +2,7 @@ package cometbft
import (
"context"
"errors"
"fmt"
"strings"
@ -17,13 +18,19 @@ import (
"cosmossdk.io/core/server"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
storeserver "cosmossdk.io/server/v2/store"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/grpc/cmtservice"
nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/std"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/query"
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
"github.com/cosmos/cosmos-sdk/x/auth/migrations/legacytx"
authtx "github.com/cosmos/cosmos-sdk/x/auth/tx"
@ -46,7 +53,7 @@ func gRPCServiceRegistrar[T transaction.Tx](
) func(srv *grpc.Server) error {
return func(srv *grpc.Server) error {
cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, consensus.Query, clientCtx.ConsensusAddressCodec))
txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, txCodec, app})
txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, txCodec, app, consensus})
nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, cometBFTAppConfig, consensus})
return nil
@ -57,6 +64,7 @@ type txServer[T transaction.Tx] struct {
clientCtx client.Context
txCodec transaction.Codec[T]
app appSimulator[T]
consensus abci.Application
}
// BroadcastTx implements tx.ServiceServer.
@ -65,8 +73,84 @@ func (t txServer[T]) BroadcastTx(ctx context.Context, req *txtypes.BroadcastTxRe
}
// GetBlockWithTxs implements tx.ServiceServer.
func (t txServer[T]) GetBlockWithTxs(context.Context, *txtypes.GetBlockWithTxsRequest) (*txtypes.GetBlockWithTxsResponse, error) {
return nil, status.Error(codes.Unimplemented, "not implemented")
func (t txServer[T]) GetBlockWithTxs(ctx context.Context, req *txtypes.GetBlockWithTxsRequest) (*txtypes.GetBlockWithTxsResponse, error) {
logger := log.NewNopLogger()
if req == nil {
return nil, status.Error(codes.InvalidArgument, "request cannot be nil")
}
resp, err := t.consensus.Info(ctx, &abci.InfoRequest{})
if err != nil {
return nil, err
}
currentHeight := resp.LastBlockHeight
if req.Height < 1 || req.Height > currentHeight {
return nil, sdkerrors.ErrInvalidHeight.Wrapf("requested height %d but height must not be less than 1 "+
"or greater than the current height %d", req.Height, currentHeight)
}
node, err := t.clientCtx.GetNode()
if err != nil {
return nil, err
}
blockID, block, err := cmtservice.GetProtoBlock(ctx, node, &req.Height)
if err != nil {
return nil, err
}
var offset, limit uint64
if req.Pagination != nil {
offset = req.Pagination.Offset
limit = req.Pagination.Limit
} else {
offset = 0
limit = query.DefaultLimit
}
blockTxs := block.Data.Txs
blockTxsLn := uint64(len(blockTxs))
txs := make([]*txtypes.Tx, 0, limit)
if offset >= blockTxsLn && blockTxsLn != 0 {
return nil, sdkerrors.ErrInvalidRequest.Wrapf("out of range: cannot paginate %d txs with offset %d and limit %d", blockTxsLn, offset, limit)
}
decodeTxAt := func(i uint64) error {
tx := blockTxs[i]
txb, err := t.clientCtx.TxConfig.TxDecoder()(tx)
fmt.Println("TxDecoder", txb, err)
if err != nil {
return err
}
p, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx()
if err != nil {
return err
}
txs = append(txs, p)
return nil
}
if req.Pagination != nil && req.Pagination.Reverse {
for i, count := offset, uint64(0); i > 0 && count != limit; i, count = i-1, count+1 {
if err = decodeTxAt(i); err != nil {
logger.Error("failed to decode tx", "error", err)
}
}
} else {
for i, count := offset, uint64(0); i < blockTxsLn && count != limit; i, count = i+1, count+1 {
if err = decodeTxAt(i); err != nil {
logger.Error("failed to decode tx", "error", err)
}
}
}
return &txtypes.GetBlockWithTxsResponse{
Txs: txs,
BlockId: &blockID,
Block: block,
Pagination: &query.PageResponse{
Total: blockTxsLn,
},
}, nil
}
// GetTx implements tx.ServiceServer.
@ -100,8 +184,33 @@ func (t txServer[T]) GetTx(ctx context.Context, req *txtypes.GetTxRequest) (*txt
}
// GetTxsEvent implements tx.ServiceServer.
func (t txServer[T]) GetTxsEvent(context.Context, *txtypes.GetTxsEventRequest) (*txtypes.GetTxsEventResponse, error) {
return nil, status.Error(codes.Unimplemented, "not implemented")
func (t txServer[T]) GetTxsEvent(ctx context.Context, req *txtypes.GetTxsEventRequest) (*txtypes.GetTxsEventResponse, error) {
if req == nil {
return nil, status.Error(codes.InvalidArgument, "request cannot be nil")
}
orderBy := parseOrderBy(req.OrderBy)
result, err := authtx.QueryTxsByEvents(t.clientCtx, int(req.Page), int(req.Limit), req.Query, orderBy)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
txsList := make([]*txtypes.Tx, len(result.Txs))
for i, tx := range result.Txs {
protoTx, ok := tx.Tx.GetCachedValue().(*txtypes.Tx)
if !ok {
return nil, status.Errorf(codes.Internal, "getting cached value failed expected %T, got %T", txtypes.Tx{}, tx.Tx.GetCachedValue())
}
txsList[i] = protoTx
}
return &txtypes.GetTxsEventResponse{
Txs: txsList,
TxResponses: result.Txs,
Total: result.TotalCount,
}, nil
}
// Simulate implements tx.ServiceServer.
@ -159,8 +268,23 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest)
}
// TxDecode implements tx.ServiceServer.
func (t txServer[T]) TxDecode(context.Context, *txtypes.TxDecodeRequest) (*txtypes.TxDecodeResponse, error) {
return nil, status.Error(codes.Unimplemented, "not implemented")
func (t txServer[T]) TxDecode(ctx context.Context, req *txtypes.TxDecodeRequest) (*txtypes.TxDecodeResponse, error) {
if req.TxBytes == nil {
return nil, status.Error(codes.InvalidArgument, "invalid empty tx bytes")
}
txb, err := t.clientCtx.TxConfig.TxDecoder()(req.TxBytes)
if err != nil {
return nil, err
}
tx, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() // TODO: maybe we can break the Tx interface to add this also
if err != nil {
return nil, err
}
return &txtypes.TxDecodeResponse{
Tx: tx,
}, nil
}
// TxDecodeAmino implements tx.ServiceServer.
@ -325,3 +449,151 @@ var CometBFTAutoCLIDescriptor = &autocliv1.ServiceCommandDescriptor{
},
},
}
func parseOrderBy(orderBy txtypes.OrderBy) string {
switch orderBy {
case txtypes.OrderBy_ORDER_BY_ASC:
return "asc"
case txtypes.OrderBy_ORDER_BY_DESC:
return "desc"
default:
return "" // Defaults to CometBFT's default, which is `asc` now.
}
}
func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abci.QueryRequest) (transaction.Msg, error) {
// Handle comet service
if strings.HasPrefix(req.Path, "/cosmos.base.tendermint.v1beta1.Service") {
rpcClient, _ := rpchttp.New(c.cfg.ConfigTomlConfig.RPC.ListenAddress)
cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec)
paths := strings.Split(req.Path, "/")
if len(paths) <= 2 {
return nil, fmt.Errorf("invalid request path: %s", req.Path)
}
var resp transaction.Msg
var err error
switch paths[2] {
case "GetNodeInfo":
resp, err = handleExternalService(ctx, req, cometQServer.GetNodeInfo)
case "GetSyncing":
resp, err = handleExternalService(ctx, req, cometQServer.GetSyncing)
case "GetLatestBlock":
resp, err = handleExternalService(ctx, req, cometQServer.GetLatestBlock)
case "GetBlockByHeight":
resp, err = handleExternalService(ctx, req, cometQServer.GetBlockByHeight)
case "GetLatestValidatorSet":
resp, err = handleExternalService(ctx, req, cometQServer.GetLatestValidatorSet)
case "GetValidatorSetByHeight":
resp, err = handleExternalService(ctx, req, cometQServer.GetValidatorSetByHeight)
case "ABCIQuery":
resp, err = handleExternalService(ctx, req, cometQServer.ABCIQuery)
}
return resp, err
}
// Handle node service
if strings.HasPrefix(req.Path, "/cosmos.base.node.v1beta1.Service") {
nodeQService := nodeServer[T]{c.cfgMap, c.cfg.AppTomlConfig, c}
paths := strings.Split(req.Path, "/")
if len(paths) <= 2 {
return nil, fmt.Errorf("invalid request path: %s", req.Path)
}
var resp transaction.Msg
var err error
switch paths[2] {
case "Config":
resp, err = handleExternalService(ctx, req, nodeQService.Config)
case "Status":
resp, err = handleExternalService(ctx, req, nodeQService.Status)
}
return resp, err
}
// Handle tx service
if strings.HasPrefix(req.Path, "/cosmos.tx.v1beta1.Service") {
// init simple client context
amino := codec.NewLegacyAmino()
std.RegisterLegacyAminoCodec(amino)
txConfig := authtx.NewTxConfig(
c.appCodec,
c.appCodec.InterfaceRegistry().SigningContext().AddressCodec(),
c.appCodec.InterfaceRegistry().SigningContext().ValidatorAddressCodec(),
authtx.DefaultSignModes,
)
rpcClient, _ := client.NewClientFromNode(c.cfg.AppTomlConfig.Address)
clientCtx := client.Context{}.
WithLegacyAmino(amino).
WithCodec(c.appCodec).
WithTxConfig(txConfig).
WithNodeURI(c.cfg.AppTomlConfig.Address).
WithClient(rpcClient)
txService := txServer[T]{
clientCtx: clientCtx,
txCodec: c.txCodec,
app: c.app,
consensus: c,
}
paths := strings.Split(req.Path, "/")
if len(paths) <= 2 {
return nil, fmt.Errorf("invalid request path: %s", req.Path)
}
var resp transaction.Msg
var err error
switch paths[2] {
case "Simulate":
resp, err = handleExternalService(ctx, req, txService.Simulate)
case "GetTx":
resp, err = handleExternalService(ctx, req, txService.GetTx)
case "BroadcastTx":
return nil, errors.New("can't route a broadcast tx message")
case "GetTxsEvent":
resp, err = handleExternalService(ctx, req, txService.GetTxsEvent)
case "GetBlockWithTxs":
resp, err = handleExternalService(ctx, req, txService.GetBlockWithTxs)
case "TxDecode":
resp, err = handleExternalService(ctx, req, txService.TxDecode)
case "TxEncode":
resp, err = handleExternalService(ctx, req, txService.TxEncode)
case "TxEncodeAmino":
resp, err = handleExternalService(ctx, req, txService.TxEncodeAmino)
case "TxDecodeAmino":
resp, err = handleExternalService(ctx, req, txService.TxDecodeAmino)
}
return resp, err
}
return nil, nil
}
func handleExternalService[T any, PT interface {
*T
proto.Message
},
U any, UT interface {
*U
proto.Message
}](
ctx context.Context,
rawReq *abciproto.QueryRequest,
handler func(ctx context.Context, msg PT) (UT, error),
) (transaction.Msg, error) {
req := PT(new(T))
err := proto.Unmarshal(rawReq.Data, req)
if err != nil {
return nil, err
}
typedResp, err := handler(ctx, req)
if err != nil {
return nil, err
}
return typedResp, nil
}

View File

@ -23,6 +23,7 @@ import (
"github.com/spf13/pflag"
"google.golang.org/grpc"
addresscodec "cosmossdk.io/core/address"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
@ -72,6 +73,7 @@ func New[T transaction.Tx](
app appmanager.AppManager[T],
appCodec codec.Codec,
txCodec transaction.Codec[T],
consensusAddressCodec addresscodec.Codec,
queryHandlers map[string]appmodulev2.Handler,
decoderResolver decoding.DecoderResolver,
serverOptions ServerOptions[T],
@ -189,6 +191,8 @@ func New[T transaction.Tx](
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
addrPeerFilter: srv.serverOptions.AddrPeerFilter,
idPeerFilter: srv.serverOptions.IdPeerFilter,
cfgMap: cfg,
consensusAddressCodec: consensusAddressCodec,
}
c.optimisticExec = oe.NewOptimisticExecution(

View File

@ -12,7 +12,7 @@ require (
cosmossdk.io/log v1.5.0
cosmossdk.io/math v1.4.0
cosmossdk.io/runtime/v2 v2.0.0-20241204100030-c47fb8ab2dcb // main
cosmossdk.io/server/v2 v2.0.0-20241209145349-34f407d6367a // main
cosmossdk.io/server/v2 v2.0.0-20241211154953-a38a6a2c8bc8 // main
cosmossdk.io/server/v2/cometbft v0.0.0-00010101000000-000000000000
cosmossdk.io/store/v2 v2.0.0-20241209145349-34f407d6367a // main
cosmossdk.io/tools/confix v0.0.0-00010101000000-000000000000

View File

@ -217,8 +217,8 @@ cosmossdk.io/runtime/v2 v2.0.0-20241204100030-c47fb8ab2dcb/go.mod h1:7DCLOq3Xzyq
cosmossdk.io/schema v0.3.0/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/schema v0.4.0 h1:TrBs5BUnGqniAwEBVsjiisrAk3h3DK/zHLU1O8fRnO0=
cosmossdk.io/schema v0.4.0/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/server/v2 v2.0.0-20241209145349-34f407d6367a h1:qkO+rB9yD6+bTGgQpaf+oyvgEdkPs5TUaFK3OEYh3AI=
cosmossdk.io/server/v2 v2.0.0-20241209145349-34f407d6367a/go.mod h1:sb6WEIMHAT+8z7iM6IbBeSf+62wSkss2q+coDxmOi/o=
cosmossdk.io/server/v2 v2.0.0-20241211154953-a38a6a2c8bc8 h1:Z1tRewzCemRc4iwKPFGhS+FG4+Xqq8zm/6UBzeEGjXs=
cosmossdk.io/server/v2 v2.0.0-20241211154953-a38a6a2c8bc8/go.mod h1:RAectNg/rAaq0AHOuxbxY2YVTYTVBJCTVg5wHpCIZhE=
cosmossdk.io/server/v2/appmanager v0.0.0-20241203212527-7d117425d880 h1:0mtB8fSvDjD835WwWF4rGk9qy5TjVjk2jsW14L37v0E=
cosmossdk.io/server/v2/appmanager v0.0.0-20241203212527-7d117425d880/go.mod h1:elhlrldWtm+9U4PxE0G3wjz83yQwVVGVAOncXJPY1Xc=
cosmossdk.io/server/v2/stf v0.0.0-20241204101618-7fa2356c07aa h1:2V9nqgL50nw45HcQw1UBRQ/y0QBzrgfLIStPSxFnMtY=

View File

@ -117,6 +117,7 @@ func InitRootCmd[T transaction.Tx](
simApp.App.AppManager,
simApp.AppCodec(),
&client.DefaultTxDecoder[T]{TxConfig: deps.TxConfig},
deps.ClientContext.ConsensusAddressCodec,
simApp.App.QueryHandlers(),
simApp.App.SchemaDecoderResolver(),
initCometOptions[T](),