forked from cerc-io/plugeth
downloader: defer peer reset after download
This commit is contained in:
parent
86ecdcd5ff
commit
5c59d95532
@ -205,13 +205,13 @@ func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitia
|
|||||||
// Get the first batch of hashes
|
// Get the first batch of hashes
|
||||||
p.getHashes(hash)
|
p.getHashes(hash)
|
||||||
|
|
||||||
failureResponse := time.NewTimer(hashTtl)
|
failureResponseTimer := time.NewTimer(hashTtl)
|
||||||
|
|
||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case hashes := <-d.hashCh:
|
case hashes := <-d.hashCh:
|
||||||
failureResponse.Reset(hashTtl)
|
failureResponseTimer.Reset(hashTtl)
|
||||||
|
|
||||||
var done bool // determines whether we're done fetching hashes (i.e. common hash found)
|
var done bool // determines whether we're done fetching hashes (i.e. common hash found)
|
||||||
hashSet := set.New()
|
hashSet := set.New()
|
||||||
@ -239,7 +239,7 @@ out:
|
|||||||
} else { // we're done
|
} else { // we're done
|
||||||
break out
|
break out
|
||||||
}
|
}
|
||||||
case <-failureResponse.C:
|
case <-failureResponseTimer.C:
|
||||||
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
|
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
|
||||||
// TODO instead of reseting the queue select a new peer from which we can start downloading hashes.
|
// TODO instead of reseting the queue select a new peer from which we can start downloading hashes.
|
||||||
// 1. check for peer's best hash to be included in the current hash set;
|
// 1. check for peer's best hash to be included in the current hash set;
|
||||||
@ -258,6 +258,10 @@ func (d *Downloader) startFetchingBlocks(p *peer) error {
|
|||||||
glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "block(s)")
|
glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "block(s)")
|
||||||
atomic.StoreInt32(&d.downloadingBlocks, 1)
|
atomic.StoreInt32(&d.downloadingBlocks, 1)
|
||||||
defer atomic.StoreInt32(&d.downloadingBlocks, 0)
|
defer atomic.StoreInt32(&d.downloadingBlocks, 0)
|
||||||
|
// Defer the peer reset. This will empty the peer requested set
|
||||||
|
// and makes sure there are no lingering peers with an incorrect
|
||||||
|
// state
|
||||||
|
defer d.peers.reset()
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
@ -302,7 +306,6 @@ out:
|
|||||||
// and all failed throw an error
|
// and all failed throw an error
|
||||||
if len(d.queue.fetching) == 0 {
|
if len(d.queue.fetching) == 0 {
|
||||||
d.queue.reset()
|
d.queue.reset()
|
||||||
d.peers.reset()
|
|
||||||
|
|
||||||
return fmt.Errorf("%v avaialable = %d. total = %d", errPeersUnavailable, len(availablePeers), len(d.peers))
|
return fmt.Errorf("%v avaialable = %d. total = %d", errPeersUnavailable, len(availablePeers), len(d.peers))
|
||||||
}
|
}
|
||||||
|
@ -137,4 +137,5 @@ func (p *peer) demote() {
|
|||||||
|
|
||||||
func (p *peer) reset() {
|
func (p *peer) reset() {
|
||||||
p.state = idleState
|
p.state = idleState
|
||||||
|
p.requested.Clear()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user