core/txpool, miner: speed up blob pool pending retrievals (#29008)

* core/txpool, miner: speed up blob pool pending retrievals

* miner: fix test merge issue

* eth: same same

* core/txpool/blobpool: speed up blobtx creation in benchmark a bit

* core/txpool/blobpool: fix linter

---------

Co-authored-by: Martin Holst Swende <martin@swende.se>
This commit is contained in:
Péter Szilágyi 2024-02-19 15:59:40 +02:00 committed by GitHub
parent 5d984796af
commit 6fb0d0992b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 105 additions and 37 deletions

View File

@ -1456,13 +1456,14 @@ func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *u
pendwaitHist.Update(time.Since(pendStart).Nanoseconds()) pendwaitHist.Update(time.Since(pendStart).Nanoseconds())
defer p.lock.RUnlock() defer p.lock.RUnlock()
defer func(start time.Time) { execStart := time.Now()
pendtimeHist.Update(time.Since(start).Nanoseconds()) defer func() {
}(time.Now()) pendtimeHist.Update(time.Since(execStart).Nanoseconds())
}()
pending := make(map[common.Address][]*txpool.LazyTransaction) pending := make(map[common.Address][]*txpool.LazyTransaction, len(p.index))
for addr, txs := range p.index { for addr, txs := range p.index {
var lazies []*txpool.LazyTransaction lazies := make([]*txpool.LazyTransaction, 0, len(txs))
for _, tx := range txs { for _, tx := range txs {
// If transaction filtering was requested, discard badly priced ones // If transaction filtering was requested, discard badly priced ones
if minTip != nil && baseFee != nil { if minTip != nil && baseFee != nil {
@ -1486,9 +1487,9 @@ func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *u
lazies = append(lazies, &txpool.LazyTransaction{ lazies = append(lazies, &txpool.LazyTransaction{
Pool: p, Pool: p,
Hash: tx.hash, Hash: tx.hash,
Time: time.Now(), // TODO(karalabe): Maybe save these and use that? Time: execStart, // TODO(karalabe): Maybe save these and use that?
GasFeeCap: tx.execFeeCap.ToBig(), GasFeeCap: tx.execFeeCap,
GasTipCap: tx.execTipCap.ToBig(), GasTipCap: tx.execTipCap,
Gas: tx.execGas, Gas: tx.execGas,
BlobGas: tx.blobGas, BlobGas: tx.blobGas,
}) })

View File

@ -1288,3 +1288,61 @@ func TestAdd(t *testing.T) {
pool.Close() pool.Close()
} }
} }
// Benchmarks the time it takes to assemble the lazy pending transaction list
// from the pool contents.
func BenchmarkPoolPending100Mb(b *testing.B) { benchmarkPoolPending(b, 100_000_000) }
func BenchmarkPoolPending1GB(b *testing.B) { benchmarkPoolPending(b, 1_000_000_000) }
func BenchmarkPoolPending10GB(b *testing.B) { benchmarkPoolPending(b, 10_000_000_000) }
func benchmarkPoolPending(b *testing.B, datacap uint64) {
// Calculate the maximum number of transaction that would fit into the pool
// and generate a set of random accounts to seed them with.
capacity := datacap / params.BlobTxBlobGasPerBlob
var (
basefee = uint64(1050)
blobfee = uint64(105)
signer = types.LatestSigner(testChainConfig)
statedb, _ = state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewDatabase(memorydb.New())), nil)
chain = &testBlockChain{
config: testChainConfig,
basefee: uint256.NewInt(basefee),
blobfee: uint256.NewInt(blobfee),
statedb: statedb,
}
pool = New(Config{Datadir: ""}, chain)
)
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
b.Fatalf("failed to create blob pool: %v", err)
}
// Fill the pool up with one random transaction from each account with the
// same price and everything to maximize the worst case scenario
for i := 0; i < int(capacity); i++ {
blobtx := makeUnsignedTx(0, 10, basefee+10, blobfee)
blobtx.R = uint256.NewInt(1)
blobtx.S = uint256.NewInt(uint64(100 + i))
blobtx.V = uint256.NewInt(0)
tx := types.NewTx(blobtx)
addr, err := types.Sender(signer, tx)
if err != nil {
b.Fatal(err)
}
statedb.AddBalance(addr, uint256.NewInt(1_000_000_000))
pool.add(tx)
}
statedb.Commit(0, true)
defer pool.Close()
// Benchmark assembling the pending
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p := pool.Pending(uint256.NewInt(1), chain.basefee, chain.blobfee)
if len(p) != int(capacity) {
b.Fatalf("have %d want %d", len(p), capacity)
}
}
}

View File

@ -559,8 +559,8 @@ func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobF
Hash: txs[i].Hash(), Hash: txs[i].Hash(),
Tx: txs[i], Tx: txs[i],
Time: txs[i].Time(), Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(), GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()),
GasTipCap: txs[i].GasTipCap(), GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(), Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(), BlobGas: txs[i].BlobGas(),
} }

View File

@ -35,9 +35,9 @@ type LazyTransaction struct {
Hash common.Hash // Transaction hash to pull up if needed Hash common.Hash // Transaction hash to pull up if needed
Tx *types.Transaction // Transaction if already resolved Tx *types.Transaction // Transaction if already resolved
Time time.Time // Time when the transaction was first seen Time time.Time // Time when the transaction was first seen
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume GasFeeCap *uint256.Int // Maximum fee per gas the transaction may consume
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay GasTipCap *uint256.Int // Maximum miner tip per gas the transaction can pay
Gas uint64 // Amount of gas required by the transaction Gas uint64 // Amount of gas required by the transaction
BlobGas uint64 // Amount of blob gas required by the transaction BlobGas uint64 // Amount of blob gas required by the transaction

View File

@ -112,8 +112,8 @@ func (p *testTxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee
Hash: tx.Hash(), Hash: tx.Hash(),
Tx: tx, Tx: tx,
Time: tx.Time(), Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(), GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: tx.GasTipCap(), GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(), Gas: tx.Gas(),
BlobGas: tx.BlobGas(), BlobGas: tx.BlobGas(),
}) })

View File

@ -21,28 +21,31 @@ import (
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/holiman/uint256"
) )
// txWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap // txWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap
type txWithMinerFee struct { type txWithMinerFee struct {
tx *txpool.LazyTransaction tx *txpool.LazyTransaction
from common.Address from common.Address
fees *big.Int fees *uint256.Int
} }
// newTxWithMinerFee creates a wrapped transaction, calculating the effective // newTxWithMinerFee creates a wrapped transaction, calculating the effective
// miner gasTipCap if a base fee is provided. // miner gasTipCap if a base fee is provided.
// Returns error in case of a negative effective miner gasTipCap. // Returns error in case of a negative effective miner gasTipCap.
func newTxWithMinerFee(tx *txpool.LazyTransaction, from common.Address, baseFee *big.Int) (*txWithMinerFee, error) { func newTxWithMinerFee(tx *txpool.LazyTransaction, from common.Address, baseFee *uint256.Int) (*txWithMinerFee, error) {
tip := new(big.Int).Set(tx.GasTipCap) tip := new(uint256.Int).Set(tx.GasTipCap)
if baseFee != nil { if baseFee != nil {
if tx.GasFeeCap.Cmp(baseFee) < 0 { if tx.GasFeeCap.Cmp(baseFee) < 0 {
return nil, types.ErrGasFeeCapTooLow return nil, types.ErrGasFeeCapTooLow
} }
tip = math.BigMin(tx.GasTipCap, new(big.Int).Sub(tx.GasFeeCap, baseFee)) tip = new(uint256.Int).Sub(tx.GasFeeCap, baseFee)
if tip.Gt(tx.GasTipCap) {
tip = tx.GasTipCap
}
} }
return &txWithMinerFee{ return &txWithMinerFee{
tx: tx, tx: tx,
@ -87,7 +90,7 @@ type transactionsByPriceAndNonce struct {
txs map[common.Address][]*txpool.LazyTransaction // Per account nonce-sorted list of transactions txs map[common.Address][]*txpool.LazyTransaction // Per account nonce-sorted list of transactions
heads txByPriceAndTime // Next transaction for each unique account (price heap) heads txByPriceAndTime // Next transaction for each unique account (price heap)
signer types.Signer // Signer for the set of transactions signer types.Signer // Signer for the set of transactions
baseFee *big.Int // Current base fee baseFee *uint256.Int // Current base fee
} }
// newTransactionsByPriceAndNonce creates a transaction set that can retrieve // newTransactionsByPriceAndNonce creates a transaction set that can retrieve
@ -96,10 +99,15 @@ type transactionsByPriceAndNonce struct {
// Note, the input map is reowned so the caller should not interact any more with // Note, the input map is reowned so the caller should not interact any more with
// if after providing it to the constructor. // if after providing it to the constructor.
func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address][]*txpool.LazyTransaction, baseFee *big.Int) *transactionsByPriceAndNonce { func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address][]*txpool.LazyTransaction, baseFee *big.Int) *transactionsByPriceAndNonce {
// Convert the basefee from header format to uint256 format
var baseFeeUint *uint256.Int
if baseFee != nil {
baseFeeUint = uint256.MustFromBig(baseFee)
}
// Initialize a price and received time based heap with the head transactions // Initialize a price and received time based heap with the head transactions
heads := make(txByPriceAndTime, 0, len(txs)) heads := make(txByPriceAndTime, 0, len(txs))
for from, accTxs := range txs { for from, accTxs := range txs {
wrapped, err := newTxWithMinerFee(accTxs[0], from, baseFee) wrapped, err := newTxWithMinerFee(accTxs[0], from, baseFeeUint)
if err != nil { if err != nil {
delete(txs, from) delete(txs, from)
continue continue
@ -114,12 +122,12 @@ func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address]
txs: txs, txs: txs,
heads: heads, heads: heads,
signer: signer, signer: signer,
baseFee: baseFee, baseFee: baseFeeUint,
} }
} }
// Peek returns the next transaction by price. // Peek returns the next transaction by price.
func (t *transactionsByPriceAndNonce) Peek() (*txpool.LazyTransaction, *big.Int) { func (t *transactionsByPriceAndNonce) Peek() (*txpool.LazyTransaction, *uint256.Int) {
if len(t.heads) == 0 { if len(t.heads) == 0 {
return nil, nil return nil, nil
} }

View File

@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/holiman/uint256"
) )
func TestTransactionPriceNonceSortLegacy(t *testing.T) { func TestTransactionPriceNonceSortLegacy(t *testing.T) {
@ -92,8 +93,8 @@ func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) {
Hash: tx.Hash(), Hash: tx.Hash(),
Tx: tx, Tx: tx,
Time: tx.Time(), Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(), GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: tx.GasTipCap(), GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(), Gas: tx.Gas(),
BlobGas: tx.BlobGas(), BlobGas: tx.BlobGas(),
}) })
@ -160,8 +161,8 @@ func TestTransactionTimeSort(t *testing.T) {
Hash: tx.Hash(), Hash: tx.Hash(),
Tx: tx, Tx: tx,
Time: tx.Time(), Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(), GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: tx.GasTipCap(), GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(), Gas: tx.Gas(),
BlobGas: tx.BlobGas(), BlobGas: tx.BlobGas(),
}) })

View File

@ -206,7 +206,7 @@ type worker struct {
mu sync.RWMutex // The lock used to protect the coinbase and extra fields mu sync.RWMutex // The lock used to protect the coinbase and extra fields
coinbase common.Address coinbase common.Address
extra []byte extra []byte
tip *big.Int // Minimum tip needed for non-local transaction to include them tip *uint256.Int // Minimum tip needed for non-local transaction to include them
pendingMu sync.RWMutex pendingMu sync.RWMutex
pendingTasks map[common.Hash]*task pendingTasks map[common.Hash]*task
@ -253,7 +253,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
isLocalBlock: isLocalBlock, isLocalBlock: isLocalBlock,
coinbase: config.Etherbase, coinbase: config.Etherbase,
extra: config.ExtraData, extra: config.ExtraData,
tip: config.GasPrice, tip: uint256.MustFromBig(config.GasPrice),
pendingTasks: make(map[common.Hash]*task), pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize), txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
@ -334,7 +334,7 @@ func (w *worker) setExtra(extra []byte) {
func (w *worker) setGasTip(tip *big.Int) { func (w *worker) setGasTip(tip *big.Int) {
w.mu.Lock() w.mu.Lock()
defer w.mu.Unlock() defer w.mu.Unlock()
w.tip = tip w.tip = uint256.MustFromBig(tip)
} }
// setRecommitInterval updates the interval for miner sealing work recommitting. // setRecommitInterval updates the interval for miner sealing work recommitting.
@ -556,15 +556,15 @@ func (w *worker) mainLoop() {
Hash: tx.Hash(), Hash: tx.Hash(),
Tx: nil, // Do *not* set this! We need to resolve it later to pull blobs in Tx: nil, // Do *not* set this! We need to resolve it later to pull blobs in
Time: tx.Time(), Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(), GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: tx.GasTipCap(), GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(), Gas: tx.Gas(),
BlobGas: tx.BlobGas(), BlobGas: tx.BlobGas(),
}) })
} }
txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
tcount := w.current.tcount tcount := w.current.tcount
w.commitTransactions(w.current, txset, nil, new(big.Int)) w.commitTransactions(w.current, txset, nil, new(uint256.Int))
// Only update the snapshot if any new transactions were added // Only update the snapshot if any new transactions were added
// to the pending block // to the pending block
@ -802,7 +802,7 @@ func (w *worker) applyTransaction(env *environment, tx *types.Transaction) (*typ
return receipt, err return receipt, err
} }
func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAndNonce, interrupt *atomic.Int32, minTip *big.Int) error { func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAndNonce, interrupt *atomic.Int32, minTip *uint256.Int) error {
gasLimit := env.header.GasLimit gasLimit := env.header.GasLimit
if env.gasPool == nil { if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit) env.gasPool = new(core.GasPool).AddGas(gasLimit)
@ -1013,7 +1013,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
if env.header.ExcessBlobGas != nil { if env.header.ExcessBlobGas != nil {
blobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas)) blobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas))
} }
pending := w.eth.TxPool().Pending(uint256.MustFromBig(tip), baseFee, blobFee) pending := w.eth.TxPool().Pending(tip, baseFee, blobFee)
// Split the pending transactions into locals and remotes. // Split the pending transactions into locals and remotes.
localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending
@ -1027,7 +1027,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
// Fill the block with all available pending transactions. // Fill the block with all available pending transactions.
if len(localTxs) > 0 { if len(localTxs) > 0 {
txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if err := w.commitTransactions(env, txs, interrupt, new(big.Int)); err != nil { if err := w.commitTransactions(env, txs, interrupt, new(uint256.Int)); err != nil {
return err return err
} }
} }