improve resource manager integration

- add opt-in env var to control instantation, until we are comfortable with testing to enble by default.
- adjust default limits if the connection manager high mark is higher than the default inbound conn limit.
This commit is contained in:
vyzo 2022-03-15 09:43:18 +02:00
parent 362c73bfbd
commit 53c525f0ed
2 changed files with 94 additions and 42 deletions

View File

@ -222,7 +222,7 @@ var LibP2P = Options(
Override(ConnGaterKey, lp2p.ConnGaterOption), Override(ConnGaterKey, lp2p.ConnGaterOption),
// Services (resource management) // Services (resource management)
Override(new(network.ResourceManager), lp2p.ResourceManager), Override(new(network.ResourceManager), lp2p.ResourceManager(200)),
Override(ResourceManagerKey, lp2p.ResourceManagerOption), Override(ResourceManagerKey, lp2p.ResourceManagerOption),
) )
@ -282,6 +282,7 @@ func ConfigCommon(cfg *config.Common, enableLibp2pNode bool) Option {
cfg.Libp2p.ConnMgrHigh, cfg.Libp2p.ConnMgrHigh,
time.Duration(cfg.Libp2p.ConnMgrGrace), time.Duration(cfg.Libp2p.ConnMgrGrace),
cfg.Libp2p.ProtectedPeers)), cfg.Libp2p.ProtectedPeers)),
Override(new(network.ResourceManager), lp2p.ResourceManager(cfg.Libp2p.ConnMgrHigh)),
Override(new(*pubsub.PubSub), lp2p.GossipSub), Override(new(*pubsub.PubSub), lp2p.GossipSub),
Override(new(*config.Pubsub), &cfg.Pubsub), Override(new(*config.Pubsub), &cfg.Pubsub),

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math/bits"
"os" "os"
"path/filepath" "path/filepath"
@ -15,6 +16,8 @@ import (
"github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/protocol"
rcmgr "github.com/libp2p/go-libp2p-resource-manager" rcmgr "github.com/libp2p/go-libp2p-resource-manager"
logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
@ -22,55 +25,103 @@ import (
"go.opencensus.io/tag" "go.opencensus.io/tag"
) )
func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
var limiter *rcmgr.BasicLimiter return func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
var opts []rcmgr.Option envvar := os.Getenv("LOTUS_RCMGR")
if envvar == "" || envvar == "0" {
// TODO opt-in for now -- flip this to enabled by default once we are comfortable with testing
log.Info("libp2p resource manager is disabled")
return network.NullResourceManager, nil
}
repoPath := repo.Path() log.Info("libp2p resource manager is enabled")
// enable debug logs for rcmgr
logging.SetLogLevel("rcmgr", "debug")
// create limiter -- parse $repo/limits.json if exists // Adjust default limits
limitsFile := filepath.Join(repoPath, "limits.json") // - give it more memory, up to 4G, min of 1G
limitsIn, err := os.Open(limitsFile) // - if maxconns are too high, adjust Conn/FD/Stream limits
switch { defaultLimits := rcmgr.DefaultLimits.WithSystemMemory(.125, 1<<30, 4<<30)
case err == nil: maxconns := int(connMgrHi)
defer limitsIn.Close() //nolint:errcheck if maxconns > defaultLimits.SystemBaseLimit.ConnsInbound {
limiter, err = rcmgr.NewDefaultLimiterFromJSON(limitsIn) defaultLimits.SystemBaseLimit.ConnsInbound = logScale(maxconns)
defaultLimits.SystemBaseLimit.ConnsOutbound = logScale(maxconns)
defaultLimits.SystemBaseLimit.Conns = logScale(2 * maxconns)
defaultLimits.SystemBaseLimit.StreamsInbound = logScale(16 * maxconns)
defaultLimits.SystemBaseLimit.StreamsOutbound = logScale(64 * maxconns)
defaultLimits.SystemBaseLimit.Streams = logScale(64 * maxconns)
if maxconns > defaultLimits.SystemBaseLimit.FD {
defaultLimits.SystemBaseLimit.FD = logScale(maxconns)
}
defaultLimits.ServiceBaseLimit.StreamsInbound = logScale(8 * maxconns)
defaultLimits.ServiceBaseLimit.StreamsOutbound = logScale(32 * maxconns)
defaultLimits.ServiceBaseLimit.Streams = logScale(32 * maxconns)
defaultLimits.ProtocolBaseLimit.StreamsInbound = logScale(8 * maxconns)
defaultLimits.ProtocolBaseLimit.StreamsOutbound = logScale(32 * maxconns)
defaultLimits.ProtocolBaseLimit.Streams = logScale(32 * maxconns)
log.Info("adjusted default resource manager limits")
}
// initialize
var limiter *rcmgr.BasicLimiter
var opts []rcmgr.Option
repoPath := repo.Path()
// create limiter -- parse $repo/limits.json if exists
limitsFile := filepath.Join(repoPath, "limits.json")
limitsIn, err := os.Open(limitsFile)
switch {
case err == nil:
defer limitsIn.Close() //nolint:errcheck
limiter, err = rcmgr.NewLimiterFromJSON(limitsIn, defaultLimits)
if err != nil {
return nil, fmt.Errorf("error parsing limit file: %w", err)
}
case errors.Is(err, os.ErrNotExist):
limiter = rcmgr.NewStaticLimiter(defaultLimits)
default:
return nil, err
}
// TODO: also set appropriate default limits for lotus protocols
libp2p.SetDefaultServiceLimits(limiter)
opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{}))
if os.Getenv("LOTUS_DEBUG_RCMGR") != "" {
debugPath := filepath.Join(repoPath, "debug")
if err := os.MkdirAll(debugPath, 0755); err != nil {
return nil, fmt.Errorf("error creating debug directory: %w", err)
}
traceFile := filepath.Join(debugPath, "rcmgr.json.gz")
opts = append(opts, rcmgr.WithTrace(traceFile))
}
mgr, err := rcmgr.NewResourceManager(limiter, opts...)
if err != nil { if err != nil {
return nil, fmt.Errorf("error parsing limit file: %w", err) return nil, fmt.Errorf("error creating resource manager: %w", err)
} }
case errors.Is(err, os.ErrNotExist): lc.Append(fx.Hook{
limiter = rcmgr.NewDefaultLimiter() OnStop: func(_ context.Context) error {
return mgr.Close()
}})
default: return mgr, nil
return nil, err
} }
}
// TODO: also set appropriate default limits for lotus protocols func logScale(val int) int {
libp2p.SetDefaultServiceLimits(limiter) bitlen := bits.Len(uint(val))
return 1 << bitlen
opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{}))
if os.Getenv("LOTUS_DEBUG_RCMGR") != "" {
debugPath := filepath.Join(repoPath, "debug")
if err := os.MkdirAll(debugPath, 0755); err != nil {
return nil, fmt.Errorf("error creating debug directory: %w", err)
}
traceFile := filepath.Join(debugPath, "rcmgr.json.gz")
opts = append(opts, rcmgr.WithTrace(traceFile))
}
mgr, err := rcmgr.NewResourceManager(limiter, opts...)
if err != nil {
return nil, fmt.Errorf("error creating resource manager: %w", err)
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return mgr.Close()
}})
return mgr, nil
} }
func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts { func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts {