refactor(mempool)!: match server/v2/cometbft and sdk mempool interface (#21744)

Co-authored-by: Marko <marko@baricevic.me>
This commit is contained in:
Julien Robert 2024-09-18 15:57:52 +02:00 committed by GitHub
parent f1dd03f212
commit 356df96770
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 53 additions and 36 deletions

View File

@ -50,6 +50,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i
### Improvements
* (genutil) [#21701](https://github.com/cosmos/cosmos-sdk/pull/21701) Improved error messages for genesis validation.
* (sims)[#21613](https://github.com/cosmos/cosmos-sdk/pull/21613) Add sims2 framework and factory methods for simpler message factories in modules
### Bug Fixes
@ -58,7 +59,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i
### API Breaking Changes
* (sims)[#21613](https://github.com/cosmos/cosmos-sdk/pull/21613) Add sims2 framework and factory methods for simpler message factories in modules
* (types/mempool) [#21744](https://github.com/cosmos/cosmos-sdk/pull/21744) Update types/mempool.Mempool interface to take decoded transactions. This avoid to decode the transaction twice.
### Deprecated

View File

@ -264,19 +264,25 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
defer h.txSelector.Clear()
// decode transactions
decodedTxs := make([]sdk.Tx, len(req.Txs))
for i, txBz := range req.Txs {
tx, err := h.txVerifier.TxDecode(txBz)
if err != nil {
return nil, err
}
decodedTxs[i] = tx
}
// If the mempool is nil or NoOp we simply return the transactions
// requested from CometBFT, which, by default, should be in FIFO order.
//
// Note, we still need to ensure the transactions returned respect req.MaxTxBytes.
_, isNoOp := h.mempool.(mempool.NoOpMempool)
if h.mempool == nil || isNoOp {
for _, txBz := range req.Txs {
tx, err := h.txVerifier.TxDecode(txBz)
if err != nil {
return nil, err
}
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, txBz)
for i, tx := range decodedTxs {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, req.Txs[i])
if stop {
break
}
@ -291,7 +297,7 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
selectedTxsNums int
invalidTxs []sdk.Tx // invalid txs to be removed out of the loop to avoid dead lock
)
h.mempool.SelectBy(ctx, req.Txs, func(memTx sdk.Tx) bool {
h.mempool.SelectBy(ctx, decodedTxs, func(memTx sdk.Tx) bool {
unorderedTx, ok := memTx.(sdk.TxWithUnordered)
isUnordered := ok && unorderedTx.GetUnordered()
txSignersSeqs := make(map[string]uint64)

View File

@ -691,6 +691,7 @@ func (s *ABCIUtilsTestSuite) TestDefaultProposalHandler_PriorityNonceMempoolTxSe
ph := baseapp.NewDefaultProposalHandler(mp, app)
for _, v := range tc.txInputs {
app.EXPECT().TxDecode(v.bz).Return(v.tx, nil).AnyTimes()
app.EXPECT().PrepareProposalVerifyTx(v.tx).Return(v.bz, nil).AnyTimes()
s.NoError(mp.Insert(s.ctx.WithPriority(v.priority), v.tx))
tc.req.Txs = append(tc.req.Txs, v.bz)

View File

@ -467,9 +467,10 @@ func (c *Consensus[T]) FinalizeBlock(
}
// remove txs from the mempool
err = c.mempool.Remove(decodedTxs)
if err != nil {
return nil, fmt.Errorf("unable to remove txs: %w", err)
for _, tx := range decodedTxs {
if err = c.mempool.Remove(tx); err != nil {
return nil, fmt.Errorf("unable to remove tx: %w", err)
}
}
c.lastCommittedHeight.Store(req.Height)

View File

@ -79,7 +79,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
// check again.
_, err := app.ValidateTx(ctx, memTx)
if err != nil {
err := h.mempool.Remove([]T{memTx})
err := h.mempool.Remove(memTx)
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
return nil, err
}

View File

@ -15,5 +15,6 @@ type MockMempool[T transaction.Tx] struct{}
func (MockMempool[T]) Insert(context.Context, T) error { return nil }
func (MockMempool[T]) Select(context.Context, []T) mempool.Iterator[T] { return nil }
func (MockMempool[T]) SelectBy(context.Context, []T, func(T) bool) {}
func (MockMempool[T]) CountTx() int { return 0 }
func (MockMempool[T]) Remove([]T) error { return nil }
func (MockMempool[T]) Remove(T) error { return nil }

View File

@ -19,13 +19,18 @@ type Mempool[T transaction.Tx] interface {
Insert(context.Context, T) error
// Select returns an Iterator over the app-side mempool. If txs are specified,
// then they shall be incorporated into the Iterator. The Iterator must be
// closed by the caller.
// then they shall be incorporated into the Iterator. The Iterator is not thread-safe to use.
Select(context.Context, []T) Iterator[T]
// SelectBy use callback to iterate over the mempool, it's thread-safe to use.
SelectBy(context.Context, []T, func(T) bool)
// CountTx returns the number of transactions currently in the mempool.
CountTx() int
// Remove attempts to remove a transaction from the mempool, returning an error
// upon failure.
Remove([]T) error
Remove(T) error
}
// Iterator defines an app-side mempool iterator interface that is as minimal as

View File

@ -4,8 +4,11 @@ import (
"context"
"cosmossdk.io/core/transaction"
sdk "github.com/cosmos/cosmos-sdk/types"
)
var _ Mempool[sdk.Tx] = (*NoOpMempool[sdk.Tx])(nil) // verify interface at compile time
var _ Mempool[transaction.Tx] = (*NoOpMempool[transaction.Tx])(nil)
// NoOpMempool defines a no-op mempool. Transactions are completely discarded and
@ -16,7 +19,8 @@ var _ Mempool[transaction.Tx] = (*NoOpMempool[transaction.Tx])(nil)
// is FIFO-ordered by default.
type NoOpMempool[T transaction.Tx] struct{}
func (NoOpMempool[T]) Insert(context.Context, T) error { return nil }
func (NoOpMempool[T]) Select(context.Context, []T) Iterator[T] { return nil }
func (NoOpMempool[T]) CountTx() int { return 0 }
func (NoOpMempool[T]) Remove([]T) error { return nil }
func (NoOpMempool[T]) Insert(context.Context, T) error { return nil }
func (NoOpMempool[T]) Select(context.Context, []T) Iterator[T] { return nil }
func (NoOpMempool[T]) SelectBy(context.Context, []T, func(T) bool) {}
func (NoOpMempool[T]) CountTx() int { return 0 }
func (NoOpMempool[T]) Remove(T) error { return nil }

View File

@ -35,6 +35,7 @@ require (
github.com/cosmos/cosmos-db v1.0.3-0.20240911104526-ddc3f09bfc22 // indirect
// this version is not used as it is always replaced by the latest Cosmos SDK version
github.com/cosmos/cosmos-sdk v0.53.0
github.com/spf13/cast v1.7.0 // indirect
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
@ -194,7 +195,6 @@ require (
github.com/sasha-s/go-deadlock v0.3.5 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/supranational/blst v0.3.13 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect

View File

@ -91,13 +91,11 @@ func initCometConfig() cometbft.CfgOption {
func initCometOptions[T transaction.Tx]() cometbft.ServerOptions[T] {
serverOptions := cometbft.DefaultServerOptions[T]()
// TODO mempool interface doesn't match!
// overwrite app mempool, using max-txs option
// serverOptions.Mempool = func(cfg map[string]any) mempool.Mempool[T] {
// if maxTxs := cast.ToInt(cfg[cometbft.FlagMempoolMaxTxs]); maxTxs >= 0 {
// return mempool.NewSenderNonceMempool(
// mempool.SenderNonceMaxTxOpt(maxTxs),
// return sdkmempool.NewSenderNonceMempool(
// sdkmempool.SenderNonceMaxTxOpt(maxTxs),
// )
// }

View File

@ -14,10 +14,10 @@ type Mempool interface {
// Select returns an Iterator over the app-side mempool. If txs are specified,
// then they shall be incorporated into the Iterator. The Iterator is not thread-safe to use.
Select(context.Context, [][]byte) Iterator
Select(context.Context, []sdk.Tx) Iterator
// SelectBy use callback to iterate over the mempool, it's thread-safe to use.
SelectBy(context.Context, [][]byte, func(sdk.Tx) bool)
SelectBy(context.Context, []sdk.Tx, func(sdk.Tx) bool)
// CountTx returns the number of transactions currently in the mempool.
CountTx() int

View File

@ -17,7 +17,7 @@ var _ Mempool = (*NoOpMempool)(nil)
type NoOpMempool struct{}
func (NoOpMempool) Insert(context.Context, sdk.Tx) error { return nil }
func (NoOpMempool) Select(context.Context, [][]byte) Iterator { return nil }
func (NoOpMempool) SelectBy(context.Context, [][]byte, func(sdk.Tx) bool) {}
func (NoOpMempool) Select(context.Context, []sdk.Tx) Iterator { return nil }
func (NoOpMempool) SelectBy(context.Context, []sdk.Tx, func(sdk.Tx) bool) {}
func (NoOpMempool) CountTx() int { return 0 }
func (NoOpMempool) Remove(sdk.Tx) error { return nil }

View File

@ -361,13 +361,13 @@ func (i *PriorityNonceIterator[C]) Tx() sdk.Tx {
//
// NOTE: It is not safe to use this iterator while removing transactions from
// the underlying mempool.
func (mp *PriorityNonceMempool[C]) Select(ctx context.Context, txs [][]byte) Iterator {
func (mp *PriorityNonceMempool[C]) Select(ctx context.Context, txs []sdk.Tx) Iterator {
mp.mtx.Lock()
defer mp.mtx.Unlock()
return mp.doSelect(ctx, txs)
}
func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ [][]byte) Iterator {
func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ []sdk.Tx) Iterator {
if mp.priorityIndex.Len() == 0 {
return nil
}
@ -383,7 +383,7 @@ func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ [][]byte) Itera
}
// SelectBy will hold the mutex during the iteration, callback returns if continue.
func (mp *PriorityNonceMempool[C]) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) {
func (mp *PriorityNonceMempool[C]) SelectBy(ctx context.Context, txs []sdk.Tx, callback func(sdk.Tx) bool) {
mp.mtx.Lock()
defer mp.mtx.Unlock()

View File

@ -167,13 +167,13 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
//
// NOTE: It is not safe to use this iterator while removing transactions from
// the underlying mempool.
func (snm *SenderNonceMempool) Select(ctx context.Context, txs [][]byte) Iterator {
func (snm *SenderNonceMempool) Select(ctx context.Context, txs []sdk.Tx) Iterator {
snm.mtx.Lock()
defer snm.mtx.Unlock()
return snm.doSelect(ctx, txs)
}
func (snm *SenderNonceMempool) doSelect(_ context.Context, _ [][]byte) Iterator {
func (snm *SenderNonceMempool) doSelect(_ context.Context, _ []sdk.Tx) Iterator {
var senders []string
senderCursors := make(map[string]*skiplist.Element)
@ -202,7 +202,7 @@ func (snm *SenderNonceMempool) doSelect(_ context.Context, _ [][]byte) Iterator
}
// SelectBy will hold the mutex during the iteration, callback returns if continue.
func (snm *SenderNonceMempool) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) {
func (snm *SenderNonceMempool) SelectBy(ctx context.Context, txs []sdk.Tx, callback func(sdk.Tx) bool) {
snm.mtx.Lock()
defer snm.mtx.Unlock()