Merge pull request #1980 from fjl/downloader-deliver-hang

eth/downloader: don't hang for spurious deliveries
This commit is contained in:
Jeffrey Wilcke 2015-11-19 15:19:21 +01:00
commit 65bb07fb4e
4 changed files with 345 additions and 320 deletions

View File

@ -74,7 +74,6 @@ var (
errBadPeer = errors.New("action from bad peer ignored") errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling") errStallingPeer = errors.New("peer is stalling")
errNoPeers = errors.New("no peers to keep download active") errNoPeers = errors.New("no peers to keep download active")
errPendingQueue = errors.New("pending items in queue")
errTimeout = errors.New("timeout") errTimeout = errors.New("timeout")
errEmptyHashSet = errors.New("empty hash set by peer") errEmptyHashSet = errors.New("empty hash set by peer")
errEmptyHeaderSet = errors.New("empty header set by peer") errEmptyHeaderSet = errors.New("empty header set by peer")
@ -90,6 +89,7 @@ var (
errCancelBodyFetch = errors.New("block body download canceled (requested)") errCancelBodyFetch = errors.New("block body download canceled (requested)")
errCancelReceiptFetch = errors.New("receipt download canceled (requested)") errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
errCancelStateFetch = errors.New("state data download canceled (requested)") errCancelStateFetch = errors.New("state data download canceled (requested)")
errCancelProcessing = errors.New("processing canceled (requested)")
errNoSyncActive = errors.New("no sync active") errNoSyncActive = errors.New("no sync active")
) )
@ -129,7 +129,6 @@ type Downloader struct {
// Status // Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32 synchronising int32
processing int32
notified int32 notified int32
// Channels // Channels
@ -215,7 +214,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) {
// Synchronising returns whether the downloader is currently retrieving blocks. // Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool { func (d *Downloader) Synchronising() bool {
return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0 return atomic.LoadInt32(&d.synchronising) > 0
} }
// RegisterPeer injects a new download peer into the set of block source to be // RegisterPeer injects a new download peer into the set of block source to be
@ -263,9 +262,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err) glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
d.dropPeer(id) d.dropPeer(id)
case errPendingQueue:
glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
default: default:
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
} }
@ -290,10 +286,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
glog.V(logger.Info).Infoln("Block synchronisation started") glog.V(logger.Info).Infoln("Block synchronisation started")
} }
// Abort if the queue still contains some leftover data
if d.queue.GetHeadResult() != nil {
return errPendingQueue
}
// Reset the queue, peer set and wake channels to clean any internal leftover state // Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset() d.queue.Reset()
d.peers.Reset() d.peers.Reset()
@ -335,7 +327,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
defer func() { defer func() {
// reset on error // reset on error
if err != nil { if err != nil {
d.cancel()
d.mux.Post(FailedEvent{err}) d.mux.Post(FailedEvent{err})
} else { } else {
d.mux.Post(DoneEvent{}) d.mux.Post(DoneEvent{})
@ -366,22 +357,14 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
d.syncStatsLock.Unlock() d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent hash and block retrieval algorithm // Initiate the sync using a concurrent hash and block retrieval algorithm
d.queue.Prepare(origin+1, d.mode, 0)
if d.syncInitHook != nil { if d.syncInitHook != nil {
d.syncInitHook(origin, latest) d.syncInitHook(origin, latest)
} }
d.queue.Prepare(origin+1, d.mode, 0) return d.spawnSync(
func() error { return d.fetchHashes61(p, td, origin+1) },
errc := make(chan error, 2) func() error { return d.fetchBlocks61(origin + 1) },
go func() { errc <- d.fetchHashes61(p, td, origin+1) }() )
go func() { errc <- d.fetchBlocks61(origin + 1) }()
// If any fetcher fails, cancel the other
if err := <-errc; err != nil {
d.cancel()
<-errc
return err
}
return <-errc
case p.version >= 62: case p.version >= 62:
// Look up the sync boundaries: the common ancestor and the target block // Look up the sync boundaries: the common ancestor and the target block
@ -405,7 +388,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
switch d.mode { switch d.mode {
case LightSync: case LightSync:
pivot = latest pivot = latest
case FastSync: case FastSync:
// Calculate the new fast/slow sync pivot point // Calculate the new fast/slow sync pivot point
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
@ -426,34 +408,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot) glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
} }
d.queue.Prepare(origin+1, d.mode, pivot) d.queue.Prepare(origin+1, d.mode, pivot)
if d.syncInitHook != nil { if d.syncInitHook != nil {
d.syncInitHook(origin, latest) d.syncInitHook(origin, latest)
} }
errc := make(chan error, 4) return d.spawnSync(
go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
)
// If any fetcher fails, cancel the others
var fail error
for i := 0; i < cap(errc); i++ {
if err := <-errc; err != nil {
if fail == nil {
fail = err
d.cancel()
}
}
}
return fail
default: default:
// Something very wrong, stop right here // Something very wrong, stop right here
glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version) glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
return errBadPeer return errBadPeer
} }
return nil }
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(fetchers ...func() error) error {
var wg sync.WaitGroup
errc := make(chan error, len(fetchers)+1)
wg.Add(len(fetchers) + 1)
go func() { defer wg.Done(); errc <- d.process() }()
for _, fn := range fetchers {
fn := fn
go func() { defer wg.Done(); errc <- fn() }()
}
// Wait for the first error, then terminate the others.
var err error
for i := 0; i < len(fetchers)+1; i++ {
if i == len(fetchers) {
// Close the queue when all fetchers have exited.
// This will cause the block processor to end when
// it has processed the queue.
d.queue.Close()
}
if err = <-errc; err != nil {
break
}
}
d.queue.Close()
d.cancel()
wg.Wait()
return err
} }
// cancel cancels all of the operations and resets the queue. It returns true // cancel cancels all of the operations and resets the queue. It returns true
@ -470,12 +469,10 @@ func (d *Downloader) cancel() {
} }
} }
d.cancelLock.Unlock() d.cancelLock.Unlock()
// Reset the queue
d.queue.Reset()
} }
// Terminate interrupts the downloader, canceling all pending operations. // Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() { func (d *Downloader) Terminate() {
atomic.StoreInt32(&d.interrupt, 1) atomic.StoreInt32(&d.interrupt, 1)
d.cancel() d.cancel()
@ -495,15 +492,6 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
case <-d.cancelCh: case <-d.cancelCh:
return 0, errCancelBlockFetch return 0, errCancelBlockFetch
case <-d.headerCh:
// Out of bounds eth/62 block headers received, ignore them
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
case <-d.hashCh:
// Out of bounds hashes received, ignore them
case packet := <-d.blockCh: case packet := <-d.blockCh:
// Discard anything not from the origin peer // Discard anything not from the origin peer
if packet.PeerId() != p.id { if packet.PeerId() != p.id {
@ -521,6 +509,16 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
case <-timeout: case <-timeout:
glog.V(logger.Debug).Infof("%v: head block timeout", p) glog.V(logger.Debug).Infof("%v: head block timeout", p)
return 0, errTimeout return 0, errTimeout
case <-d.hashCh:
// Out of bounds hashes received, ignore them
case <-d.headerCh:
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Ignore eth/{62,63} packets because this is eth/61.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -571,18 +569,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
} }
} }
case <-timeout:
glog.V(logger.Debug).Infof("%v: head hash timeout", p)
return 0, errTimeout
case <-d.blockCh: case <-d.blockCh:
// Out of bounds blocks received, ignore them // Out of bounds blocks received, ignore them
case <-d.headerCh: case <-d.headerCh:
// Out of bounds eth/62 block headers received, ignore them
case <-d.bodyCh: case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them case <-d.stateCh:
case <-d.receiptCh:
case <-timeout: // Ignore eth/{62,63} packets because this is eth/61.
glog.V(logger.Debug).Infof("%v: head hash timeout", p) // These can arrive as a late delivery from a previous sync.
return 0, errTimeout
} }
} }
// If the head fetch already found an ancestor, return // If the head fetch already found an ancestor, return
@ -631,18 +630,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
} }
start = check start = check
case <-timeout:
glog.V(logger.Debug).Infof("%v: search hash timeout", p)
return 0, errTimeout
case <-d.blockCh: case <-d.blockCh:
// Out of bounds blocks received, ignore them // Out of bounds blocks received, ignore them
case <-d.headerCh: case <-d.headerCh:
// Out of bounds eth/62 block headers received, ignore them
case <-d.bodyCh: case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them case <-d.stateCh:
case <-d.receiptCh:
case <-timeout: // Ignore eth/{62,63} packets because this is eth/61.
glog.V(logger.Debug).Infof("%v: search hash timeout", p) // These can arrive as a late delivery from a previous sync.
return 0, errTimeout
} }
} }
} }
@ -676,12 +676,6 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHashFetch return errCancelHashFetch
case <-d.headerCh:
// Out of bounds eth/62 block headers received, ignore them
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
case packet := <-d.hashCh: case packet := <-d.hashCh:
// Make sure the active peer is giving us the hashes // Make sure the active peer is giving us the hashes
if packet.PeerId() != p.id { if packet.PeerId() != p.id {
@ -750,6 +744,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: hash request timed out", p) glog.V(logger.Debug).Infof("%v: hash request timed out", p)
hashTimeoutMeter.Mark(1) hashTimeoutMeter.Mark(1)
return errTimeout return errTimeout
case <-d.headerCh:
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Ignore eth/{62,63} packets because this is eth/61.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -774,12 +775,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
case <-d.cancelCh: case <-d.cancelCh:
return errCancelBlockFetch return errCancelBlockFetch
case <-d.headerCh:
// Out of bounds eth/62 block headers received, ignore them
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
case packet := <-d.blockCh: case packet := <-d.blockCh:
// If the peer was previously banned and failed to deliver it's pack // If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message. // in a reasonable time frame, ignore it's message.
@ -800,7 +795,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
peer.Promote() peer.Promote()
peer.SetBlocksIdle() peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
go d.process()
case errInvalidChain: case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort // The hash chain is invalid (blocks are not ordered properly), abort
@ -826,7 +820,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
peer.Demote() peer.Demote()
peer.SetBlocksIdle() peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
go d.process()
} }
} }
// Blocks arrived, try to update the progress // Blocks arrived, try to update the progress
@ -909,6 +902,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
if !throttled && !d.queue.InFlightBlocks() && len(idles) == total { if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
return errPeersUnavailable return errPeersUnavailable
} }
case <-d.headerCh:
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Ignore eth/{62,63} packets because this is eth/61.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -941,18 +941,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
} }
return headers[0].Number.Uint64(), nil return headers[0].Number.Uint64(), nil
case <-d.bodyCh:
// Out of bounds block bodies received, ignore them
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
case <-timeout: case <-timeout:
glog.V(logger.Debug).Infof("%v: head header timeout", p) glog.V(logger.Debug).Infof("%v: head header timeout", p)
return 0, errTimeout return 0, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -1008,18 +1009,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
} }
} }
case <-d.bodyCh:
// Out of bounds block bodies received, ignore them
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
case <-timeout: case <-timeout:
glog.V(logger.Debug).Infof("%v: head header timeout", p) glog.V(logger.Debug).Infof("%v: head header timeout", p)
return 0, errTimeout return 0, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
// If the head fetch already found an ancestor, return // If the head fetch already found an ancestor, return
@ -1068,18 +1070,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
} }
start = check start = check
case <-d.bodyCh:
// Out of bounds block bodies received, ignore them
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
case <-timeout: case <-timeout:
glog.V(logger.Debug).Infof("%v: search header timeout", p) glog.V(logger.Debug).Infof("%v: search header timeout", p)
return 0, errTimeout return 0, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -1141,12 +1144,6 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHeaderFetch return errCancelHeaderFetch
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
case packet := <-d.headerCh: case packet := <-d.headerCh:
// Make sure the active peer is giving us the headers // Make sure the active peer is giving us the headers
if packet.PeerId() != p.id { if packet.PeerId() != p.id {
@ -1268,6 +1265,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
} }
} }
return nil return nil
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -1336,10 +1338,8 @@ func (d *Downloader) fetchNodeData() error {
d.cancel() d.cancel()
return return
} }
// Processing succeeded, notify state fetcher and processor of continuation // Processing succeeded, notify state fetcher of continuation
if d.queue.PendingNodeData() == 0 { if d.queue.PendingNodeData() > 0 {
go d.process()
} else {
select { select {
case d.stateWakeCh <- true: case d.stateWakeCh <- true:
default: default:
@ -1348,7 +1348,6 @@ func (d *Downloader) fetchNodeData() error {
// Log a message to the user and return // Log a message to the user and return
d.syncStatsLock.Lock() d.syncStatsLock.Lock()
defer d.syncStatsLock.Unlock() defer d.syncStatsLock.Unlock()
d.syncStatsStateDone += uint64(delivered) d.syncStatsStateDone += uint64(delivered)
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone) glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
}) })
@ -1391,12 +1390,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
case <-d.cancelCh: case <-d.cancelCh:
return errCancel return errCancel
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
case packet := <-deliveryCh: case packet := <-deliveryCh:
// If the peer was previously banned and failed to deliver it's pack // If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message. // in a reasonable time frame, ignore it's message.
@ -1415,7 +1408,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
peer.Promote() peer.Promote()
setIdle(peer) setIdle(peer)
glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
go d.process()
case errInvalidChain: case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort // The hash chain is invalid (blocks are not ordered properly), abort
@ -1441,7 +1433,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
peer.Demote() peer.Demote()
setIdle(peer) setIdle(peer)
glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err) glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
go d.process()
} }
} }
// Blocks assembled, try to update the progress // Blocks assembled, try to update the progress
@ -1508,7 +1499,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
} }
if progress { if progress {
progressed = true progressed = true
go d.process()
} }
if request == nil { if request == nil {
continue continue
@ -1540,51 +1530,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 { if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
return errPeersUnavailable return errPeersUnavailable
} }
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
// process takes fetch results from the queue and tries to import them into the // process takes fetch results from the queue and tries to import them into the
// chain. The type of import operation will depend on the result contents: // chain. The type of import operation will depend on the result contents.
// - func (d *Downloader) process() error {
// pivot := d.queue.FastSyncPivot()
// The algorithmic flow is as follows:
// - The `processing` flag is swapped to 1 to ensure singleton access
// - The current `cancel` channel is retrieved to detect sync abortions
// - Blocks are iteratively taken from the cache and inserted into the chain
// - When the cache becomes empty, insertion stops
// - The `processing` flag is swapped back to 0
// - A post-exit check is made whether new blocks became available
// - This step is important: it handles a potential race condition between
// checking for no more work, and releasing the processing "mutex". In
// between these state changes, a block may have arrived, but a processing
// attempt denied, so we need to re-enter to ensure the block isn't left
// to idle in the cache.
func (d *Downloader) process() {
// Make sure only one goroutine is ever allowed to process blocks at once
if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
return
}
// If the processor just exited, but there are freshly pending items, try to
// reenter. This is needed because the goroutine spinned up for processing
// the fresh results might have been rejected entry to to this present thread
// not yet releasing the `processing` state.
defer func() {
if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil {
d.process()
}
}()
// Release the lock upon exit (note, before checking for reentry!)
// the import statistics to zero.
defer atomic.StoreInt32(&d.processing, 0)
// Repeat the processing as long as there are results to process
for { for {
// Fetch the next batch of results results := d.queue.WaitResults()
pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race
results := d.queue.TakeResults()
if len(results) == 0 { if len(results) == 0 {
return return nil // queue empty
} }
if d.chainInsertHook != nil { if d.chainInsertHook != nil {
d.chainInsertHook(results) d.chainInsertHook(results)
@ -1597,7 +1559,7 @@ func (d *Downloader) process() {
for len(results) != 0 { for len(results) != 0 {
// Check for any termination requests // Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 { if atomic.LoadInt32(&d.interrupt) == 1 {
return return errCancelProcessing
} }
// Retrieve the a batch of results to import // Retrieve the a batch of results to import
var ( var (
@ -1633,8 +1595,7 @@ func (d *Downloader) process() {
} }
if err != nil { if err != nil {
glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err) glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
d.cancel() return err
return
} }
// Shift the results to the next batch // Shift the results to the next batch
results = results[items:] results = results[items:]
@ -1685,19 +1646,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
dropMeter.Mark(int64(packet.Items())) dropMeter.Mark(int64(packet.Items()))
} }
}() }()
// Make sure the downloader is active
if atomic.LoadInt32(&d.synchronising) == 0 {
return errNoSyncActive
}
// Deliver or abort if the sync is canceled while queuing // Deliver or abort if the sync is canceled while queuing
d.cancelLock.RLock() d.cancelLock.RLock()
cancel := d.cancelCh cancel := d.cancelCh
d.cancelLock.RUnlock() d.cancelLock.RUnlock()
if cancel == nil {
return errNoSyncActive
}
select { select {
case destCh <- packet: case destCh <- packet:
return nil return nil
case <-cancel: case <-cancel:
return errNoSyncActive return errNoSyncActive
} }

View File

@ -169,17 +169,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
} }
} }
dl.lock.RUnlock() dl.lock.RUnlock()
return dl.downloader.synchronise(id, hash, td, mode)
err := dl.downloader.synchronise(id, hash, td, mode)
for {
// If the queue is empty and processing stopped, break
if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 {
break
}
// Otherwise sleep a bit and retry
time.Sleep(time.Millisecond)
}
return err
} }
// hasHeader checks if a header is present in the testers canonical chain. // hasHeader checks if a header is present in the testers canonical chain.
@ -701,6 +691,8 @@ func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronis
func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) } func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) }
func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a small enough block chain to download // Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15 targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@ -725,6 +717,8 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) } func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
func testThrottling(t *testing.T, protocol int, mode SyncMode) { func testThrottling(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a long block chain to download and the tester // Create a long block chain to download and the tester
targetBlocks := 8 * blockCacheLimit targetBlocks := 8 * blockCacheLimit
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@ -757,8 +751,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
for start := time.Now(); time.Since(start) < time.Second; { for start := time.Now(); time.Since(start) < time.Second; {
time.Sleep(25 * time.Millisecond) time.Sleep(25 * time.Millisecond)
tester.lock.RLock() tester.lock.Lock()
tester.downloader.queue.lock.RLock() tester.downloader.queue.lock.Lock()
cached = len(tester.downloader.queue.blockDonePool) cached = len(tester.downloader.queue.blockDonePool)
if mode == FastSync { if mode == FastSync {
if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
@ -769,8 +763,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
} }
frozen = int(atomic.LoadUint32(&blocked)) frozen = int(atomic.LoadUint32(&blocked))
retrieved = len(tester.ownBlocks) retrieved = len(tester.ownBlocks)
tester.downloader.queue.lock.RUnlock() tester.downloader.queue.lock.Unlock()
tester.lock.RUnlock() tester.lock.Unlock()
if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 { if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 {
break break
@ -810,6 +804,8 @@ func TestForkedSynchronisation64Fast(t *testing.T) { testForkedSynchronisation(
func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(t, 64, LightSync) } func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(t, 64, LightSync) }
func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) { func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a long enough forked chain // Create a long enough forked chain
common, fork := MaxHashFetch, 2*MaxHashFetch common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil) hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@ -833,6 +829,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Tests that an inactive downloader will not accept incoming hashes and blocks. // Tests that an inactive downloader will not accept incoming hashes and blocks.
func TestInactiveDownloader61(t *testing.T) { func TestInactiveDownloader61(t *testing.T) {
t.Parallel()
tester := newTester() tester := newTester()
// Check that neither hashes nor blocks are accepted // Check that neither hashes nor blocks are accepted
@ -847,6 +844,7 @@ func TestInactiveDownloader61(t *testing.T) {
// Tests that an inactive downloader will not accept incoming block headers and // Tests that an inactive downloader will not accept incoming block headers and
// bodies. // bodies.
func TestInactiveDownloader62(t *testing.T) { func TestInactiveDownloader62(t *testing.T) {
t.Parallel()
tester := newTester() tester := newTester()
// Check that neither block headers nor bodies are accepted // Check that neither block headers nor bodies are accepted
@ -861,6 +859,7 @@ func TestInactiveDownloader62(t *testing.T) {
// Tests that an inactive downloader will not accept incoming block headers, // Tests that an inactive downloader will not accept incoming block headers,
// bodies and receipts. // bodies and receipts.
func TestInactiveDownloader63(t *testing.T) { func TestInactiveDownloader63(t *testing.T) {
t.Parallel()
tester := newTester() tester := newTester()
// Check that neither block headers nor bodies are accepted // Check that neither block headers nor bodies are accepted
@ -885,6 +884,8 @@ func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) }
func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) } func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
func testCancel(t *testing.T, protocol int, mode SyncMode) { func testCancel(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a small enough block chain to download and the tester // Create a small enough block chain to download and the tester
targetBlocks := blockCacheLimit - 15 targetBlocks := blockCacheLimit - 15
if targetBlocks >= MaxHashFetch { if targetBlocks >= MaxHashFetch {
@ -923,6 +924,8 @@ func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t,
func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) } func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create various peers with various parts of the chain // Create various peers with various parts of the chain
targetPeers := 8 targetPeers := 8
targetBlocks := targetPeers*blockCacheLimit - 15 targetBlocks := targetPeers*blockCacheLimit - 15
@ -950,6 +953,8 @@ func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t,
func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) } func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a small enough block chain to download // Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15 targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@ -986,6 +991,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, F
func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) } func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a block chain to download // Create a block chain to download
targetBlocks := 2*blockCacheLimit - 15 targetBlocks := 2*blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@ -1037,6 +1044,8 @@ func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 6
func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) } func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a small enough block chain to download // Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15 targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@ -1188,6 +1197,8 @@ func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttac
func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) } func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester() tester := newTester()
hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil) hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil)
@ -1215,7 +1226,6 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errBadPeer, true}, // Peer was deemed bad for some reason, drop it {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
{errStallingPeer, true}, // Peer was detected to be stalling, drop it {errStallingPeer, true}, // Peer was detected to be stalling, drop it
{errNoPeers, false}, // No peers to download from, soft race, no issue {errNoPeers, false}, // No peers to download from, soft race, no issue
{errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
{errTimeout, true}, // No hashes received in due time, drop the peer {errTimeout, true}, // No hashes received in due time, drop the peer
{errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
{errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
@ -1228,6 +1238,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
} }
// Run the tests and check disconnection status // Run the tests and check disconnection status
tester := newTester() tester := newTester()
@ -1261,6 +1273,8 @@ func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) }
func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) } func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a small enough block chain to download // Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15 targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@ -1331,6 +1345,8 @@ func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64,
func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) } func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a forked chain to simulate origin revertal // Create a forked chain to simulate origin revertal
common, fork := MaxHashFetch, 2*MaxHashFetch common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil) hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@ -1404,6 +1420,8 @@ func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64,
func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) } func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }
func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a small enough block chain to download // Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15 targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@ -1478,6 +1496,8 @@ func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, F
func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) } func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }
func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a small block chain // Create a small block chain
targetBlocks := blockCacheLimit - 15 targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil) hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil)
@ -1541,3 +1561,50 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks) t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks)
} }
} }
// This test reproduces an issue where unexpected deliveries would
// block indefinitely if they arrived at the right time.
func TestDeliverHeadersHang62(t *testing.T) { testDeliverHeadersHang(t, 62, FullSync) }
func TestDeliverHeadersHang63Full(t *testing.T) { testDeliverHeadersHang(t, 63, FullSync) }
func TestDeliverHeadersHang63Fast(t *testing.T) { testDeliverHeadersHang(t, 63, FastSync) }
func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) }
func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) }
func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) }
func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil)
fakeHeads := []*types.Header{{}, {}, {}, {}}
for i := 0; i < 200; i++ {
tester := newTester()
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Whenever the downloader requests headers, flood it with
// a lot of unrequested header deliveries.
tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error {
deliveriesDone := make(chan struct{}, 500)
for i := 0; i < cap(deliveriesDone); i++ {
peer := fmt.Sprintf("fake-peer%d", i)
go func() {
tester.downloader.DeliverHeaders(peer, fakeHeads)
deliveriesDone <- struct{}{}
}()
}
// Deliver the actual requested headers.
impl := tester.peerGetAbsHeadersFn("peer", 0)
go impl(from, count, skip, reverse)
// None of the extra deliveries should block.
timeout := time.After(5 * time.Second)
for i := 0; i < cap(deliveriesDone); i++ {
select {
case <-deliveriesDone:
case <-timeout:
panic("blocked")
}
}
return nil
}
if err := tester.sync("peer", nil, mode); err != nil {
t.Errorf("sync failed: %v", err)
}
}
}

View File

@ -101,11 +101,14 @@ type queue struct {
resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain resultOffset uint64 // Offset of the first cached fetch result in the block chain
lock sync.RWMutex lock *sync.Mutex
active *sync.Cond
closed bool
} }
// newQueue creates a new download queue for scheduling block retrieval. // newQueue creates a new download queue for scheduling block retrieval.
func newQueue(stateDb ethdb.Database) *queue { func newQueue(stateDb ethdb.Database) *queue {
lock := new(sync.Mutex)
return &queue{ return &queue{
hashPool: make(map[common.Hash]int), hashPool: make(map[common.Hash]int),
hashQueue: prque.New(), hashQueue: prque.New(),
@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue {
statePendPool: make(map[string]*fetchRequest), statePendPool: make(map[string]*fetchRequest),
stateDatabase: stateDb, stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit), resultCache: make([]*fetchResult, blockCacheLimit),
active: sync.NewCond(lock),
lock: lock,
} }
} }
@ -133,6 +138,7 @@ func (q *queue) Reset() {
q.stateSchedLock.Lock() q.stateSchedLock.Lock()
defer q.stateSchedLock.Unlock() defer q.stateSchedLock.Unlock()
q.closed = false
q.mode = FullSync q.mode = FullSync
q.fastSyncPivot = 0 q.fastSyncPivot = 0
@ -162,18 +168,27 @@ func (q *queue) Reset() {
q.resultOffset = 0 q.resultOffset = 0
} }
// Close marks the end of the sync, unblocking WaitResults.
// It may be called even if the queue is already closed.
func (q *queue) Close() {
q.lock.Lock()
q.closed = true
q.lock.Unlock()
q.active.Broadcast()
}
// PendingBlocks retrieves the number of block (body) requests pending for retrieval. // PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int { func (q *queue) PendingBlocks() int {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return q.hashQueue.Size() + q.blockTaskQueue.Size() return q.hashQueue.Size() + q.blockTaskQueue.Size()
} }
// PendingReceipts retrieves the number of block receipts pending for retrieval. // PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int { func (q *queue) PendingReceipts() int {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return q.receiptTaskQueue.Size() return q.receiptTaskQueue.Size()
} }
@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int {
// InFlightBlocks retrieves whether there are block fetch requests currently in // InFlightBlocks retrieves whether there are block fetch requests currently in
// flight. // flight.
func (q *queue) InFlightBlocks() bool { func (q *queue) InFlightBlocks() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return len(q.blockPendPool) > 0 return len(q.blockPendPool) > 0
} }
@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool {
// InFlightReceipts retrieves whether there are receipt fetch requests currently // InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight. // in flight.
func (q *queue) InFlightReceipts() bool { func (q *queue) InFlightReceipts() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return len(q.receiptPendPool) > 0 return len(q.receiptPendPool) > 0
} }
@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool {
// InFlightNodeData retrieves whether there are node data entry fetch requests // InFlightNodeData retrieves whether there are node data entry fetch requests
// currently in flight. // currently in flight.
func (q *queue) InFlightNodeData() bool { func (q *queue) InFlightNodeData() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0 return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
} }
@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool {
// Idle returns if the queue is fully idle or has some data still inside. This // Idle returns if the queue is fully idle or has some data still inside. This
// method is used by the tester to detect termination events. // method is used by the tester to detect termination events.
func (q *queue) Idle() bool { func (q *queue) Idle() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
@ -237,8 +252,8 @@ func (q *queue) Idle() bool {
// FastSyncPivot retrieves the currently used fast sync pivot point. // FastSyncPivot retrieves the currently used fast sync pivot point.
func (q *queue) FastSyncPivot() uint64 { func (q *queue) FastSyncPivot() uint64 {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return q.fastSyncPivot return q.fastSyncPivot
} }
@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 {
// ShouldThrottleBlocks checks if the download should be throttled (active block (body) // ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache). // fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool { func (q *queue) ShouldThrottleBlocks() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
// Calculate the currently in-flight block (body) requests // Calculate the currently in-flight block (body) requests
pending := 0 pending := 0
@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool {
// ShouldThrottleReceipts checks if the download should be throttled (active receipt // ShouldThrottleReceipts checks if the download should be throttled (active receipt
// fetches exceed block cache). // fetches exceed block cache).
func (q *queue) ShouldThrottleReceipts() bool { func (q *queue) ShouldThrottleReceipts() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
// Calculate the currently in-flight receipt requests // Calculate the currently in-flight receipt requests
pending := 0 pending := 0
@ -351,93 +366,76 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
return inserts return inserts
} }
// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't // WaitResults retrieves and permanently removes a batch of fetch
// been downloaded yet (or simply non existent). // results from the cache. the result slice will be empty if the queue
func (q *queue) GetHeadResult() *fetchResult { // has been closed.
q.lock.RLock() func (q *queue) WaitResults() []*fetchResult {
defer q.lock.RUnlock()
// If there are no results pending, return nil
if len(q.resultCache) == 0 || q.resultCache[0] == nil {
return nil
}
// If the next result is still incomplete, return nil
if q.resultCache[0].Pending > 0 {
return nil
}
// If the next result is the fast sync pivot...
if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot {
// If the pivot state trie is still being pulled, return nil
if len(q.stateTaskPool) > 0 {
return nil
}
if q.PendingNodeData() > 0 {
return nil
}
// If the state is done, but not enough post-pivot headers were verified, stall...
for i := 0; i < fsHeaderForceVerify; i++ {
if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil {
return nil
}
}
}
return q.resultCache[0]
}
// TakeResults retrieves and permanently removes a batch of fetch results from
// the cache.
func (q *queue) TakeResults() []*fetchResult {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
// Accumulate all available results nproc := q.countProcessableItems()
results := []*fetchResult{} for nproc == 0 && !q.closed {
for i, result := range q.resultCache { q.active.Wait()
// Stop if no more results are ready nproc = q.countProcessableItems()
if result == nil || result.Pending > 0 {
break
} }
// The fast sync pivot block may only be processed after state fetch completes results := make([]*fetchResult, nproc)
if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { copy(results, q.resultCache[:nproc])
if len(q.stateTaskPool) > 0 { if len(results) > 0 {
break // Mark results as done before dropping them from the cache.
} for _, result := range results {
if q.PendingNodeData() > 0 {
break
}
// Even is state fetch is done, ensure post-pivot headers passed verifications
safe := true
for j := 0; j < fsHeaderForceVerify; j++ {
if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
safe = false
}
}
if !safe {
break
}
}
// If we've just inserted the fast sync pivot, stop as the following batch needs different insertion
if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 {
break
}
results = append(results, result)
hash := result.Header.Hash() hash := result.Header.Hash()
delete(q.blockDonePool, hash) delete(q.blockDonePool, hash)
delete(q.receiptDonePool, hash) delete(q.receiptDonePool, hash)
} }
// Delete the results from the slice and let them be garbage collected // Delete the results from the cache and clear the tail.
// without this slice trick the results would stay in memory until nil copy(q.resultCache, q.resultCache[nproc:])
// would be assigned to them. for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
copy(q.resultCache, q.resultCache[len(results):]) q.resultCache[i] = nil
for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ { }
q.resultCache[k] = nil // Advance the expected block number of the first cache entry.
q.resultOffset += uint64(nproc)
} }
q.resultOffset += uint64(len(results))
return results return results
} }
// countProcessableItems counts the processable items.
func (q *queue) countProcessableItems() int {
for i, result := range q.resultCache {
// Don't process incomplete or unavailable items.
if result == nil || result.Pending > 0 {
return i
}
// Special handling for the fast-sync pivot block:
if q.mode == FastSync {
bnum := result.Header.Number.Uint64()
if bnum == q.fastSyncPivot {
// If the state of the pivot block is not
// available yet, we cannot proceed and return 0.
//
// Stop before processing the pivot block to ensure that
// resultCache has space for fsHeaderForceVerify items. Not
// doing this could leave us unable to download the required
// amount of headers.
if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 {
return i
}
for j := 0; j < fsHeaderForceVerify; j++ {
if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
return i
}
}
}
// If we're just the fast sync pivot, stop as well
// because the following batch needs different insertion.
// This simplifies handling the switchover in d.process.
if bnum == q.fastSyncPivot+1 && i > 0 {
return i
}
}
}
return len(q.resultCache)
}
// ReserveBlocks reserves a set of block hashes for the given peer, skipping any // ReserveBlocks reserves a set of block hashes for the given peer, skipping any
// previously failed download. // previously failed download.
func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest { func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest {
@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
// If we're the first to request this task, initialise the result container // If we're the first to request this task, initialise the result container
index := int(header.Number.Int64() - int64(q.resultOffset)) index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 { if index >= len(q.resultCache) || index < 0 {
common.Report("index allocation went beyond available resultCache space")
return nil, false, errInvalidChain return nil, false, errInvalidChain
} }
if q.resultCache[index] == nil { if q.resultCache[index] == nil {
@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
for _, header := range skip { for _, header := range skip {
taskQueue.Push(header, -float32(header.Number.Uint64())) taskQueue.Push(header, -float32(header.Number.Uint64()))
} }
if progress {
// Wake WaitResults, resultCache was modified
q.active.Signal()
}
// Assemble and return the block download request // Assemble and return the block download request
if len(send) == 0 { if len(send) == 0 {
return nil, progress, nil return nil, progress, nil
@ -737,7 +740,7 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
// expire is the generic check that move expired tasks from a pending pool back // expire is the generic check that move expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks. // into a task pool, returning all entities caught with expired tasks.
// //
// Note, this method expects the queue lock to be already held for writing. The // Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need // reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway. // to access the queue, so they already need a lock anyway.
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
@ -813,17 +816,16 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
for hash, index := range request.Hashes { for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index)) q.hashQueue.Push(hash, float32(index))
} }
// Wake up WaitResults
q.active.Signal()
// If none of the blocks were good, it's a stale delivery // If none of the blocks were good, it's a stale delivery
switch { switch {
case len(errs) == 0: case len(errs) == 0:
return nil return nil
case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
return errs[0] return errs[0]
case len(errs) == len(blocks): case len(errs) == len(blocks):
return errStaleDelivery return errStaleDelivery
default: default:
return fmt.Errorf("multiple failures: %v", errs) return fmt.Errorf("multiple failures: %v", errs)
} }
@ -915,14 +917,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
taskQueue.Push(header, -float32(header.Number.Uint64())) taskQueue.Push(header, -float32(header.Number.Uint64()))
} }
} }
// Wake up WaitResults
q.active.Signal()
// If none of the data was good, it's a stale delivery // If none of the data was good, it's a stale delivery
switch { switch {
case failure == nil || failure == errInvalidChain: case failure == nil || failure == errInvalidChain:
return failure return failure
case useful: case useful:
return fmt.Errorf("partial failure: %v", failure) return fmt.Errorf("partial failure: %v", failure)
default: default:
return errStaleDelivery return errStaleDelivery
} }
@ -977,10 +979,8 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
switch { switch {
case len(errs) == 0: case len(errs) == 0:
return nil return nil
case len(errs) == len(request.Hashes): case len(errs) == len(request.Hashes):
return errStaleDelivery return errStaleDelivery
default: default:
return fmt.Errorf("multiple failures: %v", errs) return fmt.Errorf("multiple failures: %v", errs)
} }
@ -989,6 +989,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// deliverNodeData is the asynchronous node data processor that injects a batch // deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler. // of sync results into the state scheduler.
func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) { func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
// Wake up WaitResults after the state has been written because it
// might be waiting for the pivot block state to get completed.
defer q.active.Signal()
// Process results one by one to permit task fetches in between // Process results one by one to permit task fetches in between
for i, result := range results { for i, result := range results {
q.stateSchedLock.Lock() q.stateSchedLock.Lock()

View File

@ -175,10 +175,6 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
} }
// If fast sync was enabled, and we synced up, disable it // If fast sync was enabled, and we synced up, disable it
if pm.fastSync { if pm.fastSync {
// Wait until all pending imports finish processing
for pm.downloader.Synchronising() {
time.Sleep(100 * time.Millisecond)
}
// Disable fast sync if we indeed have something in our chain // Disable fast sync if we indeed have something in our chain
if pm.blockchain.CurrentBlock().NumberU64() > 0 { if pm.blockchain.CurrentBlock().NumberU64() > 0 {
glog.V(logger.Info).Infof("fast sync complete, auto disabling") glog.V(logger.Info).Infof("fast sync complete, auto disabling")