chore: UX Mempool Tweaks (#15121)

This commit is contained in:
Aleksandr Bezobchuk 2023-02-21 21:21:24 -05:00 committed by GitHub
parent 27f3c729a3
commit 77660ec452
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 139 additions and 67 deletions

View File

@ -12,18 +12,18 @@ import (
)
var (
_ Mempool = (*priorityNonceMempool)(nil)
_ Iterator = (*priorityNonceIterator)(nil)
_ Mempool = (*PriorityNonceMempool)(nil)
_ Iterator = (*PriorityNonceIterator)(nil)
)
// priorityNonceMempool is a mempool implementation that stores txs
// PriorityNonceMempool is a mempool implementation that stores txs
// in a partially ordered set by 2 dimensions: priority, and sender-nonce
// (sequence number). Internally it uses one priority ordered skip list and one
// skip list per sender ordered by sender-nonce (sequence number). When there
// are multiple txs from the same sender, they are not always comparable by
// priority to other sender txs and must be partially ordered by both sender-nonce
// and priority.
type priorityNonceMempool struct {
type PriorityNonceMempool struct {
priorityIndex *skiplist.SkipList
priorityCounts map[int64]int
senderIndices map[string]*skiplist.SkipList
@ -33,12 +33,12 @@ type priorityNonceMempool struct {
maxTx int
}
type priorityNonceIterator struct {
type PriorityNonceIterator struct {
senderCursors map[string]*skiplist.Element
nextPriority int64
sender string
priorityNode *skiplist.Element
mempool *priorityNonceMempool
mempool *PriorityNonceMempool
}
// txMeta stores transaction metadata used in indices
@ -67,15 +67,17 @@ func txMetaLess(a, b any) int {
return res
}
// weight is used as a tiebreaker for transactions with the same priority. weight is calculated in a single
// pass in .Select(...) and so will be 0 on .Insert(...)
// Weight is used as a tiebreaker for transactions with the same priority.
// Weight is calculated in a single pass in .Select(...) and so will be 0
// on .Insert(...).
res = skiplist.Int64.Compare(keyA.weight, keyB.weight)
if res != 0 {
return res
}
// Because weight will be 0 on .Insert(...), we must also compare sender and nonce to resolve priority collisions.
// If we didn't then transactions with the same priority would overwrite each other in the priority index.
// Because weight will be 0 on .Insert(...), we must also compare sender and
// nonce to resolve priority collisions. If we didn't then transactions with
// the same priority would overwrite each other in the priority index.
res = skiplist.String.Compare(keyA.sender, keyB.sender)
if res != 0 {
return res
@ -84,30 +86,33 @@ func txMetaLess(a, b any) int {
return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce)
}
type PriorityNonceMempoolOption func(*priorityNonceMempool)
type PriorityNonceMempoolOption func(*PriorityNonceMempool)
// PriorityNonceWithOnRead sets a callback to be called when a tx is read from the mempool.
// PriorityNonceWithOnRead sets a callback to be called when a tx is read from
// the mempool.
func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption {
return func(mp *priorityNonceMempool) {
return func(mp *PriorityNonceMempool) {
mp.onRead = onRead
}
}
// PriorityNonceWithTxReplacement sets a callback to be called when duplicated transaction nonce detected during mempool insert.
// Application can define a transaction replacement rule based on tx priority or certain transaction fields.
// PriorityNonceWithTxReplacement sets a callback to be called when duplicated
// transaction nonce detected during mempool insert. An application can define a
// transaction replacement rule based on tx priority or certain transaction fields.
func PriorityNonceWithTxReplacement(txReplacementRule func(op, np int64, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption {
return func(mp *priorityNonceMempool) {
return func(mp *PriorityNonceMempool) {
mp.txReplacement = txReplacementRule
}
}
// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the mempool with the semantics:
// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the
// mempool with the semantics:
//
// <0: disabled, `Insert` is a no-op
// 0: unlimited
// >0: maximum number of transactions allowed
func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption {
return func(mp *priorityNonceMempool) {
return func(mp *PriorityNonceMempool) {
mp.maxTx = maxTx
}
}
@ -119,8 +124,8 @@ func DefaultPriorityMempool() Mempool {
// NewPriorityMempool returns the SDK's default mempool implementation which
// returns txs in a partial order by 2 dimensions; priority, and sender-nonce.
func NewPriorityMempool(opts ...PriorityNonceMempoolOption) Mempool {
mp := &priorityNonceMempool{
func NewPriorityMempool(opts ...PriorityNonceMempoolOption) *PriorityNonceMempool {
mp := &PriorityNonceMempool{
priorityIndex: skiplist.New(skiplist.LessThanFunc(txMetaLess)),
priorityCounts: make(map[int64]int),
senderIndices: make(map[string]*skiplist.SkipList),
@ -134,6 +139,19 @@ func NewPriorityMempool(opts ...PriorityNonceMempoolOption) Mempool {
return mp
}
// NextSenderTx returns the next transaction for a given sender by nonce order,
// i.e. the next valid transaction for the sender. If no such transaction exists,
// nil will be returned.
func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx {
senderIndex, ok := mp.senderIndices[sender]
if !ok {
return nil
}
cursor := senderIndex.Front()
return cursor.Value.(sdk.Tx)
}
// Insert attempts to insert a Tx into the app-side mempool in O(log n) time,
// returning an error if unsuccessful. Sender and nonce are derived from the
// transaction's first signature.
@ -143,7 +161,7 @@ func NewPriorityMempool(opts ...PriorityNonceMempoolOption) Mempool {
//
// Inserting a duplicate tx with a different priority overwrites the existing tx,
// changing the total order of the mempool.
func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx {
return ErrMempoolTxMaxCapacity
} else if mp.maxTx < 0 {
@ -161,7 +179,7 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
sdkContext := sdk.UnwrapSDKContext(ctx)
priority := sdkContext.Priority()
sig := sigs[0]
sender := sig.PubKey.Address().String()
sender := sdk.AccAddress(sig.PubKey.Address()).String()
nonce := sig.Sequence
key := txMeta{nonce: nonce, priority: priority, sender: sender}
@ -215,7 +233,7 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
return nil
}
func (i *priorityNonceIterator) iteratePriority() Iterator {
func (i *PriorityNonceIterator) iteratePriority() Iterator {
// beginning of priority iteration
if i.priorityNode == nil {
i.priorityNode = i.mempool.priorityIndex.Front()
@ -240,7 +258,7 @@ func (i *priorityNonceIterator) iteratePriority() Iterator {
return i.Next()
}
func (i *priorityNonceIterator) Next() Iterator {
func (i *PriorityNonceIterator) Next() Iterator {
if i.priorityNode == nil {
return nil
}
@ -258,14 +276,16 @@ func (i *priorityNonceIterator) Next() Iterator {
if cursor == nil {
return i.iteratePriority()
}
key := cursor.Key().(txMeta)
// we've reached a transaction with a priority lower than the next highest priority in the pool
// We've reached a transaction with a priority lower than the next highest
// priority in the pool.
if key.priority < i.nextPriority {
return i.iteratePriority()
} else if key.priority == i.nextPriority {
// weight is incorporated into the priority index key only (not sender index) so we must fetch it here
// from the scores map.
// Weight is incorporated into the priority index key only (not sender index)
// so we must fetch it here from the scores map.
weight := i.mempool.scores[txMeta{nonce: key.nonce, sender: key.sender}].weight
if weight < i.priorityNode.Next().Key().(txMeta).weight {
return i.iteratePriority()
@ -276,7 +296,7 @@ func (i *priorityNonceIterator) Next() Iterator {
return i
}
func (i *priorityNonceIterator) Tx() sdk.Tx {
func (i *PriorityNonceIterator) Tx() sdk.Tx {
return i.senderCursors[i.sender].Value.(sdk.Tx)
}
@ -286,14 +306,14 @@ func (i *priorityNonceIterator) Tx() sdk.Tx {
//
// The maxBytes parameter defines the maximum number of bytes of transactions to
// return.
func (mp *priorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
if mp.priorityIndex.Len() == 0 {
return nil
}
mp.reorderPriorityTies()
iterator := &priorityNonceIterator{
iterator := &PriorityNonceIterator{
mempool: mp,
senderCursors: make(map[string]*skiplist.Element),
}
@ -307,8 +327,9 @@ type reorderKey struct {
tx sdk.Tx
}
func (mp *priorityNonceMempool) reorderPriorityTies() {
func (mp *PriorityNonceMempool) reorderPriorityTies() {
node := mp.priorityIndex.Front()
var reordering []reorderKey
for node != nil {
key := node.Key().(txMeta)
@ -317,6 +338,7 @@ func (mp *priorityNonceMempool) reorderPriorityTies() {
newKey.weight = senderWeight(key.senderElement)
reordering = append(reordering, reorderKey{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)})
}
node = node.Next()
}
@ -328,13 +350,15 @@ func (mp *priorityNonceMempool) reorderPriorityTies() {
}
}
// senderWeight returns the weight of a given tx (t) at senderCursor. Weight is defined as the first (nonce-wise)
// same sender tx with a priority not equal to t. It is used to resolve priority collisions, that is when 2 or more
// txs from different senders have the same priority.
// senderWeight returns the weight of a given tx (t) at senderCursor. Weight is
// defined as the first (nonce-wise) same sender tx with a priority not equal to
// t. It is used to resolve priority collisions, that is when 2 or more txs from
// different senders have the same priority.
func senderWeight(senderCursor *skiplist.Element) int64 {
if senderCursor == nil {
return 0
}
weight := senderCursor.Key().(txMeta).priority
senderCursor = senderCursor.Next()
for senderCursor != nil {
@ -342,6 +366,7 @@ func senderWeight(senderCursor *skiplist.Element) int64 {
if p != weight {
weight = p
}
senderCursor = senderCursor.Next()
}
@ -349,12 +374,13 @@ func senderWeight(senderCursor *skiplist.Element) int64 {
}
// CountTx returns the number of transactions in the mempool.
func (mp *priorityNonceMempool) CountTx() int {
func (mp *PriorityNonceMempool) CountTx() int {
return mp.priorityIndex.Len()
}
// Remove removes a transaction from the mempool in O(log n) time, returning an error if unsuccessful.
func (mp *priorityNonceMempool) Remove(tx sdk.Tx) error {
// Remove removes a transaction from the mempool in O(log n) time, returning an
// error if unsuccessful.
func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
@ -364,7 +390,7 @@ func (mp *priorityNonceMempool) Remove(tx sdk.Tx) error {
}
sig := sigs[0]
sender := sig.PubKey.Address().String()
sender := sdk.AccAddress(sig.PubKey.Address()).String()
nonce := sig.Sequence
scoreKey := txMeta{nonce: nonce, sender: sender}
@ -388,7 +414,7 @@ func (mp *priorityNonceMempool) Remove(tx sdk.Tx) error {
}
func IsEmpty(mempool Mempool) error {
mp := mempool.(*priorityNonceMempool)
mp := mempool.(*PriorityNonceMempool)
if mp.priorityIndex.Len() != 0 {
return fmt.Errorf("priorityIndex not empty")
}
@ -397,6 +423,7 @@ func IsEmpty(mempool Mempool) error {
for k := range mp.priorityCounts {
countKeys = append(countKeys, k)
}
for _, k := range countKeys {
if mp.priorityCounts[k] != 0 {
return fmt.Errorf("priorityCounts not zero at %v, got %v", k, mp.priorityCounts[k])
@ -407,6 +434,7 @@ func IsEmpty(mempool Mempool) error {
for k := range mp.senderIndices {
senderKeys = append(senderKeys, k)
}
for _, k := range senderKeys {
if mp.senderIndices[k].Len() != 0 {
return fmt.Errorf("senderIndex not empty for sender %v", k)

View File

@ -157,8 +157,8 @@ func (s *MempoolTestSuite) TestPriorityNonceTxOrder() {
order: []int{0, 1, 2, 3},
},
{
// if all txs have the same priority they will be ordered lexically sender address, and nonce with the
// sender.
// If all txs have the same priority they will be ordered lexically sender
// address, and nonce with the sender.
txs: []txSpec{
{p: 10, n: 7, a: sc},
{p: 10, n: 8, a: sc},
@ -170,12 +170,12 @@ func (s *MempoolTestSuite) TestPriorityNonceTxOrder() {
{p: 10, n: 5, a: sb},
{p: 10, n: 6, a: sb},
},
order: []int{0, 1, 2, 3, 4, 5, 6, 7, 8},
order: []int{3, 4, 5, 6, 7, 8, 0, 1, 2},
},
/*
The next 4 tests are different permutations of the same set:
{p: 5, n: 1, a: sa},
{p: 5, n: 1, a: sa},
{p: 10, n: 2, a: sa},
{p: 20, n: 2, a: sb},
{p: 5, n: 1, a: sb},
@ -240,11 +240,12 @@ func (s *MempoolTestSuite) TestPriorityNonceTxOrder() {
}
orderedTxs := fetchTxs(pool.Select(ctx, nil), 1000)
var txOrder []int
for _, tx := range orderedTxs {
txOrder = append(txOrder, tx.(testTx).id)
fmt.Println(tx)
}
require.Equal(t, tt.order, txOrder)
require.NoError(t, validateOrder(orderedTxs))
@ -582,7 +583,37 @@ func TestTxOrderN(t *testing.T) {
}
}
func TestTxLimit(t *testing.T) {
func TestPriorityNonceMempool_NextSenderTx(t *testing.T) {
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 2)
ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger())
accA := accounts[0].Address
accB := accounts[1].Address
mp := mempool.NewPriorityMempool()
txs := []testTx{
{priority: 20, nonce: 1, address: accA},
{priority: 15, nonce: 2, address: accA},
{priority: 66, nonce: 3, address: accA},
{priority: 20, nonce: 4, address: accA},
{priority: 88, nonce: 5, address: accA},
}
for i, tx := range txs {
c := ctx.WithPriority(tx.priority)
require.NoError(t, mp.Insert(c, tx))
require.Equal(t, i+1, mp.CountTx())
}
tx := mp.NextSenderTx(accB.String())
require.Nil(t, tx)
tx = mp.NextSenderTx(accA.String())
require.NotNil(t, tx)
require.Equal(t, txs[0], tx)
}
func TestNextSenderTx_TxLimit(t *testing.T) {
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 2)
ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger())
sa := accounts[0].Address
@ -639,7 +670,7 @@ func TestTxLimit(t *testing.T) {
}
}
func TestTxReplacement(t *testing.T) {
func TestNextSenderTx_TxReplacement(t *testing.T) {
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1)
ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger())
sa := accounts[0].Address

View File

@ -14,13 +14,13 @@ import (
)
var (
_ Mempool = (*senderNonceMempool)(nil)
_ Iterator = (*senderNonceMepoolIterator)(nil)
_ Mempool = (*SenderNonceMempool)(nil)
_ Iterator = (*senderNonceMempoolIterator)(nil)
)
var DefaultMaxTx = 0
// senderNonceMempool is a mempool that prioritizes transactions within a sender
// SenderNonceMempool is a mempool that prioritizes transactions within a sender
// by nonce, the lowest first, but selects a random sender on each iteration.
// The mempool is iterated by:
//
@ -30,14 +30,14 @@ var DefaultMaxTx = 0
//
// Note that PrepareProposal could choose to stop iteration before reaching the
// end if maxBytes is reached.
type senderNonceMempool struct {
type SenderNonceMempool struct {
senders map[string]*skiplist.SkipList
rnd *rand.Rand
maxTx int
existingTx map[txKey]bool
}
type SenderNonceOptions func(mp *senderNonceMempool)
type SenderNonceOptions func(mp *SenderNonceMempool)
type txKey struct {
address string
@ -46,10 +46,10 @@ type txKey struct {
// NewSenderNonceMempool creates a new mempool that prioritizes transactions by
// nonce, the lowest first, picking a random sender on each iteration.
func NewSenderNonceMempool(opts ...SenderNonceOptions) Mempool {
func NewSenderNonceMempool(opts ...SenderNonceOptions) *SenderNonceMempool {
senderMap := make(map[string]*skiplist.SkipList)
existingTx := make(map[txKey]bool)
snp := &senderNonceMempool{
snp := &SenderNonceMempool{
senders: senderMap,
maxTx: DefaultMaxTx,
existingTx: existingTx,
@ -78,7 +78,7 @@ func NewSenderNonceMempool(opts ...SenderNonceOptions) Mempool {
// random_seed := int64(1000)
// NewSenderNonceMempool(SenderNonceSeedTxOpt(random_seed))
func SenderNonceSeedOpt(seed int64) SenderNonceOptions {
return func(snp *senderNonceMempool) {
return func(snp *SenderNonceMempool) {
snp.setSeed(seed)
}
}
@ -90,19 +90,32 @@ func SenderNonceSeedOpt(seed int64) SenderNonceOptions {
//
// NewSenderNonceMempool(SenderNonceMaxTxOpt(100))
func SenderNonceMaxTxOpt(maxTx int) SenderNonceOptions {
return func(snp *senderNonceMempool) {
return func(snp *SenderNonceMempool) {
snp.maxTx = maxTx
}
}
func (snm *senderNonceMempool) setSeed(seed int64) {
func (snm *SenderNonceMempool) setSeed(seed int64) {
s1 := rand.NewSource(seed)
snm.rnd = rand.New(s1) //#nosec // math/rand is seeded from crypto/rand by default
}
// NextSenderTx returns the next transaction for a given sender by nonce order,
// i.e. the next valid transaction for the sender. If no such transaction exists,
// nil will be returned.
func (mp *SenderNonceMempool) NextSenderTx(sender string) sdk.Tx {
senderIndex, ok := mp.senders[sender]
if !ok {
return nil
}
cursor := senderIndex.Front()
return cursor.Value.(sdk.Tx)
}
// Insert adds a tx to the mempool. It returns an error if the tx does not have
// at least one signer. Note, priority is ignored.
func (snm *senderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
if snm.maxTx > 0 && snm.CountTx() >= snm.maxTx {
return ErrMempoolTxMaxCapacity
}
@ -119,7 +132,7 @@ func (snm *senderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
}
sig := sigs[0]
sender := sig.PubKey.Address().String()
sender := sdk.AccAddress(sig.PubKey.Address()).String()
nonce := sig.Sequence
senderTxs, found := snm.senders[sender]
@ -138,7 +151,7 @@ func (snm *senderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
// Select returns an iterator ordering transactions the mempool with the lowest
// nonce of a random selected sender first.
func (snm *senderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
func (snm *SenderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
var senders []string
senderCursors := make(map[string]*skiplist.Element)
@ -157,7 +170,7 @@ func (snm *senderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
s = s.Next()
}
iter := &senderNonceMepoolIterator{
iter := &senderNonceMempoolIterator{
senders: senders,
rnd: snm.rnd,
senderCursors: senderCursors,
@ -167,13 +180,13 @@ func (snm *senderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
}
// CountTx returns the total count of txs in the mempool.
func (snm *senderNonceMempool) CountTx() int {
func (snm *SenderNonceMempool) CountTx() int {
return len(snm.existingTx)
}
// Remove removes a tx from the mempool. It returns an error if the tx does not
// have at least one signer or the tx was not found in the pool.
func (snm *senderNonceMempool) Remove(tx sdk.Tx) error {
func (snm *SenderNonceMempool) Remove(tx sdk.Tx) error {
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
@ -183,7 +196,7 @@ func (snm *senderNonceMempool) Remove(tx sdk.Tx) error {
}
sig := sigs[0]
sender := sig.PubKey.Address().String()
sender := sdk.AccAddress(sig.PubKey.Address()).String()
nonce := sig.Sequence
senderTxs, found := snm.senders[sender]
@ -206,7 +219,7 @@ func (snm *senderNonceMempool) Remove(tx sdk.Tx) error {
return nil
}
type senderNonceMepoolIterator struct {
type senderNonceMempoolIterator struct {
rnd *rand.Rand
currentTx *skiplist.Element
senders []string
@ -215,7 +228,7 @@ type senderNonceMepoolIterator struct {
// Next returns the next iterator state which will contain a tx with the next
// smallest nonce of a randomly selected sender.
func (i *senderNonceMepoolIterator) Next() Iterator {
func (i *senderNonceMempoolIterator) Next() Iterator {
for len(i.senders) > 0 {
senderIndex := i.rnd.Intn(len(i.senders))
sender := i.senders[senderIndex]
@ -231,7 +244,7 @@ func (i *senderNonceMepoolIterator) Next() Iterator {
i.senders = removeAtIndex(i.senders, senderIndex)
}
return &senderNonceMepoolIterator{
return &senderNonceMempoolIterator{
senders: i.senders,
currentTx: senderCursor,
rnd: i.rnd,
@ -242,7 +255,7 @@ func (i *senderNonceMepoolIterator) Next() Iterator {
return nil
}
func (i *senderNonceMepoolIterator) Tx() sdk.Tx {
func (i *senderNonceMempoolIterator) Tx() sdk.Tx {
return i.currentTx.Value.(sdk.Tx)
}