lotus/chain/blocksync/peer_tracker.go

170 lines
3.5 KiB
Go
Raw Normal View History

2020-07-27 15:31:36 +00:00
package blocksync
// FIXME: This needs to be reviewed.
import (
"sort"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/peermgr"
)
// peerStats holds the request history recorded for a single tracked peer.
type peerStats struct {
	// successes and failures count the outcomes logged for this peer.
	successes, failures int

	// firstSeen is the time at which the peer was added to the tracker.
	firstSeen time.Time

	// averageTime is a moving average of the durations logged for this peer.
	// logTime treats a zero value as "not seeded yet".
	averageTime time.Duration
}
type bsPeerTracker struct {
lk sync.Mutex
peers map[peer.ID]*peerStats
avgGlobalTime time.Duration
pmgr *peermgr.PeerMgr
}
// newPeerTracker returns a tracker with no peers registered. pmgr is stored
// as-is and may be nil.
func newPeerTracker(pmgr *peermgr.PeerMgr) *bsPeerTracker {
	tracker := new(bsPeerTracker)
	tracker.pmgr = pmgr
	tracker.peers = map[peer.ID]*peerStats{}
	return tracker
}
// addPeer starts tracking p, stamping it with the current clock time.
// It is a no-op when p is already tracked, so existing statistics are kept.
func (bpt *bsPeerTracker) addPeer(p peer.ID) {
	bpt.lk.Lock()
	defer bpt.lk.Unlock()

	if _, tracked := bpt.peers[p]; !tracked {
		bpt.peers[p] = &peerStats{firstSeen: build.Clock.Now()}
	}
}
// newPeerMul scales the initial cost estimate of a peer that has no recorded
// history. It is less than one, so an untried peer is assumed to be slightly
// better than its estimate, which encourages trying new peers.
const newPeerMul = 0.9
// prefSortedPeers returns every tracked peer, ordered from the lowest to the
// highest expected cost of requesting data from it (see peerCost).
//
// Each peer's cost is computed exactly once before sorting. This keeps the
// comparator cheap and guarantees it sees a consistent value per peer, even
// though the estimate for new peers comes from the peer manager.
func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
	// TODO: this could probably be cached, but as long as its not too many peers, fine for now
	bpt.lk.Lock()
	defer bpt.lk.Unlock()

	out := make([]peer.ID, 0, len(bpt.peers))
	costs := make(map[peer.ID]float64, len(bpt.peers))
	for p, stats := range bpt.peers {
		out = append(out, p)
		costs[p] = bpt.peerCost(p, stats)
	}

	sort.Slice(out, func(i, j int) bool {
		return costs[out[i]] < costs[out[j]]
	})
	return out
}

// peerCost estimates the cost of requesting data from p given its stats.
// The caller must hold bpt.lk.
//
// For a peer with recorded outcomes the cost is its average time plus the
// global average time weighted by the peer's failure rate. For a peer without
// any history the cost is the latency reported by the peer manager (falling
// back to the global average time when no non-zero latency is available),
// scaled by newPeerMul.
func (bpt *bsPeerTracker) peerCost(p peer.ID, stats *peerStats) float64 {
	if total := stats.successes + stats.failures; total > 0 {
		failRate := float64(stats.failures) / float64(total)
		return float64(stats.averageTime) + failRate*float64(bpt.avgGlobalTime)
	}

	// Not enough data for this peer yet: start from an external estimate.
	var initLat float64
	if bpt.pmgr != nil {
		if lat, ok := bpt.pmgr.GetPeerLatency(p); ok {
			initLat = float64(lat)
		}
	}
	if initLat == 0 {
		initLat = float64(bpt.avgGlobalTime)
	}
	return initLat * newPeerMul
}
// Inverse smoothing factors for the moving averages: each new sample moves
// the average by (sample - average) / xInvAlpha.
//
// xInvAlpha = (N+1)/2
const (
	// localInvAlpha is used for per-peer averages.
	localInvAlpha = 5 // 86% of the value is the last 9
	// globalInvAlpha is used for the tracker-wide average.
	globalInvAlpha = 20 // 86% of the value is the last 39
)
// logGlobalSuccess folds dur into the tracker-wide moving average.
// While the average is still zero it is simply set to dur; afterwards it
// moves towards dur by 1/globalInvAlpha of the difference.
func (bpt *bsPeerTracker) logGlobalSuccess(dur time.Duration) {
	bpt.lk.Lock()
	defer bpt.lk.Unlock()

	if prev := bpt.avgGlobalTime; prev != 0 {
		bpt.avgGlobalTime = prev + (dur-prev)/globalInvAlpha
		return
	}
	bpt.avgGlobalTime = dur
}
// logTime folds dur into pi's moving average. A zero average is treated as
// unset and replaced by dur; otherwise the average moves towards dur by
// 1/localInvAlpha of the difference. It does no locking of its own.
func logTime(pi *peerStats, dur time.Duration) {
	switch avg := pi.averageTime; avg {
	case 0:
		pi.averageTime = dur
	default:
		pi.averageTime = avg + (dur-avg)/localInvAlpha
	}
}
// logSuccess records a successful request to p that took dur: it bumps the
// peer's success counter and updates its average time. If p is not tracked,
// a warning is logged and nothing is recorded.
func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) {
	bpt.lk.Lock()
	defer bpt.lk.Unlock()

	stats, found := bpt.peers[p]
	if !found {
		log.Warnw("log success called on peer not in tracker", "peerid", p.String())
		return
	}

	stats.successes++
	logTime(stats, dur)
}
// logFailure records a failed request to p that took dur: it bumps the
// peer's failure counter and updates its average time. If p is not tracked,
// a warning is logged and nothing is recorded.
func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration) {
	bpt.lk.Lock()
	defer bpt.lk.Unlock()

	pi, ok := bpt.peers[p]
	if !ok {
		// Warnw (not Warn) so that "peerid" is emitted as a structured
		// key/value field instead of being concatenated into the message,
		// matching logSuccess.
		log.Warnw("log failure called on peer not in tracker", "peerid", p.String())
		return
	}

	pi.failures++
	logTime(pi, dur)
}
// removePeer stops tracking p and discards its statistics.
// Removing a peer that is not tracked is a no-op.
func (bpt *bsPeerTracker) removePeer(p peer.ID) {
	bpt.lk.Lock()
	delete(bpt.peers, p)
	bpt.lk.Unlock()
}