diff --git a/core/blockchain.go b/core/blockchain.go index 972d8c3c4..0987d65be 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -178,11 +178,10 @@ type BlockChain struct { txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. futureBlocks *lru.Cache // future blocks are blocks added for later processing - quit chan struct{} // blockchain quit channel - running int32 // running must be called atomically - // procInterrupt must be atomically called - procInterrupt int32 // interrupt signaler for block processing + quit chan struct{} // blockchain quit channel wg sync.WaitGroup // chain processing wait group for shutting down + running int32 // 0 if chain is running, 1 when stopped + procInterrupt int32 // interrupt signaler for block processing engine consensus.Engine validator Validator // Block and state validator interface @@ -239,7 +238,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error - bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt) + bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped) if err != nil { return nil, err } @@ -332,10 +331,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par return bc, nil } -func (bc *BlockChain) getProcInterrupt() bool { - return atomic.LoadInt32(&bc.procInterrupt) == 1 -} - // GetVMConfig returns the block chain VM config. func (bc *BlockChain) GetVMConfig() *vm.Config { return &bc.vmConfig @@ -882,8 +877,7 @@ func (bc *BlockChain) Stop() { // Unsubscribe all subscriptions registered from blockchain bc.scope.Close() close(bc.quit) - atomic.StoreInt32(&bc.procInterrupt, 1) - + bc.StopInsert() bc.wg.Wait() // Ensure that the entirety of the state snapshot is journalled to disk. @@ -928,6 +922,18 @@ func (bc *BlockChain) Stop() { log.Info("Blockchain stopped") } +// StopInsert interrupts all insertion methods, causing them to return +// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after +// calling this method. +func (bc *BlockChain) StopInsert() { + atomic.StoreInt32(&bc.procInterrupt, 1) +} + +// insertStopped returns true after StopInsert has been called. +func (bc *BlockChain) insertStopped() bool { + return atomic.LoadInt32(&bc.procInterrupt) == 1 +} + func (bc *BlockChain) procFutureBlocks() { blocks := make([]*types.Block, 0, bc.futureBlocks.Len()) for _, hash := range bc.futureBlocks.Keys() { @@ -1113,7 +1119,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ var deleted []*numberHash for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.insertStopped() { return 0, errInsertionInterrupted } // Short circuit insertion if it is required(used in testing only) @@ -1260,7 +1266,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ batch := bc.db.NewBatch() for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.insertStopped() { return 0, errInsertionInterrupted } // Short circuit if the owner header is unknown @@ -1708,8 +1714,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // No validation errors for the first block (or chain prefix skipped) for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() { // If the chain is terminating, stop processing blocks - if atomic.LoadInt32(&bc.procInterrupt) == 1 { - log.Debug("Premature abort during blocks processing") + if bc.insertStopped() { + log.Debug("Abort during block processing") break } // If the header is a banned one, straight out abort @@ -1996,8 +2002,8 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i blocks, memory = blocks[:0], 0 // If the chain is terminating, stop processing blocks - if atomic.LoadInt32(&bc.procInterrupt) == 1 { - log.Debug("Premature abort during blocks processing") + if bc.insertStopped() { + log.Debug("Abort during blocks processing") return 0, nil } } diff --git a/eth/sync.go b/eth/sync.go index 91af8282a..0982a9702 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -199,7 +199,6 @@ func (cs *chainSyncer) loop() { cs.pm.txFetcher.Start() defer cs.pm.blockFetcher.Stop() defer cs.pm.txFetcher.Stop() - defer cs.pm.downloader.Terminate() // The force timer lowers the peer count threshold down to one when it fires. // This ensures we'll always start sync even if there aren't enough peers. @@ -222,8 +221,13 @@ func (cs *chainSyncer) loop() { cs.forced = true case <-cs.pm.quitSync: + // Disable all insertion on the blockchain. This needs to happen before + // terminating the downloader because the downloader waits for blockchain + // inserts, and these can take a long time to finish. + cs.pm.blockchain.StopInsert() + cs.pm.downloader.Terminate() if cs.doneCh != nil { - cs.pm.downloader.Terminate() // Double term is fine, Cancel would block until queue is emptied + // Wait for the current sync to end. <-cs.doneCh } return diff --git a/light/lightchain.go b/light/lightchain.go index 636e06f51..79eba62c9 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -314,10 +314,16 @@ func (lc *LightChain) Stop() { return } close(lc.quit) - atomic.StoreInt32(&lc.procInterrupt, 1) - + lc.StopInsert() lc.wg.Wait() - log.Info("Blockchain manager stopped") + log.Info("Blockchain stopped") +} + +// StopInsert interrupts all insertion methods, causing them to return +// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after +// calling this method. +func (lc *LightChain) StopInsert() { + atomic.StoreInt32(&lc.procInterrupt, 1) } // Rollback is designed to remove a chain of links from the database that aren't