eth/downloader: don't hang for spurious deliveries

Unexpected deliveries could block indefinitely if they arrived at the
right time. The fix is to ensure that the cancellation channel is
always closed when the sync ends, unblocking any deliveries. Also remove
the atomic check for whether a sync is currently running because it
doesn't help and can be misleading.

Cancelling always seems to break the tests though. The downloader
spawned d.process whenever new data arrived, making it somewhat hard to
track when block processing was actually done. Fix this by running
d.process in a dedicated goroutine that is tied to the lifecycle of the
sync. d.process gets notified of new work by the queue instead of being
invoked all the time. This removes a ton of weird workaround code,
including a hairy use of atomic CAS.
This commit is contained in:
Felix Lange 2015-11-13 16:08:15 +00:00
parent 9422eec554
commit 900da3d800
3 changed files with 236 additions and 242 deletions

View File

@ -74,7 +74,6 @@ var (
errBadPeer = errors.New("action from bad peer ignored") errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling") errStallingPeer = errors.New("peer is stalling")
errNoPeers = errors.New("no peers to keep download active") errNoPeers = errors.New("no peers to keep download active")
errPendingQueue = errors.New("pending items in queue")
errTimeout = errors.New("timeout") errTimeout = errors.New("timeout")
errEmptyHashSet = errors.New("empty hash set by peer") errEmptyHashSet = errors.New("empty hash set by peer")
errEmptyHeaderSet = errors.New("empty header set by peer") errEmptyHeaderSet = errors.New("empty header set by peer")
@ -90,6 +89,7 @@ var (
errCancelBodyFetch = errors.New("block body download canceled (requested)") errCancelBodyFetch = errors.New("block body download canceled (requested)")
errCancelReceiptFetch = errors.New("receipt download canceled (requested)") errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
errCancelStateFetch = errors.New("state data download canceled (requested)") errCancelStateFetch = errors.New("state data download canceled (requested)")
errCancelProcessing = errors.New("processing canceled (requested)")
errNoSyncActive = errors.New("no sync active") errNoSyncActive = errors.New("no sync active")
) )
@ -129,7 +129,6 @@ type Downloader struct {
// Status // Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32 synchronising int32
processing int32
notified int32 notified int32
// Channels // Channels
@ -215,7 +214,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) {
// Synchronising returns whether the downloader is currently retrieving blocks. // Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool { func (d *Downloader) Synchronising() bool {
return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0 return atomic.LoadInt32(&d.synchronising) > 0
} }
// RegisterPeer injects a new download peer into the set of block source to be // RegisterPeer injects a new download peer into the set of block source to be
@ -263,9 +262,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err) glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
d.dropPeer(id) d.dropPeer(id)
case errPendingQueue:
glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
default: default:
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
} }
@ -290,10 +286,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
glog.V(logger.Info).Infoln("Block synchronisation started") glog.V(logger.Info).Infoln("Block synchronisation started")
} }
// Abort if the queue still contains some leftover data
if d.queue.GetHeadResult() != nil {
return errPendingQueue
}
// Reset the queue, peer set and wake channels to clean any internal leftover state // Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset() d.queue.Reset()
d.peers.Reset() d.peers.Reset()
@ -335,7 +327,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
defer func() { defer func() {
// reset on error // reset on error
if err != nil { if err != nil {
d.cancel()
d.mux.Post(FailedEvent{err}) d.mux.Post(FailedEvent{err})
} else { } else {
d.mux.Post(DoneEvent{}) d.mux.Post(DoneEvent{})
@ -366,22 +357,14 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
d.syncStatsLock.Unlock() d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent hash and block retrieval algorithm // Initiate the sync using a concurrent hash and block retrieval algorithm
d.queue.Prepare(origin+1, d.mode, 0)
if d.syncInitHook != nil { if d.syncInitHook != nil {
d.syncInitHook(origin, latest) d.syncInitHook(origin, latest)
} }
d.queue.Prepare(origin+1, d.mode, 0) return d.spawnSync(
func() error { return d.fetchHashes61(p, td, origin+1) },
errc := make(chan error, 2) func() error { return d.fetchBlocks61(origin + 1) },
go func() { errc <- d.fetchHashes61(p, td, origin+1) }() )
go func() { errc <- d.fetchBlocks61(origin + 1) }()
// If any fetcher fails, cancel the other
if err := <-errc; err != nil {
d.cancel()
<-errc
return err
}
return <-errc
case p.version >= 62: case p.version >= 62:
// Look up the sync boundaries: the common ancestor and the target block // Look up the sync boundaries: the common ancestor and the target block
@ -405,7 +388,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
switch d.mode { switch d.mode {
case LightSync: case LightSync:
pivot = latest pivot = latest
case FastSync: case FastSync:
// Calculate the new fast/slow sync pivot point // Calculate the new fast/slow sync pivot point
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
@ -426,34 +408,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot) glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
} }
d.queue.Prepare(origin+1, d.mode, pivot) d.queue.Prepare(origin+1, d.mode, pivot)
if d.syncInitHook != nil { if d.syncInitHook != nil {
d.syncInitHook(origin, latest) d.syncInitHook(origin, latest)
} }
errc := make(chan error, 4) return d.spawnSync(
go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
)
// If any fetcher fails, cancel the others
var fail error
for i := 0; i < cap(errc); i++ {
if err := <-errc; err != nil {
if fail == nil {
fail = err
d.cancel()
}
}
}
return fail
default: default:
// Something very wrong, stop right here // Something very wrong, stop right here
glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version) glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
return errBadPeer return errBadPeer
} }
return nil }
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(fetchers ...func() error) error {
var wg sync.WaitGroup
errc := make(chan error, len(fetchers)+1)
wg.Add(len(fetchers) + 1)
go func() { defer wg.Done(); errc <- d.process() }()
for _, fn := range fetchers {
fn := fn
go func() { defer wg.Done(); errc <- fn() }()
}
// Wait for the first error, then terminate the others.
var err error
for i := 0; i < len(fetchers)+1; i++ {
if i == len(fetchers) {
// Close the queue when all fetchers have exited.
// This will cause the block processor to end when
// it has processed the queue.
d.queue.Close()
}
if err = <-errc; err != nil {
break
}
}
d.queue.Close()
d.cancel()
wg.Wait()
return err
} }
// cancel cancels all of the operations and resets the queue. It returns true // cancel cancels all of the operations and resets the queue. It returns true
@ -470,12 +469,10 @@ func (d *Downloader) cancel() {
} }
} }
d.cancelLock.Unlock() d.cancelLock.Unlock()
// Reset the queue
d.queue.Reset()
} }
// Terminate interrupts the downloader, canceling all pending operations. // Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() { func (d *Downloader) Terminate() {
atomic.StoreInt32(&d.interrupt, 1) atomic.StoreInt32(&d.interrupt, 1)
d.cancel() d.cancel()
@ -800,7 +797,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
peer.Promote() peer.Promote()
peer.SetBlocksIdle() peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
go d.process()
case errInvalidChain: case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort // The hash chain is invalid (blocks are not ordered properly), abort
@ -826,7 +822,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
peer.Demote() peer.Demote()
peer.SetBlocksIdle() peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
go d.process()
} }
} }
// Blocks arrived, try to update the progress // Blocks arrived, try to update the progress
@ -1336,10 +1331,8 @@ func (d *Downloader) fetchNodeData() error {
d.cancel() d.cancel()
return return
} }
// Processing succeeded, notify state fetcher and processor of continuation // Processing succeeded, notify state fetcher of continuation
if d.queue.PendingNodeData() == 0 { if d.queue.PendingNodeData() > 0 {
go d.process()
} else {
select { select {
case d.stateWakeCh <- true: case d.stateWakeCh <- true:
default: default:
@ -1348,7 +1341,6 @@ func (d *Downloader) fetchNodeData() error {
// Log a message to the user and return // Log a message to the user and return
d.syncStatsLock.Lock() d.syncStatsLock.Lock()
defer d.syncStatsLock.Unlock() defer d.syncStatsLock.Unlock()
d.syncStatsStateDone += uint64(delivered) d.syncStatsStateDone += uint64(delivered)
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone) glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
}) })
@ -1415,7 +1407,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
peer.Promote() peer.Promote()
setIdle(peer) setIdle(peer)
glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
go d.process()
case errInvalidChain: case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort // The hash chain is invalid (blocks are not ordered properly), abort
@ -1441,7 +1432,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
peer.Demote() peer.Demote()
setIdle(peer) setIdle(peer)
glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err) glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
go d.process()
} }
} }
// Blocks assembled, try to update the progress // Blocks assembled, try to update the progress
@ -1508,7 +1498,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
} }
if progress { if progress {
progressed = true progressed = true
go d.process()
} }
if request == nil { if request == nil {
continue continue
@ -1545,46 +1534,13 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
} }
// process takes fetch results from the queue and tries to import them into the // process takes fetch results from the queue and tries to import them into the
// chain. The type of import operation will depend on the result contents: // chain. The type of import operation will depend on the result contents.
// - func (d *Downloader) process() error {
// pivot := d.queue.FastSyncPivot()
// The algorithmic flow is as follows:
// - The `processing` flag is swapped to 1 to ensure singleton access
// - The current `cancel` channel is retrieved to detect sync abortions
// - Blocks are iteratively taken from the cache and inserted into the chain
// - When the cache becomes empty, insertion stops
// - The `processing` flag is swapped back to 0
// - A post-exit check is made whether new blocks became available
// - This step is important: it handles a potential race condition between
// checking for no more work, and releasing the processing "mutex". In
// between these state changes, a block may have arrived, but a processing
// attempt denied, so we need to re-enter to ensure the block isn't left
// to idle in the cache.
func (d *Downloader) process() {
// Make sure only one goroutine is ever allowed to process blocks at once
if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
return
}
// If the processor just exited, but there are freshly pending items, try to
// reenter. This is needed because the goroutine spinned up for processing
// the fresh results might have been rejected entry to to this present thread
// not yet releasing the `processing` state.
defer func() {
if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil {
d.process()
}
}()
// Release the lock upon exit (note, before checking for reentry!)
// the import statistics to zero.
defer atomic.StoreInt32(&d.processing, 0)
// Repeat the processing as long as there are results to process
for { for {
// Fetch the next batch of results results := d.queue.WaitResults()
pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race
results := d.queue.TakeResults()
if len(results) == 0 { if len(results) == 0 {
return return nil // queue empty
} }
if d.chainInsertHook != nil { if d.chainInsertHook != nil {
d.chainInsertHook(results) d.chainInsertHook(results)
@ -1597,7 +1553,7 @@ func (d *Downloader) process() {
for len(results) != 0 { for len(results) != 0 {
// Check for any termination requests // Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 { if atomic.LoadInt32(&d.interrupt) == 1 {
return return errCancelProcessing
} }
// Retrieve the a batch of results to import // Retrieve the a batch of results to import
var ( var (
@ -1633,8 +1589,7 @@ func (d *Downloader) process() {
} }
if err != nil { if err != nil {
glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err) glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
d.cancel() return err
return
} }
// Shift the results to the next batch // Shift the results to the next batch
results = results[items:] results = results[items:]
@ -1685,19 +1640,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
dropMeter.Mark(int64(packet.Items())) dropMeter.Mark(int64(packet.Items()))
} }
}() }()
// Make sure the downloader is active
if atomic.LoadInt32(&d.synchronising) == 0 {
return errNoSyncActive
}
// Deliver or abort if the sync is canceled while queuing // Deliver or abort if the sync is canceled while queuing
d.cancelLock.RLock() d.cancelLock.RLock()
cancel := d.cancelCh cancel := d.cancelCh
d.cancelLock.RUnlock() d.cancelLock.RUnlock()
if cancel == nil {
return errNoSyncActive
}
select { select {
case destCh <- packet: case destCh <- packet:
return nil return nil
case <-cancel: case <-cancel:
return errNoSyncActive return errNoSyncActive
} }

View File

@ -169,17 +169,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
} }
} }
dl.lock.RUnlock() dl.lock.RUnlock()
return dl.downloader.synchronise(id, hash, td, mode)
err := dl.downloader.synchronise(id, hash, td, mode)
for {
// If the queue is empty and processing stopped, break
if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 {
break
}
// Otherwise sleep a bit and retry
time.Sleep(time.Millisecond)
}
return err
} }
// hasHeader checks if a header is present in the testers canonical chain. // hasHeader checks if a header is present in the testers canonical chain.
@ -757,8 +747,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
for start := time.Now(); time.Since(start) < time.Second; { for start := time.Now(); time.Since(start) < time.Second; {
time.Sleep(25 * time.Millisecond) time.Sleep(25 * time.Millisecond)
tester.lock.RLock() tester.lock.Lock()
tester.downloader.queue.lock.RLock() tester.downloader.queue.lock.Lock()
cached = len(tester.downloader.queue.blockDonePool) cached = len(tester.downloader.queue.blockDonePool)
if mode == FastSync { if mode == FastSync {
if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
@ -769,8 +759,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
} }
frozen = int(atomic.LoadUint32(&blocked)) frozen = int(atomic.LoadUint32(&blocked))
retrieved = len(tester.ownBlocks) retrieved = len(tester.ownBlocks)
tester.downloader.queue.lock.RUnlock() tester.downloader.queue.lock.Unlock()
tester.lock.RUnlock() tester.lock.Unlock()
if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 { if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 {
break break
@ -1215,7 +1205,6 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errBadPeer, true}, // Peer was deemed bad for some reason, drop it {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
{errStallingPeer, true}, // Peer was detected to be stalling, drop it {errStallingPeer, true}, // Peer was detected to be stalling, drop it
{errNoPeers, false}, // No peers to download from, soft race, no issue {errNoPeers, false}, // No peers to download from, soft race, no issue
{errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
{errTimeout, true}, // No hashes received in due time, drop the peer {errTimeout, true}, // No hashes received in due time, drop the peer
{errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
{errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
@ -1228,6 +1217,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
} }
// Run the tests and check disconnection status // Run the tests and check disconnection status
tester := newTester() tester := newTester()
@ -1541,3 +1532,50 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks) t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks)
} }
} }
// This test reproduces an issue where unexpected deliveries would
// block indefinitely if they arrived at the right time.
func TestDeliverHeadersHang62(t *testing.T) { testDeliverHeadersHang(t, 62, FullSync) }
func TestDeliverHeadersHang63Full(t *testing.T) { testDeliverHeadersHang(t, 63, FullSync) }
func TestDeliverHeadersHang63Fast(t *testing.T) { testDeliverHeadersHang(t, 63, FastSync) }
func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) }
func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) }
func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) }
func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil)
fakeHeads := []*types.Header{{}, {}, {}, {}}
for i := 0; i < 200; i++ {
tester := newTester()
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Whenever the downloader requests headers, flood it with
// a lot of unrequested header deliveries.
tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error {
deliveriesDone := make(chan struct{}, 500)
for i := 0; i < cap(deliveriesDone); i++ {
peer := fmt.Sprintf("fake-peer%d", i)
go func() {
tester.downloader.DeliverHeaders(peer, fakeHeads)
deliveriesDone <- struct{}{}
}()
}
// Deliver the actual requested headers.
impl := tester.peerGetAbsHeadersFn("peer", 0)
go impl(from, count, skip, reverse)
// None of the extra deliveries should block.
timeout := time.After(5 * time.Second)
for i := 0; i < cap(deliveriesDone); i++ {
select {
case <-deliveriesDone:
case <-timeout:
panic("blocked")
}
}
return nil
}
if err := tester.sync("peer", nil, mode); err != nil {
t.Errorf("sync failed: %v", err)
}
}
}

View File

@ -101,11 +101,14 @@ type queue struct {
resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain resultOffset uint64 // Offset of the first cached fetch result in the block chain
lock sync.RWMutex lock *sync.Mutex
active *sync.Cond
closed bool
} }
// newQueue creates a new download queue for scheduling block retrieval. // newQueue creates a new download queue for scheduling block retrieval.
func newQueue(stateDb ethdb.Database) *queue { func newQueue(stateDb ethdb.Database) *queue {
lock := new(sync.Mutex)
return &queue{ return &queue{
hashPool: make(map[common.Hash]int), hashPool: make(map[common.Hash]int),
hashQueue: prque.New(), hashQueue: prque.New(),
@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue {
statePendPool: make(map[string]*fetchRequest), statePendPool: make(map[string]*fetchRequest),
stateDatabase: stateDb, stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit), resultCache: make([]*fetchResult, blockCacheLimit),
active: sync.NewCond(lock),
lock: lock,
} }
} }
@ -133,6 +138,7 @@ func (q *queue) Reset() {
q.stateSchedLock.Lock() q.stateSchedLock.Lock()
defer q.stateSchedLock.Unlock() defer q.stateSchedLock.Unlock()
q.closed = false
q.mode = FullSync q.mode = FullSync
q.fastSyncPivot = 0 q.fastSyncPivot = 0
@ -162,18 +168,27 @@ func (q *queue) Reset() {
q.resultOffset = 0 q.resultOffset = 0
} }
// Close marks the end of the sync, unblocking WaitResults.
// It may be called even if the queue is already closed.
func (q *queue) Close() {
q.lock.Lock()
q.closed = true
q.lock.Unlock()
q.active.Broadcast()
}
// PendingBlocks retrieves the number of block (body) requests pending for retrieval. // PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int { func (q *queue) PendingBlocks() int {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return q.hashQueue.Size() + q.blockTaskQueue.Size() return q.hashQueue.Size() + q.blockTaskQueue.Size()
} }
// PendingReceipts retrieves the number of block receipts pending for retrieval. // PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int { func (q *queue) PendingReceipts() int {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return q.receiptTaskQueue.Size() return q.receiptTaskQueue.Size()
} }
@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int {
// InFlightBlocks retrieves whether there are block fetch requests currently in // InFlightBlocks retrieves whether there are block fetch requests currently in
// flight. // flight.
func (q *queue) InFlightBlocks() bool { func (q *queue) InFlightBlocks() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return len(q.blockPendPool) > 0 return len(q.blockPendPool) > 0
} }
@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool {
// InFlightReceipts retrieves whether there are receipt fetch requests currently // InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight. // in flight.
func (q *queue) InFlightReceipts() bool { func (q *queue) InFlightReceipts() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return len(q.receiptPendPool) > 0 return len(q.receiptPendPool) > 0
} }
@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool {
// InFlightNodeData retrieves whether there are node data entry fetch requests // InFlightNodeData retrieves whether there are node data entry fetch requests
// currently in flight. // currently in flight.
func (q *queue) InFlightNodeData() bool { func (q *queue) InFlightNodeData() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0 return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
} }
@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool {
// Idle returns if the queue is fully idle or has some data still inside. This // Idle returns if the queue is fully idle or has some data still inside. This
// method is used by the tester to detect termination events. // method is used by the tester to detect termination events.
func (q *queue) Idle() bool { func (q *queue) Idle() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
@ -237,8 +252,8 @@ func (q *queue) Idle() bool {
// FastSyncPivot retrieves the currently used fast sync pivot point. // FastSyncPivot retrieves the currently used fast sync pivot point.
func (q *queue) FastSyncPivot() uint64 { func (q *queue) FastSyncPivot() uint64 {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
return q.fastSyncPivot return q.fastSyncPivot
} }
@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 {
// ShouldThrottleBlocks checks if the download should be throttled (active block (body) // ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache). // fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool { func (q *queue) ShouldThrottleBlocks() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
// Calculate the currently in-flight block (body) requests // Calculate the currently in-flight block (body) requests
pending := 0 pending := 0
@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool {
// ShouldThrottleReceipts checks if the download should be throttled (active receipt // ShouldThrottleReceipts checks if the download should be throttled (active receipt
// fetches exceed block cache). // fetches exceed block cache).
func (q *queue) ShouldThrottleReceipts() bool { func (q *queue) ShouldThrottleReceipts() bool {
q.lock.RLock() q.lock.Lock()
defer q.lock.RUnlock() defer q.lock.Unlock()
// Calculate the currently in-flight receipt requests // Calculate the currently in-flight receipt requests
pending := 0 pending := 0
@ -351,93 +366,76 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
return inserts return inserts
} }
// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't // WaitResults retrieves and permanently removes a batch of fetch
// been downloaded yet (or simply non existent). // results from the cache. the result slice will be empty if the queue
func (q *queue) GetHeadResult() *fetchResult { // has been closed.
q.lock.RLock() func (q *queue) WaitResults() []*fetchResult {
defer q.lock.RUnlock()
// If there are no results pending, return nil
if len(q.resultCache) == 0 || q.resultCache[0] == nil {
return nil
}
// If the next result is still incomplete, return nil
if q.resultCache[0].Pending > 0 {
return nil
}
// If the next result is the fast sync pivot...
if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot {
// If the pivot state trie is still being pulled, return nil
if len(q.stateTaskPool) > 0 {
return nil
}
if q.PendingNodeData() > 0 {
return nil
}
// If the state is done, but not enough post-pivot headers were verified, stall...
for i := 0; i < fsHeaderForceVerify; i++ {
if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil {
return nil
}
}
}
return q.resultCache[0]
}
// TakeResults retrieves and permanently removes a batch of fetch results from
// the cache.
func (q *queue) TakeResults() []*fetchResult {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
// Accumulate all available results nproc := q.countProcessableItems()
results := []*fetchResult{} for nproc == 0 && !q.closed {
for i, result := range q.resultCache { q.active.Wait()
// Stop if no more results are ready nproc = q.countProcessableItems()
if result == nil || result.Pending > 0 {
break
} }
// The fast sync pivot block may only be processed after state fetch completes results := make([]*fetchResult, nproc)
if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { copy(results, q.resultCache[:nproc])
if len(q.stateTaskPool) > 0 { if len(results) > 0 {
break // Mark results as done before dropping them from the cache.
} for _, result := range results {
if q.PendingNodeData() > 0 {
break
}
// Even is state fetch is done, ensure post-pivot headers passed verifications
safe := true
for j := 0; j < fsHeaderForceVerify; j++ {
if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
safe = false
}
}
if !safe {
break
}
}
// If we've just inserted the fast sync pivot, stop as the following batch needs different insertion
if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 {
break
}
results = append(results, result)
hash := result.Header.Hash() hash := result.Header.Hash()
delete(q.blockDonePool, hash) delete(q.blockDonePool, hash)
delete(q.receiptDonePool, hash) delete(q.receiptDonePool, hash)
} }
// Delete the results from the slice and let them be garbage collected // Delete the results from the cache and clear the tail.
// without this slice trick the results would stay in memory until nil copy(q.resultCache, q.resultCache[nproc:])
// would be assigned to them. for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
copy(q.resultCache, q.resultCache[len(results):]) q.resultCache[i] = nil
for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ { }
q.resultCache[k] = nil // Advance the expected block number of the first cache entry.
q.resultOffset += uint64(nproc)
} }
q.resultOffset += uint64(len(results))
return results return results
} }
// countProcessableItems counts the processable items.
func (q *queue) countProcessableItems() int {
for i, result := range q.resultCache {
// Don't process incomplete or unavailable items.
if result == nil || result.Pending > 0 {
return i
}
// Special handling for the fast-sync pivot block:
if q.mode == FastSync {
bnum := result.Header.Number.Uint64()
if bnum == q.fastSyncPivot {
// If the state of the pivot block is not
// available yet, we cannot proceed and return 0.
//
// Stop before processing the pivot block to ensure that
// resultCache has space for fsHeaderForceVerify items. Not
// doing this could leave us unable to download the required
// amount of headers.
if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 {
return i
}
for j := 0; j < fsHeaderForceVerify; j++ {
if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
return i
}
}
}
// If we're just the fast sync pivot, stop as well
// because the following batch needs different insertion.
// This simplifies handling the switchover in d.process.
if bnum == q.fastSyncPivot+1 && i > 0 {
return i
}
}
}
return len(q.resultCache)
}
// ReserveBlocks reserves a set of block hashes for the given peer, skipping any // ReserveBlocks reserves a set of block hashes for the given peer, skipping any
// previously failed download. // previously failed download.
func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest { func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest {
@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
// If we're the first to request this task, initialise the result container // If we're the first to request this task, initialise the result container
index := int(header.Number.Int64() - int64(q.resultOffset)) index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 { if index >= len(q.resultCache) || index < 0 {
common.Report("index allocation went beyond available resultCache space")
return nil, false, errInvalidChain return nil, false, errInvalidChain
} }
if q.resultCache[index] == nil { if q.resultCache[index] == nil {
@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
for _, header := range skip { for _, header := range skip {
taskQueue.Push(header, -float32(header.Number.Uint64())) taskQueue.Push(header, -float32(header.Number.Uint64()))
} }
if progress {
// Wake WaitResults, resultCache was modified
q.active.Signal()
}
// Assemble and return the block download request // Assemble and return the block download request
if len(send) == 0 { if len(send) == 0 {
return nil, progress, nil return nil, progress, nil
@ -737,7 +740,7 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
// expire is the generic check that move expired tasks from a pending pool back // expire is the generic check that move expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks. // into a task pool, returning all entities caught with expired tasks.
// //
// Note, this method expects the queue lock to be already held for writing. The // Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need // reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway. // to access the queue, so they already need a lock anyway.
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
@ -813,17 +816,16 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
for hash, index := range request.Hashes { for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index)) q.hashQueue.Push(hash, float32(index))
} }
// Wake up WaitResults
q.active.Signal()
// If none of the blocks were good, it's a stale delivery // If none of the blocks were good, it's a stale delivery
switch { switch {
case len(errs) == 0: case len(errs) == 0:
return nil return nil
case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
return errs[0] return errs[0]
case len(errs) == len(blocks): case len(errs) == len(blocks):
return errStaleDelivery return errStaleDelivery
default: default:
return fmt.Errorf("multiple failures: %v", errs) return fmt.Errorf("multiple failures: %v", errs)
} }
@ -915,14 +917,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
taskQueue.Push(header, -float32(header.Number.Uint64())) taskQueue.Push(header, -float32(header.Number.Uint64()))
} }
} }
// Wake up WaitResults
q.active.Signal()
// If none of the data was good, it's a stale delivery // If none of the data was good, it's a stale delivery
switch { switch {
case failure == nil || failure == errInvalidChain: case failure == nil || failure == errInvalidChain:
return failure return failure
case useful: case useful:
return fmt.Errorf("partial failure: %v", failure) return fmt.Errorf("partial failure: %v", failure)
default: default:
return errStaleDelivery return errStaleDelivery
} }
@ -977,10 +979,8 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
switch { switch {
case len(errs) == 0: case len(errs) == 0:
return nil return nil
case len(errs) == len(request.Hashes): case len(errs) == len(request.Hashes):
return errStaleDelivery return errStaleDelivery
default: default:
return fmt.Errorf("multiple failures: %v", errs) return fmt.Errorf("multiple failures: %v", errs)
} }
@ -989,6 +989,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// deliverNodeData is the asynchronous node data processor that injects a batch // deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler. // of sync results into the state scheduler.
func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) { func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
// Wake up WaitResults after the state has been written because it
// might be waiting for the pivot block state to get completed.
defer q.active.Signal()
// Process results one by one to permit task fetches in between // Process results one by one to permit task fetches in between
for i, result := range results { for i, result := range results {
q.stateSchedLock.Lock() q.stateSchedLock.Lock()