60c062e17d
* core: move TxPool reorg and events to background goroutine This change moves internal queue re-shuffling work in TxPool to a background goroutine, TxPool.runReorg. Requests to execute runReorg are accumulated by the new scheduleReorgLoop. The new loop also accumulates transaction events. The motivation for this change is making sends to txFeed synchronous instead of sending them in one-off goroutines launched by 'add' and 'promoteExecutables'. If a downstream consumer of txFeed is blocked for a while, reorg requests and events will queue up. * core: remove homestead check in TxPool This change removes tracking of the homestead block number from TxPool. The homestead field was used to enforce minimum gas of 53000 for contract creations after the homestead fork, but not before it. Since nobody would want configure a non-homestead chain nowadays and contract creations usually take more than 53000 gas, the extra correctness is redundant and can be removed. * core: fixes for review comments * core: remove BenchmarkPoolInsert This is useless now because there is no separate code path for individual transactions anymore. * core: fix pending counter metric * core: fix pool tests * core: dedup txpool announced events, discard stales * core: reorg tx promotion/demotion to avoid weird pending gaps
1523 lines
51 KiB
Go
1523 lines
51 KiB
Go
// Copyright 2014 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package core
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"math/big"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/common/prque"
|
|
"github.com/ethereum/go-ethereum/core/state"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/params"
|
|
)
|
|
|
|
// chainHeadChanSize is the buffer capacity of the channel on which the pool
// receives ChainHeadEvent notifications.
const chainHeadChanSize = 10
|
|
|
|
// Sentinel errors reported by the transaction pool when rejecting a transaction.
var (
	// ErrInvalidSender signals that the sender could not be recovered from the
	// transaction's signature.
	ErrInvalidSender = errors.New("invalid sender")

	// ErrNonceTooLow signals that the transaction's nonce is lower than the
	// account nonce present in the local chain.
	ErrNonceTooLow = errors.New("nonce too low")

	// ErrUnderpriced signals that the transaction's gas price is below the
	// minimum configured for the transaction pool.
	ErrUnderpriced = errors.New("transaction underpriced")

	// ErrReplaceUnderpriced signals an attempt to replace an existing
	// transaction without the required price bump.
	ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")

	// ErrInsufficientFunds signals that the total cost of executing the
	// transaction exceeds the balance of the sending account.
	ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")

	// ErrIntrinsicGas signals that the transaction specifies less gas than is
	// required to start the invocation.
	ErrIntrinsicGas = errors.New("intrinsic gas too low")

	// ErrGasLimit signals that the transaction's requested gas limit exceeds
	// the maximum allowance of the current block.
	ErrGasLimit = errors.New("exceeds block gas limit")

	// ErrNegativeValue is a sanity error ensuring that no one is able to
	// specify a transaction with a negative value.
	ErrNegativeValue = errors.New("negative value")

	// ErrOversizedData signals that the transaction's input data is larger
	// than some meaningful limit a user might use. This is not a consensus
	// error making the transaction invalid, rather a DOS protection.
	ErrOversizedData = errors.New("oversized data")
)
|
|
|
|
// Periods of the housekeeping tickers used by the pool's main event loop.
var (
	// evictionInterval is how often the queue is checked for evictable transactions.
	evictionInterval = time.Minute

	// statsReportInterval is how often transaction pool stats are reported.
	statsReportInterval = 8 * time.Second
)
|
|
|
|
var (
|
|
// Metrics for the pending pool
|
|
pendingDiscardMeter = metrics.NewRegisteredMeter("txpool/pending/discard", nil)
|
|
pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil)
|
|
pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
|
|
pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds
|
|
|
|
// Metrics for the queued pool
|
|
queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil)
|
|
queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
|
|
queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
|
|
queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
|
|
|
|
// General tx metrics
|
|
validMeter = metrics.NewRegisteredMeter("txpool/valid", nil)
|
|
invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
|
|
underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
|
|
|
|
pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil)
|
|
queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil)
|
|
localCounter = metrics.NewRegisteredCounter("txpool/local", nil)
|
|
)
|
|
|
|
// TxStatus describes where a transaction currently stands from the pool's
// point of view.
type TxStatus uint

// Transaction statuses, numbered from zero in declaration order.
const (
	// TxStatusUnknown is the zero value; Status leaves it in place for hashes
	// the pool does not track.
	TxStatusUnknown TxStatus = iota
	// TxStatusQueued is reported for tracked transactions that are not in the
	// sender's pending list.
	TxStatusQueued
	// TxStatusPending is reported for transactions found in the sender's
	// pending list.
	TxStatusPending
	// TxStatusIncluded is not assigned anywhere in this part of the file.
	// NOTE(review): presumably set by callers for mined transactions — confirm.
	TxStatusIncluded
)
|
|
|
|
// blockChain provides the state of blockchain and current gas limit to do
|
|
// some pre checks in tx pool and event subscribers.
|
|
type blockChain interface {
|
|
CurrentBlock() *types.Block
|
|
GetBlock(hash common.Hash, number uint64) *types.Block
|
|
StateAt(root common.Hash) (*state.StateDB, error)
|
|
|
|
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
|
|
}
|
|
|
|
// TxPoolConfig are the configuration parameters of the transaction pool.
|
|
type TxPoolConfig struct {
|
|
Locals []common.Address // Addresses that should be treated by default as local
|
|
NoLocals bool // Whether local transaction handling should be disabled
|
|
Journal string // Journal of local transactions to survive node restarts
|
|
Rejournal time.Duration // Time interval to regenerate the local transaction journal
|
|
|
|
PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
|
|
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
|
|
|
|
AccountSlots uint64 // Number of executable transaction slots guaranteed per account
|
|
GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
|
|
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
|
|
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
|
|
|
|
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
|
|
}
|
|
|
|
// DefaultTxPoolConfig contains the default configurations for the transaction
|
|
// pool.
|
|
var DefaultTxPoolConfig = TxPoolConfig{
|
|
Journal: "transactions.rlp",
|
|
Rejournal: time.Hour,
|
|
|
|
PriceLimit: 1,
|
|
PriceBump: 10,
|
|
|
|
AccountSlots: 16,
|
|
GlobalSlots: 4096,
|
|
AccountQueue: 64,
|
|
GlobalQueue: 1024,
|
|
|
|
Lifetime: 3 * time.Hour,
|
|
}
|
|
|
|
// sanitize checks the provided user configurations and changes anything that's
|
|
// unreasonable or unworkable.
|
|
func (config *TxPoolConfig) sanitize() TxPoolConfig {
|
|
conf := *config
|
|
if conf.Rejournal < time.Second {
|
|
log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
|
|
conf.Rejournal = time.Second
|
|
}
|
|
if conf.PriceLimit < 1 {
|
|
log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit)
|
|
conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
|
|
}
|
|
if conf.PriceBump < 1 {
|
|
log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump)
|
|
conf.PriceBump = DefaultTxPoolConfig.PriceBump
|
|
}
|
|
if conf.AccountSlots < 1 {
|
|
log.Warn("Sanitizing invalid txpool account slots", "provided", conf.AccountSlots, "updated", DefaultTxPoolConfig.AccountSlots)
|
|
conf.AccountSlots = DefaultTxPoolConfig.AccountSlots
|
|
}
|
|
if conf.GlobalSlots < 1 {
|
|
log.Warn("Sanitizing invalid txpool global slots", "provided", conf.GlobalSlots, "updated", DefaultTxPoolConfig.GlobalSlots)
|
|
conf.GlobalSlots = DefaultTxPoolConfig.GlobalSlots
|
|
}
|
|
if conf.AccountQueue < 1 {
|
|
log.Warn("Sanitizing invalid txpool account queue", "provided", conf.AccountQueue, "updated", DefaultTxPoolConfig.AccountQueue)
|
|
conf.AccountQueue = DefaultTxPoolConfig.AccountQueue
|
|
}
|
|
if conf.GlobalQueue < 1 {
|
|
log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultTxPoolConfig.GlobalQueue)
|
|
conf.GlobalQueue = DefaultTxPoolConfig.GlobalQueue
|
|
}
|
|
if conf.Lifetime < 1 {
|
|
log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
|
|
conf.Lifetime = DefaultTxPoolConfig.Lifetime
|
|
}
|
|
return conf
|
|
}
|
|
|
|
// TxPool contains all currently known transactions. Transactions
|
|
// enter the pool when they are received from the network or submitted
|
|
// locally. They exit the pool when they are included in the blockchain.
|
|
//
|
|
// The pool separates processable transactions (which can be applied to the
|
|
// current state) and future transactions. Transactions move between those
|
|
// two states over time as they are received and processed.
|
|
type TxPool struct {
|
|
config TxPoolConfig
|
|
chainconfig *params.ChainConfig
|
|
chain blockChain
|
|
gasPrice *big.Int
|
|
txFeed event.Feed
|
|
scope event.SubscriptionScope
|
|
signer types.Signer
|
|
mu sync.RWMutex
|
|
|
|
currentState *state.StateDB // Current state in the blockchain head
|
|
pendingState *state.ManagedState // Pending state tracking virtual nonces
|
|
currentMaxGas uint64 // Current gas limit for transaction caps
|
|
|
|
locals *accountSet // Set of local transaction to exempt from eviction rules
|
|
journal *txJournal // Journal of local transaction to back up to disk
|
|
|
|
pending map[common.Address]*txList // All currently processable transactions
|
|
queue map[common.Address]*txList // Queued but non-processable transactions
|
|
beats map[common.Address]time.Time // Last heartbeat from each known account
|
|
all *txLookup // All transactions to allow lookups
|
|
priced *txPricedList // All transactions sorted by price
|
|
|
|
chainHeadCh chan ChainHeadEvent
|
|
chainHeadSub event.Subscription
|
|
reqResetCh chan *txpoolResetRequest
|
|
reqPromoteCh chan *accountSet
|
|
queueTxEventCh chan *types.Transaction
|
|
reorgDoneCh chan chan struct{}
|
|
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
|
|
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
|
|
}
|
|
|
|
type txpoolResetRequest struct {
|
|
oldHead, newHead *types.Header
|
|
}
|
|
|
|
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
|
|
// transactions from the network.
|
|
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
|
|
// Sanitize the input to ensure no vulnerable gas prices are set
|
|
config = (&config).sanitize()
|
|
|
|
// Create the transaction pool with its initial settings
|
|
pool := &TxPool{
|
|
config: config,
|
|
chainconfig: chainconfig,
|
|
chain: chain,
|
|
signer: types.NewEIP155Signer(chainconfig.ChainID),
|
|
pending: make(map[common.Address]*txList),
|
|
queue: make(map[common.Address]*txList),
|
|
beats: make(map[common.Address]time.Time),
|
|
all: newTxLookup(),
|
|
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
|
|
reqResetCh: make(chan *txpoolResetRequest),
|
|
reqPromoteCh: make(chan *accountSet),
|
|
queueTxEventCh: make(chan *types.Transaction),
|
|
reorgDoneCh: make(chan chan struct{}),
|
|
reorgShutdownCh: make(chan struct{}),
|
|
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
|
|
}
|
|
pool.locals = newAccountSet(pool.signer)
|
|
for _, addr := range config.Locals {
|
|
log.Info("Setting new local account", "address", addr)
|
|
pool.locals.add(addr)
|
|
}
|
|
pool.priced = newTxPricedList(pool.all)
|
|
pool.reset(nil, chain.CurrentBlock().Header())
|
|
|
|
// Start the reorg loop early so it can handle requests generated during journal loading.
|
|
pool.wg.Add(1)
|
|
go pool.scheduleReorgLoop()
|
|
|
|
// If local transactions and journaling is enabled, load from disk
|
|
if !config.NoLocals && config.Journal != "" {
|
|
pool.journal = newTxJournal(config.Journal)
|
|
|
|
if err := pool.journal.load(pool.AddLocals); err != nil {
|
|
log.Warn("Failed to load transaction journal", "err", err)
|
|
}
|
|
if err := pool.journal.rotate(pool.local()); err != nil {
|
|
log.Warn("Failed to rotate transaction journal", "err", err)
|
|
}
|
|
}
|
|
|
|
// Subscribe events from blockchain and start the main event loop.
|
|
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
|
|
pool.wg.Add(1)
|
|
go pool.loop()
|
|
|
|
return pool
|
|
}
|
|
|
|
// loop is the transaction pool's main event loop, waiting for and reacting to
|
|
// outside blockchain events as well as for various reporting and transaction
|
|
// eviction events.
|
|
func (pool *TxPool) loop() {
|
|
defer pool.wg.Done()
|
|
|
|
var (
|
|
prevPending, prevQueued, prevStales int
|
|
// Start the stats reporting and transaction eviction tickers
|
|
report = time.NewTicker(statsReportInterval)
|
|
evict = time.NewTicker(evictionInterval)
|
|
journal = time.NewTicker(pool.config.Rejournal)
|
|
// Track the previous head headers for transaction reorgs
|
|
head = pool.chain.CurrentBlock()
|
|
)
|
|
defer report.Stop()
|
|
defer evict.Stop()
|
|
defer journal.Stop()
|
|
|
|
for {
|
|
select {
|
|
// Handle ChainHeadEvent
|
|
case ev := <-pool.chainHeadCh:
|
|
if ev.Block != nil {
|
|
pool.requestReset(head.Header(), ev.Block.Header())
|
|
head = ev.Block
|
|
}
|
|
|
|
// System shutdown.
|
|
case <-pool.chainHeadSub.Err():
|
|
close(pool.reorgShutdownCh)
|
|
return
|
|
|
|
// Handle stats reporting ticks
|
|
case <-report.C:
|
|
pool.mu.RLock()
|
|
pending, queued := pool.stats()
|
|
stales := pool.priced.stales
|
|
pool.mu.RUnlock()
|
|
|
|
if pending != prevPending || queued != prevQueued || stales != prevStales {
|
|
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
|
|
prevPending, prevQueued, prevStales = pending, queued, stales
|
|
}
|
|
|
|
// Handle inactive account transaction eviction
|
|
case <-evict.C:
|
|
pool.mu.Lock()
|
|
for addr := range pool.queue {
|
|
// Skip local transactions from the eviction mechanism
|
|
if pool.locals.contains(addr) {
|
|
continue
|
|
}
|
|
// Any non-locals old enough should be removed
|
|
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
|
|
for _, tx := range pool.queue[addr].Flatten() {
|
|
pool.removeTx(tx.Hash(), true)
|
|
}
|
|
}
|
|
}
|
|
pool.mu.Unlock()
|
|
|
|
// Handle local transaction journal rotation
|
|
case <-journal.C:
|
|
if pool.journal != nil {
|
|
pool.mu.Lock()
|
|
if err := pool.journal.rotate(pool.local()); err != nil {
|
|
log.Warn("Failed to rotate local tx journal", "err", err)
|
|
}
|
|
pool.mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop terminates the transaction pool.
|
|
func (pool *TxPool) Stop() {
|
|
// Unsubscribe all subscriptions registered from txpool
|
|
pool.scope.Close()
|
|
|
|
// Unsubscribe subscriptions registered from blockchain
|
|
pool.chainHeadSub.Unsubscribe()
|
|
pool.wg.Wait()
|
|
|
|
if pool.journal != nil {
|
|
pool.journal.close()
|
|
}
|
|
log.Info("Transaction pool stopped")
|
|
}
|
|
|
|
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
|
|
// starts sending event to the given channel.
|
|
func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
|
|
return pool.scope.Track(pool.txFeed.Subscribe(ch))
|
|
}
|
|
|
|
// GasPrice returns the current gas price enforced by the transaction pool.
|
|
func (pool *TxPool) GasPrice() *big.Int {
|
|
pool.mu.RLock()
|
|
defer pool.mu.RUnlock()
|
|
|
|
return new(big.Int).Set(pool.gasPrice)
|
|
}
|
|
|
|
// SetGasPrice updates the minimum price required by the transaction pool for a
|
|
// new transaction, and drops all transactions below this threshold.
|
|
func (pool *TxPool) SetGasPrice(price *big.Int) {
|
|
pool.mu.Lock()
|
|
defer pool.mu.Unlock()
|
|
|
|
pool.gasPrice = price
|
|
for _, tx := range pool.priced.Cap(price, pool.locals) {
|
|
pool.removeTx(tx.Hash(), false)
|
|
}
|
|
log.Info("Transaction pool price threshold updated", "price", price)
|
|
}
|
|
|
|
// State returns the virtual managed state of the transaction pool.
|
|
func (pool *TxPool) State() *state.ManagedState {
|
|
pool.mu.RLock()
|
|
defer pool.mu.RUnlock()
|
|
|
|
return pool.pendingState
|
|
}
|
|
|
|
// Stats retrieves the current pool stats, namely the number of pending and the
|
|
// number of queued (non-executable) transactions.
|
|
func (pool *TxPool) Stats() (int, int) {
|
|
pool.mu.RLock()
|
|
defer pool.mu.RUnlock()
|
|
|
|
return pool.stats()
|
|
}
|
|
|
|
// stats retrieves the current pool stats, namely the number of pending and the
|
|
// number of queued (non-executable) transactions.
|
|
func (pool *TxPool) stats() (int, int) {
|
|
pending := 0
|
|
for _, list := range pool.pending {
|
|
pending += list.Len()
|
|
}
|
|
queued := 0
|
|
for _, list := range pool.queue {
|
|
queued += list.Len()
|
|
}
|
|
return pending, queued
|
|
}
|
|
|
|
// Content retrieves the data content of the transaction pool, returning all the
|
|
// pending as well as queued transactions, grouped by account and sorted by nonce.
|
|
func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
|
|
pool.mu.Lock()
|
|
defer pool.mu.Unlock()
|
|
|
|
pending := make(map[common.Address]types.Transactions)
|
|
for addr, list := range pool.pending {
|
|
pending[addr] = list.Flatten()
|
|
}
|
|
queued := make(map[common.Address]types.Transactions)
|
|
for addr, list := range pool.queue {
|
|
queued[addr] = list.Flatten()
|
|
}
|
|
return pending, queued
|
|
}
|
|
|
|
// Pending retrieves all currently processable transactions, grouped by origin
|
|
// account and sorted by nonce. The returned transaction set is a copy and can be
|
|
// freely modified by calling code.
|
|
func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
|
|
pool.mu.Lock()
|
|
defer pool.mu.Unlock()
|
|
|
|
pending := make(map[common.Address]types.Transactions)
|
|
for addr, list := range pool.pending {
|
|
pending[addr] = list.Flatten()
|
|
}
|
|
return pending, nil
|
|
}
|
|
|
|
// Locals retrieves the accounts currently considered local by the pool.
|
|
func (pool *TxPool) Locals() []common.Address {
|
|
pool.mu.Lock()
|
|
defer pool.mu.Unlock()
|
|
|
|
return pool.locals.flatten()
|
|
}
|
|
|
|
// local retrieves all currently known local transactions, grouped by origin
|
|
// account and sorted by nonce. The returned transaction set is a copy and can be
|
|
// freely modified by calling code.
|
|
func (pool *TxPool) local() map[common.Address]types.Transactions {
|
|
txs := make(map[common.Address]types.Transactions)
|
|
for addr := range pool.locals.accounts {
|
|
if pending := pool.pending[addr]; pending != nil {
|
|
txs[addr] = append(txs[addr], pending.Flatten()...)
|
|
}
|
|
if queued := pool.queue[addr]; queued != nil {
|
|
txs[addr] = append(txs[addr], queued.Flatten()...)
|
|
}
|
|
}
|
|
return txs
|
|
}
|
|
|
|
// validateTx checks whether a transaction is valid according to the consensus
|
|
// rules and adheres to some heuristic limits of the local node (price and size).
|
|
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
|
|
// Heuristic limit, reject transactions over 32KB to prevent DOS attacks
|
|
if tx.Size() > 32*1024 {
|
|
return ErrOversizedData
|
|
}
|
|
// Transactions can't be negative. This may never happen using RLP decoded
|
|
// transactions but may occur if you create a transaction using the RPC.
|
|
if tx.Value().Sign() < 0 {
|
|
return ErrNegativeValue
|
|
}
|
|
// Ensure the transaction doesn't exceed the current block limit gas.
|
|
if pool.currentMaxGas < tx.Gas() {
|
|
return ErrGasLimit
|
|
}
|
|
// Make sure the transaction is signed properly
|
|
from, err := types.Sender(pool.signer, tx)
|
|
if err != nil {
|
|
return ErrInvalidSender
|
|
}
|
|
// Drop non-local transactions under our own minimal accepted gas price
|
|
local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
|
|
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
|
|
return ErrUnderpriced
|
|
}
|
|
// Ensure the transaction adheres to nonce ordering
|
|
if pool.currentState.GetNonce(from) > tx.Nonce() {
|
|
return ErrNonceTooLow
|
|
}
|
|
// Transactor should have enough funds to cover the costs
|
|
// cost == V + GP * GL
|
|
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
|
|
return ErrInsufficientFunds
|
|
}
|
|
// Ensure the transaction has more gas than the basic tx fee.
|
|
intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if tx.Gas() < intrGas {
|
|
return ErrIntrinsicGas
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// add validates a transaction and inserts it into the non-executable queue for later
|
|
// pending promotion and execution. If the transaction is a replacement for an already
|
|
// pending or queued one, it overwrites the previous transaction if its price is higher.
|
|
//
|
|
// If a newly added transaction is marked as local, its sending account will be
|
|
// whitelisted, preventing any associated transaction from being dropped out of the pool
|
|
// due to pricing constraints.
|
|
func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
|
|
// If the transaction is already known, discard it
|
|
hash := tx.Hash()
|
|
if pool.all.Get(hash) != nil {
|
|
log.Trace("Discarding already known transaction", "hash", hash)
|
|
return false, fmt.Errorf("known transaction: %x", hash)
|
|
}
|
|
|
|
// If the transaction fails basic validation, discard it
|
|
if err := pool.validateTx(tx, local); err != nil {
|
|
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
|
|
invalidTxMeter.Mark(1)
|
|
return false, err
|
|
}
|
|
|
|
// If the transaction pool is full, discard underpriced transactions
|
|
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
|
|
// If the new transaction is underpriced, don't accept it
|
|
if !local && pool.priced.Underpriced(tx, pool.locals) {
|
|
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
|
|
underpricedTxMeter.Mark(1)
|
|
return false, ErrUnderpriced
|
|
}
|
|
// New transaction is better than our worse ones, make room for it
|
|
drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
|
|
for _, tx := range drop {
|
|
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
|
|
underpricedTxMeter.Mark(1)
|
|
pool.removeTx(tx.Hash(), false)
|
|
}
|
|
}
|
|
|
|
// Try to replace an existing transaction in the pending pool
|
|
from, _ := types.Sender(pool.signer, tx) // already validated
|
|
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
|
|
// Nonce already pending, check if required price bump is met
|
|
inserted, old := list.Add(tx, pool.config.PriceBump)
|
|
if !inserted {
|
|
pendingDiscardMeter.Mark(1)
|
|
return false, ErrReplaceUnderpriced
|
|
}
|
|
// New transaction is better, replace old one
|
|
if old != nil {
|
|
pool.all.Remove(old.Hash())
|
|
pool.priced.Removed(1)
|
|
pendingReplaceMeter.Mark(1)
|
|
}
|
|
pool.all.Add(tx)
|
|
pool.priced.Put(tx)
|
|
pool.journalTx(from, tx)
|
|
pool.queueTxEvent(tx)
|
|
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
|
|
return old != nil, nil
|
|
}
|
|
|
|
// New transaction isn't replacing a pending one, push into queue
|
|
replaced, err = pool.enqueueTx(hash, tx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// Mark local addresses and journal local transactions
|
|
if local {
|
|
if !pool.locals.contains(from) {
|
|
log.Info("Setting new local account", "address", from)
|
|
pool.locals.add(from)
|
|
}
|
|
}
|
|
if local || pool.locals.contains(from) {
|
|
localCounter.Inc(1)
|
|
}
|
|
pool.journalTx(from, tx)
|
|
|
|
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
|
|
return replaced, nil
|
|
}
|
|
|
|
// enqueueTx inserts a new transaction into the non-executable transaction queue.
|
|
//
|
|
// Note, this method assumes the pool lock is held!
|
|
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
|
|
// Try to insert the transaction into the future queue
|
|
from, _ := types.Sender(pool.signer, tx) // already validated
|
|
if pool.queue[from] == nil {
|
|
pool.queue[from] = newTxList(false)
|
|
}
|
|
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
|
|
if !inserted {
|
|
// An older transaction was better, discard this
|
|
queuedDiscardMeter.Mark(1)
|
|
return false, ErrReplaceUnderpriced
|
|
}
|
|
// Discard any previous transaction and mark this
|
|
if old != nil {
|
|
pool.all.Remove(old.Hash())
|
|
pool.priced.Removed(1)
|
|
queuedReplaceMeter.Mark(1)
|
|
} else {
|
|
// Nothing was replaced, bump the queued counter
|
|
queuedCounter.Inc(1)
|
|
}
|
|
if pool.all.Get(hash) == nil {
|
|
pool.all.Add(tx)
|
|
pool.priced.Put(tx)
|
|
}
|
|
return old != nil, nil
|
|
}
|
|
|
|
// journalTx adds the specified transaction to the local disk journal if it is
|
|
// deemed to have been sent from a local account.
|
|
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
|
|
// Only journal if it's enabled and the transaction is local
|
|
if pool.journal == nil || !pool.locals.contains(from) {
|
|
return
|
|
}
|
|
if err := pool.journal.insert(tx); err != nil {
|
|
log.Warn("Failed to journal local transaction", "err", err)
|
|
}
|
|
}
|
|
|
|
// promoteTx adds a transaction to the pending (processable) list of transactions
|
|
// and returns whether it was inserted or an older was better.
|
|
//
|
|
// Note, this method assumes the pool lock is held!
|
|
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
|
|
// Try to insert the transaction into the pending queue
|
|
if pool.pending[addr] == nil {
|
|
pool.pending[addr] = newTxList(true)
|
|
}
|
|
list := pool.pending[addr]
|
|
|
|
inserted, old := list.Add(tx, pool.config.PriceBump)
|
|
if !inserted {
|
|
// An older transaction was better, discard this
|
|
pool.all.Remove(hash)
|
|
pool.priced.Removed(1)
|
|
|
|
pendingDiscardMeter.Mark(1)
|
|
return false
|
|
}
|
|
// Otherwise discard any previous transaction and mark this
|
|
if old != nil {
|
|
pool.all.Remove(old.Hash())
|
|
pool.priced.Removed(1)
|
|
|
|
pendingReplaceMeter.Mark(1)
|
|
} else {
|
|
// Nothing was replaced, bump the pending counter
|
|
pendingCounter.Inc(1)
|
|
}
|
|
// Failsafe to work around direct pending inserts (tests)
|
|
if pool.all.Get(hash) == nil {
|
|
pool.all.Add(tx)
|
|
pool.priced.Put(tx)
|
|
}
|
|
// Set the potentially new pending nonce and notify any subsystems of the new tx
|
|
pool.beats[addr] = time.Now()
|
|
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
|
|
|
|
return true
|
|
}
|
|
|
|
// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
|
|
// senders as a local ones, ensuring they go around the local pricing constraints.
|
|
//
|
|
// This method is used to add transactions from the RPC API and performs synchronous pool
|
|
// reorganization and event propagation.
|
|
func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
|
|
return pool.addTxs(txs, !pool.config.NoLocals, true)
|
|
}
|
|
|
|
// AddLocal enqueues a single local transaction into the pool if it is valid. This is
|
|
// a convenience wrapper aroundd AddLocals.
|
|
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
|
|
errs := pool.AddLocals([]*types.Transaction{tx})
|
|
return errs[0]
|
|
}
|
|
|
|
// AddRemotes enqueues a batch of transactions into the pool if they are valid. If the
|
|
// senders are not among the locally tracked ones, full pricing constraints will apply.
|
|
//
|
|
// This method is used to add transactions from the p2p network and does not wait for pool
|
|
// reorganization and internal event propagation.
|
|
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
|
|
return pool.addTxs(txs, false, false)
|
|
}
|
|
|
|
// This is like AddRemotes, but waits for pool reorganization. Tests use this method.
|
|
func (pool *TxPool) addRemotesSync(txs []*types.Transaction) []error {
|
|
return pool.addTxs(txs, false, true)
|
|
}
|
|
|
|
// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
|
|
func (pool *TxPool) addRemoteSync(tx *types.Transaction) error {
|
|
errs := pool.addRemotesSync([]*types.Transaction{tx})
|
|
return errs[0]
|
|
}
|
|
|
|
// AddRemote enqueues a single transaction into the pool if it is valid. This is a convenience
|
|
// wrapper around AddRemotes.
|
|
//
|
|
// Deprecated: use AddRemotes
|
|
func (pool *TxPool) AddRemote(tx *types.Transaction) error {
|
|
errs := pool.AddRemotes([]*types.Transaction{tx})
|
|
return errs[0]
|
|
}
|
|
|
|
// addTxs attempts to queue a batch of transactions if they are valid.
|
|
func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
|
|
// Cache senders in transactions before obtaining lock (pool.signer is immutable)
|
|
for _, tx := range txs {
|
|
types.Sender(pool.signer, tx)
|
|
}
|
|
|
|
pool.mu.Lock()
|
|
errs, dirtyAddrs := pool.addTxsLocked(txs, local)
|
|
pool.mu.Unlock()
|
|
|
|
done := pool.requestPromoteExecutables(dirtyAddrs)
|
|
if sync {
|
|
<-done
|
|
}
|
|
return errs
|
|
}
|
|
|
|
// addTxsLocked attempts to queue a batch of transactions if they are valid.
|
|
// The transaction pool lock must be held.
|
|
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
|
|
dirty := newAccountSet(pool.signer)
|
|
errs := make([]error, len(txs))
|
|
for i, tx := range txs {
|
|
replaced, err := pool.add(tx, local)
|
|
errs[i] = err
|
|
if err == nil && !replaced {
|
|
dirty.addTx(tx)
|
|
}
|
|
}
|
|
validMeter.Mark(int64(len(dirty.accounts)))
|
|
return errs, dirty
|
|
}
|
|
|
|
// Status returns the status (unknown/pending/queued) of a batch of transactions
|
|
// identified by their hashes.
|
|
func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
|
|
pool.mu.RLock()
|
|
defer pool.mu.RUnlock()
|
|
|
|
status := make([]TxStatus, len(hashes))
|
|
for i, hash := range hashes {
|
|
if tx := pool.all.Get(hash); tx != nil {
|
|
from, _ := types.Sender(pool.signer, tx) // already validated
|
|
if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil {
|
|
status[i] = TxStatusPending
|
|
} else {
|
|
status[i] = TxStatusQueued
|
|
}
|
|
}
|
|
}
|
|
return status
|
|
}
|
|
|
|
// Get returns a transaction if it is contained in the pool and nil otherwise.
|
|
func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
|
|
return pool.all.Get(hash)
|
|
}
|
|
|
|
// removeTx removes a single transaction from the queue, moving all subsequent
|
|
// transactions back to the future queue.
|
|
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
|
|
// Fetch the transaction we wish to delete
|
|
tx := pool.all.Get(hash)
|
|
if tx == nil {
|
|
return
|
|
}
|
|
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
|
|
|
|
// Remove it from the list of known transactions
|
|
pool.all.Remove(hash)
|
|
if outofbound {
|
|
pool.priced.Removed(1)
|
|
}
|
|
if pool.locals.contains(addr) {
|
|
localCounter.Dec(1)
|
|
}
|
|
// Remove the transaction from the pending lists and reset the account nonce
|
|
if pending := pool.pending[addr]; pending != nil {
|
|
if removed, invalids := pending.Remove(tx); removed {
|
|
// If no more pending transactions are left, remove the list
|
|
if pending.Empty() {
|
|
delete(pool.pending, addr)
|
|
delete(pool.beats, addr)
|
|
}
|
|
// Postpone any invalidated transactions
|
|
for _, tx := range invalids {
|
|
pool.enqueueTx(tx.Hash(), tx)
|
|
}
|
|
// Update the account nonce if needed
|
|
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
|
|
pool.pendingState.SetNonce(addr, nonce)
|
|
}
|
|
// Reduce the pending counter
|
|
pendingCounter.Dec(int64(1 + len(invalids)))
|
|
return
|
|
}
|
|
}
|
|
// Transaction is in the future queue
|
|
if future := pool.queue[addr]; future != nil {
|
|
if removed, _ := future.Remove(tx); removed {
|
|
// Reduce the queued counter
|
|
queuedCounter.Dec(1)
|
|
}
|
|
if future.Empty() {
|
|
delete(pool.queue, addr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// requestPromoteExecutables requests a pool reset to the new head block.
|
|
// The returned channel is closed when the reset has occurred.
|
|
func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
|
|
select {
|
|
case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}:
|
|
return <-pool.reorgDoneCh
|
|
case <-pool.reorgShutdownCh:
|
|
return pool.reorgShutdownCh
|
|
}
|
|
}
|
|
|
|
// requestPromoteExecutables requests transaction promotion checks for the given addresses.
|
|
// The returned channel is closed when the promotion checks have occurred.
|
|
func (pool *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} {
|
|
select {
|
|
case pool.reqPromoteCh <- set:
|
|
return <-pool.reorgDoneCh
|
|
case <-pool.reorgShutdownCh:
|
|
return pool.reorgShutdownCh
|
|
}
|
|
}
|
|
|
|
// queueTxEvent enqueues a transaction event to be sent in the next reorg run.
|
|
func (pool *TxPool) queueTxEvent(tx *types.Transaction) {
|
|
select {
|
|
case pool.queueTxEventCh <- tx:
|
|
case <-pool.reorgShutdownCh:
|
|
}
|
|
}
|
|
|
|
// scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not
|
|
// call those methods directly, but request them being run using requestReset and
|
|
// requestPromoteExecutables instead.
|
|
func (pool *TxPool) scheduleReorgLoop() {
|
|
defer pool.wg.Done()
|
|
|
|
var (
|
|
curDone chan struct{} // non-nil while runReorg is active
|
|
nextDone = make(chan struct{})
|
|
launchNextRun bool
|
|
reset *txpoolResetRequest
|
|
dirtyAccounts *accountSet
|
|
queuedEvents = make(map[common.Address]*txSortedMap)
|
|
)
|
|
for {
|
|
// Launch next background reorg if needed
|
|
if curDone == nil && launchNextRun {
|
|
// Run the background reorg and announcements
|
|
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
|
|
|
|
// Prepare everything for the next round of reorg
|
|
curDone, nextDone = nextDone, make(chan struct{})
|
|
launchNextRun = false
|
|
|
|
reset, dirtyAccounts = nil, nil
|
|
queuedEvents = make(map[common.Address]*txSortedMap)
|
|
}
|
|
|
|
select {
|
|
case req := <-pool.reqResetCh:
|
|
// Reset request: update head if request is already pending.
|
|
if reset == nil {
|
|
reset = req
|
|
} else {
|
|
reset.newHead = req.newHead
|
|
}
|
|
launchNextRun = true
|
|
pool.reorgDoneCh <- nextDone
|
|
|
|
case req := <-pool.reqPromoteCh:
|
|
// Promote request: update address set if request is already pending.
|
|
if dirtyAccounts == nil {
|
|
dirtyAccounts = req
|
|
} else {
|
|
dirtyAccounts.merge(req)
|
|
}
|
|
launchNextRun = true
|
|
pool.reorgDoneCh <- nextDone
|
|
|
|
case tx := <-pool.queueTxEventCh:
|
|
// Queue up the event, but don't schedule a reorg. It's up to the caller to
|
|
// request one later if they want the events sent.
|
|
addr, _ := types.Sender(pool.signer, tx)
|
|
if _, ok := queuedEvents[addr]; !ok {
|
|
queuedEvents[addr] = newTxSortedMap()
|
|
}
|
|
queuedEvents[addr].Put(tx)
|
|
|
|
case <-curDone:
|
|
curDone = nil
|
|
|
|
case <-pool.reorgShutdownCh:
|
|
// Wait for current run to finish.
|
|
if curDone != nil {
|
|
<-curDone
|
|
}
|
|
close(nextDone)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
|
|
func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) {
|
|
defer close(done)
|
|
|
|
var promoteAddrs []common.Address
|
|
if dirtyAccounts != nil {
|
|
promoteAddrs = dirtyAccounts.flatten()
|
|
}
|
|
pool.mu.Lock()
|
|
if reset != nil {
|
|
// Reset from the old head to the new, rescheduling any reorged transactions
|
|
pool.reset(reset.oldHead, reset.newHead)
|
|
|
|
// Nonces were reset, discard any events that became stale
|
|
for addr := range events {
|
|
events[addr].Forward(pool.pendingState.GetNonce(addr))
|
|
if events[addr].Len() == 0 {
|
|
delete(events, addr)
|
|
}
|
|
}
|
|
// Reset needs promote for all addresses
|
|
promoteAddrs = promoteAddrs[:0]
|
|
for addr := range pool.queue {
|
|
promoteAddrs = append(promoteAddrs, addr)
|
|
}
|
|
}
|
|
// Check for pending transactions for every account that sent new ones
|
|
promoted := pool.promoteExecutables(promoteAddrs)
|
|
for _, tx := range promoted {
|
|
addr, _ := types.Sender(pool.signer, tx)
|
|
if _, ok := events[addr]; !ok {
|
|
events[addr] = newTxSortedMap()
|
|
}
|
|
events[addr].Put(tx)
|
|
}
|
|
// If a new block appeared, validate the pool of pending transactions. This will
|
|
// remove any transaction that has been included in the block or was invalidated
|
|
// because of another transaction (e.g. higher gas price).
|
|
if reset != nil {
|
|
pool.demoteUnexecutables()
|
|
}
|
|
// Ensure pool.queue and pool.pending sizes stay within the configured limits.
|
|
pool.truncatePending()
|
|
pool.truncateQueue()
|
|
|
|
// Update all accounts to the latest known pending nonce
|
|
for addr, list := range pool.pending {
|
|
txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
|
|
pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
|
|
}
|
|
pool.mu.Unlock()
|
|
|
|
// Notify subsystems for newly added transactions
|
|
if len(events) > 0 {
|
|
var txs []*types.Transaction
|
|
for _, set := range events {
|
|
txs = append(txs, set.Flatten()...)
|
|
}
|
|
pool.txFeed.Send(NewTxsEvent{txs})
|
|
}
|
|
}
|
|
|
|
// reset retrieves the current state of the blockchain and ensures the content
|
|
// of the transaction pool is valid with regard to the chain state.
|
|
func (pool *TxPool) reset(oldHead, newHead *types.Header) {
|
|
// If we're reorging an old state, reinject all dropped transactions
|
|
var reinject types.Transactions
|
|
|
|
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
|
|
// If the reorg is too deep, avoid doing it (will happen during fast sync)
|
|
oldNum := oldHead.Number.Uint64()
|
|
newNum := newHead.Number.Uint64()
|
|
|
|
if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
|
|
log.Debug("Skipping deep transaction reorg", "depth", depth)
|
|
} else {
|
|
// Reorg seems shallow enough to pull in all transactions into memory
|
|
var discarded, included types.Transactions
|
|
var (
|
|
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
|
|
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
|
|
)
|
|
if rem == nil {
|
|
// This can happen if a setHead is performed, where we simply discard the old
|
|
// head from the chain.
|
|
// If that is the case, we don't have the lost transactions any more, and
|
|
// there's nothing to add
|
|
if newNum < oldNum {
|
|
// If the reorg ended up on a lower number, it's indicative of setHead being the cause
|
|
log.Debug("Skipping transaction reset caused by setHead",
|
|
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
|
|
} else {
|
|
// If we reorged to a same or higher number, then it's not a case of setHead
|
|
log.Warn("Transaction pool reset with missing oldhead",
|
|
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
|
|
}
|
|
return
|
|
}
|
|
for rem.NumberU64() > add.NumberU64() {
|
|
discarded = append(discarded, rem.Transactions()...)
|
|
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
|
|
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
|
|
return
|
|
}
|
|
}
|
|
for add.NumberU64() > rem.NumberU64() {
|
|
included = append(included, add.Transactions()...)
|
|
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
|
|
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
|
|
return
|
|
}
|
|
}
|
|
for rem.Hash() != add.Hash() {
|
|
discarded = append(discarded, rem.Transactions()...)
|
|
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
|
|
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
|
|
return
|
|
}
|
|
included = append(included, add.Transactions()...)
|
|
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
|
|
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
|
|
return
|
|
}
|
|
}
|
|
reinject = types.TxDifference(discarded, included)
|
|
}
|
|
}
|
|
// Initialize the internal state to the current head
|
|
if newHead == nil {
|
|
newHead = pool.chain.CurrentBlock().Header() // Special case during testing
|
|
}
|
|
statedb, err := pool.chain.StateAt(newHead.Root)
|
|
if err != nil {
|
|
log.Error("Failed to reset txpool state", "err", err)
|
|
return
|
|
}
|
|
pool.currentState = statedb
|
|
pool.pendingState = state.ManageState(statedb)
|
|
pool.currentMaxGas = newHead.GasLimit
|
|
|
|
// Inject any transactions discarded due to reorgs
|
|
log.Debug("Reinjecting stale transactions", "count", len(reinject))
|
|
senderCacher.recover(pool.signer, reinject)
|
|
pool.addTxsLocked(reinject, false)
|
|
}
|
|
|
|
// promoteExecutables moves transactions that have become processable from the
|
|
// future queue to the set of pending transactions. During this process, all
|
|
// invalidated transactions (low nonce, low balance) are deleted.
|
|
func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
|
|
// Track the promoted transactions to broadcast them at once
|
|
var promoted []*types.Transaction
|
|
|
|
// Iterate over all accounts and promote any executable transactions
|
|
for _, addr := range accounts {
|
|
list := pool.queue[addr]
|
|
if list == nil {
|
|
continue // Just in case someone calls with a non existing account
|
|
}
|
|
// Drop all transactions that are deemed too old (low nonce)
|
|
forwards := list.Forward(pool.currentState.GetNonce(addr))
|
|
for _, tx := range forwards {
|
|
hash := tx.Hash()
|
|
pool.all.Remove(hash)
|
|
log.Trace("Removed old queued transaction", "hash", hash)
|
|
}
|
|
// Drop all transactions that are too costly (low balance or out of gas)
|
|
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
|
|
for _, tx := range drops {
|
|
hash := tx.Hash()
|
|
pool.all.Remove(hash)
|
|
log.Trace("Removed unpayable queued transaction", "hash", hash)
|
|
}
|
|
queuedNofundsMeter.Mark(int64(len(drops)))
|
|
|
|
// Gather all executable transactions and promote them
|
|
readies := list.Ready(pool.pendingState.GetNonce(addr))
|
|
for _, tx := range readies {
|
|
hash := tx.Hash()
|
|
if pool.promoteTx(addr, hash, tx) {
|
|
log.Trace("Promoting queued transaction", "hash", hash)
|
|
promoted = append(promoted, tx)
|
|
}
|
|
}
|
|
queuedCounter.Dec(int64(len(readies)))
|
|
|
|
// Drop all transactions over the allowed limit
|
|
var caps types.Transactions
|
|
if !pool.locals.contains(addr) {
|
|
caps = list.Cap(int(pool.config.AccountQueue))
|
|
for _, tx := range caps {
|
|
hash := tx.Hash()
|
|
pool.all.Remove(hash)
|
|
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
|
|
}
|
|
queuedRateLimitMeter.Mark(int64(len(caps)))
|
|
}
|
|
// Mark all the items dropped as removed
|
|
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
|
|
queuedCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
|
|
if pool.locals.contains(addr) {
|
|
localCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
|
|
}
|
|
// Delete the entire queue entry if it became empty.
|
|
if list.Empty() {
|
|
delete(pool.queue, addr)
|
|
}
|
|
}
|
|
return promoted
|
|
}
|
|
|
|
// truncatePending removes transactions from the pending queue if the pool is above the
|
|
// pending limit. The algorithm tries to reduce transaction counts by an approximately
|
|
// equal number for all for accounts with many pending transactions.
|
|
func (pool *TxPool) truncatePending() {
|
|
pending := uint64(0)
|
|
for _, list := range pool.pending {
|
|
pending += uint64(list.Len())
|
|
}
|
|
if pending <= pool.config.GlobalSlots {
|
|
return
|
|
}
|
|
|
|
pendingBeforeCap := pending
|
|
// Assemble a spam order to penalize large transactors first
|
|
spammers := prque.New(nil)
|
|
for addr, list := range pool.pending {
|
|
// Only evict transactions from high rollers
|
|
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
|
|
spammers.Push(addr, int64(list.Len()))
|
|
}
|
|
}
|
|
// Gradually drop transactions from offenders
|
|
offenders := []common.Address{}
|
|
for pending > pool.config.GlobalSlots && !spammers.Empty() {
|
|
// Retrieve the next offender if not local address
|
|
offender, _ := spammers.Pop()
|
|
offenders = append(offenders, offender.(common.Address))
|
|
|
|
// Equalize balances until all the same or below threshold
|
|
if len(offenders) > 1 {
|
|
// Calculate the equalization threshold for all current offenders
|
|
threshold := pool.pending[offender.(common.Address)].Len()
|
|
|
|
// Iteratively reduce all offenders until below limit or threshold reached
|
|
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
|
|
for i := 0; i < len(offenders)-1; i++ {
|
|
list := pool.pending[offenders[i]]
|
|
|
|
caps := list.Cap(list.Len() - 1)
|
|
for _, tx := range caps {
|
|
// Drop the transaction from the global pools too
|
|
hash := tx.Hash()
|
|
pool.all.Remove(hash)
|
|
|
|
// Update the account nonce to the dropped transaction
|
|
if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
|
|
pool.pendingState.SetNonce(offenders[i], nonce)
|
|
}
|
|
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
|
|
}
|
|
pool.priced.Removed(len(caps))
|
|
pendingCounter.Dec(int64(len(caps)))
|
|
if pool.locals.contains(offenders[i]) {
|
|
localCounter.Dec(int64(len(caps)))
|
|
}
|
|
pending--
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// If still above threshold, reduce to limit or min allowance
|
|
if pending > pool.config.GlobalSlots && len(offenders) > 0 {
|
|
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
|
|
for _, addr := range offenders {
|
|
list := pool.pending[addr]
|
|
|
|
caps := list.Cap(list.Len() - 1)
|
|
for _, tx := range caps {
|
|
// Drop the transaction from the global pools too
|
|
hash := tx.Hash()
|
|
pool.all.Remove(hash)
|
|
|
|
// Update the account nonce to the dropped transaction
|
|
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
|
|
pool.pendingState.SetNonce(addr, nonce)
|
|
}
|
|
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
|
|
}
|
|
pool.priced.Removed(len(caps))
|
|
pendingCounter.Dec(int64(len(caps)))
|
|
if pool.locals.contains(addr) {
|
|
localCounter.Dec(int64(len(caps)))
|
|
}
|
|
pending--
|
|
}
|
|
}
|
|
}
|
|
pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
|
|
}
|
|
|
|
// truncateQueue drops the oldes transactions in the queue if the pool is above the global queue limit.
|
|
func (pool *TxPool) truncateQueue() {
|
|
queued := uint64(0)
|
|
for _, list := range pool.queue {
|
|
queued += uint64(list.Len())
|
|
}
|
|
if queued <= pool.config.GlobalQueue {
|
|
return
|
|
}
|
|
|
|
// Sort all accounts with queued transactions by heartbeat
|
|
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
|
|
for addr := range pool.queue {
|
|
if !pool.locals.contains(addr) { // don't drop locals
|
|
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
|
|
}
|
|
}
|
|
sort.Sort(addresses)
|
|
|
|
// Drop transactions until the total is below the limit or only locals remain
|
|
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
|
|
addr := addresses[len(addresses)-1]
|
|
list := pool.queue[addr.address]
|
|
|
|
addresses = addresses[:len(addresses)-1]
|
|
|
|
// Drop all transactions if they are less than the overflow
|
|
if size := uint64(list.Len()); size <= drop {
|
|
for _, tx := range list.Flatten() {
|
|
pool.removeTx(tx.Hash(), true)
|
|
}
|
|
drop -= size
|
|
queuedRateLimitMeter.Mark(int64(size))
|
|
continue
|
|
}
|
|
// Otherwise drop only last few transactions
|
|
txs := list.Flatten()
|
|
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
|
|
pool.removeTx(txs[i].Hash(), true)
|
|
drop--
|
|
queuedRateLimitMeter.Mark(1)
|
|
}
|
|
}
|
|
}
|
|
|
|
// demoteUnexecutables removes invalid and processed transactions from the pools
|
|
// executable/pending queue and any subsequent transactions that become unexecutable
|
|
// are moved back into the future queue.
|
|
func (pool *TxPool) demoteUnexecutables() {
|
|
// Iterate over all accounts and demote any non-executable transactions
|
|
for addr, list := range pool.pending {
|
|
nonce := pool.currentState.GetNonce(addr)
|
|
|
|
// Drop all transactions that are deemed too old (low nonce)
|
|
olds := list.Forward(nonce)
|
|
for _, tx := range olds {
|
|
hash := tx.Hash()
|
|
pool.all.Remove(hash)
|
|
log.Trace("Removed old pending transaction", "hash", hash)
|
|
}
|
|
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
|
|
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
|
|
for _, tx := range drops {
|
|
hash := tx.Hash()
|
|
log.Trace("Removed unpayable pending transaction", "hash", hash)
|
|
pool.all.Remove(hash)
|
|
}
|
|
pool.priced.Removed(len(olds) + len(drops))
|
|
pendingNofundsMeter.Mark(int64(len(drops)))
|
|
|
|
for _, tx := range invalids {
|
|
hash := tx.Hash()
|
|
log.Trace("Demoting pending transaction", "hash", hash)
|
|
pool.enqueueTx(hash, tx)
|
|
}
|
|
pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
|
|
if pool.locals.contains(addr) {
|
|
localCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
|
|
}
|
|
// If there's a gap in front, alert (should never happen) and postpone all transactions
|
|
if list.Len() > 0 && list.txs.Get(nonce) == nil {
|
|
gapped := list.Cap(0)
|
|
for _, tx := range gapped {
|
|
hash := tx.Hash()
|
|
log.Error("Demoting invalidated transaction", "hash", hash)
|
|
pool.enqueueTx(hash, tx)
|
|
}
|
|
pendingCounter.Dec(int64(len(gapped)))
|
|
}
|
|
// Delete the entire queue entry if it became empty.
|
|
if list.Empty() {
|
|
delete(pool.pending, addr)
|
|
delete(pool.beats, addr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// addressByHeartbeat is an account address tagged with its last activity timestamp.
|
|
type addressByHeartbeat struct {
|
|
address common.Address
|
|
heartbeat time.Time
|
|
}
|
|
|
|
type addressesByHeartbeat []addressByHeartbeat
|
|
|
|
func (a addressesByHeartbeat) Len() int { return len(a) }
|
|
func (a addressesByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
|
|
func (a addressesByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
|
|
// accountSet is simply a set of addresses to check for existence, and a signer
|
|
// capable of deriving addresses from transactions.
|
|
type accountSet struct {
|
|
accounts map[common.Address]struct{}
|
|
signer types.Signer
|
|
cache *[]common.Address
|
|
}
|
|
|
|
// newAccountSet creates a new address set with an associated signer for sender
|
|
// derivations.
|
|
func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
|
|
as := &accountSet{
|
|
accounts: make(map[common.Address]struct{}),
|
|
signer: signer,
|
|
}
|
|
for _, addr := range addrs {
|
|
as.add(addr)
|
|
}
|
|
return as
|
|
}
|
|
|
|
// contains checks if a given address is contained within the set.
|
|
func (as *accountSet) contains(addr common.Address) bool {
|
|
_, exist := as.accounts[addr]
|
|
return exist
|
|
}
|
|
|
|
// containsTx checks if the sender of a given tx is within the set. If the sender
|
|
// cannot be derived, this method returns false.
|
|
func (as *accountSet) containsTx(tx *types.Transaction) bool {
|
|
if addr, err := types.Sender(as.signer, tx); err == nil {
|
|
return as.contains(addr)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// add inserts a new address into the set to track.
|
|
func (as *accountSet) add(addr common.Address) {
|
|
as.accounts[addr] = struct{}{}
|
|
as.cache = nil
|
|
}
|
|
|
|
// addTx adds the sender of tx into the set.
|
|
func (as *accountSet) addTx(tx *types.Transaction) {
|
|
if addr, err := types.Sender(as.signer, tx); err == nil {
|
|
as.add(addr)
|
|
}
|
|
}
|
|
|
|
// flatten returns the list of addresses within this set, also caching it for later
|
|
// reuse. The returned slice should not be changed!
|
|
func (as *accountSet) flatten() []common.Address {
|
|
if as.cache == nil {
|
|
accounts := make([]common.Address, 0, len(as.accounts))
|
|
for account := range as.accounts {
|
|
accounts = append(accounts, account)
|
|
}
|
|
as.cache = &accounts
|
|
}
|
|
return *as.cache
|
|
}
|
|
|
|
// merge adds all addresses from the 'other' set into 'as'.
|
|
func (as *accountSet) merge(other *accountSet) {
|
|
for addr := range other.accounts {
|
|
as.accounts[addr] = struct{}{}
|
|
}
|
|
as.cache = nil
|
|
}
|
|
|
|
// txLookup is used internally by TxPool to track transactions while allowing lookup without
|
|
// mutex contention.
|
|
//
|
|
// Note, although this type is properly protected against concurrent access, it
|
|
// is **not** a type that should ever be mutated or even exposed outside of the
|
|
// transaction pool, since its internal state is tightly coupled with the pools
|
|
// internal mechanisms. The sole purpose of the type is to permit out-of-bound
|
|
// peeking into the pool in TxPool.Get without having to acquire the widely scoped
|
|
// TxPool.mu mutex.
|
|
type txLookup struct {
|
|
all map[common.Hash]*types.Transaction
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
// newTxLookup returns a new txLookup structure.
|
|
func newTxLookup() *txLookup {
|
|
return &txLookup{
|
|
all: make(map[common.Hash]*types.Transaction),
|
|
}
|
|
}
|
|
|
|
// Range calls f on each key and value present in the map.
|
|
func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
for key, value := range t.all {
|
|
if !f(key, value) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Get returns a transaction if it exists in the lookup, or nil if not found.
|
|
func (t *txLookup) Get(hash common.Hash) *types.Transaction {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return t.all[hash]
|
|
}
|
|
|
|
// Count returns the current number of items in the lookup.
|
|
func (t *txLookup) Count() int {
|
|
t.lock.RLock()
|
|
defer t.lock.RUnlock()
|
|
|
|
return len(t.all)
|
|
}
|
|
|
|
// Add adds a transaction to the lookup.
|
|
func (t *txLookup) Add(tx *types.Transaction) {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
t.all[tx.Hash()] = tx
|
|
}
|
|
|
|
// Remove removes a transaction from the lookup.
|
|
func (t *txLookup) Remove(hash common.Hash) {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
delete(t.all, hash)
|
|
}
|