eth/downloader: fix skeleton cleanup (#28581)

* eth/downloader: fix skeleton cleanup

* eth/downloader: short circuit if nothing to delete

* eth/downloader: polish the logic in cleanup

* eth/downloader: address comments
This commit is contained in:
rjl493456442 2024-01-31 16:57:33 +08:00 committed by GitHub
parent 3adf1cecf2
commit 5c67066a05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 48 additions and 30 deletions

View File

@ -50,7 +50,8 @@ func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
} }
// suspend cancels any background downloader threads and returns the last header // suspend cancels any background downloader threads and returns the last header
// that has been successfully backfilled. // that has been successfully backfilled (potentially in a previous run), or the
// genesis.
func (b *beaconBackfiller) suspend() *types.Header { func (b *beaconBackfiller) suspend() *types.Header {
// If no filling is running, don't waste cycles // If no filling is running, don't waste cycles
b.lock.Lock() b.lock.Lock()

View File

@ -611,6 +611,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
if err := d.lightchain.SetHead(origin); err != nil { if err := d.lightchain.SetHead(origin); err != nil {
return err return err
} }
log.Info("Truncated excess ancient chain segment", "oldhead", frozen-1, "newhead", origin)
} }
} }
// Initiate the sync using a concurrent header and content retrieval algorithm // Initiate the sync using a concurrent header and content retrieval algorithm

View File

@ -161,7 +161,7 @@ type backfiller interface {
// on initial startup. // on initial startup.
// //
// The method should return the last block header that has been successfully // The method should return the last block header that has been successfully
// backfilled, or nil if the backfiller was not resumed. // backfilled (in the current or a previous run), falling back to the genesis.
suspend() *types.Header suspend() *types.Header
// resume requests the backfiller to start running fill or snap sync based on // resume requests the backfiller to start running fill or snap sync based on
@ -382,7 +382,11 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer close(done) defer close(done)
if filled := s.filler.suspend(); filled != nil { filled := s.filler.suspend()
if filled == nil {
log.Error("Latest filled block is not available")
return
}
// If something was filled, try to delete stale sync helpers. If // If something was filled, try to delete stale sync helpers. If
// unsuccessful, warn the user, but not much else we can do (it's // unsuccessful, warn the user, but not much else we can do (it's
// a programming error, just let users report an issue and don't // a programming error, just let users report an issue and don't
@ -390,7 +394,6 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
if err := s.cleanStales(filled); err != nil { if err := s.cleanStales(filled); err != nil {
log.Error("Failed to clean stale beacon headers", "err", err) log.Error("Failed to clean stale beacon headers", "err", err)
} }
}
}() }()
// Wait for the suspend to finish, consuming head events in the meantime // Wait for the suspend to finish, consuming head events in the meantime
// and dropping them on the floor. // and dropping them on the floor.
@ -1120,26 +1123,38 @@ func (s *skeleton) cleanStales(filled *types.Header) error {
number := filled.Number.Uint64() number := filled.Number.Uint64()
log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash()) log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash())
// If the filled header is below the linked subchain, something's // If the filled header is below the linked subchain, something's corrupted
// corrupted internally. Report and error and refuse to do anything. // internally. Report and error and refuse to do anything.
if number < s.progress.Subchains[0].Tail { if number+1 < s.progress.Subchains[0].Tail {
return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail) return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail)
} }
// Subchain seems trimmable, push the tail forward up to the last // If nothing in subchain is filled, don't bother to do cleanup.
// filled header and delete everything before it - if available. In if number+1 == s.progress.Subchains[0].Tail {
// case we filled past the head, recreate the subchain with a new return nil
// head to keep it consistent with the data on disk. }
var ( var (
start = s.progress.Subchains[0].Tail // start deleting from the first known header start uint64
end = number // delete until the requested threshold end uint64
batch = s.db.NewBatch() batch = s.db.NewBatch()
) )
s.progress.Subchains[0].Tail = number if number < s.progress.Subchains[0].Head {
// The skeleton chain is partially consumed, set the new tail as filled+1.
tail := rawdb.ReadSkeletonHeader(s.db, number+1)
if tail.ParentHash != filled.Hash() {
return fmt.Errorf("filled header is discontinuous with subchain: %d %s, please file an issue", number, filled.Hash())
}
start, end = s.progress.Subchains[0].Tail, number+1 // remove headers in [tail, filled]
s.progress.Subchains[0].Tail = tail.Number.Uint64()
s.progress.Subchains[0].Next = tail.ParentHash
} else {
// The skeleton chain is fully consumed, set both head and tail as filled.
start, end = s.progress.Subchains[0].Tail, filled.Number.Uint64() // remove headers in [tail, filled)
s.progress.Subchains[0].Tail = filled.Number.Uint64()
s.progress.Subchains[0].Next = filled.ParentHash s.progress.Subchains[0].Next = filled.ParentHash
if s.progress.Subchains[0].Head < number { // If more headers were filled than available, push the entire subchain
// If more headers were filled than available, push the entire // forward to keep tracking the node's block imports.
// subchain forward to keep tracking the node's block imports if number > s.progress.Subchains[0].Head {
end = s.progress.Subchains[0].Head + 1 // delete the entire original range, including the head end = s.progress.Subchains[0].Head + 1 // delete the entire original range, including the head
s.progress.Subchains[0].Head = number // assign a new head (tail is already assigned to this) s.progress.Subchains[0].Head = number // assign a new head (tail is already assigned to this)
@ -1148,6 +1163,7 @@ func (s *skeleton) cleanStales(filled *types.Header) error {
// disk to keep internal state consistent. // disk to keep internal state consistent.
rawdb.WriteSkeletonHeader(batch, filled) rawdb.WriteSkeletonHeader(batch, filled)
} }
}
// Execute the trimming and the potential rewiring of the progress // Execute the trimming and the potential rewiring of the progress
s.saveSyncStatus(batch) s.saveSyncStatus(batch)
for n := start; n < end; n++ { for n := start; n < end; n++ {