|
|
|
@ -23,7 +23,6 @@ import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"math"
|
|
|
|
|
"math/big"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
@ -248,9 +247,10 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea
|
|
|
|
|
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
|
|
|
|
|
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
|
|
|
|
|
|
|
|
|
|
log.Trace(fmt.Sprint("Registering peer", id))
|
|
|
|
|
if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
|
|
|
|
|
log.Error(fmt.Sprint("Register failed:", err))
|
|
|
|
|
logger := log.New("peer", id)
|
|
|
|
|
logger.Trace("Registering sync peer")
|
|
|
|
|
if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData, logger)); err != nil {
|
|
|
|
|
logger.Error("Failed to register sync peer", "err", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
d.qosReduceConfidence()
|
|
|
|
@ -263,9 +263,10 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea
|
|
|
|
|
// the queue.
|
|
|
|
|
func (d *Downloader) UnregisterPeer(id string) error {
|
|
|
|
|
// Unregister the peer from the active peer set and revoke any fetch tasks
|
|
|
|
|
log.Trace(fmt.Sprint("Unregistering peer", id))
|
|
|
|
|
logger := log.New("peer", id)
|
|
|
|
|
logger.Trace("Unregistering sync peer")
|
|
|
|
|
if err := d.peers.Unregister(id); err != nil {
|
|
|
|
|
log.Error(fmt.Sprint("Unregister failed:", err))
|
|
|
|
|
logger.Error("Failed to unregister sync peer", "err", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
d.queue.Revoke(id)
|
|
|
|
@ -284,24 +285,19 @@ func (d *Downloader) UnregisterPeer(id string) error {
|
|
|
|
|
// Synchronise tries to sync up our local block chain with a remote peer, both
|
|
|
|
|
// adding various sanity checks as well as wrapping it with various log entries.
|
|
|
|
|
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
|
|
|
|
|
log.Trace(fmt.Sprintf("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td))
|
|
|
|
|
|
|
|
|
|
err := d.synchronise(id, head, td, mode)
|
|
|
|
|
switch err {
|
|
|
|
|
case nil:
|
|
|
|
|
log.Trace(fmt.Sprintf("Synchronisation completed"))
|
|
|
|
|
|
|
|
|
|
case errBusy:
|
|
|
|
|
log.Trace(fmt.Sprintf("Synchronisation already in progress"))
|
|
|
|
|
|
|
|
|
|
case errTimeout, errBadPeer, errStallingPeer,
|
|
|
|
|
errEmptyHeaderSet, errPeersUnavailable, errTooOld,
|
|
|
|
|
errInvalidAncestor, errInvalidChain:
|
|
|
|
|
log.Debug(fmt.Sprintf("Removing peer %v: %v", id, err))
|
|
|
|
|
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
|
|
|
|
|
d.dropPeer(id)
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
log.Warn(fmt.Sprintf("Synchronisation failed: %v", err))
|
|
|
|
|
log.Warn("Synchronisation failed, retrying", "err", err)
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -322,7 +318,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
|
|
|
|
|
|
|
|
|
|
// Post a user notification of the sync (only once per session)
|
|
|
|
|
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
|
|
|
|
|
log.Info(fmt.Sprint("Block synchronisation started"))
|
|
|
|
|
log.Info("Block synchronisation started")
|
|
|
|
|
}
|
|
|
|
|
// Reset the queue, peer set and wake channels to clean any internal leftover state
|
|
|
|
|
d.queue.Reset()
|
|
|
|
@ -387,9 +383,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
|
|
|
|
|
return errTooOld
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Debug(fmt.Sprintf("Synchronising with the network using: %s [eth/%d]", p.id, p.version))
|
|
|
|
|
log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash.Hex()[2:10], "td", td, "mode", d.mode)
|
|
|
|
|
defer func(start time.Time) {
|
|
|
|
|
log.Debug(fmt.Sprintf("Synchronisation terminated after %v", time.Since(start)))
|
|
|
|
|
log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
|
|
|
|
|
}(time.Now())
|
|
|
|
|
|
|
|
|
|
// Look up the sync boundaries: the common ancestor and the target block
|
|
|
|
@ -437,7 +433,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
|
|
|
|
|
origin = 0
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
log.Debug(fmt.Sprintf("Fast syncing until pivot block #%d", pivot))
|
|
|
|
|
log.Debug("Fast syncing until pivot block", "pivot", pivot)
|
|
|
|
|
}
|
|
|
|
|
d.queue.Prepare(origin+1, d.mode, pivot, latest)
|
|
|
|
|
if d.syncInitHook != nil {
|
|
|
|
@ -522,13 +518,14 @@ func (d *Downloader) Terminate() {
|
|
|
|
|
// fetchHeight retrieves the head header of the remote peer to aid in estimating
|
|
|
|
|
// the total time a pending synchronisation would take.
|
|
|
|
|
func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: retrieving remote chain height", p))
|
|
|
|
|
p.logger.Debug("Retrieving remote chain height")
|
|
|
|
|
|
|
|
|
|
// Request the advertised remote head block and wait for the response
|
|
|
|
|
head, _ := p.currentHead()
|
|
|
|
|
go p.getRelHeaders(head, 1, 0, false)
|
|
|
|
|
|
|
|
|
|
timeout := time.After(d.requestTTL())
|
|
|
|
|
ttl := d.requestTTL()
|
|
|
|
|
timeout := time.After(ttl)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-d.cancelCh:
|
|
|
|
@ -537,19 +534,21 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
|
|
|
|
|
case packet := <-d.headerCh:
|
|
|
|
|
// Discard anything not from the origin peer
|
|
|
|
|
if packet.PeerId() != p.id {
|
|
|
|
|
log.Debug(fmt.Sprintf("Received headers from incorrect peer(%s)", packet.PeerId()))
|
|
|
|
|
log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
// Make sure the peer actually gave something valid
|
|
|
|
|
headers := packet.(*headerPack).headers
|
|
|
|
|
if len(headers) != 1 {
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: invalid number of head headers: %d != 1", p, len(headers)))
|
|
|
|
|
p.logger.Debug("Multiple headers for single request", "headers", len(headers))
|
|
|
|
|
return nil, errBadPeer
|
|
|
|
|
}
|
|
|
|
|
return headers[0], nil
|
|
|
|
|
head := headers[0]
|
|
|
|
|
p.logger.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash().Hex()[2:10])
|
|
|
|
|
return head, nil
|
|
|
|
|
|
|
|
|
|
case <-timeout:
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: head header timeout", p))
|
|
|
|
|
p.logger.Debug("Waiting for head header timed out", "elapsed", ttl)
|
|
|
|
|
return nil, errTimeout
|
|
|
|
|
|
|
|
|
|
case <-d.bodyCh:
|
|
|
|
@ -566,10 +565,10 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
|
|
|
|
|
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
|
|
|
|
|
// the head links match), we do a binary search to find the common ancestor.
|
|
|
|
|
func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: looking for common ancestor (remote height %d)", p, height))
|
|
|
|
|
|
|
|
|
|
// Figure out the valid ancestor range to prevent rewrite attacks
|
|
|
|
|
floor, ceil := int64(-1), d.headHeader().Number.Uint64()
|
|
|
|
|
|
|
|
|
|
p.logger.Debug("Looking for common ancestor", "local", ceil, "remote", height)
|
|
|
|
|
if d.mode == FullSync {
|
|
|
|
|
ceil = d.headBlock().NumberU64()
|
|
|
|
|
} else if d.mode == FastSync {
|
|
|
|
@ -597,7 +596,9 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
|
|
|
|
|
|
|
|
|
// Wait for the remote response to the head fetch
|
|
|
|
|
number, hash := uint64(0), common.Hash{}
|
|
|
|
|
timeout := time.After(d.requestTTL())
|
|
|
|
|
|
|
|
|
|
ttl := d.requestTTL()
|
|
|
|
|
timeout := time.After(ttl)
|
|
|
|
|
|
|
|
|
|
for finished := false; !finished; {
|
|
|
|
|
select {
|
|
|
|
@ -607,19 +608,19 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
|
|
|
|
case packet := <-d.headerCh:
|
|
|
|
|
// Discard anything not from the origin peer
|
|
|
|
|
if packet.PeerId() != p.id {
|
|
|
|
|
log.Debug(fmt.Sprintf("Received headers from incorrect peer(%s)", packet.PeerId()))
|
|
|
|
|
log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
// Make sure the peer actually gave something valid
|
|
|
|
|
headers := packet.(*headerPack).headers
|
|
|
|
|
if len(headers) == 0 {
|
|
|
|
|
log.Warn(fmt.Sprintf("%v: empty head header set", p))
|
|
|
|
|
p.logger.Warn("Empty head header set")
|
|
|
|
|
return 0, errEmptyHeaderSet
|
|
|
|
|
}
|
|
|
|
|
// Make sure the peer's reply conforms to the request
|
|
|
|
|
for i := 0; i < len(headers); i++ {
|
|
|
|
|
if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
|
|
|
|
|
log.Warn(fmt.Sprintf("%v: head header set (item %d) broke chain ordering: requested %d, got %d", p, i, from+int64(i)*16, number))
|
|
|
|
|
p.logger.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
|
|
|
|
|
return 0, errInvalidChain
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -636,7 +637,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
|
|
|
|
|
|
|
|
|
// If every header is known, even future ones, the peer straight out lied about its head
|
|
|
|
|
if number > height && i == limit-1 {
|
|
|
|
|
log.Warn(fmt.Sprintf("%v: lied about chain head: reported %d, found above %d", p, height, number))
|
|
|
|
|
p.logger.Warn("Lied about chain head", "reported", height, "found", number)
|
|
|
|
|
return 0, errStallingPeer
|
|
|
|
|
}
|
|
|
|
|
break
|
|
|
|
@ -644,7 +645,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case <-timeout:
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: head header timeout", p))
|
|
|
|
|
p.logger.Debug("Waiting for head header timed out", "elapsed", ttl)
|
|
|
|
|
return 0, errTimeout
|
|
|
|
|
|
|
|
|
|
case <-d.bodyCh:
|
|
|
|
@ -656,10 +657,10 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
|
|
|
|
// If the head fetch already found an ancestor, return
|
|
|
|
|
if !common.EmptyHash(hash) {
|
|
|
|
|
if int64(number) <= floor {
|
|
|
|
|
log.Warn(fmt.Sprintf("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor))
|
|
|
|
|
p.logger.Warn("Ancestor below allowance", "number", number, "hash", hash.Hex()[2:10], "allowance", floor)
|
|
|
|
|
return 0, errInvalidAncestor
|
|
|
|
|
}
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: common ancestor: #%d [%x…]", p, number, hash[:4]))
|
|
|
|
|
p.logger.Debug("Found common ancestor", "number", number, "hash", hash.Hex()[2:10])
|
|
|
|
|
return number, nil
|
|
|
|
|
}
|
|
|
|
|
// Ancestor not found, we need to binary search over our chain
|
|
|
|
@ -671,7 +672,9 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
|
|
|
|
// Split our chain interval in two, and request the hash to cross check
|
|
|
|
|
check := (start + end) / 2
|
|
|
|
|
|
|
|
|
|
timeout := time.After(d.requestTTL())
|
|
|
|
|
ttl := d.requestTTL()
|
|
|
|
|
timeout := time.After(ttl)
|
|
|
|
|
|
|
|
|
|
go p.getAbsHeaders(uint64(check), 1, 0, false)
|
|
|
|
|
|
|
|
|
|
// Wait until a reply arrives to this request
|
|
|
|
@ -683,13 +686,13 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
|
|
|
|
case packer := <-d.headerCh:
|
|
|
|
|
// Discard anything not from the origin peer
|
|
|
|
|
if packer.PeerId() != p.id {
|
|
|
|
|
log.Debug(fmt.Sprintf("Received headers from incorrect peer(%s)", packer.PeerId()))
|
|
|
|
|
log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
// Make sure the peer actually gave something valid
|
|
|
|
|
headers := packer.(*headerPack).headers
|
|
|
|
|
if len(headers) != 1 {
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: invalid search header set (%d)", p, len(headers)))
|
|
|
|
|
p.logger.Debug("Multiple headers for single request", "headers", len(headers))
|
|
|
|
|
return 0, errBadPeer
|
|
|
|
|
}
|
|
|
|
|
arrived = true
|
|
|
|
@ -701,13 +704,13 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
|
|
|
|
}
|
|
|
|
|
header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
|
|
|
|
|
if header.Number.Uint64() != check {
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check))
|
|
|
|
|
p.logger.Debug("Received non requested header", "number", header.Number, "hash", header.Hash().Hex()[2:10], "request", check)
|
|
|
|
|
return 0, errBadPeer
|
|
|
|
|
}
|
|
|
|
|
start = check
|
|
|
|
|
|
|
|
|
|
case <-timeout:
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: search header timeout", p))
|
|
|
|
|
p.logger.Debug("Waiting for search header timed out", "elapsed", ttl)
|
|
|
|
|
return 0, errTimeout
|
|
|
|
|
|
|
|
|
|
case <-d.bodyCh:
|
|
|
|
@ -719,10 +722,10 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
|
|
|
|
}
|
|
|
|
|
// Ensure valid ancestry and return
|
|
|
|
|
if int64(start) <= floor {
|
|
|
|
|
log.Warn(fmt.Sprintf("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, start, hash[:4], floor))
|
|
|
|
|
p.logger.Warn("Ancestor below allowance", "number", start, "hash", hash.Hex()[2:10], "allowance", floor)
|
|
|
|
|
return 0, errInvalidAncestor
|
|
|
|
|
}
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: common ancestor: #%d [%x…]", p, start, hash[:4]))
|
|
|
|
|
p.logger.Debug("Found common ancestor", "number", start, "hash", hash.Hex()[2:10])
|
|
|
|
|
return start, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -735,8 +738,8 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
|
|
|
|
|
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
|
|
|
|
|
// the origin is dropped.
|
|
|
|
|
func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: directing header downloads from #%d", p, from))
|
|
|
|
|
defer log.Debug(fmt.Sprintf("%v: header download terminated", p))
|
|
|
|
|
p.logger.Debug("Directing header downloads", "origin", from)
|
|
|
|
|
defer p.logger.Debug("Header download terminated")
|
|
|
|
|
|
|
|
|
|
// Create a timeout timer, and the associated header fetcher
|
|
|
|
|
skeleton := true // Skeleton assembly phase or finishing up
|
|
|
|
@ -745,15 +748,18 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
|
|
|
|
|
<-timeout.C // timeout channel should be initially empty
|
|
|
|
|
defer timeout.Stop()
|
|
|
|
|
|
|
|
|
|
var ttl time.Duration
|
|
|
|
|
getHeaders := func(from uint64) {
|
|
|
|
|
request = time.Now()
|
|
|
|
|
timeout.Reset(d.requestTTL())
|
|
|
|
|
|
|
|
|
|
ttl = d.requestTTL()
|
|
|
|
|
timeout.Reset(ttl)
|
|
|
|
|
|
|
|
|
|
if skeleton {
|
|
|
|
|
log.Trace(fmt.Sprintf("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from))
|
|
|
|
|
p.logger.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
|
|
|
|
|
go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
|
|
|
|
|
} else {
|
|
|
|
|
log.Trace(fmt.Sprintf("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from))
|
|
|
|
|
p.logger.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
|
|
|
|
|
go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -768,7 +774,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
|
|
|
|
|
case packet := <-d.headerCh:
|
|
|
|
|
// Make sure the active peer is giving us the skeleton headers
|
|
|
|
|
if packet.PeerId() != p.id {
|
|
|
|
|
log.Debug(fmt.Sprintf("Received skeleton headers from incorrect peer (%s)", packet.PeerId()))
|
|
|
|
|
log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
headerReqTimer.UpdateSince(request)
|
|
|
|
@ -782,7 +788,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
|
|
|
|
|
}
|
|
|
|
|
// If no more headers are inbound, notify the content fetchers and return
|
|
|
|
|
if packet.Items() == 0 {
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: no available headers", p))
|
|
|
|
|
p.logger.Debug("No more headers available")
|
|
|
|
|
select {
|
|
|
|
|
case d.headerProcCh <- nil:
|
|
|
|
|
return nil
|
|
|
|
@ -796,7 +802,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
|
|
|
|
|
if skeleton {
|
|
|
|
|
filled, proced, err := d.fillHeaderSkeleton(from, headers)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: skeleton chain invalid: %v", p, err))
|
|
|
|
|
p.logger.Debug("Skeleton chain invalid", "err", err)
|
|
|
|
|
return errInvalidChain
|
|
|
|
|
}
|
|
|
|
|
headers = filled[proced:]
|
|
|
|
@ -804,7 +810,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
|
|
|
|
|
}
|
|
|
|
|
// Insert all the new headers and fetch the next batch
|
|
|
|
|
if len(headers) > 0 {
|
|
|
|
|
log.Trace(fmt.Sprintf("%v: schedule %d headers from #%d", p, len(headers), from))
|
|
|
|
|
p.logger.Trace("Scheduling new headers", "count", len(headers), "from", from)
|
|
|
|
|
select {
|
|
|
|
|
case d.headerProcCh <- headers:
|
|
|
|
|
case <-d.cancelCh:
|
|
|
|
@ -816,7 +822,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
|
|
|
|
|
|
|
|
|
|
case <-timeout.C:
|
|
|
|
|
// Header retrieval timed out, consider the peer bad and drop
|
|
|
|
|
log.Debug(fmt.Sprintf("%v: header request timed out", p))
|
|
|
|
|
p.logger.Debug("Header request timed out", "elapsed", ttl)
|
|
|
|
|
headerTimeoutMeter.Mark(1)
|
|
|
|
|
d.dropPeer(p.id)
|
|
|
|
|
|
|
|
|
@ -846,7 +852,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
|
|
|
|
|
// The method returs the entire filled skeleton and also the number of headers
|
|
|
|
|
// already forwarded for processing.
|
|
|
|
|
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
|
|
|
|
|
log.Debug(fmt.Sprintf("Filling up skeleton from #%d", from))
|
|
|
|
|
log.Debug("Filling up skeleton", "from", from)
|
|
|
|
|
d.queue.ScheduleSkeleton(from, skeleton)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
@ -865,9 +871,9 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
|
|
|
|
|
)
|
|
|
|
|
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
|
|
|
|
|
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
|
|
|
|
|
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")
|
|
|
|
|
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
|
|
|
|
|
|
|
|
|
|
log.Debug(fmt.Sprintf("Skeleton fill terminated: %v", err))
|
|
|
|
|
log.Debug("Skeleton fill terminated", "err", err)
|
|
|
|
|
|
|
|
|
|
filled, proced := d.queue.RetrieveHeaders()
|
|
|
|
|
return filled, proced, err
|
|
|
|
@ -877,7 +883,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
|
|
|
|
|
// available peers, reserving a chunk of blocks for each, waiting for delivery
|
|
|
|
|
// and also periodically checking for timeouts.
|
|
|
|
|
func (d *Downloader) fetchBodies(from uint64) error {
|
|
|
|
|
log.Debug(fmt.Sprintf("Downloading block bodies from #%d", from))
|
|
|
|
|
log.Debug("Downloading block bodies", "origin", from)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
deliver = func(packet dataPack) (int, error) {
|
|
|
|
@ -891,9 +897,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
|
|
|
|
|
)
|
|
|
|
|
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
|
|
|
|
|
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
|
|
|
|
|
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "Body")
|
|
|
|
|
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
|
|
|
|
|
|
|
|
|
|
log.Debug(fmt.Sprintf("Block body download terminated: %v", err))
|
|
|
|
|
log.Debug("Block body download terminated", "err", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -901,7 +907,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
|
|
|
|
|
// available peers, reserving a chunk of receipts for each, waiting for delivery
|
|
|
|
|
// and also periodically checking for timeouts.
|
|
|
|
|
func (d *Downloader) fetchReceipts(from uint64) error {
|
|
|
|
|
log.Debug(fmt.Sprintf("Downloading receipts from #%d", from))
|
|
|
|
|
log.Debug("Downloading transaction receipts", "origin", from)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
deliver = func(packet dataPack) (int, error) {
|
|
|
|
@ -915,9 +921,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
|
|
|
|
|
)
|
|
|
|
|
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
|
|
|
|
|
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
|
|
|
|
|
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
|
|
|
|
|
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
|
|
|
|
|
|
|
|
|
|
log.Debug(fmt.Sprintf("Receipt download terminated: %v", err))
|
|
|
|
|
log.Debug("Transaction receipt download terminated", "err", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -925,7 +931,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
|
|
|
|
|
// available peers, reserving a chunk of nodes for each, waiting for delivery and
|
|
|
|
|
// also periodically checking for timeouts.
|
|
|
|
|
func (d *Downloader) fetchNodeData() error {
|
|
|
|
|
log.Debug(fmt.Sprintf("Downloading node state data"))
|
|
|
|
|
log.Debug("Downloading node state data")
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
deliver = func(packet dataPack) (int, error) {
|
|
|
|
@ -933,12 +939,12 @@ func (d *Downloader) fetchNodeData() error {
|
|
|
|
|
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
|
|
|
|
|
// If the peer returned old-requested data, forgive
|
|
|
|
|
if err == trie.ErrNotRequested {
|
|
|
|
|
log.Debug(fmt.Sprintf("peer %s: replied to stale state request, forgiving", packet.PeerId()))
|
|
|
|
|
log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
// If the node data processing failed, the root hash is very wrong, abort
|
|
|
|
|
log.Error(fmt.Sprintf("peer %s: state processing failed: %v", packet.PeerId(), err))
|
|
|
|
|
log.Error("State processing failed", "peer", packet.PeerId(), "err", err)
|
|
|
|
|
d.cancel()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
@ -957,12 +963,12 @@ func (d *Downloader) fetchNodeData() error {
|
|
|
|
|
|
|
|
|
|
// If real database progress was made, reset any fast-sync pivot failure
|
|
|
|
|
if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
|
|
|
|
|
log.Debug(fmt.Sprintf("fast-sync progressed, resetting fail counter from %d", atomic.LoadUint32(&d.fsPivotFails)))
|
|
|
|
|
log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails))
|
|
|
|
|
atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
|
|
|
|
|
}
|
|
|
|
|
// Log a message to the user and return
|
|
|
|
|
if delivered > 0 {
|
|
|
|
|
log.Info(fmt.Sprintf("imported %3d state entries in %9v: processed %d, pending at least %d", delivered, common.PrettyDuration(time.Since(start)), syncStatsStateDone, pending))
|
|
|
|
|
log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
@ -977,9 +983,9 @@ func (d *Downloader) fetchNodeData() error {
|
|
|
|
|
)
|
|
|
|
|
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
|
|
|
|
|
d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
|
|
|
|
|
d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "State")
|
|
|
|
|
d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states")
|
|
|
|
|
|
|
|
|
|
log.Debug(fmt.Sprintf("Node state data download terminated: %v", err))
|
|
|
|
|
log.Debug("Node state data download terminated", "err", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1044,11 +1050,11 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
|
|
|
|
|
// Issue a log to the user to see what's going on
|
|
|
|
|
switch {
|
|
|
|
|
case err == nil && packet.Items() == 0:
|
|
|
|
|
log.Trace(fmt.Sprintf("%s: no %s delivered", peer, strings.ToLower(kind)))
|
|
|
|
|
peer.logger.Trace("Requested data not delivered", "type", kind)
|
|
|
|
|
case err == nil:
|
|
|
|
|
log.Trace(fmt.Sprintf("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)))
|
|
|
|
|
peer.logger.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
|
|
|
|
|
default:
|
|
|
|
|
log.Trace(fmt.Sprintf("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err))
|
|
|
|
|
peer.logger.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// Blocks assembled, try to update the progress
|
|
|
|
@ -1091,10 +1097,10 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
|
|
|
|
|
// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
|
|
|
|
|
// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
|
|
|
|
|
if fails > 2 {
|
|
|
|
|
log.Trace(fmt.Sprintf("%s: %s delivery timeout", peer, strings.ToLower(kind)))
|
|
|
|
|
peer.logger.Trace("Data delivery timed out", "type", kind)
|
|
|
|
|
setIdle(peer, 0)
|
|
|
|
|
} else {
|
|
|
|
|
log.Debug(fmt.Sprintf("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind)))
|
|
|
|
|
peer.logger.Debug("Stalling delivery, dropping", "type", kind)
|
|
|
|
|
d.dropPeer(pid)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1102,7 +1108,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
|
|
|
|
|
// If there's nothing more to fetch, wait or terminate
|
|
|
|
|
if pending() == 0 {
|
|
|
|
|
if !inFlight() && finished {
|
|
|
|
|
log.Debug(fmt.Sprintf("%s fetching completed", kind))
|
|
|
|
|
log.Debug("Data fetching completed", "type", kind)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
break
|
|
|
|
@ -1130,15 +1136,13 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
|
|
|
|
|
if request == nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
log.Trace("", "msg", log.Lazy{Fn: func() string {
|
|
|
|
|
if request.From > 0 {
|
|
|
|
|
return fmt.Sprintf("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From)
|
|
|
|
|
peer.logger.Trace("Requesting new batch of data", "type", kind, "from", request.From)
|
|
|
|
|
} else if len(request.Headers) > 0 {
|
|
|
|
|
return fmt.Sprintf("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
|
|
|
|
|
peer.logger.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
|
|
|
|
|
} else {
|
|
|
|
|
return fmt.Sprintf("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
|
|
|
|
|
peer.logger.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes))
|
|
|
|
|
}
|
|
|
|
|
}})
|
|
|
|
|
// Fetch the chunk and make sure any errors return the hashes to the queue
|
|
|
|
|
if fetchHook != nil {
|
|
|
|
|
fetchHook(request.Headers)
|
|
|
|
@ -1149,7 +1153,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
|
|
|
|
|
// case, the internal state of the downloader and the queue is very wrong so
|
|
|
|
|
// better hard crash and note the error instead of silently accumulating into
|
|
|
|
|
// a much bigger issue.
|
|
|
|
|
panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, strings.ToLower(kind)))
|
|
|
|
|
panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
|
|
|
|
|
}
|
|
|
|
|
running = true
|
|
|
|
|
}
|
|
|
|
@ -1193,8 +1197,10 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
|
|
|
|
|
if d.headBlock != nil {
|
|
|
|
|
curBlock = d.headBlock().Number()
|
|
|
|
|
}
|
|
|
|
|
log.Warn(fmt.Sprintf("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
|
|
|
|
|
len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, curFastBlock, lastBlock, curBlock))
|
|
|
|
|
log.Warn("Rolled back headers", "count", len(hashes),
|
|
|
|
|
"header", fmt.Sprintf("%d->%d", lastHeader, d.headHeader().Number),
|
|
|
|
|
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
|
|
|
|
|
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
|
|
|
|
|
|
|
|
|
|
// If we're already past the pivot point, this could be an attack, thread carefully
|
|
|
|
|
if rollback[len(rollback)-1].Number.Uint64() > pivot {
|
|
|
|
@ -1202,7 +1208,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
|
|
|
|
|
if atomic.LoadUint32(&d.fsPivotFails) == 0 {
|
|
|
|
|
for _, header := range rollback {
|
|
|
|
|
if header.Number.Uint64() == pivot {
|
|
|
|
|
log.Warn(fmt.Sprintf("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4]))
|
|
|
|
|
log.Warn("Fast-sync critical section failure, locked pivot to header", "number", pivot, "hash", header.Hash().Hex()[2:10])
|
|
|
|
|
d.fsPivotLock = header
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1298,7 +1304,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
|
|
|
|
|
if n > 0 {
|
|
|
|
|
rollback = append(rollback, chunk[:n]...)
|
|
|
|
|
}
|
|
|
|
|
log.Debug(fmt.Sprintf("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err))
|
|
|
|
|
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash().Hex()[2:10], "err", err)
|
|
|
|
|
return errInvalidChain
|
|
|
|
|
}
|
|
|
|
|
// All verifications passed, store newly found uncertain headers
|
|
|
|
@ -1310,7 +1316,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
|
|
|
|
|
// If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
|
|
|
|
|
if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
|
|
|
|
|
if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
|
|
|
|
|
log.Warn(fmt.Sprintf("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4]))
|
|
|
|
|
log.Warn("Pivot doesn't match locked in one", "remoteNumber", pivot.Number, "remoteHash", pivot.Hash().Hex()[2:10], "localNumber", d.fsPivotLock.Number, "localHash", d.fsPivotLock.Hash().Hex()[2:10])
|
|
|
|
|
return errInvalidChain
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1327,7 +1333,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
|
|
|
|
|
// Otherwise insert the headers for content retrieval
|
|
|
|
|
inserts := d.queue.Schedule(chunk, origin)
|
|
|
|
|
if len(inserts) != len(chunk) {
|
|
|
|
|
log.Debug(fmt.Sprintf("stale headers"))
|
|
|
|
|
log.Debug("Stale headers")
|
|
|
|
|
return errBadPeer
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1358,10 +1364,15 @@ func (d *Downloader) processContent() error {
|
|
|
|
|
d.chainInsertHook(results)
|
|
|
|
|
}
|
|
|
|
|
// Actually import the blocks
|
|
|
|
|
log.Debug("", "msg", log.Lazy{Fn: func() string {
|
|
|
|
|
first, last := results[0].Header, results[len(results)-1].Header
|
|
|
|
|
return fmt.Sprintf("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
|
|
|
|
|
log.Debug("Inserting downloaded chain", "items", len(results),
|
|
|
|
|
"from", log.Lazy{Fn: func() string {
|
|
|
|
|
return fmt.Sprintf("#%d [%x…]", first.Number, first.Hash().Bytes()[:4])
|
|
|
|
|
}},
|
|
|
|
|
"till", log.Lazy{Fn: func() string {
|
|
|
|
|
return fmt.Sprintf("#%d [%x…]", last.Number, last.Hash().Bytes()[:4])
|
|
|
|
|
}})
|
|
|
|
|
|
|
|
|
|
for len(results) != 0 {
|
|
|
|
|
// Check for any termination requests
|
|
|
|
|
select {
|
|
|
|
@ -1395,14 +1406,14 @@ func (d *Downloader) processContent() error {
|
|
|
|
|
case len(receipts) > 0:
|
|
|
|
|
index, err = d.insertReceipts(blocks, receipts)
|
|
|
|
|
if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
|
|
|
|
|
log.Debug(fmt.Sprintf("Committing block #%d [%x…] as the new head", blocks[len(blocks)-1].Number(), blocks[len(blocks)-1].Hash().Bytes()[:4]))
|
|
|
|
|
log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash().Hex()[2:10])
|
|
|
|
|
index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
index, err = d.insertBlocks(blocks)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debug(fmt.Sprintf("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err))
|
|
|
|
|
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash().Hex()[2:10], "err", err)
|
|
|
|
|
return errInvalidChain
|
|
|
|
|
}
|
|
|
|
|
// Shift the results to the next batch
|
|
|
|
@ -1470,7 +1481,7 @@ func (d *Downloader) qosTuner() {
|
|
|
|
|
atomic.StoreUint64(&d.rttConfidence, conf)
|
|
|
|
|
|
|
|
|
|
// Log the new QoS values and sleep until the next RTT
|
|
|
|
|
log.Debug(fmt.Sprintf("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL()))
|
|
|
|
|
log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
|
|
|
|
|
select {
|
|
|
|
|
case <-d.quitCh:
|
|
|
|
|
return
|
|
|
|
@ -1500,7 +1511,7 @@ func (d *Downloader) qosReduceConfidence() {
|
|
|
|
|
atomic.StoreUint64(&d.rttConfidence, conf)
|
|
|
|
|
|
|
|
|
|
rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
|
|
|
|
|
log.Debug(fmt.Sprintf("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL()))
|
|
|
|
|
log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// requestRTT returns the current target round trip time for a download request
|
|
|
|
|