Co-authored-by: Facundo Medica <14063057+facundomedica@users.noreply.github.com> Co-authored-by: Facundo <facundomedica@gmail.com>
This commit is contained in:
parent
1376e0dc3f
commit
42dbfc4d70
@ -41,6 +41,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
### Features
|
||||
|
||||
* (server) [#18162](https://github.com/cosmos/cosmos-sdk/pull/18162) Start gRPC & API server in standalone mode.
|
||||
* (baseapp) [#16581](https://github.com/cosmos/cosmos-sdk/pull/16581) Implement Optimistic Execution as an experimental feature (not enabled by default).
|
||||
|
||||
### Improvements
|
||||
|
||||
@ -326,7 +327,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
* (crypto/keyring) [#13734](https://github.com/cosmos/cosmos-sdk/pull/13834) The keyring's `Sign` method now takes a new `signMode` argument. It is only used if the signing key is a Ledger hardware device. You can set it to 0 in all other cases.
|
||||
* (snapshots) [14048](https://github.com/cosmos/cosmos-sdk/pull/14048) Move the Snapshot package to the store package. This is done in an effort group all storage related logic under one package.
|
||||
* (signing) [#13701](https://github.com/cosmos/cosmos-sdk/pull/) Add `context.Context` as an argument `x/auth/signing.VerifySignature`.
|
||||
* (store) [#11825](https://github.com/cosmos/cosmos-sdk/pull/11825) Make extension snapshotter interface safer to use, renamed the util function `WriteExtensionItem` to `WriteExtensionPayload`.
|
||||
* (store) [#11825](https://github.com/cosmos/cosmos-sdk/pull/11825) Make extension snapshotter interface safer to use, renamed the util function `WriteExtensionItem` to `WriteExtensionPayload`.
|
||||
|
||||
### Client Breaking Changes
|
||||
|
||||
@ -401,4 +402,4 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
|
||||
## Previous Versions
|
||||
|
||||
[CHANGELOG of previous versions](https://github.com/cosmos/cosmos-sdk/blob/main/CHANGELOG.md#v0470---2023-03-14).
|
||||
[CHANGELOG of previous versions](https://github.com/cosmos/cosmos-sdk/blob/main/CHANGELOG.md#v0470---2023-03-14).
|
||||
@ -494,6 +494,8 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
|
||||
// processed the first block, as we want to avoid overwriting the finalizeState
|
||||
// after state changes during InitChain.
|
||||
if req.Height > app.initialHeight {
|
||||
// abort any running OE
|
||||
app.optimisticExec.Abort()
|
||||
app.setState(execModeFinalize, header)
|
||||
}
|
||||
|
||||
@ -534,6 +536,19 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
|
||||
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
|
||||
}
|
||||
|
||||
// Only execute optimistic execution if the proposal is accepted, OE is
|
||||
// enabled and the block height is greater than the initial height. During
|
||||
// the first block we'll be carrying state from InitChain, so it would be
|
||||
// impossible for us to easily revert.
|
||||
// After the first block has been processed, the next blocks will get executed
|
||||
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
|
||||
// can have a response ready.
|
||||
if resp.Status == abci.ResponseProcessProposal_ACCEPT &&
|
||||
app.optimisticExec.Enabled() &&
|
||||
req.Height > app.initialHeight {
|
||||
app.optimisticExec.Execute(req)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@ -671,17 +686,11 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
|
||||
// Specifically, it will execute an application's BeginBlock (if defined), followed
|
||||
// by the transactions in the proposal, finally followed by the application's
|
||||
// EndBlock (if defined).
|
||||
//
|
||||
// For each raw transaction, i.e. a byte slice, BaseApp will only execute it if
|
||||
// it adheres to the sdk.Tx interface. Otherwise, the raw transaction will be
|
||||
// skipped. This is to support compatibility with proposers injecting vote
|
||||
// extensions into the proposal, which should not themselves be executed in cases
|
||||
// where they adhere to the sdk.Tx interface.
|
||||
func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
|
||||
// internalFinalizeBlock executes the block, called by the Optimistic
|
||||
// Execution flow or by the FinalizeBlock ABCI method. The context received is
|
||||
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.ctx
|
||||
// must be used.
|
||||
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
|
||||
var events []abci.Event
|
||||
|
||||
if err := app.checkHalt(req.Height, req.Time); err != nil {
|
||||
@ -754,6 +763,15 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// First check for an abort signal after beginBlock, as it's the first place
|
||||
// we spend any significant amount of time.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
// continue
|
||||
}
|
||||
|
||||
events = append(events, beginBlock.Events...)
|
||||
|
||||
// Iterate over all raw transactions in the proposal and attempt to execute
|
||||
@ -780,6 +798,14 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
|
||||
)
|
||||
}
|
||||
|
||||
// check after every tx if we should abort
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
// continue
|
||||
}
|
||||
|
||||
txResults = append(txResults, response)
|
||||
}
|
||||
|
||||
@ -792,6 +818,14 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// check after endBlock if we should abort, to avoid propagating the result
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
// continue
|
||||
}
|
||||
|
||||
events = append(events, endBlock.Events...)
|
||||
cp := app.GetConsensusParams(app.finalizeBlockState.ctx)
|
||||
|
||||
@ -800,10 +834,47 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
|
||||
TxResults: txResults,
|
||||
ValidatorUpdates: endBlock.ValidatorUpdates,
|
||||
ConsensusParamUpdates: &cp,
|
||||
AppHash: app.workingHash(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
|
||||
// Specifically, it will execute an application's BeginBlock (if defined), followed
|
||||
// by the transactions in the proposal, finally followed by the application's
|
||||
// EndBlock (if defined).
|
||||
//
|
||||
// For each raw transaction, i.e. a byte slice, BaseApp will only execute it if
|
||||
// it adheres to the sdk.Tx interface. Otherwise, the raw transaction will be
|
||||
// skipped. This is to support compatibility with proposers injecting vote
|
||||
// extensions into the proposal, which should not themselves be executed in cases
|
||||
// where they adhere to the sdk.Tx interface.
|
||||
func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
|
||||
if app.optimisticExec.Initialized() {
|
||||
// check if the hash we got is the same as the one we are executing
|
||||
aborted := app.optimisticExec.AbortIfNeeded(req.Hash)
|
||||
// Wait for the OE to finish, regardless of whether it was aborted or not
|
||||
res, err := app.optimisticExec.WaitResult()
|
||||
|
||||
// only return if we are not aborting
|
||||
if !aborted {
|
||||
if res != nil {
|
||||
res.AppHash = app.workingHash()
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
// if it was aborted, we need to reset the state
|
||||
app.finalizeBlockState = nil
|
||||
app.optimisticExec.Reset()
|
||||
}
|
||||
|
||||
// if no OE is running, just run the block (this is either a block replay or a OE that got aborted)
|
||||
res, err := app.internalFinalizeBlock(context.Background(), req)
|
||||
if res != nil {
|
||||
res.AppHash = app.workingHash()
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
// checkHalt checkes if height or time exceeds halt-height or halt-time respectively.
|
||||
func (app *BaseApp) checkHalt(height int64, time time.Time) error {
|
||||
var halt bool
|
||||
|
||||
@ -2312,3 +2312,44 @@ func TestABCI_PrepareProposal_Panic(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, len(resPrepareProposal.Txs))
|
||||
}
|
||||
|
||||
func TestOptimisticExecution(t *testing.T) {
|
||||
suite := NewBaseAppSuite(t, baseapp.SetOptimisticExecution())
|
||||
|
||||
_, err := suite.baseApp.InitChain(&abci.RequestInitChain{
|
||||
ConsensusParams: &cmtproto.ConsensusParams{},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// run 50 blocks
|
||||
for i := 0; i < 50; i++ {
|
||||
tx := newTxCounter(t, suite.txConfig, 0, 1)
|
||||
txBytes, err := suite.txConfig.TxEncoder()(tx)
|
||||
require.NoError(t, err)
|
||||
|
||||
reqProcProp := abci.RequestProcessProposal{
|
||||
Txs: [][]byte{txBytes},
|
||||
Height: suite.baseApp.LastBlockHeight() + 1,
|
||||
Hash: []byte("some-hash" + strconv.FormatInt(suite.baseApp.LastBlockHeight()+1, 10)),
|
||||
}
|
||||
|
||||
respProcProp, err := suite.baseApp.ProcessProposal(&reqProcProp)
|
||||
require.Equal(t, abci.ResponseProcessProposal_ACCEPT, respProcProp.Status)
|
||||
require.NoError(t, err)
|
||||
|
||||
reqFinalizeBlock := abci.RequestFinalizeBlock{
|
||||
Height: reqProcProp.Height,
|
||||
Txs: reqProcProp.Txs,
|
||||
Hash: reqProcProp.Hash,
|
||||
}
|
||||
|
||||
respFinalizeBlock, err := suite.baseApp.FinalizeBlock(&reqFinalizeBlock)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, respFinalizeBlock.TxResults, 1)
|
||||
|
||||
_, err = suite.baseApp.Commit()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.Equal(t, int64(50), suite.baseApp.LastBlockHeight())
|
||||
}
|
||||
|
||||
@ -23,6 +23,7 @@ import (
|
||||
"cosmossdk.io/store/snapshots"
|
||||
storetypes "cosmossdk.io/store/types"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp/oe"
|
||||
"github.com/cosmos/cosmos-sdk/codec"
|
||||
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
|
||||
servertypes "github.com/cosmos/cosmos-sdk/server/types"
|
||||
@ -178,6 +179,11 @@ type BaseApp struct {
|
||||
chainID string
|
||||
|
||||
cdc codec.Codec
|
||||
|
||||
// optimisticExec contains the context required for Optimistic Execution,
|
||||
// including the goroutine handling.This is experimental and must be enabled
|
||||
// by developers.
|
||||
optimisticExec *oe.OptimisticExecution
|
||||
}
|
||||
|
||||
// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
|
||||
|
||||
157
baseapp/oe/optimistic_execution.go
Normal file
157
baseapp/oe/optimistic_execution.go
Normal file
@ -0,0 +1,157 @@
|
||||
package oe
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
abci "github.com/cometbft/cometbft/abci/types"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
)
|
||||
|
||||
// FinalizeBlockFunc is the function that is called by the OE to finalize the
|
||||
// block. It is the same as the one in the ABCI app.
|
||||
type FinalizeBlockFunc func(context.Context, *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error)
|
||||
|
||||
// OptimisticExecution is a struct that contains the OE context. It is used to
|
||||
// run the FinalizeBlock function in a goroutine, and to abort it if needed.
|
||||
type OptimisticExecution struct {
|
||||
finalizeBlockFunc FinalizeBlockFunc // ABCI FinalizeBlock function with a context
|
||||
logger log.Logger
|
||||
|
||||
mtx sync.Mutex
|
||||
stopCh chan struct{}
|
||||
request *abci.RequestFinalizeBlock
|
||||
response *abci.ResponseFinalizeBlock
|
||||
err error
|
||||
cancelFunc func() // cancel function for the context
|
||||
initialized bool // A boolean value indicating whether the struct has been initialized
|
||||
|
||||
// debugging/testing options
|
||||
abortRate int // number from 0 to 100 that determines the percentage of OE that should be aborted
|
||||
}
|
||||
|
||||
// NewOptimisticExecution initializes the Optimistic Execution context but does not start it.
|
||||
func NewOptimisticExecution(logger log.Logger, fn FinalizeBlockFunc, opts ...func(*OptimisticExecution)) *OptimisticExecution {
|
||||
logger = logger.With(log.ModuleKey, "oe")
|
||||
oe := &OptimisticExecution{logger: logger, finalizeBlockFunc: fn}
|
||||
for _, opt := range opts {
|
||||
opt(oe)
|
||||
}
|
||||
return oe
|
||||
}
|
||||
|
||||
// WithAbortRate sets the abort rate for the OE. The abort rate is a number from
|
||||
// 0 to 100 that determines the percentage of OE that should be aborted.
|
||||
// This is for testing purposes only and must not be used in production.
|
||||
func WithAbortRate(rate int) func(*OptimisticExecution) {
|
||||
return func(oe *OptimisticExecution) {
|
||||
oe.abortRate = rate
|
||||
}
|
||||
}
|
||||
|
||||
// Reset resets the OE context. Must be called whenever we want to invalidate
|
||||
// the current OE.
|
||||
func (oe *OptimisticExecution) Reset() {
|
||||
oe.mtx.Lock()
|
||||
defer oe.mtx.Unlock()
|
||||
oe.request = nil
|
||||
oe.response = nil
|
||||
oe.err = nil
|
||||
oe.initialized = false
|
||||
}
|
||||
|
||||
func (oe *OptimisticExecution) Enabled() bool {
|
||||
return oe != nil
|
||||
}
|
||||
|
||||
// Initialized returns true if the OE was initialized, meaning that it contains
|
||||
// a request and it was run or it is running.
|
||||
func (oe *OptimisticExecution) Initialized() bool {
|
||||
if oe == nil {
|
||||
return false
|
||||
}
|
||||
oe.mtx.Lock()
|
||||
defer oe.mtx.Unlock()
|
||||
|
||||
return oe.initialized
|
||||
}
|
||||
|
||||
// Execute initializes the OE and starts it in a goroutine.
|
||||
func (oe *OptimisticExecution) Execute(req *abci.RequestProcessProposal) {
|
||||
oe.mtx.Lock()
|
||||
defer oe.mtx.Unlock()
|
||||
|
||||
oe.stopCh = make(chan struct{})
|
||||
oe.request = &abci.RequestFinalizeBlock{
|
||||
Txs: req.Txs,
|
||||
DecidedLastCommit: req.ProposedLastCommit,
|
||||
Misbehavior: req.Misbehavior,
|
||||
Hash: req.Hash,
|
||||
Height: req.Height,
|
||||
Time: req.Time,
|
||||
NextValidatorsHash: req.NextValidatorsHash,
|
||||
ProposerAddress: req.ProposerAddress,
|
||||
}
|
||||
|
||||
oe.logger.Debug("OE started", "height", req.Height, "hash", hex.EncodeToString(req.Hash), "time", req.Time.String())
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
oe.cancelFunc = cancel
|
||||
oe.initialized = true
|
||||
|
||||
go func() {
|
||||
start := time.Now()
|
||||
resp, err := oe.finalizeBlockFunc(ctx, oe.request)
|
||||
oe.mtx.Lock()
|
||||
executionTime := time.Since(start)
|
||||
oe.logger.Debug("OE finished", "duration", executionTime.String(), "height", req.Height, "hash", hex.EncodeToString(req.Hash))
|
||||
oe.response, oe.err = resp, err
|
||||
close(oe.stopCh)
|
||||
oe.mtx.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
// AbortIfNeeded aborts the OE if the request hash is not the same as the one in
|
||||
// the running OE. Returns true if the OE was aborted.
|
||||
func (oe *OptimisticExecution) AbortIfNeeded(reqHash []byte) bool {
|
||||
if oe == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
oe.mtx.Lock()
|
||||
defer oe.mtx.Unlock()
|
||||
|
||||
if !bytes.Equal(oe.request.Hash, reqHash) {
|
||||
oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height)
|
||||
oe.cancelFunc()
|
||||
return true
|
||||
} else if oe.abortRate > 0 && rand.Intn(100) < oe.abortRate {
|
||||
// this is for test purposes only, we can emulate a certain percentage of
|
||||
// OE needed to be aborted.
|
||||
oe.cancelFunc()
|
||||
oe.logger.Error("OE aborted due to test abort rate")
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Abort aborts the OE unconditionally and waits for it to finish.
|
||||
func (oe *OptimisticExecution) Abort() {
|
||||
if oe == nil || oe.cancelFunc == nil {
|
||||
return
|
||||
}
|
||||
|
||||
oe.cancelFunc()
|
||||
<-oe.stopCh
|
||||
}
|
||||
|
||||
// WaitResult waits for the OE to finish and returns the result.
|
||||
func (oe *OptimisticExecution) WaitResult() (*abci.ResponseFinalizeBlock, error) {
|
||||
<-oe.stopCh
|
||||
return oe.response, oe.err
|
||||
}
|
||||
34
baseapp/oe/optimistic_execution_test.go
Normal file
34
baseapp/oe/optimistic_execution_test.go
Normal file
@ -0,0 +1,34 @@
|
||||
package oe
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
abci "github.com/cometbft/cometbft/abci/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
)
|
||||
|
||||
func testFinalizeBlock(_ context.Context, _ *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
|
||||
return nil, errors.New("test error")
|
||||
}
|
||||
|
||||
func TestOptimisticExecution(t *testing.T) {
|
||||
oe := NewOptimisticExecution(log.NewNopLogger(), testFinalizeBlock)
|
||||
assert.True(t, oe.Enabled())
|
||||
oe.Execute(&abci.RequestProcessProposal{
|
||||
Hash: []byte("test"),
|
||||
})
|
||||
assert.True(t, oe.Initialized())
|
||||
|
||||
resp, err := oe.WaitResult()
|
||||
assert.Nil(t, resp)
|
||||
assert.EqualError(t, err, "test error")
|
||||
|
||||
assert.False(t, oe.AbortIfNeeded([]byte("test")))
|
||||
assert.True(t, oe.AbortIfNeeded([]byte("wrong_hash")))
|
||||
|
||||
oe.Reset()
|
||||
}
|
||||
@ -13,6 +13,7 @@ import (
|
||||
snapshottypes "cosmossdk.io/store/snapshots/types"
|
||||
storetypes "cosmossdk.io/store/types"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp/oe"
|
||||
"github.com/cosmos/cosmos-sdk/codec"
|
||||
"github.com/cosmos/cosmos-sdk/codec/types"
|
||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||
@ -104,6 +105,13 @@ func SetChainID(chainID string) func(*BaseApp) {
|
||||
return func(app *BaseApp) { app.chainID = chainID }
|
||||
}
|
||||
|
||||
// SetOptimisticExecution enables optimistic execution.
|
||||
func SetOptimisticExecution(opts ...func(*oe.OptimisticExecution)) func(*BaseApp) {
|
||||
return func(app *BaseApp) {
|
||||
app.optimisticExec = oe.NewOptimisticExecution(app.logger, app.internalFinalizeBlock, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func (app *BaseApp) SetName(name string) {
|
||||
if app.sealed {
|
||||
panic("SetName() on sealed BaseApp")
|
||||
|
||||
@ -33,6 +33,9 @@ import (
|
||||
upgradetypes "cosmossdk.io/x/upgrade/types"
|
||||
abci "github.com/cometbft/cometbft/abci/types"
|
||||
dbm "github.com/cosmos/cosmos-db"
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/spf13/cast"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp"
|
||||
"github.com/cosmos/cosmos-sdk/client"
|
||||
"github.com/cosmos/cosmos-sdk/client/flags"
|
||||
@ -105,8 +108,6 @@ import (
|
||||
"github.com/cosmos/cosmos-sdk/x/staking"
|
||||
stakingkeeper "github.com/cosmos/cosmos-sdk/x/staking/keeper"
|
||||
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/spf13/cast"
|
||||
)
|
||||
|
||||
const appName = "SimApp"
|
||||
@ -243,7 +244,7 @@ func NewSimApp(
|
||||
voteExtHandler := NewVoteExtensionHandler()
|
||||
voteExtHandler.SetHandlers(bApp)
|
||||
}
|
||||
baseAppOptions = append(baseAppOptions, voteExtOp)
|
||||
baseAppOptions = append(baseAppOptions, voteExtOp, baseapp.SetOptimisticExecution())
|
||||
|
||||
bApp := baseapp.NewBaseApp(appName, logger, db, txConfig.TxDecoder(), baseAppOptions...)
|
||||
bApp.SetCommitMultiStoreTracer(traceStore)
|
||||
|
||||
@ -220,7 +220,7 @@ func NewSimApp(
|
||||
voteExtHandler := NewVoteExtensionHandler()
|
||||
voteExtHandler.SetHandlers(bApp)
|
||||
}
|
||||
baseAppOptions = append(baseAppOptions, voteExtOp)
|
||||
baseAppOptions = append(baseAppOptions, voteExtOp, baseapp.SetOptimisticExecution())
|
||||
|
||||
app.App = appBuilder.Build(db, traceStore, baseAppOptions...)
|
||||
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/huandu/skiplist"
|
||||
|
||||
@ -49,6 +50,7 @@ type (
|
||||
// priority to other sender txs and must be partially ordered by both sender-nonce
|
||||
// and priority.
|
||||
PriorityNonceMempool[C comparable] struct {
|
||||
mtx sync.Mutex
|
||||
priorityIndex *skiplist.SkipList
|
||||
priorityCounts map[C]int
|
||||
senderIndices map[string]*skiplist.SkipList
|
||||
@ -194,7 +196,9 @@ func (mp *PriorityNonceMempool[C]) NextSenderTx(sender string) sdk.Tx {
|
||||
// Inserting a duplicate tx with a different priority overwrites the existing tx,
|
||||
// changing the total order of the mempool.
|
||||
func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error {
|
||||
if mp.cfg.MaxTx > 0 && mp.CountTx() >= mp.cfg.MaxTx {
|
||||
mp.mtx.Lock()
|
||||
defer mp.mtx.Unlock()
|
||||
if mp.cfg.MaxTx > 0 && mp.priorityIndex.Len() >= mp.cfg.MaxTx {
|
||||
return ErrMempoolTxMaxCapacity
|
||||
} else if mp.cfg.MaxTx < 0 {
|
||||
return nil
|
||||
@ -341,6 +345,8 @@ func (i *PriorityNonceIterator[C]) Tx() sdk.Tx {
|
||||
// NOTE: It is not safe to use this iterator while removing transactions from
|
||||
// the underlying mempool.
|
||||
func (mp *PriorityNonceMempool[C]) Select(_ context.Context, _ [][]byte) Iterator {
|
||||
mp.mtx.Lock()
|
||||
defer mp.mtx.Unlock()
|
||||
if mp.priorityIndex.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -409,12 +415,16 @@ func senderWeight[C comparable](txPriority TxPriority[C], senderCursor *skiplist
|
||||
|
||||
// CountTx returns the number of transactions in the mempool.
|
||||
func (mp *PriorityNonceMempool[C]) CountTx() int {
|
||||
mp.mtx.Lock()
|
||||
defer mp.mtx.Unlock()
|
||||
return mp.priorityIndex.Len()
|
||||
}
|
||||
|
||||
// Remove removes a transaction from the mempool in O(log n) time, returning an
|
||||
// error if unsuccessful.
|
||||
func (mp *PriorityNonceMempool[C]) Remove(tx sdk.Tx) error {
|
||||
mp.mtx.Lock()
|
||||
defer mp.mtx.Unlock()
|
||||
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/rand" // #nosec // math/rand is used for random selection and seeded from crypto/rand
|
||||
"sync"
|
||||
|
||||
"github.com/huandu/skiplist"
|
||||
|
||||
@ -31,6 +32,7 @@ var DefaultMaxTx = 0
|
||||
// Note that PrepareProposal could choose to stop iteration before reaching the
|
||||
// end if maxBytes is reached.
|
||||
type SenderNonceMempool struct {
|
||||
mtx sync.Mutex
|
||||
senders map[string]*skiplist.SkipList
|
||||
rnd *rand.Rand
|
||||
maxTx int
|
||||
@ -116,7 +118,9 @@ func (snm *SenderNonceMempool) NextSenderTx(sender string) sdk.Tx {
|
||||
// Insert adds a tx to the mempool. It returns an error if the tx does not have
|
||||
// at least one signer. Note, priority is ignored.
|
||||
func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
|
||||
if snm.maxTx > 0 && snm.CountTx() >= snm.maxTx {
|
||||
snm.mtx.Lock()
|
||||
defer snm.mtx.Unlock()
|
||||
if snm.maxTx > 0 && len(snm.existingTx) >= snm.maxTx {
|
||||
return ErrMempoolTxMaxCapacity
|
||||
}
|
||||
if snm.maxTx < 0 {
|
||||
@ -155,6 +159,8 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
|
||||
// NOTE: It is not safe to use this iterator while removing transactions from
|
||||
// the underlying mempool.
|
||||
func (snm *SenderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
|
||||
snm.mtx.Lock()
|
||||
defer snm.mtx.Unlock()
|
||||
var senders []string
|
||||
|
||||
senderCursors := make(map[string]*skiplist.Element)
|
||||
@ -184,12 +190,16 @@ func (snm *SenderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
|
||||
|
||||
// CountTx returns the total count of txs in the mempool.
|
||||
func (snm *SenderNonceMempool) CountTx() int {
|
||||
snm.mtx.Lock()
|
||||
defer snm.mtx.Unlock()
|
||||
return len(snm.existingTx)
|
||||
}
|
||||
|
||||
// Remove removes a tx from the mempool. It returns an error if the tx does not
|
||||
// have at least one signer or the tx was not found in the pool.
|
||||
func (snm *SenderNonceMempool) Remove(tx sdk.Tx) error {
|
||||
snm.mtx.Lock()
|
||||
defer snm.mtx.Unlock()
|
||||
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
Loading…
Reference in New Issue
Block a user