diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 95eed408f..2c7494c7d 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -237,6 +237,10 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl log.Warn("Forkchoice requested unknown head", "hash", update.HeadBlockHash) return engine.STATUS_SYNCING, nil } + // If the finalized hash is known, we can direct the downloader to move + // potentially more data to the freezer from the get go. + finalized := api.remoteBlocks.get(update.FinalizedBlockHash) + // Header advertised via a past newPayload request. Start syncing to it. // Before we do however, make sure any legacy sync in switched off so we // don't accidentally have 2 cycles running. @@ -244,8 +248,16 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl merger.ReachTTD() api.eth.Downloader().Cancel() } - log.Info("Forkchoice requested sync to new head", "number", header.Number, "hash", header.Hash()) - if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header); err != nil { + context := []interface{}{"number", header.Number, "hash", header.Hash()} + if update.FinalizedBlockHash != (common.Hash{}) { + if finalized == nil { + context = append(context, []interface{}{"finalized", "unknown"}...) + } else { + context = append(context, []interface{}{"finalized", finalized.Number}...) + } + } + log.Info("Forkchoice requested sync to new head", context...) + if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header, finalized); err != nil { return engine.STATUS_SYNCING, err } return engine.STATUS_SYNCING, nil diff --git a/eth/catalyst/queue.go b/eth/catalyst/queue.go index b7213dd59..e8037aaca 100644 --- a/eth/catalyst/queue.go +++ b/eth/catalyst/queue.go @@ -31,9 +31,12 @@ import ( const maxTrackedPayloads = 10 // maxTrackedHeaders is the maximum number of executed payloads the execution -// engine tracks before evicting old ones. Ideally we should only ever track the -// latest one; but have a slight wiggle room for non-ideal conditions. -const maxTrackedHeaders = 10 +// engine tracks before evicting old ones. These are tracked outside the chain +// during initial sync to allow ForkchoiceUpdate to reference past blocks via +// hashes only. For the sync target it would be enough to track only the latest +// header, but snap sync also needs the latest finalized height for the ancient +// limit. +const maxTrackedHeaders = 96 // payloadQueueItem represents an id->payload tuple to store until it's retrieved // or evicted. diff --git a/eth/catalyst/tester.go b/eth/catalyst/tester.go index 05511eaf7..c4eafd30d 100644 --- a/eth/catalyst/tester.go +++ b/eth/catalyst/tester.go @@ -75,7 +75,7 @@ func (tester *FullSyncTester) Start() error { } // Trigger beacon sync with the provided block header as // trusted chain head. - err := tester.api.eth.Downloader().BeaconSync(downloader.FullSync, tester.block.Header()) + err := tester.api.eth.Downloader().BeaconSync(downloader.FullSync, tester.block.Header(), nil) if err != nil { log.Info("Failed to beacon sync", "err", err) } diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index 65d9225f8..c539474c6 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -151,8 +151,8 @@ func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) { // // Internally backfilling and state sync is done the same way, but the header // retrieval and scheduling is replaced. -func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error { - return d.beaconSync(mode, head, true) +func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header, final *types.Header) error { + return d.beaconSync(mode, head, final, true) } // BeaconExtend is an optimistic version of BeaconSync, where an attempt is made @@ -162,7 +162,7 @@ func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error { // This is useful if a beacon client is feeding us large chunks of payloads to run, // but is not setting the head after each. func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error { - return d.beaconSync(mode, head, false) + return d.beaconSync(mode, head, nil, false) } // beaconSync is the post-merge version of the chain synchronization, where the @@ -171,7 +171,7 @@ func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error { // // Internally backfilling and state sync is done the same way, but the header // retrieval and scheduling is replaced. -func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) error { +func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, final *types.Header, force bool) error { // When the downloader starts a sync cycle, it needs to be aware of the sync // mode to use (full, snap). To keep the skeleton chain oblivious, inject the // mode into the backfiller directly. @@ -181,7 +181,7 @@ func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) e d.skeleton.filler.(*beaconBackfiller).setMode(mode) // Signal the skeleton sync to switch to a new head, however it wants - if err := d.skeleton.Sync(head, force); err != nil { + if err := d.skeleton.Sync(head, final, force); err != nil { return err } return nil @@ -207,7 +207,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) { number := chainHead.Number.Uint64() // Retrieve the skeleton bounds and ensure they are linked to the local chain - beaconHead, beaconTail, err := d.skeleton.Bounds() + beaconHead, beaconTail, _, err := d.skeleton.Bounds() if err != nil { // This is a programming error. The chain backfiller was called with an // invalid beacon sync state. Ideally we would panic here, but erroring @@ -272,7 +272,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) { // until sync errors or is finished. func (d *Downloader) fetchBeaconHeaders(from uint64) error { var head *types.Header - _, tail, err := d.skeleton.Bounds() + _, tail, _, err := d.skeleton.Bounds() if err != nil { return err } @@ -292,7 +292,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { for { // Some beacon headers might have appeared since the last cycle, make // sure we're always syncing to all available ones - head, _, err = d.skeleton.Bounds() + head, _, _, err = d.skeleton.Bounds() if err != nil { return err } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index bb74efe75..ec9cce2eb 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -480,7 +480,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * }(time.Now()) // Look up the sync boundaries: the common ancestor and the target block - var latest, pivot *types.Header + var latest, pivot, final *types.Header if !beaconMode { // In legacy mode, use the master peer to retrieve the headers from latest, pivot, err = d.fetchHead(p) @@ -489,7 +489,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * } } else { // In beacon mode, use the skeleton chain to retrieve the headers from - latest, _, err = d.skeleton.Bounds() + latest, _, final, err = d.skeleton.Bounds() if err != nil { return err } @@ -499,7 +499,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * // Retrieve the pivot header from the skeleton chain segment but // fallback to local chain if it's not found in skeleton space. if pivot = d.skeleton.Header(number); pivot == nil { - _, oldest, _ := d.skeleton.Bounds() // error is already checked + _, oldest, _, _ := d.skeleton.Bounds() // error is already checked if number < oldest.Number.Uint64() { count := int(oldest.Number.Uint64() - number) // it's capped by fsMinFullBlocks headers := d.readHeaderRange(oldest, count) @@ -567,26 +567,41 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * d.committed = 0 } if mode == SnapSync { - // Set the ancient data limitation. - // If we are running snap sync, all block data older than ancientLimit will be - // written to the ancient store. More recent data will be written to the active - // database and will wait for the freezer to migrate. + // Set the ancient data limitation. If we are running snap sync, all block + // data older than ancientLimit will be written to the ancient store. More + // recent data will be written to the active database and will wait for the + // freezer to migrate. // - // If there is a checkpoint available, then calculate the ancientLimit through - // that. Otherwise calculate the ancient limit through the advertised height - // of the remote peer. + // If the network is post-merge, use either the last announced finalized + // block as the ancient limit, or if we haven't yet received one, the head- + // a max fork ancestry limit. One quirky case if we've already passed the + // finalized block, in which case the skeleton.Bounds will return nil and + // we'll revert to head - 90K. That's fine, we're finishing sync anyway. // - // The reason for picking checkpoint first is that a malicious peer can give us - // a fake (very high) height, forcing the ancient limit to also be very high. - // The peer would start to feed us valid blocks until head, resulting in all of - // the blocks might be written into the ancient store. A following mini-reorg - // could cause issues. - if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 { - d.ancientLimit = d.checkpoint - } else if height > fullMaxForkAncestry+1 { - d.ancientLimit = height - fullMaxForkAncestry - 1 + // For non-merged networks, if there is a checkpoint available, then calculate + // the ancientLimit through that. Otherwise calculate the ancient limit through + // the advertised height of the remote peer. This most is mostly a fallback for + // legacy networks, but should eventually be droppped. TODO(karalabe). + if beaconMode { + // Beacon sync, use the latest finalized block as the ancient limit + // or a reasonable height if no finalized block is yet announced. + if final != nil { + d.ancientLimit = final.Number.Uint64() + } else if height > fullMaxForkAncestry+1 { + d.ancientLimit = height - fullMaxForkAncestry - 1 + } else { + d.ancientLimit = 0 + } } else { - d.ancientLimit = 0 + // Legacy sync, use any hardcoded checkpoints or the best announcement + // we have from the remote peer. TODO(karalabe): Drop this pathway. + if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 { + d.ancientLimit = d.checkpoint + } else if height > fullMaxForkAncestry+1 { + d.ancientLimit = height - fullMaxForkAncestry - 1 + } else { + d.ancientLimit = 0 + } } frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. @@ -1566,7 +1581,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { // In post-merge, notify the engine API of encountered bad chains if d.badBlock != nil { - head, _, err := d.skeleton.Bounds() + head, _, _, err := d.skeleton.Bounds() if err != nil { log.Error("Failed to retrieve beacon bounds for bad block reporting", "err", err) } else { @@ -1860,7 +1875,7 @@ func (d *Downloader) reportSnapSyncProgress(force bool) { return } // Retrieve the current chain head and calculate the ETA - latest, _, err := d.skeleton.Bounds() + latest, _, _, err := d.skeleton.Bounds() if err != nil { // We're going to cheat for non-merged networks, but that's fine latest = d.pivotHeader diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 2f0c4acf7..ababb9deb 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1478,7 +1478,7 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) { if c.local > 0 { tester.chain.InsertChain(chain.blocks[1 : c.local+1]) } - if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header()); err != nil { + if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil { t.Fatalf("Failed to beacon sync chain %v %v", c.name, err) } select { diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index 142e9e5e6..12eb5700f 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -102,6 +102,7 @@ type subchain struct { // suspended skeleton sync without prior knowledge of all prior suspension points. type skeletonProgress struct { Subchains []*subchain // Disjoint subchains downloaded until now + Finalized *uint64 // Last known finalized block number } // headUpdate is a notification that the beacon sync should switch to a new target. @@ -109,6 +110,7 @@ type skeletonProgress struct { // extend it and fail if it's not possible. type headUpdate struct { header *types.Header // Header to update the sync target to + final *types.Header // Finalized header to use as thresholds force bool // Whether to force the update or only extend if possible errc chan error // Channel to signal acceptance of the new head } @@ -321,12 +323,12 @@ func (s *skeleton) Terminate() error { // // This method does not block, rather it just waits until the syncer receives the // fed header. What the syncer does with it is the syncer's problem. -func (s *skeleton) Sync(head *types.Header, force bool) error { +func (s *skeleton) Sync(head *types.Header, final *types.Header, force bool) error { log.Trace("New skeleton head announced", "number", head.Number, "hash", head.Hash(), "force", force) errc := make(chan error) select { - case s.headEvents <- &headUpdate{header: head, force: force, errc: errc}: + case s.headEvents <- &headUpdate{header: head, final: final, force: force, errc: errc}: return <-errc case <-s.terminated: return errTerminated @@ -437,7 +439,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { // we don't seamlessly integrate reorgs to keep things simple. If the // network starts doing many mini reorgs, it might be worthwhile handling // a limited depth without an error. - if reorged := s.processNewHead(event.header, event.force); reorged { + if reorged := s.processNewHead(event.header, event.final, event.force); reorged { // If a reorg is needed, and we're forcing the new head, signal // the syncer to tear down and start over. Otherwise, drop the // non-force reorg. @@ -590,7 +592,17 @@ func (s *skeleton) saveSyncStatus(db ethdb.KeyValueWriter) { // accepts and integrates it into the skeleton or requests a reorg. Upon reorg, // the syncer will tear itself down and restart with a fresh head. It is simpler // to reconstruct the sync state than to mutate it and hope for the best. -func (s *skeleton) processNewHead(head *types.Header, force bool) bool { +func (s *skeleton) processNewHead(head *types.Header, final *types.Header, force bool) bool { + // If a new finalized block was announced, update the sync process independent + // of what happens with the sync head below + if final != nil { + if number := final.Number.Uint64(); s.progress.Finalized == nil || *s.progress.Finalized != number { + s.progress.Finalized = new(uint64) + *s.progress.Finalized = final.Number.Uint64() + + s.saveSyncStatus(s.db) + } + } // If the header cannot be inserted without interruption, return an error for // the outer loop to tear down the skeleton sync and restart it number := head.Number.Uint64() @@ -1150,9 +1162,10 @@ func (s *skeleton) cleanStales(filled *types.Header) error { return nil } -// Bounds retrieves the current head and tail tracked by the skeleton syncer. -// This method is used by the backfiller, whose life cycle is controlled by the -// skeleton syncer. +// Bounds retrieves the current head and tail tracked by the skeleton syncer +// and optionally the last known finalized header if any was announced and if +// it is still in the sync range. This method is used by the backfiller, whose +// life cycle is controlled by the skeleton syncer. // // Note, the method will not use the internal state of the skeleton, but will // rather blindly pull stuff from the database. This is fine, because the back- @@ -1160,28 +1173,34 @@ func (s *skeleton) cleanStales(filled *types.Header) error { // There might be new heads appended, but those are atomic from the perspective // of this method. Any head reorg will first tear down the backfiller and only // then make the modification. -func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, err error) { +func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, final *types.Header, err error) { // Read the current sync progress from disk and figure out the current head. // Although there's a lot of error handling here, these are mostly as sanity // checks to avoid crashing if a programming error happens. These should not // happen in live code. status := rawdb.ReadSkeletonSyncStatus(s.db) if len(status) == 0 { - return nil, nil, errors.New("beacon sync not yet started") + return nil, nil, nil, errors.New("beacon sync not yet started") } progress := new(skeletonProgress) if err := json.Unmarshal(status, progress); err != nil { - return nil, nil, err + return nil, nil, nil, err } head = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head) if head == nil { - return nil, nil, fmt.Errorf("head skeleton header %d is missing", progress.Subchains[0].Head) + return nil, nil, nil, fmt.Errorf("head skeleton header %d is missing", progress.Subchains[0].Head) } tail = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Tail) if tail == nil { - return nil, nil, fmt.Errorf("tail skeleton header %d is missing", progress.Subchains[0].Tail) + return nil, nil, nil, fmt.Errorf("tail skeleton header %d is missing", progress.Subchains[0].Tail) } - return head, tail, nil + if progress.Finalized != nil && tail.Number.Uint64() <= *progress.Finalized && *progress.Finalized <= head.Number.Uint64() { + final = rawdb.ReadSkeletonHeader(s.db, *progress.Finalized) + if final == nil { + return nil, nil, nil, fmt.Errorf("finalized skeleton header %d is missing", *progress.Finalized) + } + } + return head, tail, final, nil } // Header retrieves a specific header tracked by the skeleton syncer. This method diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index 3b8e627cb..b19494a7b 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -370,7 +370,7 @@ func TestSkeletonSyncInit(t *testing.T) { skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) skeleton.syncStarting = func() { close(wait) } - skeleton.Sync(tt.head, true) + skeleton.Sync(tt.head, nil, true) <-wait skeleton.Terminate() @@ -484,10 +484,10 @@ func TestSkeletonSyncExtend(t *testing.T) { skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) skeleton.syncStarting = func() { close(wait) } - skeleton.Sync(tt.head, true) + skeleton.Sync(tt.head, nil, true) <-wait - if err := skeleton.Sync(tt.extend, false); err != tt.err { + if err := skeleton.Sync(tt.extend, nil, false); err != tt.err { t.Errorf("test %d: extension failure mismatch: have %v, want %v", i, err, tt.err) } skeleton.Terminate() @@ -859,7 +859,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) { } // Create a skeleton sync and run a cycle skeleton := newSkeleton(db, peerset, drop, filler) - skeleton.Sync(tt.head, true) + skeleton.Sync(tt.head, nil, true) var progress skeletonProgress // Wait a bit (bleah) for the initial sync loop to go to idle. This might @@ -910,7 +910,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) { } // Apply the post-init events if there's any if tt.newHead != nil { - skeleton.Sync(tt.newHead, true) + skeleton.Sync(tt.newHead, nil, true) } if tt.newPeer != nil { if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH66, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil {