From f60c1ce7e6823a5a029867c3647a359a0aca23c6 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 14 May 2024 14:12:03 +0400 Subject: [PATCH] feat: libp2p: Lotus stream cleanup (#11993) * set stream deadlines in Lotus * reduce timeout * whitelist bootstrappers * fix tests --- chain/exchange/protocol.go | 1 + chain/exchange/server.go | 4 ++++ cmd/lotus-shed/hello.go | 1 + node/hello/hello.go | 10 ++++++++++ node/modules/lp2p/rcmgr.go | 21 +++++++++++++++++++-- 5 files changed, 35 insertions(+), 2 deletions(-) diff --git a/chain/exchange/protocol.go b/chain/exchange/protocol.go index cd25f4a43..7a22de8a3 100644 --- a/chain/exchange/protocol.go +++ b/chain/exchange/protocol.go @@ -38,6 +38,7 @@ const ( ReadResMinSpeed = 50 << 10 ShufflePeersPrefix = 16 WriteResDeadline = 60 * time.Second + streamReadDeadline = 10 * time.Second ) // FIXME: Rename. Make private. diff --git a/chain/exchange/server.go b/chain/exchange/server.go index ac3454c90..e8b2a414e 100644 --- a/chain/exchange/server.go +++ b/chain/exchange/server.go @@ -40,11 +40,15 @@ func (s *server) HandleStream(stream inet.Stream) { defer stream.Close() //nolint:errcheck + _ = stream.SetReadDeadline(time.Now().Add(streamReadDeadline)) var req Request if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil { + _ = stream.SetReadDeadline(time.Time{}) log.Warnf("failed to read block sync request: %s", err) return } + _ = stream.SetReadDeadline(time.Time{}) + log.Debugw("block sync request", "start", req.Head, "len", req.Length) diff --git a/cmd/lotus-shed/hello.go b/cmd/lotus-shed/hello.go index d16f93735..3c905b798 100644 --- a/cmd/lotus-shed/hello.go +++ b/cmd/lotus-shed/hello.go @@ -51,6 +51,7 @@ var helloCmd = &cli.Command{ func HandleStream(s inet.Stream) { var hmsg hello.HelloMessage + _ = s.SetReadDeadline(time.Now().Add(30 * time.Second)) if err := cborutil.ReadCborRPC(s, &hmsg); err != nil { log.Infow("failed to read hello message, disconnecting", "error", err) _ = s.Conn().Close() diff --git a/node/hello/hello.go b/node/hello/hello.go index e05b8a482..cd1645d3e 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -29,6 +29,7 @@ import ( const ProtocolID = "/fil/hello/1.0.0" var log = logging.Logger("hello") +var streamDeadline = 10 * time.Second type HelloMessage struct { HeaviestTipSet []cid.Cid @@ -70,11 +71,15 @@ func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, co func (hs *Service) HandleStream(s inet.Stream) { var hmsg HelloMessage + _ = s.SetReadDeadline(time.Now().Add(streamDeadline)) if err := cborutil.ReadCborRPC(s, &hmsg); err != nil { + _ = s.SetReadDeadline(time.Time{}) log.Infow("failed to read hello message, disconnecting", "error", err) _ = s.Conn().Close() return } + _ = s.SetReadDeadline(time.Time{}) + arrived := build.Clock.Now() log.Debugw("genesis from hello", @@ -95,9 +100,11 @@ func (hs *Service) HandleStream(s inet.Stream) { TArrival: arrived.UnixNano(), TSent: sent.UnixNano(), } + _ = s.SetWriteDeadline(time.Now().Add(streamDeadline)) if err := cborutil.WriteCborRPC(s, msg); err != nil { log.Debugf("error while responding to latency: %v", err) } + _ = s.SetWriteDeadline(time.Time{}) }() protos, err := hs.h.Peerstore().GetProtocols(s.Conn().RemotePeer()) @@ -155,9 +162,12 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { log.Debug("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid()) t0 := build.Clock.Now() + _ = s.SetWriteDeadline(time.Now().Add(streamDeadline)) if err := cborutil.WriteCborRPC(s, hmsg); err != nil { + _ = s.SetWriteDeadline(time.Time{}) return xerrors.Errorf("writing rpc to peer: %w", err) } + _ = s.SetWriteDeadline(time.Time{}) if err := s.CloseWrite(); err != nil { log.Warnw("CloseWrite err", "error", err) } diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index f2b284986..92c3bca6f 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -15,19 +15,22 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" + ma "github.com/multiformats/go-multiaddr" + madns "github.com/multiformats/go-multiaddr-dns" "github.com/prometheus/client_golang/prometheus" "go.opencensus.io/stats" "go.opencensus.io/tag" "go.uber.org/fx" "github.com/filecoin-project/lotus/metrics" + "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" ) var rcmgrMetricsOnce sync.Once -func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { - return func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { +func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo, bs dtypes.BootstrapPeers) (network.ResourceManager, error) { + return func(lc fx.Lifecycle, repo repo.LockedRepo, bs dtypes.BootstrapPeers) (network.ResourceManager, error) { isFullNode := repo.RepoType().Type() == "FullNode" envvar := os.Getenv("LOTUS_RCMGR") if (isFullNode && envvar == "0") || // only set NullResourceManager if envvar is explicitly "0" @@ -133,6 +136,20 @@ func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo) opts = append(opts, rcmgr.WithTrace(traceFile)) } + resolver := madns.DefaultResolver + var bootstrapperMaddrs []ma.Multiaddr + for _, pi := range bs { + for _, addr := range pi.Addrs { + resolved, err := resolver.Resolve(context.Background(), addr) + if err != nil { + continue + } + bootstrapperMaddrs = append(bootstrapperMaddrs, resolved...) + } + } + + opts = append(opts, rcmgr.WithAllowlistedMultiaddrs(bootstrapperMaddrs)) + mgr, err := rcmgr.NewResourceManager(limiter, opts...) if err != nil { return nil, fmt.Errorf("error creating resource manager: %w", err)