Merge pull request #8318 from filecoin-project/feat/rcmgr-limits-envvar
improve resource manager integration
This commit is contained in:
commit
c3cadcba0e
@ -222,7 +222,7 @@ var LibP2P = Options(
|
|||||||
Override(ConnGaterKey, lp2p.ConnGaterOption),
|
Override(ConnGaterKey, lp2p.ConnGaterOption),
|
||||||
|
|
||||||
// Services (resource management)
|
// Services (resource management)
|
||||||
Override(new(network.ResourceManager), lp2p.ResourceManager),
|
Override(new(network.ResourceManager), lp2p.ResourceManager(200)),
|
||||||
Override(ResourceManagerKey, lp2p.ResourceManagerOption),
|
Override(ResourceManagerKey, lp2p.ResourceManagerOption),
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -282,6 +282,7 @@ func ConfigCommon(cfg *config.Common, enableLibp2pNode bool) Option {
|
|||||||
cfg.Libp2p.ConnMgrHigh,
|
cfg.Libp2p.ConnMgrHigh,
|
||||||
time.Duration(cfg.Libp2p.ConnMgrGrace),
|
time.Duration(cfg.Libp2p.ConnMgrGrace),
|
||||||
cfg.Libp2p.ProtectedPeers)),
|
cfg.Libp2p.ProtectedPeers)),
|
||||||
|
Override(new(network.ResourceManager), lp2p.ResourceManager(cfg.Libp2p.ConnMgrHigh)),
|
||||||
Override(new(*pubsub.PubSub), lp2p.GossipSub),
|
Override(new(*pubsub.PubSub), lp2p.GossipSub),
|
||||||
Override(new(*config.Pubsub), &cfg.Pubsub),
|
Override(new(*config.Pubsub), &cfg.Pubsub),
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/bits"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
@ -15,6 +16,8 @@ import (
|
|||||||
"github.com/libp2p/go-libp2p-core/protocol"
|
"github.com/libp2p/go-libp2p-core/protocol"
|
||||||
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
|
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/metrics"
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
|
|
||||||
@ -22,7 +25,50 @@ import (
|
|||||||
"go.opencensus.io/tag"
|
"go.opencensus.io/tag"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
|
func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
|
||||||
|
return func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
|
||||||
|
envvar := os.Getenv("LOTUS_RCMGR")
|
||||||
|
if envvar == "" || envvar == "0" {
|
||||||
|
// TODO opt-in for now -- flip this to enabled by default once we are comfortable with testing
|
||||||
|
log.Info("libp2p resource manager is disabled")
|
||||||
|
return network.NullResourceManager, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("libp2p resource manager is enabled")
|
||||||
|
// enable debug logs for rcmgr
|
||||||
|
logging.SetLogLevel("rcmgr", "debug")
|
||||||
|
|
||||||
|
// Adjust default limits
|
||||||
|
// - give it more memory, up to 4G, min of 1G
|
||||||
|
// - if maxconns are too high, adjust Conn/FD/Stream limits
|
||||||
|
defaultLimits := rcmgr.DefaultLimits.WithSystemMemory(.125, 1<<30, 4<<30)
|
||||||
|
maxconns := int(connMgrHi)
|
||||||
|
if 2*maxconns > defaultLimits.SystemBaseLimit.ConnsInbound {
|
||||||
|
// adjust conns to 2x to allow for two conns per peer (TCP+QUIC)
|
||||||
|
defaultLimits.SystemBaseLimit.ConnsInbound = logScale(2 * maxconns)
|
||||||
|
defaultLimits.SystemBaseLimit.ConnsOutbound = logScale(2 * maxconns)
|
||||||
|
defaultLimits.SystemBaseLimit.Conns = logScale(4 * maxconns)
|
||||||
|
|
||||||
|
defaultLimits.SystemBaseLimit.StreamsInbound = logScale(16 * maxconns)
|
||||||
|
defaultLimits.SystemBaseLimit.StreamsOutbound = logScale(64 * maxconns)
|
||||||
|
defaultLimits.SystemBaseLimit.Streams = logScale(64 * maxconns)
|
||||||
|
|
||||||
|
if 2*maxconns > defaultLimits.SystemBaseLimit.FD {
|
||||||
|
defaultLimits.SystemBaseLimit.FD = logScale(2 * maxconns)
|
||||||
|
}
|
||||||
|
|
||||||
|
defaultLimits.ServiceBaseLimit.StreamsInbound = logScale(8 * maxconns)
|
||||||
|
defaultLimits.ServiceBaseLimit.StreamsOutbound = logScale(32 * maxconns)
|
||||||
|
defaultLimits.ServiceBaseLimit.Streams = logScale(32 * maxconns)
|
||||||
|
|
||||||
|
defaultLimits.ProtocolBaseLimit.StreamsInbound = logScale(8 * maxconns)
|
||||||
|
defaultLimits.ProtocolBaseLimit.StreamsOutbound = logScale(32 * maxconns)
|
||||||
|
defaultLimits.ProtocolBaseLimit.Streams = logScale(32 * maxconns)
|
||||||
|
|
||||||
|
log.Info("adjusted default resource manager limits")
|
||||||
|
}
|
||||||
|
|
||||||
|
// initialize
|
||||||
var limiter *rcmgr.BasicLimiter
|
var limiter *rcmgr.BasicLimiter
|
||||||
var opts []rcmgr.Option
|
var opts []rcmgr.Option
|
||||||
|
|
||||||
@ -34,13 +80,13 @@ func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceMan
|
|||||||
switch {
|
switch {
|
||||||
case err == nil:
|
case err == nil:
|
||||||
defer limitsIn.Close() //nolint:errcheck
|
defer limitsIn.Close() //nolint:errcheck
|
||||||
limiter, err = rcmgr.NewDefaultLimiterFromJSON(limitsIn)
|
limiter, err = rcmgr.NewLimiterFromJSON(limitsIn, defaultLimits)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error parsing limit file: %w", err)
|
return nil, fmt.Errorf("error parsing limit file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
case errors.Is(err, os.ErrNotExist):
|
case errors.Is(err, os.ErrNotExist):
|
||||||
limiter = rcmgr.NewDefaultLimiter()
|
limiter = rcmgr.NewStaticLimiter(defaultLimits)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -71,6 +117,12 @@ func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceMan
|
|||||||
}})
|
}})
|
||||||
|
|
||||||
return mgr, nil
|
return mgr, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func logScale(val int) int {
|
||||||
|
bitlen := bits.Len(uint(val))
|
||||||
|
return 1 << bitlen
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts {
|
func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts {
|
||||||
|
Loading…
Reference in New Issue
Block a user