From b86e7526e12a5a49c1739ec02d3c1c5cc667dcb3 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 24 Apr 2015 14:40:32 +0200 Subject: [PATCH] eth, eth/downloader: moved peer selection to protocol handler --- eth/downloader/downloader.go | 107 ++++++++++++++++++---------------- eth/downloader/synchronous.go | 79 ------------------------- eth/handler.go | 65 +++++++++++++++++++-- eth/peer.go | 22 +++++-- 4 files changed, 133 insertions(+), 140 deletions(-) delete mode 100644 eth/downloader/synchronous.go diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index cfc494b2f..6ac8310b3 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -89,7 +89,7 @@ func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) blockCh: make(chan blockPack, 1), quit: make(chan struct{}), } - go downloader.peerHandler() + //go downloader.peerHandler() go downloader.update() return downloader @@ -110,7 +110,6 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH // add peer to our peer set d.peers[id] = peer // broadcast new peer - d.newPeerCh <- peer return nil } @@ -125,55 +124,6 @@ func (d *Downloader) UnregisterPeer(id string) { delete(d.peers, id) } -func (d *Downloader) peerHandler() { - // itimer is used to determine when to start ignoring `minDesiredPeerCount` - itimer := time.NewTimer(peerCountTimeout) -out: - for { - select { - case <-d.newPeerCh: - // Meet the `minDesiredPeerCount` before we select our best peer - if len(d.peers) < minDesiredPeerCount { - break - } - itimer.Stop() - - d.selectPeer(d.peers.bestPeer()) - case <-itimer.C: - // The timer will make sure that the downloader keeps an active state - // in which it attempts to always check the network for highest td peers - // Either select the peer or restart the timer if no peers could - // be selected. - if peer := d.peers.bestPeer(); peer != nil { - d.selectPeer(d.peers.bestPeer()) - } else { - itimer.Reset(5 * time.Second) - } - case <-d.quit: - break out - } - } -} - -func (d *Downloader) selectPeer(p *peer) { - // Make sure it's doing neither. Once done we can restart the - // downloading process if the TD is higher. For now just get on - // with whatever is going on. This prevents unecessary switching. - if d.isBusy() { - return - } - // selected peer must be better than our own - // XXX we also check the peer's recent hash to make sure we - // don't have it. Some peers report (i think) incorrect TD. - if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) { - return - } - - glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) - d.syncCh <- syncPack{p, p.recentHash, false} - -} - func (d *Downloader) update() { out: for { @@ -193,6 +143,61 @@ out: } } +// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given +// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the +// checks fail an error will be returned. This method is synchronous +func (d *Downloader) Synchronise(id string, hash common.Hash) (types.Blocks, error) { + // Make sure it's doing neither. Once done we can restart the + // downloading process if the TD is higher. For now just get on + // with whatever is going on. This prevents unecessary switching. + if d.isBusy() { + return nil, errBusy + } + + // Fetch the peer using the id or throw an error if the peer couldn't be found + p := d.peers[id] + if p == nil { + return nil, errUnknownPeer + } + + // Get the hash from the peer and initiate the downloading progress. + err := d.getFromPeer(p, hash, false) + if err != nil { + return nil, err + } + + return d.queue.blocks, nil +} + +func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error { + d.activePeer = p.id + + glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id) + // Start the fetcher. This will block the update entirely + // interupts need to be send to the appropriate channels + // respectively. + if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil { + // handle error + glog.V(logger.Debug).Infoln("Error fetching hashes:", err) + // XXX Reset + return err + } + + // Start fetching blocks in paralel. The strategy is simple + // take any available peers, seserve a chunk for each peer available, + // let the peer deliver the chunkn and periodically check if a peer + // has timedout. When done downloading, process blocks. + if err := d.startFetchingBlocks(p); err != nil { + glog.V(logger.Debug).Infoln("Error downloading blocks:", err) + // XXX reset + return err + } + + glog.V(logger.Detail).Infoln("Sync completed") + + return nil +} + // XXX Make synchronous func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error { atomic.StoreInt32(&d.fetchingHashes, 1) diff --git a/eth/downloader/synchronous.go b/eth/downloader/synchronous.go deleted file mode 100644 index 7bb49d24e..000000000 --- a/eth/downloader/synchronous.go +++ /dev/null @@ -1,79 +0,0 @@ -package downloader - -import ( - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" -) - -// THIS IS PENDING AND TO DO CHANGES FOR MAKING THE DOWNLOADER SYNCHRONOUS - -// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given -// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the -// checks fail an error will be returned. This method is synchronous -func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) { - // Check if we're busy - if d.isBusy() { - return nil, errBusy - } - - // Attempt to select a peer. This can either be nothing, which returns, best peer - // or selected peer. If no peer could be found an error will be returned - var p *peer - if len(id) == 0 { - p = d.peers[id] - if p == nil { - return nil, errUnknownPeer - } - } else { - p = d.peers.bestPeer() - } - - // Make sure our td is lower than the peer's td - if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) { - return nil, errLowTd - } - - // Get the hash from the peer and initiate the downloading progress. - err := d.getFromPeer(p, p.recentHash, false) - if err != nil { - return nil, err - } - - return d.queue.blocks, nil -} - -// Synchronise will synchronise using the best peer. -func (d *Downloader) Synchronise() (types.Blocks, error) { - return d.SynchroniseWithPeer("") -} - -func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error { - d.activePeer = p.id - - glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id) - // Start the fetcher. This will block the update entirely - // interupts need to be send to the appropriate channels - // respectively. - if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil { - // handle error - glog.V(logger.Debug).Infoln("Error fetching hashes:", err) - // XXX Reset - return err - } - - // Start fetching blocks in paralel. The strategy is simple - // take any available peers, seserve a chunk for each peer available, - // let the peer deliver the chunkn and periodically check if a peer - // has timedout. When done downloading, process blocks. - if err := d.startFetchingBlocks(p); err != nil { - glog.V(logger.Debug).Infoln("Error downloading blocks:", err) - // XXX reset - return err - } - - glog.V(logger.Detail).Infoln("Sync completed") - - return nil -} diff --git a/eth/handler.go b/eth/handler.go index a634b5bfd..a1b03f57c 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -39,6 +39,7 @@ import ( "math" "math/big" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -51,6 +52,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +const ( + peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount + minDesiredPeerCount = 5 // Amount of peers desired to start syncing +) + func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -82,6 +88,9 @@ type ProtocolManager struct { eventMux *event.TypeMux txSub event.Subscription minedBlockSub event.Subscription + + newPeerCh chan *peer + quit chan struct{} } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -93,7 +102,10 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo chainman: chainman, downloader: downloader, peers: make(map[string]*peer), + newPeerCh: make(chan *peer, 1), + quit: make(chan struct{}), } + go manager.peerHandler() manager.SubProtocol = p2p.Protocol{ Name: "eth", @@ -101,16 +113,61 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo Length: ProtocolLength, Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(protocolVersion, networkId, p, rw) - err := manager.handle(peer) - //glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err) - return err + manager.newPeerCh <- peer + + return manager.handle(peer) }, } return manager } +func (pm *ProtocolManager) peerHandler() { + // itimer is used to determine when to start ignoring `minDesiredPeerCount` + itimer := time.NewTimer(peerCountTimeout) +out: + for { + select { + case <-pm.newPeerCh: + // Meet the `minDesiredPeerCount` before we select our best peer + if len(pm.peers) < minDesiredPeerCount { + break + } + itimer.Stop() + + // Find the best peer + peer := getBestPeer(pm.peers) + if peer == nil { + glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available") + return + } + go pm.synchronise(peer) + case <-itimer.C: + // The timer will make sure that the downloader keeps an active state + // in which it attempts to always check the network for highest td peers + // Either select the peer or restart the timer if no peers could + // be selected. + if peer := getBestPeer(pm.peers); peer != nil { + go pm.synchronise(peer) + } else { + itimer.Reset(5 * time.Second) + } + case <-pm.quit: + break out + } + } +} + +func (pm *ProtocolManager) synchronise(peer *peer) { + // Get the hashes from the peer (synchronously) + _, err := pm.downloader.Synchronise(peer.id, peer.recentHash) + if err != nil { + // handle error + glog.V(logger.Debug).Infoln("error downloading:", err) + } +} + func (pm *ProtocolManager) Start() { // broadcast transactions pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) @@ -141,7 +198,7 @@ func (pm *ProtocolManager) handle(p *peer) error { pm.peers[p.id] = p pm.pmu.Unlock() - pm.downloader.RegisterPeer(p.id, p.td, p.currentHash, p.requestHashes, p.requestBlocks) + pm.downloader.RegisterPeer(p.id, p.td, p.recentHash, p.requestHashes, p.requestBlocks) defer func() { pm.pmu.Lock() defer pm.pmu.Unlock() diff --git a/eth/peer.go b/eth/peer.go index ec0c4b1f3..861efaaec 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -25,6 +25,16 @@ type getBlockHashesMsgData struct { Amount uint64 } +func getBestPeer(peers map[string]*peer) *peer { + var peer *peer + for _, cp := range peers { + if peer == nil || cp.td.Cmp(peer.td) > 0 { + peer = cp + } + } + return peer +} + type peer struct { *p2p.Peer @@ -32,9 +42,9 @@ type peer struct { protv, netid int - currentHash common.Hash - id string - td *big.Int + recentHash common.Hash + id string + td *big.Int genesis, ourHash common.Hash ourTd *big.Int @@ -43,14 +53,14 @@ type peer struct { blockHashes *set.Set } -func newPeer(protv, netid int, genesis, currentHash common.Hash, td *big.Int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { +func newPeer(protv, netid int, genesis, recentHash common.Hash, td *big.Int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { id := p.ID() return &peer{ Peer: p, rw: rw, genesis: genesis, - ourHash: currentHash, + ourHash: recentHash, ourTd: td, protv: protv, netid: netid, @@ -145,7 +155,7 @@ func (p *peer) handleStatus() error { // Set the total difficulty of the peer p.td = status.TD // set the best hash of the peer - p.currentHash = status.CurrentBlock + p.recentHash = status.CurrentBlock return <-errc }