core: define and test chain rewind corner cases (#21409)

* core: define and test chain reparation cornercases

* core: write up a variety of set-head tests

* core, eth: unify chain rollbacks, handle all the cases

* core: make linter smile

* core: remove commented out legacy code

* core, eth/downloader: fix review comments

* core: revert a removed recovery mechanism
This commit is contained in:
Péter Szilágyi 2020-08-20 13:01:24 +03:00 committed by GitHub
parent 0bdd295cc0
commit 8cbdc8638f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 3953 additions and 215 deletions

View File

@ -130,6 +130,16 @@ type CacheConfig struct {
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
} }
// defaultCacheConfig are the default caching values if none are specified by the
// user (also used during testing).
var defaultCacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
}
// BlockChain represents the canonical chain given a database with a genesis // BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations. // block. The Blockchain manages chain imports, reverts, chain reorganisations.
// //
@ -204,13 +214,7 @@ type BlockChain struct {
// Processor. // Processor.
func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64) (*BlockChain, error) { func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64) (*BlockChain, error) {
if cacheConfig == nil { if cacheConfig == nil {
cacheConfig = &CacheConfig{ cacheConfig = defaultCacheConfig
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
}
} }
bodyCache, _ := lru.New(bodyCacheLimit) bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit)
@ -268,15 +272,18 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
txIndexBlock = frozen txIndexBlock = frozen
} }
} }
if err := bc.loadLastState(); err != nil { if err := bc.loadLastState(); err != nil {
return nil, err return nil, err
} }
// The first thing the node will do is reconstruct the verification data for // Make sure the state associated with the block is available
// the head block (ethash cache or clique voting snapshot). Might as well do head := bc.CurrentBlock()
// it in advance. if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true) log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
if err := bc.SetHead(head.NumberU64()); err != nil {
return nil, err
}
}
// Ensure that a previous crash in SetHead doesn't leave extra ancients
if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
var ( var (
needRewind bool needRewind bool
@ -286,7 +293,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// blockchain repair. If the head full block is even lower than the ancient // blockchain repair. If the head full block is even lower than the ancient
// chain, truncate the ancient store. // chain, truncate the ancient store.
fullBlock := bc.CurrentBlock() fullBlock := bc.CurrentBlock()
if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 { if fullBlock != nil && fullBlock.Hash() != bc.genesisBlock.Hash() && fullBlock.NumberU64() < frozen-1 {
needRewind = true needRewind = true
low = fullBlock.NumberU64() low = fullBlock.NumberU64()
} }
@ -301,15 +308,17 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
} }
} }
if needRewind { if needRewind {
var hashes []common.Hash log.Error("Truncating ancient chain", "from", bc.CurrentHeader().Number.Uint64(), "to", low)
previous := bc.CurrentHeader().Number.Uint64() if err := bc.SetHead(low); err != nil {
for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ { return nil, err
hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i))
} }
bc.Rollback(hashes)
log.Warn("Truncate ancient chain", "from", previous, "to", low)
} }
} }
// The first thing the node will do is reconstruct the verification data for
// the head block (ethash cache or clique voting snapshot). Might as well do
// it in advance.
bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)
// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
for hash := range BadHashes { for hash := range BadHashes {
if header := bc.GetHeaderByHash(hash); header != nil { if header := bc.GetHeaderByHash(hash); header != nil {
@ -318,7 +327,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// make sure the headerByNumber (if present) is in our current canonical chain // make sure the headerByNumber (if present) is in our current canonical chain
if headerByNumber != nil && headerByNumber.Hash() == header.Hash() { if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash) log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
bc.SetHead(header.Number.Uint64() - 1) if err := bc.SetHead(header.Number.Uint64() - 1); err != nil {
return nil, err
}
log.Error("Chain rewind was successful, resuming normal operation") log.Error("Chain rewind was successful, resuming normal operation")
} }
} }
@ -385,15 +396,6 @@ func (bc *BlockChain) loadLastState() error {
log.Warn("Head block missing, resetting chain", "hash", head) log.Warn("Head block missing, resetting chain", "hash", head)
return bc.Reset() return bc.Reset()
} }
// Make sure the state associated with the block is available
if _, err := state.New(currentBlock.Root(), bc.stateCache, bc.snaps); err != nil {
// Dangling block without a state associated, init from scratch
log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
if err := bc.repair(&currentBlock); err != nil {
return err
}
rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
}
// Everything seems to be fine, set as the head block // Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock) bc.currentBlock.Store(currentBlock)
headBlockGauge.Update(int64(currentBlock.NumberU64())) headBlockGauge.Update(int64(currentBlock.NumberU64()))
@ -427,30 +429,48 @@ func (bc *BlockChain) loadLastState() error {
log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(currentHeader.Time), 0))) log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(currentHeader.Time), 0)))
log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(currentBlock.Time()), 0))) log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(currentBlock.Time()), 0)))
log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd, "age", common.PrettyAge(time.Unix(int64(currentFastBlock.Time()), 0))) log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd, "age", common.PrettyAge(time.Unix(int64(currentFastBlock.Time()), 0)))
if pivot := rawdb.ReadLastPivotNumber(bc.db); pivot != nil {
log.Info("Loaded last fast-sync pivot marker", "number", *pivot)
}
return nil return nil
} }
// SetHead rewinds the local chain to a new head. In the case of headers, everything // SetHead rewinds the local chain to a new head. Depending on whether the node
// above the new head will be deleted and the new one set. In the case of blocks // was fast synced or full synced and in which state, the method will try to
// though, the head may be further rewound if block bodies are missing (non-archive // delete minimal data from disk whilst retaining chain consistency.
// nodes after a fast sync).
func (bc *BlockChain) SetHead(head uint64) error { func (bc *BlockChain) SetHead(head uint64) error {
log.Warn("Rewinding blockchain", "target", head)
bc.chainmu.Lock() bc.chainmu.Lock()
defer bc.chainmu.Unlock() defer bc.chainmu.Unlock()
updateFn := func(db ethdb.KeyValueWriter, header *types.Header) { // Retrieve the last pivot block to short circuit rollbacks beyond it and the
// Rewind the block chain, ensuring we don't end up with a stateless head block // current freezer limit to start nuking id underflown
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() { pivot := rawdb.ReadLastPivotNumber(bc.db)
frozen, _ := bc.db.Ancients()
updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (uint64, bool) {
// Rewind the block chain, ensuring we don't end up with a stateless head
// block. Note, depth equality is permitted to allow using SetHead as a
// chain reparation mechanism without deleting any data!
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.NumberU64() {
newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
if newHeadBlock == nil { if newHeadBlock == nil {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock newHeadBlock = bc.genesisBlock
} else { } else {
if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil { // Block exists, keep rewinding until we find one with state
// Rewound state missing, rolled back to before pivot, reset to genesis for {
newHeadBlock = bc.genesisBlock if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
log.Trace("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
if pivot == nil || newHeadBlock.NumberU64() > *pivot {
newHeadBlock = bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1)
continue
} else {
log.Trace("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot)
newHeadBlock = bc.genesisBlock
}
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
} }
} }
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash()) rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
@ -462,7 +482,6 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.currentBlock.Store(newHeadBlock) bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64())) headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
} }
// Rewind the fast block in a simpleton way to the target head // Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() { if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
@ -479,8 +498,17 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.currentFastBlock.Store(newHeadFastBlock) bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64())) headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
} }
} head := bc.CurrentBlock().NumberU64()
// If setHead underflown the freezer threshold and the block processing
// intent afterwards is full block importing, delete the chain segment
// between the stateful-block and the sethead target.
var wipe bool
if head+1 < frozen {
wipe = pivot == nil || head >= *pivot
}
return head, wipe // Only force wipe if full synced
}
// Rewind the header chain, deleting all block bodies until then // Rewind the header chain, deleting all block bodies until then
delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) { delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
// Ignore the error here since light client won't hit this path // Ignore the error here since light client won't hit this path
@ -488,10 +516,9 @@ func (bc *BlockChain) SetHead(head uint64) error {
if num+1 <= frozen { if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt // Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store. // and canonical hash) from ancient store.
if err := bc.db.TruncateAncients(num + 1); err != nil { if err := bc.db.TruncateAncients(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err) log.Crit("Failed to truncate ancient data", "number", num, "err", err)
} }
// Remove the hash <-> number mapping from the active store. // Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash) rawdb.DeleteHeaderNumber(db, hash)
} else { } else {
@ -503,8 +530,18 @@ func (bc *BlockChain) SetHead(head uint64) error {
} }
// Todo(rjl493456442) txlookup, bloombits, etc // Todo(rjl493456442) txlookup, bloombits, etc
} }
bc.hc.SetHead(head, updateFn, delFn) // If SetHead was only called as a chain reparation method, try to skip
// touching the header chain altogether, unless the freezer is broken
if block := bc.CurrentBlock(); block.NumberU64() == head {
if target, force := updateFn(bc.db, block.Header()); force {
bc.hc.SetHead(target, updateFn, delFn)
}
} else {
// Rewind the chain to the requested head and keep going backwards until a
// block with a state is found or fast sync pivot is passed
log.Warn("Rewinding blockchain", "target", head)
bc.hc.SetHead(head, updateFn, delFn)
}
// Clear out any stale content from the caches // Clear out any stale content from the caches
bc.bodyCache.Purge() bc.bodyCache.Purge()
bc.bodyRLPCache.Purge() bc.bodyRLPCache.Purge()
@ -627,28 +664,6 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
return nil return nil
} }
// repair tries to repair the current blockchain by rolling back the current block
// until one with associated state is found. This is needed to fix incomplete db
// writes caused either by crashes/power outages, or simply non-committed tries.
//
// This method only rolls back the current block. The current header and current
// fast block are left intact.
func (bc *BlockChain) repair(head **types.Block) error {
for {
// Abort if we've rewound to a head block that does have associated state
if _, err := state.New((*head).Root(), bc.stateCache, bc.snaps); err == nil {
log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash())
return nil
}
// Otherwise rewind one block and recheck state availability there
block := bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1)
if block == nil {
return fmt.Errorf("missing block %d [%x]", (*head).NumberU64()-1, (*head).ParentHash())
}
*head = block
}
}
// Export writes the active chain to the given writer. // Export writes the active chain to the given writer.
func (bc *BlockChain) Export(w io.Writer) error { func (bc *BlockChain) Export(w io.Writer) error {
return bc.ExportN(w, uint64(0), bc.CurrentBlock().NumberU64()) return bc.ExportN(w, uint64(0), bc.CurrentBlock().NumberU64())
@ -985,52 +1000,6 @@ const (
SideStatTy SideStatTy
) )
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (bc *BlockChain) Rollback(chain []common.Hash) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
batch := bc.db.NewBatch()
for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]
// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of rollback is from high
// to low, so it's safe the update in-memory markers directly.
currentHeader := bc.hc.CurrentHeader()
if currentHeader.Hash() == hash {
newHeadHeader := bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)
rawdb.WriteHeadHeaderHash(batch, currentHeader.ParentHash)
bc.hc.SetCurrentHeader(newHeadHeader)
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
rawdb.WriteHeadFastBlockHash(batch, currentFastBlock.ParentHash())
bc.currentFastBlock.Store(newFastBlock)
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
rawdb.WriteHeadBlockHash(batch, currentBlock.ParentHash())
bc.currentBlock.Store(newBlock)
headBlockGauge.Update(int64(newBlock.NumberU64()))
}
}
if err := batch.Write(); err != nil {
log.Crit("Failed to rollback chain markers", "err", err)
}
// Truncate ancient data which exceeds the current header.
//
// Notably, it can happen that system crashes without truncating the ancient data
// but the head indicator has been updated in the active store. Regarding this issue,
// system will self recovery by truncating the extra data during the setup phase.
if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil {
log.Crit("Truncate ancient store failed", "err", err)
}
}
// truncateAncient rewinds the blockchain to the specified header and deletes all // truncateAncient rewinds the blockchain to the specified header and deletes all
// data in the ancient store that exceeds the specified header. // data in the ancient store that exceeds the specified header.
func (bc *BlockChain) truncateAncient(head uint64) error { func (bc *BlockChain) truncateAncient(head uint64) error {

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -731,12 +731,12 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
return db, func() { os.RemoveAll(dir) } return db, func() { os.RemoveAll(dir) }
} }
// Configure a subchain to roll back // Configure a subchain to roll back
remove := []common.Hash{} remove := blocks[height/2].NumberU64()
for _, block := range blocks[height/2:] {
remove = append(remove, block.Hash())
}
// Create a small assertion method to check the three heads // Create a small assertion method to check the three heads
assert := func(t *testing.T, kind string, chain *BlockChain, header uint64, fast uint64, block uint64) { assert := func(t *testing.T, kind string, chain *BlockChain, header uint64, fast uint64, block uint64) {
t.Helper()
if num := chain.CurrentBlock().NumberU64(); num != block { if num := chain.CurrentBlock().NumberU64(); num != block {
t.Errorf("%s head block mismatch: have #%v, want #%v", kind, num, block) t.Errorf("%s head block mismatch: have #%v, want #%v", kind, num, block)
} }
@ -750,14 +750,18 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
// Import the chain as an archive node and ensure all pointers are updated // Import the chain as an archive node and ensure all pointers are updated
archiveDb, delfn := makeDb() archiveDb, delfn := makeDb()
defer delfn() defer delfn()
archive, _ := NewBlockChain(archiveDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
archiveCaching := *defaultCacheConfig
archiveCaching.TrieDirtyDisabled = true
archive, _ := NewBlockChain(archiveDb, &archiveCaching, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
if n, err := archive.InsertChain(blocks); err != nil { if n, err := archive.InsertChain(blocks); err != nil {
t.Fatalf("failed to process block %d: %v", n, err) t.Fatalf("failed to process block %d: %v", n, err)
} }
defer archive.Stop() defer archive.Stop()
assert(t, "archive", archive, height, height, height) assert(t, "archive", archive, height, height, height)
archive.Rollback(remove) archive.SetHead(remove - 1)
assert(t, "archive", archive, height/2, height/2, height/2) assert(t, "archive", archive, height/2, height/2, height/2)
// Import the chain as a non-archive node and ensure all pointers are updated // Import the chain as a non-archive node and ensure all pointers are updated
@ -777,7 +781,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
assert(t, "fast", fast, height, height, 0) assert(t, "fast", fast, height, height, 0)
fast.Rollback(remove) fast.SetHead(remove - 1)
assert(t, "fast", fast, height/2, height/2, 0) assert(t, "fast", fast, height/2, height/2, 0)
// Import the chain as a ancient-first node and ensure all pointers are updated // Import the chain as a ancient-first node and ensure all pointers are updated
@ -793,12 +797,12 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
assert(t, "ancient", ancient, height, height, 0) assert(t, "ancient", ancient, height, height, 0)
ancient.Rollback(remove) ancient.SetHead(remove - 1)
assert(t, "ancient", ancient, height/2, height/2, 0) assert(t, "ancient", ancient, 0, 0, 0)
if frozen, err := ancientDb.Ancients(); err != nil || frozen != height/2+1 {
t.Fatalf("failed to truncate ancient store, want %v, have %v", height/2+1, frozen)
}
if frozen, err := ancientDb.Ancients(); err != nil || frozen != 1 {
t.Fatalf("failed to truncate ancient store, want %v, have %v", 1, frozen)
}
// Import the chain as a light node and ensure all pointers are updated // Import the chain as a light node and ensure all pointers are updated
lightDb, delfn := makeDb() lightDb, delfn := makeDb()
defer delfn() defer delfn()
@ -809,7 +813,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
defer light.Stop() defer light.Stop()
assert(t, "light", light, height, 0, 0) assert(t, "light", light, height, 0, 0)
light.Rollback(remove) light.SetHead(remove - 1)
assert(t, "light", light, height/2, 0, 0) assert(t, "light", light, height/2, 0, 0)
} }
@ -1585,6 +1589,7 @@ func TestBlockchainRecovery(t *testing.T) {
t.Fatalf("failed to create temp freezer dir: %v", err) t.Fatalf("failed to create temp freezer dir: %v", err)
} }
defer os.Remove(frdir) defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "") ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
if err != nil { if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err) t.Fatalf("failed to create temp freezer db: %v", err)
@ -1602,6 +1607,7 @@ func TestBlockchainRecovery(t *testing.T) {
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
rawdb.WriteLastPivotNumber(ancientDb, blocks[len(blocks)-1].NumberU64()) // Force fast sync behavior
ancient.Stop() ancient.Stop()
// Destroy head fast block manually // Destroy head fast block manually
@ -1912,11 +1918,9 @@ func testInsertKnownChainData(t *testing.T, typ string) {
asserter(t, blocks[len(blocks)-1]) asserter(t, blocks[len(blocks)-1])
// Import a long canonical chain with some known data as prefix. // Import a long canonical chain with some known data as prefix.
var rollback []common.Hash rollback := blocks[len(blocks)/2].NumberU64()
for i := len(blocks) / 2; i < len(blocks); i++ {
rollback = append(rollback, blocks[i].Hash()) chain.SetHead(rollback - 1)
}
chain.Rollback(rollback)
if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil { if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil {
t.Fatalf("failed to insert chain data: %v", err) t.Fatalf("failed to insert chain data: %v", err)
} }
@ -1936,11 +1940,7 @@ func testInsertKnownChainData(t *testing.T, typ string) {
asserter(t, blocks3[len(blocks3)-1]) asserter(t, blocks3[len(blocks3)-1])
// Rollback the heavier chain and re-insert the longer chain again // Rollback the heavier chain and re-insert the longer chain again
for i := 0; i < len(blocks3); i++ { chain.SetHead(rollback - 1)
rollback = append(rollback, blocks3[i].Hash())
}
chain.Rollback(rollback)
if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil { if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil {
t.Fatalf("failed to insert chain data: %v", err) t.Fatalf("failed to insert chain data: %v", err)
} }

View File

@ -488,8 +488,10 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
type ( type (
// UpdateHeadBlocksCallback is a callback function that is called by SetHead // UpdateHeadBlocksCallback is a callback function that is called by SetHead
// before head header is updated. // before head header is updated. The method will return the actual block it
UpdateHeadBlocksCallback func(ethdb.KeyValueWriter, *types.Header) // updated the head to (missing state) and a flag if setHead should continue
// rewinding till that forcefully (exceeded ancient limits)
UpdateHeadBlocksCallback func(ethdb.KeyValueWriter, *types.Header) (uint64, bool)
// DeleteBlockContentCallback is a callback function that is called by SetHead // DeleteBlockContentCallback is a callback function that is called by SetHead
// before each header is deleted. // before each header is deleted.
@ -502,9 +504,10 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d
var ( var (
parentHash common.Hash parentHash common.Hash
batch = hc.chainDb.NewBatch() batch = hc.chainDb.NewBatch()
origin = true
) )
for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() { for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() {
hash, num := hdr.Hash(), hdr.Number.Uint64() num := hdr.Number.Uint64()
// Rewind block chain to new head. // Rewind block chain to new head.
parent := hc.GetHeader(hdr.ParentHash, num-1) parent := hc.GetHeader(hdr.ParentHash, num-1)
@ -512,16 +515,21 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d
parent = hc.genesisHeader parent = hc.genesisHeader
} }
parentHash = hdr.ParentHash parentHash = hdr.ParentHash
// Notably, since geth has the possibility for setting the head to a low // Notably, since geth has the possibility for setting the head to a low
// height which is even lower than ancient head. // height which is even lower than ancient head.
// In order to ensure that the head is always no higher than the data in // In order to ensure that the head is always no higher than the data in
// the database(ancient store or active store), we need to update head // the database (ancient store or active store), we need to update head
// first then remove the relative data from the database. // first then remove the relative data from the database.
// //
// Update head first(head fast block, head full block) before deleting the data. // Update head first(head fast block, head full block) before deleting the data.
markerBatch := hc.chainDb.NewBatch() markerBatch := hc.chainDb.NewBatch()
if updateFn != nil { if updateFn != nil {
updateFn(markerBatch, parent) newHead, force := updateFn(markerBatch, parent)
if force && newHead < head {
log.Warn("Force rewinding till ancient limit", "head", newHead)
head = newHead
}
} }
// Update head header then. // Update head header then.
rawdb.WriteHeadHeaderHash(markerBatch, parentHash) rawdb.WriteHeadHeaderHash(markerBatch, parentHash)
@ -532,14 +540,34 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d
hc.currentHeaderHash = parentHash hc.currentHeaderHash = parentHash
headHeaderGauge.Update(parent.Number.Int64()) headHeaderGauge.Update(parent.Number.Int64())
// Remove the relative data from the database. // If this is the first iteration, wipe any leftover data upwards too so
if delFn != nil { // we don't end up with dangling daps in the database
delFn(batch, hash, num) var nums []uint64
if origin {
for n := num + 1; len(rawdb.ReadAllHashes(hc.chainDb, n)) > 0; n++ {
nums = append([]uint64{n}, nums...) // suboptimal, but we don't really expect this path
}
origin = false
}
nums = append(nums, num)
// Remove the related data from the database on all sidechains
for _, num := range nums {
// Gather all the side fork hashes
hashes := rawdb.ReadAllHashes(hc.chainDb, num)
if len(hashes) == 0 {
// No hashes in the database whatsoever, probably frozen already
hashes = append(hashes, hdr.Hash())
}
for _, hash := range hashes {
if delFn != nil {
delFn(batch, hash, num)
}
rawdb.DeleteHeader(batch, hash, num)
rawdb.DeleteTd(batch, hash, num)
}
rawdb.DeleteCanonicalHash(batch, num)
} }
// Rewind header chain to new head.
rawdb.DeleteHeader(batch, hash, num)
rawdb.DeleteTd(batch, hash, num)
rawdb.DeleteCanonicalHash(batch, num)
} }
// Flush all accumulated deletions. // Flush all accumulated deletions.
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {

View File

@ -187,6 +187,32 @@ func WriteHeadFastBlockHash(db ethdb.KeyValueWriter, hash common.Hash) {
} }
} }
// ReadLastPivotNumber retrieves the number of the last pivot block. If the node
// full synced, the last pivot will always be nil.
func ReadLastPivotNumber(db ethdb.KeyValueReader) *uint64 {
data, _ := db.Get(lastPivotKey)
if len(data) == 0 {
return nil
}
var pivot uint64
if err := rlp.DecodeBytes(data, &pivot); err != nil {
log.Error("Invalid pivot block number in database", "err", err)
return nil
}
return &pivot
}
// WriteLastPivotNumber stores the number of the last pivot block.
func WriteLastPivotNumber(db ethdb.KeyValueWriter, pivot uint64) {
enc, err := rlp.EncodeToBytes(pivot)
if err != nil {
log.Crit("Failed to encode pivot block number", "err", err)
}
if err := db.Put(lastPivotKey, enc); err != nil {
log.Crit("Failed to store pivot block number", "err", err)
}
}
// ReadFastTrieProgress retrieves the number of tries nodes fast synced to allow // ReadFastTrieProgress retrieves the number of tries nodes fast synced to allow
// reporting correct numbers across restarts. // reporting correct numbers across restarts.
func ReadFastTrieProgress(db ethdb.KeyValueReader) uint64 { func ReadFastTrieProgress(db ethdb.KeyValueReader) uint64 {

View File

@ -21,6 +21,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"os" "os"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -53,6 +54,22 @@ func (frdb *freezerdb) Close() error {
return nil return nil
} }
// Freeze is a helper method used for external testing to trigger and block until
// a freeze cycle completes, without having to sleep for a minute to trigger the
// automatic background run.
func (frdb *freezerdb) Freeze(threshold uint64) {
// Set the freezer threshold to a temporary value
defer func(old uint64) {
atomic.StoreUint64(&frdb.AncientStore.(*freezer).threshold, old)
}(atomic.LoadUint64(&frdb.AncientStore.(*freezer).threshold))
atomic.StoreUint64(&frdb.AncientStore.(*freezer).threshold, threshold)
// Trigger a freeze cycle and block until it's done
trigger := make(chan struct{}, 1)
frdb.AncientStore.(*freezer).trigger <- trigger
<-trigger
}
// nofreezedb is a database wrapper that disables freezer data retrievals. // nofreezedb is a database wrapper that disables freezer data retrievals.
type nofreezedb struct { type nofreezedb struct {
ethdb.KeyValueStore ethdb.KeyValueStore

View File

@ -70,12 +70,16 @@ type freezer struct {
// WARNING: The `frozen` field is accessed atomically. On 32 bit platforms, only // WARNING: The `frozen` field is accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned, // 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG). // so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
frozen uint64 // Number of blocks already frozen frozen uint64 // Number of blocks already frozen
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
tables map[string]*freezerTable // Data tables for storing everything tables map[string]*freezerTable // Data tables for storing everything
instanceLock fileutil.Releaser // File-system lock to prevent double opens instanceLock fileutil.Releaser // File-system lock to prevent double opens
quit chan struct{}
closeOnce sync.Once trigger chan chan struct{} // Manual blocking freeze trigger, test determinism
quit chan struct{}
closeOnce sync.Once
} }
// newFreezer creates a chain freezer that moves ancient chain data into // newFreezer creates a chain freezer that moves ancient chain data into
@ -102,8 +106,10 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
} }
// Open all the supported data tables // Open all the supported data tables
freezer := &freezer{ freezer := &freezer{
threshold: params.FullImmutabilityThreshold,
tables: make(map[string]*freezerTable), tables: make(map[string]*freezerTable),
instanceLock: lock, instanceLock: lock,
trigger: make(chan chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
for name, disableSnappy := range freezerNoSnappy { for name, disableSnappy := range freezerNoSnappy {
@ -261,7 +267,10 @@ func (f *freezer) Sync() error {
func (f *freezer) freeze(db ethdb.KeyValueStore) { func (f *freezer) freeze(db ethdb.KeyValueStore) {
nfdb := &nofreezedb{KeyValueStore: db} nfdb := &nofreezedb{KeyValueStore: db}
backoff := false var (
backoff bool
triggered chan struct{} // Used in tests
)
for { for {
select { select {
case <-f.quit: case <-f.quit:
@ -270,9 +279,16 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
default: default:
} }
if backoff { if backoff {
// If we were doing a manual trigger, notify it
if triggered != nil {
triggered <- struct{}{}
triggered = nil
}
select { select {
case <-time.NewTimer(freezerRecheckInterval).C: case <-time.NewTimer(freezerRecheckInterval).C:
backoff = false backoff = false
case triggered = <-f.trigger:
backoff = false
case <-f.quit: case <-f.quit:
return return
} }
@ -285,18 +301,20 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
continue continue
} }
number := ReadHeaderNumber(nfdb, hash) number := ReadHeaderNumber(nfdb, hash)
threshold := atomic.LoadUint64(&f.threshold)
switch { switch {
case number == nil: case number == nil:
log.Error("Current full block number unavailable", "hash", hash) log.Error("Current full block number unavailable", "hash", hash)
backoff = true backoff = true
continue continue
case *number < params.FullImmutabilityThreshold: case *number < threshold:
log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.FullImmutabilityThreshold) log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", threshold)
backoff = true backoff = true
continue continue
case *number-params.FullImmutabilityThreshold <= f.frozen: case *number-threshold <= f.frozen:
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen) log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen)
backoff = true backoff = true
continue continue
@ -308,7 +326,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
continue continue
} }
// Seems we have data ready to be frozen, process in usable batches // Seems we have data ready to be frozen, process in usable batches
limit := *number - params.FullImmutabilityThreshold limit := *number - threshold
if limit-f.frozen > freezerBatchLimit { if limit-f.frozen > freezerBatchLimit {
limit = f.frozen + freezerBatchLimit limit = f.frozen + freezerBatchLimit
} }
@ -317,7 +335,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
first = f.frozen first = f.frozen
ancients = make([]common.Hash, 0, limit-f.frozen) ancients = make([]common.Hash, 0, limit-f.frozen)
) )
for f.frozen < limit { for f.frozen <= limit {
// Retrieves all the components of the canonical block // Retrieves all the components of the canonical block
hash := ReadCanonicalHash(nfdb, f.frozen) hash := ReadCanonicalHash(nfdb, f.frozen)
if hash == (common.Hash{}) { if hash == (common.Hash{}) {
@ -368,11 +386,15 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
log.Crit("Failed to delete frozen canonical blocks", "err", err) log.Crit("Failed to delete frozen canonical blocks", "err", err)
} }
batch.Reset() batch.Reset()
// Wipe out side chain also.
// Wipe out side chains also and track dangling side chians
var dangling []common.Hash
for number := first; number < f.frozen; number++ { for number := first; number < f.frozen; number++ {
// Always keep the genesis block in active database // Always keep the genesis block in active database
if number != 0 { if number != 0 {
for _, hash := range ReadAllHashes(db, number) { dangling = ReadAllHashes(db, number)
for _, hash := range dangling {
log.Trace("Deleting side chain", "number", number, "hash", hash)
DeleteBlock(batch, hash, number) DeleteBlock(batch, hash, number)
} }
} }
@ -380,6 +402,41 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
log.Crit("Failed to delete frozen side blocks", "err", err) log.Crit("Failed to delete frozen side blocks", "err", err)
} }
batch.Reset()
// Step into the future and delete and dangling side chains
if f.frozen > 0 {
tip := f.frozen
for len(dangling) > 0 {
drop := make(map[common.Hash]struct{})
for _, hash := range dangling {
log.Debug("Dangling parent from freezer", "number", tip-1, "hash", hash)
drop[hash] = struct{}{}
}
children := ReadAllHashes(db, tip)
for i := 0; i < len(children); i++ {
// Dig up the child and ensure it's dangling
child := ReadHeader(nfdb, children[i], tip)
if child == nil {
log.Error("Missing dangling header", "number", tip, "hash", children[i])
continue
}
if _, ok := drop[child.ParentHash]; !ok {
children = append(children[:i], children[i+1:]...)
i--
continue
}
// Delete all block data associated with the child
log.Debug("Deleting dangling block", "number", tip, "hash", children[i], "parent", child.ParentHash)
DeleteBlock(batch, children[i], tip)
}
dangling = children
tip++
}
if err := batch.Write(); err != nil {
log.Crit("Failed to delete dangling side blocks", "err", err)
}
}
// Log something friendly for the user // Log something friendly for the user
context := []interface{}{ context := []interface{}{
"blocks", f.frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", f.frozen - 1, "blocks", f.frozen - first, "elapsed", common.PrettyDuration(time.Since(start)), "number", f.frozen - 1,

View File

@ -38,6 +38,9 @@ var (
// headFastBlockKey tracks the latest known incomplete block's hash during fast sync. // headFastBlockKey tracks the latest known incomplete block's hash during fast sync.
headFastBlockKey = []byte("LastFast") headFastBlockKey = []byte("LastFast")
// lastPivotKey tracks the last pivot block used by fast sync (to reenable on sethead).
lastPivotKey = []byte("LastPivot")
// fastTrieProgressKey tracks the number of trie entries imported during fast sync. // fastTrieProgressKey tracks the number of trie entries imported during fast sync.
fastTrieProgressKey = []byte("TrieSync") fastTrieProgressKey = []byte("TrieSync")

View File

@ -176,8 +176,8 @@ type LightChain interface {
// InsertHeaderChain inserts a batch of headers into the local chain. // InsertHeaderChain inserts a batch of headers into the local chain.
InsertHeaderChain([]*types.Header, int) (int, error) InsertHeaderChain([]*types.Header, int) (int, error)
// Rollback removes a few recently added elements from the local chain. // SetHead rewinds the local chain to a new head.
Rollback([]common.Hash) SetHead(uint64) error
} }
// BlockChain encapsulates functions required to sync a (full or fast) blockchain. // BlockChain encapsulates functions required to sync a (full or fast) blockchain.
@ -469,6 +469,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
if pivot <= origin { if pivot <= origin {
origin = pivot - 1 origin = pivot - 1
} }
// Write out the pivot into the database so a rollback beyond it will
// reenable fast sync
rawdb.WriteLastPivotNumber(d.stateDB, pivot)
} }
} }
d.committed = 1 d.committed = 1
@ -496,6 +499,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.ancientLimit = height - fullMaxForkAncestry - 1 d.ancientLimit = height - fullMaxForkAncestry - 1
} }
frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
// If a part of blockchain data has already been written into active store, // If a part of blockchain data has already been written into active store,
// disable the ancient style insertion explicitly. // disable the ancient style insertion explicitly.
if origin >= frozen && frozen != 0 { if origin >= frozen && frozen != 0 {
@ -506,11 +510,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
} }
// Rewind the ancient store and blockchain if reorg happens. // Rewind the ancient store and blockchain if reorg happens.
if origin+1 < frozen { if origin+1 < frozen {
var hashes []common.Hash if err := d.lightchain.SetHead(origin + 1); err != nil {
for i := origin + 1; i < d.lightchain.CurrentHeader().Number.Uint64(); i++ { return err
hashes = append(hashes, rawdb.ReadCanonicalHash(d.stateDB, i))
} }
d.lightchain.Rollback(hashes)
} }
} }
// Initiate the sync using a concurrent header and content retrieval algorithm // Initiate the sync using a concurrent header and content retrieval algorithm
@ -1382,35 +1384,32 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back // Keep a count of uncertain headers to roll back
var ( var (
rollback []*types.Header rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
rollbackErr error rollbackErr error
mode = d.getMode() mode = d.getMode()
) )
defer func() { defer func() {
if len(rollback) > 0 { if rollback > 0 {
// Flatten the headers and roll them back
hashes := make([]common.Hash, len(rollback))
for i, header := range rollback {
hashes[i] = header.Hash()
}
lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0 lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
if mode != LightSync { if mode != LightSync {
lastFastBlock = d.blockchain.CurrentFastBlock().Number() lastFastBlock = d.blockchain.CurrentFastBlock().Number()
lastBlock = d.blockchain.CurrentBlock().Number() lastBlock = d.blockchain.CurrentBlock().Number()
} }
d.lightchain.Rollback(hashes) if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
// We're already unwinding the stack, only print the error to make it more visible
log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
}
curFastBlock, curBlock := common.Big0, common.Big0 curFastBlock, curBlock := common.Big0, common.Big0
if mode != LightSync { if mode != LightSync {
curFastBlock = d.blockchain.CurrentFastBlock().Number() curFastBlock = d.blockchain.CurrentFastBlock().Number()
curBlock = d.blockchain.CurrentBlock().Number() curBlock = d.blockchain.CurrentBlock().Number()
} }
log.Warn("Rolled back headers", "count", len(hashes), log.Warn("Rolled back chain segment",
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number), "header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock), "fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr) "block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
} }
}() }()
// Wait for batches of headers to process // Wait for batches of headers to process
gotHeaders := false gotHeaders := false
@ -1462,7 +1461,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
} }
} }
// Disable any rollback and return // Disable any rollback and return
rollback = nil rollback = 0
return nil return nil
} }
// Otherwise split the chunk of headers into batches and process them // Otherwise split the chunk of headers into batches and process them
@ -1481,15 +1480,9 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
limit = len(headers) limit = len(headers)
} }
chunk := headers[:limit] chunk := headers[:limit]
// In case of header only syncing, validate the chunk immediately // In case of header only syncing, validate the chunk immediately
if mode == FastSync || mode == LightSync { if mode == FastSync || mode == LightSync {
// Collect the yet unknown headers to mark them as uncertain
unknown := make([]*types.Header, 0, len(chunk))
for _, header := range chunk {
if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
unknown = append(unknown, header)
}
}
// If we're importing pure headers, verify based on their recentness // If we're importing pure headers, verify based on their recentness
frequency := fsHeaderCheckFrequency frequency := fsHeaderCheckFrequency
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
@ -1497,17 +1490,18 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
} }
if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil { if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
rollbackErr = err rollbackErr = err
// If some headers were inserted, add them too to the rollback list
if n > 0 { // If some headers were inserted, track them as uncertain
rollback = append(rollback, chunk[:n]...) if n > 0 && rollback == 0 {
rollback = chunk[0].Number.Uint64()
} }
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err) log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err) return fmt.Errorf("%w: %v", errInvalidChain, err)
} }
// All verifications passed, store newly found uncertain headers // All verifications passed, track all headers within the alloted limits
rollback = append(rollback, unknown...) head := chunk[len(chunk)-1].Number.Uint64()
if len(rollback) > fsHeaderSafetyNet { if head-rollback > uint64(fsHeaderSafetyNet) {
rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) rollback = head - uint64(fsHeaderSafetyNet)
} }
} }
// Unless we're doing light chains, schedule the headers for associated content retrieval // Unless we're doing light chains, schedule the headers for associated content retrieval
@ -1613,6 +1607,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
} }
} }
go closeOnErr(sync) go closeOnErr(sync)
// Figure out the ideal pivot block. Note, that this goalpost may move if the // Figure out the ideal pivot block. Note, that this goalpost may move if the
// sync takes long enough for the chain head to move significantly. // sync takes long enough for the chain head to move significantly.
pivot := uint64(0) pivot := uint64(0)
@ -1654,6 +1649,10 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) { if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks)) log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
pivot = height - uint64(fsMinFullBlocks) pivot = height - uint64(fsMinFullBlocks)
// Write out the pivot into the database so a rollback beyond it will
// reenable fast sync
rawdb.WriteLastPivotNumber(d.stateDB, pivot)
} }
} }
P, beforeP, afterP := splitAroundPivot(pivot, results) P, beforeP, afterP := splitAroundPivot(pivot, results)

View File

@ -341,25 +341,52 @@ func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []typ
return len(blocks), nil return len(blocks), nil
} }
// Rollback removes some recently added elements from the chain. // SetHead rewinds the local chain to a new head.
func (dl *downloadTester) Rollback(hashes []common.Hash) { func (dl *downloadTester) SetHead(head uint64) error {
dl.lock.Lock() dl.lock.Lock()
defer dl.lock.Unlock() defer dl.lock.Unlock()
for i := len(hashes) - 1; i >= 0; i-- { // Find the hash of the head to reset to
if dl.ownHashes[len(dl.ownHashes)-1] == hashes[i] { var hash common.Hash
dl.ownHashes = dl.ownHashes[:len(dl.ownHashes)-1] for h, header := range dl.ownHeaders {
if header.Number.Uint64() == head {
hash = h
} }
delete(dl.ownChainTd, hashes[i])
delete(dl.ownHeaders, hashes[i])
delete(dl.ownReceipts, hashes[i])
delete(dl.ownBlocks, hashes[i])
delete(dl.ancientChainTd, hashes[i])
delete(dl.ancientHeaders, hashes[i])
delete(dl.ancientReceipts, hashes[i])
delete(dl.ancientBlocks, hashes[i])
} }
for h, header := range dl.ancientHeaders {
if header.Number.Uint64() == head {
hash = h
}
}
if hash == (common.Hash{}) {
return fmt.Errorf("unknown head to set: %d", head)
}
// Find the offset in the header chain
var offset int
for o, h := range dl.ownHashes {
if h == hash {
offset = o
break
}
}
// Remove all the hashes and associated data afterwards
for i := offset + 1; i < len(dl.ownHashes); i++ {
delete(dl.ownChainTd, dl.ownHashes[i])
delete(dl.ownHeaders, dl.ownHashes[i])
delete(dl.ownReceipts, dl.ownHashes[i])
delete(dl.ownBlocks, dl.ownHashes[i])
delete(dl.ancientChainTd, dl.ownHashes[i])
delete(dl.ancientHeaders, dl.ownHashes[i])
delete(dl.ancientReceipts, dl.ownHashes[i])
delete(dl.ancientBlocks, dl.ownHashes[i])
}
dl.ownHashes = dl.ownHashes[:offset+1]
return nil
}
// Rollback removes some recently added elements from the chain.
func (dl *downloadTester) Rollback(hashes []common.Hash) {
} }
// newPeer registers a new block download source into the downloader. // newPeer registers a new block download source into the downloader.

View File

@ -271,15 +271,25 @@ func peerToSyncOp(mode downloader.SyncMode, p *peer) *chainSyncOp {
} }
func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
// If we're in fast sync mode, return that directly
if atomic.LoadUint32(&cs.pm.fastSync) == 1 { if atomic.LoadUint32(&cs.pm.fastSync) == 1 {
block := cs.pm.blockchain.CurrentFastBlock() block := cs.pm.blockchain.CurrentFastBlock()
td := cs.pm.blockchain.GetTdByHash(block.Hash()) td := cs.pm.blockchain.GetTdByHash(block.Hash())
return downloader.FastSync, td return downloader.FastSync, td
} else {
head := cs.pm.blockchain.CurrentHeader()
td := cs.pm.blockchain.GetTd(head.Hash(), head.Number.Uint64())
return downloader.FullSync, td
} }
// We are probably in full sync, but we might have rewound to before the
// fast sync pivot, check if we should reenable
if pivot := rawdb.ReadLastPivotNumber(cs.pm.chaindb); pivot != nil {
if head := cs.pm.blockchain.CurrentBlock(); head.NumberU64() < *pivot {
block := cs.pm.blockchain.CurrentFastBlock()
td := cs.pm.blockchain.GetTdByHash(block.Hash())
return downloader.FastSync, td
}
}
// Nope, we're really full syncing
head := cs.pm.blockchain.CurrentHeader()
td := cs.pm.blockchain.GetTd(head.Hash(), head.Number.Uint64())
return downloader.FullSync, td
} }
// startSync launches doSync in a new goroutine. // startSync launches doSync in a new goroutine.

View File

@ -99,7 +99,7 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
if _, ok := s.membatch.batch[root]; ok { if _, ok := s.membatch.batch[root]; ok {
return return
} }
if s.bloom.Contains(root[:]) { if s.bloom == nil || s.bloom.Contains(root[:]) {
// Bloom filter says this might be a duplicate, double check // Bloom filter says this might be a duplicate, double check
blob, _ := s.database.Get(root[:]) blob, _ := s.database.Get(root[:])
if local, err := decodeNode(root[:], blob); local != nil && err == nil { if local, err := decodeNode(root[:], blob); local != nil && err == nil {
@ -138,7 +138,7 @@ func (s *Sync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) {
if _, ok := s.membatch.batch[hash]; ok { if _, ok := s.membatch.batch[hash]; ok {
return return
} }
if s.bloom.Contains(hash[:]) { if s.bloom == nil || s.bloom.Contains(hash[:]) {
// Bloom filter says this might be a duplicate, double check // Bloom filter says this might be a duplicate, double check
if ok, _ := s.database.Has(hash[:]); ok { if ok, _ := s.database.Has(hash[:]); ok {
return return
@ -300,7 +300,7 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
if _, ok := s.membatch.batch[hash]; ok { if _, ok := s.membatch.batch[hash]; ok {
continue continue
} }
if s.bloom.Contains(node) { if s.bloom == nil || s.bloom.Contains(node) {
// Bloom filter says this might be a duplicate, double check // Bloom filter says this might be a duplicate, double check
if ok, _ := s.database.Has(node); ok { if ok, _ := s.database.Has(node); ok {
continue continue