From 498b24270a9c301a9251150afb7f3889c929765c Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 21 Apr 2015 22:01:04 +0200 Subject: [PATCH 01/12] core: implemented a queued approach processing transactions Implemented a new transaction queue. Transactions with a holes in their nonce sequence are also not propagated over the network. N: 0,1,2,5,6,7 = propagate 0..2 -- 5..N is kept in the tx pool --- core/block_processor.go | 2 +- core/transaction_pool.go | 92 ++++++++++++++++++++++++++--------- core/transaction_pool_test.go | 54 +++++++++++++++++++- 3 files changed, 123 insertions(+), 25 deletions(-) diff --git a/core/block_processor.go b/core/block_processor.go index f33f0d433..af47069ad 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -258,7 +258,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st state.Sync() // Remove transactions from the pool - sm.txpool.RemoveSet(block.Transactions()) + sm.txpool.RemoveTransactions(block.Transactions()) // This puts transactions in a extra db for rpc for i, tx := range block.Transactions() { diff --git a/core/transaction_pool.go b/core/transaction_pool.go index eaddcfa09..92a2462c6 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -4,7 +4,9 @@ import ( "errors" "fmt" "math/big" + "sort" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -17,7 +19,7 @@ import ( var ( ErrInvalidSender = errors.New("Invalid sender") - ErrImpossibleNonce = errors.New("Impossible nonce") + ErrNonce = errors.New("Nonce too low") ErrNonExistentAccount = errors.New("Account does not exist") ErrInsufficientFunds = errors.New("Insufficient funds") ErrIntrinsicGas = errors.New("Intrinsic gas too low") @@ -54,20 +56,37 @@ type TxPool struct { txs map[common.Hash]*types.Transaction invalidHashes *set.Set + queue map[common.Address]types.Transactions + subscribers []chan TxMsg eventMux *event.TypeMux } func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool { - return &TxPool{ + txPool := &TxPool{ txs: make(map[common.Hash]*types.Transaction), + queue: make(map[common.Address]types.Transactions), queueChan: make(chan *types.Transaction, txPoolQueueSize), quit: make(chan bool), eventMux: eventMux, invalidHashes: set.New(), currentState: currentStateFn, } + return txPool +} + +func (pool *TxPool) Start() { + ticker := time.NewTicker(300 * time.Millisecond) +done: + for { + select { + case <-ticker.C: + pool.checkQueue() + case <-pool.quit: + break done + } + } } func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { @@ -100,14 +119,15 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { } if pool.currentState().GetNonce(from) > tx.Nonce() { - return ErrImpossibleNonce + return ErrNonce } return nil } func (self *TxPool) addTx(tx *types.Transaction) { - self.txs[tx.Hash()] = tx + from, _ := tx.From() + self.queue[from] = append(self.queue[from], tx) } func (self *TxPool) add(tx *types.Transaction) error { @@ -144,9 +164,6 @@ func (self *TxPool) add(tx *types.Transaction) error { glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) } - // Notify the subscribers - go self.eventMux.Post(TxPreEvent{tx}) - return nil } @@ -189,34 +206,65 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { return } -func (self *TxPool) RemoveSet(txs types.Transactions) { +func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() + for _, tx := range txs { delete(self.txs, tx.Hash()) } } -func (self *TxPool) InvalidateSet(hashes *set.Set) { - self.mu.Lock() - defer self.mu.Unlock() - - hashes.Each(func(v interface{}) bool { - delete(self.txs, v.(common.Hash)) - return true - }) - self.invalidHashes.Merge(hashes) -} - func (pool *TxPool) Flush() { pool.txs = make(map[common.Hash]*types.Transaction) } -func (pool *TxPool) Start() { -} - func (pool *TxPool) Stop() { pool.Flush() + close(pool.quit) glog.V(logger.Info).Infoln("TX Pool stopped") } + +// check queue will attempt to insert +func (pool *TxPool) checkQueue() { + pool.mu.Lock() + defer pool.mu.Unlock() + + for address, txs := range pool.queue { + sort.Sort(types.TxByNonce{txs}) + + var ( + nonce = pool.currentState().GetNonce(address) + start int + ) + // Clean up the transactions first and determine the start of the nonces + for _, tx := range txs { + if tx.Nonce() >= nonce { + break + } + start++ + } + pool.queue[address] = txs[start:] + + // expected nonce + enonce := nonce + for _, tx := range pool.queue[address] { + // If the expected nonce does not match up with the next one + // (i.e. a nonce gap), we stop the loop + if enonce != tx.Nonce() { + break + } + enonce++ + + pool.txs[tx.Hash()] = tx + // Notify the subscribers + go pool.eventMux.Post(TxPreEvent{tx}) + } + //pool.queue[address] = txs[i:] + // delete the entire queue entry if it's empty. There's no need to keep it + if len(pool.queue[address]) == 0 { + delete(pool.queue, address) + } + } +} diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index b7486adb3..5a5cd866f 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -56,7 +56,57 @@ func TestInvalidTransactions(t *testing.T) { tx.SignECDSA(key) err = pool.Add(tx) - if err != ErrImpossibleNonce { - t.Error("expected", ErrImpossibleNonce) + if err != ErrNonce { + t.Error("expected", ErrNonce) + } +} + +func TestTransactionQueue(t *testing.T) { + pool, key := setupTxPool() + tx := transaction() + tx.SignECDSA(key) + from, _ := tx.From() + pool.currentState().AddBalance(from, big.NewInt(1)) + pool.addTx(tx) + + pool.checkQueue() + if len(pool.txs) != 1 { + t.Error("expected valid txs to be 1 is", len(pool.txs)) + } + + tx = transaction() + tx.SignECDSA(key) + from, _ = tx.From() + pool.currentState().SetNonce(from, 10) + tx.SetNonce(1) + pool.addTx(tx) + pool.checkQueue() + if _, ok := pool.txs[tx.Hash()]; ok { + t.Error("expected transaction to be in tx pool") + } + + if len(pool.queue[from]) != 0 { + t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) + } + + pool, key = setupTxPool() + tx1, tx2, tx3 := transaction(), transaction(), transaction() + tx2.SetNonce(10) + tx3.SetNonce(11) + tx1.SignECDSA(key) + tx2.SignECDSA(key) + tx3.SignECDSA(key) + pool.addTx(tx1) + pool.addTx(tx2) + pool.addTx(tx3) + from, _ = tx1.From() + pool.checkQueue() + + if len(pool.txs) != 1 { + t.Error("expected tx pool to be 1 =") + } + + if len(pool.queue[from]) != 2 { + t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } } From 5cb5df003de428bc7fb406caf2af46496d70cef4 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 21 Apr 2015 22:02:56 +0200 Subject: [PATCH 02/12] eth: start tx pool in a goroutine --- eth/backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/backend.go b/eth/backend.go index 88456e448..a2c0baf8b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -379,7 +379,7 @@ func (s *Ethereum) Start() error { } // Start services - s.txPool.Start() + go s.txPool.Start() if s.whisper != nil { s.whisper.Start() From 4feb5f6f9cd5128b4645c1265d814edb443ef43b Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 21 Apr 2015 22:03:32 +0200 Subject: [PATCH 03/12] xeth, miner: updated some logging --- miner/worker.go | 3 +-- xeth/xeth.go | 4 +++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index dc1f04d87..9a655831e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -258,7 +258,7 @@ func (self *worker) commitNewWork() { tcount = 0 ignoredTransactors = set.New() ) - //gasLimit: + for _, tx := range transactions { // We can skip err. It has already been validated in the tx pool from, _ := tx.From() @@ -296,7 +296,6 @@ func (self *worker) commitNewWork() { tcount++ } } - //self.eth.TxPool().InvalidateSet(remove) var ( uncles []*types.Header diff --git a/xeth/xeth.go b/xeth/xeth.go index afcb33e4c..251b070e4 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -682,9 +682,11 @@ func (self *XEth) Transact(fromStr, toStr, valueStr, gasStr, gasPriceStr, codeSt if contractCreation { addr := core.AddressFromMessage(tx) - glog.V(logger.Info).Infof("Contract addr %x\n", addr) + glog.V(logger.Info).Infof("Tx(%x) created: %x\n", tx.Hash(), addr) return core.AddressFromMessage(tx).Hex(), nil + } else { + glog.V(logger.Info).Infof("Tx(%x) to: %x\n", tx.Hash(), tx.To()) } return tx.Hash().Hex(), nil } From 7138404cb09aebb990fcce589b87173e66355987 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 21 Apr 2015 23:20:27 +0200 Subject: [PATCH 04/12] core: only post event once per tx & fixed test --- core/transaction_pool.go | 24 +++++++++++++++--------- core/transaction_pool_test.go | 12 ++++++------ 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 92a2462c6..bc6e70c7a 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -125,11 +125,6 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { return nil } -func (self *TxPool) addTx(tx *types.Transaction) { - from, _ := tx.From() - self.queue[from] = append(self.queue[from], tx) -} - func (self *TxPool) add(tx *types.Transaction) error { hash := tx.Hash() @@ -147,7 +142,7 @@ func (self *TxPool) add(tx *types.Transaction) error { return err } - self.addTx(tx) + self.queueTx(tx) var toname string if to := tx.To(); to != nil { @@ -226,6 +221,19 @@ func (pool *TxPool) Stop() { glog.V(logger.Info).Infoln("TX Pool stopped") } +func (self *TxPool) queueTx(tx *types.Transaction) { + from, _ := tx.From() + self.queue[from] = append(self.queue[from], tx) +} + +func (pool *TxPool) addTx(tx *types.Transaction) { + if _, ok := pool.txs[tx.Hash()]; !ok { + pool.txs[tx.Hash()] = tx + // Notify the subscribers + pool.eventMux.Post(TxPreEvent{tx}) + } +} + // check queue will attempt to insert func (pool *TxPool) checkQueue() { pool.mu.Lock() @@ -257,9 +265,7 @@ func (pool *TxPool) checkQueue() { } enonce++ - pool.txs[tx.Hash()] = tx - // Notify the subscribers - go pool.eventMux.Post(TxPreEvent{tx}) + pool.addTx(tx) } //pool.queue[address] = txs[i:] // delete the entire queue entry if it's empty. There's no need to keep it diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index 5a5cd866f..0e049139e 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -67,7 +67,7 @@ func TestTransactionQueue(t *testing.T) { tx.SignECDSA(key) from, _ := tx.From() pool.currentState().AddBalance(from, big.NewInt(1)) - pool.addTx(tx) + pool.queueTx(tx) pool.checkQueue() if len(pool.txs) != 1 { @@ -79,7 +79,7 @@ func TestTransactionQueue(t *testing.T) { from, _ = tx.From() pool.currentState().SetNonce(from, 10) tx.SetNonce(1) - pool.addTx(tx) + pool.queueTx(tx) pool.checkQueue() if _, ok := pool.txs[tx.Hash()]; ok { t.Error("expected transaction to be in tx pool") @@ -96,9 +96,9 @@ func TestTransactionQueue(t *testing.T) { tx1.SignECDSA(key) tx2.SignECDSA(key) tx3.SignECDSA(key) - pool.addTx(tx1) - pool.addTx(tx2) - pool.addTx(tx3) + pool.queueTx(tx1) + pool.queueTx(tx2) + pool.queueTx(tx3) from, _ = tx1.From() pool.checkQueue() @@ -106,7 +106,7 @@ func TestTransactionQueue(t *testing.T) { t.Error("expected tx pool to be 1 =") } - if len(pool.queue[from]) != 2 { + if len(pool.queue[from]) != 3 { t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } } From 1506e00a2361ab641875d5f95966e8034b2902bc Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 22 Apr 2015 12:24:46 +0200 Subject: [PATCH 05/12] core: improved error message for invalid nonce txs --- core/error.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/error.go b/core/error.go index 0642948cd..40db99ecd 100644 --- a/core/error.go +++ b/core/error.go @@ -81,7 +81,7 @@ func (err *NonceErr) Error() string { } func NonceError(is, exp uint64) *NonceErr { - return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce (%d / %d)", is, exp), Is: is, Exp: exp} + return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce. tx=%d state=%d)", is, exp), Is: is, Exp: exp} } func IsNonceErr(err error) bool { From 7edbb0110f6ab04541ed2fef1e373d61d0dc063d Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 22 Apr 2015 12:25:24 +0200 Subject: [PATCH 06/12] core: set the state for the managed tx state Set the state for the managed tx state instead of creating a new managed state. --- core/chain_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/chain_manager.go b/core/chain_manager.go index 1df56b27f..47f84b80a 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -576,7 +576,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { }) self.setTransState(state.New(block.Root(), self.stateDb)) - self.setTxState(state.New(block.Root(), self.stateDb)) + self.txState.SetState(state.New(block.Root(), self.stateDb)) queue[i] = ChainEvent{block, logs} queueEvent.canonicalCount++ From 888ece0cb2c9d07ae821398aeffb0000ef28fb23 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 22 Apr 2015 13:09:59 +0200 Subject: [PATCH 07/12] core: fixed test --- core/chain_makers.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/chain_makers.go b/core/chain_makers.go index 250671ef8..9b4911fba 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -108,7 +108,9 @@ func makeChain(bman *BlockProcessor, parent *types.Block, max int, db common.Dat // Create a new chain manager starting from given block // Effectively a fork factory func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Database) *ChainManager { - bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: GenesisBlock(db), eventMux: eventMux} + genesis := GenesisBlock(db) + bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: eventMux} + bc.txState = state.ManageState(state.New(genesis.Root(), db)) bc.futureBlocks = NewBlockCache(1000) if block == nil { bc.Reset() From d3be1a271961f13f5bd056d195b790c668552fe1 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 22 Apr 2015 17:56:06 +0200 Subject: [PATCH 08/12] eth: moved mined, tx events to protocol-hnd and improved tx propagation Transactions are now propagated to peers from which we have not yet received the transaction. This will significantly reduce the chatter on the network. Moved new mined block handler to the protocol handler and moved transaction handling to protocol handler. --- core/transaction_pool.go | 6 ++-- eth/backend.go | 34 +++++---------------- eth/handler.go | 66 ++++++++++++++++++++++++++++++++++++++-- eth/peer.go | 6 ++++ 4 files changed, 82 insertions(+), 30 deletions(-) diff --git a/core/transaction_pool.go b/core/transaction_pool.go index bc6e70c7a..9c175e568 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -229,8 +229,10 @@ func (self *TxPool) queueTx(tx *types.Transaction) { func (pool *TxPool) addTx(tx *types.Transaction) { if _, ok := pool.txs[tx.Hash()]; !ok { pool.txs[tx.Hash()] = tx - // Notify the subscribers - pool.eventMux.Post(TxPreEvent{tx}) + // Notify the subscribers. This event is posted in a goroutine + // because it's possible that somewhere during the post "Remove transaction" + // gets called which will then wait for the global tx pool lock and deadlock. + go pool.eventMux.Post(TxPreEvent{tx}) } } diff --git a/eth/backend.go b/eth/backend.go index a2c0baf8b..646a4eaf2 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -3,7 +3,6 @@ package eth import ( "crypto/ecdsa" "fmt" - "math" "path" "strings" @@ -136,11 +135,10 @@ type Ethereum struct { protocolManager *ProtocolManager downloader *downloader.Downloader - net *p2p.Server - eventMux *event.TypeMux - txSub event.Subscription - minedBlockSub event.Subscription - miner *miner.Miner + net *p2p.Server + eventMux *event.TypeMux + txSub event.Subscription + miner *miner.Miner // logger logger.LogSystem @@ -222,7 +220,7 @@ func New(config *Config) (*Ethereum, error) { eth.whisper = whisper.New() eth.shhVersionId = int(eth.whisper.Version()) eth.miner = miner.New(eth, eth.pow, config.MinerThreads) - eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.downloader) + eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) netprv, err := config.nodeKey() if err != nil { @@ -380,6 +378,7 @@ func (s *Ethereum) Start() error { // Start services go s.txPool.Start() + s.protocolManager.Start() if s.whisper != nil { s.whisper.Start() @@ -389,10 +388,6 @@ func (s *Ethereum) Start() error { s.txSub = s.eventMux.Subscribe(core.TxPreEvent{}) go s.txBroadcastLoop() - // broadcast mined blocks - s.minedBlockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{}) - go s.minedBroadcastLoop() - glog.V(logger.Info).Infoln("Server started") return nil } @@ -422,9 +417,9 @@ func (s *Ethereum) Stop() { defer s.stateDb.Close() defer s.extraDb.Close() - s.txSub.Unsubscribe() // quits txBroadcastLoop - s.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + s.txSub.Unsubscribe() // quits txBroadcastLoop + s.protocolManager.Stop() s.txPool.Stop() s.eventMux.Stop() if s.whisper != nil { @@ -440,13 +435,10 @@ func (s *Ethereum) WaitForShutdown() { <-s.shutdownChan } -// now tx broadcasting is taken out of txPool -// handled here via subscription, efficiency? func (self *Ethereum) txBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.txSub.Chan() { event := obj.(core.TxPreEvent) - self.net.BroadcastLimited("eth", TxMsg, math.Sqrt, []*types.Transaction{event.Tx}) self.syncAccounts(event.Tx) } } @@ -465,16 +457,6 @@ func (self *Ethereum) syncAccounts(tx *types.Transaction) { } } -func (self *Ethereum) minedBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range self.minedBlockSub.Chan() { - switch ev := obj.(type) { - case core.NewMinedBlockEvent: - self.protocolManager.BroadcastBlock(ev.Block.Hash(), ev.Block) - } - } -} - func saveProtocolVersion(db common.Database, protov int) { d, _ := db.Get([]byte("ProtocolVersion")) protocolVersion := common.NewValue(d).Uint() diff --git a/eth/handler.go b/eth/handler.go index 622f22132..d466dbfee 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -44,6 +44,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" @@ -77,12 +78,17 @@ type ProtocolManager struct { peers map[string]*peer SubProtocol p2p.Protocol + + eventMux *event.TypeMux + txSub event.Subscription + minedBlockSub event.Subscription } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { +func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { manager := &ProtocolManager{ + eventMux: mux, txpool: txpool, chainman: chainman, downloader: downloader, @@ -105,6 +111,21 @@ func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman return manager } +func (pm *ProtocolManager) Start() { + // broadcast transactions + pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) + go pm.txBroadcastLoop() + + // broadcast mined blocks + pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go pm.minedBroadcastLoop() +} + +func (pm *ProtocolManager) Stop() { + pm.txSub.Unsubscribe() // quits txBroadcastLoop + pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop +} + func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { td, current, genesis := pm.chainman.Status() @@ -326,10 +347,51 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) } } // Broadcast block to peer set - // XXX due to the current shit state of the network disable the limit peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.sendNewBlock(block) } glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers") } + +// BroadcastTx will propagate the block to its connected peers. It will sort +// out which peers do not contain the block in their block set and will do a +// sqrt(peers) to determine the amount of peers we broadcast to. +func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { + pm.pmu.Lock() + defer pm.pmu.Unlock() + + // Find peers who don't know anything about the given hash. Peers that + // don't know about the hash will be a candidate for the broadcast loop + var peers []*peer + for _, peer := range pm.peers { + if !peer.txHashes.Has(hash) { + peers = append(peers, peer) + } + } + // Broadcast block to peer set + peers = peers[:int(math.Sqrt(float64(len(peers))))] + for _, peer := range peers { + peer.sendTransaction(tx) + } + glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers") +} + +// Mined broadcast loop +func (self *ProtocolManager) minedBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.minedBlockSub.Chan() { + switch ev := obj.(type) { + case core.NewMinedBlockEvent: + self.BroadcastBlock(ev.Block.Hash(), ev.Block) + } + } +} + +func (self *ProtocolManager) txBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.txSub.Chan() { + event := obj.(core.TxPreEvent) + self.BroadcastTx(event.Tx.Hash(), event.Tx) + } +} diff --git a/eth/peer.go b/eth/peer.go index 972880845..ec0c4b1f3 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -86,6 +86,12 @@ func (p *peer) sendNewBlock(block *types.Block) error { return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, block.Td}) } +func (p *peer) sendTransaction(tx *types.Transaction) error { + p.txHashes.Add(tx.Hash()) + + return p2p.Send(p.rw, TxMsg, []*types.Transaction{tx}) +} + func (p *peer) requestHashes(from common.Hash) error { glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, maxHashes, from[:4]) return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes}) From fba40e18d9c231b3ab7ee7f6eba36ac859dffbb2 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 23 Apr 2015 10:51:13 +0200 Subject: [PATCH 09/12] core: added accessor for queued transactions --- core/transaction_pool.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 9c175e568..7098dba23 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -201,6 +201,18 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { return } +func (self *TxPool) GetQueuedTransactions() types.Transactions { + self.mu.RLock() + defer self.mu.RUnlock() + + var txs types.Transactions + for _, ts := range self.queue { + txs = append(txs, ts...) + } + + return txs +} + func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() From d7bab216811f4444a3183fd3e99061b04b4c7bda Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 23 Apr 2015 10:51:47 +0200 Subject: [PATCH 10/12] natspec: fixed test to work with new queued transactions --- common/natspec/natspec_e2e_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/natspec/natspec_e2e_test.go b/common/natspec/natspec_e2e_test.go index 147abe162..6bdaec8a1 100644 --- a/common/natspec/natspec_e2e_test.go +++ b/common/natspec/natspec_e2e_test.go @@ -220,7 +220,7 @@ func (self *testFrontend) applyTxs() { block := self.ethereum.ChainManager().NewBlock(cb) coinbase := self.stateDb.GetStateObject(cb) coinbase.SetGasPool(big.NewInt(10000000)) - txs := self.ethereum.TxPool().GetTransactions() + txs := self.ethereum.TxPool().GetQueuedTransactions() for i := 0; i < len(txs); i++ { for _, tx := range txs { From 48135657c478ecffead52cdddaf2bbbaa89d388e Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 23 Apr 2015 10:53:48 +0200 Subject: [PATCH 11/12] miner: show error message for gas limit per account --- miner/worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 9a655831e..19ede3c93 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -290,8 +290,8 @@ func (self *worker) commitNewWork() { // ignore the transactor so no nonce errors will be thrown for this account // next time the worker is run, they'll be picked up again. ignoredTransactors.Add(from) - //glog.V(logger.Debug).Infof("Gas limit reached for block. %d TXs included in this block\n", i) - //break gasLimit + + glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) default: tcount++ } From 7f14fbd57936cf74627572da4a06585d35161ea9 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 23 Apr 2015 11:09:58 +0200 Subject: [PATCH 12/12] core: pending txs now re-validated once every second --- core/transaction_pool.go | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 7098dba23..392e17856 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -77,12 +77,18 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool { } func (pool *TxPool) Start() { - ticker := time.NewTicker(300 * time.Millisecond) + // Queue timer will tick so we can attempt to move items from the queue to the + // main transaction pool. + queueTimer := time.NewTicker(300 * time.Millisecond) + // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce) + removalTimer := time.NewTicker(1 * time.Second) done: for { select { - case <-ticker.C: + case <-queueTimer.C: pool.checkQueue() + case <-removalTimer.C: + pool.validatePool() case <-pool.quit: break done } @@ -253,11 +259,12 @@ func (pool *TxPool) checkQueue() { pool.mu.Lock() defer pool.mu.Unlock() + statedb := pool.currentState() for address, txs := range pool.queue { sort.Sort(types.TxByNonce{txs}) var ( - nonce = pool.currentState().GetNonce(address) + nonce = statedb.GetNonce(address) start int ) // Clean up the transactions first and determine the start of the nonces @@ -288,3 +295,20 @@ func (pool *TxPool) checkQueue() { } } } + +func (pool *TxPool) validatePool() { + pool.mu.Lock() + defer pool.mu.Unlock() + + statedb := pool.currentState() + for hash, tx := range pool.txs { + from, _ := tx.From() + if nonce := statedb.GetNonce(from); nonce > tx.Nonce() { + if glog.V(logger.Debug) { + glog.Infof("removed tx (%x) from pool due to nonce error. state=%d tx=%d\n", hash[:4], nonce, tx.Nonce()) + } + + delete(pool.txs, hash) + } + } +}