fix(baseapp): introduce mutex to state (#18846)

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
This commit is contained in:
Nikhil Vasan 2023-12-21 14:05:40 -05:00 committed by GitHub
parent 055487ed29
commit c5191041a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 61 additions and 48 deletions

View File

@ -70,7 +70,7 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha
// done after the finalizeBlockState and context have been set as it's persisted
// to state.
if req.ConsensusParams != nil {
err := app.StoreConsensusParams(app.finalizeBlockState.ctx, *req.ConsensusParams)
err := app.StoreConsensusParams(app.finalizeBlockState.Context(), *req.ConsensusParams)
if err != nil {
return nil, err
}
@ -82,18 +82,18 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha
// handler, the block height is zero by default. However, after Commit is called
// the height needs to reflect the true block height.
initHeader.Height = req.InitialHeight
app.checkState.ctx = app.checkState.ctx.WithBlockHeader(initHeader).
app.checkState.SetContext(app.checkState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Time: req.Time,
})
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockHeader(initHeader).
}))
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Time: req.Time,
})
}))
}()
if app.initChainer == nil {
@ -101,9 +101,9 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha
}
// add block gas meter for any genesis transactions (allow infinite gas)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(storetypes.NewInfiniteGasMeter())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter()))
res, err := app.initChainer(app.finalizeBlockState.ctx, req)
res, err := app.initChainer(app.finalizeBlockState.Context(), req)
if err != nil {
return nil, err
}
@ -410,7 +410,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
return nil, errors.New("PrepareProposal called with invalid height")
}
app.prepareProposalState.ctx = app.getContextForProposal(app.prepareProposalState.ctx, req.Height).
app.prepareProposalState.SetContext(app.getContextForProposal(app.prepareProposalState.Context(), req.Height).
WithVoteInfos(toVoteInfo(req.LocalLastCommit.Votes)). // this is a set of votes that are not finalized yet, wait for commit
WithBlockHeight(req.Height).
WithProposer(req.ProposerAddress).
@ -425,11 +425,11 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
ChainID: app.chainID,
Height: req.Height,
Time: req.Time,
})
}))
app.prepareProposalState.ctx = app.prepareProposalState.ctx.
WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.ctx)).
WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.ctx))
app.prepareProposalState.SetContext(app.prepareProposalState.Context().
WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.Context())))
defer func() {
if err := recover(); err != nil {
@ -444,7 +444,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
}
}()
resp, err = app.prepareProposal(app.prepareProposalState.ctx, req)
resp, err = app.prepareProposal(app.prepareProposalState.Context(), req)
if err != nil {
app.logger.Error("failed to prepare proposal", "height", req.Height, "time", req.Time, "err", err)
return &abci.ResponsePrepareProposal{Txs: req.Txs}, nil
@ -502,7 +502,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
app.setState(execModeFinalize, header)
}
app.processProposalState.ctx = app.getContextForProposal(app.processProposalState.ctx, req.Height).
app.processProposalState.SetContext(app.getContextForProposal(app.processProposalState.Context(), req.Height).
WithVoteInfos(req.ProposedLastCommit.Votes). // this is a set of votes that are not finalized yet, wait for commit
WithBlockHeight(req.Height).
WithHeaderHash(req.Hash).
@ -519,11 +519,11 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
ChainID: app.chainID,
Height: req.Height,
Time: req.Time,
})
}))
app.processProposalState.ctx = app.processProposalState.ctx.
WithConsensusParams(app.GetConsensusParams(app.processProposalState.ctx)).
WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.ctx))
app.processProposalState.SetContext(app.processProposalState.Context().
WithConsensusParams(app.GetConsensusParams(app.processProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.Context())))
defer func() {
if err := recover(); err != nil {
@ -538,7 +538,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
}
}()
resp, err = app.processProposal(app.processProposalState.ctx, req)
resp, err = app.processProposal(app.processProposalState.Context(), req)
if err != nil {
app.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
@ -578,7 +578,7 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.RequestExtendVote) (
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
@ -654,7 +654,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
@ -696,7 +696,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r
// internalFinalizeBlock executes the block, called by the Optimistic
// Execution flow or by the FinalizeBlock ABCI method. The context received is
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.ctx
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.Context()
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
var events []abci.Event
@ -732,7 +732,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}
// Context is now updated with Header information.
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().
WithBlockHeader(header).
WithHeaderHash(req.Hash).
WithHeaderInfo(coreheader.Info{
@ -742,7 +742,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
Hash: req.Hash,
AppHash: app.LastCommitID().Hash,
}).
WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.ctx)).
WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.Context())).
WithVoteInfos(req.DecidedLastCommit.Votes).
WithExecMode(sdk.ExecModeFinalize).
WithCometInfo(corecomet.Info{
@ -750,16 +750,16 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: sdk.ToSDKCommitInfo(req.DecidedLastCommit),
})
}))
// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.ctx)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(gasMeter)
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
if app.checkState != nil {
app.checkState.ctx = app.checkState.ctx.
app.checkState.SetContext(app.checkState.Context().
WithBlockGasMeter(gasMeter).
WithHeaderHash(req.Hash)
WithHeaderHash(req.Hash))
}
if err := app.preBlock(req); err != nil {
@ -783,8 +783,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
events = append(events, beginBlock.Events...)
// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.ctx)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(gasMeter)
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
@ -825,7 +825,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}
endBlock, err := app.endBlock(app.finalizeBlockState.ctx)
endBlock, err := app.endBlock(app.finalizeBlockState.Context())
if err != nil {
return nil, err
}
@ -839,7 +839,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}
events = append(events, endBlock.Events...)
cp := app.GetConsensusParams(app.finalizeBlockState.ctx)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
return &abci.ResponseFinalizeBlock{
Events: events,
@ -887,7 +887,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
// call the streaming service hooks with the FinalizeBlock messages
for _, streamingListener := range app.streamingManager.ABCIListeners {
if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.ctx, *req, *res); err != nil {
if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); err != nil {
app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err)
}
}
@ -921,11 +921,11 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error {
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {
header := app.finalizeBlockState.ctx.BlockHeader()
header := app.finalizeBlockState.Context().BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)
if app.precommiter != nil {
app.precommiter(app.finalizeBlockState.ctx)
app.precommiter(app.finalizeBlockState.Context())
}
rms, ok := app.cms.(*rootmulti.Store)
@ -941,7 +941,7 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {
abciListeners := app.streamingManager.ABCIListeners
if len(abciListeners) > 0 {
ctx := app.finalizeBlockState.ctx
ctx := app.finalizeBlockState.Context()
blockHeight := ctx.BlockHeight()
changeSet := app.cms.PopStateCache()
@ -961,7 +961,7 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {
app.finalizeBlockState = nil
if app.prepareCheckStater != nil {
app.prepareCheckStater(app.checkState.ctx)
app.prepareCheckStater(app.checkState.Context())
}
// The SnapshotIfApplicable method will create the snapshot by starting the goroutine
@ -1126,7 +1126,7 @@ func (app *BaseApp) FilterPeerByID(info string) *abci.ResponseQuery {
// access any state changes made in InitChain.
func (app *BaseApp) getContextForProposal(ctx sdk.Context, height int64) sdk.Context {
if height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()
// clear all context data set during InitChain to avoid inconsistent behavior
ctx = ctx.WithHeaderInfo(coreheader.Info{}).WithBlockHeader(cmtproto.Header{})
@ -1236,7 +1236,7 @@ func (app *BaseApp) CreateQueryContext(height int64, prove bool) (sdk.Context, e
ctx := sdk.NewContext(cacheMS, true, app.logger).
WithMinGasPrices(app.minGasPrices).
WithBlockHeight(height).
WithGasMeter(storetypes.NewGasMeter(app.queryGasLimit)).WithBlockHeader(app.checkState.ctx.BlockHeader())
WithGasMeter(storetypes.NewGasMeter(app.queryGasLimit)).WithBlockHeader(app.checkState.Context().BlockHeader())
if height != lastBlockHeight {
rms, ok := app.cms.(*rootmulti.Store)
@ -1304,7 +1304,7 @@ func (app *BaseApp) GetBlockRetentionHeight(commitHeight int64) int64 {
// evidence parameters instead of computing an estimated number of blocks based
// on the unbonding period and block commitment time as the two should be
// equivalent.
cp := app.GetConsensusParams(app.finalizeBlockState.ctx)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
if cp.Evidence != nil && cp.Evidence.MaxAgeNumBlocks > 0 {
retentionHeight = commitHeight - cp.Evidence.MaxAgeNumBlocks
}

View File

@ -487,7 +487,7 @@ func (app *BaseApp) setState(mode execMode, header cmtproto.Header) {
switch mode {
case execModeCheck:
baseState.ctx = baseState.ctx.WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices)
baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices))
app.checkState = baseState
case execModePrepareProposal:
@ -656,7 +656,7 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
if modeState == nil {
panic(fmt.Sprintf("state is nil for mode %v", mode))
}
ctx := modeState.ctx.
ctx := modeState.Context().
WithTxBytes(txBytes)
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed
@ -696,7 +696,7 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context
func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) error {
if app.preBlocker != nil {
ctx := app.finalizeBlockState.ctx
ctx := app.finalizeBlockState.Context()
rsp, err := app.preBlocker(ctx, req)
if err != nil {
return err
@ -708,7 +708,7 @@ func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) error {
// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(ctx)
ctx = ctx.WithBlockGasMeter(gasMeter)
app.finalizeBlockState.ctx = ctx
app.finalizeBlockState.SetContext(ctx)
}
}
return nil
@ -721,7 +721,7 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock,
)
if app.beginBlocker != nil {
resp, err = app.beginBlocker(app.finalizeBlockState.ctx)
resp, err = app.beginBlocker(app.finalizeBlockState.Context())
if err != nil {
return resp, err
}
@ -783,7 +783,7 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) {
var endblock sdk.EndBlock
if app.endBlocker != nil {
eb, err := app.endBlocker(app.finalizeBlockState.ctx)
eb, err := app.endBlocker(app.finalizeBlockState.Context())
if err != nil {
return endblock, err
}

View File

@ -1,13 +1,17 @@
package baseapp
import (
"sync"
storetypes "cosmossdk.io/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)
type state struct {
ms storetypes.CacheMultiStore
ms storetypes.CacheMultiStore
mtx sync.RWMutex
ctx sdk.Context
}
@ -17,7 +21,16 @@ func (st *state) CacheMultiStore() storetypes.CacheMultiStore {
return st.ms.CacheMultiStore()
}
// SetContext updates the state's context to the context provided.
func (st *state) SetContext(ctx sdk.Context) {
st.mtx.Lock()
defer st.mtx.Unlock()
st.ctx = ctx
}
// Context returns the Context of the state.
func (st *state) Context() sdk.Context {
st.mtx.RLock()
defer st.mtx.RUnlock()
return st.ctx
}