core/txpool, eth, miner: pre-filter dynamic fees during pending tx retrieval (#29005)

* core/txpool, eth, miner: pre-filter dynamic fees during pending tx retrieval

* miner: fix typo

* core/txpool: handle init-error in blobpool without panicing

---------

Co-authored-by: Martin Holst Swende <martin@swende.se>
This commit is contained in:
Péter Szilágyi 2024-02-17 13:37:14 +02:00 committed by GitHub
parent 95741b1844
commit 593e303485
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 79 additions and 25 deletions

View File

@ -436,8 +436,10 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.Addres
// Close closes down the underlying persistent store. // Close closes down the underlying persistent store.
func (p *BlobPool) Close() error { func (p *BlobPool) Close() error {
var errs []error var errs []error
if err := p.limbo.Close(); err != nil { if p.limbo != nil { // Close might be invoked due to error in constructor, before p,limbo is set
errs = append(errs, err) if err := p.limbo.Close(); err != nil {
errs = append(errs, err)
}
} }
if err := p.store.Close(); err != nil { if err := p.store.Close(); err != nil {
errs = append(errs, err) errs = append(errs, err)
@ -1441,7 +1443,10 @@ func (p *BlobPool) drop() {
// Pending retrieves all currently processable transactions, grouped by origin // Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. // account and sorted by nonce.
func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction { //
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
// Track the amount of time waiting to retrieve the list of pending blob txs // Track the amount of time waiting to retrieve the list of pending blob txs
// from the pool and the amount of time actually spent on assembling the data. // from the pool and the amount of time actually spent on assembling the data.
// The latter will be pretty much moot, but we've kept it to have symmetric // The latter will be pretty much moot, but we've kept it to have symmetric
@ -1459,6 +1464,25 @@ func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTr
for addr, txs := range p.index { for addr, txs := range p.index {
var lazies []*txpool.LazyTransaction var lazies []*txpool.LazyTransaction
for _, tx := range txs { for _, tx := range txs {
// If transaction filtering was requested, discard badly priced ones
if minTip != nil && baseFee != nil {
if tx.execFeeCap.Lt(baseFee) {
break // basefee too low, cannot be included, discard rest of txs from the account
}
tip := new(uint256.Int).Sub(tx.execFeeCap, baseFee)
if tip.Gt(tx.execTipCap) {
tip = tx.execTipCap
}
if tip.Lt(minTip) {
break // allowed or remaining tip too low, cannot be included, discard rest of txs from the account
}
}
if blobFee != nil {
if tx.blobFeeCap.Lt(blobFee) {
break // blobfee too low, cannot be included, discard rest of txs from the account
}
}
// Transaction was accepted according to the filter, append to the pending list
lazies = append(lazies, &txpool.LazyTransaction{ lazies = append(lazies, &txpool.LazyTransaction{
Pool: p, Pool: p,
Hash: tx.hash, Hash: tx.hash,

View File

@ -518,24 +518,34 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
} }
// Pending retrieves all currently processable transactions, grouped by origin // Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be // account and sorted by nonce.
// freely modified by calling code.
// //
// The enforceTips parameter can be used to do an extra filtering on the pending // The transactions can also be pre-filtered by the dynamic fee components to
// transactions and only return those whose **effective** tip is large enough in // reduce allocations and load on downstream subsystems.
// the next pending execution environment. func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
pool.mu.Lock() pool.mu.Lock()
defer pool.mu.Unlock() defer pool.mu.Unlock()
// Convert the new uint256.Int types to the old big.Int ones used by the legacy pool
var (
minTipBig *big.Int
baseFeeBig *big.Int
)
if minTip != nil {
minTipBig = minTip.ToBig()
}
if baseFee != nil {
baseFeeBig = baseFee.ToBig()
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending { for addr, list := range pool.pending {
txs := list.Flatten() txs := list.Flatten()
// If the miner requests tip enforcement, cap the lists now // If the miner requests tip enforcement, cap the lists now
if enforceTips && !pool.locals.contains(addr) { if minTipBig != nil && !pool.locals.contains(addr) {
for i, tx := range txs { for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(pool.gasTip.Load().ToBig(), pool.priced.urgent.baseFee) < 0 { if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
txs = txs[:i] txs = txs[:i]
break break
} }

View File

@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/holiman/uint256"
) )
// LazyTransaction contains a small subset of the transaction properties that is // LazyTransaction contains a small subset of the transaction properties that is
@ -114,7 +115,10 @@ type SubPool interface {
// Pending retrieves all currently processable transactions, grouped by origin // Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. // account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*LazyTransaction //
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction
// SubscribeTransactions subscribes to new transaction events. The subscriber // SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions // can decide whether to receive notifications only for newly seen transactions

View File

@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
) )
// TxStatus is the current status of a transaction as seen by the pool. // TxStatus is the current status of a transaction as seen by the pool.
@ -353,10 +354,13 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
// Pending retrieves all currently processable transactions, grouped by origin // Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. // account and sorted by nonce.
func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction { //
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *TxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction {
txs := make(map[common.Address][]*LazyTransaction) txs := make(map[common.Address][]*LazyTransaction)
for _, subpool := range p.subpools { for _, subpool := range p.subpools {
for addr, set := range subpool.Pending(enforceTips) { for addr, set := range subpool.Pending(minTip, baseFee, blobFee) {
txs[addr] = set txs[addr] = set
} }
} }

View File

@ -292,7 +292,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
} }
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(false) pending := b.eth.txPool.Pending(nil, nil, nil)
var txs types.Transactions var txs types.Transactions
for _, batch := range pending { for _, batch := range pending {
for _, lazy := range batch { for _, lazy := range batch {

View File

@ -263,7 +263,7 @@ func (c *SimulatedBeacon) Rollback() {
// Fork sets the head to the provided hash. // Fork sets the head to the provided hash.
func (c *SimulatedBeacon) Fork(parentHash common.Hash) error { func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {
if len(c.eth.TxPool().Pending(false)) != 0 { if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 {
return errors.New("pending block dirty") return errors.New("pending block dirty")
} }
parent := c.eth.BlockChain().GetBlockByHash(parentHash) parent := c.eth.BlockChain().GetBlockByHash(parentHash)
@ -275,7 +275,7 @@ func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {
// AdjustTime creates a new block with an adjusted timestamp. // AdjustTime creates a new block with an adjusted timestamp.
func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error { func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
if len(c.eth.TxPool().Pending(false)) != 0 { if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 {
return errors.New("could not adjust time on non-empty block") return errors.New("could not adjust time on non-empty block")
} }
parent := c.eth.BlockChain().CurrentBlock() parent := c.eth.BlockChain().CurrentBlock()

View File

@ -42,6 +42,7 @@ import (
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/triedb/pathdb" "github.com/ethereum/go-ethereum/triedb/pathdb"
"github.com/holiman/uint256"
) )
const ( const (
@ -73,7 +74,7 @@ type txPool interface {
// Pending should return pending transactions. // Pending should return pending transactions.
// The slice should be modifiable by the caller. // The slice should be modifiable by the caller.
Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction
// SubscribeTransactions subscribes to new transaction events. The subscriber // SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions // can decide whether to receive notifications only for newly seen transactions

View File

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
) )
var ( var (
@ -92,7 +93,7 @@ func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []erro
} }
// Pending returns all the transactions known to the pool // Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction { func (p *testTxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()

View File

@ -36,7 +36,7 @@ const (
// syncTransactions starts sending all currently pending transactions to the given peer. // syncTransactions starts sending all currently pending transactions to the given peer.
func (h *handler) syncTransactions(p *eth.Peer) { func (h *handler) syncTransactions(p *eth.Peer) {
var hashes []common.Hash var hashes []common.Hash
for _, batch := range h.txpool.Pending(false) { for _, batch := range h.txpool.Pending(nil, nil, nil) {
for _, tx := range batch { for _, tx := range batch {
hashes = append(hashes, tx.Hash) hashes = append(hashes, tx.Hash)
} }

View File

@ -37,6 +37,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/holiman/uint256"
) )
const ( const (
@ -999,7 +1000,20 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// into the given sealing block. The transaction selection and ordering strategy can // into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future. // be customized with the plugin in the future.
func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) error { func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) error {
pending := w.eth.TxPool().Pending(true) w.mu.RLock()
tip := w.tip
w.mu.RUnlock()
// Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees
var baseFee *uint256.Int
if env.header.BaseFee != nil {
baseFee = uint256.MustFromBig(env.header.BaseFee)
}
var blobFee *uint256.Int
if env.header.ExcessBlobGas != nil {
blobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas))
}
pending := w.eth.TxPool().Pending(uint256.MustFromBig(tip), baseFee, blobFee)
// Split the pending transactions into locals and remotes. // Split the pending transactions into locals and remotes.
localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending
@ -1011,10 +1025,6 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
} }
// Fill the block with all available pending transactions. // Fill the block with all available pending transactions.
w.mu.RLock()
tip := w.tip
w.mu.RUnlock()
if len(localTxs) > 0 { if len(localTxs) > 0 {
txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if err := w.commitTransactions(env, txs, interrupt, new(big.Int)); err != nil { if err := w.commitTransactions(env, txs, interrupt, new(big.Int)); err != nil {