Merge pull request #834 from filecoin-project/feat/initial-latecy
Collect hello message latency
This commit is contained in:
commit
29db00269c
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/lib/cborutil"
|
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/filecoin-project/lotus/peermgr"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BlockSync struct {
|
type BlockSync struct {
|
||||||
@ -29,13 +30,15 @@ type BlockSync struct {
|
|||||||
host host.Host
|
host host.Host
|
||||||
|
|
||||||
syncPeers *bsPeerTracker
|
syncPeers *bsPeerTracker
|
||||||
|
peerMgr *peermgr.PeerMgr
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync {
|
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host, pmgr peermgr.MaybePeerMgr) *BlockSync {
|
||||||
return &BlockSync{
|
return &BlockSync{
|
||||||
bserv: bserv,
|
bserv: bserv,
|
||||||
host: h,
|
host: h,
|
||||||
syncPeers: newPeerTracker(),
|
syncPeers: newPeerTracker(pmgr.Mgr),
|
||||||
|
peerMgr: pmgr.Mgr,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -392,11 +395,14 @@ type bsPeerTracker struct {
|
|||||||
|
|
||||||
peers map[peer.ID]*peerStats
|
peers map[peer.ID]*peerStats
|
||||||
avgGlobalTime time.Duration
|
avgGlobalTime time.Duration
|
||||||
|
|
||||||
|
pmgr *peermgr.PeerMgr
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPeerTracker() *bsPeerTracker {
|
func newPeerTracker(pmgr *peermgr.PeerMgr) *bsPeerTracker {
|
||||||
return &bsPeerTracker{
|
return &bsPeerTracker{
|
||||||
peers: make(map[peer.ID]*peerStats),
|
peers: make(map[peer.ID]*peerStats),
|
||||||
|
pmgr: pmgr,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -435,20 +441,31 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
|
|||||||
|
|
||||||
var costI, costJ float64
|
var costI, costJ float64
|
||||||
|
|
||||||
|
getPeerInitLat := func(p peer.ID) float64 {
|
||||||
|
var res float64
|
||||||
|
if bpt.pmgr != nil {
|
||||||
|
if lat, ok := bpt.pmgr.GetPeerLatency(out[i]); ok {
|
||||||
|
res = float64(lat)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if res == 0 {
|
||||||
|
res = float64(bpt.avgGlobalTime)
|
||||||
|
}
|
||||||
|
return res * newPeerMul
|
||||||
|
}
|
||||||
|
|
||||||
if pi.successes+pi.failures > 0 {
|
if pi.successes+pi.failures > 0 {
|
||||||
failRateI := float64(pi.failures) / float64(pi.failures+pi.successes)
|
failRateI := float64(pi.failures) / float64(pi.failures+pi.successes)
|
||||||
costI = float64(pi.averageTime) + failRateI*float64(bpt.avgGlobalTime)
|
costI = float64(pi.averageTime) + failRateI*float64(bpt.avgGlobalTime)
|
||||||
} else {
|
} else {
|
||||||
// we know nothing about this peer
|
costI = getPeerInitLat(out[i])
|
||||||
// make them bit better than average
|
|
||||||
costI = 0.9 * float64(bpt.avgGlobalTime)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if pj.successes+pj.failures > 0 {
|
if pj.successes+pj.failures > 0 {
|
||||||
failRateJ := float64(pj.failures) / float64(pj.failures+pj.successes)
|
failRateJ := float64(pj.failures) / float64(pj.failures+pj.successes)
|
||||||
costJ = float64(pj.averageTime) + failRateJ*float64(bpt.avgGlobalTime)
|
costJ = float64(pj.averageTime) + failRateJ*float64(bpt.avgGlobalTime)
|
||||||
} else {
|
} else {
|
||||||
costJ = 0.9 * float64(bpt.avgGlobalTime)
|
costI = getPeerInitLat(out[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
return costI < costJ
|
return costI < costJ
|
||||||
|
@ -2,8 +2,7 @@ package hello
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
"go.uber.org/fx"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
cbor "github.com/ipfs/go-ipld-cbor"
|
cbor "github.com/ipfs/go-ipld-cbor"
|
||||||
@ -32,6 +31,9 @@ type Message struct {
|
|||||||
HeaviestTipSet []cid.Cid
|
HeaviestTipSet []cid.Cid
|
||||||
HeaviestTipSetWeight types.BigInt
|
HeaviestTipSetWeight types.BigInt
|
||||||
GenesisHash cid.Cid
|
GenesisHash cid.Cid
|
||||||
|
|
||||||
|
TArrial int64
|
||||||
|
TSent int64
|
||||||
}
|
}
|
||||||
|
|
||||||
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
|
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
|
||||||
@ -43,13 +45,7 @@ type Service struct {
|
|||||||
pmgr *peermgr.PeerMgr
|
pmgr *peermgr.PeerMgr
|
||||||
}
|
}
|
||||||
|
|
||||||
type MaybePeerMgr struct {
|
func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr peermgr.MaybePeerMgr) *Service {
|
||||||
fx.In
|
|
||||||
|
|
||||||
Mgr *peermgr.PeerMgr `optional:"true"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr MaybePeerMgr) *Service {
|
|
||||||
if pmgr.Mgr == nil {
|
if pmgr.Mgr == nil {
|
||||||
log.Warn("running without peer manager")
|
log.Warn("running without peer manager")
|
||||||
}
|
}
|
||||||
@ -71,6 +67,8 @@ func (hs *Service) HandleStream(s inet.Stream) {
|
|||||||
log.Infow("failed to read hello message", "error", err)
|
log.Infow("failed to read hello message", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
arrived := time.Now()
|
||||||
|
|
||||||
log.Debugw("genesis from hello",
|
log.Debugw("genesis from hello",
|
||||||
"tipset", hmsg.HeaviestTipSet,
|
"tipset", hmsg.HeaviestTipSet,
|
||||||
"peer", s.Conn().RemotePeer(),
|
"peer", s.Conn().RemotePeer(),
|
||||||
@ -81,6 +79,16 @@ func (hs *Service) HandleStream(s inet.Stream) {
|
|||||||
s.Conn().Close()
|
s.Conn().Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
go func() {
|
||||||
|
sent := time.Now()
|
||||||
|
msg := &Message{
|
||||||
|
TArrial: arrived.UnixNano(),
|
||||||
|
TSent: sent.UnixNano(),
|
||||||
|
}
|
||||||
|
if err := cborutil.WriteCborRPC(s, msg); err != nil {
|
||||||
|
log.Debugf("error while responding to latency: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), hmsg.HeaviestTipSet)
|
ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), hmsg.HeaviestTipSet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -93,6 +101,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
|
|||||||
if hs.pmgr != nil {
|
if hs.pmgr != nil {
|
||||||
hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer())
|
hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer())
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
|
func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
|
||||||
@ -120,9 +129,34 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
|
|||||||
}
|
}
|
||||||
log.Info("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid())
|
log.Info("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid())
|
||||||
|
|
||||||
|
t0 := time.Now()
|
||||||
if err := cborutil.WriteCborRPC(s, hmsg); err != nil {
|
if err := cborutil.WriteCborRPC(s, hmsg); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
hmsg = &Message{}
|
||||||
|
s.SetReadDeadline(time.Now().Add(10 * time.Second))
|
||||||
|
err := cborutil.ReadCborRPC(s, hmsg) // ignore error
|
||||||
|
ok := err != nil
|
||||||
|
|
||||||
|
t3 := time.Now()
|
||||||
|
lat := t3.Sub(t0)
|
||||||
|
// add to peer tracker
|
||||||
|
if hs.pmgr != nil {
|
||||||
|
hs.pmgr.SetPeerLatency(pid, lat)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
if hmsg.TArrial != 0 && hmsg.TSent != 0 {
|
||||||
|
t1 := time.Unix(0, hmsg.TArrial)
|
||||||
|
t2 := time.Unix(0, hmsg.TSent)
|
||||||
|
offset := t0.Sub(t1) + t3.Sub(t2)
|
||||||
|
offset /= 2
|
||||||
|
log.Infow("time offset", "offset", offset.Seconds(), "peerid", pid.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"go.uber.org/fx"
|
||||||
|
|
||||||
host "github.com/libp2p/go-libp2p-core/host"
|
host "github.com/libp2p/go-libp2p-core/host"
|
||||||
net "github.com/libp2p/go-libp2p-core/network"
|
net "github.com/libp2p/go-libp2p-core/network"
|
||||||
@ -22,6 +23,12 @@ const (
|
|||||||
MinFilPeers = 8
|
MinFilPeers = 8
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type MaybePeerMgr struct {
|
||||||
|
fx.In
|
||||||
|
|
||||||
|
Mgr *PeerMgr `optional:"true"`
|
||||||
|
}
|
||||||
|
|
||||||
type PeerMgr struct {
|
type PeerMgr struct {
|
||||||
bootstrappers []peer.AddrInfo
|
bootstrappers []peer.AddrInfo
|
||||||
|
|
||||||
@ -30,7 +37,7 @@ type PeerMgr struct {
|
|||||||
//peerLeads map[peer.ID]time.Time // TODO: unused
|
//peerLeads map[peer.ID]time.Time // TODO: unused
|
||||||
|
|
||||||
peersLk sync.Mutex
|
peersLk sync.Mutex
|
||||||
peers map[peer.ID]struct{}
|
peers map[peer.ID]time.Duration
|
||||||
|
|
||||||
maxFilPeers int
|
maxFilPeers int
|
||||||
minFilPeers int
|
minFilPeers int
|
||||||
@ -49,7 +56,7 @@ func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers)
|
|||||||
dht: dht,
|
dht: dht,
|
||||||
bootstrappers: bootstrap,
|
bootstrappers: bootstrap,
|
||||||
|
|
||||||
peers: make(map[peer.ID]struct{}),
|
peers: make(map[peer.ID]time.Duration),
|
||||||
|
|
||||||
maxFilPeers: MaxFilPeers,
|
maxFilPeers: MaxFilPeers,
|
||||||
minFilPeers: MinFilPeers,
|
minFilPeers: MinFilPeers,
|
||||||
@ -69,7 +76,23 @@ func NewPeerMgr(h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers)
|
|||||||
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
|
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
|
||||||
pmgr.peersLk.Lock()
|
pmgr.peersLk.Lock()
|
||||||
defer pmgr.peersLk.Unlock()
|
defer pmgr.peersLk.Unlock()
|
||||||
pmgr.peers[p] = struct{}{}
|
pmgr.peers[p] = time.Duration(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pmgr *PeerMgr) GetPeerLatency(p peer.ID) (time.Duration, bool) {
|
||||||
|
pmgr.peersLk.Lock()
|
||||||
|
defer pmgr.peersLk.Unlock()
|
||||||
|
dur, ok := pmgr.peers[p]
|
||||||
|
return dur, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) {
|
||||||
|
pmgr.peersLk.Lock()
|
||||||
|
defer pmgr.peersLk.Unlock()
|
||||||
|
if _, ok := pmgr.peers[p]; ok {
|
||||||
|
pmgr.peers[p] = latency
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pmgr *PeerMgr) Disconnect(p peer.ID) {
|
func (pmgr *PeerMgr) Disconnect(p peer.ID) {
|
||||||
|
Loading…
Reference in New Issue
Block a user