core: changed interrupt strategy

Removed chain manager's select/channel approach when checking for
interrupts. Now using an atomic int32 instead which checked for every
block processed.
This commit is contained in:
obscuren 2015-06-12 16:45:53 +02:00
parent 90c4493a10
commit 645dfd9693

View File

@ -8,6 +8,7 @@ import (
"os" "os"
"runtime" "runtime"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -100,9 +101,10 @@ type ChainManager struct {
cache *BlockCache cache *BlockCache
futureBlocks *BlockCache futureBlocks *BlockCache
quit chan struct{} quit chan struct{}
procInterupt chan struct{} // interupt signaler for block processing // procInterrupt must be atomically called
wg sync.WaitGroup procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup
pow pow.PoW pow pow.PoW
} }
@ -114,7 +116,6 @@ func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow
genesisBlock: GenesisBlock(42, stateDb), genesisBlock: GenesisBlock(42, stateDb),
eventMux: mux, eventMux: mux,
quit: make(chan struct{}), quit: make(chan struct{}),
procInterupt: make(chan struct{}),
cache: NewBlockCache(blockCacheLimit), cache: NewBlockCache(blockCacheLimit),
pow: pow, pow: pow,
} }
@ -518,7 +519,7 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) {
func (bc *ChainManager) Stop() { func (bc *ChainManager) Stop() {
close(bc.quit) close(bc.quit)
close(bc.procInterupt) atomic.StoreInt32(&bc.procInterrupt, 1)
bc.wg.Wait() bc.wg.Wait()
@ -571,126 +572,124 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
defer close(nonceQuit) defer close(nonceQuit)
txcount := 0 txcount := 0
done:
for i, block := range chain { for i, block := range chain {
select { if atomic.LoadInt32(&self.procInterrupt) == 1 {
case <-self.procInterupt:
glog.V(logger.Debug).Infoln("Premature abort during chain processing") glog.V(logger.Debug).Infoln("Premature abort during chain processing")
break done break
default:
bstart := time.Now()
// Wait for block i's nonce to be verified before processing
// its state transition.
for !nonceChecked[i] {
r := <-nonceDone
nonceChecked[r.i] = true
if !r.valid {
block := chain[r.i]
return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
}
}
if BadHashes[block.Hash()] {
err := fmt.Errorf("Found known bad hash in chain %x", block.Hash())
blockErr(block, err)
return i, err
}
// Setting block.Td regardless of error (known for example) prevents errors down the line
// in the protocol handler
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
// Call in to the block processor and check for errors. It's likely that if one block fails
// all others will fail too (unless a known block is returned).
logs, err := self.processor.Process(block)
if err != nil {
if IsKnownBlockErr(err) {
stats.ignored++
continue
}
if err == BlockFutureErr {
// Allow up to MaxFuture second in the future blocks. If this limit
// is exceeded the chain is discarded and processed at a later time
// if given.
if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max {
return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
}
block.SetQueued(true)
self.futureBlocks.Push(block)
stats.queued++
continue
}
if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
block.SetQueued(true)
self.futureBlocks.Push(block)
stats.queued++
continue
}
blockErr(block, err)
return i, err
}
txcount += len(block.Transactions())
cblock := self.currentBlock
// Compare the TD of the last known block in the canonical chain to make sure it's greater.
// At this point it's possible that a different chain (fork) becomes the new canonical chain.
if block.Td.Cmp(self.Td()) > 0 {
// chain fork
if block.ParentHash() != cblock.Hash() {
// during split we merge two different chains and create the new canonical chain
err := self.merge(cblock, block)
if err != nil {
return i, err
}
queue[i] = ChainSplitEvent{block, logs}
queueEvent.splitCount++
}
self.mu.Lock()
self.setTotalDifficulty(block.Td)
self.insert(block)
self.mu.Unlock()
jsonlogger.LogJson(&logger.EthChainNewHead{
BlockHash: block.Hash().Hex(),
BlockNumber: block.Number(),
ChainHeadHash: cblock.Hash().Hex(),
BlockPrevHash: block.ParentHash().Hex(),
})
self.setTransState(state.New(block.Root(), self.stateDb))
self.txState.SetState(state.New(block.Root(), self.stateDb))
queue[i] = ChainEvent{block, block.Hash(), logs}
queueEvent.canonicalCount++
if glog.V(logger.Debug) {
glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
}
} else {
if glog.V(logger.Detail) {
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
}
queue[i] = ChainSideEvent{block, logs}
queueEvent.sideCount++
}
// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
// not in the canonical chain.
self.write(block)
// Delete from future blocks
self.futureBlocks.Delete(block.Hash())
stats.processed++
} }
bstart := time.Now()
// Wait for block i's nonce to be verified before processing
// its state transition.
for !nonceChecked[i] {
r := <-nonceDone
nonceChecked[r.i] = true
if !r.valid {
block := chain[r.i]
return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
}
}
if BadHashes[block.Hash()] {
err := fmt.Errorf("Found known bad hash in chain %x", block.Hash())
blockErr(block, err)
return i, err
}
// Setting block.Td regardless of error (known for example) prevents errors down the line
// in the protocol handler
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
// Call in to the block processor and check for errors. It's likely that if one block fails
// all others will fail too (unless a known block is returned).
logs, err := self.processor.Process(block)
if err != nil {
if IsKnownBlockErr(err) {
stats.ignored++
continue
}
if err == BlockFutureErr {
// Allow up to MaxFuture second in the future blocks. If this limit
// is exceeded the chain is discarded and processed at a later time
// if given.
if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max {
return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
}
block.SetQueued(true)
self.futureBlocks.Push(block)
stats.queued++
continue
}
if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
block.SetQueued(true)
self.futureBlocks.Push(block)
stats.queued++
continue
}
blockErr(block, err)
return i, err
}
txcount += len(block.Transactions())
cblock := self.currentBlock
// Compare the TD of the last known block in the canonical chain to make sure it's greater.
// At this point it's possible that a different chain (fork) becomes the new canonical chain.
if block.Td.Cmp(self.Td()) > 0 {
// chain fork
if block.ParentHash() != cblock.Hash() {
// during split we merge two different chains and create the new canonical chain
err := self.merge(cblock, block)
if err != nil {
return i, err
}
queue[i] = ChainSplitEvent{block, logs}
queueEvent.splitCount++
}
self.mu.Lock()
self.setTotalDifficulty(block.Td)
self.insert(block)
self.mu.Unlock()
jsonlogger.LogJson(&logger.EthChainNewHead{
BlockHash: block.Hash().Hex(),
BlockNumber: block.Number(),
ChainHeadHash: cblock.Hash().Hex(),
BlockPrevHash: block.ParentHash().Hex(),
})
self.setTransState(state.New(block.Root(), self.stateDb))
self.txState.SetState(state.New(block.Root(), self.stateDb))
queue[i] = ChainEvent{block, block.Hash(), logs}
queueEvent.canonicalCount++
if glog.V(logger.Debug) {
glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
}
} else {
if glog.V(logger.Detail) {
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
}
queue[i] = ChainSideEvent{block, logs}
queueEvent.sideCount++
}
// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
// not in the canonical chain.
self.write(block)
// Delete from future blocks
self.futureBlocks.Delete(block.Hash())
stats.processed++
} }
if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) { if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {