chore(sims): Complete sims integration for app v2 (#23478)

This commit is contained in:
Alexander Peters 2025-01-24 08:58:11 +01:00 committed by GitHub
parent 494389d15a
commit 5e58330196
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 148 additions and 75 deletions

View File

@ -1,4 +1,3 @@
# TODO: This should be ported to work with SimApp v2.
#? test-sim-nondeterminism: Run non-determinism test for simapp
test-sim-nondeterminism:
@ -6,27 +5,6 @@ test-sim-nondeterminism:
@cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout=30m -tags='sims' -run TestAppStateDeterminism \
-NumBlocks=100 -BlockSize=200
# Requires an exported plugin. See store/streaming/README.md for documentation.
#
# example:
# export COSMOS_SDK_ABCI_V1=<path-to-plugin-binary>
# make test-sim-nondeterminism-streaming
#
# Using the built-in examples:
# export COSMOS_SDK_ABCI_V1=<path-to-sdk>/store/streaming/abci/examples/file/file
# make test-sim-nondeterminism-streaming
test-sim-nondeterminism-streaming:
# @echo "Running non-determinism-streaming test..."
# @cd ${CURRENT_DIR}/simapp && go test -failfast -mod=readonly -timeout=30m -tags='sims' -run TestAppStateDeterminism \
# -NumBlocks=100 -BlockSize=200 -EnableStreaming=true
test-sim-custom-genesis-fast:
# @echo "Running custom genesis simulation..."
# @echo "By default, ${HOME}/.simapp/config/genesis.json will be used."
# @cd ${CURRENT_DIR}/simapp && go test -failfast -mod=readonly -timeout=30m -tags='sims' -run TestFullAppSimulation -Genesis=${HOME}/.simapp/config/genesis.json \
# -NumBlocks=100 -BlockSize=200 -Seed=99 -SigverifyTx=false
test-sim-import-export:
@echo "Running application import/export simulation. This may take several minutes..."
@cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout 20m -tags='sims' -run TestAppImportExport \
@ -37,11 +15,6 @@ test-sim-after-import:
@cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout 30m -tags='sims' -run TestAppSimulationAfterImport \
-NumBlocks=50
test-sim-custom-genesis-multi-seed:
# @echo "Running multi-seed custom genesis simulation..."
# @echo "By default, ${HOME}/.simapp/config/genesis.json will be used."
# @cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout 30m -tags='sims' -run TestFullAppSimulation -Genesis=${HOME}/.simapp/config/genesis.json \
# -NumBlocks=400
test-sim-multi-seed-long:
@echo "Running long multi-seed application simulation. This may take awhile!"
@ -55,11 +28,8 @@ test-sim-multi-seed-short:
.PHONY: \
test-sim-nondeterminism \
test-sim-nondeterminism-streaming \
test-sim-custom-genesis-fast \
test-sim-import-export \
test-sim-after-import \
test-sim-custom-genesis-multi-seed \
test-sim-multi-seed-short \
test-sim-multi-seed-long \

View File

@ -22,6 +22,7 @@ Each entry must include the Github issue reference in the following format:
## [Unreleased]
## [v1.0.0-beta.2](https://github.com/cosmos/cosmos-sdk/releases/tag/server/v2/appmanager%2Fv1.0.0-beta.2)
* [#23013](https://github.com/cosmos/cosmos-sdk/pull/23013) Introduce `TransactionFuzzer`, an interface for processing and generated state transitions.

View File

@ -18,7 +18,6 @@ import (
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/comet"
corecontext "cosmossdk.io/core/context"
"cosmossdk.io/core/event"
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
@ -510,16 +509,8 @@ func (c *consensus[T]) FinalizeBlock(
return nil, fmt.Errorf("unable to commit the changeset: %w", err)
}
var events []event.Event
events = append(events, resp.PreBlockEvents...)
events = append(events, resp.BeginBlockEvents...)
for _, tx := range resp.TxResults {
events = append(events, tx.Events...)
}
events = append(events, resp.EndBlockEvents...)
// listen to state streaming changes in accordance with the block
err = c.streamDeliverBlockChanges(ctx, req.Height, req.Txs, decodedTxs, resp.TxResults, events, stateChanges)
err = c.streamDeliverBlockChanges(ctx, req.Height, req.Txs, decodedTxs, *resp, stateChanges)
if err != nil {
return nil, err
}

View File

@ -3,10 +3,12 @@ package cometbft
import (
"context"
"encoding/json"
"fmt"
"cosmossdk.io/core/event"
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
errorsmod "cosmossdk.io/errors/v2"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/server/v2/streaming"
@ -18,14 +20,69 @@ func (c *consensus[T]) streamDeliverBlockChanges(
height int64,
txs [][]byte,
decodedTxs []T,
txResults []server.TxResult,
events []event.Event,
blockResp server.BlockResponse,
stateChanges []store.StateChanges,
) error {
return StreamOut(ctx, height, txs, decodedTxs, blockResp, stateChanges, c.streamingManager, c.listener, c.cfg.AppTomlConfig.Trace, c.logger.Error)
}
// StreamOut stream all the changes happened during deliver block.
func StreamOut[T transaction.Tx](
ctx context.Context,
height int64,
rawTXs [][]byte,
decodedTXs []T,
blockRsp server.BlockResponse,
stateChanges []store.StateChanges,
streamingManager streaming.Manager,
listener *appdata.Listener,
traceErrs bool,
logErrFn func(msg string, keyVals ...any),
) error {
var events []event.Event
events = append(events, blockRsp.PreBlockEvents...)
events = append(events, blockRsp.BeginBlockEvents...)
for _, tx := range blockRsp.TxResults {
events = append(events, tx.Events...)
}
events = append(events, blockRsp.EndBlockEvents...)
txResults := blockRsp.TxResults
err := doServeStreamListeners(
ctx,
height,
rawTXs,
txResults,
traceErrs,
streamingManager,
events,
logErrFn,
stateChanges,
)
if err != nil {
return err
}
return doServeHookListener(listener, height, rawTXs, decodedTXs, events, stateChanges)
}
func doServeStreamListeners(
ctx context.Context,
height int64,
rawTXs [][]byte,
txResults []server.TxResult,
traceErrs bool,
streamingManager streaming.Manager,
events []event.Event,
logErrFn func(msg string, keyVals ...any),
stateChanges []store.StateChanges,
) error {
if len(streamingManager.Listeners) == 0 {
return nil
}
// convert txresults to streaming txresults
streamingTxResults := make([]*streaming.ExecTxResult, len(txResults))
for i, txResult := range txResults {
space, code, log := errorsmod.ABCIInfo(txResult.Error, c.cfg.AppTomlConfig.Trace)
space, code, log := errorsmod.ABCIInfo(txResult.Error, traceErrs)
events, err := streaming.IntoStreamingEvents(txResult.Events)
if err != nil {
@ -42,31 +99,47 @@ func (c *consensus[T]) streamDeliverBlockChanges(
}
}
for _, streamingListener := range c.streamingManager.Listeners {
for _, streamingListener := range streamingManager.Listeners {
events, err := streaming.IntoStreamingEvents(events)
if err != nil {
return err
}
if err := streamingListener.ListenDeliverBlock(ctx, streaming.ListenDeliverBlockRequest{
BlockHeight: height,
Txs: txs,
Txs: rawTXs,
TxResults: streamingTxResults,
Events: events,
}); err != nil {
c.logger.Error("ListenDeliverBlock listening hook failed", "height", height, "err", err)
if streamingManager.StopNodeOnErr {
return fmt.Errorf("listen deliver block: %w", err)
}
logErrFn("ListenDeliverBlock listening hook failed", "height", height, "err", err)
}
if err := streamingListener.ListenStateChanges(ctx, intoStreamingKVPairs(stateChanges)); err != nil {
c.logger.Error("ListenStateChanges listening hook failed", "height", height, "err", err)
if streamingManager.StopNodeOnErr {
return fmt.Errorf("listen state changes: %w", err)
}
logErrFn("ListenStateChanges listening hook failed", "height", height, "err", err)
}
}
return nil
}
if c.listener == nil {
func doServeHookListener[T transaction.Tx](
listener *appdata.Listener,
height int64,
rawTXs [][]byte,
decodedTXs []T,
events []event.Event,
stateChanges []store.StateChanges,
) error {
if listener == nil {
return nil
}
// stream the StartBlockData to the listener.
if c.listener.StartBlock != nil {
if err := c.listener.StartBlock(appdata.StartBlockData{
if listener.StartBlock != nil {
if err := listener.StartBlock(appdata.StartBlockData{
Height: uint64(height),
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
@ -75,14 +148,14 @@ func (c *consensus[T]) streamDeliverBlockChanges(
}
}
// stream the TxData to the listener.
if c.listener.OnTx != nil {
for i, tx := range txs {
if err := c.listener.OnTx(appdata.TxData{
if listener.OnTx != nil {
for i, tx := range rawTXs {
if err := listener.OnTx(appdata.TxData{
BlockNumber: uint64(height),
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: func() (json.RawMessage, error) {
return json.Marshal(decodedTxs[i])
return json.Marshal(decodedTXs[i])
},
}); err != nil {
return err
@ -90,20 +163,20 @@ func (c *consensus[T]) streamDeliverBlockChanges(
}
}
// stream the EventData to the listener.
if c.listener.OnEvent != nil {
if err := c.listener.OnEvent(appdata.EventData{Events: events}); err != nil {
if listener.OnEvent != nil {
if err := listener.OnEvent(appdata.EventData{Events: events}); err != nil {
return err
}
}
// stream the KVPairData to the listener.
if c.listener.OnKVPair != nil {
if err := c.listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil {
if listener.OnKVPair != nil {
if err := listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil {
return err
}
}
// stream the CommitData to the listener.
if c.listener.Commit != nil {
if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil {
if listener.Commit != nil {
if completionCallback, err := listener.Commit(appdata.CommitData{}); err != nil {
return err
} else if completionCallback != nil {
if err := completionCallback(); err != nil {

View File

@ -22,6 +22,7 @@ Each entry must include the Github issue reference in the following format:
## [Unreleased]
## [v1.0.0-beta.2](https://github.com/cosmos/cosmos-sdk/releases/tag/server/v2/stf%2Fv1.0.0-beta.2)
* [#23013](https://github.com/cosmos/cosmos-sdk/pull/23013) Introduce `DeliverSims`, an interface for state transitions by sims.

View File

@ -51,6 +51,8 @@ require (
google.golang.org/protobuf v1.36.3
)
require cosmossdk.io/schema v1.0.0
require (
buf.build/gen/go/cometbft/cometbft/protocolbuffers/go v1.36.3-20241120201313-68e42a58b301.1 // indirect
buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.36.3-20240130113600-88ef6483f90f.1 // indirect
@ -64,7 +66,6 @@ require (
cosmossdk.io/core/testing v0.0.1 // indirect
cosmossdk.io/errors v1.0.1 // indirect
cosmossdk.io/errors/v2 v2.0.0 // indirect
cosmossdk.io/schema v1.0.0 // indirect
cosmossdk.io/server/v2/stf v1.0.0-beta.2 // indirect
cosmossdk.io/store v1.10.0-rc.1.0.20241218084712-ca559989da43 // indirect
cosmossdk.io/x/tx v1.0.1 // indirect

View File

@ -7,6 +7,7 @@ import (
"iter"
"maps"
"math/rand"
"os"
"slices"
"testing"
"time"
@ -26,7 +27,10 @@ import (
"cosmossdk.io/depinject"
"cosmossdk.io/log"
"cosmossdk.io/runtime/v2"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/cometbft"
"cosmossdk.io/server/v2/streaming"
storev2 "cosmossdk.io/store/v2"
consensustypes "cosmossdk.io/x/consensus/types"
@ -46,6 +50,7 @@ type (
HasWeightedOperationsX = simsx.HasWeightedOperationsX
HasWeightedOperationsXWithProposals = simsx.HasWeightedOperationsXWithProposals
HasProposalMsgsX = simsx.HasProposalMsgsX
HasLegacyProposalMsgs = simsx.HasLegacyProposalMsgs
)
const SimAppChainID = "simulation-app"
@ -116,6 +121,8 @@ type (
TXBuilder simsxv2.TXBuilder[T]
AppManager appmanager.AppManager[T]
ModuleManager ModuleManager
StreamManager streaming.Manager
StreamHook *appdata.Listener
}
AppFactory[T Tx, V SimulationApp[T]] func(config depinject.Config, outputs ...any) (V, error)
@ -282,11 +289,15 @@ func RunWithRandSourceX[T Tx](
defer done()
testInstance, chainState, accounts := setupChainStateFn(rootCtx, r)
emptySimParams := make(map[string]json.RawMessage) // todo read sims params from disk as before
customFactoryParams := make(map[string]json.RawMessage)
if tCfg.ParamsFile != "" {
bz, err := os.ReadFile(tCfg.ParamsFile)
require.NoError(tb, err)
require.NoError(tb, json.Unmarshal(bz, &customFactoryParams))
}
modules := testInstance.ModuleManager.Modules()
msgFactoriesFn := prepareSimsMsgFactories(r, modules, simsx.ParamWeightSource(emptySimParams))
msgFactoriesFn := prepareSimsMsgFactories(tb, r, modules, simsx.ParamWeightSource(customFactoryParams))
if b, ok := tb.(interface{ ResetTimer() }); ok {
b.ResetTimer()
@ -320,7 +331,6 @@ func prepareInitialGenesisState[T Tx](
moduleManager ModuleManager,
) ([]simtypes.Account, json.RawMessage, string, time.Time) {
txConfig := app.TxConfig()
// todo: replace legacy testdata functions ?
appStateFn := simtestutil.AppStateFn(
app.AppCodec(),
txConfig.SigningContext().AddressCodec(),
@ -443,9 +453,10 @@ func doMainLoop[T Tx](
}
cometInfo := comet.Info{
ValidatorsHash: nil,
Evidence: cs.ValsetHistory.MissBehaviour(r),
ProposerAddress: cs.ActiveValidatorSet[0].Address, // todo: pick random one
ValidatorsHash: nil,
Evidence: cs.ValsetHistory.MissBehaviour(r),
// pick one of top 10
ProposerAddress: cs.ActiveValidatorSet[r.Intn(min(len(cs.ActiveValidatorSet), 10))].Address,
LastCommit: cs.ActiveValidatorSet.NewCommitInfo(r),
}
fOps, pos := futureOpsReg.PopScheduledFor(cs.BlockTime), 0
@ -488,6 +499,7 @@ func doMainLoop[T Tx](
tx, err := testInstance.TXBuilder.Build(ctx, testInstance.AuthKeeper, signers, msg, r, cs.ChainID)
require.NoError(tb, err)
blockReqN.Txs = append(blockReqN.Txs, tx)
if !yield(tx) {
return
}
@ -509,6 +521,26 @@ func doMainLoop[T Tx](
}
txTotalCounter += txPerBlockCounter
cs.ActiveValidatorSet = cs.ActiveValidatorSet.Update(blockRsp.ValidatorUpdates)
if len(testInstance.StreamManager.Listeners) == 0 && testInstance.StreamHook == nil {
continue
}
// stream data
strmCtx, cancel := context.WithTimeout(rootCtx, time.Second)
rawTxs := simsx.Collect(blockReqN.Txs, func(a T) []byte { return a.Bytes() })
require.NoError(tb, cometbft.StreamOut[T](
strmCtx,
int64(blockReqN.Height),
rawTxs,
blockReqN.Txs,
*blockRsp,
changeSet,
testInstance.StreamManager,
testInstance.StreamHook,
true,
tb.Logf,
))
cancel()
}
fmt.Println("+++ reporter:\n" + rootReporter.Summary().String())
fmt.Printf("Tx total: %d skipped: %d\n", txTotalCounter, txSkippedCounter)
@ -516,21 +548,19 @@ func doMainLoop[T Tx](
// prepareSimsMsgFactories constructs and returns a function to retrieve simulation message factories for all modules.
// It initializes proposal and factory registries, registers proposals and weighted operations, and sorts deterministically.
func prepareSimsMsgFactories(
r *rand.Rand,
modules map[string]appmodulev2.AppModule,
weights simsx.WeightSource,
) func() simsx.SimMsgFactoryX {
func prepareSimsMsgFactories(tb testing.TB, r *rand.Rand, modules map[string]appmodulev2.AppModule, weights simsx.WeightSource) func() simsx.SimMsgFactoryX {
tb.Helper()
moduleNames := slices.Collect(maps.Keys(modules))
slices.Sort(moduleNames) // make deterministic
// get all proposal types
proposalRegistry := simsx.NewUniqueTypeRegistry()
for _, n := range moduleNames {
switch xm := modules[n].(type) { // nolint: gocritic // extended in the future
switch xm := modules[n].(type) {
case HasProposalMsgsX:
xm.ProposalMsgsX(weights, proposalRegistry)
// todo: register legacy and v1 msg proposals
case HasLegacyProposalMsgs:
tb.Logf("Ignoring legacy proposal messages for module: %s", n)
}
}
// register all msg factories

View File

@ -31,6 +31,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
## [Unreleased]
## v1.10.0 (December 13, 2024)
### Improvements

View File

@ -25,6 +25,11 @@ Ref: https://keepachangelog.com/en/1.0.0/
## [Unreleased]
### Improvements
* [#23013](https://github.com/cosmos/cosmos-sdk/pull/23013) Support memDB for sims
## [v2.0.0-beta.2](https://github.com/cosmos/cosmos-sdk/releases/tag/store/v2.0.0-beta.2)
* [#22336](https://github.com/cosmos/cosmos-sdk/pull/22336) Finish migration manager.