eth/downloader: fix deliveries to check for sync cancels

This commit is contained in:
Péter Szilágyi 2015-05-13 13:47:21 +03:00
parent 7cb0e24245
commit ee0c892303

View File

@ -70,7 +70,9 @@ type Downloader struct {
newPeerCh chan *peer newPeerCh chan *peer
hashCh chan hashPack hashCh chan hashPack
blockCh chan blockPack blockCh chan blockPack
cancelCh chan struct{}
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
} }
func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
@ -83,6 +85,9 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
hashCh: make(chan hashPack, 1), hashCh: make(chan hashPack, 1),
blockCh: make(chan blockPack, 1), blockCh: make(chan blockPack, 1),
} }
// Set the initial downloader state as canceled (sanity check)
downloader.cancelCh = make(chan struct{})
close(downloader.cancelCh)
return downloader return downloader
} }
@ -123,8 +128,10 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
} }
defer atomic.StoreInt32(&d.synchronising, 0) defer atomic.StoreInt32(&d.synchronising, 0)
// Create cancel channel for aborting midflight // Create cancel channel for aborting mid-flight
d.cancelLock.Lock()
d.cancelCh = make(chan struct{}) d.cancelCh = make(chan struct{})
d.cancelLock.Unlock()
// Abort if the queue still contains some leftover data // Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
@ -421,9 +428,18 @@ func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
if atomic.LoadInt32(&d.synchronising) == 0 { if atomic.LoadInt32(&d.synchronising) == 0 {
return errNoSyncActive return errNoSyncActive
} }
d.blockCh <- blockPack{id, blocks} // Deliver or abort if the sync is canceled while queuing
d.cancelLock.RLock()
cancel := d.cancelCh
d.cancelLock.RUnlock()
return nil select {
case d.blockCh <- blockPack{id, blocks}:
return nil
case <-cancel:
return errNoSyncActive
}
} }
// DeliverHashes injects a new batch of hashes received from a remote node into // DeliverHashes injects a new batch of hashes received from a remote node into
@ -434,11 +450,16 @@ func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error {
if atomic.LoadInt32(&d.synchronising) == 0 { if atomic.LoadInt32(&d.synchronising) == 0 {
return errNoSyncActive return errNoSyncActive
} }
if glog.V(logger.Debug) && len(hashes) != 0 { // Deliver or abort if the sync is canceled while queuing
from, to := hashes[0], hashes[len(hashes)-1] d.cancelLock.RLock()
glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id) cancel := d.cancelCh
} d.cancelLock.RUnlock()
d.hashCh <- hashPack{id, hashes}
return nil select {
case d.hashCh <- hashPack{id, hashes}:
return nil
case <-cancel:
return errNoSyncActive
}
} }