feat: add tx limit to mempool (#14014)

* feat: add bounding max tx to mempool

* add bounded condition

* sligh improvement on generator

* remove unbouded option

* add test

* added mempool options mechanism

* mising test

* seting mempool

* change function name

* change function name

* failing test

* Revert "failing test"

This reverts commit d527982b0d4ec826ff680afb8f43ac1d71809ccf.

* fix import block

* changelog entries

* add ability to do unbounded mempool

* remove unesesary variable

* small comments

* change 0 to mean unbounded

* t

* small test fix

* add the ability to be bounded unbounded and disabled

* t

* set default maxtx

* Update docs/docs/building-apps/02-app-mempool.md

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* example for opts

* remove superflues logs entry

* add mempool to configurations

* fix more understandable name

* remove table in favor of bulletpoints

* sender nonce to unbounded

* Update docs/docs/building-apps/02-app-mempool.md

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* Update types/mempool/sender_nonce.go

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* Update types/mempool/sender_nonce.go

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* Update docs/docs/building-apps/02-app-mempool.md

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* Update server/config/config.go

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* t

* add comment for options

* fix inport

* fix inport

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
Co-authored-by: Emmanuel T Odeke <emmanuel@orijtech.com>
This commit is contained in:
Jeancarlo Barrios 2022-12-01 19:39:55 -05:00 committed by GitHub
parent 3aff993fcb
commit 754ca3169e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 163 additions and 32 deletions

View File

@ -51,3 +51,13 @@ Now that we have walked through the `PrepareProposal` & `ProcessProposal`, we ca
There are countless designs that an application developer can write for a mempool, the core team opted to provide a simple implementation of a nonce mempool. The nonce mempool is a mempool that keeps transactions from an sorted by nonce in order to avoid the issues with nonces.
It works by storing the transation in a list sorted by the transaction nonce. When the proposer asks for transactions to be included in a block it randomly selects a sender and gets the first transaction in the list. It repeats this until the mempool is empty or the block is full.
### Configurations
#### MaxTxs
Its an integer value that sets the mempool in one of three modes, bounded, unbounded, or disabled.
- **negative**: Disabled, mempool does not insert new tx and return early.
- **zero**: Unbounded mempool has no tx limit and will never fail with ErrMempoolTxMaxCapacity.
- **positive**: Bounded, it fails with ErrMempoolTxMaxCapacity when maxTx value is the same as CountTx()

View File

@ -166,6 +166,15 @@ type StateSyncConfig struct {
SnapshotKeepRecent uint32 `mapstructure:"snapshot-keep-recent"`
}
// MempoolConfig defines the configurations for the appside mempool
type MempoolConfig struct {
// MaxTxs defines the behavior of the mempool. A negative value indicates
// the mempool is disabled entirely, zero indicates that the mempool is
// unbounded in how many txs it may contain, and a positive value indicates
// the maximum amount of txs it may contain.
MaxTxs int
}
type (
// StoreConfig defines application configuration for state streaming and other
// storage related operations.
@ -200,6 +209,7 @@ type Config struct {
StateSync StateSyncConfig `mapstructure:"state-sync"`
Store StoreConfig `mapstructure:"store"`
Streamers StreamersConfig `mapstructure:"streamers"`
Mempool MempoolConfig `mapstructure:"mempool"`
}
// SetMinGasPrices sets the validator's minimum gas prices.
@ -278,6 +288,9 @@ func DefaultConfig() *Config {
Keys: []string{"*"},
},
},
Mempool: MempoolConfig{
MaxTxs: 0,
},
}
}

View File

@ -212,6 +212,14 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
write_dir = "{{ .Streamers.File.WriteDir }}"
prefix = "{{ .Streamers.File.Prefix }}"
###############################################################################
### Mempool ###
###############################################################################
[mempool]
max-txs = "{{ .Mempool.MaxTxs }}"
`
var configTemplate *template.Template

View File

@ -28,6 +28,7 @@ import (
"github.com/cosmos/cosmos-sdk/server/types"
pruningtypes "github.com/cosmos/cosmos-sdk/store/pruning/types"
"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/cosmos/cosmos-sdk/types/mempool"
)
const (
@ -73,6 +74,9 @@ const (
flagGRPCAddress = "grpc.address"
flagGRPCWebEnable = "grpc-web.enable"
flagGRPCWebAddress = "grpc-web.address"
// mempool flags
FlagMempoolMaxTxs = "mempool.max-txs"
)
// StartCmd runs the service passed in, either stand-alone or in-process with
@ -184,6 +188,8 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
cmd.Flags().Bool(FlagDisableIAVLFastNode, false, "Disable fast node for IAVL tree")
cmd.Flags().Int(FlagMempoolMaxTxs, mempool.DefaultMaxTx, "Sets MaxTx value for the app side mempool")
// add support for all Tendermint-specific command line options
tcmd.AddNodeFlags(cmd)
return cmd

View File

@ -34,6 +34,7 @@ import (
snapshottypes "github.com/cosmos/cosmos-sdk/store/snapshots/types"
simtestutil "github.com/cosmos/cosmos-sdk/testutil/sims"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool"
authcmd "github.com/cosmos/cosmos-sdk/x/auth/client/cli"
"github.com/cosmos/cosmos-sdk/x/auth/types"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
@ -307,6 +308,7 @@ func newApp(
baseapp.SetSnapshot(snapshotStore, snapshotOptions),
baseapp.SetIAVLCacheSize(cast.ToInt(appOpts.Get(server.FlagIAVLCacheSize))),
baseapp.SetIAVLDisableFastNode(cast.ToBool(appOpts.Get(server.FlagDisableIAVLFastNode))),
baseapp.SetMempool(mempool.NewSenderNonceMempool(mempool.SenderNonceMaxTxOpt(cast.ToInt(appOpts.Get(server.FlagMempoolMaxTxs))))),
)
}

View File

@ -36,4 +36,7 @@ type Iterator interface {
Tx() sdk.Tx
}
var ErrTxNotFound = errors.New("tx not found in mempool")
var (
ErrTxNotFound = errors.New("tx not found in mempool")
ErrMempoolTxMaxCapacity = errors.New("pool reached max tx capacity")
)

View File

@ -17,6 +17,8 @@ var (
_ Iterator = (*senderNonceMepoolIterator)(nil)
)
var DefaultMaxTx = 0
// senderNonceMempool is a mempool that prioritizes transactions within a sender by nonce, the lowest first,
// but selects a random sender on each iteration. The mempool is iterated by:
//
@ -26,15 +28,27 @@ var (
//
// Note that PrepareProposal could choose to stop iteration before reaching the end if maxBytes is reached.
type senderNonceMempool struct {
senders map[string]*skiplist.SkipList
rnd *rand.Rand
senders map[string]*skiplist.SkipList
rnd *rand.Rand
maxTx int
existingTx map[txKey]bool
}
type SenderNonceOptions func(mp *senderNonceMempool)
type txKey struct {
address string
nonce uint64
}
// NewSenderNonceMempool creates a new mempool that prioritizes transactions by nonce, the lowest first.
func NewSenderNonceMempool() Mempool {
func NewSenderNonceMempool(opts ...SenderNonceOptions) Mempool {
senderMap := make(map[string]*skiplist.SkipList)
existingTx := make(map[txKey]bool)
snp := &senderNonceMempool{
senders: senderMap,
senders: senderMap,
maxTx: DefaultMaxTx,
existingTx: existingTx,
}
var seed int64
@ -44,17 +58,30 @@ func NewSenderNonceMempool() Mempool {
}
snp.setSeed(seed)
for _, opt := range opts {
opt(snp)
}
return snp
}
// NewSenderNonceMempoolWithSeed creates a new mempool that prioritizes transactions by nonce, the lowest first and sets the random seed.
func NewSenderNonceMempoolWithSeed(seed int64) Mempool {
senderMap := make(map[string]*skiplist.SkipList)
snp := &senderNonceMempool{
senders: senderMap,
// SenderNonceSeedOpt Option To add a Seed for random type when calling the constructor NewSenderNonceMempool
// Example:
// > random_seed := int64(1000)
// > NewSenderNonceMempool(SenderNonceSeedTxOpt(random_seed))
func SenderNonceSeedOpt(seed int64) SenderNonceOptions {
return func(snp *senderNonceMempool) {
snp.setSeed(seed)
}
}
// SenderNonceMaxTxOpt Option To set limit of max tx when calling the constructor NewSenderNonceMempool
// Example:
// > NewSenderNonceMempool(SenderNonceMaxTxOpt(100))
func SenderNonceMaxTxOpt(maxTx int) SenderNonceOptions {
return func(snp *senderNonceMempool) {
snp.maxTx = maxTx
}
snp.setSeed(seed)
return snp
}
func (snm *senderNonceMempool) setSeed(seed int64) {
@ -65,6 +92,12 @@ func (snm *senderNonceMempool) setSeed(seed int64) {
// Insert adds a tx to the mempool. It returns an error if the tx does not have at least one signer.
// priority is ignored.
func (snm *senderNonceMempool) Insert(_ sdk.Context, tx sdk.Tx) error {
if snm.maxTx > 0 && snm.CountTx() >= snm.maxTx {
return ErrMempoolTxMaxCapacity
}
if snm.maxTx < 0 {
return nil
}
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
@ -82,7 +115,8 @@ func (snm *senderNonceMempool) Insert(_ sdk.Context, tx sdk.Tx) error {
snm.senders[sender] = senderTxs
}
senderTxs.Set(nonce, tx)
key := txKey{nonce: nonce, address: sender}
snm.existingTx[key] = true
return nil
}
@ -117,14 +151,7 @@ func (snm *senderNonceMempool) Select(_ sdk.Context, _ [][]byte) Iterator {
// CountTx returns the total count of txs in the mempool.
func (snm *senderNonceMempool) CountTx() int {
count := 0
// Disable gosec here since we need neither strong randomness nor deterministic iteration.
// #nosec
for _, value := range snm.senders {
count += value.Len()
}
return count
return len(snm.existingTx)
}
// Remove removes a tx from the mempool. It returns an error if the tx does not have at least one signer or the tx
@ -154,6 +181,10 @@ func (snm *senderNonceMempool) Remove(tx sdk.Tx) error {
if senderTxs.Len() == 0 {
delete(snm.senders, sender)
}
key := txKey{nonce: nonce, address: sender}
delete(snm.existingTx, key)
return nil
}

View File

@ -1,7 +1,6 @@
package mempool_test
import (
"math/rand"
"sort"
"pgregory.net/rapid"
@ -9,7 +8,6 @@ import (
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
sdk "github.com/cosmos/cosmos-sdk/types"
mempool "github.com/cosmos/cosmos-sdk/types/mempool"
simtypes "github.com/cosmos/cosmos-sdk/types/simulation"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
@ -27,18 +25,20 @@ var (
// same elements input on the mempool should be in the output except for sender nonce duplicates, which are overwritten by the later duplicate entries.
// for every sender transaction tx_n, tx_0.nonce < tx_1.nonce ... < tx_n.nonce
var genAddress = rapid.Custom(func(t *rapid.T) simtypes.Account {
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(rapid.Int64().Draw(t, "seed for account"))), 1)
return accounts[0]
})
func AddressGenerator(t *rapid.T) *rapid.Generator[sdk.AccAddress] {
return rapid.Custom(func(t *rapid.T) sdk.AccAddress {
pkBz := rapid.SliceOfN(rapid.Byte(), 20, 20).Draw(t, "hex")
return sdk.AccAddress(pkBz)
})
}
func testMempoolProperties(t *rapid.T) {
ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
mp := mempool.NewSenderNonceMempool()
genMultipleAddress := rapid.SliceOfNDistinct(genAddress, 1, 10, func(acc simtypes.Account) string {
return acc.Address.String()
genMultipleAddress := rapid.SliceOfNDistinct(AddressGenerator(t), 1, 10, func(acc sdk.AccAddress) string {
return acc.String()
})
accounts := genMultipleAddress.Draw(t, "address")
@ -46,10 +46,10 @@ func testMempoolProperties(t *rapid.T) {
return testTx{
priority: rapid.Int64Range(0, 1000).Draw(t, "priority"),
nonce: rapid.Uint64().Draw(t, "nonce"),
address: rapid.SampledFrom(accounts).Draw(t, "acc").Address,
address: rapid.SampledFrom(accounts).Draw(t, "acc"),
}
})
genMultipleTX := rapid.SliceOfN(genTx, 1, 500)
genMultipleTX := rapid.SliceOfN(genTx, 1, 5000)
txs := genMultipleTX.Draw(t, "txs")
senderTxRaw := getSenderTxMap(txs)
@ -61,6 +61,7 @@ func testMempoolProperties(t *rapid.T) {
iter := mp.Select(ctx, nil)
orderTx := fetchAllTxs(iter)
require.Equal(t, len(orderTx), mp.CountTx())
senderTxOrdered := getSenderTxMap(orderTx)
for key := range senderTxOrdered {
ordered, found := senderTxOrdered[key]

View File

@ -114,7 +114,7 @@ func (s *MempoolTestSuite) TestTxOrder() {
}
for i, tt := range tests {
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
pool := mempool.NewSenderNonceMempoolWithSeed(tt.seed)
pool := mempool.NewSenderNonceMempool(mempool.SenderNonceSeedOpt(tt.seed))
// create test txs and insert into mempool
for i, ts := range tt.txs {
tx := testTx{id: i, priority: int64(ts.p), nonce: uint64(ts.n), address: ts.a}
@ -137,3 +137,60 @@ func (s *MempoolTestSuite) TestTxOrder() {
})
}
}
func (s *MempoolTestSuite) TestMaxTx() {
t := s.T()
ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1)
mp := mempool.NewSenderNonceMempool(mempool.SenderNonceMaxTxOpt(1))
tx := testTx{
nonce: 0,
address: accounts[0].Address,
priority: rand.Int63(),
}
tx2 := testTx{
nonce: 1,
address: accounts[0].Address,
priority: rand.Int63(),
}
// empty mempool behavior
require.Equal(t, 0, s.mempool.CountTx())
itr := mp.Select(ctx, nil)
require.Nil(t, itr)
ctx = ctx.WithPriority(tx.priority)
err := mp.Insert(ctx, tx)
require.NoError(t, err)
ctx = ctx.WithPriority(tx.priority)
err = mp.Insert(ctx, tx2)
require.Equal(t, mempool.ErrMempoolTxMaxCapacity, err)
}
func (s *MempoolTestSuite) TestTxNotFoundOnSender() {
t := s.T()
ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1)
mp := mempool.NewSenderNonceMempool()
txSender := testTx{
nonce: 0,
address: accounts[0].Address,
priority: rand.Int63(),
}
tx := testTx{
nonce: 1,
address: accounts[0].Address,
priority: rand.Int63(),
}
ctx = ctx.WithPriority(tx.priority)
err := mp.Insert(ctx, txSender)
require.NoError(t, err)
err = mp.Remove(tx)
require.Equal(t, mempool.ErrTxNotFound, err)
}