Merge pull request #21529 from karalabe/dynamic-pivot
eth/downloader: dynamically move pivot even during chain sync
commit 2482ba016e
eth/downloader/downloader.go

@@ -138,7 +138,10 @@ type Downloader struct {
 	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
 	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
 
-	// for stateFetcher
+	// State sync
+	pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
+	pivotLock   sync.RWMutex  // Lock protecting pivot header reads from updates
+
 	stateSyncStart chan *stateSync
 	trackStateReq  chan *stateReq
 	stateCh        chan dataPack // [eth/63] Channel receiving inbound node state data
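Note: the two new fields follow the usual Go pattern for a value that is read often but replaced rarely: readers take the RWMutex's shared lock, the occasional pivot move takes the exclusive one. A minimal standalone sketch of that access pattern (the Header type and method names here are simplified stand-ins, not part of the patch):

package main

import (
	"fmt"
	"sync"
)

// Header is a stand-in for types.Header in this sketch.
type Header struct{ Number uint64 }

type downloader struct {
	pivotLock   sync.RWMutex
	pivotHeader *Header
}

// pivotNumber reads the pivot under the read lock; many readers may
// proceed concurrently.
func (d *downloader) pivotNumber() uint64 {
	d.pivotLock.RLock()
	defer d.pivotLock.RUnlock()
	if d.pivotHeader == nil {
		return 0
	}
	return d.pivotHeader.Number
}

// movePivot replaces the pivot under the exclusive write lock.
func (d *downloader) movePivot(h *Header) {
	d.pivotLock.Lock()
	d.pivotHeader = h
	d.pivotLock.Unlock()
}

func main() {
	d := new(downloader)
	d.movePivot(&Header{Number: 1_000_000})
	fmt.Println("pivot:", d.pivotNumber())
}

The same read-under-RLock / write-under-Lock shape recurs throughout the hunks below.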
@@ -451,10 +454,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
 	}(time.Now())
 
 	// Look up the sync boundaries: the common ancestor and the target block
-	latest, err := d.fetchHeight(p)
+	latest, pivot, err := d.fetchHead(p)
 	if err != nil {
 		return err
 	}
+	if mode == FastSync && pivot == nil {
+		// If no pivot block was returned, the head is below the min full block
+		// threshold (i.e. new chain). In that case we won't really fast sync
+		// anyway, but still need a valid pivot block to avoid some code hitting
+		// nil panics on an access.
+		pivot = d.blockchain.CurrentBlock().Header()
+	}
 	height := latest.Number.Uint64()
 
 	origin, err := d.findAncestor(p, latest)
@@ -469,22 +479,21 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
 	d.syncStatsLock.Unlock()
 
 	// Ensure our origin point is below any fast sync pivot point
-	pivot := uint64(0)
 	if mode == FastSync {
 		if height <= uint64(fsMinFullBlocks) {
 			origin = 0
 		} else {
-			pivot = height - uint64(fsMinFullBlocks)
-			if pivot <= origin {
-				origin = pivot - 1
+			pivotNumber := pivot.Number.Uint64()
+			if pivotNumber <= origin {
+				origin = pivotNumber - 1
 			}
 			// Write out the pivot into the database so a rollback beyond it will
 			// reenable fast sync
-			rawdb.WriteLastPivotNumber(d.stateDB, pivot)
+			rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
 		}
 	}
 	d.committed = 1
-	if mode == FastSync && pivot != 0 {
+	if mode == FastSync && pivot.Number.Uint64() != 0 {
 		d.committed = 0
 	}
 	if mode == FastSync {
@@ -530,13 +539,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
 		d.syncInitHook(origin, height)
 	}
 	fetchers := []func() error{
-		func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
+		func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
 		func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
 		func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
-		func() error { return d.processHeaders(origin+1, pivot, td) },
+		func() error { return d.processHeaders(origin+1, td) },
 	}
 	if mode == FastSync {
-		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
+		d.pivotLock.Lock()
+		d.pivotHeader = pivot
+		d.pivotLock.Unlock()
+
+		fetchers = append(fetchers, func() error { return d.processFastSyncContent() })
 	} else if mode == FullSync {
 		fetchers = append(fetchers, d.processFullSyncContent)
 	}
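Note: the fetchers slice is run concurrently, with the first failure cancelling the rest. For readers unfamiliar with the shape, here is a rough standard-library-only sketch of running such a slice and surfacing the first error (a simplification; the real code also wires up cancellation):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// runFetchers starts every function on its own goroutine and returns the
// first non-nil error (nil if all succeed) once all of them have finished.
func runFetchers(fetchers []func() error) error {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		first error
	)
	for _, fn := range fetchers {
		wg.Add(1)
		go func(fn func() error) {
			defer wg.Done()
			if err := fn(); err != nil {
				mu.Lock()
				if first == nil {
					first = err
				}
				mu.Unlock()
			}
		}(fn)
	}
	wg.Wait()
	return first
}

func main() {
	err := runFetchers([]func() error{
		func() error { return nil },
		func() error { return errors.New("receipt fetch failed") },
	})
	fmt.Println(err)
}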
@@ -617,22 +630,26 @@ func (d *Downloader) Terminate() {
 	d.Cancel()
 }
 
-// fetchHeight retrieves the head header of the remote peer to aid in estimating
-// the total time a pending synchronisation would take.
-func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
-	p.log.Debug("Retrieving remote chain height")
+// fetchHead retrieves the head header and prior pivot block (if available) from
+// a remote peer.
+func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) {
+	p.log.Debug("Retrieving remote chain head")
+	mode := d.getMode()
 
 	// Request the advertised remote head block and wait for the response
-	head, _ := p.peer.Head()
-	go p.peer.RequestHeadersByHash(head, 1, 0, false)
+	latest, _ := p.peer.Head()
+	fetch := 1
+	if mode == FastSync {
+		fetch = 2 // head + pivot headers
+	}
+	go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)
 
 	ttl := d.requestTTL()
 	timeout := time.After(ttl)
-	mode := d.getMode()
 	for {
 		select {
 		case <-d.cancelCh:
-			return nil, errCanceled
+			return nil, nil, errCanceled
 
 		case packet := <-d.headerCh:
 			// Discard anything not from the origin peer
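Note: the single request now does double duty. With reverse=true and skip=fsMinFullBlocks-1, a two-header request anchored at the head hash walks backwards, so the reply carries the head itself plus the header exactly fsMinFullBlocks below it — the fast-sync pivot candidate. A quick sketch of that arithmetic (assuming fsMinFullBlocks = 64, its value in go-ethereum at the time):

package main

import "fmt"

const fsMinFullBlocks = 64 // assumed value, matching the surrounding code

// headerNumbers mimics how a GetBlockHeaders request is resolved: starting
// at `origin`, return `amount` header numbers, each `skip` apart, walking
// backwards when `reverse` is set.
func headerNumbers(origin uint64, amount, skip int, reverse bool) []uint64 {
	nums := make([]uint64, 0, amount)
	step := uint64(skip) + 1
	for n, i := origin, 0; i < amount; i++ {
		nums = append(nums, n)
		if reverse {
			if n < step {
				break // ran past the genesis block
			}
			n -= step
		} else {
			n += step
		}
	}
	return nums
}

func main() {
	head := uint64(1_000_000)
	// fetchHead's request: amount=2, skip=fsMinFullBlocks-1, reverse=true
	fmt.Println(headerNumbers(head, 2, fsMinFullBlocks-1, true))
	// [1000000 999936] -> the head plus the pivot candidate at head-64
}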
@@ -640,23 +657,36 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
 				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
 				break
 			}
-			// Make sure the peer actually gave something valid
+			// Make sure the peer gave us at least one and at most the requested headers
 			headers := packet.(*headerPack).headers
-			if len(headers) != 1 {
-				p.log.Warn("Multiple headers for single request", "headers", len(headers))
-				return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
+			if len(headers) == 0 || len(headers) > fetch {
+				return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch)
 			}
+			// The first header needs to be the head, validate against the checkpoint
+			// and request. If only 1 header was returned, make sure there's no pivot
+			// or there was not one requested.
 			head := headers[0]
 			if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
-				p.log.Warn("Remote head below checkpoint", "number", head.Number, "hash", head.Hash())
-				return nil, errUnsyncedPeer
+				return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint)
 			}
-			p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
-			return head, nil
+			if len(headers) == 1 {
+				if mode == FastSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
+					return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
+				}
+				p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
+				return head, nil, nil
+			}
+			// At this point we have 2 headers in total and the first is the
+			// validated head of the chain. Check the pivot number and return.
+			pivot := headers[1]
+			if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) {
+				return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks))
+			}
+			return head, pivot, nil
 
 		case <-timeout:
 			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
-			return nil, errTimeout
+			return nil, nil, errTimeout
 
 		case <-d.bodyCh:
 		case <-d.receiptCh:
@@ -871,14 +901,14 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 		case <-d.cancelCh:
 			return 0, errCanceled
 
-		case packer := <-d.headerCh:
+		case packet := <-d.headerCh:
 			// Discard anything not from the origin peer
-			if packer.PeerId() != p.id {
-				log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
+			if packet.PeerId() != p.id {
+				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
 				break
 			}
 			// Make sure the peer actually gave something valid
-			headers := packer.(*headerPack).headers
+			headers := packet.(*headerPack).headers
 			if len(headers) != 1 {
 				p.log.Warn("Multiple headers for single request", "headers", len(headers))
 				return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
@@ -937,12 +967,13 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 // other peers are only accepted if they map cleanly to the skeleton. If no one
 // can fill in the skeleton - not even the origin peer - it's assumed invalid and
 // the origin is dropped.
-func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
+func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
 	p.log.Debug("Directing header downloads", "origin", from)
 	defer p.log.Debug("Header download terminated")
 
 	// Create a timeout timer, and the associated header fetcher
 	skeleton := true            // Skeleton assembly phase or finishing up
+	pivoting := false           // Whether the next request is pivot verification
 	request := time.Now()       // time of the last skeleton fetch request
 	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
 	<-timeout.C                 // timeout channel should be initially empty
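Note: getNextPivot (next hunk) asks for two headers starting at pivot+fsMinFullBlocks with skip fsMinFullBlocks-9, so the reply should contain the candidate next pivot at pivot+64 and a "confirmer" at pivot+2*64-8. The confirmer proves the candidate already has a long stretch of chain on top of it, so the pivot is only moved once it is safely deep. The numbers work out as follows (a sketch assuming fsMinFullBlocks = 64):

package main

import "fmt"

const fsMinFullBlocks = 64 // assumed value, matching the surrounding code

func main() {
	pivot := uint64(1_000_000)

	// The request: origin pivot+64, amount 2, skip fsMinFullBlocks-9 = 55.
	first := pivot + fsMinFullBlocks            // the candidate next pivot
	second := first + (fsMinFullBlocks - 9) + 1 // the confirmer, skip+1 further on

	fmt.Println(first, second)                       // 1000064 1000120
	fmt.Println(second == pivot+2*fsMinFullBlocks-8) // true: "2x64-8 deep"
}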
@@ -963,6 +994,20 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
 			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
 		}
 	}
+	getNextPivot := func() {
+		pivoting = true
+		request = time.Now()
+
+		ttl = d.requestTTL()
+		timeout.Reset(ttl)
+
+		d.pivotLock.RLock()
+		pivot := d.pivotHeader.Number.Uint64()
+		d.pivotLock.RUnlock()
+
+		p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks))
+		go p.peer.RequestHeadersByNumber(pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep
+	}
 	// Start pulling the header chain skeleton until all is done
 	ancestor := from
 	getHeaders(from)
@@ -982,8 +1027,46 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
 			headerReqTimer.UpdateSince(request)
 			timeout.Stop()
 
+			// If the pivot is being checked, move if it became stale and run the real retrieval
+			var pivot uint64
+
+			d.pivotLock.RLock()
+			if d.pivotHeader != nil {
+				pivot = d.pivotHeader.Number.Uint64()
+			}
+			d.pivotLock.RUnlock()
+
+			if pivoting {
+				if packet.Items() == 2 {
+					// Retrieve the headers and do some sanity checks, just in case
+					headers := packet.(*headerPack).headers
+
+					if have, want := headers[0].Number.Uint64(), pivot+uint64(fsMinFullBlocks); have != want {
+						log.Warn("Peer sent invalid next pivot", "have", have, "want", want)
+						return fmt.Errorf("%w: next pivot number %d != requested %d", errInvalidChain, have, want)
+					}
+					if have, want := headers[1].Number.Uint64(), pivot+2*uint64(fsMinFullBlocks)-8; have != want {
+						log.Warn("Peer sent invalid pivot confirmer", "have", have, "want", want)
+						return fmt.Errorf("%w: next pivot confirmer number %d != requested %d", errInvalidChain, have, want)
+					}
+					log.Warn("Pivot seemingly stale, moving", "old", pivot, "new", headers[0].Number)
+					pivot = headers[0].Number.Uint64()
+
+					d.pivotLock.Lock()
+					d.pivotHeader = headers[0]
+					d.pivotLock.Unlock()
+
+					// Write out the pivot into the database so a rollback beyond
+					// it will reenable fast sync and update the state root that
+					// the state syncer will be downloading.
+					rawdb.WriteLastPivotNumber(d.stateDB, pivot)
+				}
+				pivoting = false
+				getHeaders(from)
+				continue
+			}
 			// If the skeleton's finished, pull any remaining head headers directly from the origin
-			if packet.Items() == 0 && skeleton {
+			if skeleton && packet.Items() == 0 {
 				skeleton = false
 				getHeaders(from)
 				continue
@@ -1061,7 +1144,14 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
 					return errCanceled
 				}
 				from += uint64(len(headers))
-				getHeaders(from)
+
+				// If we're still skeleton filling fast sync, check pivot staleness
+				// before continuing to the next skeleton filling
+				if skeleton && pivot > 0 {
+					getNextPivot()
+				} else {
+					getHeaders(from)
+				}
 			} else {
 				// No headers delivered, or all of them being delayed, sleep a bit and retry
 				p.log.Trace("All headers delayed, waiting")
@@ -1390,7 +1480,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 // processHeaders takes batches of retrieved headers from an input channel and
 // keeps processing and scheduling them into the header chain and downloader's
 // queue until the stream ends or a failure occurs.
-func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
+func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 	// Keep a count of uncertain headers to roll back
 	var (
 		rollback    uint64 // Zero means no rollback (fine as you can't unroll the genesis)
@@ -1493,6 +1583,14 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 				// In case of header only syncing, validate the chunk immediately
 				if mode == FastSync || mode == LightSync {
 					// If we're importing pure headers, verify based on their recentness
+					var pivot uint64
+
+					d.pivotLock.RLock()
+					if d.pivotHeader != nil {
+						pivot = d.pivotHeader.Number.Uint64()
+					}
+					d.pivotLock.RUnlock()
+
 					frequency := fsHeaderCheckFrequency
 					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
 						frequency = 1
@@ -1609,10 +1707,13 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
 
 // processFastSyncContent takes fetch results from the queue and writes them to the
 // database. It also controls the synchronisation of state nodes of the pivot block.
-func (d *Downloader) processFastSyncContent(latest *types.Header) error {
+func (d *Downloader) processFastSyncContent() error {
 	// Start syncing state of the reported head block. This should get us most of
 	// the state of the pivot block.
-	sync := d.syncState(latest.Root)
+	d.pivotLock.RLock()
+	sync := d.syncState(d.pivotHeader.Root)
+	d.pivotLock.RUnlock()
+
 	defer func() {
 		// The `sync` object is replaced every time the pivot moves. We need to
 		// defer close the very last active one, hence the lazy evaluation vs.
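Note: the context comment above is worth dwelling on. Since sync is reassigned whenever the pivot moves, the cleanup must be written as defer func() { ... }() so the closure reads the variable at function exit, not at the point of the defer statement. A tiny standalone illustration of the difference (types and names invented for the sketch):

package main

import "fmt"

type stateSync struct{ root string }

func (s *stateSync) Cancel() { fmt.Println("cancelled", s.root) }

func main() {
	sync := &stateSync{root: "pivot-1"}

	// Wrong: `defer sync.Cancel()` would evaluate `sync` here, so the
	// *first* object would be cancelled on return.

	// Right: the closure reads `sync` when the defer runs, cancelling
	// whichever object is current at function exit.
	defer func() { sync.Cancel() }()

	sync = &stateSync{root: "pivot-2"} // pivot moved, sync replaced
	// Prints "cancelled pivot-2" on return.
}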
@@ -1627,12 +1728,6 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
 	}
 	go closeOnErr(sync)
 
-	// Figure out the ideal pivot block. Note, that this goalpost may move if the
-	// sync takes long enough for the chain head to move significantly.
-	pivot := uint64(0)
-	if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
-		pivot = height - uint64(fsMinFullBlocks)
-	}
 	// To cater for moving pivot points, track the pivot block and subsequently
 	// accumulated download results separately.
 	var (
@@ -1659,22 +1754,46 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
 		if d.chainInsertHook != nil {
 			d.chainInsertHook(results)
 		}
-		if oldPivot != nil {
+		// If we haven't downloaded the pivot block yet, check pivot staleness
+		// notifications from the header downloader
+		d.pivotLock.RLock()
+		pivot := d.pivotHeader
+		d.pivotLock.RUnlock()
+
+		if oldPivot == nil {
+			if pivot.Root != sync.root {
+				sync.Cancel()
+				sync = d.syncState(pivot.Root)
+
+				go closeOnErr(sync)
+			}
+		} else {
 			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
 		}
 		// Split around the pivot block and process the two sides via fast/full sync
 		if atomic.LoadInt32(&d.committed) == 0 {
-			latest = results[len(results)-1].Header
-			if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
-				log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
-				pivot = height - uint64(fsMinFullBlocks)
+			latest := results[len(results)-1].Header
+			// If the height is above the pivot block by 2 sets, it means the pivot
+			// became stale in the network and was garbage collected, so move to a
+			// new pivot.
+			//
+			// Note, we have `reorgProtHeaderDelay` number of blocks withheld. Those
+			// need to be taken into account, otherwise we're detecting the pivot move
+			// late and will drop peers due to unavailable state!
+			if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) {
+				log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay))
+				pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted
+
+				d.pivotLock.Lock()
+				d.pivotHeader = pivot
+				d.pivotLock.Unlock()
+
 				// Write out the pivot into the database so a rollback beyond it will
 				// reenable fast sync
-				rawdb.WriteLastPivotNumber(d.stateDB, pivot)
+				rawdb.WriteLastPivotNumber(d.stateDB, pivot.Number.Uint64())
 			}
 		}
-		P, beforeP, afterP := splitAroundPivot(pivot, results)
+		P, beforeP, afterP := splitAroundPivot(pivot.Number.Uint64(), results)
 		if err := d.commitFastSyncData(beforeP, sync); err != nil {
 			return err
 		}
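Note: the new threshold arithmetic deserves a second look. The queue withholds reorgProtHeaderDelay blocks from the head as reorg protection, so those blocks never appear in results; without subtracting them, the staleness check would fire late and peers would be dropped for serving already-pruned state. A worked example (assuming fsMinFullBlocks = 64 and reorgProtHeaderDelay = 2, go-ethereum's values at the time):

package main

import "fmt"

const (
	fsMinFullBlocks      = 64 // assumed values, matching go-ethereum at the time
	reorgProtHeaderDelay = 2
)

func main() {
	pivot := uint64(1_000_000)

	// Old check: move once height > pivot + 2*64 (= pivot+128).
	oldThreshold := pivot + 2*fsMinFullBlocks
	// New check: move once height >= pivot + 2*64 - 2 (= pivot+126),
	// compensating for the 2 withheld blocks missing from `results`.
	newThreshold := pivot + 2*fsMinFullBlocks - reorgProtHeaderDelay

	fmt.Println(oldThreshold, newThreshold) // 1000128 1000126

	// The replacement pivot sits fsMinFullBlocks-reorgProtHeaderDelay behind
	// the newest delivered header, matching the
	// results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay] index above.
	height := newThreshold
	fmt.Println("new pivot number:", height-fsMinFullBlocks+reorgProtHeaderDelay) // 1000064
}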
eth/downloader/downloader_test.go

@@ -427,11 +427,7 @@ func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
 // origin; associated with a particular peer in the download tester. The returned
 // function can be used to retrieve batches of headers from the particular peer.
 func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
-	if reverse {
-		panic("reverse header requests not supported")
-	}
-
-	result := dlp.chain.headersByHash(origin, amount, skip)
+	result := dlp.chain.headersByHash(origin, amount, skip, reverse)
 	go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
 	return nil
 }
@@ -440,11 +436,7 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i
 // origin; associated with a particular peer in the download tester. The returned
 // function can be used to retrieve batches of headers from the particular peer.
 func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
-	if reverse {
-		panic("reverse header requests not supported")
-	}
-
-	result := dlp.chain.headersByNumber(origin, amount, skip)
+	result := dlp.chain.headersByNumber(origin, amount, skip, reverse)
 	go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
 	return nil
 }
@@ -1698,7 +1690,7 @@ func testCheckpointEnforcement(t *testing.T, protocol int, mode SyncMode) {
 	if mode == FastSync || mode == LightSync {
 		expect = errUnsyncedPeer
 	}
-	if err := tester.sync("peer", nil, mode); err != expect {
+	if err := tester.sync("peer", nil, mode); !errors.Is(err, expect) {
 		t.Fatalf("block sync error mismatch: have %v, want %v", err, expect)
 	}
 	if mode == FastSync || mode == LightSync {
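Note: the switch from err != expect to !errors.Is(err, expect) is required by the %w wrapping introduced in the downloader hunks above: fmt.Errorf with %w yields a new error value that compares unequal to the sentinel under ==, but still unwraps to it. A compact demonstration (sentinel name borrowed from the diff; value invented for the sketch):

package main

import (
	"errors"
	"fmt"
)

var errUnsyncedPeer = errors.New("unsynced peer")

func main() {
	err := fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, 10, 100)

	fmt.Println(err == errUnsyncedPeer)          // false: wrapping made a new value
	fmt.Println(errors.Is(err, errUnsyncedPeer)) // true: it unwraps to the sentinel
}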
eth/downloader/testchain_test.go

@@ -170,20 +170,29 @@ func (tc *testChain) td(hash common.Hash) *big.Int {
 	return tc.tdm[hash]
 }
 
-// headersByHash returns headers in ascending order from the given hash.
-func (tc *testChain) headersByHash(origin common.Hash, amount int, skip int) []*types.Header {
+// headersByHash returns headers in order from the given hash.
+func (tc *testChain) headersByHash(origin common.Hash, amount int, skip int, reverse bool) []*types.Header {
 	num, _ := tc.hashToNumber(origin)
-	return tc.headersByNumber(num, amount, skip)
+	return tc.headersByNumber(num, amount, skip, reverse)
 }
 
-// headersByNumber returns headers in ascending order from the given number.
-func (tc *testChain) headersByNumber(origin uint64, amount int, skip int) []*types.Header {
+// headersByNumber returns headers from the given number.
+func (tc *testChain) headersByNumber(origin uint64, amount int, skip int, reverse bool) []*types.Header {
 	result := make([]*types.Header, 0, amount)
-	for num := origin; num < uint64(len(tc.chain)) && len(result) < amount; num += uint64(skip) + 1 {
-		if header, ok := tc.headerm[tc.chain[int(num)]]; ok {
-			result = append(result, header)
+
+	if !reverse {
+		for num := origin; num < uint64(len(tc.chain)) && len(result) < amount; num += uint64(skip) + 1 {
+			if header, ok := tc.headerm[tc.chain[int(num)]]; ok {
+				result = append(result, header)
+			}
+		}
+	} else {
+		for num := int64(origin); num >= 0 && len(result) < amount; num -= int64(skip) + 1 {
+			if header, ok := tc.headerm[tc.chain[int(num)]]; ok {
+				result = append(result, header)
+			}
 		}
 	}
 	return result
 }
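Note: in the reverse branch above, the loop counter is deliberately int64 rather than uint64: the num -= skip+1 step must be able to go negative to terminate at the genesis block, whereas an unsigned counter would wrap around to a huge value and index out of range. A standalone sketch of the guard (numbers made up for the example):

package main

import "fmt"

// reverseNumbers mirrors the reverse branch above: walk down from origin,
// stopping cleanly at the genesis block instead of wrapping around.
func reverseNumbers(origin uint64, amount, skip int) []int64 {
	result := make([]int64, 0, amount)
	// int64 so the n -= skip+1 step can go negative and end the loop;
	// a uint64 counter would underflow to a huge value and keep running.
	for n := int64(origin); n >= 0 && len(result) < amount; n -= int64(skip) + 1 {
		result = append(result, n)
	}
	return result
}

func main() {
	fmt.Println(reverseNumbers(130, 3, 63)) // [130 66 2]
	fmt.Println(reverseNumbers(66, 3, 63))  // [66 2]: stops before going negative
}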