forked from cerc-io/plugeth
Merge pull request #1184 from karalabe/nonstop-block-fetches
eth/downloader: fix #1178, don't request blocks beyond the cache bounds
This commit is contained in:
commit
10fc733767
@ -281,19 +281,19 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
case hashPack := <-d.hashCh:
|
||||
// Make sure the active peer is giving us the hashes
|
||||
if hashPack.peerId != active.id {
|
||||
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId)
|
||||
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
|
||||
break
|
||||
}
|
||||
timeout.Reset(hashTTL)
|
||||
|
||||
// Make sure the peer actually gave something valid
|
||||
if len(hashPack.hashes) == 0 {
|
||||
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id)
|
||||
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id)
|
||||
return errEmptyHashSet
|
||||
}
|
||||
for _, hash := range hashPack.hashes {
|
||||
if d.banned.Has(hash) {
|
||||
glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain\n", active.id)
|
||||
glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id)
|
||||
return ErrInvalidChain
|
||||
}
|
||||
}
|
||||
@ -301,7 +301,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
done, index := false, 0
|
||||
for index, head = range hashPack.hashes {
|
||||
if d.hasBlock(head) || d.queue.GetBlock(head) != nil {
|
||||
glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4])
|
||||
glog.V(logger.Debug).Infof("Found common hash %x", head[:4])
|
||||
hashPack.hashes = hashPack.hashes[:index]
|
||||
done = true
|
||||
break
|
||||
@ -310,7 +310,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
// Insert all the new hashes, but only continue if got something useful
|
||||
inserts := d.queue.Insert(hashPack.hashes)
|
||||
if len(inserts) == 0 && !done {
|
||||
glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id)
|
||||
glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id)
|
||||
return ErrBadPeer
|
||||
}
|
||||
if !done {
|
||||
@ -365,7 +365,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
}
|
||||
|
||||
case <-timeout.C:
|
||||
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
|
||||
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request", p.id)
|
||||
|
||||
var p *peer // p will be set if a peer can be found
|
||||
// Attempt to find a new peer by checking inclusion of peers best hash in our
|
||||
@ -386,10 +386,10 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
||||
// by our previous (delayed) peer.
|
||||
active = p
|
||||
p.getHashes(head)
|
||||
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
|
||||
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id)
|
||||
}
|
||||
}
|
||||
glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start))
|
||||
glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v", d.queue.Pending(), time.Since(start))
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -421,24 +421,29 @@ out:
|
||||
// If the peer was previously banned and failed to deliver it's pack
|
||||
// in a reasonable time frame, ignore it's message.
|
||||
if peer := d.peers.Peer(blockPack.peerId); peer != nil {
|
||||
// Deliver the received chunk of blocks
|
||||
// Deliver the received chunk of blocks, and demote in case of errors
|
||||
if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil {
|
||||
if err == ErrInvalidChain {
|
||||
// The hash chain is invalid (blocks are not ordered properly), abort
|
||||
return err
|
||||
}
|
||||
// Peer did deliver, but some blocks were off, penalize
|
||||
glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err)
|
||||
peer.Demote()
|
||||
peer.SetIdle()
|
||||
glog.V(logger.Detail).Infof("%s: block delivery failed: %v", peer, err)
|
||||
break
|
||||
}
|
||||
if glog.V(logger.Debug) && len(blockPack.blocks) > 0 {
|
||||
glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId)
|
||||
// If no blocks were delivered, demote the peer (above code is needed to mark the packet done!)
|
||||
if len(blockPack.blocks) == 0 {
|
||||
peer.Demote()
|
||||
peer.SetIdle()
|
||||
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
|
||||
break
|
||||
}
|
||||
// Promote the peer and update it's idle state
|
||||
// All was successful, promote the peer
|
||||
peer.Promote()
|
||||
peer.SetIdle()
|
||||
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
|
||||
}
|
||||
case <-ticker.C:
|
||||
// Check for bad peers. Bad peers may indicate a peer not responding
|
||||
@ -481,11 +486,14 @@ out:
|
||||
if request == nil {
|
||||
continue
|
||||
}
|
||||
if glog.V(logger.Detail) {
|
||||
glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
|
||||
}
|
||||
// Fetch the chunk and check for error. If the peer was somehow
|
||||
// already fetching a chunk due to a bug, it will be returned to
|
||||
// the queue
|
||||
if err := peer.Fetch(request); err != nil {
|
||||
glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id)
|
||||
glog.V(logger.Error).Infof("Peer %s received double work", peer.id)
|
||||
d.queue.Cancel(request)
|
||||
}
|
||||
}
|
||||
|
@ -5,14 +5,13 @@ package downloader
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"gopkg.in/fatih/set.v0"
|
||||
)
|
||||
|
||||
@ -100,9 +99,6 @@ func (p *peer) SetIdle() {
|
||||
if next == 1 {
|
||||
p.Demote()
|
||||
}
|
||||
if prev != next {
|
||||
glog.V(logger.Detail).Infof("%s: changing block download capacity from %d to %d", p.id, prev, next)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -135,6 +131,15 @@ func (p *peer) Demote() {
|
||||
}
|
||||
}
|
||||
|
||||
// String implements fmt.Stringer.
|
||||
func (p *peer) String() string {
|
||||
return fmt.Sprintf("Peer %s [%s]", p.id,
|
||||
fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
|
||||
fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+
|
||||
fmt.Sprintf("ignored %4d", p.ignored.Size()),
|
||||
)
|
||||
}
|
||||
|
||||
// peerSet represents the collection of active peer participating in the block
|
||||
// download procedure.
|
||||
type peerSet struct {
|
||||
|
@ -225,7 +225,7 @@ func (q *queue) Reserve(p *peer) *fetchRequest {
|
||||
skip := make(map[common.Hash]int)
|
||||
|
||||
capacity := p.Capacity()
|
||||
for len(send) < space && len(send) < capacity && !q.hashQueue.Empty() {
|
||||
for proc := 0; proc < space && len(send) < capacity && !q.hashQueue.Empty(); proc++ {
|
||||
hash, priority := q.hashQueue.Pop()
|
||||
if p.ignored.Has(hash) {
|
||||
skip[hash.(common.Hash)] = int(priority)
|
||||
|
Loading…
Reference in New Issue
Block a user