perf: Use Caching in Priority Nonce Mempool for Tx Look ups (#520)
* benchmark contains * use sender/nonce when caching * nit * nits * nit
This commit is contained in:
parent
2ae5b146a6
commit
3376dd3aa6
@ -59,7 +59,6 @@ func NewBaseLane(
|
||||
|
||||
lane.LaneMempool = NewMempool(
|
||||
DefaultTxPriority(),
|
||||
lane.cfg.TxEncoder,
|
||||
lane.cfg.SignerExtractor,
|
||||
lane.cfg.MaxTxs,
|
||||
)
|
||||
|
||||
@ -9,7 +9,6 @@ import (
|
||||
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
|
||||
|
||||
signer_extraction "github.com/skip-mev/block-sdk/v2/adapters/signer_extraction_adapter"
|
||||
"github.com/skip-mev/block-sdk/v2/block/utils"
|
||||
)
|
||||
|
||||
type (
|
||||
@ -20,7 +19,7 @@ type (
|
||||
// transactions.
|
||||
Mempool[C comparable] struct {
|
||||
// index defines an index of transactions.
|
||||
index sdkmempool.Mempool
|
||||
index MempoolInterface
|
||||
|
||||
// signerExtractor defines the signer extraction adapter that allows us to
|
||||
// extract the signer from a transaction.
|
||||
@ -31,19 +30,11 @@ type (
|
||||
// of two transactions. The index utilizes this struct to order transactions
|
||||
// in the mempool.
|
||||
txPriority TxPriority[C]
|
||||
|
||||
// txEncoder defines the sdk.Tx encoder that allows us to encode transactions
|
||||
// to bytes.
|
||||
txEncoder sdk.TxEncoder
|
||||
|
||||
// txCache is a map of all transactions in the mempool. It is used
|
||||
// to quickly check if a transaction is already in the mempool.
|
||||
txCache map[string]struct{}
|
||||
}
|
||||
)
|
||||
|
||||
// NewMempool returns a new Mempool.
|
||||
func NewMempool[C comparable](txPriority TxPriority[C], txEncoder sdk.TxEncoder, extractor signer_extraction.Adapter, maxTx int) *Mempool[C] {
|
||||
func NewMempool[C comparable](txPriority TxPriority[C], extractor signer_extraction.Adapter, maxTx int) *Mempool[C] {
|
||||
return &Mempool[C]{
|
||||
index: NewPriorityMempool(
|
||||
PriorityNonceMempoolConfig[C]{
|
||||
@ -54,8 +45,6 @@ func NewMempool[C comparable](txPriority TxPriority[C], txEncoder sdk.TxEncoder,
|
||||
),
|
||||
extractor: extractor,
|
||||
txPriority: txPriority,
|
||||
txEncoder: txEncoder,
|
||||
txCache: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@ -67,17 +56,9 @@ func (cm *Mempool[C]) Priority(ctx sdk.Context, tx sdk.Tx) any {
|
||||
// Insert inserts a transaction into the mempool.
|
||||
func (cm *Mempool[C]) Insert(ctx context.Context, tx sdk.Tx) error {
|
||||
if err := cm.index.Insert(ctx, tx); err != nil {
|
||||
return fmt.Errorf("failed to insert tx into auction index: %w", err)
|
||||
return fmt.Errorf("failed to insert tx into mempool: %w", err)
|
||||
}
|
||||
|
||||
hash, err := utils.GetTxHash(cm.txEncoder, tx)
|
||||
if err != nil {
|
||||
cm.Remove(tx)
|
||||
return err
|
||||
}
|
||||
|
||||
cm.txCache[hash] = struct{}{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -87,13 +68,6 @@ func (cm *Mempool[C]) Remove(tx sdk.Tx) error {
|
||||
return fmt.Errorf("failed to remove transaction from the mempool: %w", err)
|
||||
}
|
||||
|
||||
hash, err := utils.GetTxHash(cm.txEncoder, tx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get tx hash string: %w", err)
|
||||
}
|
||||
|
||||
delete(cm.txCache, hash)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -112,13 +86,7 @@ func (cm *Mempool[C]) CountTx() int {
|
||||
|
||||
// Contains returns true if the transaction is contained in the mempool.
|
||||
func (cm *Mempool[C]) Contains(tx sdk.Tx) bool {
|
||||
hash, err := utils.GetTxHash(cm.txEncoder, tx)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
_, ok := cm.txCache[hash]
|
||||
return ok
|
||||
return cm.index.Contains(tx)
|
||||
}
|
||||
|
||||
// Compare determines the relative priority of two transactions belonging in the same lane.
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
package base
|
||||
package base_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
signerextraction "github.com/skip-mev/block-sdk/v2/adapters/signer_extraction_adapter"
|
||||
"github.com/skip-mev/block-sdk/v2/block/base"
|
||||
"github.com/skip-mev/block-sdk/v2/testutils"
|
||||
)
|
||||
|
||||
@ -19,13 +20,49 @@ type txGen struct {
|
||||
amount sdk.Coin
|
||||
}
|
||||
|
||||
var (
|
||||
numAccounts = 10
|
||||
numTxsPerAcct = 100
|
||||
)
|
||||
|
||||
func BenchmarkContains(b *testing.B) {
|
||||
acct := testutils.RandomAccounts(rand.New(rand.NewSource(1)), numAccounts)
|
||||
txc := testutils.CreateTestEncodingConfig().TxConfig
|
||||
|
||||
mp := base.NewMempool(
|
||||
base.DefaultTxPriority(),
|
||||
signerextraction.NewDefaultAdapter(),
|
||||
1000,
|
||||
)
|
||||
|
||||
txs := make([]sdk.Tx, numAccounts*numTxsPerAcct)
|
||||
for i := 0; i < numAccounts; i++ {
|
||||
for j := 0; j < numTxsPerAcct; j++ {
|
||||
tx, err := testutils.CreateTx(txc, acct[i], uint64(j), 0, nil, sdk.NewCoin("stake", sdkmath.NewInt(1)))
|
||||
require.NoError(b, err)
|
||||
err = mp.Insert(sdk.Context{}, tx)
|
||||
require.NoError(b, err)
|
||||
txs[i*numTxsPerAcct+j] = tx
|
||||
}
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, tx := range txs {
|
||||
found := mp.Contains(tx)
|
||||
if !found {
|
||||
b.Fatalf("tx not found in mempool")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMempoolComparison(t *testing.T) {
|
||||
acct := testutils.RandomAccounts(rand.New(rand.NewSource(1)), 2)
|
||||
txc := testutils.CreateTestEncodingConfig().TxConfig
|
||||
ctx := testutils.CreateBaseSDKContext(t)
|
||||
mp := NewMempool(
|
||||
DefaultTxPriority(),
|
||||
txc.TxEncoder(),
|
||||
mp := base.NewMempool(
|
||||
base.DefaultTxPriority(),
|
||||
signerextraction.NewDefaultAdapter(),
|
||||
1000,
|
||||
)
|
||||
@ -99,9 +136,8 @@ func TestMempoolSelect(t *testing.T) {
|
||||
txc := testutils.CreateTestEncodingConfig().TxConfig
|
||||
ctx := testutils.CreateBaseSDKContext(t)
|
||||
se := signerextraction.NewDefaultAdapter()
|
||||
mp := NewMempool(
|
||||
DefaultTxPriority(),
|
||||
txc.TxEncoder(),
|
||||
mp := base.NewMempool(
|
||||
base.DefaultTxPriority(),
|
||||
se,
|
||||
1000,
|
||||
)
|
||||
|
||||
@ -73,7 +73,6 @@ func WithMempoolConfigs[C comparable](cfg LaneConfig, txPriority TxPriority[C])
|
||||
return func(l *BaseLane) {
|
||||
l.LaneMempool = NewMempool(
|
||||
txPriority,
|
||||
cfg.TxEncoder,
|
||||
cfg.SignerExtractor,
|
||||
cfg.MaxTxs,
|
||||
)
|
||||
|
||||
@ -26,11 +26,19 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
_ sdkmempool.Mempool = (*PriorityNonceMempool[int64])(nil)
|
||||
_ MempoolInterface = (*PriorityNonceMempool[int64])(nil)
|
||||
_ sdkmempool.Iterator = (*PriorityNonceIterator[int64])(nil)
|
||||
)
|
||||
|
||||
type (
|
||||
// MempoolInterface defines the interface a mempool should implement.
|
||||
MempoolInterface interface {
|
||||
sdkmempool.Mempool
|
||||
|
||||
// Contains returns true if the transaction is in the mempool.
|
||||
Contains(tx sdk.Tx) bool
|
||||
}
|
||||
|
||||
// PriorityNonceMempoolConfig defines the configuration used to configure the
|
||||
// PriorityNonceMempool.
|
||||
PriorityNonceMempoolConfig[C comparable] struct {
|
||||
@ -462,6 +470,24 @@ func (mp *PriorityNonceMempool[C]) Remove(tx sdk.Tx) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Contains returns true if the transaction is in the mempool.
|
||||
func (mp *PriorityNonceMempool[C]) Contains(tx sdk.Tx) bool {
|
||||
signers, err := mp.signerExtractor.GetSigners(tx)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
if len(signers) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
sig := signers[0]
|
||||
nonce := sig.Sequence
|
||||
sender := sig.Signer.String()
|
||||
|
||||
_, ok := mp.scores[txMeta[C]{nonce: nonce, sender: sender}]
|
||||
return ok
|
||||
}
|
||||
|
||||
func IsEmpty[C comparable](mempool sdkmempool.Mempool) error {
|
||||
mp := mempool.(*PriorityNonceMempool[C])
|
||||
if mp.priorityIndex.Len() != 0 {
|
||||
|
||||
@ -125,7 +125,7 @@ func (s *BaseTestSuite) TestCompareTxPriority() {
|
||||
}
|
||||
|
||||
func (s *BaseTestSuite) TestInsert() {
|
||||
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
|
||||
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)
|
||||
|
||||
s.Run("should be able to insert a transaction", func() {
|
||||
tx, err := testutils.CreateRandomTx(
|
||||
@ -180,7 +180,7 @@ func (s *BaseTestSuite) TestInsert() {
|
||||
}
|
||||
|
||||
func (s *BaseTestSuite) TestRemove() {
|
||||
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
|
||||
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)
|
||||
|
||||
s.Run("should be able to remove a transaction", func() {
|
||||
tx, err := testutils.CreateRandomTx(
|
||||
@ -220,7 +220,7 @@ func (s *BaseTestSuite) TestRemove() {
|
||||
|
||||
func (s *BaseTestSuite) TestSelect() {
|
||||
s.Run("should be able to select transactions in the correct order", func() {
|
||||
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
|
||||
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)
|
||||
|
||||
tx1, err := testutils.CreateRandomTx(
|
||||
s.encodingConfig.TxConfig,
|
||||
@ -261,7 +261,7 @@ func (s *BaseTestSuite) TestSelect() {
|
||||
})
|
||||
|
||||
s.Run("should be able to select a single transaction", func() {
|
||||
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
|
||||
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)
|
||||
|
||||
tx1, err := testutils.CreateRandomTx(
|
||||
s.encodingConfig.TxConfig,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user