lotus/node/hello/hello.go

163 lines
3.7 KiB
Go
Raw Normal View History

2019-07-03 17:39:07 +00:00
package hello
import (
"context"
"time"
2019-07-03 17:39:07 +00:00
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
2019-07-03 17:39:07 +00:00
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
2019-12-05 05:14:19 +00:00
protocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/store"
2019-10-17 08:57:56 +00:00
"github.com/filecoin-project/lotus/chain/types"
2019-11-07 14:11:39 +00:00
"github.com/filecoin-project/lotus/lib/cborutil"
2019-10-17 08:57:56 +00:00
"github.com/filecoin-project/lotus/peermgr"
2019-07-03 17:39:07 +00:00
)
const ProtocolID = "/fil/hello/1.0.0"
2019-07-03 17:41:54 +00:00
2019-07-03 17:39:07 +00:00
var log = logging.Logger("hello")
func init() {
cbor.RegisterCborType(Message{})
}
type Message struct {
HeaviestTipSet []cid.Cid
2019-10-15 05:00:30 +00:00
HeaviestTipSetWeight types.BigInt
2019-07-03 17:39:07 +00:00
GenesisHash cid.Cid
TArrial int64
TSent int64
2019-07-03 17:39:07 +00:00
}
2019-11-09 23:00:22 +00:00
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
2019-07-03 17:39:07 +00:00
type Service struct {
2019-11-09 23:00:22 +00:00
newStream NewStreamFunc
2019-07-03 17:39:07 +00:00
2019-07-26 04:54:22 +00:00
cs *store.ChainStore
2019-07-05 14:46:21 +00:00
syncer *chain.Syncer
2019-10-17 08:57:56 +00:00
pmgr *peermgr.PeerMgr
2019-07-03 17:39:07 +00:00
}
func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr peermgr.MaybePeerMgr) *Service {
2019-10-23 11:02:00 +00:00
if pmgr.Mgr == nil {
log.Warn("running without peer manager")
}
2019-07-03 17:39:07 +00:00
return &Service{
newStream: h.NewStream,
2019-07-08 13:36:43 +00:00
cs: cs,
syncer: syncer,
2019-10-23 11:02:00 +00:00
pmgr: pmgr.Mgr,
2019-07-03 17:39:07 +00:00
}
}
func (hs *Service) HandleStream(s inet.Stream) {
defer s.Close()
var hmsg Message
2019-11-07 14:11:39 +00:00
if err := cborutil.ReadCborRPC(s, &hmsg); err != nil {
log.Infow("failed to read hello message", "error", err)
2019-07-03 17:39:07 +00:00
return
}
arrived := time.Now()
log.Debugw("genesis from hello",
"tipset", hmsg.HeaviestTipSet,
"peer", s.Conn().RemotePeer(),
"hash", hmsg.GenesisHash)
2019-07-03 17:39:07 +00:00
2019-07-05 14:46:21 +00:00
if hmsg.GenesisHash != hs.syncer.Genesis.Cids()[0] {
log.Warnf("other peer has different genesis! (%s)", hmsg.GenesisHash)
2019-07-03 17:39:07 +00:00
s.Conn().Close()
return
}
go func() {
sent := time.Now()
msg := &Message{
TArrial: arrived.UnixNano(),
TSent: sent.UnixNano(),
}
if err := cborutil.WriteCborRPC(s, msg); err != nil {
log.Debugf("error while responding to latency: %v", err)
}
}()
2019-07-03 17:39:07 +00:00
ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), hmsg.HeaviestTipSet)
if err != nil {
log.Errorf("failed to fetch tipset from peer during hello: %s", err)
return
}
2019-10-10 03:04:10 +00:00
log.Infof("Got new tipset through Hello: %s from %s", ts.Cids(), s.Conn().RemotePeer())
2019-07-05 14:46:21 +00:00
hs.syncer.InformNewHead(s.Conn().RemotePeer(), ts)
2019-10-23 11:02:00 +00:00
if hs.pmgr != nil {
hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer())
}
2019-07-03 17:39:07 +00:00
}
func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
2019-07-05 14:46:21 +00:00
s, err := hs.newStream(ctx, pid, ProtocolID)
2019-07-03 17:39:07 +00:00
if err != nil {
return err
}
2019-07-24 22:16:47 +00:00
defer s.Close()
2019-07-03 17:39:07 +00:00
hts := hs.cs.GetHeaviestTipSet()
2019-10-15 05:00:30 +00:00
weight, err := hs.cs.Weight(ctx, hts)
if err != nil {
return err
}
2019-10-17 08:57:56 +00:00
2019-07-03 17:39:07 +00:00
gen, err := hs.cs.GetGenesis()
if err != nil {
return err
}
hmsg := &Message{
HeaviestTipSet: hts.Cids(),
HeaviestTipSetWeight: weight,
GenesisHash: gen.Cid(),
}
2019-12-07 22:32:34 +00:00
log.Info("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid())
2019-07-03 17:39:07 +00:00
t0 := time.Now()
2019-11-07 14:11:39 +00:00
if err := cborutil.WriteCborRPC(s, hmsg); err != nil {
2019-07-03 17:39:07 +00:00
return err
2019-07-05 14:46:21 +00:00
}
2019-07-03 17:39:07 +00:00
go func() {
hmsg = &Message{}
s.SetReadDeadline(time.Now().Add(10 * time.Second))
err := cborutil.ReadCborRPC(s, hmsg) // ignore error
ok := err != nil
t3 := time.Now()
lat := t3.Sub(t0)
// add to peer tracker
if hs.pmgr != nil {
hs.pmgr.SetPeerLatency(pid, lat)
}
if ok {
if hmsg.TArrial != 0 && hmsg.TSent != 0 {
t1 := time.Unix(0, hmsg.TArrial)
t2 := time.Unix(0, hmsg.TSent)
offset := t0.Sub(t1) + t3.Sub(t2)
offset /= 2
log.Infow("time offset", "offset", offset.Seconds(), "peerid", pid.String())
}
}
}()
2019-07-03 17:39:07 +00:00
return nil
2019-07-03 17:41:54 +00:00
}