Merge pull request #8089 from filecoin-project/feat/rcmgr-metrics

Resource Manager Metrics
This commit is contained in:
Łukasz Magiera 2022-02-15 14:40:44 +01:00 committed by GitHub
commit 48dea93c7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 230 additions and 2 deletions

2
go.mod
View File

@ -118,7 +118,7 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.6.1
github.com/libp2p/go-libp2p-quic-transport v0.16.1
github.com/libp2p/go-libp2p-record v0.1.3
github.com/libp2p/go-libp2p-resource-manager v0.1.3
github.com/libp2p/go-libp2p-resource-manager v0.1.4
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
github.com/libp2p/go-libp2p-swarm v0.10.1
github.com/libp2p/go-libp2p-tls v0.3.1

3
go.sum
View File

@ -1158,8 +1158,9 @@ github.com/libp2p/go-libp2p-record v0.1.2/go.mod h1:pal0eNcT5nqZaTV7UGhqeGqxFgGd
github.com/libp2p/go-libp2p-record v0.1.3 h1:R27hoScIhQf/A8XJZ8lYpnqh9LatJ5YbHs28kCIfql0=
github.com/libp2p/go-libp2p-record v0.1.3/go.mod h1:yNUff/adKIfPnYQXgp6FQmNu3gLJ6EMg7+/vv2+9pY4=
github.com/libp2p/go-libp2p-resource-manager v0.1.0/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-resource-manager v0.1.3 h1:Umf0tW6WNXSb6Uoma0YT56azB5iikL/aeGAP7s7+f5o=
github.com/libp2p/go-libp2p-resource-manager v0.1.3/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-resource-manager v0.1.4 h1:RcxMD0pytOUimx3BqTVs6IqItb3H5Qg44SD7XyT68lw=
github.com/libp2p/go-libp2p-resource-manager v0.1.4/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-routing v0.0.1/go.mod h1:N51q3yTr4Zdr7V8Jt2JIktVU+3xBBylx1MZeVA6t1Ys=
github.com/libp2p/go-libp2p-routing v0.1.0/go.mod h1:zfLhI1RI8RLEzmEaaPwzonRvXeeSHddONWkcTcB54nE=
github.com/libp2p/go-libp2p-routing-helpers v0.2.3 h1:xY61alxJ6PurSi+MXbywZpelvuU4U4p/gPTxjqCqTzY=

View File

@ -47,6 +47,12 @@ var (
WorkerHostname, _ = tag.NewKey("worker_hostname")
StorageID, _ = tag.NewKey("storage_id")
SectorState, _ = tag.NewKey("sector_state")
// rcmgr
ServiceID, _ = tag.NewKey("svc")
ProtocolID, _ = tag.NewKey("proto")
Direction, _ = tag.NewKey("direction")
UseFD, _ = tag.NewKey("use_fd")
)
// Measures
@ -143,6 +149,22 @@ var (
SplitstoreCompactionHot = stats.Int64("splitstore/hot", "Number of hot blocks in last compaction", stats.UnitDimensionless)
SplitstoreCompactionCold = stats.Int64("splitstore/cold", "Number of cold blocks in last compaction", stats.UnitDimensionless)
SplitstoreCompactionDead = stats.Int64("splitstore/dead", "Number of dead blocks in last compaction", stats.UnitDimensionless)
// rcmgr
RcmgrAllowConn = stats.Int64("rcmgr/allow_conn", "Number of allowed connections", stats.UnitDimensionless)
RcmgrBlockConn = stats.Int64("rcmgr/block_conn", "Number of blocked connections", stats.UnitDimensionless)
RcmgrAllowStream = stats.Int64("rcmgr/allow_stream", "Number of allowed streams", stats.UnitDimensionless)
RcmgrBlockStream = stats.Int64("rcmgr/block_stream", "Number of blocked streams", stats.UnitDimensionless)
RcmgrAllowPeer = stats.Int64("rcmgr/allow_peer", "Number of allowed peer connections", stats.UnitDimensionless)
RcmgrBlockPeer = stats.Int64("rcmgr/block_peer", "Number of blocked peer connections", stats.UnitDimensionless)
RcmgrAllowProto = stats.Int64("rcmgr/allow_proto", "Number of allowed streams attached to a protocol", stats.UnitDimensionless)
RcmgrBlockProto = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol", stats.UnitDimensionless)
RcmgrBlockProtoPeer = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol for a specific peer", stats.UnitDimensionless)
RcmgrAllowSvc = stats.Int64("rcmgr/allow_svc", "Number of allowed streams attached to a service", stats.UnitDimensionless)
RcmgrBlockSvc = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service", stats.UnitDimensionless)
RcmgrBlockSvcPeer = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service for a specific peer", stats.UnitDimensionless)
RcmgrAllowMem = stats.Int64("rcmgr/allow_mem", "Number of allowed memory reservations", stats.UnitDimensionless)
RcmgrBlockMem = stats.Int64("rcmgr/block_mem", "Number of blocked memory reservations", stats.UnitDimensionless)
)
var (
@ -496,6 +518,76 @@ var (
Measure: GraphsyncSendingPeersPending,
Aggregation: view.LastValue(),
}
// rcmgr
RcmgrAllowConnView = &view.View{
Measure: RcmgrAllowConn,
Aggregation: view.Count(),
TagKeys: []tag.Key{Direction, UseFD},
}
RcmgrBlockConnView = &view.View{
Measure: RcmgrBlockConn,
Aggregation: view.Count(),
TagKeys: []tag.Key{Direction, UseFD},
}
RcmgrAllowStreamView = &view.View{
Measure: RcmgrAllowStream,
Aggregation: view.Count(),
TagKeys: []tag.Key{PeerID, Direction},
}
RcmgrBlockStreamView = &view.View{
Measure: RcmgrBlockStream,
Aggregation: view.Count(),
TagKeys: []tag.Key{PeerID, Direction},
}
RcmgrAllowPeerView = &view.View{
Measure: RcmgrAllowPeer,
Aggregation: view.Count(),
TagKeys: []tag.Key{PeerID},
}
RcmgrBlockPeerView = &view.View{
Measure: RcmgrBlockPeer,
Aggregation: view.Count(),
TagKeys: []tag.Key{PeerID},
}
RcmgrAllowProtoView = &view.View{
Measure: RcmgrAllowProto,
Aggregation: view.Count(),
TagKeys: []tag.Key{ProtocolID},
}
RcmgrBlockProtoView = &view.View{
Measure: RcmgrBlockProto,
Aggregation: view.Count(),
TagKeys: []tag.Key{ProtocolID},
}
RcmgrBlockProtoPeerView = &view.View{
Measure: RcmgrBlockProtoPeer,
Aggregation: view.Count(),
TagKeys: []tag.Key{ProtocolID, PeerID},
}
RcmgrAllowSvcView = &view.View{
Measure: RcmgrAllowSvc,
Aggregation: view.Count(),
TagKeys: []tag.Key{ServiceID},
}
RcmgrBlockSvcView = &view.View{
Measure: RcmgrBlockSvc,
Aggregation: view.Count(),
TagKeys: []tag.Key{ServiceID},
}
RcmgrBlockSvcPeerView = &view.View{
Measure: RcmgrBlockSvcPeer,
Aggregation: view.Count(),
TagKeys: []tag.Key{ServiceID, PeerID},
}
RcmgrAllowMemView = &view.View{
Measure: RcmgrAllowMem,
Aggregation: view.Count(),
}
RcmgrBlockMemView = &view.View{
Measure: RcmgrBlockMem,
Aggregation: view.Count(),
}
)
// DefaultViews is an array of OpenCensus views for metric gathering purposes
@ -517,6 +609,21 @@ var DefaultViews = func() []*view.View {
GraphsyncSendingTotalMemoryAllocatedView,
GraphsyncSendingTotalPendingAllocationsView,
GraphsyncSendingPeersPendingView,
RcmgrAllowConnView,
RcmgrBlockConnView,
RcmgrAllowStreamView,
RcmgrBlockStreamView,
RcmgrAllowPeerView,
RcmgrBlockPeerView,
RcmgrAllowProtoView,
RcmgrBlockProtoView,
RcmgrBlockProtoPeerView,
RcmgrAllowSvcView,
RcmgrBlockSvcView,
RcmgrBlockSvcPeerView,
RcmgrAllowMemView,
RcmgrBlockMemView,
}
views = append(views, blockstore.DefaultViews...)
views = append(views, rpcmetrics.DefaultViews...)

View File

@ -11,9 +11,15 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/repo"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
@ -43,6 +49,8 @@ func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceMan
// TODO: also set appropriate default limits for lotus protocols
libp2p.SetDefaultServiceLimits(limiter)
opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{}))
if os.Getenv("LOTUS_DEBUG_RCMGR") != "" {
debugPath := filepath.Join(repoPath, "debug")
if err := os.MkdirAll(debugPath, 0755); err != nil {
@ -70,3 +78,115 @@ func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts {
Opts: []libp2p.Option{libp2p.ResourceManager(mgr)},
}
}
type rcmgrMetrics struct{}
func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
}
if usefd {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false"))
}
stats.Record(ctx, metrics.RcmgrAllowConn.M(1))
}
func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
}
if usefd {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false"))
}
stats.Record(ctx, metrics.RcmgrBlockConn.M(1))
}
func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
}
ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
stats.Record(ctx, metrics.RcmgrAllowStream.M(1))
}
func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
}
ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
stats.Record(ctx, metrics.RcmgrBlockStream.M(1))
}
func (r rcmgrMetrics) AllowPeer(p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
stats.Record(ctx, metrics.RcmgrAllowPeer.M(1))
}
func (r rcmgrMetrics) BlockPeer(p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
stats.Record(ctx, metrics.RcmgrBlockPeer.M(1))
}
func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
stats.Record(ctx, metrics.RcmgrAllowProto.M(1))
}
func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
stats.Record(ctx, metrics.RcmgrBlockProto.M(1))
}
func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1))
}
func (r rcmgrMetrics) AllowService(svc string) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
stats.Record(ctx, metrics.RcmgrAllowSvc.M(1))
}
func (r rcmgrMetrics) BlockService(svc string) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
stats.Record(ctx, metrics.RcmgrBlockSvc.M(1))
}
func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1))
}
func (r rcmgrMetrics) AllowMemory(size int) {
stats.Record(context.Background(), metrics.RcmgrAllowMem.M(1))
}
func (r rcmgrMetrics) BlockMemory(size int) {
stats.Record(context.Background(), metrics.RcmgrBlockMem.M(1))
}