Merge pull request #15085 from karalabe/txpool-immutable

core: make txpool operate on immutable state
This commit is contained in:
Péter Szilágyi 2017-09-05 13:39:18 +03:00 committed by GitHub
commit c91f7beb53
10 changed files with 233 additions and 307 deletions

View File

@ -81,7 +81,6 @@ type BlockChain struct {
hc *HeaderChain hc *HeaderChain
chainDb ethdb.Database chainDb ethdb.Database
rmTxFeed event.Feed
rmLogsFeed event.Feed rmLogsFeed event.Feed
chainFeed event.Feed chainFeed event.Feed
chainSideFeed event.Feed chainSideFeed event.Feed
@ -1194,15 +1193,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
for _, tx := range diff { for _, tx := range diff {
DeleteTxLookupEntry(bc.chainDb, tx.Hash()) DeleteTxLookupEntry(bc.chainDb, tx.Hash())
} }
// Must be posted in a goroutine because of the transaction pool trying
// to acquire the chain manager lock
if len(diff) > 0 {
go bc.rmTxFeed.Send(RemovedTransactionEvent{diff})
}
if len(deletedLogs) > 0 { if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
} }
if len(oldChain) > 0 { if len(oldChain) > 0 {
go func() { go func() {
for _, block := range oldChain { for _, block := range oldChain {
@ -1401,11 +1394,6 @@ func (bc *BlockChain) Config() *params.ChainConfig { return bc.config }
// Engine retrieves the blockchain's consensus engine. // Engine retrieves the blockchain's consensus engine.
func (bc *BlockChain) Engine() consensus.Engine { return bc.engine } func (bc *BlockChain) Engine() consensus.Engine { return bc.engine }
// SubscribeRemovedTxEvent registers a subscription of RemovedTransactionEvent.
func (bc *BlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription {
return bc.scope.Track(bc.rmTxFeed.Subscribe(ch))
}
// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent. // SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent.
func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription { func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch)) return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))

View File

@ -28,4 +28,8 @@ var (
// ErrBlacklistedHash is returned if a block to import is on the blacklist. // ErrBlacklistedHash is returned if a block to import is on the blacklist.
ErrBlacklistedHash = errors.New("blacklisted hash") ErrBlacklistedHash = errors.New("blacklisted hash")
// ErrNonceTooHigh is returned if the nonce of a transaction is higher than the
// next one expected based on the local chain.
ErrNonceTooHigh = errors.New("nonce too high")
) )

View File

@ -18,7 +18,6 @@ package core
import ( import (
"errors" "errors"
"fmt"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -197,8 +196,11 @@ func (st *StateTransition) preCheck() error {
// Make sure this transaction's nonce is correct // Make sure this transaction's nonce is correct
if msg.CheckNonce() { if msg.CheckNonce() {
if n := st.state.GetNonce(sender.Address()); n != msg.Nonce() { nonce := st.state.GetNonce(sender.Address())
return fmt.Errorf("invalid nonce: have %d, expected %d", msg.Nonce(), n) if nonce < msg.Nonce() {
return ErrNonceTooHigh
} else if nonce > msg.Nonce() {
return ErrNonceTooLow
} }
} }
return st.buyGas() return st.buyGas()

View File

@ -298,6 +298,7 @@ func (l *txList) Filter(costLimit, gasLimit *big.Int) (types.Transactions, types
// If the list was strict, filter anything above the lowest nonce // If the list was strict, filter anything above the lowest nonce
var invalids types.Transactions var invalids types.Transactions
if l.strict && len(removed) > 0 { if l.strict && len(removed) > 0 {
lowest := uint64(math.MaxUint64) lowest := uint64(math.MaxUint64)
for _, tx := range removed { for _, tx := range removed {

View File

@ -105,10 +105,11 @@ var (
// blockChain provides the state of blockchain and current gas limit to do // blockChain provides the state of blockchain and current gas limit to do
// some pre checks in tx pool and event subscribers. // some pre checks in tx pool and event subscribers.
type blockChain interface { type blockChain interface {
State() (*state.StateDB, error) CurrentHeader() *types.Header
GasLimit() *big.Int
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription
GetBlock(hash common.Hash, number uint64) *types.Block
StateAt(root common.Hash) (*state.StateDB, error)
} }
// TxPoolConfig are the configuration parameters of the transaction pool. // TxPoolConfig are the configuration parameters of the transaction pool.
@ -174,18 +175,19 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
type TxPool struct { type TxPool struct {
config TxPoolConfig config TxPoolConfig
chainconfig *params.ChainConfig chainconfig *params.ChainConfig
blockChain blockChain chain blockChain
pendingState *state.ManagedState
gasPrice *big.Int gasPrice *big.Int
txFeed event.Feed txFeed event.Feed
scope event.SubscriptionScope scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription chainHeadSub event.Subscription
rmTxCh chan RemovedTransactionEvent
rmTxSub event.Subscription
signer types.Signer signer types.Signer
mu sync.RWMutex mu sync.RWMutex
currentState *state.StateDB // Current state in the blockchain head
pendingState *state.ManagedState // Pending state tracking virtual nonces
currentMaxGas *big.Int // Current gas limit for transaction caps
locals *accountSet // Set of local transaction to exepmt from evicion rules locals *accountSet // Set of local transaction to exepmt from evicion rules
journal *txJournal // Journal of local transaction to back up to disk journal *txJournal // Journal of local transaction to back up to disk
@ -202,7 +204,7 @@ type TxPool struct {
// NewTxPool creates a new transaction pool to gather, sort and filter inbound // NewTxPool creates a new transaction pool to gather, sort and filter inbound
// trnsactions from the network. // trnsactions from the network.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain blockChain) *TxPool { func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set // Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize() config = (&config).sanitize()
@ -210,20 +212,18 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain
pool := &TxPool{ pool := &TxPool{
config: config, config: config,
chainconfig: chainconfig, chainconfig: chainconfig,
blockChain: blockChain, chain: chain,
signer: types.NewEIP155Signer(chainconfig.ChainId), signer: types.NewEIP155Signer(chainconfig.ChainId),
pending: make(map[common.Address]*txList), pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList), queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time), beats: make(map[common.Address]time.Time),
all: make(map[common.Hash]*types.Transaction), all: make(map[common.Hash]*types.Transaction),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
rmTxCh: make(chan RemovedTransactionEvent, rmTxChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit), gasPrice: new(big.Int).SetUint64(config.PriceLimit),
pendingState: nil,
} }
pool.locals = newAccountSet(pool.signer) pool.locals = newAccountSet(pool.signer)
pool.priced = newTxPricedList(&pool.all) pool.priced = newTxPricedList(&pool.all)
pool.reset() pool.reset(nil, chain.CurrentHeader())
// If local transactions and journaling is enabled, load from disk // If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" { if !config.NoLocals && config.Journal != "" {
@ -237,8 +237,8 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain
} }
} }
// Subscribe events from blockchain // Subscribe events from blockchain
pool.chainHeadSub = pool.blockChain.SubscribeChainHeadEvent(pool.chainHeadCh) pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.rmTxSub = pool.blockChain.SubscribeRemovedTxEvent(pool.rmTxCh)
// Start the event loop and return // Start the event loop and return
pool.wg.Add(1) pool.wg.Add(1)
go pool.loop() go pool.loop()
@ -264,31 +264,28 @@ func (pool *TxPool) loop() {
journal := time.NewTicker(pool.config.Rejournal) journal := time.NewTicker(pool.config.Rejournal)
defer journal.Stop() defer journal.Stop()
// Track the previous head headers for transaction reorgs
head := pool.chain.CurrentHeader()
// Keep waiting for and reacting to the various events // Keep waiting for and reacting to the various events
for { for {
select { select {
// Handle ChainHeadEvent // Handle ChainHeadEvent
case ev := <-pool.chainHeadCh: case ev := <-pool.chainHeadCh:
pool.mu.Lock()
if ev.Block != nil { if ev.Block != nil {
pool.mu.Lock()
if pool.chainconfig.IsHomestead(ev.Block.Number()) { if pool.chainconfig.IsHomestead(ev.Block.Number()) {
pool.homestead = true pool.homestead = true
} }
pool.reset(head, ev.Block.Header())
head = ev.Block.Header()
}
pool.reset()
pool.mu.Unlock() pool.mu.Unlock()
}
// Be unsubscribed due to system stopped // Be unsubscribed due to system stopped
case <-pool.chainHeadSub.Err(): case <-pool.chainHeadSub.Err():
return return
// Handle RemovedTransactionEvent
case ev := <-pool.rmTxCh:
pool.addTxs(ev.Txs, false)
// Be unsubscribed due to system stopped
case <-pool.rmTxSub.Err():
return
// Handle stats reporting ticks // Handle stats reporting ticks
case <-report.C: case <-report.C:
pool.mu.RLock() pool.mu.RLock()
@ -333,28 +330,76 @@ func (pool *TxPool) loop() {
// lockedReset is a wrapper around reset to allow calling it in a thread safe // lockedReset is a wrapper around reset to allow calling it in a thread safe
// manner. This method is only ever used in the tester! // manner. This method is only ever used in the tester!
func (pool *TxPool) lockedReset() { func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) {
pool.mu.Lock() pool.mu.Lock()
defer pool.mu.Unlock() defer pool.mu.Unlock()
pool.reset() pool.reset(oldHead, newHead)
} }
// reset retrieves the current state of the blockchain and ensures the content // reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state. // of the transaction pool is valid with regard to the chain state.
func (pool *TxPool) reset() { func (pool *TxPool) reset(oldHead, newHead *types.Header) {
currentState, err := pool.blockChain.State() // If we're reorging an old state, reinject all dropped transactions
if err != nil { var reinject types.Transactions
log.Error("Failed reset txpool state", "err", err)
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
var discarded, included types.Transactions
var (
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
)
for rem.NumberU64() > add.NumberU64() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return return
} }
pool.pendingState = state.ManageState(currentState) }
for add.NumberU64() > rem.NumberU64() {
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
for rem.Hash() != add.Hash() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
reinject = types.TxDifference(discarded, included)
}
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentHeader() // Special case during testing
}
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
log.Error("Failed to reset txpool state", "err", err)
return
}
pool.currentState = statedb
pool.pendingState = state.ManageState(statedb)
pool.currentMaxGas = newHead.GasLimit
// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
pool.addTxsLocked(reinject, false)
// validate the pool of pending transactions, this will remove // validate the pool of pending transactions, this will remove
// any transactions that have been included in the block or // any transactions that have been included in the block or
// have been invalidated because of another transaction (e.g. // have been invalidated because of another transaction (e.g.
// higher gas price) // higher gas price)
pool.demoteUnexecutables(currentState) pool.demoteUnexecutables()
// Update all accounts to the latest known pending nonce // Update all accounts to the latest known pending nonce
for addr, list := range pool.pending { for addr, list := range pool.pending {
@ -363,16 +408,16 @@ func (pool *TxPool) reset() {
} }
// Check the queue and move transactions over to the pending if possible // Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid // or remove those that have become invalid
pool.promoteExecutables(currentState, nil) pool.promoteExecutables(nil)
} }
// Stop terminates the transaction pool. // Stop terminates the transaction pool.
func (pool *TxPool) Stop() { func (pool *TxPool) Stop() {
// Unsubscribe all subscriptions registered from txpool // Unsubscribe all subscriptions registered from txpool
pool.scope.Close() pool.scope.Close()
// Unsubscribe subscriptions registered from blockchain // Unsubscribe subscriptions registered from blockchain
pool.chainHeadSub.Unsubscribe() pool.chainHeadSub.Unsubscribe()
pool.rmTxSub.Unsubscribe()
pool.wg.Wait() pool.wg.Wait()
if pool.journal != nil { if pool.journal != nil {
@ -442,8 +487,8 @@ func (pool *TxPool) stats() (int, int) {
// Content retrieves the data content of the transaction pool, returning all the // Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and sorted by nonce. // pending as well as queued transactions, grouped by account and sorted by nonce.
func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
pool.mu.RLock() pool.mu.Lock()
defer pool.mu.RUnlock() defer pool.mu.Unlock()
pending := make(map[common.Address]types.Transactions) pending := make(map[common.Address]types.Transactions)
for addr, list := range pool.pending { for addr, list := range pool.pending {
@ -499,7 +544,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrNegativeValue return ErrNegativeValue
} }
// Ensure the transaction doesn't exceed the current block limit gas. // Ensure the transaction doesn't exceed the current block limit gas.
if pool.blockChain.GasLimit().Cmp(tx.Gas()) < 0 { if pool.currentMaxGas.Cmp(tx.Gas()) < 0 {
return ErrGasLimit return ErrGasLimit
} }
// Make sure the transaction is signed properly // Make sure the transaction is signed properly
@ -513,16 +558,12 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrUnderpriced return ErrUnderpriced
} }
// Ensure the transaction adheres to nonce ordering // Ensure the transaction adheres to nonce ordering
currentState, err := pool.blockChain.State() if pool.currentState.GetNonce(from) > tx.Nonce() {
if err != nil {
return err
}
if currentState.GetNonce(from) > tx.Nonce() {
return ErrNonceTooLow return ErrNonceTooLow
} }
// Transactor should have enough funds to cover the costs // Transactor should have enough funds to cover the costs
// cost == V + GP * GL // cost == V + GP * GL
if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds return ErrInsufficientFunds
} }
intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
@ -721,12 +762,8 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
} }
// If we added a new transaction, run promotion checks and return // If we added a new transaction, run promotion checks and return
if !replace { if !replace {
state, err := pool.blockChain.State()
if err != nil {
return err
}
from, _ := types.Sender(pool.signer, tx) // already validated from, _ := types.Sender(pool.signer, tx) // already validated
pool.promoteExecutables(state, []common.Address{from}) pool.promoteExecutables([]common.Address{from})
} }
return nil return nil
} }
@ -736,6 +773,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
pool.mu.Lock() pool.mu.Lock()
defer pool.mu.Unlock() defer pool.mu.Unlock()
return pool.addTxsLocked(txs, local)
}
// addTxsLocked attempts to queue a batch of transactions if they are valid,
// whilst assuming the transaction pool lock is already held.
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error {
// Add the batch of transaction, tracking the accepted ones // Add the batch of transaction, tracking the accepted ones
dirty := make(map[common.Address]struct{}) dirty := make(map[common.Address]struct{})
for _, tx := range txs { for _, tx := range txs {
@ -748,15 +791,11 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
} }
// Only reprocess the internal state if something was actually added // Only reprocess the internal state if something was actually added
if len(dirty) > 0 { if len(dirty) > 0 {
state, err := pool.blockChain.State()
if err != nil {
return err
}
addrs := make([]common.Address, 0, len(dirty)) addrs := make([]common.Address, 0, len(dirty))
for addr, _ := range dirty { for addr, _ := range dirty {
addrs = append(addrs, addr) addrs = append(addrs, addr)
} }
pool.promoteExecutables(state, addrs) pool.promoteExecutables(addrs)
} }
return nil return nil
} }
@ -770,24 +809,6 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
return pool.all[hash] return pool.all[hash]
} }
// Remove removes the transaction with the given hash from the pool.
func (pool *TxPool) Remove(hash common.Hash) {
pool.mu.Lock()
defer pool.mu.Unlock()
pool.removeTx(hash)
}
// RemoveBatch removes all given transactions from the pool.
func (pool *TxPool) RemoveBatch(txs types.Transactions) {
pool.mu.Lock()
defer pool.mu.Unlock()
for _, tx := range txs {
pool.removeTx(tx.Hash())
}
}
// removeTx removes a single transaction from the queue, moving all subsequent // removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue. // transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash) { func (pool *TxPool) removeTx(hash common.Hash) {
@ -834,9 +855,7 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// promoteExecutables moves transactions that have become processable from the // promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all // future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted. // invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) { func (pool *TxPool) promoteExecutables(accounts []common.Address) {
gaslimit := pool.blockChain.GasLimit()
// Gather all the accounts potentially needing updates // Gather all the accounts potentially needing updates
if accounts == nil { if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue)) accounts = make([]common.Address, 0, len(pool.queue))
@ -851,14 +870,14 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
continue // Just in case someone calls with a non existing account continue // Just in case someone calls with a non existing account
} }
// Drop all transactions that are deemed too old (low nonce) // Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(state.GetNonce(addr)) { for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
hash := tx.Hash() hash := tx.Hash()
log.Trace("Removed old queued transaction", "hash", hash) log.Trace("Removed old queued transaction", "hash", hash)
delete(pool.all, hash) delete(pool.all, hash)
pool.priced.Removed() pool.priced.Removed()
} }
// Drop all transactions that are too costly (low balance or out of gas) // Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(state.GetBalance(addr), gaslimit) drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops { for _, tx := range drops {
hash := tx.Hash() hash := tx.Hash()
log.Trace("Removed unpayable queued transaction", "hash", hash) log.Trace("Removed unpayable queued transaction", "hash", hash)
@ -1003,12 +1022,10 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
// demoteUnexecutables removes invalid and processed transactions from the pools // demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable // executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue. // are moved back into the future queue.
func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { func (pool *TxPool) demoteUnexecutables() {
gaslimit := pool.blockChain.GasLimit()
// Iterate over all accounts and demote any non-executable transactions // Iterate over all accounts and demote any non-executable transactions
for addr, list := range pool.pending { for addr, list := range pool.pending {
nonce := state.GetNonce(addr) nonce := pool.currentState.GetNonce(addr)
// Drop all transactions that are deemed too old (low nonce) // Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(nonce) { for _, tx := range list.Forward(nonce) {
@ -1018,7 +1035,7 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
pool.priced.Removed() pool.priced.Removed()
} }
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(state.GetBalance(addr), gaslimit) drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops { for _, tx := range drops {
hash := tx.Hash() hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash) log.Trace("Removed unpayable pending transaction", "hash", hash)
@ -1031,6 +1048,14 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
log.Trace("Demoting pending transaction", "hash", hash) log.Trace("Demoting pending transaction", "hash", hash)
pool.enqueueTx(hash, tx) pool.enqueueTx(hash, tx)
} }
// If there's a gap in front, warn (should never happen) and postpone all transactions
if list.Len() > 0 && list.txs.Get(nonce) == nil {
for _, tx := range list.Cap(0) {
hash := tx.Hash()
log.Error("Demoting invalidated transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
}
// Delete the entire queue entry if it became empty. // Delete the entire queue entry if it became empty.
if list.Empty() { if list.Empty() {
delete(pool.pending, addr) delete(pool.pending, addr)

View File

@ -48,23 +48,24 @@ type testBlockChain struct {
statedb *state.StateDB statedb *state.StateDB
gasLimit *big.Int gasLimit *big.Int
chainHeadFeed *event.Feed chainHeadFeed *event.Feed
rmTxFeed *event.Feed
} }
func (bc *testBlockChain) State() (*state.StateDB, error) { func (bc *testBlockChain) CurrentHeader() *types.Header {
return bc.statedb, nil return &types.Header{
} GasLimit: bc.gasLimit,
}
func (bc *testBlockChain) GasLimit() *big.Int {
return new(big.Int).Set(bc.gasLimit)
} }
func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription { func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
return bc.chainHeadFeed.Subscribe(ch) return bc.chainHeadFeed.Subscribe(ch)
} }
func (bc *testBlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription { func (bc *testBlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
return bc.rmTxFeed.Subscribe(ch) return types.NewBlock(bc.CurrentHeader(), nil, nil, nil)
}
func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) {
return bc.statedb, nil
} }
func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction { func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction {
@ -79,7 +80,7 @@ func pricedTransaction(nonce uint64, gaslimit, gasprice *big.Int, key *ecdsa.Pri
func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
key, _ := crypto.GenerateKey() key, _ := crypto.GenerateKey()
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
@ -159,7 +160,7 @@ func TestStateChangeDuringPoolReset(t *testing.T) {
// setup pool with 2 transaction in it // setup pool with 2 transaction in it
statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether)) statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether))
blockchain := &testChain{&testBlockChain{statedb, big.NewInt(1000000000), new(event.Feed), new(event.Feed)}, address, &trigger} blockchain := &testChain{&testBlockChain{statedb, big.NewInt(1000000000), new(event.Feed)}, address, &trigger}
tx0 := transaction(0, big.NewInt(100000), key) tx0 := transaction(0, big.NewInt(100000), key)
tx1 := transaction(1, big.NewInt(100000), key) tx1 := transaction(1, big.NewInt(100000), key)
@ -182,7 +183,7 @@ func TestStateChangeDuringPoolReset(t *testing.T) {
// trigger state change in the background // trigger state change in the background
trigger = true trigger = true
pool.lockedReset() pool.lockedReset(nil, nil)
pendingTx, err := pool.Pending() pendingTx, err := pool.Pending()
if err != nil { if err != nil {
@ -205,20 +206,20 @@ func TestInvalidTransactions(t *testing.T) {
tx := transaction(0, big.NewInt(100), key) tx := transaction(0, big.NewInt(100), key)
from, _ := deriveSender(tx) from, _ := deriveSender(tx)
currentState, _ := pool.blockChain.State()
currentState.AddBalance(from, big.NewInt(1)) pool.currentState.AddBalance(from, big.NewInt(1))
if err := pool.AddRemote(tx); err != ErrInsufficientFunds { if err := pool.AddRemote(tx); err != ErrInsufficientFunds {
t.Error("expected", ErrInsufficientFunds) t.Error("expected", ErrInsufficientFunds)
} }
balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice())) balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice()))
currentState.AddBalance(from, balance) pool.currentState.AddBalance(from, balance)
if err := pool.AddRemote(tx); err != ErrIntrinsicGas { if err := pool.AddRemote(tx); err != ErrIntrinsicGas {
t.Error("expected", ErrIntrinsicGas, "got", err) t.Error("expected", ErrIntrinsicGas, "got", err)
} }
currentState.SetNonce(from, 1) pool.currentState.SetNonce(from, 1)
currentState.AddBalance(from, big.NewInt(0xffffffffffffff)) pool.currentState.AddBalance(from, big.NewInt(0xffffffffffffff))
tx = transaction(0, big.NewInt(100000), key) tx = transaction(0, big.NewInt(100000), key)
if err := pool.AddRemote(tx); err != ErrNonceTooLow { if err := pool.AddRemote(tx); err != ErrNonceTooLow {
t.Error("expected", ErrNonceTooLow) t.Error("expected", ErrNonceTooLow)
@ -240,21 +241,20 @@ func TestTransactionQueue(t *testing.T) {
tx := transaction(0, big.NewInt(100), key) tx := transaction(0, big.NewInt(100), key)
from, _ := deriveSender(tx) from, _ := deriveSender(tx)
currentState, _ := pool.blockChain.State() pool.currentState.AddBalance(from, big.NewInt(1000))
currentState.AddBalance(from, big.NewInt(1000)) pool.lockedReset(nil, nil)
pool.lockedReset()
pool.enqueueTx(tx.Hash(), tx) pool.enqueueTx(tx.Hash(), tx)
pool.promoteExecutables(currentState, []common.Address{from}) pool.promoteExecutables([]common.Address{from})
if len(pool.pending) != 1 { if len(pool.pending) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.pending)) t.Error("expected valid txs to be 1 is", len(pool.pending))
} }
tx = transaction(1, big.NewInt(100), key) tx = transaction(1, big.NewInt(100), key)
from, _ = deriveSender(tx) from, _ = deriveSender(tx)
currentState.SetNonce(from, 2) pool.currentState.SetNonce(from, 2)
pool.enqueueTx(tx.Hash(), tx) pool.enqueueTx(tx.Hash(), tx)
pool.promoteExecutables(currentState, []common.Address{from}) pool.promoteExecutables([]common.Address{from})
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
t.Error("expected transaction to be in tx pool") t.Error("expected transaction to be in tx pool")
} }
@ -270,15 +270,14 @@ func TestTransactionQueue(t *testing.T) {
tx2 := transaction(10, big.NewInt(100), key) tx2 := transaction(10, big.NewInt(100), key)
tx3 := transaction(11, big.NewInt(100), key) tx3 := transaction(11, big.NewInt(100), key)
from, _ = deriveSender(tx1) from, _ = deriveSender(tx1)
currentState, _ = pool.blockChain.State() pool.currentState.AddBalance(from, big.NewInt(1000))
currentState.AddBalance(from, big.NewInt(1000)) pool.lockedReset(nil, nil)
pool.lockedReset()
pool.enqueueTx(tx1.Hash(), tx1) pool.enqueueTx(tx1.Hash(), tx1)
pool.enqueueTx(tx2.Hash(), tx2) pool.enqueueTx(tx2.Hash(), tx2)
pool.enqueueTx(tx3.Hash(), tx3) pool.enqueueTx(tx3.Hash(), tx3)
pool.promoteExecutables(currentState, []common.Address{from}) pool.promoteExecutables([]common.Address{from})
if len(pool.pending) != 1 { if len(pool.pending) != 1 {
t.Error("expected tx pool to be 1, got", len(pool.pending)) t.Error("expected tx pool to be 1, got", len(pool.pending))
@ -288,45 +287,13 @@ func TestTransactionQueue(t *testing.T) {
} }
} }
func TestRemoveTx(t *testing.T) {
pool, key := setupTxPool()
defer pool.Stop()
addr := crypto.PubkeyToAddress(key.PublicKey)
currentState, _ := pool.blockChain.State()
currentState.AddBalance(addr, big.NewInt(1))
tx1 := transaction(0, big.NewInt(100), key)
tx2 := transaction(2, big.NewInt(100), key)
pool.promoteTx(addr, tx1.Hash(), tx1)
pool.enqueueTx(tx2.Hash(), tx2)
if len(pool.queue) != 1 {
t.Error("expected queue to be 1, got", len(pool.queue))
}
if len(pool.pending) != 1 {
t.Error("expected pending to be 1, got", len(pool.pending))
}
pool.Remove(tx1.Hash())
pool.Remove(tx2.Hash())
if len(pool.queue) > 0 {
t.Error("expected queue to be 0, got", len(pool.queue))
}
if len(pool.pending) > 0 {
t.Error("expected pending to be 0, got", len(pool.pending))
}
}
func TestNegativeValue(t *testing.T) { func TestNegativeValue(t *testing.T) {
pool, key := setupTxPool() pool, key := setupTxPool()
defer pool.Stop() defer pool.Stop()
tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(-1), big.NewInt(100), big.NewInt(1), nil), types.HomesteadSigner{}, key) tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(-1), big.NewInt(100), big.NewInt(1), nil), types.HomesteadSigner{}, key)
from, _ := deriveSender(tx) from, _ := deriveSender(tx)
currentState, _ := pool.blockChain.State() pool.currentState.AddBalance(from, big.NewInt(1))
currentState.AddBalance(from, big.NewInt(1))
if err := pool.AddRemote(tx); err != ErrNegativeValue { if err := pool.AddRemote(tx); err != ErrNegativeValue {
t.Error("expected", ErrNegativeValue, "got", err) t.Error("expected", ErrNegativeValue, "got", err)
} }
@ -340,10 +307,10 @@ func TestTransactionChainFork(t *testing.T) {
resetState := func() { resetState := func() {
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} statedb.AddBalance(addr, big.NewInt(100000000000000))
currentState, _ := pool.blockChain.State()
currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.chain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
pool.lockedReset() pool.lockedReset(nil, nil)
} }
resetState() resetState()
@ -351,7 +318,7 @@ func TestTransactionChainFork(t *testing.T) {
if _, err := pool.add(tx, false); err != nil { if _, err := pool.add(tx, false); err != nil {
t.Error("didn't expect error", err) t.Error("didn't expect error", err)
} }
pool.RemoveBatch([]*types.Transaction{tx}) pool.removeTx(tx.Hash())
// reset the pool's internal state // reset the pool's internal state
resetState() resetState()
@ -368,10 +335,10 @@ func TestTransactionDoubleNonce(t *testing.T) {
resetState := func() { resetState := func() {
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} statedb.AddBalance(addr, big.NewInt(100000000000000))
currentState, _ := pool.blockChain.State()
currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.chain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
pool.lockedReset() pool.lockedReset(nil, nil)
} }
resetState() resetState()
@ -387,8 +354,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
if replace, err := pool.add(tx2, false); err != nil || !replace { if replace, err := pool.add(tx2, false); err != nil || !replace {
t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
} }
state, _ := pool.blockChain.State() pool.promoteExecutables([]common.Address{addr})
pool.promoteExecutables(state, []common.Address{addr})
if pool.pending[addr].Len() != 1 { if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
} }
@ -397,7 +363,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
} }
// Add the third transaction and ensure it's not saved (smaller price) // Add the third transaction and ensure it's not saved (smaller price)
pool.add(tx3, false) pool.add(tx3, false)
pool.promoteExecutables(state, []common.Address{addr}) pool.promoteExecutables([]common.Address{addr})
if pool.pending[addr].Len() != 1 { if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
} }
@ -415,8 +381,7 @@ func TestMissingNonce(t *testing.T) {
defer pool.Stop() defer pool.Stop()
addr := crypto.PubkeyToAddress(key.PublicKey) addr := crypto.PubkeyToAddress(key.PublicKey)
currentState, _ := pool.blockChain.State() pool.currentState.AddBalance(addr, big.NewInt(100000000000000))
currentState.AddBalance(addr, big.NewInt(100000000000000))
tx := transaction(1, big.NewInt(100000), key) tx := transaction(1, big.NewInt(100000), key)
if _, err := pool.add(tx, false); err != nil { if _, err := pool.add(tx, false); err != nil {
t.Error("didn't expect error", err) t.Error("didn't expect error", err)
@ -432,47 +397,25 @@ func TestMissingNonce(t *testing.T) {
} }
} }
func TestNonceRecovery(t *testing.T) { func TestTransactionNonceRecovery(t *testing.T) {
const n = 10 const n = 10
pool, key := setupTxPool() pool, key := setupTxPool()
defer pool.Stop() defer pool.Stop()
addr := crypto.PubkeyToAddress(key.PublicKey) addr := crypto.PubkeyToAddress(key.PublicKey)
currentState, _ := pool.blockChain.State() pool.currentState.SetNonce(addr, n)
currentState.SetNonce(addr, n) pool.currentState.AddBalance(addr, big.NewInt(100000000000000))
currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.lockedReset(nil, nil)
pool.lockedReset()
tx := transaction(n, big.NewInt(100000), key) tx := transaction(n, big.NewInt(100000), key)
if err := pool.AddRemote(tx); err != nil { if err := pool.AddRemote(tx); err != nil {
t.Error(err) t.Error(err)
} }
// simulate some weird re-order of transactions and missing nonce(s) // simulate some weird re-order of transactions and missing nonce(s)
currentState.SetNonce(addr, n-1) pool.currentState.SetNonce(addr, n-1)
pool.lockedReset() pool.lockedReset(nil, nil)
if fn := pool.pendingState.GetNonce(addr); fn != n+1 { if fn := pool.pendingState.GetNonce(addr); fn != n-1 {
t.Errorf("expected nonce to be %d, got %d", n+1, fn) t.Errorf("expected nonce to be %d, got %d", n-1, fn)
}
}
func TestRemovedTxEvent(t *testing.T) {
pool, key := setupTxPool()
defer pool.Stop()
tx := transaction(0, big.NewInt(1000000), key)
from, _ := deriveSender(tx)
currentState, _ := pool.blockChain.State()
currentState.AddBalance(from, big.NewInt(1000000000000))
pool.lockedReset()
blockChain, _ := pool.blockChain.(*testBlockChain)
blockChain.rmTxFeed.Send(RemovedTransactionEvent{types.Transactions{tx}})
blockChain.chainHeadFeed.Send(ChainHeadEvent{nil})
// wait for handling events
<-time.After(500 * time.Millisecond)
if pool.pending[from].Len() != 1 {
t.Error("expected 1 pending tx, got", pool.pending[from].Len())
}
if len(pool.all) != 1 {
t.Error("expected 1 total transactions, got", len(pool.all))
} }
} }
@ -484,9 +427,7 @@ func TestTransactionDropping(t *testing.T) {
defer pool.Stop() defer pool.Stop()
account, _ := deriveSender(transaction(0, big.NewInt(0), key)) account, _ := deriveSender(transaction(0, big.NewInt(0), key))
pool.currentState.AddBalance(account, big.NewInt(1000))
state, _ := pool.blockChain.State()
state.AddBalance(account, big.NewInt(1000))
// Add some pending and some queued transactions // Add some pending and some queued transactions
var ( var (
@ -514,7 +455,7 @@ func TestTransactionDropping(t *testing.T) {
if len(pool.all) != 6 { if len(pool.all) != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6) t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6)
} }
pool.lockedReset() pool.lockedReset(nil, nil)
if pool.pending[account].Len() != 3 { if pool.pending[account].Len() != 3 {
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3) t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3)
} }
@ -525,8 +466,8 @@ func TestTransactionDropping(t *testing.T) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6) t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6)
} }
// Reduce the balance of the account, and check that invalidated transactions are dropped // Reduce the balance of the account, and check that invalidated transactions are dropped
state.AddBalance(account, big.NewInt(-650)) pool.currentState.AddBalance(account, big.NewInt(-650))
pool.lockedReset() pool.lockedReset(nil, nil)
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0) t.Errorf("funded pending transaction missing: %v", tx0)
@ -550,8 +491,8 @@ func TestTransactionDropping(t *testing.T) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4)
} }
// Reduce the block gas limit, check that invalidated transactions are dropped // Reduce the block gas limit, check that invalidated transactions are dropped
pool.blockChain.(*testBlockChain).gasLimit = big.NewInt(100) pool.chain.(*testBlockChain).gasLimit = big.NewInt(100)
pool.lockedReset() pool.lockedReset(nil, nil)
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0) t.Errorf("funded pending transaction missing: %v", tx0)
@ -579,9 +520,7 @@ func TestTransactionPostponing(t *testing.T) {
defer pool.Stop() defer pool.Stop()
account, _ := deriveSender(transaction(0, big.NewInt(0), key)) account, _ := deriveSender(transaction(0, big.NewInt(0), key))
pool.currentState.AddBalance(account, big.NewInt(1000))
state, _ := pool.blockChain.State()
state.AddBalance(account, big.NewInt(1000))
// Add a batch consecutive pending transactions for validation // Add a batch consecutive pending transactions for validation
txns := []*types.Transaction{} txns := []*types.Transaction{}
@ -605,7 +544,7 @@ func TestTransactionPostponing(t *testing.T) {
if len(pool.all) != len(txns) { if len(pool.all) != len(txns) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns)) t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns))
} }
pool.lockedReset() pool.lockedReset(nil, nil)
if pool.pending[account].Len() != len(txns) { if pool.pending[account].Len() != len(txns) {
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), len(txns)) t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), len(txns))
} }
@ -616,8 +555,8 @@ func TestTransactionPostponing(t *testing.T) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns)) t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns))
} }
// Reduce the balance of the account, and check that transactions are reorganised // Reduce the balance of the account, and check that transactions are reorganised
state.AddBalance(account, big.NewInt(-750)) pool.currentState.AddBalance(account, big.NewInt(-750))
pool.lockedReset() pool.lockedReset(nil, nil)
if _, ok := pool.pending[account].txs.items[txns[0].Nonce()]; !ok { if _, ok := pool.pending[account].txs.items[txns[0].Nonce()]; !ok {
t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txns[0]) t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txns[0])
@ -655,10 +594,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
defer pool.Stop() defer pool.Stop()
account, _ := deriveSender(transaction(0, big.NewInt(0), key)) account, _ := deriveSender(transaction(0, big.NewInt(0), key))
pool.currentState.AddBalance(account, big.NewInt(1000000))
state, _ := pool.blockChain.State()
state.AddBalance(account, big.NewInt(1000000))
pool.lockedReset()
// Keep queuing up transactions and make sure all above a limit are dropped // Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ { for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ {
@ -699,7 +635,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
// Create the pool to test the limit enforcement with // Create the pool to test the limit enforcement with
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
config := testTxPoolConfig config := testTxPoolConfig
config.NoLocals = nolocals config.NoLocals = nolocals
@ -709,12 +645,10 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
defer pool.Stop() defer pool.Stop()
// Create a number of test accounts and fund them (last one will be the local) // Create a number of test accounts and fund them (last one will be the local)
state, _ := pool.blockChain.State()
keys := make([]*ecdsa.PrivateKey, 5) keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ { for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey() keys[i], _ = crypto.GenerateKey()
state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
} }
local := keys[len(keys)-1] local := keys[len(keys)-1]
@ -790,7 +724,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
// Create the pool to test the non-expiration enforcement // Create the pool to test the non-expiration enforcement
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
config := testTxPoolConfig config := testTxPoolConfig
config.Lifetime = time.Second config.Lifetime = time.Second
@ -803,9 +737,8 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
local, _ := crypto.GenerateKey() local, _ := crypto.GenerateKey()
remote, _ := crypto.GenerateKey() remote, _ := crypto.GenerateKey()
state, _ := pool.blockChain.State() pool.currentState.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000))
state.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) pool.currentState.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
state.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
// Add the two transactions and ensure they both are queued up // Add the two transactions and ensure they both are queued up
if err := pool.AddLocal(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), local)); err != nil { if err := pool.AddLocal(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), local)); err != nil {
@ -854,10 +787,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
defer pool.Stop() defer pool.Stop()
account, _ := deriveSender(transaction(0, big.NewInt(0), key)) account, _ := deriveSender(transaction(0, big.NewInt(0), key))
pool.currentState.AddBalance(account, big.NewInt(1000000))
state, _ := pool.blockChain.State()
state.AddBalance(account, big.NewInt(1000000))
pool.lockedReset()
// Keep queuing up transactions and make sure all above a limit are dropped // Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
@ -887,8 +817,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
defer pool1.Stop() defer pool1.Stop()
account1, _ := deriveSender(transaction(0, big.NewInt(0), key1)) account1, _ := deriveSender(transaction(0, big.NewInt(0), key1))
state1, _ := pool1.blockChain.State() pool1.currentState.AddBalance(account1, big.NewInt(1000000))
state1.AddBalance(account1, big.NewInt(1000000))
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
if err := pool1.AddRemote(transaction(origin+i, big.NewInt(100000), key1)); err != nil { if err := pool1.AddRemote(transaction(origin+i, big.NewInt(100000), key1)); err != nil {
@ -900,8 +829,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
defer pool2.Stop() defer pool2.Stop()
account2, _ := deriveSender(transaction(0, big.NewInt(0), key2)) account2, _ := deriveSender(transaction(0, big.NewInt(0), key2))
state2, _ := pool2.blockChain.State() pool2.currentState.AddBalance(account2, big.NewInt(1000000))
state2.AddBalance(account2, big.NewInt(1000000))
txns := []*types.Transaction{} txns := []*types.Transaction{}
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
@ -934,7 +862,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
// Create the pool to test the limit enforcement with // Create the pool to test the limit enforcement with
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
config := testTxPoolConfig config := testTxPoolConfig
config.GlobalSlots = config.AccountSlots * 10 config.GlobalSlots = config.AccountSlots * 10
@ -943,12 +871,10 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
defer pool.Stop() defer pool.Stop()
// Create a number of test accounts and fund them // Create a number of test accounts and fund them
state, _ := pool.blockChain.State()
keys := make([]*ecdsa.PrivateKey, 5) keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ { for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey() keys[i], _ = crypto.GenerateKey()
state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
} }
// Generate and queue a batch of transactions // Generate and queue a batch of transactions
nonces := make(map[common.Address]uint64) nonces := make(map[common.Address]uint64)
@ -981,7 +907,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
// Create the pool to test the limit enforcement with // Create the pool to test the limit enforcement with
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
config := testTxPoolConfig config := testTxPoolConfig
config.AccountSlots = 2 config.AccountSlots = 2
@ -992,11 +918,9 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
defer pool.Stop() defer pool.Stop()
// Create a number of test accounts and fund them // Create a number of test accounts and fund them
state, _ := pool.blockChain.State()
key, _ := crypto.GenerateKey() key, _ := crypto.GenerateKey()
addr := crypto.PubkeyToAddress(key.PublicKey) addr := crypto.PubkeyToAddress(key.PublicKey)
state.AddBalance(addr, big.NewInt(1000000)) pool.currentState.AddBalance(addr, big.NewInt(1000000))
txs := types.Transactions{} txs := types.Transactions{}
for j := 0; j < int(config.GlobalSlots)*2; j++ { for j := 0; j < int(config.GlobalSlots)*2; j++ {
@ -1016,7 +940,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
// Create the pool to test the limit enforcement with // Create the pool to test the limit enforcement with
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
config := testTxPoolConfig config := testTxPoolConfig
config.GlobalSlots = 0 config.GlobalSlots = 0
@ -1025,12 +949,10 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
defer pool.Stop() defer pool.Stop()
// Create a number of test accounts and fund them // Create a number of test accounts and fund them
state, _ := pool.blockChain.State()
keys := make([]*ecdsa.PrivateKey, 5) keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ { for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey() keys[i], _ = crypto.GenerateKey()
state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
} }
// Generate and queue a batch of transactions // Generate and queue a batch of transactions
nonces := make(map[common.Address]uint64) nonces := make(map[common.Address]uint64)
@ -1065,18 +987,16 @@ func TestTransactionPoolRepricing(t *testing.T) {
// Create the pool to test the pricing enforcement with // Create the pool to test the pricing enforcement with
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop() defer pool.Stop()
// Create a number of test accounts and fund them // Create a number of test accounts and fund them
state, _ := pool.blockChain.State()
keys := make([]*ecdsa.PrivateKey, 3) keys := make([]*ecdsa.PrivateKey, 3)
for i := 0; i < len(keys); i++ { for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey() keys[i], _ = crypto.GenerateKey()
state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
} }
// Generate and queue a batch of transactions, both pending and queued // Generate and queue a batch of transactions, both pending and queued
txs := types.Transactions{} txs := types.Transactions{}
@ -1147,18 +1067,16 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
// Create the pool to test the pricing enforcement with // Create the pool to test the pricing enforcement with
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop() defer pool.Stop()
// Create a number of test accounts and fund them // Create a number of test accounts and fund them
state, _ := pool.blockChain.State()
keys := make([]*ecdsa.PrivateKey, 3) keys := make([]*ecdsa.PrivateKey, 3)
for i := 0; i < len(keys); i++ { for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey() keys[i], _ = crypto.GenerateKey()
state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000*1000000)) pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000*1000000))
} }
// Create transaction (both pending and queued) with a linearly growing gasprice // Create transaction (both pending and queued) with a linearly growing gasprice
for i := uint64(0); i < 500; i++ { for i := uint64(0); i < 500; i++ {
@ -1210,7 +1128,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
// Create the pool to test the pricing enforcement with // Create the pool to test the pricing enforcement with
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
config := testTxPoolConfig config := testTxPoolConfig
config.GlobalSlots = 2 config.GlobalSlots = 2
@ -1220,12 +1138,10 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
defer pool.Stop() defer pool.Stop()
// Create a number of test accounts and fund them // Create a number of test accounts and fund them
state, _ := pool.blockChain.State()
keys := make([]*ecdsa.PrivateKey, 3) keys := make([]*ecdsa.PrivateKey, 3)
for i := 0; i < len(keys); i++ { for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey() keys[i], _ = crypto.GenerateKey()
state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
} }
// Generate and queue a batch of transactions, both pending and queued // Generate and queue a batch of transactions, both pending and queued
txs := types.Transactions{} txs := types.Transactions{}
@ -1298,16 +1214,14 @@ func TestTransactionReplacement(t *testing.T) {
// Create the pool to test the pricing enforcement with // Create the pool to test the pricing enforcement with
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop() defer pool.Stop()
// Create a test account to add transactions with // Create a test account to add transactions with
key, _ := crypto.GenerateKey() key, _ := crypto.GenerateKey()
pool.currentState.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000))
state, _ := pool.blockChain.State()
state.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000))
// Add pending transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too) // Add pending transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
price := int64(100) price := int64(100)
@ -1378,7 +1292,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
// Create the original pool to inject transaction into the journal // Create the original pool to inject transaction into the journal
db, _ := ethdb.NewMemDatabase() db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
config := testTxPoolConfig config := testTxPoolConfig
config.NoLocals = nolocals config.NoLocals = nolocals
@ -1391,9 +1305,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
local, _ := crypto.GenerateKey() local, _ := crypto.GenerateKey()
remote, _ := crypto.GenerateKey() remote, _ := crypto.GenerateKey()
statedb, _ = pool.blockChain.State() pool.currentState.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000))
statedb.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) pool.currentState.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
statedb.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
// Add three local and a remote transactions and ensure they are queued up // Add three local and a remote transactions and ensure they are queued up
if err := pool.AddLocal(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), local)); err != nil { if err := pool.AddLocal(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), local)); err != nil {
@ -1421,7 +1334,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
// Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive // Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive
pool.Stop() pool.Stop()
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
pool = NewTxPool(config, params.TestChainConfig, blockchain) pool = NewTxPool(config, params.TestChainConfig, blockchain)
pending, queued = pool.Stats() pending, queued = pool.Stats()
@ -1442,11 +1355,11 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
} }
// Bump the nonce temporarily and ensure the newly invalidated transaction is removed // Bump the nonce temporarily and ensure the newly invalidated transaction is removed
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2) statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
pool.lockedReset() pool.lockedReset(nil, nil)
time.Sleep(2 * config.Rejournal) time.Sleep(2 * config.Rejournal)
pool.Stop() pool.Stop()
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
pool = NewTxPool(config, params.TestChainConfig, blockchain) pool = NewTxPool(config, params.TestChainConfig, blockchain)
pending, queued = pool.Stats() pending, queued = pool.Stats()
@ -1480,8 +1393,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
defer pool.Stop() defer pool.Stop()
account, _ := deriveSender(transaction(0, big.NewInt(0), key)) account, _ := deriveSender(transaction(0, big.NewInt(0), key))
state, _ := pool.blockChain.State() pool.currentState.AddBalance(account, big.NewInt(1000000))
state.AddBalance(account, big.NewInt(1000000))
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
tx := transaction(uint64(i), big.NewInt(100000), key) tx := transaction(uint64(i), big.NewInt(100000), key)
@ -1490,7 +1402,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
// Benchmark the speed of pool validation // Benchmark the speed of pool validation
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
pool.demoteUnexecutables(state) pool.demoteUnexecutables()
} }
} }
@ -1506,8 +1418,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
defer pool.Stop() defer pool.Stop()
account, _ := deriveSender(transaction(0, big.NewInt(0), key)) account, _ := deriveSender(transaction(0, big.NewInt(0), key))
state, _ := pool.blockChain.State() pool.currentState.AddBalance(account, big.NewInt(1000000))
state.AddBalance(account, big.NewInt(1000000))
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
tx := transaction(uint64(1+i), big.NewInt(100000), key) tx := transaction(uint64(1+i), big.NewInt(100000), key)
@ -1516,7 +1427,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
// Benchmark the speed of pool validation // Benchmark the speed of pool validation
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
pool.promoteExecutables(state, nil) pool.promoteExecutables(nil)
} }
} }
@ -1527,8 +1438,7 @@ func BenchmarkPoolInsert(b *testing.B) {
defer pool.Stop() defer pool.Stop()
account, _ := deriveSender(transaction(0, big.NewInt(0), key)) account, _ := deriveSender(transaction(0, big.NewInt(0), key))
state, _ := pool.blockChain.State() pool.currentState.AddBalance(account, big.NewInt(1000000))
state.AddBalance(account, big.NewInt(1000000))
txs := make(types.Transactions, b.N) txs := make(types.Transactions, b.N)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
@ -1552,8 +1462,7 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) {
defer pool.Stop() defer pool.Stop()
account, _ := deriveSender(transaction(0, big.NewInt(0), key)) account, _ := deriveSender(transaction(0, big.NewInt(0), key))
state, _ := pool.blockChain.State() pool.currentState.AddBalance(account, big.NewInt(1000000))
state.AddBalance(account, big.NewInt(1000000))
batches := make([]types.Transactions, b.N) batches := make([]types.Transactions, b.N)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {

View File

@ -115,10 +115,6 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta
return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil
} }
func (b *EthApiBackend) SubscribeRemovedTxEvent(ch chan<- core.RemovedTransactionEvent) event.Subscription {
return b.eth.BlockChain().SubscribeRemovedTxEvent(ch)
}
func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch) return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
} }
@ -143,10 +139,6 @@ func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
return b.eth.txPool.AddLocal(signedTx) return b.eth.txPool.AddLocal(signedTx)
} }
func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
b.eth.txPool.Remove(txHash)
}
func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) { func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
pending, err := b.eth.txPool.Pending() pending, err := b.eth.txPool.Pending()
if err != nil { if err != nil {

View File

@ -1265,7 +1265,6 @@ func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, sendArgs SendTxAr
if err != nil { if err != nil {
return common.Hash{}, err return common.Hash{}, err
} }
s.b.RemoveTx(p.Hash())
if err = s.b.SendTx(ctx, signedTx); err != nil { if err = s.b.SendTx(ctx, signedTx); err != nil {
return common.Hash{}, err return common.Hash{}, err
} }

View File

@ -59,7 +59,6 @@ type Backend interface {
// TxPool API // TxPool API
SendTx(ctx context.Context, signedTx *types.Transaction) error SendTx(ctx context.Context, signedTx *types.Transaction) error
RemoveTx(txHash common.Hash)
GetPoolTransactions() (types.Transactions, error) GetPoolTransactions() (types.Transactions, error)
GetPoolTransaction(txHash common.Hash) *types.Transaction GetPoolTransaction(txHash common.Hash) *types.Transaction
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)

View File

@ -71,7 +71,6 @@ type Work struct {
family *set.Set // family set (used for checking uncle invalidity) family *set.Set // family set (used for checking uncle invalidity)
uncles *set.Set // uncle set uncles *set.Set // uncle set
tcount int // tx count in cycle tcount int // tx count in cycle
failedTxs types.Transactions
Block *types.Block // the new block Block *types.Block // the new block
@ -477,8 +476,6 @@ func (self *worker) commitNewWork() {
txs := types.NewTransactionsByPriceAndNonce(pending) txs := types.NewTransactionsByPriceAndNonce(pending)
work.commitTransactions(self.mux, txs, self.chain, self.coinbase) work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
self.eth.TxPool().RemoveBatch(work.failedTxs)
// compute uncles for the new block. // compute uncles for the new block.
var ( var (
uncles []*types.Header uncles []*types.Header
@ -563,6 +560,16 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
log.Trace("Gas limit exceeded for current block", "sender", from) log.Trace("Gas limit exceeded for current block", "sender", from)
txs.Pop() txs.Pop()
case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
txs.Shift()
case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
txs.Pop()
case nil: case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account // Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...) coalescedLogs = append(coalescedLogs, logs...)
@ -570,10 +577,10 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
txs.Shift() txs.Shift()
default: default:
// Pop the current failed transaction without shifting in the next from the account // Strange error, discard the transaction and get the next in line (note, the
log.Trace("Transaction failed, will be removed", "hash", tx.Hash(), "err", err) // nonce-too-high clause will prevent us from executing in vain).
env.failedTxs = append(env.failedTxs, tx) log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
txs.Pop() txs.Shift()
} }
} }