From 5c67066a050e3924e1c663317fd8051bc8d34f43 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 31 Jan 2024 16:57:33 +0800 Subject: [PATCH] eth/downloader: fix skeleton cleanup (#28581) * eth/downloader: fix skeleton cleanup * eth/downloader: short circuit if nothing to delete * eth/downloader: polish the logic in cleanup * eth/downloader: address comments --- eth/downloader/beaconsync.go | 3 +- eth/downloader/downloader.go | 1 + eth/downloader/skeleton.go | 74 ++++++++++++++++++++++-------------- 3 files changed, 48 insertions(+), 30 deletions(-) diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index df8af68bc..d3f75c852 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -50,7 +50,8 @@ func newBeaconBackfiller(dl *Downloader, success func()) backfiller { } // suspend cancels any background downloader threads and returns the last header -// that has been successfully backfilled. +// that has been successfully backfilled (potentially in a previous run), or the +// genesis. func (b *beaconBackfiller) suspend() *types.Header { // If no filling is running, don't waste cycles b.lock.Lock() diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f1cfa92d5..8d449246a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -611,6 +611,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * if err := d.lightchain.SetHead(origin); err != nil { return err } + log.Info("Truncated excess ancient chain segment", "oldhead", frozen-1, "newhead", origin) } } // Initiate the sync using a concurrent header and content retrieval algorithm diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index f40ca24d9..873ee950b 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -161,7 +161,7 @@ type backfiller interface { // on initial startup. // // The method should return the last block header that has been successfully - // backfilled, or nil if the backfiller was not resumed. + // backfilled (in the current or a previous run), falling back to the genesis. suspend() *types.Header // resume requests the backfiller to start running fill or snap sync based on @@ -382,14 +382,17 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { done := make(chan struct{}) go func() { defer close(done) - if filled := s.filler.suspend(); filled != nil { - // If something was filled, try to delete stale sync helpers. If - // unsuccessful, warn the user, but not much else we can do (it's - // a programming error, just let users report an issue and don't - // choke in the meantime). - if err := s.cleanStales(filled); err != nil { - log.Error("Failed to clean stale beacon headers", "err", err) - } + filled := s.filler.suspend() + if filled == nil { + log.Error("Latest filled block is not available") + return + } + // If something was filled, try to delete stale sync helpers. If + // unsuccessful, warn the user, but not much else we can do (it's + // a programming error, just let users report an issue and don't + // choke in the meantime). + if err := s.cleanStales(filled); err != nil { + log.Error("Failed to clean stale beacon headers", "err", err) } }() // Wait for the suspend to finish, consuming head events in the meantime @@ -1120,33 +1123,46 @@ func (s *skeleton) cleanStales(filled *types.Header) error { number := filled.Number.Uint64() log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash()) - // If the filled header is below the linked subchain, something's - // corrupted internally. Report and error and refuse to do anything. - if number < s.progress.Subchains[0].Tail { + // If the filled header is below the linked subchain, something's corrupted + // internally. Report and error and refuse to do anything. + if number+1 < s.progress.Subchains[0].Tail { return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail) } - // Subchain seems trimmable, push the tail forward up to the last - // filled header and delete everything before it - if available. In - // case we filled past the head, recreate the subchain with a new - // head to keep it consistent with the data on disk. + // If nothing in subchain is filled, don't bother to do cleanup. + if number+1 == s.progress.Subchains[0].Tail { + return nil + } var ( - start = s.progress.Subchains[0].Tail // start deleting from the first known header - end = number // delete until the requested threshold + start uint64 + end uint64 batch = s.db.NewBatch() ) - s.progress.Subchains[0].Tail = number - s.progress.Subchains[0].Next = filled.ParentHash + if number < s.progress.Subchains[0].Head { + // The skeleton chain is partially consumed, set the new tail as filled+1. + tail := rawdb.ReadSkeletonHeader(s.db, number+1) + if tail.ParentHash != filled.Hash() { + return fmt.Errorf("filled header is discontinuous with subchain: %d %s, please file an issue", number, filled.Hash()) + } + start, end = s.progress.Subchains[0].Tail, number+1 // remove headers in [tail, filled] + s.progress.Subchains[0].Tail = tail.Number.Uint64() + s.progress.Subchains[0].Next = tail.ParentHash + } else { + // The skeleton chain is fully consumed, set both head and tail as filled. + start, end = s.progress.Subchains[0].Tail, filled.Number.Uint64() // remove headers in [tail, filled) + s.progress.Subchains[0].Tail = filled.Number.Uint64() + s.progress.Subchains[0].Next = filled.ParentHash - if s.progress.Subchains[0].Head < number { - // If more headers were filled than available, push the entire - // subchain forward to keep tracking the node's block imports - end = s.progress.Subchains[0].Head + 1 // delete the entire original range, including the head - s.progress.Subchains[0].Head = number // assign a new head (tail is already assigned to this) + // If more headers were filled than available, push the entire subchain + // forward to keep tracking the node's block imports. + if number > s.progress.Subchains[0].Head { + end = s.progress.Subchains[0].Head + 1 // delete the entire original range, including the head + s.progress.Subchains[0].Head = number // assign a new head (tail is already assigned to this) - // The entire original skeleton chain was deleted and a new one - // defined. Make sure the new single-header chain gets pushed to - // disk to keep internal state consistent. - rawdb.WriteSkeletonHeader(batch, filled) + // The entire original skeleton chain was deleted and a new one + // defined. Make sure the new single-header chain gets pushed to + // disk to keep internal state consistent. + rawdb.WriteSkeletonHeader(batch, filled) + } } // Execute the trimming and the potential rewiring of the progress s.saveSyncStatus(batch)