Collect hello message latency

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2019-12-10 16:02:01 +01:00
parent fe95c59158
commit 9d9040558f
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA
3 changed files with 69 additions and 19 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/cborutil"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/peermgr"
)
type BlockSync struct {
@ -29,13 +30,15 @@ type BlockSync struct {
host host.Host
syncPeers *bsPeerTracker
peerMgr *peermgr.PeerMgr
}
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync {
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host, pmgr peermgr.MaybePeerMgr) *BlockSync {
return &BlockSync{
bserv: bserv,
host: h,
syncPeers: newPeerTracker(),
syncPeers: newPeerTracker(pmgr.Mgr),
peerMgr: pmgr.Mgr,
}
}
@ -392,11 +395,14 @@ type bsPeerTracker struct {
peers map[peer.ID]*peerStats
avgGlobalTime time.Duration
pmgr *peermgr.PeerMgr
}
func newPeerTracker() *bsPeerTracker {
func newPeerTracker(pmgr *peermgr.PeerMgr) *bsPeerTracker {
return &bsPeerTracker{
peers: make(map[peer.ID]*peerStats),
pmgr: pmgr,
}
}
@ -435,20 +441,31 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
var costI, costJ float64
getPeerInitLat := func(p peer.ID) float64 {
var res float64
if bpt.pmgr != nil {
if lat, ok := bpt.pmgr.GetPeerLatency(out[i]); ok {
res = float64(lat)
}
}
if res == 0 {
res = float64(bpt.avgGlobalTime)
}
return res * newPeerMul
}
if pi.successes+pi.failures > 0 {
failRateI := float64(pi.failures) / float64(pi.failures+pi.successes)
costI = float64(pi.averageTime) + failRateI*float64(bpt.avgGlobalTime)
} else {
// we know nothing about this peer
// make them bit better than average
costI = 0.9 * float64(bpt.avgGlobalTime)
costI = getPeerInitLat(out[i])
}
if pj.successes+pj.failures > 0 {
failRateJ := float64(pj.failures) / float64(pj.failures+pj.successes)
costJ = float64(pj.averageTime) + failRateJ*float64(bpt.avgGlobalTime)
} else {
costJ = 0.9 * float64(bpt.avgGlobalTime)
costI = getPeerInitLat(out[i])
}
return costI < costJ

View File

@ -2,8 +2,7 @@ package hello
import (
"context"
"go.uber.org/fx"
"time"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
@ -43,13 +42,7 @@ type Service struct {
pmgr *peermgr.PeerMgr
}
type MaybePeerMgr struct {
fx.In
Mgr *peermgr.PeerMgr `optional:"true"`
}
func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr MaybePeerMgr) *Service {
func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr peermgr.MaybePeerMgr) *Service {
if pmgr.Mgr == nil {
log.Warn("running without peer manager")
}
@ -81,6 +74,11 @@ func (hs *Service) HandleStream(s inet.Stream) {
s.Conn().Close()
return
}
go func() {
if err := cborutil.WriteCborRPC(s, &Message{}); err != nil {
log.Debugf("error while responding to latency: %v", err)
}
}()
ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), hmsg.HeaviestTipSet)
if err != nil {
@ -93,9 +91,11 @@ func (hs *Service) HandleStream(s inet.Stream) {
if hs.pmgr != nil {
hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer())
}
}
func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
start := time.Now()
s, err := hs.newStream(ctx, pid, ProtocolID)
if err != nil {
return err
@ -124,5 +124,15 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
return err
}
go func() {
hmsg = &Message{}
s.SetReadDeadline(time.Now().Add(10 * time.Second))
_ = cborutil.ReadCborRPC(s, hmsg) // ignore error
latency := time.Since(start)
// add to peer tracker
hs.pmgr.SetPeerLatency(pid, latency)
}()
return nil
}

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"go.uber.org/fx"
host "github.com/libp2p/go-libp2p-core/host"
net "github.com/libp2p/go-libp2p-core/network"
@ -22,6 +23,12 @@ const (
MinFilPeers = 8
)
type MaybePeerMgr struct {
fx.In
Mgr *PeerMgr `optional:"true"`
}
type PeerMgr struct {
bootstrappers []peer.AddrInfo
@ -30,7 +37,7 @@ type PeerMgr struct {
//peerLeads map[peer.ID]time.Time // TODO: unused
peersLk sync.Mutex
peers map[peer.ID]struct{}
peers map[peer.ID]time.Duration
maxFilPeers int
minFilPeers int
@ -49,7 +56,7 @@ func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers)
dht: dht,
bootstrappers: bootstrap,
peers: make(map[peer.ID]struct{}),
peers: make(map[peer.ID]time.Duration),
maxFilPeers: MaxFilPeers,
minFilPeers: MinFilPeers,
@ -69,7 +76,23 @@ func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers)
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
pmgr.peers[p] = struct{}{}
pmgr.peers[p] = time.Duration(0)
}
func (pmgr *PeerMgr) GetPeerLatency(p peer.ID) (time.Duration, bool) {
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
dur, ok := pmgr.peers[p]
return dur, ok
}
func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) {
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
if _, ok := pmgr.peers[p]; ok {
pmgr.peers[p] = latency
}
}
func (pmgr *PeerMgr) Disconnect(p peer.ID) {