feat: libp2p: Lotus stream cleanup (#11993)

* set stream deadlines in Lotus

* reduce timeout

* whitelist bootstrappers

* fix tests
This commit is contained in:
Aarsh Shah 2024-05-14 14:12:03 +04:00 committed by GitHub
parent d0bbb0b20a
commit af31126ea6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 35 additions and 2 deletions

View File

@ -38,6 +38,7 @@ const (
ReadResMinSpeed = 50 << 10
ShufflePeersPrefix = 16
WriteResDeadline = 60 * time.Second
streamReadDeadline = 10 * time.Second
)
// FIXME: Rename. Make private.

View File

@ -40,11 +40,15 @@ func (s *server) HandleStream(stream inet.Stream) {
defer stream.Close() //nolint:errcheck
_ = stream.SetReadDeadline(time.Now().Add(streamReadDeadline))
var req Request
if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil {
_ = stream.SetReadDeadline(time.Time{})
log.Warnf("failed to read block sync request: %s", err)
return
}
_ = stream.SetReadDeadline(time.Time{})
log.Debugw("block sync request",
"start", req.Head, "len", req.Length)

View File

@ -51,6 +51,7 @@ var helloCmd = &cli.Command{
func HandleStream(s inet.Stream) {
var hmsg hello.HelloMessage
_ = s.SetReadDeadline(time.Now().Add(30 * time.Second))
if err := cborutil.ReadCborRPC(s, &hmsg); err != nil {
log.Infow("failed to read hello message, disconnecting", "error", err)
_ = s.Conn().Close()

View File

@ -29,6 +29,7 @@ import (
const ProtocolID = "/fil/hello/1.0.0"
var log = logging.Logger("hello")
var streamDeadline = 10 * time.Second
type HelloMessage struct {
HeaviestTipSet []cid.Cid
@ -70,11 +71,15 @@ func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, co
func (hs *Service) HandleStream(s inet.Stream) {
var hmsg HelloMessage
_ = s.SetReadDeadline(time.Now().Add(streamDeadline))
if err := cborutil.ReadCborRPC(s, &hmsg); err != nil {
_ = s.SetReadDeadline(time.Time{})
log.Infow("failed to read hello message, disconnecting", "error", err)
_ = s.Conn().Close()
return
}
_ = s.SetReadDeadline(time.Time{})
arrived := build.Clock.Now()
log.Debugw("genesis from hello",
@ -95,9 +100,11 @@ func (hs *Service) HandleStream(s inet.Stream) {
TArrival: arrived.UnixNano(),
TSent: sent.UnixNano(),
}
_ = s.SetWriteDeadline(time.Now().Add(streamDeadline))
if err := cborutil.WriteCborRPC(s, msg); err != nil {
log.Debugf("error while responding to latency: %v", err)
}
_ = s.SetWriteDeadline(time.Time{})
}()
protos, err := hs.h.Peerstore().GetProtocols(s.Conn().RemotePeer())
@ -155,9 +162,12 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
log.Debug("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid())
t0 := build.Clock.Now()
_ = s.SetWriteDeadline(time.Now().Add(streamDeadline))
if err := cborutil.WriteCborRPC(s, hmsg); err != nil {
_ = s.SetWriteDeadline(time.Time{})
return xerrors.Errorf("writing rpc to peer: %w", err)
}
_ = s.SetWriteDeadline(time.Time{})
if err := s.CloseWrite(); err != nil {
log.Warnw("CloseWrite err", "error", err)
}

View File

@ -15,19 +15,22 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
var rcmgrMetricsOnce sync.Once
func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
return func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo, bs dtypes.BootstrapPeers) (network.ResourceManager, error) {
return func(lc fx.Lifecycle, repo repo.LockedRepo, bs dtypes.BootstrapPeers) (network.ResourceManager, error) {
isFullNode := repo.RepoType().Type() == "FullNode"
envvar := os.Getenv("LOTUS_RCMGR")
if (isFullNode && envvar == "0") || // only set NullResourceManager if envvar is explicitly "0"
@ -133,6 +136,20 @@ func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo)
opts = append(opts, rcmgr.WithTrace(traceFile))
}
resolver := madns.DefaultResolver
var bootstrapperMaddrs []ma.Multiaddr
for _, pi := range bs {
for _, addr := range pi.Addrs {
resolved, err := resolver.Resolve(context.Background(), addr)
if err != nil {
continue
}
bootstrapperMaddrs = append(bootstrapperMaddrs, resolved...)
}
}
opts = append(opts, rcmgr.WithAllowlistedMultiaddrs(bootstrapperMaddrs))
mgr, err := rcmgr.NewResourceManager(limiter, opts...)
if err != nil {
return nil, fmt.Errorf("error creating resource manager: %w", err)