diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 55455262a..85de78ebf 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -65,12 +65,15 @@ type Downloader struct { // Status synchronising int32 + notified int32 // Channels newPeerCh chan *peer hashCh chan hashPack blockCh chan blockPack - cancelCh chan struct{} + + cancelCh chan struct{} // Channel to cancel mid-flight syncs + cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers } func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { @@ -83,7 +86,6 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { hashCh: make(chan hashPack, 1), blockCh: make(chan blockPack, 1), } - return downloader } @@ -123,8 +125,14 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { } defer atomic.StoreInt32(&d.synchronising, 0) - // Create cancel channel for aborting midflight + // Post a user notification of the sync (only once per session) + if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { + glog.V(logger.Info).Infoln("Block synchronisation started") + } + // Create cancel channel for aborting mid-flight + d.cancelLock.Lock() d.cancelCh = make(chan struct{}) + d.cancelLock.Unlock() // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { @@ -183,32 +191,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { // Cancel cancels all of the operations and resets the queue. It returns true // if the cancel operation was completed. func (d *Downloader) Cancel() bool { - hs, bs := d.queue.Size() // If we're not syncing just return. + hs, bs := d.queue.Size() if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 { return false } - + // Close the current cancel channel + d.cancelLock.RLock() close(d.cancelCh) - - // clean up -hashDone: - for { - select { - case <-d.hashCh: - default: - break hashDone - } - } - -blockDone: - for { - select { - case <-d.blockCh: - default: - break blockDone - } - } + d.cancelLock.RUnlock() // reset the queue d.queue.Reset() @@ -421,9 +412,18 @@ func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error { if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive } - d.blockCh <- blockPack{id, blocks} + // Deliver or abort if the sync is canceled while queuing + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() - return nil + select { + case d.blockCh <- blockPack{id, blocks}: + return nil + + case <-cancel: + return errNoSyncActive + } } // DeliverHashes injects a new batch of hashes received from a remote node into @@ -434,11 +434,16 @@ func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error { if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive } - if glog.V(logger.Debug) && len(hashes) != 0 { - from, to := hashes[0], hashes[len(hashes)-1] - glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id) - } - d.hashCh <- hashPack{id, hashes} + // Deliver or abort if the sync is canceled while queuing + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() - return nil + select { + case d.hashCh <- hashPack{id, hashes}: + return nil + + case <-cancel: + return errNoSyncActive + } }