// Contains the active peer-set of the downloader, maintaining both failures // as well as reputation metrics to prioritize the block retrievals. package downloader import ( "errors" "math" "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "gopkg.in/fatih/set.v0" ) type hashFetcherFn func(common.Hash) error type blockFetcherFn func([]common.Hash) error var ( errAlreadyFetching = errors.New("already fetching blocks from peer") errAlreadyRegistered = errors.New("peer is already registered") errNotRegistered = errors.New("peer is not registered") ) // peer represents an active peer from which hashes and blocks are retrieved. type peer struct { id string // Unique identifier of the peer head common.Hash // Hash of the peers latest known block idle int32 // Current activity state of the peer (idle = 0, active = 1) rep int32 // Simple peer reputation capacity int32 // Number of blocks allowed to fetch per request started time.Time // Time instance when the last fetch was started ignored *set.Set // Set of hashes not to request (didn't have previously) getHashes hashFetcherFn // Method to retrieve a batch of hashes (mockable for testing) getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing) } // newPeer create a new downloader peer, with specific hash and block retrieval // mechanisms. func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { return &peer{ id: id, head: head, capacity: 1, getHashes: getHashes, getBlocks: getBlocks, ignored: set.New(), } } // Reset clears the internal state of a peer entity. func (p *peer) Reset() { atomic.StoreInt32(&p.idle, 0) atomic.StoreInt32(&p.capacity, 1) p.ignored.Clear() } // Fetch sends a block retrieval request to the remote peer. func (p *peer) Fetch(request *fetchRequest) error { // Short circuit if the peer is already fetching if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { return errAlreadyFetching } p.started = time.Now() // Convert the hash set to a retrievable slice hashes := make([]common.Hash, 0, len(request.Hashes)) for hash, _ := range request.Hashes { hashes = append(hashes, hash) } p.getBlocks(hashes) return nil } // SetIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its block retrieval allowance will also be updated either up- or downwards, // depending on whether the previous fetch completed in time or not. func (p *peer) SetIdle() { // Update the peer's download allowance based on previous performance scale := 2.0 if time.Since(p.started) > blockSoftTTL { scale = 0.5 } for { // Calculate the new download bandwidth allowance prev := atomic.LoadInt32(&p.capacity) next := int32(math.Max(1, math.Min(MaxBlockFetch, float64(prev)*scale))) if scale < 1 { glog.V(logger.Detail).Infof("%s: reducing block allowance from %d to %d", p.id, prev, next) } // Try to update the old value if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { break } } // Set the peer to idle to allow further block requests atomic.StoreInt32(&p.idle, 0) } // Capacity retrieves the peers block download allowance based on its previously // discovered bandwidth capacity. func (p *peer) Capacity() int { return int(atomic.LoadInt32(&p.capacity)) } // Promote increases the peer's reputation. func (p *peer) Promote() { atomic.AddInt32(&p.rep, 1) } // Demote decreases the peer's reputation or leaves it at 0. func (p *peer) Demote() { for { // Calculate the new reputation value prev := atomic.LoadInt32(&p.rep) next := prev / 2 // Try to update the old value if atomic.CompareAndSwapInt32(&p.rep, prev, next) { return } } } // peerSet represents the collection of active peer participating in the block // download procedure. type peerSet struct { peers map[string]*peer lock sync.RWMutex } // newPeerSet creates a new peer set top track the active download sources. func newPeerSet() *peerSet { return &peerSet{ peers: make(map[string]*peer), } } // Reset iterates over the current peer set, and resets each of the known peers // to prepare for a next batch of block retrieval. func (ps *peerSet) Reset() { ps.lock.RLock() defer ps.lock.RUnlock() for _, peer := range ps.peers { peer.Reset() } } // Register injects a new peer into the working set, or returns an error if the // peer is already known. func (ps *peerSet) Register(p *peer) error { ps.lock.Lock() defer ps.lock.Unlock() if _, ok := ps.peers[p.id]; ok { return errAlreadyRegistered } ps.peers[p.id] = p return nil } // Unregister removes a remote peer from the active set, disabling any further // actions to/from that particular entity. func (ps *peerSet) Unregister(id string) error { ps.lock.Lock() defer ps.lock.Unlock() if _, ok := ps.peers[id]; !ok { return errNotRegistered } delete(ps.peers, id) return nil } // Peer retrieves the registered peer with the given id. func (ps *peerSet) Peer(id string) *peer { ps.lock.RLock() defer ps.lock.RUnlock() return ps.peers[id] } // Len returns if the current number of peers in the set. func (ps *peerSet) Len() int { ps.lock.RLock() defer ps.lock.RUnlock() return len(ps.peers) } // AllPeers retrieves a flat list of all the peers within the set. func (ps *peerSet) AllPeers() []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { list = append(list, p) } return list } // IdlePeers retrieves a flat list of all the currently idle peers within the // active peer set, ordered by their reputation. func (ps *peerSet) IdlePeers() []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { if atomic.LoadInt32(&p.idle) == 0 { list = append(list, p) } } for i := 0; i < len(list); i++ { for j := i + 1; j < len(list); j++ { if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) { list[i], list[j] = list[j], list[i] } } } return list }