feat(baseapp): integrate the appdata.Listener in baseapp (#21965)
This commit is contained in:
parent
c8f4cf787b
commit
80726f7ebd
@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -831,7 +832,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
|
||||
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
|
||||
// vote extensions, so skip those.
|
||||
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
|
||||
for _, rawTx := range req.Txs {
|
||||
for txIndex, rawTx := range req.Txs {
|
||||
|
||||
response := app.deliverTx(rawTx)
|
||||
|
||||
@ -843,6 +844,12 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
|
||||
// continue
|
||||
}
|
||||
|
||||
// append the tx index to the response.Events
|
||||
for i, event := range response.Events {
|
||||
response.Events[i].Attributes = append(event.Attributes,
|
||||
abci.EventAttribute{Key: "tx_index", Value: strconv.Itoa(txIndex)})
|
||||
}
|
||||
|
||||
txResults = append(txResults, response)
|
||||
}
|
||||
|
||||
|
||||
@ -676,7 +676,7 @@ func TestABCI_FinalizeBlock_DeliverTx(t *testing.T) {
|
||||
|
||||
events := res.TxResults[i].GetEvents()
|
||||
require.Len(t, events, 3, "should contain ante handler, message type and counter events respectively")
|
||||
require.Equal(t, sdk.MarkEventsToIndex(counterEvent("ante_handler", counter).ToABCIEvents(), map[string]struct{}{})[0], events[0], "ante handler event")
|
||||
require.Equal(t, sdk.MarkEventsToIndex(counterEvent("ante_handler", counter).ToABCIEvents(), map[string]struct{}{})[0].Attributes[0], events[0].Attributes[0], "ante handler event")
|
||||
require.Equal(t, sdk.MarkEventsToIndex(counterEvent(sdk.EventTypeMessage, counter).ToABCIEvents(), map[string]struct{}{})[0].Attributes[0], events[2].Attributes[0], "msg handler update counter event")
|
||||
}
|
||||
|
||||
|
||||
@ -718,6 +718,15 @@ func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, erro
|
||||
ctx = ctx.WithBlockGasMeter(gasMeter)
|
||||
app.finalizeBlockState.SetContext(ctx)
|
||||
events = ctx.EventManager().ABCIEvents()
|
||||
|
||||
// append PreBlock attributes to all events
|
||||
for i, event := range events {
|
||||
events[i].Attributes = append(
|
||||
event.Attributes,
|
||||
abci.EventAttribute{Key: "mode", Value: "PreBlock"},
|
||||
abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)},
|
||||
)
|
||||
}
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
@ -739,6 +748,7 @@ func (app *BaseApp) beginBlock(_ *abci.FinalizeBlockRequest) (sdk.BeginBlock, er
|
||||
resp.Events[i].Attributes = append(
|
||||
event.Attributes,
|
||||
abci.EventAttribute{Key: "mode", Value: "BeginBlock"},
|
||||
abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)},
|
||||
)
|
||||
}
|
||||
|
||||
@ -801,6 +811,7 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
|
||||
eb.Events[i].Attributes = append(
|
||||
event.Attributes,
|
||||
abci.EventAttribute{Key: "mode", Value: "EndBlock"},
|
||||
abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)},
|
||||
)
|
||||
}
|
||||
|
||||
@ -1151,6 +1162,12 @@ func createEvents(cdc codec.Codec, events sdk.Events, msg sdk.Msg, reflectMsg pr
|
||||
}
|
||||
}
|
||||
|
||||
// append the event_index attribute to all events
|
||||
msgEvent = msgEvent.AppendAttributes(sdk.NewAttribute("event_index", "0"))
|
||||
for i, event := range events {
|
||||
events[i] = event.AppendAttributes(sdk.NewAttribute("event_index", strconv.Itoa(i+1)))
|
||||
}
|
||||
|
||||
return sdk.Events{msgEvent}.AppendEvents(events), nil
|
||||
}
|
||||
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
|
||||
@ -143,21 +144,111 @@ func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStore
|
||||
return exposeStoreKeys
|
||||
}
|
||||
|
||||
func eventToAppDataEvent(event abci.Event) (appdata.Event, error) {
|
||||
appdataEvent := appdata.Event{
|
||||
Type: event.Type,
|
||||
Attributes: func() ([]appdata.EventAttribute, error) {
|
||||
attrs := make([]appdata.EventAttribute, len(event.Attributes))
|
||||
for j, attr := range event.Attributes {
|
||||
attrs[j] = appdata.EventAttribute{
|
||||
Key: attr.Key,
|
||||
Value: attr.Value,
|
||||
}
|
||||
}
|
||||
return attrs, nil
|
||||
},
|
||||
}
|
||||
|
||||
for _, attr := range event.Attributes {
|
||||
if attr.Key == "mode" {
|
||||
switch attr.Value {
|
||||
case "PreBlock":
|
||||
appdataEvent.BlockStage = appdata.PreBlockStage
|
||||
case "BeginBlock":
|
||||
appdataEvent.BlockStage = appdata.BeginBlockStage
|
||||
case "EndBlock":
|
||||
appdataEvent.BlockStage = appdata.EndBlockStage
|
||||
default:
|
||||
appdataEvent.BlockStage = appdata.UnknownBlockStage
|
||||
}
|
||||
} else if attr.Key == "tx_index" {
|
||||
txIndex, err := strconv.Atoi(attr.Value)
|
||||
if err != nil {
|
||||
return appdata.Event{}, err
|
||||
}
|
||||
appdataEvent.TxIndex = int32(txIndex + 1)
|
||||
appdataEvent.BlockStage = appdata.TxProcessingStage
|
||||
} else if attr.Key == "msg_index" {
|
||||
msgIndex, err := strconv.Atoi(attr.Value)
|
||||
if err != nil {
|
||||
return appdata.Event{}, err
|
||||
}
|
||||
appdataEvent.MsgIndex = int32(msgIndex + 1)
|
||||
} else if attr.Key == "event_index" {
|
||||
eventIndex, err := strconv.Atoi(attr.Value)
|
||||
if err != nil {
|
||||
return appdata.Event{}, err
|
||||
}
|
||||
appdataEvent.EventIndex = int32(eventIndex + 1)
|
||||
}
|
||||
}
|
||||
|
||||
return appdataEvent, nil
|
||||
}
|
||||
|
||||
type listenerWrapper struct {
|
||||
listener appdata.Listener
|
||||
}
|
||||
|
||||
// NewListenerWrapper creates a new listenerWrapper.
|
||||
// It is only used for testing purposes.
|
||||
func NewListenerWrapper(listener appdata.Listener) listenerWrapper {
|
||||
return listenerWrapper{listener: listener}
|
||||
}
|
||||
|
||||
func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
|
||||
if p.listener.StartBlock != nil {
|
||||
err := p.listener.StartBlock(appdata.StartBlockData{
|
||||
Height: uint64(req.Height),
|
||||
})
|
||||
if err != nil {
|
||||
if err := p.listener.StartBlock(appdata.StartBlockData{
|
||||
Height: uint64(req.Height),
|
||||
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
|
||||
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if p.listener.OnTx != nil {
|
||||
for i, tx := range req.Txs {
|
||||
if err := p.listener.OnTx(appdata.TxData{
|
||||
TxIndex: int32(i),
|
||||
Bytes: func() ([]byte, error) { return tx, nil },
|
||||
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if p.listener.OnEvent != nil {
|
||||
events := make([]appdata.Event, len(res.Events))
|
||||
var err error
|
||||
for i, event := range res.Events {
|
||||
events[i], err = eventToAppDataEvent(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, txResult := range res.TxResults {
|
||||
for _, event := range txResult.Events {
|
||||
appdataEvent, err := eventToAppDataEvent(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events = append(events, appdataEvent)
|
||||
}
|
||||
}
|
||||
if err := p.listener.OnEvent(appdata.EventData{Events: events}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
//// TODO txs, events
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -9,10 +9,12 @@ import (
|
||||
tmproto "github.com/cometbft/cometbft/api/cometbft/types/v1"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"cosmossdk.io/schema/appdata"
|
||||
storetypes "cosmossdk.io/store/types"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp"
|
||||
baseapptestutil "github.com/cosmos/cosmos-sdk/baseapp/testutil"
|
||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||
)
|
||||
|
||||
var _ storetypes.ABCIListener = (*MockABCIListener)(nil)
|
||||
@ -146,3 +148,186 @@ func Test_Ctx_with_StreamingManager(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
type mockAppDataListener struct {
|
||||
appdata.Listener
|
||||
|
||||
startBlockData []appdata.StartBlockData
|
||||
txData []appdata.TxData
|
||||
eventData []appdata.EventData
|
||||
kvPairData []appdata.KVPairData
|
||||
commitData []appdata.CommitData
|
||||
}
|
||||
|
||||
func newMockAppDataListener() *mockAppDataListener {
|
||||
listener := &mockAppDataListener{}
|
||||
|
||||
// Initialize the Listener with custom behavior to store data
|
||||
listener.Listener = appdata.Listener{
|
||||
StartBlock: func(data appdata.StartBlockData) error {
|
||||
listener.startBlockData = append(listener.startBlockData, data) // Store StartBlockData
|
||||
return nil
|
||||
},
|
||||
OnTx: func(data appdata.TxData) error {
|
||||
listener.txData = append(listener.txData, data) // Store TxData
|
||||
return nil
|
||||
},
|
||||
OnEvent: func(data appdata.EventData) error {
|
||||
listener.eventData = append(listener.eventData, data) // Store EventData
|
||||
return nil
|
||||
},
|
||||
OnKVPair: func(data appdata.KVPairData) error {
|
||||
listener.kvPairData = append(listener.kvPairData, data) // Store KVPairData
|
||||
return nil
|
||||
},
|
||||
Commit: func(data appdata.CommitData) (func() error, error) {
|
||||
listener.commitData = append(listener.commitData, data) // Store CommitData
|
||||
return nil, nil
|
||||
},
|
||||
}
|
||||
|
||||
return listener
|
||||
}
|
||||
|
||||
func TestAppDataListener(t *testing.T) {
|
||||
anteKey := []byte("ante-key")
|
||||
anteOpt := func(bapp *baseapp.BaseApp) { bapp.SetAnteHandler(anteHandlerTxTest(t, capKey1, anteKey)) }
|
||||
distOpt := func(bapp *baseapp.BaseApp) { bapp.MountStores(distKey1) }
|
||||
mockListener := newMockAppDataListener()
|
||||
streamingManager := storetypes.StreamingManager{ABCIListeners: []storetypes.ABCIListener{baseapp.NewListenerWrapper(mockListener.Listener)}}
|
||||
streamingManagerOpt := func(bapp *baseapp.BaseApp) { bapp.SetStreamingManager(streamingManager) }
|
||||
addListenerOpt := func(bapp *baseapp.BaseApp) { bapp.CommitMultiStore().AddListeners([]storetypes.StoreKey{distKey1}) }
|
||||
|
||||
// for event tests
|
||||
baseappOpts := func(app *baseapp.BaseApp) {
|
||||
app.SetPreBlocker(func(ctx sdk.Context, req *abci.FinalizeBlockRequest) error {
|
||||
ctx.EventManager().EmitEvent(sdk.NewEvent("pre-block"))
|
||||
return nil
|
||||
})
|
||||
app.SetBeginBlocker(func(_ sdk.Context) (sdk.BeginBlock, error) {
|
||||
return sdk.BeginBlock{
|
||||
Events: []abci.Event{
|
||||
{Type: "begin-block"},
|
||||
},
|
||||
}, nil
|
||||
})
|
||||
app.SetEndBlocker(func(_ sdk.Context) (sdk.EndBlock, error) {
|
||||
return sdk.EndBlock{
|
||||
Events: []abci.Event{
|
||||
{Type: "end-block"},
|
||||
},
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
|
||||
suite := NewBaseAppSuite(t, anteOpt, distOpt, streamingManagerOpt, addListenerOpt, baseappOpts)
|
||||
|
||||
_, err := suite.baseApp.InitChain(
|
||||
&abci.InitChainRequest{
|
||||
ConsensusParams: &tmproto.ConsensusParams{},
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
deliverKey := []byte("deliver-key")
|
||||
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
|
||||
|
||||
txCount := 5
|
||||
txs := make([][]byte, txCount)
|
||||
|
||||
for i := 0; i < txCount; i++ {
|
||||
tx := newTxCounter(t, suite.txConfig, suite.ac, int64(i), int64(i))
|
||||
|
||||
txBytes, err := suite.txConfig.TxEncoder()(tx)
|
||||
require.NoError(t, err)
|
||||
|
||||
sKey := []byte(fmt.Sprintf("distKey%d", i))
|
||||
sVal := []byte(fmt.Sprintf("distVal%d", i))
|
||||
store := getFinalizeBlockStateCtx(suite.baseApp).KVStore(distKey1)
|
||||
store.Set(sKey, sVal)
|
||||
|
||||
txs[i] = txBytes
|
||||
}
|
||||
|
||||
_, err = suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1, Txs: txs})
|
||||
require.NoError(t, err)
|
||||
_, err = suite.baseApp.Commit()
|
||||
require.NoError(t, err)
|
||||
|
||||
// StartBlockData
|
||||
require.Len(t, mockListener.startBlockData, 1)
|
||||
require.Equal(t, uint64(1), mockListener.startBlockData[0].Height)
|
||||
// TxData
|
||||
txData := mockListener.txData
|
||||
require.Len(t, txData, len(txs))
|
||||
for i := 0; i < txCount; i++ {
|
||||
require.Equal(t, int32(i), txData[i].TxIndex)
|
||||
txBytes, err := txData[i].Bytes()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, txs[i], txBytes)
|
||||
}
|
||||
// KVPairData
|
||||
require.Len(t, mockListener.kvPairData, 1)
|
||||
updates := mockListener.kvPairData[0].Updates
|
||||
for i := 0; i < txCount; i++ {
|
||||
require.Equal(t, []byte(distKey1.Name()), updates[i].Actor)
|
||||
require.Len(t, updates[i].StateChanges, 1)
|
||||
sKey := []byte(fmt.Sprintf("distKey%d", i))
|
||||
sVal := []byte(fmt.Sprintf("distVal%d", i))
|
||||
require.Equal(t, sKey, updates[i].StateChanges[0].Key)
|
||||
require.Equal(t, sVal, updates[i].StateChanges[0].Value)
|
||||
}
|
||||
// CommitData
|
||||
require.Len(t, mockListener.commitData, 1)
|
||||
// EventData
|
||||
require.Len(t, mockListener.eventData, 1)
|
||||
events := mockListener.eventData[0].Events
|
||||
require.Len(t, events, 3+txCount*3)
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
require.Equal(t, int32(0), events[i].TxIndex)
|
||||
require.Equal(t, int32(0), events[i].MsgIndex)
|
||||
require.Equal(t, int32(1), events[i].EventIndex)
|
||||
attrs, err := events[i].Attributes()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, attrs, 2)
|
||||
switch i {
|
||||
case 0:
|
||||
require.Equal(t, appdata.PreBlockStage, events[i].BlockStage)
|
||||
require.Equal(t, "pre-block", events[i].Type)
|
||||
case 1:
|
||||
require.Equal(t, appdata.BeginBlockStage, events[i].BlockStage)
|
||||
require.Equal(t, "begin-block", events[i].Type)
|
||||
case 2:
|
||||
require.Equal(t, appdata.EndBlockStage, events[i].BlockStage)
|
||||
require.Equal(t, "end-block", events[i].Type)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 3; i < 3+txCount*3; i++ {
|
||||
require.Equal(t, appdata.TxProcessingStage, events[i].BlockStage)
|
||||
require.Equal(t, int32(i/3), events[i].TxIndex)
|
||||
switch i % 3 {
|
||||
case 0:
|
||||
require.Equal(t, "ante_handler", events[i].Type)
|
||||
require.Equal(t, int32(0), events[i].MsgIndex)
|
||||
require.Equal(t, int32(0), events[i].EventIndex)
|
||||
attrs, err := events[i].Attributes()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, attrs, 2)
|
||||
case 1:
|
||||
require.Equal(t, "message", events[i].Type)
|
||||
require.Equal(t, int32(1), events[i].MsgIndex)
|
||||
require.Equal(t, int32(1), events[i].EventIndex)
|
||||
attrs, err := events[i].Attributes()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, attrs, 5)
|
||||
case 2:
|
||||
require.Equal(t, "message", events[i].Type)
|
||||
require.Equal(t, int32(1), events[i].MsgIndex)
|
||||
require.Equal(t, int32(2), events[i].EventIndex)
|
||||
attrs, err := events[i].Attributes()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, attrs, 4)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user