eth: added a cancel method for the downloader
Added a cancel method to the downloader which gracefully shuts down any active syncing process (hash fetching or block downloading) and resets the queue and remove any pending blocks. Issue with the downloader which would stall because of an active ongoing process when an invalid block was found.
This commit is contained in:
parent
45dc690947
commit
05715f27cf
@ -34,6 +34,9 @@ var (
|
|||||||
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
|
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
|
||||||
errAlreadyInPool = errors.New("hash already in pool")
|
errAlreadyInPool = errors.New("hash already in pool")
|
||||||
errBlockNumberOverflow = errors.New("received block which overflows")
|
errBlockNumberOverflow = errors.New("received block which overflows")
|
||||||
|
errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
|
||||||
|
errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
|
||||||
|
errNoSyncActive = errors.New("no sync active")
|
||||||
)
|
)
|
||||||
|
|
||||||
type hashCheckFn func(common.Hash) bool
|
type hashCheckFn func(common.Hash) bool
|
||||||
@ -74,6 +77,7 @@ type Downloader struct {
|
|||||||
newPeerCh chan *peer
|
newPeerCh chan *peer
|
||||||
hashCh chan hashPack
|
hashCh chan hashPack
|
||||||
blockCh chan blockPack
|
blockCh chan blockPack
|
||||||
|
cancelCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
|
func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
|
||||||
@ -129,6 +133,9 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
|
|||||||
}
|
}
|
||||||
defer atomic.StoreInt32(&d.synchronising, 0)
|
defer atomic.StoreInt32(&d.synchronising, 0)
|
||||||
|
|
||||||
|
// Create cancel channel for aborting midflight
|
||||||
|
d.cancelCh = make(chan struct{})
|
||||||
|
|
||||||
// Abort if the queue still contains some leftover data
|
// Abort if the queue still contains some leftover data
|
||||||
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
|
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
|
||||||
return errPendingQueue
|
return errPendingQueue
|
||||||
@ -161,7 +168,6 @@ func (d *Downloader) Has(hash common.Hash) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) {
|
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) {
|
||||||
|
|
||||||
d.activePeer = p.id
|
d.activePeer = p.id
|
||||||
defer func() {
|
defer func() {
|
||||||
// reset on error
|
// reset on error
|
||||||
@ -191,6 +197,42 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cancel cancels all of the operations and resets the queue. It returns true
|
||||||
|
// if the cancel operation was completed.
|
||||||
|
func (d *Downloader) Cancel() bool {
|
||||||
|
hs, bs := d.queue.Size()
|
||||||
|
// If we're not syncing just return.
|
||||||
|
if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
close(d.cancelCh)
|
||||||
|
|
||||||
|
// clean up
|
||||||
|
hashDone:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-d.hashCh:
|
||||||
|
default:
|
||||||
|
break hashDone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDone:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-d.blockCh:
|
||||||
|
default:
|
||||||
|
break blockDone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset the queue
|
||||||
|
d.queue.Reset()
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// XXX Make synchronous
|
// XXX Make synchronous
|
||||||
func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
|
func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
|
||||||
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
|
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
|
||||||
@ -217,6 +259,8 @@ func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial b
|
|||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-d.cancelCh:
|
||||||
|
return errCancelHashFetch
|
||||||
case hashPack := <-d.hashCh:
|
case hashPack := <-d.hashCh:
|
||||||
// Make sure the active peer is giving us the hashes
|
// Make sure the active peer is giving us the hashes
|
||||||
if hashPack.peerId != activePeer.id {
|
if hashPack.peerId != activePeer.id {
|
||||||
@ -305,6 +349,8 @@ func (d *Downloader) startFetchingBlocks(p *peer) error {
|
|||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-d.cancelCh:
|
||||||
|
return errCancelBlockFetch
|
||||||
case blockPack := <-d.blockCh:
|
case blockPack := <-d.blockCh:
|
||||||
// If the peer was previously banned and failed to deliver it's pack
|
// If the peer was previously banned and failed to deliver it's pack
|
||||||
// in a reasonable time frame, ignore it's message.
|
// in a reasonable time frame, ignore it's message.
|
||||||
@ -394,11 +440,23 @@ out:
|
|||||||
|
|
||||||
// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
|
// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
|
||||||
// the protocol handler.
|
// the protocol handler.
|
||||||
func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) {
|
func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) error {
|
||||||
|
// Make sure the downloader is active
|
||||||
|
if atomic.LoadInt32(&d.synchronising) == 0 {
|
||||||
|
return errNoSyncActive
|
||||||
|
}
|
||||||
|
|
||||||
d.blockCh <- blockPack{id, blocks}
|
d.blockCh <- blockPack{id, blocks}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
|
func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
|
||||||
|
// Make sure the downloader is active
|
||||||
|
if atomic.LoadInt32(&d.synchronising) == 0 {
|
||||||
|
return errNoSyncActive
|
||||||
|
}
|
||||||
|
|
||||||
// make sure that the hashes that are being added are actually from the peer
|
// make sure that the hashes that are being added are actually from the peer
|
||||||
// that's the current active peer. hashes that have been received from other
|
// that's the current active peer. hashes that have been received from other
|
||||||
// peers are dropped and ignored.
|
// peers are dropped and ignored.
|
||||||
|
@ -182,6 +182,49 @@ func TestTaking(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInactiveDownloader(t *testing.T) {
|
||||||
|
targetBlocks := 1000
|
||||||
|
hashes := createHashes(0, targetBlocks)
|
||||||
|
blocks := createBlocksFromHashSet(createHashSet(hashes))
|
||||||
|
tester := newTester(t, hashes, nil)
|
||||||
|
|
||||||
|
err := tester.downloader.AddHashes("bad peer 001", hashes)
|
||||||
|
if err != errNoSyncActive {
|
||||||
|
t.Error("expected no sync error, got", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tester.downloader.DeliverChunk("bad peer 001", blocks)
|
||||||
|
if err != errNoSyncActive {
|
||||||
|
t.Error("expected no sync error, got", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCancel(t *testing.T) {
|
||||||
|
minDesiredPeerCount = 4
|
||||||
|
blockTtl = 1 * time.Second
|
||||||
|
|
||||||
|
targetBlocks := 1000
|
||||||
|
hashes := createHashes(0, targetBlocks)
|
||||||
|
blocks := createBlocksFromHashes(hashes)
|
||||||
|
tester := newTester(t, hashes, blocks)
|
||||||
|
|
||||||
|
tester.newPeer("peer1", big.NewInt(10000), hashes[0])
|
||||||
|
|
||||||
|
err := tester.sync("peer1", hashes[0])
|
||||||
|
if err != nil {
|
||||||
|
t.Error("download error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !tester.downloader.Cancel() {
|
||||||
|
t.Error("cancel operation unsuccessfull")
|
||||||
|
}
|
||||||
|
|
||||||
|
hashSize, blockSize := tester.downloader.queue.Size()
|
||||||
|
if hashSize > 0 || blockSize > 0 {
|
||||||
|
t.Error("block (", blockSize, ") or hash (", hashSize, ") not 0")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestThrottling(t *testing.T) {
|
func TestThrottling(t *testing.T) {
|
||||||
minDesiredPeerCount = 4
|
minDesiredPeerCount = 4
|
||||||
blockTtl = 1 * time.Second
|
blockTtl = 1 * time.Second
|
||||||
|
@ -63,6 +63,9 @@ func (pm *ProtocolManager) processBlocks() error {
|
|||||||
max := int(math.Min(float64(len(blocks)), float64(blockProcAmount)))
|
max := int(math.Min(float64(len(blocks)), float64(blockProcAmount)))
|
||||||
_, err := pm.chainman.InsertChain(blocks[:max])
|
_, err := pm.chainman.InsertChain(blocks[:max])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// cancel download process
|
||||||
|
pm.downloader.Cancel()
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
blocks = blocks[max:]
|
blocks = blocks[max:]
|
||||||
|
Loading…
Reference in New Issue
Block a user