From 77660ec452b6365a8b6c09604c827ab7de38595d Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 21 Feb 2023 21:21:24 -0500 Subject: [PATCH] chore: UX Mempool Tweaks (#15121) --- types/mempool/priority_nonce.go | 106 +++++++++++++++++---------- types/mempool/priority_nonce_test.go | 45 ++++++++++-- types/mempool/sender_nonce.go | 55 ++++++++------ 3 files changed, 139 insertions(+), 67 deletions(-) diff --git a/types/mempool/priority_nonce.go b/types/mempool/priority_nonce.go index d5cee30286..20e331bf19 100644 --- a/types/mempool/priority_nonce.go +++ b/types/mempool/priority_nonce.go @@ -12,18 +12,18 @@ import ( ) var ( - _ Mempool = (*priorityNonceMempool)(nil) - _ Iterator = (*priorityNonceIterator)(nil) + _ Mempool = (*PriorityNonceMempool)(nil) + _ Iterator = (*PriorityNonceIterator)(nil) ) -// priorityNonceMempool is a mempool implementation that stores txs +// PriorityNonceMempool is a mempool implementation that stores txs // in a partially ordered set by 2 dimensions: priority, and sender-nonce // (sequence number). Internally it uses one priority ordered skip list and one // skip list per sender ordered by sender-nonce (sequence number). When there // are multiple txs from the same sender, they are not always comparable by // priority to other sender txs and must be partially ordered by both sender-nonce // and priority. -type priorityNonceMempool struct { +type PriorityNonceMempool struct { priorityIndex *skiplist.SkipList priorityCounts map[int64]int senderIndices map[string]*skiplist.SkipList @@ -33,12 +33,12 @@ type priorityNonceMempool struct { maxTx int } -type priorityNonceIterator struct { +type PriorityNonceIterator struct { senderCursors map[string]*skiplist.Element nextPriority int64 sender string priorityNode *skiplist.Element - mempool *priorityNonceMempool + mempool *PriorityNonceMempool } // txMeta stores transaction metadata used in indices @@ -67,15 +67,17 @@ func txMetaLess(a, b any) int { return res } - // weight is used as a tiebreaker for transactions with the same priority. weight is calculated in a single - // pass in .Select(...) and so will be 0 on .Insert(...) + // Weight is used as a tiebreaker for transactions with the same priority. + // Weight is calculated in a single pass in .Select(...) and so will be 0 + // on .Insert(...). res = skiplist.Int64.Compare(keyA.weight, keyB.weight) if res != 0 { return res } - // Because weight will be 0 on .Insert(...), we must also compare sender and nonce to resolve priority collisions. - // If we didn't then transactions with the same priority would overwrite each other in the priority index. + // Because weight will be 0 on .Insert(...), we must also compare sender and + // nonce to resolve priority collisions. If we didn't then transactions with + // the same priority would overwrite each other in the priority index. res = skiplist.String.Compare(keyA.sender, keyB.sender) if res != 0 { return res @@ -84,30 +86,33 @@ func txMetaLess(a, b any) int { return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce) } -type PriorityNonceMempoolOption func(*priorityNonceMempool) +type PriorityNonceMempoolOption func(*PriorityNonceMempool) -// PriorityNonceWithOnRead sets a callback to be called when a tx is read from the mempool. +// PriorityNonceWithOnRead sets a callback to be called when a tx is read from +// the mempool. func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption { - return func(mp *priorityNonceMempool) { + return func(mp *PriorityNonceMempool) { mp.onRead = onRead } } -// PriorityNonceWithTxReplacement sets a callback to be called when duplicated transaction nonce detected during mempool insert. -// Application can define a transaction replacement rule based on tx priority or certain transaction fields. +// PriorityNonceWithTxReplacement sets a callback to be called when duplicated +// transaction nonce detected during mempool insert. An application can define a +// transaction replacement rule based on tx priority or certain transaction fields. func PriorityNonceWithTxReplacement(txReplacementRule func(op, np int64, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption { - return func(mp *priorityNonceMempool) { + return func(mp *PriorityNonceMempool) { mp.txReplacement = txReplacementRule } } -// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the mempool with the semantics: +// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the +// mempool with the semantics: // // <0: disabled, `Insert` is a no-op // 0: unlimited // >0: maximum number of transactions allowed func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption { - return func(mp *priorityNonceMempool) { + return func(mp *PriorityNonceMempool) { mp.maxTx = maxTx } } @@ -119,8 +124,8 @@ func DefaultPriorityMempool() Mempool { // NewPriorityMempool returns the SDK's default mempool implementation which // returns txs in a partial order by 2 dimensions; priority, and sender-nonce. -func NewPriorityMempool(opts ...PriorityNonceMempoolOption) Mempool { - mp := &priorityNonceMempool{ +func NewPriorityMempool(opts ...PriorityNonceMempoolOption) *PriorityNonceMempool { + mp := &PriorityNonceMempool{ priorityIndex: skiplist.New(skiplist.LessThanFunc(txMetaLess)), priorityCounts: make(map[int64]int), senderIndices: make(map[string]*skiplist.SkipList), @@ -134,6 +139,19 @@ func NewPriorityMempool(opts ...PriorityNonceMempoolOption) Mempool { return mp } +// NextSenderTx returns the next transaction for a given sender by nonce order, +// i.e. the next valid transaction for the sender. If no such transaction exists, +// nil will be returned. +func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx { + senderIndex, ok := mp.senderIndices[sender] + if !ok { + return nil + } + + cursor := senderIndex.Front() + return cursor.Value.(sdk.Tx) +} + // Insert attempts to insert a Tx into the app-side mempool in O(log n) time, // returning an error if unsuccessful. Sender and nonce are derived from the // transaction's first signature. @@ -143,7 +161,7 @@ func NewPriorityMempool(opts ...PriorityNonceMempoolOption) Mempool { // // Inserting a duplicate tx with a different priority overwrites the existing tx, // changing the total order of the mempool. -func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { +func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx { return ErrMempoolTxMaxCapacity } else if mp.maxTx < 0 { @@ -161,7 +179,7 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { sdkContext := sdk.UnwrapSDKContext(ctx) priority := sdkContext.Priority() sig := sigs[0] - sender := sig.PubKey.Address().String() + sender := sdk.AccAddress(sig.PubKey.Address()).String() nonce := sig.Sequence key := txMeta{nonce: nonce, priority: priority, sender: sender} @@ -215,7 +233,7 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { return nil } -func (i *priorityNonceIterator) iteratePriority() Iterator { +func (i *PriorityNonceIterator) iteratePriority() Iterator { // beginning of priority iteration if i.priorityNode == nil { i.priorityNode = i.mempool.priorityIndex.Front() @@ -240,7 +258,7 @@ func (i *priorityNonceIterator) iteratePriority() Iterator { return i.Next() } -func (i *priorityNonceIterator) Next() Iterator { +func (i *PriorityNonceIterator) Next() Iterator { if i.priorityNode == nil { return nil } @@ -258,14 +276,16 @@ func (i *priorityNonceIterator) Next() Iterator { if cursor == nil { return i.iteratePriority() } + key := cursor.Key().(txMeta) - // we've reached a transaction with a priority lower than the next highest priority in the pool + // We've reached a transaction with a priority lower than the next highest + // priority in the pool. if key.priority < i.nextPriority { return i.iteratePriority() } else if key.priority == i.nextPriority { - // weight is incorporated into the priority index key only (not sender index) so we must fetch it here - // from the scores map. + // Weight is incorporated into the priority index key only (not sender index) + // so we must fetch it here from the scores map. weight := i.mempool.scores[txMeta{nonce: key.nonce, sender: key.sender}].weight if weight < i.priorityNode.Next().Key().(txMeta).weight { return i.iteratePriority() @@ -276,7 +296,7 @@ func (i *priorityNonceIterator) Next() Iterator { return i } -func (i *priorityNonceIterator) Tx() sdk.Tx { +func (i *PriorityNonceIterator) Tx() sdk.Tx { return i.senderCursors[i.sender].Value.(sdk.Tx) } @@ -286,14 +306,14 @@ func (i *priorityNonceIterator) Tx() sdk.Tx { // // The maxBytes parameter defines the maximum number of bytes of transactions to // return. -func (mp *priorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator { +func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator { if mp.priorityIndex.Len() == 0 { return nil } mp.reorderPriorityTies() - iterator := &priorityNonceIterator{ + iterator := &PriorityNonceIterator{ mempool: mp, senderCursors: make(map[string]*skiplist.Element), } @@ -307,8 +327,9 @@ type reorderKey struct { tx sdk.Tx } -func (mp *priorityNonceMempool) reorderPriorityTies() { +func (mp *PriorityNonceMempool) reorderPriorityTies() { node := mp.priorityIndex.Front() + var reordering []reorderKey for node != nil { key := node.Key().(txMeta) @@ -317,6 +338,7 @@ func (mp *priorityNonceMempool) reorderPriorityTies() { newKey.weight = senderWeight(key.senderElement) reordering = append(reordering, reorderKey{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)}) } + node = node.Next() } @@ -328,13 +350,15 @@ func (mp *priorityNonceMempool) reorderPriorityTies() { } } -// senderWeight returns the weight of a given tx (t) at senderCursor. Weight is defined as the first (nonce-wise) -// same sender tx with a priority not equal to t. It is used to resolve priority collisions, that is when 2 or more -// txs from different senders have the same priority. +// senderWeight returns the weight of a given tx (t) at senderCursor. Weight is +// defined as the first (nonce-wise) same sender tx with a priority not equal to +// t. It is used to resolve priority collisions, that is when 2 or more txs from +// different senders have the same priority. func senderWeight(senderCursor *skiplist.Element) int64 { if senderCursor == nil { return 0 } + weight := senderCursor.Key().(txMeta).priority senderCursor = senderCursor.Next() for senderCursor != nil { @@ -342,6 +366,7 @@ func senderWeight(senderCursor *skiplist.Element) int64 { if p != weight { weight = p } + senderCursor = senderCursor.Next() } @@ -349,12 +374,13 @@ func senderWeight(senderCursor *skiplist.Element) int64 { } // CountTx returns the number of transactions in the mempool. -func (mp *priorityNonceMempool) CountTx() int { +func (mp *PriorityNonceMempool) CountTx() int { return mp.priorityIndex.Len() } -// Remove removes a transaction from the mempool in O(log n) time, returning an error if unsuccessful. -func (mp *priorityNonceMempool) Remove(tx sdk.Tx) error { +// Remove removes a transaction from the mempool in O(log n) time, returning an +// error if unsuccessful. +func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error { sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() if err != nil { return err @@ -364,7 +390,7 @@ func (mp *priorityNonceMempool) Remove(tx sdk.Tx) error { } sig := sigs[0] - sender := sig.PubKey.Address().String() + sender := sdk.AccAddress(sig.PubKey.Address()).String() nonce := sig.Sequence scoreKey := txMeta{nonce: nonce, sender: sender} @@ -388,7 +414,7 @@ func (mp *priorityNonceMempool) Remove(tx sdk.Tx) error { } func IsEmpty(mempool Mempool) error { - mp := mempool.(*priorityNonceMempool) + mp := mempool.(*PriorityNonceMempool) if mp.priorityIndex.Len() != 0 { return fmt.Errorf("priorityIndex not empty") } @@ -397,6 +423,7 @@ func IsEmpty(mempool Mempool) error { for k := range mp.priorityCounts { countKeys = append(countKeys, k) } + for _, k := range countKeys { if mp.priorityCounts[k] != 0 { return fmt.Errorf("priorityCounts not zero at %v, got %v", k, mp.priorityCounts[k]) @@ -407,6 +434,7 @@ func IsEmpty(mempool Mempool) error { for k := range mp.senderIndices { senderKeys = append(senderKeys, k) } + for _, k := range senderKeys { if mp.senderIndices[k].Len() != 0 { return fmt.Errorf("senderIndex not empty for sender %v", k) diff --git a/types/mempool/priority_nonce_test.go b/types/mempool/priority_nonce_test.go index 5ae4b90d4d..88910c36c0 100644 --- a/types/mempool/priority_nonce_test.go +++ b/types/mempool/priority_nonce_test.go @@ -157,8 +157,8 @@ func (s *MempoolTestSuite) TestPriorityNonceTxOrder() { order: []int{0, 1, 2, 3}, }, { - // if all txs have the same priority they will be ordered lexically sender address, and nonce with the - // sender. + // If all txs have the same priority they will be ordered lexically sender + // address, and nonce with the sender. txs: []txSpec{ {p: 10, n: 7, a: sc}, {p: 10, n: 8, a: sc}, @@ -170,12 +170,12 @@ func (s *MempoolTestSuite) TestPriorityNonceTxOrder() { {p: 10, n: 5, a: sb}, {p: 10, n: 6, a: sb}, }, - order: []int{0, 1, 2, 3, 4, 5, 6, 7, 8}, + order: []int{3, 4, 5, 6, 7, 8, 0, 1, 2}, }, /* The next 4 tests are different permutations of the same set: - {p: 5, n: 1, a: sa}, + {p: 5, n: 1, a: sa}, {p: 10, n: 2, a: sa}, {p: 20, n: 2, a: sb}, {p: 5, n: 1, a: sb}, @@ -240,11 +240,12 @@ func (s *MempoolTestSuite) TestPriorityNonceTxOrder() { } orderedTxs := fetchTxs(pool.Select(ctx, nil), 1000) + var txOrder []int for _, tx := range orderedTxs { txOrder = append(txOrder, tx.(testTx).id) - fmt.Println(tx) } + require.Equal(t, tt.order, txOrder) require.NoError(t, validateOrder(orderedTxs)) @@ -582,7 +583,37 @@ func TestTxOrderN(t *testing.T) { } } -func TestTxLimit(t *testing.T) { +func TestPriorityNonceMempool_NextSenderTx(t *testing.T) { + accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 2) + ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger()) + accA := accounts[0].Address + accB := accounts[1].Address + + mp := mempool.NewPriorityMempool() + + txs := []testTx{ + {priority: 20, nonce: 1, address: accA}, + {priority: 15, nonce: 2, address: accA}, + {priority: 66, nonce: 3, address: accA}, + {priority: 20, nonce: 4, address: accA}, + {priority: 88, nonce: 5, address: accA}, + } + + for i, tx := range txs { + c := ctx.WithPriority(tx.priority) + require.NoError(t, mp.Insert(c, tx)) + require.Equal(t, i+1, mp.CountTx()) + } + + tx := mp.NextSenderTx(accB.String()) + require.Nil(t, tx) + + tx = mp.NextSenderTx(accA.String()) + require.NotNil(t, tx) + require.Equal(t, txs[0], tx) +} + +func TestNextSenderTx_TxLimit(t *testing.T) { accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 2) ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger()) sa := accounts[0].Address @@ -639,7 +670,7 @@ func TestTxLimit(t *testing.T) { } } -func TestTxReplacement(t *testing.T) { +func TestNextSenderTx_TxReplacement(t *testing.T) { accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1) ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger()) sa := accounts[0].Address diff --git a/types/mempool/sender_nonce.go b/types/mempool/sender_nonce.go index 30194614f0..7f41482d69 100644 --- a/types/mempool/sender_nonce.go +++ b/types/mempool/sender_nonce.go @@ -14,13 +14,13 @@ import ( ) var ( - _ Mempool = (*senderNonceMempool)(nil) - _ Iterator = (*senderNonceMepoolIterator)(nil) + _ Mempool = (*SenderNonceMempool)(nil) + _ Iterator = (*senderNonceMempoolIterator)(nil) ) var DefaultMaxTx = 0 -// senderNonceMempool is a mempool that prioritizes transactions within a sender +// SenderNonceMempool is a mempool that prioritizes transactions within a sender // by nonce, the lowest first, but selects a random sender on each iteration. // The mempool is iterated by: // @@ -30,14 +30,14 @@ var DefaultMaxTx = 0 // // Note that PrepareProposal could choose to stop iteration before reaching the // end if maxBytes is reached. -type senderNonceMempool struct { +type SenderNonceMempool struct { senders map[string]*skiplist.SkipList rnd *rand.Rand maxTx int existingTx map[txKey]bool } -type SenderNonceOptions func(mp *senderNonceMempool) +type SenderNonceOptions func(mp *SenderNonceMempool) type txKey struct { address string @@ -46,10 +46,10 @@ type txKey struct { // NewSenderNonceMempool creates a new mempool that prioritizes transactions by // nonce, the lowest first, picking a random sender on each iteration. -func NewSenderNonceMempool(opts ...SenderNonceOptions) Mempool { +func NewSenderNonceMempool(opts ...SenderNonceOptions) *SenderNonceMempool { senderMap := make(map[string]*skiplist.SkipList) existingTx := make(map[txKey]bool) - snp := &senderNonceMempool{ + snp := &SenderNonceMempool{ senders: senderMap, maxTx: DefaultMaxTx, existingTx: existingTx, @@ -78,7 +78,7 @@ func NewSenderNonceMempool(opts ...SenderNonceOptions) Mempool { // random_seed := int64(1000) // NewSenderNonceMempool(SenderNonceSeedTxOpt(random_seed)) func SenderNonceSeedOpt(seed int64) SenderNonceOptions { - return func(snp *senderNonceMempool) { + return func(snp *SenderNonceMempool) { snp.setSeed(seed) } } @@ -90,19 +90,32 @@ func SenderNonceSeedOpt(seed int64) SenderNonceOptions { // // NewSenderNonceMempool(SenderNonceMaxTxOpt(100)) func SenderNonceMaxTxOpt(maxTx int) SenderNonceOptions { - return func(snp *senderNonceMempool) { + return func(snp *SenderNonceMempool) { snp.maxTx = maxTx } } -func (snm *senderNonceMempool) setSeed(seed int64) { +func (snm *SenderNonceMempool) setSeed(seed int64) { s1 := rand.NewSource(seed) snm.rnd = rand.New(s1) //#nosec // math/rand is seeded from crypto/rand by default } +// NextSenderTx returns the next transaction for a given sender by nonce order, +// i.e. the next valid transaction for the sender. If no such transaction exists, +// nil will be returned. +func (mp *SenderNonceMempool) NextSenderTx(sender string) sdk.Tx { + senderIndex, ok := mp.senders[sender] + if !ok { + return nil + } + + cursor := senderIndex.Front() + return cursor.Value.(sdk.Tx) +} + // Insert adds a tx to the mempool. It returns an error if the tx does not have // at least one signer. Note, priority is ignored. -func (snm *senderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error { +func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error { if snm.maxTx > 0 && snm.CountTx() >= snm.maxTx { return ErrMempoolTxMaxCapacity } @@ -119,7 +132,7 @@ func (snm *senderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error { } sig := sigs[0] - sender := sig.PubKey.Address().String() + sender := sdk.AccAddress(sig.PubKey.Address()).String() nonce := sig.Sequence senderTxs, found := snm.senders[sender] @@ -138,7 +151,7 @@ func (snm *senderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error { // Select returns an iterator ordering transactions the mempool with the lowest // nonce of a random selected sender first. -func (snm *senderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator { +func (snm *SenderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator { var senders []string senderCursors := make(map[string]*skiplist.Element) @@ -157,7 +170,7 @@ func (snm *senderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator { s = s.Next() } - iter := &senderNonceMepoolIterator{ + iter := &senderNonceMempoolIterator{ senders: senders, rnd: snm.rnd, senderCursors: senderCursors, @@ -167,13 +180,13 @@ func (snm *senderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator { } // CountTx returns the total count of txs in the mempool. -func (snm *senderNonceMempool) CountTx() int { +func (snm *SenderNonceMempool) CountTx() int { return len(snm.existingTx) } // Remove removes a tx from the mempool. It returns an error if the tx does not // have at least one signer or the tx was not found in the pool. -func (snm *senderNonceMempool) Remove(tx sdk.Tx) error { +func (snm *SenderNonceMempool) Remove(tx sdk.Tx) error { sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() if err != nil { return err @@ -183,7 +196,7 @@ func (snm *senderNonceMempool) Remove(tx sdk.Tx) error { } sig := sigs[0] - sender := sig.PubKey.Address().String() + sender := sdk.AccAddress(sig.PubKey.Address()).String() nonce := sig.Sequence senderTxs, found := snm.senders[sender] @@ -206,7 +219,7 @@ func (snm *senderNonceMempool) Remove(tx sdk.Tx) error { return nil } -type senderNonceMepoolIterator struct { +type senderNonceMempoolIterator struct { rnd *rand.Rand currentTx *skiplist.Element senders []string @@ -215,7 +228,7 @@ type senderNonceMepoolIterator struct { // Next returns the next iterator state which will contain a tx with the next // smallest nonce of a randomly selected sender. -func (i *senderNonceMepoolIterator) Next() Iterator { +func (i *senderNonceMempoolIterator) Next() Iterator { for len(i.senders) > 0 { senderIndex := i.rnd.Intn(len(i.senders)) sender := i.senders[senderIndex] @@ -231,7 +244,7 @@ func (i *senderNonceMepoolIterator) Next() Iterator { i.senders = removeAtIndex(i.senders, senderIndex) } - return &senderNonceMepoolIterator{ + return &senderNonceMempoolIterator{ senders: i.senders, currentTx: senderCursor, rnd: i.rnd, @@ -242,7 +255,7 @@ func (i *senderNonceMepoolIterator) Next() Iterator { return nil } -func (i *senderNonceMepoolIterator) Tx() sdk.Tx { +func (i *senderNonceMempoolIterator) Tx() sdk.Tx { return i.currentTx.Value.(sdk.Tx) }