From a9ec40884491533e923569ecb32cf566803d88fb Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 14 Feb 2022 19:46:05 +0200 Subject: [PATCH] collect resource manager metrics --- metrics/metrics.go | 107 +++++++++++++++++++++++++++++++++ node/modules/lp2p/rcmgr.go | 120 +++++++++++++++++++++++++++++++++++++ 2 files changed, 227 insertions(+) diff --git a/metrics/metrics.go b/metrics/metrics.go index b4032bb1d..400e76537 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -47,6 +47,12 @@ var ( WorkerHostname, _ = tag.NewKey("worker_hostname") StorageID, _ = tag.NewKey("storage_id") SectorState, _ = tag.NewKey("sector_state") + + // rcmgr + ServiceID, _ = tag.NewKey("svc") + ProtocolID, _ = tag.NewKey("proto") + Direction, _ = tag.NewKey("direction") + UseFD, _ = tag.NewKey("use_fd") ) // Measures @@ -143,6 +149,22 @@ var ( SplitstoreCompactionHot = stats.Int64("splitstore/hot", "Number of hot blocks in last compaction", stats.UnitDimensionless) SplitstoreCompactionCold = stats.Int64("splitstore/cold", "Number of cold blocks in last compaction", stats.UnitDimensionless) SplitstoreCompactionDead = stats.Int64("splitstore/dead", "Number of dead blocks in last compaction", stats.UnitDimensionless) + + // rcmgr + RcmgrAllowConn = stats.Int64("rcmgr/allow_conn", "Number of allowed connections", stats.UnitDimensionless) + RcmgrBlockConn = stats.Int64("rcmgr/block_conn", "Number of blocked connections", stats.UnitDimensionless) + RcmgrAllowStream = stats.Int64("rcmgr/allow_stream", "Number of allowed streams", stats.UnitDimensionless) + RcmgrBlockStream = stats.Int64("rcmgr/block_stream", "Number of blocked streams", stats.UnitDimensionless) + RcmgrAllowPeer = stats.Int64("rcmgr/allow_peer", "Number of allowed peer connections", stats.UnitDimensionless) + RcmgrBlockPeer = stats.Int64("rcmgr/block_peer", "Number of blocked peer connections", stats.UnitDimensionless) + RcmgrAllowProto = stats.Int64("rcmgr/allow_proto", "Number of allowed streams attached to a protocol", stats.UnitDimensionless) + RcmgrBlockProto = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol", stats.UnitDimensionless) + RcmgrBlockProtoPeer = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol for a specific peer", stats.UnitDimensionless) + RcmgrAllowSvc = stats.Int64("rcmgr/allow_svc", "Number of allowed streams attached to a service", stats.UnitDimensionless) + RcmgrBlockSvc = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service", stats.UnitDimensionless) + RcmgrBlockSvcPeer = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service for a specific peer", stats.UnitDimensionless) + RcmgrAllowMem = stats.Int64("rcmgr/allow_mem", "Number of allowed memory reservations", stats.UnitDimensionless) + RcmgrBlockMem = stats.Int64("rcmgr/block_mem", "Number of blocked memory reservations", stats.UnitDimensionless) ) var ( @@ -496,6 +518,76 @@ var ( Measure: GraphsyncSendingPeersPending, Aggregation: view.LastValue(), } + + // rcmgr + RcmgrAllowConnView = &view.View{ + Measure: RcmgrAllowConn, + Aggregation: view.Count(), + TagKeys: []tag.Key{Direction, UseFD}, + } + RcmgrBlockConnView = &view.View{ + Measure: RcmgrBlockConn, + Aggregation: view.Count(), + TagKeys: []tag.Key{Direction, UseFD}, + } + RcmgrAllowStreamView = &view.View{ + Measure: RcmgrAllowStream, + Aggregation: view.Count(), + TagKeys: []tag.Key{PeerID, Direction}, + } + RcmgrBlockStreamView = &view.View{ + Measure: RcmgrBlockStream, + Aggregation: view.Count(), + TagKeys: []tag.Key{PeerID, Direction}, + } + RcmgrAllowPeerView = &view.View{ + Measure: RcmgrAllowPeer, + Aggregation: view.Count(), + TagKeys: []tag.Key{PeerID}, + } + RcmgrBlockPeerView = &view.View{ + Measure: RcmgrBlockPeer, + Aggregation: view.Count(), + TagKeys: []tag.Key{PeerID}, + } + RcmgrAllowProtoView = &view.View{ + Measure: RcmgrAllowProto, + Aggregation: view.Count(), + TagKeys: []tag.Key{ProtocolID}, + } + RcmgrBlockProtoView = &view.View{ + Measure: RcmgrBlockProto, + Aggregation: view.Count(), + TagKeys: []tag.Key{ProtocolID}, + } + RcmgrBlockProtoPeerView = &view.View{ + Measure: RcmgrBlockProtoPeer, + Aggregation: view.Count(), + TagKeys: []tag.Key{ProtocolID, PeerID}, + } + RcmgrAllowSvcView = &view.View{ + Measure: RcmgrAllowSvc, + Aggregation: view.Count(), + TagKeys: []tag.Key{ServiceID}, + } + RcmgrBlockSvcView = &view.View{ + Measure: RcmgrBlockSvc, + Aggregation: view.Count(), + TagKeys: []tag.Key{ServiceID}, + } + RcmgrBlockSvcPeerView = &view.View{ + Measure: RcmgrBlockSvcPeer, + Aggregation: view.Count(), + TagKeys: []tag.Key{ServiceID, PeerID}, + } + RcmgrAllowMemView = &view.View{ + Measure: RcmgrAllowMem, + Aggregation: view.Count(), + } + RcmgrBlockMemView = &view.View{ + Measure: RcmgrBlockMem, + Aggregation: view.Count(), + } ) // DefaultViews is an array of OpenCensus views for metric gathering purposes @@ -517,6 +609,21 @@ var DefaultViews = func() []*view.View { GraphsyncSendingTotalMemoryAllocatedView, GraphsyncSendingTotalPendingAllocationsView, GraphsyncSendingPeersPendingView, + + RcmgrAllowConnView, + RcmgrBlockConnView, + RcmgrAllowStreamView, + RcmgrBlockStreamView, + RcmgrAllowPeerView, + RcmgrBlockPeerView, + RcmgrAllowProtoView, + RcmgrBlockProtoView, + RcmgrBlockProtoPeerView, + RcmgrAllowSvcView, + RcmgrBlockSvcView, + RcmgrBlockSvcPeerView, + RcmgrAllowMemView, + RcmgrBlockMemView, } views = append(views, blockstore.DefaultViews...) views = append(views, rpcmetrics.DefaultViews...) diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index 8b286ff5e..71ed4e754 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -11,9 +11,15 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" rcmgr "github.com/libp2p/go-libp2p-resource-manager" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/repo" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" ) func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { @@ -43,6 +49,8 @@ func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceMan // TODO: also set appropriate default limits for lotus protocols libp2p.SetDefaultServiceLimits(limiter) + opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{})) + if os.Getenv("LOTUS_DEBUG_RCMGR") != "" { debugPath := filepath.Join(repoPath, "debug") if err := os.MkdirAll(debugPath, 0755); err != nil { @@ -70,3 +78,115 @@ func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts { Opts: []libp2p.Option{libp2p.ResourceManager(mgr)}, } } + +type rcmgrMetrics struct{} + +func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + if usefd { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false")) + } + stats.Record(ctx, metrics.RcmgrAllowConn.M(1)) +} + +func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + if usefd { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false")) + } + stats.Record(ctx, metrics.RcmgrBlockConn.M(1)) +} + +func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty())) + stats.Record(ctx, metrics.RcmgrAllowStream.M(1)) +} + +func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty())) + stats.Record(ctx, metrics.RcmgrBlockStream.M(1)) +} + +func (r rcmgrMetrics) AllowPeer(p peer.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty())) + stats.Record(ctx, metrics.RcmgrAllowPeer.M(1)) +} + +func (r rcmgrMetrics) BlockPeer(p peer.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty())) + stats.Record(ctx, metrics.RcmgrBlockPeer.M(1)) +} + +func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) + stats.Record(ctx, metrics.RcmgrAllowProto.M(1)) +} + +func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) + stats.Record(ctx, metrics.RcmgrBlockProto.M(1)) +} + +func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) + ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty())) + stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1)) +} + +func (r rcmgrMetrics) AllowService(svc string) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) + stats.Record(ctx, metrics.RcmgrAllowSvc.M(1)) +} + +func (r rcmgrMetrics) BlockService(svc string) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) + stats.Record(ctx, metrics.RcmgrBlockSvc.M(1)) +} + +func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) + ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty())) + stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1)) +} + +func (r rcmgrMetrics) AllowMemory(size int) { + stats.Record(context.Background(), metrics.RcmgrAllowMem.M(1)) +} + +func (r rcmgrMetrics) BlockMemory(size int) { + stats.Record(context.Background(), metrics.RcmgrBlockMem.M(1)) +}