feat: Priority & sender-nonce mempool implementation (#13262)

Co-authored-by: Jeancarlo <jeancarlobarrios@gmail.com>
Co-authored-by: Jeancarlo Barrios <JeancarloBarrios@users.noreply.github.com>
Co-authored-by: Amaury <1293565+amaurym@users.noreply.github.com>
Co-authored-by: Facundo Medica <14063057+facundomedica@users.noreply.github.com>
Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
Co-authored-by: Aleksandr Bezobchuk <aleks.bezobchuk@gmail.com>
This commit is contained in:
Matt Kocubinski 2022-12-12 19:16:18 -06:00 committed by GitHub
parent f13afd12ed
commit c6c7eb9ef2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 1117 additions and 5 deletions

View File

@ -48,6 +48,8 @@ type testTx struct {
priority int64
nonce uint64
address sdk.AccAddress
// useful for debugging
strAddress string
}
func (tx testTx) GetSigners() []sdk.AccAddress { panic("not implemented") }

View File

@ -0,0 +1,380 @@
package mempool
import (
"context"
"fmt"
"math"
"github.com/huandu/skiplist"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
)
// Compile-time assertions that the mempool and its iterator satisfy the
// Mempool and Iterator interfaces.
var (
	_ Mempool  = (*priorityNonceMempool)(nil)
	_ Iterator = (*priorityNonceIterator)(nil)
)
// priorityNonceMempool defines the SDK's default mempool implementation which stores
// txs in a partially ordered set by 2 dimensions: priority, and sender-nonce
// (sequence number). Internally it uses one priority ordered skip list and one
// skip list per sender ordered by sender-nonce (sequence number). When there
// are multiple txs from the same sender, they are not always comparable by
// priority to other sender txs and must be partially ordered by both sender-nonce
// and priority.
type priorityNonceMempool struct {
	// priorityIndex orders all txs globally by (priority, weight, sender, nonce);
	// see txMetaLess for the comparator.
	priorityIndex *skiplist.SkipList
	// priorityCounts tracks how many txs share each priority value, used by
	// reorderPriorityTies to detect ties that need tie-breaking weights.
	priorityCounts map[int64]int
	// senderIndices holds one nonce-ordered skip list per sender address.
	senderIndices map[string]*skiplist.SkipList
	// scores maps a (nonce, sender)-only txMeta key to the (priority, weight)
	// currently stored for that tx in priorityIndex.
	scores map[txMeta]txMeta
	// onRead is an optional callback to be called when a tx is read from the
	// mempool (set via WithOnRead).
	onRead func(tx sdk.Tx)
}
// priorityNonceIterator walks the mempool in priority order while respecting
// per-sender nonce order; see Select, Next and iteratePriority for the rules.
type priorityNonceIterator struct {
	// senderCursors tracks, per sender, the current position in that sender's
	// nonce-ordered index.
	senderCursors map[string]*skiplist.Element
	// nextPriority is the priority of the node after priorityNode in the
	// priority index, or math.MinInt64 when priorityNode is the last node.
	nextPriority int64
	// sender is the address whose tx chain is currently being walked.
	sender string
	// priorityNode is the current position in the global priority index.
	priorityNode *skiplist.Element
	// mempool is the pool being iterated.
	mempool *priorityNonceMempool
}
// txMeta stores transaction metadata used in indices. A txMeta populated with
// only nonce and sender also serves as the lookup key for the scores map.
type txMeta struct {
	// nonce is the sender's sequence number
	nonce uint64
	// priority is the transaction's priority
	priority int64
	// sender is the transaction's sender
	sender string
	// weight is the transaction's weight, used as a tiebreaker for
	// transactions with the same priority; it is computed lazily in Select
	// (via reorderPriorityTies) and is always 0 at Insert time
	weight int64
	// senderElement is a pointer to the transaction's element in the sender index
	senderElement *skiplist.Element
}
// txMetaLess is a comparator for txKeys that orders first by priority, then by
// weight, then by sender, and finally by nonce, uniquely identifying a
// transaction.
//
// Note, txMetaLess is used as the comparator in the priority index. Weight is
// only assigned in Select (it is 0 on Insert), which is why sender and nonce
// must also participate: without them, same-priority txs would collide and
// overwrite each other in the priority index.
func txMetaLess(a, b any) int {
	keyA, keyB := a.(txMeta), b.(txMeta)

	if c := skiplist.Int64.Compare(keyA.priority, keyB.priority); c != 0 {
		return c
	}

	// weight breaks ties between equal priorities; computed in a single pass
	// in .Select(...) and therefore 0 on .Insert(...)
	if c := skiplist.Int64.Compare(keyA.weight, keyB.weight); c != 0 {
		return c
	}

	if c := skiplist.String.Compare(keyA.sender, keyB.sender); c != 0 {
		return c
	}

	return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce)
}
// PriorityNonceMempoolOption configures a priorityNonceMempool at construction time.
type PriorityNonceMempoolOption func(*priorityNonceMempool)

// WithOnRead sets a callback to be called when a tx is read from the mempool.
func WithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption {
	return func(mp *priorityNonceMempool) {
		mp.onRead = onRead
	}
}
// DefaultPriorityMempool returns a priorityNonceMempool with no options.
func DefaultPriorityMempool() Mempool {
	return NewPriorityMempool()
}
// NewPriorityMempool returns the SDK's default mempool implementation which
// returns txs in a partial order by 2 dimensions; priority, and sender-nonce.
func NewPriorityMempool(opts ...PriorityNonceMempoolOption) Mempool {
	pool := &priorityNonceMempool{
		priorityIndex:  skiplist.New(skiplist.LessThanFunc(txMetaLess)),
		priorityCounts: make(map[int64]int),
		senderIndices:  make(map[string]*skiplist.SkipList),
		scores:         make(map[txMeta]txMeta),
	}

	// Apply any functional options (e.g. WithOnRead) to the freshly built pool.
	for i := range opts {
		opts[i](pool)
	}

	return pool
}
// Insert attempts to insert a Tx into the app-side mempool in O(log n) time,
// returning an error if unsuccessful. Sender and nonce are derived from the
// transaction's first signature.
//
// Transactions are unique by sender and nonce. Inserting a duplicate tx is an
// O(log n) no-op.
//
// Inserting a duplicate tx with a different priority overwrites the existing tx,
// changing the total order of the mempool.
func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
	sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
	if err != nil {
		return err
	}
	if len(sigs) == 0 {
		return fmt.Errorf("tx must have at least one signer")
	}

	// Priority comes from the SDK context, not from the tx itself.
	sdkContext := sdk.UnwrapSDKContext(ctx)
	priority := sdkContext.Priority()

	// Identity (sender, nonce) is derived from the first signature only.
	sig := sigs[0]
	sender := sig.PubKey.Address().String()
	nonce := sig.Sequence
	key := txMeta{nonce: nonce, priority: priority, sender: sender}

	senderIndex, ok := mp.senderIndices[sender]
	if !ok {
		// NOTE(review): the comparator swaps its arguments (compares b to a);
		// combined with LessThanFunc this should give the nonce ordering the
		// iterator expects (Front() = first nonce to execute) — confirm
		// against skiplist.LessThanFunc semantics.
		senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int {
			return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce)
		}))

		// initialize sender index if not found
		mp.senderIndices[sender] = senderIndex
	}

	mp.priorityCounts[priority]++

	// Since senderIndex is scored by nonce, a changed priority will overwrite the
	// existing key.
	key.senderElement = senderIndex.Set(key, tx)

	// Since mp.priorityIndex is scored by priority, then sender, then nonce, a
	// changed priority will create a new key, so we must remove the old key and
	// re-insert it to avoid having the same tx with different priorityIndex indexed
	// twice in the mempool.
	//
	// This O(log n) remove operation is rare and only happens when a tx's priority
	// changes.
	sk := txMeta{nonce: nonce, sender: sender}
	if oldScore, txExists := mp.scores[sk]; txExists {
		mp.priorityIndex.Remove(txMeta{
			nonce:    nonce,
			sender:   sender,
			priority: oldScore.priority,
			weight:   oldScore.weight,
		})
		mp.priorityCounts[oldScore.priority]--
	}

	// Weight is intentionally 0 here; it is only assigned when priority ties
	// are reordered in Select.
	mp.scores[sk] = txMeta{priority: priority}
	mp.priorityIndex.Set(key, tx)

	return nil
}
// iteratePriority advances the iterator to the next node of the global
// priority index, records which sender owns it and the priority of the node
// after it (math.MinInt64 at the end), then delegates to Next to walk that
// sender's nonce chain. Returns nil when the priority index is exhausted.
func (i *priorityNonceIterator) iteratePriority() Iterator {
	// beginning of priority iteration
	if i.priorityNode == nil {
		i.priorityNode = i.mempool.priorityIndex.Front()
	} else {
		i.priorityNode = i.priorityNode.Next()
	}

	// end of priority iteration
	if i.priorityNode == nil {
		return nil
	}

	i.sender = i.priorityNode.Key().(txMeta).sender

	// Cache the next node's priority so Next can tell when continuing the
	// current sender's chain would skip a higher-priority tx from elsewhere.
	nextPriorityNode := i.priorityNode.Next()
	if nextPriorityNode != nil {
		i.nextPriority = nextPriorityNode.Key().(txMeta).priority
	} else {
		i.nextPriority = math.MinInt64
	}

	return i.Next()
}
// Next advances the iterator to the next transaction in the partial order. It
// keeps walking the current sender's nonce-ordered chain for as long as doing
// so does not skip over a higher-priority (or equal-priority, higher-weight)
// tx from another sender; otherwise it falls back to priority iteration.
// Returns nil when iteration is exhausted.
func (i *priorityNonceIterator) Next() Iterator {
	if i.priorityNode == nil {
		return nil
	}

	cursor, ok := i.senderCursors[i.sender]
	if !ok {
		// beginning of sender iteration
		cursor = i.mempool.senderIndices[i.sender].Front()
	} else {
		// middle of sender iteration
		cursor = cursor.Next()
	}

	// end of sender iteration
	if cursor == nil {
		return i.iteratePriority()
	}

	key := cursor.Key().(txMeta)

	// we've reached a transaction with a priority lower than the next highest
	// priority in the pool
	if key.priority < i.nextPriority {
		return i.iteratePriority()
	} else if key.priority == i.nextPriority {
		// weight is incorporated into the priority index key only (not sender
		// index) so we must fetch it here from the scores map.
		weight := i.mempool.scores[txMeta{nonce: key.nonce, sender: key.sender}].weight

		// Fix: i.priorityNode may be the last node in the priority index, in
		// which case nextPriority is math.MinInt64 and can still tie with a
		// tx whose real priority is MinInt64. The original code dereferenced
		// i.priorityNode.Next() unconditionally and would panic here; a nil
		// next node means there is nothing to defer to, so continue with the
		// current sender.
		if next := i.priorityNode.Next(); next != nil && weight < next.Key().(txMeta).weight {
			return i.iteratePriority()
		}
	}

	i.senderCursors[i.sender] = cursor
	return i
}
// Tx returns the transaction at the iterator's current position.
func (i *priorityNonceIterator) Tx() sdk.Tx {
	cursor := i.senderCursors[i.sender]
	return cursor.Value.(sdk.Tx)
}
// Select returns an iterator over the mempool's transactions, ordered by
// priority and sender-nonce, in O(n) time. The passed-in list of transactions
// is ignored.
//
// NOTE: despite appearances this is not strictly read-only — it first calls
// reorderPriorityTies, which rewrites the keys of same-priority txs in the
// internal priority index (the set of stored txs is unchanged). There is no
// maxBytes limit applied here; callers bound how much they consume from the
// returned iterator.
func (mp *priorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
	if mp.priorityIndex.Len() == 0 {
		return nil
	}

	// Assign tie-breaking weights to txs that share a priority before iterating.
	mp.reorderPriorityTies()

	iterator := &priorityNonceIterator{
		mempool:       mp,
		senderCursors: make(map[string]*skiplist.Element),
	}

	return iterator.iteratePriority()
}
// reorderKey pairs a stale priority-index key with its re-weighted replacement
// and the tx value; staged by reorderPriorityTies for a delete-then-insert swap.
type reorderKey struct {
	deleteKey txMeta
	insertKey txMeta
	tx        sdk.Tx
}
// reorderPriorityTies rewrites the priority-index key of every tx whose
// priority is shared with at least one other tx, assigning each such key a
// weight (see senderWeight) so ties get a deterministic, dependency-aware
// order. It mutates priorityIndex and scores in place.
func (mp *priorityNonceMempool) reorderPriorityTies() {
	var pending []reorderKey

	// Pass 1: collect every tied key together with its re-weighted
	// replacement. Mutating the skip list while walking it would corrupt the
	// traversal, hence the two-phase approach.
	for node := mp.priorityIndex.Front(); node != nil; node = node.Next() {
		meta := node.Key().(txMeta)
		if mp.priorityCounts[meta.priority] <= 1 {
			continue
		}
		weighted := meta
		weighted.weight = senderWeight(meta.senderElement)
		pending = append(pending, reorderKey{deleteKey: meta, insertKey: weighted, tx: node.Value.(sdk.Tx)})
	}

	// Pass 2: swap the old keys for the weighted ones in both the priority
	// index and the scores bookkeeping.
	for _, rk := range pending {
		mp.priorityIndex.Remove(rk.deleteKey)
		delete(mp.scores, txMeta{nonce: rk.deleteKey.nonce, sender: rk.deleteKey.sender})
		mp.priorityIndex.Set(rk.insertKey, rk.tx)
		mp.scores[txMeta{nonce: rk.insertKey.nonce, sender: rk.insertKey.sender}] = rk.insertKey
	}
}
// senderWeight returns the tie-breaking weight for the tx at senderCursor,
// used to resolve priority collisions, that is when 2 or more txs from
// different senders have the same priority.
//
// NOTE(review): the original comment described the weight as "the first
// (nonce-wise) same sender tx with a priority not equal to t", but the loop
// below never breaks early — weight is reassigned on every change, so the
// value returned is the priority of the last (highest-nonce) tx in the chain
// starting at senderCursor. Confirm whether first-difference semantics were
// intended.
func senderWeight(senderCursor *skiplist.Element) int64 {
	// No element: weight 0 (never a meaningful tiebreak candidate).
	if senderCursor == nil {
		return 0
	}

	weight := senderCursor.Key().(txMeta).priority
	senderCursor = senderCursor.Next()
	for senderCursor != nil {
		p := senderCursor.Key().(txMeta).priority
		if p != weight {
			weight = p
		}

		senderCursor = senderCursor.Next()
	}

	return weight
}
// CountTx returns the number of transactions in the mempool. The priority
// index holds exactly one entry per tx, so its length is the tx count.
func (mp *priorityNonceMempool) CountTx() int {
	return mp.priorityIndex.Len()
}
// Remove removes a transaction from the mempool in O(log n) time, returning an error if unsuccessful.
func (mp *priorityNonceMempool) Remove(tx sdk.Tx) error {
	sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
	if err != nil {
		return err
	}
	if len(sigs) == 0 {
		return fmt.Errorf("attempted to remove a tx with no signatures")
	}

	// Identity is derived from the first signature, mirroring Insert.
	firstSig := sigs[0]
	sender := firstSig.PubKey.Address().String()
	nonce := firstSig.Sequence

	scoreKey := txMeta{nonce: nonce, sender: sender}
	score, found := mp.scores[scoreKey]
	if !found {
		return ErrTxNotFound
	}

	senderTxs, found := mp.senderIndices[sender]
	if !found {
		return fmt.Errorf("sender %s not found", sender)
	}

	// Rebuild the full priority-index key (priority and weight come from the
	// scores bookkeeping) and drop the tx from every index.
	fullKey := txMeta{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight}
	mp.priorityIndex.Remove(fullKey)
	senderTxs.Remove(fullKey)
	delete(mp.scores, scoreKey)
	mp.priorityCounts[score.priority]--

	return nil
}
// IsEmpty returns nil iff the given mempool (which must be a
// *priorityNonceMempool) holds no transactions and all internal bookkeeping
// has been fully drained; otherwise it returns an error describing the first
// inconsistency found. Intended for tests after removing every inserted tx.
func IsEmpty(mempool Mempool) error {
	mp := mempool.(*priorityNonceMempool)

	if mp.priorityIndex.Len() != 0 {
		return fmt.Errorf("priorityIndex not empty")
	}

	// Counts are decremented, not deleted, on Remove — each must reach zero.
	for priority, count := range mp.priorityCounts {
		if count != 0 {
			return fmt.Errorf("priorityCounts not zero at %v, got %v", priority, count)
		}
	}

	// Sender indices persist after their last tx is removed — each must be drained.
	for sender, index := range mp.senderIndices {
		if index.Len() != 0 {
			return fmt.Errorf("senderIndex not empty for sender %v", sender)
		}
	}

	return nil
}

View File

@ -0,0 +1,146 @@
# Priority sender-nonce mempool specification
[priorityNonceMempool](./priority_nonce.go) defines a mempool implementation which stores txs in a partially
ordered set by 2 dimensions: priority, and sender-nonce (sequence number). Internally it uses one priority
ordered skip list and one skip list per sender ordered by sender-nonce. When there are
multiple txs from the same sender, they are not always comparable by priority to other sender txs and must be
partially ordered by both sender-nonce and priority.
The following rules are strictly observed while iterating the mempool to select transactions:
1) For a given sender their txs must be selected in nonce order.
2) A transaction with a higher priority is always selected before a transaction with a lower priority except
when to do so would violate sender-nonce order.
The observance of these rules leads to many interesting cases some of which are outlined below to give an
impression of the prioritization behavior of this mempool.
In each case mempool order is indicated by a graph where nodes are transactions and edges are dependencies described by
the two rules for transaction ordering above. The node label indicates the transaction's priority. Edges within a
sender satisfy rule 1 (nonce order). Edges between senders satisfy rule 2 (priority order). A topological sort of the
graph is a valid mempool order.
### Case 1
| Sender | Nonce | Priority |
|--------|-------|----------|
| A | 0 | 20 |
| A | 1 | 6 |
| A | 2 | 8 |
| A | 3 | 21 |
| B | 0 | 15 |
```mermaid
graph TB
subgraph Sender A
20-->6
6-->8
8-->21
end
subgraph Sender B
20-->15
15-->6
end
```
Mempool order: [20, 15, 6, 8, 21]
Sender A has 4 txs in the pool and sender B only 1. tx(priority=15) is selected before tx(priority=21) even
though it has a lower priority, because selecting tx(priority=21) first would violate either rule 1 or rule 2.
### Case 2
| Sender | Nonce | Priority |
|--------|-------|----------|
| A | 0 | 3 |
| A | 1 | 5 |
| A | 2 | 9 |
| B | 0 | 6 |
| B | 1 | 5 |
| B | 2 | 8 |
```mermaid
graph LR
subgraph Sender A
3-->5a[5]
5a-->9
end
subgraph Sender B
6-->5b[5]
6-->3
5b-->8
8-->3
end
```
Mempool order: [6, 5, 8, 3, 5, 9]
Although tx(priority=9) has the highest global priority it is selected last. This is due to tx(priority=3)
gating 9's selection by rule 1.
### Case 3 - Priority ties
| Sender | Nonce | Priority |
|--------|-------|----------|
| A | 0 | 5 |
| A | 1 | 99 |
| B | 0 | 5 |
| B | 1 | 20 |
```mermaid
graph LR
subgraph Sender A
5a[5]-->99
end
subgraph Sender B
5b[5]-->20
99-->5b
5a-->5b
end
```
Mempool order: [5, 99, 5, 20]
This case shows how priority ties are handled. Tx(priority=5, sender=A) is prioritized before tx(priority=5, sender=B)
because of the transactions following them, tx(priority=99) must be selected before tx(priority=20) by rule 2.
### Case 4
| Sender | Nonce | Priority |
|--------|-------|----------|
| A | 0 | 10 |
| A | 1 | 15 |
| A | 2 | 30 |
| A | 3 | 6 |
| B | 0 | 8 |
| B | 1 | 20 |
| B | 3 | 4 |
| C | 0 | 2 |
| C      | 3     | 90       |
```mermaid
graph TD
subgraph Sender A
10-->15
15-->30
30-->6
end
subgraph Sender B
8
8-->20
10-->8
30-->8
20-->4
20-->6
end
subgraph Sender C
2-->90
4-->2
6-->4
end
```
Mempool order: [10, 15, 30, 8, 20, 6, 4, 2, 90]
This case shows how the mempool handles a more complex graph with more priority edges between senders. Again we also demonstrate an idiosyncrasy of this nonce/priority ordering scheme: tx(priority=90) is selected last because it is gated behind tx(priority=2) by nonce ordering.

View File

@ -0,0 +1,584 @@
package mempool_test
import (
"fmt"
"math"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool"
simtypes "github.com/cosmos/cosmos-sdk/types/simulation"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
)
// TestOutOfOrder checks that validateOrder — the O(n^2) ordering oracle used
// by the other tests — actually rejects invalid selections: two hand-built
// mis-ordered tx slices and a raw (unselected) random batch must all fail it.
func TestOutOfOrder(t *testing.T) {
	accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 2)
	sa := accounts[0].Address
	sb := accounts[1].Address

	// Both fixtures violate the ordering rules (e.g. sender A's nonce 4
	// appears before its nonces 2 and 3).
	outOfOrders := [][]testTx{
		{
			{priority: 20, nonce: 1, address: sa},
			{priority: 21, nonce: 4, address: sa},
			{priority: 15, nonce: 1, address: sb},
			{priority: 8, nonce: 3, address: sa},
			{priority: 6, nonce: 2, address: sa},
		},
		{
			{priority: 15, nonce: 1, address: sb},
			{priority: 20, nonce: 1, address: sa},
			{priority: 21, nonce: 4, address: sa},
			{priority: 8, nonce: 3, address: sa},
			{priority: 6, nonce: 2, address: sa},
		}}

	for _, outOfOrder := range outOfOrders {
		var mtxs []sdk.Tx
		for _, mtx := range outOfOrder {
			mtxs = append(mtxs, mtx)
		}
		err := validateOrder(mtxs)
		require.Error(t, err)
	}

	// A freshly generated random batch, in generation order rather than
	// selection order, should practically never satisfy the rules either.
	seed := time.Now().UnixNano()
	t.Logf("running with seed: %d", seed)
	randomTxs := genRandomTxs(seed, 1000, 10)
	var rmtxs []sdk.Tx
	for _, rtx := range randomTxs {
		rmtxs = append(rmtxs, rtx)
	}

	require.Error(t, validateOrder(rmtxs))
}
// TestPriorityNonceTxOrder table-tests the mempool's selection order against
// hand-computed expectations: for each case, insert the txs, select them all,
// assert the exact id order, cross-check with validateOrder, then remove every
// tx and assert the pool is fully drained.
func (s *MempoolTestSuite) TestPriorityNonceTxOrder() {
	t := s.T()
	ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
	accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 5)
	sa := accounts[0].Address
	sb := accounts[1].Address
	sc := accounts[2].Address

	// order lists the expected selection sequence as indices into txs.
	// NOTE(review): the fail field is never read by the loop below.
	tests := []struct {
		txs   []txSpec
		order []int
		fail  bool
	}{
		{
			txs: []txSpec{
				{p: 21, n: 4, a: sa},
				{p: 8, n: 3, a: sa},
				{p: 6, n: 2, a: sa},
				{p: 15, n: 1, a: sb},
				{p: 20, n: 1, a: sa},
			},
			order: []int{4, 3, 2, 1, 0},
		},
		{
			txs: []txSpec{
				{p: 3, n: 0, a: sa},
				{p: 5, n: 1, a: sa},
				{p: 9, n: 2, a: sa},
				{p: 6, n: 0, a: sb},
				{p: 5, n: 1, a: sb},
				{p: 8, n: 2, a: sb},
			},
			order: []int{3, 4, 5, 0, 1, 2},
		},
		{
			txs: []txSpec{
				{p: 21, n: 4, a: sa},
				{p: 15, n: 1, a: sb},
				{p: 20, n: 1, a: sa},
			},
			order: []int{2, 0, 1},
		},
		{
			txs: []txSpec{
				{p: 50, n: 3, a: sa},
				{p: 30, n: 2, a: sa},
				{p: 10, n: 1, a: sa},
				{p: 15, n: 1, a: sb},
				{p: 21, n: 2, a: sb},
			},
			order: []int{3, 4, 2, 1, 0},
		},
		{
			txs: []txSpec{
				{p: 50, n: 3, a: sa},
				{p: 10, n: 2, a: sa},
				{p: 99, n: 1, a: sa},
				{p: 15, n: 1, a: sb},
				{p: 8, n: 2, a: sb},
			},
			order: []int{2, 3, 1, 0, 4},
		},
		{
			txs: []txSpec{
				{p: 30, a: sa, n: 2},
				{p: 20, a: sb, n: 1},
				{p: 15, a: sa, n: 1},
				{p: 10, a: sa, n: 0},
				{p: 8, a: sb, n: 0},
				{p: 6, a: sa, n: 3},
				{p: 4, a: sb, n: 3},
			},
			order: []int{3, 2, 0, 4, 1, 5, 6},
		},
		{
			txs: []txSpec{
				{p: 30, n: 2, a: sa},
				{p: 20, a: sb, n: 1},
				{p: 15, a: sa, n: 1},
				{p: 10, a: sa, n: 0},
				{p: 8, a: sb, n: 0},
				{p: 6, a: sa, n: 3},
				{p: 4, a: sb, n: 3},
				{p: 2, a: sc, n: 0},
				{p: 7, a: sc, n: 3},
			},
			order: []int{3, 2, 0, 4, 1, 5, 6, 7, 8},
		},
		{
			txs: []txSpec{
				{p: 6, n: 1, a: sa},
				{p: 10, n: 2, a: sa},
				{p: 5, n: 1, a: sb},
				{p: 99, n: 2, a: sb},
			},
			order: []int{0, 1, 2, 3},
		},
		{
			// if all txs have the same priority they will be ordered lexically
			// by sender address, and by nonce within each sender.
			txs: []txSpec{
				{p: 10, n: 7, a: sc},
				{p: 10, n: 8, a: sc},
				{p: 10, n: 9, a: sc},
				{p: 10, n: 1, a: sa},
				{p: 10, n: 2, a: sa},
				{p: 10, n: 3, a: sa},
				{p: 10, n: 4, a: sb},
				{p: 10, n: 5, a: sb},
				{p: 10, n: 6, a: sb},
			},
			order: []int{0, 1, 2, 3, 4, 5, 6, 7, 8},
		},
		/*
			The next 4 tests are different permutations of the same set:
			{p: 5, n: 1, a: sa},
			{p: 10, n: 2, a: sa},
			{p: 20, n: 2, a: sb},
			{p: 5, n: 1, a: sb},
			{p: 99, n: 2, a: sc},
			{p: 5, n: 1, a: sc},

			which exercises the actions required to resolve priority ties.
		*/
		{
			txs: []txSpec{
				{p: 5, n: 1, a: sa},
				{p: 10, n: 2, a: sa},
				{p: 5, n: 1, a: sb},
				{p: 99, n: 2, a: sb},
			},
			order: []int{2, 3, 0, 1},
		},
		{
			txs: []txSpec{
				{p: 5, n: 1, a: sa},
				{p: 10, n: 2, a: sa},
				{p: 20, n: 2, a: sb},
				{p: 5, n: 1, a: sb},
				{p: 99, n: 2, a: sc},
				{p: 5, n: 1, a: sc},
			},
			order: []int{5, 4, 3, 2, 0, 1},
		},
		{
			txs: []txSpec{
				{p: 5, n: 1, a: sa},
				{p: 10, n: 2, a: sa},
				{p: 5, n: 1, a: sb},
				{p: 20, n: 2, a: sb},
				{p: 5, n: 1, a: sc},
				{p: 99, n: 2, a: sc},
			},
			order: []int{4, 5, 2, 3, 0, 1},
		},
		{
			txs: []txSpec{
				{p: 5, n: 1, a: sa},
				{p: 10, n: 2, a: sa},
				{p: 5, n: 1, a: sc},
				{p: 20, n: 2, a: sc},
				{p: 5, n: 1, a: sb},
				{p: 99, n: 2, a: sb},
			},
			order: []int{4, 5, 2, 3, 0, 1},
		},
	}
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
			pool := mempool.NewPriorityMempool()

			// create test txs and insert into mempool
			for i, ts := range tt.txs {
				tx := testTx{id: i, priority: int64(ts.p), nonce: uint64(ts.n), address: ts.a}
				c := ctx.WithPriority(tx.priority)
				err := pool.Insert(c, tx)
				require.NoError(t, err)
			}

			orderedTxs := fetchTxs(pool.Select(ctx, nil), 1000)
			var txOrder []int
			for _, tx := range orderedTxs {
				txOrder = append(txOrder, tx.(testTx).id)
				// debug output of the actual selection order
				fmt.Println(tx)
			}
			require.Equal(t, tt.order, txOrder)
			require.NoError(t, validateOrder(orderedTxs))

			// draining the pool tx-by-tx must leave all indices empty
			for _, tx := range orderedTxs {
				require.NoError(t, pool.Remove(tx))
			}

			require.NoError(t, mempool.IsEmpty(pool))
		})
	}
}
// TestPriorityTies inserts the same tie-heavy tx set (three senders, all with
// a priority-5 tx at nonce 1) in 100 random insertion orders and asserts the
// selection order is always identical — i.e. tie resolution is deterministic
// and independent of insertion order.
func (s *MempoolTestSuite) TestPriorityTies() {
	ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
	accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 3)
	sa := accounts[0].Address
	sb := accounts[1].Address
	sc := accounts[2].Address

	// txSet is also the expected selection order.
	txSet := []txSpec{
		{p: 5, n: 1, a: sc},
		{p: 99, n: 2, a: sc},
		{p: 5, n: 1, a: sb},
		{p: 20, n: 2, a: sb},
		{p: 5, n: 1, a: sa},
		{p: 10, n: 2, a: sa},
	}

	for i := 0; i < 100; i++ {
		s.mempool = mempool.NewPriorityMempool()

		// copy txSet so shuffling never mutates the expected order
		var shuffled []txSpec
		for _, t := range txSet {
			tx := txSpec{
				p: t.p,
				n: t.n,
				a: t.a,
			}
			shuffled = append(shuffled, tx)
		}
		rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })

		for id, ts := range shuffled {
			tx := testTx{priority: int64(ts.p), nonce: uint64(ts.n), address: ts.a, id: id}
			c := ctx.WithPriority(tx.priority)
			err := s.mempool.Insert(c, tx)
			s.NoError(err)
		}

		selected := fetchTxs(s.mempool.Select(ctx, nil), 1000)
		var orderedTxs []txSpec
		for _, tx := range selected {
			ttx := tx.(testTx)
			ts := txSpec{p: int(ttx.priority), n: int(ttx.nonce), a: ttx.address}
			orderedTxs = append(orderedTxs, ts)
		}
		s.Equal(txSet, orderedTxs)
	}
}
// TestRandomTxOrderManyTimes re-runs the two randomized ordering tests several
// times so each run exercises a fresh time-based seed.
func (s *MempoolTestSuite) TestRandomTxOrderManyTimes() {
	for i := 0; i < 3; i++ {
		s.Run("TestRandomGeneratedTxs", func() {
			s.TestRandomGeneratedTxs()
		})
		s.Run("TestRandomWalkTxs", func() {
			s.TestRandomWalkTxs()
		})
	}
}
// validateOrder checks that the txs are ordered by priority and nonce
// in O(n^2) time by checking each tx against all the other txs.
//
// Given 2 transactions a and b where a was selected before b (a.i < b.i):
//   - same sender: a must have a strictly lower nonce than b (rule 1);
//   - different senders with a.p < b.p: the out-of-priority selection is only
//     valid if some tx c with c.sender == b.sender, c.n < b.n and c.p <= a.p
//     gates b behind a (rule 2).
func validateOrder(mtxs []sdk.Tx) error {
	iterations := 0
	var itxs []txSpec
	for i, mtx := range mtxs {
		iterations++
		tx := mtx.(testTx)
		itxs = append(itxs, txSpec{p: int(tx.priority), n: int(tx.nonce), a: tx.address, i: i})
	}

	// Given 2 transactions t1 and t2, where t2.p > t1.p but t2.i < t1.i
	// Then if t2.sender have the same sender then t2.nonce > t1.nonce
	// or
	// If t1 and t2 have different senders then there must be some t3 with
	// t3.sender == t2.sender and t3.n < t2.n and t3.p <= t1.p
	for _, a := range itxs {
		for _, b := range itxs {
			iterations++
			// only consider pairs where a was selected before b
			if a.i < b.i {
				// same sender
				if a.a.Equals(b.a) {
					if a.n == b.n {
						return fmt.Errorf("same sender tx have the same nonce\n%v\n%v", a, b)
					}
					if a.n > b.n {
						return fmt.Errorf("same sender tx have wrong nonce order\n%v\n%v", a, b)
					}
				} else {
					// different sender
					if a.p < b.p {
						// find a tx with same sender as b and lower nonce
						found := false
						for _, c := range itxs {
							iterations++
							if c.a.Equals(b.a) && c.n < b.n && c.p <= a.p {
								found = true
								break
							}
						}
						if !found {
							return fmt.Errorf("different sender tx have wrong order\n%v\n%v", b, a)
						}
					}
				}
			}
		}
	}
	// fmt.Printf("validation in iterations: %d\n", iterations)
	return nil
}
// TestRandomGeneratedTxs inserts a large batch of randomly generated txs,
// selects them all, and validates the selection order with validateOrder. The
// onRead callback counts iterator reads for the log line at the end.
func (s *MempoolTestSuite) TestRandomGeneratedTxs() {
	s.iterations = 0
	s.mempool = mempool.NewPriorityMempool(mempool.WithOnRead(func(tx sdk.Tx) {
		s.iterations++
	}))
	t := s.T()
	ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())

	// time-based seed: logged so a failing run can be reproduced
	seed := time.Now().UnixNano()
	t.Logf("running with seed: %d", seed)

	generated := genRandomTxs(seed, s.numTxs, s.numAccounts)
	mp := s.mempool

	for _, otx := range generated {
		tx := testTx{id: otx.id, priority: otx.priority, nonce: otx.nonce, address: otx.address}
		c := ctx.WithPriority(tx.priority)
		err := mp.Insert(c, tx)
		require.NoError(t, err)
	}

	selected := fetchTxs(mp.Select(ctx, nil), 100000)

	// populate strAddress (debug field) from each tx's first signature
	for i, tx := range selected {
		ttx := tx.(testTx)
		sigs, _ := tx.(signing.SigVerifiableTx).GetSignaturesV2()
		ttx.strAddress = sigs[0].PubKey.Address().String()
		selected[i] = ttx
	}
	require.Equal(t, len(generated), len(selected))

	start := time.Now()
	require.NoError(t, validateOrder(selected))
	duration := time.Since(start)

	fmt.Printf("seed: %d completed in %d iterations; validation in %dms\n",
		seed, s.iterations, duration.Milliseconds())
}
// TestRandomWalkTxs builds an already-valid ordered tx sequence via
// genOrderedTxs, inserts a shuffled copy, and checks that the mempool's
// selection is itself a valid order (validateOrder). Strict equality against
// the generated order is not asserted — multiple valid traversals exist.
func (s *MempoolTestSuite) TestRandomWalkTxs() {
	s.iterations = 0
	s.mempool = mempool.NewPriorityMempool()

	t := s.T()
	ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())

	seed := time.Now().UnixNano()
	// interesting failing seeds:
	// seed := int64(1663971399133628000)
	// seed := int64(1663989445512438000)
	//
	t.Logf("running with seed: %d", seed)

	ordered, shuffled := genOrderedTxs(seed, s.numTxs, s.numAccounts)
	mp := s.mempool

	for _, otx := range shuffled {
		tx := testTx{id: otx.id, priority: otx.priority, nonce: otx.nonce, address: otx.address}
		c := ctx.WithPriority(tx.priority)
		err := mp.Insert(c, tx)
		require.NoError(t, err)
	}

	require.Equal(t, s.numTxs, mp.CountTx())

	selected := fetchTxs(mp.Select(ctx, nil), math.MaxInt)
	require.Equal(t, len(ordered), len(selected))

	// build human-readable dumps of both sequences for the failure message
	var orderedStr, selectedStr string
	for i := 0; i < s.numTxs; i++ {
		otx := ordered[i]
		stx := selected[i].(testTx)
		orderedStr = fmt.Sprintf("%s\n%s, %d, %d; %d",
			orderedStr, otx.address, otx.priority, otx.nonce, otx.id)
		selectedStr = fmt.Sprintf("%s\n%s, %d, %d; %d",
			selectedStr, stx.address, stx.priority, stx.nonce, stx.id)
	}

	require.Equal(t, s.numTxs, len(selected))
	errMsg := fmt.Sprintf("Expected order: %v\nGot order: %v\nSeed: %v", orderedStr, selectedStr, seed)

	start := time.Now()
	require.NoError(t, validateOrder(selected), errMsg)
	duration := time.Since(start)

	t.Logf("seed: %d completed in %d iterations; validation in %dms\n",
		seed, s.iterations, duration.Milliseconds())
}
// genRandomTxs deterministically (per seed) generates countTx test txs spread
// across countAccount random accounts. Each account's nonces increase from 0
// in generation order; priorities are uniform in [0, 100].
func genRandomTxs(seed int64, countTx int, countAccount int) []testTx {
	const maxPriority = 100

	rng := rand.New(rand.NewSource(seed))
	accounts := simtypes.RandomAccounts(rng, countAccount)

	nonces := make(map[string]uint64, countAccount)
	for _, acct := range accounts {
		nonces[acct.Address.String()] = 0
	}

	txs := make([]testTx, 0, countTx)
	for i := 0; i < countTx; i++ {
		addr := accounts[rng.Intn(countAccount)].Address
		priority := int64(rng.Intn(maxPriority + 1))
		nonce := nonces[addr.String()]
		nonces[addr.String()] = nonce + 1

		txs = append(txs, testTx{
			priority: priority,
			nonce:    nonce,
			address:  addr,
			id:       i,
		})
	}

	return txs
}
// genOrderedTxs performs a random walk that emits maxTx txs in a valid
// selection order (ordered) together with a shuffled copy of the same txs
// (shuffled). Each step randomly chooses one of five moves relative to the
// previous tx: same sender with lower/equal/higher priority, or a different
// sender with lower/equal priority; txCursor tracks the lowest priority
// emitted so far so "lower" moves stay globally decreasing.
//
// since there are multiple valid ordered graph traversals for a given set of txs strict
// validation against the ordered txs generated from this function is not possible as written
func genOrderedTxs(seed int64, maxTx int, numAcc int) (ordered []testTx, shuffled []testTx) {
	r := rand.New(rand.NewSource(seed))
	accountNonces := make(map[string]uint64)
	prange := 10
	randomAccounts := simtypes.RandomAccounts(r, numAcc)
	for _, account := range randomAccounts {
		accountNonces[account.Address.String()] = 0
	}

	// getRandAccount picks a random account different from notAddress
	// (pass "" to allow any account).
	getRandAccount := func(notAddress string) simtypes.Account {
		for {
			res := randomAccounts[r.Intn(len(randomAccounts))]
			if res.Address.String() != notAddress {
				return res
			}
		}
	}

	txCursor := int64(10000)
	ptx := testTx{address: getRandAccount("").Address, nonce: 0, priority: txCursor}
	// samepChain tracks senders already used in the current run of
	// equal-priority moves, to avoid generating same-priority cycles.
	samepChain := make(map[string]bool)
	for i := 0; i < maxTx; {
		var tx testTx
		move := r.Intn(5)
		switch move {
		case 0:
			// same sender, less p
			nonce := ptx.nonce + 1
			tx = testTx{nonce: nonce, address: ptx.address, priority: txCursor - int64(r.Intn(prange)+1)}
			txCursor = tx.priority
		case 1:
			// same sender, same p
			nonce := ptx.nonce + 1
			tx = testTx{nonce: nonce, address: ptx.address, priority: ptx.priority}
		case 2:
			// same sender, greater p
			nonce := ptx.nonce + 1
			tx = testTx{nonce: nonce, address: ptx.address, priority: ptx.priority + int64(r.Intn(prange)+1)}
		case 3:
			// different sender, less p
			sender := getRandAccount(ptx.address.String()).Address
			nonce := accountNonces[sender.String()] + 1
			tx = testTx{nonce: nonce, address: sender, priority: txCursor - int64(r.Intn(prange)+1)}
			txCursor = tx.priority
		case 4:
			// different sender, same p
			sender := getRandAccount(ptx.address.String()).Address
			// disallow generating cycles of same p txs. this is an invalid processing order according to our
			// algorithm decision.
			if _, ok := samepChain[sender.String()]; ok {
				continue
			}
			nonce := accountNonces[sender.String()] + 1
			tx = testTx{nonce: nonce, address: sender, priority: txCursor}
			samepChain[sender.String()] = true
		}
		tx.id = i
		accountNonces[tx.address.String()] = tx.nonce
		ordered = append(ordered, tx)
		ptx = tx
		i++
		// any non-equal-priority move breaks the same-priority chain
		if move != 4 {
			samepChain = make(map[string]bool)
		}
	}

	// deep-copy ordered, then shuffle the copy for insertion-order testing
	for _, item := range ordered {
		tx := testTx{
			priority: item.priority,
			nonce:    item.nonce,
			address:  item.address,
			id:       item.id,
		}
		shuffled = append(shuffled, tx)
	}
	rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
	return ordered, shuffled
}
// TestTxOrderN smoke-tests genOrderedTxs: generate a small ordered tx set and
// its shuffled counterpart, assert both have the requested length, and print
// them for manual inspection.
func TestTxOrderN(t *testing.T) {
	const numTx = 10

	seed := time.Now().UnixNano()
	ordered, shuffled := genOrderedTxs(seed, numTx, 3)
	require.Equal(t, numTx, len(ordered))
	require.Equal(t, numTx, len(shuffled))

	fmt.Println("ordered")
	for _, tx := range ordered {
		fmt.Printf("%s, %d, %d\n", tx.address, tx.priority, tx.nonce)
	}

	fmt.Println("shuffled")
	for _, tx := range shuffled {
		fmt.Printf("%s, %d, %d\n", tx.address, tx.priority, tx.nonce)
	}
}

View File

@ -3,7 +3,7 @@ package mempool_test
import (
"testing"
huandu "github.com/huandu/skiplist"
"github.com/huandu/skiplist"
"github.com/stretchr/testify/require"
)
@ -13,7 +13,7 @@ type collisionKey struct {
}
func TestSkipListCollisions(t *testing.T) {
integerList := huandu.New(huandu.Int)
integerList := skiplist.New(skiplist.Int)
integerList.Set(1, 1)
integerList.Set(2, 2)
@ -34,13 +34,13 @@ func TestSkipListCollisions(t *testing.T) {
require.Equal(t, 4, integerList.Get(1).Value)
// prove this again with a compound key
compoundList := huandu.New(huandu.LessThanFunc(func(x, y any) int {
compoundList := skiplist.New(skiplist.LessThanFunc(func(x, y any) int {
kx := x.(collisionKey)
ky := y.(collisionKey)
if kx.a == ky.a {
return huandu.Int.Compare(kx.b, ky.b)
return skiplist.Int.Compare(kx.b, ky.b)
}
return huandu.Int.Compare(kx.a, ky.a)
return skiplist.Int.Compare(kx.a, ky.a)
}))
compoundList.Set(collisionKey{a: 1, b: 1}, 1)