eth/downloader: make syncing error more obvious (#19413)

This commit is contained in:
gary rong 2019-06-05 20:00:46 +08:00 committed by Felix Lange
parent 645756cda5
commit 7641bbe535
3 changed files with 45 additions and 42 deletions

View File

@ -83,16 +83,11 @@ var (
errPeersUnavailable = errors.New("no peers available or all tried for download") errPeersUnavailable = errors.New("no peers available or all tried for download")
errInvalidAncestor = errors.New("retrieved ancestor is invalid") errInvalidAncestor = errors.New("retrieved ancestor is invalid")
errInvalidChain = errors.New("retrieved hash chain is invalid") errInvalidChain = errors.New("retrieved hash chain is invalid")
errInvalidBlock = errors.New("retrieved block is invalid")
errInvalidBody = errors.New("retrieved block body is invalid") errInvalidBody = errors.New("retrieved block body is invalid")
errInvalidReceipt = errors.New("retrieved receipt is invalid") errInvalidReceipt = errors.New("retrieved receipt is invalid")
errCancelBlockFetch = errors.New("block download canceled (requested)")
errCancelHeaderFetch = errors.New("block header download canceled (requested)")
errCancelBodyFetch = errors.New("block body download canceled (requested)")
errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
errCancelStateFetch = errors.New("state data download canceled (requested)") errCancelStateFetch = errors.New("state data download canceled (requested)")
errCancelHeaderProcessing = errors.New("header processing canceled (requested)")
errCancelContentProcessing = errors.New("content processing canceled (requested)") errCancelContentProcessing = errors.New("content processing canceled (requested)")
errCanceled = errors.New("syncing canceled (requested)")
errNoSyncActive = errors.New("no sync active") errNoSyncActive = errors.New("no sync active")
errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)") errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
) )
@ -319,14 +314,6 @@ func (d *Downloader) UnregisterPeer(id string) error {
} }
d.queue.Revoke(id) d.queue.Revoke(id)
// If this peer was the master peer, abort sync immediately
d.cancelLock.RLock()
master := id == d.cancelPeer
d.cancelLock.RUnlock()
if master {
d.cancel()
}
return nil return nil
} }
@ -336,7 +323,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
err := d.synchronise(id, head, td, mode) err := d.synchronise(id, head, td, mode)
switch err { switch err {
case nil: case nil:
case errBusy: case errBusy, errCanceled:
case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer, case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer,
errEmptyHeaderSet, errPeersUnavailable, errTooOld, errEmptyHeaderSet, errPeersUnavailable, errTooOld,
@ -555,7 +542,7 @@ func (d *Downloader) spawnSync(fetchers []func() error) error {
// it has processed the queue. // it has processed the queue.
d.queue.Close() d.queue.Close()
} }
if err = <-errc; err != nil { if err = <-errc; err != nil && err != errCanceled {
break break
} }
} }
@ -621,7 +608,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
for { for {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return nil, errCancelBlockFetch return nil, errCanceled
case packet := <-d.headerCh: case packet := <-d.headerCh:
// Discard anything not from the origin peer // Discard anything not from the origin peer
@ -767,7 +754,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
for finished := false; !finished; { for finished := false; !finished; {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return 0, errCancelHeaderFetch return 0, errCanceled
case packet := <-d.headerCh: case packet := <-d.headerCh:
// Discard anything not from the origin peer // Discard anything not from the origin peer
@ -853,7 +840,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
for arrived := false; !arrived; { for arrived := false; !arrived; {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return 0, errCancelHeaderFetch return 0, errCanceled
case packer := <-d.headerCh: case packer := <-d.headerCh:
// Discard anything not from the origin peer // Discard anything not from the origin peer
@ -954,7 +941,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
for { for {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHeaderFetch return errCanceled
case packet := <-d.headerCh: case packet := <-d.headerCh:
// Make sure the active peer is giving us the skeleton headers // Make sure the active peer is giving us the skeleton headers
@ -981,7 +968,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
getHeaders(from) getHeaders(from)
continue continue
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHeaderFetch return errCanceled
} }
} }
// Pivot done (or not in fast sync) and no more headers, terminate the process // Pivot done (or not in fast sync) and no more headers, terminate the process
@ -990,7 +977,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
case d.headerProcCh <- nil: case d.headerProcCh <- nil:
return nil return nil
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHeaderFetch return errCanceled
} }
} }
headers := packet.(*headerPack).headers headers := packet.(*headerPack).headers
@ -1041,7 +1028,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
select { select {
case d.headerProcCh <- headers: case d.headerProcCh <- headers:
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHeaderFetch return errCanceled
} }
from += uint64(len(headers)) from += uint64(len(headers))
getHeaders(from) getHeaders(from)
@ -1053,7 +1040,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
getHeaders(from) getHeaders(from)
continue continue
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHeaderFetch return errCanceled
} }
} }
@ -1112,7 +1099,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) } capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) } setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
) )
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers") nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
@ -1138,7 +1125,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) } capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) } setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
) )
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies") d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
@ -1162,7 +1149,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) } capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) } setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
) )
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts") d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
@ -1195,7 +1182,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks // - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping) // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind: textual label of the type being downloaded to display in log mesages // - kind: textual label of the type being downloaded to display in log mesages
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error), expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int, fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error { idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
@ -1211,7 +1198,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
for { for {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return errCancel return errCanceled
case packet := <-deliveryCh: case packet := <-deliveryCh:
// If the peer was previously banned and failed to deliver its pack // If the peer was previously banned and failed to deliver its pack
@ -1282,12 +1269,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
setIdle(peer, 0) setIdle(peer, 0)
} else { } else {
peer.log.Debug("Stalling delivery, dropping", "type", kind) peer.log.Debug("Stalling delivery, dropping", "type", kind)
if d.dropPeer == nil { if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy. // The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid) peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
} else { } else {
d.dropPeer(pid) d.dropPeer(pid)
// If this peer was the master peer, abort sync immediately
d.cancelLock.RLock()
master := pid == d.cancelPeer
d.cancelLock.RUnlock()
if master {
d.cancel()
return errTimeout
}
} }
} }
} }
@ -1392,7 +1390,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
for { for {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHeaderProcessing return errCanceled
case headers := <-d.headerProcCh: case headers := <-d.headerProcCh:
// Terminate header processing if we synced up // Terminate header processing if we synced up
@ -1445,7 +1443,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// Terminate if something failed in between processing chunks // Terminate if something failed in between processing chunks
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHeaderProcessing return errCanceled
default: default:
} }
// Select the next chunk of headers to import // Select the next chunk of headers to import
@ -1488,7 +1486,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHeaderProcessing return errCanceled
case <-time.After(time.Second): case <-time.After(time.Second):
} }
} }
@ -1579,7 +1577,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
stateSync := d.syncState(latest.Root) stateSync := d.syncState(latest.Root)
defer stateSync.Cancel() defer stateSync.Cancel()
go func() { go func() {
if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { if err := stateSync.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled {
d.queue.Close() // wake up Results d.queue.Close() // wake up Results
} }
}() }()
@ -1607,7 +1605,8 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
// If sync failed, stop // If sync failed, stop
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return stateSync.Cancel() stateSync.Cancel()
return errCanceled
default: default:
} }
} }
@ -1637,7 +1636,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
stateSync = d.syncState(P.Header.Root) stateSync = d.syncState(P.Header.Root)
defer stateSync.Cancel() defer stateSync.Cancel()
go func() { go func() {
if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { if err := stateSync.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled {
d.queue.Close() // wake up Results d.queue.Close() // wake up Results
} }
}() }()

View File

@ -1114,14 +1114,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
{errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter
{errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
{errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
{errInvalidBody, false}, // A bad peer was detected, but not the sync origin {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
{errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
{errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelHeaderProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
} }
// Run the tests and check disconnection status // Run the tests and check disconnection status

View File

@ -302,7 +302,7 @@ func (s *stateSync) loop() (err error) {
return errCancelStateFetch return errCancelStateFetch
case <-s.d.cancelCh: case <-s.d.cancelCh:
return errCancelStateFetch return errCanceled
case req := <-s.deliver: case req := <-s.deliver:
// Response, disconnect or timeout triggered, drop the peer if stalling // Response, disconnect or timeout triggered, drop the peer if stalling
@ -317,6 +317,16 @@ func (s *stateSync) loop() (err error) {
req.peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", req.peer.id) req.peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", req.peer.id)
} else { } else {
s.d.dropPeer(req.peer.id) s.d.dropPeer(req.peer.id)
// If this peer was the master peer, abort sync immediately
s.d.cancelLock.RLock()
master := req.peer.id == s.d.cancelPeer
s.d.cancelLock.RUnlock()
if master {
s.d.cancel()
return errTimeout
}
} }
} }
// Process all the received blobs and check for stale delivery // Process all the received blobs and check for stale delivery