collect resource manager metrics
parent b260c849f7
commit a9ec408844
@@ -47,6 +47,12 @@ var (
     WorkerHostname, _ = tag.NewKey("worker_hostname")
     StorageID, _ = tag.NewKey("storage_id")
     SectorState, _ = tag.NewKey("sector_state")
+
+    // rcmgr
+    ServiceID, _ = tag.NewKey("svc")
+    ProtocolID, _ = tag.NewKey("proto")
+    Direction, _ = tag.NewKey("direction")
+    UseFD, _ = tag.NewKey("use_fd")
 )
 
 // Measures
@@ -143,6 +149,22 @@ var (
     SplitstoreCompactionHot = stats.Int64("splitstore/hot", "Number of hot blocks in last compaction", stats.UnitDimensionless)
     SplitstoreCompactionCold = stats.Int64("splitstore/cold", "Number of cold blocks in last compaction", stats.UnitDimensionless)
     SplitstoreCompactionDead = stats.Int64("splitstore/dead", "Number of dead blocks in last compaction", stats.UnitDimensionless)
+
+    // rcmgr
+    RcmgrAllowConn = stats.Int64("rcmgr/allow_conn", "Number of allowed connections", stats.UnitDimensionless)
+    RcmgrBlockConn = stats.Int64("rcmgr/block_conn", "Number of blocked connections", stats.UnitDimensionless)
+    RcmgrAllowStream = stats.Int64("rcmgr/allow_stream", "Number of allowed streams", stats.UnitDimensionless)
+    RcmgrBlockStream = stats.Int64("rcmgr/block_stream", "Number of blocked streams", stats.UnitDimensionless)
+    RcmgrAllowPeer = stats.Int64("rcmgr/allow_peer", "Number of allowed peer connections", stats.UnitDimensionless)
+    RcmgrBlockPeer = stats.Int64("rcmgr/block_peer", "Number of blocked peer connections", stats.UnitDimensionless)
+    RcmgrAllowProto = stats.Int64("rcmgr/allow_proto", "Number of allowed streams attached to a protocol", stats.UnitDimensionless)
+    RcmgrBlockProto = stats.Int64("rcmgr/block_proto", "Number of blocked streams attached to a protocol", stats.UnitDimensionless)
+    RcmgrBlockProtoPeer = stats.Int64("rcmgr/block_proto_peer", "Number of blocked streams attached to a protocol for a specific peer", stats.UnitDimensionless)
+    RcmgrAllowSvc = stats.Int64("rcmgr/allow_svc", "Number of allowed streams attached to a service", stats.UnitDimensionless)
+    RcmgrBlockSvc = stats.Int64("rcmgr/block_svc", "Number of blocked streams attached to a service", stats.UnitDimensionless)
+    RcmgrBlockSvcPeer = stats.Int64("rcmgr/block_svc_peer", "Number of blocked streams attached to a service for a specific peer", stats.UnitDimensionless)
+    RcmgrAllowMem = stats.Int64("rcmgr/allow_mem", "Number of allowed memory reservations", stats.UnitDimensionless)
+    RcmgrBlockMem = stats.Int64("rcmgr/block_mem", "Number of blocked memory reservations", stats.UnitDimensionless)
 )
 
 var (
@@ -496,6 +518,76 @@ var (
         Measure:     GraphsyncSendingPeersPending,
         Aggregation: view.LastValue(),
     }
+
+    // rcmgr
+    RcmgrAllowConnView = &view.View{
+        Measure:     RcmgrAllowConn,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{Direction, UseFD},
+    }
+    RcmgrBlockConnView = &view.View{
+        Measure:     RcmgrBlockConn,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{Direction, UseFD},
+    }
+    RcmgrAllowStreamView = &view.View{
+        Measure:     RcmgrAllowStream,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{PeerID, Direction},
+    }
+    RcmgrBlockStreamView = &view.View{
+        Measure:     RcmgrBlockStream,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{PeerID, Direction},
+    }
+    RcmgrAllowPeerView = &view.View{
+        Measure:     RcmgrAllowPeer,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{PeerID},
+    }
+    RcmgrBlockPeerView = &view.View{
+        Measure:     RcmgrBlockPeer,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{PeerID},
+    }
+    RcmgrAllowProtoView = &view.View{
+        Measure:     RcmgrAllowProto,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{ProtocolID},
+    }
+    RcmgrBlockProtoView = &view.View{
+        Measure:     RcmgrBlockProto,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{ProtocolID},
+    }
+    RcmgrBlockProtoPeerView = &view.View{
+        Measure:     RcmgrBlockProtoPeer,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{ProtocolID, PeerID},
+    }
+    RcmgrAllowSvcView = &view.View{
+        Measure:     RcmgrAllowSvc,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{ServiceID},
+    }
+    RcmgrBlockSvcView = &view.View{
+        Measure:     RcmgrBlockSvc,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{ServiceID},
+    }
+    RcmgrBlockSvcPeerView = &view.View{
+        Measure:     RcmgrBlockSvcPeer,
+        Aggregation: view.Count(),
+        TagKeys:     []tag.Key{ServiceID, PeerID},
+    }
+    RcmgrAllowMemView = &view.View{
+        Measure:     RcmgrAllowMem,
+        Aggregation: view.Count(),
+    }
+    RcmgrBlockMemView = &view.View{
+        Measure:     RcmgrBlockMem,
+        Aggregation: view.Count(),
+    }
 )
 
 // DefaultViews is an array of OpenCensus views for metric gathering purposes
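For context: view.Count() keeps one running counter per distinct combination of a view's TagKeys, so RcmgrBlockConnView, for example, yields a separate series for each direction/use_fd pair. A minimal, self-contained sketch of that behaviour, using hypothetical names (direction, example/block_conn) rather than the Lotus identifiers:

package main

import (
    "context"
    "fmt"
    "log"

    "go.opencensus.io/stats"
    "go.opencensus.io/stats/view"
    "go.opencensus.io/tag"
)

// Hypothetical measure and view, standing in for RcmgrBlockConn / RcmgrBlockConnView.
var (
    direction, _ = tag.NewKey("direction")
    blockedConn  = stats.Int64("example/block_conn", "Blocked connections", stats.UnitDimensionless)

    blockedConnView = &view.View{
        Name:        "example/block_conn",
        Measure:     blockedConn,
        Aggregation: view.Count(),
        TagKeys:     []tag.Key{direction},
    }
)

func main() {
    if err := view.Register(blockedConnView); err != nil {
        log.Fatal(err)
    }
    // Record three increments under two different tag values.
    for _, dir := range []string{"inbound", "inbound", "outbound"} {
        ctx, _ := tag.New(context.Background(), tag.Upsert(direction, dir))
        stats.Record(ctx, blockedConn.M(1))
    }
    // One row per tag combination: direction=inbound -> 2, direction=outbound -> 1.
    rows, err := view.RetrieveData("example/block_conn")
    if err != nil {
        log.Fatal(err)
    }
    for _, row := range rows {
        fmt.Println(row.Tags, row.Data.(*view.CountData).Value)
    }
}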
@@ -517,6 +609,21 @@ var DefaultViews = func() []*view.View {
         GraphsyncSendingTotalMemoryAllocatedView,
         GraphsyncSendingTotalPendingAllocationsView,
         GraphsyncSendingPeersPendingView,
+
+        RcmgrAllowConnView,
+        RcmgrBlockConnView,
+        RcmgrAllowStreamView,
+        RcmgrBlockStreamView,
+        RcmgrAllowPeerView,
+        RcmgrBlockPeerView,
+        RcmgrAllowProtoView,
+        RcmgrBlockProtoView,
+        RcmgrBlockProtoPeerView,
+        RcmgrAllowSvcView,
+        RcmgrBlockSvcView,
+        RcmgrBlockSvcPeerView,
+        RcmgrAllowMemView,
+        RcmgrBlockMemView,
     }
     views = append(views, blockstore.DefaultViews...)
     views = append(views, rpcmetrics.DefaultViews...)
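For these counters to be visible, DefaultViews has to be registered with OpenCensus and scraped by an exporter. The sketch below is a generic OpenCensus wiring, assuming the contrib Prometheus exporter and a build inside the lotus repo; it is illustrative only, not Lotus's own exporter setup, which is outside this diff:

package main

import (
    "log"
    "net/http"

    "contrib.go.opencensus.io/exporter/prometheus"
    "go.opencensus.io/stats/view"

    "github.com/filecoin-project/lotus/metrics"
)

func main() {
    // Register the default view set, which after this commit includes the rcmgr views.
    if err := view.Register(metrics.DefaultViews...); err != nil {
        log.Fatal(err)
    }
    exporter, err := prometheus.NewExporter(prometheus.Options{Namespace: "lotus"})
    if err != nil {
        log.Fatal(err)
    }
    // A view such as rcmgr/block_conn is then exposed as lotus_rcmgr_block_conn,
    // labelled by the view's tag keys (direction, use_fd, ...).
    http.Handle("/metrics", exporter)
    log.Fatal(http.ListenAndServe(":9090", nil))
}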
@@ -11,9 +11,15 @@ import (
 
     "github.com/libp2p/go-libp2p"
     "github.com/libp2p/go-libp2p-core/network"
+    "github.com/libp2p/go-libp2p-core/peer"
+    "github.com/libp2p/go-libp2p-core/protocol"
     rcmgr "github.com/libp2p/go-libp2p-resource-manager"
 
+    "github.com/filecoin-project/lotus/metrics"
     "github.com/filecoin-project/lotus/node/repo"
+
+    "go.opencensus.io/stats"
+    "go.opencensus.io/tag"
 )
 
 func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
@@ -43,6 +49,8 @@ func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceMan
     // TODO: also set appropriate default limits for lotus protocols
     libp2p.SetDefaultServiceLimits(limiter)
 
+    opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{}))
+
     if os.Getenv("LOTUS_DEBUG_RCMGR") != "" {
         debugPath := filepath.Join(repoPath, "debug")
         if err := os.MkdirAll(debugPath, 0755); err != nil {
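rcmgr.WithMetrics expects a reporter that satisfies the resource manager's metrics interface (MetricsReporter in go-libp2p-resource-manager, to the best of my reading of that package). A compile-time assertion next to the rcmgrMetrics type defined below would catch signature drift early; a sketch, assuming that interface name:

// Sketch only, assuming the interface exported by
// github.com/libp2p/go-libp2p-resource-manager is named MetricsReporter.
// Placed in the same file as rcmgrMetrics, this line fails the build if any
// of the callback signatures stop matching the interface.
var _ rcmgr.MetricsReporter = rcmgrMetrics{}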
@@ -70,3 +78,115 @@ func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts {
         Opts: []libp2p.Option{libp2p.ResourceManager(mgr)},
     }
 }
+
+type rcmgrMetrics struct{}
+
+func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) {
+    ctx := context.Background()
+    if dir == network.DirInbound {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
+    } else {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
+    }
+    if usefd {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true"))
+    } else {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false"))
+    }
+    stats.Record(ctx, metrics.RcmgrAllowConn.M(1))
+}
+
+func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) {
+    ctx := context.Background()
+    if dir == network.DirInbound {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
+    } else {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
+    }
+    if usefd {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true"))
+    } else {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false"))
+    }
+    stats.Record(ctx, metrics.RcmgrBlockConn.M(1))
+}
+
+func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) {
+    ctx := context.Background()
+    if dir == network.DirInbound {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
+    } else {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
+    }
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
+    stats.Record(ctx, metrics.RcmgrAllowStream.M(1))
+}
+
+func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) {
+    ctx := context.Background()
+    if dir == network.DirInbound {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
+    } else {
+        ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
+    }
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
+    stats.Record(ctx, metrics.RcmgrBlockStream.M(1))
+}
+
+func (r rcmgrMetrics) AllowPeer(p peer.ID) {
+    ctx := context.Background()
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
+    stats.Record(ctx, metrics.RcmgrAllowPeer.M(1))
+}
+
+func (r rcmgrMetrics) BlockPeer(p peer.ID) {
+    ctx := context.Background()
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
+    stats.Record(ctx, metrics.RcmgrBlockPeer.M(1))
+}
+
+func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) {
+    ctx := context.Background()
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
+    stats.Record(ctx, metrics.RcmgrAllowProto.M(1))
+}
+
+func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) {
+    ctx := context.Background()
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
+    stats.Record(ctx, metrics.RcmgrBlockProto.M(1))
+}
+
+func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) {
+    ctx := context.Background()
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
+    stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1))
+}
+
+func (r rcmgrMetrics) AllowService(svc string) {
+    ctx := context.Background()
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
+    stats.Record(ctx, metrics.RcmgrAllowSvc.M(1))
+}
+
+func (r rcmgrMetrics) BlockService(svc string) {
+    ctx := context.Background()
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
+    stats.Record(ctx, metrics.RcmgrBlockSvc.M(1))
+}
+
+func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) {
+    ctx := context.Background()
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
+    ctx, _ = tag.New(ctx, tag.Upsert(metrics.PeerID, p.Pretty()))
+    stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1))
+}
+
+func (r rcmgrMetrics) AllowMemory(size int) {
+    stats.Record(context.Background(), metrics.RcmgrAllowMem.M(1))
+}
+
+func (r rcmgrMetrics) BlockMemory(size int) {
+    stats.Record(context.Background(), metrics.RcmgrBlockMem.M(1))
+}
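These callbacks are invoked by the resource manager itself whenever it admits or rejects a connection, stream, peer, protocol, service, or memory reservation; nothing in Lotus calls them directly. To sanity-check the pipeline end to end, one can record against the same measure the way BlockConn does and read the aggregated count back. A sketch, assuming it is compiled inside the lotus repo so the metrics import resolves (the reporter type itself is unexported, so the sketch drives the measure directly):

package main

import (
    "context"
    "fmt"
    "log"

    "go.opencensus.io/stats"
    "go.opencensus.io/stats/view"
    "go.opencensus.io/tag"

    "github.com/filecoin-project/lotus/metrics"
)

func main() {
    if err := view.Register(metrics.RcmgrBlockConnView); err != nil {
        log.Fatal(err)
    }

    // Mirror what rcmgrMetrics.BlockConn does for an inbound connection that
    // consumes a file descriptor.
    ctx, _ := tag.New(context.Background(),
        tag.Upsert(metrics.Direction, "inbound"),
        tag.Upsert(metrics.UseFD, "true"),
    )
    stats.Record(ctx, metrics.RcmgrBlockConn.M(1))

    // The view name defaults to the measure name, rcmgr/block_conn.
    rows, err := view.RetrieveData("rcmgr/block_conn")
    if err != nil {
        log.Fatal(err)
    }
    for _, row := range rows {
        fmt.Println(row.Tags, row.Data.(*view.CountData).Value) // expect 1
    }
}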