refactor: Use Custom Priority in Priority Nonce Mempool (#15328)
This commit is contained in:
parent
a53aee4eea
commit
1c31e98903
@ -62,6 +62,9 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
|
||||
### Improvements
|
||||
|
||||
* (mempool) [#15328](https://github.com/cosmos/cosmos-sdk/pull/15328) Improve the `PriorityNonceMempool`
|
||||
* Support generic transaction prioritization, instead of `ctx.Priority()`
|
||||
* Improve construction through the use of a single `PriorityNonceMempoolConfig` instead of option functions
|
||||
* (x/authz) [#15164](https://github.com/cosmos/cosmos-sdk/pull/15164) Add `MsgCancelUnbondingDelegation` to staking authorization
|
||||
* (server) [#15358](https://github.com/cosmos/cosmos-sdk/pull/15358) Add `server.InterceptConfigsAndCreateContext` as alternative to `server.InterceptConfigsPreRunHandler` which does not set the server context and the default SDK logger.
|
||||
* [#15011](https://github.com/cosmos/cosmos-sdk/pull/15011) Introduce `cosmossdk.io/log` package to provide a consistent logging interface through the SDK. CometBFT logger is now replaced by `cosmossdk.io/log.Logger`.
|
||||
@ -97,6 +100,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
|
||||
### API Breaking Changes
|
||||
|
||||
* (mempool) [#15328](https://github.com/cosmos/cosmos-sdk/pull/15328) The `PriorityNonceMempool` is now generic over type `C comparable` and takes a single `PriorityNonceMempoolConfig[C]` argument. See `DefaultPriorityNonceMempoolConfig` for how to construct the configuration and a `TxPriority` type.
|
||||
* (server) [#15358](https://github.com/cosmos/cosmos-sdk/pull/15358) Remove `server.ErrorCode` that was not used anywhere.
|
||||
* [#15211](https://github.com/cosmos/cosmos-sdk/pull/15211) Remove usage of `github.com/cometbft/cometbft/libs/bytes.HexBytes` in favor of `[]byte` thorough the SDK.
|
||||
* [#15011](https://github.com/cosmos/cosmos-sdk/pull/15011) All functions that were taking a CometBFT logger, now take `cosmossdk.io/log.Logger` instead.
|
||||
|
||||
@ -12,137 +12,169 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
_ Mempool = (*PriorityNonceMempool)(nil)
|
||||
_ Iterator = (*PriorityNonceIterator)(nil)
|
||||
_ Mempool = (*PriorityNonceMempool[int64])(nil)
|
||||
_ Iterator = (*PriorityNonceIterator[int64])(nil)
|
||||
)
|
||||
|
||||
// PriorityNonceMempool is a mempool implementation that stores txs
|
||||
// in a partially ordered set by 2 dimensions: priority, and sender-nonce
|
||||
// (sequence number). Internally it uses one priority ordered skip list and one
|
||||
// skip list per sender ordered by sender-nonce (sequence number). When there
|
||||
// are multiple txs from the same sender, they are not always comparable by
|
||||
// priority to other sender txs and must be partially ordered by both sender-nonce
|
||||
// and priority.
|
||||
type PriorityNonceMempool struct {
|
||||
priorityIndex *skiplist.SkipList
|
||||
priorityCounts map[int64]int
|
||||
senderIndices map[string]*skiplist.SkipList
|
||||
scores map[txMeta]txMeta
|
||||
onRead func(tx sdk.Tx)
|
||||
txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool
|
||||
maxTx int
|
||||
type (
|
||||
// PriorityNonceMempoolConfig defines the configuration used to configure the
|
||||
// PriorityNonceMempool.
|
||||
PriorityNonceMempoolConfig[C comparable] struct {
|
||||
// TxPriority defines the transaction priority and comparator.
|
||||
TxPriority TxPriority[C]
|
||||
|
||||
// OnRead is a callback to be called when a tx is read from the mempool.
|
||||
OnRead func(tx sdk.Tx)
|
||||
|
||||
// TxReplacement is a callback to be called when duplicated transaction nonce
|
||||
// detected during mempool insert. An application can define a transaction
|
||||
// replacement rule based on tx priority or certain transaction fields.
|
||||
TxReplacement func(op, np C, oTx, nTx sdk.Tx) bool
|
||||
|
||||
// MaxTx sets the maximum number of transactions allowed in the mempool with
|
||||
// the semantics:
|
||||
// - if MaxTx == 0, there is no cap on the number of transactions in the mempool
|
||||
// - if MaxTx > 0, the mempool will cap the number of transactions it stores,
|
||||
// and will prioritize transactions by their priority and sender-nonce
|
||||
// (sequence number) when evicting transactions.
|
||||
// - if MaxTx < 0, `Insert` is a no-op.
|
||||
MaxTx int
|
||||
}
|
||||
|
||||
// PriorityNonceMempool is a mempool implementation that stores txs
|
||||
// in a partially ordered set by 2 dimensions: priority, and sender-nonce
|
||||
// (sequence number). Internally it uses one priority ordered skip list and one
|
||||
// skip list per sender ordered by sender-nonce (sequence number). When there
|
||||
// are multiple txs from the same sender, they are not always comparable by
|
||||
// priority to other sender txs and must be partially ordered by both sender-nonce
|
||||
// and priority.
|
||||
PriorityNonceMempool[C comparable] struct {
|
||||
priorityIndex *skiplist.SkipList
|
||||
priorityCounts map[C]int
|
||||
senderIndices map[string]*skiplist.SkipList
|
||||
scores map[txMeta[C]]txMeta[C]
|
||||
cfg PriorityNonceMempoolConfig[C]
|
||||
}
|
||||
|
||||
// PriorityNonceIterator defines an iterator that is used for mempool iteration
|
||||
// on Select().
|
||||
PriorityNonceIterator[C comparable] struct {
|
||||
mempool *PriorityNonceMempool[C]
|
||||
priorityNode *skiplist.Element
|
||||
senderCursors map[string]*skiplist.Element
|
||||
sender string
|
||||
nextPriority C
|
||||
}
|
||||
|
||||
// TxPriority defines a type that is used to retrieve and compare transaction
|
||||
// priorities. Priorities must be comparable.
|
||||
TxPriority[C comparable] struct {
|
||||
// GetTxPriority returns the priority of the transaction. A priority must be
|
||||
// comparable via Compare.
|
||||
GetTxPriority func(ctx context.Context, tx sdk.Tx) C
|
||||
|
||||
// Compare compares two transaction priorities. The result must be 0 if
|
||||
// a == b, -1 if a < b, and +1 if a > b.
|
||||
Compare func(a, b C) int
|
||||
|
||||
// MinValue defines the minimum priority value, e.g. MinInt64. This value is
|
||||
// used when instantiating a new iterator and comparing weights.
|
||||
MinValue C
|
||||
}
|
||||
|
||||
// txMeta stores transaction metadata used in indices
|
||||
txMeta[C comparable] struct {
|
||||
// nonce is the sender's sequence number
|
||||
nonce uint64
|
||||
// priority is the transaction's priority
|
||||
priority C
|
||||
// sender is the transaction's sender
|
||||
sender string
|
||||
// weight is the transaction's weight, used as a tiebreaker for transactions
|
||||
// with the same priority
|
||||
weight C
|
||||
// senderElement is a pointer to the transaction's element in the sender index
|
||||
senderElement *skiplist.Element
|
||||
}
|
||||
)
|
||||
|
||||
// NewDefaultTxPriority returns a TxPriority comparator using ctx.Priority as
|
||||
// the defining transaction priority.
|
||||
func NewDefaultTxPriority() TxPriority[int64] {
|
||||
return TxPriority[int64]{
|
||||
GetTxPriority: func(goCtx context.Context, _ sdk.Tx) int64 {
|
||||
return sdk.UnwrapSDKContext(goCtx).Priority()
|
||||
},
|
||||
Compare: func(a, b int64) int {
|
||||
return skiplist.Int64.Compare(a, b)
|
||||
},
|
||||
MinValue: math.MinInt64,
|
||||
}
|
||||
}
|
||||
|
||||
type PriorityNonceIterator struct {
|
||||
senderCursors map[string]*skiplist.Element
|
||||
nextPriority int64
|
||||
sender string
|
||||
priorityNode *skiplist.Element
|
||||
mempool *PriorityNonceMempool
|
||||
func DefaultPriorityNonceMempoolConfig() PriorityNonceMempoolConfig[int64] {
|
||||
return PriorityNonceMempoolConfig[int64]{
|
||||
TxPriority: NewDefaultTxPriority(),
|
||||
}
|
||||
}
|
||||
|
||||
// txMeta stores transaction metadata used in indices
|
||||
type txMeta struct {
|
||||
// nonce is the sender's sequence number
|
||||
nonce uint64
|
||||
// priority is the transaction's priority
|
||||
priority int64
|
||||
// sender is the transaction's sender
|
||||
sender string
|
||||
// weight is the transaction's weight, used as a tiebreaker for transactions with the same priority
|
||||
weight int64
|
||||
// senderElement is a pointer to the transaction's element in the sender index
|
||||
senderElement *skiplist.Element
|
||||
}
|
||||
|
||||
// txMetaLess is a comparator for txKeys that first compares priority, then weight,
|
||||
// then sender, then nonce, uniquely identifying a transaction.
|
||||
// skiplistComparable is a comparator for txKeys that first compares priority,
|
||||
// then weight, then sender, then nonce, uniquely identifying a transaction.
|
||||
//
|
||||
// Note, txMetaLess is used as the comparator in the priority index.
|
||||
func txMetaLess(a, b any) int {
|
||||
keyA := a.(txMeta)
|
||||
keyB := b.(txMeta)
|
||||
res := skiplist.Int64.Compare(keyA.priority, keyB.priority)
|
||||
if res != 0 {
|
||||
return res
|
||||
}
|
||||
// Note, skiplistComparable is used as the comparator in the priority index.
|
||||
func skiplistComparable[C comparable](txPriority TxPriority[C]) skiplist.Comparable {
|
||||
return skiplist.LessThanFunc(func(a, b any) int {
|
||||
keyA := a.(txMeta[C])
|
||||
keyB := b.(txMeta[C])
|
||||
|
||||
// Weight is used as a tiebreaker for transactions with the same priority.
|
||||
// Weight is calculated in a single pass in .Select(...) and so will be 0
|
||||
// on .Insert(...).
|
||||
res = skiplist.Int64.Compare(keyA.weight, keyB.weight)
|
||||
if res != 0 {
|
||||
return res
|
||||
}
|
||||
res := txPriority.Compare(keyA.priority, keyB.priority)
|
||||
if res != 0 {
|
||||
return res
|
||||
}
|
||||
|
||||
// Because weight will be 0 on .Insert(...), we must also compare sender and
|
||||
// nonce to resolve priority collisions. If we didn't then transactions with
|
||||
// the same priority would overwrite each other in the priority index.
|
||||
res = skiplist.String.Compare(keyA.sender, keyB.sender)
|
||||
if res != 0 {
|
||||
return res
|
||||
}
|
||||
// Weight is used as a tiebreaker for transactions with the same priority.
|
||||
// Weight is calculated in a single pass in .Select(...) and so will be 0
|
||||
// on .Insert(...).
|
||||
res = txPriority.Compare(keyA.weight, keyB.weight)
|
||||
if res != 0 {
|
||||
return res
|
||||
}
|
||||
|
||||
return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce)
|
||||
}
|
||||
// Because weight will be 0 on .Insert(...), we must also compare sender and
|
||||
// nonce to resolve priority collisions. If we didn't then transactions with
|
||||
// the same priority would overwrite each other in the priority index.
|
||||
res = skiplist.String.Compare(keyA.sender, keyB.sender)
|
||||
if res != 0 {
|
||||
return res
|
||||
}
|
||||
|
||||
type PriorityNonceMempoolOption func(*PriorityNonceMempool)
|
||||
|
||||
// PriorityNonceWithOnRead sets a callback to be called when a tx is read from
|
||||
// the mempool.
|
||||
func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption {
|
||||
return func(mp *PriorityNonceMempool) {
|
||||
mp.onRead = onRead
|
||||
}
|
||||
}
|
||||
|
||||
// PriorityNonceWithTxReplacement sets a callback to be called when duplicated
|
||||
// transaction nonce detected during mempool insert. An application can define a
|
||||
// transaction replacement rule based on tx priority or certain transaction fields.
|
||||
func PriorityNonceWithTxReplacement(txReplacementRule func(op, np int64, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption {
|
||||
return func(mp *PriorityNonceMempool) {
|
||||
mp.txReplacement = txReplacementRule
|
||||
}
|
||||
}
|
||||
|
||||
// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the
|
||||
// mempool with the semantics:
|
||||
//
|
||||
// <0: disabled, `Insert` is a no-op
|
||||
// 0: unlimited
|
||||
// >0: maximum number of transactions allowed
|
||||
func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption {
|
||||
return func(mp *PriorityNonceMempool) {
|
||||
mp.maxTx = maxTx
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultPriorityMempool returns a priorityNonceMempool with no options.
|
||||
func DefaultPriorityMempool() Mempool {
|
||||
return NewPriorityMempool()
|
||||
return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce)
|
||||
})
|
||||
}
|
||||
|
||||
// NewPriorityMempool returns the SDK's default mempool implementation which
|
||||
// returns txs in a partial order by 2 dimensions; priority, and sender-nonce.
|
||||
func NewPriorityMempool(opts ...PriorityNonceMempoolOption) *PriorityNonceMempool {
|
||||
mp := &PriorityNonceMempool{
|
||||
priorityIndex: skiplist.New(skiplist.LessThanFunc(txMetaLess)),
|
||||
priorityCounts: make(map[int64]int),
|
||||
func NewPriorityMempool[C comparable](cfg PriorityNonceMempoolConfig[C]) *PriorityNonceMempool[C] {
|
||||
mp := &PriorityNonceMempool[C]{
|
||||
priorityIndex: skiplist.New(skiplistComparable(cfg.TxPriority)),
|
||||
priorityCounts: make(map[C]int),
|
||||
senderIndices: make(map[string]*skiplist.SkipList),
|
||||
scores: make(map[txMeta]txMeta),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(mp)
|
||||
scores: make(map[txMeta[C]]txMeta[C]),
|
||||
cfg: cfg,
|
||||
}
|
||||
|
||||
return mp
|
||||
}
|
||||
|
||||
// DefaultPriorityMempool returns a priorityNonceMempool with no options.
|
||||
func DefaultPriorityMempool() *PriorityNonceMempool[int64] {
|
||||
return NewPriorityMempool(DefaultPriorityNonceMempoolConfig())
|
||||
}
|
||||
|
||||
// NextSenderTx returns the next transaction for a given sender by nonce order,
|
||||
// i.e. the next valid transaction for the sender. If no such transaction exists,
|
||||
// nil will be returned.
|
||||
func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx {
|
||||
func (mp *PriorityNonceMempool[C]) NextSenderTx(sender string) sdk.Tx {
|
||||
senderIndex, ok := mp.senderIndices[sender]
|
||||
if !ok {
|
||||
return nil
|
||||
@ -161,10 +193,10 @@ func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx {
|
||||
//
|
||||
// Inserting a duplicate tx with a different priority overwrites the existing tx,
|
||||
// changing the total order of the mempool.
|
||||
func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
|
||||
if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx {
|
||||
func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error {
|
||||
if mp.cfg.MaxTx > 0 && mp.CountTx() >= mp.cfg.MaxTx {
|
||||
return ErrMempoolTxMaxCapacity
|
||||
} else if mp.maxTx < 0 {
|
||||
} else if mp.cfg.MaxTx < 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -176,17 +208,16 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
|
||||
return fmt.Errorf("tx must have at least one signer")
|
||||
}
|
||||
|
||||
sdkContext := sdk.UnwrapSDKContext(ctx)
|
||||
priority := sdkContext.Priority()
|
||||
sig := sigs[0]
|
||||
sender := sdk.AccAddress(sig.PubKey.Address()).String()
|
||||
priority := mp.cfg.TxPriority.GetTxPriority(ctx, tx)
|
||||
nonce := sig.Sequence
|
||||
key := txMeta{nonce: nonce, priority: priority, sender: sender}
|
||||
key := txMeta[C]{nonce: nonce, priority: priority, sender: sender}
|
||||
|
||||
senderIndex, ok := mp.senderIndices[sender]
|
||||
if !ok {
|
||||
senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int {
|
||||
return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce)
|
||||
return skiplist.Uint64.Compare(b.(txMeta[C]).nonce, a.(txMeta[C]).nonce)
|
||||
}))
|
||||
|
||||
// initialize sender index if not found
|
||||
@ -200,9 +231,9 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
|
||||
//
|
||||
// This O(log n) remove operation is rare and only happens when a tx's priority
|
||||
// changes.
|
||||
sk := txMeta{nonce: nonce, sender: sender}
|
||||
sk := txMeta[C]{nonce: nonce, sender: sender}
|
||||
if oldScore, txExists := mp.scores[sk]; txExists {
|
||||
if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) {
|
||||
if mp.cfg.TxReplacement != nil && !mp.cfg.TxReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) {
|
||||
return fmt.Errorf(
|
||||
"tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v",
|
||||
oldScore.priority,
|
||||
@ -212,7 +243,7 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
|
||||
)
|
||||
}
|
||||
|
||||
mp.priorityIndex.Remove(txMeta{
|
||||
mp.priorityIndex.Remove(txMeta[C]{
|
||||
nonce: nonce,
|
||||
sender: sender,
|
||||
priority: oldScore.priority,
|
||||
@ -227,13 +258,13 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
|
||||
// existing key.
|
||||
key.senderElement = senderIndex.Set(key, tx)
|
||||
|
||||
mp.scores[sk] = txMeta{priority: priority}
|
||||
mp.scores[sk] = txMeta[C]{priority: priority}
|
||||
mp.priorityIndex.Set(key, tx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *PriorityNonceIterator) iteratePriority() Iterator {
|
||||
func (i *PriorityNonceIterator[C]) iteratePriority() Iterator {
|
||||
// beginning of priority iteration
|
||||
if i.priorityNode == nil {
|
||||
i.priorityNode = i.mempool.priorityIndex.Front()
|
||||
@ -246,19 +277,19 @@ func (i *PriorityNonceIterator) iteratePriority() Iterator {
|
||||
return nil
|
||||
}
|
||||
|
||||
i.sender = i.priorityNode.Key().(txMeta).sender
|
||||
i.sender = i.priorityNode.Key().(txMeta[C]).sender
|
||||
|
||||
nextPriorityNode := i.priorityNode.Next()
|
||||
if nextPriorityNode != nil {
|
||||
i.nextPriority = nextPriorityNode.Key().(txMeta).priority
|
||||
i.nextPriority = nextPriorityNode.Key().(txMeta[C]).priority
|
||||
} else {
|
||||
i.nextPriority = math.MinInt64
|
||||
i.nextPriority = i.mempool.cfg.TxPriority.MinValue
|
||||
}
|
||||
|
||||
return i.Next()
|
||||
}
|
||||
|
||||
func (i *PriorityNonceIterator) Next() Iterator {
|
||||
func (i *PriorityNonceIterator[C]) Next() Iterator {
|
||||
if i.priorityNode == nil {
|
||||
return nil
|
||||
}
|
||||
@ -277,17 +308,17 @@ func (i *PriorityNonceIterator) Next() Iterator {
|
||||
return i.iteratePriority()
|
||||
}
|
||||
|
||||
key := cursor.Key().(txMeta)
|
||||
key := cursor.Key().(txMeta[C])
|
||||
|
||||
// We've reached a transaction with a priority lower than the next highest
|
||||
// priority in the pool.
|
||||
if key.priority < i.nextPriority {
|
||||
if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) < 0 {
|
||||
return i.iteratePriority()
|
||||
} else if key.priority == i.nextPriority {
|
||||
} else if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) == 0 {
|
||||
// Weight is incorporated into the priority index key only (not sender index)
|
||||
// so we must fetch it here from the scores map.
|
||||
weight := i.mempool.scores[txMeta{nonce: key.nonce, sender: key.sender}].weight
|
||||
if weight < i.priorityNode.Next().Key().(txMeta).weight {
|
||||
weight := i.mempool.scores[txMeta[C]{nonce: key.nonce, sender: key.sender}].weight
|
||||
if i.mempool.cfg.TxPriority.Compare(weight, i.priorityNode.Next().Key().(txMeta[C]).weight) < 0 {
|
||||
return i.iteratePriority()
|
||||
}
|
||||
}
|
||||
@ -296,7 +327,7 @@ func (i *PriorityNonceIterator) Next() Iterator {
|
||||
return i
|
||||
}
|
||||
|
||||
func (i *PriorityNonceIterator) Tx() sdk.Tx {
|
||||
func (i *PriorityNonceIterator[C]) Tx() sdk.Tx {
|
||||
return i.senderCursors[i.sender].Value.(sdk.Tx)
|
||||
}
|
||||
|
||||
@ -306,14 +337,14 @@ func (i *PriorityNonceIterator) Tx() sdk.Tx {
|
||||
//
|
||||
// The maxBytes parameter defines the maximum number of bytes of transactions to
|
||||
// return.
|
||||
func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
|
||||
func (mp *PriorityNonceMempool[C]) Select(_ context.Context, _ [][]byte) Iterator {
|
||||
if mp.priorityIndex.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
mp.reorderPriorityTies()
|
||||
|
||||
iterator := &PriorityNonceIterator{
|
||||
iterator := &PriorityNonceIterator[C]{
|
||||
mempool: mp,
|
||||
senderCursors: make(map[string]*skiplist.Element),
|
||||
}
|
||||
@ -321,22 +352,22 @@ func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
|
||||
return iterator.iteratePriority()
|
||||
}
|
||||
|
||||
type reorderKey struct {
|
||||
deleteKey txMeta
|
||||
insertKey txMeta
|
||||
type reorderKey[C comparable] struct {
|
||||
deleteKey txMeta[C]
|
||||
insertKey txMeta[C]
|
||||
tx sdk.Tx
|
||||
}
|
||||
|
||||
func (mp *PriorityNonceMempool) reorderPriorityTies() {
|
||||
func (mp *PriorityNonceMempool[C]) reorderPriorityTies() {
|
||||
node := mp.priorityIndex.Front()
|
||||
|
||||
var reordering []reorderKey
|
||||
var reordering []reorderKey[C]
|
||||
for node != nil {
|
||||
key := node.Key().(txMeta)
|
||||
key := node.Key().(txMeta[C])
|
||||
if mp.priorityCounts[key.priority] > 1 {
|
||||
newKey := key
|
||||
newKey.weight = senderWeight(key.senderElement)
|
||||
reordering = append(reordering, reorderKey{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)})
|
||||
newKey.weight = senderWeight(mp.cfg.TxPriority, key.senderElement)
|
||||
reordering = append(reordering, reorderKey[C]{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)})
|
||||
}
|
||||
|
||||
node = node.Next()
|
||||
@ -344,9 +375,9 @@ func (mp *PriorityNonceMempool) reorderPriorityTies() {
|
||||
|
||||
for _, k := range reordering {
|
||||
mp.priorityIndex.Remove(k.deleteKey)
|
||||
delete(mp.scores, txMeta{nonce: k.deleteKey.nonce, sender: k.deleteKey.sender})
|
||||
delete(mp.scores, txMeta[C]{nonce: k.deleteKey.nonce, sender: k.deleteKey.sender})
|
||||
mp.priorityIndex.Set(k.insertKey, k.tx)
|
||||
mp.scores[txMeta{nonce: k.insertKey.nonce, sender: k.insertKey.sender}] = k.insertKey
|
||||
mp.scores[txMeta[C]{nonce: k.insertKey.nonce, sender: k.insertKey.sender}] = k.insertKey
|
||||
}
|
||||
}
|
||||
|
||||
@ -354,16 +385,16 @@ func (mp *PriorityNonceMempool) reorderPriorityTies() {
|
||||
// defined as the first (nonce-wise) same sender tx with a priority not equal to
|
||||
// t. It is used to resolve priority collisions, that is when 2 or more txs from
|
||||
// different senders have the same priority.
|
||||
func senderWeight(senderCursor *skiplist.Element) int64 {
|
||||
func senderWeight[C comparable](txPriority TxPriority[C], senderCursor *skiplist.Element) C {
|
||||
if senderCursor == nil {
|
||||
return 0
|
||||
return txPriority.MinValue
|
||||
}
|
||||
|
||||
weight := senderCursor.Key().(txMeta).priority
|
||||
weight := senderCursor.Key().(txMeta[C]).priority
|
||||
senderCursor = senderCursor.Next()
|
||||
for senderCursor != nil {
|
||||
p := senderCursor.Key().(txMeta).priority
|
||||
if p != weight {
|
||||
p := senderCursor.Key().(txMeta[C]).priority
|
||||
if txPriority.Compare(p, weight) != 0 {
|
||||
weight = p
|
||||
}
|
||||
|
||||
@ -374,13 +405,13 @@ func senderWeight(senderCursor *skiplist.Element) int64 {
|
||||
}
|
||||
|
||||
// CountTx returns the number of transactions in the mempool.
|
||||
func (mp *PriorityNonceMempool) CountTx() int {
|
||||
func (mp *PriorityNonceMempool[C]) CountTx() int {
|
||||
return mp.priorityIndex.Len()
|
||||
}
|
||||
|
||||
// Remove removes a transaction from the mempool in O(log n) time, returning an
|
||||
// error if unsuccessful.
|
||||
func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
|
||||
func (mp *PriorityNonceMempool[C]) Remove(tx sdk.Tx) error {
|
||||
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -393,12 +424,12 @@ func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
|
||||
sender := sdk.AccAddress(sig.PubKey.Address()).String()
|
||||
nonce := sig.Sequence
|
||||
|
||||
scoreKey := txMeta{nonce: nonce, sender: sender}
|
||||
scoreKey := txMeta[C]{nonce: nonce, sender: sender}
|
||||
score, ok := mp.scores[scoreKey]
|
||||
if !ok {
|
||||
return ErrTxNotFound
|
||||
}
|
||||
tk := txMeta{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight}
|
||||
tk := txMeta[C]{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight}
|
||||
|
||||
senderTxs, ok := mp.senderIndices[sender]
|
||||
if !ok {
|
||||
@ -413,13 +444,13 @@ func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func IsEmpty(mempool Mempool) error {
|
||||
mp := mempool.(*PriorityNonceMempool)
|
||||
func IsEmpty[C comparable](mempool Mempool) error {
|
||||
mp := mempool.(*PriorityNonceMempool[C])
|
||||
if mp.priorityIndex.Len() != 0 {
|
||||
return fmt.Errorf("priorityIndex not empty")
|
||||
}
|
||||
|
||||
var countKeys []int64
|
||||
countKeys := make([]C, 0, len(mp.priorityCounts))
|
||||
for k := range mp.priorityCounts {
|
||||
countKeys = append(countKeys, k)
|
||||
}
|
||||
@ -430,7 +461,7 @@ func IsEmpty(mempool Mempool) error {
|
||||
}
|
||||
}
|
||||
|
||||
var senderKeys []string
|
||||
senderKeys := make([]string, 0, len(mp.senderIndices))
|
||||
for k := range mp.senderIndices {
|
||||
senderKeys = append(senderKeys, k)
|
||||
}
|
||||
|
||||
@ -7,10 +7,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"cosmossdk.io/log"
|
||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||
"github.com/cosmos/cosmos-sdk/types/mempool"
|
||||
simtypes "github.com/cosmos/cosmos-sdk/types/simulation"
|
||||
@ -229,7 +229,7 @@ func (s *MempoolTestSuite) TestPriorityNonceTxOrder() {
|
||||
}
|
||||
for i, tt := range tests {
|
||||
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
|
||||
pool := mempool.NewPriorityMempool()
|
||||
pool := mempool.DefaultPriorityMempool()
|
||||
|
||||
// create test txs and insert into mempool
|
||||
for i, ts := range tt.txs {
|
||||
@ -253,7 +253,7 @@ func (s *MempoolTestSuite) TestPriorityNonceTxOrder() {
|
||||
require.NoError(t, pool.Remove(tx))
|
||||
}
|
||||
|
||||
require.NoError(t, mempool.IsEmpty(pool))
|
||||
require.NoError(t, mempool.IsEmpty[int64](pool))
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -275,7 +275,7 @@ func (s *MempoolTestSuite) TestPriorityTies() {
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
s.mempool = mempool.NewPriorityMempool()
|
||||
s.mempool = mempool.DefaultPriorityMempool()
|
||||
var shuffled []txSpec
|
||||
for _, t := range txSet {
|
||||
tx := txSpec{
|
||||
@ -372,9 +372,15 @@ func validateOrder(mtxs []sdk.Tx) error {
|
||||
|
||||
func (s *MempoolTestSuite) TestRandomGeneratedTxs() {
|
||||
s.iterations = 0
|
||||
s.mempool = mempool.NewPriorityMempool(mempool.PriorityNonceWithOnRead(func(tx sdk.Tx) {
|
||||
s.iterations++
|
||||
}))
|
||||
s.mempool = mempool.NewPriorityMempool(
|
||||
mempool.PriorityNonceMempoolConfig[int64]{
|
||||
TxPriority: mempool.NewDefaultTxPriority(),
|
||||
OnRead: func(tx sdk.Tx) {
|
||||
s.iterations++
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
t := s.T()
|
||||
ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger())
|
||||
seed := time.Now().UnixNano()
|
||||
@ -409,7 +415,7 @@ func (s *MempoolTestSuite) TestRandomGeneratedTxs() {
|
||||
|
||||
func (s *MempoolTestSuite) TestRandomWalkTxs() {
|
||||
s.iterations = 0
|
||||
s.mempool = mempool.NewPriorityMempool()
|
||||
s.mempool = mempool.DefaultPriorityMempool()
|
||||
|
||||
t := s.T()
|
||||
ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger())
|
||||
@ -589,7 +595,7 @@ func TestPriorityNonceMempool_NextSenderTx(t *testing.T) {
|
||||
accA := accounts[0].Address
|
||||
accB := accounts[1].Address
|
||||
|
||||
mp := mempool.NewPriorityMempool()
|
||||
mp := mempool.DefaultPriorityMempool()
|
||||
|
||||
txs := []testTx{
|
||||
{priority: 20, nonce: 1, address: accA},
|
||||
@ -633,13 +639,19 @@ func TestNextSenderTx_TxLimit(t *testing.T) {
|
||||
}
|
||||
|
||||
// unlimited
|
||||
mp := mempool.NewPriorityMempool(mempool.PriorityNonceWithMaxTx(0))
|
||||
mp := mempool.NewPriorityMempool(
|
||||
mempool.PriorityNonceMempoolConfig[int64]{
|
||||
TxPriority: mempool.NewDefaultTxPriority(),
|
||||
MaxTx: 0,
|
||||
},
|
||||
)
|
||||
for i, tx := range txs {
|
||||
c := ctx.WithPriority(tx.priority)
|
||||
require.NoError(t, mp.Insert(c, tx))
|
||||
require.Equal(t, i+1, mp.CountTx())
|
||||
}
|
||||
mp = mempool.NewPriorityMempool()
|
||||
|
||||
mp = mempool.DefaultPriorityMempool()
|
||||
for i, tx := range txs {
|
||||
c := ctx.WithPriority(tx.priority)
|
||||
require.NoError(t, mp.Insert(c, tx))
|
||||
@ -647,7 +659,12 @@ func TestNextSenderTx_TxLimit(t *testing.T) {
|
||||
}
|
||||
|
||||
// limit: 3
|
||||
mp = mempool.NewPriorityMempool(mempool.PriorityNonceWithMaxTx(3))
|
||||
mp = mempool.NewPriorityMempool(
|
||||
mempool.PriorityNonceMempoolConfig[int64]{
|
||||
TxPriority: mempool.NewDefaultTxPriority(),
|
||||
MaxTx: 3,
|
||||
},
|
||||
)
|
||||
for i, tx := range txs {
|
||||
c := ctx.WithPriority(tx.priority)
|
||||
err := mp.Insert(c, tx)
|
||||
@ -661,7 +678,12 @@ func TestNextSenderTx_TxLimit(t *testing.T) {
|
||||
}
|
||||
|
||||
// disabled
|
||||
mp = mempool.NewPriorityMempool(mempool.PriorityNonceWithMaxTx(-1))
|
||||
mp = mempool.NewPriorityMempool(
|
||||
mempool.PriorityNonceMempoolConfig[int64]{
|
||||
TxPriority: mempool.NewDefaultTxPriority(),
|
||||
MaxTx: -1,
|
||||
},
|
||||
)
|
||||
for _, tx := range txs {
|
||||
c := ctx.WithPriority(tx.priority)
|
||||
err := mp.Insert(c, tx)
|
||||
@ -683,7 +705,7 @@ func TestNextSenderTx_TxReplacement(t *testing.T) {
|
||||
}
|
||||
|
||||
// test Priority with default mempool
|
||||
mp := mempool.NewPriorityMempool()
|
||||
mp := mempool.DefaultPriorityMempool()
|
||||
for _, tx := range txs {
|
||||
c := ctx.WithPriority(tx.priority)
|
||||
require.NoError(t, mp.Insert(c, tx))
|
||||
@ -697,10 +719,15 @@ func TestNextSenderTx_TxReplacement(t *testing.T) {
|
||||
// we set a TestTxReplacement rule which the priority of the new Tx must be 20% more than the priority of the old Tx
|
||||
// otherwise, the Insert will return error
|
||||
feeBump := 20
|
||||
mp = mempool.NewPriorityMempool(mempool.PriorityNonceWithTxReplacement(func(op, np int64, oTx, nTx sdk.Tx) bool {
|
||||
threshold := int64(100 + feeBump)
|
||||
return np >= op*threshold/100
|
||||
}))
|
||||
mp = mempool.NewPriorityMempool(
|
||||
mempool.PriorityNonceMempoolConfig[int64]{
|
||||
TxPriority: mempool.NewDefaultTxPriority(),
|
||||
TxReplacement: func(op, np int64, oTx, nTx sdk.Tx) bool {
|
||||
threshold := int64(100 + feeBump)
|
||||
return np >= op*threshold/100
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
c := ctx.WithPriority(txs[0].priority)
|
||||
require.NoError(t, mp.Insert(c, txs[0]))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user