forked from cerc-io/plugeth
core: bugfix state change race condition in txpool (#3412)
The transaction pool keeps track of the current nonce in its local pendingState. When a new block comes in the pendingState is reset. During the reset it fetches multiple times the current state through the use of the currentState callback. When a second block comes in during the reset its possible that the state changes during the reset. If that block holds transactions that are currently in the pool the local pendingState that is used to determine nonces can get out of sync.
This commit is contained in:
parent
0fe35b907a
commit
4e36b1e3da
@ -176,7 +176,7 @@ func (pool *TxPool) resetState() {
|
||||
// any transactions that have been included in the block or
|
||||
// have been invalidated because of another transaction (e.g.
|
||||
// higher gas price)
|
||||
pool.demoteUnexecutables()
|
||||
pool.demoteUnexecutables(currentState)
|
||||
|
||||
// Update all accounts to the latest known pending nonce
|
||||
for addr, list := range pool.pending {
|
||||
@ -185,7 +185,7 @@ func (pool *TxPool) resetState() {
|
||||
}
|
||||
// Check the queue and move transactions over to the pending if possible
|
||||
// or remove those that have become invalid
|
||||
pool.promoteExecutables()
|
||||
pool.promoteExecutables(currentState)
|
||||
}
|
||||
|
||||
func (pool *TxPool) Stop() {
|
||||
@ -196,8 +196,12 @@ func (pool *TxPool) Stop() {
|
||||
}
|
||||
|
||||
func (pool *TxPool) State() *state.ManagedState {
|
||||
pool.mu.RLock()
|
||||
defer pool.mu.RUnlock()
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
if pool.pendingState == nil {
|
||||
pool.resetState()
|
||||
}
|
||||
|
||||
return pool.pendingState
|
||||
}
|
||||
@ -237,21 +241,26 @@ func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common
|
||||
// Pending retrieves all currently processable transactions, groupped by origin
|
||||
// account and sorted by nonce. The returned transaction set is a copy and can be
|
||||
// freely modified by calling code.
|
||||
func (pool *TxPool) Pending() map[common.Address]types.Transactions {
|
||||
func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
state, err := pool.currentState()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// check queue first
|
||||
pool.promoteExecutables()
|
||||
pool.promoteExecutables(state)
|
||||
|
||||
// invalidate any txs
|
||||
pool.demoteUnexecutables()
|
||||
pool.demoteUnexecutables(state)
|
||||
|
||||
pending := make(map[common.Address]types.Transactions)
|
||||
for addr, list := range pool.pending {
|
||||
pending[addr] = list.Flatten()
|
||||
}
|
||||
return pending
|
||||
return pending, nil
|
||||
}
|
||||
|
||||
// SetLocal marks a transaction as local, skipping gas price
|
||||
@ -410,13 +419,19 @@ func (pool *TxPool) Add(tx *types.Transaction) error {
|
||||
if err := pool.add(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
pool.promoteExecutables()
|
||||
|
||||
state, err := pool.currentState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pool.promoteExecutables(state)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddBatch attempts to queue a batch of transactions.
|
||||
func (pool *TxPool) AddBatch(txs []*types.Transaction) {
|
||||
func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
@ -425,7 +440,15 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) {
|
||||
glog.V(logger.Debug).Infoln("tx error:", err)
|
||||
}
|
||||
}
|
||||
pool.promoteExecutables()
|
||||
|
||||
state, err := pool.currentState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pool.promoteExecutables(state)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get returns a transaction if it is contained in the pool
|
||||
@ -499,17 +522,7 @@ func (pool *TxPool) removeTx(hash common.Hash) {
|
||||
// promoteExecutables moves transactions that have become processable from the
|
||||
// future queue to the set of pending transactions. During this process, all
|
||||
// invalidated transactions (low nonce, low balance) are deleted.
|
||||
func (pool *TxPool) promoteExecutables() {
|
||||
// Init delayed since tx pool could have been started before any state sync
|
||||
if pool.pendingState == nil {
|
||||
pool.resetState()
|
||||
}
|
||||
// Retrieve the current state to allow nonce and balance checking
|
||||
state, err := pool.currentState()
|
||||
if err != nil {
|
||||
glog.Errorf("Could not get current state: %v", err)
|
||||
return
|
||||
}
|
||||
func (pool *TxPool) promoteExecutables(state *state.StateDB) {
|
||||
// Iterate over all accounts and promote any executable transactions
|
||||
queued := uint64(0)
|
||||
for addr, list := range pool.queue {
|
||||
@ -645,13 +658,7 @@ func (pool *TxPool) promoteExecutables() {
|
||||
// demoteUnexecutables removes invalid and processed transactions from the pools
|
||||
// executable/pending queue and any subsequent transactions that become unexecutable
|
||||
// are moved back into the future queue.
|
||||
func (pool *TxPool) demoteUnexecutables() {
|
||||
// Retrieve the current state to allow nonce and balance checking
|
||||
state, err := pool.currentState()
|
||||
if err != nil {
|
||||
glog.V(logger.Info).Infoln("failed to get current state: %v", err)
|
||||
return
|
||||
}
|
||||
func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
|
||||
// Iterate over all accounts and demote any non-executable transactions
|
||||
for addr, list := range pool.pending {
|
||||
nonce := state.GetNonce(addr)
|
||||
|
@ -51,6 +51,80 @@ func deriveSender(tx *types.Transaction) (common.Address, error) {
|
||||
return types.Sender(types.HomesteadSigner{}, tx)
|
||||
}
|
||||
|
||||
// This test simulates a scenario where a new block is imported during a
|
||||
// state reset and tests whether the pending state is in sync with the
|
||||
// block head event that initiated the resetState().
|
||||
func TestStateChangeDuringPoolReset(t *testing.T) {
|
||||
var (
|
||||
db, _ = ethdb.NewMemDatabase()
|
||||
key, _ = crypto.GenerateKey()
|
||||
address = crypto.PubkeyToAddress(key.PublicKey)
|
||||
mux = new(event.TypeMux)
|
||||
statedb, _ = state.New(common.Hash{}, db)
|
||||
trigger = false
|
||||
)
|
||||
|
||||
// setup pool with 2 transaction in it
|
||||
statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether))
|
||||
|
||||
tx0 := transaction(0, big.NewInt(100000), key)
|
||||
tx1 := transaction(1, big.NewInt(100000), key)
|
||||
|
||||
// stateFunc is used multiple times to reset the pending state.
|
||||
// when simulate is true it will create a state that indicates
|
||||
// that tx0 and tx1 are included in the chain.
|
||||
stateFunc := func() (*state.StateDB, error) {
|
||||
// delay "state change" by one. The tx pool fetches the
|
||||
// state multiple times and by delaying it a bit we simulate
|
||||
// a state change between those fetches.
|
||||
stdb := statedb
|
||||
if trigger {
|
||||
statedb, _ = state.New(common.Hash{}, db)
|
||||
// simulate that the new head block included tx0 and tx1
|
||||
statedb.SetNonce(address, 2)
|
||||
statedb.SetBalance(address, new(big.Int).Mul(common.Big1, common.Ether))
|
||||
trigger = false
|
||||
}
|
||||
return stdb, nil
|
||||
}
|
||||
|
||||
gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) }
|
||||
|
||||
txpool := NewTxPool(testChainConfig(), mux, stateFunc, gasLimitFunc)
|
||||
txpool.resetState()
|
||||
|
||||
nonce := txpool.State().GetNonce(address)
|
||||
if nonce != 0 {
|
||||
t.Fatalf("Invalid nonce, want 0, got %d", nonce)
|
||||
}
|
||||
|
||||
txpool.AddBatch(types.Transactions{tx0, tx1})
|
||||
|
||||
nonce = txpool.State().GetNonce(address)
|
||||
if nonce != 2 {
|
||||
t.Fatalf("Invalid nonce, want 2, got %d", nonce)
|
||||
}
|
||||
|
||||
// trigger state change in the background
|
||||
trigger = true
|
||||
|
||||
txpool.resetState()
|
||||
|
||||
pendingTx, err := txpool.Pending()
|
||||
if err != nil {
|
||||
t.Fatalf("Could not fetch pending transactions: %v", err)
|
||||
}
|
||||
|
||||
for addr, txs := range pendingTx {
|
||||
t.Logf("%0x: %d\n", addr, len(txs))
|
||||
}
|
||||
|
||||
nonce = txpool.State().GetNonce(address)
|
||||
if nonce != 2 {
|
||||
t.Fatalf("Invalid nonce, want 2, got %d", nonce)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvalidTransactions(t *testing.T) {
|
||||
pool, key := setupTxPool()
|
||||
|
||||
@ -97,9 +171,10 @@ func TestTransactionQueue(t *testing.T) {
|
||||
from, _ := deriveSender(tx)
|
||||
currentState, _ := pool.currentState()
|
||||
currentState.AddBalance(from, big.NewInt(1000))
|
||||
pool.resetState()
|
||||
pool.enqueueTx(tx.Hash(), tx)
|
||||
|
||||
pool.promoteExecutables()
|
||||
pool.promoteExecutables(currentState)
|
||||
if len(pool.pending) != 1 {
|
||||
t.Error("expected valid txs to be 1 is", len(pool.pending))
|
||||
}
|
||||
@ -108,7 +183,7 @@ func TestTransactionQueue(t *testing.T) {
|
||||
from, _ = deriveSender(tx)
|
||||
currentState.SetNonce(from, 2)
|
||||
pool.enqueueTx(tx.Hash(), tx)
|
||||
pool.promoteExecutables()
|
||||
pool.promoteExecutables(currentState)
|
||||
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
|
||||
t.Error("expected transaction to be in tx pool")
|
||||
}
|
||||
@ -124,11 +199,13 @@ func TestTransactionQueue(t *testing.T) {
|
||||
from, _ = deriveSender(tx1)
|
||||
currentState, _ = pool.currentState()
|
||||
currentState.AddBalance(from, big.NewInt(1000))
|
||||
pool.resetState()
|
||||
|
||||
pool.enqueueTx(tx1.Hash(), tx1)
|
||||
pool.enqueueTx(tx2.Hash(), tx2)
|
||||
pool.enqueueTx(tx3.Hash(), tx3)
|
||||
|
||||
pool.promoteExecutables()
|
||||
pool.promoteExecutables(currentState)
|
||||
|
||||
if len(pool.pending) != 1 {
|
||||
t.Error("expected tx pool to be 1, got", len(pool.pending))
|
||||
@ -225,7 +302,8 @@ func TestTransactionDoubleNonce(t *testing.T) {
|
||||
if err := pool.add(tx2); err != nil {
|
||||
t.Error("didn't expect error", err)
|
||||
}
|
||||
pool.promoteExecutables()
|
||||
state, _ := pool.currentState()
|
||||
pool.promoteExecutables(state)
|
||||
if pool.pending[addr].Len() != 1 {
|
||||
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
|
||||
}
|
||||
@ -236,7 +314,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
|
||||
if err := pool.add(tx3); err != nil {
|
||||
t.Error("didn't expect error", err)
|
||||
}
|
||||
pool.promoteExecutables()
|
||||
pool.promoteExecutables(state)
|
||||
if pool.pending[addr].Len() != 1 {
|
||||
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
|
||||
}
|
||||
@ -295,6 +373,7 @@ func TestRemovedTxEvent(t *testing.T) {
|
||||
from, _ := deriveSender(tx)
|
||||
currentState, _ := pool.currentState()
|
||||
currentState.AddBalance(from, big.NewInt(1000000000000))
|
||||
pool.resetState()
|
||||
pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}})
|
||||
pool.eventMux.Post(ChainHeadEvent{nil})
|
||||
if pool.pending[from].Len() != 1 {
|
||||
@ -452,6 +531,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
|
||||
|
||||
state, _ := pool.currentState()
|
||||
state.AddBalance(account, big.NewInt(1000000))
|
||||
pool.resetState()
|
||||
|
||||
// Keep queuing up transactions and make sure all above a limit are dropped
|
||||
for i := uint64(1); i <= maxQueuedPerAccount+5; i++ {
|
||||
@ -564,6 +644,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
|
||||
|
||||
state, _ := pool.currentState()
|
||||
state.AddBalance(account, big.NewInt(1000000))
|
||||
pool.resetState()
|
||||
|
||||
// Keep queuing up transactions and make sure all above a limit are dropped
|
||||
for i := uint64(0); i < maxQueuedPerAccount+5; i++ {
|
||||
@ -733,7 +814,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
|
||||
// Benchmark the speed of pool validation
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
pool.demoteUnexecutables()
|
||||
pool.demoteUnexecutables(state)
|
||||
}
|
||||
}
|
||||
|
||||
@ -757,7 +838,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
|
||||
// Benchmark the speed of pool validation
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
pool.promoteExecutables()
|
||||
pool.promoteExecutables(state)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -131,15 +131,20 @@ func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
|
||||
b.eth.txPool.Remove(txHash)
|
||||
}
|
||||
|
||||
func (b *EthApiBackend) GetPoolTransactions() types.Transactions {
|
||||
func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
|
||||
b.eth.txMu.Lock()
|
||||
defer b.eth.txMu.Unlock()
|
||||
|
||||
pending, err := b.eth.txPool.Pending()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var txs types.Transactions
|
||||
for _, batch := range b.eth.txPool.Pending() {
|
||||
for _, batch := range pending {
|
||||
txs = append(txs, batch...)
|
||||
}
|
||||
return txs
|
||||
return txs, nil
|
||||
}
|
||||
|
||||
func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
|
||||
|
@ -93,7 +93,7 @@ type testTxPool struct {
|
||||
|
||||
// AddBatch appends a batch of transactions to the pool, and notifies any
|
||||
// listeners if the addition channel is non nil
|
||||
func (p *testTxPool) AddBatch(txs []*types.Transaction) {
|
||||
func (p *testTxPool) AddBatch(txs []*types.Transaction) error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
@ -101,10 +101,12 @@ func (p *testTxPool) AddBatch(txs []*types.Transaction) {
|
||||
if p.added != nil {
|
||||
p.added <- txs
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pending returns all the transactions known to the pool
|
||||
func (p *testTxPool) Pending() map[common.Address]types.Transactions {
|
||||
func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
|
||||
@ -116,7 +118,7 @@ func (p *testTxPool) Pending() map[common.Address]types.Transactions {
|
||||
for _, batch := range batches {
|
||||
sort.Sort(types.TxByNonce(batch))
|
||||
}
|
||||
return batches
|
||||
return batches, nil
|
||||
}
|
||||
|
||||
// newTestTransaction create a new dummy transaction.
|
||||
|
@ -98,11 +98,11 @@ var errorToString = map[int]string{
|
||||
|
||||
type txPool interface {
|
||||
// AddBatch should add the given transactions to the pool.
|
||||
AddBatch([]*types.Transaction)
|
||||
AddBatch([]*types.Transaction) error
|
||||
|
||||
// Pending should return pending transactions.
|
||||
// The slice should be modifiable by the caller.
|
||||
Pending() map[common.Address]types.Transactions
|
||||
Pending() (map[common.Address]types.Transactions, error)
|
||||
}
|
||||
|
||||
// statusData is the network packet for the status message.
|
||||
|
@ -46,7 +46,8 @@ type txsync struct {
|
||||
// syncTransactions starts sending all currently pending transactions to the given peer.
|
||||
func (pm *ProtocolManager) syncTransactions(p *peer) {
|
||||
var txs types.Transactions
|
||||
for _, batch := range pm.txpool.Pending() {
|
||||
pending, _ := pm.txpool.Pending()
|
||||
for _, batch := range pending {
|
||||
txs = append(txs, batch...)
|
||||
}
|
||||
if len(txs) == 0 {
|
||||
|
@ -1273,8 +1273,12 @@ func (s *PublicTransactionPoolAPI) SignTransaction(ctx context.Context, args Sig
|
||||
|
||||
// PendingTransactions returns the transactions that are in the transaction pool and have a from address that is one of
|
||||
// the accounts this node manages.
|
||||
func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction {
|
||||
pending := s.b.GetPoolTransactions()
|
||||
func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, error) {
|
||||
pending, err := s.b.GetPoolTransactions()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
transactions := make([]*RPCTransaction, 0, len(pending))
|
||||
for _, tx := range pending {
|
||||
var signer types.Signer = types.HomesteadSigner{}
|
||||
@ -1286,13 +1290,17 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction {
|
||||
transactions = append(transactions, newRPCPendingTransaction(tx))
|
||||
}
|
||||
}
|
||||
return transactions
|
||||
return transactions, nil
|
||||
}
|
||||
|
||||
// Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the
|
||||
// pool and reinsert it with the new gas price and limit.
|
||||
func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, tx Tx, gasPrice, gasLimit *rpc.HexNumber) (common.Hash, error) {
|
||||
pending := s.b.GetPoolTransactions()
|
||||
pending, err := s.b.GetPoolTransactions()
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
}
|
||||
|
||||
for _, p := range pending {
|
||||
var signer types.Signer = types.HomesteadSigner{}
|
||||
if p.Protected() {
|
||||
|
@ -55,7 +55,7 @@ type Backend interface {
|
||||
// TxPool API
|
||||
SendTx(ctx context.Context, signedTx *types.Transaction) error
|
||||
RemoveTx(txHash common.Hash)
|
||||
GetPoolTransactions() types.Transactions
|
||||
GetPoolTransactions() (types.Transactions, error)
|
||||
GetPoolTransaction(txHash common.Hash) *types.Transaction
|
||||
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
|
||||
Stats() (pending int, queued int)
|
||||
|
@ -110,7 +110,7 @@ func (b *LesApiBackend) RemoveTx(txHash common.Hash) {
|
||||
b.eth.txPool.RemoveTx(txHash)
|
||||
}
|
||||
|
||||
func (b *LesApiBackend) GetPoolTransactions() types.Transactions {
|
||||
func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) {
|
||||
return b.eth.txPool.GetTransactions()
|
||||
}
|
||||
|
||||
|
@ -88,7 +88,7 @@ type BlockChain interface {
|
||||
|
||||
type txPool interface {
|
||||
// AddTransactions should add the given transactions to the pool.
|
||||
AddBatch([]*types.Transaction)
|
||||
AddBatch([]*types.Transaction) error
|
||||
}
|
||||
|
||||
type ProtocolManager struct {
|
||||
@ -879,7 +879,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
if reqCnt > maxReqs || reqCnt > MaxTxSend {
|
||||
return errResp(ErrRequestRejected, "")
|
||||
}
|
||||
pm.txpool.AddBatch(txs)
|
||||
|
||||
if err := pm.txpool.AddBatch(txs); err != nil {
|
||||
return errResp(ErrUnexpectedResponse, "msg: %v", err)
|
||||
}
|
||||
|
||||
_, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
|
||||
pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
|
||||
|
||||
|
@ -500,7 +500,7 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
|
||||
|
||||
// GetTransactions returns all currently processable transactions.
|
||||
// The returned slice may be modified by the caller.
|
||||
func (self *TxPool) GetTransactions() (txs types.Transactions) {
|
||||
func (self *TxPool) GetTransactions() (txs types.Transactions, err error) {
|
||||
self.mu.RLock()
|
||||
defer self.mu.RUnlock()
|
||||
|
||||
@ -510,7 +510,7 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) {
|
||||
txs[i] = tx
|
||||
i++
|
||||
}
|
||||
return txs
|
||||
return txs, nil
|
||||
}
|
||||
|
||||
// Content retrieves the data content of the transaction pool, returning all the
|
||||
|
@ -519,7 +519,14 @@ func (self *worker) commitNewWork() {
|
||||
if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
|
||||
core.ApplyDAOHardFork(work.state)
|
||||
}
|
||||
txs := types.NewTransactionsByPriceAndNonce(self.eth.TxPool().Pending())
|
||||
|
||||
pending, err := self.eth.TxPool().Pending()
|
||||
if err != nil {
|
||||
glog.Errorf("Could not fetch pending transactions: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
txs := types.NewTransactionsByPriceAndNonce(pending)
|
||||
work.commitTransactions(self.mux, txs, self.gasPrice, self.chain)
|
||||
|
||||
self.eth.TxPool().RemoveBatch(work.lowGasTxs)
|
||||
|
Loading…
Reference in New Issue
Block a user