diff --git a/types/mempool/mempool_test.go b/types/mempool/mempool_test.go index f786eba1fa..e18504c0c5 100644 --- a/types/mempool/mempool_test.go +++ b/types/mempool/mempool_test.go @@ -48,6 +48,8 @@ type testTx struct { priority int64 nonce uint64 address sdk.AccAddress + // useful for debugging + strAddress string } func (tx testTx) GetSigners() []sdk.AccAddress { panic("not implemented") } diff --git a/types/mempool/priority_nonce.go b/types/mempool/priority_nonce.go new file mode 100644 index 0000000000..c6ac0e7eed --- /dev/null +++ b/types/mempool/priority_nonce.go @@ -0,0 +1,380 @@ +package mempool + +import ( + "context" + "fmt" + "math" + + "github.com/huandu/skiplist" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/x/auth/signing" +) + +var ( + _ Mempool = (*priorityNonceMempool)(nil) + _ Iterator = (*priorityNonceIterator)(nil) +) + +// priorityNonceMempool defines the SDK's default mempool implementation which stores +// txs in a partially ordered set by 2 dimensions: priority, and sender-nonce +// (sequence number). Internally it uses one priority ordered skip list and one +// skip list per sender ordered by sender-nonce (sequence number). When there +// are multiple txs from the same sender, they are not always comparable by +// priority to other sender txs and must be partially ordered by both sender-nonce +// and priority. +type priorityNonceMempool struct { + priorityIndex *skiplist.SkipList + priorityCounts map[int64]int + senderIndices map[string]*skiplist.SkipList + scores map[txMeta]txMeta + onRead func(tx sdk.Tx) +} + +type priorityNonceIterator struct { + senderCursors map[string]*skiplist.Element + nextPriority int64 + sender string + priorityNode *skiplist.Element + mempool *priorityNonceMempool +} + +// txMeta stores transaction metadata used in indices +type txMeta struct { + // nonce is the sender's sequence number + nonce uint64 + // priority is the transaction's priority + priority int64 + // sender is the transaction's sender + sender string + // weight is the transaction's weight, used as a tiebreaker for transactions with the same priority + weight int64 + // senderElement is a pointer to the transaction's element in the sender index + senderElement *skiplist.Element +} + +// txMetaLess is a comparator for txKeys that first compares priority, then weight, +// then sender, then nonce, uniquely identifying a transaction. +// +// Note, txMetaLess is used as the comparator in the priority index. +func txMetaLess(a, b any) int { + keyA := a.(txMeta) + keyB := b.(txMeta) + res := skiplist.Int64.Compare(keyA.priority, keyB.priority) + if res != 0 { + return res + } + + // weight is used as a tiebreaker for transactions with the same priority. weight is calculated in a single + // pass in .Select(...) and so will be 0 on .Insert(...) + res = skiplist.Int64.Compare(keyA.weight, keyB.weight) + if res != 0 { + return res + } + + // Because weight will be 0 on .Insert(...), we must also compare sender and nonce to resolve priority collisions. + // If we didn't then transactions with the same priority would overwrite each other in the priority index. + res = skiplist.String.Compare(keyA.sender, keyB.sender) + if res != 0 { + return res + } + + return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce) +} + +type PriorityNonceMempoolOption func(*priorityNonceMempool) + +// WithOnRead sets a callback to be called when a tx is read from the mempool. +func WithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption { + return func(mp *priorityNonceMempool) { + mp.onRead = onRead + } +} + +// DefaultPriorityMempool returns a priorityNonceMempool with no options. +func DefaultPriorityMempool() Mempool { + return NewPriorityMempool() +} + +// NewPriorityMempool returns the SDK's default mempool implementation which +// returns txs in a partial order by 2 dimensions; priority, and sender-nonce. +func NewPriorityMempool(opts ...PriorityNonceMempoolOption) Mempool { + mp := &priorityNonceMempool{ + priorityIndex: skiplist.New(skiplist.LessThanFunc(txMetaLess)), + priorityCounts: make(map[int64]int), + senderIndices: make(map[string]*skiplist.SkipList), + scores: make(map[txMeta]txMeta), + } + + for _, opt := range opts { + opt(mp) + } + + return mp +} + +// Insert attempts to insert a Tx into the app-side mempool in O(log n) time, +// returning an error if unsuccessful. Sender and nonce are derived from the +// transaction's first signature. +// +// Transactions are unique by sender and nonce. Inserting a duplicate tx is an +// O(log n) no-op. +// +// Inserting a duplicate tx with a different priority overwrites the existing tx, +// changing the total order of the mempool. +func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { + sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + if err != nil { + return err + } + if len(sigs) == 0 { + return fmt.Errorf("tx must have at least one signer") + } + + sdkContext := sdk.UnwrapSDKContext(ctx) + priority := sdkContext.Priority() + sig := sigs[0] + sender := sig.PubKey.Address().String() + nonce := sig.Sequence + key := txMeta{nonce: nonce, priority: priority, sender: sender} + + senderIndex, ok := mp.senderIndices[sender] + if !ok { + senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int { + return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce) + })) + + // initialize sender index if not found + mp.senderIndices[sender] = senderIndex + } + + mp.priorityCounts[priority]++ + + // Since senderIndex is scored by nonce, a changed priority will overwrite the + // existing key. + key.senderElement = senderIndex.Set(key, tx) + + // Since mp.priorityIndex is scored by priority, then sender, then nonce, a + // changed priority will create a new key, so we must remove the old key and + // re-insert it to avoid having the same tx with different priorityIndex indexed + // twice in the mempool. + // + // This O(log n) remove operation is rare and only happens when a tx's priority + // changes. + sk := txMeta{nonce: nonce, sender: sender} + if oldScore, txExists := mp.scores[sk]; txExists { + mp.priorityIndex.Remove(txMeta{ + nonce: nonce, + sender: sender, + priority: oldScore.priority, + weight: oldScore.weight, + }) + mp.priorityCounts[oldScore.priority]-- + } + + mp.scores[sk] = txMeta{priority: priority} + mp.priorityIndex.Set(key, tx) + + return nil +} + +func (i *priorityNonceIterator) iteratePriority() Iterator { + // beginning of priority iteration + if i.priorityNode == nil { + i.priorityNode = i.mempool.priorityIndex.Front() + } else { + i.priorityNode = i.priorityNode.Next() + } + + // end of priority iteration + if i.priorityNode == nil { + return nil + } + + i.sender = i.priorityNode.Key().(txMeta).sender + + nextPriorityNode := i.priorityNode.Next() + if nextPriorityNode != nil { + i.nextPriority = nextPriorityNode.Key().(txMeta).priority + } else { + i.nextPriority = math.MinInt64 + } + + return i.Next() +} + +func (i *priorityNonceIterator) Next() Iterator { + if i.priorityNode == nil { + return nil + } + + cursor, ok := i.senderCursors[i.sender] + if !ok { + // beginning of sender iteration + cursor = i.mempool.senderIndices[i.sender].Front() + } else { + // middle of sender iteration + cursor = cursor.Next() + } + + // end of sender iteration + if cursor == nil { + return i.iteratePriority() + } + key := cursor.Key().(txMeta) + + // we've reached a transaction with a priority lower than the next highest priority in the pool + if key.priority < i.nextPriority { + return i.iteratePriority() + } else if key.priority == i.nextPriority { + // weight is incorporated into the priority index key only (not sender index) so we must fetch it here + // from the scores map. + weight := i.mempool.scores[txMeta{nonce: key.nonce, sender: key.sender}].weight + if weight < i.priorityNode.Next().Key().(txMeta).weight { + return i.iteratePriority() + } + } + + i.senderCursors[i.sender] = cursor + return i +} + +func (i *priorityNonceIterator) Tx() sdk.Tx { + return i.senderCursors[i.sender].Value.(sdk.Tx) +} + +// Select returns a set of transactions from the mempool, ordered by priority +// and sender-nonce in O(n) time. The passed in list of transactions are ignored. +// This is a readonly operation, the mempool is not modified. +// +// The maxBytes parameter defines the maximum number of bytes of transactions to +// return. +func (mp *priorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator { + if mp.priorityIndex.Len() == 0 { + return nil + } + + mp.reorderPriorityTies() + + iterator := &priorityNonceIterator{ + mempool: mp, + senderCursors: make(map[string]*skiplist.Element), + } + + return iterator.iteratePriority() +} + +type reorderKey struct { + deleteKey txMeta + insertKey txMeta + tx sdk.Tx +} + +func (mp *priorityNonceMempool) reorderPriorityTies() { + node := mp.priorityIndex.Front() + var reordering []reorderKey + for node != nil { + key := node.Key().(txMeta) + if mp.priorityCounts[key.priority] > 1 { + newKey := key + newKey.weight = senderWeight(key.senderElement) + reordering = append(reordering, reorderKey{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)}) + } + node = node.Next() + } + + for _, k := range reordering { + mp.priorityIndex.Remove(k.deleteKey) + delete(mp.scores, txMeta{nonce: k.deleteKey.nonce, sender: k.deleteKey.sender}) + mp.priorityIndex.Set(k.insertKey, k.tx) + mp.scores[txMeta{nonce: k.insertKey.nonce, sender: k.insertKey.sender}] = k.insertKey + } +} + +// senderWeight returns the weight of a given tx (t) at senderCursor. Weight is defined as the first (nonce-wise) +// same sender tx with a priority not equal to t. It is used to resolve priority collisions, that is when 2 or more +// txs from different senders have the same priority. +func senderWeight(senderCursor *skiplist.Element) int64 { + if senderCursor == nil { + return 0 + } + weight := senderCursor.Key().(txMeta).priority + senderCursor = senderCursor.Next() + for senderCursor != nil { + p := senderCursor.Key().(txMeta).priority + if p != weight { + weight = p + } + senderCursor = senderCursor.Next() + } + + return weight +} + +// CountTx returns the number of transactions in the mempool. +func (mp *priorityNonceMempool) CountTx() int { + return mp.priorityIndex.Len() +} + +// Remove removes a transaction from the mempool in O(log n) time, returning an error if unsuccessful. +func (mp *priorityNonceMempool) Remove(tx sdk.Tx) error { + sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + if err != nil { + return err + } + if len(sigs) == 0 { + return fmt.Errorf("attempted to remove a tx with no signatures") + } + + sig := sigs[0] + sender := sig.PubKey.Address().String() + nonce := sig.Sequence + + scoreKey := txMeta{nonce: nonce, sender: sender} + score, ok := mp.scores[scoreKey] + if !ok { + return ErrTxNotFound + } + tk := txMeta{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight} + + senderTxs, ok := mp.senderIndices[sender] + if !ok { + return fmt.Errorf("sender %s not found", sender) + } + + mp.priorityIndex.Remove(tk) + senderTxs.Remove(tk) + delete(mp.scores, scoreKey) + mp.priorityCounts[score.priority]-- + + return nil +} + +func IsEmpty(mempool Mempool) error { + mp := mempool.(*priorityNonceMempool) + if mp.priorityIndex.Len() != 0 { + return fmt.Errorf("priorityIndex not empty") + } + + var countKeys []int64 + for k := range mp.priorityCounts { + countKeys = append(countKeys, k) + } + for _, k := range countKeys { + if mp.priorityCounts[k] != 0 { + return fmt.Errorf("priorityCounts not zero at %v, got %v", k, mp.priorityCounts[k]) + } + } + + var senderKeys []string + for k := range mp.senderIndices { + senderKeys = append(senderKeys, k) + } + for _, k := range senderKeys { + if mp.senderIndices[k].Len() != 0 { + return fmt.Errorf("senderIndex not empty for sender %v", k) + } + } + + return nil +} diff --git a/types/mempool/priority_nonce_spec.md b/types/mempool/priority_nonce_spec.md new file mode 100644 index 0000000000..cc24fa1dae --- /dev/null +++ b/types/mempool/priority_nonce_spec.md @@ -0,0 +1,146 @@ +# Priority sender-nonce mempool specification + +[priorityNonceMempool](./priority_nonce.go) defines a mempool implementation which stores txs in a partially +ordered set by 2 dimensions: priority, and sender-nonce (sequence number). Internally it uses one priority +ordered skip list and one skip list per sender ordered by sender-nonce. When there are +multiple txs from the same sender, they are not always comparable by priority to other sender txs and must be +partially ordered by both sender-nonce and priority. + +The follow rules are strictly observed while iterating the mempool to select transactions: + +1) For a given sender their txs must be selected in nonce order. +2) A transaction with a higher priority is always selected before a transaction with a lower priority except + when to do so would violate sender-nonce order. + +The observance of these rules leads to many interesting cases some of which are outlined below to give an +impression of the prioritization behavior of this mempool. + +In each case mempool order is indicated by a graph where nodes are transactions and edges are dependencies described by +the two rules for transaction ordering above. The node label indicates the transaction's priority. Edges within a +sender satisfy rule 1 (nonce order). Edges between senders satisfy rule 2 (priority order). A topological sort of the +graph is a valid mempool order. + +### Case 1 + +| Sender | Nonce | Priority | +|--------|-------|----------| +| A | 0 | 20 | +| A | 1 | 6 | +| A | 2 | 8 | +| A | 3 | 21 | +| B | 0 | 15 | + +```mermaid +graph TB + subgraph Sender A + 20-->6 + 6-->8 + 8-->21 + end + subgraph Sender B + 20-->15 + 15-->6 + end +``` + +Mempool order: [20, 15, 6, 8, 21] + +Sender A has 4 txs in the pool and sender B only 1. tx(priority=15) is selected before tx(priority=21) even +though it has a lower priority because it would violate either rule 1 or 2. + +### Case 2 + +| Sender | Nonce | Priority | +|--------|-------|----------| +| A | 0 | 3 | +| A | 1 | 5 | +| A | 2 | 9 | +| B | 0 | 6 | +| B | 1 | 5 | +| B | 2 | 8 | + +```mermaid +graph LR + subgraph Sender A + 3-->5a[5] + 5a-->9 + end + subgraph Sender B + 6-->5b[5] + 6-->3 + 5b-->8 + 8-->3 + end +``` + +Mempool order: [6, 5, 8, 3, 5, 9] + +Although tx(priority=9) has the highest global priority it is selected last. This is due tx(priority=3) +gating 9's selection by rule 1. + +### Case 3 - Priority ties + +| Sender | Nonce | Priority | +|--------|-------|----------| +| A | 0 | 5 | +| A | 1 | 99 | +| B | 0 | 5 | +| B | 1 | 20 | + +```mermaid +graph LR + subgraph Sender A + 5a[5]-->99 + end + subgraph Sender B + 5b[5]-->20 + 99-->5b + 5a-->5b + end +``` + +Mempool order: [5, 99, 5, 20] + +This case shows how priority ties are handled. Tx(priority=5, sender=A) is prioritized before tx(priority=5, sender=B) +because of the transactions following them, tx(priority=99) must be selected before tx(priority=20) by rule 2. + +### Case 4 + +| Sender | Nonce | Priority | +|--------|-------|----------| +| A | 0 | 10 | +| A | 1 | 15 | +| A | 2 | 30 | +| A | 3 | 6 | +| B | 0 | 8 | +| B | 1 | 20 | +| B | 3 | 4 | +| C | 0 | 2 | +| C | 3 | 7 | + +```mermaid +graph TD + subgraph Sender A + 10-->15 + 15-->30 + 30-->6 + end + subgraph Sender B + 8 + 8-->20 + 10-->8 + 30-->8 + 20-->4 + 20-->6 + + end + subgraph Sender C + 2-->90 + 4-->2 + 6-->4 + end +``` + +Mempool order: [10, 15, 30, 8, 20, 6, 4, 2, 90] + +This case shows how the mempool handles a more complex graph with more priority edges between senders. Again we also demonstrate an idiosyncrasy of this nonce/priroity ordering scheme, tx(priority=90) is selected last because it is gated behind tx(priority=2) by nonce ordering. diff --git a/types/mempool/priority_nonce_test.go b/types/mempool/priority_nonce_test.go new file mode 100644 index 0000000000..f75b59b4a2 --- /dev/null +++ b/types/mempool/priority_nonce_test.go @@ -0,0 +1,584 @@ +package mempool_test + +import ( + "fmt" + "math" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/log" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/mempool" + simtypes "github.com/cosmos/cosmos-sdk/types/simulation" + "github.com/cosmos/cosmos-sdk/x/auth/signing" +) + +func TestOutOfOrder(t *testing.T) { + accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 2) + sa := accounts[0].Address + sb := accounts[1].Address + + outOfOrders := [][]testTx{ + { + {priority: 20, nonce: 1, address: sa}, + {priority: 21, nonce: 4, address: sa}, + {priority: 15, nonce: 1, address: sb}, + {priority: 8, nonce: 3, address: sa}, + {priority: 6, nonce: 2, address: sa}, + }, + { + {priority: 15, nonce: 1, address: sb}, + {priority: 20, nonce: 1, address: sa}, + {priority: 21, nonce: 4, address: sa}, + {priority: 8, nonce: 3, address: sa}, + {priority: 6, nonce: 2, address: sa}, + }} + + for _, outOfOrder := range outOfOrders { + var mtxs []sdk.Tx + for _, mtx := range outOfOrder { + mtxs = append(mtxs, mtx) + } + err := validateOrder(mtxs) + require.Error(t, err) + } + + seed := time.Now().UnixNano() + t.Logf("running with seed: %d", seed) + randomTxs := genRandomTxs(seed, 1000, 10) + var rmtxs []sdk.Tx + for _, rtx := range randomTxs { + rmtxs = append(rmtxs, rtx) + } + + require.Error(t, validateOrder(rmtxs)) + +} + +func (s *MempoolTestSuite) TestPriorityNonceTxOrder() { + t := s.T() + ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger()) + accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 5) + sa := accounts[0].Address + sb := accounts[1].Address + sc := accounts[2].Address + + tests := []struct { + txs []txSpec + order []int + fail bool + }{ + { + txs: []txSpec{ + {p: 21, n: 4, a: sa}, + {p: 8, n: 3, a: sa}, + {p: 6, n: 2, a: sa}, + {p: 15, n: 1, a: sb}, + {p: 20, n: 1, a: sa}, + }, + order: []int{4, 3, 2, 1, 0}, + }, + { + txs: []txSpec{ + {p: 3, n: 0, a: sa}, + {p: 5, n: 1, a: sa}, + {p: 9, n: 2, a: sa}, + {p: 6, n: 0, a: sb}, + {p: 5, n: 1, a: sb}, + {p: 8, n: 2, a: sb}, + }, + order: []int{3, 4, 5, 0, 1, 2}, + }, + { + txs: []txSpec{ + {p: 21, n: 4, a: sa}, + {p: 15, n: 1, a: sb}, + {p: 20, n: 1, a: sa}, + }, + order: []int{2, 0, 1}, + }, + { + txs: []txSpec{ + {p: 50, n: 3, a: sa}, + {p: 30, n: 2, a: sa}, + {p: 10, n: 1, a: sa}, + {p: 15, n: 1, a: sb}, + {p: 21, n: 2, a: sb}, + }, + order: []int{3, 4, 2, 1, 0}, + }, + { + txs: []txSpec{ + {p: 50, n: 3, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 99, n: 1, a: sa}, + {p: 15, n: 1, a: sb}, + {p: 8, n: 2, a: sb}, + }, + order: []int{2, 3, 1, 0, 4}, + }, + { + txs: []txSpec{ + {p: 30, a: sa, n: 2}, + {p: 20, a: sb, n: 1}, + {p: 15, a: sa, n: 1}, + {p: 10, a: sa, n: 0}, + {p: 8, a: sb, n: 0}, + {p: 6, a: sa, n: 3}, + {p: 4, a: sb, n: 3}, + }, + order: []int{3, 2, 0, 4, 1, 5, 6}, + }, + { + txs: []txSpec{ + {p: 30, n: 2, a: sa}, + {p: 20, a: sb, n: 1}, + {p: 15, a: sa, n: 1}, + {p: 10, a: sa, n: 0}, + {p: 8, a: sb, n: 0}, + {p: 6, a: sa, n: 3}, + {p: 4, a: sb, n: 3}, + {p: 2, a: sc, n: 0}, + {p: 7, a: sc, n: 3}, + }, + order: []int{3, 2, 0, 4, 1, 5, 6, 7, 8}, + }, + { + txs: []txSpec{ + {p: 6, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 5, n: 1, a: sb}, + {p: 99, n: 2, a: sb}, + }, + order: []int{0, 1, 2, 3}, + }, + { + // if all txs have the same priority they will be ordered lexically sender address, and nonce with the + // sender. + txs: []txSpec{ + {p: 10, n: 7, a: sc}, + {p: 10, n: 8, a: sc}, + {p: 10, n: 9, a: sc}, + {p: 10, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 10, n: 3, a: sa}, + {p: 10, n: 4, a: sb}, + {p: 10, n: 5, a: sb}, + {p: 10, n: 6, a: sb}, + }, + order: []int{0, 1, 2, 3, 4, 5, 6, 7, 8}, + }, + /* + The next 4 tests are different permutations of the same set: + + {p: 5, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 20, n: 2, a: sb}, + {p: 5, n: 1, a: sb}, + {p: 99, n: 2, a: sc}, + {p: 5, n: 1, a: sc}, + + which exercises the actions required to resolve priority ties. + */ + { + txs: []txSpec{ + {p: 5, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 5, n: 1, a: sb}, + {p: 99, n: 2, a: sb}, + }, + order: []int{2, 3, 0, 1}, + }, + { + txs: []txSpec{ + {p: 5, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 20, n: 2, a: sb}, + {p: 5, n: 1, a: sb}, + {p: 99, n: 2, a: sc}, + {p: 5, n: 1, a: sc}, + }, + order: []int{5, 4, 3, 2, 0, 1}, + }, + { + txs: []txSpec{ + {p: 5, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 5, n: 1, a: sb}, + {p: 20, n: 2, a: sb}, + {p: 5, n: 1, a: sc}, + {p: 99, n: 2, a: sc}, + }, + order: []int{4, 5, 2, 3, 0, 1}, + }, + { + txs: []txSpec{ + {p: 5, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 5, n: 1, a: sc}, + {p: 20, n: 2, a: sc}, + {p: 5, n: 1, a: sb}, + {p: 99, n: 2, a: sb}, + }, + order: []int{4, 5, 2, 3, 0, 1}, + }, + } + for i, tt := range tests { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + pool := mempool.NewPriorityMempool() + + // create test txs and insert into mempool + for i, ts := range tt.txs { + tx := testTx{id: i, priority: int64(ts.p), nonce: uint64(ts.n), address: ts.a} + c := ctx.WithPriority(tx.priority) + err := pool.Insert(c, tx) + require.NoError(t, err) + } + + orderedTxs := fetchTxs(pool.Select(ctx, nil), 1000) + var txOrder []int + for _, tx := range orderedTxs { + txOrder = append(txOrder, tx.(testTx).id) + fmt.Println(tx) + } + require.Equal(t, tt.order, txOrder) + require.NoError(t, validateOrder(orderedTxs)) + + for _, tx := range orderedTxs { + require.NoError(t, pool.Remove(tx)) + } + + require.NoError(t, mempool.IsEmpty(pool)) + }) + } +} + +func (s *MempoolTestSuite) TestPriorityTies() { + ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger()) + accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 3) + sa := accounts[0].Address + sb := accounts[1].Address + sc := accounts[2].Address + + txSet := []txSpec{ + {p: 5, n: 1, a: sc}, + {p: 99, n: 2, a: sc}, + {p: 5, n: 1, a: sb}, + {p: 20, n: 2, a: sb}, + {p: 5, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + } + + for i := 0; i < 100; i++ { + s.mempool = mempool.NewPriorityMempool() + var shuffled []txSpec + for _, t := range txSet { + tx := txSpec{ + p: t.p, + n: t.n, + a: t.a, + } + shuffled = append(shuffled, tx) + } + rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] }) + + for id, ts := range shuffled { + tx := testTx{priority: int64(ts.p), nonce: uint64(ts.n), address: ts.a, id: id} + c := ctx.WithPriority(tx.priority) + err := s.mempool.Insert(c, tx) + s.NoError(err) + } + selected := fetchTxs(s.mempool.Select(ctx, nil), 1000) + var orderedTxs []txSpec + for _, tx := range selected { + ttx := tx.(testTx) + ts := txSpec{p: int(ttx.priority), n: int(ttx.nonce), a: ttx.address} + orderedTxs = append(orderedTxs, ts) + } + s.Equal(txSet, orderedTxs) + } +} + +func (s *MempoolTestSuite) TestRandomTxOrderManyTimes() { + for i := 0; i < 3; i++ { + s.Run("TestRandomGeneratedTxs", func() { + s.TestRandomGeneratedTxs() + }) + s.Run("TestRandomWalkTxs", func() { + s.TestRandomWalkTxs() + }) + } +} + +// validateOrder checks that the txs are ordered by priority and nonce +// in O(n^2) time by checking each tx against all the other txs +func validateOrder(mtxs []sdk.Tx) error { + iterations := 0 + var itxs []txSpec + for i, mtx := range mtxs { + iterations++ + tx := mtx.(testTx) + itxs = append(itxs, txSpec{p: int(tx.priority), n: int(tx.nonce), a: tx.address, i: i}) + } + + // Given 2 transactions t1 and t2, where t2.p > t1.p but t2.i < t1.i + // Then if t2.sender have the same sender then t2.nonce > t1.nonce + // or + // If t1 and t2 have different senders then there must be some t3 with + // t3.sender == t2.sender and t3.n < t2.n and t3.p <= t1.p + + for _, a := range itxs { + for _, b := range itxs { + iterations++ + // when b is before a + + // when a is before b + if a.i < b.i { + // same sender + if a.a.Equals(b.a) { + // same sender + if a.n == b.n { + return fmt.Errorf("same sender tx have the same nonce\n%v\n%v", a, b) + } + if a.n > b.n { + return fmt.Errorf("same sender tx have wrong nonce order\n%v\n%v", a, b) + } + } else { + // different sender + if a.p < b.p { + // find a tx with same sender as b and lower nonce + found := false + for _, c := range itxs { + iterations++ + if c.a.Equals(b.a) && c.n < b.n && c.p <= a.p { + found = true + break + } + } + if !found { + return fmt.Errorf("different sender tx have wrong order\n%v\n%v", b, a) + } + } + } + } + } + } + // fmt.Printf("validation in iterations: %d\n", iterations) + return nil +} + +func (s *MempoolTestSuite) TestRandomGeneratedTxs() { + s.iterations = 0 + s.mempool = mempool.NewPriorityMempool(mempool.WithOnRead(func(tx sdk.Tx) { + s.iterations++ + })) + t := s.T() + ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger()) + seed := time.Now().UnixNano() + + t.Logf("running with seed: %d", seed) + generated := genRandomTxs(seed, s.numTxs, s.numAccounts) + mp := s.mempool + + for _, otx := range generated { + tx := testTx{id: otx.id, priority: otx.priority, nonce: otx.nonce, address: otx.address} + c := ctx.WithPriority(tx.priority) + err := mp.Insert(c, tx) + require.NoError(t, err) + } + + selected := fetchTxs(mp.Select(ctx, nil), 100000) + for i, tx := range selected { + ttx := tx.(testTx) + sigs, _ := tx.(signing.SigVerifiableTx).GetSignaturesV2() + ttx.strAddress = sigs[0].PubKey.Address().String() + selected[i] = ttx + } + require.Equal(t, len(generated), len(selected)) + + start := time.Now() + require.NoError(t, validateOrder(selected)) + duration := time.Since(start) + + fmt.Printf("seed: %d completed in %d iterations; validation in %dms\n", + seed, s.iterations, duration.Milliseconds()) +} + +func (s *MempoolTestSuite) TestRandomWalkTxs() { + s.iterations = 0 + s.mempool = mempool.NewPriorityMempool() + + t := s.T() + ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger()) + + seed := time.Now().UnixNano() + // interesting failing seeds: + // seed := int64(1663971399133628000) + // seed := int64(1663989445512438000) + // + t.Logf("running with seed: %d", seed) + + ordered, shuffled := genOrderedTxs(seed, s.numTxs, s.numAccounts) + mp := s.mempool + + for _, otx := range shuffled { + tx := testTx{id: otx.id, priority: otx.priority, nonce: otx.nonce, address: otx.address} + c := ctx.WithPriority(tx.priority) + err := mp.Insert(c, tx) + require.NoError(t, err) + } + + require.Equal(t, s.numTxs, mp.CountTx()) + + selected := fetchTxs(mp.Select(ctx, nil), math.MaxInt) + require.Equal(t, len(ordered), len(selected)) + var orderedStr, selectedStr string + + for i := 0; i < s.numTxs; i++ { + otx := ordered[i] + stx := selected[i].(testTx) + orderedStr = fmt.Sprintf("%s\n%s, %d, %d; %d", + orderedStr, otx.address, otx.priority, otx.nonce, otx.id) + selectedStr = fmt.Sprintf("%s\n%s, %d, %d; %d", + selectedStr, stx.address, stx.priority, stx.nonce, stx.id) + } + + require.Equal(t, s.numTxs, len(selected)) + + errMsg := fmt.Sprintf("Expected order: %v\nGot order: %v\nSeed: %v", orderedStr, selectedStr, seed) + + start := time.Now() + require.NoError(t, validateOrder(selected), errMsg) + duration := time.Since(start) + + t.Logf("seed: %d completed in %d iterations; validation in %dms\n", + seed, s.iterations, duration.Milliseconds()) +} + +func genRandomTxs(seed int64, countTx int, countAccount int) (res []testTx) { + maxPriority := 100 + r := rand.New(rand.NewSource(seed)) + accounts := simtypes.RandomAccounts(r, countAccount) + accountNonces := make(map[string]uint64) + for _, account := range accounts { + accountNonces[account.Address.String()] = 0 + } + + for i := 0; i < countTx; i++ { + addr := accounts[r.Intn(countAccount)].Address + priority := int64(r.Intn(maxPriority + 1)) + nonce := accountNonces[addr.String()] + accountNonces[addr.String()] = nonce + 1 + res = append(res, testTx{ + priority: priority, + nonce: nonce, + address: addr, + id: i}) + } + + return res +} + +// since there are multiple valid ordered graph traversals for a given set of txs strict +// validation against the ordered txs generated from this function is not possible as written +func genOrderedTxs(seed int64, maxTx int, numAcc int) (ordered []testTx, shuffled []testTx) { + r := rand.New(rand.NewSource(seed)) + accountNonces := make(map[string]uint64) + prange := 10 + randomAccounts := simtypes.RandomAccounts(r, numAcc) + for _, account := range randomAccounts { + accountNonces[account.Address.String()] = 0 + } + + getRandAccount := func(notAddress string) simtypes.Account { + for { + res := randomAccounts[r.Intn(len(randomAccounts))] + if res.Address.String() != notAddress { + return res + } + } + } + + txCursor := int64(10000) + ptx := testTx{address: getRandAccount("").Address, nonce: 0, priority: txCursor} + samepChain := make(map[string]bool) + for i := 0; i < maxTx; { + var tx testTx + move := r.Intn(5) + switch move { + case 0: + // same sender, less p + nonce := ptx.nonce + 1 + tx = testTx{nonce: nonce, address: ptx.address, priority: txCursor - int64(r.Intn(prange)+1)} + txCursor = tx.priority + case 1: + // same sender, same p + nonce := ptx.nonce + 1 + tx = testTx{nonce: nonce, address: ptx.address, priority: ptx.priority} + case 2: + // same sender, greater p + nonce := ptx.nonce + 1 + tx = testTx{nonce: nonce, address: ptx.address, priority: ptx.priority + int64(r.Intn(prange)+1)} + case 3: + // different sender, less p + sender := getRandAccount(ptx.address.String()).Address + nonce := accountNonces[sender.String()] + 1 + tx = testTx{nonce: nonce, address: sender, priority: txCursor - int64(r.Intn(prange)+1)} + txCursor = tx.priority + case 4: + // different sender, same p + sender := getRandAccount(ptx.address.String()).Address + // disallow generating cycles of same p txs. this is an invalid processing order according to our + // algorithm decision. + if _, ok := samepChain[sender.String()]; ok { + continue + } + nonce := accountNonces[sender.String()] + 1 + tx = testTx{nonce: nonce, address: sender, priority: txCursor} + samepChain[sender.String()] = true + } + tx.id = i + accountNonces[tx.address.String()] = tx.nonce + ordered = append(ordered, tx) + ptx = tx + i++ + if move != 4 { + samepChain = make(map[string]bool) + } + } + + for _, item := range ordered { + tx := testTx{ + priority: item.priority, + nonce: item.nonce, + address: item.address, + id: item.id, + } + shuffled = append(shuffled, tx) + } + rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] }) + return ordered, shuffled +} + +func TestTxOrderN(t *testing.T) { + numTx := 10 + + seed := time.Now().UnixNano() + ordered, shuffled := genOrderedTxs(seed, numTx, 3) + require.Equal(t, numTx, len(ordered)) + require.Equal(t, numTx, len(shuffled)) + + fmt.Println("ordered") + for _, tx := range ordered { + fmt.Printf("%s, %d, %d\n", tx.address, tx.priority, tx.nonce) + } + + fmt.Println("shuffled") + for _, tx := range shuffled { + fmt.Printf("%s, %d, %d\n", tx.address, tx.priority, tx.nonce) + } +} diff --git a/types/mempool/skip_list_test.go b/types/mempool/skip_list_test.go index f13f297f12..a3154910b5 100644 --- a/types/mempool/skip_list_test.go +++ b/types/mempool/skip_list_test.go @@ -3,7 +3,7 @@ package mempool_test import ( "testing" - huandu "github.com/huandu/skiplist" + "github.com/huandu/skiplist" "github.com/stretchr/testify/require" ) @@ -13,7 +13,7 @@ type collisionKey struct { } func TestSkipListCollisions(t *testing.T) { - integerList := huandu.New(huandu.Int) + integerList := skiplist.New(skiplist.Int) integerList.Set(1, 1) integerList.Set(2, 2) @@ -34,13 +34,13 @@ func TestSkipListCollisions(t *testing.T) { require.Equal(t, 4, integerList.Get(1).Value) // prove this again with a compound key - compoundList := huandu.New(huandu.LessThanFunc(func(x, y any) int { + compoundList := skiplist.New(skiplist.LessThanFunc(func(x, y any) int { kx := x.(collisionKey) ky := y.(collisionKey) if kx.a == ky.a { - return huandu.Int.Compare(kx.b, ky.b) + return skiplist.Int.Compare(kx.b, ky.b) } - return huandu.Int.Compare(kx.a, ky.a) + return skiplist.Int.Compare(kx.a, ky.a) })) compoundList.Set(collisionKey{a: 1, b: 1}, 1)