diff --git a/scripts/build/simulations.mk b/scripts/build/simulations.mk index 7b18244360..f924a9aa84 100644 --- a/scripts/build/simulations.mk +++ b/scripts/build/simulations.mk @@ -1,4 +1,3 @@ -# TODO: This should be ported to work with SimApp v2. #? test-sim-nondeterminism: Run non-determinism test for simapp test-sim-nondeterminism: @@ -6,27 +5,6 @@ test-sim-nondeterminism: @cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout=30m -tags='sims' -run TestAppStateDeterminism \ -NumBlocks=100 -BlockSize=200 - -# Requires an exported plugin. See store/streaming/README.md for documentation. -# -# example: -# export COSMOS_SDK_ABCI_V1= -# make test-sim-nondeterminism-streaming -# -# Using the built-in examples: -# export COSMOS_SDK_ABCI_V1=/store/streaming/abci/examples/file/file -# make test-sim-nondeterminism-streaming -test-sim-nondeterminism-streaming: - # @echo "Running non-determinism-streaming test..." - # @cd ${CURRENT_DIR}/simapp && go test -failfast -mod=readonly -timeout=30m -tags='sims' -run TestAppStateDeterminism \ - # -NumBlocks=100 -BlockSize=200 -EnableStreaming=true - -test-sim-custom-genesis-fast: - # @echo "Running custom genesis simulation..." - # @echo "By default, ${HOME}/.simapp/config/genesis.json will be used." - # @cd ${CURRENT_DIR}/simapp && go test -failfast -mod=readonly -timeout=30m -tags='sims' -run TestFullAppSimulation -Genesis=${HOME}/.simapp/config/genesis.json \ - # -NumBlocks=100 -BlockSize=200 -Seed=99 -SigverifyTx=false - test-sim-import-export: @echo "Running application import/export simulation. This may take several minutes..." @cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout 20m -tags='sims' -run TestAppImportExport \ @@ -37,11 +15,6 @@ test-sim-after-import: @cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout 30m -tags='sims' -run TestAppSimulationAfterImport \ -NumBlocks=50 -test-sim-custom-genesis-multi-seed: - # @echo "Running multi-seed custom genesis simulation..." - # @echo "By default, ${HOME}/.simapp/config/genesis.json will be used." - # @cd ${CURRENT_DIR}/simapp/v2 && go test -failfast -mod=readonly -timeout 30m -tags='sims' -run TestFullAppSimulation -Genesis=${HOME}/.simapp/config/genesis.json \ - # -NumBlocks=400 test-sim-multi-seed-long: @echo "Running long multi-seed application simulation. This may take awhile!" @@ -55,11 +28,8 @@ test-sim-multi-seed-short: .PHONY: \ test-sim-nondeterminism \ -test-sim-nondeterminism-streaming \ -test-sim-custom-genesis-fast \ test-sim-import-export \ test-sim-after-import \ -test-sim-custom-genesis-multi-seed \ test-sim-multi-seed-short \ test-sim-multi-seed-long \ diff --git a/server/v2/appmanager/CHANGELOG.md b/server/v2/appmanager/CHANGELOG.md index 97126de715..6fe1ece162 100644 --- a/server/v2/appmanager/CHANGELOG.md +++ b/server/v2/appmanager/CHANGELOG.md @@ -22,6 +22,7 @@ Each entry must include the Github issue reference in the following format: ## [Unreleased] + ## [v1.0.0-beta.2](https://github.com/cosmos/cosmos-sdk/releases/tag/server/v2/appmanager%2Fv1.0.0-beta.2) * [#23013](https://github.com/cosmos/cosmos-sdk/pull/23013) Introduce `TransactionFuzzer`, an interface for processing and generated state transitions. diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 00163412d9..38bbbfaa0a 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -18,7 +18,6 @@ import ( appmodulev2 "cosmossdk.io/core/appmodule/v2" "cosmossdk.io/core/comet" corecontext "cosmossdk.io/core/context" - "cosmossdk.io/core/event" "cosmossdk.io/core/server" "cosmossdk.io/core/store" "cosmossdk.io/core/transaction" @@ -510,16 +509,8 @@ func (c *consensus[T]) FinalizeBlock( return nil, fmt.Errorf("unable to commit the changeset: %w", err) } - var events []event.Event - events = append(events, resp.PreBlockEvents...) - events = append(events, resp.BeginBlockEvents...) - for _, tx := range resp.TxResults { - events = append(events, tx.Events...) - } - events = append(events, resp.EndBlockEvents...) - // listen to state streaming changes in accordance with the block - err = c.streamDeliverBlockChanges(ctx, req.Height, req.Txs, decodedTxs, resp.TxResults, events, stateChanges) + err = c.streamDeliverBlockChanges(ctx, req.Height, req.Txs, decodedTxs, *resp, stateChanges) if err != nil { return nil, err } diff --git a/server/v2/cometbft/streaming.go b/server/v2/cometbft/streaming.go index 8ce5f0a0f7..2fcde8916d 100644 --- a/server/v2/cometbft/streaming.go +++ b/server/v2/cometbft/streaming.go @@ -3,10 +3,12 @@ package cometbft import ( "context" "encoding/json" + "fmt" "cosmossdk.io/core/event" "cosmossdk.io/core/server" "cosmossdk.io/core/store" + "cosmossdk.io/core/transaction" errorsmod "cosmossdk.io/errors/v2" "cosmossdk.io/schema/appdata" "cosmossdk.io/server/v2/streaming" @@ -18,14 +20,69 @@ func (c *consensus[T]) streamDeliverBlockChanges( height int64, txs [][]byte, decodedTxs []T, - txResults []server.TxResult, - events []event.Event, + blockResp server.BlockResponse, stateChanges []store.StateChanges, ) error { + return StreamOut(ctx, height, txs, decodedTxs, blockResp, stateChanges, c.streamingManager, c.listener, c.cfg.AppTomlConfig.Trace, c.logger.Error) +} + +// StreamOut stream all the changes happened during deliver block. +func StreamOut[T transaction.Tx]( + ctx context.Context, + height int64, + rawTXs [][]byte, + decodedTXs []T, + blockRsp server.BlockResponse, + stateChanges []store.StateChanges, + streamingManager streaming.Manager, + listener *appdata.Listener, + traceErrs bool, + logErrFn func(msg string, keyVals ...any), +) error { + var events []event.Event + events = append(events, blockRsp.PreBlockEvents...) + events = append(events, blockRsp.BeginBlockEvents...) + for _, tx := range blockRsp.TxResults { + events = append(events, tx.Events...) + } + events = append(events, blockRsp.EndBlockEvents...) + txResults := blockRsp.TxResults + + err := doServeStreamListeners( + ctx, + height, + rawTXs, + txResults, + traceErrs, + streamingManager, + events, + logErrFn, + stateChanges, + ) + if err != nil { + return err + } + return doServeHookListener(listener, height, rawTXs, decodedTXs, events, stateChanges) +} + +func doServeStreamListeners( + ctx context.Context, + height int64, + rawTXs [][]byte, + txResults []server.TxResult, + traceErrs bool, + streamingManager streaming.Manager, + events []event.Event, + logErrFn func(msg string, keyVals ...any), + stateChanges []store.StateChanges, +) error { + if len(streamingManager.Listeners) == 0 { + return nil + } // convert txresults to streaming txresults streamingTxResults := make([]*streaming.ExecTxResult, len(txResults)) for i, txResult := range txResults { - space, code, log := errorsmod.ABCIInfo(txResult.Error, c.cfg.AppTomlConfig.Trace) + space, code, log := errorsmod.ABCIInfo(txResult.Error, traceErrs) events, err := streaming.IntoStreamingEvents(txResult.Events) if err != nil { @@ -42,31 +99,47 @@ func (c *consensus[T]) streamDeliverBlockChanges( } } - for _, streamingListener := range c.streamingManager.Listeners { + for _, streamingListener := range streamingManager.Listeners { events, err := streaming.IntoStreamingEvents(events) if err != nil { return err } if err := streamingListener.ListenDeliverBlock(ctx, streaming.ListenDeliverBlockRequest{ BlockHeight: height, - Txs: txs, + Txs: rawTXs, TxResults: streamingTxResults, Events: events, }); err != nil { - c.logger.Error("ListenDeliverBlock listening hook failed", "height", height, "err", err) + if streamingManager.StopNodeOnErr { + return fmt.Errorf("listen deliver block: %w", err) + } + logErrFn("ListenDeliverBlock listening hook failed", "height", height, "err", err) } if err := streamingListener.ListenStateChanges(ctx, intoStreamingKVPairs(stateChanges)); err != nil { - c.logger.Error("ListenStateChanges listening hook failed", "height", height, "err", err) + if streamingManager.StopNodeOnErr { + return fmt.Errorf("listen state changes: %w", err) + } + logErrFn("ListenStateChanges listening hook failed", "height", height, "err", err) } } + return nil +} - if c.listener == nil { +func doServeHookListener[T transaction.Tx]( + listener *appdata.Listener, + height int64, + rawTXs [][]byte, + decodedTXs []T, + events []event.Event, + stateChanges []store.StateChanges, +) error { + if listener == nil { return nil } // stream the StartBlockData to the listener. - if c.listener.StartBlock != nil { - if err := c.listener.StartBlock(appdata.StartBlockData{ + if listener.StartBlock != nil { + if err := listener.StartBlock(appdata.StartBlockData{ Height: uint64(height), HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 @@ -75,14 +148,14 @@ func (c *consensus[T]) streamDeliverBlockChanges( } } // stream the TxData to the listener. - if c.listener.OnTx != nil { - for i, tx := range txs { - if err := c.listener.OnTx(appdata.TxData{ + if listener.OnTx != nil { + for i, tx := range rawTXs { + if err := listener.OnTx(appdata.TxData{ BlockNumber: uint64(height), TxIndex: int32(i), Bytes: func() ([]byte, error) { return tx, nil }, JSON: func() (json.RawMessage, error) { - return json.Marshal(decodedTxs[i]) + return json.Marshal(decodedTXs[i]) }, }); err != nil { return err @@ -90,20 +163,20 @@ func (c *consensus[T]) streamDeliverBlockChanges( } } // stream the EventData to the listener. - if c.listener.OnEvent != nil { - if err := c.listener.OnEvent(appdata.EventData{Events: events}); err != nil { + if listener.OnEvent != nil { + if err := listener.OnEvent(appdata.EventData{Events: events}); err != nil { return err } } // stream the KVPairData to the listener. - if c.listener.OnKVPair != nil { - if err := c.listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil { + if listener.OnKVPair != nil { + if err := listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil { return err } } // stream the CommitData to the listener. - if c.listener.Commit != nil { - if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil { + if listener.Commit != nil { + if completionCallback, err := listener.Commit(appdata.CommitData{}); err != nil { return err } else if completionCallback != nil { if err := completionCallback(); err != nil { diff --git a/server/v2/stf/CHANGELOG.md b/server/v2/stf/CHANGELOG.md index 2cd7ea060a..6074306814 100644 --- a/server/v2/stf/CHANGELOG.md +++ b/server/v2/stf/CHANGELOG.md @@ -22,6 +22,7 @@ Each entry must include the Github issue reference in the following format: ## [Unreleased] + ## [v1.0.0-beta.2](https://github.com/cosmos/cosmos-sdk/releases/tag/server/v2/stf%2Fv1.0.0-beta.2) * [#23013](https://github.com/cosmos/cosmos-sdk/pull/23013) Introduce `DeliverSims`, an interface for state transitions by sims. diff --git a/simapp/v2/go.mod b/simapp/v2/go.mod index f848c12f16..8a9b259e0d 100644 --- a/simapp/v2/go.mod +++ b/simapp/v2/go.mod @@ -51,6 +51,8 @@ require ( google.golang.org/protobuf v1.36.3 ) +require cosmossdk.io/schema v1.0.0 + require ( buf.build/gen/go/cometbft/cometbft/protocolbuffers/go v1.36.3-20241120201313-68e42a58b301.1 // indirect buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.36.3-20240130113600-88ef6483f90f.1 // indirect @@ -64,7 +66,6 @@ require ( cosmossdk.io/core/testing v0.0.1 // indirect cosmossdk.io/errors v1.0.1 // indirect cosmossdk.io/errors/v2 v2.0.0 // indirect - cosmossdk.io/schema v1.0.0 // indirect cosmossdk.io/server/v2/stf v1.0.0-beta.2 // indirect cosmossdk.io/store v1.10.0-rc.1.0.20241218084712-ca559989da43 // indirect cosmossdk.io/x/tx v1.0.1 // indirect diff --git a/simapp/v2/sim_runner.go b/simapp/v2/sim_runner.go index aceb3367fa..56a81f84c4 100644 --- a/simapp/v2/sim_runner.go +++ b/simapp/v2/sim_runner.go @@ -7,6 +7,7 @@ import ( "iter" "maps" "math/rand" + "os" "slices" "testing" "time" @@ -26,7 +27,10 @@ import ( "cosmossdk.io/depinject" "cosmossdk.io/log" "cosmossdk.io/runtime/v2" + "cosmossdk.io/schema/appdata" "cosmossdk.io/server/v2/appmanager" + "cosmossdk.io/server/v2/cometbft" + "cosmossdk.io/server/v2/streaming" storev2 "cosmossdk.io/store/v2" consensustypes "cosmossdk.io/x/consensus/types" @@ -46,6 +50,7 @@ type ( HasWeightedOperationsX = simsx.HasWeightedOperationsX HasWeightedOperationsXWithProposals = simsx.HasWeightedOperationsXWithProposals HasProposalMsgsX = simsx.HasProposalMsgsX + HasLegacyProposalMsgs = simsx.HasLegacyProposalMsgs ) const SimAppChainID = "simulation-app" @@ -116,6 +121,8 @@ type ( TXBuilder simsxv2.TXBuilder[T] AppManager appmanager.AppManager[T] ModuleManager ModuleManager + StreamManager streaming.Manager + StreamHook *appdata.Listener } AppFactory[T Tx, V SimulationApp[T]] func(config depinject.Config, outputs ...any) (V, error) @@ -282,11 +289,15 @@ func RunWithRandSourceX[T Tx]( defer done() testInstance, chainState, accounts := setupChainStateFn(rootCtx, r) - - emptySimParams := make(map[string]json.RawMessage) // todo read sims params from disk as before + customFactoryParams := make(map[string]json.RawMessage) + if tCfg.ParamsFile != "" { + bz, err := os.ReadFile(tCfg.ParamsFile) + require.NoError(tb, err) + require.NoError(tb, json.Unmarshal(bz, &customFactoryParams)) + } modules := testInstance.ModuleManager.Modules() - msgFactoriesFn := prepareSimsMsgFactories(r, modules, simsx.ParamWeightSource(emptySimParams)) + msgFactoriesFn := prepareSimsMsgFactories(tb, r, modules, simsx.ParamWeightSource(customFactoryParams)) if b, ok := tb.(interface{ ResetTimer() }); ok { b.ResetTimer() @@ -320,7 +331,6 @@ func prepareInitialGenesisState[T Tx]( moduleManager ModuleManager, ) ([]simtypes.Account, json.RawMessage, string, time.Time) { txConfig := app.TxConfig() - // todo: replace legacy testdata functions ? appStateFn := simtestutil.AppStateFn( app.AppCodec(), txConfig.SigningContext().AddressCodec(), @@ -443,9 +453,10 @@ func doMainLoop[T Tx]( } cometInfo := comet.Info{ - ValidatorsHash: nil, - Evidence: cs.ValsetHistory.MissBehaviour(r), - ProposerAddress: cs.ActiveValidatorSet[0].Address, // todo: pick random one + ValidatorsHash: nil, + Evidence: cs.ValsetHistory.MissBehaviour(r), + // pick one of top 10 + ProposerAddress: cs.ActiveValidatorSet[r.Intn(min(len(cs.ActiveValidatorSet), 10))].Address, LastCommit: cs.ActiveValidatorSet.NewCommitInfo(r), } fOps, pos := futureOpsReg.PopScheduledFor(cs.BlockTime), 0 @@ -488,6 +499,7 @@ func doMainLoop[T Tx]( tx, err := testInstance.TXBuilder.Build(ctx, testInstance.AuthKeeper, signers, msg, r, cs.ChainID) require.NoError(tb, err) + blockReqN.Txs = append(blockReqN.Txs, tx) if !yield(tx) { return } @@ -509,6 +521,26 @@ func doMainLoop[T Tx]( } txTotalCounter += txPerBlockCounter cs.ActiveValidatorSet = cs.ActiveValidatorSet.Update(blockRsp.ValidatorUpdates) + + if len(testInstance.StreamManager.Listeners) == 0 && testInstance.StreamHook == nil { + continue + } + // stream data + strmCtx, cancel := context.WithTimeout(rootCtx, time.Second) + rawTxs := simsx.Collect(blockReqN.Txs, func(a T) []byte { return a.Bytes() }) + require.NoError(tb, cometbft.StreamOut[T]( + strmCtx, + int64(blockReqN.Height), + rawTxs, + blockReqN.Txs, + *blockRsp, + changeSet, + testInstance.StreamManager, + testInstance.StreamHook, + true, + tb.Logf, + )) + cancel() } fmt.Println("+++ reporter:\n" + rootReporter.Summary().String()) fmt.Printf("Tx total: %d skipped: %d\n", txTotalCounter, txSkippedCounter) @@ -516,21 +548,19 @@ func doMainLoop[T Tx]( // prepareSimsMsgFactories constructs and returns a function to retrieve simulation message factories for all modules. // It initializes proposal and factory registries, registers proposals and weighted operations, and sorts deterministically. -func prepareSimsMsgFactories( - r *rand.Rand, - modules map[string]appmodulev2.AppModule, - weights simsx.WeightSource, -) func() simsx.SimMsgFactoryX { +func prepareSimsMsgFactories(tb testing.TB, r *rand.Rand, modules map[string]appmodulev2.AppModule, weights simsx.WeightSource) func() simsx.SimMsgFactoryX { + tb.Helper() moduleNames := slices.Collect(maps.Keys(modules)) slices.Sort(moduleNames) // make deterministic // get all proposal types proposalRegistry := simsx.NewUniqueTypeRegistry() for _, n := range moduleNames { - switch xm := modules[n].(type) { // nolint: gocritic // extended in the future + switch xm := modules[n].(type) { case HasProposalMsgsX: xm.ProposalMsgsX(weights, proposalRegistry) - // todo: register legacy and v1 msg proposals + case HasLegacyProposalMsgs: + tb.Logf("Ignoring legacy proposal messages for module: %s", n) } } // register all msg factories diff --git a/store/CHANGELOG.md b/store/CHANGELOG.md index 4d170bff0e..777886f675 100644 --- a/store/CHANGELOG.md +++ b/store/CHANGELOG.md @@ -31,6 +31,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ ## [Unreleased] + ## v1.10.0 (December 13, 2024) ### Improvements diff --git a/store/v2/CHANGELOG.md b/store/v2/CHANGELOG.md index 27c96ed198..2f4820544a 100644 --- a/store/v2/CHANGELOG.md +++ b/store/v2/CHANGELOG.md @@ -25,6 +25,11 @@ Ref: https://keepachangelog.com/en/1.0.0/ ## [Unreleased] +### Improvements + +* [#23013](https://github.com/cosmos/cosmos-sdk/pull/23013) Support memDB for sims + + ## [v2.0.0-beta.2](https://github.com/cosmos/cosmos-sdk/releases/tag/store/v2.0.0-beta.2) * [#22336](https://github.com/cosmos/cosmos-sdk/pull/22336) Finish migration manager.