diff --git a/core/block_processor.go b/core/block_processor.go index f33f0d433..af47069ad 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -258,7 +258,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st state.Sync() // Remove transactions from the pool - sm.txpool.RemoveSet(block.Transactions()) + sm.txpool.RemoveTransactions(block.Transactions()) // This puts transactions in a extra db for rpc for i, tx := range block.Transactions() { diff --git a/core/transaction_pool.go b/core/transaction_pool.go index eaddcfa09..92a2462c6 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -4,7 +4,9 @@ import ( "errors" "fmt" "math/big" + "sort" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -17,7 +19,7 @@ import ( var ( ErrInvalidSender = errors.New("Invalid sender") - ErrImpossibleNonce = errors.New("Impossible nonce") + ErrNonce = errors.New("Nonce too low") ErrNonExistentAccount = errors.New("Account does not exist") ErrInsufficientFunds = errors.New("Insufficient funds") ErrIntrinsicGas = errors.New("Intrinsic gas too low") @@ -54,20 +56,37 @@ type TxPool struct { txs map[common.Hash]*types.Transaction invalidHashes *set.Set + queue map[common.Address]types.Transactions + subscribers []chan TxMsg eventMux *event.TypeMux } func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool { - return &TxPool{ + txPool := &TxPool{ txs: make(map[common.Hash]*types.Transaction), + queue: make(map[common.Address]types.Transactions), queueChan: make(chan *types.Transaction, txPoolQueueSize), quit: make(chan bool), eventMux: eventMux, invalidHashes: set.New(), currentState: currentStateFn, } + return txPool +} + +func (pool *TxPool) Start() { + ticker := time.NewTicker(300 * time.Millisecond) +done: + for { + select { + case <-ticker.C: + pool.checkQueue() + case <-pool.quit: + break done + } + } } func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { @@ -100,14 +119,15 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { } if pool.currentState().GetNonce(from) > tx.Nonce() { - return ErrImpossibleNonce + return ErrNonce } return nil } func (self *TxPool) addTx(tx *types.Transaction) { - self.txs[tx.Hash()] = tx + from, _ := tx.From() + self.queue[from] = append(self.queue[from], tx) } func (self *TxPool) add(tx *types.Transaction) error { @@ -144,9 +164,6 @@ func (self *TxPool) add(tx *types.Transaction) error { glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) } - // Notify the subscribers - go self.eventMux.Post(TxPreEvent{tx}) - return nil } @@ -189,34 +206,65 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { return } -func (self *TxPool) RemoveSet(txs types.Transactions) { +func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() + for _, tx := range txs { delete(self.txs, tx.Hash()) } } -func (self *TxPool) InvalidateSet(hashes *set.Set) { - self.mu.Lock() - defer self.mu.Unlock() - - hashes.Each(func(v interface{}) bool { - delete(self.txs, v.(common.Hash)) - return true - }) - self.invalidHashes.Merge(hashes) -} - func (pool *TxPool) Flush() { pool.txs = make(map[common.Hash]*types.Transaction) } -func (pool *TxPool) Start() { -} - func (pool *TxPool) Stop() { pool.Flush() + close(pool.quit) glog.V(logger.Info).Infoln("TX Pool stopped") } + +// check queue will attempt to insert +func (pool *TxPool) checkQueue() { + pool.mu.Lock() + defer pool.mu.Unlock() + + for address, txs := range pool.queue { + sort.Sort(types.TxByNonce{txs}) + + var ( + nonce = pool.currentState().GetNonce(address) + start int + ) + // Clean up the transactions first and determine the start of the nonces + for _, tx := range txs { + if tx.Nonce() >= nonce { + break + } + start++ + } + pool.queue[address] = txs[start:] + + // expected nonce + enonce := nonce + for _, tx := range pool.queue[address] { + // If the expected nonce does not match up with the next one + // (i.e. a nonce gap), we stop the loop + if enonce != tx.Nonce() { + break + } + enonce++ + + pool.txs[tx.Hash()] = tx + // Notify the subscribers + go pool.eventMux.Post(TxPreEvent{tx}) + } + //pool.queue[address] = txs[i:] + // delete the entire queue entry if it's empty. There's no need to keep it + if len(pool.queue[address]) == 0 { + delete(pool.queue, address) + } + } +} diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index b7486adb3..5a5cd866f 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -56,7 +56,57 @@ func TestInvalidTransactions(t *testing.T) { tx.SignECDSA(key) err = pool.Add(tx) - if err != ErrImpossibleNonce { - t.Error("expected", ErrImpossibleNonce) + if err != ErrNonce { + t.Error("expected", ErrNonce) + } +} + +func TestTransactionQueue(t *testing.T) { + pool, key := setupTxPool() + tx := transaction() + tx.SignECDSA(key) + from, _ := tx.From() + pool.currentState().AddBalance(from, big.NewInt(1)) + pool.addTx(tx) + + pool.checkQueue() + if len(pool.txs) != 1 { + t.Error("expected valid txs to be 1 is", len(pool.txs)) + } + + tx = transaction() + tx.SignECDSA(key) + from, _ = tx.From() + pool.currentState().SetNonce(from, 10) + tx.SetNonce(1) + pool.addTx(tx) + pool.checkQueue() + if _, ok := pool.txs[tx.Hash()]; ok { + t.Error("expected transaction to be in tx pool") + } + + if len(pool.queue[from]) != 0 { + t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) + } + + pool, key = setupTxPool() + tx1, tx2, tx3 := transaction(), transaction(), transaction() + tx2.SetNonce(10) + tx3.SetNonce(11) + tx1.SignECDSA(key) + tx2.SignECDSA(key) + tx3.SignECDSA(key) + pool.addTx(tx1) + pool.addTx(tx2) + pool.addTx(tx3) + from, _ = tx1.From() + pool.checkQueue() + + if len(pool.txs) != 1 { + t.Error("expected tx pool to be 1 =") + } + + if len(pool.queue[from]) != 2 { + t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } }