From 900da3d800ad299c22ecb5d14f477600931d70b6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 13 Nov 2015 16:08:15 +0000 Subject: [PATCH] eth/downloader: don't hang for spurious deliveries Unexpected deliveries could block indefinitely if they arrived at the right time. The fix is to ensure that the cancellation channel is always closed when the sync ends, unblocking any deliveries. Also remove the atomic check for whether a sync is currently running because it doesn't help and can be misleading. Cancelling always seems to break the tests though. The downloader spawned d.process whenever new data arrived, making it somewhat hard to track when block processing was actually done. Fix this by running d.process in a dedicated goroutine that is tied to the lifecycle of the sync. d.process gets notified of new work by the queue instead of being invoked all the time. This removes a ton of weird workaround code, including a hairy use of atomic CAS. --- eth/downloader/downloader.go | 162 ++++++++--------------- eth/downloader/downloader_test.go | 106 ++++++++++----- eth/downloader/queue.go | 210 +++++++++++++++--------------- 3 files changed, 236 insertions(+), 242 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 153427ee4..ac324176d 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -74,7 +74,6 @@ var ( errBadPeer = errors.New("action from bad peer ignored") errStallingPeer = errors.New("peer is stalling") errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") errTimeout = errors.New("timeout") errEmptyHashSet = errors.New("empty hash set by peer") errEmptyHeaderSet = errors.New("empty header set by peer") @@ -90,6 +89,7 @@ var ( errCancelBodyFetch = errors.New("block body download canceled (requested)") errCancelReceiptFetch = errors.New("receipt download canceled (requested)") errCancelStateFetch = errors.New("state data download canceled (requested)") + errCancelProcessing = errors.New("processing canceled (requested)") errNoSyncActive = errors.New("no sync active") ) @@ -129,7 +129,6 @@ type Downloader struct { // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing synchronising int32 - processing int32 notified int32 // Channels @@ -215,7 +214,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) { // Synchronising returns whether the downloader is currently retrieving blocks. func (d *Downloader) Synchronising() bool { - return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0 + return atomic.LoadInt32(&d.synchronising) > 0 } // RegisterPeer injects a new download peer into the set of block source to be @@ -263,9 +262,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err) d.dropPeer(id) - case errPendingQueue: - glog.V(logger.Debug).Infoln("Synchronisation aborted:", err) - default: glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) } @@ -290,10 +286,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { glog.V(logger.Info).Infoln("Block synchronisation started") } - // Abort if the queue still contains some leftover data - if d.queue.GetHeadResult() != nil { - return errPendingQueue - } // Reset the queue, peer set and wake channels to clean any internal leftover state d.queue.Reset() d.peers.Reset() @@ -335,7 +327,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e defer func() { // reset on error if err != nil { - d.cancel() d.mux.Post(FailedEvent{err}) } else { d.mux.Post(DoneEvent{}) @@ -365,23 +356,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e d.syncStatsChainHeight = latest d.syncStatsLock.Unlock() - // Initiate the sync using a concurrent hash and block retrieval algorithm + // Initiate the sync using a concurrent hash and block retrieval algorithm + d.queue.Prepare(origin+1, d.mode, 0) if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - d.queue.Prepare(origin+1, d.mode, 0) - - errc := make(chan error, 2) - go func() { errc <- d.fetchHashes61(p, td, origin+1) }() - go func() { errc <- d.fetchBlocks61(origin + 1) }() - - // If any fetcher fails, cancel the other - if err := <-errc; err != nil { - d.cancel() - <-errc - return err - } - return <-errc + return d.spawnSync( + func() error { return d.fetchHashes61(p, td, origin+1) }, + func() error { return d.fetchBlocks61(origin + 1) }, + ) case p.version >= 62: // Look up the sync boundaries: the common ancestor and the target block @@ -405,7 +388,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e switch d.mode { case LightSync: pivot = latest - case FastSync: // Calculate the new fast/slow sync pivot point pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) @@ -426,34 +408,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot) } d.queue.Prepare(origin+1, d.mode, pivot) - if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - errc := make(chan error, 4) - go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved - go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync - go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync - go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync - - // If any fetcher fails, cancel the others - var fail error - for i := 0; i < cap(errc); i++ { - if err := <-errc; err != nil { - if fail == nil { - fail = err - d.cancel() - } - } - } - return fail + return d.spawnSync( + func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync + ) default: // Something very wrong, stop right here glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version) return errBadPeer } - return nil +} + +// spawnSync runs d.process and all given fetcher functions to completion in +// separate goroutines, returning the first error that appears. +func (d *Downloader) spawnSync(fetchers ...func() error) error { + var wg sync.WaitGroup + errc := make(chan error, len(fetchers)+1) + wg.Add(len(fetchers) + 1) + go func() { defer wg.Done(); errc <- d.process() }() + for _, fn := range fetchers { + fn := fn + go func() { defer wg.Done(); errc <- fn() }() + } + // Wait for the first error, then terminate the others. + var err error + for i := 0; i < len(fetchers)+1; i++ { + if i == len(fetchers) { + // Close the queue when all fetchers have exited. + // This will cause the block processor to end when + // it has processed the queue. + d.queue.Close() + } + if err = <-errc; err != nil { + break + } + } + d.queue.Close() + d.cancel() + wg.Wait() + return err } // cancel cancels all of the operations and resets the queue. It returns true @@ -470,12 +469,10 @@ func (d *Downloader) cancel() { } } d.cancelLock.Unlock() - - // Reset the queue - d.queue.Reset() } // Terminate interrupts the downloader, canceling all pending operations. +// The downloader cannot be reused after calling Terminate. func (d *Downloader) Terminate() { atomic.StoreInt32(&d.interrupt, 1) d.cancel() @@ -800,7 +797,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error { peer.Promote() peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) - go d.process() case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort @@ -826,7 +822,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error { peer.Demote() peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) - go d.process() } } // Blocks arrived, try to update the progress @@ -1336,10 +1331,8 @@ func (d *Downloader) fetchNodeData() error { d.cancel() return } - // Processing succeeded, notify state fetcher and processor of continuation - if d.queue.PendingNodeData() == 0 { - go d.process() - } else { + // Processing succeeded, notify state fetcher of continuation + if d.queue.PendingNodeData() > 0 { select { case d.stateWakeCh <- true: default: @@ -1348,7 +1341,6 @@ func (d *Downloader) fetchNodeData() error { // Log a message to the user and return d.syncStatsLock.Lock() defer d.syncStatsLock.Unlock() - d.syncStatsStateDone += uint64(delivered) glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone) }) @@ -1415,7 +1407,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv peer.Promote() setIdle(peer) glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) - go d.process() case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort @@ -1441,7 +1432,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv peer.Demote() setIdle(peer) glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err) - go d.process() } } // Blocks assembled, try to update the progress @@ -1508,7 +1498,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv } if progress { progressed = true - go d.process() } if request == nil { continue @@ -1545,46 +1534,13 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv } // process takes fetch results from the queue and tries to import them into the -// chain. The type of import operation will depend on the result contents: -// - -// -// The algorithmic flow is as follows: -// - The `processing` flag is swapped to 1 to ensure singleton access -// - The current `cancel` channel is retrieved to detect sync abortions -// - Blocks are iteratively taken from the cache and inserted into the chain -// - When the cache becomes empty, insertion stops -// - The `processing` flag is swapped back to 0 -// - A post-exit check is made whether new blocks became available -// - This step is important: it handles a potential race condition between -// checking for no more work, and releasing the processing "mutex". In -// between these state changes, a block may have arrived, but a processing -// attempt denied, so we need to re-enter to ensure the block isn't left -// to idle in the cache. -func (d *Downloader) process() { - // Make sure only one goroutine is ever allowed to process blocks at once - if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { - return - } - // If the processor just exited, but there are freshly pending items, try to - // reenter. This is needed because the goroutine spinned up for processing - // the fresh results might have been rejected entry to to this present thread - // not yet releasing the `processing` state. - defer func() { - if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil { - d.process() - } - }() - // Release the lock upon exit (note, before checking for reentry!) - // the import statistics to zero. - defer atomic.StoreInt32(&d.processing, 0) - - // Repeat the processing as long as there are results to process +// chain. The type of import operation will depend on the result contents. +func (d *Downloader) process() error { + pivot := d.queue.FastSyncPivot() for { - // Fetch the next batch of results - pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race - results := d.queue.TakeResults() + results := d.queue.WaitResults() if len(results) == 0 { - return + return nil // queue empty } if d.chainInsertHook != nil { d.chainInsertHook(results) @@ -1597,7 +1553,7 @@ func (d *Downloader) process() { for len(results) != 0 { // Check for any termination requests if atomic.LoadInt32(&d.interrupt) == 1 { - return + return errCancelProcessing } // Retrieve the a batch of results to import var ( @@ -1633,8 +1589,7 @@ func (d *Downloader) process() { } if err != nil { glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err) - d.cancel() - return + return err } // Shift the results to the next batch results = results[items:] @@ -1685,19 +1640,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i dropMeter.Mark(int64(packet.Items())) } }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } // Deliver or abort if the sync is canceled while queuing d.cancelLock.RLock() cancel := d.cancelCh d.cancelLock.RUnlock() - + if cancel == nil { + return errNoSyncActive + } select { case destCh <- packet: return nil - case <-cancel: return errNoSyncActive } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index ef6f74a6b..872a4d4f5 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -169,17 +169,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { } } dl.lock.RUnlock() - - err := dl.downloader.synchronise(id, hash, td, mode) - for { - // If the queue is empty and processing stopped, break - if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 { - break - } - // Otherwise sleep a bit and retry - time.Sleep(time.Millisecond) - } - return err + return dl.downloader.synchronise(id, hash, td, mode) } // hasHeader checks if a header is present in the testers canonical chain. @@ -757,8 +747,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { for start := time.Now(); time.Since(start) < time.Second; { time.Sleep(25 * time.Millisecond) - tester.lock.RLock() - tester.downloader.queue.lock.RLock() + tester.lock.Lock() + tester.downloader.queue.lock.Lock() cached = len(tester.downloader.queue.blockDonePool) if mode == FastSync { if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { @@ -769,8 +759,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { } frozen = int(atomic.LoadUint32(&blocked)) retrieved = len(tester.ownBlocks) - tester.downloader.queue.lock.RUnlock() - tester.lock.RUnlock() + tester.downloader.queue.lock.Unlock() + tester.lock.Unlock() if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 { break @@ -1209,25 +1199,26 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { result error drop bool }{ - {nil, false}, // Sync succeeded, all is well - {errBusy, false}, // Sync is already in progress, no problem - {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop - {errBadPeer, true}, // Peer was deemed bad for some reason, drop it - {errStallingPeer, true}, // Peer was detected to be stalling, drop it - {errNoPeers, false}, // No peers to download from, soft race, no issue - {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue - {errTimeout, true}, // No hashes received in due time, drop the peer - {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end - {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end - {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser - {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin - {errInvalidBody, false}, // A bad peer was detected, but not the sync origin - {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin - {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errTimeout, true}, // No hashes received in due time, drop the peer + {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end + {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin + {errInvalidBody, false}, // A bad peer was detected, but not the sync origin + {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin + {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status tester := newTester() @@ -1541,3 +1532,50 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks) } } + +// This test reproduces an issue where unexpected deliveries would +// block indefinitely if they arrived at the right time. +func TestDeliverHeadersHang62(t *testing.T) { testDeliverHeadersHang(t, 62, FullSync) } +func TestDeliverHeadersHang63Full(t *testing.T) { testDeliverHeadersHang(t, 63, FullSync) } +func TestDeliverHeadersHang63Fast(t *testing.T) { testDeliverHeadersHang(t, 63, FastSync) } +func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) } +func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) } +func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) } + +func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil) + fakeHeads := []*types.Header{{}, {}, {}, {}} + for i := 0; i < 200; i++ { + tester := newTester() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) + // Whenever the downloader requests headers, flood it with + // a lot of unrequested header deliveries. + tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error { + deliveriesDone := make(chan struct{}, 500) + for i := 0; i < cap(deliveriesDone); i++ { + peer := fmt.Sprintf("fake-peer%d", i) + go func() { + tester.downloader.DeliverHeaders(peer, fakeHeads) + deliveriesDone <- struct{}{} + }() + } + // Deliver the actual requested headers. + impl := tester.peerGetAbsHeadersFn("peer", 0) + go impl(from, count, skip, reverse) + // None of the extra deliveries should block. + timeout := time.After(5 * time.Second) + for i := 0; i < cap(deliveriesDone); i++ { + select { + case <-deliveriesDone: + case <-timeout: + panic("blocked") + } + } + return nil + } + if err := tester.sync("peer", nil, mode); err != nil { + t.Errorf("sync failed: %v", err) + } + } +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 1fb5b6e12..584797d7b 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -101,11 +101,14 @@ type queue struct { resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block chain - lock sync.RWMutex + lock *sync.Mutex + active *sync.Cond + closed bool } // newQueue creates a new download queue for scheduling block retrieval. func newQueue(stateDb ethdb.Database) *queue { + lock := new(sync.Mutex) return &queue{ hashPool: make(map[common.Hash]int), hashQueue: prque.New(), @@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue { statePendPool: make(map[string]*fetchRequest), stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), + active: sync.NewCond(lock), + lock: lock, } } @@ -133,6 +138,7 @@ func (q *queue) Reset() { q.stateSchedLock.Lock() defer q.stateSchedLock.Unlock() + q.closed = false q.mode = FullSync q.fastSyncPivot = 0 @@ -162,18 +168,27 @@ func (q *queue) Reset() { q.resultOffset = 0 } +// Close marks the end of the sync, unblocking WaitResults. +// It may be called even if the queue is already closed. +func (q *queue) Close() { + q.lock.Lock() + q.closed = true + q.lock.Unlock() + q.active.Broadcast() +} + // PendingBlocks retrieves the number of block (body) requests pending for retrieval. func (q *queue) PendingBlocks() int { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.hashQueue.Size() + q.blockTaskQueue.Size() } // PendingReceipts retrieves the number of block receipts pending for retrieval. func (q *queue) PendingReceipts() int { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.receiptTaskQueue.Size() } @@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int { // InFlightBlocks retrieves whether there are block fetch requests currently in // flight. func (q *queue) InFlightBlocks() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.blockPendPool) > 0 } @@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool { // InFlightReceipts retrieves whether there are receipt fetch requests currently // in flight. func (q *queue) InFlightReceipts() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.receiptPendPool) > 0 } @@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool { // InFlightNodeData retrieves whether there are node data entry fetch requests // currently in flight. func (q *queue) InFlightNodeData() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0 } @@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool { // Idle returns if the queue is fully idle or has some data still inside. This // method is used by the tester to detect termination events. func (q *queue) Idle() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) @@ -237,8 +252,8 @@ func (q *queue) Idle() bool { // FastSyncPivot retrieves the currently used fast sync pivot point. func (q *queue) FastSyncPivot() uint64 { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.fastSyncPivot } @@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 { // ShouldThrottleBlocks checks if the download should be throttled (active block (body) // fetches exceed block cache). func (q *queue) ShouldThrottleBlocks() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() // Calculate the currently in-flight block (body) requests pending := 0 @@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool { // ShouldThrottleReceipts checks if the download should be throttled (active receipt // fetches exceed block cache). func (q *queue) ShouldThrottleReceipts() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() // Calculate the currently in-flight receipt requests pending := 0 @@ -351,91 +366,74 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { return inserts } -// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't -// been downloaded yet (or simply non existent). -func (q *queue) GetHeadResult() *fetchResult { - q.lock.RLock() - defer q.lock.RUnlock() - - // If there are no results pending, return nil - if len(q.resultCache) == 0 || q.resultCache[0] == nil { - return nil - } - // If the next result is still incomplete, return nil - if q.resultCache[0].Pending > 0 { - return nil - } - // If the next result is the fast sync pivot... - if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot { - // If the pivot state trie is still being pulled, return nil - if len(q.stateTaskPool) > 0 { - return nil - } - if q.PendingNodeData() > 0 { - return nil - } - // If the state is done, but not enough post-pivot headers were verified, stall... - for i := 0; i < fsHeaderForceVerify; i++ { - if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil { - return nil - } - } - } - return q.resultCache[0] -} - -// TakeResults retrieves and permanently removes a batch of fetch results from -// the cache. -func (q *queue) TakeResults() []*fetchResult { +// WaitResults retrieves and permanently removes a batch of fetch +// results from the cache. the result slice will be empty if the queue +// has been closed. +func (q *queue) WaitResults() []*fetchResult { q.lock.Lock() defer q.lock.Unlock() - // Accumulate all available results - results := []*fetchResult{} - for i, result := range q.resultCache { - // Stop if no more results are ready - if result == nil || result.Pending > 0 { - break + nproc := q.countProcessableItems() + for nproc == 0 && !q.closed { + q.active.Wait() + nproc = q.countProcessableItems() + } + results := make([]*fetchResult, nproc) + copy(results, q.resultCache[:nproc]) + if len(results) > 0 { + // Mark results as done before dropping them from the cache. + for _, result := range results { + hash := result.Header.Hash() + delete(q.blockDonePool, hash) + delete(q.receiptDonePool, hash) } - // The fast sync pivot block may only be processed after state fetch completes - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { - if len(q.stateTaskPool) > 0 { - break - } - if q.PendingNodeData() > 0 { - break - } - // Even is state fetch is done, ensure post-pivot headers passed verifications - safe := true - for j := 0; j < fsHeaderForceVerify; j++ { - if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { - safe = false + // Delete the results from the cache and clear the tail. + copy(q.resultCache, q.resultCache[nproc:]) + for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ { + q.resultCache[i] = nil + } + // Advance the expected block number of the first cache entry. + q.resultOffset += uint64(nproc) + } + return results +} + +// countProcessableItems counts the processable items. +func (q *queue) countProcessableItems() int { + for i, result := range q.resultCache { + // Don't process incomplete or unavailable items. + if result == nil || result.Pending > 0 { + return i + } + // Special handling for the fast-sync pivot block: + if q.mode == FastSync { + bnum := result.Header.Number.Uint64() + if bnum == q.fastSyncPivot { + // If the state of the pivot block is not + // available yet, we cannot proceed and return 0. + // + // Stop before processing the pivot block to ensure that + // resultCache has space for fsHeaderForceVerify items. Not + // doing this could leave us unable to download the required + // amount of headers. + if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 { + return i + } + for j := 0; j < fsHeaderForceVerify; j++ { + if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { + return i + } } } - if !safe { - break + // If we're just the fast sync pivot, stop as well + // because the following batch needs different insertion. + // This simplifies handling the switchover in d.process. + if bnum == q.fastSyncPivot+1 && i > 0 { + return i } } - // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 { - break - } - results = append(results, result) - - hash := result.Header.Hash() - delete(q.blockDonePool, hash) - delete(q.receiptDonePool, hash) } - // Delete the results from the slice and let them be garbage collected - // without this slice trick the results would stay in memory until nil - // would be assigned to them. - copy(q.resultCache, q.resultCache[len(results):]) - for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ { - q.resultCache[k] = nil - } - q.resultOffset += uint64(len(results)) - - return results + return len(q.resultCache) } // ReserveBlocks reserves a set of block hashes for the given peer, skipping any @@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ // If we're the first to request this task, initialise the result container index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { + common.Report("index allocation went beyond available resultCache space") return nil, false, errInvalidChain } if q.resultCache[index] == nil { @@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ for _, header := range skip { taskQueue.Push(header, -float32(header.Number.Uint64())) } + if progress { + // Wake WaitResults, resultCache was modified + q.active.Signal() + } // Assemble and return the block download request if len(send) == 0 { return nil, progress, nil @@ -737,7 +740,7 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string { // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. // -// Note, this method expects the queue lock to be already held for writing. The +// Note, this method expects the queue lock to be already held. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { @@ -813,17 +816,16 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } + // Wake up WaitResults + q.active.Signal() // If none of the blocks were good, it's a stale delivery switch { case len(errs) == 0: return nil - case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): return errs[0] - case len(errs) == len(blocks): return errStaleDelivery - default: return fmt.Errorf("multiple failures: %v", errs) } @@ -915,14 +917,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ taskQueue.Push(header, -float32(header.Number.Uint64())) } } + // Wake up WaitResults + q.active.Signal() // If none of the data was good, it's a stale delivery switch { case failure == nil || failure == errInvalidChain: return failure - case useful: return fmt.Errorf("partial failure: %v", failure) - default: return errStaleDelivery } @@ -977,10 +979,8 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i switch { case len(errs) == 0: return nil - case len(errs) == len(request.Hashes): return errStaleDelivery - default: return fmt.Errorf("multiple failures: %v", errs) } @@ -989,6 +989,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i // deliverNodeData is the asynchronous node data processor that injects a batch // of sync results into the state scheduler. func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) { + // Wake up WaitResults after the state has been written because it + // might be waiting for the pivot block state to get completed. + defer q.active.Signal() + // Process results one by one to permit task fetches in between for i, result := range results { q.stateSchedLock.Lock()