From 52c3db2eae1e1c199240d8dd3306657f10af5d4c Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 22 Dec 2023 17:46:17 +0000 Subject: [PATCH] fix(baseapp): introduce mutex to state (backport #18846) (#18863) Co-authored-by: Nikhil Vasan <97126437+nivasan1@users.noreply.github.com> Co-authored-by: marbar3778 Co-authored-by: Aleksandr Bezobchuk --- baseapp/abci.go | 85 +++++++++++++++++++++++----------------------- baseapp/baseapp.go | 12 +++---- baseapp/state.go | 15 +++++++- 3 files changed, 63 insertions(+), 49 deletions(-) diff --git a/baseapp/abci.go b/baseapp/abci.go index d1ab2b53ce..67ebb6dc28 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -69,7 +69,7 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha // done after the finalizeBlockState and context have been set as it's persisted // to state. if req.ConsensusParams != nil { - err := app.StoreConsensusParams(app.finalizeBlockState.ctx, *req.ConsensusParams) + err := app.StoreConsensusParams(app.finalizeBlockState.Context(), *req.ConsensusParams) if err != nil { return nil, err } @@ -81,18 +81,18 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha // handler, the block height is zero by default. However, after Commit is called // the height needs to reflect the true block height. initHeader.Height = req.InitialHeight - app.checkState.ctx = app.checkState.ctx.WithBlockHeader(initHeader). + app.checkState.SetContext(app.checkState.Context().WithBlockHeader(initHeader). WithHeaderInfo(coreheader.Info{ ChainID: req.ChainId, Height: req.InitialHeight, Time: req.Time, - }) - app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockHeader(initHeader). + })) + app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockHeader(initHeader). WithHeaderInfo(coreheader.Info{ ChainID: req.ChainId, Height: req.InitialHeight, Time: req.Time, - }) + })) }() if app.initChainer == nil { @@ -100,9 +100,9 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha } // add block gas meter for any genesis transactions (allow infinite gas) - app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(storetypes.NewInfiniteGasMeter()) + app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter())) - res, err := app.initChainer(app.finalizeBlockState.ctx, req) + res, err := app.initChainer(app.finalizeBlockState.Context(), req) if err != nil { return nil, err } @@ -398,7 +398,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc return nil, errors.New("PrepareProposal called with invalid height") } - app.prepareProposalState.ctx = app.getContextForProposal(app.prepareProposalState.ctx, req.Height). + app.prepareProposalState.SetContext(app.getContextForProposal(app.prepareProposalState.Context(), req.Height). WithVoteInfos(toVoteInfo(req.LocalLastCommit.Votes)). // this is a set of votes that are not finalized yet, wait for commit WithBlockHeight(req.Height). WithBlockTime(req.Time). @@ -409,11 +409,11 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc ChainID: app.chainID, Height: req.Height, Time: req.Time, - }) + })) - app.prepareProposalState.ctx = app.prepareProposalState.ctx. - WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.ctx)). - WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.ctx)) + app.prepareProposalState.SetContext(app.prepareProposalState.Context(). + WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.Context())). + WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.Context()))) defer func() { if err := recover(); err != nil { @@ -428,7 +428,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc } }() - resp, err = app.prepareProposal(app.prepareProposalState.ctx, req) + resp, err = app.prepareProposal(app.prepareProposalState.Context(), req) if err != nil { app.logger.Error("failed to prepare proposal", "height", req.Height, "time", req.Time, "err", err) return &abci.ResponsePrepareProposal{Txs: req.Txs}, nil @@ -486,7 +486,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc app.setState(execModeFinalize, header) } - app.processProposalState.ctx = app.getContextForProposal(app.processProposalState.ctx, req.Height). + app.processProposalState.SetContext(app.getContextForProposal(app.processProposalState.Context(), req.Height). WithVoteInfos(req.ProposedLastCommit.Votes). // this is a set of votes that are not finalized yet, wait for commit WithBlockHeight(req.Height). WithBlockTime(req.Time). @@ -498,11 +498,11 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc ChainID: app.chainID, Height: req.Height, Time: req.Time, - }) + })) - app.processProposalState.ctx = app.processProposalState.ctx. - WithConsensusParams(app.GetConsensusParams(app.processProposalState.ctx)). - WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.ctx)) + app.processProposalState.SetContext(app.processProposalState.Context(). + WithConsensusParams(app.GetConsensusParams(app.processProposalState.Context())). + WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.Context()))) defer func() { if err := recover(); err != nil { @@ -517,7 +517,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc } }() - resp, err = app.processProposal(app.processProposalState.ctx, req) + resp, err = app.processProposal(app.processProposalState.Context(), req) if err != nil { app.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err) return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil @@ -557,7 +557,7 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.RequestExtendVote) ( // finalizeBlockState context, otherwise we don't get the uncommitted data // from InitChain. if req.Height == app.initialHeight { - ctx, _ = app.finalizeBlockState.ctx.CacheContext() + ctx, _ = app.finalizeBlockState.Context().CacheContext() } else { emptyHeader := cmtproto.Header{ChainID: app.chainID, Height: req.Height} ms := app.cms.CacheMultiStore() @@ -632,7 +632,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r // finalizeBlockState context, otherwise we don't get the uncommitted data // from InitChain. if req.Height == app.initialHeight { - ctx, _ = app.finalizeBlockState.ctx.CacheContext() + ctx, _ = app.finalizeBlockState.Context().CacheContext() } else { emptyHeader := cmtproto.Header{ChainID: app.chainID, Height: req.Height} ms := app.cms.CacheMultiStore() @@ -675,7 +675,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r // internalFinalizeBlock executes the block, called by the Optimistic // Execution flow or by the FinalizeBlock ABCI method. The context received is -// only used to handle early cancellation, for anything related to state app.finalizeBlockState.ctx +// only used to handle early cancellation, for anything related to state app.finalizeBlockState.Context() // must be used. func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) { var events []abci.Event @@ -711,7 +711,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request } // Context is now updated with Header information. - app.finalizeBlockState.ctx = app.finalizeBlockState.ctx. + app.finalizeBlockState.SetContext(app.finalizeBlockState.Context(). WithBlockHeader(header). WithHeaderHash(req.Hash). WithHeaderInfo(coreheader.Info{ @@ -721,7 +721,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request Hash: req.Hash, AppHash: app.LastCommitID().Hash, }). - WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.ctx)). + WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.Context())). WithVoteInfos(req.DecidedLastCommit.Votes). WithExecMode(sdk.ExecModeFinalize). WithCometInfo(cometInfo{ @@ -729,16 +729,16 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request ValidatorsHash: req.NextValidatorsHash, ProposerAddress: req.ProposerAddress, LastCommit: req.DecidedLastCommit, - }) + })) // GasMeter must be set after we get a context with updated consensus params. - gasMeter := app.getBlockGasMeter(app.finalizeBlockState.ctx) - app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(gasMeter) + gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context()) + app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter)) if app.checkState != nil { - app.checkState.ctx = app.checkState.ctx. + app.checkState.SetContext(app.checkState.Context(). WithBlockGasMeter(gasMeter). - WithHeaderHash(req.Hash) + WithHeaderHash(req.Hash)) } if err := app.preBlock(req); err != nil { @@ -762,8 +762,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request events = append(events, beginBlock.Events...) // Reset the gas meter so that the AnteHandlers aren't required to - gasMeter = app.getBlockGasMeter(app.finalizeBlockState.ctx) - app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(gasMeter) + gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context()) + app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter)) // Iterate over all raw transactions in the proposal and attempt to execute // them, gathering the execution results. @@ -804,7 +804,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore) } - endBlock, err := app.endBlock(app.finalizeBlockState.ctx) + endBlock, err := app.endBlock(app.finalizeBlockState.Context()) if err != nil { return nil, err } @@ -818,7 +818,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request } events = append(events, endBlock.Events...) - cp := app.GetConsensusParams(app.finalizeBlockState.ctx) + cp := app.GetConsensusParams(app.finalizeBlockState.Context()) return &abci.ResponseFinalizeBlock{ Events: events, @@ -866,7 +866,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons // call the streaming service hooks with the FinalizeBlock messages for _, streamingListener := range app.streamingManager.ABCIListeners { - if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.ctx, *req, *res); err != nil { + if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); err != nil { app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err) } } @@ -900,11 +900,11 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error { // against that height and gracefully halt if it matches the latest committed // height. func (app *BaseApp) Commit() (*abci.ResponseCommit, error) { - header := app.finalizeBlockState.ctx.BlockHeader() + header := app.finalizeBlockState.Context().BlockHeader() retainHeight := app.GetBlockRetentionHeight(header.Height) if app.precommiter != nil { - app.precommiter(app.finalizeBlockState.ctx) + app.precommiter(app.finalizeBlockState.Context()) } rms, ok := app.cms.(*rootmulti.Store) @@ -920,7 +920,7 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) { abciListeners := app.streamingManager.ABCIListeners if len(abciListeners) > 0 { - ctx := app.finalizeBlockState.ctx + ctx := app.finalizeBlockState.Context() blockHeight := ctx.BlockHeight() changeSet := app.cms.PopStateCache() @@ -940,7 +940,7 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) { app.finalizeBlockState = nil if app.prepareCheckStater != nil { - app.prepareCheckStater(app.checkState.ctx) + app.prepareCheckStater(app.checkState.Context()) } // The SnapshotIfApplicable method will create the snapshot by starting the goroutine @@ -1105,7 +1105,7 @@ func (app *BaseApp) FilterPeerByID(info string) *abci.ResponseQuery { // access any state changes made in InitChain. func (app *BaseApp) getContextForProposal(ctx sdk.Context, height int64) sdk.Context { if height == app.initialHeight { - ctx, _ = app.finalizeBlockState.ctx.CacheContext() + ctx, _ = app.finalizeBlockState.Context().CacheContext() // clear all context data set during InitChain to avoid inconsistent behavior ctx = ctx.WithBlockHeader(cmtproto.Header{}).WithHeaderInfo(coreheader.Info{}) @@ -1212,10 +1212,11 @@ func (app *BaseApp) CreateQueryContext(height int64, prove bool) (sdk.Context, e } // branch the commit multi-store for safety - ctx := sdk.NewContext(cacheMS, app.checkState.ctx.BlockHeader(), true, app.logger). + header := app.checkState.Context().BlockHeader() + ctx := sdk.NewContext(cacheMS, header, true, app.logger). WithMinGasPrices(app.minGasPrices). WithBlockHeight(height). - WithGasMeter(storetypes.NewGasMeter(app.queryGasLimit)) + WithGasMeter(storetypes.NewGasMeter(app.queryGasLimit)).WithBlockHeader(header) if height != lastBlockHeight { rms, ok := app.cms.(*rootmulti.Store) @@ -1283,7 +1284,7 @@ func (app *BaseApp) GetBlockRetentionHeight(commitHeight int64) int64 { // evidence parameters instead of computing an estimated number of blocks based // on the unbonding period and block commitment time as the two should be // equivalent. - cp := app.GetConsensusParams(app.finalizeBlockState.ctx) + cp := app.GetConsensusParams(app.finalizeBlockState.Context()) if cp.Evidence != nil && cp.Evidence.MaxAgeNumBlocks > 0 { retentionHeight = commitHeight - cp.Evidence.MaxAgeNumBlocks } diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 8de14c5f05..a36ecb1922 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -476,7 +476,7 @@ func (app *BaseApp) setState(mode execMode, header cmtproto.Header) { switch mode { case execModeCheck: - baseState.ctx = baseState.ctx.WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices) + baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices)) app.checkState = baseState case execModePrepareProposal: @@ -643,7 +643,7 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context { if modeState == nil { panic(fmt.Sprintf("state is nil for mode %v", mode)) } - ctx := modeState.ctx. + ctx := modeState.Context(). WithTxBytes(txBytes) // WithVoteInfos(app.voteInfos) // TODO: identify if this is needed @@ -681,7 +681,7 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) error { if app.preBlocker != nil { - ctx := app.finalizeBlockState.ctx + ctx := app.finalizeBlockState.Context() rsp, err := app.preBlocker(ctx, req) if err != nil { return err @@ -693,7 +693,7 @@ func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) error { // GasMeter must be set after we get a context with updated consensus params. gasMeter := app.getBlockGasMeter(ctx) ctx = ctx.WithBlockGasMeter(gasMeter) - app.finalizeBlockState.ctx = ctx + app.finalizeBlockState.SetContext(ctx) } } return nil @@ -706,7 +706,7 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock, ) if app.beginBlocker != nil { - resp, err = app.beginBlocker(app.finalizeBlockState.ctx) + resp, err = app.beginBlocker(app.finalizeBlockState.Context()) if err != nil { return resp, err } @@ -768,7 +768,7 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) { var endblock sdk.EndBlock if app.endBlocker != nil { - eb, err := app.endBlocker(app.finalizeBlockState.ctx) + eb, err := app.endBlocker(app.finalizeBlockState.Context()) if err != nil { return endblock, err } diff --git a/baseapp/state.go b/baseapp/state.go index ddfb82f92d..19ee4e7eaa 100644 --- a/baseapp/state.go +++ b/baseapp/state.go @@ -1,13 +1,17 @@ package baseapp import ( + "sync" + storetypes "cosmossdk.io/store/types" sdk "github.com/cosmos/cosmos-sdk/types" ) type state struct { - ms storetypes.CacheMultiStore + ms storetypes.CacheMultiStore + + mtx sync.RWMutex ctx sdk.Context } @@ -17,7 +21,16 @@ func (st *state) CacheMultiStore() storetypes.CacheMultiStore { return st.ms.CacheMultiStore() } +// SetContext updates the state's context to the context provided. +func (st *state) SetContext(ctx sdk.Context) { + st.mtx.Lock() + defer st.mtx.Unlock() + st.ctx = ctx +} + // Context returns the Context of the state. func (st *state) Context() sdk.Context { + st.mtx.RLock() + defer st.mtx.RUnlock() return st.ctx }