Merge pull request #17973 from holiman/splitter2

core: better side-chain importing
This commit is contained in:
Péter Szilágyi 2018-11-22 15:01:10 +02:00 committed by GitHub
commit 3ba0418a9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 372 additions and 161 deletions

View File

@ -1036,6 +1036,18 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
return status, nil return status, nil
} }
// addFutureBlock checks if the block is within the max allowed window to get
// accepted for future processing, and returns an error if the block is too far
// ahead and was not added.
func (bc *BlockChain) addFutureBlock(block *types.Block) error {
max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
if block.Time().Cmp(max) > 0 {
return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max)
}
bc.futureBlocks.Add(block.Hash(), block)
return nil
}
// InsertChain attempts to insert the given batch of blocks in to the canonical // InsertChain attempts to insert the given batch of blocks in to the canonical
// chain or, otherwise, create a fork. If an error is returned it will return // chain or, otherwise, create a fork. If an error is returned it will return
// the index number of the failing block as well an error describing what went // the index number of the failing block as well an error describing what went
@ -1043,18 +1055,9 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
// //
// After insertion is done, all accumulated events will be fired. // After insertion is done, all accumulated events will be fired.
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
n, events, logs, err := bc.insertChain(chain)
bc.PostChainEvents(events, logs)
return n, err
}
// insertChain will execute the actual chain insertion and event aggregation. The
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import // Sanity check that we have something meaningful to import
if len(chain) == 0 { if len(chain) == 0 {
return 0, nil, nil, nil return 0, nil
} }
// Do a sanity check that the provided chain is actually ordered and linked // Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ { for i := 1; i < len(chain); i++ {
@ -1063,16 +1066,36 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(), log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash()) "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())
return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
} }
} }
// Pre-checks passed, start the full block imports // Pre-checks passed, start the full block imports
bc.wg.Add(1) bc.wg.Add(1)
defer bc.wg.Done()
bc.chainmu.Lock() bc.chainmu.Lock()
defer bc.chainmu.Unlock() n, events, logs, err := bc.insertChain(chain, true)
bc.chainmu.Unlock()
bc.wg.Done()
bc.PostChainEvents(events, logs)
return n, err
}
// insertChain is the internal implementation of insertChain, which assumes that
// 1) chains are contiguous, and 2) The chain mutex is held.
//
// This method is split out so that import batches that require re-injecting
// historical blocks can do so without releasing the lock, which could lead to
// racey behaviour. If a sidechain import is in progress, and the historic state
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
// If the chain is terminating, don't even bother starting u
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, nil, nil, nil
}
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
// A queued approach to delivering events. This is generally // A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex // faster than direct delivery and requires much less mutex
@ -1089,16 +1112,56 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
for i, block := range chain { for i, block := range chain {
headers[i] = block.Header() headers[i] = block.Header()
seals[i] = true seals[i] = verifySeals
} }
abort, results := bc.engine.VerifyHeaders(bc, headers, seals) abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort) defer close(abort)
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) // Peek the error for the first block to decide the directing import logic
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) it := newInsertIterator(chain, results, bc.Validator())
// Iterate over the blocks and insert when the verifier permits block, err := it.next()
for i, block := range chain { switch {
// First block is pruned, insert as sidechain and reorg only if TD grows enough
case err == consensus.ErrPrunedAncestor:
return bc.insertSidechain(it)
// First block is future, shove it (and all children) to the future queue (unknown ancestor)
case err == consensus.ErrFutureBlock || (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(it.first().ParentHash())):
for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
}
block, err = it.next()
}
stats.queued += it.processed()
stats.ignored += it.remaining()
// If there are any still remaining, mark as ignored
return it.index, events, coalescedLogs, err
// First block (and state) is known
// 1. We did a roll-back, and should now do a re-import
// 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot
// from the canonical chain, which has not been verified.
case err == ErrKnownBlock:
// Skip all known blocks that behind us
current := bc.CurrentBlock().NumberU64()
for block != nil && err == ErrKnownBlock && current >= block.NumberU64() {
stats.ignored++
block, err = it.next()
}
// Falls through to the block import
// Some other error occurred, abort
case err != nil:
stats.ignored += len(it.chain)
bc.reportBlock(block, nil, err)
return it.index, events, coalescedLogs, err
}
// No validation errors for the first block (or chain prefix skipped)
for ; block != nil && err == nil; block, err = it.next() {
// If the chain is terminating, stop processing blocks // If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 { if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing") log.Debug("Premature abort during blocks processing")
@ -1107,115 +1170,45 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
// If the header is a banned one, straight out abort // If the header is a banned one, straight out abort
if BadHashes[block.Hash()] { if BadHashes[block.Hash()] {
bc.reportBlock(block, nil, ErrBlacklistedHash) bc.reportBlock(block, nil, ErrBlacklistedHash)
return i, events, coalescedLogs, ErrBlacklistedHash return it.index, events, coalescedLogs, ErrBlacklistedHash
} }
// Wait for the block's verification to complete // Retrieve the parent block and it's state to execute on top
bstart := time.Now() start := time.Now()
err := <-results parent := it.previous()
if err == nil { if parent == nil {
err = bc.Validator().ValidateBody(block)
}
switch {
case err == ErrKnownBlock:
// Block and state both already known. However if the current block is below
// this number we did a rollback and we should reimport it nonetheless.
if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
stats.ignored++
continue
}
case err == consensus.ErrFutureBlock:
// Allow up to MaxFuture second in the future blocks. If this limit is exceeded
// the chain is discarded and processed at a later time if given.
max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
if block.Time().Cmp(max) > 0 {
return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max)
}
bc.futureBlocks.Add(block.Hash(), block)
stats.queued++
continue
case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()):
bc.futureBlocks.Add(block.Hash(), block)
stats.queued++
continue
case err == consensus.ErrPrunedAncestor:
// Block competing with the canonical chain, store in the db, but don't process
// until the competitor TD goes above the canonical TD
currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
if localTd.Cmp(externTd) > 0 {
if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
return i, events, coalescedLogs, err
}
continue
}
// Competitor chain beat canonical, gather all blocks from the common ancestor
var winner []*types.Block
parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
for !bc.HasState(parent.Root()) {
winner = append(winner, parent)
parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
}
for j := 0; j < len(winner)/2; j++ {
winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j]
}
// Import all the pruned blocks to make the state available
bc.chainmu.Unlock()
_, evs, logs, err := bc.insertChain(winner)
bc.chainmu.Lock()
events, coalescedLogs = evs, logs
if err != nil {
return i, events, coalescedLogs, err
}
case err != nil:
bc.reportBlock(block, nil, err)
return i, events, coalescedLogs, err
}
// Create a new statedb using the parent block and report an
// error if it fails.
var parent *types.Block
if i == 0 {
parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1) parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
} else {
parent = chain[i-1]
} }
state, err := state.New(parent.Root(), bc.stateCache) state, err := state.New(parent.Root(), bc.stateCache)
if err != nil { if err != nil {
return i, events, coalescedLogs, err return it.index, events, coalescedLogs, err
} }
// Process block using the parent state as reference point. // Process block using the parent state as reference point.
receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
if err != nil { if err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
return i, events, coalescedLogs, err return it.index, events, coalescedLogs, err
} }
// Validate the state using the default validator // Validate the state using the default validator
err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas) if err := bc.Validator().ValidateState(block, parent, state, receipts, usedGas); err != nil {
if err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
return i, events, coalescedLogs, err return it.index, events, coalescedLogs, err
} }
proctime := time.Since(bstart) proctime := time.Since(start)
// Write the block to the chain and get the status. // Write the block to the chain and get the status.
status, err := bc.WriteBlockWithState(block, receipts, state) status, err := bc.WriteBlockWithState(block, receipts, state)
if err != nil { if err != nil {
return i, events, coalescedLogs, err return it.index, events, coalescedLogs, err
} }
switch status { switch status {
case CanonStatTy: case CanonStatTy:
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(),
"elapsed", common.PrettyDuration(time.Since(start)),
"root", block.Root())
coalescedLogs = append(coalescedLogs, logs...) coalescedLogs = append(coalescedLogs, logs...)
blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainEvent{block, block.Hash(), logs}) events = append(events, ChainEvent{block, block.Hash(), logs})
lastCanon = block lastCanon = block
@ -1223,78 +1216,153 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
bc.gcproc += proctime bc.gcproc += proctime
case SideStatTy: case SideStatTy:
log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(),
common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
blockInsertTimer.UpdateSince(bstart) "root", block.Root())
events = append(events, ChainSideEvent{block}) events = append(events, ChainSideEvent{block})
} }
blockInsertTimer.UpdateSince(start)
stats.processed++ stats.processed++
stats.usedGas += usedGas stats.usedGas += usedGas
cache, _ := bc.stateCache.TrieDB().Size() cache, _ := bc.stateCache.TrieDB().Size()
stats.report(chain, i, cache) stats.report(chain, it.index, cache)
} }
// Any blocks remaining here? The only ones we care about are the future ones
if block != nil && err == consensus.ErrFutureBlock {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
}
block, err = it.next()
for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
}
stats.queued++
}
}
stats.ignored += it.remaining()
// Append a single chain head event if we've progressed the chain // Append a single chain head event if we've progressed the chain
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
events = append(events, ChainHeadEvent{lastCanon}) events = append(events, ChainHeadEvent{lastCanon})
} }
return 0, events, coalescedLogs, nil return it.index, events, coalescedLogs, err
} }
// insertStats tracks and reports on block insertion. // insertSidechain is called when an import batch hits upon a pruned ancestor
type insertStats struct { // error, which happens when a sidechain with a sufficiently old fork-block is
queued, processed, ignored int // found.
usedGas uint64 //
lastIndex int // The method writes all (header-and-body-valid) blocks to disk, then tries to
startTime mclock.AbsTime // switch over to the new chain if the TD exceeded the current chain.
} func (bc *BlockChain) insertSidechain(it *insertIterator) (int, []interface{}, []*types.Log, error) {
// statsReportLimit is the time limit during import and export after which we
// always print out progress. This avoids the user wondering what's going on.
const statsReportLimit = 8 * time.Second
// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) {
// Fetch the timings for the batch
var ( var (
now = mclock.Now() externTd *big.Int
elapsed = time.Duration(now) - time.Duration(st.startTime) current = bc.CurrentBlock().NumberU64()
) )
// If we're at the last block of the batch or report period reached, log // The first sidechain block error is already verified to be ErrPrunedAncestor.
if index == len(chain)-1 || elapsed >= statsReportLimit { // Since we don't import them here, we expect ErrUnknownAncestor for the remaining
// ones. Any other errors means that the block is invalid, and should not be written
// to disk.
block, err := it.current(), consensus.ErrPrunedAncestor
for ; block != nil && (err == consensus.ErrPrunedAncestor); block, err = it.next() {
// Check the canonical state root for that number
if number := block.NumberU64(); current >= number {
if canonical := bc.GetBlockByNumber(number); canonical != nil && canonical.Root() == block.Root() {
// This is most likely a shadow-state attack. When a fork is imported into the
// database, and it eventually reaches a block height which is not pruned, we
// just found that the state already exist! This means that the sidechain block
// refers to a state which already exists in our canon chain.
//
// If left unchecked, we would now proceed importing the blocks, without actually
// having verified the state of the previous blocks.
log.Warn("Sidechain ghost-state attack detected", "number", block.NumberU64(), "sideroot", block.Root(), "canonroot", canonical.Root())
// If someone legitimately side-mines blocks, they would still be imported as usual. However,
// we cannot risk writing unverified blocks to disk when they obviously target the pruning
// mechanism.
return it.index, nil, nil, errors.New("sidechain ghost-state attack")
}
}
if externTd == nil {
externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1)
}
externTd = new(big.Int).Add(externTd, block.Difficulty())
if !bc.HasBlock(block.Hash(), block.NumberU64()) {
start := time.Now()
if err := bc.WriteBlockWithoutState(block, externTd); err != nil {
return it.index, nil, nil, err
}
log.Debug("Inserted sidechain block", "number", block.Number(), "hash", block.Hash(),
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
"root", block.Root())
}
}
// At this point, we've written all sidechain blocks to database. Loop ended
// either on some other error or all were processed. If there was some other
// error, we can ignore the rest of those blocks.
//
// If the externTd was larger than our local TD, we now need to reimport the previous
// blocks to regenerate the required state
localTd := bc.GetTd(bc.CurrentBlock().Hash(), current)
if localTd.Cmp(externTd) > 0 {
log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().NumberU64(), "sidetd", externTd, "localtd", localTd)
return it.index, nil, nil, err
}
// Gather all the sidechain hashes (full blocks may be memory heavy)
var ( var (
end = chain[index] hashes []common.Hash
txs = countTransactions(chain[st.lastIndex : index+1]) numbers []uint64
) )
context := []interface{}{ parent := bc.GetHeader(it.previous().Hash(), it.previous().NumberU64())
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, for parent != nil && !bc.HasState(parent.Root) {
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), hashes = append(hashes, parent.Hash())
"number", end.Number(), "hash", end.Hash(), numbers = append(numbers, parent.Number.Uint64())
}
if timestamp := time.Unix(end.Time().Int64(), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
context = append(context, []interface{}{"cache", cache}...)
if st.queued > 0 { parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1)
context = append(context, []interface{}{"queued", st.queued}...)
} }
if st.ignored > 0 { if parent == nil {
context = append(context, []interface{}{"ignored", st.ignored}...) return it.index, nil, nil, errors.New("missing parent")
} }
log.Info("Imported new chain segment", context...) // Import all the pruned blocks to make the state available
var (
blocks []*types.Block
memory common.StorageSize
)
for i := len(hashes) - 1; i >= 0; i-- {
// Append the next block to our batch
block := bc.GetBlock(hashes[i], numbers[i])
*st = insertStats{startTime: now, lastIndex: index + 1} blocks = append(blocks, block)
} memory += block.Size()
}
func countTransactions(chain []*types.Block) (c int) { // If memory use grew too large, import and continue. Sadly we need to discard
for _, b := range chain { // all raised events and logs from notifications since we're too heavy on the
c += len(b.Transactions()) // memory here.
if len(blocks) >= 2048 || memory > 64*1024*1024 {
log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64())
if _, _, _, err := bc.insertChain(blocks, false); err != nil {
return 0, nil, nil, err
} }
return c blocks, memory = blocks[:0], 0
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
return 0, nil, nil, nil
}
}
}
if len(blocks) > 0 {
log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64())
return bc.insertChain(blocks, false)
}
return 0, nil, nil, nil
} }
// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them // reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them

143
core/blockchain_insert.go Normal file
View File

@ -0,0 +1,143 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// insertStats tracks and reports on block insertion.
type insertStats struct {
queued, processed, ignored int
usedGas uint64
lastIndex int
startTime mclock.AbsTime
}
// statsReportLimit is the time limit during import and export after which we
// always print out progress. This avoids the user wondering what's going on.
const statsReportLimit = 8 * time.Second
// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) {
// Fetch the timings for the batch
var (
now = mclock.Now()
elapsed = time.Duration(now) - time.Duration(st.startTime)
)
// If we're at the last block of the batch or report period reached, log
if index == len(chain)-1 || elapsed >= statsReportLimit {
// Count the number of transactions in this segment
var txs int
for _, block := range chain[st.lastIndex : index+1] {
txs += len(block.Transactions())
}
end := chain[index]
// Assemble the log context and send it to the logger
context := []interface{}{
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
"number", end.Number(), "hash", end.Hash(),
}
if timestamp := time.Unix(end.Time().Int64(), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
context = append(context, []interface{}{"cache", cache}...)
if st.queued > 0 {
context = append(context, []interface{}{"queued", st.queued}...)
}
if st.ignored > 0 {
context = append(context, []interface{}{"ignored", st.ignored}...)
}
log.Info("Imported new chain segment", context...)
// Bump the stats reported to the next section
*st = insertStats{startTime: now, lastIndex: index + 1}
}
}
// insertIterator is a helper to assist during chain import.
type insertIterator struct {
chain types.Blocks
results <-chan error
index int
validator Validator
}
// newInsertIterator creates a new iterator based on the given blocks, which are
// assumed to be a contiguous chain.
func newInsertIterator(chain types.Blocks, results <-chan error, validator Validator) *insertIterator {
return &insertIterator{
chain: chain,
results: results,
index: -1,
validator: validator,
}
}
// next returns the next block in the iterator, along with any potential validation
// error for that block. When the end is reached, it will return (nil, nil).
func (it *insertIterator) next() (*types.Block, error) {
if it.index+1 >= len(it.chain) {
it.index = len(it.chain)
return nil, nil
}
it.index++
if err := <-it.results; err != nil {
return it.chain[it.index], err
}
return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index])
}
// current returns the current block that's being processed.
func (it *insertIterator) current() *types.Block {
if it.index < 0 || it.index+1 >= len(it.chain) {
return nil
}
return it.chain[it.index]
}
// previous returns the previous block was being processed, or nil
func (it *insertIterator) previous() *types.Block {
if it.index < 1 {
return nil
}
return it.chain[it.index-1]
}
// first returns the first block in the it.
func (it *insertIterator) first() *types.Block {
return it.chain[0]
}
// remaining returns the number of remaining blocks.
func (it *insertIterator) remaining() int {
return len(it.chain) - it.index
}
// processed returns the number of processed blocks.
func (it *insertIterator) processed() int {
return it.index + 1
}

View File

@ -579,11 +579,11 @@ func testInsertNonceError(t *testing.T, full bool) {
blockchain.hc.engine = blockchain.engine blockchain.hc.engine = blockchain.engine
failRes, err = blockchain.InsertHeaderChain(headers, 1) failRes, err = blockchain.InsertHeaderChain(headers, 1)
} }
// Check that the returned error indicates the failure. // Check that the returned error indicates the failure
if failRes != failAt { if failRes != failAt {
t.Errorf("test %d: failure index mismatch: have %d, want %d", i, failRes, failAt) t.Errorf("test %d: failure (%v) index mismatch: have %d, want %d", i, err, failRes, failAt)
} }
// Check that all no blocks after the failing block have been inserted. // Check that all blocks after the failing block have been inserted
for j := 0; j < i-failAt; j++ { for j := 0; j < i-failAt; j++ {
if full { if full {
if block := blockchain.GetBlockByNumber(failNum + uint64(j)); block != nil { if block := blockchain.GetBlockByNumber(failNum + uint64(j)); block != nil {
@ -1345,7 +1345,7 @@ func TestLargeReorgTrieGC(t *testing.T) {
t.Fatalf("failed to insert shared chain: %v", err) t.Fatalf("failed to insert shared chain: %v", err)
} }
if _, err := chain.InsertChain(original); err != nil { if _, err := chain.InsertChain(original); err != nil {
t.Fatalf("failed to insert shared chain: %v", err) t.Fatalf("failed to insert original chain: %v", err)
} }
// Ensure that the state associated with the forking point is pruned away // Ensure that the state associated with the forking point is pruned away
if node, _ := chain.stateCache.TrieDB().Node(shared[len(shared)-1].Root()); node != nil { if node, _ := chain.stateCache.TrieDB().Node(shared[len(shared)-1].Root()); node != nil {

View File

@ -287,7 +287,7 @@ func (n *Node) startInProc(apis []rpc.API) error {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil { if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err return err
} }
n.log.Debug("InProc registered", "service", api.Service, "namespace", api.Namespace) n.log.Debug("InProc registered", "namespace", api.Namespace)
} }
n.inprocHandler = handler n.inprocHandler = handler
return nil return nil

View File

@ -434,7 +434,7 @@ func (tab *Table) loadSeedNodes() {
for i := range seeds { for i := range seeds {
seed := seeds[i] seed := seeds[i]
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID())) }} age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID())) }}
log.Debug("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age) log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
tab.add(seed) tab.add(seed)
} }
} }