lotus/node/hello/hello.go

207 lines
5.3 KiB
Go
Raw Normal View History

2019-07-03 17:39:07 +00:00
package hello
import (
"context"
"time"
2019-07-03 17:39:07 +00:00
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
2022-08-25 18:20:41 +00:00
"github.com/libp2p/go-libp2p/core/host"
inet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
2022-06-14 15:00:51 +00:00
"golang.org/x/xerrors"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
2020-07-10 14:43:14 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
2021-09-02 16:45:18 +00:00
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/store"
2019-10-17 08:57:56 +00:00
"github.com/filecoin-project/lotus/chain/types"
2020-02-22 11:36:22 +00:00
"github.com/filecoin-project/lotus/lib/peermgr"
2019-07-03 17:39:07 +00:00
)
2021-05-17 12:28:09 +00:00
// TODO(TEST): missing test coverage.
2019-07-03 17:39:07 +00:00
const ProtocolID = "/fil/hello/1.0.0"
2019-07-03 17:41:54 +00:00
2019-07-03 17:39:07 +00:00
var log = logging.Logger("hello")
var streamDeadline = 10 * time.Second
2019-07-03 17:39:07 +00:00
type HelloMessage struct {
2019-07-03 17:39:07 +00:00
HeaviestTipSet []cid.Cid
2020-02-24 17:45:25 +00:00
HeaviestTipSetHeight abi.ChainEpoch
2020-02-21 19:25:57 +00:00
HeaviestTipSetWeight big.Int
2019-07-03 17:39:07 +00:00
GenesisHash cid.Cid
}
2021-05-17 12:28:09 +00:00
type LatencyMessage struct {
2020-08-04 15:33:29 +00:00
TArrival int64
TSent int64
2019-07-03 17:39:07 +00:00
}
2019-11-09 23:00:22 +00:00
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
2021-05-17 12:28:09 +00:00
2019-07-03 17:39:07 +00:00
type Service struct {
h host.Host
2019-07-03 17:39:07 +00:00
2019-07-26 04:54:22 +00:00
cs *store.ChainStore
2019-07-05 14:46:21 +00:00
syncer *chain.Syncer
2021-09-02 16:07:23 +00:00
cons consensus.Consensus
2019-10-17 08:57:56 +00:00
pmgr *peermgr.PeerMgr
2019-07-03 17:39:07 +00:00
}
2021-09-02 16:07:23 +00:00
func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, cons consensus.Consensus, pmgr peermgr.MaybePeerMgr) *Service {
2019-10-23 11:02:00 +00:00
if pmgr.Mgr == nil {
log.Warn("running without peer manager")
}
2019-07-03 17:39:07 +00:00
return &Service{
h: h,
2019-07-08 13:36:43 +00:00
cs: cs,
syncer: syncer,
2021-09-02 16:07:23 +00:00
cons: cons,
2019-10-23 11:02:00 +00:00
pmgr: pmgr.Mgr,
2019-07-03 17:39:07 +00:00
}
}
func (hs *Service) HandleStream(s inet.Stream) {
var hmsg HelloMessage
_ = s.SetReadDeadline(time.Now().Add(streamDeadline))
2019-11-07 14:11:39 +00:00
if err := cborutil.ReadCborRPC(s, &hmsg); err != nil {
_ = s.SetReadDeadline(time.Time{})
2020-05-14 22:27:47 +00:00
log.Infow("failed to read hello message, disconnecting", "error", err)
_ = s.Conn().Close()
2019-07-03 17:39:07 +00:00
return
}
_ = s.SetReadDeadline(time.Time{})
2020-07-10 14:43:14 +00:00
arrived := build.Clock.Now()
log.Debugw("genesis from hello",
"tipset", hmsg.HeaviestTipSet,
"peer", s.Conn().RemotePeer(),
"hash", hmsg.GenesisHash)
2019-07-03 17:39:07 +00:00
2019-07-05 14:46:21 +00:00
if hmsg.GenesisHash != hs.syncer.Genesis.Cids()[0] {
log.Debugf("other peer has different genesis! (%s)", hmsg.GenesisHash)
_ = s.Conn().Close()
2019-07-03 17:39:07 +00:00
return
}
go func() {
defer s.Close() //nolint:errcheck
2020-07-10 14:43:14 +00:00
sent := build.Clock.Now()
msg := &LatencyMessage{
2020-08-04 15:33:29 +00:00
TArrival: arrived.UnixNano(),
TSent: sent.UnixNano(),
}
_ = s.SetWriteDeadline(time.Now().Add(streamDeadline))
if err := cborutil.WriteCborRPC(s, msg); err != nil {
log.Debugf("error while responding to latency: %v", err)
}
_ = s.SetWriteDeadline(time.Time{})
}()
2019-07-03 17:39:07 +00:00
protos, err := hs.h.Peerstore().GetProtocols(s.Conn().RemotePeer())
if err != nil {
log.Warnf("got error from peerstore.GetProtocols: %s", err)
}
if len(protos) == 0 {
log.Warn("other peer hasnt completed libp2p identify, waiting a bit")
// TODO: this better
2020-07-10 14:43:14 +00:00
build.Clock.Sleep(time.Millisecond * 300)
}
if hs.pmgr != nil {
hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer())
}
2019-12-16 19:22:56 +00:00
ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), types.NewTipSetKey(hmsg.HeaviestTipSet...))
2019-07-03 17:39:07 +00:00
if err != nil {
log.Errorf("failed to fetch tipset from peer during hello: %+v", err)
2019-07-03 17:39:07 +00:00
return
}
2019-12-16 17:14:21 +00:00
if ts.TipSet().Height() > 0 {
2019-12-17 18:11:26 +00:00
hs.h.ConnManager().TagPeer(s.Conn().RemotePeer(), "fcpeer", 10)
2019-12-16 17:14:21 +00:00
// don't bother informing about genesis
2020-11-03 12:28:31 +00:00
log.Debugf("Got new tipset through Hello: %s from %s", ts.Cids(), s.Conn().RemotePeer())
2019-12-16 17:14:21 +00:00
hs.syncer.InformNewHead(s.Conn().RemotePeer(), ts)
}
2019-07-03 17:39:07 +00:00
}
func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
s, err := hs.h.NewStream(ctx, pid, ProtocolID)
2019-07-03 17:39:07 +00:00
if err != nil {
return xerrors.Errorf("error opening stream: %w", err)
2019-07-03 17:39:07 +00:00
}
hts := hs.cs.GetHeaviestTipSet()
2019-10-15 05:00:30 +00:00
weight, err := hs.cs.Weight(ctx, hts)
if err != nil {
return err
}
2019-10-17 08:57:56 +00:00
2021-12-11 21:03:00 +00:00
gen, err := hs.cs.GetGenesis(ctx)
2019-07-03 17:39:07 +00:00
if err != nil {
return err
}
hmsg := &HelloMessage{
2019-07-03 17:39:07 +00:00
HeaviestTipSet: hts.Cids(),
HeaviestTipSetHeight: hts.Height(),
2019-07-03 17:39:07 +00:00
HeaviestTipSetWeight: weight,
GenesisHash: gen.Cid(),
}
2019-12-16 17:14:21 +00:00
log.Debug("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid())
2019-07-03 17:39:07 +00:00
2020-07-10 14:43:14 +00:00
t0 := build.Clock.Now()
_ = s.SetWriteDeadline(time.Now().Add(streamDeadline))
2019-11-07 14:11:39 +00:00
if err := cborutil.WriteCborRPC(s, hmsg); err != nil {
_ = s.SetWriteDeadline(time.Time{})
return xerrors.Errorf("writing rpc to peer: %w", err)
2019-07-05 14:46:21 +00:00
}
_ = s.SetWriteDeadline(time.Time{})
2023-01-04 12:10:21 +00:00
if err := s.CloseWrite(); err != nil {
log.Warnw("CloseWrite err", "error", err)
}
2019-07-03 17:39:07 +00:00
go func() {
defer s.Close() //nolint:errcheck
lmsg := &LatencyMessage{}
2020-07-10 14:43:14 +00:00
_ = s.SetReadDeadline(build.Clock.Now().Add(10 * time.Second))
err := cborutil.ReadCborRPC(s, lmsg)
if err != nil {
2020-11-03 12:28:31 +00:00
log.Debugw("reading latency message", "error", err)
}
2020-07-10 14:43:14 +00:00
t3 := build.Clock.Now()
lat := t3.Sub(t0)
// add to peer tracker
if hs.pmgr != nil {
hs.pmgr.SetPeerLatency(pid, lat)
}
if err == nil {
2020-08-04 15:33:29 +00:00
if lmsg.TArrival != 0 && lmsg.TSent != 0 {
t1 := time.Unix(0, lmsg.TArrival)
t2 := time.Unix(0, lmsg.TSent)
offset := t0.Sub(t1) + t3.Sub(t2)
offset /= 2
2020-11-03 12:28:31 +00:00
if offset > 5*time.Second || offset < -5*time.Second {
log.Infow("time offset", "offset", offset.Seconds(), "peerid", pid.String())
}
}
}
}()
2019-07-03 17:39:07 +00:00
return nil
2019-07-03 17:41:54 +00:00
}