diff --git a/core/blockchain.go b/core/blockchain.go index 1da899ab4..d6f732194 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -897,7 +897,7 @@ func (bc *BlockChain) Stop() { log.Error("Dangling trie nodes after full cleanup") } } - log.Info("Blockchain manager stopped") + log.Info("Blockchain stopped") } func (bc *BlockChain) procFutureBlocks() { diff --git a/eth/backend.go b/eth/backend.go index ed79340f5..589773f81 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -67,9 +67,6 @@ type LesServer interface { type Ethereum struct { config *Config - // Channel for shutting down the service - shutdownChan chan bool - // Handlers txPool *core.TxPool blockchain *core.BlockChain @@ -84,8 +81,9 @@ type Ethereum struct { engine consensus.Engine accountManager *accounts.Manager - bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests - bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports + bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests + bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports + closeBloomHandler chan struct{} APIBackend *EthAPIBackend @@ -145,17 +143,17 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { log.Info("Initialised chain configuration", "config", chainConfig) eth := &Ethereum{ - config: config, - chainDb: chainDb, - eventMux: ctx.EventMux, - accountManager: ctx.AccountManager, - engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb), - shutdownChan: make(chan bool), - networkID: config.NetworkId, - gasPrice: config.Miner.GasPrice, - etherbase: config.Miner.Etherbase, - bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), + config: config, + chainDb: chainDb, + eventMux: ctx.EventMux, + accountManager: ctx.AccountManager, + 
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb), + closeBloomHandler: make(chan struct{}), + networkID: config.NetworkId, + gasPrice: config.Miner.GasPrice, + etherbase: config.Miner.Etherbase, + bloomRequests: make(chan chan *bloombits.Retrieval), + bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), } bcVersion := rawdb.ReadDatabaseVersion(chainDb) @@ -557,18 +555,20 @@ func (s *Ethereum) Start(srvr *p2p.Server) error { // Stop implements node.Service, terminating all internal goroutines used by the // Ethereum protocol. func (s *Ethereum) Stop() error { - s.bloomIndexer.Close() - s.blockchain.Stop() - s.engine.Close() + // Stop all the peer-related stuff first. s.protocolManager.Stop() if s.lesServer != nil { s.lesServer.Stop() } + + // Then stop everything else. + s.bloomIndexer.Close() + close(s.closeBloomHandler) s.txPool.Stop() s.miner.Stop() - s.eventMux.Stop() - + s.blockchain.Stop() + s.engine.Close() s.chainDb.Close() - close(s.shutdownChan) + s.eventMux.Stop() return nil } diff --git a/eth/bloombits.go b/eth/bloombits.go index 9a31997d6..35522b9bf 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -54,7 +54,7 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { go func() { for { select { - case <-eth.shutdownChan: + case <-eth.closeBloomHandler: return case request := <-eth.bloomRequests: diff --git a/eth/handler.go b/eth/handler.go index 236e50729..9a02f1f20 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -87,14 +87,12 @@ type ProtocolManager struct { whitelist map[uint64]common.Hash // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - txsyncCh chan *txsync - quitSync chan struct{} - noMorePeers chan struct{} + txsyncCh chan *txsync + quitSync chan struct{} - // wait group is used for graceful shutdowns during downloading - // and processing - wg sync.WaitGroup + chainSync *chainSyncer + wg sync.WaitGroup + peerWG 
sync.WaitGroup // Test fields or hooks broadcastTxAnnouncesOnly bool // Testing field, disable transaction propagation @@ -105,18 +103,17 @@ type ProtocolManager struct { func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - forkFilter: forkid.NewFilter(blockchain), - eventMux: mux, - txpool: txpool, - blockchain: blockchain, - peers: newPeerSet(), - whitelist: whitelist, - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + networkID: networkID, + forkFilter: forkid.NewFilter(blockchain), + eventMux: mux, + txpool: txpool, + blockchain: blockchain, + peers: newPeerSet(), + whitelist: whitelist, + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), } + if mode == downloader.FullSync { // The database seems empty as the current block is the genesis. Yet the fast // block is ahead, so fast sync was enabled for this node at a certain point. 
@@ -140,6 +137,7 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh manager.fastSync = uint32(1) } } + // If we have trusted checkpoints, enforce them on the chain if checkpoint != nil { manager.checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1 @@ -199,6 +197,8 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh } manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx) + manager.chainSync = newChainSyncer(manager) + return manager, nil } @@ -213,15 +213,7 @@ func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol { Version: version, Length: length, Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := pm.newPeer(int(version), p, rw, pm.txpool.Get) - select { - case pm.newPeerCh <- peer: - pm.wg.Add(1) - defer pm.wg.Done() - return pm.handle(peer) - case <-pm.quitSync: - return p2p.DiscQuitting - } + return pm.runPeer(pm.newPeer(int(version), p, rw, pm.txpool.Get)) }, NodeInfo: func() interface{} { return pm.NodeInfo() @@ -260,40 +252,37 @@ func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers // broadcast transactions + pm.wg.Add(1) pm.txsCh = make(chan core.NewTxsEvent, txChanSize) pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() // broadcast mined blocks + pm.wg.Add(1) pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() // start sync handlers - go pm.syncer() + pm.wg.Add(2) + go pm.chainSync.loop() go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64. } func (pm *ProtocolManager) Stop() { - log.Info("Stopping Ethereum protocol") - pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - // Quit the sync loop. - // After this send has completed, no new peers will be accepted. - pm.noMorePeers <- struct{}{} - - // Quit fetcher, txsyncLoop. 
+ // Quit chainSync and txsync64. + // After this is done, no new peers will be accepted. close(pm.quitSync) + pm.wg.Wait() // Disconnect existing sessions. // This also closes the gate for any new registrations on the peer set. // sessions which are already established but not added to pm.peers yet // will exit when they try to register. pm.peers.Close() - - // Wait for all peer handler goroutines and the loops to come down. - pm.wg.Wait() + pm.peerWG.Wait() log.Info("Ethereum protocol stopped") } @@ -302,6 +291,15 @@ func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter, ge return newPeer(pv, p, rw, getPooledTx) } +func (pm *ProtocolManager) runPeer(p *peer) error { + if !pm.chainSync.handlePeerEvent(p) { + return p2p.DiscQuitting + } + pm.peerWG.Add(1) + defer pm.peerWG.Done() + return pm.handle(p) +} + // handle is the callback invoked to manage the life cycle of an eth peer. When // this function terminates, the peer is disconnected. func (pm *ProtocolManager) handle(p *peer) error { @@ -323,6 +321,7 @@ func (pm *ProtocolManager) handle(p *peer) error { p.Log().Debug("Ethereum handshake failed", "err", err) return err } + // Register the peer locally if err := pm.peers.Register(p); err != nil { p.Log().Error("Ethereum peer registration failed", "err", err) @@ -334,6 +333,8 @@ func (pm *ProtocolManager) handle(p *peer) error { if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil { return err } + pm.chainSync.handlePeerEvent(p) + // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) @@ -723,14 +724,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Update the peer's total difficulty if better than the previous if _, td := p.Head(); trueTD.Cmp(td) > 0 { p.SetHead(trueHead, trueTD) - - // Schedule a sync if above ours. 
Note, this will not fire a sync for a gap of - // a single block (as the true TD is below the propagated block), however this - // scenario should easily be covered by the fetcher. - currentHeader := pm.blockchain.CurrentHeader() - if trueTD.Cmp(pm.blockchain.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())) > 0 { - go pm.synchronise(p) - } + pm.chainSync.handlePeerEvent(p) } case msg.Code == NewPooledTransactionHashesMsg && p.version >= eth65: @@ -883,9 +877,10 @@ func (pm *ProtocolManager) BroadcastTransactions(txs types.Transactions, propaga } } -// Mined broadcast loop +// minedBroadcastLoop sends mined blocks to connected peers. func (pm *ProtocolManager) minedBroadcastLoop() { - // automatically stops if unsubscribe + defer pm.wg.Done() + for obj := range pm.minedBlockSub.Chan() { if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { pm.BroadcastBlock(ev.Block, true) // First propagate block to peers @@ -894,7 +889,10 @@ func (pm *ProtocolManager) minedBroadcastLoop() { } } +// txBroadcastLoop announces new transactions to connected peers. func (pm *ProtocolManager) txBroadcastLoop() { + defer pm.wg.Done() + for { select { case event := <-pm.txsCh: @@ -906,7 +904,6 @@ func (pm *ProtocolManager) txBroadcastLoop() { pm.BroadcastTransactions(event.Txs, true) // First propagate transactions to peers pm.BroadcastTransactions(event.Txs, false) // Only then announce to the rest - // Err() channel will be closed when unsubscribing. 
case <-pm.txsSub.Err(): return } diff --git a/eth/helper_test.go b/eth/helper_test.go index bec37e16c..3338af71d 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -170,23 +170,14 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te // Create a message pipe to communicate through app, net := p2p.MsgPipe() - // Generate a random id and create the peer + // Start the peer on a new thread var id enode.ID rand.Read(id[:]) - peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net, pm.txpool.Get) - - // Start the peer on a new thread errc := make(chan error, 1) - go func() { - select { - case pm.newPeerCh <- peer: - errc <- pm.handle(peer) - case <-pm.quitSync: - errc <- p2p.DiscQuitting - } - }() + go func() { errc <- pm.runPeer(peer) }() tp := &testPeer{app: app, net: net, peer: peer} + // Execute any implicitly requested handshakes and return if shake { var ( diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 4bbfe9bd3..a313e4e6c 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -385,7 +385,7 @@ func testSyncTransaction(t *testing.T, propagtion bool) { go pmFetcher.handle(pmFetcher.newPeer(65, p2p.NewPeer(enode.ID{}, "fetcher", nil), io1, pmFetcher.txpool.Get)) time.Sleep(250 * time.Millisecond) - pmFetcher.synchronise(pmFetcher.peers.BestPeer()) + pmFetcher.doSync(peerToSyncOp(downloader.FullSync, pmFetcher.peers.BestPeer())) atomic.StoreUint32(&pmFetcher.acceptTxs, 1) newTxs := make(chan core.NewTxsEvent, 1024) diff --git a/eth/sync.go b/eth/sync.go index 0709706c9..d689200dc 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -17,6 +17,7 @@ package eth import ( + "math/big" "math/rand" "sync/atomic" "time" @@ -30,9 +31,9 @@ import ( const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available - minDesiredPeerCount = 5 // Amount of peers desired to start syncing + defaultMinSyncPeers = 5 // Amount of peers desired to start syncing - // This is the target 
size for the packs of transactions sent by txsyncLoop. + // This is the target size for the packs of transactions sent by txsyncLoop64. // A pack can get larger than this if a single transactions exceeds this size. txsyncPackSize = 100 * 1024 ) @@ -81,12 +82,15 @@ func (pm *ProtocolManager) syncTransactions(p *peer) { // transactions. In order to minimise egress bandwidth usage, we send // the transactions in small packs to one peer at a time. func (pm *ProtocolManager) txsyncLoop64() { + defer pm.wg.Done() + var ( pending = make(map[enode.ID]*txsync) sending = false // whether a send is active pack = new(txsync) // the pack that is being sent done = make(chan error, 1) // result of the send ) + // send starts a sending a pack of transactions from the sync. send := func(s *txsync) { if s.p.version >= eth65 { @@ -149,73 +153,148 @@ func (pm *ProtocolManager) txsyncLoop64() { } } -// syncer is responsible for periodically synchronising with the network, both -// downloading hashes and blocks as well as handling the announcement handler. -func (pm *ProtocolManager) syncer() { - // Start and ensure cleanup of sync mechanisms - pm.blockFetcher.Start() - pm.txFetcher.Start() - defer pm.blockFetcher.Stop() - defer pm.txFetcher.Stop() - defer pm.downloader.Terminate() +// chainSyncer coordinates blockchain sync components. +type chainSyncer struct { + pm *ProtocolManager + force *time.Timer + forced bool // true when force timer fired + peerEventCh chan struct{} + doneCh chan error // non-nil when sync is running +} - // Wait for different events to fire synchronisation operations - forceSync := time.NewTicker(forceSyncCycle) - defer forceSync.Stop() +// chainSyncOp is a scheduled sync operation. +type chainSyncOp struct { + mode downloader.SyncMode + peer *peer + td *big.Int + head common.Hash +} + +// newChainSyncer creates a chainSyncer. 
+func newChainSyncer(pm *ProtocolManager) *chainSyncer { + return &chainSyncer{ + pm: pm, + peerEventCh: make(chan struct{}), + } +} + +// handlePeerEvent notifies the syncer about a change in the peer set. +// This is called for new peers and every time a peer announces a new +// chain head. +func (cs *chainSyncer) handlePeerEvent(p *peer) bool { + select { + case cs.peerEventCh <- struct{}{}: + return true + case <-cs.pm.quitSync: + return false + } +} + +// loop runs in its own goroutine and launches the sync when necessary. +func (cs *chainSyncer) loop() { + defer cs.pm.wg.Done() + + cs.pm.blockFetcher.Start() + cs.pm.txFetcher.Start() + defer cs.pm.blockFetcher.Stop() + defer cs.pm.txFetcher.Stop() + defer cs.pm.downloader.Terminate() + + // The force timer lowers the peer count threshold down to one when it fires. + // This ensures we'll always start sync even if there aren't enough peers. + cs.force = time.NewTimer(forceSyncCycle) + defer cs.force.Stop() for { + if op := cs.nextSyncOp(); op != nil { + cs.startSync(op) + } + select { - case <-pm.newPeerCh: - // Make sure we have peers to select from, then sync - if pm.peers.Len() < minDesiredPeerCount { - break + case <-cs.peerEventCh: + // Peer information changed, recheck. + case <-cs.doneCh: + cs.doneCh = nil + cs.force.Reset(forceSyncCycle) + cs.forced = false + case <-cs.force.C: + cs.forced = true + + case <-cs.pm.quitSync: + if cs.doneCh != nil { + cs.pm.downloader.Cancel() + <-cs.doneCh } - go pm.synchronise(pm.peers.BestPeer()) - - case <-forceSync.C: - // Force a sync even if not enough peers are present - go pm.synchronise(pm.peers.BestPeer()) - - case <-pm.noMorePeers: return } } } -// synchronise tries to sync up our local block chain with a remote peer. -func (pm *ProtocolManager) synchronise(peer *peer) { - // Short circuit if no peers are available - if peer == nil { - return +// nextSyncOp determines whether sync is required at this time. 
+func (cs *chainSyncer) nextSyncOp() *chainSyncOp { + if cs.doneCh != nil { + return nil // Sync already running. } - // Make sure the peer's TD is higher than our own - currentHeader := pm.blockchain.CurrentHeader() - td := pm.blockchain.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64()) - pHead, pTd := peer.Head() - if pTd.Cmp(td) <= 0 { - return + // Ensure we're at minimum peer count. + minPeers := defaultMinSyncPeers + if cs.forced { + minPeers = 1 + } else if minPeers > cs.pm.maxPeers { + minPeers = cs.pm.maxPeers } - // Otherwise try to sync with the downloader - mode := downloader.FullSync - if atomic.LoadUint32(&pm.fastSync) == 1 { - // Fast sync was explicitly requested, and explicitly granted - mode = downloader.FastSync + if cs.pm.peers.Len() < minPeers { + return nil } - if mode == downloader.FastSync { - // Make sure the peer's total difficulty we are synchronizing is higher. - if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 { - return - } + + // We have enough peers, check TD. + peer := cs.pm.peers.BestPeer() + if peer == nil { + return nil } - // Run the sync cycle, and disable fast sync if we've went past the pivot block - if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { - return + mode, ourTD := cs.modeAndLocalHead() + op := peerToSyncOp(mode, peer) + if op.td.Cmp(ourTD) <= 0 { + return nil // We're in sync.
+ } + return op +} + +func peerToSyncOp(mode downloader.SyncMode, p *peer) *chainSyncOp { + peerHead, peerTD := p.Head() + return &chainSyncOp{mode: mode, peer: p, td: peerTD, head: peerHead} +} + +func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { + if atomic.LoadUint32(&cs.pm.fastSync) == 1 { + block := cs.pm.blockchain.CurrentFastBlock() + td := cs.pm.blockchain.GetTdByHash(block.Hash()) + return downloader.FastSync, td + } else { + head := cs.pm.blockchain.CurrentHeader() + td := cs.pm.blockchain.GetTd(head.Hash(), head.Number.Uint64()) + return downloader.FullSync, td + } +} + +// startSync launches doSync in a new goroutine. +func (cs *chainSyncer) startSync(op *chainSyncOp) { + cs.doneCh = make(chan error, 1) + go func() { cs.doneCh <- cs.pm.doSync(op) }() +} + +// doSync synchronizes the local blockchain with a remote peer. +func (pm *ProtocolManager) doSync(op *chainSyncOp) error { + // Run the sync cycle, and disable fast sync if we're past the pivot block + err := pm.downloader.Synchronise(op.peer.id, op.head, op.td, op.mode) + if err != nil { + return err } if atomic.LoadUint32(&pm.fastSync) == 1 { log.Info("Fast sync complete, auto disabling") atomic.StoreUint32(&pm.fastSync, 0) } + // If we've successfully finished a sync cycle and passed any required checkpoint, // enable accepting transactions from the network. head := pm.blockchain.CurrentBlock() @@ -226,6 +305,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { atomic.StoreUint32(&pm.acceptTxs, 1) } } + if head.NumberU64() > 0 { // We've completed a sync cycle, notify all peers of new state. This path is // essential in star-topology networks where a gateway node needs to notify @@ -233,6 +313,8 @@ func (pm *ProtocolManager) synchronise(peer *peer) { // scenario will most often crop up in private and hackathon networks with // degenerate connectivity, but it should be healthy for the mainnet too to // more reliably update peers or the local TD state. 
- go pm.BroadcastBlock(head, false) + pm.BroadcastBlock(head, false) } + + return nil } diff --git a/eth/sync_test.go b/eth/sync_test.go index d02bc5710..ac1e5fad1 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -33,6 +33,8 @@ func TestFastSyncDisabling65(t *testing.T) { testFastSyncDisabling(t, 65) } // Tests that fast sync gets disabled as soon as a real block is successfully // imported into the blockchain. func testFastSyncDisabling(t *testing.T, protocol int) { + t.Parallel() + // Create a pristine protocol manager, check that fast sync is left enabled pmEmpty, _ := newTestProtocolManagerMust(t, downloader.FastSync, 0, nil, nil) if atomic.LoadUint32(&pmEmpty.fastSync) == 0 { @@ -43,14 +45,17 @@ func testFastSyncDisabling(t *testing.T, protocol int) { if atomic.LoadUint32(&pmFull.fastSync) == 1 { t.Fatalf("fast sync not disabled on non-empty blockchain") } + // Sync up the two peers io1, io2 := p2p.MsgPipe() - go pmFull.handle(pmFull.newPeer(protocol, p2p.NewPeer(enode.ID{}, "empty", nil), io2, pmFull.txpool.Get)) go pmEmpty.handle(pmEmpty.newPeer(protocol, p2p.NewPeer(enode.ID{}, "full", nil), io1, pmEmpty.txpool.Get)) time.Sleep(250 * time.Millisecond) - pmEmpty.synchronise(pmEmpty.peers.BestPeer()) + op := peerToSyncOp(downloader.FastSync, pmEmpty.peers.BestPeer()) + if err := pmEmpty.doSync(op); err != nil { + t.Fatal("sync failed:", err) + } // Check that fast sync was disabled if atomic.LoadUint32(&pmEmpty.fastSync) == 1 {