forked from cerc-io/plugeth
eth/downloader: terminate beacon sync early when linked to local chain (#24550)
* eth/downloader: terminate beacon sync early when linked to local chain * eth/downloader: fix backfiller resume on early beacon termination
This commit is contained in:
parent
afe9558bba
commit
51de2bc9dc
@ -175,7 +175,7 @@ func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) e
|
||||
// sync and on the correct chain, checking the top N links should already get us
|
||||
// a match. In the rare scenario when we ended up on a long reorganisation (i.e.
|
||||
// none of the head links match), we do a binary search to find the ancestor.
|
||||
func (d *Downloader) findBeaconAncestor() uint64 {
|
||||
func (d *Downloader) findBeaconAncestor() (uint64, error) {
|
||||
// Figure out the current local head position
|
||||
var chainHead *types.Header
|
||||
|
||||
@ -189,17 +189,36 @@ func (d *Downloader) findBeaconAncestor() uint64 {
|
||||
}
|
||||
number := chainHead.Number.Uint64()
|
||||
|
||||
// If the head is present in the skeleton chain, return that
|
||||
if chainHead.Hash() == d.skeleton.Header(number).Hash() {
|
||||
return number
|
||||
}
|
||||
// Head header not present, binary search to find the ancestor
|
||||
start, end := uint64(0), number
|
||||
|
||||
beaconHead, err := d.skeleton.Head()
|
||||
// Retrieve the skeleton bounds and ensure they are linked to the local chain
|
||||
beaconHead, beaconTail, err := d.skeleton.Bounds()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("failed to read skeleton head: %v", err)) // can't reach this method without a head
|
||||
// This is a programming error. The chain backfiller was called with an
|
||||
// invalid beacon sync state. Ideally we would panic here, but erroring
|
||||
// gives us at least a remote chance to recover. It's still a big fault!
|
||||
log.Error("Failed to retrieve beacon bounds", "err", err)
|
||||
return 0, err
|
||||
}
|
||||
var linked bool
|
||||
switch d.getMode() {
|
||||
case FullSync:
|
||||
linked = d.blockchain.HasBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
|
||||
case SnapSync:
|
||||
linked = d.blockchain.HasFastBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
|
||||
default:
|
||||
linked = d.blockchain.HasHeader(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
|
||||
}
|
||||
if !linked {
|
||||
// This is a programming error. The chain backfiller was called with a
|
||||
// tail that's not linked to the local chain. Whilst this should never
|
||||
// happen, there might be some weirdnesses if beacon sync backfilling
|
||||
// races with the user (or beacon client) calling setHead. Whilst panic
|
||||
// would be the ideal thing to do, it is safer long term to attempt a
|
||||
// recovery and fix any noticed issue after the fact.
|
||||
log.Error("Beacon sync linkup unavailable", "number", beaconTail.Number.Uint64()-1, "hash", beaconTail.ParentHash)
|
||||
return 0, fmt.Errorf("beacon linkup unavailable locally: %d [%x]", beaconTail.Number.Uint64()-1, beaconTail.ParentHash)
|
||||
}
|
||||
// Binary search to find the ancestor
|
||||
start, end := beaconTail.Number.Uint64()-1, number
|
||||
if number := beaconHead.Number.Uint64(); end > number {
|
||||
// This shouldn't really happen in a healty network, but if the consensus
|
||||
// clients feeds us a shorter chain as the canonical, we should not attempt
|
||||
@ -229,13 +248,13 @@ func (d *Downloader) findBeaconAncestor() uint64 {
|
||||
}
|
||||
start = check
|
||||
}
|
||||
return start
|
||||
return start, nil
|
||||
}
|
||||
|
||||
// fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling
|
||||
// until sync errors or is finished.
|
||||
func (d *Downloader) fetchBeaconHeaders(from uint64) error {
|
||||
head, err := d.skeleton.Head()
|
||||
head, _, err := d.skeleton.Bounds()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -281,7 +300,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
|
||||
case <-d.cancelCh:
|
||||
return errCanceled
|
||||
}
|
||||
head, err = d.skeleton.Head()
|
||||
head, _, err = d.skeleton.Bounds()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -472,7 +472,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
|
||||
}
|
||||
} else {
|
||||
// In beacon mode, user the skeleton chain to retrieve the headers from
|
||||
latest, err = d.skeleton.Head()
|
||||
latest, _, err = d.skeleton.Bounds()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -498,7 +498,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
|
||||
}
|
||||
} else {
|
||||
// In beacon mode, use the skeleton chain for the ancestor lookup
|
||||
origin = d.findBeaconAncestor()
|
||||
origin, err = d.findBeaconAncestor()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
d.syncStatsLock.Lock()
|
||||
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
|
||||
|
@ -352,7 +352,10 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
|
||||
|
||||
// If the sync is already done, resume the backfiller. When the loop stops,
|
||||
// terminate the backfiller too.
|
||||
if s.scratchHead == 0 {
|
||||
linked := len(s.progress.Subchains) == 1 &&
|
||||
rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
|
||||
rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead)
|
||||
if linked {
|
||||
s.filler.resume()
|
||||
}
|
||||
defer s.filler.suspend()
|
||||
@ -391,8 +394,9 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
|
||||
}
|
||||
for {
|
||||
// Something happened, try to assign new tasks to any idle peers
|
||||
if !linked {
|
||||
s.assignTasks(responses, requestFails, cancel)
|
||||
|
||||
}
|
||||
// Wait for something to happen
|
||||
select {
|
||||
case event := <-peering:
|
||||
@ -434,7 +438,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
|
||||
// New head was integrated into the skeleton chain. If the backfiller
|
||||
// is still running, it will pick it up. If it already terminated,
|
||||
// a new cycle needs to be spun up.
|
||||
if s.scratchHead == 0 {
|
||||
if linked {
|
||||
s.filler.resume()
|
||||
}
|
||||
|
||||
@ -443,23 +447,20 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
|
||||
|
||||
case res := <-responses:
|
||||
// Process the batch of headers. If though processing we managed to
|
||||
// link the curret subchain to a previously downloaded one, abort the
|
||||
// sync and restart with the merged subchains. We could probably hack
|
||||
// the internal state to switch the scratch space over to the tail of
|
||||
// the extended subchain, but since the scenario is rare, it's cleaner
|
||||
// to rely on the restart mechanism than a stateful modification.
|
||||
if merged := s.processResponse(res); merged {
|
||||
// link the current subchain to a previously downloaded one, abort the
|
||||
// sync and restart with the merged subchains.
|
||||
//
|
||||
// If we managed to link to the existing local chain or genesis block,
|
||||
// abort sync altogether.
|
||||
linked, merged := s.processResponse(res)
|
||||
if linked {
|
||||
log.Debug("Beacon sync linked to local chain")
|
||||
return nil, errSyncLinked
|
||||
}
|
||||
if merged {
|
||||
log.Debug("Beacon sync merged subchains")
|
||||
return nil, errSyncMerged
|
||||
}
|
||||
// If we've just reached the genesis block, tear down the sync cycle
|
||||
// and restart it to resume the backfiller. We could just as well do
|
||||
// a signalling here, but it's a tad cleaner to have only one entry
|
||||
// pathway to suspending/resuming it.
|
||||
if len(s.progress.Subchains) == 1 && s.progress.Subchains[0].Tail == 1 {
|
||||
log.Debug("Beacon sync linked to genesis")
|
||||
return nil, errSyncLinked
|
||||
}
|
||||
// We still have work to do, loop and repeat
|
||||
}
|
||||
}
|
||||
@ -852,7 +853,7 @@ func (s *skeleton) revertRequest(req *headerRequest) {
|
||||
s.scratchOwners[(s.scratchHead-req.head)/requestHeaders] = ""
|
||||
}
|
||||
|
||||
func (s *skeleton) processResponse(res *headerResponse) bool {
|
||||
func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool) {
|
||||
res.peer.log.Trace("Processing header response", "head", res.headers[0].Number, "hash", res.headers[0].Hash(), "count", len(res.headers))
|
||||
|
||||
// Whether the response is valid, we can mark the peer as idle and notify
|
||||
@ -866,7 +867,7 @@ func (s *skeleton) processResponse(res *headerResponse) bool {
|
||||
// gets fulfilled successfully. It should not be possible to deliver a
|
||||
// response to a non-existing request.
|
||||
res.peer.log.Error("Unexpected header packet")
|
||||
return false
|
||||
return false, false
|
||||
}
|
||||
delete(s.requests, res.reqid)
|
||||
|
||||
@ -877,11 +878,9 @@ func (s *skeleton) processResponse(res *headerResponse) bool {
|
||||
|
||||
// If there's still a gap in the head of the scratch space, abort
|
||||
if s.scratchSpace[0] == nil {
|
||||
return false
|
||||
return false, false
|
||||
}
|
||||
// Try to consume any head headers, validating the boundary conditions
|
||||
var merged bool // Whether subchains were merged
|
||||
|
||||
batch := s.db.NewBatch()
|
||||
for s.scratchSpace[0] != nil {
|
||||
// Next batch of headers available, cross-reference with the subchain
|
||||
@ -916,15 +915,44 @@ func (s *skeleton) processResponse(res *headerResponse) bool {
|
||||
|
||||
s.progress.Subchains[0].Tail--
|
||||
s.progress.Subchains[0].Next = header.ParentHash
|
||||
|
||||
// If we've reached an existing block in the chain, stop retrieving
|
||||
// headers. Note, if we want to support light clients with the same
|
||||
// code we'd need to switch here based on the downloader mode. That
|
||||
// said, there's no such functionality for now, so don't complicate.
|
||||
//
|
||||
// In the case of full sync it would be enough to check for the body,
|
||||
// but even a full syncing node will generate a receipt once block
|
||||
// processing is done, so it's just one more "needless" check.
|
||||
var (
|
||||
hasBody = rawdb.HasBody(s.db, header.ParentHash, header.Number.Uint64()-1)
|
||||
hasReceipt = rawdb.HasReceipts(s.db, header.ParentHash, header.Number.Uint64()-1)
|
||||
)
|
||||
if hasBody && hasReceipt {
|
||||
linked = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
// Batch of headers consumed, shift the download window forward
|
||||
head := s.progress.Subchains[0].Head
|
||||
tail := s.progress.Subchains[0].Tail
|
||||
next := s.progress.Subchains[0].Next
|
||||
|
||||
log.Trace("Primary subchain extended", "head", head, "tail", tail, "next", next)
|
||||
|
||||
// If the beacon chain was linked to the local chain, completely swap out
|
||||
// all internal progress and abort header synchronization.
|
||||
if linked {
|
||||
// Note, linking into the local chain should also mean that there are
|
||||
// no leftover subchains, but just in case there's some junk due to
|
||||
// strange conditions or bugs, clean up all internal state.
|
||||
if len(s.progress.Subchains) > 1 {
|
||||
log.Error("Cleaning up leftovers after beacon link")
|
||||
s.progress.Subchains = s.progress.Subchains[:1]
|
||||
}
|
||||
break
|
||||
}
|
||||
// Batch of headers consumed, shift the download window forward
|
||||
copy(s.scratchSpace, s.scratchSpace[requestHeaders:])
|
||||
for i := 0; i < requestHeaders; i++ {
|
||||
s.scratchSpace[scratchHeaders-i-1] = nil
|
||||
@ -979,6 +1007,9 @@ func (s *skeleton) processResponse(res *headerResponse) bool {
|
||||
}
|
||||
// Print a progress report making the UX a bit nicer
|
||||
left := s.progress.Subchains[0].Tail - 1
|
||||
if linked {
|
||||
left = 0
|
||||
}
|
||||
if time.Since(s.logged) > 8*time.Second || left == 0 {
|
||||
s.logged = time.Now()
|
||||
|
||||
@ -989,11 +1020,11 @@ func (s *skeleton) processResponse(res *headerResponse) bool {
|
||||
log.Info("Syncing beacon headers", "downloaded", s.pulled, "left", left, "eta", common.PrettyDuration(eta))
|
||||
}
|
||||
}
|
||||
return merged
|
||||
return linked, merged
|
||||
}
|
||||
|
||||
// Head retrieves the current head tracked by the skeleton syncer. This method
|
||||
// is meant to be used by the backfiller, whose life cycle is controlled by the
|
||||
// Bounds retrieves the current head and tail tracked by the skeleton syncer.
|
||||
// This method is used by the backfiller, whose life cycle is controlled by the
|
||||
// skeleton syncer.
|
||||
//
|
||||
// Note, the method will not use the internal state of the skeleton, but will
|
||||
@ -1002,23 +1033,23 @@ func (s *skeleton) processResponse(res *headerResponse) bool {
|
||||
// There might be new heads appended, but those are atomic from the perspective
|
||||
// of this method. Any head reorg will first tear down the backfiller and only
|
||||
// then make the modification.
|
||||
func (s *skeleton) Head() (*types.Header, error) {
|
||||
func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, err error) {
|
||||
// Read the current sync progress from disk and figure out the current head.
|
||||
// Although there's a lot of error handling here, these are mostly as sanity
|
||||
// checks to avoid crashing if a programming error happens. These should not
|
||||
// happen in live code.
|
||||
status := rawdb.ReadSkeletonSyncStatus(s.db)
|
||||
if len(status) == 0 {
|
||||
return nil, errors.New("beacon sync not yet started")
|
||||
return nil, nil, errors.New("beacon sync not yet started")
|
||||
}
|
||||
progress := new(skeletonProgress)
|
||||
if err := json.Unmarshal(status, progress); err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
if progress.Subchains[0].Tail != 1 {
|
||||
return nil, errors.New("beacon sync not yet finished")
|
||||
}
|
||||
return rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head), nil
|
||||
head = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head)
|
||||
tail = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Tail)
|
||||
|
||||
return head, tail, nil
|
||||
}
|
||||
|
||||
// Header retrieves a specific header tracked by the skeleton syncer. This method
|
||||
|
Loading…
Reference in New Issue
Block a user