fix(stf,server/v2/cometbft): fix default events + improve codec handling (#22837)

This commit is contained in:
Julien Robert 2024-12-12 11:42:57 +01:00 committed by GitHub
parent a068405339
commit e6948eeda8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 123 additions and 59 deletions

View File

@ -15,7 +15,6 @@ import (
"google.golang.org/protobuf/reflect/protoregistry"
"cosmossdk.io/collections"
addresscodec "cosmossdk.io/core/address"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/comet"
corecontext "cosmossdk.io/core/context"
@ -35,8 +34,6 @@ import (
"cosmossdk.io/server/v2/streaming"
"cosmossdk.io/store/v2/snapshots"
consensustypes "cosmossdk.io/x/consensus/types"
"github.com/cosmos/cosmos-sdk/codec"
)
const (
@ -52,13 +49,12 @@ type consensus[T transaction.Tx] struct {
logger log.Logger
appName, version string
app appmanager.AppManager[T]
appCodec codec.Codec
txCodec transaction.Codec[T]
store types.Store
listener *appdata.Listener
snapshotManager *snapshots.Manager
streamingManager streaming.Manager
mempool mempool.Mempool[T]
appCodecs AppCodecs[T]
cfg Config
chainID string
@ -84,16 +80,15 @@ type consensus[T transaction.Tx] struct {
addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID
queryHandlersMap map[string]appmodulev2.Handler
getProtoRegistry func() (*protoregistry.Files, error)
consensusAddressCodec addresscodec.Codec
cfgMap server.ConfigMap
queryHandlersMap map[string]appmodulev2.Handler
getProtoRegistry func() (*protoregistry.Files, error)
cfgMap server.ConfigMap
}
// CheckTx implements types.Application.
// It is called by cometbft to verify transaction validity
func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
decodedTx, err := c.txCodec.Decode(req.Tx)
decodedTx, err := c.appCodecs.TxCodec.Decode(req.Tx)
if err != nil {
return nil, err
}
@ -325,7 +320,7 @@ func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe
ctx,
br,
req.AppStateBytes,
c.txCodec)
c.appCodecs.TxCodec)
if err != nil {
return nil, fmt.Errorf("genesis state init failure: %w", err)
}
@ -392,7 +387,7 @@ func (c *consensus[T]) PrepareProposal(
LastCommit: toCoreExtendedCommitInfo(req.LocalLastCommit),
})
txs, err := c.prepareProposalHandler(ciCtx, c.app, c.txCodec, req)
txs, err := c.prepareProposalHandler(ciCtx, c.app, c.appCodecs.TxCodec, req)
if err != nil {
return nil, err
}
@ -438,7 +433,7 @@ func (c *consensus[T]) ProcessProposal(
LastCommit: toCoreCommitInfo(req.ProposedLastCommit),
})
err := c.processProposalHandler(ciCtx, c.app, c.txCodec, req)
err := c.processProposalHandler(ciCtx, c.app, c.appCodecs.TxCodec, req)
if err != nil {
c.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abciproto.ProcessProposalResponse{
@ -567,7 +562,7 @@ func (c *consensus[T]) internalFinalizeBlock(
// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(c.logger, req.Txs, c.txCodec)
decodedTxs, err := decodeTxs(c.logger, req.Txs, c.appCodecs.TxCodec)
if err != nil {
return nil, nil, nil, err
}

View File

@ -886,13 +886,15 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.
}
return &consensus[mock.Tx]{
logger: log.NewNopLogger(),
appName: "testing-app",
app: am,
mempool: mempool,
store: mockStore,
cfg: Config{AppTomlConfig: DefaultAppTomlConfig()},
txCodec: mock.TxCodec{},
logger: log.NewNopLogger(),
appName: "testing-app",
app: am,
mempool: mempool,
store: mockStore,
cfg: Config{AppTomlConfig: DefaultAppTomlConfig()},
appCodecs: AppCodecs[mock.Tx]{
TxCodec: mock.TxCodec{},
},
chainID: "test",
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
queryHandlersMap: queryHandler,

View File

@ -27,7 +27,6 @@ import (
nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/std"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/query"
@ -117,12 +116,13 @@ func (t txServer[T]) GetBlockWithTxs(ctx context.Context, req *txtypes.GetBlockW
}
decodeTxAt := func(i uint64) error {
tx := blockTxs[i]
txb, err := t.clientCtx.TxConfig.TxDecoder()(tx)
fmt.Println("TxDecoder", txb, err)
txb, err := t.txCodec.Decode(tx)
if err != nil {
return err
}
p, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx()
// txServer works only with sdk.Tx
p, err := any(txb).(interface{ AsTx() (*txtypes.Tx, error) }).AsTx()
if err != nil {
return err
}
@ -256,6 +256,11 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest)
msgResponses = append(msgResponses, anyMsg)
}
event, err := intoABCIEvents(txResult.Events, map[string]struct{}{}, false)
if err != nil {
return nil, status.Errorf(codes.Unknown, "failed to convert events: %v", err)
}
return &txtypes.SimulateResponse{
GasInfo: &sdk.GasInfo{
GasUsed: txResult.GasUsed,
@ -263,6 +268,7 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest)
},
Result: &sdk.Result{
MsgResponses: msgResponses,
Events: event,
},
}, nil
}
@ -273,15 +279,17 @@ func (t txServer[T]) TxDecode(ctx context.Context, req *txtypes.TxDecodeRequest)
return nil, status.Error(codes.InvalidArgument, "invalid empty tx bytes")
}
txb, err := t.clientCtx.TxConfig.TxDecoder()(req.TxBytes)
txb, err := t.txCodec.Decode(req.TxBytes)
if err != nil {
return nil, err
}
tx, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() // TODO: maybe we can break the Tx interface to add this also
// txServer works only with sdk.Tx
tx, err := any(txb).(interface{ AsTx() (*txtypes.Tx, error) }).AsTx()
if err != nil {
return nil, err
}
return &txtypes.TxDecodeResponse{
Tx: tx,
}, nil
@ -350,7 +358,7 @@ func (t txServer[T]) TxEncodeAmino(_ context.Context, req *txtypes.TxEncodeAmino
var stdTx legacytx.StdTx
err := t.clientCtx.LegacyAmino.UnmarshalJSON([]byte(req.AminoJson), &stdTx)
if err != nil {
return nil, err
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("invalid request %s", err))
}
encodedBytes, err := t.clientCtx.LegacyAmino.Marshal(stdTx)
@ -466,7 +474,7 @@ func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abc
if strings.HasPrefix(req.Path, "/cosmos.base.tendermint.v1beta1.Service") {
rpcClient, _ := rpchttp.New(c.cfg.ConfigTomlConfig.RPC.ListenAddress)
cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec)
cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.appCodecs.ConsensusAddressCodec)
paths := strings.Split(req.Path, "/")
if len(paths) <= 2 {
return nil, fmt.Errorf("invalid request path: %s", req.Path)
@ -516,27 +524,18 @@ func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abc
// Handle tx service
if strings.HasPrefix(req.Path, "/cosmos.tx.v1beta1.Service") {
// init simple client context
amino := codec.NewLegacyAmino()
std.RegisterLegacyAminoCodec(amino)
txConfig := authtx.NewTxConfig(
c.appCodec,
c.appCodec.InterfaceRegistry().SigningContext().AddressCodec(),
c.appCodec.InterfaceRegistry().SigningContext().ValidatorAddressCodec(),
authtx.DefaultSignModes,
)
rpcClient, _ := client.NewClientFromNode(c.cfg.AppTomlConfig.Address)
// init simple client context
clientCtx := client.Context{}.
WithLegacyAmino(amino).
WithCodec(c.appCodec).
WithTxConfig(txConfig).
WithLegacyAmino(c.appCodecs.LegacyAmino.(*codec.LegacyAmino)).
WithCodec(c.appCodecs.AppCodec).
WithNodeURI(c.cfg.AppTomlConfig.Address).
WithClient(rpcClient)
txService := txServer[T]{
clientCtx: clientCtx,
txCodec: c.txCodec,
txCodec: c.appCodecs.TxCodec,
app: c.app,
consensus: c,
}

View File

@ -51,7 +51,7 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a
switch path[1] {
case "simulate":
tx, err := c.txCodec.Decode(req.Data)
tx, err := c.appCodecs.TxCodec.Decode(req.Data)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to decode tx")
}

View File

@ -25,6 +25,7 @@ import (
addresscodec "cosmossdk.io/core/address"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/registry"
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
@ -66,14 +67,24 @@ type CometBFTServer[T transaction.Tx] struct {
store types.Store
}
// AppCodecs contains all codecs that the CometBFT server requires
// provided by the application. They are extracted in struct to not be API
// breaking once amino is completely deprecated or new codecs should be added.
type AppCodecs[T transaction.Tx] struct {
TxCodec transaction.Codec[T]
// The following codecs are only required for the gRPC services
AppCodec codec.Codec
LegacyAmino registry.AminoRegistrar
ConsensusAddressCodec addresscodec.Codec
}
func New[T transaction.Tx](
logger log.Logger,
appName string,
store types.Store,
app appmanager.AppManager[T],
appCodec codec.Codec,
txCodec transaction.Codec[T],
consensusAddressCodec addresscodec.Codec,
appCodecs AppCodecs[T],
queryHandlers map[string]appmodulev2.Handler,
decoderResolver decoding.DecoderResolver,
serverOptions ServerOptions[T],
@ -84,7 +95,7 @@ func New[T transaction.Tx](
serverOptions: serverOptions,
cfgOptions: cfgOptions,
app: app,
txCodec: txCodec,
txCodec: appCodecs.TxCodec,
store: store,
}
srv.logger = logger.With(log.ModuleKey, srv.Name())
@ -172,8 +183,7 @@ func New[T transaction.Tx](
cfg: srv.config,
store: store,
logger: logger,
txCodec: txCodec,
appCodec: appCodec,
appCodecs: appCodecs,
listener: listener,
snapshotManager: snapshotManager,
streamingManager: srv.serverOptions.StreamingManager,
@ -192,7 +202,6 @@ func New[T transaction.Tx](
addrPeerFilter: srv.serverOptions.AddrPeerFilter,
idPeerFilter: srv.serverOptions.IdPeerFilter,
cfgMap: cfg,
consensusAddressCodec: consensusAddressCodec,
}
c.optimisticExec = oe.NewOptimisticExecution(

View File

@ -2,8 +2,10 @@ package stf
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
corecontext "cosmossdk.io/core/context"
@ -358,12 +360,53 @@ func (s STF[T]) runTxMsgs(
e.EventIndex = int32(j + 1)
events = append(events, e)
}
// add message event
events = append(events, createMessageEvent(msg, int32(i+1), int32(len(execCtx.events)+1)))
}
consumed := execCtx.meter.Limit() - execCtx.meter.Remaining()
return msgResps, consumed, events, nil
}
// Create a message event, with two kv: action, the type url of the message
// and module, the module of the message.
func createMessageEvent(msg transaction.Msg, msgIndex, eventIndex int32) event.Event {
// Assumes that module name is the second element of the msg type URL
// e.g. "cosmos.bank.v1beta1.MsgSend" => "bank"
// It returns an empty string if the input is not a valid type URL
getModuleNameFromTypeURL := func(input string) string {
moduleName := strings.Split(input, ".")
if len(moduleName) > 1 {
return moduleName[1]
}
return ""
}
return event.Event{
MsgIndex: msgIndex,
EventIndex: eventIndex,
Type: "message",
Attributes: func() ([]appdata.EventAttribute, error) {
typeURL := msgTypeURL(msg)
return []appdata.EventAttribute{
{Key: "action", Value: "/" + typeURL},
{Key: "module", Value: getModuleNameFromTypeURL(typeURL)},
}, nil
},
Data: func() (json.RawMessage, error) {
typeURL := msgTypeURL(msg)
attrs := []appdata.EventAttribute{
{Key: "action", Value: "/" + typeURL},
{Key: "module", Value: getModuleNameFromTypeURL(typeURL)},
}
return json.Marshal(attrs)
},
}
}
// preBlock executes the pre block logic.
func (s STF[T]) preBlock(
ctx *executionContext,

View File

@ -225,8 +225,8 @@ func TestSTF(t *testing.T) {
}
// check TxEvents
events := txResult.Events
if len(events) != 6 {
t.Fatalf("Expected 6 TxEvents, got %d", len(events))
if len(events) != 7 {
t.Fatalf("Expected 7 TxEvents, got %d", len(events))
}
for i, event := range events {
if event.BlockStage != appdata.TxProcessingStage {
@ -235,7 +235,8 @@ func TestSTF(t *testing.T) {
if event.TxIndex != 1 {
t.Errorf("Expected TxIndex 1, got %d", event.TxIndex)
}
if event.EventIndex != int32(i%2+1) {
if event.EventIndex != int32(i%2+1) &&
(event.Type == "message" && event.EventIndex != 3) { // special case for message event type as it happens in the msg handling flow
t.Errorf("Expected EventIndex %d, got %d", i%2+1, event.EventIndex)
}
@ -247,7 +248,7 @@ func TestSTF(t *testing.T) {
t.Errorf("Expected 1 or 2 attributes, got %d", len(attrs))
}
if len(attrs) == 2 {
if len(attrs) == 2 && event.Type != "message" {
if attrs[1].Key != "index" || attrs[1].Value != "2" {
t.Errorf("Expected attribute key 'index' and value '2', got key '%s' and value '%s'", attrs[1].Key, attrs[1].Value)
}
@ -273,7 +274,19 @@ func TestSTF(t *testing.T) {
if attrs[0].Key != "msg" || attrs[0].Value != "&BoolValue{Value:true,XXX_unrecognized:[],}" {
t.Errorf("Expected msg attribute with value '&BoolValue{Value:true,XXX_unrecognized:[],}', got '%s'", attrs[0].Value)
}
case 4, 5:
case 4:
if event.Type != "message" {
t.Errorf("Expected event type 'message', got %s", event.Type)
}
if event.MsgIndex != 1 {
t.Errorf("Expected MsgIndex 1, got %d", event.MsgIndex)
}
if attrs[0].Key != "action" || attrs[0].Value != "/google.protobuf.BoolValue" {
t.Errorf("Expected msg attribute with value '/google.protobuf.BoolValue', got '%s'", attrs[0].Value)
}
case 5, 6:
if event.Type != "post-tx-exec" {
t.Errorf("Expected event type 'post-tx-exec', got %s", event.Type)
}

View File

@ -115,9 +115,12 @@ func InitRootCmd[T transaction.Tx](
simApp.Name(),
simApp.Store(),
simApp.App.AppManager,
simApp.AppCodec(),
&client.DefaultTxDecoder[T]{TxConfig: deps.TxConfig},
deps.ClientContext.ConsensusAddressCodec,
cometbft.AppCodecs[T]{
AppCodec: simApp.AppCodec(),
TxCodec: &client.DefaultTxDecoder[T]{TxConfig: deps.TxConfig},
LegacyAmino: deps.ClientContext.LegacyAmino,
ConsensusAddressCodec: deps.ClientContext.ConsensusAddressCodec,
},
simApp.App.QueryHandlers(),
simApp.App.SchemaDecoderResolver(),
initCometOptions[T](),

View File

@ -122,7 +122,7 @@ func TestSimulateTx_GRPC(t *testing.T) {
require.NoError(t, err)
// Check the result and gas used are correct.
//
// The 12 events are:
// The 10 events are:
// - Sending Fee to the pool: coin_spent, coin_received and transfer
// - tx.* events: tx.fee, tx.acc_seq, tx.signature
// - Sending Amount to recipient: coin_spent, coin_received and transfer