From b0466fd74f8ef58daf4baf997f14670d62e0a70c Mon Sep 17 00:00:00 2001 From: Alex | Interchain Labs Date: Mon, 12 May 2025 20:13:38 -0400 Subject: [PATCH] fix: race condition in baseapp states (#24655) Co-authored-by: beer-1 Co-authored-by: Tyler <48813565+technicallyty@users.noreply.github.com> --- CHANGELOG.md | 9 ++- baseapp/abci.go | 148 ++++++++++++++++++----------------- baseapp/abci_test.go | 49 ++++++++++++ baseapp/baseapp.go | 145 +++++++++------------------------- baseapp/baseapp_test.go | 44 ++++++++--- baseapp/config/gas.go | 12 +++ baseapp/options.go | 2 +- baseapp/regression_test.go | 4 + baseapp/state/manager.go | 137 ++++++++++++++++++++++++++++++++ baseapp/{ => state}/state.go | 21 +++-- baseapp/test_helpers.go | 8 +- baseapp/utils_test.go | 20 +++-- go.mod | 2 + go.sum | 2 - store/rootmulti/store.go | 37 +++++---- 15 files changed, 409 insertions(+), 231 deletions(-) create mode 100644 baseapp/config/gas.go create mode 100644 baseapp/state/manager.go rename baseapp/{ => state}/state.go (54%) diff --git a/CHANGELOG.md b/CHANGELOG.md index b7b27dbeda..0e4dc2b189 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,8 +38,13 @@ Ref: https://keepachangelog.com/en/1.0.0/ ## [Unreleased] +### Improvements + +* (baseapp) [#24655](https://github.com/cosmos/cosmos-sdk/pull/24655) Add mutex locks for `state` and make `lastCommitInfo` atomic to prevent race conditions between `Commit` and `CreateQueryContext`. + ### Bug Fixes +* (client, client/rpc, x/auth/tx) [#24551](https://github.com/cosmos/cosmos-sdk/pull/24551) Handle cancellation properly when supplying context to client methods. * (x/authz) [#24638](https://github.com/cosmos/cosmos-sdk/pull/24638) Fixed a minor bug where the grant key was cast as a string and dumped directly into the error message leading to an error string possibly containing invalid UTF-8. ### Deprecated @@ -48,10 +53,6 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (x/group) [#24571](https://github.com/cosmos/cosmos-sdk/pull/24571) Deprecate the `x/group` module in the Cosmos SDK repository. This module will not be maintained to the extent that our core modules will and will be kept in a [legacy repo](https://github.com/cosmos/cosmos-legacy). * (types) [#24664](https://github.com/cosmos/cosmos-sdk/pull/24664) Deprecate the `Invariant` type in the Cosmos SDK. -### Bug Fixes - -* (client, client/rpc, x/auth/tx) [#24551](https://github.com/cosmos/cosmos-sdk/pull/24551) Handle cancellation properly when supplying context to client methods. - ## [v0.53.0](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.53.0) - 2025-04-29 ### Features diff --git a/baseapp/abci.go b/baseapp/abci.go index edfc43da08..58e41b6aff 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -20,6 +20,7 @@ import ( snapshottypes "cosmossdk.io/store/snapshots/types" storetypes "cosmossdk.io/store/types" + "github.com/cosmos/cosmos-sdk/baseapp/state" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/telemetry" sdk "github.com/cosmos/cosmos-sdk/types" @@ -62,14 +63,15 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha } // initialize states with a correct header - app.setState(execModeFinalize, initHeader) - app.setState(execModeCheck, initHeader) + app.stateManager.SetState(execModeFinalize, app.cms, initHeader, app.logger, app.streamingManager) + app.stateManager.SetState(execModeCheck, app.cms, initHeader, app.logger, app.streamingManager) + finalizeState := app.stateManager.GetState(execModeFinalize) // Store the consensus params in the BaseApp's param store. Note, this must be // done after the finalizeBlockState and context have been set as it's persisted // to state. if req.ConsensusParams != nil { - err := app.StoreConsensusParams(app.finalizeBlockState.Context(), *req.ConsensusParams) + err := app.StoreConsensusParams(finalizeState.Context(), *req.ConsensusParams) if err != nil { return nil, err } @@ -81,13 +83,14 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha // handler, the block height is zero by default. However, after Commit is called // the height needs to reflect the true block height. initHeader.Height = req.InitialHeight - app.checkState.SetContext(app.checkState.Context().WithBlockHeader(initHeader). + checkState := app.stateManager.GetState(execModeCheck) + checkState.SetContext(checkState.Context().WithBlockHeader(initHeader). WithHeaderInfo(coreheader.Info{ ChainID: req.ChainId, Height: req.InitialHeight, Time: req.Time, })) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockHeader(initHeader). + finalizeState.SetContext(finalizeState.Context().WithBlockHeader(initHeader). WithHeaderInfo(coreheader.Info{ ChainID: req.ChainId, Height: req.InitialHeight, @@ -100,9 +103,9 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha } // add block gas meter for any genesis transactions (allow infinite gas) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter())) + finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter())) - res, err := app.abciHandlers.InitChainer(app.finalizeBlockState.Context(), req) + res, err := app.abciHandlers.InitChainer(finalizeState.Context(), req) if err != nil { return nil, err } @@ -336,9 +339,9 @@ func (app *BaseApp) ApplySnapshotChunk(req *abci.RequestApplySnapshotChunk) (*ab // and only the AnteHandler is executed. State is persisted to the BaseApp's // internal CheckTx state if the AnteHandler passes. Otherwise, the ResponseCheckTx // will contain relevant error information. Regardless of tx execution outcome, -// the ResponseCheckTx will contain relevant gas execution context. +// the ResponseCheckTx will contain the relevant gas execution context. func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { - var mode execMode + var mode sdk.ExecMode switch req.Type { case abci.CheckTxType_New: @@ -352,14 +355,14 @@ func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, er } if app.abciHandlers.CheckTxHandler == nil { - gInfo, result, anteEvents, err := app.runTx(mode, req.Tx, nil) + gasInfo, result, anteEvents, err := app.runTx(mode, req.Tx, nil) if err != nil { - return sdkerrors.ResponseCheckTxWithEvents(err, gInfo.GasWanted, gInfo.GasUsed, anteEvents, app.trace), nil + return sdkerrors.ResponseCheckTxWithEvents(err, gasInfo.GasWanted, gasInfo.GasUsed, anteEvents, app.trace), nil } return &abci.ResponseCheckTx{ - GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints? - GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints? + GasWanted: int64(gasInfo.GasWanted), // TODO: Should type accept unsigned ints? + GasUsed: int64(gasInfo.GasUsed), // TODO: Should type accept unsigned ints? Log: result.Log, Data: result.Data, Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents), @@ -402,7 +405,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc NextValidatorsHash: req.NextValidatorsHash, AppHash: app.LastCommitID().Hash, } - app.setState(execModePrepareProposal, header) + app.stateManager.SetState(execModePrepareProposal, app.cms, header, app.logger, app.streamingManager) // CometBFT must never call PrepareProposal with a height of 0. // @@ -411,7 +414,8 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc return nil, errors.New("PrepareProposal called with invalid height") } - app.prepareProposalState.SetContext(app.getContextForProposal(app.prepareProposalState.Context(), req.Height). + prepareProposalState := app.stateManager.GetState(execModePrepareProposal) + prepareProposalState.SetContext(app.getContextForProposal(prepareProposalState.Context(), req.Height). WithVoteInfos(toVoteInfo(req.LocalLastCommit.Votes)). // this is a set of votes that are not finalized yet, wait for commit WithBlockHeight(req.Height). WithBlockTime(req.Time). @@ -424,9 +428,9 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc Time: req.Time, })) - app.prepareProposalState.SetContext(app.prepareProposalState.Context(). - WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.Context())). - WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.Context()))) + prepareProposalState.SetContext(prepareProposalState.Context(). + WithConsensusParams(app.GetConsensusParams(prepareProposalState.Context())). + WithBlockGasMeter(app.getBlockGasMeter(prepareProposalState.Context()))) defer func() { if err := recover(); err != nil { @@ -441,7 +445,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc } }() - resp, err = app.abciHandlers.PrepareProposalHandler(app.prepareProposalState.Context(), req) + resp, err = app.abciHandlers.PrepareProposalHandler(prepareProposalState.Context(), req) if err != nil { app.logger.Error("failed to prepare proposal", "height", req.Height, "time", req.Time, "err", err) return &abci.ResponsePrepareProposal{Txs: req.Txs}, nil @@ -486,7 +490,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc NextValidatorsHash: req.NextValidatorsHash, AppHash: app.LastCommitID().Hash, } - app.setState(execModeProcessProposal, header) + app.stateManager.SetState(execModeProcessProposal, app.cms, header, app.logger, app.streamingManager) // Since the application can get access to FinalizeBlock state and write to it, // we must be sure to reset it in case ProcessProposal timeouts and is called @@ -496,10 +500,11 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc if req.Height > app.initialHeight { // abort any running OE app.optimisticExec.Abort() - app.setState(execModeFinalize, header) + app.stateManager.SetState(execModeFinalize, app.cms, header, app.logger, app.streamingManager) } - app.processProposalState.SetContext(app.getContextForProposal(app.processProposalState.Context(), req.Height). + processProposalState := app.stateManager.GetState(execModeProcessProposal) + processProposalState.SetContext(app.getContextForProposal(processProposalState.Context(), req.Height). WithVoteInfos(req.ProposedLastCommit.Votes). // this is a set of votes that are not finalized yet, wait for commit WithBlockHeight(req.Height). WithBlockTime(req.Time). @@ -513,9 +518,9 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc Time: req.Time, })) - app.processProposalState.SetContext(app.processProposalState.Context(). - WithConsensusParams(app.GetConsensusParams(app.processProposalState.Context())). - WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.Context()))) + processProposalState.SetContext(processProposalState.Context(). + WithConsensusParams(app.GetConsensusParams(processProposalState.Context())). + WithBlockGasMeter(app.getBlockGasMeter(processProposalState.Context()))) defer func() { if err := recover(); err != nil { @@ -530,7 +535,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc } }() - resp, err = app.abciHandlers.ProcessProposalHandler(app.processProposalState.Context(), req) + resp, err = app.abciHandlers.ProcessProposalHandler(processProposalState.Context(), req) if err != nil { app.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err) return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil @@ -570,7 +575,7 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.RequestExtendVote) ( // finalizeBlockState context, otherwise we don't get the uncommitted data // from InitChain. if req.Height == app.initialHeight { - ctx, _ = app.finalizeBlockState.Context().CacheContext() + ctx, _ = app.stateManager.GetState(execModeFinalize).Context().CacheContext() } else { emptyHeader := cmtproto.Header{ChainID: app.chainID, Height: req.Height} ms := app.cms.CacheMultiStore() @@ -645,7 +650,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r // finalizeBlockState context, otherwise we don't get the uncommitted data // from InitChain. if req.Height == app.initialHeight { - ctx, _ = app.finalizeBlockState.Context().CacheContext() + ctx, _ = app.stateManager.GetState(execModeFinalize).Context().CacheContext() } else { emptyHeader := cmtproto.Header{ChainID: app.chainID, Height: req.Height} ms := app.cms.CacheMultiStore() @@ -700,7 +705,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r // internalFinalizeBlock executes the block, called by the Optimistic // Execution flow or by the FinalizeBlock ABCI method. The context received is -// only used to handle early cancellation, for anything related to state app.finalizeBlockState.Context() +// only used to handle early cancellation, for anything related to state app.stateManager.GetState(execModeFinalize).Context() // must be used. func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) { var events []abci.Event @@ -731,12 +736,14 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request // finalizeBlockState should be set on InitChain or ProcessProposal. If it is // nil, it means we are replaying this block and we need to set the state here // given that during block replay ProcessProposal is not executed by CometBFT. - if app.finalizeBlockState == nil { - app.setState(execModeFinalize, header) + finalizeState := app.stateManager.GetState(execModeFinalize) + if finalizeState == nil { + app.stateManager.SetState(execModeFinalize, app.cms, header, app.logger, app.streamingManager) + finalizeState = app.stateManager.GetState(execModeFinalize) } // Context is now updated with Header information. - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context(). + finalizeState.SetContext(finalizeState.Context(). WithBlockHeader(header). WithHeaderHash(req.Hash). WithHeaderInfo(coreheader.Info{ @@ -746,7 +753,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request Hash: req.Hash, AppHash: app.LastCommitID().Hash, }). - WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.Context())). + WithConsensusParams(app.GetConsensusParams(finalizeState.Context())). WithVoteInfos(req.DecidedLastCommit.Votes). WithExecMode(sdk.ExecModeFinalize). WithCometInfo(cometInfo{ @@ -757,11 +764,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request })) // GasMeter must be set after we get a context with updated consensus params. - gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context()) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter)) + gasMeter := app.getBlockGasMeter(finalizeState.Context()) + finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter)) - if app.checkState != nil { - app.checkState.SetContext(app.checkState.Context(). + if checkState := app.stateManager.GetState(execModeCheck); checkState != nil { + checkState.SetContext(checkState.Context(). WithBlockGasMeter(gasMeter). WithHeaderHash(req.Hash)) } @@ -790,8 +797,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request events = append(events, beginBlock.Events...) // Reset the gas meter so that the AnteHandlers aren't required to - gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context()) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter)) + gasMeter = app.getBlockGasMeter(finalizeState.Context()) + finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter)) // Iterate over all raw transactions in the proposal and attempt to execute // them, gathering the execution results. @@ -828,11 +835,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request txResults = append(txResults, response) } - if app.finalizeBlockState.ms.TracingEnabled() { - app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore) + if finalizeState.MultiStore.TracingEnabled() { + finalizeState.MultiStore = finalizeState.MultiStore.SetTracingContext(nil).(storetypes.CacheMultiStore) } - endBlock, err := app.endBlock(app.finalizeBlockState.Context()) + endBlock, err := app.endBlock(finalizeState.Context()) if err != nil { return nil, err } @@ -846,7 +853,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request } events = append(events, endBlock.Events...) - cp := app.GetConsensusParams(app.finalizeBlockState.Context()) + cp := app.GetConsensusParams(finalizeState.Context()) return &abci.ResponseFinalizeBlock{ Events: events, @@ -873,7 +880,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (res *abci.Res } // call the streaming service hooks with the FinalizeBlock messages for _, streamingListener := range app.streamingManager.ABCIListeners { - if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); err != nil { + if err := streamingListener.ListenFinalizeBlock(app.stateManager.GetState(execModeFinalize).Context(), *req, *res); err != nil { app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err) } } @@ -895,7 +902,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (res *abci.Res } // if it was aborted, we need to reset the state - app.finalizeBlockState = nil + app.stateManager.ClearState(execModeFinalize) app.optimisticExec.Reset() } @@ -934,11 +941,12 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error { // against that height and gracefully halt if it matches the latest committed // height. func (app *BaseApp) Commit() (*abci.ResponseCommit, error) { - header := app.finalizeBlockState.Context().BlockHeader() + finalizeState := app.stateManager.GetState(execModeFinalize) + header := finalizeState.Context().BlockHeader() retainHeight := app.GetBlockRetentionHeight(header.Height) if app.abciHandlers.Precommiter != nil { - app.abciHandlers.Precommiter(app.finalizeBlockState.Context()) + app.abciHandlers.Precommiter(finalizeState.Context()) } rms, ok := app.cms.(*rootmulti.Store) @@ -954,7 +962,7 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) { abciListeners := app.streamingManager.ABCIListeners if len(abciListeners) > 0 { - ctx := app.finalizeBlockState.Context() + ctx := finalizeState.Context() blockHeight := ctx.BlockHeight() changeSet := app.cms.PopStateCache() @@ -969,12 +977,12 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) { // // NOTE: This is safe because CometBFT holds a lock on the mempool for // Commit. Use the header from this latest block. - app.setState(execModeCheck, header) + app.stateManager.SetState(execModeCheck, app.cms, header, app.logger, app.streamingManager) - app.finalizeBlockState = nil + app.stateManager.ClearState(execModeFinalize) if app.abciHandlers.PrepareCheckStater != nil { - app.abciHandlers.PrepareCheckStater(app.checkState.Context()) + app.abciHandlers.PrepareCheckStater(app.stateManager.GetState(execModeCheck).Context()) } // The SnapshotIfApplicable method will create the snapshot by starting the goroutine @@ -992,7 +1000,7 @@ func (app *BaseApp) workingHash() []byte { // Write the FinalizeBlock state into branched storage and commit the MultiStore. // The write to the FinalizeBlock state writes all state transitions to the root // MultiStore (app.cms) so when Commit() is called it persists those values. - app.finalizeBlockState.ms.Write() + app.stateManager.GetState(execModeFinalize).MultiStore.Write() // Get the hash of all writes in order to return the apphash to the comet in finalizeBlock. commitHash := app.cms.WorkingHash() @@ -1139,7 +1147,7 @@ func (app *BaseApp) FilterPeerByID(info string) *abci.ResponseQuery { // access any state changes made in InitChain. func (app *BaseApp) getContextForProposal(ctx sdk.Context, height int64) sdk.Context { if height == app.initialHeight { - ctx, _ = app.finalizeBlockState.Context().CacheContext() + ctx, _ = app.stateManager.GetState(execModeFinalize).Context().CacheContext() // clear all context data set during InitChain to avoid inconsistent behavior ctx = ctx.WithBlockHeader(cmtproto.Header{}).WithHeaderInfo(coreheader.Info{}) @@ -1199,26 +1207,26 @@ func checkNegativeHeight(height int64) error { // CreateQueryContext creates a new sdk.Context for a query, taking as args // the block height and whether the query needs a proof or not. -func (app *BaseApp) CreateQueryContext(height int64, prove bool) (sdk.Context, error) { - return app.CreateQueryContextWithCheckHeader(height, prove, true) +func (bapp *BaseApp) CreateQueryContext(height int64, prove bool) (sdk.Context, error) { + return bapp.CreateQueryContextWithCheckHeader(height, prove, true) } // CreateQueryContextWithCheckHeader creates a new sdk.Context for a query, taking as args // the block height, whether the query needs a proof or not, and whether to check the header or not. -func (app *BaseApp) CreateQueryContextWithCheckHeader(height int64, prove, checkHeader bool) (sdk.Context, error) { +func (bapp *BaseApp) CreateQueryContextWithCheckHeader(height int64, prove, checkHeader bool) (sdk.Context, error) { if err := checkNegativeHeight(height); err != nil { return sdk.Context{}, err } // use custom query multi-store if provided - qms := app.qms + qms := bapp.qms if qms == nil { - qms = app.cms.(storetypes.MultiStore) + qms = bapp.cms.(storetypes.MultiStore) } lastBlockHeight := qms.LatestVersion() if lastBlockHeight == 0 { - return sdk.Context{}, errorsmod.Wrapf(sdkerrors.ErrInvalidHeight, "%s is not ready; please wait for first block", app.Name()) + return sdk.Context{}, errorsmod.Wrapf(sdkerrors.ErrInvalidHeight, "%s is not ready; please wait for first block", bapp.Name()) } if height > lastBlockHeight { @@ -1239,13 +1247,13 @@ func (app *BaseApp) CreateQueryContextWithCheckHeader(height int64, prove, check var header *cmtproto.Header isLatest := height == 0 - for _, state := range []*state{ - app.checkState, - app.finalizeBlockState, + for _, appState := range []*state.State{ + bapp.stateManager.GetState(execModeCheck), + bapp.stateManager.GetState(execModeFinalize), } { - if state != nil { + if appState != nil { // branch the commit multi-store for safety - h := state.Context().BlockHeader() + h := appState.Context().BlockHeader() if isLatest { lastBlockHeight = qms.LatestVersion() } @@ -1279,14 +1287,14 @@ func (app *BaseApp) CreateQueryContextWithCheckHeader(height int64, prove, check } // branch the commit multi-store for safety - ctx := sdk.NewContext(cacheMS, *header, true, app.logger). - WithMinGasPrices(app.minGasPrices). - WithGasMeter(storetypes.NewGasMeter(app.queryGasLimit)). + ctx := sdk.NewContext(cacheMS, *header, true, bapp.logger). + WithMinGasPrices(bapp.gasConfig.MinGasPrices). + WithGasMeter(storetypes.NewGasMeter(bapp.gasConfig.QueryGasLimit)). WithBlockHeader(*header). WithBlockHeight(height) if !isLatest { - rms, ok := app.cms.(*rootmulti.Store) + rms, ok := bapp.cms.(*rootmulti.Store) if ok { cInfo, err := rms.GetCommitInfo(height) if cInfo != nil && err == nil { @@ -1349,11 +1357,11 @@ func (app *BaseApp) GetBlockRetentionHeight(commitHeight int64) int64 { var retentionHeight int64 // Define the number of blocks needed to protect against misbehaving validators - // which allows light clients to operate safely. Note, we piggy back of the + // which allows light clients to operate safely. Note, we piggyback of the // evidence parameters instead of computing an estimated number of blocks based // on the unbonding period and block commitment time as the two should be // equivalent. - cp := app.GetConsensusParams(app.finalizeBlockState.Context()) + cp := app.GetConsensusParams(app.stateManager.GetState(execModeFinalize).Context()) if cp.Evidence != nil && cp.Evidence.MaxAgeNumBlocks > 0 { retentionHeight = commitHeight - cp.Evidence.MaxAgeNumBlocks } diff --git a/baseapp/abci_test.go b/baseapp/abci_test.go index 1e1cfff985..f23b274f23 100644 --- a/baseapp/abci_test.go +++ b/baseapp/abci_test.go @@ -11,6 +11,7 @@ import ( "math/rand" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -2535,3 +2536,51 @@ func TestFinalizeBlockDeferResponseHandle(t *testing.T) { require.Empty(t, res) require.NotEmpty(t, err) } + +func TestABCI_Race_Commit_Query(t *testing.T) { + suite := NewBaseAppSuite(t, baseapp.SetChainID("test-chain-id")) + app := suite.baseApp + + _, err := app.InitChain(&abci.RequestInitChain{ + ChainId: "test-chain-id", + ConsensusParams: &cmtproto.ConsensusParams{Block: &cmtproto.BlockParams{MaxGas: 5000000}}, + InitialHeight: 1, + }) + require.NoError(t, err) + _, err = app.Commit() + require.NoError(t, err) + + counter := atomic.Uint64{} + counter.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + queryCreator := func() { + for { + select { + case <-ctx.Done(): + return + default: + _, err := app.CreateQueryContextWithCheckHeader(0, false, false) + require.NoError(t, err) + + counter.Add(1) + } + } + } + + for i := 0; i < 100; i++ { + go queryCreator() + } + + for i := 0; i < 1000; i++ { + _, err = app.FinalizeBlock(&abci.RequestFinalizeBlock{Height: app.LastBlockHeight() + 1}) + require.NoError(t, err) + + _, err = app.Commit() + require.NoError(t, err) + } + + cancel() + + require.Equal(t, int64(1001), app.GetContextForCheckTx(nil).BlockHeight()) +} diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 60558dcaa8..c7ebc95610 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -17,7 +17,6 @@ import ( "github.com/cosmos/gogoproto/proto" protov2 "google.golang.org/protobuf/proto" - "cosmossdk.io/core/header" errorsmod "cosmossdk.io/errors" "cosmossdk.io/log" "cosmossdk.io/store" @@ -25,7 +24,9 @@ import ( "cosmossdk.io/store/snapshots" storetypes "cosmossdk.io/store/types" + "github.com/cosmos/cosmos-sdk/baseapp/config" "github.com/cosmos/cosmos-sdk/baseapp/oe" + "github.com/cosmos/cosmos-sdk/baseapp/state" "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" servertypes "github.com/cosmos/cosmos-sdk/server/types" @@ -37,8 +38,6 @@ import ( ) type ( - execMode uint8 - // StoreLoader defines a customizable function to control how we load the // CommitMultiStore from disk. This is useful for state migration, when // loading a datastore written with an older version of the software. In @@ -48,14 +47,14 @@ type ( ) const ( - execModeCheck execMode = iota // Check a transaction - execModeReCheck // Recheck a (pending) transaction after a commit - execModeSimulate // Simulate a transaction - execModePrepareProposal // Prepare a block proposal - execModeProcessProposal // Process a block proposal - execModeVoteExtension // Extend or verify a pre-commit vote - execModeVerifyVoteExtension // Verify a vote extension - execModeFinalize // Finalize a block proposal + execModeCheck = sdk.ExecModeCheck // Check a transaction + execModeReCheck = sdk.ExecModeReCheck // Recheck a (pending) transaction after a commit + execModeSimulate = sdk.ExecModeSimulate // Simulate a transaction + execModePrepareProposal = sdk.ExecModePrepareProposal // Prepare a block proposal + execModeProcessProposal = sdk.ExecModeProcessProposal // Process a block proposal + execModeVoteExtension = sdk.ExecModeVoteExtension // Extend or verify a pre-commit vote + execModeVerifyVoteExtension = sdk.ExecModeVerifyVoteExtension // Verify a vote extension + execModeFinalize = sdk.ExecModeFinalize // Finalize a block proposal ) var _ servertypes.ABCI = (*BaseApp)(nil) @@ -90,29 +89,7 @@ type BaseApp struct { // manages snapshots, i.e. dumps of app state at certain intervals snapshotManager *snapshots.Manager - // volatile states: - // - // - checkState is set on InitChain and reset on Commit - // - finalizeBlockState is set on InitChain and FinalizeBlock and set to nil - // on Commit. - // - // - checkState: Used for CheckTx, which is set based on the previous block's - // state. This state is never committed. - // - // - prepareProposalState: Used for PrepareProposal, which is set based on the - // previous block's state. This state is never committed. In case of multiple - // consensus rounds, the state is always reset to the previous block's state. - // - // - processProposalState: Used for ProcessProposal, which is set based on the - // the previous block's state. This state is never committed. In case of - // multiple rounds, the state is always reset to the previous block's state. - // - // - finalizeBlockState: Used for FinalizeBlock, which is set based on the - // previous block's state. This state is committed. - checkState *state - prepareProposalState *state - processProposalState *state - finalizeBlockState *state + stateManager *state.Manager // An inter-block write-through cache provided to the context during the ABCI // FinalizeBlock call. @@ -122,12 +99,8 @@ type BaseApp struct { // application parameter store. paramStore ParamStore - // queryGasLimit defines the maximum gas for queries; unbounded if 0. - queryGasLimit uint64 - - // The minimum gas prices a validator is willing to accept for processing a - // transaction. This is mainly used for DoS and spam prevention. - minGasPrices sdk.DecCoins + // gasConfig contains node-level gas configuration. + gasConfig config.GasConfig // initialHeight is the initial height at which we start the BaseApp initialHeight int64 @@ -207,7 +180,7 @@ func NewBaseApp( txDecoder: txDecoder, fauxMerkleMode: false, sigverifyTx: true, - queryGasLimit: math.MaxUint64, + gasConfig: config.GasConfig{QueryGasLimit: math.MaxUint64}, } for _, option := range options { @@ -254,6 +227,8 @@ func NewBaseApp( } } + app.stateManager = state.NewManager(app.gasConfig) + return app } @@ -431,17 +406,21 @@ func (app *BaseApp) Init() error { return errors.New("commit multi-store must not be nil") } + if app.stateManager == nil { + return errors.New("state manager must not be nil") + } + emptyHeader := cmtproto.Header{ChainID: app.chainID} // needed for the export command which inits from store but never calls initchain - app.setState(execModeCheck, emptyHeader) + app.stateManager.SetState(execModeCheck, app.cms, emptyHeader, app.logger, app.streamingManager) app.Seal() return app.cms.GetPruning().Validate() } func (app *BaseApp) setMinGasPrices(gasPrices sdk.DecCoins) { - app.minGasPrices = gasPrices + app.gasConfig.MinGasPrices = gasPrices } func (app *BaseApp) setHaltHeight(haltHeight uint64) { @@ -478,43 +457,6 @@ func (app *BaseApp) Seal() { app.sealed = true } // IsSealed returns true if the BaseApp is sealed and false otherwise. func (app *BaseApp) IsSealed() bool { return app.sealed } -// setState sets the BaseApp's state for the corresponding mode with a branched -// multi-store (i.e. a CacheMultiStore) and a new Context with the same -// multi-store branch, and provided header. -func (app *BaseApp) setState(mode execMode, h cmtproto.Header) { - ms := app.cms.CacheMultiStore() - headerInfo := header.Info{ - Height: h.Height, - Time: h.Time, - ChainID: h.ChainID, - AppHash: h.AppHash, - } - baseState := &state{ - ms: ms, - ctx: sdk.NewContext(ms, h, false, app.logger). - WithStreamingManager(app.streamingManager). - WithHeaderInfo(headerInfo), - } - - switch mode { - case execModeCheck: - baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices)) - app.checkState = baseState - - case execModePrepareProposal: - app.prepareProposalState = baseState - - case execModeProcessProposal: - app.processProposalState = baseState - - case execModeFinalize: - app.finalizeBlockState = baseState - - default: - panic(fmt.Sprintf("invalid runTxMode for setState: %d", mode)) - } -} - // SetCircuitBreaker sets the circuit breaker for the BaseApp. // The circuit breaker is checked on every message execution to verify if a transaction should be executed or not. func (app *BaseApp) SetCircuitBreaker(cb CircuitBreaker) { @@ -635,22 +577,6 @@ func validateBasicTxMsgs(msgs []sdk.Msg) error { return nil } -func (app *BaseApp) getState(mode execMode) *state { - switch mode { - case execModeFinalize: - return app.finalizeBlockState - - case execModePrepareProposal: - return app.prepareProposalState - - case execModeProcessProposal: - return app.processProposalState - - default: - return app.checkState - } -} - func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter { if app.disableBlockGasMeter { return noopGasMeter{} @@ -664,11 +590,11 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter { } // retrieve the context for the tx w/ txBytes and other memoized values. -func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context { +func (app *BaseApp) getContextForTx(mode sdk.ExecMode, txBytes []byte) sdk.Context { app.mu.Lock() defer app.mu.Unlock() - modeState := app.getState(mode) + modeState := app.stateManager.GetState(mode) if modeState == nil { panic(fmt.Sprintf("state is nil for mode %v", mode)) } @@ -687,7 +613,7 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context { if mode == execModeSimulate { ctx, _ = ctx.CacheContext() - ctx = ctx.WithExecMode(sdk.ExecMode(execModeSimulate)) + ctx = ctx.WithExecMode(execModeSimulate) } return ctx @@ -700,11 +626,9 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context msCache := ms.CacheMultiStore() if msCache.TracingEnabled() { msCache = msCache.SetTracingContext( - storetypes.TraceContext( - map[string]any{ - "txHash": fmt.Sprintf("%X", tmhash.Sum(txBytes)), - }, - ), + map[string]any{ + "txHash": fmt.Sprintf("%X", tmhash.Sum(txBytes)), + }, ).(storetypes.CacheMultiStore) } @@ -714,7 +638,8 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) ([]abci.Event, error) { var events []abci.Event if app.abciHandlers.PreBlocker != nil { - ctx := app.finalizeBlockState.Context().WithEventManager(sdk.NewEventManager()) + finalizeState := app.stateManager.GetState(execModeFinalize) + ctx := finalizeState.Context().WithEventManager(sdk.NewEventManager()) rsp, err := app.abciHandlers.PreBlocker(ctx, req) if err != nil { return nil, err @@ -726,7 +651,7 @@ func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) ([]abci.Event, erro // GasMeter must be set after we get a context with updated consensus params. gasMeter := app.getBlockGasMeter(ctx) ctx = ctx.WithBlockGasMeter(gasMeter) - app.finalizeBlockState.SetContext(ctx) + finalizeState.SetContext(ctx) } events = ctx.EventManager().ABCIEvents() } @@ -740,7 +665,7 @@ func (app *BaseApp) beginBlock(_ *abci.RequestFinalizeBlock) (sdk.BeginBlock, er ) if app.abciHandlers.BeginBlocker != nil { - resp, err = app.abciHandlers.BeginBlocker(app.finalizeBlockState.Context()) + resp, err = app.abciHandlers.BeginBlocker(app.stateManager.GetState(execModeFinalize).Context()) if err != nil { return resp, err } @@ -802,7 +727,7 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) { var endblock sdk.EndBlock if app.abciHandlers.EndBlocker != nil { - eb, err := app.abciHandlers.EndBlocker(app.finalizeBlockState.Context()) + eb, err := app.abciHandlers.EndBlocker(app.stateManager.GetState(execModeFinalize).Context()) if err != nil { return endblock, err } @@ -831,7 +756,7 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) { // and execute successfully. An error is returned otherwise. // both txbytes and the decoded tx are passed to runTx to avoid the state machine encoding the tx and decoding the transaction twice // passing the decoded tx to runTX is optional, it will be decoded if the tx is nil -func (app *BaseApp) runTx(mode execMode, txBytes []byte, tx sdk.Tx) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) { +func (app *BaseApp) runTx(mode sdk.ExecMode, txBytes []byte, tx sdk.Tx) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) { // NOTE: GasWanted should be returned by the AnteHandler. GasUsed is // determined by the GasMeter. We need access to the context to get the gas // meter, so we initialize upfront. @@ -1020,7 +945,7 @@ func (app *BaseApp) runTx(mode execMode, txBytes []byte, tx sdk.Tx) (gInfo sdk.G // and DeliverTx. An error is returned if any single message fails or if a // Handler does not exist for a given message route. Otherwise, a reference to a // Result is returned. The caller must not commit state if an error is returned. -func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Message, mode execMode) (*sdk.Result, error) { +func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Message, mode sdk.ExecMode) (*sdk.Result, error) { events := sdk.EmptyEvents() var msgResponses []*codectypes.Any diff --git a/baseapp/baseapp_test.go b/baseapp/baseapp_test.go index 2813dfc3ea..e941ee5c37 100644 --- a/baseapp/baseapp_test.go +++ b/baseapp/baseapp_test.go @@ -5,7 +5,9 @@ import ( "context" "crypto/sha256" "fmt" + "io" "math/rand" + "os" "testing" "time" @@ -48,10 +50,9 @@ var ( type ( BaseAppSuite struct { - baseApp *baseapp.BaseApp - cdc *codec.ProtoCodec - txConfig client.TxConfig - logBuffer *bytes.Buffer + baseApp *baseapp.BaseApp + cdc *codec.ProtoCodec + txConfig client.TxConfig } SnapshotsConfig struct { @@ -71,8 +72,7 @@ func NewBaseAppSuite(t *testing.T, opts ...func(*baseapp.BaseApp)) *BaseAppSuite txConfig := authtx.NewTxConfig(cdc, authtx.DefaultSignModes) db := dbm.NewMemDB() - logBuffer := new(bytes.Buffer) - logger := log.NewLogger(logBuffer, log.ColorOption(false)) + logger := log.NewLogger(os.Stdout, log.ColorOption(false)) app := baseapp.NewBaseApp(t.Name(), logger, db, txConfig.TxDecoder(), opts...) require.Equal(t, t.Name(), app.Name()) @@ -88,10 +88,9 @@ func NewBaseAppSuite(t *testing.T, opts ...func(*baseapp.BaseApp)) *BaseAppSuite require.Nil(t, app.LoadLatestVersion()) return &BaseAppSuite{ - baseApp: app, - cdc: cdc, - txConfig: txConfig, - logBuffer: logBuffer, + baseApp: app, + cdc: cdc, + txConfig: txConfig, } } @@ -687,9 +686,30 @@ func TestBaseAppPostHandler(t *testing.T) { tx = wonkyMsg(t, suite.txConfig, tx) txBytes, err = suite.txConfig.TxEncoder()(tx) require.NoError(t, err) - _, err = suite.baseApp.FinalizeBlock(&abci.RequestFinalizeBlock{Height: 1, Txs: [][]byte{txBytes}}) + + output := captureStdout(t, func() { + _, err = suite.baseApp.FinalizeBlock(&abci.RequestFinalizeBlock{Height: 1, Txs: [][]byte{txBytes}}) + require.NoError(t, err) + }) + // Check the captured output + require.NotContains(t, output, "panic recovered in runTx") +} + +func captureStdout(t *testing.T, fn func()) string { + t.Helper() + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + fn() + + w.Close() + os.Stdout = oldStdout + + var buf bytes.Buffer + _, err := io.Copy(&buf, r) require.NoError(t, err) - require.NotContains(t, suite.logBuffer.String(), "panic recovered in runTx") + return buf.String() } func TestBaseAppPostHandlerErrorHandling(t *testing.T) { diff --git a/baseapp/config/gas.go b/baseapp/config/gas.go new file mode 100644 index 0000000000..2386733c9b --- /dev/null +++ b/baseapp/config/gas.go @@ -0,0 +1,12 @@ +package config + +import sdk "github.com/cosmos/cosmos-sdk/types" + +type GasConfig struct { + // queryGasLimit defines the maximum gas for queries; unbounded if 0. + QueryGasLimit uint64 + + // The minimum gas prices a validator is willing to accept for processing a + // transaction. This is mainly used for DoS and spam prevention. + MinGasPrices sdk.DecCoins +} diff --git a/baseapp/options.go b/baseapp/options.go index 2e48c6b6e2..eff6eabfac 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -44,7 +44,7 @@ func SetQueryGasLimit(queryGasLimit uint64) func(*BaseApp) { queryGasLimit = math.MaxUint64 } - return func(bapp *BaseApp) { bapp.queryGasLimit = queryGasLimit } + return func(bapp *BaseApp) { bapp.gasConfig.QueryGasLimit = queryGasLimit } } // SetHaltHeight returns a BaseApp option function that sets the halt block height. diff --git a/baseapp/regression_test.go b/baseapp/regression_test.go index 6e8265718d..01018ddf16 100644 --- a/baseapp/regression_test.go +++ b/baseapp/regression_test.go @@ -9,12 +9,16 @@ import ( "cosmossdk.io/log" "cosmossdk.io/store" storemetrics "cosmossdk.io/store/metrics" + + "github.com/cosmos/cosmos-sdk/baseapp/config" + "github.com/cosmos/cosmos-sdk/baseapp/state" ) // Ensures that error checks are performed before sealing the app. // Please see https://github.com/cosmos/cosmos-sdk/issues/18726 func TestNilCmsCheckBeforeSeal(t *testing.T) { app := new(BaseApp) + app.stateManager = state.NewManager(config.GasConfig{}) // 1. Invoking app.Init with a nil cms MUST not seal the app // and should return an error firstly, which can later be reversed. diff --git a/baseapp/state/manager.go b/baseapp/state/manager.go new file mode 100644 index 0000000000..fb8cb55e73 --- /dev/null +++ b/baseapp/state/manager.go @@ -0,0 +1,137 @@ +package state + +import ( + "fmt" + "sync" + + cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" + + "cosmossdk.io/core/header" + "cosmossdk.io/log" + storetypes "cosmossdk.io/store/types" + + "github.com/cosmos/cosmos-sdk/baseapp/config" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +type Manager struct { + // volatile states: + // + // - checkState is set on InitChain and reset on Commit + // - finalizeBlockState is set on InitChain and FinalizeBlock and set to nil + // on Commit. + // + // - checkState: Used for CheckTx, which is set based on the previous block's + // state. This state is never committed. + // + // - prepareProposalState: Used for PrepareProposal, which is set based on the + // previous block's state. This state is never committed. In case of multiple + // consensus rounds, the state is always reset to the previous block's state. + // + // - processProposalState: Used for ProcessProposal, which is set based on the + // the previous block's state. This state is never committed. In case of + // multiple rounds, the state is always reset to the previous block's state. + // + // - finalizeBlockState: Used for FinalizeBlock, which is set based on the + // previous block's state. This state is committed. + checkState *State + prepareProposalState *State + processProposalState *State + finalizeBlockState *State + stateMut sync.RWMutex + + gasConfig config.GasConfig +} + +func NewManager(gasConfig config.GasConfig) *Manager { + return &Manager{ + gasConfig: gasConfig, + } +} + +func (mgr *Manager) GetState(mode sdk.ExecMode) *State { + mgr.stateMut.RLock() + defer mgr.stateMut.RUnlock() + + switch mode { + case sdk.ExecModeFinalize: + return mgr.finalizeBlockState + + case sdk.ExecModePrepareProposal: + return mgr.prepareProposalState + + case sdk.ExecModeProcessProposal: + return mgr.processProposalState + + default: + return mgr.checkState + } +} + +// SetState sets the BaseApp's state for the corresponding mode with a branched +// multi-store (i.e. a CacheMultiStore) and a new Context with the same +// multi-store branch, and provided header. +func (mgr *Manager) SetState( + mode sdk.ExecMode, + unbranchedStore storetypes.CommitMultiStore, + h cmtproto.Header, + logger log.Logger, + streamingManager storetypes.StreamingManager, +) { + ms := unbranchedStore.CacheMultiStore() + headerInfo := header.Info{ + Height: h.Height, + Time: h.Time, + ChainID: h.ChainID, + AppHash: h.AppHash, + } + baseState := NewState( + sdk.NewContext(ms, h, false, logger). + WithStreamingManager(streamingManager). + WithHeaderInfo(headerInfo), + ms, + ) + + mgr.stateMut.Lock() + defer mgr.stateMut.Unlock() + + switch mode { + case sdk.ExecModeCheck: + baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(mgr.gasConfig.MinGasPrices)) + mgr.checkState = baseState + + case sdk.ExecModePrepareProposal: + mgr.prepareProposalState = baseState + + case sdk.ExecModeProcessProposal: + mgr.processProposalState = baseState + + case sdk.ExecModeFinalize: + mgr.finalizeBlockState = baseState + + default: + panic(fmt.Sprintf("invalid runTxMode for setState: %d", mode)) + } +} + +func (mgr *Manager) ClearState(mode sdk.ExecMode) { + mgr.stateMut.Lock() + defer mgr.stateMut.Unlock() + + switch mode { + case sdk.ExecModeCheck: + mgr.checkState = nil + + case sdk.ExecModePrepareProposal: + mgr.prepareProposalState = nil + + case sdk.ExecModeProcessProposal: + mgr.processProposalState = nil + + case sdk.ExecModeFinalize: + mgr.finalizeBlockState = nil + + default: + panic(fmt.Sprintf("invalid runTxMode for clearState: %d", mode)) + } +} diff --git a/baseapp/state.go b/baseapp/state/state.go similarity index 54% rename from baseapp/state.go rename to baseapp/state/state.go index 19ee4e7eaa..7e3f2e4251 100644 --- a/baseapp/state.go +++ b/baseapp/state/state.go @@ -1,4 +1,4 @@ -package baseapp +package state import ( "sync" @@ -8,28 +8,35 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -type state struct { - ms storetypes.CacheMultiStore +type State struct { + MultiStore storetypes.CacheMultiStore mtx sync.RWMutex ctx sdk.Context } +func NewState(ctx sdk.Context, ms storetypes.CacheMultiStore) *State { + return &State{ + MultiStore: ms, + ctx: ctx, + } +} + // CacheMultiStore calls and returns a CacheMultiStore on the state's underling // CacheMultiStore. -func (st *state) CacheMultiStore() storetypes.CacheMultiStore { - return st.ms.CacheMultiStore() +func (st *State) CacheMultiStore() storetypes.CacheMultiStore { + return st.MultiStore.CacheMultiStore() } // SetContext updates the state's context to the context provided. -func (st *state) SetContext(ctx sdk.Context) { +func (st *State) SetContext(ctx sdk.Context) { st.mtx.Lock() defer st.mtx.Unlock() st.ctx = ctx } // Context returns the Context of the state. -func (st *state) Context() sdk.Context { +func (st *State) Context() sdk.Context { st.mtx.RLock() defer st.mtx.RUnlock() return st.ctx diff --git a/baseapp/test_helpers.go b/baseapp/test_helpers.go index b3aa396a02..9cabad2e32 100644 --- a/baseapp/test_helpers.go +++ b/baseapp/test_helpers.go @@ -54,17 +54,17 @@ func (app *BaseApp) SimTxFinalizeBlock(txEncoder sdk.TxEncoder, tx sdk.Tx) (sdk. // SimWriteState is an entrypoint for simulations only. They are not executed during the normal ABCI finalize // block step but later. Therefor an extra call to the root multi-store (app.cms) is required to write the changes. func (app *BaseApp) SimWriteState() { - app.finalizeBlockState.ms.Write() + app.stateManager.GetState(execModeFinalize).MultiStore.Write() } // NewContextLegacy returns a new sdk.Context with the provided header func (app *BaseApp) NewContextLegacy(isCheckTx bool, header cmtproto.Header) sdk.Context { if isCheckTx { - return sdk.NewContext(app.checkState.ms, header, true, app.logger). - WithMinGasPrices(app.minGasPrices) + return sdk.NewContext(app.stateManager.GetState(execModeCheck).MultiStore, header, true, app.logger). + WithMinGasPrices(app.gasConfig.MinGasPrices) } - return sdk.NewContext(app.finalizeBlockState.ms, header, false, app.logger) + return sdk.NewContext(app.stateManager.GetState(execModeFinalize).MultiStore, header, false, app.logger) } // NewContext returns a new sdk.Context with a empty header diff --git a/baseapp/utils_test.go b/baseapp/utils_test.go index d2f9ce50d0..b447486d19 100644 --- a/baseapp/utils_test.go +++ b/baseapp/utils_test.go @@ -28,6 +28,7 @@ import ( storetypes "cosmossdk.io/store/types" "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/baseapp/state" baseapptestutil "github.com/cosmos/cosmos-sdk/baseapp/testutil" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" @@ -323,18 +324,23 @@ func testLoadVersionHelper(t *testing.T, app *baseapp.BaseApp, expectedHeight in require.Equal(t, expectedID, lastID) } -func getCheckStateCtx(app *baseapp.BaseApp) sdk.Context { +// note: we use reflection in this test because we do not want to make the baseapp state publicly available + +func reflectGetStateCtx(app *baseapp.BaseApp, mode sdk.ExecMode) sdk.Context { v := reflect.ValueOf(app).Elem() - f := v.FieldByName("checkState") + f := v.FieldByName("stateManager") rf := reflect.NewAt(f.Type(), unsafe.Pointer(f.UnsafeAddr())).Elem() - return rf.MethodByName("Context").Call(nil)[0].Interface().(sdk.Context) + arg := reflect.ValueOf(mode) + st := rf.MethodByName("GetState").Call([]reflect.Value{arg})[0].Interface().(*state.State) + return st.Context() +} + +func getCheckStateCtx(app *baseapp.BaseApp) sdk.Context { + return reflectGetStateCtx(app, sdk.ExecModeCheck) } func getFinalizeBlockStateCtx(app *baseapp.BaseApp) sdk.Context { - v := reflect.ValueOf(app).Elem() - f := v.FieldByName("finalizeBlockState") - rf := reflect.NewAt(f.Type(), unsafe.Pointer(f.UnsafeAddr())).Elem() - return rf.MethodByName("Context").Call(nil)[0].Interface().(sdk.Context) + return reflectGetStateCtx(app, sdk.ExecModeFinalize) } func parseTxMemo(t *testing.T, tx sdk.Tx) (counter int64, failOnAnte bool) { diff --git a/go.mod b/go.mod index f9aeaba1cb..c453248720 100644 --- a/go.mod +++ b/go.mod @@ -221,6 +221,8 @@ require ( // // ) +replace cosmossdk.io/store => ./store + // Below are the long-lived replace of the Cosmos SDK replace ( // use cosmos fork of keyring diff --git a/go.sum b/go.sum index 950cf28197..8976076189 100644 --- a/go.sum +++ b/go.sum @@ -630,8 +630,6 @@ cosmossdk.io/math v1.5.3 h1:WH6tu6Z3AUCeHbeOSHg2mt9rnoiUWVWaQ2t6Gkll96U= cosmossdk.io/math v1.5.3/go.mod h1:uqcZv7vexnhMFJF+6zh9EWdm/+Ylyln34IvPnBauPCQ= cosmossdk.io/schema v1.1.0 h1:mmpuz3dzouCoyjjcMcA/xHBEmMChN+EHh8EHxHRHhzE= cosmossdk.io/schema v1.1.0/go.mod h1:Gb7pqO+tpR+jLW5qDcNOSv0KtppYs7881kfzakguhhI= -cosmossdk.io/store v1.1.2 h1:3HOZG8+CuThREKv6cn3WSohAc6yccxO3hLzwK6rBC7o= -cosmossdk.io/store v1.1.2/go.mod h1:60rAGzTHevGm592kFhiUVkNC9w7gooSEn5iUBPzHQ6A= cosmossdk.io/x/tx v0.14.0 h1:hB3O25kIcyDW/7kMTLMaO8Ripj3yqs5imceVd6c/heA= cosmossdk.io/x/tx v0.14.0/go.mod h1:Tn30rSRA1PRfdGB3Yz55W4Sn6EIutr9xtMKSHij+9PM= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index b998745509..672559421e 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -10,6 +10,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" dbm "github.com/cosmos/cosmos-db" @@ -59,7 +60,7 @@ func keysFromStoreKeyMap[V any](m map[types.StoreKey]V) []types.StoreKey { type Store struct { db dbm.DB logger log.Logger - lastCommitInfo *types.CommitInfo + lastCommitInfo atomic.Pointer[types.CommitInfo] pruningManager *pruning.Manager iavlCacheSize int iavlDisableFastNode bool @@ -296,7 +297,7 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error { } } - rs.lastCommitInfo = cInfo + rs.lastCommitInfo.Store(cInfo) rs.stores = newStores // load any snapshot heights we missed from disk to be pruned on the next run @@ -445,7 +446,8 @@ func (rs *Store) LatestVersion() int64 { // LastCommitID implements Committer/CommitStore. func (rs *Store) LastCommitID() types.CommitID { - if rs.lastCommitInfo == nil { + info := rs.lastCommitInfo.Load() + if info == nil { emptyHash := sha256.Sum256([]byte{}) appHash := emptyHash[:] return types.CommitID{ @@ -453,22 +455,22 @@ func (rs *Store) LastCommitID() types.CommitID { Hash: appHash, // set empty apphash to sha256([]byte{}) if info is nil } } - if len(rs.lastCommitInfo.CommitID().Hash) == 0 { + if len(info.CommitID().Hash) == 0 { emptyHash := sha256.Sum256([]byte{}) appHash := emptyHash[:] return types.CommitID{ - Version: rs.lastCommitInfo.Version, + Version: info.Version, Hash: appHash, // set empty apphash to sha256([]byte{}) if hash is nil } } - return rs.lastCommitInfo.CommitID() + return info.CommitID() } // Commit implements Committer/CommitStore. func (rs *Store) Commit() types.CommitID { var previousHeight, version int64 - if rs.lastCommitInfo.GetVersion() == 0 && rs.initialVersion > 1 { + if cInfo := rs.lastCommitInfo.Load(); (cInfo == nil || cInfo.Version == 0) && rs.initialVersion > 1 { // This case means that no commit has been made in the store, we // start from initialVersion. version = rs.initialVersion @@ -478,7 +480,11 @@ func (rs *Store) Commit() types.CommitID { // case we increment the version from there, // - or there was no previous commit, and initial version was not set, // in which case we start at version 1. - previousHeight = rs.lastCommitInfo.GetVersion() + if cInfo != nil { + previousHeight = cInfo.Version + } else { + previousHeight = 0 + } version = previousHeight + 1 } @@ -486,9 +492,11 @@ func (rs *Store) Commit() types.CommitID { rs.logger.Debug("commit header and version mismatch", "header_height", rs.commitHeader.Height, "version", version) } - rs.lastCommitInfo = commitStores(version, rs.stores, rs.removalMap) - rs.lastCommitInfo.Timestamp = rs.commitHeader.Time - defer rs.flushMetadata(rs.db, version, rs.lastCommitInfo) + cInfo := commitStores(version, rs.stores, rs.removalMap) + cInfo.Timestamp = rs.commitHeader.Time + rs.lastCommitInfo.Store(cInfo) + + defer rs.flushMetadata(rs.db, version, cInfo) // remove remnants of removed stores for sk := range rs.removalMap { @@ -511,7 +519,7 @@ func (rs *Store) Commit() types.CommitID { return types.CommitID{ Version: version, - Hash: rs.lastCommitInfo.Hash(), + Hash: cInfo.Hash(), } } @@ -762,8 +770,9 @@ func (rs *Store) Query(req *types.RequestQuery) (*types.ResponseQuery, error) { // Otherwise, we query for the commit info from disk. var commitInfo *types.CommitInfo - if res.Height == rs.lastCommitInfo.Version { - commitInfo = rs.lastCommitInfo + cInfo := rs.lastCommitInfo.Load() + if res.Height == cInfo.Version { + commitInfo = cInfo } else { commitInfo, err = rs.GetCommitInfo(res.Height) if err != nil {