core, eth, miner: moved nonce management to tx pool.

Removed the managed tx state from the chain manager to the transaction
pool where it's much easier to keep track of nonces (and manage them).
The transaction pool now also uses the queue and pending txs differently
where queued txs are now moved over to the pending queue (i.e. txs ready
for processing and propagation).
This commit is contained in:
obscuren 2015-06-03 22:22:20 +02:00
parent 5197aed7db
commit d09a6e5421
6 changed files with 66 additions and 104 deletions

View File

@ -214,19 +214,6 @@ func (self *ChainManager) TransState() *state.StateDB {
return self.transState return self.transState
} }
func (self *ChainManager) TxState() *state.ManagedState {
self.tsmu.RLock()
defer self.tsmu.RUnlock()
return self.txState
}
func (self *ChainManager) setTxState(statedb *state.StateDB) {
self.tsmu.Lock()
defer self.tsmu.Unlock()
self.txState = state.ManageState(statedb)
}
func (self *ChainManager) setTransState(statedb *state.StateDB) { func (self *ChainManager) setTransState(statedb *state.StateDB) {
self.transState = statedb self.transState = statedb
} }
@ -751,7 +738,7 @@ out:
case ev := <-events.Chan(): case ev := <-events.Chan():
switch ev := ev.(type) { switch ev := ev.(type) {
case queueEvent: case queueEvent:
for i, event := range ev.queue { for _, event := range ev.queue {
switch event := event.(type) { switch event := event.(type) {
case ChainEvent: case ChainEvent:
// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
@ -760,12 +747,6 @@ out:
self.currentGasLimit = CalcGasLimit(event.Block) self.currentGasLimit = CalcGasLimit(event.Block)
self.eventMux.Post(ChainHeadEvent{event.Block}) self.eventMux.Post(ChainHeadEvent{event.Block})
} }
case ChainSplitEvent:
// On chain splits we need to reset the transaction state. We can't be sure whether the actual
// state of the accounts are still valid.
if i == ev.splitCount {
self.setTxState(state.New(event.Block.Root(), self.stateDb))
}
} }
self.eventMux.Post(event) self.eventMux.Post(event)

View File

@ -6,7 +6,6 @@ import (
"math/big" "math/big"
"sort" "sort"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
@ -38,10 +37,12 @@ type stateFn func() *state.StateDB
// current state) and future transactions. Transactions move between those // current state) and future transactions. Transactions move between those
// two states over time as they are received and processed. // two states over time as they are received and processed.
type TxPool struct { type TxPool struct {
quit chan bool // Quiting channel quit chan bool // Quiting channel
currentState stateFn // The state function which will allow us to do some pre checkes currentState stateFn // The state function which will allow us to do some pre checkes
state *state.ManagedState
gasLimit func() *big.Int // The current gas limit function callback gasLimit func() *big.Int // The current gas limit function callback
eventMux *event.TypeMux eventMux *event.TypeMux
events event.Subscription
mu sync.RWMutex mu sync.RWMutex
txs map[common.Hash]*types.Transaction // processable transactions txs map[common.Hash]*types.Transaction // processable transactions
@ -56,28 +57,41 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func(
eventMux: eventMux, eventMux: eventMux,
currentState: currentStateFn, currentState: currentStateFn,
gasLimit: gasLimitFn, gasLimit: gasLimitFn,
state: state.ManageState(currentStateFn()),
} }
} }
func (pool *TxPool) Start() { func (pool *TxPool) Start() {
// Queue timer will tick so we can attempt to move items from the queue to the pool.events = pool.eventMux.Subscribe(ChainEvent{})
// main transaction pool. for _ = range pool.events.Chan() {
queueTimer := time.NewTicker(300 * time.Millisecond) pool.mu.Lock()
// Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce) pool.state = state.ManageState(pool.currentState())
removalTimer := time.NewTicker(1 * time.Second)
done: for _, tx := range pool.txs {
for { if addr, err := tx.From(); err == nil {
select { pool.state.SetNonce(addr, tx.Nonce())
case <-queueTimer.C: }
pool.checkQueue()
case <-removalTimer.C:
pool.validatePool()
case <-pool.quit:
break done
} }
pool.checkQueue()
pool.mu.Unlock()
} }
} }
func (pool *TxPool) Stop() {
pool.txs = make(map[common.Hash]*types.Transaction)
close(pool.quit)
pool.events.Unsubscribe()
glog.V(logger.Info).Infoln("TX Pool stopped")
}
func (pool *TxPool) State() *state.ManagedState {
pool.mu.RLock()
defer pool.mu.RUnlock()
return pool.state
}
// validateTx checks whether a transaction is valid according // validateTx checks whether a transaction is valid according
// to the consensus rules. // to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error { func (pool *TxPool) validateTx(tx *types.Transaction) error {
@ -152,6 +166,9 @@ func (self *TxPool) add(tx *types.Transaction) error {
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash) glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
} }
// check and validate the queueue
self.checkQueue()
return nil return nil
} }
@ -196,8 +213,13 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
// GetTransactions returns all currently processable transactions. // GetTransactions returns all currently processable transactions.
func (self *TxPool) GetTransactions() (txs types.Transactions) { func (self *TxPool) GetTransactions() (txs types.Transactions) {
self.mu.RLock() self.mu.Lock()
defer self.mu.RUnlock() defer self.mu.Unlock()
// check queue first
self.checkQueue()
// invalidate any txs
self.validatePool()
txs = make(types.Transactions, len(self.txs)) txs = make(types.Transactions, len(self.txs))
i := 0 i := 0
@ -232,12 +254,6 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
} }
} }
func (pool *TxPool) Stop() {
pool.txs = make(map[common.Hash]*types.Transaction)
close(pool.quit)
glog.V(logger.Info).Infoln("TX Pool stopped")
}
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
from, _ := tx.From() // already validated from, _ := tx.From() // already validated
if self.queue[from] == nil { if self.queue[from] == nil {
@ -246,9 +262,11 @@ func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
self.queue[from][hash] = tx self.queue[from][hash] = tx
} }
func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) { func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
if _, ok := pool.txs[hash]; !ok { if _, ok := pool.txs[hash]; !ok {
pool.txs[hash] = tx pool.txs[hash] = tx
pool.state.SetNonce(addr, tx.AccountNonce)
// Notify the subscribers. This event is posted in a goroutine // Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction" // because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock. // gets called which will then wait for the global tx pool lock and deadlock.
@ -258,34 +276,32 @@ func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) {
// checkQueue moves transactions that have become processable to main pool. // checkQueue moves transactions that have become processable to main pool.
func (pool *TxPool) checkQueue() { func (pool *TxPool) checkQueue() {
pool.mu.Lock() state := pool.state
defer pool.mu.Unlock()
statedb := pool.currentState()
var addq txQueue var addq txQueue
for address, txs := range pool.queue { for address, txs := range pool.queue {
curnonce := statedb.GetNonce(address) curnonce := state.GetNonce(address)
addq := addq[:0] addq := addq[:0]
for hash, tx := range txs { for hash, tx := range txs {
if tx.AccountNonce < curnonce { if tx.AccountNonce < curnonce {
fmt.Println("delete the tx", tx.AccountNonce, curnonce)
// Drop queued transactions whose nonce is lower than // Drop queued transactions whose nonce is lower than
// the account nonce because they have been processed. // the account nonce because they have been processed.
delete(txs, hash) delete(txs, hash)
} else { } else {
// Collect the remaining transactions for the next pass. // Collect the remaining transactions for the next pass.
addq = append(addq, txQueueEntry{hash, tx}) addq = append(addq, txQueueEntry{hash, address, tx})
} }
} }
// Find the next consecutive nonce range starting at the // Find the next consecutive nonce range starting at the
// current account nonce. // current account nonce.
sort.Sort(addq) sort.Sort(addq)
for _, e := range addq { for _, e := range addq {
if e.AccountNonce != curnonce { if e.AccountNonce > curnonce+1 {
break break
} }
curnonce++
delete(txs, e.hash) delete(txs, e.hash)
pool.addTx(e.hash, e.Transaction) pool.addTx(e.hash, address, e.Transaction)
} }
// Delete the entire queue entry if it became empty. // Delete the entire queue entry if it became empty.
if len(txs) == 0 { if len(txs) == 0 {
@ -313,9 +329,6 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// validatePool removes invalid and processed transactions from the main pool. // validatePool removes invalid and processed transactions from the main pool.
func (pool *TxPool) validatePool() { func (pool *TxPool) validatePool() {
pool.mu.Lock()
defer pool.mu.Unlock()
for hash, tx := range pool.txs { for hash, tx := range pool.txs {
if err := pool.validateTx(tx); err != nil { if err := pool.validateTx(tx); err != nil {
if glog.V(logger.Info) { if glog.V(logger.Info) {
@ -330,6 +343,7 @@ type txQueue []txQueueEntry
type txQueueEntry struct { type txQueueEntry struct {
hash common.Hash hash common.Hash
addr common.Address
*types.Transaction *types.Transaction
} }

View File

@ -37,21 +37,21 @@ func TestInvalidTransactions(t *testing.T) {
} }
from, _ := tx.From() from, _ := tx.From()
pool.currentState().AddBalance(from, big.NewInt(1)) pool.state.AddBalance(from, big.NewInt(1))
err = pool.Add(tx) err = pool.Add(tx)
if err != ErrInsufficientFunds { if err != ErrInsufficientFunds {
t.Error("expected", ErrInsufficientFunds) t.Error("expected", ErrInsufficientFunds)
} }
balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice())) balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice()))
pool.currentState().AddBalance(from, balance) pool.state.AddBalance(from, balance)
err = pool.Add(tx) err = pool.Add(tx)
if err != ErrIntrinsicGas { if err != ErrIntrinsicGas {
t.Error("expected", ErrIntrinsicGas, "got", err) t.Error("expected", ErrIntrinsicGas, "got", err)
} }
pool.currentState().SetNonce(from, 1) pool.state.SetNonce(from, 1)
pool.currentState().AddBalance(from, big.NewInt(0xffffffffffffff)) pool.state.AddBalance(from, big.NewInt(0xffffffffffffff))
tx.GasLimit = big.NewInt(100000) tx.GasLimit = big.NewInt(100000)
tx.Price = big.NewInt(1) tx.Price = big.NewInt(1)
tx.SignECDSA(key) tx.SignECDSA(key)
@ -67,7 +67,7 @@ func TestTransactionQueue(t *testing.T) {
tx := transaction() tx := transaction()
tx.SignECDSA(key) tx.SignECDSA(key)
from, _ := tx.From() from, _ := tx.From()
pool.currentState().AddBalance(from, big.NewInt(1)) pool.state.AddBalance(from, big.NewInt(1))
pool.queueTx(tx.Hash(), tx) pool.queueTx(tx.Hash(), tx)
pool.checkQueue() pool.checkQueue()
@ -76,17 +76,17 @@ func TestTransactionQueue(t *testing.T) {
} }
tx = transaction() tx = transaction()
tx.SetNonce(1)
tx.SignECDSA(key) tx.SignECDSA(key)
from, _ = tx.From() from, _ = tx.From()
pool.currentState().SetNonce(from, 10) pool.state.SetNonce(from, 2)
tx.SetNonce(1)
pool.queueTx(tx.Hash(), tx) pool.queueTx(tx.Hash(), tx)
pool.checkQueue() pool.checkQueue()
if _, ok := pool.txs[tx.Hash()]; ok { if _, ok := pool.txs[tx.Hash()]; ok {
t.Error("expected transaction to be in tx pool") t.Error("expected transaction to be in tx pool")
} }
if len(pool.queue[from]) != 0 { if len(pool.queue[from]) > 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
} }
@ -117,7 +117,7 @@ func TestRemoveTx(t *testing.T) {
tx := transaction() tx := transaction()
tx.SignECDSA(key) tx.SignECDSA(key)
from, _ := tx.From() from, _ := tx.From()
pool.currentState().AddBalance(from, big.NewInt(1)) pool.state.AddBalance(from, big.NewInt(1))
pool.queueTx(tx.Hash(), tx) pool.queueTx(tx.Hash(), tx)
pool.addTx(tx.Hash(), tx) pool.addTx(tx.Hash(), tx)
if len(pool.queue) != 1 { if len(pool.queue) != 1 {
@ -146,7 +146,7 @@ func TestNegativeValue(t *testing.T) {
tx.Value().Set(big.NewInt(-1)) tx.Value().Set(big.NewInt(-1))
tx.SignECDSA(key) tx.SignECDSA(key)
from, _ := tx.From() from, _ := tx.From()
pool.currentState().AddBalance(from, big.NewInt(1)) pool.state.AddBalance(from, big.NewInt(1))
err := pool.Add(tx) err := pool.Add(tx)
if err != ErrNegativeValue { if err != ErrNegativeValue {
t.Error("expected", ErrNegativeValue, "got", err) t.Error("expected", ErrNegativeValue, "got", err)

View File

@ -198,7 +198,6 @@ type Ethereum struct {
net *p2p.Server net *p2p.Server
eventMux *event.TypeMux eventMux *event.TypeMux
txSub event.Subscription
miner *miner.Miner miner *miner.Miner
// logger logger.LogSystem // logger logger.LogSystem
@ -470,10 +469,6 @@ func (s *Ethereum) Start() error {
s.whisper.Start() s.whisper.Start()
} }
// broadcast transactions
s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
go s.txBroadcastLoop()
glog.V(logger.Info).Infoln("Server started") glog.V(logger.Info).Infoln("Server started")
return nil return nil
} }
@ -531,8 +526,6 @@ func (self *Ethereum) AddPeer(nodeURL string) error {
} }
func (s *Ethereum) Stop() { func (s *Ethereum) Stop() {
s.txSub.Unsubscribe() // quits txBroadcastLoop
s.net.Stop() s.net.Stop()
s.protocolManager.Stop() s.protocolManager.Stop()
s.chainManager.Stop() s.chainManager.Stop()
@ -552,28 +545,6 @@ func (s *Ethereum) WaitForShutdown() {
<-s.shutdownChan <-s.shutdownChan
} }
func (self *Ethereum) txBroadcastLoop() {
// automatically stops if unsubscribe
for obj := range self.txSub.Chan() {
event := obj.(core.TxPreEvent)
self.syncAccounts(event.Tx)
}
}
// keep accounts synced up
func (self *Ethereum) syncAccounts(tx *types.Transaction) {
from, err := tx.From()
if err != nil {
return
}
if self.accountManager.HasAccount(from) {
if self.chainManager.TxState().GetNonce(from) < tx.Nonce() {
self.chainManager.TxState().SetNonce(from, tx.Nonce())
}
}
}
// StartAutoDAG() spawns a go routine that checks the DAG every autoDAGcheckInterval // StartAutoDAG() spawns a go routine that checks the DAG every autoDAGcheckInterval
// by default that is 10 times per epoch // by default that is 10 times per epoch
// in epoch n, if we past autoDAGepochHeight within-epoch blocks, // in epoch n, if we past autoDAGepochHeight within-epoch blocks,

View File

@ -494,10 +494,6 @@ func (self *worker) commitTransactions(transactions types.Transactions) {
err := self.commitTransaction(tx) err := self.commitTransaction(tx)
switch { switch {
case core.IsNonceErr(err) || core.IsInvalidTxErr(err): case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
// Remove invalid transactions
from, _ := tx.From()
self.chain.TxState().RemoveNonce(from, tx.Nonce())
current.remove.Add(tx.Hash()) current.remove.Add(tx.Hash())
if glog.V(logger.Detail) { if glog.V(logger.Detail) {

View File

@ -936,22 +936,22 @@ func (self *XEth) Transact(fromStr, toStr, nonceStr, valueStr, gasStr, gasPriceS
tx = types.NewTransactionMessage(to, value, gas, price, data) tx = types.NewTransactionMessage(to, value, gas, price, data)
} }
state := self.backend.ChainManager().TxState() state := self.backend.TxPool().State()
var nonce uint64 var nonce uint64
if len(nonceStr) != 0 { if len(nonceStr) != 0 {
nonce = common.Big(nonceStr).Uint64() nonce = common.Big(nonceStr).Uint64()
} else { } else {
nonce = state.NewNonce(from) nonce = state.GetNonce(from) + 1 //state.NewNonce(from)
} }
tx.SetNonce(nonce) tx.SetNonce(nonce)
if err := self.sign(tx, from, false); err != nil { if err := self.sign(tx, from, false); err != nil {
state.RemoveNonce(from, tx.Nonce()) //state.RemoveNonce(from, tx.Nonce())
return "", err return "", err
} }
if err := self.backend.TxPool().Add(tx); err != nil { if err := self.backend.TxPool().Add(tx); err != nil {
state.RemoveNonce(from, tx.Nonce()) //state.RemoveNonce(from, tx.Nonce())
return "", err return "", err
} }