From bd5720f4804788d91154a10ef5bb10425c502658 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 8 May 2015 15:22:48 +0300 Subject: [PATCH] eth, eth/downloader: handle sync errors a bit more gracefully --- eth/downloader/downloader.go | 28 ++++++++-------- eth/downloader/downloader_test.go | 6 ++-- eth/handler.go | 8 ++--- eth/sync.go | 54 ++++++++++++++++--------------- 4 files changed, 48 insertions(+), 48 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index ef2a193ff..a97cce1ef 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -24,12 +24,12 @@ var ( blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out errLowTd = errors.New("peer's TD is too low") - errBusy = errors.New("busy") + ErrBusy = errors.New("busy") errUnknownPeer = errors.New("peer's unknown or unhealthy") - ErrBadPeer = errors.New("action from bad peer ignored") + errBadPeer = errors.New("action from bad peer ignored") errNoPeers = errors.New("no peers to keep download active") errPendingQueue = errors.New("pending items in queue") - errTimeout = errors.New("timeout") + ErrTimeout = errors.New("timeout") errEmptyHashSet = errors.New("empty hash set by peer") errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") errAlreadyInPool = errors.New("hash already in pool") @@ -68,7 +68,7 @@ type Downloader struct { getBlock getBlockFn // Status - synchronizing int32 + synchronising int32 // Channels newPeerCh chan *peer @@ -119,15 +119,15 @@ func (d *Downloader) UnregisterPeer(id string) { delete(d.peers, id) } -// Synchronize will select the peer and use it for synchronizing. If an empty string is given +// Synchronise will select the peer and use it for synchronising. If an empty string is given // it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the // checks fail an error will be returned. This method is synchronous -func (d *Downloader) Synchronize(id string, hash common.Hash) error { +func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Make sure only one goroutine is ever allowed past this point at once - if !atomic.CompareAndSwapInt32(&d.synchronizing, 0, 1) { - return nil + if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) { + return ErrBusy } - defer atomic.StoreInt32(&d.synchronizing, 0) + defer atomic.StoreInt32(&d.synchronising, 0) // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 { @@ -272,7 +272,7 @@ out: // the zero hash. if p == nil || (hash == common.Hash{}) { d.queue.Reset() - return errTimeout + return ErrTimeout } // set p to the active peer. this will invalidate any hashes that may be returned @@ -282,7 +282,7 @@ out: glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id) } } - glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start)) + glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start)) return nil } @@ -384,7 +384,6 @@ out: } } } - glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start)) return nil @@ -404,11 +403,10 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error { return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer) } - if glog.V(logger.Detail) && len(hashes) != 0 { + if glog.V(logger.Debug) && len(hashes) != 0 { from, to := hashes[0], hashes[len(hashes)-1] - glog.Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id) + glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id) } - d.hashCh <- hashPack{id, hashes} return nil diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index f3402794b..8ccc4d1a5 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -61,7 +61,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types func (dl *downloadTester) sync(peerId string, hash common.Hash) error { dl.activePeerId = peerId - return dl.downloader.Synchronize(peerId, hash) + return dl.downloader.Synchronise(peerId, hash) } func (dl *downloadTester) hasBlock(hash common.Hash) bool { @@ -217,13 +217,13 @@ func TestThrottling(t *testing.T) { } }() - // Synchronize the two threads and verify + // Synchronise the two threads and verify err := <-errc done <- struct{}{} <-done if err != nil { - t.Fatalf("failed to synchronize blocks: %v", err) + t.Fatalf("failed to synchronise blocks: %v", err) } if len(took) != targetBlocks { t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks) diff --git a/eth/handler.go b/eth/handler.go index b2018f336..41b6728d9 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -19,9 +19,9 @@ import ( ) const ( - peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount - blockProcTimer = 500 * time.Millisecond - minDesiredPeerCount = 5 // Amount of peers desired to start syncing + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process + minDesiredPeerCount = 5 // Amount of peers desired to start syncing blockProcAmount = 256 ) @@ -324,7 +324,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } self.BroadcastBlock(hash, request.Block) } else { - go self.synchronize(p) + go self.synchronise(p) } default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) diff --git a/eth/sync.go b/eth/sync.go index b259c1d47..c49f5209d 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -12,10 +12,8 @@ import ( // Sync contains all synchronisation code for the eth protocol func (pm *ProtocolManager) update() { - // itimer is used to determine when to start ignoring `minDesiredPeerCount` - itimer := time.NewTimer(peerCountTimeout) - // btimer is used for picking of blocks from the downloader - btimer := time.Tick(blockProcTimer) + forceSync := time.Tick(forceSyncCycle) + blockProc := time.Tick(blockProcCycle) for { select { @@ -24,27 +22,22 @@ func (pm *ProtocolManager) update() { if len(pm.peers) < minDesiredPeerCount { break } - - // Find the best peer + // Find the best peer and synchronise with it peer := getBestPeer(pm.peers) if peer == nil { - glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available") + glog.V(logger.Debug).Infoln("Sync attempt canceled. No peers available") } + go pm.synchronise(peer) - itimer.Stop() - go pm.synchronize(peer) - case <-itimer.C: - // The timer will make sure that the downloader keeps an active state - // in which it attempts to always check the network for highest td peers - // Either select the peer or restart the timer if no peers could - // be selected. + case <-forceSync: + // Force a sync even if not enough peers are present if peer := getBestPeer(pm.peers); peer != nil { - go pm.synchronize(peer) - } else { - itimer.Reset(5 * time.Second) + go pm.synchronise(peer) } - case <-btimer: + case <-blockProc: + // Try to pull some blocks from the downloaded go pm.processBlocks() + case <-pm.quitSync: return } @@ -59,11 +52,11 @@ func (pm *ProtocolManager) processBlocks() error { pm.wg.Add(1) defer pm.wg.Done() + // Take a batch of blocks (will return nil if a previous batch has not reached the chain yet) blocks := pm.downloader.TakeBlocks() if len(blocks) == 0 { return nil } - glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number()) for len(blocks) != 0 && !pm.quit { @@ -77,7 +70,7 @@ func (pm *ProtocolManager) processBlocks() error { return nil } -func (pm *ProtocolManager) synchronize(peer *peer) { +func (pm *ProtocolManager) synchronise(peer *peer) { // Make sure the peer's TD is higher than our own. If not drop. if peer.td.Cmp(pm.chainman.Td()) <= 0 { return @@ -89,12 +82,21 @@ func (pm *ProtocolManager) synchronize(peer *peer) { return } // Get the hashes from the peer (synchronously) - err := pm.downloader.Synchronize(peer.id, peer.recentHash) - if err != nil && err == downloader.ErrBadPeer { - glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action") + glog.V(logger.Debug).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash) + + err := pm.downloader.Synchronise(peer.id, peer.recentHash) + switch err { + case nil: + glog.V(logger.Debug).Infof("Synchronisation completed") + + case downloader.ErrBusy: + glog.V(logger.Debug).Infof("Synchronisation already in progress") + + case downloader.ErrTimeout: + glog.V(logger.Debug).Infof("Removing peer %v due to sync timeout", peer.id) pm.removePeer(peer) - } else if err != nil { - // handle error - glog.V(logger.Detail).Infoln("error downloading:", err) + + default: + glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) } }