From 9d9040558fc840060766cc93c9b1d710ff28e008 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 10 Dec 2019 16:02:01 +0100 Subject: [PATCH 1/3] Collect hello message latency License: MIT Signed-off-by: Jakub Sztandera --- chain/blocksync/blocksync_client.go | 31 ++++++++++++++++++++++------- node/hello/hello.go | 28 +++++++++++++++++--------- peermgr/peermgr.go | 29 ++++++++++++++++++++++++--- 3 files changed, 69 insertions(+), 19 deletions(-) diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go index 8e487b812..d3ae6de43 100644 --- a/chain/blocksync/blocksync_client.go +++ b/chain/blocksync/blocksync_client.go @@ -22,6 +22,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/cborutil" "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/peermgr" ) type BlockSync struct { @@ -29,13 +30,15 @@ type BlockSync struct { host host.Host syncPeers *bsPeerTracker + peerMgr *peermgr.PeerMgr } -func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync { +func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host, pmgr peermgr.MaybePeerMgr) *BlockSync { return &BlockSync{ bserv: bserv, host: h, - syncPeers: newPeerTracker(), + syncPeers: newPeerTracker(pmgr.Mgr), + peerMgr: pmgr.Mgr, } } @@ -392,11 +395,14 @@ type bsPeerTracker struct { peers map[peer.ID]*peerStats avgGlobalTime time.Duration + + pmgr *peermgr.PeerMgr } -func newPeerTracker() *bsPeerTracker { +func newPeerTracker(pmgr *peermgr.PeerMgr) *bsPeerTracker { return &bsPeerTracker{ peers: make(map[peer.ID]*peerStats), + pmgr: pmgr, } } @@ -435,20 +441,31 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID { var costI, costJ float64 + getPeerInitLat := func(p peer.ID) float64 { + var res float64 + if bpt.pmgr != nil { + if lat, ok := bpt.pmgr.GetPeerLatency(out[i]); ok { + res = float64(lat) + } + } + if res == 0 { + res = float64(bpt.avgGlobalTime) + } + return res * newPeerMul + } + if pi.successes+pi.failures > 0 { failRateI := float64(pi.failures) / float64(pi.failures+pi.successes) costI = float64(pi.averageTime) + failRateI*float64(bpt.avgGlobalTime) } else { - // we know nothing about this peer - // make them bit better than average - costI = 0.9 * float64(bpt.avgGlobalTime) + costI = getPeerInitLat(out[i]) } if pj.successes+pj.failures > 0 { failRateJ := float64(pj.failures) / float64(pj.failures+pj.successes) costJ = float64(pj.averageTime) + failRateJ*float64(bpt.avgGlobalTime) } else { - costJ = 0.9 * float64(bpt.avgGlobalTime) + costI = getPeerInitLat(out[i]) } return costI < costJ diff --git a/node/hello/hello.go b/node/hello/hello.go index 171579899..60aa02bcc 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -2,8 +2,7 @@ package hello import ( "context" - - "go.uber.org/fx" + "time" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" @@ -43,13 +42,7 @@ type Service struct { pmgr *peermgr.PeerMgr } -type MaybePeerMgr struct { - fx.In - - Mgr *peermgr.PeerMgr `optional:"true"` -} - -func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr MaybePeerMgr) *Service { +func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr peermgr.MaybePeerMgr) *Service { if pmgr.Mgr == nil { log.Warn("running without peer manager") } @@ -81,6 +74,11 @@ func (hs *Service) HandleStream(s inet.Stream) { s.Conn().Close() return } + go func() { + if err := cborutil.WriteCborRPC(s, &Message{}); err != nil { + log.Debugf("error while responding to latency: %v", err) + } + }() ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), hmsg.HeaviestTipSet) if err != nil { @@ -93,9 +91,11 @@ func (hs *Service) HandleStream(s inet.Stream) { if hs.pmgr != nil { hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer()) } + } func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { + start := time.Now() s, err := hs.newStream(ctx, pid, ProtocolID) if err != nil { return err @@ -124,5 +124,15 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { return err } + go func() { + hmsg = &Message{} + s.SetReadDeadline(time.Now().Add(10 * time.Second)) + _ = cborutil.ReadCborRPC(s, hmsg) // ignore error + latency := time.Since(start) + + // add to peer tracker + hs.pmgr.SetPeerLatency(pid, latency) + }() + return nil } diff --git a/peermgr/peermgr.go b/peermgr/peermgr.go index 624fcfcc5..22879141e 100644 --- a/peermgr/peermgr.go +++ b/peermgr/peermgr.go @@ -6,6 +6,7 @@ import ( "time" "github.com/filecoin-project/lotus/node/modules/dtypes" + "go.uber.org/fx" host "github.com/libp2p/go-libp2p-core/host" net "github.com/libp2p/go-libp2p-core/network" @@ -22,6 +23,12 @@ const ( MinFilPeers = 8 ) +type MaybePeerMgr struct { + fx.In + + Mgr *PeerMgr `optional:"true"` +} + type PeerMgr struct { bootstrappers []peer.AddrInfo @@ -30,7 +37,7 @@ type PeerMgr struct { //peerLeads map[peer.ID]time.Time // TODO: unused peersLk sync.Mutex - peers map[peer.ID]struct{} + peers map[peer.ID]time.Duration maxFilPeers int minFilPeers int @@ -49,7 +56,7 @@ func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) dht: dht, bootstrappers: bootstrap, - peers: make(map[peer.ID]struct{}), + peers: make(map[peer.ID]time.Duration), maxFilPeers: MaxFilPeers, minFilPeers: MinFilPeers, @@ -69,7 +76,23 @@ func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() - pmgr.peers[p] = struct{}{} + pmgr.peers[p] = time.Duration(0) +} + +func (pmgr *PeerMgr) GetPeerLatency(p peer.ID) (time.Duration, bool) { + pmgr.peersLk.Lock() + defer pmgr.peersLk.Unlock() + dur, ok := pmgr.peers[p] + return dur, ok +} + +func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) { + pmgr.peersLk.Lock() + defer pmgr.peersLk.Unlock() + if _, ok := pmgr.peers[p]; ok { + pmgr.peers[p] = latency + } + } func (pmgr *PeerMgr) Disconnect(p peer.ID) { From a53d1031dbbdce78eee079127fcd5289f5c3b4f7 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 10 Dec 2019 16:22:02 +0100 Subject: [PATCH 2/3] Fix nil check License: MIT Signed-off-by: Jakub Sztandera --- node/hello/hello.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/node/hello/hello.go b/node/hello/hello.go index 60aa02bcc..7104e919b 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -131,7 +131,9 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { latency := time.Since(start) // add to peer tracker - hs.pmgr.SetPeerLatency(pid, latency) + if hs.pmgr != nil { + hs.pmgr.SetPeerLatency(pid, latency) + } }() return nil From f528aed5981a0a024c4c77163b3670da63ea3cd7 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 10 Dec 2019 16:58:21 +0100 Subject: [PATCH 3/3] Add offset tracking License: MIT Signed-off-by: Jakub Sztandera --- node/hello/hello.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/node/hello/hello.go b/node/hello/hello.go index 7104e919b..b996ddf43 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -31,6 +31,9 @@ type Message struct { HeaviestTipSet []cid.Cid HeaviestTipSetWeight types.BigInt GenesisHash cid.Cid + + TArrial int64 + TSent int64 } type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error) @@ -64,6 +67,8 @@ func (hs *Service) HandleStream(s inet.Stream) { log.Infow("failed to read hello message", "error", err) return } + arrived := time.Now() + log.Debugw("genesis from hello", "tipset", hmsg.HeaviestTipSet, "peer", s.Conn().RemotePeer(), @@ -75,7 +80,12 @@ func (hs *Service) HandleStream(s inet.Stream) { return } go func() { - if err := cborutil.WriteCborRPC(s, &Message{}); err != nil { + sent := time.Now() + msg := &Message{ + TArrial: arrived.UnixNano(), + TSent: sent.UnixNano(), + } + if err := cborutil.WriteCborRPC(s, msg); err != nil { log.Debugf("error while responding to latency: %v", err) } }() @@ -95,7 +105,6 @@ func (hs *Service) HandleStream(s inet.Stream) { } func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { - start := time.Now() s, err := hs.newStream(ctx, pid, ProtocolID) if err != nil { return err @@ -120,6 +129,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { } log.Info("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid()) + t0 := time.Now() if err := cborutil.WriteCborRPC(s, hmsg); err != nil { return err } @@ -127,12 +137,24 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { go func() { hmsg = &Message{} s.SetReadDeadline(time.Now().Add(10 * time.Second)) - _ = cborutil.ReadCborRPC(s, hmsg) // ignore error - latency := time.Since(start) + err := cborutil.ReadCborRPC(s, hmsg) // ignore error + ok := err != nil + t3 := time.Now() + lat := t3.Sub(t0) // add to peer tracker if hs.pmgr != nil { - hs.pmgr.SetPeerLatency(pid, latency) + hs.pmgr.SetPeerLatency(pid, lat) + } + + if ok { + if hmsg.TArrial != 0 && hmsg.TSent != 0 { + t1 := time.Unix(0, hmsg.TArrial) + t2 := time.Unix(0, hmsg.TSent) + offset := t0.Sub(t1) + t3.Sub(t2) + offset /= 2 + log.Infow("time offset", "offset", offset.Seconds(), "peerid", pid.String()) + } } }()