diff --git a/core/tx_pool.go b/core/tx_pool.go index e997e8cd0..f2eb2bbdd 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -60,8 +60,7 @@ type stateFn func() (*state.StateDB, error) // two states over time as they are received and processed. type TxPool struct { config *ChainConfig - quit chan bool // Quitting channel - currentState stateFn // The state function which will allow us to do some pre checks + currentState stateFn // The state function which will allow us to do some pre checks pendingState *state.ManagedState gasLimit func() *big.Int // The current gas limit function callback minGasPrice *big.Int @@ -72,6 +71,8 @@ type TxPool struct { pending map[common.Hash]*types.Transaction // processable transactions queue map[common.Address]map[common.Hash]*types.Transaction + wg sync.WaitGroup // for shutdown sync + homestead bool } @@ -80,7 +81,6 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat config: config, pending: make(map[common.Hash]*types.Transaction), queue: make(map[common.Address]map[common.Hash]*types.Transaction), - quit: make(chan bool), eventMux: eventMux, currentState: currentStateFn, gasLimit: gasLimitFn, @@ -90,12 +90,15 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}), } + pool.wg.Add(1) go pool.eventLoop() return pool } func (pool *TxPool) eventLoop() { + defer pool.wg.Done() + // Track chain events. When a chain events occurs (new chain canon block) // we need to know the new state. The new state will help us determine // the nonces in the managed state @@ -155,8 +158,8 @@ func (pool *TxPool) resetState() { } func (pool *TxPool) Stop() { - close(pool.quit) pool.events.Unsubscribe() + pool.wg.Wait() glog.V(logger.Info).Infoln("Transaction pool stopped") } diff --git a/eth/backend.go b/eth/backend.go index 9722e9625..f43dea777 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -416,6 +416,7 @@ func (s *Ethereum) Stop() error { s.blockchain.Stop() s.protocolManager.Stop() s.txPool.Stop() + s.miner.Stop() s.eventMux.Stop() s.StopAutoDAG() diff --git a/eth/handler.go b/eth/handler.go index d6b474a91..3980a625e 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -74,14 +74,14 @@ type ProtocolManager struct { minedBlockSub event.Subscription // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - txsyncCh chan *txsync - quitSync chan struct{} + newPeerCh chan *peer + txsyncCh chan *txsync + quitSync chan struct{} + noMorePeers chan struct{} // wait group is used for graceful shutdowns during downloading // and processing - wg sync.WaitGroup - quit bool + wg sync.WaitGroup } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -94,16 +94,17 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, } // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkId: networkId, - fastSync: fastSync, - eventMux: mux, - txpool: txpool, - blockchain: blockchain, - chaindb: chaindb, - peers: newPeerSet(), - newPeerCh: make(chan *peer, 1), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + networkId: networkId, + fastSync: fastSync, + eventMux: mux, + txpool: txpool, + blockchain: blockchain, + chaindb: chaindb, + peers: newPeerSet(), + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), } // Initiate a sub-protocol for every implemented version we can handle manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) @@ -120,8 +121,14 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(int(version), p, rw) - manager.newPeerCh <- peer - return manager.handle(peer) + select { + case manager.newPeerCh <- peer: + manager.wg.Add(1) + defer manager.wg.Done() + return manager.handle(peer) + case <-manager.quitSync: + return p2p.DiscQuitting + } }, NodeInfo: func() interface{} { return manager.NodeInfo() @@ -187,16 +194,25 @@ func (pm *ProtocolManager) Start() { } func (pm *ProtocolManager) Stop() { - // Showing a log message. During download / process this could actually - // take between 5 to 10 seconds and therefor feedback is required. glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...") - pm.quit = true pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - close(pm.quitSync) // quits syncer, fetcher, txsyncLoop - // Wait for any process action + // Quit the sync loop. + // After this send has completed, no new peers will be accepted. + pm.noMorePeers <- struct{}{} + + // Quit fetcher, txsyncLoop. + close(pm.quitSync) + + // Disconnect existing sessions. + // This also closes the gate for any new registrations on the peer set. + // sessions which are already established but not added to pm.peers yet + // will exit when they try to register. + pm.peers.Close() + + // Wait for all peer handler goroutines and the loops to come down. pm.wg.Wait() glog.V(logger.Info).Infoln("Ethereum protocol handler stopped") diff --git a/eth/helper_test.go b/eth/helper_test.go index 5703d44cc..dacb1593f 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -140,14 +140,14 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te // Start the peer on a new thread errc := make(chan error, 1) go func() { - pm.newPeerCh <- peer - errc <- pm.handle(peer) + select { + case pm.newPeerCh <- peer: + errc <- pm.handle(peer) + case <-pm.quitSync: + errc <- p2p.DiscQuitting + } }() - tp := &testPeer{ - app: app, - net: net, - peer: peer, - } + tp := &testPeer{app: app, net: net, peer: peer} // Execute any implicitly requested handshakes and return if shake { td, head, genesis := pm.blockchain.Status() diff --git a/eth/peer.go b/eth/peer.go index 15ba22ff5..8eb41b0f9 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -34,6 +34,7 @@ import ( ) var ( + errClosed = errors.New("peer set is closed") errAlreadyRegistered = errors.New("peer is already registered") errNotRegistered = errors.New("peer is not registered") ) @@ -351,8 +352,9 @@ func (p *peer) String() string { // peerSet represents the collection of active peers currently participating in // the Ethereum sub-protocol. type peerSet struct { - peers map[string]*peer - lock sync.RWMutex + peers map[string]*peer + lock sync.RWMutex + closed bool } // newPeerSet creates a new peer set to track the active participants. @@ -368,6 +370,9 @@ func (ps *peerSet) Register(p *peer) error { ps.lock.Lock() defer ps.lock.Unlock() + if ps.closed { + return errClosed + } if _, ok := ps.peers[p.id]; ok { return errAlreadyRegistered } @@ -450,3 +455,15 @@ func (ps *peerSet) BestPeer() *peer { } return bestPeer } + +// Close disconnects all peers. +// No new peers can be registered after Close has returned. +func (ps *peerSet) Close() { + ps.lock.Lock() + defer ps.lock.Unlock() + + for _, p := range ps.peers { + p.Disconnect(p2p.DiscQuitting) + } + ps.closed = true +} diff --git a/eth/sync.go b/eth/sync.go index dd8aef8e4..69881530d 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -148,7 +148,7 @@ func (pm *ProtocolManager) syncer() { // Force a sync even if not enough peers are present go pm.synchronise(pm.peers.BestPeer()) - case <-pm.quitSync: + case <-pm.noMorePeers: return } } diff --git a/miner/worker.go b/miner/worker.go index 21588e310..3d1928bf6 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -94,10 +94,13 @@ type worker struct { mu sync.Mutex + // update loop + mux *event.TypeMux + events event.Subscription + wg sync.WaitGroup + agents map[Agent]struct{} recv chan *Result - mux *event.TypeMux - quit chan struct{} pow pow.PoW eth core.Backend @@ -138,13 +141,14 @@ func newWorker(config *core.ChainConfig, coinbase common.Address, eth core.Backe possibleUncles: make(map[common.Hash]*types.Block), coinbase: coinbase, txQueue: make(map[common.Hash]*types.Transaction), - quit: make(chan struct{}), agents: make(map[Agent]struct{}), fullValidation: false, } + worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) + worker.wg.Add(1) go worker.update() - go worker.wait() + go worker.wait() worker.commitNewWork() return worker @@ -184,9 +188,12 @@ func (self *worker) start() { } func (self *worker) stop() { + // Quit update. + self.events.Unsubscribe() + self.wg.Wait() + self.mu.Lock() defer self.mu.Unlock() - if atomic.LoadInt32(&self.mining) == 1 { // Stop all agents. for agent := range self.agents { @@ -217,36 +224,23 @@ func (self *worker) unregister(agent Agent) { } func (self *worker) update() { - eventSub := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) - defer eventSub.Unsubscribe() - - eventCh := eventSub.Chan() - for { - select { - case event, ok := <-eventCh: - if !ok { - // Event subscription closed, set the channel to nil to stop spinning - eventCh = nil - continue + defer self.wg.Done() + for event := range self.events.Chan() { + // A real event arrived, process interesting content + switch ev := event.Data.(type) { + case core.ChainHeadEvent: + self.commitNewWork() + case core.ChainSideEvent: + self.uncleMu.Lock() + self.possibleUncles[ev.Block.Hash()] = ev.Block + self.uncleMu.Unlock() + case core.TxPreEvent: + // Apply transaction to the pending state if we're not mining + if atomic.LoadInt32(&self.mining) == 0 { + self.currentMu.Lock() + self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain) + self.currentMu.Unlock() } - // A real event arrived, process interesting content - switch ev := event.Data.(type) { - case core.ChainHeadEvent: - self.commitNewWork() - case core.ChainSideEvent: - self.uncleMu.Lock() - self.possibleUncles[ev.Block.Hash()] = ev.Block - self.uncleMu.Unlock() - case core.TxPreEvent: - // Apply transaction to the pending state if we're not mining - if atomic.LoadInt32(&self.mining) == 0 { - self.currentMu.Lock() - self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain) - self.currentMu.Unlock() - } - } - case <-self.quit: - return } } }