core/txpool: move some validation to outside of mutex (#27006)

Currently, most of transaction validation while holding the txpool mutex: one exception being an early-on signature check. 

This PR changes that, so that we do all non-stateful checks before we entering the mutex area. This means they can be performed in parallel, and to enable that, certain fields have been made atomic bools and uint64.
This commit is contained in:
Martin Holst Swende 2023-04-03 07:16:57 -04:00 committed by GitHub
parent a25dd8064e
commit beda6c41ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 51 deletions

View File

@ -23,6 +23,7 @@ import (
"math/big" "math/big"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -250,14 +251,14 @@ type TxPool struct {
signer types.Signer signer types.Signer
mu sync.RWMutex mu sync.RWMutex
istanbul bool // Fork indicator whether we are in the istanbul stage. istanbul atomic.Bool // Fork indicator whether we are in the istanbul stage.
eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions. eip2718 atomic.Bool // Fork indicator whether we are using EIP-2718 type transactions.
eip1559 bool // Fork indicator whether we are using EIP-1559 type transactions. eip1559 atomic.Bool // Fork indicator whether we are using EIP-1559 type transactions.
shanghai bool // Fork indicator whether we are in the Shanghai stage. shanghai atomic.Bool // Fork indicator whether we are in the Shanghai stage.
currentState *state.StateDB // Current state in the blockchain head currentState *state.StateDB // Current state in the blockchain head
pendingNonces *noncer // Pending state tracking virtual nonces pendingNonces *noncer // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps currentMaxGas atomic.Uint64 // Current gas limit for transaction caps
locals *accountSet // Set of local transaction to exempt from eviction rules locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk journal *journal // Journal of local transaction to back up to disk
@ -592,15 +593,17 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {
return txs return txs
} }
// validateTx checks whether a transaction is valid according to the consensus // validateTxBasics checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size). // rules, but does not check state-dependent validation such as sufficient balance.
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // This check is meant as an early check which only needs to be performed once,
// and does not require the pool mutex to be held.
func (pool *TxPool) validateTxBasics(tx *types.Transaction, local bool) error {
// Accept only legacy transactions until EIP-2718/2930 activates. // Accept only legacy transactions until EIP-2718/2930 activates.
if !pool.eip2718 && tx.Type() != types.LegacyTxType { if !pool.eip2718.Load() && tx.Type() != types.LegacyTxType {
return core.ErrTxTypeNotSupported return core.ErrTxTypeNotSupported
} }
// Reject dynamic fee transactions until EIP-1559 activates. // Reject dynamic fee transactions until EIP-1559 activates.
if !pool.eip1559 && tx.Type() == types.DynamicFeeTxType { if !pool.eip1559.Load() && tx.Type() == types.DynamicFeeTxType {
return core.ErrTxTypeNotSupported return core.ErrTxTypeNotSupported
} }
// Reject transactions over defined size to prevent DOS attacks // Reject transactions over defined size to prevent DOS attacks
@ -608,7 +611,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrOversizedData return ErrOversizedData
} }
// Check whether the init code size has been exceeded. // Check whether the init code size has been exceeded.
if pool.shanghai && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize { if pool.shanghai.Load() && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize {
return fmt.Errorf("%w: code size %v limit %v", core.ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize) return fmt.Errorf("%w: code size %v limit %v", core.ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize)
} }
// Transactions can't be negative. This may never happen using RLP decoded // Transactions can't be negative. This may never happen using RLP decoded
@ -617,7 +620,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrNegativeValue return ErrNegativeValue
} }
// Ensure the transaction doesn't exceed the current block limit gas. // Ensure the transaction doesn't exceed the current block limit gas.
if pool.currentMaxGas < tx.Gas() { if pool.currentMaxGas.Load() < tx.Gas() {
return ErrGasLimit return ErrGasLimit
} }
// Sanity check for extremely large numbers // Sanity check for extremely large numbers
@ -632,14 +635,29 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return core.ErrTipAboveFeeCap return core.ErrTipAboveFeeCap
} }
// Make sure the transaction is signed properly. // Make sure the transaction is signed properly.
from, err := types.Sender(pool.signer, tx) if _, err := types.Sender(pool.signer, tx); err != nil {
if err != nil {
return ErrInvalidSender return ErrInvalidSender
} }
// Drop non-local transactions under our own minimal accepted gas price or tip // Drop non-local transactions under our own minimal accepted gas price or tip
if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 { if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 {
return ErrUnderpriced return ErrUnderpriced
} }
// Ensure the transaction has more gas than the basic tx fee.
intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul.Load(), pool.shanghai.Load())
if err != nil {
return err
}
if tx.Gas() < intrGas {
return core.ErrIntrinsicGas
}
return nil
}
// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Signature has been checked already, this cannot error.
from, _ := types.Sender(pool.signer, tx)
// Ensure the transaction adheres to nonce ordering // Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(from) > tx.Nonce() { if pool.currentState.GetNonce(from) > tx.Nonce() {
return core.ErrNonceTooLow return core.ErrNonceTooLow
@ -664,15 +682,6 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrOverdraft return ErrOverdraft
} }
} }
// Ensure the transaction has more gas than the basic tx fee.
intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul, pool.shanghai)
if err != nil {
return err
}
if tx.Gas() < intrGas {
return core.ErrIntrinsicGas
}
return nil return nil
} }
@ -969,12 +978,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
knownTxMeter.Mark(1) knownTxMeter.Mark(1)
continue continue
} }
// Exclude transactions with invalid signatures as soon as // Exclude transactions with basic errors, e.g invalid signatures and
// possible and cache senders in transactions before // insufficient intrinsic gas as soon as possible and cache senders
// obtaining lock // in transactions before obtaining lock
_, err := types.Sender(pool.signer, tx)
if err != nil { if err := pool.validateTxBasics(tx, local); err != nil {
errs[i] = ErrInvalidSender errs[i] = err
invalidTxMeter.Mark(1) invalidTxMeter.Mark(1)
continue continue
} }
@ -1364,7 +1373,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
} }
pool.currentState = statedb pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb) pool.pendingNonces = newNoncer(statedb)
pool.currentMaxGas = newHead.GasLimit pool.currentMaxGas.Store(newHead.GasLimit)
// Inject any transactions discarded due to reorgs // Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject)) log.Debug("Reinjecting stale transactions", "count", len(reinject))
@ -1373,10 +1382,10 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
// Update all fork indicator by next pending block number. // Update all fork indicator by next pending block number.
next := new(big.Int).Add(newHead.Number, big.NewInt(1)) next := new(big.Int).Add(newHead.Number, big.NewInt(1))
pool.istanbul = pool.chainconfig.IsIstanbul(next) pool.istanbul.Store(pool.chainconfig.IsIstanbul(next))
pool.eip2718 = pool.chainconfig.IsBerlin(next) pool.eip2718.Store(pool.chainconfig.IsBerlin(next))
pool.eip1559 = pool.chainconfig.IsLondon(next) pool.eip1559.Store(pool.chainconfig.IsLondon(next))
pool.shanghai = pool.chainconfig.IsShanghai(uint64(time.Now().Unix())) pool.shanghai.Store(pool.chainconfig.IsShanghai(uint64(time.Now().Unix())))
} }
// promoteExecutables moves transactions that have become processable from the // promoteExecutables moves transactions that have become processable from the
@ -1400,7 +1409,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
} }
log.Trace("Removed old queued transactions", "count", len(forwards)) log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas) // Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas.Load())
for _, tx := range drops { for _, tx := range drops {
hash := tx.Hash() hash := tx.Hash()
pool.all.Remove(hash) pool.all.Remove(hash)
@ -1597,7 +1606,7 @@ func (pool *TxPool) demoteUnexecutables() {
log.Trace("Removed old pending transaction", "hash", hash) log.Trace("Removed old pending transaction", "hash", hash)
} }
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas.Load())
for _, tx := range drops { for _, tx := range drops {
hash := tx.Hash() hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash) log.Trace("Removed unpayable pending transaction", "hash", hash)

View File

@ -293,28 +293,29 @@ func TestInvalidTransactions(t *testing.T) {
tx := transaction(0, 100, key) tx := transaction(0, 100, key)
from, _ := deriveSender(tx) from, _ := deriveSender(tx)
// Intrinsic gas too low
testAddBalance(pool, from, big.NewInt(1)) testAddBalance(pool, from, big.NewInt(1))
if err := pool.AddRemote(tx); !errors.Is(err, core.ErrInsufficientFunds) { if err, want := pool.AddRemote(tx), core.ErrIntrinsicGas; !errors.Is(err, want) {
t.Error("expected", core.ErrInsufficientFunds) t.Errorf("want %v have %v", want, err)
} }
balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(new(big.Int).SetUint64(tx.Gas()), tx.GasPrice())) // Insufficient funds
testAddBalance(pool, from, balance) tx = transaction(0, 100000, key)
if err := pool.AddRemote(tx); !errors.Is(err, core.ErrIntrinsicGas) { if err, want := pool.AddRemote(tx), core.ErrInsufficientFunds; !errors.Is(err, want) {
t.Error("expected", core.ErrIntrinsicGas, "got", err) t.Errorf("want %v have %v", want, err)
} }
testSetNonce(pool, from, 1) testSetNonce(pool, from, 1)
testAddBalance(pool, from, big.NewInt(0xffffffffffffff)) testAddBalance(pool, from, big.NewInt(0xffffffffffffff))
tx = transaction(0, 100000, key) tx = transaction(0, 100000, key)
if err := pool.AddRemote(tx); !errors.Is(err, core.ErrNonceTooLow) { if err, want := pool.AddRemote(tx), core.ErrNonceTooLow; !errors.Is(err, want) {
t.Error("expected", core.ErrNonceTooLow) t.Errorf("want %v have %v", want, err)
} }
tx = transaction(1, 100000, key) tx = transaction(1, 100000, key)
pool.gasPrice = big.NewInt(1000) pool.gasPrice = big.NewInt(1000)
if err := pool.AddRemote(tx); err != ErrUnderpriced { if err, want := pool.AddRemote(tx), ErrUnderpriced; !errors.Is(err, want) {
t.Error("expected", ErrUnderpriced, "got", err) t.Errorf("want %v have %v", want, err)
} }
if err := pool.AddLocal(tx); err != nil { if err := pool.AddLocal(tx); err != nil {
t.Error("expected", nil, "got", err) t.Error("expected", nil, "got", err)
@ -1217,22 +1218,22 @@ func TestAllowedTxSize(t *testing.T) {
// All those fields are summed up to at most 213 bytes. // All those fields are summed up to at most 213 bytes.
baseSize := uint64(213) baseSize := uint64(213)
dataSize := txMaxSize - baseSize dataSize := txMaxSize - baseSize
maxGas := pool.currentMaxGas.Load()
// Try adding a transaction with maximal allowed size // Try adding a transaction with maximal allowed size
tx := pricedDataTransaction(0, pool.currentMaxGas, big.NewInt(1), key, dataSize) tx := pricedDataTransaction(0, maxGas, big.NewInt(1), key, dataSize)
if err := pool.addRemoteSync(tx); err != nil { if err := pool.addRemoteSync(tx); err != nil {
t.Fatalf("failed to add transaction of size %d, close to maximal: %v", int(tx.Size()), err) t.Fatalf("failed to add transaction of size %d, close to maximal: %v", int(tx.Size()), err)
} }
// Try adding a transaction with random allowed size // Try adding a transaction with random allowed size
if err := pool.addRemoteSync(pricedDataTransaction(1, pool.currentMaxGas, big.NewInt(1), key, uint64(rand.Intn(int(dataSize))))); err != nil { if err := pool.addRemoteSync(pricedDataTransaction(1, maxGas, big.NewInt(1), key, uint64(rand.Intn(int(dataSize))))); err != nil {
t.Fatalf("failed to add transaction of random allowed size: %v", err) t.Fatalf("failed to add transaction of random allowed size: %v", err)
} }
// Try adding a transaction of minimal not allowed size // Try adding a transaction of minimal not allowed size
if err := pool.addRemoteSync(pricedDataTransaction(2, pool.currentMaxGas, big.NewInt(1), key, txMaxSize)); err == nil { if err := pool.addRemoteSync(pricedDataTransaction(2, maxGas, big.NewInt(1), key, txMaxSize)); err == nil {
t.Fatalf("expected rejection on slightly oversize transaction") t.Fatalf("expected rejection on slightly oversize transaction")
} }
// Try adding a transaction of random not allowed size // Try adding a transaction of random not allowed size
if err := pool.addRemoteSync(pricedDataTransaction(2, pool.currentMaxGas, big.NewInt(1), key, dataSize+1+uint64(rand.Intn(10*txMaxSize)))); err == nil { if err := pool.addRemoteSync(pricedDataTransaction(2, maxGas, big.NewInt(1), key, dataSize+1+uint64(rand.Intn(10*txMaxSize)))); err == nil {
t.Fatalf("expected rejection on oversize transaction") t.Fatalf("expected rejection on oversize transaction")
} }
// Run some sanity checks on the pool internals // Run some sanity checks on the pool internals