Add offset tracking
License: MIT Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
a53d1031db
commit
f528aed598
@ -31,6 +31,9 @@ type Message struct {
|
|||||||
HeaviestTipSet []cid.Cid
|
HeaviestTipSet []cid.Cid
|
||||||
HeaviestTipSetWeight types.BigInt
|
HeaviestTipSetWeight types.BigInt
|
||||||
GenesisHash cid.Cid
|
GenesisHash cid.Cid
|
||||||
|
|
||||||
|
TArrial int64
|
||||||
|
TSent int64
|
||||||
}
|
}
|
||||||
|
|
||||||
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
|
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
|
||||||
@ -64,6 +67,8 @@ func (hs *Service) HandleStream(s inet.Stream) {
|
|||||||
log.Infow("failed to read hello message", "error", err)
|
log.Infow("failed to read hello message", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
arrived := time.Now()
|
||||||
|
|
||||||
log.Debugw("genesis from hello",
|
log.Debugw("genesis from hello",
|
||||||
"tipset", hmsg.HeaviestTipSet,
|
"tipset", hmsg.HeaviestTipSet,
|
||||||
"peer", s.Conn().RemotePeer(),
|
"peer", s.Conn().RemotePeer(),
|
||||||
@ -75,7 +80,12 @@ func (hs *Service) HandleStream(s inet.Stream) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
if err := cborutil.WriteCborRPC(s, &Message{}); err != nil {
|
sent := time.Now()
|
||||||
|
msg := &Message{
|
||||||
|
TArrial: arrived.UnixNano(),
|
||||||
|
TSent: sent.UnixNano(),
|
||||||
|
}
|
||||||
|
if err := cborutil.WriteCborRPC(s, msg); err != nil {
|
||||||
log.Debugf("error while responding to latency: %v", err)
|
log.Debugf("error while responding to latency: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -95,7 +105,6 @@ func (hs *Service) HandleStream(s inet.Stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
|
func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
|
||||||
start := time.Now()
|
|
||||||
s, err := hs.newStream(ctx, pid, ProtocolID)
|
s, err := hs.newStream(ctx, pid, ProtocolID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -120,6 +129,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
|
|||||||
}
|
}
|
||||||
log.Info("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid())
|
log.Info("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid())
|
||||||
|
|
||||||
|
t0 := time.Now()
|
||||||
if err := cborutil.WriteCborRPC(s, hmsg); err != nil {
|
if err := cborutil.WriteCborRPC(s, hmsg); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -127,12 +137,24 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
|
|||||||
go func() {
|
go func() {
|
||||||
hmsg = &Message{}
|
hmsg = &Message{}
|
||||||
s.SetReadDeadline(time.Now().Add(10 * time.Second))
|
s.SetReadDeadline(time.Now().Add(10 * time.Second))
|
||||||
_ = cborutil.ReadCborRPC(s, hmsg) // ignore error
|
err := cborutil.ReadCborRPC(s, hmsg) // ignore error
|
||||||
latency := time.Since(start)
|
ok := err != nil
|
||||||
|
|
||||||
|
t3 := time.Now()
|
||||||
|
lat := t3.Sub(t0)
|
||||||
// add to peer tracker
|
// add to peer tracker
|
||||||
if hs.pmgr != nil {
|
if hs.pmgr != nil {
|
||||||
hs.pmgr.SetPeerLatency(pid, latency)
|
hs.pmgr.SetPeerLatency(pid, lat)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
if hmsg.TArrial != 0 && hmsg.TSent != 0 {
|
||||||
|
t1 := time.Unix(0, hmsg.TArrial)
|
||||||
|
t2 := time.Unix(0, hmsg.TSent)
|
||||||
|
offset := t0.Sub(t1) + t3.Sub(t2)
|
||||||
|
offset /= 2
|
||||||
|
log.Infow("time offset", "offset", offset.Seconds(), "peerid", pid.String())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user