Patch for concurrent iterator & others (onto v1.11.6) #386
@ -237,6 +237,10 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
|
||||
log.Warn("Forkchoice requested unknown head", "hash", update.HeadBlockHash)
|
||||
return engine.STATUS_SYNCING, nil
|
||||
}
|
||||
// If the finalized hash is known, we can direct the downloader to move
|
||||
// potentially more data to the freezer from the get go.
|
||||
finalized := api.remoteBlocks.get(update.FinalizedBlockHash)
|
||||
|
||||
// Header advertised via a past newPayload request. Start syncing to it.
|
||||
// Before we do however, make sure any legacy sync in switched off so we
|
||||
// don't accidentally have 2 cycles running.
|
||||
@ -244,8 +248,16 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
|
||||
merger.ReachTTD()
|
||||
api.eth.Downloader().Cancel()
|
||||
}
|
||||
log.Info("Forkchoice requested sync to new head", "number", header.Number, "hash", header.Hash())
|
||||
if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header); err != nil {
|
||||
context := []interface{}{"number", header.Number, "hash", header.Hash()}
|
||||
if update.FinalizedBlockHash != (common.Hash{}) {
|
||||
if finalized == nil {
|
||||
context = append(context, []interface{}{"finalized", "unknown"}...)
|
||||
} else {
|
||||
context = append(context, []interface{}{"finalized", finalized.Number}...)
|
||||
}
|
||||
}
|
||||
log.Info("Forkchoice requested sync to new head", context...)
|
||||
if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header, finalized); err != nil {
|
||||
return engine.STATUS_SYNCING, err
|
||||
}
|
||||
return engine.STATUS_SYNCING, nil
|
||||
|
@ -31,9 +31,12 @@ import (
|
||||
const maxTrackedPayloads = 10
|
||||
|
||||
// maxTrackedHeaders is the maximum number of executed payloads the execution
|
||||
// engine tracks before evicting old ones. Ideally we should only ever track the
|
||||
// latest one; but have a slight wiggle room for non-ideal conditions.
|
||||
const maxTrackedHeaders = 10
|
||||
// engine tracks before evicting old ones. These are tracked outside the chain
|
||||
// during initial sync to allow ForkchoiceUpdate to reference past blocks via
|
||||
// hashes only. For the sync target it would be enough to track only the latest
|
||||
// header, but snap sync also needs the latest finalized height for the ancient
|
||||
// limit.
|
||||
const maxTrackedHeaders = 96
|
||||
|
||||
// payloadQueueItem represents an id->payload tuple to store until it's retrieved
|
||||
// or evicted.
|
||||
|
@ -75,7 +75,7 @@ func (tester *FullSyncTester) Start() error {
|
||||
}
|
||||
// Trigger beacon sync with the provided block header as
|
||||
// trusted chain head.
|
||||
err := tester.api.eth.Downloader().BeaconSync(downloader.FullSync, tester.block.Header())
|
||||
err := tester.api.eth.Downloader().BeaconSync(downloader.FullSync, tester.block.Header(), nil)
|
||||
if err != nil {
|
||||
log.Info("Failed to beacon sync", "err", err)
|
||||
}
|
||||
|
@ -151,8 +151,8 @@ func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
|
||||
//
|
||||
// Internally backfilling and state sync is done the same way, but the header
|
||||
// retrieval and scheduling is replaced.
|
||||
func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error {
|
||||
return d.beaconSync(mode, head, true)
|
||||
func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header, final *types.Header) error {
|
||||
return d.beaconSync(mode, head, final, true)
|
||||
}
|
||||
|
||||
// BeaconExtend is an optimistic version of BeaconSync, where an attempt is made
|
||||
@ -162,7 +162,7 @@ func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error {
|
||||
// This is useful if a beacon client is feeding us large chunks of payloads to run,
|
||||
// but is not setting the head after each.
|
||||
func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
|
||||
return d.beaconSync(mode, head, false)
|
||||
return d.beaconSync(mode, head, nil, false)
|
||||
}
|
||||
|
||||
// beaconSync is the post-merge version of the chain synchronization, where the
|
||||
@ -171,7 +171,7 @@ func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
|
||||
//
|
||||
// Internally backfilling and state sync is done the same way, but the header
|
||||
// retrieval and scheduling is replaced.
|
||||
func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) error {
|
||||
func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, final *types.Header, force bool) error {
|
||||
// When the downloader starts a sync cycle, it needs to be aware of the sync
|
||||
// mode to use (full, snap). To keep the skeleton chain oblivious, inject the
|
||||
// mode into the backfiller directly.
|
||||
@ -181,7 +181,7 @@ func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) e
|
||||
d.skeleton.filler.(*beaconBackfiller).setMode(mode)
|
||||
|
||||
// Signal the skeleton sync to switch to a new head, however it wants
|
||||
if err := d.skeleton.Sync(head, force); err != nil {
|
||||
if err := d.skeleton.Sync(head, final, force); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -207,7 +207,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
|
||||
number := chainHead.Number.Uint64()
|
||||
|
||||
// Retrieve the skeleton bounds and ensure they are linked to the local chain
|
||||
beaconHead, beaconTail, err := d.skeleton.Bounds()
|
||||
beaconHead, beaconTail, _, err := d.skeleton.Bounds()
|
||||
if err != nil {
|
||||
// This is a programming error. The chain backfiller was called with an
|
||||
// invalid beacon sync state. Ideally we would panic here, but erroring
|
||||
@ -272,7 +272,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
|
||||
// until sync errors or is finished.
|
||||
func (d *Downloader) fetchBeaconHeaders(from uint64) error {
|
||||
var head *types.Header
|
||||
_, tail, err := d.skeleton.Bounds()
|
||||
_, tail, _, err := d.skeleton.Bounds()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -292,7 +292,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
|
||||
for {
|
||||
// Some beacon headers might have appeared since the last cycle, make
|
||||
// sure we're always syncing to all available ones
|
||||
head, _, err = d.skeleton.Bounds()
|
||||
head, _, _, err = d.skeleton.Bounds()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -480,7 +480,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
|
||||
}(time.Now())
|
||||
|
||||
// Look up the sync boundaries: the common ancestor and the target block
|
||||
var latest, pivot *types.Header
|
||||
var latest, pivot, final *types.Header
|
||||
if !beaconMode {
|
||||
// In legacy mode, use the master peer to retrieve the headers from
|
||||
latest, pivot, err = d.fetchHead(p)
|
||||
@ -489,7 +489,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
|
||||
}
|
||||
} else {
|
||||
// In beacon mode, use the skeleton chain to retrieve the headers from
|
||||
latest, _, err = d.skeleton.Bounds()
|
||||
latest, _, final, err = d.skeleton.Bounds()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -499,7 +499,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
|
||||
// Retrieve the pivot header from the skeleton chain segment but
|
||||
// fallback to local chain if it's not found in skeleton space.
|
||||
if pivot = d.skeleton.Header(number); pivot == nil {
|
||||
_, oldest, _ := d.skeleton.Bounds() // error is already checked
|
||||
_, oldest, _, _ := d.skeleton.Bounds() // error is already checked
|
||||
if number < oldest.Number.Uint64() {
|
||||
count := int(oldest.Number.Uint64() - number) // it's capped by fsMinFullBlocks
|
||||
headers := d.readHeaderRange(oldest, count)
|
||||
@ -567,20 +567,34 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
|
||||
d.committed = 0
|
||||
}
|
||||
if mode == SnapSync {
|
||||
// Set the ancient data limitation.
|
||||
// If we are running snap sync, all block data older than ancientLimit will be
|
||||
// written to the ancient store. More recent data will be written to the active
|
||||
// database and will wait for the freezer to migrate.
|
||||
// Set the ancient data limitation. If we are running snap sync, all block
|
||||
// data older than ancientLimit will be written to the ancient store. More
|
||||
// recent data will be written to the active database and will wait for the
|
||||
// freezer to migrate.
|
||||
//
|
||||
// If there is a checkpoint available, then calculate the ancientLimit through
|
||||
// that. Otherwise calculate the ancient limit through the advertised height
|
||||
// of the remote peer.
|
||||
// If the network is post-merge, use either the last announced finalized
|
||||
// block as the ancient limit, or if we haven't yet received one, the head-
|
||||
// a max fork ancestry limit. One quirky case if we've already passed the
|
||||
// finalized block, in which case the skeleton.Bounds will return nil and
|
||||
// we'll revert to head - 90K. That's fine, we're finishing sync anyway.
|
||||
//
|
||||
// The reason for picking checkpoint first is that a malicious peer can give us
|
||||
// a fake (very high) height, forcing the ancient limit to also be very high.
|
||||
// The peer would start to feed us valid blocks until head, resulting in all of
|
||||
// the blocks might be written into the ancient store. A following mini-reorg
|
||||
// could cause issues.
|
||||
// For non-merged networks, if there is a checkpoint available, then calculate
|
||||
// the ancientLimit through that. Otherwise calculate the ancient limit through
|
||||
// the advertised height of the remote peer. This most is mostly a fallback for
|
||||
// legacy networks, but should eventually be droppped. TODO(karalabe).
|
||||
if beaconMode {
|
||||
// Beacon sync, use the latest finalized block as the ancient limit
|
||||
// or a reasonable height if no finalized block is yet announced.
|
||||
if final != nil {
|
||||
d.ancientLimit = final.Number.Uint64()
|
||||
} else if height > fullMaxForkAncestry+1 {
|
||||
d.ancientLimit = height - fullMaxForkAncestry - 1
|
||||
} else {
|
||||
d.ancientLimit = 0
|
||||
}
|
||||
} else {
|
||||
// Legacy sync, use any hardcoded checkpoints or the best announcement
|
||||
// we have from the remote peer. TODO(karalabe): Drop this pathway.
|
||||
if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
|
||||
d.ancientLimit = d.checkpoint
|
||||
} else if height > fullMaxForkAncestry+1 {
|
||||
@ -588,6 +602,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
|
||||
} else {
|
||||
d.ancientLimit = 0
|
||||
}
|
||||
}
|
||||
frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
|
||||
|
||||
// If a part of blockchain data has already been written into active store,
|
||||
@ -1566,7 +1581,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
|
||||
|
||||
// In post-merge, notify the engine API of encountered bad chains
|
||||
if d.badBlock != nil {
|
||||
head, _, err := d.skeleton.Bounds()
|
||||
head, _, _, err := d.skeleton.Bounds()
|
||||
if err != nil {
|
||||
log.Error("Failed to retrieve beacon bounds for bad block reporting", "err", err)
|
||||
} else {
|
||||
@ -1860,7 +1875,7 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
|
||||
return
|
||||
}
|
||||
// Retrieve the current chain head and calculate the ETA
|
||||
latest, _, err := d.skeleton.Bounds()
|
||||
latest, _, _, err := d.skeleton.Bounds()
|
||||
if err != nil {
|
||||
// We're going to cheat for non-merged networks, but that's fine
|
||||
latest = d.pivotHeader
|
||||
|
@ -1478,7 +1478,7 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {
|
||||
if c.local > 0 {
|
||||
tester.chain.InsertChain(chain.blocks[1 : c.local+1])
|
||||
}
|
||||
if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header()); err != nil {
|
||||
if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
|
||||
t.Fatalf("Failed to beacon sync chain %v %v", c.name, err)
|
||||
}
|
||||
select {
|
||||
|
@ -102,6 +102,7 @@ type subchain struct {
|
||||
// suspended skeleton sync without prior knowledge of all prior suspension points.
|
||||
type skeletonProgress struct {
|
||||
Subchains []*subchain // Disjoint subchains downloaded until now
|
||||
Finalized *uint64 // Last known finalized block number
|
||||
}
|
||||
|
||||
// headUpdate is a notification that the beacon sync should switch to a new target.
|
||||
@ -109,6 +110,7 @@ type skeletonProgress struct {
|
||||
// extend it and fail if it's not possible.
|
||||
type headUpdate struct {
|
||||
header *types.Header // Header to update the sync target to
|
||||
final *types.Header // Finalized header to use as thresholds
|
||||
force bool // Whether to force the update or only extend if possible
|
||||
errc chan error // Channel to signal acceptance of the new head
|
||||
}
|
||||
@ -321,12 +323,12 @@ func (s *skeleton) Terminate() error {
|
||||
//
|
||||
// This method does not block, rather it just waits until the syncer receives the
|
||||
// fed header. What the syncer does with it is the syncer's problem.
|
||||
func (s *skeleton) Sync(head *types.Header, force bool) error {
|
||||
func (s *skeleton) Sync(head *types.Header, final *types.Header, force bool) error {
|
||||
log.Trace("New skeleton head announced", "number", head.Number, "hash", head.Hash(), "force", force)
|
||||
errc := make(chan error)
|
||||
|
||||
select {
|
||||
case s.headEvents <- &headUpdate{header: head, force: force, errc: errc}:
|
||||
case s.headEvents <- &headUpdate{header: head, final: final, force: force, errc: errc}:
|
||||
return <-errc
|
||||
case <-s.terminated:
|
||||
return errTerminated
|
||||
@ -437,7 +439,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
|
||||
// we don't seamlessly integrate reorgs to keep things simple. If the
|
||||
// network starts doing many mini reorgs, it might be worthwhile handling
|
||||
// a limited depth without an error.
|
||||
if reorged := s.processNewHead(event.header, event.force); reorged {
|
||||
if reorged := s.processNewHead(event.header, event.final, event.force); reorged {
|
||||
// If a reorg is needed, and we're forcing the new head, signal
|
||||
// the syncer to tear down and start over. Otherwise, drop the
|
||||
// non-force reorg.
|
||||
@ -590,7 +592,17 @@ func (s *skeleton) saveSyncStatus(db ethdb.KeyValueWriter) {
|
||||
// accepts and integrates it into the skeleton or requests a reorg. Upon reorg,
|
||||
// the syncer will tear itself down and restart with a fresh head. It is simpler
|
||||
// to reconstruct the sync state than to mutate it and hope for the best.
|
||||
func (s *skeleton) processNewHead(head *types.Header, force bool) bool {
|
||||
func (s *skeleton) processNewHead(head *types.Header, final *types.Header, force bool) bool {
|
||||
// If a new finalized block was announced, update the sync process independent
|
||||
// of what happens with the sync head below
|
||||
if final != nil {
|
||||
if number := final.Number.Uint64(); s.progress.Finalized == nil || *s.progress.Finalized != number {
|
||||
s.progress.Finalized = new(uint64)
|
||||
*s.progress.Finalized = final.Number.Uint64()
|
||||
|
||||
s.saveSyncStatus(s.db)
|
||||
}
|
||||
}
|
||||
// If the header cannot be inserted without interruption, return an error for
|
||||
// the outer loop to tear down the skeleton sync and restart it
|
||||
number := head.Number.Uint64()
|
||||
@ -1150,9 +1162,10 @@ func (s *skeleton) cleanStales(filled *types.Header) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Bounds retrieves the current head and tail tracked by the skeleton syncer.
|
||||
// This method is used by the backfiller, whose life cycle is controlled by the
|
||||
// skeleton syncer.
|
||||
// Bounds retrieves the current head and tail tracked by the skeleton syncer
|
||||
// and optionally the last known finalized header if any was announced and if
|
||||
// it is still in the sync range. This method is used by the backfiller, whose
|
||||
// life cycle is controlled by the skeleton syncer.
|
||||
//
|
||||
// Note, the method will not use the internal state of the skeleton, but will
|
||||
// rather blindly pull stuff from the database. This is fine, because the back-
|
||||
@ -1160,28 +1173,34 @@ func (s *skeleton) cleanStales(filled *types.Header) error {
|
||||
// There might be new heads appended, but those are atomic from the perspective
|
||||
// of this method. Any head reorg will first tear down the backfiller and only
|
||||
// then make the modification.
|
||||
func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, err error) {
|
||||
func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, final *types.Header, err error) {
|
||||
// Read the current sync progress from disk and figure out the current head.
|
||||
// Although there's a lot of error handling here, these are mostly as sanity
|
||||
// checks to avoid crashing if a programming error happens. These should not
|
||||
// happen in live code.
|
||||
status := rawdb.ReadSkeletonSyncStatus(s.db)
|
||||
if len(status) == 0 {
|
||||
return nil, nil, errors.New("beacon sync not yet started")
|
||||
return nil, nil, nil, errors.New("beacon sync not yet started")
|
||||
}
|
||||
progress := new(skeletonProgress)
|
||||
if err := json.Unmarshal(status, progress); err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
head = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head)
|
||||
if head == nil {
|
||||
return nil, nil, fmt.Errorf("head skeleton header %d is missing", progress.Subchains[0].Head)
|
||||
return nil, nil, nil, fmt.Errorf("head skeleton header %d is missing", progress.Subchains[0].Head)
|
||||
}
|
||||
tail = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Tail)
|
||||
if tail == nil {
|
||||
return nil, nil, fmt.Errorf("tail skeleton header %d is missing", progress.Subchains[0].Tail)
|
||||
return nil, nil, nil, fmt.Errorf("tail skeleton header %d is missing", progress.Subchains[0].Tail)
|
||||
}
|
||||
return head, tail, nil
|
||||
if progress.Finalized != nil && tail.Number.Uint64() <= *progress.Finalized && *progress.Finalized <= head.Number.Uint64() {
|
||||
final = rawdb.ReadSkeletonHeader(s.db, *progress.Finalized)
|
||||
if final == nil {
|
||||
return nil, nil, nil, fmt.Errorf("finalized skeleton header %d is missing", *progress.Finalized)
|
||||
}
|
||||
}
|
||||
return head, tail, final, nil
|
||||
}
|
||||
|
||||
// Header retrieves a specific header tracked by the skeleton syncer. This method
|
||||
|
@ -370,7 +370,7 @@ func TestSkeletonSyncInit(t *testing.T) {
|
||||
|
||||
skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
|
||||
skeleton.syncStarting = func() { close(wait) }
|
||||
skeleton.Sync(tt.head, true)
|
||||
skeleton.Sync(tt.head, nil, true)
|
||||
|
||||
<-wait
|
||||
skeleton.Terminate()
|
||||
@ -484,10 +484,10 @@ func TestSkeletonSyncExtend(t *testing.T) {
|
||||
|
||||
skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
|
||||
skeleton.syncStarting = func() { close(wait) }
|
||||
skeleton.Sync(tt.head, true)
|
||||
skeleton.Sync(tt.head, nil, true)
|
||||
|
||||
<-wait
|
||||
if err := skeleton.Sync(tt.extend, false); err != tt.err {
|
||||
if err := skeleton.Sync(tt.extend, nil, false); err != tt.err {
|
||||
t.Errorf("test %d: extension failure mismatch: have %v, want %v", i, err, tt.err)
|
||||
}
|
||||
skeleton.Terminate()
|
||||
@ -859,7 +859,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
|
||||
}
|
||||
// Create a skeleton sync and run a cycle
|
||||
skeleton := newSkeleton(db, peerset, drop, filler)
|
||||
skeleton.Sync(tt.head, true)
|
||||
skeleton.Sync(tt.head, nil, true)
|
||||
|
||||
var progress skeletonProgress
|
||||
// Wait a bit (bleah) for the initial sync loop to go to idle. This might
|
||||
@ -910,7 +910,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
|
||||
}
|
||||
// Apply the post-init events if there's any
|
||||
if tt.newHead != nil {
|
||||
skeleton.Sync(tt.newHead, true)
|
||||
skeleton.Sync(tt.newHead, nil, true)
|
||||
}
|
||||
if tt.newPeer != nil {
|
||||
if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH66, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user