From 28c32d1b1bc9ed687713aa4de4eaab38d2a4cb08 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?=
Date: Thu, 4 Jun 2015 14:51:14 +0300
Subject: [PATCH] eth/downloader: fix #1178, don't request blocks beyond the
 cache bounds

---
 eth/downloader/downloader.go | 38 ++++++++++++++++++++++++--------------
 eth/downloader/peer.go       | 10 ++++++++++
 eth/downloader/queue.go      |  2 +-
 3 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index f52a97610..806f60f1b 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -281,19 +281,19 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 		case hashPack := <-d.hashCh:
 			// Make sure the active peer is giving us the hashes
 			if hashPack.peerId != active.id {
-				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId)
+				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
 				break
 			}
 			timeout.Reset(hashTTL)
 
 			// Make sure the peer actually gave something valid
 			if len(hashPack.hashes) == 0 {
-				glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id)
+				glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id)
 				return errEmptyHashSet
 			}
 			for _, hash := range hashPack.hashes {
 				if d.banned.Has(hash) {
-					glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain\n", active.id)
+					glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id)
 					return ErrInvalidChain
 				}
 			}
@@ -301,7 +301,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 			done, index := false, 0
 			for index, head = range hashPack.hashes {
 				if d.hasBlock(head) || d.queue.GetBlock(head) != nil {
-					glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4])
+					glog.V(logger.Debug).Infof("Found common hash %x", head[:4])
 					hashPack.hashes = hashPack.hashes[:index]
 					done = true
 					break
@@ -310,7 +310,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 			// Insert all the new hashes, but only continue if got something useful
 			inserts := d.queue.Insert(hashPack.hashes)
 			if len(inserts) == 0 && !done {
-				glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id)
+				glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id)
 				return ErrBadPeer
 			}
 			if !done {
@@ -365,7 +365,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 			}
 
 		case <-timeout.C:
-			glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
+			glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request", p.id)
 
 			var p *peer // p will be set if a peer can be found
 			// Attempt to find a new peer by checking inclusion of peers best hash in our
@@ -386,10 +386,10 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 			// by our previous (delayed) peer.
 			active = p
 			p.getHashes(head)
-			glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
+			glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id)
 		}
 	}
-	glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start))
+	glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v", d.queue.Pending(), time.Since(start))
 
 	return nil
 }
@@ -421,22 +421,29 @@ out:
 			// If the peer was previously banned and failed to deliver it's pack
 			// in a reasonable time frame, ignore it's message.
 			if peer := d.peers.Peer(blockPack.peerId); peer != nil {
-				// Deliver the received chunk of blocks
+				// Deliver the received chunk of blocks, and demote in case of errors
 				if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil {
 					if err == ErrInvalidChain {
 						// The hash chain is invalid (blocks are not ordered properly), abort
 						return err
 					}
 					// Peer did deliver, but some blocks were off, penalize
-					glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err)
+					glog.V(logger.Detail).Infof("%s: block delivery failed: %v", peer, err)
 					peer.Demote()
 					peer.SetIdle()
 					break
 				}
-				if glog.V(logger.Debug) && len(blockPack.blocks) > 0 {
-					glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId)
+				// If no blocks were delivered, demote the peer (above code is needed to mark the packet done!)
+				if len(blockPack.blocks) == 0 {
+					glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
+					peer.Demote()
+					peer.SetIdle()
+					break
+				}
+				// All was successful, promote the peer
+				if glog.V(logger.Detail) && len(blockPack.blocks) > 0 {
+					glog.Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
 				}
-				// Promote the peer and update it's idle state
 				peer.Promote()
 				peer.SetIdle()
 			}
@@ -481,11 +488,14 @@ out:
 				if request == nil {
 					continue
 				}
+				if glog.V(logger.Detail) {
+					glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
+				}
 				// Fetch the chunk and check for error. If the peer was somehow
 				// already fetching a chunk due to a bug, it will be returned to
 				// the queue
 				if err := peer.Fetch(request); err != nil {
-					glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id)
+					glog.V(logger.Error).Infof("Peer %s received double work", peer.id)
 					d.queue.Cancel(request)
 				}
 			}
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 8ef017df7..2b3f8798e 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -5,6 +5,7 @@ package downloader
 
 import (
 	"errors"
+	"fmt"
 	"math"
 	"sync"
 	"sync/atomic"
@@ -135,6 +136,15 @@ func (p *peer) Demote() {
 	}
 }
 
+// String implements fmt.Stringer.
+func (p *peer) String() string {
+	return fmt.Sprintf("Peer %s [%s]", p.id,
+		fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
+			fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+
+			fmt.Sprintf("ignored %4d", p.ignored.Size()),
+	)
+}
+
 // peerSet represents the collection of active peer participating in the block
 // download procedure.
 type peerSet struct {
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 69d91512a..02fa667f1 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -225,7 +225,7 @@ func (q *queue) Reserve(p *peer) *fetchRequest {
 	skip := make(map[common.Hash]int)
 
 	capacity := p.Capacity()
-	for len(send) < space && len(send) < capacity && !q.hashQueue.Empty() {
+	for proc := 0; proc < space && len(send) < capacity && !q.hashQueue.Empty(); proc++ {
 		hash, priority := q.hashQueue.Pop()
 		if p.ignored.Has(hash) {
 			skip[hash.(common.Hash)] = int(priority)
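Note on the queue.go hunk, which carries the actual #1178 fix: the old loop condition
"len(send) < space" only counted hashes selected for the peer, so hashes skipped via
p.ignored consumed none of the remaining cache space and the loop could keep popping,
eventually reserving blocks whose slots lie beyond the block cache bounds. Bounding the
loop by the number of hashes processed ("proc < space") keeps every reservation inside
the cache window. The standalone Go sketch below uses simplified types and a hypothetical
reserve helper (not the real queue implementation) to illustrate the bounded behaviour:

package main

import "fmt"

// reserve picks up to capacity hashes for a peer, never looking further than
// space entries into the queue (the free room left in the block cache). The
// ignored set stands in for p.ignored in the real code.
func reserve(queue []string, ignored map[string]bool, space, capacity int) (send, skip []string) {
	for proc := 0; proc < space && len(send) < capacity && proc < len(queue); proc++ {
		hash := queue[proc]
		if ignored[hash] {
			skip = append(skip, hash) // requeued later, but still counts against space
			continue
		}
		send = append(send, hash)
	}
	return send, skip
}

func main() {
	queue := []string{"h1", "h2", "h3", "h4", "h5", "h6"}
	ignored := map[string]bool{"h1": true, "h2": true}

	// Only 3 cache slots are free: at most the first 3 queued hashes may be
	// considered, even though 2 of them are skipped for this peer.
	send, skip := reserve(queue, ignored, 3, 8)
	fmt.Println("send:", send) // send: [h3]
	fmt.Println("skip:", skip) // skip: [h1 h2]
}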