core: implemented a queued approach processing transactions

Implemented a new transaction queue. Transactions with a holes in their
nonce sequence are also not propagated over the network.

N: 0,1,2,5,6,7 = propagate 0..2 -- 5..N is kept in the tx pool
This commit is contained in:
obscuren 2015-04-21 22:01:04 +02:00
parent 2fe54ab233
commit 498b24270a
3 changed files with 123 additions and 25 deletions

View File

@ -258,7 +258,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
state.Sync() state.Sync()
// Remove transactions from the pool // Remove transactions from the pool
sm.txpool.RemoveSet(block.Transactions()) sm.txpool.RemoveTransactions(block.Transactions())
// This puts transactions in a extra db for rpc // This puts transactions in a extra db for rpc
for i, tx := range block.Transactions() { for i, tx := range block.Transactions() {

View File

@ -4,7 +4,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"sort"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
@ -17,7 +19,7 @@ import (
var ( var (
ErrInvalidSender = errors.New("Invalid sender") ErrInvalidSender = errors.New("Invalid sender")
ErrImpossibleNonce = errors.New("Impossible nonce") ErrNonce = errors.New("Nonce too low")
ErrNonExistentAccount = errors.New("Account does not exist") ErrNonExistentAccount = errors.New("Account does not exist")
ErrInsufficientFunds = errors.New("Insufficient funds") ErrInsufficientFunds = errors.New("Insufficient funds")
ErrIntrinsicGas = errors.New("Intrinsic gas too low") ErrIntrinsicGas = errors.New("Intrinsic gas too low")
@ -54,20 +56,37 @@ type TxPool struct {
txs map[common.Hash]*types.Transaction txs map[common.Hash]*types.Transaction
invalidHashes *set.Set invalidHashes *set.Set
queue map[common.Address]types.Transactions
subscribers []chan TxMsg subscribers []chan TxMsg
eventMux *event.TypeMux eventMux *event.TypeMux
} }
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool { func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool {
return &TxPool{ txPool := &TxPool{
txs: make(map[common.Hash]*types.Transaction), txs: make(map[common.Hash]*types.Transaction),
queue: make(map[common.Address]types.Transactions),
queueChan: make(chan *types.Transaction, txPoolQueueSize), queueChan: make(chan *types.Transaction, txPoolQueueSize),
quit: make(chan bool), quit: make(chan bool),
eventMux: eventMux, eventMux: eventMux,
invalidHashes: set.New(), invalidHashes: set.New(),
currentState: currentStateFn, currentState: currentStateFn,
} }
return txPool
}
func (pool *TxPool) Start() {
ticker := time.NewTicker(300 * time.Millisecond)
done:
for {
select {
case <-ticker.C:
pool.checkQueue()
case <-pool.quit:
break done
}
}
} }
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
@ -100,14 +119,15 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
} }
if pool.currentState().GetNonce(from) > tx.Nonce() { if pool.currentState().GetNonce(from) > tx.Nonce() {
return ErrImpossibleNonce return ErrNonce
} }
return nil return nil
} }
func (self *TxPool) addTx(tx *types.Transaction) { func (self *TxPool) addTx(tx *types.Transaction) {
self.txs[tx.Hash()] = tx from, _ := tx.From()
self.queue[from] = append(self.queue[from], tx)
} }
func (self *TxPool) add(tx *types.Transaction) error { func (self *TxPool) add(tx *types.Transaction) error {
@ -144,9 +164,6 @@ func (self *TxPool) add(tx *types.Transaction) error {
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
} }
// Notify the subscribers
go self.eventMux.Post(TxPreEvent{tx})
return nil return nil
} }
@ -189,34 +206,65 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) {
return return
} }
func (self *TxPool) RemoveSet(txs types.Transactions) { func (self *TxPool) RemoveTransactions(txs types.Transactions) {
self.mu.Lock() self.mu.Lock()
defer self.mu.Unlock() defer self.mu.Unlock()
for _, tx := range txs { for _, tx := range txs {
delete(self.txs, tx.Hash()) delete(self.txs, tx.Hash())
} }
} }
func (self *TxPool) InvalidateSet(hashes *set.Set) {
self.mu.Lock()
defer self.mu.Unlock()
hashes.Each(func(v interface{}) bool {
delete(self.txs, v.(common.Hash))
return true
})
self.invalidHashes.Merge(hashes)
}
func (pool *TxPool) Flush() { func (pool *TxPool) Flush() {
pool.txs = make(map[common.Hash]*types.Transaction) pool.txs = make(map[common.Hash]*types.Transaction)
} }
func (pool *TxPool) Start() {
}
func (pool *TxPool) Stop() { func (pool *TxPool) Stop() {
pool.Flush() pool.Flush()
close(pool.quit)
glog.V(logger.Info).Infoln("TX Pool stopped") glog.V(logger.Info).Infoln("TX Pool stopped")
} }
// check queue will attempt to insert
func (pool *TxPool) checkQueue() {
pool.mu.Lock()
defer pool.mu.Unlock()
for address, txs := range pool.queue {
sort.Sort(types.TxByNonce{txs})
var (
nonce = pool.currentState().GetNonce(address)
start int
)
// Clean up the transactions first and determine the start of the nonces
for _, tx := range txs {
if tx.Nonce() >= nonce {
break
}
start++
}
pool.queue[address] = txs[start:]
// expected nonce
enonce := nonce
for _, tx := range pool.queue[address] {
// If the expected nonce does not match up with the next one
// (i.e. a nonce gap), we stop the loop
if enonce != tx.Nonce() {
break
}
enonce++
pool.txs[tx.Hash()] = tx
// Notify the subscribers
go pool.eventMux.Post(TxPreEvent{tx})
}
//pool.queue[address] = txs[i:]
// delete the entire queue entry if it's empty. There's no need to keep it
if len(pool.queue[address]) == 0 {
delete(pool.queue, address)
}
}
}

View File

@ -56,7 +56,57 @@ func TestInvalidTransactions(t *testing.T) {
tx.SignECDSA(key) tx.SignECDSA(key)
err = pool.Add(tx) err = pool.Add(tx)
if err != ErrImpossibleNonce { if err != ErrNonce {
t.Error("expected", ErrImpossibleNonce) t.Error("expected", ErrNonce)
}
}
func TestTransactionQueue(t *testing.T) {
pool, key := setupTxPool()
tx := transaction()
tx.SignECDSA(key)
from, _ := tx.From()
pool.currentState().AddBalance(from, big.NewInt(1))
pool.addTx(tx)
pool.checkQueue()
if len(pool.txs) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.txs))
}
tx = transaction()
tx.SignECDSA(key)
from, _ = tx.From()
pool.currentState().SetNonce(from, 10)
tx.SetNonce(1)
pool.addTx(tx)
pool.checkQueue()
if _, ok := pool.txs[tx.Hash()]; ok {
t.Error("expected transaction to be in tx pool")
}
if len(pool.queue[from]) != 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
}
pool, key = setupTxPool()
tx1, tx2, tx3 := transaction(), transaction(), transaction()
tx2.SetNonce(10)
tx3.SetNonce(11)
tx1.SignECDSA(key)
tx2.SignECDSA(key)
tx3.SignECDSA(key)
pool.addTx(tx1)
pool.addTx(tx2)
pool.addTx(tx3)
from, _ = tx1.From()
pool.checkQueue()
if len(pool.txs) != 1 {
t.Error("expected tx pool to be 1 =")
}
if len(pool.queue[from]) != 2 {
t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
} }
} }