core: polish side chain importer a bit

This commit is contained in:
Péter Szilágyi 2018-11-20 14:15:26 +02:00
parent 493903eede
commit 333b5fb123
No known key found for this signature in database
GPG Key ID: E9AE538CEDF8293D
3 changed files with 332 additions and 298 deletions

View File

@ -1036,6 +1036,18 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
return status, nil return status, nil
} }
// addFutureBlock checks if the block is within the max allowed window to get
// accepted for future processing, and returns an error if the block is too far
// ahead and was not added.
func (bc *BlockChain) addFutureBlock(block *types.Block) error {
max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
if block.Time().Cmp(max) > 0 {
return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max)
}
bc.futureBlocks.Add(block.Hash(), block)
return nil
}
// InsertChain attempts to insert the given batch of blocks in to the canonical // InsertChain attempts to insert the given batch of blocks in to the canonical
// chain or, otherwise, create a fork. If an error is returned it will return // chain or, otherwise, create a fork. If an error is returned it will return
// the index number of the failing block as well an error describing what went // the index number of the failing block as well an error describing what went
@ -1043,92 +1055,9 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
// //
// After insertion is done, all accumulated events will be fired. // After insertion is done, all accumulated events will be fired.
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
n, events, logs, err := bc.insertChain(chain)
bc.PostChainEvents(events, logs)
return n, err
}
// addFutureBlock checks if the block is within the max allowed window to get accepted for future processing, and
// returns an error if the block is too far ahead and was not added.
func (bc *BlockChain) addFutureBlock(block *types.Block) error {
max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
if block.Time().Cmp(max) > 0 {
return fmt.Errorf("future block: %v > %v", block.Time(), max)
}
bc.futureBlocks.Add(block.Hash(), block)
return nil
}
// importBatch is a helper function to assist during chain import
type importBatch struct {
chain types.Blocks
results <-chan error
index int
validator Validator
}
// newBatch creates a new batch based on the given blocks, which are assumed to be a contiguous chain
func newBatch(chain types.Blocks, results <-chan error, validator Validator) *importBatch {
return &importBatch{
chain: chain,
results: results,
index: -1,
validator: validator,
}
}
// next returns the next block in the batch, along with any potential validation error for that block
// When the end is reached, it will return (nil, nil), but Current() will always return the last element.
func (batch *importBatch) next() (*types.Block, error) {
if batch.index+1 >= len(batch.chain) {
return nil, nil
}
batch.index++
if err := <-batch.results; err != nil {
return batch.chain[batch.index], err
}
return batch.chain[batch.index], batch.validator.ValidateBody(batch.chain[batch.index])
}
// current returns the current block that's being processed. Even after the next() has progressed the entire
// chain, current will always return the last element
func (batch *importBatch) current() *types.Block {
if batch.index < 0 {
return nil
}
return batch.chain[batch.index]
}
// previous returns the previous block was being processed, or nil
func (batch *importBatch) previous() *types.Block {
if batch.index < 1 {
return nil
}
return batch.chain[batch.index-1]
}
// first returns the first block in the batch
func (batch *importBatch) first() *types.Block {
return batch.chain[0]
}
// remaining returns the number of remaining blocks
func (batch *importBatch) remaining() int {
return len(batch.chain) - batch.index
}
// processed returns the number of processed blocks
func (batch *importBatch) processed() int {
return batch.index + 1
}
// insertChain will execute the actual chain insertion and event aggregation. The
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import // Sanity check that we have something meaningful to import
if len(chain) == 0 { if len(chain) == 0 {
return 0, nil, nil, nil return 0, nil
} }
// Do a sanity check that the provided chain is actually ordered and linked // Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ { for i := 1; i < len(chain); i++ {
@ -1137,29 +1066,34 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(), log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash()) "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())
return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
} }
} }
log.Info("insertChain", "from", chain[0].NumberU64(), "to", chain[len(chain)-1].NumberU64())
// Pre-checks passed, start the full block imports // Pre-checks passed, start the full block imports
bc.wg.Add(1) bc.wg.Add(1)
defer bc.wg.Done()
bc.chainmu.Lock() bc.chainmu.Lock()
defer bc.chainmu.Unlock() n, events, logs, err := bc.insertChain(chain, true)
return bc.insertChainInternal(chain, true) bc.chainmu.Unlock()
bc.wg.Done()
bc.PostChainEvents(events, logs)
return n, err
} }
//insertChainInternal is the internal implementation of insertChain, which assumes that // insertChain is the internal implementation of insertChain, which assumes that
// 1. chains are contiguous, and // 1) chains are contiguous, and 2) The chain mutex is held.
// 2. The `chainMu` lock is held //
// This method is split out so that import batches that require re-injecting historical blocks can do // This method is split out so that import batches that require re-injecting
// so without releasing the lock, which could lead to racey behaviour. If a sidechain import is in progress, // historical blocks can do so without releasing the lock, which could lead to
// and the historic state is imported, but then new canon-head is added before the actual sidechain completes, // racey behaviour. If a sidechain import is in progress, and the historic state
// then the historic state could be pruned again // is imported, but then new canon-head is added before the actual sidechain
func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { // completes, then the historic state could be pruned again
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
// If the chain is terminating, don't even bother starting u
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, nil, nil, nil
}
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
@ -1171,8 +1105,6 @@ func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool)
events = make([]interface{}, 0, len(chain)) events = make([]interface{}, 0, len(chain))
lastCanon *types.Block lastCanon *types.Block
coalescedLogs []*types.Log coalescedLogs []*types.Log
block *types.Block
err error
) )
// Start the parallel header verifier // Start the parallel header verifier
headers := make([]*types.Header, len(chain)) headers := make([]*types.Header, len(chain))
@ -1185,52 +1117,51 @@ func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool)
abort, results := bc.engine.VerifyHeaders(bc, headers, seals) abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort) defer close(abort)
// Peek the error for the first block // Peek the error for the first block to decide the directing import logic
batch := newBatch(chain, results, bc.Validator()) it := newInsertIterator(chain, results, bc.Validator())
if block, err = batch.next(); err != nil {
if err == consensus.ErrPrunedAncestor {
return bc.insertSidechainInternal(batch, err)
} else if err == consensus.ErrFutureBlock ||
(err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(batch.first().ParentHash())) {
// The first block is a future block block, err := it.next()
// We can shove that one and any child blocks (that fail because of UnknownAncestor) into the future-queue switch {
for block != nil && (batch.index == 0 || err == consensus.ErrUnknownAncestor) { // First block is pruned, insert as sidechain and reorg only if TD grows enough
block := batch.current() case err == consensus.ErrPrunedAncestor:
if futureError := bc.addFutureBlock(block); futureError != nil { return bc.insertSidechain(it)
return batch.index, events, coalescedLogs, futureError
// First block is future, shove it (and all children) to the future queue (unknown ancestor)
case err == consensus.ErrFutureBlock || (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(it.first().ParentHash())):
for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
} }
block, err = batch.next() block, err = it.next()
} }
stats.queued += batch.processed() stats.queued += it.processed()
stats.ignored += batch.remaining() stats.ignored += it.remaining()
// If there are any still remaining, mark as ignored // If there are any still remaining, mark as ignored
return batch.index, events, coalescedLogs, err return it.index, events, coalescedLogs, err
} else if err == ErrKnownBlock {
// Block and state both already known -- there can be two explanations. // First block (and state) is known
// 1. We did a roll-back, and should now do a re-import // 1. We did a roll-back, and should now do a re-import
// 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot // 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot
// from the canonical chain, which has not been verified. // from the canonical chain, which has not been verified.
case err == ErrKnownBlock:
// Skip all known blocks that behind us
current := bc.CurrentBlock().NumberU64()
// Skip all known blocks that are blocks behind us for block != nil && err == ErrKnownBlock && current >= block.NumberU64() {
currentNum := bc.CurrentBlock().NumberU64()
for block != nil && err == ErrKnownBlock && currentNum >= block.NumberU64() {
// We ignore these
stats.ignored++ stats.ignored++
block, err = batch.next() block, err = it.next()
} }
// Falls through to the block import // Falls through to the block import
} else {
// Some other error // Some other error occurred, abort
stats.ignored += len(batch.chain) case err != nil:
stats.ignored += len(it.chain)
bc.reportBlock(block, nil, err) bc.reportBlock(block, nil, err)
return batch.index, events, coalescedLogs, err return it.index, events, coalescedLogs, err
} }
} // No validation errors for the first block (or chain prefix skipped)
// No validation errors for ; block != nil && err == nil; block, err = it.next() {
for ; block != nil && err == nil; block, err = batch.next() {
// If the chain is terminating, stop processing blocks // If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 { if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing") log.Debug("Premature abort during blocks processing")
@ -1239,43 +1170,43 @@ func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool)
// If the header is a banned one, straight out abort // If the header is a banned one, straight out abort
if BadHashes[block.Hash()] { if BadHashes[block.Hash()] {
bc.reportBlock(block, nil, ErrBlacklistedHash) bc.reportBlock(block, nil, ErrBlacklistedHash)
return batch.index, events, coalescedLogs, ErrBlacklistedHash return it.index, events, coalescedLogs, ErrBlacklistedHash
} }
bstart := time.Now() // Retrieve the parent block and it's state to execute on top
var parent *types.Block start := time.Now()
parent = batch.previous()
parent := it.previous()
if parent == nil { if parent == nil {
parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1) parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
} }
state, err := state.New(parent.Root(), bc.stateCache) state, err := state.New(parent.Root(), bc.stateCache)
if err != nil { if err != nil {
return batch.index, events, coalescedLogs, err return it.index, events, coalescedLogs, err
} }
// Process block using the parent state as reference point. // Process block using the parent state as reference point.
receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
if err != nil { if err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
return batch.index, events, coalescedLogs, err return it.index, events, coalescedLogs, err
} }
// Validate the state using the default validator // Validate the state using the default validator
if err := bc.Validator().ValidateState(block, parent, state, receipts, usedGas); err != nil { if err := bc.Validator().ValidateState(block, parent, state, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
return batch.index, events, coalescedLogs, err return it.index, events, coalescedLogs, err
} }
proctime := time.Since(bstart) proctime := time.Since(start)
// Write the block to the chain and get the status. // Write the block to the chain and get the status.
status, err := bc.WriteBlockWithState(block, receipts, state) status, err := bc.WriteBlockWithState(block, receipts, state)
if err != nil { if err != nil {
return batch.index, events, coalescedLogs, err return it.index, events, coalescedLogs, err
} }
switch status { switch status {
case CanonStatTy: case CanonStatTy:
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(),
"uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(),
"elapsed", common.PrettyDuration(time.Since(bstart)), "elapsed", common.PrettyDuration(time.Since(start)),
"root", block.Root().String()) "root", block.Root())
coalescedLogs = append(coalescedLogs, logs...) coalescedLogs = append(coalescedLogs, logs...)
events = append(events, ChainEvent{block, block.Hash(), logs}) events = append(events, ChainEvent{block, block.Hash(), logs})
@ -1286,192 +1217,152 @@ func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool)
case SideStatTy: case SideStatTy:
log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(),
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(bstart)), "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
"root", block.Root().String()) "root", block.Root())
events = append(events, ChainSideEvent{block}) events = append(events, ChainSideEvent{block})
} }
blockInsertTimer.UpdateSince(bstart) blockInsertTimer.UpdateSince(start)
stats.processed++ stats.processed++
stats.usedGas += usedGas stats.usedGas += usedGas
cache, _ := bc.stateCache.TrieDB().Size() cache, _ := bc.stateCache.TrieDB().Size()
stats.report(chain, batch.index, cache) stats.report(chain, it.index, cache)
} }
// Any blocks remaining here? The only ones we care about are the future ones
// Any blocks remaining here? If so, the only ones we need to care about are
// shoving future blocks into queue
if block != nil && err == consensus.ErrFutureBlock { if block != nil && err == consensus.ErrFutureBlock {
if futureErr := bc.addFutureBlock(block); futureErr != nil { if err := bc.addFutureBlock(block); err != nil {
return batch.index, events, coalescedLogs, futureErr return it.index, events, coalescedLogs, err
} }
for block, err = batch.next(); block != nil && err == consensus.ErrUnknownAncestor; { block, err = it.next()
if futureErr := bc.addFutureBlock(block); futureErr != nil {
return batch.index, events, coalescedLogs, futureErr for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
} }
stats.queued++ stats.queued++
block, err = batch.next()
} }
} }
stats.ignored += batch.remaining() stats.ignored += it.remaining()
// Append a single chain head event if we've progressed the chain // Append a single chain head event if we've progressed the chain
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
events = append(events, ChainHeadEvent{lastCanon}) events = append(events, ChainHeadEvent{lastCanon})
} }
return 0, events, coalescedLogs, err return it.index, events, coalescedLogs, err
} }
// insertSidechainInternal should be called when an import batch hits upon a pruned ancestor error, which happens when // insertSidechain is called when an import batch hits upon a pruned ancestor
// an sidechain with a sufficiently old fork-block is found. It writes all (header-and-body-valid) blocks to disk, then // error, which happens when a sidechain with a sufficiently old fork-block is
// tries to switch over to the new chain if the TD exceeded the current chain. // found.
// It assumes that relevant locks are held already (hence 'Internal') //
func (bc *BlockChain) insertSidechainInternal(batch *importBatch, err error) (int, []interface{}, []*types.Log, error) { // The method writes all (header-and-body-valid) blocks to disk, then tries to
// If we're given a chain of blocks, and the first one is pruned, that means we're getting a // switch over to the new chain if the TD exceeded the current chain.
// sidechain imported. On the sidechain, we validate headers, but do not validate body and state func (bc *BlockChain) insertSidechain(it *insertIterator) (int, []interface{}, []*types.Log, error) {
// (and actually import them) until the sidechain reaches a higher TD.
// Until then, we store them in the database (assuming that the header PoW check works out)
var ( var (
externTd *big.Int externTd *big.Int
canonHeadNumber = bc.CurrentBlock().NumberU64() current = bc.CurrentBlock().NumberU64()
events = make([]interface{}, 0)
coalescedLogs []*types.Log
) )
// The first sidechain block error is already verified to be ErrPrunedAncestor. Since we don't import // The first sidechain block error is already verified to be ErrPrunedAncestor.
// them here, we expect ErrUnknownAncestor for the remaining ones. Any other errors means that // Since we don't import them here, we expect ErrUnknownAncestor for the remaining
// the block is invalid, and should not be written to disk. // ones. Any other errors means that the block is invalid, and should not be written
block := batch.current() // to disk.
for block != nil && (err == consensus.ErrPrunedAncestor) { block, err := it.current(), consensus.ErrPrunedAncestor
for ; block != nil && (err == consensus.ErrPrunedAncestor); block, err = it.next() {
// Check the canonical state root for that number // Check the canonical state root for that number
if remoteNum := block.NumberU64(); canonHeadNumber >= remoteNum { if number := block.NumberU64(); current >= number {
canonBlock := bc.GetBlockByNumber(remoteNum) if canonical := bc.GetBlockByNumber(number); canonical != nil && canonical.Root() == block.Root() {
if canonBlock != nil && canonBlock.Root() == block.Root() { // This is most likely a shadow-state attack. When a fork is imported into the
// This is most likely a shadow-state attack. // database, and it eventually reaches a block height which is not pruned, we
// When a fork is imported into the database, and it eventually reaches a block height which is // just found that the state already exist! This means that the sidechain block
// not pruned, we just found that the state already exist! This means that the sidechain block
// refers to a state which already exists in our canon chain. // refers to a state which already exists in our canon chain.
// If left unchecked, we would now proceed importing the blocks, without actually having verified //
// the state of the previous blocks. // If left unchecked, we would now proceed importing the blocks, without actually
log.Warn("Sidechain ghost-state attack detected", "blocknum", block.NumberU64(), // having verified the state of the previous blocks.
"sidechain root", block.Root(), "canon root", canonBlock.Root()) log.Warn("Sidechain ghost-state attack detected", "number", block.NumberU64(), "sideroot", block.Root(), "canonroot", canonical.Root())
// If someone legitimately side-mines blocks, they would still be imported as usual. However, // If someone legitimately side-mines blocks, they would still be imported as usual. However,
// we cannot risk writing unverified blocks to disk when they obviously target the pruning // we cannot risk writing unverified blocks to disk when they obviously target the pruning
// mechanism. // mechanism.
return batch.index, events, coalescedLogs, fmt.Errorf("sidechain ghost-state attack detected") return it.index, nil, nil, errors.New("sidechain ghost-state attack")
} }
} }
if externTd == nil { if externTd == nil {
externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1) externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1)
} }
externTd = new(big.Int).Add(externTd, block.Difficulty()) externTd = new(big.Int).Add(externTd, block.Difficulty())
if !bc.HasBlock(block.Hash(), block.NumberU64()) { if !bc.HasBlock(block.Hash(), block.NumberU64()) {
start := time.Now()
if err := bc.WriteBlockWithoutState(block, externTd); err != nil { if err := bc.WriteBlockWithoutState(block, externTd); err != nil {
return batch.index, events, coalescedLogs, err return it.index, nil, nil, err
}
log.Debug("Inserted sidechain block", "number", block.Number(), "hash", block.Hash(),
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
"root", block.Root())
} }
} }
block, err = batch.next() // At this point, we've written all sidechain blocks to database. Loop ended
} // either on some other error or all were processed. If there was some other
// At this point, we've written all sidechain blocks to database. Loop ended either on some other error, // error, we can ignore the rest of those blocks.
// or all were processed. If there was some other error, we can ignore the rest of those blocks.
// //
// If the externTd was larger than our local TD, we now need to reimport the previous // If the externTd was larger than our local TD, we now need to reimport the previous
// blocks to regenerate the required state // blocks to regenerate the required state
currentBlock := bc.CurrentBlock() localTd := bc.GetTd(bc.CurrentBlock().Hash(), current)
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
// don't process until the competitor TD goes above the canonical TD
if localTd.Cmp(externTd) > 0 { if localTd.Cmp(externTd) > 0 {
// If we have hit a sidechain, we may have to reimport pruned blocks log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().NumberU64(), "sidetd", externTd, "localtd", localTd)
log.Info("Sidechain stored", "start", batch.first().NumberU64(), "end", batch.current().NumberU64(), "sidechain TD", externTd, "local TD", localTd) return it.index, nil, nil, err
return batch.index, events, coalescedLogs, err
} }
// Competitor chain beat canonical. Before we reprocess to get the common ancestor, investigate if // Gather all the sidechain hashes (full blocks may be memory heavy)
// any blocks in the chain are 'known bad' blocks. var (
for index, b := range batch.chain { hashes []common.Hash
if bc.badBlocks.Contains(b.Hash()) { numbers []uint64
log.Info("Sidechain import aborted, bad block found", "index", index, "hash", b.Hash()) )
return index, events, coalescedLogs, fmt.Errorf("known bad block %d 0x%x", b.NumberU64(), b.Hash()) parent := bc.GetHeader(it.previous().Hash(), it.previous().NumberU64())
for parent != nil && !bc.HasState(parent.Root) {
hashes = append(hashes, parent.Hash())
numbers = append(numbers, parent.Number.Uint64())
parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1)
} }
} if parent == nil {
// gather all blocks from the common ancestor return it.index, nil, nil, errors.New("missing parent")
var parents []*types.Block
// Import all the pruned blocks to make the state available
parent := bc.GetBlock(batch.first().ParentHash(), batch.first().NumberU64()-1)
for !bc.HasState(parent.Root()) {
if bc.badBlocks.Contains(parent.Hash()) {
log.Info("Sidechain parent processing aborted, bad block found", "number", parent.NumberU64(), "hash", parent.Hash())
return 0, events, coalescedLogs, fmt.Errorf("known bad block %d 0x%x", parent.NumberU64(), parent.Hash())
}
parents = append(parents, parent)
parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
}
for j := 0; j < len(parents)/2; j++ {
parents[j], parents[len(parents)-1-j] = parents[len(parents)-1-j], parents[j]
} }
// Import all the pruned blocks to make the state available // Import all the pruned blocks to make the state available
// During re-import, we can disable PoW-verification, since these are already verified
log.Info("Inserting parent blocks for reprocessing", "first", parents[0].NumberU64(), "count", len(parents), "last", parents[len(parents)-1].NumberU64)
_, evs, logs, err := bc.insertChainInternal(parents, false)
events, coalescedLogs = evs, logs
if err != nil {
return 0, events, coalescedLogs, err
}
log.Info("Inserting sidechain blocks for processing")
errindex, events, coalescedLogs, err := bc.insertChainInternal(batch.chain[0:batch.index], false)
return errindex, events, coalescedLogs, err
}
// insertStats tracks and reports on block insertion.
type insertStats struct {
queued, processed, ignored int
usedGas uint64
lastIndex int
startTime mclock.AbsTime
}
// statsReportLimit is the time limit during import and export after which we
// always print out progress. This avoids the user wondering what's going on.
const statsReportLimit = 8 * time.Second
// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) {
// Fetch the timings for the batch
var ( var (
now = mclock.Now() blocks []*types.Block
elapsed = time.Duration(now) - time.Duration(st.startTime) memory common.StorageSize
) )
// If we're at the last block of the batch or report period reached, log for i := len(hashes) - 1; i >= 0; i-- {
if index == len(chain)-1 || elapsed >= statsReportLimit { // Append the next block to our batch
var ( block := bc.GetBlock(hashes[i], numbers[i])
end = chain[index]
txs = countTransactions(chain[st.lastIndex : index+1])
)
context := []interface{}{
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
"number", end.Number(), "hash", end.Hash(),
}
if timestamp := time.Unix(end.Time().Int64(), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
context = append(context, []interface{}{"cache", cache}...)
if st.queued > 0 { blocks = append(blocks, block)
context = append(context, []interface{}{"queued", st.queued}...) memory += block.Size()
}
if st.ignored > 0 {
context = append(context, []interface{}{"ignored", st.ignored}...)
}
log.Info("Imported new chain segment", context...)
*st = insertStats{startTime: now, lastIndex: index + 1} // If memory use grew too large, import and continue. Sadly we need to discard
} // all raised events and logs from notifications since we're too heavy on the
// memory here.
if len(blocks) >= 2048 || memory > 64*1024*1024 {
log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64())
if _, _, _, err := bc.insertChain(blocks, false); err != nil {
return 0, nil, nil, err
} }
blocks, memory = blocks[:0], 0
func countTransactions(chain []*types.Block) (c int) { // If the chain is terminating, stop processing blocks
for _, b := range chain { if atomic.LoadInt32(&bc.procInterrupt) == 1 {
c += len(b.Transactions()) log.Debug("Premature abort during blocks processing")
return 0, nil, nil, nil
} }
return c }
}
if len(blocks) > 0 {
log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64())
return bc.insertChain(blocks, false)
}
return 0, nil, nil, nil
} }
// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them // reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them

143
core/blockchain_insert.go Normal file
View File

@ -0,0 +1,143 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// insertStats tracks and reports on block insertion.
type insertStats struct {
queued, processed, ignored int
usedGas uint64
lastIndex int
startTime mclock.AbsTime
}
// statsReportLimit is the time limit during import and export after which we
// always print out progress. This avoids the user wondering what's going on.
const statsReportLimit = 8 * time.Second
// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) {
// Fetch the timings for the batch
var (
now = mclock.Now()
elapsed = time.Duration(now) - time.Duration(st.startTime)
)
// If we're at the last block of the batch or report period reached, log
if index == len(chain)-1 || elapsed >= statsReportLimit {
// Count the number of transactions in this segment
var txs int
for _, block := range chain[st.lastIndex : index+1] {
txs += len(block.Transactions())
}
end := chain[index]
// Assemble the log context and send it to the logger
context := []interface{}{
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
"number", end.Number(), "hash", end.Hash(),
}
if timestamp := time.Unix(end.Time().Int64(), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
context = append(context, []interface{}{"cache", cache}...)
if st.queued > 0 {
context = append(context, []interface{}{"queued", st.queued}...)
}
if st.ignored > 0 {
context = append(context, []interface{}{"ignored", st.ignored}...)
}
log.Info("Imported new chain segment", context...)
// Bump the stats reported to the next section
*st = insertStats{startTime: now, lastIndex: index + 1}
}
}
// insertIterator is a helper to assist during chain import.
type insertIterator struct {
chain types.Blocks
results <-chan error
index int
validator Validator
}
// newInsertIterator creates a new iterator based on the given blocks, which are
// assumed to be a contiguous chain.
func newInsertIterator(chain types.Blocks, results <-chan error, validator Validator) *insertIterator {
return &insertIterator{
chain: chain,
results: results,
index: -1,
validator: validator,
}
}
// next returns the next block in the iterator, along with any potential validation
// error for that block. When the end is reached, it will return (nil, nil).
func (it *insertIterator) next() (*types.Block, error) {
if it.index+1 >= len(it.chain) {
it.index = len(it.chain)
return nil, nil
}
it.index++
if err := <-it.results; err != nil {
return it.chain[it.index], err
}
return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index])
}
// current returns the current block that's being processed.
func (it *insertIterator) current() *types.Block {
if it.index < 0 || it.index+1 >= len(it.chain) {
return nil
}
return it.chain[it.index]
}
// previous returns the previous block was being processed, or nil
func (it *insertIterator) previous() *types.Block {
if it.index < 1 {
return nil
}
return it.chain[it.index-1]
}
// first returns the first block in the it.
func (it *insertIterator) first() *types.Block {
return it.chain[0]
}
// remaining returns the number of remaining blocks.
func (it *insertIterator) remaining() int {
return len(it.chain) - it.index
}
// processed returns the number of processed blocks.
func (it *insertIterator) processed() int {
return it.index + 1
}

View File

@ -579,11 +579,11 @@ func testInsertNonceError(t *testing.T, full bool) {
blockchain.hc.engine = blockchain.engine blockchain.hc.engine = blockchain.engine
failRes, err = blockchain.InsertHeaderChain(headers, 1) failRes, err = blockchain.InsertHeaderChain(headers, 1)
} }
// Check that the returned error indicates the failure. // Check that the returned error indicates the failure
if failRes != failAt { if failRes != failAt {
t.Errorf("test %d: failure index mismatch: have %d, want %d", i, failRes, failAt) t.Errorf("test %d: failure (%v) index mismatch: have %d, want %d", i, err, failRes, failAt)
} }
// Check that all no blocks after the failing block have been inserted. // Check that all blocks after the failing block have been inserted
for j := 0; j < i-failAt; j++ { for j := 0; j < i-failAt; j++ {
if full { if full {
if block := blockchain.GetBlockByNumber(failNum + uint64(j)); block != nil { if block := blockchain.GetBlockByNumber(failNum + uint64(j)); block != nil {
@ -1345,7 +1345,7 @@ func TestLargeReorgTrieGC(t *testing.T) {
t.Fatalf("failed to insert shared chain: %v", err) t.Fatalf("failed to insert shared chain: %v", err)
} }
if _, err := chain.InsertChain(original); err != nil { if _, err := chain.InsertChain(original); err != nil {
t.Fatalf("failed to insert shared chain: %v", err) t.Fatalf("failed to insert original chain: %v", err)
} }
// Ensure that the state associated with the forking point is pruned away // Ensure that the state associated with the forking point is pruned away
if node, _ := chain.stateCache.TrieDB().Node(shared[len(shared)-1].Root()); node != nil { if node, _ := chain.stateCache.TrieDB().Node(shared[len(shared)-1].Root()); node != nil {