feat: sender mempool impl (#13888)

* draft sender mempool impl

* select

* nit

* random sender update

* nit

* prevent memory leak

* fix nil return

* small fixes

* added tests

* change count

* finish tx order test removed the three address test due to make the test to bloated when including non determinism

* remove unsued variable

* nit

* fix

* temoral commit braking

* nit most

* nit most

* final

* comments

* t

* comments

* test

* add nolint

* Fix comment

* golint comment

* golint

* improve format?

* more gosec disable

* Fix ctr usage

* use #nosec

* Update types/mempool/sender_nonce.go

* Kocubinski/random sender nonce (#13956)

* refactor

* fix iteration logic

* fix merge err

* import fixes

* derive order randomness from seed only

* gosec fix

* ignore gosec again

* comments

* property based

* minor fixes

* added property test

* comment

* fix imports

* comment

* Update types/mempool/sender_nonce_property_test.go

Co-authored-by: Matt Kocubinski <mkocubinski@gmail.com>

* remove unesessary loop

* improve function name

* Update types/mempool/sender_nonce.go

Co-authored-by: Facundo Medica <14063057+facundomedica@users.noreply.github.com>

* change import name

* change validation to be preemvtive

Co-authored-by: Matt Kocubinski <mkocubinski@gmail.com>
Co-authored-by: Facundo Medica <14063057+facundomedica@users.noreply.github.com>
This commit is contained in:
Jeancarlo Barrios 2022-11-23 18:15:41 -05:00 committed by GitHub
parent 644f906966
commit 908fda1f53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 344 additions and 222 deletions

View File

@ -47,7 +47,7 @@ func TestABCIv1TestSuite(t *testing.T) {
func (s *ABCIv1TestSuite) SetupTest() {
t := s.T()
anteKey := []byte("ante-key")
pool := mempool.NewNonceMempool()
pool := mempool.NewSenderNonceMempool()
anteOpt := func(bapp *baseapp.BaseApp) {
bapp.SetAnteHandler(anteHandlerTxTest(t, capKey1, anteKey))
}

View File

@ -170,7 +170,7 @@ func NewBaseApp(
}
if app.mempool == nil {
app.SetMempool(mempool.NewNonceMempool())
app.SetMempool(mempool.NewSenderNonceMempool())
}
if app.processProposal == nil {

View File

@ -150,7 +150,7 @@ func makeTestConfig() depinject.Config {
}
func makeMinimalConfig() depinject.Config {
var mempoolOpt runtime.BaseAppOption = baseapp.SetMempool(mempool.NewNonceMempool())
var mempoolOpt runtime.BaseAppOption = baseapp.SetMempool(mempool.NewSenderNonceMempool())
return depinject.Configs(
depinject.Supply(mempoolOpt),
appconfig.Compose(&appv1alpha1.Config{

View File

@ -132,6 +132,7 @@ func (s *MempoolTestSuite) TestDefaultMempool() {
for i := 0; i < txCount; i++ {
acc := accounts[i%len(accounts)]
tx := testTx{
nonce: 0,
address: acc.Address,
priority: rand.Int63(),
}
@ -201,7 +202,7 @@ type MempoolTestSuite struct {
func (s *MempoolTestSuite) resetMempool() {
s.iterations = 0
s.mempool = mempool.NewNonceMempool()
s.mempool = mempool.NewSenderNonceMempool()
}
func (s *MempoolTestSuite) SetupTest() {

View File

@ -1,125 +0,0 @@
package mempool
import (
"fmt"
huandu "github.com/huandu/skiplist"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
)
var (
_ Mempool = (*nonceMempool)(nil)
_ Iterator = (*nonceMempoolIterator)(nil)
)
// nonceMempool is a mempool that keeps transactions sorted by nonce. Transactions
// with the lowest nonce globally are prioritized. Transactions with the same
// nonce are prioritized by sender address. Fee/gas based prioritization is not
// supported.
type nonceMempool struct {
txQueue *huandu.SkipList
}
type nonceMempoolIterator struct {
currentTx *huandu.Element
}
func (i nonceMempoolIterator) Next() Iterator {
if i.currentTx == nil {
return nil
} else if n := i.currentTx.Next(); n != nil {
return nonceMempoolIterator{currentTx: n}
} else {
return nil
}
}
func (i nonceMempoolIterator) Tx() sdk.Tx {
return i.currentTx.Value.(sdk.Tx)
}
type txKey struct {
nonce uint64
sender string
}
// txKeyLessNonce compares two txKeys by nonce then by sender address.
func txKeyLessNonce(a, b any) int {
keyA := a.(txKey)
keyB := b.(txKey)
res := huandu.Uint64.Compare(keyB.nonce, keyA.nonce)
if res != 0 {
return res
}
return huandu.String.Compare(keyB.sender, keyA.sender)
}
// NewNonceMempool creates a new mempool that prioritizes transactions by nonce, the lowest first.
func NewNonceMempool() Mempool {
sp := &nonceMempool{
txQueue: huandu.New(huandu.LessThanFunc(txKeyLessNonce)),
}
return sp
}
// Insert adds a tx to the mempool. It returns an error if the tx does not have at least one signer.
// priority is ignored.
func (sp nonceMempool) Insert(_ sdk.Context, tx sdk.Tx) error {
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
}
if len(sigs) == 0 {
return fmt.Errorf("tx must have at least one signer")
}
sig := sigs[0]
sender := sig.PubKey.Address().String()
nonce := sig.Sequence
tk := txKey{nonce: nonce, sender: sender}
sp.txQueue.Set(tk, tx)
return nil
}
// Select returns an iterator ordering transactions the mempool with the lowest nonce globally first. A sender's txs
// will always be returned in nonce order.
func (sp nonceMempool) Select(_ sdk.Context, _ [][]byte) Iterator {
currentTx := sp.txQueue.Front()
if currentTx == nil {
return nil
}
return &nonceMempoolIterator{currentTx: currentTx}
}
// CountTx returns the number of txs in the mempool.
func (sp nonceMempool) CountTx() int {
return sp.txQueue.Len()
}
// Remove removes a tx from the mempool. It returns an error if the tx does not have at least one signer or the tx
// was not found in the pool.
func (sp nonceMempool) Remove(tx sdk.Tx) error {
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
}
if len(sigs) == 0 {
return fmt.Errorf("tx must have at least one signer")
}
sig := sigs[0]
sender := sig.PubKey.Address().String()
nonce := sig.Sequence
tk := txKey{nonce: nonce, sender: sender}
res := sp.txQueue.Remove(tk)
if res == nil {
return ErrTxNotFound
}
return nil
}

View File

@ -0,0 +1,202 @@
package mempool
import (
crand "crypto/rand" // #nosec // crypto/rand is used for seed generation
"encoding/binary"
"fmt"
"math/rand" // #nosec // math/rand is used for random selection and seeded from crypto/rand
"github.com/huandu/skiplist"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
)
var (
_ Mempool = (*senderNonceMempool)(nil)
_ Iterator = (*senderNonceMepoolIterator)(nil)
)
// senderNonceMempool is a mempool that prioritizes transactions within a sender by nonce, the lowest first,
// but selects a random sender on each iteration. The mempool is iterated by:
//
// 1) Maintaining a separate list of nonce ordered txs per sender
// 2) For each select iteration, randomly choose a sender and pick the next nonce ordered tx from their list
// 3) Repeat 1,2 until the mempool is exhausted
//
// Note that PrepareProposal could choose to stop iteration before reaching the end if maxBytes is reached.
type senderNonceMempool struct {
senders map[string]*skiplist.SkipList
rnd *rand.Rand
}
// NewSenderNonceMempool creates a new mempool that prioritizes transactions by nonce, the lowest first.
func NewSenderNonceMempool() Mempool {
senderMap := make(map[string]*skiplist.SkipList)
snp := &senderNonceMempool{
senders: senderMap,
}
var seed int64
err := binary.Read(crand.Reader, binary.BigEndian, &seed)
if err != nil {
panic(err)
}
snp.setSeed(seed)
return snp
}
// NewSenderNonceMempoolWithSeed creates a new mempool that prioritizes transactions by nonce, the lowest first and sets the random seed.
func NewSenderNonceMempoolWithSeed(seed int64) Mempool {
senderMap := make(map[string]*skiplist.SkipList)
snp := &senderNonceMempool{
senders: senderMap,
}
snp.setSeed(seed)
return snp
}
func (snm *senderNonceMempool) setSeed(seed int64) {
s1 := rand.NewSource(seed)
snm.rnd = rand.New(s1) //#nosec // math/rand is seeded from crypto/rand by default
}
// Insert adds a tx to the mempool. It returns an error if the tx does not have at least one signer.
// priority is ignored.
func (snm *senderNonceMempool) Insert(_ sdk.Context, tx sdk.Tx) error {
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
}
if len(sigs) == 0 {
return fmt.Errorf("tx must have at least one signer")
}
sig := sigs[0]
sender := sig.PubKey.Address().String()
nonce := sig.Sequence
senderTxs, found := snm.senders[sender]
if !found {
senderTxs = skiplist.New(skiplist.Uint64)
snm.senders[sender] = senderTxs
}
senderTxs.Set(nonce, tx)
return nil
}
// Select returns an iterator ordering transactions the mempool with the lowest nonce of a random selected sender first.
func (snm *senderNonceMempool) Select(_ sdk.Context, _ [][]byte) Iterator {
var senders []string
senderCursors := make(map[string]*skiplist.Element)
orderedSenders := skiplist.New(skiplist.String)
// #nosec
for s := range snm.senders {
orderedSenders.Set(s, s)
}
s := orderedSenders.Front()
for s != nil {
sender := s.Value.(string)
senders = append(senders, sender)
senderCursors[sender] = snm.senders[sender].Front()
s = s.Next()
}
iter := &senderNonceMepoolIterator{
senders: senders,
rnd: snm.rnd,
senderCursors: senderCursors,
}
return iter.Next()
}
// CountTx returns the total count of txs in the mempool.
func (snm *senderNonceMempool) CountTx() int {
count := 0
// Disable gosec here since we need neither strong randomness nor deterministic iteration.
// #nosec
for _, value := range snm.senders {
count += value.Len()
}
return count
}
// Remove removes a tx from the mempool. It returns an error if the tx does not have at least one signer or the tx
// was not found in the pool.
func (snm *senderNonceMempool) Remove(tx sdk.Tx) error {
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
}
if len(sigs) == 0 {
return fmt.Errorf("tx must have at least one signer")
}
sig := sigs[0]
sender := sig.PubKey.Address().String()
nonce := sig.Sequence
senderTxs, found := snm.senders[sender]
if !found {
return ErrTxNotFound
}
res := senderTxs.Remove(nonce)
if res == nil {
return ErrTxNotFound
}
if senderTxs.Len() == 0 {
delete(snm.senders, sender)
}
return nil
}
type senderNonceMepoolIterator struct {
rnd *rand.Rand
currentTx *skiplist.Element
senders []string
senderCursors map[string]*skiplist.Element
}
// Next returns the next iterator state which will contain a tx with the next smallest nonce of a randomly
// selected sender.
func (i *senderNonceMepoolIterator) Next() Iterator {
for len(i.senders) > 0 {
senderIndex := i.rnd.Intn(len(i.senders))
sender := i.senders[senderIndex]
senderCursor, found := i.senderCursors[sender]
if !found {
i.senders = removeAtIndex(i.senders, senderIndex)
continue
}
if nextCursor := senderCursor.Next(); nextCursor != nil {
i.senderCursors[sender] = nextCursor
} else {
i.senders = removeAtIndex(i.senders, senderIndex)
}
return &senderNonceMepoolIterator{
senders: i.senders,
currentTx: senderCursor,
rnd: i.rnd,
senderCursors: i.senderCursors,
}
}
return nil
}
func (i *senderNonceMepoolIterator) Tx() sdk.Tx {
return i.currentTx.Value.(sdk.Tx)
}
func removeAtIndex[T any](slice []T, index int) []T {
return append(slice[:index], slice[index+1:]...)
}

View File

@ -0,0 +1,115 @@
package mempool_test
import (
"math/rand"
"sort"
"pgregory.net/rapid"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
sdk "github.com/cosmos/cosmos-sdk/types"
mempool "github.com/cosmos/cosmos-sdk/types/mempool"
simtypes "github.com/cosmos/cosmos-sdk/types/simulation"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)
var (
_ sdk.Tx = (*testTx)(nil)
_ signing.SigVerifiableTx = (*testTx)(nil)
_ cryptotypes.PubKey = (*testPubKey)(nil)
)
// Property Based Testing
// Split the senders tx in independent slices and then test the following properties in each slice
// same elements input on the mempool should be in the output except for sender nonce duplicates, which are overwritten by the later duplicate entries.
// for every sender transaction tx_n, tx_0.nonce < tx_1.nonce ... < tx_n.nonce
var genAddress = rapid.Custom(func(t *rapid.T) simtypes.Account {
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(rapid.Int64().Draw(t, "seed for account"))), 1)
return accounts[0]
})
func testMempoolProperties(t *rapid.T) {
ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
mp := mempool.NewSenderNonceMempool()
genMultipleAddress := rapid.SliceOfNDistinct(genAddress, 1, 10, func(acc simtypes.Account) string {
return acc.Address.String()
})
accounts := genMultipleAddress.Draw(t, "address")
genTx := rapid.Custom(func(t *rapid.T) testTx {
return testTx{
priority: rapid.Int64Range(0, 1000).Draw(t, "priority"),
nonce: rapid.Uint64().Draw(t, "nonce"),
address: rapid.SampledFrom(accounts).Draw(t, "acc").Address,
}
})
genMultipleTX := rapid.SliceOfN(genTx, 1, 500)
txs := genMultipleTX.Draw(t, "txs")
senderTxRaw := getSenderTxMap(txs)
for _, tx := range txs {
err := mp.Insert(ctx, tx)
require.NoError(t, err)
}
iter := mp.Select(ctx, nil)
orderTx := fetchAllTxs(iter)
senderTxOrdered := getSenderTxMap(orderTx)
for key := range senderTxOrdered {
ordered, found := senderTxOrdered[key]
require.True(t, found)
raw, found := senderTxRaw[key]
require.True(t, found)
rawSet := mergeByNonce(raw)
sort.Slice(rawSet, func(i, j int) bool { return rawSet[i].nonce < rawSet[j].nonce })
require.Equal(t, rawSet, ordered)
}
}
func (s *MempoolTestSuite) TestProperties() {
t := s.T()
rapid.Check(t, testMempoolProperties)
}
func getSenderTxMap(txs []testTx) map[string][]testTx {
senderTxs := make(map[string][]testTx)
for _, tx := range txs {
stx, found := senderTxs[tx.address.String()]
if !found {
stx = make([]testTx, 0)
}
stx = append(stx, tx)
senderTxs[tx.address.String()] = stx
}
return senderTxs
}
func fetchAllTxs(iterator mempool.Iterator) []testTx {
var txs []testTx
for iterator != nil {
tx := iterator.Tx()
txs = append(txs, tx.(testTx))
i := iterator.Next()
iterator = i
}
return txs
}
func mergeByNonce(raw []testTx) []testTx {
rawMap := make(map[uint64]testTx)
for _, v := range raw {
rawMap[v.nonce] = v
}
result := make([]testTx, 0)
for _, v := range rawMap {
result = append(result, v)
}
return result
}

View File

@ -10,6 +10,7 @@ import (
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool"
simtypes "github.com/cosmos/cosmos-sdk/types/simulation"
)
@ -19,12 +20,12 @@ func (s *MempoolTestSuite) TestTxOrder() {
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 5)
sa := accounts[0].Address
sb := accounts[1].Address
sc := accounts[2].Address
tests := []struct {
txs []txSpec
order []int
fail bool
seed int64
}{
{
txs: []txSpec{
@ -35,6 +36,8 @@ func (s *MempoolTestSuite) TestTxOrder() {
{p: 20, n: 1, a: sa},
},
order: []int{3, 4, 2, 1, 0},
// Index order base on seed 0: 0 0 1 0 1 0 0
seed: 0,
},
{
txs: []txSpec{
@ -45,7 +48,9 @@ func (s *MempoolTestSuite) TestTxOrder() {
{p: 5, n: 1, a: sb},
{p: 8, n: 2, a: sb},
},
order: []int{3, 0, 4, 1, 5, 2},
order: []int{3, 4, 0, 5, 1, 2},
// Index order base on seed 0: 0 0 1 0 1 0 0
seed: 0,
},
{
txs: []txSpec{
@ -54,6 +59,8 @@ func (s *MempoolTestSuite) TestTxOrder() {
{p: 20, n: 1, a: sa},
},
order: []int{1, 2, 0},
// Index order base on seed 0: 0 0 1 0 1 0 0
seed: 0,
},
{
txs: []txSpec{
@ -63,7 +70,9 @@ func (s *MempoolTestSuite) TestTxOrder() {
{p: 15, n: 1, a: sb},
{p: 21, n: 2, a: sb},
},
order: []int{3, 2, 4, 1, 0},
order: []int{3, 4, 2, 1, 0},
// Index order base on seed 0: 0 0 1 0 1 0 0
seed: 0,
},
{
txs: []txSpec{
@ -73,7 +82,9 @@ func (s *MempoolTestSuite) TestTxOrder() {
{p: 15, n: 1, a: sb},
{p: 8, n: 2, a: sb},
},
order: []int{3, 2, 4, 1, 0},
order: []int{3, 4, 2, 1, 0},
// Index order base on seed 0: 0 0 1 0 1 0 0
seed: 0,
},
{
txs: []txSpec{
@ -85,21 +96,9 @@ func (s *MempoolTestSuite) TestTxOrder() {
{p: 6, a: sa, n: 3},
{p: 4, a: sb, n: 3},
},
order: []int{4, 3, 1, 2, 0, 6, 5},
},
{
txs: []txSpec{
{p: 30, n: 2, a: sa},
{p: 20, a: sb, n: 1},
{p: 15, a: sa, n: 1},
{p: 10, a: sa, n: 0},
{p: 8, a: sb, n: 0},
{p: 6, a: sa, n: 3},
{p: 4, a: sb, n: 3},
{p: 2, a: sc, n: 0},
{p: 7, a: sc, n: 3},
},
order: []int{4, 3, 7, 1, 2, 0, 6, 5, 8},
order: []int{4, 1, 3, 6, 2, 0, 5},
// Index order base on seed 0: 0 0 1 0 1 0 1 1 0
seed: 0,
},
{
txs: []txSpec{
@ -108,83 +107,14 @@ func (s *MempoolTestSuite) TestTxOrder() {
{p: 5, n: 1, a: sb},
{p: 99, n: 2, a: sb},
},
order: []int{2, 0, 3, 1},
},
{
// if all txs have the same priority they will be ordered lexically sender address, and nonce with the
// sender.
txs: []txSpec{
{p: 10, n: 7, a: sc},
{p: 10, n: 8, a: sc},
{p: 10, n: 9, a: sc},
{p: 10, n: 1, a: sa},
{p: 10, n: 2, a: sa},
{p: 10, n: 3, a: sa},
{p: 10, n: 4, a: sb},
{p: 10, n: 5, a: sb},
{p: 10, n: 6, a: sb},
},
order: []int{3, 4, 5, 6, 7, 8, 0, 1, 2},
},
/*
The next 4 tests are different permutations of the same set:
{p: 5, n: 1, a: sa},
{p: 10, n: 2, a: sa},
{p: 20, n: 2, a: sb},
{p: 5, n: 1, a: sb},
{p: 99, n: 2, a: sc},
{p: 5, n: 1, a: sc},
which exercises the actions required to resolve priority ties.
*/
{
txs: []txSpec{
{p: 5, n: 1, a: sa},
{p: 10, n: 2, a: sa},
{p: 5, n: 1, a: sb},
{p: 99, n: 2, a: sb},
},
order: []int{2, 0, 3, 1},
},
{
txs: []txSpec{
{p: 5, n: 1, a: sa},
{p: 10, n: 2, a: sa},
{p: 20, n: 2, a: sb},
{p: 5, n: 1, a: sb},
{p: 99, n: 2, a: sc},
{p: 5, n: 1, a: sc},
},
order: []int{3, 0, 5, 2, 1, 4},
},
{
txs: []txSpec{
{p: 5, n: 1, a: sa},
{p: 10, n: 2, a: sa},
{p: 5, n: 1, a: sb},
{p: 20, n: 2, a: sb},
{p: 5, n: 1, a: sc},
{p: 99, n: 2, a: sc},
},
order: []int{2, 0, 4, 3, 1, 5},
},
{
txs: []txSpec{
{p: 5, n: 1, a: sa},
{p: 10, n: 2, a: sa},
{p: 5, n: 1, a: sc},
{p: 20, n: 2, a: sc},
{p: 5, n: 1, a: sb},
{p: 99, n: 2, a: sb},
},
order: []int{4, 0, 2, 5, 1, 3},
order: []int{2, 3, 0, 1},
// Index order base on seed 0: 0 0 1 0 1 0 1 1 0
seed: 0,
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
pool := s.mempool
pool := mempool.NewSenderNonceMempoolWithSeed(tt.seed)
// create test txs and insert into mempool
for i, ts := range tt.txs {
tx := testTx{id: i, priority: int64(ts.p), nonce: uint64(ts.n), address: ts.a}
@ -202,7 +132,6 @@ func (s *MempoolTestSuite) TestTxOrder() {
for _, tx := range orderedTxs {
require.NoError(t, pool.Remove(tx))
}
require.Equal(t, tt.order, txOrder)
require.Equal(t, 0, pool.CountTx())
})