diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go
index 8e487b812..d3ae6de43 100644
--- a/chain/blocksync/blocksync_client.go
+++ b/chain/blocksync/blocksync_client.go
@@ -22,6 +22,7 @@ import (
 	"github.com/filecoin-project/lotus/chain/types"
 	"github.com/filecoin-project/lotus/lib/cborutil"
 	"github.com/filecoin-project/lotus/node/modules/dtypes"
+	"github.com/filecoin-project/lotus/peermgr"
 )
 
 type BlockSync struct {
@@ -29,13 +30,15 @@ type BlockSync struct {
 	host  host.Host
 
 	syncPeers *bsPeerTracker
+	peerMgr   *peermgr.PeerMgr
 }
 
-func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync {
+func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host, pmgr peermgr.MaybePeerMgr) *BlockSync {
 	return &BlockSync{
 		bserv:     bserv,
 		host:      h,
-		syncPeers: newPeerTracker(),
+		syncPeers: newPeerTracker(pmgr.Mgr),
+		peerMgr:   pmgr.Mgr,
 	}
 }
 
@@ -392,11 +395,14 @@ type bsPeerTracker struct {
 
 	peers         map[peer.ID]*peerStats
 	avgGlobalTime time.Duration
+
+	pmgr *peermgr.PeerMgr
 }
 
-func newPeerTracker() *bsPeerTracker {
+func newPeerTracker(pmgr *peermgr.PeerMgr) *bsPeerTracker {
 	return &bsPeerTracker{
 		peers: make(map[peer.ID]*peerStats),
+		pmgr:  pmgr,
 	}
 }
 
@@ -435,20 +441,34 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
 
 		var costI, costJ float64
 
+		// getPeerInitLat estimates the cost of a peer without blocksync stats:
+		// the non-zero latency stored in the peer manager when there is one,
+		// otherwise the global average time; either way scaled by newPeerMul.
+		getPeerInitLat := func(p peer.ID) float64 {
+			var res float64
+			if bpt.pmgr != nil {
+				if lat, ok := bpt.pmgr.GetPeerLatency(p); ok { // look up the peer passed in, not out[i]
+					res = float64(lat)
+				}
+			}
+			if res == 0 {
+				res = float64(bpt.avgGlobalTime)
+			}
+			return res * newPeerMul
+		}
+
 		if pi.successes+pi.failures > 0 {
 			failRateI := float64(pi.failures) / float64(pi.failures+pi.successes)
 			costI = float64(pi.averageTime) + failRateI*float64(bpt.avgGlobalTime)
 		} else {
-			// we know nothing about this peer
-			// make them bit better than average
-			costI = 0.9 * float64(bpt.avgGlobalTime)
+			costI = getPeerInitLat(out[i]) // no stats for this peer yet
 		}
 
 		if pj.successes+pj.failures > 0 {
 			failRateJ := float64(pj.failures) / float64(pj.failures+pj.successes)
 			costJ = float64(pj.averageTime) + failRateJ*float64(bpt.avgGlobalTime)
 		} else {
-			costJ = 0.9 * float64(bpt.avgGlobalTime)
+			costJ = getPeerInitLat(out[j]) // must set costJ from peer j; costI is already final
 		}
 
 		return costI < costJ
diff --git a/node/hello/hello.go b/node/hello/hello.go
index 171579899..b996ddf43 100644
--- a/node/hello/hello.go
+++ b/node/hello/hello.go
@@ -2,8 +2,7 @@ package hello
 
 import (
 	"context"
-
-	"go.uber.org/fx"
+	"time"
 
 	"github.com/ipfs/go-cid"
 	cbor "github.com/ipfs/go-ipld-cbor"
@@ -32,6 +31,9 @@ type Message struct {
 	HeaviestTipSet       []cid.Cid
 	HeaviestTipSetWeight types.BigInt
 	GenesisHash          cid.Cid
+
+	TArrial int64
+	TSent   int64
 }
 
 type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
@@ -43,13 +45,7 @@ type Service struct {
 	pmgr   *peermgr.PeerMgr
 }
 
-type MaybePeerMgr struct {
-	fx.In
-
-	Mgr *peermgr.PeerMgr `optional:"true"`
-}
-
-func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr MaybePeerMgr) *Service {
+func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr peermgr.MaybePeerMgr) *Service {
 	if pmgr.Mgr == nil {
 		log.Warn("running without peer manager")
 	}
@@ -71,6 +67,8 @@ func (hs *Service) HandleStream(s inet.Stream) {
 		log.Infow("failed to read hello message", "error", err)
 		return
 	}
+	arrived := time.Now()
+
 	log.Debugw("genesis from hello",
 		"tipset", hmsg.HeaviestTipSet,
 		"peer", s.Conn().RemotePeer(),
@@ -81,6 +79,16 @@ func (hs *Service) HandleStream(s inet.Stream) {
 		s.Conn().Close()
 		return
 	}
+	go func() {
+		sent := time.Now()
+		msg := &Message{
+			TArrial: arrived.UnixNano(),
+			TSent:   sent.UnixNano(),
+		}
+		if err := cborutil.WriteCborRPC(s, msg); err != nil {
+			log.Debugf("error while responding to latency: %v", err)
+		}
+	}()
 
 	ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), hmsg.HeaviestTipSet)
 	if err != nil {
@@ -93,6 +101,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
 	if hs.pmgr != nil {
 		hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer())
 	}
+
 }
 
 func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
@@ -120,9 +129,36 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
 	}
 	log.Info("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid())
 
+	t0 := time.Now()
 	if err := cborutil.WriteCborRPC(s, hmsg); err != nil {
 		return err
 	}
 
+	// Wait for the peer's timestamp reply in the background so SayHello
+	// returns as soon as the hello message has been written.
+	go func() {
+		hmsg = &Message{}
+		s.SetReadDeadline(time.Now().Add(10 * time.Second))
+		err := cborutil.ReadCborRPC(s, hmsg) // a failed read only disables the offset log below
+		ok := err == nil // true only when a reply was decoded
+
+		t3 := time.Now()
+		lat := t3.Sub(t0)
+		// add to peer tracker (NOTE(review): also runs when the read failed or timed out - confirm intended)
+		if hs.pmgr != nil {
+			hs.pmgr.SetPeerLatency(pid, lat)
+		}
+
+		if ok {
+			if hmsg.TArrial != 0 && hmsg.TSent != 0 {
+				t1 := time.Unix(0, hmsg.TArrial)
+				t2 := time.Unix(0, hmsg.TSent)
+				offset := t0.Sub(t1) + t3.Sub(t2)
+				offset /= 2
+				log.Infow("time offset", "offset", offset.Seconds(), "peerid", pid.String())
+			}
+		}
+	}()
+
 	return nil
 }
diff --git a/peermgr/peermgr.go b/peermgr/peermgr.go
index 624fcfcc5..22879141e 100644
--- a/peermgr/peermgr.go
+++ b/peermgr/peermgr.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/filecoin-project/lotus/node/modules/dtypes"
+	"go.uber.org/fx"
 
 	host "github.com/libp2p/go-libp2p-core/host"
 	net "github.com/libp2p/go-libp2p-core/network"
@@ -22,6 +23,12 @@ const (
 	MinFilPeers = 8
 )
 
+type MaybePeerMgr struct {
+	fx.In
+
+	Mgr *PeerMgr `optional:"true"`
+}
+
 type PeerMgr struct {
 	bootstrappers []peer.AddrInfo
 
@@ -30,7 +37,7 @@ type PeerMgr struct {
 	//peerLeads map[peer.ID]time.Time // TODO: unused
 
 	peersLk sync.Mutex
-	peers   map[peer.ID]struct{}
+	peers   map[peer.ID]time.Duration
 
 	maxFilPeers int
 	minFilPeers int
@@ -49,7 +56,7 @@ func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers)
 		dht:           dht,
 		bootstrappers: bootstrap,
 
-		peers: make(map[peer.ID]struct{}),
+		peers: make(map[peer.ID]time.Duration),
 
 		maxFilPeers: MaxFilPeers,
 		minFilPeers: MinFilPeers,
@@ -69,7 +76,26 @@ func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers)
 func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
 	pmgr.peersLk.Lock()
 	defer pmgr.peersLk.Unlock()
-	pmgr.peers[p] = struct{}{}
+	pmgr.peers[p] = time.Duration(0)
+}
+
+// GetPeerLatency returns the latency stored for p and whether p is tracked.
+// A tracked peer that has not been measured yet reports a zero duration.
+func (pmgr *PeerMgr) GetPeerLatency(p peer.ID) (time.Duration, bool) {
+	pmgr.peersLk.Lock()
+	defer pmgr.peersLk.Unlock()
+	dur, ok := pmgr.peers[p]
+	return dur, ok
+}
+
+// SetPeerLatency stores latency for p; peers that are not tracked are ignored.
+func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) {
+	pmgr.peersLk.Lock()
+	defer pmgr.peersLk.Unlock()
+	if _, ok := pmgr.peers[p]; ok {
+		pmgr.peers[p] = latency
+	}
+
 }
 
 func (pmgr *PeerMgr) Disconnect(p peer.ID) {