From f4852b8ddc8bef962d34210a4f7774b95767e421 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 20 Feb 2024 11:37:23 +0200 Subject: [PATCH] core/txpool, eth, miner: retrieve plain and blob txs separately (#29026) * core/txpool, eth, miner: retrieve plain and blob txs separately * core/txpool: fix typo, no farming * miner: farm all the typos Co-authored-by: Martin HS --------- Co-authored-by: Martin HS --- core/txpool/blobpool/blobpool.go | 19 +++--- core/txpool/blobpool/blobpool_test.go | 6 +- core/txpool/legacypool/legacypool.go | 16 +++-- core/txpool/subpool.go | 17 +++++- core/txpool/txpool.go | 5 +- eth/api_backend.go | 2 +- eth/catalyst/simulated_beacon.go | 5 +- eth/handler.go | 3 +- eth/handler_test.go | 2 +- eth/sync.go | 3 +- miner/ordering.go | 11 ++++ miner/worker.go | 86 +++++++++++++++++++-------- 12 files changed, 125 insertions(+), 50 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 0ab382001..d1fe7a606 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1446,7 +1446,12 @@ func (p *BlobPool) drop() { // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. -func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction { +func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { + // If only plain transactions are requested, this pool is unsuitable as it + // contains none, don't even bother. + if filter.OnlyPlainTxs { + return nil + } // Track the amount of time waiting to retrieve the list of pending blob txs // from the pool and the amount of time actually spent on assembling the data. // The latter will be pretty much moot, but we've kept it to have symmetric @@ -1466,20 +1471,20 @@ func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *u lazies := make([]*txpool.LazyTransaction, 0, len(txs)) for _, tx := range txs { // If transaction filtering was requested, discard badly priced ones - if minTip != nil && baseFee != nil { - if tx.execFeeCap.Lt(baseFee) { + if filter.MinTip != nil && filter.BaseFee != nil { + if tx.execFeeCap.Lt(filter.BaseFee) { break // basefee too low, cannot be included, discard rest of txs from the account } - tip := new(uint256.Int).Sub(tx.execFeeCap, baseFee) + tip := new(uint256.Int).Sub(tx.execFeeCap, filter.BaseFee) if tip.Gt(tx.execTipCap) { tip = tx.execTipCap } - if tip.Lt(minTip) { + if tip.Lt(filter.MinTip) { break // allowed or remaining tip too low, cannot be included, discard rest of txs from the account } } - if blobFee != nil { - if tx.blobFeeCap.Lt(blobFee) { + if filter.BlobFee != nil { + if tx.blobFeeCap.Lt(filter.BlobFee) { break // blobfee too low, cannot be included, discard rest of txs from the account } } diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 4cec78b57..579d42a2d 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -1340,7 +1340,11 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) { b.ReportAllocs() for i := 0; i < b.N; i++ { - p := pool.Pending(uint256.NewInt(1), chain.basefee, chain.blobfee) + p := pool.Pending(txpool.PendingFilter{ + MinTip: uint256.NewInt(1), + BaseFee: chain.basefee, + BlobFee: chain.blobfee, + }) if len(p) != int(capacity) { b.Fatalf("have %d want %d", len(p), capacity) } diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 0d1b3139c..8e7095f29 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -522,7 +522,12 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. -func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction { +func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { + // If only blob transactions are requested, this pool is unsuitable as it + // contains none, don't even bother. + if filter.OnlyBlobTxs { + return nil + } pool.mu.Lock() defer pool.mu.Unlock() @@ -531,13 +536,12 @@ func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobF minTipBig *big.Int baseFeeBig *big.Int ) - if minTip != nil { - minTipBig = minTip.ToBig() + if filter.MinTip != nil { + minTipBig = filter.MinTip.ToBig() } - if baseFee != nil { - baseFeeBig = baseFee.ToBig() + if filter.BaseFee != nil { + baseFeeBig = filter.BaseFee.ToBig() } - pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) for addr, list := range pool.pending { txs := list.Flatten() diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index edd15ec1e..9881ed1b8 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -70,6 +70,21 @@ type LazyResolver interface { // may request (and relinquish) exclusive access to certain addresses. type AddressReserver func(addr common.Address, reserve bool) error +// PendingFilter is a collection of filter rules to allow retrieving a subset +// of transactions for announcement or mining. +// +// Note, the entries here are not arbitrary useful filters, rather each one has +// a very specific call site in mind and each one can be evaluated very cheaply +// by the pool implementations. Only add new ones that satisfy those constraints. +type PendingFilter struct { + MinTip *uint256.Int // Minimum miner tip required to include a transaction + BaseFee *uint256.Int // Minimum 1559 basefee needed to include a transaction + BlobFee *uint256.Int // Minimum 4844 blobfee needed to include a blob transaction + + OnlyPlainTxs bool // Return only plain EVM transactions (peer-join announces, block space filling) + OnlyBlobTxs bool // Return only blob transactions (block blob-space filling) +} + // SubPool represents a specialized transaction pool that lives on its own (e.g. // blob pool). Since independent of how many specialized pools we have, they do // need to be updated in lockstep and assemble into one coherent view for block @@ -118,7 +133,7 @@ type SubPool interface { // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. - Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction + Pending(filter PendingFilter) map[common.Address][]*LazyTransaction // SubscribeTransactions subscribes to new transaction events. The subscriber // can decide whether to receive notifications only for newly seen transactions diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 3d0d6bf61..8bf3e0a51 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/holiman/uint256" ) // TxStatus is the current status of a transaction as seen by the pool. @@ -357,10 +356,10 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. -func (p *TxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction { +func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction { txs := make(map[common.Address][]*LazyTransaction) for _, subpool := range p.subpools { - for addr, set := range subpool.Pending(minTip, baseFee, blobFee) { + for addr, set := range subpool.Pending(filter) { txs[addr] = set } } diff --git a/eth/api_backend.go b/eth/api_backend.go index c24fa3139..65adccd85 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -292,7 +292,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) } func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { - pending := b.eth.txPool.Pending(nil, nil, nil) + pending := b.eth.txPool.Pending(txpool.PendingFilter{}) var txs types.Transactions for _, batch := range pending { for _, lazy := range batch { diff --git a/eth/catalyst/simulated_beacon.go b/eth/catalyst/simulated_beacon.go index 91ac1771d..f1c5689e1 100644 --- a/eth/catalyst/simulated_beacon.go +++ b/eth/catalyst/simulated_beacon.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/log" @@ -263,7 +264,7 @@ func (c *SimulatedBeacon) Rollback() { // Fork sets the head to the provided hash. func (c *SimulatedBeacon) Fork(parentHash common.Hash) error { - if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 { + if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 { return errors.New("pending block dirty") } parent := c.eth.BlockChain().GetBlockByHash(parentHash) @@ -275,7 +276,7 @@ func (c *SimulatedBeacon) Fork(parentHash common.Hash) error { // AdjustTime creates a new block with an adjusted timestamp. func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error { - if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 { + if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 { return errors.New("could not adjust time on non-empty block") } parent := c.eth.BlockChain().CurrentBlock() diff --git a/eth/handler.go b/eth/handler.go index b2fef62ea..0343a5787 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -42,7 +42,6 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/triedb/pathdb" - "github.com/holiman/uint256" ) const ( @@ -74,7 +73,7 @@ type txPool interface { // Pending should return pending transactions. // The slice should be modifiable by the caller. - Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction + Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction // SubscribeTransactions subscribes to new transaction events. The subscriber // can decide whether to receive notifications only for newly seen transactions diff --git a/eth/handler_test.go b/eth/handler_test.go index 0ca665156..58353f6b6 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -93,7 +93,7 @@ func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []erro } // Pending returns all the transactions known to the pool -func (p *testTxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction { +func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { p.lock.RLock() defer p.lock.RUnlock() diff --git a/eth/sync.go b/eth/sync.go index fa3a40880..cdcfbdb3d 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/log" @@ -36,7 +37,7 @@ const ( // syncTransactions starts sending all currently pending transactions to the given peer. func (h *handler) syncTransactions(p *eth.Peer) { var hashes []common.Hash - for _, batch := range h.txpool.Pending(nil, nil, nil) { + for _, batch := range h.txpool.Pending(txpool.PendingFilter{OnlyPlainTxs: true}) { for _, tx := range batch { hashes = append(hashes, tx.Hash) } diff --git a/miner/ordering.go b/miner/ordering.go index c9ecb512f..bcf7af46e 100644 --- a/miner/ordering.go +++ b/miner/ordering.go @@ -153,3 +153,14 @@ func (t *transactionsByPriceAndNonce) Shift() { func (t *transactionsByPriceAndNonce) Pop() { heap.Pop(&t.heads) } + +// Empty returns if the price heap is empty. It can be used to check it simpler +// than calling peek and checking for nil return. +func (t *transactionsByPriceAndNonce) Empty() bool { + return len(t.heads) == 0 +} + +// Clear removes the entire content of the heap. +func (t *transactionsByPriceAndNonce) Clear() { + t.heads, t.txs = nil, nil +} diff --git a/miner/worker.go b/miner/worker.go index c1726fc64..9a3610623 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -562,9 +562,11 @@ func (w *worker) mainLoop() { BlobGas: tx.BlobGas(), }) } - txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) + plainTxs := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) // Mixed bag of everrything, yolo + blobTxs := newTransactionsByPriceAndNonce(w.current.signer, nil, w.current.header.BaseFee) // Empty bag, don't bother optimising + tcount := w.current.tcount - w.commitTransactions(w.current, txset, nil, new(uint256.Int)) + w.commitTransactions(w.current, plainTxs, blobTxs, nil) // Only update the snapshot if any new transactions were added // to the pending block @@ -802,7 +804,7 @@ func (w *worker) applyTransaction(env *environment, tx *types.Transaction) (*typ return receipt, err } -func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAndNonce, interrupt *atomic.Int32, minTip *uint256.Int) error { +func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -821,8 +823,33 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) break } + // If we don't have enough blob space for any further blob transactions, + // skip that list altogether + if !blobTxs.Empty() && env.blobs*params.BlobTxBlobGasPerBlob >= params.MaxBlobGasPerBlock { + log.Trace("Not enough blob space for further blob transactions") + blobTxs.Clear() + // Fall though to pick up any plain txs + } // Retrieve the next transaction and abort if all done. - ltx, tip := txs.Peek() + var ( + ltx *txpool.LazyTransaction + txs *transactionsByPriceAndNonce + ) + pltx, ptip := plainTxs.Peek() + bltx, btip := blobTxs.Peek() + + switch { + case pltx == nil: + txs, ltx = blobTxs, bltx + case bltx == nil: + txs, ltx = plainTxs, pltx + default: + if ptip.Lt(btip) { + txs, ltx = blobTxs, bltx + } else { + txs, ltx = plainTxs, pltx + } + } if ltx == nil { break } @@ -837,11 +864,6 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn txs.Pop() continue } - // If we don't receive enough tip for the next transaction, skip the account - if tip.Cmp(minTip) < 0 { - log.Trace("Not enough tip for transaction", "hash", ltx.Hash, "tip", tip, "needed", minTip) - break // If the next-best is too low, surely no better will be available - } // Transaction seems to fit, pull it up from the pool tx := ltx.Resolve() if tx == nil { @@ -1005,35 +1027,49 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err w.mu.RUnlock() // Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees - var baseFee *uint256.Int + filter := txpool.PendingFilter{ + MinTip: tip, + } if env.header.BaseFee != nil { - baseFee = uint256.MustFromBig(env.header.BaseFee) + filter.BaseFee = uint256.MustFromBig(env.header.BaseFee) } - var blobFee *uint256.Int if env.header.ExcessBlobGas != nil { - blobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas)) + filter.BlobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas)) } - pending := w.eth.TxPool().Pending(tip, baseFee, blobFee) + filter.OnlyPlainTxs, filter.OnlyBlobTxs = true, false + pendingPlainTxs := w.eth.TxPool().Pending(filter) + + filter.OnlyPlainTxs, filter.OnlyBlobTxs = false, true + pendingBlobTxs := w.eth.TxPool().Pending(filter) // Split the pending transactions into locals and remotes. - localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending + localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs + localBlobTxs, remoteBlobTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs + for _, account := range w.eth.TxPool().Locals() { - if txs := remoteTxs[account]; len(txs) > 0 { - delete(remoteTxs, account) - localTxs[account] = txs + if txs := remotePlainTxs[account]; len(txs) > 0 { + delete(remotePlainTxs, account) + localPlainTxs[account] = txs + } + if txs := remoteBlobTxs[account]; len(txs) > 0 { + delete(remoteBlobTxs, account) + localBlobTxs[account] = txs } } - // Fill the block with all available pending transactions. - if len(localTxs) > 0 { - txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if err := w.commitTransactions(env, txs, interrupt, new(uint256.Int)); err != nil { + if len(localPlainTxs) > 0 || len(localBlobTxs) > 0 { + plainTxs := newTransactionsByPriceAndNonce(env.signer, localPlainTxs, env.header.BaseFee) + blobTxs := newTransactionsByPriceAndNonce(env.signer, localBlobTxs, env.header.BaseFee) + + if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { return err } } - if len(remoteTxs) > 0 { - txs := newTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if err := w.commitTransactions(env, txs, interrupt, tip); err != nil { + if len(remotePlainTxs) > 0 || len(remoteBlobTxs) > 0 { + plainTxs := newTransactionsByPriceAndNonce(env.signer, remotePlainTxs, env.header.BaseFee) + blobTxs := newTransactionsByPriceAndNonce(env.signer, remoteBlobTxs, env.header.BaseFee) + + if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { return err } }