forked from cerc-io/plugeth
core: safe indexer operation when syncing starts before the checkpoint (#17511)
This commit is contained in:
parent
b69476b372
commit
63352bf424
@ -85,6 +85,9 @@ type ChainIndexer struct {
|
|||||||
knownSections uint64 // Number of sections known to be complete (block wise)
|
knownSections uint64 // Number of sections known to be complete (block wise)
|
||||||
cascadedHead uint64 // Block number of the last completed section cascaded to subindexers
|
cascadedHead uint64 // Block number of the last completed section cascaded to subindexers
|
||||||
|
|
||||||
|
checkpointSections uint64 // Number of sections covered by the checkpoint
|
||||||
|
checkpointHead common.Hash // Section head belonging to the checkpoint
|
||||||
|
|
||||||
throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources
|
throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources
|
||||||
|
|
||||||
log log.Logger
|
log log.Logger
|
||||||
@ -115,12 +118,19 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken
|
|||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddKnownSectionHead marks a new section head as known/processed if it is newer
|
// AddCheckpoint adds a checkpoint. Sections are never processed and the chain
|
||||||
// than the already known best section head
|
// is not expected to be available before this point. The indexer assumes that
|
||||||
func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
|
// the backend has sufficient information available to process subsequent sections.
|
||||||
|
//
|
||||||
|
// Note: knownSections == 0 and storedSections == checkpointSections until
|
||||||
|
// syncing reaches the checkpoint
|
||||||
|
func (c *ChainIndexer) AddCheckpoint(section uint64, shead common.Hash) {
|
||||||
c.lock.Lock()
|
c.lock.Lock()
|
||||||
defer c.lock.Unlock()
|
defer c.lock.Unlock()
|
||||||
|
|
||||||
|
c.checkpointSections = section + 1
|
||||||
|
c.checkpointHead = shead
|
||||||
|
|
||||||
if section < c.storedSections {
|
if section < c.storedSections {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -233,16 +243,23 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
|
|||||||
// If a reorg happened, invalidate all sections until that point
|
// If a reorg happened, invalidate all sections until that point
|
||||||
if reorg {
|
if reorg {
|
||||||
// Revert the known section number to the reorg point
|
// Revert the known section number to the reorg point
|
||||||
changed := head / c.sectionSize
|
known := head / c.sectionSize
|
||||||
if changed < c.knownSections {
|
stored := known
|
||||||
c.knownSections = changed
|
if known < c.checkpointSections {
|
||||||
|
known = 0
|
||||||
|
}
|
||||||
|
if stored < c.checkpointSections {
|
||||||
|
stored = c.checkpointSections
|
||||||
|
}
|
||||||
|
if known < c.knownSections {
|
||||||
|
c.knownSections = known
|
||||||
}
|
}
|
||||||
// Revert the stored sections from the database to the reorg point
|
// Revert the stored sections from the database to the reorg point
|
||||||
if changed < c.storedSections {
|
if stored < c.storedSections {
|
||||||
c.setValidSections(changed)
|
c.setValidSections(stored)
|
||||||
}
|
}
|
||||||
// Update the new head number to the finalized section end and notify children
|
// Update the new head number to the finalized section end and notify children
|
||||||
head = changed * c.sectionSize
|
head = known * c.sectionSize
|
||||||
|
|
||||||
if head < c.cascadedHead {
|
if head < c.cascadedHead {
|
||||||
c.cascadedHead = head
|
c.cascadedHead = head
|
||||||
@ -256,7 +273,18 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
|
|||||||
var sections uint64
|
var sections uint64
|
||||||
if head >= c.confirmsReq {
|
if head >= c.confirmsReq {
|
||||||
sections = (head + 1 - c.confirmsReq) / c.sectionSize
|
sections = (head + 1 - c.confirmsReq) / c.sectionSize
|
||||||
|
if sections < c.checkpointSections {
|
||||||
|
sections = 0
|
||||||
|
}
|
||||||
if sections > c.knownSections {
|
if sections > c.knownSections {
|
||||||
|
if c.knownSections < c.checkpointSections {
|
||||||
|
// syncing reached the checkpoint, verify section head
|
||||||
|
syncedHead := rawdb.ReadCanonicalHash(c.chainDb, c.checkpointSections*c.sectionSize-1)
|
||||||
|
if syncedHead != c.checkpointHead {
|
||||||
|
c.log.Error("Synced chain does not match checkpoint", "number", c.checkpointSections*c.sectionSize-1, "expected", c.checkpointHead, "synced", syncedHead)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
c.knownSections = sections
|
c.knownSections = sections
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -401,8 +429,14 @@ func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) {
|
|||||||
c.children = append(c.children, indexer)
|
c.children = append(c.children, indexer)
|
||||||
|
|
||||||
// Cascade any pending updates to new children too
|
// Cascade any pending updates to new children too
|
||||||
if c.storedSections > 0 {
|
sections := c.storedSections
|
||||||
indexer.newHead(c.storedSections*c.sectionSize-1, false)
|
if c.knownSections < sections {
|
||||||
|
// if a section is "stored" but not "known" then it is a checkpoint without
|
||||||
|
// available chain data so we should not cascade it yet
|
||||||
|
sections = c.knownSections
|
||||||
|
}
|
||||||
|
if sections > 0 {
|
||||||
|
indexer.newHead(sections*c.sectionSize-1, false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,14 +121,14 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.
|
|||||||
func (self *LightChain) addTrustedCheckpoint(cp TrustedCheckpoint) {
|
func (self *LightChain) addTrustedCheckpoint(cp TrustedCheckpoint) {
|
||||||
if self.odr.ChtIndexer() != nil {
|
if self.odr.ChtIndexer() != nil {
|
||||||
StoreChtRoot(self.chainDb, cp.SectionIdx, cp.SectionHead, cp.CHTRoot)
|
StoreChtRoot(self.chainDb, cp.SectionIdx, cp.SectionHead, cp.CHTRoot)
|
||||||
self.odr.ChtIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead)
|
self.odr.ChtIndexer().AddCheckpoint(cp.SectionIdx, cp.SectionHead)
|
||||||
}
|
}
|
||||||
if self.odr.BloomTrieIndexer() != nil {
|
if self.odr.BloomTrieIndexer() != nil {
|
||||||
StoreBloomTrieRoot(self.chainDb, cp.SectionIdx, cp.SectionHead, cp.BloomRoot)
|
StoreBloomTrieRoot(self.chainDb, cp.SectionIdx, cp.SectionHead, cp.BloomRoot)
|
||||||
self.odr.BloomTrieIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead)
|
self.odr.BloomTrieIndexer().AddCheckpoint(cp.SectionIdx, cp.SectionHead)
|
||||||
}
|
}
|
||||||
if self.odr.BloomIndexer() != nil {
|
if self.odr.BloomIndexer() != nil {
|
||||||
self.odr.BloomIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead)
|
self.odr.BloomIndexer().AddCheckpoint(cp.SectionIdx, cp.SectionHead)
|
||||||
}
|
}
|
||||||
log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.SectionIdx+1)*self.indexerConfig.ChtSize-1, "hash", cp.SectionHead)
|
log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.SectionIdx+1)*self.indexerConfig.ChtSize-1, "hash", cp.SectionHead)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user